datax同步es数据

    xiaoxiao2022-07-04  153

    xshell登录到服务器进入到/home/datax目录底下   cd /home/datax使用ps -auxww 查看下有多少datax进程

    java -server -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=./log -Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Ddatax.home=./datax -Dlogback.configurationFile=./datax/conf/logback.xml -classpath ./datax/lib/*:. -Djob_id0=scrmBuyerIntegralLogJobPro -Dlog_file0=scrmBuyerIntegralLogJobPro -Djdbc_url0=jdbc:mysql://10.8.8.8:3306/scrm_biz?zeroDateTimeBehavior=convertToNull -Duser_name0=scrm -Dpassword0='j[B~!depaX#1LNB1' -Des_url0=............ -1 -job ./datax/pro/es.json

    Command是上面这种格式的就是datax的进程,看是跑什么job的就看最后一句/datax/pro/es.json 

     

     -Dloglevel=debug 日志等级,debug级别

     

    命令解释:

    -server :设置JVM使用server模式。64位JDK默认启动该模式

    -Xmx :设置最大的java堆大小

    -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=./log设置内存溢出的时候生成dump文件的地址

    -D<propertyName>=value

    在虚拟机的系统属性中设置属性名/值对,运行在此虚拟机之上的应用程序可用System.getProperty(“propertyName”)得到value的值。

    如果value中有空格,则需要用双引号将该值括起来,如-Dname=”space string”。

     

    -Dfile.encoding 设置编码

    Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener 日志配置

    Dlogback.configurationFile 日志配置xml路劲

    Classpath jar包位置

    -Djob_id0=scrmBuyerIntegralLogJobPro 日志id,对应数据库ihd_datax_job_log记录表的newEsJobId

    -Dlog_file0=scrmBuyerIntegralLogJobPro 

    剩下的几个都是自定义的参数,job里面用的

    com.alibaba.datax.core.Engine -mode standalone -jobid -1 -job ./datax/pro/es.json

    Datax启动入口的类位置,最后带上的是job文件的路径

     

    不要出现有两个一样的job,虽然不会报错。但是浪费性能

    如果没有你想要的进程,那就在/home/datax 目录下。执行脚本./xxx.sh

     

    运行的脚本会挂起到后台执行。

     

    JOB详解

    Job大部分是按照datax官方的配置来的

    Datax官方github地址:

    https://github.com/alibaba/DataX

    Setting:

    ts_batch_mins单批次批次分钟数

    ts_interval_sec 任务间隔时间

    ts_adjustnow_sec 服务器时间差

    ts_file task的文件id即job的id

    ts_jdbc_url task连接的数据库地址

    ts_jdbc_uid task链接的数据库账号

    ts_jdbc_pwd task连接的数据库密码

    "ts_jdbc_select":"select newEsJobTime from ihd_datax_job_log where newEsJobId='$ts_key'",

    "ts_jdbc_update":"update datax_job_log set newEsJobTime='$ts_value', esJobRunTime=now() where newEsJobId='$ts_key'",

    "ts_jdbc_error":"update datax_job_log set esJobError='$ts_error', esJobRunTime=now() where newEsJobId='$ts_key'",           

    这三个是配置增量推送的时间

     

    Speed:

    Channel:并行通道数量

    errorLimit:容错率配置

    Record:允许的出错数量值

     

    Content:job的调度内容

    Reader:读插件配置

    Name:mysqlreader插件名称(mysql为例)

    Parameter:插件参数

    Username:用户名称

    Password:数据库密码

    Connection:连接配置

    querySql:读取数据sql

    jdbcUrl:数据库连接地址

    Writer:写插件配置

    Name:elasticwriter 插件名称(es为例)

    Parameter:插件参数

    batchSize:批次数量

    writeMode:写入模式 index或者update

    index_auth:es的账号密码 账号:密码

    Index:索引名称,如果是按时间分表的话。结尾带上%%,会自 动根据指定的时间字段来分索引

    Document:文档类型名称,对应es的_type

    id_field:es id的字段位置,从0开始

    date_field:es日期字段,分表时使用

    month_per_shard:分表月数,1表示1个月一张分表,分表 id会从201801这样开始

    Column:es中的字段名

    Host:es地址

     

     

    Datax源码svn

    https://github.com/crabo/DataX.git/branches/dataX-delta-crabo

     

    最新回复(0)