跳到主要内容

datax 数据同步摸索

可以参考的文档:

datax 任务测试

测试发现自己几年前写的一个python同步平台, 都比开源版本的datax功能丰富多了. 可惜很多年没搞了, 如果再来一遍开发实现这些功能, 就比较疲乏了. 打工人自己没有积累, 只能一遍遍重复打工.

如果要实现一个基于datax的etl平台, 只需要提供交互式的界面用于任务配置, 再结合一个调度平台. 每次调度任务将配置生成到调度机器上, 然后运行datax命令即可. 看起来整条路径并不麻烦, 估计耗时的主要是各种脏数据和异常逻辑的处理细节.

计划是将某个数据库进行全量迁移, 于是顺手测试了datax. 发现配置全库太麻烦了, 一次性的devops还是直接用sqldump吧..

测试datax发现:

  • 无法全库同步, 需要指明需要同步的表名才行; 如果数据库里有多个表, 需要构建多个同步任务; 选项里看起来支持填写多个表, 其实只支持同结构的表进行读取. (可以自行开发处理, 将一个json拓展为多个json)
  • 写入端无法自动生成目标表, 需要提前创建好才行 (可以产品化开发处理)
  • 读取段和写入段都需要配置字段, 不过字段终于可以使用*来替代全部了.
  • 配置json里需要暴露写入端和目标端的数据库账号密码, 不做好管理的话很容易就全暴露了. (其实可以运行过程再通过api去数据库里读取, 而不是都放在json文本里)
  • 拆分任务支持通过主键进行拆分, 需要用户自行制定;
  • 开源的datax只支持单机多线程版本 (可以在多台机器上部署datax, 拆分任务, 自行开发产品化调度平台来规避)
  • 不支持hive读写数据源, 不支持elasticsearch读取数据源 (这些估计都得自行开发了)
  • datax支持配置读写的线程, 读写的速度. (限制速度这点还是很给力的, 还没去看源码不清楚实现细节)
python3 bin/datax.py  job/mysql-ocxx.json 
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": ["jdbc:mysql://1.1.1.1:8306/discuz?useSSL=false"],
"table": ["z_ucenter_settings"]
}
],
"password": "fake",
"username": "fakeUser",
"where": ""
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": "jdbc:mysql://3.3.3.3:8306/discuz?useSSL=false",
"table": ["z_ucenter_settings"]
}
],
"password": "fake",
"preSql": [],
"session": [],
"username": "fakeUser",
"writeMode": "replace"
}
}
}
],
"setting": {
"speed": {
"channel": 10
}
}
}
}

测试执行日志

ubuntu@7c74ad7d922c:~/project/datax$ python3 bin/datax.py  job/mysql-ocxx.json 

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


2023-08-14 15:28:58.281 [main] INFO MessageSource - JVM TimeZone: GMT+08:00, Locale: zh_CN
2023-08-14 15:28:58.284 [main] INFO MessageSource - use Locale: zh_CN timeZone: sun.util.calendar.ZoneInfo[id="GMT+08:00",offset=28800000,dstSavings=0,useDaylight=false,transitions=0,lastRule=null]
2023-08-14 15:28:58.320 [main] INFO VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2023-08-14 15:28:58.329 [main] INFO Engine - the machine info =>

osInfo: Linux amd64 5.15.0-72-generic
jvmInfo: Tencent 1.8 25.372-b1
cpu num: 2

totalPhysicalMemory: -0.00G
freePhysicalMemory: -0.00G
maxFileDescriptorCount: -1
currentOpenFileDescriptorCount: -1

GC Names [PS MarkSweep, PS Scavenge]

MEMORY_NAME | allocation_size | init_size
PS Eden Space | 256.00MB | 256.00MB
Code Cache | 240.00MB | 2.44MB
Compressed Class Space | 1,024.00MB | 0.00MB
PS Survivor Space | 42.50MB | 42.50MB
PS Old Gen | 683.00MB | 683.00MB
Metaspace | -0.00MB | 0.00MB


2023-08-14 15:28:58.345 [main] INFO Engine -
{
"content":[
{
"reader":{
"name":"mysqlreader",
"parameter":{
"column":[
"*"
],
"connection":[
{
"jdbcUrl":[
"jdbc:mysql://1.1.1.1:8306/discuz?useSSL=false"
],
"table":[
"z_ucenter_settings"
]
}
],
"password":"***********",
"username":"fakeUser",
"where":""
}
},
"writer":{
"name":"mysqlwriter",
"parameter":{
"column":[
"*"
],
"connection":[
{
"jdbcUrl":"jdbc:mysql://3.3.3.3:8306/discuz?useSSL=false",
"table":[
"z_ucenter_settings"
]
}
],
"password":"***********",
"preSql":[

],
"session":[

],
"username":"fakeUser",
"writeMode":"replace"
}
}
}
],
"setting":{
"speed":{
"channel":10
}
}
}

2023-08-14 15:28:58.374 [main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false
2023-08-14 15:28:58.375 [main] INFO JobContainer - DataX jobContainer starts job.
2023-08-14 15:28:58.376 [main] INFO JobContainer - Set jobId = 0
2023-08-14 15:28:59.444 [job-0] INFO OriginalConfPretreatmentUtil - Available jdbcUrl:jdbc:mysql://1.1.1.1:8306/discuz?useSSL=false&yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true.
2023-08-14 15:28:59.446 [job-0] WARN OriginalConfPretreatmentUtil - 您的配置文件中的列配置存在一定的风险. 因为您未配置读取数据库表的列,当您的表字段个数、类型有变动时,可能影响任务正确性甚至会运行出错。请检查您的配置并作出修改.
2023-08-14 15:28:59.777 [job-0] INFO OriginalConfPretreatmentUtil - table:[z_ucenter_settings] all columns:[
k,v
].
2023-08-14 15:28:59.778 [job-0] WARN OriginalConfPretreatmentUtil - 您的配置文件中的列配置信息存在风险. 因为您配置的写入数据库表的列为*,当您的表字段个数、类型有变动时,可能影响任务正确性甚至会运行出错。请检查您的配置并作出修改.
2023-08-14 15:28:59.781 [job-0] INFO OriginalConfPretreatmentUtil - Write data [
replace INTO %s (k,v) VALUES(?,?)
], which jdbcUrl like:[jdbc:mysql://3.3.3.3:8306/discuz?useSSL=false&yearIsDateType=false&zeroDateTimeBehavior=convertToNull&rewriteBatchedStatements=true&tinyInt1isBit=false]
2023-08-14 15:28:59.781 [job-0] INFO JobContainer - jobContainer starts to do prepare ...
2023-08-14 15:28:59.782 [job-0] INFO JobContainer - DataX Reader.Job [mysqlreader] do prepare work .
2023-08-14 15:28:59.783 [job-0] INFO JobContainer - DataX Writer.Job [mysqlwriter] do prepare work .
2023-08-14 15:28:59.785 [job-0] INFO JobContainer - jobContainer starts to do split ...
2023-08-14 15:28:59.786 [job-0] INFO JobContainer - Job set Channel-Number to 10 channels.
2023-08-14 15:28:59.791 [job-0] INFO JobContainer - DataX Reader.Job [mysqlreader] splits to [1] tasks.
2023-08-14 15:28:59.792 [job-0] INFO JobContainer - DataX Writer.Job [mysqlwriter] splits to [1] tasks.
2023-08-14 15:28:59.834 [job-0] INFO JobContainer - jobContainer starts to do schedule ...
2023-08-14 15:28:59.839 [job-0] INFO JobContainer - Scheduler starts [1] taskGroups.
2023-08-14 15:28:59.842 [job-0] INFO JobContainer - Running by standalone Mode.
2023-08-14 15:28:59.852 [taskGroup-0] INFO TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2023-08-14 15:28:59.861 [taskGroup-0] INFO Channel - Channel set byte_speed_limit to -1, No bps activated.
2023-08-14 15:28:59.864 [taskGroup-0] INFO Channel - Channel set record_speed_limit to -1, No tps activated.
2023-08-14 15:28:59.883 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
2023-08-14 15:28:59.888 [0-0-0-reader] INFO CommonRdbmsReader$Task - Begin to read record by Sql: [select * from z_ucenter_settings
] jdbcUrl:[jdbc:mysql://1.1.1.1:8306/discuz?useSSL=false&yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
2023-08-14 15:29:01.546 [0-0-0-reader] INFO CommonRdbmsReader$Task - Finished read record by Sql: [select * from z_ucenter_settings
] jdbcUrl:[jdbc:mysql://1.1.1.1:8306/discuz?useSSL=false&yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
2023-08-14 15:29:01.792 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[1913]ms
2023-08-14 15:29:01.793 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] completed it's tasks.
2023-08-14 15:29:09.884 [job-0] INFO StandAloneJobContainerCommunicator - Total 30 records, 480 bytes | Speed 48B/s, 3 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2023-08-14 15:29:09.885 [job-0] INFO AbstractScheduler - Scheduler accomplished all tasks.
2023-08-14 15:29:09.886 [job-0] INFO JobContainer - DataX Writer.Job [mysqlwriter] do post work.
2023-08-14 15:29:09.887 [job-0] INFO JobContainer - DataX Reader.Job [mysqlreader] do post work.
2023-08-14 15:29:09.887 [job-0] INFO JobContainer - DataX jobId [0] completed successfully.
2023-08-14 15:29:09.892 [job-0] INFO HookInvoker - No hook invoked, because base dir not exists or is a file: /home/coder/project/datax/hook
2023-08-14 15:29:09.895 [job-0] INFO JobContainer -
[total cpu info] =>
averageCpu | maxDeltaCpu | minDeltaCpu
-1.00% | -1.00% | -1.00%


[total gc info] =>
NAME | totalGCCount | maxDeltaGCCount | minDeltaGCCount | totalGCTime | maxDeltaGCTime | minDeltaGCTime
PS MarkSweep | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s
PS Scavenge | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s

2023-08-14 15:29:09.896 [job-0] INFO JobContainer - PerfTrace not enable!
2023-08-14 15:29:09.900 [job-0] INFO StandAloneJobContainerCommunicator - Total 30 records, 480 bytes | Speed 48B/s, 3 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2023-08-14 15:29:09.902 [job-0] INFO JobContainer -
任务启动时刻 : 2023-08-14 15:28:58
任务结束时刻 : 2023-08-14 15:29:09
任务总计耗时 : 11s
任务平均流量 : 48B/s
记录写入速度 : 3rec/s
读出记录总数 : 30
读写失败总数 : 0

datax 任务配置

在命令行里写配置reader和writer, 运行命令生成默认任务模版支持修改, 并且提供了说明链接, 这样配置起来就方便多了.

 python datax.py -r {YOUR_READER} -w {YOUR_WRITER}
python3 bin/datax.py  -r mysqlreader -w mysqlwriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Please refer to the mysqlreader document:
https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md

Please refer to the mysqlwriter document:
https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md

Please save the following configuration as a json file and use
python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.

{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [],
"connection": [
{
"jdbcUrl": [],
"table": []
}
],
"password": "",
"username": "",
"where": ""
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": [],
"connection": [
{
"jdbcUrl": "",
"table": []
}
],
"password": "",
"preSql": [],
"session": [],
"username": "",
"writeMode": ""
}
}
}
],
"setting": {
"speed": {
"channel": ""
}
}
}
}
created at 2023-08-14