Loading Kafka Data into StarRocks (Hands-On)

AllInOne
2024-04-23

Sample data

{"commodity_id": "1", "customer_name": "Mark Twain", "country": "US","pay_time": 1589191487,"price": 875}
{"commodity_id": "2", "customer_name": "Oscar Wilde", "country": "UK","pay_time": 1589191487,"price": 895}
{"commodity_id": "3", "customer_name": "Antoine de Saint-Exupéry","country": "France","pay_time": 1589191487,"price": 895}
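
The `pay_time` field in these records holds Unix seconds, and the load job later in this post derives a date column with a call of the form `from_unixtime(xx, '%Y%m%d')`. As a quick sanity check, here is a minimal sketch of that conversion for the sample value, assuming the session time zone is set to Asia/Shanghai (the zone used in the table properties); the alias `pay_dt` is only illustrative.

```sql
-- Assumption: run in a StarRocks session; the time zone is set explicitly
-- so the result does not depend on the server default.
SET time_zone = 'Asia/Shanghai';

-- 1589191487 is 2020-05-11 10:04:47 UTC (18:04:47 in Asia/Shanghai),
-- so the formatted value is the string 20200511.
SELECT from_unixtime(1589191487, '%Y%m%d') AS pay_dt;
```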

Create the partitioned table

The table created below targets a shared-data cluster (storage and compute separated).

CREATE DATABASE xx_db;
USE xx_db;
-- Column names `x` and comments "z" are redacted placeholders;
-- give every column a unique name before running this statement.
CREATE TABLE IF NOT EXISTS ods_xx (
    `message` varchar(500) NULL COMMENT "message body",
    `datainputtime` date NULL COMMENT "data ingestion time",
    `x` varchar(255) NULL COMMENT "z",
    `x` varchar(255) NULL COMMENT "z",
    `x` varchar(50) NULL COMMENT "z",
    `x` varchar(50) NULL COMMENT "z",
    `x` varchar(500) NULL COMMENT "z"
)
DUPLICATE KEY(`message`)
PARTITION BY RANGE(`datainputtime`)()
DISTRIBUTED BY HASH(`message`)
PROPERTIES (
    "datacache.enable" = "true",
    "datacache.partition_duration" = "1 MONTH",
    "enable_async_write_back" = "false",
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.time_zone" = "Asia/Shanghai",
    "dynamic_partition.start" = "-2147483648",
    "dynamic_partition.end" = "4",
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.history_partition_num" = "10"
);

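SHOW PARTITIONS FROM ods_xx;
-- SHOW PROC addresses a database by its numeric ID, not by its name:
-- list the IDs first, then put the ID of xx_db into the path.
SHOW PROC '/dbs';
SHOW PROC '/dbs/<db_id>';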
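
Because the column names above are redacted, it may help to see what a table for the sample records could look like. The following is only a sketch: the table name `orders_demo`, the column types, and the extra `pay_dt` column are assumptions, and the shared-data and dynamic-partition properties are left out to keep it short.

```sql
-- Hypothetical table for the sample order records (names and types assumed).
CREATE TABLE IF NOT EXISTS xx_db.orders_demo (
    `commodity_id`  varchar(26) NULL COMMENT "commodity ID",
    `customer_name` varchar(64) NULL COMMENT "customer name",
    `country`       varchar(26) NULL COMMENT "country",
    `pay_time`      bigint      NULL COMMENT "payment time, Unix seconds",
    `price`         double      NULL COMMENT "price",
    `pay_dt`        date        NULL COMMENT "payment date derived from pay_time"
)
DUPLICATE KEY(`commodity_id`)
-- As in the statement above, the bucket count is omitted and left to StarRocks.
DISTRIBUTED BY HASH(`commodity_id`);
```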

Create the load job

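-- `xx` stands for redacted column and key names.
-- The database is taken from the job name, so the table is given unqualified.
-- jsonpaths lists one path per source column, in the same order as COLUMNS;
-- the last COLUMNS entry is derived and has no path of its own.
CREATE ROUTINE LOAD xx_db.order_ods_xx ON ods_xx
COLUMNS(xx, xx, xx, xx, xx, xx, xx=from_unixtime(xx, '%Y%m%d'))
PROPERTIES
(
    "desired_concurrent_number" = "3",
    "format" = "json",
    "jsonpaths" = "[\"$.xx\",\"$.xx\",\"$.xx\",\"$.xx\",\"$.xx\",\"$.xx\"]"
)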
FROM KAFKA
(
    -- The broker list was left blank in the original; fill in host:port pairs.
    "kafka_broker_list" = "",
    "kafka_topic" = "xx",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING",
    "property.group.id" = "test_starock",
    "property.session.timeout.ms" = "600000",
    "property.request.timeout.ms" = "600000",
    "property.fetch.max.wait.ms" = "600000",
    "property.max.poll.interval.ms" = "600000",
    "property.max.partition.fetch.bytes" = "10485760",
    "property.max.poll.records" = "1000",
    "property.fetch.message.max.bytes" = "10485760"
);
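
For comparison, here is a sketch of how such a job might look for the sample records at the top of this post. It assumes a hypothetical target table `orders_demo` in `xx_db` whose columns are `commodity_id`, `customer_name`, `country`, `pay_time`, `price`, and a DATE column `pay_dt`; the job name `orders_demo_load` and the angle-bracket values are placeholders, not values from the original setup.

```sql
-- Hypothetical job: five JSON keys map to five source columns,
-- and pay_dt is computed from pay_time during the load.
CREATE ROUTINE LOAD xx_db.orders_demo_load ON orders_demo
COLUMNS(commodity_id, customer_name, country, pay_time, price,
        pay_dt = from_unixtime(pay_time, '%Y%m%d'))
PROPERTIES
(
    "desired_concurrent_number" = "3",
    "format" = "json",
    "jsonpaths" = "[\"$.commodity_id\",\"$.customer_name\",\"$.country\",\"$.pay_time\",\"$.price\"]"
)
FROM KAFKA
(
    "kafka_broker_list" = "<broker_host>:<broker_port>",
    "kafka_topic" = "<topic_name>",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
```

The paths in `jsonpaths` are matched to the non-derived columns by position, which is why the derived `pay_dt` column comes last and has no path.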

Viewing and stopping the job

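-- The job name matches the one used in CREATE ROUTINE LOAD above.
-- \G is a mysql client terminator that prints the result vertically.
SHOW ROUTINE LOAD FOR xx_db.order_ods_xx\G
SHOW ROUTINE LOAD TASK WHERE JobName = "xx";
PAUSE ROUTINE LOAD FOR xx_db.order_ods_xx;
RESUME ROUTINE LOAD FOR xx_db.order_ods_xx;
STOP ROUTINE LOAD FOR xx_db.order_ods_xx;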
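
A paused job can be resumed, whereas a stopped job cannot be restarted, so pausing is the safer choice when a setting only needs adjusting. The sketch below shows one possible sequence, assuming the job `xx_db.order_ods_xx` created earlier is still running; the new concurrency value of 5 is an arbitrary example.

```sql
-- Job properties can only be altered while the job is paused.
PAUSE ROUTINE LOAD FOR xx_db.order_ods_xx;

ALTER ROUTINE LOAD FOR xx_db.order_ods_xx
PROPERTIES ("desired_concurrent_number" = "5");

RESUME ROUTINE LOAD FOR xx_db.order_ods_xx;

-- Rough check that rows are arriving: count rows per ingestion date.
SELECT datainputtime, count(*) AS row_cnt
FROM xx_db.ods_xx
GROUP BY datainputtime
ORDER BY datainputtime;
```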