新老DataHub迁移手册

    xiaoxiao2023-06-18  173

    DataHub服务用户迁移文档

    前言

    原Odps版内测DataHub(下文统称为老DataHub服务),于2016年11月21日起已经处于维护状态,新版DataHub届时已经开启公测,公测至今已有半年以上时间,我们决定开始逐步下线老DataHub服务,老版部分用户需要迁移至新版DataHub。

    新版本具有更多的特性,性能功能都有不少提升,可以同时支持数据一份数据同步到Odps、OSS、ElasticSearch等多个不同服务中,且提供WebConsole控制台进行更简单的操作。

    准备工作

    本文档针对使用Logstash、Fluentd、Flume以及使用SDK写入老DataHub服务的用户,提供迁移到新服务的指引,过程中遇到任何困难可以联系我们

    新版DataHub相关文档

    DataHub产品使用文档

    DataHub控制台

    创建新datahub project

    新版DataHub中存在项目空间-Project概念,与Odps中Project类似,但是不等于Odps中的Project,为了方便管理,我们建议迁移时在DataHub中创建与Odps Project同名的Project(不同名称也可以)

    登录DataHub官网控制台,使用阿里云账号登录;点击创建Project,输入名称及描述,点击创建(Project描述中建议携带Project用处及Owner的邮箱或联系方式)

    创建新DataHub topic

    新版DataHub存在主题-Topic的概念,与Odps的Table类似,但是不等于Odps的Table,通常如果是需要导入数据到Odps的话,需要为每张表创建一个Topic,且字段类型、顺序与名称需要一致,Odps中的分区字段当做普通的Topic字段处理,新版DataHub会根据该分区字段再DataHub中的数据值,将数据同步到Odps离线表中。

    例如:

    MaxCompute表: table_test(f1 string, f2 string, f3 double) partitioned by (ds string, pt string) 对应Topic应为如下的Schema: Topic: topic_test(f1 string, f2 string, f3 double, ds string, pt string)

    创建Topic可以通过以下方式:

    若Topic数量较少,可以再WebConsole控制台,进入Project页面后点击创建Topic按钮,选择从MaxCompute导入,输入配置信息后勾选“自动创建DataConnector”,点击“导入表结构”即可导入odps表对应的格式,确认格式无误后选择Shard数量及生命周期, Shard数量建议与老服务一样,生命周期建议3天,点击创建即可。若Topic过多,可以使用迁移工具DataHub表结构迁移工具,工具将对列表中的所有表创建对应Topic及Connector。

    DataHub与MaxCompute字段类型对应表

    MaxCompute表中的类型DataHub Topic中的类型STRINGSTRINGDOUBLEDOUBLEBIGINTBIGINTDATETIMETIMESTAMP (注:以微秒为度量单位)BOOLEANBOOLEANDECIMAL不支持MAP不支持ARRAY不支持

    映射Odps分区

    老DataHub在写入数据时需要直接指定分区,如果是通过fluend或logstash等插件写入的用户是需要配置分区信息或者通过某个时间字段转为固定格式作为分区

    新版DataHub在这一行为上有所改变,Odps表的分区字段再DataHub中将会变成一个普通字段,后台Connector同步任务在同步数据到Odps表时会根据分区字段比如pt具体每条记录的值写入Odps对应分区中。

    例如:

    MaxCompute表: table_test(f1 string, f2 string, f3 double) partitioned by (ds string, pt string) 对应Topic应为如下的Schema: Topic: topic_test(f1 string, f2 string, f3 double, ds string, pt string) 数据1: ("test", "test", "0.14", "a1", "20170405") 数据2: ("test", "test", "0.14", "aa", "20170406") 则数据1将会同步到odps分区ds=a1,pt=20170405 则数据2将会同步到odps分区ds=a2,pt=20170406 若使用插件导入,并且是通过字符串转换为固定格式的分区值的用户,新的插件需要使用fluentd/logstash的filter功能,对分区字段的值进行转换,具体使用方式可以参考这些开源工具的官方文档

    不同类型接入方式迁移

    使用Java SDK

    需要换成新版本DataHub的SDK,Mvn依赖变化

    原依赖

    <dependency> <groupId>com.aliyun.odps</groupId> <artifactId>odps-sdk-core</artifactId> <version>0.xxx</version> </dependency>

    新依赖

    <dependency> <groupId>com.aliyun.datahub</groupId> <artifactId>aliyun-sdk-datahub</artifactId> <version>2.3.0-public</version> </dependency>

    Client初始化

    原Client初始化步骤

    Account account = new AliyunAccount(accessId, accessKey); odps = new Odps(account); odps.setDefaultProject(project); odps.setEndpoint(odpsEndpoint); DatahubClient client = new DatahubClient(odps, project, table, datahubEndpoint); client.loadShard(shardNumber); client.waitForShardLoad();

    新Client初始化步骤

    AliyunAccount account = new AliyunAccount(accessId, accessKey); DatahubConfiguration conf = new DatahubConfiguration(account, datahubEndpoint); DatahubClient client = new DatahubClient(conf);

    获取Shard列表

    原获取Shard列表及状态方式

    HashMap<Long, DatahubClient.ShardState> shardStatus = client.getShardStatus();

    新方式

    ListShardResult listShardResult = client.listShard(projectName, topicName);

    写入数据

    原写入方式

    DatahubWriter writer = client.openDatahubWriter(shardId); TableSchema schema = client.getStreamSchema(); DatahubRecordPack recordPack = new DatahubRecordPack(schema); /* Write another 20 records recordPack into another partition */ for (int i = 0; i < 20; i++) { Record record = makeRecord(schema); recordPack.append(record); } partSpec = "pt='20150809'"; packId = writer.write(new PartitionSpec(partSpec), recordPack) .getPackId(); System.out.println("record append to the pack: " + packId);

    新写入方式

    List<RecordEntry> recordEntries = new ArrayList<RecordEntry>(); RecordEntry entry = new RecordEntry(schema); entry.setString(0, "Test"); entry.setBigint(1, 5L); entry.setShardId(shardId); recordEntries.add(entry); PutRecordsResult result = client.putRecords(projectName, topicName, recordEntries); if (result.getFailedRecordCount() != 0) { List<ErrorEntry> errors = result.getFailedRecordError(); // deal with result.getFailedRecords() }

    完整写入新DataHub示例代码

    使用Fluentd

    通过Fluend插件写入数据的用户,迁移除了上述准备工作外,还需进行三个步骤

    更换,安装新插件包根据配置文件对比,修改现有配置文件使用新配置文件重新启动fluend进程

    插件包更换

    新版Fluentd插件使用文档

    原安装语句

    gem install fluent-plugin-aliyun-odps

    新安装语句(也可按照新版文档提供的一键安装包安装logstash)

    gem install fluent-plugin-datahub

    配置对比

    部分配置不需更改,更改match 部分配置即可。

    老服务配置项新服务配置项备注typetype需要从aliyun_odps改为dataHubaliyun_access_idaccess_id云账号accessidaliyun_access_keyaccess_key云账号accesskeyaliyun_odps_hub_endpointendpointDatahub服务域名,需要改为新服务的域名aliyun_odps_endpoint无不再需要buffer_chunk_limitbuffer_chunk_limit不需要变化,但是新配置不能超过3MBbuffer_queue_limitbuffer_queue_limit不需要变化flush_intervalflush_interval不需要变化projectproject_namedatahub的Project,非odps projecttabletopic_namedatahub的topic,非odps tablefieldscolumn_names指定需要采集的列partition无不再需要time_format无不再需要shard_number无不再需要enable_fast_crc无不再需要retry_timeretry_time重试次数retry_intervalretry_interval重试间隔abandon_mode无不再需要

    新增配置

    新服务配置项备注dirty_data_continuetrue/false遇到增数据是否继续,若为true 遇到脏数据会重试,重试次数用完,会将脏数据写入脏数据文件dirty_data_file指定脏数据文件的位置put_data_batch_size每1000条record写一次DataHubshard_id指定shard_id写入,默认round-robin方式写入shard_keys指定用作分区key,用key值hash后作为写入shard的索引

    [TODO] 能否放一个新老的diff文件example

    使用Logstash

    通过Logstash插件写入数据的用户,迁移除了上述准备工作外,还需进行三个步骤

    更换,安装新插件包根据配置文件对比,修改现有配置文件使用新配置文件重新启动Logstash进程

    插件包更换

    新版Logstash插件使用文档

    配置对比

    input部分配置不需更改,更改output部分配置即可。

    老服务配置项新服务配置项备注typetype需要从aliyun_odps改为dataHubaliyun_access_idaccess_id云账号accessidaliyun_access_keyaccess_key云账号accesskeyaliyun_odps_hub_endpointendpointDatahub服务域名,需要改为新服务的域名aliyun_odps_endpoint无不再需要value_field无不再需要projectproject_namedatahub的Project,非odps projecttabletopic_namedatahub的topic,非odps tablepartition无不再需要partition_time_format无不再需要shard_number无不再需要batch_size通过logstash启动参数设置logstash -f <上述配置文件地址> -b 256 (256即为每次batch大小)batch_timeout无不再需要

    新增配置

    新服务配置项备注dirty_data_continuetrue/false遇到增数据是否继续,若为true 遇到脏数据会重试,重试次数用完,会将脏数据写入脏数据文件dirty_data_file指定脏数据文件的位置put_data_batch_size每1000条record写一次DataHubshard_keys数组类型,数据落shard的字段名称,插件会根据这些字段的值计算hash将每条数据落某个shard, 注意shard_keys和shard_id都未指定,默认轮询落shardshard_id所有数据落指定的shard,注意shard_keys和shard_id都未指定,默认轮询落shardretry_times重试次数,-1为无限重试、0为不重试、>0表示需要有限次数, 默认值为-1retry_interval下一次重试的间隔,单位为秒,默认值为5

    使用Apache Flume

    通过Flume工具写入数据的用户,迁移除了上述准备工作外,还需进行三个步骤

    更换,安装新Flume工具插件根据配置文件对比,修改现有配置文件使用新配置文件重新启动Flume进程

    插件更新

    新版Flume工具文档

    配置对比

    老服务配置项新服务配置项备注a1.sinks.k1.typea1.sinks.k1.type从com.aliyun.odps.flume.sink.OdpsSink改为com.aliyun.datahub.flume.sink.DatahubSinka1.sinks.k1.accessIDa1.sinks.k1.datahub.accessID云账号accessida1.sinks.k1.accessKeya1.sinks.k1.datahub.accessKey云账号accesskeya1.sinks.k1.odps.endPointa1.sinks.k1.datahub.endPointDatahub服务域名,需要改为新服务的域名aliyun_odps_endpoint无不再需要a1.sinks.k1.odps.projecta1.sinks.k1.datahub.projectdatahub的Project,非odps projecta1.sinks.k1.odps.tablea1.sinks.k1.datahub.topicdatahub的topic,非odps tablea1.sinks.k1.odps.partition无不再需要a1.sinks.k1.batchSizea1.sinks.k1.batchSize批次大小a1.sinks.k1.serializera1.sinks.k1.serializer无变化a1.sinks.k1.serializer.delimitera1.sinks.k1.serializer.delimiter无变化a1.sinks.k1.serializer.fieldnamesa1.sinks.k1.serializer.fieldnames无变化a1.sinks.k1.serializer.charseta1.sinks.k1.serializer.charset无变化a1.sinks.k1.serializer.delimitera1.sinks.k1.serializer.delimiter无变化a1.sinks.k1.shard.number无不再需要a1.sinks.k1.shard.maxTimeOuta1.sinks.k1.shard.maxTimeOut无变化a1.sinks.k1.autoCreatePartition无不再需要

    使用OGG

    通过OGG工具写入数据的用户,迁移除了上述准备工作外,还需进行三个步骤

    更换,安装新OGG工具插件根据配置文件对比,修改现有配置文件使用新配置文件重新启动OGG进程

    插件更新

    新版OGG工具文档

    配置对比

    老服务配置项新服务配置项备注gg.handlerlistgg.handlerlist不需修改,仍然为ggdatahubgg.handler.ggdatahub.typegg.handler.ggdatahub.type不需修改,仍然为com.aliyun.odps.ogg.handler.datahub.DatahubHandlergg.classpathgg.classpathYOUR_DATAHUB_HANDLER_DIRECTORY/datahub_lib/改为{YOUR_HOME}/datahub-ogg-plugin/lib/

    除以上配置外,其他DataHub相关配置均独立到configure.xml文件配置,具体含义请参看新版OGG工具文档。

    相关资源:七夕情人节表白HTML源码(两款)
    最新回复(0)