Related sync script
The script below reads column metadata from StarRocks via information_schema and generates a DataX job configuration (mysqlreader to mysqlwriter) for an incremental daily sync into TiDB.
#!/usr/bin/env python
# coding=utf-8
# Usage example:
#   vim gen_import_config.py
#   python gen_import_config.py --sd dbname --st tablename --td targetdb --tt targettable
#   python /data/app/datax/bin/datax.py /data/app/datax/tidbjson/db.tablename.json
import json
import argparse
import os
import sys
import pymysql
# StarRocks connection settings; adjust for your environment
starrock_host = "ip"
starrock_port = 9030
starrock_user = "xx"
starrock_passwd = "xx"
# Incremental filter: export only rows updated since the start of yesterday;
# the writer's preSql deletes the same window from the target before inserting
inc_query_sql = "grain_update_time >= date_sub(date_format(now(),'%Y-%m-%d'),interval 1 day )"
# TiDB connection settings; adjust for your environment
tidb_host = "ip"
tidb_port = 4000
tidb_user = "xx"
tidb_passwd = "xxx"
# Target directory for the generated job config files; adjust as needed
output_path = "/data/app/datax/tidbjson"
# Fetch the table's metadata (column names and data types) from StarRocks
def get_mysql_meta(database, table):
    connection = pymysql.connect(
        host=starrock_host,        # connection address
        user=starrock_user,        # user
        password=starrock_passwd,  # database password; change to your own
        port=starrock_port,
        connect_timeout=10000
    )
    cursor = connection.cursor()
    sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
    cursor.execute(sql, [database, table])
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()
    return fetchall
# Column names of the source table (from the StarRocks metadata)
def get_mysql_columns(database, table):
    return map(lambda x: x[0], get_mysql_meta(database, table))
# Column names for the TiDB writer (same names as the source columns)
def get_tidb_columns(database, table):
    return map(lambda x: x[0], get_mysql_meta(database, table))
# Generate the DataX job JSON file
def generate_json(source_database, source_table, sink_database, sink_table):
    job = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 3
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [{
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": starrock_user,
                        "password": starrock_passwd,
                        "column": list(get_mysql_columns(source_database, source_table)),
                        "where": inc_query_sql,
                        "splitPk": "",
                        "connection": [{
                            "table": [source_table],
                            "jdbcUrl": ["jdbc:mysql://" + starrock_host + ":" + str(starrock_port) + "/" + source_database]
                        }]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": tidb_user,
                        "password": tidb_passwd,
                        "column": list(get_tidb_columns(source_database, source_table)),
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://" + tidb_host + ":" + str(tidb_port) + "/" + sink_database,
                                "table": [
                                    sink_table
                                ]
                            }
                        ],
                        "preSql": [
                            "delete from " + sink_table + " where grain_update_time >= date_sub(date_format(now(),'%Y-%m-%d'),interval 1 day )"
                        ]
                    }
                }
            }]
        }
    }
    print(job)
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:
        json.dump(job, f)
def main(args):
    # Parse command-line options: --sd/--st are the source database/table, --td/--tt the target
    parser = argparse.ArgumentParser()
    parser.add_argument('--sd', type=str, default=None)
    parser.add_argument('--st', type=str, default=None)
    parser.add_argument('--td', type=str, default=None)
    parser.add_argument('--tt', type=str, default=None)
    args = parser.parse_args(args)
    generate_json(args.sd, args.st, args.td, args.tt)

if __name__ == '__main__':
    main(sys.argv[1:])
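
For daily incremental runs, the two commands from the usage example can be chained per table. The sketch below is a minimal, hypothetical batch driver under that assumption: the table names are placeholders, and the script and DataX paths follow the usage comments at the top of gen_import_config.py.

# Batch-driver sketch: regenerate each job config, then run DataX against it.
import os
import subprocess

# (source_db, source_table, target_db, target_table) -- placeholder values
TABLE_PAIRS = [
    ("dbname", "tablename", "targetdb", "targettable"),
]

DATAX_BIN = "/data/app/datax/bin/datax.py"
JOB_DIR = "/data/app/datax/tidbjson"

for sd, st, td, tt in TABLE_PAIRS:
    # Regenerate the job JSON so schema changes in the source table are picked up
    subprocess.run(
        ["python", "gen_import_config.py", "--sd", sd, "--st", st, "--td", td, "--tt", tt],
        check=True,
    )
    # Run the DataX job against the freshly generated config
    job_file = os.path.join(JOB_DIR, "{}.{}.json".format(sd, st))
    subprocess.run(["python", DATAX_BIN, job_file], check=True)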