Exporting MySQL Data to HDFS with DataX (Hands-On)

AllInOne
2024-05-29

Overview

A Python script generates the JSON job files that DataX needs; DataX then uses them to export MySQL data to HDFS.
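
At a high level each table takes two commands: generate a job file, then hand it to DataX with the HDFS target directory passed in through -p"-Dtargetdir=...". A minimal sketch, assuming the example database bigdata, the example table activity_info, and the default paths used later in this post:

# generate the job file (written to output_path, /home/bigdata/test by default)
python gen_import_config.py -d bigdata -t activity_info
# create the target directory first (the batch script below does the same before each run)
hadoop fs -mkdir -p /origin_data/bigdata/full_db/activity_info_full/2024-05-28
# run the job; DataX substitutes ${targetdir} inside the config
python /home/bigdata/datax/datax/bin/datax.py \
    -p"-Dtargetdir=/origin_data/bigdata/full_db/activity_info_full/2024-05-28" \
    /home/bigdata/test/bigdata.activity_info.json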

Install dependencies

pip install pymysql
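
A quick check that pymysql is importable by the interpreter that will run the script (it should print the installed version):

python -c "import pymysql; print(pymysql.__version__)"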

Generator script (gen_import_config.py)

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Usage: python gen_import_config.py  -d <database> -t <table>
import json
import getopt
import os
import sys
import pymysql
 
# MySQL connection settings; change them to match your environment
mysql_host = ""
mysql_port = 3306
mysql_user = ""
mysql_passwd = ""
 
# HDFS NameNode settings; change them to match your environment
hdfs_nn_host = "master"
hdfs_nn_port = "8020"
 
 
# Directory where the generated config files are written; change as needed
output_path = "/home/bigdata/test"
 
# Fetch a table's metadata (column names and data types) from information_schema
def get_mysql_meta(database, table):
    connection = pymysql.connect(
                 host=mysql_host,        # MySQL host
                 user=mysql_user,        # MySQL user
                 password=mysql_passwd,  # MySQL password
                 port=mysql_port,
                 connect_timeout=10000
                 )
    cursor = connection.cursor()
    sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
    cursor.execute(sql, [database, table])
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()
    return fetchall
 
# Get the column names of a MySQL table
def get_mysql_columns(database, table):
    return map(lambda x: x[0], get_mysql_meta(database, table))
 
# Map MySQL data types to Hive data types for the hdfswriter column list
def get_hive_columns(database, table):
    def type_mapping(mysql_type):
        mappings = {
            "bigint": "bigint",
            "int": "bigint",
            "smallint": "bigint",
            "tinyint": "bigint",
            "decimal": "string",
            "double": "double",
            "float": "float",
            "binary": "string",
            "char": "string",
            "varchar": "string",
            "datetime": "string",
            "time": "string",
            "timestamp": "string",
            "date": "string",
            "text": "string"
        }
        return mappings[mysql_type]
 
    meta = get_mysql_meta(database, table)
    return map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta)
 
# Build the DataX job config and write it out as a JSON file
def generate_json(source_database, source_table):
    job = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 3
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [{
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": mysql_user,
                        "password": mysql_passwd,
                        "column": list(get_mysql_columns(source_database, source_table)),
                        "splitPk": "",
                        "connection": [{
                            "table": [source_table],
                            "jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + str(mysql_port) + "/" + source_database]
                        }]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,
                        "fileType": "text",
                        "path": "${targetdir}",
                        "fileName": source_table,
                        "column": list(get_hive_columns(source_database, source_table)),
                        "writeMode": "append",
                        "fieldDelimiter": "\t",
                        "compress": "gzip"
                    }
                }
            }]
        }
    }
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:
        json.dump(job, f)
 
 
def main(args):
    source_database = ""
    source_table = ""
 
    options, arguments = getopt.getopt(args, 'd:t:', ['sourcedb=', 'sourcetbl='])
    for opt_name, opt_value in options:
        if opt_name in ('-d', '--sourcedb'):
            source_database = opt_value
        if opt_name in ('-t', '--sourcetbl'):
            source_table = opt_value
    generate_json(source_database, source_table)
 
if __name__ == '__main__':
    main(sys.argv[1:])
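
A minimal sanity check, reusing the example database and table names from above: the script writes <database>.<table>.json under output_path, and json.tool pretty-prints it so the reader/writer column lists can be eyeballed.

python gen_import_config.py -d bigdata -t activity_info
python -m json.tool /home/bigdata/test/bigdata.activity_info.json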

Using the script

#!/bin/bash
 
# one invocation per table to be exported
python gen_import_config.py  -d <database> -t <table_name>
python gen_import_config.py  -d <database> -t <table_name>
python gen_import_config.py  -d <database> -t <table_name>
python gen_import_config.py  -d <database> -t <table_name>
python gen_import_config.py  -d <database> -t <table_name>
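
When there are many tables, a loop keeps this wrapper short. A sketch with placeholder table names:

#!/bin/bash
# generate one config per table; the table names here are only placeholders
for tbl in activity_info activity_rule base_category1; do
  python gen_import_config.py -d bigdata -t "$tbl"
done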

Example: batch export to HDFS

#!/bin/bash
# Usage example: mysql_to_hdfs_full.sh all [date]. Change DATAX_HOME and the config file paths below and it is ready to use.
DATAX_HOME=/home/bigdata/datax/datax
 
# If a date is passed in, do_date takes that value; otherwise it defaults to yesterday
if [ -n "$2" ] ;then
    do_date=$2
else
    do_date=`date -d "-1 day" +%F`
fi
 
# Prepare the target path: create it if it does not exist, clear it if it does, so the sync job can be re-run safely
handle_targetdir() {
  hadoop fs -test -e $1
  if [[ $? -eq 1 ]]; then
    echo "路径$1不存在,正在创建......"
    hadoop fs -mkdir -p $1
  else
    echo "路径$1已经存在"
    fs_count=$(hadoop fs -count $1)
    content_size=$(echo $fs_count | awk '{print $3}')
    if [[ $content_size -eq 0 ]]; then
      echo "路径$1为空"
    else
      echo "路径$1不为空,正在清空......"
      hadoop fs -rm -r -f $1/*
    fi
  fi
}
 
# Data sync: prepare the target directory, then run one DataX job
import_data() {
# $1: DataX job config, e.g. /home/bigdata/datax/datax/job/pyjson/bigdata.activity_info.json
# $2: HDFS target directory, e.g. /origin_data/bigdata/db/activity_info_full/$do_date
  datax_config=$1
  target_dir=$2
 
  handle_targetdir $target_dir
  python $DATAX_HOME/bin/datax.py -p"-Dtargetdir=$target_dir" $datax_config
}
 
case $1 in
"activity_info")
# Change /home/bigdata/datax/datax/job/pyjson to the directory where your generated configs live
  import_data /home/bigdata/datax/datax/job/pyjson/bigdata.activity_info.json /origin_data/bigdata/full_db/activity_info_full/$do_date
  ;;
"all")
  import_data /home/bigdata/datax/datax/job/pyjson/bigdata.activity_info.json /origin_data/bigdata/full_db/activity_info_full/$do_date
  ;;
esac
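
Example invocations (the date argument is optional and defaults to yesterday), followed by a check that the files actually landed on HDFS:

bash mysql_to_hdfs_full.sh activity_info 2024-05-28
bash mysql_to_hdfs_full.sh all
hadoop fs -ls /origin_data/bigdata/full_db/activity_info_full/2024-05-28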
