Flink 1.16 job connected to Doris 2.1.6 writes no data, and the job reports no errors

SQL Server version: SQL Server 2019
Flink version: 1.16
Flink CDC version: 2.2.1
Doris connector version: 24.0.1
Doris version: 2.1.6

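Symptom: when the job runs, executing select * from ProductLine_source returns the source table's full snapshot plus incremental changes in real time, but the data is never written to Doris through the sink. The program's debug log shows no errors and no trace of Stream Load being called.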

Source code:

        // Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // Create the table environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // enable checkpoint
        env.enableCheckpointing(10000);


        // Create the SQL Server CDC source table
        tableEnv.executeSql(
                "CREATE TABLE ProductLine_source ( " +
                        "ID BIGINT  , " +
                        "Name STRING , " +
                        "ServiceTeamID BIGINT, " +
                        "QuotationMode INT , " +
                        "Description STRING, " +
                        "NeedSaleForecastProject BOOLEAN, " +
                        "QuotationType INT , " +
                        "NeedServiceSale BOOLEAN , " +
                        "NeedUploadFile BOOLEAN , " +
                        "NeedJoinSameProductInISC BOOLEAN , " +
                        "BusinessUnitId BIGINT, " +
                        "CanCreateInnerServiceType4 BOOLEAN, " +
                        "State INT , " +
                        "PRIMARY KEY (ID) NOT ENFORCED " +
                        ") WITH ( " +
                        "'connector' = 'sqlserver-cdc', " + // confirm this is the correct connector name
                        "'hostname' = 'xxxxx', " +
                        "'port' = '1433', " +
                        "'username' = 'xxxx', " +
                        "'password' = 'xxxxx', " +
                        "'database-name' = 'business', " +
                        "'schema-name' = 'dbo', " +  
                        "'table-name' = 'ProductLine' " +
                        ")"
        );
        // Run the query and print the source table's rows
        tableEnv.executeSql("SELECT * FROM ProductLine_source").print();


        // Create the Doris mapping (sink) table
        tableEnv.executeSql(
                "CREATE TABLE productLine_doris_sink ( " +
                        " ID BIGINT, " +
                        " Name STRING, " +
                        " ServiceTeamID BIGINT, " +
                        " QuotationMode INT, " +
                        " Description STRING, " +
                        " NeedSaleForecastProject BOOLEAN, " +
                        " QuotationType INT, " +
                        " NeedServiceSale BOOLEAN, " +
                        " NeedUploadFile BOOLEAN, " +
                        " NeedJoinSameProductInISC BOOLEAN, " +
                        " BusinessUnitId BIGINT, " +
                        " CanCreateInnerServiceType4 BOOLEAN, " +
                        " State INT " +
                        ") WITH ( " +
                        " 'connector' = 'doris', " +
                        " 'fenodes' = 'xxxxx:8030', " +
                        " 'table.identifier' = 'edm.productLine', " + // location of the target table
                        " 'username' = 'xxx', " +
                        " 'password' = 'xxx', " +
                        " 'sink.label-prefix' = 'productLine_' " +
                        ")"
        );

        tableEnv.executeSql("insert into productLine_doris_sink  SELECT * FROM ProductLine_source");
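
One possibility to rule out (an assumption on my part, not something the post confirms): in Flink's Table API, calling print() on the result of an unbounded streaming query blocks the calling thread until that job ends, so a statement placed after it may never be submitted. The toy model below uses only the JDK and no Flink classes; `printUnbounded`, `stepsReached`, and the step names are hypothetical stand-ins for the statements above, not connector or Flink APIs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingPrintModel {

    /** Hypothetical stand-in for printing an unbounded result: loops until interrupted. */
    static void printUnbounded(LinkedBlockingQueue<String> rows) throws InterruptedException {
        while (true) {
            System.out.println(rows.take()); // waits indefinitely while the stream is idle
        }
    }

    /**
     * Runs the modelled program on a worker thread and returns the steps it reached
     * before the timeout. printFirst = true mirrors the statement order in the post.
     */
    public static List<String> stepsReached(boolean printFirst, long timeoutMs) {
        List<String> reached = Collections.synchronizedList(new ArrayList<>());
        LinkedBlockingQueue<String> rows = new LinkedBlockingQueue<>(); // stays empty
        ExecutorService client = Executors.newSingleThreadExecutor();
        client.submit(() -> {
            reached.add("create tables");
            if (printFirst) {
                reached.add("print started");
                try {
                    printUnbounded(rows); // never returns on its own
                } catch (InterruptedException e) {
                    return; // the client was stopped while still printing
                }
            }
            reached.add("insert submitted");
        });
        client.shutdown();
        try {
            if (!client.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
                client.shutdownNow(); // interrupt the blocked print
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(reached);
    }

    public static void main(String[] args) {
        System.out.println(stepsReached(true, 300));  // [create tables, print started]
        System.out.println(stepsReached(false, 300)); // [create tables, insert submitted]
    }
}
```

If this matches what is happening in the real job, removing the print() call, or running the SELECT and the INSERT as separate programs, would be the first thing to try.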

Doris CREATE TABLE statement:

CREATE TABLE `productLine` (
  `ID` bigint NOT NULL,
  `Name` text NOT NULL,
  `ServiceTeamID` bigint NULL,
  `QuotationMode` int NOT NULL,
  `Description` text NULL,
  `NeedSaleForecastProject` boolean NULL,
  `QuotationType` int NOT NULL,
  `NeedServiceSale` boolean NOT NULL,
  `NeedUploadFile` boolean NOT NULL,
  `NeedJoinSameProductInISC` boolean NOT NULL,
  `BusinessUnitId` bigint NULL,
  `CanCreateInnerServiceType4` boolean NULL,
  `State` int NOT NULL
) ENGINE=OLAP
UNIQUE KEY(`ID`)
COMMENT '明细数据表'
DISTRIBUTED BY HASH(`ID`) BUCKETS 2
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"min_load_replica_num" = "-1",
"is_being_synced" = "false",
"storage_medium" = "hdd",
"storage_format" = "V2",
"inverted_index_storage_format" = "V1",
"enable_unique_key_merge_on_write" = "true",
"light_schema_change" = "true",
"disable_auto_compaction" = "false",
"enable_single_replica_compaction" = "false",
"group_commit_interval_ms" = "10000",
"group_commit_data_bytes" = "134217728",
"enable_mow_light_delete" = "false"
);
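
The debug log in this post reports ServiceTeamID and BusinessUnitId as decimal(18, 0) in SQL Server (and ID as a decimal identity column), while the Flink and Doris tables declare them as BIGINT. As a range-only sanity check, the sketch below confirms that every DECIMAL(18, 0) value fits in a 64-bit signed integer. Whether the CDC connector converts DECIMAL to BIGINT cleanly is a separate question that this sketch does not answer; the class and method names are illustrative.

```java
import java.math.BigInteger;

public class DecimalFitsLong {

    /** Largest value representable by DECIMAL(precision, 0), i.e. 10^precision - 1. */
    public static BigInteger maxOfDecimal(int precision) {
        return BigInteger.TEN.pow(precision).subtract(BigInteger.ONE);
    }

    /** True if every DECIMAL(precision, 0) value fits in a signed 64-bit integer. */
    public static boolean fitsInLong(int precision) {
        return maxOfDecimal(precision).compareTo(BigInteger.valueOf(Long.MAX_VALUE)) <= 0;
    }

    public static void main(String[] args) {
        System.out.println(maxOfDecimal(18)); // 999999999999999999
        System.out.println(fitsInLong(18));   // true
        System.out.println(fitsInLong(19));   // false
    }
}
```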

Flink debug log:

2025-02-13 16:20:11,873 INFO  RelationalSnapshotChangeEventSource:96 - Snapshot step 1 - Preparing
2025-02-13 16:20:11,873 INFO  RelationalSnapshotChangeEventSource:105 - Snapshot step 2 - Determining captured tables
2025-02-13 16:20:11,907 DEBUG CollectSinkOperatorCoordinator:140 - Forwarding request to sink socket server
2025-02-13 16:20:11,908 DEBUG CollectSinkOperatorCoordinator:146 - Fetching serialized result from sink socket server
2025-02-13 16:20:11,908 DEBUG CollectSinkFunction:400 - Request received, version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:11,909 DEBUG CollectSinkFunction:405 - Expecting version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:11,909 DEBUG CollectSinkFunction:504 - Sending back 0 results
2025-02-13 16:20:11,954 INFO  RelationalSnapshotChangeEventSource:112 - Snapshot step 3 - Locking captured tables [Business.dbo.ProductLine]
2025-02-13 16:20:11,954 INFO  SqlServerSnapshotChangeEventSource:125 - Setting locking timeout to 10 s
2025-02-13 16:20:12,013 DEBUG CollectSinkOperatorCoordinator:140 - Forwarding request to sink socket server
2025-02-13 16:20:12,014 DEBUG CollectSinkOperatorCoordinator:146 - Fetching serialized result from sink socket server
2025-02-13 16:20:12,014 DEBUG CollectSinkFunction:400 - Request received, version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,014 DEBUG CollectSinkFunction:405 - Expecting version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,014 DEBUG CollectSinkFunction:504 - Sending back 0 results
2025-02-13 16:20:12,055 INFO  SqlServerSnapshotChangeEventSource:130 - Executing schema locking
2025-02-13 16:20:12,055 INFO  SqlServerSnapshotChangeEventSource:137 - Locking table Business.dbo.ProductLine
2025-02-13 16:20:12,077 INFO  RelationalSnapshotChangeEventSource:118 - Snapshot step 4 - Determining snapshot offset
2025-02-13 16:20:12,095 INFO  RelationalSnapshotChangeEventSource:121 - Snapshot step 5 - Reading structure of captured tables
2025-02-13 16:20:12,096 INFO  SqlServerSnapshotChangeEventSource:182 - Reading structure of schema 'Business'
2025-02-13 16:20:12,117 DEBUG CollectSinkOperatorCoordinator:140 - Forwarding request to sink socket server
2025-02-13 16:20:12,119 DEBUG CollectSinkFunction:400 - Request received, version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,119 DEBUG CollectSinkFunction:405 - Expecting version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,119 DEBUG CollectSinkFunction:504 - Sending back 0 results
2025-02-13 16:20:12,119 DEBUG CollectSinkOperatorCoordinator:146 - Fetching serialized result from sink socket server
2025-02-13 16:20:12,224 DEBUG CollectSinkOperatorCoordinator:140 - Forwarding request to sink socket server
2025-02-13 16:20:12,226 DEBUG CollectSinkFunction:400 - Request received, version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,226 DEBUG CollectSinkFunction:405 - Expecting version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,226 DEBUG CollectSinkOperatorCoordinator:146 - Fetching serialized result from sink socket server
2025-02-13 16:20:12,226 DEBUG CollectSinkFunction:504 - Sending back 0 results
2025-02-13 16:20:12,330 DEBUG CollectSinkOperatorCoordinator:140 - Forwarding request to sink socket server
2025-02-13 16:20:12,332 DEBUG CollectSinkFunction:400 - Request received, version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,332 DEBUG CollectSinkFunction:405 - Expecting version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,332 DEBUG CollectSinkOperatorCoordinator:146 - Fetching serialized result from sink socket server
2025-02-13 16:20:12,332 DEBUG CollectSinkFunction:504 - Sending back 0 results
2025-02-13 16:20:12,342 DEBUG ChangeEventQueue:238 - checking for more records...
2025-02-13 16:20:12,342 DEBUG ChangeEventQueue:235 - no records available yet, sleeping a bit...
2025-02-13 16:20:12,351 INFO  RelationalSnapshotChangeEventSource:125 - Snapshot step 6 - Persisting schema history
2025-02-13 16:20:12,351 DEBUG RelationalSnapshotChangeEventSource:260 - Capturing structure of table Business.dbo.ProductLine
2025-02-13 16:20:12,357 DEBUG SqlServerDatabaseSchema:46 - Applying schema change event SchemaChangeEvent [database=Business, schema=dbo, ddl=null, tables=[columns: {
  ID decimal() identity(18, 0) NOT NULL
  Name varchar(100) NOT NULL
  ServiceTeamID decimal(18, 0) DEFAULT VALUE NULL
  QuotationMode int(10, 0) NOT NULL
  Description varchar(500) DEFAULT VALUE NULL
  NeedSaleForecastProject bit(1) DEFAULT VALUE NULL
  QuotationType int(10, 0) NOT NULL
  NeedServiceSale bit(1) NOT NULL
  NeedUploadFile bit(1) NOT NULL
  NeedJoinSameProductInISC bit(1) NOT NULL
  BusinessUnitId decimal(18, 0) DEFAULT VALUE NULL
  CanCreateInnerServiceType4 bit(1) DEFAULT VALUE NULL
  State int(10, 0) NOT NULL
}
primary key: [ID]
default charset: null
], type=CREATE]
2025-02-13 16:20:12,359 DEBUG TableSchemaBuilder:103 - Mapping table 'Business.dbo.ProductLine' to schemas under 'sqlserver_transaction_log_source.dbo.ProductLine'
...
2025-02-13 16:20:12,367 DEBUG TableSchemaBuilder:379 - - field 'BusinessUnitId' (BYTES) from column BusinessUnitId decimal(18, 0) DEFAULT VALUE NULL
2025-02-13 16:20:12,367 DEBUG TableSchemaBuilder:379 - - field 'CanCreateInnerServiceType4' (BOOLEAN) from column CanCreateInnerServiceType4 bit(1) DEFAULT VALUE NULL
2025-02-13 16:20:12,367 DEBUG TableSchemaBuilder:379 - - field 'State' (INT32) from column State int(10, 0) NOT NULL
2025-02-13 16:20:12,369 DEBUG TableSchemaBuilder:126 - Mapped primary key for table 'Business.dbo.ProductLine' to schema: {"name" : "sqlserver_transaction_log_source.dbo.ProductLine.Key", "type" : "STRUCT", "optional" : "false", "fields" : [{"name" : "ID", "index" : "0", "schema" : {"name" : "com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Decimal", "type" : "BYTES", "optional" : "false", "version" : "1"}}]}
2025-02-13 16:20:12,369 DEBUG TableSchemaBuilder:127 - Mapped columns for table 'Business.dbo.ProductLine' to schema: {"name" : "sqlserver_transaction_log_source.dbo.ProductLine.Value", "type" : "STRUCT", "optional" : "true", "fields" : [{"name" : "ID", "index" : "0", "schema" : {"name" : "com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Decimal", "type" : "BYTES", "optional" : "false", "version" : "1"}}, {"name" : "Name", "index" : "1", "schema" : {"type" : "STRING", "optional" : "false"}}, {"name" : "ServiceTeamID", "index" : "2", "schema" : {"name" : "com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Decimal", "type" : "BYTES", "optional" : "true", "version" : "1"}}, {"name" : "QuotationMode", "index" : "3", "schema" : {"type" : "INT32", "optional" : "false"}}, {"name" : "Description", "index" : "4", "schema" : {"type" : "STRING", "optional" : "true"}}, {"name" : "NeedSaleForecastProject", "index" : "5", "schema" : {"type" : "BOOLEAN", "optional" : "true"}}, {"name" : "QuotationType", "index" : "6", "schema" : {"type" : "INT32", "optional" : "false"}}, {"name" : "NeedServiceSale", "index" : "7", "schema" : {"type" : "BOOLEAN", "optional" : "false"}}, {"name" : "NeedUploadFile", "index" : "8", "schema" : {"type" : "BOOLEAN", "optional" : "false"}}, {"name" : "NeedJoinSameProductInISC", "index" : "9", "schema" : {"type" : "BOOLEAN", "optional" : "false"}}, {"name" : "BusinessUnitId", "index" : "10", "schema" : {"name" : "com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Decimal", "type" : "BYTES", "optional" : "true", "version" : "1"}}, {"name" : "CanCreateInnerServiceType4", "index" : "11", "schema" : {"type" : "BOOLEAN", "optional" : "true"}}, {"name" : "State", "index" : "12", "schema" : {"type" : "INT32", "optional" : "false"}}]}
2025-02-13 16:20:12,406 INFO  SqlServerSnapshotChangeEventSource:155 - Schema locks released.
2025-02-13 16:20:12,407 INFO  RelationalSnapshotChangeEventSource:137 - Snapshot step 7 - Snapshotting data
2025-02-13 16:20:12,407 INFO  RelationalSnapshotChangeEventSource:294 - Snapshotting contents of 1 tables while still in transaction
2025-02-13 16:20:12,407 DEBUG RelationalSnapshotChangeEventSource:303 - Snapshotting table Business.dbo.ProductLine
2025-02-13 16:20:12,408 INFO  RelationalSnapshotChangeEventSource:328 - 	 Exporting data from table 'Business.dbo.ProductLine' (1 of 1 tables)
2025-02-13 16:20:12,409 INFO  RelationalSnapshotChangeEventSource:336 - 	 For table 'Business.dbo.ProductLine' using select statement: 'SELECT [ProductLine].[ID],[ProductLine].[Name],[ProductLine].[ServiceTeamID],[ProductLine].[QuotationMode],[ProductLine].[Description],[ProductLine].[NeedSaleForecastProject],[ProductLine].[QuotationType],[ProductLine].[NeedServiceSale],[ProductLine].[NeedUploadFile],[ProductLine].[NeedJoinSameProductInISC],[ProductLine].[BusinessUnitId],[ProductLine].[CanCreateInnerServiceType4],[ProductLine].[State] FROM [dbo].[ProductLine]'
2025-02-13 16:20:12,411 INFO  CheckpointCoordinator:845 - Triggering checkpoint 1 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1739434812399 for job ea6d7dce92ca4f42a5037f7ec02cc6ca.
2025-02-13 16:20:12,422 DEBUG TaskExecutor:977 - Trigger checkpoint 1@1739434812399 for 08e815a721c7350fd7b27859387abed2_cbc357ccb763df2852fee8c4fc7d55f2_0_0.
2025-02-13 16:20:12,424 DEBUG StreamTask:1267 - Starting checkpoint 1 CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD} on task Source: ProductLine[1] -> ConstraintEnforcer[2] -> Sink: Collect table sink (1/1)#0
2025-02-13 16:20:12,425 DEBUG SubtaskCheckpointCoordinatorImpl:337 - Task Source: ProductLine[1] -> ConstraintEnforcer[2] -> Sink: Collect table sink (1/1)#0 broadcastEvent at 1739434812425, triggerTime 1739434812399, passed time 26
2025-02-13 16:20:12,428 DEBUG CollectSinkFunction:228 - Checkpoint begin with checkpointId = 1, lastCheckpointedOffset = 0, buffered results bytes = 0
2025-02-13 16:20:12,430 DEBUG SnapshotStrategyRunner:120 - DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous part) in thread Thread[Source: ProductLine[1] -> ConstraintEnforcer[2] -> Sink: Collect table sink (1/1)#0,5,Flink Task Threads] took 0 ms.
2025-02-13 16:20:12,431 DEBUG SnapshotStrategyRunner:120 - DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous part) in thread Thread[Source: ProductLine[1] -> ConstraintEnforcer[2] -> Sink: Collect table sink (1/1)#0,5,Flink Task Threads] took 0 ms.
2025-02-13 16:20:12,440 DEBUG CollectSinkOperatorCoordinator:140 - Forwarding request to sink socket server
2025-02-13 16:20:12,442 DEBUG SnapshotStrategyRunner:120 - DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous part) in thread Thread[Source: ProductLine[1] -> ConstraintEnforcer[2] -> Sink: Collect table sink (1/1)#0,5,Flink Task Threads] took 0 ms.
2025-02-13 16:20:12,442 DEBUG CollectSinkFunction:400 - Request received, version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,442 DEBUG CollectSinkFunction:405 - Expecting version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,442 DEBUG CollectSinkOperatorCoordinator:146 - Fetching serialized result from sink socket server
2025-02-13 16:20:12,442 DEBUG CollectSinkFunction:504 - Sending back 0 results
2025-02-13 16:20:12,444 DEBUG SubtaskCheckpointCoordinatorImpl:738 - Source: ProductLine[1] -> ConstraintEnforcer[2] -> Sink: Collect table sink (1/1)#0 - finished synchronous part of checkpoint 1. Alignment duration: 0 ms, snapshot duration -1 ms, is unaligned checkpoint : false
2025-02-13 16:20:12,447 DEBUG AsyncCheckpointRunnable:112 - Source: ProductLine[1] -> ConstraintEnforcer[2] -> Sink: Collect table sink (1/1)#0 - started executing asynchronous part of checkpoint 1. Asynchronous start delay: 1 ms
2025-02-13 16:20:12,460 DEBUG SnapshotStrategyRunner:120 - DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, asynchronous part) in thread Thread[AsyncOperations-thread-1,5,Flink Task Threads] took 12 ms.
2025-02-13 16:20:12,462 DEBUG SnapshotStrategyRunner:120 - DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, asynchronous part) in thread Thread[AsyncOperations-thread-1,5,Flink Task Threads] took 2 ms.
2025-02-13 16:20:12,462 DEBUG SnapshotStrategyRunner:120 - DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, asynchronous part) in thread Thread[AsyncOperations-thread-1,5,Flink Task Threads] took 0 ms.
2025-02-13 16:20:12,471 DEBUG AsyncCheckpointRunnable:248 - Source: ProductLine[1] -> ConstraintEnforcer[2] -> Sink: Collect table sink (1/1)#0 - finished asynchronous part of checkpoint 1. Asynchronous duration: 16 ms
2025-02-13 16:20:12,476 DEBUG CheckpointCoordinator:1149 - Received acknowledge message for checkpoint 1 from task 08e815a721c7350fd7b27859387abed2_cbc357ccb763df2852fee8c4fc7d55f2_0_0 of job ea6d7dce92ca4f42a5037f7ec02cc6ca at 7112a818-202e-4dfe-b71d-37fba4080751 @ 127.0.0.1 (dataPort=-1).
2025-02-13 16:20:12,483 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=1}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=1,Name=智成服务,ServiceTeamID=23291,QuotationMode=2,Description=智成服务,NeedSaleForecastProject=false,QuotationType=1,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=false,BusinessUnitId=1,CanCreateInnerServiceType4=false,State=0},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812470,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812475}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
...
2025-02-13 16:20:12,498 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=15}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=15,Name=服务经理,ServiceTeamID=194117,QuotationMode=1,Description=智臻服务经理,NeedSaleForecastProject=false,QuotationType=1,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=21,CanCreateInnerServiceType4=false,State=1},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812496,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812496}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,498 DEBUG SharedStateRegistryImpl:160 - Discard state created before checkpoint 1 and not used afterwards
2025-02-13 16:20:12,499 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=16}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=16,Name=私有云服务,ServiceTeamID=29742,QuotationMode=1,Description=华讯私有云服务,NeedSaleForecastProject=false,QuotationType=1,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=5,CanCreateInnerServiceType4=false,State=0},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812496,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812498}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,502 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=17}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=17,Name=集成-BSM,ServiceTeamID=452163,QuotationMode=1,Description=集成-BSM,NeedSaleForecastProject=false,QuotationType=1,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=true,CanCreateInnerServiceType4=false,State=0},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812499,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812499}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,502 INFO  CheckpointCoordinator:1339 - Completed checkpoint 1 for job ea6d7dce92ca4f42a5037f7ec02cc6ca (3954 bytes, checkpointDuration=94 ms, finalizationTime=9 ms).
2025-02-13 16:20:12,503 DEBUG CheckpointCoordinator:1357 - Checkpoint state: OperatorState(operatorID: cbc357ccb763df2852fee8c4fc7d55f2, parallelism: 1, maxParallelism: 128, coordinatorState: (none), sub task states: 1, total size (bytes): 3280), OperatorState(operatorID: 570f707193e0fe32f4d86d067aba243b, parallelism: 1, maxParallelism: 128, coordinatorState: (none), sub task states: 1, total size (bytes): 0), OperatorState(operatorID: b728d985904d42b0fdd945a9e3253fca, parallelism: 1, maxParallelism: 128, coordinatorState: 254 bytes, sub task states: 1, total size (bytes): 674)
2025-02-13 16:20:12,503 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=18}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=18,Name=厂商安全产品集成服务,ServiceTeamID=3928414,QuotationMode=1,Description=厂商安全产品集成服务,QuotationType=1,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=4,CanCreateInnerServiceType4=false,State=1},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812501,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812501}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,504 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=19}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=19,Name=CloudEagle,QuotationMode=1,Description=CloudEagle,QuotationType=2,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=6,CanCreateInnerServiceType4=false,State=1},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812502,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812503}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,504 DEBUG TaskExecutor:1008 - Confirm completed checkpoint 1@1739434812399 and last subsumed checkpoint -1 for 08e815a721c7350fd7b27859387abed2_cbc357ccb763df2852fee8c4fc7d55f2_0_0.
2025-02-13 16:20:12,505 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=20}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=20,Name=云智能交付服务,ServiceTeamID=3158223,QuotationMode=1,Description=,QuotationType=1,NeedServiceSale=true,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=14,CanCreateInnerServiceType4=false,State=1},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812503,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812504}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,506 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=21}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=21,Name=协作高级服务,ServiceTeamID=29742,QuotationMode=1,Description=,QuotationType=1,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=7,CanCreateInnerServiceType4=false,State=0},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812504,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812504}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,507 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=22}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=22,Name=行业软件,ServiceTeamID=1383821,QuotationMode=2,Description=,NeedSaleForecastProject=false,QuotationType=1,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=7,CanCreateInnerServiceType4=false,State=0},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812506,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812506}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,508 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=23}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=23,Name=云专家服务,ServiceTeamID=3928483,QuotationMode=1,Description=替代原:虚拟化服务,QuotationType=1,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=5,CanCreateInnerServiceType4=false,State=1},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812507,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812507}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,508 DEBUG StreamTask:1409 - Notify checkpoint 1 complete on task Source: ProductLine[1] -> ConstraintEnforcer[2] -> Sink: Collect table sink (1/1)#0
2025-02-13 16:20:12,511 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=24}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=24,Name=公有云增值服务,ServiceTeamID=194117,QuotationMode=1,Description=公有云增值服务,QuotationType=1,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=3,CanCreateInnerServiceType4=false,State=0},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812508,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812508}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,511 DEBUG SubtaskCheckpointCoordinatorImpl:451 - Notification of checkpoint COMPLETE 1 for task Source: ProductLine[1] -> ConstraintEnforcer[2] -> Sink: Collect table sink (1/1)#0
2025-02-13 16:20:12,514 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=25}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=25,Name=智成海外-实施,ServiceTeamID=194117,QuotationMode=1,Description=智成服务(海外),QuotationType=1,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=false,BusinessUnitId=1,CanCreateInnerServiceType4=false,State=1},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812510,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812511}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,514 DEBUG CollectSinkFunction:325 - Checkpoint complete with checkpointId = 1, lastCheckpointedOffset = 0
2025-02-13 16:20:12,515 WARN  DebeziumSourceFunction:465 - Consumer subtask 0 received confirmation for unknown checkpoint id 1
2025-02-13 16:20:12,515 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=26}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=26,Name=新IT智能运维软件,ServiceTeamID=29742,QuotationMode=1,Description=智能运维软件,QuotationType=2,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=6,CanCreateInnerServiceType4=false,State=0},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812513,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812513}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,516 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=27}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=27,Name=公有云运营服务,ServiceTeamID=23351,QuotationMode=1,Description=公有云运营服务,QuotationType=1,NeedServiceSale=true,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=14,CanCreateInnerServiceType4=false,State=1},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812515,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812515}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,518 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=28}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=28,Name=智能运维订阅软件,ServiceTeamID=29742,QuotationMode=1,Description=智能运维软件订阅,QuotationType=1,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=6,CanCreateInnerServiceType4=false,State=0},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812516,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812516}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,518 DEBUG SubtaskCheckpointCoordinatorImpl:451 - Notification of checkpoint SUBSUME -1 for task Source: ProductLine[1] -> ConstraintEnforcer[2] -> Sink: Collect table sink (1/1)#0
2025-02-13 16:20:12,519 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=29}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=29,Name=智能运维软件项目费用,ServiceTeamID=29742,QuotationMode=1,Description=,QuotationType=1,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=6,CanCreateInnerServiceType4=false,State=0},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812517,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812518}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
...
2025-02-13 16:20:12,547 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=77}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=77,Name=云智能运维服务,ServiceTeamID=3158223,QuotationMode=1,Description=,QuotationType=1,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=14,CanCreateInnerServiceType4=false,State=1},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812546,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812546}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,547 DEBUG CollectSinkOperatorCoordinator:140 - Forwarding request to sink socket server
2025-02-13 16:20:12,547 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=78}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=78,Name=ObsEagle,QuotationMode=1,Description=,QuotationType=2,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=6,CanCreateInnerServiceType4=false,State=1},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812547,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812547}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,550 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=79}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=79,Name=AssetFlow,QuotationMode=1,Description=,QuotationType=2,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=6,CanCreateInnerServiceType4=false,State=1},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812547,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812547}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,551 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=85}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=85,Name=智安软件产品研发,ServiceTeamID=3928414,QuotationMode=1,Description=此产品线用于智安事业部产品开发的研发费用投入,QuotationType=1,NeedServiceSale=false,NeedUploadFile=true,NeedJoinSameProductInISC=true,BusinessUnitId=4,CanCreateInnerServiceType4=true,State=1},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812550,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812550}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,551 DEBUG CollectSinkFunction:400 - Request received, version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,551 DEBUG CollectSinkOperatorCoordinator:146 - Fetching serialized result from sink socket server
2025-02-13 16:20:12,552 DEBUG CollectSinkFunction:405 - Expecting version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,552 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=86}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=86,Name=行业数智软件产品研发,ServiceTeamID=5320625,QuotationMode=1,Description=此产品线用于行业数智事业部产品开发的研发费用投入,QuotationType=1,NeedServiceSale=false,NeedUploadFile=true,NeedJoinSameProductInISC=true,BusinessUnitId=9,CanCreateInnerServiceType4=true,State=1},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812551,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812551}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,552 DEBUG CollectSinkFunction:504 - Sending back 0 results
2025-02-13 16:20:12,552 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=87}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=87,Name=智能运维软件产品研发,ServiceTeamID=7502826,QuotationMode=1,Description=此产品线用于智能运维事业部产品开发的研发费用投入,QuotationType=1,NeedServiceSale=false,NeedUploadFile=true,NeedJoinSameProductInISC=true,BusinessUnitId=6,CanCreateInnerServiceType4=true,State=1},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812552,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812552}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
...
2025-02-13 16:20:12,557 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=93}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=93,Name=NetGo111111111,ServiceTeamID=7502826,QuotationMode=1,Description=,QuotationType=1,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=6,CanCreateInnerServiceType4=false,State=1},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812555,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812555}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,557 INFO  RelationalSnapshotChangeEventSource:384 - 	 Finished exporting 85 records for table 'Business.dbo.ProductLine'; total duration '00:00:00.149'
2025-02-13 16:20:12,557 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=true}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=94}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=94,Name=安全运营服务,ServiceTeamID=3928414,QuotationMode=1,Description=智安-安全运营服务SOC,QuotationType=1,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=4,CanCreateInnerServiceType4=false,State=1},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812555,snapshot=last,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812555}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,570 INFO  AbstractSnapshotChangeEventSource:85 - Snapshot - Final stage
2025-02-13 16:20:12,632 INFO  SqlServerSnapshotChangeEventSource:230 - Removing locking timeout
2025-02-13 16:20:12,651 INFO  ChangeEventSourceCoordinator:111 - Snapshot ended with SnapshotResult [status=COMPLETED, offset=SqlServerOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.sqlserver.Source:STRUCT}, sourceInfo=SourceInfo [serverName=sqlserver_transaction_log_source, changeLsn=NULL, commitLsn=00016c6a:00001e6a:0069, eventSerialNo=null, snapshot=FALSE, sourceTime=2025-02-13T08:20:12.555Z], partition={server=sqlserver_transaction_log_source}, snapshotCompleted=true, eventSerialNo=1]]
2025-02-13 16:20:12,657 INFO  StreamingChangeEventSourceMetrics:60 - Connected metrics set to 'true'
2025-02-13 16:20:12,657 INFO  ChangeEventSourceCoordinator:151 - Starting streaming
2025-02-13 16:20:12,658 DEBUG CollectSinkOperatorCoordinator:140 - Forwarding request to sink socket server
2025-02-13 16:20:12,660 DEBUG CollectSinkOperatorCoordinator:146 - Fetching serialized result from sink socket server
2025-02-13 16:20:12,660 DEBUG CollectSinkFunction:400 - Request received, version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,660 DEBUG CollectSinkFunction:405 - Expecting version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,661 DEBUG CollectSinkFunction:504 - Sending back 0 results
2025-02-13 16:20:12,764 DEBUG CollectSinkOperatorCoordinator:140 - Forwarding request to sink socket server
2025-02-13 16:20:12,766 DEBUG CollectSinkOperatorCoordinator:146 - Fetching serialized result from sink socket server
2025-02-13 16:20:12,766 DEBUG CollectSinkFunction:400 - Request received, version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,767 DEBUG CollectSinkFunction:405 - Expecting version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,767 DEBUG CollectSinkFunction:504 - Sending back 0 results
2025-02-13 16:20:12,798 INFO  SqlServerStreamingChangeEventSource:340 - CDC is enabled for table Capture instance "dbo_SaleContractCostRecord4" [sourceTableId=Business.dbo.SaleContractCostRecord4, changeTableId=Business.cdc.dbo_SaleContractCostRecord4_CT, startLsn=00016c00:000037d4:0001, changeTableObjectId=1925686008, stopLsn=NULL] but the table is not whitelisted by connector
...
2025-02-13 16:20:12,798 INFO  SqlServerStreamingChangeEventSource:340 - CDC is enabled for table Capture instance "dbo_orders" [sourceTableId=Business.dbo.orders, changeTableId=Business.cdc.dbo_orders_CT, startLsn=00016c65:000037da:0046, changeTableObjectId=1938210055, stopLsn=NULL] but the table is not whitelisted by connector
2025-02-13 16:20:12,814 DEBUG SqlServerStreamingChangeEventSource:145 - No change in the database
2025-02-13 16:20:12,841 DEBUG ChangeEventQueue:238 - checking for more records...
2025-02-13 16:20:12,841 DEBUG EmbeddedEngine:789 - Embedded engine returned from polling task for records
2025-02-13 16:20:12,841 DEBUG EmbeddedEngine:828 - Received no records from the task
2025-02-13 16:20:12,841 DEBUG EmbeddedEngine:787 - Embedded engine is polling task for records on thread Thread[debezium-engine,5,Flink Task Threads]
2025-02-13 16:20:12,842 DEBUG ChangeEventQueue:229 - polling records...
2025-02-13 16:20:12,842 DEBUG EmbeddedEngine:789 - Embedded engine returned from polling task for records
2025-02-13 16:20:12,842 DEBUG EmbeddedEngine:810 - Received 85 records from the task
2025-02-13 16:20:12,843 DEBUG EmbeddedEngine:818 - Received 85 transformed records from the task
2025-02-13 16:20:12,844 DEBUG EmbeddedEngine:787 - Embedded engine is polling task for records on thread Thread[debezium-engine,5,Flink Task Threads]
2025-02-13 16:20:12,844 INFO  DebeziumChangeFetcher:149 - Database snapshot phase can't perform checkpoint, acquired Checkpoint lock.
2025-02-13 16:20:12,844 DEBUG ChangeEventQueue:229 - polling records...
2025-02-13 16:20:12,845 DEBUG ChangeEventQueue:235 - no records available yet, sleeping a bit...
2025-02-13 16:20:12,865 DEBUG DebeziumChangeFetcher:232 - Snapshot phase finishes.
2025-02-13 16:20:12,866 INFO  DebeziumChangeFetcher:156 - Received record from streaming binlog phase, released checkpoint lock.
2025-02-13 16:20:12,869 DEBUG CollectSinkOperatorCoordinator:140 - Forwarding request to sink socket server
2025-02-13 16:20:12,871 DEBUG CollectSinkOperatorCoordinator:146 - Fetching serialized result from sink socket server
2025-02-13 16:20:12,871 DEBUG CollectSinkFunction:400 - Request received, version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,871 DEBUG CollectSinkFunction:405 - Expecting version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,871 DEBUG CollectSinkFunction:504 - Sending back 85 results
2025-02-13 16:20:12,990 DEBUG CollectSinkOperatorCoordinator:140 - Forwarding request to sink socket server
2025-02-13 16:20:12,991 DEBUG CollectSinkFunction:400 - Request received, version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 85
2025-02-13 16:20:12,991 DEBUG CollectSinkFunction:405 - Expecting version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,991 DEBUG CollectSinkOperatorCoordinator:146 - Fetching serialized result from sink socket server
2025-02-13 16:20:12,992 DEBUG CollectSinkFunction:504 - Sending back 0 results
2025-02-13 16:20:13,095 DEBUG CollectSinkOperatorCoordinator:140 - Forwarding request to sink socket server
2025-02-13 16:20:13,096 DEBUG CollectSinkOperatorCoordinator:146 - Fetching serialized result from sink socket server
2025-02-13 16:20:13,096 DEBUG CollectSinkFunction:400 - Request received, version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 85
2025-02-13 16:20:13,096 DEBUG CollectSinkFunction:405 - Expecting version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 85
2025-02-13 16:20:13,096 DEBUG CollectSinkFunction:504 - Sending back 0 results
2025-02-13 16:20:13,164 DEBUG SqlServerStreamingChangeEventSource:145 - No change in the database
2025-02-13 16:20:13,200 DEBUG CollectSinkOperatorCoordinator:140 - Forwarding request to sink socket server
2025-02-13 16:20:13,201 DEBUG CollectSinkOperatorCoordinator:146 - Fetching serialized result from sink socket server
2025-02-13 16:20:13,201 DEBUG CollectSinkFunction:400 - Request received, version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 85
2025-02-13 16:20:13,202 DEBUG CollectSinkFunction:405 - Expecting version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 85
2025-02-13 16:20:13,202 DEBUG CollectSinkFunction:504 - Sending back 0 results
2025-02-13 16:20:13,304 DEBUG CollectSinkOperatorCoordinator:140 - Forwarding request to sink socket server
2025-02-13 16:20:13,307 DEBUG CollectSinkFunction:400 - Request received, version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 85
2025-02-13 16:20:13,307 DEBUG CollectSinkFunction:405 - Expecting version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 85
2025-02-13 16:20:13,307 DEBUG CollectSinkOperatorCoordinator:146 - Fetching serialized result from sink socket server
2025-02-13 16:20:13,307 DEBUG CollectSinkFunction:504 - Sending back 0 results
2025-02-13 16:20:13,341 DEBUG ChangeEventQueue:238 - checking for more records...
2025-02-13 16:20:13,341 DEBUG ChangeEventQueue:235 - no records available yet, sleeping a bit...
2025-02-13 16:20:13,412 DEBUG CollectSinkOperatorCoordinator:140 - Forwarding request to sink socket server
2025-02-13 16:20:13,413 DEBUG CollectSinkOperatorCoordinator:146 - Fetching serialized result from sink socket server
2025-02-13 16:20:13,413 DEBUG CollectSinkFunction:400 - Request received, version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 85
2025-02-13 16:20:13,413 DEBUG CollectSinkFunction:405 - Expecting version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 85
2025-02-13 16:20:13,413 DEBUG CollectSinkFunction:504 - Sending back 0 results
2025-02-13 16:20:13,518 DEBUG CollectSinkOperatorCoordinator:140 - Forwarding request to sink socket server
2025-02-13 16:20:13,519 DEBUG CollectSinkOperatorCoordinator:146 - Fetching serialized result from sink socket server
2025-02-13 16:20:13,519 DEBUG CollectSinkFunction:400 - Request received, version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 85
2025-02-13 16:20:13,519 DEBUG CollectSinkFunction:405 - Expecting version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 85
2025-02-13 16:20:13,519 DEBUG CollectSinkFunction:504 - Sending back 0 results
2 Answers

Your log is far too verbose. Turn off the DEBUG level and post an INFO-level log instead. Also check what fe.log shows.

Add the following configuration:
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true',
'sink.enable-delete' = 'true', -- sync delete events
'sink.label-prefix' = 'doris_label'
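
As a rough sketch of where these options go, the snippet below assembles a Doris sink DDL string in plain Java. The table name `ProductLine_sink`, the shortened column list, and the `fenodes`, `table.identifier`, and credential values are placeholders I made up for illustration; only the four `sink.*` options come from the list above. The resulting string would be passed to `tableEnv.executeSql(...)` in the actual job.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class DorisSinkDdl {

    /**
     * Builds a CREATE TABLE statement for a Doris sink table.
     * All argument values are placeholders supplied by the caller.
     */
    public static String build(String fenodes, String tableIdentifier,
                               String username, String password) {
        // LinkedHashMap keeps the options in insertion order for readable output.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "doris");
        options.put("fenodes", fenodes);                  // assumed FE HTTP address
        options.put("table.identifier", tableIdentifier); // assumed db.table
        options.put("username", username);
        options.put("password", password);
        // Options suggested in the answer above.
        options.put("sink.properties.format", "json");
        options.put("sink.properties.read_json_by_line", "true");
        options.put("sink.enable-delete", "true");
        options.put("sink.label-prefix", "doris_label");

        // Render each entry as 'key' = 'value' and join with commas.
        String with = options.entrySet().stream()
                .map(e -> "'" + e.getKey() + "' = '" + e.getValue() + "'")
                .collect(Collectors.joining(", "));

        // Hypothetical sink table with a shortened column list.
        return "CREATE TABLE ProductLine_sink ( "
                + "ID BIGINT, Name STRING, State INT "
                + ") WITH ( " + with + " )";
    }

    public static void main(String[] args) {
        // In the real job: tableEnv.executeSql(DorisSinkDdl.build(...));
        System.out.println(build("fe_host:8030", "business.ProductLine", "root", ""));
    }
}
```

One more thing worth ruling out before tuning sink options: the log excerpt in the question only shows a `Collect table sink` operator, which is what a `SELECT ... .print()` call creates. On an unbounded CDC source that call does not return, so any `INSERT INTO` placed after it in the same program would never be submitted. That would also match seeing no Stream Load activity and no errors.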