-- 案例数据（示例 JSON 消息，每行一条）
-- {"commodity_id": "1", "customer_name": "Mark Twain", "country": "US","pay_time": 1589191487,"price": 875}
-- {"commodity_id": "2", "customer_name": "Oscar Wilde", "country": "UK","pay_time": 1589191487,"price": 895}
-- {"commodity_id": "3", "customer_name": "Antoine de Saint-Exupéry","country": "France","pay_time": 1589191487,"price": 895}
-- 建立分区表
-- 下面建立的是存算分离（shared-data）集群中的表，并通过动态分区按天自动创建分区
-- Create the target database (idempotent, like the table DDL that follows) and make it
-- the session default. The database name must match the one used by every later
-- statement (xx_db).
CREATE DATABASE IF NOT EXISTS xx_db;
USE xx_db;
-- ODS table for raw messages in a shared-data (storage-compute separated) cluster,
-- range-partitioned by day on `datainputtime` through dynamic partitioning.
-- The name is qualified with xx_db so the DDL does not depend on the session's current database.
-- NOTE(review): `x1`..`x5` (COMMENT "z") are anonymized placeholders. Column names must be
-- unique within a table, so give each one its real, distinct name before running.
CREATE TABLE IF NOT EXISTS xx_db.ods_xx (
    `message`       VARCHAR(500) NULL COMMENT "message消息体",
    `datainputtime` DATE         NULL COMMENT "数据写入时间",
    `x1`            VARCHAR(255) NULL COMMENT "z",
    `x2`            VARCHAR(255) NULL COMMENT "z",
    `x3`            VARCHAR(50)  NULL COMMENT "z",
    `x4`            VARCHAR(50)  NULL COMMENT "z",
    `x5`            VARCHAR(500) NULL COMMENT "z"
)
DUPLICATE KEY(`message`)
-- Empty initial partition list: partitions come from the dynamic_partition.* properties below.
PARTITION BY RANGE(`datainputtime`) ()
DISTRIBUTED BY HASH(`message`)
PROPERTIES (
    -- Data-cache settings for the shared-data table.
    "datacache.enable"                        = "true",
    "datacache.partition_duration"            = "1 MONTH",
    "enable_async_write_back"                 = "false",
    -- Dynamic partitioning: one partition per DAY, with dates evaluated in Asia/Shanghai time.
    "dynamic_partition.enable"                = "true",
    "dynamic_partition.time_unit"             = "DAY",
    "dynamic_partition.time_zone"             = "Asia/Shanghai",
    -- -2147483648 is the INT32 minimum, i.e. no lower bound on the partition window.
    -- NOTE(review): presumably this means old partitions are never dropped; confirm in the StarRocks docs.
    "dynamic_partition.start"                 = "-2147483648",
    -- Partitions are created ahead of time up to 4 days past the current day.
    "dynamic_partition.end"                   = "4",
    "dynamic_partition.prefix"                = "p",
    -- Number of historical (past-day) partitions to create as well.
    "dynamic_partition.history_partition_num" = "10"
);
-- Check the partitions created for the table. The name is qualified so the result does not
-- depend on the session's current database.
SHOW PARTITIONS FROM xx_db.ods_xx;
-- Inspect the database's metadata via SHOW PROC.
-- NOTE(review): some StarRocks versions only accept a numeric database ID after /dbs/.
-- If this errors, look the ID up with SHOW PROC '/dbs' first.
SHOW PROC '/dbs/xx_db';
-- 建立导入任务（Routine Load：从 Kafka 持续导入 JSON 数据）
-- Routine Load job xx_db.order_ods_xx: continuously consumes JSON messages from Kafka
-- topic "xx" into table ods_xx. The table is named without a database prefix (the
-- documented ON <table_name> form); it is resolved in the job's database, xx_db.
CREATE ROUTINE LOAD xx_db.order_ods_xx ON ods_xx
-- The jsonpaths entries map, in order, onto the non-derived columns listed here (7 <-> 7).
-- A derived column (name = expr) must read from one of those mapped columns.
-- NOTE(review): x1..x5 stand for the table's anonymized columns, and src_ts for the JSON field
-- holding the Unix timestamp (presumably seconds). Replace them, and each "$.xx" path, with real
-- names, keeping both lists in the same order. src_ts is not a table column; it only feeds the
-- derived DATE partition column `datainputtime`.
COLUMNS(
    message, x1, x2, x3, x4, x5, src_ts,
    datainputtime = from_unixtime(src_ts, '%Y%m%d')
)
PROPERTIES
(
    "desired_concurrent_number" = "3",
    "format"                    = "json",
    "jsonpaths"                 = "[\"$.xx\",\"$.xx\",\"$.xx\",\"$.xx\",\"$.xx\",\"$.xx\",\"$.xx\"]"
)
FROM KAFKA
(
    -- Must be filled in: comma-separated <host>:<port> list of Kafka brokers.
    "kafka_broker_list"                  = "<kafka_broker_ip>:<kafka_broker_port>",
    "kafka_topic"                        = "xx",
    -- Start from the earliest available offset.
    "property.kafka_default_offsets"     = "OFFSET_BEGINNING",
    "property.group.id"                  = "test_starock",
    -- NOTE(review): StarRocks consumes Kafka through librdkafka. Some keys below are Java-client
    -- names (e.g. max.poll.records, fetch.max.wait.ms) that may be ignored or rejected; verify
    -- each against the librdkafka configuration reference.
    "property.session.timeout.ms"        = "600000",
    "property.request.timeout.ms"        = "600000",
    "property.fetch.max.wait.ms"         = "600000",
    "property.max.poll.interval.ms"      = "600000",
    "property.max.partition.fetch.bytes" = "10485760",
    "property.max.poll.records"          = "1000",
    "property.fetch.message.max.bytes"   = "10485760"
);
-- 任务的查看、暂停、恢复和停止
-- Manage the Routine Load job created above (xx_db.order_ods_xx). Every command must use
-- that exact job name.
-- Show the job's status (\G = vertical output in the mysql client).
SHOW ROUTINE LOAD FOR xx_db.order_ods_xx\G
-- List the job's tasks. FROM xx_db avoids depending on the session's current database.
SHOW ROUTINE LOAD TASK FROM xx_db WHERE JobName = "order_ods_xx";
-- Suspend consumption, then continue it.
PAUSE ROUTINE LOAD FOR xx_db.order_ods_xx;
RESUME ROUTINE LOAD FOR xx_db.order_ods_xx;
-- NOTE(review): per the StarRocks docs a STOPPED job cannot be resumed; use PAUSE if you
-- intend to continue later. Confirm before running.
STOP ROUTINE LOAD FOR xx_db.order_ods_xx;

-- 评论区