seatunnel doris sink 报错Encountered unqualified data, stop processing, see more in null

Viewed 6

doris 2.1.5 seatunnel 2.3.8
问题: 看下方报错信息 应该是 数据问题,但是下方的doris建表语句 都是 按照最大程度去兼容数据,之前在论坛看到有人问道flink sink doris 遇到了相同的错误,回复说是 加上structmode 为 true 我这个情况在 ST 的配置文件加上 该配置 也没有生效 想请教一下!

背景:基于之前咱们论坛分享的一个数据项目做的练习

具体问题及场景:
使用seatunnel 将数据 mysql -> doris全量迁移
mysql 表结构

CREATE TABLE `cart_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
  `user_id` varchar(200) DEFAULT NULL COMMENT '用户id',
  `sku_id` bigint(20) DEFAULT NULL COMMENT 'skuid',
  `cart_price` decimal(10,2) DEFAULT NULL COMMENT '放入购物车时价格',
  `sku_num` int(11) DEFAULT NULL COMMENT '数量',
  `img_url` varchar(200) DEFAULT NULL COMMENT '图片文件',
  `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名称 (冗余)',
  `is_checked` int(1) DEFAULT NULL,
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `operate_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间',
  `is_ordered` bigint(20) DEFAULT NULL COMMENT '是否已经下单',
  `order_time` datetime DEFAULT NULL COMMENT '下单时间',
  `source_type` varchar(20) DEFAULT NULL COMMENT '来源类型',
  `source_id` bigint(20) DEFAULT NULL COMMENT '来源编号',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=30402 DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='购物车表 用户登录系统时更新冗余'

doris 表结构

-- DROP TABLE IF EXISTS ods.ods_cart_info_full;
CREATE  TABLE ods.ods_cart_info_full
(
    `id`           VARCHAR(255) COMMENT '编号',
    `k1`           DATE NOT NULL   COMMENT '分区字段',
    `user_id`      STRING COMMENT '用户id',
    `sku_id`       STRING COMMENT 'sku_id',
    `cart_price`   DECIMAL(16, 2) COMMENT '放入购物车时价格',
    `sku_num`      BIGINT COMMENT '数量',
    `img_url`      STRING COMMENT '商品图片地址',
    `sku_name`     STRING COMMENT 'sku名称 (冗余)',
    `is_checked`   STRING COMMENT '是否被选中',
    `create_time`  STRING COMMENT '创建时间',
    `operate_time` STRING COMMENT '修改时间',
    `is_ordered`   STRING COMMENT '是否已经下单',
    `order_time`   STRING COMMENT '下单时间',
    `source_type`  STRING COMMENT '来源类型',
    `source_id`    STRING COMMENT '来源编号'
)
    ENGINE=OLAP
UNIQUE KEY(`id`,`k1`)
COMMENT '购物车全量表'
PARTITION BY RANGE(`k1`) ()
DISTRIBUTED BY HASH(`id`)
PROPERTIES
(
    "replication_allocation" = "tag.location.default: 1",
    "is_being_synced" = "false",
    "storage_format" = "V2",
    "light_schema_change" = "true",
    "disable_auto_compaction" = "false",
    "enable_single_replica_compaction" = "false",
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.start" = "-60",
    "dynamic_partition.end" = "3",
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.buckets" = "32",
    "dynamic_partition.create_history_partition" = "true"
);

seatunnel采用自带引擎,conf文件为

#同步MySQL购物车表(全量表)到Doris ods层
env {
  execution.parallelism = 2
  job.mode = "BATCH"
  checkpoint.interval = 10000
}

source {
  Jdbc {
    result_table_name = "mysql_seatunnel"
    url = "jdbc:mysql://192.168.150.102:3306/gmall"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "root"
    password = "123456"
    query = "select id, user_id, sku_id, cart_price, sku_num, img_url, sku_name, is_checked, create_time, operate_time, is_ordered, order_time, source_type, source_id from gmall.cart_info"
  }
}

transform {
  Sql {
    source_table_name = "mysql_seatunnel"
    result_table_name = "seatunnel_doris"
    query = "select id, formatdatetime(create_time,'yyyy-MM-dd') as k1,  user_id, sku_id, cart_price, sku_num, img_url, sku_name, is_checked, create_time, operate_time, is_ordered, order_time, source_type, source_id from mysql_seatunnel"
  }
}


sink {
  Doris {
    source_table_name = "seatunnel_doris"
    fenodes = "192.168.150.132:8030"
    username = "root"
    password = "abc123!@#"
    table.identifier = "ods.ods_cart_info_full"
    sink.enable-2pc = "true"
    sink.label-prefix = "test_json"
    doris.config = {
      format="json"
      read_json_by_line="true"
    }
  }
}

现在 在跑的时候 就报错 之前试了几个全量抽都不报错

Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:213)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.doris.exception.DorisConnectorException: ErrorCode:[Doris-01], ErrorDescription:[stream load error] - stream load error: [CANCELLED][DATA_QUALITY_ERROR]Encountered unqualified data, stop processing, see more in null
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:253)
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:66)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)
        at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:70)
        at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50)
        at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51)
        at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
        at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:693)
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1018)
        at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:39)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.doris.exception.DorisConnectorException: ErrorCode:[Doris-01], ErrorDescription:[stream load error] - stream load error: [CANCELLED][DATA_QUALITY_ERROR]Encountered unqualified data, stop processing, see more in null
        at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:258)
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:188)
        ... 17 more
Caused by: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.doris.exception.DorisConnectorException: ErrorCode:[Doris-01], ErrorDescription:[stream load error] - stream load error: [CANCELLED][DATA_QUALITY_ERROR]Encountered unqualified data, stop processing, see more in null
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:256)
        ... 18 more
Caused by: org.apache.seatunnel.connectors.doris.exception.DorisConnectorException: ErrorCode:[Doris-01], ErrorDescription:[stream load error] - stream load error: [CANCELLED][DATA_QUALITY_ERROR]Encountered unqualified data, stop processing, see more in null
        at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.flush(DorisSinkWriter.java:162)
        at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.prepareCommit(DorisSinkWriter.java:143)
        at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.lambda$prepareCommit$4(MultiTableSinkWriter.java:241)
        ... 6 more

        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:205)
        ... 2 more
1 Answers

这是个已知问题,没返回errUrl到客户端,这个问题在 2.1.6 修了,Error URL只是没有给客户端返回,但是实际是在be端记录下来了的,可以去be的be/storage/error log 录(be.conf storage root path配置的第一个目录下)下找下最近文件。

建议升级到2.1 最新的 2.1.7 版本进行测试。