1、Oozie的介绍 Oozie是一个工作流引擎服务器,用于运行Hadoop Map/Reduce和Hive等任务工作流.同时Oozie还是一个Java Web程序,运行在Java Servlet容器中,如Tomcat中。Oozie以action为基本任务单位,可以将多个action构成一个DAG图(有向无环图Direct Acyclic Graph)的模式进行运行。Oozie工作流通过HPDL(一种通过XML自定义处理的语言)来构造Oozie的工作流。一个oozie服务器主要包括四个服务:Oozie Workflow、Oozie Coordinator、Oozie Bundle和Oozie SLA(oozie服务器等级协定)。 1.1 Oozie四大组件服务介绍
Oozie Workflow: 该组件用于定义和执行一个特定顺序的mapreduce、hive和pig作业。Oozie Coordinator:该组件用于支持基于事件、系统资源存在性等条件的workflow的自动化执行。Oozie Bundle:该引擎可以定义和执行"一束"应用,从而提供一个批量化的方法,将一组Coordinator应用程序一起进行管理。Oozie服务器等级协定(Service Level Agreement, SLA):该组件支持workflow应用程序执行过程的记录跟踪。 workflow是oozie中最基本的一个服务组件。三大服务的的关系是:bundle包含多个coordinator,coordinator包含一个workflow,workflow定义具体的action动作。 1.2 Workflow介绍 workflow使用hadoop流程定义语言(Hadoop Process Defination Language, hPDL)来描述工作流,hPDL是类似xml语言的一种相当简洁的定义类语言,使用有限数目的流控制节点和动作节点来描述workflow,说也就是workflow有两大类节点构成:工作流控制节点和动作节点。其中工作流控制节点是workflow本身提供的一种控制workflow工作流执行路径的方法,不可以自定义;动作节点是具体的操作方法,用户可以自定义。 1.3 Workflow工作流生命周期 1.4 Workflow流控制节点 1.5 Workflow默认支持的action动作节点 1.6 Workflow扩展的action动作节点 1.7 Workflow异步操作 Workflow中的所有异步操作(action)都需要在hadoop集群上以mapreduce作业的方式进行执行,这样充分利用了集群的优点。oozie通过两种方式来检查任务是否完成:回调:当一个任务和一个计算被启动后,会为任务提供一个回调url,该任务执行完成后,会执行回调来通知oozie。轮询:在任务执行回调失败的情况下,无论任何原因,都支持以轮询的方式进行查询。 oozie提供这两种方式来控制任务,将资源密集型的任务放到服务节点之外,使oozie节约服务器资源,确保单个oozie服务器可以支持上千个作业。 注意: 需要hadoop集群开启hdfs、yarn和jobhistory服务。1.8 Workflow规则 workflow任务主要由job.properties、workflow.xml和其他动作需要的资源文件三部分组成,其中job.properties中定义workflow作业的配置信息,workflow.xml定义作业的执行工作流。workflow.xml文件是一个有定义规则的xml文件。 workflow.xml配置规则详见:workflow中的文档workflow.template.xml job.properties配置参考:workflow中的文档job.template.properties
1.9 Coordinator介绍 coordinator支持workflow过程的自动启动,常用于一些由时间/数据可用性来触发的、会多次调用的workflow过程的设计和执行。通过定义多个顺序运行的、前一个输出作为后一个输入的workflow,coordinator也支持定义常规运行的(包括以不同时间间隔运行的)workflow作业之间的依赖。
1.10 Bundle介绍 Oozie Bundle是顶层抽象,允许将一组coordinator任务打包成为一个bundle任务。组成一个整体bundle的多个coordinator可以作为一个整体来进行控制,包括启动、停止、挂起、继续、重新执行等操作。Bundle中不支持它的coordinator应用程序之间的显示依赖关系,如果需要定义这些依赖关系,可以在 coordinator中通过输入输出事件来指定依赖。
2. Oozie的安装 Oozie使用tomcat等服务器作为web界面展示容器,使用关系型数据库存储oozie的工作流元数据,默认使用debry,由于debry的缺点,一般情况使用mysql作为oozie的元数据库,使用extjs来作为报表展示js框架。 安装步骤:
安装mysql安装tomcat安装oozie2.1 Tomcat安装
下载tomcat压缩包: wget http://archive.apache.org/dist/tomcat/tomcat-7/v7.0.8/bin/apache-tomcat-7.0.8.tar.gz解压压缩包设置环境变量启动tomcat测试是否安装成功。2.2 Oozie安装
下载oozie和ext-2.2.zip设置环境变量 export OOZIE_HOME=/home/hadoop/bigdater/oozie-4.0.0-cdh5.3.6 export PATH=$PATH:$OOZIE_HOME/bin 进行conf配置信息修改,修改conf/oozie-site.xml文件,主要就是进行元数据指定和service指定。 oozie-site.xml <?xml version="1.0"?> <configuration> <property> <!--指定service的信息--> <name>oozie.services</name> <value> <!--将JobsConcurrencyService移到第一个--> org.apache.oozie.service.JobsConcurrencyService, org.apache.oozie.service.SchedulerService, org.apache.oozie.service.InstrumentationService, org.apache.oozie.service.MemoryLocksService, org.apache.oozie.service.CallableQueueService, org.apache.oozie.service.UUIDService, org.apache.oozie.service.ELService, org.apache.oozie.service.AuthorizationService, org.apache.oozie.service.UserGroupInformationService, org.apache.oozie.service.HadoopAccessorService, org.apache.oozie.service.URIHandlerService, org.apache.oozie.service.DagXLogInfoService, org.apache.oozie.service.SchemaService, org.apache.oozie.service.LiteWorkflowAppService, org.apache.oozie.service.JPAService, org.apache.oozie.service.StoreService, org.apache.oozie.service.CoordinatorStoreService, org.apache.oozie.service.SLAStoreService, org.apache.oozie.service.DBLiteWorkflowStoreService, org.apache.oozie.service.CallbackService, org.apache.oozie.service.ActionService, org.apache.oozie.service.ShareLibService, org.apache.oozie.service.ActionCheckerService, org.apache.oozie.service.RecoveryService, org.apache.oozie.service.PurgeService, org.apache.oozie.service.CoordinatorEngineService, org.apache.oozie.service.BundleEngineService, org.apache.oozie.service.DagEngineService, org.apache.oozie.service.CoordMaterializeTriggerService, org.apache.oozie.service.StatusTransitService, org.apache.oozie.service.PauseTransitService, org.apache.oozie.service.GroupsService, org.apache.oozie.service.ProxyUserService, org.apache.oozie.service.XLogStreamingService, org.apache.oozie.service.JvmPauseMonitorService </value> </property> <!--指定hadoop的配置文件路径--> <property> <name>oozie.service.HadoopAccessorService.hadoop.configurations</name> <value>*=/home/hadoop/bigdater/hadoop-2.5.0-cdh5.3.6/etc/hadoop</value> </property> <!--指定其关系型数据库,元数据具体存在哪儿,这里指定为mysql数据库,默认为deby数据库--> <property> <name>oozie.service.JPAService.create.db.schema</name> <value>true</value> </property> <property> <name>oozie.service.JPAService.jdbc.driver</name> <value>com.mysql.jdbc.Driver</value> </property> <property> <name>oozie.service.JPAService.jdbc.url</name> <value>jdbc:mysql://hh:3306/oozie?createDatabaseIfNotExist=true</value> </property> <property> <name>oozie.service.JPAService.jdbc.username</name> <value>hive</value> </property> <property> <name>oozie.service.JPAService.jdbc.password</name> <value>hive</value> </property> <property> <name>oozie.service.JPAService.jdbc.password</name> <value>hive</value> </property> <!--时间区的设置--> <property> <name>oozie.processing.timezone</name> <value>GMT+0800</value> </property> <!-- start run the shell action config --> <property> <name>oozie.service.SchemaService.wf.ext.schemas</name> <value>oozie-sla-0.1.xsd,shell-action-0.1.xsd,hive-action-0.2.xsd</value> </property> <property> <name>oozie.service.ActionService.executor.ext.classes</name> <value> org.apache.oozie.action.hadoop.ShellActionExecutor, org.apache.oozie.action.hadoop.HiveActionExecutor </value> </property> <!-- 控制oozie的coordinator任务运行间隔时间是否检测--> <property> <name>oozie.service.coord.check.maximum.frequency</name> <value>false</value> </property> </configuration> 可以在conf/oozie-env.sh中进行参数修改,比如修改端口号,默认端口号为11000oozie根目录创建libext文件夹,复制mysql的driver压缩包到libext文件夹中 cp ~/bigdater/hive-0.13.1-cdh5.3.6/lib/mysql-connector-java-5.1.31.jar ./libext/ or cp ~/bigdater/softs/mysql-connector-java-5.1.31.jar ./libext/ 执行sql创建,执行完成后,mysql中出现数据库和数据表 在oozie的根目录下执行命令:ooziedb.sh create -sqlfile oozie.sql -run 在hadoop的core-site.xml里设置hadoop代理用户 hadoop.proxyuser.hadoop.hosts&hadoop.proxyuser.hadoop.groups 在hdfs上创建公用文件夹share oozie根目录下执行命令:oozie-setup.sh sharelib create -fs hdfs://hh:8020 -locallib oozie-sharelib-4.0.0-cdh5.3.6-yarn.tar.gz 创建war文件 执行addtowar.sh -inputwar ./oozie.war -outputwar ./oozie-server/webapps/oozie.war -hadoop 2.5.0 $HADOOP_HOME -jars ./libext/mysql-connector-java-5.1.31.jar -extjs ../softs/ext-2.2.zip 或者 将hadoop相关包,mysql相关包和ext压缩包放到libext文件夹中,运行oozie-setup.sh prepare-war也可以创建war文件 运行 oozie的bin目录下执行:oozied.sh run 或者 oozied.sh start(前者在前端运行,后者在后台运行) 查看web界面&查看状态 oozie admin -oozie http://hh:11000/oozie -status3. 案例介绍
定义fs动作,在hdfs文件系统上进行文件操作job.properties
nameNode=hdfs://hh:8020 jobTracker=hh:8032 examplesRoot=fs oozie.wf.application.path=${nameNode}/oozie/workflows/${examplesRoot}workflow.xml
<workflow-app name='fs-wf1' xmlns="uri:oozie:workflow:0.3"> <start to="fs-delete"/> <action name="fs-delete"> <fs> <delete path="${nameNode}/beifeng/beifeng14-0.0.1.jar"/> <delete path="${nameNode}/beifeng/index"/> </fs> <ok to="fs-mkdir"/> <error to="fail"/> </action> <action name="fs-mkdir"> <fs> <mkdir path="${nameNode}/beifeng/18"/> </fs> <ok to="fs-move"/> <error to="fail"/> </action> <action name="fs-move"> <fs> <move source="${nameNode}/BF-09.1447769674855.log.tmp" target="${nameNode}/beifeng" /> </fs> <ok to="end"/> <error to="fail"/> </action> <kill name="fail"> <message>script failed, error message:${wf:errorMessage(wf:lastErrorNode())}</message> </kill> <end name="end"/> </workflow-app> 注意:需要先在本地机器上的~/jobs/oozie文件夹中创建比如fs文件夹,并将job.properties、workflow.xml放入该fs文件夹中,下面的类似。 定义fs动作,判断文件夹存在,就删除,如果不存在,不进行任何操作job.properties
nameNode=hdfs://hh:8020 jobTracker=hh:8032 examplesRoot=fs2 oozie.wf.application.path=${nameNode}/oozie/workflows/${examplesRoot}workflow.xml
<workflow-app name='fs-wf2' xmlns="uri:oozie:workflow:0.3"> <start to="decision"/> <decision name="decision"> <switch> <case to="fs-delete">${fs:exists("/beifeng/18")}</case> <default to="end"/> </switch> </decision> <action name="fs-delete"> <fs> <delete path="${nameNode}/beifeng/18"/> </fs> <ok to="end"/> <error to="fail"/> </action> <kill name="fail"> <message>script failed, error message:${wf:errorMessage(wf:lastErrorNode())}</message> </kill> <end name="end"/> </workflow-app> 定义shell动作,在服务器的tmp目录下创建一个文件夹job.properties
nameNode=hdfs://hh:8020 jobTracker=hh:8032 examplesRoot=shell EXEC=script.sh oozie.wf.application.path=${nameNode}/oozie/workflows/${examplesRoot}workflow.xml
<workflow-app name='shell-wf1' xmlns="uri:oozie:workflow:0.3"> <start to="shell"/> <action name="shell"> <shell xmlns="uri:oozie:shell-action:0.1"> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <exec>${EXEC}</exec> <file>${EXEC}#${EXEC}</file> </shell> <ok to="end"/> <error to="fail"/> </action> <kill name="fail"> <message>script failed, error message:${wf:errorMessage(wf:lastErrorNode())}</message> </kill> <end name="end"/> </workflow-app>script.sh
mkdir /tmp/beifeng-net 定义hive动作,执行sql脚本,将数据导入到hive中的表中job.properties
nameNode=hdfs://hh:8020 jobTracker=hh:8032 examplesRoot=hive oozie.use.system.libpath=true oozie.wf.application.path=${nameNode}/oozie/workflows/${examplesRoot}workflow.xml
<workflow-app name='hive-wf1' xmlns="uri:oozie:workflow:0.3"> <start to="hive"/> <action name="hive"> <hive xmlns="uri:oozie:hive-action:0.2"> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <configuration> <property> <name>oozie.hive.defaults</name> <value>hive-site.xml</value> </property> <property> <name>hive.metastore.local</name> <value>false</value> </property> <property> <name>hive.metastore.uris</name> <value>thrift://hh:9083</value> </property> <property> <name>hive.metastore.warehouse.dir</name> <value>/hive</value> </property> </configuration> <script>d.sql</script> </hive> <ok to="end"/> <error to="fail"/> </action> <kill name="fail"> <message>script failed, error message:${wf:errorMessage(wf:lastErrorNode())}</message> </kill> <end name="end"/> </workflow-app>hive-site.xml
<?xml version="1.0" encoding="UTF-8" standalone="no"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <property> <name>hive.metastore.uris</name> <value>thrift://hh:9083</value> </property> <property> <name>hive.metastore.warehouse.dir</name> <value>/hive</value> </property> <property> <name>javax.jdo.option.ConnectionURL</name> <value>jdbc:mysql://hh:3306/hive?createDatabaseIfNotExist=true&useUnicode=true&characterEncoding=UTF-8</value> </property> <property> <name>javax.jdo.option.ConnectionDriverName</name> <value>com.mysql.jdbc.Driver</value> </property> <property> <name>javax.jdo.option.ConnectionUserName</name> <value>hive</value> </property> <property> <name>javax.jdo.option.ConnectionPassword</name> <value>hive</value> </property> <!-- <property> <name>hive.aux.jars.path</name> <value>file:///home/hadoop/jobs/beifeng14-0.0.1.jar</value> </property> --> </configuration>d.sql
load data inpath '/data.txt' into table tdata.txt
1 2 3 4 5 6 7 10 定义mapreduce动作,执行mapreduce任务job.properties
nameNode=hdfs://hh:8020 jobTracker=hh:8032 queueName=default examplesRoot=map-reduce oozie.wf.application.path=${nameNode}/oozie/workflows/${examplesRoot}workflow.xml
<workflow-app name='wordcount-wf2' xmlns="uri:oozie:workflow:0.3"> <start to="wordcount"/> <action name="wordcount"> <map-reduce> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <!-- <prepare></prepare> --> <configuration> <property> <name>mapred.mapper.new-api</name> <value>true</value> </property> <property> <name>mapred.reducer.new-api</name> <value>true</value> </property> <property> <name>mapreduce.job.map.class</name> <value>com.beifeng.mr.wordcount.WordCountMapper</value> </property> <property> <name>mapreduce.job.reduce.class</name> <value>com.beifeng.mr.wordcount.WordCountReducer</value> </property> <property> <name>mapreduce.input.fileinputformat.inputdir</name> <value>/beifeng/api</value> </property> <property> <name>mapreduce.job.output.key.class</name> <value>org.apache.hadoop.io.Text</value> </property> <property> <name>mapreduce.job.output.value.class</name> <value>org.apache.hadoop.io.LongWritable</value> </property> <!-- <property> <name>mapred.job.queue.name</name> <value>${queueName}</value> </property> --> <property> <name>mapreduce.output.fileoutputformat.outputdir</name> <value>/beifeng/wordcount/18</value> </property> <property> <name>mapreduce.job.acl-view-job</name> <value>*</value> </property> <property> <name>oozie.launcher.mapreduce.job.acl-view-job</name> <value>*</value> </property> </configuration> </map-reduce> <ok to="end"/> <error to="end"/> </action> <kill name="kill"> <message>${wf:errorCode("wordcount")}</message> </kill> <end name="end"/> </workflow-app>直接将这些文件夹上传到linux机器上(比如上传到~/jobs/oozie文件夹中),然后将所有软件上传到hdfs的/oozie/workflows文件夹中。进入linux上传的目录(这里是jobs/oozie文件夹)执行命令:
oozie job -oozie http://hh:11000/oozie -config ./xxxx/job.properties -run,这里的xxxx就是各个文件夹名称其他相关命令:
oozie job -oozie http://hh:11000/oozie -config ./xxxx/job.properties -submit (提交任务) oozie job -oozie http://hh:11000/oozie -start job_id (运行已经提交的任务) oozie job -oozie http://hh:11000/oozie -config ./xxxx/job.properties -run (submit和start命令合并) oozie job -oozie http://hh:11000/oozie -info job_id (获取任务信息) oozie job -oozie http://hh:11000/oozie -suspend job_id (暂停/挂起任务) oozie job -oozie http://hh:11000/oozie -rerun job_id (重新运行暂停的任务) 定时任务,coordinate的使用案例cronjob.properties
nameNode=hdfs://hh:8020 jobTracker=hh:8030 queueName=default examplesRoot=cron oozie.service.coord.check.maximum.frequency=false oozie.coord.application.path=${nameNode}/oozie/workflows/${examplesRoot} start=2015-11-22T08:06+0800 end=2015-11-25T08:10+0800 workflowAppUri=${oozie.coord.application.path}workflow.xml
<workflow-app xmlns="uri:oozie:workflow:0.2" name="no-op-wf"> <start to="end"/> <end name="end"/> </workflow-app>coordinator.xml
<coordinator-app name="cron-coord" frequency="${coord:minutes(2)}" start="${start}" end="${end}" timezone="UTC" xmlns="uri:oozie:coordinator:0.2"> <action> <workflow> <app-path>${workflowAppUri}</app-path> <configuration> <property> <name>jobTracker</name> <value>${jobTracker}</value> </property> <property> <name>nameNode</name> <value>${nameNode}</value> </property> <property> <name>queueName</name> <value>${queueName}</value> </property> </configuration> </workflow> </action> </coordinator-app>