DataHub is the streaming-data service provided by MaxCompute. It offers publish and subscribe functionality for streaming data, so you can easily build analytics and applications on top of data streams. DataHub continuously collects, stores, and processes large volumes of streaming data produced by mobile devices, applications, web services, sensors, and so on. You can write your own applications, or use a stream-computing engine, to process the data written to DataHub, such as real-time web access logs, application logs, and various events, and to produce real-time results such as live charts, alerts, and statistics.

The simplest and most direct way to write a DataHub application is on top of the DataHub SDK. The official SDKs so far cover C++ and Java, but as more and more Python developers adopt DataHub, demand for a Python SDK has kept rising. Good news for Pythonistas: the official DataHub Python SDK Beta has now been released (see the project on Github). It is very easy to use; this article is a getting-started introduction, and you are welcome to ask questions or leave comments on Github at any time.
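Install the SDK

The SDK is distributed through PyPI. A typical installation looks like the following (this assumes the package name pydatahub, which is what the official SDK publishes under; check the Github page for the authoritative instructions):

    pip install pydatahub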
If the command above completes successfully, congratulations, the DataHub Python SDK is installed!
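A quick way to double-check the installation is to import the package; the top-level module is named datahub, as used throughout the examples below:

    python -c "from datahub import DataHub"

If this exits without an ImportError, the SDK is ready to use.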
For details, see: https://help.aliyun.com/document_detail/47440.html?spm=5176.product27797.3.2.VGxgya
Create a Project
Log in to the DataHub WebConsole and create a Project.

Initialize DataHub

    import sys
    import traceback

    from datahub import DataHub
    from datahub.utils import Configer
    from datahub.models import Topic, RecordType, FieldType, RecordSchema, BlobRecord, TupleRecord, CursorType
    from datahub.errors import DatahubException, ObjectAlreadyExistException

    access_id = '***your access id***'
    access_key = '***your access key***'
    endpoint = '***your datahub server endpoint***'
    dh = DataHub(access_id, access_key, endpoint)

List Shards

Listing the shards of a topic returns a List object; each element is a shard, from which you can obtain its shard_id, its state, begin_hash_key, end_hash_key, and other information, as in the sketch below.
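A minimal sketch of listing shards (project_name and topic_name are assumptions here, standing in for a Project and Topic you have already created; the shard_id and state attribute names follow the description above):

    project_name = 'your_project'  # assumption: an existing project
    topic_name = 'your_topic'      # assumption: an existing topic

    shards = dh.list_shards(project_name, topic_name)
    for shard in shards:
        # each element carries shard_id, state, begin_hash_key, end_hash_key
        print "shard_id=%s, state=%s" % (shard.shard_id, shard.state)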
Write Records

The records parameter passed to put_records is a List object in which every element is a record, but all of them must be records of the same type, i.e. either Tuple or Blob. The return value is the list of array indices of the records that failed to be written.
Example of writing Tuple-type records:

    try:
        # block until all shards are in the ready state
        dh.wait_shards_ready(project_name, topic_name)
        print "shards all ready!!!"
        print "=======================================\n\n"

        topic = dh.get_topic(topic_name, project_name)
        print "get topic suc! topic=%s" % str(topic)
        if topic.record_type != RecordType.TUPLE:
            print "topic type illegal!"
            sys.exit(-1)
        print "=======================================\n\n"

        shards = dh.list_shards(project_name, topic_name)
        for shard in shards:
            print shard
        print "=======================================\n\n"

        records = []

        # build a record from a list of values in schema order
        record0 = TupleRecord(schema=topic.record_schema,
                              values=[1, 'yc1', 10.01, True, 1455869335000000])
        record0.shard_id = shards[0].shard_id
        record0.put_attribute('AK', '47')
        records.append(record0)

        # or build a record field by field
        record1 = TupleRecord(schema=topic.record_schema)
        record1['bigint_field'] = 2
        record1['string_field'] = 'yc2'
        record1['double_field'] = 10.02
        record1['bool_field'] = False
        record1['time_field'] = 1455869335000011
        record1.shard_id = shards[1].shard_id
        records.append(record1)

        record2 = TupleRecord(schema=topic.record_schema)
        record2['bigint_field'] = 3
        record2['string_field'] = 'yc3'
        record2['double_field'] = 10.03
        record2['bool_field'] = False
        record2['time_field'] = 1455869335000013
        record2.shard_id = shards[2].shard_id
        records.append(record2)

        failed_indexs = dh.put_records(project_name, topic_name, records)
        print "put tuple %d records, failed list: %s" % (len(records), failed_indexs)
        # if failed_indexs is non-empty, the failed records should be retried
        print "=======================================\n\n"
    except DatahubException, e:
        print traceback.format_exc()
        sys.exit(-1)

If failed_indexs comes back non-empty, it is best to retry the failed records, as sketched below.
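A minimal retry sketch (assuming records and failed_indexs from the example above; a real application would also cap the number of retry attempts):

    # collect the records whose indices were reported as failed
    retry_records = [records[i] for i in failed_indexs]
    if retry_records:
        failed_indexs = dh.put_records(project_name, topic_name, retry_records)
        print "retried %d records, failed list: %s" % (len(retry_records), failed_indexs)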
Get a Cursor

A cursor can be obtained in three ways: OLDEST, LATEST, and SYSTEM_TIME.

OLDEST: the cursor points to the oldest record among the currently valid data.
LATEST: the cursor points to the most recent record.
SYSTEM_TIME: the cursor points to the first record received after the given time.

The get_cursor interface returns a cursor from which the data after the specified position can be read:

    cursor = dh.get_cursor(project_name, topic_name, CursorType.OLDEST, shard_id)
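As a short sketch using only the calls shown above, you can fetch an OLDEST cursor for every shard of a topic (shard_id comes from the shard objects returned by list_shards):

    shards = dh.list_shards(project_name, topic_name)
    for shard in shards:
        # one cursor per shard, pointing at its oldest valid record
        cursor = dh.get_cursor(project_name, topic_name, CursorType.OLDEST, shard.shard_id)
        print "shard=%s, oldest cursor=%s" % (shard.shard_id, cursor)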
