基于分布式数据库的存储和hadoop的分布式计算的分布式sql计算方法

    xiaoxiao2024-05-14  107

     

     

    1.   目录

    2.      目录... 1

    3.      背景和设计思想... 3

    4.      架构... 3

    没有代理节点... 4

    有代理节点... 4

    模块说明... 5

    两种架构的区别... 5

    5.      应用架构... 5

    6.      基本概念说明... 6

    7.      增删改操作... 6

    8.      查询操作... 7

    阶段树... 7

    阶段... 7

    查询步骤... 8

    9.      例子... 8

    均衡策略... 8

    查询... 10

    9..1       排序... 10

    9..2       分组聚合... 11

    9..3       连接... 11

    9..4       子查询... 12

    10.             与已有系统的区别和优点... 12

    11.             应用场景... 13

     

    3.   背景和设计思想

     

    为了解决分布式数据库下,复杂的sql(如全局性的排序、分组、join、子查询,特别是非均衡字段的这些逻辑操作)难以实现的问题;在有了一些分布式数据库和hadoop实际应用经验的基础上,对比两者的优点和不足,加上自己的一些提炼和思考,设计了一套综合两者的系统,利用两者的优点,补充两者的不足,具体的说,使用数据库水平分割的思想实现数据存储,使用mapreduce的思想实现sql计算

     

    这里的数据库水平分割的意思是只分库不分表,对于不同数量级别的表,分库的数量可以不一样,例如1亿的数据量分10个分库,10亿的分50个分库。对于使用mapreduce的思想实现计算;对于一个需求,转换成一个或多个有依赖关系的sql,其中的每个sql分解成一个或多个mapreduce任务,每个mapreduce任务又包含mapsql、洗牌(shuffle)、reducesql,这个过程可以理解为类似hive,区别是连mapreduce任务中的map和reduce操作也是通过sql实现,而非hadoop中的map和reduce操作.

     

    这是基本的mapreduce的思想,但是在hadoop的生态圈中,第一代的mapreduce将结果存储于磁盘,第二代的mapreduce根据内存使用情况将结果存储于内存或磁盘,类比一下用数据库来存储,那么mapreduce的结果就是存储在表中,而数据库的缓存机制天然支持根据内存情况决定存储在内存还是磁盘;另外,hadoop生态圈中,计算模型也并非mareduce一种,这里的mapreduce的计算思想,可以用类似spark的RDD迭代计算方式来替代;本系统还是基于mapreduce来说明的.

     

    4.   架构

    根据以上的思想,系统的架构如下:

    没有代理节点

     

    有代理节点

     

    模块说明

    关于系统中的模块,由于和绝大部分的分布式系统类似,这里仅做简要说明:

     

    名称

    说明

    协调节点,也叫代理节点(proxy node)

    实现常用的数据库客户端和服务端协议,接收客户端请求,并且将请求转换成执行计划,获取执行结果,发送给客户端

    客户端(client)

    发送请求和接收执行结果的;如果是没有协调节点,那么客户端也负责协调节点的工作

    数据库节点(db node)

    用于存储实际的数据,运行接收的mapsql和reducesql

    主控机(master)

    是主进程,管理各个模块和元数据

    元数据库(meta database)

    存储系统的元数据的地方

     

    两种架构的区别

    无代理节点的时候,客户端担负着比较大的工作,包括:发送请求、解析sql、生成执行计划、申请资源、安排执行、获取结果等;有代理节点的时候,代理节点担负着接受请求、解析sql、生成执行计划、申请资源、安排执行、返回结果给客户端等大部分责任,另外代理节点提供支持外部协议的接口,如mysql的c/s协议,使用mysql的命令行可以直接连接进来执行sql,整个系统就像普通的mysql server一样。

     

     

    5.   应用架构

    实际应用环境可能是正式环境一套,正式备份环境一套,线下环境一套,可以按照如下的架构进行部署。

     

     

    6.   基本概念说明

    下面针对架构中的一些概念做些说明

    概念

    说明

    分布式表

    类似关系型表的概念,只不过数据是分布在不同的数据库节点上,通过某个字段将数据水平分割到不同的分库的表中

    分布式表的分割字段,也叫均衡字段

    存储数据的时候决定将数据插入分布式表的某个分库的依据字段,如常用的用户id

    分布式表的分割方法,也叫均衡策略

    存储的时候决定如何根据分割字段将数据插入分布式表的方法,如列表,范围,取余

    计算的洗牌字段

    类似存储数据时候的分割字段,mapreduce计算的时候,将数据插入reduce端数据库表中所依据的字段;是通过分析sql得到

    计算的洗牌方法

    类似存储的时候,在mapreduce计算的时候,决定如何根据洗牌字段将数据插入reduce端数据库表中的方法

    任务树,也叫阶段树

    根据客户端输入的sql,进行分析得到的执行计划

    任务节点,也叫阶段(stage)

    是任务树的某个节点,其实就是mapreduce任务;包含map,洗牌和reduce过程

    存储节点

    对于一个任务,map执行所在的节点称为存储节点,通常是一个任务的元表或者数据源所在的节点

    计算节点

    对于一个任务,reduce执行所在的节点称为计算节点

     

    下面说明常用的增删改查如何执行,特别是查询操作

    7.   增删改操作

    当插入数据的时候,根据均衡字段和均衡策略将记录插入到对应的数据库节点中;当更新数据的时候,需要根据均衡策略判断数据更新前的和更新后的数据库节点是否变化,如果没有变化,直接更新,如果有变化,在更新前的数据库节点中删除老数据,在更新后的数据库节点中插入新数据;当删除数据的时候,根据均衡策略在相应的数据库节点中删除。这三种变更数据的操作,只要涉及到多个节点的数据变更,都需要使用分布式事务保证一致性、原子性等事务特性。

    8.   查询操作

    查询操作的原理类似hive,大家可以对比来理解;为了方便解释查询操作,首先来说明阶段树和阶段的结构,如下图所示:

    阶段树

    阶段

     

    查询步骤

    结合上面的图,查询操作的具体过程如下:

     

    1.         将输入sql经过词法、语法、语义分析,集合表结构信息和数据分布信息,生成包含多个阶段(简称stage)的执行计划,这些阶段具有一定的依赖关系,形成多输入单输出的任务树。

    2.         每个阶段包括两种sql,称为mapsql和reducesql,另外每个阶段包括三个操作,map、数据洗牌和reduce;map和reduce分别执行mapsql和reducesql。

    3.         先在不同的数据库节点中执行map操作,map操作执行mapsql,它的输入是每个数据库节点上的表内部的数据,输出根据某个字段按照一定的规则进行分割,放到不同的结果集中,结果集作为数据洗牌的输入。

    4.         然后执行数据洗牌的过程,将不同结果集拷贝到不同的将要执行reduce的数据库节点上。

    5.         在不同的数据库节点中执行reduce操作,reduce操作执行reducesql;

    6.         最后返回结果。

     

     

    9.   例子

     

    由于系统核心在于存储和计算,下面对存储和计算相关的概念举例说明

    均衡策略

    举例说明均衡策略,基本信息如下:

    表名字:tab_user_login

    表描述:用于存储用户登录信息

    节点数:4,分为0、1、2、3

     

    字段

    字段类型

    描述

    u_id

    int

    用户id

    login_ip

    varchar

    用户登录ip

    login_province

    varchar

    登录省份

    login_dt

    timestamp

    用户登录时间

     

     

    举例说下如下的几种策略:

     

    n  列表: 以登录省份作为均衡字段为例

    登录省份

    节点id

    北京

    0

    广东

    1

    黑龙江

    2

    湖南

    3

    …….

    …….

    河南

    0

    浙江

    1

    辽宁

    2

    四川

    3

     

    n  取余: 除数为4,以用户id作为均衡字段

    用户id%4

    节点id

    0

    0

    1

    1

    2

    2

    3

    3

     

    n  范围: 从0到一亿,以用户id作为均衡字段

     

    用户id范围

    节点id

    0<=value<2500w

    0

    2500w <=value<5000w

    1

    5000w<=value<7500w

    2

    7500w<=value<1亿

    3

     

     

    n  范围和取余结合: 先范围,再取余,除数为4,以用户id作为均衡字段

    (u_id/10000) % 4

    节点id

    0

    0

    1

    1

    2

    2

    3

    3

     

     

     

     

    查询

    举例说明查询操作,基本信息如下:

     

    用户表tab_user_info如下:

     

    字段

    字段类型

    字段描述

    u_id

    Int

    用户id

    u_name

    Varchar

    用户姓名

    u_reg_dt

    Timestamp

    用户注册时间

    u_addr

    Varchar

    用户地址

    u_age

    Int

    用户年龄

     

    用户登录表tab_login_info的结构如下:

     

    字段

    字段类型

    字段描述

    u_id

    Int

    用户id

    login_ip

    Int

    登录ip

    login_dt

    Timestamp

    登录时间

    login_product

    Varchar

    登录到哪个产品中

     

     

    9..1  排序

    排序的关键点是节点之间存在大小关系,大的key或者key范围放到节点id大的节点上,然后在节点上排序,获取数据的时候根据节点id大小依次获取。

     

    以如下sql为例,某一注册时间范围内的用户信息,按照用户id排序:

    select * from tab_user_info t where u_reg_dt>=? and u_reg_dt<=? order by u_id

     

     

    执行计划可能为:

    Map:

    select * from tab_user_info t where u_reg_dt>=? and u_reg_dt<=? order by u_id

     

    Shuffle:

    执行完成之后,这种情况下由于需要按照u_id进行数据洗牌,所以各个存储节点上需要按照u_id进行划分。

    例如有N个计算节点,那么按照(最大u_id-最小u_id)/N平均划分,将不同存储节点上的同一范围的u_id,划分到同一个计算节点上即可(这里的计算节点存在大小关系)。

     

    Reduce:

    select * from tab_user_info t order by u_id

     

     

     

    9..2  分组聚合

    关键点和排序类似,节点之间存在大小关系,大的key或者key范围放到节点id大的节点上,然后在节点上分组聚合,获取数据的时候根据节点id大小依次获取。

     

     

    以如下sql为例,某一注册时间范围内的用户,按照年龄分组,计算每个分组内的用户数:

    select age,count(u_id) v from tab_user_info t where u_reg_dt>=? and u_reg_dt<=? group by age

     

     

    执行计划可能为:

    Map:

    select age,count(u_id) v from tab_user_info t where u_reg_dt>=? and u_reg_dt<=? group by age

     

    Shuffle:

    执行完成之后,这种情况下由于需要按照age进行数据洗牌,考虑到age的唯一值比较少,所以数据洗牌可以将所有的记录拷贝到同一个计算节点上。

     

    Reduce:

    select age,sum(v) from t where group by age

     

     

     

    9..3  连接

    首先明确 join的字段类型为数字类型和字符串类型,其他类型如日期可以转换为这两种。

    数字类型的排序很简单,字符串类型的数据排序需要确定规则,类似mysql中的collation,比较常用的是按照unicode编码顺序,按照实际存储节点的大小等;其次join的方式有等值join和非等值join;以如下常用且比较简单的情况为例。

     

    以如下sql为例,某一注册时间范围内的用户的所有登录信息:

    select t1.u_id,t1.u_name,t2.login_product

    from tab_user_info t1 join tab_login_info t2

    on (t1.u_id=t2.u_id and t1.u_reg_dt>=? and t1.u_reg_dt<=?)

     

     

    执行计划可能为:

     

    Map:

    由于是join,所有的表都要进行查询操作,并且为每张表打上自己的标签,具体实施的时候可以加个表名字字段,在所有存储节点上执行

    select u_id,u_name from tab_user_info t where u_reg_dt>=? and t1.u_reg_dt<=?

    select u_id, login_product from tab_login_info t

     

    Shuffle:

    这种情况下由于需要按照u_id进行数据洗牌,考虑到u_id的唯一值比较多,所以各个存储节点上需要按照u_id进行划分,

    例如有N个计算节点,那么按照(最大u_id-最小u_id)/N平均划分,将不同存储节点上的同一范围的u_id,划分到同一个计算节点上。

     

    Reduce:

    select t1.u_id,t1.u_name,t2.login_product

    from tab_user_info t1 join tab_login_info t2

    on (t1.u_id=t2.u_id)

     

     

    9..4  子查询

    由于子查询可以分解成具有依赖关系的不包含子查询的sql,所以生成的执行计划,就是多个sql的执行计划按照一定的依赖关系进行依次执行。

     

     

    10.         与已有系统的区别和优点

    n  相比hdfs来说,数据的分布是有规则的,hdfs需要启动之后执行命令去查询文件具体在什么节点上;元数据的较小,记录规则即可,管理成本较低,在启动速度方面很快。

    n  数据是放在数据库中的,可以很好的使用索引和数据库本身的缓存机制,大大提高数据查询的效率,特别是在大量数据的情况下,利用索引查询返回少量的数据。

    n  数据可以进行删除和修改,这在基于hdfs的系统中一般比较麻烦和低效。

    n  在计算方面,和mapreduce或者其他的分布式计算框架(如spark)并没有本质的区别(需要进行shuffle);但是由于数据的分布是有规则的,在有些地方可以做的更好,在分布式全文索引体现。

    n  由于线上系统一般使用数据库作为最终的存储位置,而把数据库同步到hdfs中是比较麻烦的,并且对于有删除和更新的情况,同步数据麻烦低效,速度较慢;相比之下,这个方案可以使用数据库本身提供的镜像复制功能来同步,基本没有额外的麻烦和低效的工作。

    n  基于以上,可以把线上系统(主系统)和线下的数据分析挖掘(从系统)做成统一的方案,参见应用架构图。

     

     

     

    11.         应用场景

    最后列举一些应用场景

     

    应用场景

    说明

    线上数据库

    适用于数据量大、并发大、需要分库分表的情况,并且能兼容各种sql,这是最直接且比较合适的场景

    数据分析

    由于系统解决了分布式数据库情况下的复杂sql的执行问题,这也非常容易理解的

    机器学习

    机器学习的逻辑也可以通过sql来实现

    搜索引擎

    通过设计三张分布式表,文档表,单词表,语料库表;文档表使用文档id作为均衡字段,单词表使用单词id作为均衡字段,语料库表使用单词id作为均衡字段,结合搜索引擎的思想,抓取,分析,存储(索引),搜索等步骤,将其中的存储(索引),搜索移植到分布式数据库上即可;其实就是换了存储,传统的搜索引擎使用倒排表存储,如Lucene,现在使用分布式数据库

    流计算

    类似storm,spark streaming,要实现流计算,需要添加其他的组件,如开放服务端口,定时进行sql计算,为了在速度要求比较高,非精确计算的场景查,可以实现类似布隆过滤器来实现唯一值的逻辑计算,而不需要每次使用sql扫描大量数据,只需要每次到布隆过滤查询一次即可

     

    相关资源:分布式数据库和Hadoop都不够好,于是我们设计了分布式SQL计算系统
    最新回复(0)