doris 2.1.5 seatunnel 2.3.8
问题: 看下方报错信息 应该是 数据问题,但是下方的doris建表语句 都是 按照最大程度去兼容数据,之前在论坛看到有人问道flink sink doris 遇到了相同的错误,回复说是 加上structmode 为 true 我这个情况在 ST 的配置文件加上 该配置 也没有生效 想请教一下!
背景:基于之前咱们论坛分享的一个数据项目做的练习
具体问题及场景:
使用seatunnel 将数据 mysql -> doris全量迁移
mysql 表结构
CREATE TABLE `cart_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
`user_id` varchar(200) DEFAULT NULL COMMENT '用户id',
`sku_id` bigint(20) DEFAULT NULL COMMENT 'skuid',
`cart_price` decimal(10,2) DEFAULT NULL COMMENT '放入购物车时价格',
`sku_num` int(11) DEFAULT NULL COMMENT '数量',
`img_url` varchar(200) DEFAULT NULL COMMENT '图片文件',
`sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名称 (冗余)',
`is_checked` int(1) DEFAULT NULL,
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`operate_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间',
`is_ordered` bigint(20) DEFAULT NULL COMMENT '是否已经下单',
`order_time` datetime DEFAULT NULL COMMENT '下单时间',
`source_type` varchar(20) DEFAULT NULL COMMENT '来源类型',
`source_id` bigint(20) DEFAULT NULL COMMENT '来源编号',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=30402 DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='购物车表 用户登录系统时更新冗余'
doris 表结构
-- DROP TABLE IF EXISTS ods.ods_cart_info_full;
CREATE TABLE ods.ods_cart_info_full
(
`id` VARCHAR(255) COMMENT '编号',
`k1` DATE NOT NULL COMMENT '分区字段',
`user_id` STRING COMMENT '用户id',
`sku_id` STRING COMMENT 'sku_id',
`cart_price` DECIMAL(16, 2) COMMENT '放入购物车时价格',
`sku_num` BIGINT COMMENT '数量',
`img_url` STRING COMMENT '商品图片地址',
`sku_name` STRING COMMENT 'sku名称 (冗余)',
`is_checked` STRING COMMENT '是否被选中',
`create_time` STRING COMMENT '创建时间',
`operate_time` STRING COMMENT '修改时间',
`is_ordered` STRING COMMENT '是否已经下单',
`order_time` STRING COMMENT '下单时间',
`source_type` STRING COMMENT '来源类型',
`source_id` STRING COMMENT '来源编号'
)
ENGINE=OLAP
UNIQUE KEY(`id`,`k1`)
COMMENT '购物车全量表'
PARTITION BY RANGE(`k1`) ()
DISTRIBUTED BY HASH(`id`)
PROPERTIES
(
"replication_allocation" = "tag.location.default: 1",
"is_being_synced" = "false",
"storage_format" = "V2",
"light_schema_change" = "true",
"disable_auto_compaction" = "false",
"enable_single_replica_compaction" = "false",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-60",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "32",
"dynamic_partition.create_history_partition" = "true"
);
seatunnel采用自带引擎,conf文件为
#同步MySQL购物车表(全量表)到Doris ods层
env {
execution.parallelism = 2
job.mode = "BATCH"
checkpoint.interval = 10000
}
source {
Jdbc {
result_table_name = "mysql_seatunnel"
url = "jdbc:mysql://192.168.150.102:3306/gmall"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
user = "root"
password = "123456"
query = "select id, user_id, sku_id, cart_price, sku_num, img_url, sku_name, is_checked, create_time, operate_time, is_ordered, order_time, source_type, source_id from gmall.cart_info"
}
}
transform {
Sql {
source_table_name = "mysql_seatunnel"
result_table_name = "seatunnel_doris"
query = "select id, formatdatetime(create_time,'yyyy-MM-dd') as k1, user_id, sku_id, cart_price, sku_num, img_url, sku_name, is_checked, create_time, operate_time, is_ordered, order_time, source_type, source_id from mysql_seatunnel"
}
}
sink {
Doris {
source_table_name = "seatunnel_doris"
fenodes = "192.168.150.132:8030"
username = "root"
password = "abc123!@#"
table.identifier = "ods.ods_cart_info_full"
sink.enable-2pc = "true"
sink.label-prefix = "test_json"
doris.config = {
format="json"
read_json_by_line="true"
}
}
}
现在 在跑的时候 就报错 之前试了几个全量抽都不报错
Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:213)
at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.doris.exception.DorisConnectorException: ErrorCode:[Doris-01], ErrorDescription:[stream load error] - stream load error: [CANCELLED][DATA_QUALITY_ERROR]Encountered unqualified data, stop processing, see more in null
at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:253)
at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:66)
at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)
at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:70)
at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50)
at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51)
at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)
at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:693)
at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1018)
at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:39)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.doris.exception.DorisConnectorException: ErrorCode:[Doris-01], ErrorDescription:[stream load error] - stream load error: [CANCELLED][DATA_QUALITY_ERROR]Encountered unqualified data, stop processing, see more in null
at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:258)
at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:188)
... 17 more
Caused by: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.doris.exception.DorisConnectorException: ErrorCode:[Doris-01], ErrorDescription:[stream load error] - stream load error: [CANCELLED][DATA_QUALITY_ERROR]Encountered unqualified data, stop processing, see more in null
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:256)
... 18 more
Caused by: org.apache.seatunnel.connectors.doris.exception.DorisConnectorException: ErrorCode:[Doris-01], ErrorDescription:[stream load error] - stream load error: [CANCELLED][DATA_QUALITY_ERROR]Encountered unqualified data, stop processing, see more in null
at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.flush(DorisSinkWriter.java:162)
at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.prepareCommit(DorisSinkWriter.java:143)
at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.lambda$prepareCommit$4(MultiTableSinkWriter.java:241)
... 6 more
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:205)
... 2 more