侧边栏壁纸
博主头像
AllInOne博主等级

随风来,随风去

  • 累计撰写 45 篇文章
  • 累计创建 27 个标签
  • 累计收到 2 条评论

目 录CONTENT

文章目录

Datax同步增量StarRocks数据到Tidb(实战篇)

AllInOne
2024-05-30 / 0 评论 / 0 点赞 / 123 阅读 / 731 字
温馨提示:
点赞-关注-不迷路。

相关同步脚本

#!/usr/bin/env python
# coding=utf-8
# Usage examples:
#   MSCK REPAIR TABLE dbname.tablename;
#   vim gen_import_config.py
#   python gen_import_config.py  --sd dbname --st tablename --td targetdb --tt targettable
#   python /data/app/datax/bin/datax.py /data/app/datax/tidbjson/db.tablename.json
import json
import argparse
import os
import sys
import pymysql
 
# StarRocks (source) connection settings — adjust to the actual environment.
starrock_host = "ip"
starrock_port = 9030
starrock_user = "xx"
starrock_passwd = "xx"
# Incremental filter used as the reader's WHERE clause: rows whose
# grain_update_time is on or after midnight of the previous day.
inc_query_sql = "grain_update_time >= date_sub(date_format(now(),'%Y-%m-%d'),interval 1 day )"  

# TiDB (target) connection settings — adjust to the actual environment.
tidb_host = "ip"
tidb_port = 4000
tidb_user = "xx"
tidb_passwd = "xxx"
# Old target-side rows are removed via the writer's preSql (see generate_json).
 
# Directory where the generated DataX job JSON files are written.
output_path = "/data/app/datax/tidbjson"
 
# Fetch a table's metadata (column name and data type) from StarRocks.
def get_mysql_meta(database, table):
    """Return ((COLUMN_NAME, DATA_TYPE), ...) for `database`.`table`.

    Reads StarRocks' information_schema over the MySQL protocol; rows are
    ordered by ORDINAL_POSITION so the column order matches the table.
    """
    connection = pymysql.connect(
                 host=starrock_host,
                 user=starrock_user,
                 password=starrock_passwd,
                 port=starrock_port,
                 connect_timeout=10000
                 )
    # fix: close cursor/connection even when the query raises — the original
    # leaked both on any failure between connect() and close().
    try:
        cursor = connection.cursor()
        try:
            sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
            # parameterized query — database/table are never interpolated
            cursor.execute(sql, [database, table])
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        connection.close()
 
# Column names only (first field of each metadata row), as a lazy iterable.
def get_mysql_columns(database, table):
    return (meta_row[0] for meta_row in get_mysql_meta(database, table))
 
# Column list for the TiDB writer. Source and target schemas are assumed
# identical, so this is just the source column list.
# NOTE(review): the original comment mentioned a mysql->hive type mapping,
# but no conversion was ever performed — the body was a byte-for-byte
# duplicate of get_mysql_columns. fix: delegate instead of duplicating.
def get_tidb_columns(database, table):
    return get_mysql_columns(database, table)
 
# Build and write one DataX job JSON file (StarRocks reader -> TiDB writer).
def generate_json(source_database, source_table, sink_database, sink_table):
    """Write <output_path>/<source_database>.<source_table>.json.

    The job incrementally copies rows from `source_database`.`source_table`
    matching inc_query_sql into `sink_database`.`sink_table`; the writer's
    preSql first deletes the same incremental window on the target so the
    job is re-runnable without duplicating rows.
    """
    job = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 3
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [{
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": starrock_user,
                        "password": starrock_passwd,
                        "column": list(get_mysql_columns(source_database, source_table)),
                        "where": inc_query_sql,
                        "splitPk": "",
                        "connection": [{
                            "table": [source_table],
                            "jdbcUrl": ["jdbc:mysql://" + starrock_host + ":" + str(starrock_port) + "/" + source_database]
                        }]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": tidb_user,
                        "password": tidb_passwd,
                        # column order mirrors the source table exactly
                        "column": list(get_tidb_columns(source_database, source_table)),
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://" + tidb_host + ":" + str(tidb_port) + "/" + sink_database,
                                "table": [
                                    sink_table
                                ]
                            }
                        ],
                        # NOTE(review): sink_table is interpolated into SQL here;
                        # it must come from a trusted operator, not user input.
                        "preSql": [
                            "delete from " + sink_table + " where grain_update_time >= date_sub(date_format(now(),'%Y-%m-%d'),interval 1 day )"
                        ]
                    }
                }
            }]
        }
    }
    print(job)  # debug echo of the generated job definition
    # fix: exist_ok avoids the exists()/makedirs() race of the original
    os.makedirs(output_path, exist_ok=True)
    with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:
        json.dump(job, f)
 
 
def main(args):
    """Parse command-line options and generate one DataX job file.

    args: argv-style list, e.g. ['--sd', 'db', '--st', 't', '--td', 'tdb', '--tt', 'tt']
      --sd  source (StarRocks) database    --st  source table
      --td  target (TiDB) database         --tt  target table
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--sd', type=str, default=None, help='source (StarRocks) database')
    parser.add_argument('--st', type=str, default=None, help='source table')
    parser.add_argument('--td', type=str, default=None, help='target (TiDB) database')
    parser.add_argument('--tt', type=str, default=None, help='target table')
    # fix: the original shadowed `args` with parse_args() (which re-reads
    # sys.argv), silently ignoring the list passed in; pass it through so
    # main() is also callable programmatically.
    opts = parser.parse_args(args)
    generate_json(opts.sd, opts.st, opts.td, opts.tt)

if __name__ == '__main__':
    main(sys.argv[1:])



0

评论区