sqlserver版本:sqlserver2019
flink版本:1.16
flink cdc 版本:2.2.1
doris connector版本:24.0.1
doris 版本:2.1.6
现象:运行作业,执行select * from ProductLine_source 能实时获取源表的全量+增量数据,但是数据无法通过sink 写入 doris,整个程序的debug 日志没有显示任何错误和调用streamload的过程
源码:
// 创建流执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
// 创建表环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// enable checkpoint
env.enableCheckpointing(10000);
// 创建 SQL Server CDC 表
tableEnv.executeSql(
"CREATE TABLE ProductLine_source ( " +
"ID BIGINT , " +
"Name STRING , " +
"ServiceTeamID BIGINT, " +
"QuotationMode INT , " +
"Description STRING, " +
"NeedSaleForecastProject BOOLEAN, " +
"QuotationType INT , " +
"NeedServiceSale BOOLEAN , " +
"NeedUploadFile BOOLEAN , " +
"NeedJoinSameProductInISC BOOLEAN , " +
"BusinessUnitId BIGINT, " +
"CanCreateInnerServiceType4 BOOLEAN, " +
"State INT , " +
"PRIMARY KEY (ID) NOT ENFORCED " +
") WITH ( " +
"'connector' = 'sqlserver-cdc', " + // 确认这是正确的连接器名称
"'hostname' = 'xxxxx', " +
"'port' = '1433', " +
"'username' = 'xxxx', " +
"'password' = 'xxxxx', " +
"'database-name' = 'business', " +
"'schema-name' = 'dbo', " +
"'table-name' = 'ProductLine' " +
")"
);
// 执行查询并打印源表结果
tableEnv.executeSql("SELECT * FROM ProductLine").print();
// 创建 Doris 映射表
tableEnv.executeSql(
"CREATE TABLE productLine_doris_sink ( " +
" ID BIGINT, " +
" Name STRING, " +
" ServiceTeamID BIGINT, " +
" QuotationMode INT, " +
" Description STRING, " +
" NeedSaleForecastProject BOOLEAN, " +
" QuotationType INT, " +
" NeedServiceSale BOOLEAN, " +
" NeedUploadFile BOOLEAN, " +
" NeedJoinSameProductInISC BOOLEAN, " +
" BusinessUnitId BIGINT, " +
" CanCreateInnerServiceType4 BOOLEAN, " +
" State INT " +
") WITH ( " +
" 'connector' = 'doris', " +
" 'fenodes' = 'xxxxx:8030', " +
" 'table.identifier' = 'edm.productLine', " + // 目标表所在位置
" 'username' = 'xxx', " +
" 'password' = 'xxx', " +
" 'sink.label-prefix' = 'productLine_' " +
")"
);
tableEnv.executeSql("insert into productLine_doris_sink SELECT * FROM ProductLine_source");
doris 建表语句
CREATE TABLE `productLine` (
`ID` bigint NOT NULL,
`Name` text NOT NULL,
`ServiceTeamID` bigint NULL,
`QuotationMode` int NOT NULL,
`Description` text NULL,
`NeedSaleForecastProject` boolean NULL,
`QuotationType` int NOT NULL,
`NeedServiceSale` boolean NOT NULL,
`NeedUploadFile` boolean NOT NULL,
`NeedJoinSameProductInISC` boolean NOT NULL,
`BusinessUnitId` bigint NULL,
`CanCreateInnerServiceType4` boolean NULL,
`State` int NOT NULL
) ENGINE=OLAP
UNIQUE KEY(`ID`)
COMMENT '明细数据表'
DISTRIBUTED BY HASH(`ID`) BUCKETS 2
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"min_load_replica_num" = "-1",
"is_being_synced" = "false",
"storage_medium" = "hdd",
"storage_format" = "V2",
"inverted_index_storage_format" = "V1",
"enable_unique_key_merge_on_write" = "true",
"light_schema_change" = "true",
"disable_auto_compaction" = "false",
"enable_single_replica_compaction" = "false",
"group_commit_interval_ms" = "10000",
"group_commit_data_bytes" = "134217728",
"enable_mow_light_delete" = "false"
);
flink debug 日志:
2025-02-13 16:20:11,873 INFO RelationalSnapshotChangeEventSource:96 - Snapshot step 1 - Preparing
2025-02-13 16:20:11,873 INFO RelationalSnapshotChangeEventSource:105 - Snapshot step 2 - Determining captured tables
2025-02-13 16:20:11,907 DEBUG CollectSinkOperatorCoordinator:140 - Forwarding request to sink socket server
2025-02-13 16:20:11,908 DEBUG CollectSinkOperatorCoordinator:146 - Fetching serialized result from sink socket server
2025-02-13 16:20:11,908 DEBUG CollectSinkFunction:400 - Request received, version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:11,909 DEBUG CollectSinkFunction:405 - Expecting version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:11,909 DEBUG CollectSinkFunction:504 - Sending back 0 results
2025-02-13 16:20:11,954 INFO RelationalSnapshotChangeEventSource:112 - Snapshot step 3 - Locking captured tables [Business.dbo.ProductLine]
2025-02-13 16:20:11,954 INFO SqlServerSnapshotChangeEventSource:125 - Setting locking timeout to 10 s
2025-02-13 16:20:12,013 DEBUG CollectSinkOperatorCoordinator:140 - Forwarding request to sink socket server
2025-02-13 16:20:12,014 DEBUG CollectSinkOperatorCoordinator:146 - Fetching serialized result from sink socket server
2025-02-13 16:20:12,014 DEBUG CollectSinkFunction:400 - Request received, version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,014 DEBUG CollectSinkFunction:405 - Expecting version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,014 DEBUG CollectSinkFunction:504 - Sending back 0 results
2025-02-13 16:20:12,055 INFO SqlServerSnapshotChangeEventSource:130 - Executing schema locking
2025-02-13 16:20:12,055 INFO SqlServerSnapshotChangeEventSource:137 - Locking table Business.dbo.ProductLine
2025-02-13 16:20:12,077 INFO RelationalSnapshotChangeEventSource:118 - Snapshot step 4 - Determining snapshot offset
2025-02-13 16:20:12,095 INFO RelationalSnapshotChangeEventSource:121 - Snapshot step 5 - Reading structure of captured tables
2025-02-13 16:20:12,096 INFO SqlServerSnapshotChangeEventSource:182 - Reading structure of schema 'Business'
2025-02-13 16:20:12,117 DEBUG CollectSinkOperatorCoordinator:140 - Forwarding request to sink socket server
2025-02-13 16:20:12,119 DEBUG CollectSinkFunction:400 - Request received, version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,119 DEBUG CollectSinkFunction:405 - Expecting version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,119 DEBUG CollectSinkFunction:504 - Sending back 0 results
2025-02-13 16:20:12,119 DEBUG CollectSinkOperatorCoordinator:146 - Fetching serialized result from sink socket server
2025-02-13 16:20:12,224 DEBUG CollectSinkOperatorCoordinator:140 - Forwarding request to sink socket server
2025-02-13 16:20:12,226 DEBUG CollectSinkFunction:400 - Request received, version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,226 DEBUG CollectSinkFunction:405 - Expecting version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,226 DEBUG CollectSinkOperatorCoordinator:146 - Fetching serialized result from sink socket server
2025-02-13 16:20:12,226 DEBUG CollectSinkFunction:504 - Sending back 0 results
2025-02-13 16:20:12,330 DEBUG CollectSinkOperatorCoordinator:140 - Forwarding request to sink socket server
2025-02-13 16:20:12,332 DEBUG CollectSinkFunction:400 - Request received, version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,332 DEBUG CollectSinkFunction:405 - Expecting version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,332 DEBUG CollectSinkOperatorCoordinator:146 - Fetching serialized result from sink socket server
2025-02-13 16:20:12,332 DEBUG CollectSinkFunction:504 - Sending back 0 results
2025-02-13 16:20:12,342 DEBUG ChangeEventQueue:238 - checking for more records...
2025-02-13 16:20:12,342 DEBUG ChangeEventQueue:235 - no records available yet, sleeping a bit...
2025-02-13 16:20:12,351 INFO RelationalSnapshotChangeEventSource:125 - Snapshot step 6 - Persisting schema history
2025-02-13 16:20:12,351 DEBUG RelationalSnapshotChangeEventSource:260 - Capturing structure of table Business.dbo.ProductLine
2025-02-13 16:20:12,357 DEBUG SqlServerDatabaseSchema:46 - Applying schema change event SchemaChangeEvent [database=Business, schema=dbo, ddl=null, tables=[columns: {
ID decimal() identity(18, 0) NOT NULL
Name varchar(100) NOT NULL
ServiceTeamID decimal(18, 0) DEFAULT VALUE NULL
QuotationMode int(10, 0) NOT NULL
Description varchar(500) DEFAULT VALUE NULL
NeedSaleForecastProject bit(1) DEFAULT VALUE NULL
QuotationType int(10, 0) NOT NULL
NeedServiceSale bit(1) NOT NULL
NeedUploadFile bit(1) NOT NULL
NeedJoinSameProductInISC bit(1) NOT NULL
BusinessUnitId decimal(18, 0) DEFAULT VALUE NULL
CanCreateInnerServiceType4 bit(1) DEFAULT VALUE NULL
State int(10, 0) NOT NULL
}
primary key: [ID]
default charset: null
], type=CREATE]
2025-02-13 16:20:12,359 DEBUG TableSchemaBuilder:103 - Mapping table 'Business.dbo.ProductLine' to schemas under 'sqlserver_transaction_log_source.dbo.ProductLine'
。。。
2025-02-13 16:20:12,367 DEBUG TableSchemaBuilder:379 - - field 'BusinessUnitId' (BYTES) from column BusinessUnitId decimal(18, 0) DEFAULT VALUE NULL
2025-02-13 16:20:12,367 DEBUG TableSchemaBuilder:379 - - field 'CanCreateInnerServiceType4' (BOOLEAN) from column CanCreateInnerServiceType4 bit(1) DEFAULT VALUE NULL
2025-02-13 16:20:12,367 DEBUG TableSchemaBuilder:379 - - field 'State' (INT32) from column State int(10, 0) NOT NULL
2025-02-13 16:20:12,369 DEBUG TableSchemaBuilder:126 - Mapped primary key for table 'Business.dbo.ProductLine' to schema: {"name" : "sqlserver_transaction_log_source.dbo.ProductLine.Key", "type" : "STRUCT", "optional" : "false", "fields" : [{"name" : "ID", "index" : "0", "schema" : {"name" : "com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Decimal", "type" : "BYTES", "optional" : "false", "version" : "1"}}]}
2025-02-13 16:20:12,369 DEBUG TableSchemaBuilder:127 - Mapped columns for table 'Business.dbo.ProductLine' to schema: {"name" : "sqlserver_transaction_log_source.dbo.ProductLine.Value", "type" : "STRUCT", "optional" : "true", "fields" : [{"name" : "ID", "index" : "0", "schema" : {"name" : "com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Decimal", "type" : "BYTES", "optional" : "false", "version" : "1"}}, {"name" : "Name", "index" : "1", "schema" : {"type" : "STRING", "optional" : "false"}}, {"name" : "ServiceTeamID", "index" : "2", "schema" : {"name" : "com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Decimal", "type" : "BYTES", "optional" : "true", "version" : "1"}}, {"name" : "QuotationMode", "index" : "3", "schema" : {"type" : "INT32", "optional" : "false"}}, {"name" : "Description", "index" : "4", "schema" : {"type" : "STRING", "optional" : "true"}}, {"name" : "NeedSaleForecastProject", "index" : "5", "schema" : {"type" : "BOOLEAN", "optional" : "true"}}, {"name" : "QuotationType", "index" : "6", "schema" : {"type" : "INT32", "optional" : "false"}}, {"name" : "NeedServiceSale", "index" : "7", "schema" : {"type" : "BOOLEAN", "optional" : "false"}}, {"name" : "NeedUploadFile", "index" : "8", "schema" : {"type" : "BOOLEAN", "optional" : "false"}}, {"name" : "NeedJoinSameProductInISC", "index" : "9", "schema" : {"type" : "BOOLEAN", "optional" : "false"}}, {"name" : "BusinessUnitId", "index" : "10", "schema" : {"name" : "com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Decimal", "type" : "BYTES", "optional" : "true", "version" : "1"}}, {"name" : "CanCreateInnerServiceType4", "index" : "11", "schema" : {"type" : "BOOLEAN", "optional" : "true"}}, {"name" : "State", "index" : "12", "schema" : {"type" : "INT32", "optional" : "false"}}]}
2025-02-13 16:20:12,406 INFO SqlServerSnapshotChangeEventSource:155 - Schema locks released.
2025-02-13 16:20:12,407 INFO RelationalSnapshotChangeEventSource:137 - Snapshot step 7 - Snapshotting data
2025-02-13 16:20:12,407 INFO RelationalSnapshotChangeEventSource:294 - Snapshotting contents of 1 tables while still in transaction
2025-02-13 16:20:12,407 DEBUG RelationalSnapshotChangeEventSource:303 - Snapshotting table Business.dbo.ProductLine
2025-02-13 16:20:12,408 INFO RelationalSnapshotChangeEventSource:328 - Exporting data from table 'Business.dbo.ProductLine' (1 of 1 tables)
2025-02-13 16:20:12,409 INFO RelationalSnapshotChangeEventSource:336 - For table 'Business.dbo.ProductLine' using select statement: 'SELECT [ProductLine].[ID],[ProductLine].[Name],[ProductLine].[ServiceTeamID],[ProductLine].[QuotationMode],[ProductLine].[Description],[ProductLine].[NeedSaleForecastProject],[ProductLine].[QuotationType],[ProductLine].[NeedServiceSale],[ProductLine].[NeedUploadFile],[ProductLine].[NeedJoinSameProductInISC],[ProductLine].[BusinessUnitId],[ProductLine].[CanCreateInnerServiceType4],[ProductLine].[State] FROM [dbo].[ProductLine]'
2025-02-13 16:20:12,411 INFO CheckpointCoordinator:845 - Triggering checkpoint 1 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1739434812399 for job ea6d7dce92ca4f42a5037f7ec02cc6ca.
2025-02-13 16:20:12,422 DEBUG TaskExecutor:977 - Trigger checkpoint 1@1739434812399 for 08e815a721c7350fd7b27859387abed2_cbc357ccb763df2852fee8c4fc7d55f2_0_0.
2025-02-13 16:20:12,424 DEBUG StreamTask:1267 - Starting checkpoint 1 CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD} on task Source: ProductLine[1] -> ConstraintEnforcer[2] -> Sink: Collect table sink (1/1)#0
2025-02-13 16:20:12,425 DEBUG SubtaskCheckpointCoordinatorImpl:337 - Task Source: ProductLine[1] -> ConstraintEnforcer[2] -> Sink: Collect table sink (1/1)#0 broadcastEvent at 1739434812425, triggerTime 1739434812399, passed time 26
2025-02-13 16:20:12,428 DEBUG CollectSinkFunction:228 - Checkpoint begin with checkpointId = 1, lastCheckpointedOffset = 0, buffered results bytes = 0
2025-02-13 16:20:12,430 DEBUG SnapshotStrategyRunner:120 - DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous part) in thread Thread[Source: ProductLine[1] -> ConstraintEnforcer[2] -> Sink: Collect table sink (1/1)#0,5,Flink Task Threads] took 0 ms.
2025-02-13 16:20:12,431 DEBUG SnapshotStrategyRunner:120 - DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous part) in thread Thread[Source: ProductLine[1] -> ConstraintEnforcer[2] -> Sink: Collect table sink (1/1)#0,5,Flink Task Threads] took 0 ms.
2025-02-13 16:20:12,440 DEBUG CollectSinkOperatorCoordinator:140 - Forwarding request to sink socket server
2025-02-13 16:20:12,442 DEBUG SnapshotStrategyRunner:120 - DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous part) in thread Thread[Source: ProductLine[1] -> ConstraintEnforcer[2] -> Sink: Collect table sink (1/1)#0,5,Flink Task Threads] took 0 ms.
2025-02-13 16:20:12,442 DEBUG CollectSinkFunction:400 - Request received, version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,442 DEBUG CollectSinkFunction:405 - Expecting version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,442 DEBUG CollectSinkOperatorCoordinator:146 - Fetching serialized result from sink socket server
2025-02-13 16:20:12,442 DEBUG CollectSinkFunction:504 - Sending back 0 results
2025-02-13 16:20:12,444 DEBUG SubtaskCheckpointCoordinatorImpl:738 - Source: ProductLine[1] -> ConstraintEnforcer[2] -> Sink: Collect table sink (1/1)#0 - finished synchronous part of checkpoint 1. Alignment duration: 0 ms, snapshot duration -1 ms, is unaligned checkpoint : false
2025-02-13 16:20:12,447 DEBUG AsyncCheckpointRunnable:112 - Source: ProductLine[1] -> ConstraintEnforcer[2] -> Sink: Collect table sink (1/1)#0 - started executing asynchronous part of checkpoint 1. Asynchronous start delay: 1 ms
2025-02-13 16:20:12,460 DEBUG SnapshotStrategyRunner:120 - DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, asynchronous part) in thread Thread[AsyncOperations-thread-1,5,Flink Task Threads] took 12 ms.
2025-02-13 16:20:12,462 DEBUG SnapshotStrategyRunner:120 - DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, asynchronous part) in thread Thread[AsyncOperations-thread-1,5,Flink Task Threads] took 2 ms.
2025-02-13 16:20:12,462 DEBUG SnapshotStrategyRunner:120 - DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, asynchronous part) in thread Thread[AsyncOperations-thread-1,5,Flink Task Threads] took 0 ms.
2025-02-13 16:20:12,471 DEBUG AsyncCheckpointRunnable:248 - Source: ProductLine[1] -> ConstraintEnforcer[2] -> Sink: Collect table sink (1/1)#0 - finished asynchronous part of checkpoint 1. Asynchronous duration: 16 ms
2025-02-13 16:20:12,476 DEBUG CheckpointCoordinator:1149 - Received acknowledge message for checkpoint 1 from task 08e815a721c7350fd7b27859387abed2_cbc357ccb763df2852fee8c4fc7d55f2_0_0 of job ea6d7dce92ca4f42a5037f7ec02cc6ca at 7112a818-202e-4dfe-b71d-37fba4080751 @ 127.0.0.1 (dataPort=-1).
2025-02-13 16:20:12,483 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=1}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=1,Name=智成服务,ServiceTeamID=23291,QuotationMode=2,Description=智成服务,NeedSaleForecastProject=false,QuotationType=1,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=false,BusinessUnitId=1,CanCreateInnerServiceType4=false,State=0},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812470,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812475}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
。。。
2025-02-13 16:20:12,498 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=15}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=15,Name=服务经理,ServiceTeamID=194117,QuotationMode=1,Description=智臻服务经理,NeedSaleForecastProject=false,QuotationType=1,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=21,CanCreateInnerServiceType4=false,State=1},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812496,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812496}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,498 DEBUG SharedStateRegistryImpl:160 - Discard state created before checkpoint 1 and not used afterwards
2025-02-13 16:20:12,499 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=16}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=16,Name=私有云服务,ServiceTeamID=29742,QuotationMode=1,Description=华讯私有云服务,NeedSaleForecastProject=false,QuotationType=1,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=5,CanCreateInnerServiceType4=false,State=0},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812496,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812498}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,502 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=17}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=17,Name=集成-BSM,ServiceTeamID=452163,QuotationMode=1,Description=集成-BSM,NeedSaleForecastProject=false,QuotationType=1,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=true,CanCreateInnerServiceType4=false,State=0},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812499,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812499}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,502 INFO CheckpointCoordinator:1339 - Completed checkpoint 1 for job ea6d7dce92ca4f42a5037f7ec02cc6ca (3954 bytes, checkpointDuration=94 ms, finalizationTime=9 ms).
2025-02-13 16:20:12,503 DEBUG CheckpointCoordinator:1357 - Checkpoint state: OperatorState(operatorID: cbc357ccb763df2852fee8c4fc7d55f2, parallelism: 1, maxParallelism: 128, coordinatorState: (none), sub task states: 1, total size (bytes): 3280), OperatorState(operatorID: 570f707193e0fe32f4d86d067aba243b, parallelism: 1, maxParallelism: 128, coordinatorState: (none), sub task states: 1, total size (bytes): 0), OperatorState(operatorID: b728d985904d42b0fdd945a9e3253fca, parallelism: 1, maxParallelism: 128, coordinatorState: 254 bytes, sub task states: 1, total size (bytes): 674)
2025-02-13 16:20:12,503 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=18}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=18,Name=厂商安全产品集成服务,ServiceTeamID=3928414,QuotationMode=1,Description=厂商安全产品集成服务,QuotationType=1,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=4,CanCreateInnerServiceType4=false,State=1},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812501,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812501}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,504 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=19}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=19,Name=CloudEagle,QuotationMode=1,Description=CloudEagle,QuotationType=2,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=6,CanCreateInnerServiceType4=false,State=1},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812502,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812503}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,504 DEBUG TaskExecutor:1008 - Confirm completed checkpoint 1@1739434812399 and last subsumed checkpoint -1 for 08e815a721c7350fd7b27859387abed2_cbc357ccb763df2852fee8c4fc7d55f2_0_0.
2025-02-13 16:20:12,505 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=20}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=20,Name=云智能交付服务,ServiceTeamID=3158223,QuotationMode=1,Description=,QuotationType=1,NeedServiceSale=true,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=14,CanCreateInnerServiceType4=false,State=1},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812503,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812504}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,506 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=21}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=21,Name=协作高级服务,ServiceTeamID=29742,QuotationMode=1,Description=,QuotationType=1,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=7,CanCreateInnerServiceType4=false,State=0},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812504,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812504}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,507 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=22}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=22,Name=行业软件,ServiceTeamID=1383821,QuotationMode=2,Description=,NeedSaleForecastProject=false,QuotationType=1,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=7,CanCreateInnerServiceType4=false,State=0},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812506,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812506}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,508 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=23}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=23,Name=云专家服务,ServiceTeamID=3928483,QuotationMode=1,Description=替代原:虚拟化服务,QuotationType=1,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=5,CanCreateInnerServiceType4=false,State=1},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812507,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812507}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,508 DEBUG StreamTask:1409 - Notify checkpoint 1 complete on task Source: ProductLine[1] -> ConstraintEnforcer[2] -> Sink: Collect table sink (1/1)#0
2025-02-13 16:20:12,511 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=24}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=24,Name=公有云增值服务,ServiceTeamID=194117,QuotationMode=1,Description=公有云增值服务,QuotationType=1,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=3,CanCreateInnerServiceType4=false,State=0},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812508,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812508}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,511 DEBUG SubtaskCheckpointCoordinatorImpl:451 - Notification of checkpoint COMPLETE 1 for task Source: ProductLine[1] -> ConstraintEnforcer[2] -> Sink: Collect table sink (1/1)#0
2025-02-13 16:20:12,514 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=25}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=25,Name=智成海外-实施,ServiceTeamID=194117,QuotationMode=1,Description=智成服务(海外),QuotationType=1,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=false,BusinessUnitId=1,CanCreateInnerServiceType4=false,State=1},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812510,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812511}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,514 DEBUG CollectSinkFunction:325 - Checkpoint complete with checkpointId = 1, lastCheckpointedOffset = 0
2025-02-13 16:20:12,515 WARN DebeziumSourceFunction:465 - Consumer subtask 0 received confirmation for unknown checkpoint id 1
2025-02-13 16:20:12,515 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=26}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=26,Name=新IT智能运维软件,ServiceTeamID=29742,QuotationMode=1,Description=智能运维软件,QuotationType=2,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=6,CanCreateInnerServiceType4=false,State=0},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812513,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812513}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,516 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=27}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=27,Name=公有云运营服务,ServiceTeamID=23351,QuotationMode=1,Description=公有云运营服务,QuotationType=1,NeedServiceSale=true,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=14,CanCreateInnerServiceType4=false,State=1},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812515,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812515}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,518 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=28}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=28,Name=智能运维订阅软件,ServiceTeamID=29742,QuotationMode=1,Description=智能运维软件订阅,QuotationType=1,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=6,CanCreateInnerServiceType4=false,State=0},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812516,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812516}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,518 DEBUG SubtaskCheckpointCoordinatorImpl:451 - Notification of checkpoint SUBSUME -1 for task Source: ProductLine[1] -> ConstraintEnforcer[2] -> Sink: Collect table sink (1/1)#0
2025-02-13 16:20:12,519 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=29}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=29,Name=智能运维软件项目费用,ServiceTeamID=29742,QuotationMode=1,Description=,QuotationType=1,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=6,CanCreateInnerServiceType4=false,State=0},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812517,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812518}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
。。。
2025-02-13 16:20:12,547 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=77}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=77,Name=云智能运维服务,ServiceTeamID=3158223,QuotationMode=1,Description=,QuotationType=1,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=14,CanCreateInnerServiceType4=false,State=1},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812546,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812546}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,547 DEBUG CollectSinkOperatorCoordinator:140 - Forwarding request to sink socket server
2025-02-13 16:20:12,547 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=78}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=78,Name=ObsEagle,QuotationMode=1,Description=,QuotationType=2,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=6,CanCreateInnerServiceType4=false,State=1},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812547,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812547}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,550 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=79}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=79,Name=AssetFlow,QuotationMode=1,Description=,QuotationType=2,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=6,CanCreateInnerServiceType4=false,State=1},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812547,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812547}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,551 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=85}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=85,Name=智安软件产品研发,ServiceTeamID=3928414,QuotationMode=1,Description=此产品线用于智安事业部产品开发的研发费用投入,QuotationType=1,NeedServiceSale=false,NeedUploadFile=true,NeedJoinSameProductInISC=true,BusinessUnitId=4,CanCreateInnerServiceType4=true,State=1},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812550,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812550}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,551 DEBUG CollectSinkFunction:400 - Request received, version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,551 DEBUG CollectSinkOperatorCoordinator:146 - Fetching serialized result from sink socket server
2025-02-13 16:20:12,552 DEBUG CollectSinkFunction:405 - Expecting version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,552 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=86}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=86,Name=行业数智软件产品研发,ServiceTeamID=5320625,QuotationMode=1,Description=此产品线用于行业数智事业部产品开发的研发费用投入,QuotationType=1,NeedServiceSale=false,NeedUploadFile=true,NeedJoinSameProductInISC=true,BusinessUnitId=9,CanCreateInnerServiceType4=true,State=1},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812551,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812551}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,552 DEBUG CollectSinkFunction:504 - Sending back 0 results
2025-02-13 16:20:12,552 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=87}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=87,Name=智能运维软件产品研发,ServiceTeamID=7502826,QuotationMode=1,Description=此产品线用于智能运维事业部产品开发的研发费用投入,QuotationType=1,NeedServiceSale=false,NeedUploadFile=true,NeedJoinSameProductInISC=true,BusinessUnitId=6,CanCreateInnerServiceType4=true,State=1},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812552,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812552}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
。。。
2025-02-13 16:20:12,557 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=93}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=93,Name=NetGo111111111,ServiceTeamID=7502826,QuotationMode=1,Description=,QuotationType=1,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=6,CanCreateInnerServiceType4=false,State=1},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812555,snapshot=true,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812555}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,557 INFO RelationalSnapshotChangeEventSource:384 - Finished exporting 85 records for table 'Business.dbo.ProductLine'; total duration '00:00:00.149'
2025-02-13 16:20:12,557 DEBUG ChangeEventQueue:200 - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=sqlserver_transaction_log_source}, sourceOffset={commit_lsn=00016c6a:00001e6a:0069, snapshot=true, snapshot_completed=true}} ConnectRecord{topic='sqlserver_transaction_log_source.dbo.ProductLine', kafkaPartition=null, key=Struct{ID=94}, keySchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Key:STRUCT}, value=Struct{after=Struct{ID=94,Name=安全运营服务,ServiceTeamID=3928414,QuotationMode=1,Description=智安-安全运营服务SOC,QuotationType=1,NeedServiceSale=false,NeedUploadFile=false,NeedJoinSameProductInISC=true,BusinessUnitId=4,CanCreateInnerServiceType4=false,State=1},source=Struct{version=1.5.4.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1739434812555,snapshot=last,db=Business,schema=dbo,table=ProductLine,commit_lsn=00016c6a:00001e6a:0069},op=r,ts_ms=1739434812555}, valueSchema=Schema{sqlserver_transaction_log_source.dbo.ProductLine.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2025-02-13 16:20:12,570 INFO AbstractSnapshotChangeEventSource:85 - Snapshot - Final stage
2025-02-13 16:20:12,632 INFO SqlServerSnapshotChangeEventSource:230 - Removing locking timeout
2025-02-13 16:20:12,651 INFO ChangeEventSourceCoordinator:111 - Snapshot ended with SnapshotResult [status=COMPLETED, offset=SqlServerOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.sqlserver.Source:STRUCT}, sourceInfo=SourceInfo [serverName=sqlserver_transaction_log_source, changeLsn=NULL, commitLsn=00016c6a:00001e6a:0069, eventSerialNo=null, snapshot=FALSE, sourceTime=2025-02-13T08:20:12.555Z], partition={server=sqlserver_transaction_log_source}, snapshotCompleted=true, eventSerialNo=1]]
2025-02-13 16:20:12,657 INFO StreamingChangeEventSourceMetrics:60 - Connected metrics set to 'true'
2025-02-13 16:20:12,657 INFO ChangeEventSourceCoordinator:151 - Starting streaming
2025-02-13 16:20:12,658 DEBUG CollectSinkOperatorCoordinator:140 - Forwarding request to sink socket server
2025-02-13 16:20:12,660 DEBUG CollectSinkOperatorCoordinator:146 - Fetching serialized result from sink socket server
2025-02-13 16:20:12,660 DEBUG CollectSinkFunction:400 - Request received, version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,660 DEBUG CollectSinkFunction:405 - Expecting version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,661 DEBUG CollectSinkFunction:504 - Sending back 0 results
2025-02-13 16:20:12,764 DEBUG CollectSinkOperatorCoordinator:140 - Forwarding request to sink socket server
2025-02-13 16:20:12,766 DEBUG CollectSinkOperatorCoordinator:146 - Fetching serialized result from sink socket server
2025-02-13 16:20:12,766 DEBUG CollectSinkFunction:400 - Request received, version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,767 DEBUG CollectSinkFunction:405 - Expecting version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,767 DEBUG CollectSinkFunction:504 - Sending back 0 results
2025-02-13 16:20:12,798 INFO SqlServerStreamingChangeEventSource:340 - CDC is enabled for table Capture instance "dbo_SaleContractCostRecord4" [sourceTableId=Business.dbo.SaleContractCostRecord4, changeTableId=Business.cdc.dbo_SaleContractCostRecord4_CT, startLsn=00016c00:000037d4:0001, changeTableObjectId=1925686008, stopLsn=NULL] but the table is not whitelisted by connector
。。。
2025-02-13 16:20:12,798 INFO SqlServerStreamingChangeEventSource:340 - CDC is enabled for table Capture instance "dbo_orders" [sourceTableId=Business.dbo.orders, changeTableId=Business.cdc.dbo_orders_CT, startLsn=00016c65:000037da:0046, changeTableObjectId=1938210055, stopLsn=NULL] but the table is not whitelisted by connector
2025-02-13 16:20:12,814 DEBUG SqlServerStreamingChangeEventSource:145 - No change in the database
2025-02-13 16:20:12,841 DEBUG ChangeEventQueue:238 - checking for more records...
2025-02-13 16:20:12,841 DEBUG EmbeddedEngine:789 - Embedded engine returned from polling task for records
2025-02-13 16:20:12,841 DEBUG EmbeddedEngine:828 - Received no records from the task
2025-02-13 16:20:12,841 DEBUG EmbeddedEngine:787 - Embedded engine is polling task for records on thread Thread[debezium-engine,5,Flink Task Threads]
2025-02-13 16:20:12,842 DEBUG ChangeEventQueue:229 - polling records...
2025-02-13 16:20:12,842 DEBUG EmbeddedEngine:789 - Embedded engine returned from polling task for records
2025-02-13 16:20:12,842 DEBUG EmbeddedEngine:810 - Received 85 records from the task
2025-02-13 16:20:12,843 DEBUG EmbeddedEngine:818 - Received 85 transformed records from the task
2025-02-13 16:20:12,844 DEBUG EmbeddedEngine:787 - Embedded engine is polling task for records on thread Thread[debezium-engine,5,Flink Task Threads]
2025-02-13 16:20:12,844 INFO DebeziumChangeFetcher:149 - Database snapshot phase can't perform checkpoint, acquired Checkpoint lock.
2025-02-13 16:20:12,844 DEBUG ChangeEventQueue:229 - polling records...
2025-02-13 16:20:12,845 DEBUG ChangeEventQueue:235 - no records available yet, sleeping a bit...
2025-02-13 16:20:12,865 DEBUG DebeziumChangeFetcher:232 - Snapshot phase finishes.
2025-02-13 16:20:12,866 INFO DebeziumChangeFetcher:156 - Received record from streaming binlog phase, released checkpoint lock.
2025-02-13 16:20:12,869 DEBUG CollectSinkOperatorCoordinator:140 - Forwarding request to sink socket server
2025-02-13 16:20:12,871 DEBUG CollectSinkOperatorCoordinator:146 - Fetching serialized result from sink socket server
2025-02-13 16:20:12,871 DEBUG CollectSinkFunction:400 - Request received, version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,871 DEBUG CollectSinkFunction:405 - Expecting version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,871 DEBUG CollectSinkFunction:504 - Sending back 85 results
2025-02-13 16:20:12,990 DEBUG CollectSinkOperatorCoordinator:140 - Forwarding request to sink socket server
2025-02-13 16:20:12,991 DEBUG CollectSinkFunction:400 - Request received, version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 85
2025-02-13 16:20:12,991 DEBUG CollectSinkFunction:405 - Expecting version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 0
2025-02-13 16:20:12,991 DEBUG CollectSinkOperatorCoordinator:146 - Fetching serialized result from sink socket server
2025-02-13 16:20:12,992 DEBUG CollectSinkFunction:504 - Sending back 0 results
2025-02-13 16:20:13,095 DEBUG CollectSinkOperatorCoordinator:140 - Forwarding request to sink socket server
2025-02-13 16:20:13,096 DEBUG CollectSinkOperatorCoordinator:146 - Fetching serialized result from sink socket server
2025-02-13 16:20:13,096 DEBUG CollectSinkFunction:400 - Request received, version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 85
2025-02-13 16:20:13,096 DEBUG CollectSinkFunction:405 - Expecting version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 85
2025-02-13 16:20:13,096 DEBUG CollectSinkFunction:504 - Sending back 0 results
2025-02-13 16:20:13,164 DEBUG SqlServerStreamingChangeEventSource:145 - No change in the database
2025-02-13 16:20:13,200 DEBUG CollectSinkOperatorCoordinator:140 - Forwarding request to sink socket server
2025-02-13 16:20:13,201 DEBUG CollectSinkOperatorCoordinator:146 - Fetching serialized result from sink socket server
2025-02-13 16:20:13,201 DEBUG CollectSinkFunction:400 - Request received, version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 85
2025-02-13 16:20:13,202 DEBUG CollectSinkFunction:405 - Expecting version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 85
2025-02-13 16:20:13,202 DEBUG CollectSinkFunction:504 - Sending back 0 results
2025-02-13 16:20:13,304 DEBUG CollectSinkOperatorCoordinator:140 - Forwarding request to sink socket server
2025-02-13 16:20:13,307 DEBUG CollectSinkFunction:400 - Request received, version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 85
2025-02-13 16:20:13,307 DEBUG CollectSinkFunction:405 - Expecting version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 85
2025-02-13 16:20:13,307 DEBUG CollectSinkOperatorCoordinator:146 - Fetching serialized result from sink socket server
2025-02-13 16:20:13,307 DEBUG CollectSinkFunction:504 - Sending back 0 results
2025-02-13 16:20:13,341 DEBUG ChangeEventQueue:238 - checking for more records...
2025-02-13 16:20:13,341 DEBUG ChangeEventQueue:235 - no records available yet, sleeping a bit...
2025-02-13 16:20:13,412 DEBUG CollectSinkOperatorCoordinator:140 - Forwarding request to sink socket server
2025-02-13 16:20:13,413 DEBUG CollectSinkOperatorCoordinator:146 - Fetching serialized result from sink socket server
2025-02-13 16:20:13,413 DEBUG CollectSinkFunction:400 - Request received, version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 85
2025-02-13 16:20:13,413 DEBUG CollectSinkFunction:405 - Expecting version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 85
2025-02-13 16:20:13,413 DEBUG CollectSinkFunction:504 - Sending back 0 results
2025-02-13 16:20:13,518 DEBUG CollectSinkOperatorCoordinator:140 - Forwarding request to sink socket server
2025-02-13 16:20:13,519 DEBUG CollectSinkOperatorCoordinator:146 - Fetching serialized result from sink socket server
2025-02-13 16:20:13,519 DEBUG CollectSinkFunction:400 - Request received, version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 85
2025-02-13 16:20:13,519 DEBUG CollectSinkFunction:405 - Expecting version = ce3c6108-2204-4405-acc9-b0c084a479ad, offset = 85
2025-02-13 16:20:13,519 DEBUG CollectSinkFunction:504 - Sending back 0 results