数据进入阿里云数加-分析型数据库AnalyticDB(原ADS)的N种方法

    xiaoxiao2026-05-14  19

    从  https://yq.aliyun.com/articles/68208?spm=0.0.0.0.HEVojb&do=login  转载。

    数据进入AnalyticDB(原ADS)的N种方法

     

    分析型数据库(AnalyticDB)是阿里巴巴自主研发的海量数据实时高并发在线分析(Realtime OLAP)云计算服务,用户可以在毫秒级针对千亿级数据进行即时的多维分析透视和业务探索。

    想使用阿里云分析型数据,对于大多数人首先碰到的问题就是数据如何进入到分析型数据库中。按照分析型数据库数据表的更新类型,大致可以分为批量导入和实时写入两种,下面我们针对两种写入方式分别介绍几种常用方案。

    注意:在分析型数据库中建表等准备工作在此不详细说明,请参考分析型数据库的官方说明文档。

    https://help.aliyun.com/document_detail/26403.html?spm=5176.doc26412.6.569.xfDdNf

     

    本文用到的

    阿里云数加-大数据计算服务MaxCompute产品地址: https://www.aliyun.com/product/odps 阿里云数加-分析型数据库AnalyticDB产品地址: https://www.aliyun.com/product/ads

    一、数据批量导入到分析型数据库

    批量导入是利用分析型数据库内置的导入接口,将数据从MaxCompute导入到分析型数据库,因此批量导入方式必须有MaxCompute资源的支持。如果源端为非MaxCompute数据源,那么都需要通过MaxCompute进行中转。批量导入方式适合一次导入比较大量的数据(TB级别)。 下边分别介绍如何将MaxCompute数据源和非MaxCompute数据源批量导入分析型数据库。

    1、MaxCompute数据源批量导入分析型数据库

    1)、通过DataIDE实现批量数据导入

    i. 开通数加开发环境,在数据源管理中配置分析型数据库数据源,并保证连通性。

     

    ii. 账号授权

    源端为MaxCompute数据表,首次导入一个新的MaxCompute表时,需要在MaxCompute中将表Describe和Select权限授权给AnalyticDB的导入账号。公共云导入账号为garuda_build@aliyun.com以及garuda_data@aliyun.com(两个都需要授权)。

    1.  

    1. USE projecta;--表所属ODPS project 2. ADD USER ALIYUN$garuda_build@aliyun.com;--输入正确的云账号 3. ADD USER ALIYUN$garuda_data@aliyun.com; 4. GRANT Describe,Select ON TABLE table_name TO USER ALIYUN$garuda_build@aliyun.com;--输入需要赋权的表和正确的云账号 5. GRANT Describe,Select ON TABLE table_name TO USER ALIYUN$garuda_data@aliyun.com;

    另外为了保护用户的数据安全,导入操作的执行账号与MaxCompute数据表的创建者账号必须是同一个阿里云账号。

     

    iii. 创建数据同步任务,配置数据映射

     

     

    iv. 保存后提交运行,可以通过执行日志监控任务状态。

    特别说明:此方法与DataIDE的工作流结合可以实现周期性自动数据导入。

     

    2)、通过数据集成(Data Integration)实现批量数据导入

    数据集成(Data Integration),是阿里集团对外提供的稳定高效、弹性伸缩的数据集成平台,为阿里云大数据计算引擎(包括MaxCompute、Analytic DB、OSS)提供离线(批量)数据进出通道。有别于传统的客户端点对点同步运行工具,数据集成本身以公有云服务为基本设计目标,集群化、服务化、多租户、水平扩展等功能都是其基本实现要求。采云间、御膳房、聚石塔、孔明灯的后台数据同步均是基于数据集成完成各自的数据传输需求。

     

    使用示例

    i. 开通数据集成,在数据源管理中配置MaxCompute数据源和分析型数据数据源,并保证连通性。

    ii. 创建Pipeline

    Pipeline是数据集成权限管理、资源隔离的基本单元,为权限管理、安全控制提供管理和控制,同时也是数据同步作业运行的容器。用户进入数据集成后,须首先创建一个Pipeline。

    进入数据集成控制台创建普通Pipeline。

     

     

    iii. 创建作业

    系统默认使用界面视图进行创建作业,填写数据源端和目的端的信息。

     

    也可以使用JSON视图,选择源端和目的端类型,系统会自动生成模板,填写相应的信息即可。

    配置字段映射关系。

     

    创建作业成功。

     

    iv. 账号授权

    操作与第一章第1节第1)部分的账号授权相同。

     

    v. 运行作业

    数据集成可以手动运行作业,也可以定会运行。

    手动运行

     

    定时运行

     

    可以查看执行日志。

     

    3)、通过DataX 实现批量数据导入

    DataX 是阿里巴巴集团内被广泛使用的异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle 等)、HDFS、Hive、MaxCompute(原ODPS)、HBase、FTP 等各种异构数据源之间稳定高效的数据同步功能。DataX 本身作为离线数据同步框架,采用Framework + plugin架构构建,将数据源读取和写入抽象成为Reader/Writer 插件,纳入到整个同步框架中。

     

    使用示例

    i. 直接下载DataX 工具包,下载后解压至本地某个目录,修改权限为755。下载地址:

    http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

    ii. 查看作业配置文件模板

    python datax.py -r odpsreader -w adswriter

     

    iii. 根据配置文件模板填写相关选项(源和目标数据库的用户名、密码、

    URL、表名、列名等)。

    { "job": { "content": [ { "reader": { "name": "odpsreader", "parameter": { "accessId": "your_access_id", "accessKey": "your_access_key", "column": [ "id", "name" ], "odpsServer": "http://service.odps.aliyun.com/api", "packageAuthorizedProject": "", "partition": [], "project": "your_project_name", "splitMode": "record", "table": "your_table_name" } }, "writer": { "name": "adswriter", "parameter": { "lifeCycle": 2, "overWrite": "true", "partition": "", "password": "your_access_key", "schema": "your_database_name", "table": "your_table_name", "url": "host:port", "username": "your_access_id" } } } ], "setting": { "speed": { "channel": 1 } } } }

     

    iv. 账号授权

    操作与第一章第1节第1)部分的账号授权相同。

     

    v. 启动任务

    python {your_datax_dir}/bin/datax.py {your_jsonfile_path}

     

     

    导入任务的所有相关信息及执行日志会打印到标准输出。

     

    4)、通过分析型数据库LOAD命令实现批量数据导入

    无论是使用DataIDE,还是数据传输、DataX,其实本质都是利用分析型数据库的LOAD命令将数据从MaxCompute批量写入分析型数据库,所以LOAD命令是最原始的方法。

     

    操作示例

    i. 通过ads.console.aliyun.com登录分析型数据库的web管理工具DMS。

     

    ii. 账号授权

    操作与第一章第1节第1)部分的账号授权相同。

     

    iii. 执行导入

    在DMS上执行导入有两种方式,导入向导和SQL。

    导入向导是一个配置窗口,只需把源表和目标表设置好,即可提交导入任务。

     

    配置好后点击确定,如果之前的步骤都操作无误,导入任务即可成功提交。此种方式比较适合不熟悉SQL编写的使用者。

     

    SQL方式是直接执行LOAD数据的SQL来提交导入任务。

     

    点击执行,如果之前的步骤都操作无误,导入任务即可成功提交。

     

    iv. 查看导入任务状态

    任务提交后可以通过DMS的导入状态页面查看任务状态。

     

     

    2、非MaxCompute数据源批量导入分析型数据库

    1)、通过DateIDE实现批量数据导入

    i. 开通数加开发环境,数据源需要配置到数加DataIDE 中,并保证连通性。目前支持的数据源如下图:

    注意:由于分析型数据中的数据是二维表形式的结构化数据,如果源端是oss或者ftp,源数据文件必须具有明显的结构化schema,比如csv、tsv等。

     

    ii. 账户授权

    在分析型数据库中给cloud-data-pipeline@aliyun-inner.com这个账号至少授予表的Load Data权限。

     

    iii. 创建数据同步任务,配置数据映射

     

    以RDS到分析型数据库为例。

     

    iv. 保存后提交运行,可以通过执行日志监控执行成功与否。

     

    2)、通过数据集成(Data Integration)实现批量数据导入

    数据集成目前数据集成支持(和即将支持)的数据通道包括:

    l   关系型数据库: RDS(MySQL、SQL Server、PostgreSQL)、DRDS

    l   NoSQL数据存储: OTS、OCS

    l   数据仓库: MaxCompute、Analytic DB

    l   结构化存储: OSS

    l   文本:TXT、FTP

     

    同时也存在一些约束和限制:

    支持且仅支持结构化(例如RDS、DRDS等)、半结构化(OTS等)、无结构化(OCS、OSS、TXT等, 要求具体同步数据必须抽象为结构化数据)的数据同步。换言之,Data Integration支持传输能够抽象为逻辑二维表的数据同步,其他完全非结构化数据,例如OSS中存放的一段MP3,Data Integration不支持将其同步到分析型数据库

    使用示例(以RDS->分析型数据库为例)

    i. 开通数据集成,在数据源管理中配置RDS数据源,并保证连通性。

     

    ii. 创建Pipeline

    Pipeline是数据集成权限管理、资源隔离的基本单元,为权限管理、安全控制提供管理和控制,同时也是数据同步作业运行的容器。用户进入数据集成后,须首先创建一个Pipeline。

    进入数据集成控制台创建普通Pipeline。

     

     

    iii. 创建作业

    系统默认使用界面视图进行创建作业,填写数据源端和目的端的信息。

     

    也可以使用JSON视图,选择源端和目的端类型,系统会自动生成模板,填写相应的信息即可。

    配置字段映射关系。

     

    创建作业成功。

     

    iv. 账号授权

    操作与第一章第2节第1)部分的账号授权相同。

     

    v. 运行作业

    数据集成可以手动运行作业,也可以定会运行。

    手动运行

     

    定时运行

     

    可以查看执行日志。

    3)、通过DataX 实现批量数据导入

    DataX目前已经有了比较全面的插件体系,主流的RDBMS 数据库、NOSQL、大数据计算系统都已经接入。DataX 目前支持数据如下:

    类型

    数据源

    Reader(读)

    Writer(写)

    RDBMS 关系型数据库

    Mysql

    Oracle

    SqlServer

    Postgresql

    达梦

    阿里云数仓数据存储

    ODPS

    ADS

    OSS

    OCS

    NoSQL数据存储

    OTS

    Hbase0.94

    Hbase1.1

    MongoDB

    无结构化数据存储

    TxtFile

    FTP

    HDFS

     

    与数据集成一样,当数据源是OCS、OSS、TXT、FTP或HDFS等非结构化数据时,必须抽象为结构化数据。

     

    使用示例(以RDS->ADS为例)

    注意:由于批量写数据到分析型数据库本质都是通过Load命令从MaxCompute将数据导入分析型数据库,其中涉及MaxCompute中转环境,DataX作为开源工具,并不提供MaxCompute环境,因此使用者需要自行准备MaxCompute环境。另外,由于涉及账号授权等安全操作,DataX也无法将这部分操作集成到工具中,因此整个数据迁移的流程实际由两部组成:RDS->MaxCompute和MaxCompute->分析型数据库。

     

    i. 直接下载DataX 工具包,下载后解压至本地某个目录,修改权限为755。下载地址:

    http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

    ii. 查看作业配置文件模板

    python datax.py -r rdsreader -w odpswriter (RDS->MaxCompute)

    python datax.py –r odpsreader –w adswriter (MaxCompute->分析型数据库)

     

    iii. 根据配置文件模板填写相关选项(源和目标数据库的用户名、密码、

    URL、表名、列名等)。

    RDS->MaxCompute

    { "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "column": [“id”, “name”], "connection": [ { "jdbcUrl": [“jdbc:mysql://host:port/your_database_name”], "table": [your_table_name] } ], "password": "your_password", "username": "your_username", "where": "" } }, "writer": { "name": "odpswriter", "parameter": { "accessId": "your_access_id", "accessKey": "your_access_key", "column": [“id”, “name”], "odpsServer": " http://service.odps.aliyun.com/api", "partition": "", "project": "your_project_name", "table": "your_table_name", "truncate": true, "tunnelServer": "" } } } ], "setting": { "speed": { "channel": 1 } } } }

     

    MaxCompute->分析型数据库

    { "job": { "content": [ { "reader": { "name": "odpsreader", "parameter": { "accessId": "your_access_id", "accessKey": "your_access_key", "column": [ "id", "name" ], "odpsServer": "http://service.odps.aliyun.com/api", "packageAuthorizedProject": "", "partition": [], "project": "your_project_name", "splitMode": "record", "table": "your_table_name" } }, "writer": { "name": "adswriter", "parameter": { "lifeCycle": 2, "overWrite": "true", "partition": "", "password": "your_access_key", "schema": "your_database_name", "table": "your_table_name", "url": "host:port", "username": "your_access_id" } } } ], "setting": { "speed": { "channel": 1 } } } }

     

     

    iv. 账号授权

    操作与第一章第1节第1)部分的账号授权相同。

     

    v. 执行任务

    依次执行RDS->MaxCompute和MaxCompute->分析型数据库两个任务。

    python {your_datax_dir}/bin/datax.py {your_jsonfile_path}

     

     

    4)、通过分析型数据库LOAD命令实现批量数据导入

    正如前面所说,外部数据批量进入分析型数据库,最终都是通过LOAD命令从MaxCompute将数据导入,因此理论上讲,只要数据能够进入MaxCompute,就可以继续进入分析型数据库。所以只要分别完成这两个步骤,数据也就最终进入了分析型数据库。

    异构数据源如何进入MaxCompute,请参考另外一篇文章https://yq.aliyun.com/articles/65376?spm=5176.100240.searchblog.131.wa3XNH。

    通过LOAD命令从MaxCompute将数据批量导入分析型数据库请参考本章第一节的第4)部分内容,在此不再赘述。

    二、数据实时写入分析型数据库

    实时写入是为了满足使用者需要数据实时进入分析型数据库而开发的功能。实时写入的本质是利用insert语句将数据一条一条的插入目标表。

    注意:实时写入目标表的更新方式必须是实时更新。

    1、利用应用程序实时写入

    分析型数据库支持大部分版本的MySQL JDBC驱动,支持的版本号:

    l   5.0系列: 5.0.2,5.0.3,5.0.4,5.0.5,5.0.7,5.0.8

    l   5.1系列: 5.1.1,5.1.2,5.1.3,5.1.4,5.1.5,5.1.6,5.1.7,5.1.8,5.1.11,5.1.12,5.1.13,5.1.14,5.1.15,5.1.16,5.1.17,5.1.18,5.1.19,5.1.20,5.1.21,5.1.22,5.1.23,5.1.24,5.1.25,5.1.26,5.1.27,5.1.28,5.1.29,5.1.31

    l   5.4系列

    l   5.5系列

     

    目前已经验证可以使用Java、C++、Python、PHP、Scala、R等语言编写程序执行数据写入分析型数据库,下面以Java程序为例。

           

    Connection connection = null; Statement statement = null; try { Class.forName("com.mysql.jdbc.Driver"); String url = "jdbc:mysql://host:ip/{your_database_name}?useUnicode=true&characterEncoding=UTF-8"; Properties connectionProps = new Properties(); connectionProps.put("user", {your_access_id}); connectionProps.put("password", {your_access_key}); connection = DriverManager.getConnection(url, connectionProps); statement = connection.createStatement(); String sql = "insert into table your_table values (1, ‘name1’);"; int status = statement.executeUpdate(sql); } catch (ClassNotFoundException e) { e.printStackTrace(); } catch (SQLException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } finally { if (statement != null) { try { statement.close(); } catch (SQLException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (SQLException e) { e.printStackTrace(); } } }

     

    注意:在连接数据库时,用户名和密码是连接该数据库云账号的Access Id和Access Key。

     

    注意:进行写入时,在以下几个地方进行优化,可以提升写入性能。

    l   采用批量写入(batch insert)模式,即每次在VALUES部分添加多行数据,一般建议每次批量写入数据量大约为16KB,以提高网络和磁盘吞吐。

    l   按hash分区列聚合写入。分析型数据库需要对数据进行分区存储,当一次Batch insert中含有属于不同分区的多行数据时,将会耗费大量CPU资源进行分区号计算。因此建议在写入程序中提前计算好每行数据的分区号,并且将属于同一分区的多行数据组成一个批次,一次性插入。用户可自行实现该聚合方法,对分区号的计算规则为partition_num = CRC32(hash_partition_column_value) mod m,其中hash_partition_column_value是分区列的值,m是分区总数。

    l   如果对一行的所有列都进行插入,则去除col_name并保证values顺序与表结构中的col_name顺序一致,以降低网络带宽耗用。

    l   保持主键相对有序。分析型数据库的insert语句要求必须提供主键,且主键可以为复合主键。当确定复合主键时,根据业务含义调整复合主键中各个列的次序,从业务层面保证插入时主键是严格递增或近似递增的,也可以提升实时写入速度。

    l   增加ignore关键字。执行不带ignore关键字的insert  sql,当主键冲突时,后续数据会覆盖之前插入的数据;带上ignore关键字,则主键冲突时,会保留之前插入的数据而自动忽略新数据。如果业务层没有数据覆盖的语义要求,则建议所有insert sql都加上ignore关键字,以减小覆盖数据带来的性能开销。

     

    2、通过DataIDE实时写入

    i. 开通数加开发环境,数据源需要配置到数加DataIDE 中,并保证连通性。目前支持的数据源如下图:

    注意:由于分析型数据中的数据是二维表形式的结构化数据,如果源端是oss或者ftp,源数据文件必须具有明显的结构化schema,比如csv、tsv等。

     

    ii. 创建数据同步任务,配置数据映射

     

    以RDS到分析型数据库为例,导入模式选择实时导入。

    注意:分析型数据库中的目标表必须是实时更新表。

     

    iv. 保存后提交运行,可以通过执行日志监控执行成功与否。

     

    3、通过数据集成(Data Integration)实时写入

    注意:由于分析型数据中的数据是二维表形式的结构化数据,如果源端是oss或者ftp,源数据文件必须具有明显的结构化schema,比如csv、tsv等。

    使用示例(以RDS->分析型数据库为例)

    i. 开通数据集成,在数据源管理中配置RDS数据源,并保证连通性。

     

    ii. 创建Pipeline

    Pipeline是数据集成权限管理、资源隔离的基本单元,为权限管理、安全控制提供管理和控制,同时也是数据同步作业运行的容器。用户进入数据集成后,须首先创建一个Pipeline。

    进入数据集成控制台创建普通Pipeline。

     

     

    iii. 创建作业

    系统默认使用界面视图进行创建作业,填写数据源端和目的端的信息,导入模式选择实时导入。

     

    也可以使用JSON视图,选择源端和目的端类型,系统会自动生成模板,填写相应的信息即可,writeMode填写insert。

    配置字段映射关系。

     

    创建作业成功。

     

    iv. 运行作业

    数据集成可以手动运行作业,也可以定会运行。

    手动运行

     

    定时运行

     

    可以查看执行日志。

     

    4、通过DataX实时写入

    注意:由于分析型数据中的数据是二维表形式的结构化数据,如果源端是oss或者ftp,源数据文件必须具有明显的结构化schema,比如csv、tsv等。

     

    使用示例(以RDS->ADS为例)

     

    i. 直接下载DataX 工具包,下载后解压至本地某个目录,修改权限为755。下载地址:

    http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

    ii. 查看作业配置文件模板

    python datax.py -r rdsreader -w adswriter

     

    iii. 根据配置文件模板填写相关选项(源和目标数据库的用户名、密码、

    URL、表名、列名等)。

    { "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "username": {your_username}, "password": {your_password}, "column": ["id","name"], "connection": [ { "table": [ {your_table_name} ], "jdbcUrl": [ "jdbc:mysql://host:port/{your_database_name}" ] } ] } }, "writer": { "name": "adswriter", "parameter": { "writeMode": "insert", "username": {your_access_id}, "password": {your_access_key}, "column": ["id","name"], "url": "host:port", "partition": "", "schema": {your_database_name}, "table": {your_table_name} } } } ], "setting": { "speed": { "channel": 1 } } } }

     

    iv. 执行任务

    python {your_datax_dir}/bin/datax.py {your_jsonfile_path}

     

     

    说明:使用DataIDE、数据集成和DataX进行实时写入底层都是调用JDBC接口,但是这三个工具都已经进行了写入的优化,可以节省用户的开发量。另外DataIDE还可以提供定时调度功能,方便数据写入与数据处理进行集成。因此,在没有特殊要求的场景,建议使用DataIDE实现数据实时写入分析型数据库。

    5、利用Kettle实时写入数据

    Pentaho Data Integration(又称Kettle)是一款非常受欢迎的开源ETL工具软件。分析型数据库支持用户利用Kettle将外部数据源写入实时写入表中。Kettle的数据输出程序并未为分析型数据库进行过优化,因此写入分析型数据库的速度并不是很快(通常不超过700 rec/s),不是特别适合大批量数据的写入,但是对于本地文件上传、小数据表等的写入等场景是非常合适的。

     

    使用示例

    i. 下载Kettle,并解压。下载地址http://jaist.dl.sourceforge.net/project/pentaho/Data Integration/7.0/pdi-ce-7.0.0.0-25.zip

     

    ii. 启动Kettle,创建转换

    源端为表输入组件,目的端为表输出组件。

     

    iii. 配置组件属性

    表输入组件

     

     

    表输出组件

     

     

    iv. 执行转换

     

    可以登录DMS查看写入分析型数据库的数据。

    6、利用DataHub和流计算实时写入

    DataHub服务是阿里云提供的流式数据处理(Streaming Data)服务,它提供流式数据的发布 (Publish)和订阅 (Subscribe)的功能。DataHub具体介绍请参考https://datahub.console.aliyun.com/intro/index.html

     

    Alibaba Cloud StreamCompute(阿里云流计算)是运行在阿里云平台上的流式大数据分析平台,提供给用户在云上进行流式数据实时化分析工具。流计算具体介绍请参考https://stream.console.aliyun.com/help/index.html

     

    DataHub和流计算结合使用可以实现数据实时加工处理并写入分析型数据库的需求。

     

    使用示例

    i. 开通DataHub服务和流计算服务,并创建项目。

    目前这两个产品均处于公测阶段,可以申请使用。

    DataHub:请联系阿里云业务接口同学进行开通

    流计算:https://data.aliyun.com/product/sc

    注意:由于流计算依赖于DataHub数据源,因此两个产品必须都开通才能实现此场景。

     

     

     

    ii. 数据写入DataHub

    DataHub已提供SDK,可以通过编写Java程序写入数据。同时与当前流行的部分开源数据收集工具互通,例如LogStash,Fluentd等,可以通过这些数据收集工具直接将数据流向DataHub。具体实现方法请参考https://datahub.console.aliyun.com/intro/guide/index.html

    本例中采用java程序调用SDK写入数据。

     

    String projectName = {your_project_name}; String topicName = {your_topic_name}; String accessId = {your_access_id}; String accessKey = {your_access_key}; String endpoint = "http://dh-cn-hangzhou.aliyuncs.com"; AliyunAccount account = new AliyunAccount(accessId, accessKey); DatahubConfiguration conf = new DatahubConfiguration(account, endpoint); DatahubClient client = new DatahubClient(conf); RecordSchema schema = new RecordSchema(); schema.addField(new Field("id", FieldType.BIGINT)); schema.addField(new Field("name", FieldType.STRING)); ListShardResult listShardResult = client.listShard(projectName, topicName); List<RecordEntry> recordEntries = new ArrayList<RecordEntry>(); String shardId = listShardResult.getShards().get(0).getShardId(); for (long i = 0; i <= 100; i++) { RecordEntry entry = new RecordEntry(schema); entry.setString(1, "name" + i); entry.setBigint(0, i); entry.setShardId(shardId); recordEntries.add(entry); } PutRecordsResult result = client.putRecords(projectName, topicName, recordEntries);

     

     

    iii. 创建流计算任务

    --创建DataHub数据源

    CREATE STREAM TABLE cdp_demo_rt_dh ( id BIGINT, name STRING ) WITH ( type='datahub', endpoint='http://dh-cn-hangzhou-internal.aliyuncs.com', roleArn='acs:ram::1811270634786818:role/aliyunstreamdefaultrole', projectName='bigdatatraining', topic='cdp_demo_rt' );

     

    --创建分析型数据库数据源

    CREATE RESULT TABLE cdp_demo_rt_ads ( id BIGINT, name STRING, PRIMARY KEY(id) ) WITH ( type='ads', url='jdbc:mysql://trainning-db1-840e4b36.cn-hangzhou-1.ads.aliyuncs.com:10208/trainning_db1', username='gfCrBlzhaVzZeWDP', password='pWMAPt15gU8IBox9bj9rpjGUxjXcqI', tableName='cdp_demo_rt' );

     

     

    --计算逻辑并写入分析型数据库

    REPLACE INTO TABLE cdp_demo_rt_ads SELECT id, name FROM cdp_demo_rt_dh;

     

    iv. 执行流计算任务

     

    通过DMS查看写入分析型数据库的数据。

     

    当然,根据具体的场景,可以在流计算中定义更加复杂的处理逻辑。

    7、通过数据传输(Data Transmission)的分析型数据库插件将RDS MySQL增量数据实时写入分析型数据库

    通过阿里云数据传输(https://www.aliyun.com/product/dts/),并使用 dts-ads-writer 插件, 可以将您在阿里云RDS中的数据表的变更实时同步到分析型数据库中对应的实时写入表中。

    注意:RDS端目前暂时仅支持MySQL引擎

     

    使用示例

    i. 开通数据传输服务并创建数据订阅通道,具体操作请参考https://help.aliyun.com/document_detail/dts/Getting-Started/data-subscription.html?spm=5176.doc26427.2.3.nJqFL7

     

     

    ii. 下载dts-ads-writer插件,并编辑配置文件app.conf

    下载地址https://docs-aliyun.cn-hangzhou.oss.aliyun-inc.com/assets/attach/26427/cn_zh/1470925197355/dts-ads-writer-0.17.zip?spm=5176.doc26427.2.2.nJqFL7&file=dts-ads-writer-0.17.zip

     

    {

      "dtsAccessId": {your_access_id},

      "dtsAccessKey": {your_access_key},

      "dtsTunnelId": {your_tunnel_id},

      "adsUserName": {your_access_id},

      "adsPassword": {your_access_key},

      "adsJdbcUrl": "jdbc:mysql://host:port/{your_analyticdb_database_name}",

      "options": {

            "traceSql": false,

            "detailLog": false,

            "isReplaceInvalidInsertValue": true,

            "invalidInsertValueCharacters": "\\\\n,\\\\r,\\\\t,'"

      },

      "tables": [

        {

          "source": {

            "primaryKeys": [

              "id"

            ],

            "db": {your_rds_database_name},

            "table": {your_rds_table_name}

          },

          "target": {

            "db": {your_analyticdb_database_name},

            "table": {your_analyticdb_table_name}

    },

          "columnMapping": {

            "id": "id",

            "name": "name"

          }

        }

      ]

    }

     

     

    iii. 启动插件消费dts订阅通道数据,并写入分析型数据库

    启动后可以通过日志文件查看插件运行情况。

     

    iv. 在RDS中写入一条数据

     

    v. 查看分析型数据库中的数据

     

    注意:使用数据传输的dts-ads-writer插件可以将数据变化实时同步到分析型数据库,数据变化不单单指insert,delete和update操作也可以同步到分析型数据库。

    最新回复(0)