用例: 如何实现 Hive 挂钩以优化数据湖

    xiaoxiao2022-07-12  139

     

    有关数据的数据

    数据湖和数据沼泽之间的重要区别是谨慎组织的数据导致一个有效的湖泊, 而沼泽只是数据, 要么是过度复制或孤立的用户。 获取有关如何跨组织使用生产数据的信息不仅有利于构建一个组织良好的数据湖, 而且还将帮助数据工程师微调数据管道或数据本身。

    为了了解数据的使用方式, 我们需要找出一些基本问题的答案, 如:

    哪些数据集 (表/视图/DBs) 经常访问?查询何时运行最频繁?哪些用户或应用程序大量利用资源?哪种类型的查询频繁运行?

    最受访问的对象可以很容易地受益于优化, 如压缩、柱状文件格式或数据分解。可以将单独的队列分配给使用大量资源的应用程序或用户来平衡群集上的负载。当大量查询大多运行以满足 sla 并在低使用潮期间缩减以节省成本时, 可以在时间范围内扩展群集资源。

    蜂巢挂钩是方便的方式来回答一些以上的问题和更多!

    挂钩是一种机制, 允许您修改程序的行为。它是拦截应用程序中的功能调用、消息或事件的技术。Hive 提供了许多不同类型的挂钩, 其中一些是下面列出的:

    执行前和后。驱动器前和后运行。执行失败。前和后语义分析。执行后兽人文件转储。QueryLifeTime。减速器

    可以在特定事件中调用每种类型的挂钩, 并且可以自定义以根据用例执行不同的操作。例如, 在执行物理查询计划之前调用预执行挂钩, 并在将查询提交到作业之前调用减速器挂钩. xml 来标记敏感信息。Apache atlas 有一个最流行的 hive 挂钩的实现, 它侦听在 hive 中创建/更新/删除操作, 并通过卡夫卡通知更新 Atlas 中的元数据。

    实现

    可以通过实现 ExecuteWithHookContext 接口来创建预执行挂钩。这是一个空接口, 只需调用该 run 方法HookContext 。HookContext 有很多有关查询、hive 实例和用户的信息。可以轻松利用此信息来检测数据湖如何被其用户使用。

    public HookContext(QueryPlan queryPlan, QueryState queryState, Map<String, ContentSummary> inputPathToContentSummary, String userName, String ipAddress, String hiveInstanceAddress, String operationId, String sessionId, String threadId, boolean isHiveServerQuery, PerfLogger perfLogger, QueryInfo queryInfo) throws Exception {

    任何挂钩实现 HookContext 获取查询计划 ( QueryPlan )。在 a 的引擎盖下 QueryPlan , 有许多的 getter 实现收集有关查询的信息。要说出一些:

    getQueryProperties–获取有关查询的详细信息, 包括查询是否具有联接、组毕斯、分析函数或任何排序/排序操作.getQueryStartTime–返回查询的开始时间.getOperationName–返回由查询执行的操作类型, 例如 CREATETABLE、DROPTABLE、ALTERDATABASE 等.

    为了创建我们自己的 Hive 挂钩, 我们只需要一个 ExecuteWithHookContext run 使用我们的自定义逻辑来捕获数据的类来实现和重写它的方法。

    public class CustomHook implements ExecuteWithHookContext { private static final Logger logger = Logger.getLogger(CustomHook.class.getName()); public void run(HookContext hookContext) throws Exception { assert (hookContext.getHookType() == HookType.PRE_EXEC_HOOK); SessionState ss = SessionState.get(); UserGroupInformation ugi = hookContext.getUgi(); Set<ReadEntity> inputs = hookContext.getInputs(); QueryPlan plan = hookContext.getQueryPlan(); this.run(ss, ugi, plan, inputs); }

    SessionState UserGroupInformation 并且需要收集有关 Hive 会话及其用户的信息。

    public void run(SessionState sess, UserGroupInformation ugi, QueryPlan qpln, Set<ReadEntity> inputs) throws Exception { if (sess != null) { String qid = sess.getQueryId() == null ? qpln.getQueryId() : sess.getQueryId(); String QueryID = qid; String Query = sess.getCmd().trim(); String QueryType = sess.getCommandType(); // get all information about query if (qpln != null) { Long Query_Start_Time = qpln.getQueryStartTime(); QueryProperties queryProps = qpln.getQueryProperties(); if (queryProps != null) { boolean Has_Join = queryProps.hasJoin(); boolean Has_Group_By = queryProps.hasGroupBy(); boolean Has_Sort_By = queryProps.hasSortBy(); boolean Has_Order_By = queryProps.hasOrderBy(); boolean Has_Distribute_By = queryProps.hasDistributeBy(); boolean Has_Cluster_By = queryProps.hasClusterBy(); boolean Has_Windowing = queryProps.hasWindowing(); } } // get user id String username = sess.getUserName() == null ? ugi.getUserName() : sess.getUserName(); // get list of database@table names List<String> tables = new ArrayList<String>(); for (Object o : inputs) { tables.add(o.toString()); } // Add logic here to format logging msg // logger.info(msg) } }

    在分配挂钩之前, 应将已编译的 jar 添加到 Hive 类路径中。可以在由 hive 站点定义的位置添加一个 jar. xml 属性hive.aux.jars.path.可以使用属性hive.exec.pre.hooks将预执行挂钩设置为自定义挂钩的类。使用 Hive CLI, 我们可以执行以下操作:

    set hive.exec.pre.hooks=com.myApp.CustomHook; 

    设置此预执行挂钩后, CustomHook 应为每个用户执行每个查询的代码。收集的 CustomHook 信息可以作为逗号分隔的值记录在公共日志存储库中, 并在以后的任何 BI 工具或 Excel 文件中提取, 以找出有关数据湖使用模式的各种统计信息。

    警告

    虽然钩子是捕获信息的一个很好的方法, 但它们可以为查询执行添加延迟。可以将钩子中的处理保持在最低限度以避免这种开销。无法通过 hive 挂钩捕获有关通过 Spark HiveContext 在 hive 表上完成的处理的信息。火花提供了自己的挂钩机制。
    最新回复(0)