Datahub Python SDK入门手册

    xiaoxiao2025-12-05  26

    前言

    DataHub是 MaxCompute 提供的流式数据处理(Streaming Data)服务,它提供流式数据的发布 (Publish)和订阅 (Subscribe)的功能,让您可以轻松构建基于流式数据的分析和应用。DataHub 可以对各种移动设备,应用软件,网站服务,传感器等产生的大量流式数据进行持续不断的采集,存储和处理。用户可以编写应用程序或者使用流计算引擎来处理写入到 DataHub 的流式数据比如实时web访问日志、应用日志、各种事件等,并产出各种实时的数据处理结果比如实时图表、报警信息、实时统计等。用户编写Datahub应用程序最简单直接的方式就是基于Datahub SDK进行,目前Datahub官方提供的SDK包括C++ SDK和Java SDK,随着越来越多的Pythoner使用Datahub,Python版本Datahub SDK需求量也日益上升,这里就告诉各位Pythoner们一个好消息,Datahub官方Python SDK Beta正式Release(Github地址),使用非常简单,这里做个入门介绍,大家如有任何疑问随时在Github上提问留言。

    安装

    快速安装

    $ sudo pip install pydatahub

    源码安装

    $ git clone https://github.com/aliyun/aliyun-datahub-sdk-python.git $ cd aliyun-datahub-sdk-python $ sudo python setup.py install

    安装验证

    $ python -c "from datahub import DataHub"

    如果上述命令执行成功,恭喜你安装Datahub Python版本SDK成功!

    基本概念

    详见: https://help.aliyun.com/document_detail/47440.html?spm=5176.product27797.3.2.VGxgya

    准备工作

    访问DataHub服务需要使用阿里云认证账号,需要提供阿里云accessId及accessKey。 同时需要提供访问的服务地址。

    创建Project

    登陆Datahub WebConsole页面,创建Project初始化Datahub import sys import traceback from datahub import DataHub from datahub.utils import Configer from datahub.models import Topic, RecordType, FieldType, RecordSchema, BlobRecord, TupleRecord, CursorType from datahub.errors import DatahubException, ObjectAlreadyExistException access_id = ***your access id*** access_key = ***your access key*** endpoint = ***your datahub server endpoint*** dh = DataHub(access_id, access_key, endpoint)

    Topic操作

    Tuple Topic

    Tuple类型Topic写入的数据是有格式的,需要指定Record Schema,目前支持以下几种数据类型: 类型含义值域Bigint8字节有符号整型。请不要使用整型的最小值 (-9223372036854775808),这是系统保留值。-9223372036854775807 ~ 9223372036854775807String字符串,只支持UTF-8编码。单个String列最长允许1MB。Boolean布尔型。可以表示为True/False,true/false, 0/1Double8字节双精度浮点数。-1.0 10308 ~ 1.0 10308TimeStamp时间戳类型表示到微秒的时间戳类型 创建示例 topic = Topic(name=topic_name) topic.project_name = project_name topic.shard_count = 3 topic.life_cycle = 7 topic.record_type = RecordType.TUPLE topic.record_schema = RecordSchema.from_lists(['bigint_field', 'string_field', 'double_field', 'bool_field', 'time_field'], [Fie ldType.BIGINT, FieldType.STRING, FieldType.DOUBLE, FieldType.BOOLEAN, FieldType.TIMESTAMP]) try: dh.create_topic(topic) print "create topic success!" print "=======================================\n\n" except ObjectAlreadyExistException, e: print "topic already exist!" print "=======================================\n\n" except Exception, e: print traceback.format_exc() sys.exit(-1)

    Blob Topic

    Blob类型Topic支持写入一块二进制数据作为一个Record,数据将会以BASE64编码传输。 topic = Topic(name=topic_name) topic.project_name = project_name topic.shard_count = 3 topic.life_cycle = 7 topic.record_type = RecordType.BLOB try: dh.create_topic(topic) print "create topic success!" print "=======================================\n\n" except ObjectAlreadyExistException, e: print "topic already exist!" print "=======================================\n\n" except Exception, e: print traceback.format_exc() sys.exit(-1)

    数据发布/订阅

    获取Shard列表

    list_shards接口获取topic下的所有shard shards = dh.list_shards(project_name, topic_name)

    返回结果是一个List对象,每个元素是一个shard,可以获取shard_id,state状态,begin_hash_key,end_hash_key等信息

    发布数据

    put_records接口向一个topic发布数据 failed_indexs = dh.put_records(project_name, topic_name, records)

    其中传入参数records是一个List对象,每个元素为一个record,但是必须为相同类型的record,即Tuple类型或者Blob类型,返回结果为写入失败记录的数组下标

    写入Tuple类型Record示例 try: # block等待所有shard状态ready dh.wait_shards_ready(project_name, topic_name) print "shards all ready!!!" print "=======================================\n\n" topic = dh.get_topic(topic_name, project_name) print "get topic suc! topic=%s" % str(topic) if topic.record_type != RecordType.TUPLE: print "topic type illegal!" sys.exit(-1) print "=======================================\n\n" shards = dh.list_shards(project_name, topic_name) for shard in shards: print shard print "=======================================\n\n" records = [] record0 = TupleRecord(schema=topic.record_schema, values=[1, 'yc1', 10.01, True, 1455869335000000]) record0.shard_id = shards[0].shard_id record0.put_attribute('AK', '47') records.append(record0) record1 = TupleRecord(schema=topic.record_schema) record1['bigint_field'] = 2 record1['string_field'] = 'yc2' record1['double_field'] = 10.02 record1['bool_field'] = False record1['time_field'] = 1455869335000011 record1.shard_id = shards[1].shard_id records.append(record1) record2 = TupleRecord(schema=topic.record_schema) record2['bigint_field'] = 3 record2['string_field'] = 'yc3' record2['double_field'] = 10.03 record2['bool_field'] = False record2['time_field'] = 1455869335000013 record2.shard_id = shards[2].shard_id records.append(record2) failed_indexs = dh.put_records(project_name, topic_name, records) print "put tuple %d records, failed list: %s" %(len(records), failed_indexs) # failed_indexs如果非空最好对failed record再进行重试 print "=======================================\n\n" except DatahubException, e: print traceback.format_exc() sys.exit(-1) else: sys.exit(-1)

    获取cursor

    获取Cursor,可以通过三种方式获取:OLDEST, LATEST, SYSTEM_TIME

    OLDEST: 表示获取的cursor指向当前有效数据中时间最久远的recordLATEST: 表示获取的cursor指向当前最新的recordSYSTEM_TIME: 表示获取的cursor指向该时间之后接收到的第一条record cursor = dh.get_cursor(project_name, topic_name, CursorType.OLDEST, shard_id)

    通过get_cursor接口获取用于读取指定位置之后数据的cursor

    订阅数据

    从指定shard读取数据,需要指定从哪个Cursor开始读,并指定读取的上限数据条数,如果从Cursor到shard结尾少于Limit条数的数据,则返回实际的条数的数据。 dh.get_records(topic, shard_id, cursor, 10) 消费Tuple类型Record示例 try: # block等待所有shard状态ready dh.wait_shards_ready(project_name, topic_name) print "shards all ready!!!" print "=======================================\n\n" topic = dh.get_topic(topic_name, project_name) print "get topic suc! topic=%s" % str(topic) if topic.record_type != RecordType.TUPLE: print "topic type illegal!" sys.exit(-1) print "=======================================\n\n" cursor = dh.get_cursor(project_name, topic_name, CursorType.OLDEST, '0') while True: (record_list, record_num, next_cursor) = dh.get_records(topic, '0', cursor, 10) for record in record_list: print record if 0 == record_num: time.sleep(1) cursor = next_cursor except DatahubException, e: print traceback.format_exc() sys.exit(-1) else: sys.exit(-1)

    结尾

    Python API DocPython Package IndexGithub地址 相关资源:阿里云datahub
    最新回复(0)