flink-doris-connector-1.17-1.5: DorisWriter closes abnormally when syncing MySQL to Doris with cdctools


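Flink version: 1.17.1
Doris version: 2.0.12
flink-doris-connector version: 1.5
flink-cdc version: 2.4.2
Deployment: docker-compose
I did some secondary development on the flink-doris-connector source, only adding code to support syncing multiple databases: I extended DatabaseSync.tableMapping and the table mapping used at sink time. The other core functionality is unchanged, and testing in the test environment was fine. In the production environment, the first test run hit a checkpoint exception, after which DorisWriter closed, the taskmanager died, and the program exited. Parameters: sink.buffer-count=6, sink.buffer-size=3MB. The logs are as follows: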

{"log":"2024-10-30 12:57:55,639 INFO  org.apache.doris.flink.sink.writer.RecordBuffer              [] - init RecordBuffer capacity 3145728, count 6\n","stream":"stdout","time":"2024-10-30T12:57:55.639394524Z"}
{"log":"2024-10-30 12:57:55,642 INFO  org.apache.doris.flink.sink.writer.RecordBuffer              [] - start buffer data, read queue size 0, write queue size 6\n","stream":"stdout","time":"2024-10-30T12:57:55.642248002Z"}
{"log":"2024-10-30 12:57:55,642 INFO  org.apache.doris.flink.sink.writer.DorisStreamLoad           [] - table amz_ops_v2_ae_devided_targeting_reports stream load started for amz_2024103019_test_ods_amz_ops_v2_ae_devided_targeting_reports_1_3_0c073640-290a-433b-beb2-b226e36c2750 on host xxxxxxx:8040\n","stream":"stdout","time":"2024-10-30T12:57:55.642318048Z"}
{"log":"2024-10-30 12:57:55,642 INFO  org.apache.doris.flink.sink.writer.DorisStreamLoad           [] - table amz_ops_v2_ae_devided_targeting_reports start execute load\n","stream":"stdout","time":"2024-10-30T12:57:55.642773157Z"}
{"log":"2024-10-30 12:57:55,649 INFO  org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - Source: MySQL Source -\u003e Process -\u003e (amz_ops_v2_ae_ad_group_negative_keywords: Writer -\u003e amz_ops_v2_ae_ad_group_negative_keywords: Committer, amz_ops_v2_ae_ad_group_reports: Writer -\u003e amz_ops_v2_ae_ad_group_reports: Committer, amz_ops_v2_ae_ad_groups: Writer -\u003e amz_ops_v2_ae_ad_groups: Committer, amz_ops_v2_ae_ad_module_rules: Writer -\u003e amz_ops_v2_ae_ad_module_rules: Committer, amz_ops_v2_ae_asin_products: Writer -\u003e amz_ops_v2_ae_asin_products: Committer, amz_ops_v2_ae_campaign_negative_keywords: Writer -\u003e amz_ops_v2_ae_campaign_negative_keywords: Committer, amz_ops_v2_ae_campaign_placement_reports: Writer -\u003e amz_ops_v2_ae_campaign_placement_reports: Committer, amz_ops_v2_ae_campaign_reports: Writer -\u003e amz_ops_v2_ae_campaign_reports: Committer, amz_ops_v2_ae_campaigns: Writer -\u003e amz_ops_v2_ae_campaigns: Committer, amz_ops_v2_ae_campaigns_us: Writer -\u003e amz_ops_v2_ae_campaigns_us: Committer, amz_ops_v2_ae_competitor_brand_keywords: Writer -\u003e amz_ops_v2_ae_competitor_brand_keywords: Committer) (2/8)#0 - asynchronous part of checkpoint 2 could not be completed.\n","stream":"stdout","time":"2024-10-30T12:57:55.649955402Z"}
{"log":"java.util.concurrent.ExecutionException: org.apache.flink.runtime.checkpoint.CheckpointException: The checkpoint was aborted due to exception of other subtasks sharing the ChannelState file.\n","stream":"stdout","time":"2024-10-30T12:57:55.650085204Z"}
{"log":"\u0009at java.util.concurrent.CompletableFuture.reportGet(Unknown Source) ~[?:?]\n","stream":"stdout","time":"2024-10-30T12:57:55.65008817Z"}
{"log":"\u0009at java.util.concurrent.CompletableFuture.get(Unknown Source) ~[?:?]\n","stream":"stdout","time":"2024-10-30T12:57:55.650090786Z"}
{"log":"\u0009at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.\u003cinit\u003e(OperatorSnapshotFinalizer.java:69) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-30T12:57:55.650093314Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-30T12:57:55.650096088Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) [flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-30T12:57:55.65009891Z"}
{"log":"\u0009at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]\n","stream":"stdout","time":"2024-10-30T12:57:55.650101597Z"}
{"log":"\u0009at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]\n","stream":"stdout","time":"2024-10-30T12:57:55.650104281Z"}
{"log":"\u0009at java.lang.Thread.run(Unknown Source) [?:?]\n","stream":"stdout","time":"2024-10-30T12:57:55.650106743Z"}
{"log":"Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: The checkpoint was aborted due to exception of other subtasks sharing the ChannelState file.\n","stream":"stdout","time":"2024-10-30T12:57:55.650109173Z"}
{"log":"\u0009at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.fail(ChannelStateCheckpointWriter.java:298) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-30T12:57:55.650111973Z"}
{"log":"\u0009at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.failAndClearWriter(ChannelStateWriteRequestDispatcherImpl.java:212) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-30T12:57:55.650114619Z"}
{"log":"\u0009at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.handleCheckpointAbortRequest(ChannelStateWriteRequestDispatcherImpl.java:189) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-30T12:57:55.650117338Z"}
{"log":"\u0009at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:129) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-30T12:57:55.65012011Z"}
{"log":"\u0009at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatch(ChannelStateWriteRequestDispatcherImpl.java:94) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-30T12:57:55.650125232Z"}
{"log":"\u0009at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:161) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-30T12:57:55.650127881Z"}
{"log":"\u0009at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.run(ChannelStateWriteRequestExecutorImpl.java:116) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-30T12:57:55.650130537Z"}
{"log":"\u0009... 1 more\n","stream":"stdout","time":"2024-10-30T12:57:55.65013311Z"}
{"log":"Caused by: java.util.concurrent.CancellationException: checkpoint aborted via notification\n","stream":"stdout","time":"2024-10-30T12:57:55.650135551Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:455) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-30T12:57:55.650138264Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointAborted(SubtaskCheckpointCoordinatorImpl.java:409) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-30T12:57:55.650140981Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointAbortAsync$17(StreamTask.java:1387) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-30T12:57:55.650144126Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$19(StreamTask.java:1410) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-30T12:57:55.650146707Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-30T12:57:55.650149232Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-30T12:57:55.650151767Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-30T12:57:55.650154264Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-30T12:57:55.650156936Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-30T12:57:55.650159571Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-30T12:57:55.650162229Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-30T12:57:55.650164981Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-30T12:57:55.650167463Z"}
{"log":"\u0009at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-30T12:57:55.650169925Z"}
{"log":"\u0009at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-30T12:57:55.650172392Z"}
{"log":"\u0009at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-30T12:57:55.650179329Z"}
{"log":"\u0009at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-30T12:57:55.650182173Z"}
{"log":"\u0009... 1 more\n","stream":"stdout","time":"2024-10-30T12:57:55.650188104Z"}
.......
{"log":"2024-10-30 12:58:01,035 INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource  [] - Stopped reading binlog after 0 events, last recorded offset: {transaction_id=null, file=mysql_bin.009167, pos=612662535, server_id=2, event=1}\n","stream":"stdout","time":"2024-10-30T12:58:01.036052558Z"}
{"log":"2024-10-30 12:58:01,808 INFO  io.debezium.jdbc.JdbcConnection                              [] - Connection gracefully closed\n","stream":"stdout","time":"2024-10-30T12:58:01.808537311Z"}
{"log":"2024-10-30 12:58:01,808 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [amz_ops_v2_au.ad_group_reports:13]\n","stream":"stdout","time":"2024-10-30T12:58:01.808771146Z"}
{"log":"2024-10-30 12:58:05,756 INFO  org.apache.doris.flink.sink.writer.DorisWriter               [] - Close DorisWriter.\n","stream":"stdout","time":"2024-10-30T12:58:05.756472829Z"}
{"log":"2024-10-30 12:58:05,756 INFO  org.apache.doris.flink.sink.writer.DorisWriter               [] - Close DorisWriter.\n","stream":"stdout","time":"2024-10-30T12:58:05.75661164Z"}
{"log":"2024-10-30 12:58:05,756 INFO  org.apache.doris.flink.sink.writer.DorisWriter               [] - Close DorisWriter.\n","stream":"stdout","time":"2024-10-30T12:58:05.756668824Z"}
{"log":"2024-10-30 12:58:05,756 INFO  org.apache.doris.flink.sink.writer.DorisWriter               [] - Close DorisWriter.\n","stream":"stdout","time":"2024-10-30T12:58:05.756757666Z"}
{"log":"2024-10-30 12:58:05,756 INFO  org.apache.doris.flink.sink.writer.DorisWriter               [] - Close DorisWriter.\n","stream":"stdout","time":"2024-10-30T12:58:05.757006111Z"}
{"log":"2024-10-30 12:58:05,757 INFO  org.apache.doris.flink.sink.writer.DorisWriter               [] - Close DorisWriter.\n","stream":"stdout","time":"2024-10-30T12:58:05.757626254Z"}
{"log":"2024-10-30 12:58:05,757 INFO  org.apache.doris.flink.sink.writer.DorisWriter               [] - Close DorisWriter.\n","stream":"stdout","time":"2024-10-30T12:58:05.757714242Z"}
{"log":"2024-10-30 12:58:05,757 INFO  org.apache.doris.flink.sink.writer.DorisWriter               [] - Close DorisWriter.\n","stream":"stdout","time":"2024-10-30T12:58:05.757894758Z"}
{"log":"2024-10-30 12:58:05,758 INFO  org.apache.doris.flink.sink.writer.DorisWriter               [] - Close DorisWriter.\n","stream":"stdout","time":"2024-10-30T12:58:05.758327378Z"}
{"log":"2024-10-30 12:58:05,758 INFO  org.apache.doris.flink.sink.writer.DorisWriter               [] - Close DorisWriter.\n","stream":"stdout","time":"2024-10-30T12:58:05.758503083Z"}
{"log":"2024-10-30 12:58:05,759 INFO  org.apache.doris.flink.sink.writer.DorisWriter               [] - Close DorisWriter.\n","stream":"stdout","time":"2024-10-30T12:58:05.759159617Z"}
{"log":"2024-10-30 12:58:05,759 INFO  org.apache.doris.flink.sink.writer.DorisWriter               [] - Close DorisWriter.\n","stream":"stdout","time":"2024-10-30T12:58:05.759358386Z"}
{"log":"2024-10-30 12:58:05,759 INFO  org.apache.doris.flink.sink.writer.DorisWriter               [] - Close DorisWriter.\n","stream":"stdout","time":"2024-10-30T12:58:05.759548411Z"}
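
The lines above are in Docker's json-file format (one JSON object per line, with the original Flink log text in the `log` field), which makes the stack traces hard to read. A minimal sketch for unwrapping them into plain log text, assuming that format; the function name `unwrap_docker_log` is my own, not part of any tool:

```python
import json


def unwrap_docker_log(lines):
    """Yield the plain log text from Docker json-file formatted lines."""
    for raw in lines:
        raw = raw.strip()
        if not raw.startswith("{"):
            continue  # skip blank lines and elision markers such as "......."
        try:
            record = json.loads(raw)
        except json.JSONDecodeError:
            continue  # skip lines that were truncated or split when copied
        # Each record ends with a newline; drop it so callers control spacing.
        yield record.get("log", "").rstrip("\n")


# Two lines copied from the log above, plus an elision marker.
sample = [
    r'{"log":"2024-10-30 12:58:05,756 INFO  org.apache.doris.flink.sink.writer.DorisWriter               [] - Close DorisWriter.\n","stream":"stdout","time":"2024-10-30T12:58:05.756472829Z"}',
    ".......",
    r'{"log":"\u0009at java.lang.Thread.run(Unknown Source) [?:?]\n","stream":"stdout","time":"2024-10-30T12:57:55.650106743Z"}',
]

plain = list(unwrap_docker_log(sample))
for line in plain:
    print(line)
```

Decoding through `json.loads` also turns escapes such as `\u003e` and `\u0009` back into `>` and a tab, so the operator chain names and stack frames read normally.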

For the second run I added the parameter sink.use-cache=true. DorisWriter still closed after running for less than an hour. The logs are as follows:

{"log":"2024-10-31 10:50:52,108 INFO  org.apache.doris.flink.sink.writer.DorisWriter               [] - getting exception, breakpoint resume for checkpoint ID: 2, table test_ods.amz_ops_v2_ae_targeting_reports\n","stream":"stdout","time":"2024-10-31T10:50:52.108619661Z"}
{"log":"2024-10-31 10:50:52,108 INFO  org.apache.doris.flink.sink.writer.CacheRecordBuffer         [] - start buffer data, read queue size 1, write queue size 3, buffer cache size 2, buffer pool size 0\n","stream":"stdout","time":"2024-10-31T10:50:52.108741437Z"}
{"log":"2024-10-31 10:50:52,108 INFO  org.apache.doris.flink.sink.writer.DorisStreamLoad           [] - table amz_ops_v2_ae_targeting_reports stream load started for amz_202410311818_test_ods_amz_ops_v2_ae_targeting_reports_1_2_cf439bb6-87f4-43f2-a1ba-734f0e94863d on host xxxxxxx:8040\n","stream":"stdout","time":"2024-10-31T10:50:52.108799336Z"}
{"log":"2024-10-31 10:50:52,110 INFO  org.apache.doris.flink.sink.writer.DorisStreamLoad           [] - table amz_ops_v2_ae_targeting_reports start execute load\n","stream":"stdout","time":"2024-10-31T10:50:52.110688838Z"}
{"log":"2024-10-31 10:50:53,707 INFO  org.apache.doris.flink.sink.writer.DorisWriter               [] - getting exception, breakpoint resume for checkpoint ID: 2, table test_ods.amz_ops_v2_ae_targeting_reports\n","stream":"stdout","time":"2024-10-31T10:50:53.707381075Z"}
{"log":"2024-10-31 10:50:53,707 INFO  org.apache.doris.flink.sink.writer.CacheRecordBuffer         [] - start buffer data, read queue size 1, write queue size 3, buffer cache size 2, buffer pool size 0\n","stream":"stdout","time":"2024-10-31T10:50:53.707468559Z"}
{"log":"2024-10-31 10:50:53,707 INFO  org.apache.doris.flink.sink.writer.DorisStreamLoad           [] - table amz_ops_v2_ae_targeting_reports stream load started for amz_202410311818_test_ods_amz_ops_v2_ae_targeting_reports_3_2_c8ce0e39-5d22-461d-92bb-f4444b7c549b on host xxxxx:8040\n","stream":"stdout","time":"2024-10-31T10:50:53.707519378Z"}
{"log":"2024-10-31 10:50:53,709 INFO  org.apache.doris.flink.sink.writer.DorisStreamLoad           [] - table amz_ops_v2_ae_targeting_reports start execute load\n","stream":"stdout","time":"2024-10-31T10:50:53.709569281Z"}
{"log":"2024-10-31 10:50:55,246 INFO  com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader [] - Source reader 4 adds split MySqlSnapshotSplit{tableId=amz_ops_v2_ae.targeting_reports, splitId='amz_ops_v2_ae.targeting_reports:10', splitKeyType=[`target_id` BIGINT NOT NULL], splitStart=[144259984214993964], splitEnd=null, highWatermark=null}\n","stream":"stdout","time":"2024-10-31T10:50:55.247009642Z"}
{"log":"2024-10-31 10:50:55,247 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [MySqlSnapshotSplit{tableId=amz_ops_v2_ae.targeting_reports, splitId='amz_ops_v2_ae.targeting_reports:10', splitKeyType=[`target_id` BIGINT NOT NULL], splitStart=[144259984214993964], splitEnd=null, highWatermark=null}]\n","stream":"stdout","time":"2024-10-31T10:50:55.247076653Z"}
{"log":"2024-10-31 10:50:55,247 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 7\n","stream":"stdout","time":"2024-10-31T10:50:55.247704035Z"}
{"log":"2024-10-31 10:50:55,247 INFO  com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader [] - Handling split change SplitAddition:[[MySqlSnapshotSplit{tableId=amz_ops_v2_ae.targeting_reports, splitId='amz_ops_v2_ae.targeting_reports:10', splitKeyType=[`target_id` BIGINT NOT NULL], splitStart=[144259984214993964], splitEnd=null, highWatermark=null}]]\n","stream":"stdout","time":"2024-10-31T10:50:55.247773497Z"}
{"log":"2024-10-31 10:50:55,247 WARN  io.debezium.connector.mysql.MySqlConnection                  [] - Database configuration option 'serverTimezone' is set but is obsolete, please use 'connectionTimeZone' instead\n","stream":"stdout","time":"2024-10-31T10:50:55.247912108Z"}
{"log":"2024-10-31 10:50:55,258 INFO  com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext [] - Starting offset is initialized to {ts_sec=0, file=, pos=0, kind=EARLIEST, row=0, event=0}\n","stream":"stdout","time":"2024-10-31T10:50:55.258731476Z"}
{"log":"2024-10-31 10:50:55,266 INFO  com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask [] - Snapshot step 1 - Determining low watermark {ts_sec=0, file=mysql_bin.009213, pos=460528841, kind=SPECIFIC, gtids=, row=0, event=0} for split MySqlSnapshotSplit{tableId=amz_ops_v2_ae.targeting_reports, splitId='amz_ops_v2_ae.targeting_reports:10', splitKeyType=[`target_id` BIGINT NOT NULL], splitStart=[144259984214993964], splitEnd=null, highWatermark=null}\n","stream":"stdout","time":"2024-10-31T10:50:55.266105836Z"}
{"log":"2024-10-31 10:50:55,266 INFO  com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask [] - Snapshot step 2 - Snapshotting data\n","stream":"stdout","time":"2024-10-31T10:50:55.266120759Z"}
{"log":"2024-10-31 10:50:55,266 INFO  com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask [] - Exporting data from split 'amz_ops_v2_ae.targeting_reports:10' of table amz_ops_v2_ae.targeting_reports\n","stream":"stdout","time":"2024-10-31T10:50:55.266154261Z"}
{"log":"2024-10-31 10:50:55,266 INFO  com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask [] - For split 'amz_ops_v2_ae.targeting_reports:10' of table amz_ops_v2_ae.targeting_reports using select statement: 'SELECT * FROM `amz_ops_v2_ae`.`targeting_reports` WHERE `target_id` \u003e= ?'\n","stream":"stdout","time":"2024-10-31T10:50:55.266198557Z"}
{"log":"2024-10-31 10:50:55,448 INFO  com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask [] - Finished exporting 9944 records for split 'amz_ops_v2_ae.targeting_reports:10', total duration '00:00:00.182'\n","stream":"stdout","time":"2024-10-31T10:50:55.4483928Z"}
{"log":"2024-10-31 10:50:55,448 INFO  com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask [] - Snapshot step 3 - Determining high watermark {ts_sec=0, file=mysql_bin.009213, pos=460528841, kind=SPECIFIC, gtids=, row=0, event=0} for split MySqlSnapshotSplit{tableId=amz_ops_v2_ae.targeting_reports, splitId='amz_ops_v2_ae.targeting_reports:10', splitKeyType=[`target_id` BIGINT NOT NULL], splitStart=[144259984214993964], splitEnd=null, highWatermark=null}\n","stream":"stdout","time":"2024-10-31T10:50:55.448982333Z"}
{"log":"2024-10-31 10:50:55,486 INFO  org.apache.doris.flink.sink.writer.DorisStreamLoad           [] - table amz_ops_v2_ae_targeting_reports stream load stopped for amz_202410311818_test_ods_amz_ops_v2_ae_targeting_reports_0_2_94435123-29ed-4a1a-ac6d-bb642b0db32a on host xxxxxxx:8040\n","stream":"stdout","time":"2024-10-31T10:50:55.486632551Z"}
{"log":"2024-10-31 10:50:55,489 INFO  org.apache.doris.flink.sink.writer.DorisStreamLoad           [] - table amz_ops_v2_ae_targeting_reports stream load stopped for amz_202410311818_test_ods_amz_ops_v2_ae_targeting_reports_4_2_9ef5b30a-3eb9-4790-9df1-2e7199a96a4f on host xxxxxxx:8040\n","stream":"stdout","time":"2024-10-31T10:50:55.48952015Z"}
{"log":"2024-10-31 10:50:55,489 INFO  org.apache.doris.flink.sink.writer.DorisStreamLoad           [] - table amz_ops_v2_ae_targeting_reports stream load stopped for amz_202410311818_test_ods_amz_ops_v2_ae_targeting_reports_3_2_db544e70-a53f-40da-8f55-c4ef73488740 on host xxxxxxx:8040\n","stream":"stdout","time":"2024-10-31T10:50:55.489541639Z"}
{"log":"2024-10-31 10:50:55,487 INFO  org.apache.http.impl.execchain.RetryExec                     [] - I/O exception (java.net.SocketException) caught when processing request to {}-\u003ehttp://xxxxxxx:8040: Broken pipe (Write failed)\n","stream":"stdout","time":"2024-10-31T10:50:55.489639935Z"}
{"log":"2024-10-31 10:50:55,489 INFO  org.apache.http.impl.execchain.RetryExec                     [] - I/O exception (java.net.SocketException) caught when processing request to {}-\u003ehttp://xxxxxxx:8040: Broken pipe (Write failed)\n","stream":"stdout","time":"2024-10-31T10:50:55.489755377Z"}
{"log":"2024-10-31 10:50:55,486 INFO  org.apache.doris.flink.sink.writer.DorisStreamLoad           [] - table amz_ops_v2_ae_targeting_reports stream load stopped for amz_202410311818_test_ods_amz_ops_v2_ae_targeting_reports_2_2_0ed8d0d2-c748-4fc4-af2d-543f1d2177ee on host xxxxxxx:8040\n","stream":"stdout","time":"2024-10-31T10:50:55.489814123Z"}
{"log":"2024-10-31 10:50:55,486 INFO  org.apache.doris.flink.sink.writer.DorisStreamLoad           [] - table amz_ops_v2_ae_targeting_reports stream load stopped for amz_202410311818_test_ods_amz_ops_v2_ae_targeting_reports_1_2_892b1679-3443-4826-baff-c7db36158ea6 on host xxxxxxx:8040\n","stream":"stdout","time":"2024-10-31T10:50:55.489869016Z"}
{"log":"2024-10-31 10:50:55,489 INFO  org.apache.http.impl.execchain.RetryExec                     [] - I/O exception (java.net.SocketException) caught when processing request to {}-\u003ehttp://xxxxxxx:8040: Broken pipe (Write failed)\n","stream":"stdout","time":"2024-10-31T10:50:55.49002944Z"}
{"log":"2024-10-31 10:50:55,491 INFO  org.apache.doris.flink.sink.writer.DorisStreamLoad           [] - table amz_ops_v2_ae_targeting_reports stream load stopped for amz_202410311818_test_ods_amz_ops_v2_ae_targeting_reports_6_2_eba7336c-43f6-49ed-a6bb-c31e56b2304a on host xxxxxxx:8040\n","stream":"stdout","time":"2024-10-31T10:50:55.491391869Z"}
{"log":"2024-10-31 10:50:55,491 INFO  org.apache.http.impl.execchain.RetryExec                     [] - I/O exception (java.net.SocketException) caught when processing request to {}-\u003ehttp://xxxxxxx:8040: Broken pipe (Write failed)\n","stream":"stdout","time":"2024-10-31T10:50:55.49162895Z"}
{"log":"2024-10-31 10:50:55,489 INFO  org.apache.doris.flink.sink.writer.DorisStreamLoad           [] - table amz_ops_v2_ae_targeting_reports stream load stopped for amz_202410311818_test_ods_amz_ops_v2_ae_targeting_reports_7_2_b71b566a-7960-40cd-9b9e-8120f88daef1 on host xxxxxxx:8040\n","stream":"stdout","time":"2024-10-31T10:50:55.499747865Z"}
{"log":"2024-10-31 10:50:55,496 INFO  org.apache.http.impl.execchain.RetryExec                     [] - I/O exception (java.net.SocketException) caught when processing request to {}-\u003ehttp://xxxxxxx:8040: Broken pipe (Write failed)\n","stream":"stdout","time":"2024-10-31T10:50:55.507345791Z"}
{"log":"2024-10-31 10:50:55,496 INFO  org.apache.doris.flink.sink.writer.DorisStreamLoad           [] - table amz_ops_v2_ae_targeting_reports stream load stopped for amz_202410311818_test_ods_amz_ops_v2_ae_targeting_reports_5_2_cc742c27-9a3d-485e-ae32-f5391014e522 on host xxxxxxx:8040\n","stream":"stdout","time":"2024-10-31T10:50:55.515161753Z"}
{"log":"2024-10-31 10:50:55,674 INFO  org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl [] - discarding 2 drained requests\n","stream":"stdout","time":"2024-10-31T10:50:55.674976108Z"}
{"log":"2024-10-31 10:50:55,675 INFO  org.apache.doris.flink.sink.writer.DorisWriter               [] - Close DorisWriter.\n","stream":"stdout","time":"2024-10-31T10:50:55.675511666Z"}
{"log":"2024-10-31 10:50:55,674 INFO  org.apache.doris.flink.sink.writer.DorisWriter               [] - Close DorisWriter.\n","stream":"stdout","time":"2024-10-31T10:50:55.675645214Z"}
{"log":"2024-10-31 10:50:55,675 INFO  org.apache.doris.flink.sink.writer.DorisWriter               [] - Close DorisWriter.\n","stream":"stdout","time":"2024-10-31T10:50:55.675741345Z"}
{"log":"2024-10-31 10:50:55,675 INFO  org.apache.doris.flink.sink.writer.DorisWriter               [] - Close DorisWriter.\n","stream":"stdout","time":"2024-10-31T10:50:55.675823842Z"}
{"log":"2024-10-31 10:50:55,674 INFO  org.apache.doris.flink.sink.writer.DorisWriter               [] - Close DorisWriter.\n","stream":"stdout","time":"2024-10-31T10:50:55.676688059Z"}
{"log":"2024-10-31 10:50:55,676 INFO  org.apache.doris.flink.sink.writer.DorisWriter               [] - Close DorisWriter.\n","stream":"stdout","time":"2024-10-31T10:50:55.676805169Z"}
{"log":"2024-10-31 10:50:55,675 INFO  org.apache.doris.flink.sink.writer.DorisWriter               [] - Close DorisWriter.\n","stream":"stdout","time":"2024-10-31T10:50:55.676816665Z"}
{"log":"2024-10-31 10:50:55,676 INFO  org.apache.doris.flink.sink.writer.DorisWriter               [] - Close DorisWriter.\n","stream":"stdout","time":"2024-10-31T10:50:55.67694322Z"}
{"log":"2024-10-31 10:50:55,677 INFO  org.apache.doris.flink.sink.writer.DorisWriter               [] - Close DorisWriter.\n","stream":"stdout","time":"2024-10-31T10:50:55.677112299Z"}
{"log":"2024-10-31 10:50:55,677 INFO  org.apache.doris.flink.sink.writer.DorisWriter               [] - Close DorisWriter.\n","stream":"stdout","time":"2024-10-31T10:50:55.677179518Z"}
......
{"log":"2024-10-31 10:50:55,820 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: MySQL Source -\u003e Process -\u003e (amz_ops_v2_ae_ad_group_negative_keywords: Writer -\u003e amz_ops_v2_ae_ad_group_negative_keywords: Committer, amz_ops_v2_ae_ad_group_reports: Writer -\u003e amz_ops_v2_ae_ad_group_reports: Committer, amz_ops_v2_ae_ad_groups: Writer -\u003e amz_ops_v2_ae_ad_groups: Committer, amz_ops_v2_ae_ad_module_rules: Writer -\u003e amz_ops_v2_ae_ad_module_rules: Committer, amz_ops_v2_ae_asin_products: Writer -\u003e amz_ops_v2_ae_asin_products: Committer, amz_ops_v2_ae_campaign_negative_keywords: Writer -\u003e amz_ops_v2_ae_campaign_negative_keywords: Committer, amz_ops_v2_ae_campaign_placement_reports: Writer -\u003e amz_ops_v2_ae_campaign_placement_reports: Committer, amz_ops_v2_ae_campaign_reports: Writer -\u003e amz_ops_v2_ae_campaign_reports: Committer, amz_ops_v2_ae_campaigns: Writer -\u003e amz_ops_v2_ae_campaigns: Committer) (7/8)#0 (666184ecbdc2649378bbafbe0868a824_cbc357ccb763df2852fee8c4fc7d55f2_6_0) switched from RUNNING to FAILED with failure cause:\n","stream":"stdout","time":"2024-10-31T10:50:56.016944788Z"}
{"log":"java.lang.Exception: Could not perform checkpoint 2 for operator Source: MySQL Source -\u003e Process -\u003e (amz_ops_v2_ae_ad_group_negative_keywords: Writer -\u003e amz_ops_v2_ae_ad_group_negative_keywords: Committer, amz_ops_v2_ae_ad_group_reports: Writer -\u003e amz_ops_v2_ae_ad_group_reports: Committer, amz_ops_v2_ae_ad_groups: Writer -\u003e amz_ops_v2_ae_ad_groups: Committer, amz_ops_v2_ae_ad_module_rules: Writer -\u003e amz_ops_v2_ae_ad_module_rules: Committer, amz_ops_v2_ae_asin_products: Writer -\u003e amz_ops_v2_ae_asin_products: Committer, amz_ops_v2_ae_campaign_negative_keywords: Writer -\u003e amz_ops_v2_ae_campaign_negative_keywords: Committer, amz_ops_v2_ae_campaign_placement_reports: Writer -\u003e amz_ops_v2_ae_campaign_placement_reports: Committer, amz_ops_v2_ae_campaign_reports: Writer -\u003e amz_ops_v2_ae_campaign_reports: Committer, amz_ops_v2_ae_campaigns: Writer -\u003e amz_ops_v2_ae_campaigns: Committer, amz_ops_v2_ae_campaigns_us: Writer -\u003e amz_ops_v2_ae_campaigns_us: Committer) (7/8)#0.\n","stream":"stdout","time":"2024-10-31T10:50:56.017376603Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1184) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-31T10:50:56.0176706Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$13(StreamTask.java:1131) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-31T10:50:56.01767567Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-31T10:50:56.017680043Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-31T10:50:56.017683974Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-31T10:50:56.017687808Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-31T10:50:56.017692361Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-31T10:50:56.017696489Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-31T10:50:56.017700037Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-31T10:50:56.017703845Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-31T10:50:56.017707598Z"}
{"log":"\u0009at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-31T10:50:56.017711201Z"}
{"log":"\u0009at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) [flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-31T10:50:56.017715939Z"}
{"log":"\u0009at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) [flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-31T10:50:56.017720219Z"}
{"log":"\u0009at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-31T10:50:56.017724023Z"}
{"log":"\u0009at java.lang.Thread.run(Unknown Source) [?:?]\n","stream":"stdout","time":"2024-10-31T10:50:56.01772765Z"}
{"log":"Caused by: org.apache.doris.flink.exception.DorisRuntimeException: java.util.concurrent.ExecutionException: org.apache.http.client.ClientProtocolException\n","stream":"stdout","time":"2024-10-31T10:50:56.017730806Z"}
{"log":"\u0009at org.apache.doris.flink.sink.writer.DorisStreamLoad.stopLoad(DorisStreamLoad.java:227) ~[flink-doris-connector-1.18-1.6.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]\n","stream":"stdout","time":"2024-10-31T10:50:56.017734397Z"}
{"log":"\u0009at org.apache.doris.flink.sink.writer.DorisWriter.prepareCommit(DorisWriter.java:202) ~[flink-doris-connector-1.18-1.6.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]\n","stream":"stdout","time":"2024-10-31T10:50:56.017738182Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emitCommittables(SinkWriterOperator.java:196) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-31T10:50:56.017741699Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:166) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-31T10:50:56.017748792Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-31T10:50:56.017752515Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:321) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-31T10:50:56.01775638Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1299) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-31T10:50:56.017760215Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-31T10:50:56.017763851Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1287) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-31T10:50:56.01776736Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1172) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-31T10:50:56.017770672Z"}
{"log":"\u0009... 14 more\n","stream":"stdout","time":"2024-10-31T10:50:56.017774198Z"}
{"log":"Caused by: java.util.concurrent.ExecutionException: org.apache.http.client.ClientProtocolException\n","stream":"stdout","time":"2024-10-31T10:50:56.017777461Z"}
{"log":"\u0009at java.util.concurrent.FutureTask.report(Unknown Source) ~[?:?]\n","stream":"stdout","time":"2024-10-31T10:50:56.017781242Z"}
{"log":"\u0009at java.util.concurrent.FutureTask.get(Unknown Source) ~[?:?]\n","stream":"stdout","time":"2024-10-31T10:50:56.017784546Z"}
{"log":"\u0009at org.apache.doris.flink.sink.writer.DorisStreamLoad.stopLoad(DorisStreamLoad.java:225) ~[flink-doris-connector-1.18-1.6.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]\n","stream":"stdout","time":"2024-10-31T10:50:56.017788004Z"}
{"log":"\u0009at org.apache.doris.flink.sink.writer.DorisWriter.prepareCommit(DorisWriter.java:202) ~[flink-doris-connector-1.18-1.6.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]\n","stream":"stdout","time":"2024-10-31T10:50:56.017791572Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emitCommittables(SinkWriterOperator.java:196) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-31T10:50:56.017795283Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:166) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-31T10:50:56.017799149Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-31T10:50:56.0178027Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:321) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-31T10:50:56.017806413Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1299) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-31T10:50:56.017810213Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-31T10:50:56.017813743Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1287) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-31T10:50:56.017817601Z"}
{"log":"\u0009at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1172) ~[flink-dist-1.17.1.jar:1.17.1]\n","stream":"stdout","time":"2024-10-31T10:50:56.017825185Z"}
{"log":"\u0009... 14 more\n","stream":"stdout","time":"2024-10-31T10:50:56.017829035Z"}
{"log":"Caused by: org.apache.http.client.ClientProtocolException\n","stream":"stdout","time":"2024-10-31T10:50:56.017832349Z"}
{"log":"\u0009at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:187) ~[flink-doris-connector-1.18-1.6.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]\n","stream":"stdout","time":"2024-10-31T10:50:56.017835853Z"}
{"log":"\u0009at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[flink-doris-connector-1.18-1.6.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]\n","stream":"stdout","time":"2024-10-31T10:50:56.017839357Z"}
{"log":"\u0009at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108) ~[flink-doris-connector-1.18-1.6.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]\n","stream":"stdout","time":"2024-10-31T10:50:56.017842949Z"}
{"log":"\u0009at org.apache.doris.flink.sink.writer.DorisStreamLoad.lambda$startLoad$0(DorisStreamLoad.java:255) ~[flink-doris-connector-1.18-1.6.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]\n","stream":"stdout","time":"2024-10-31T10:50:56.017846881Z"}
{"log":"\u0009at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]\n","stream":"stdout","time":"2024-10-31T10:50:56.017850906Z"}
{"log":"\u0009at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]\n","stream":"stdout","time":"2024-10-31T10:50:56.017854569Z"}
{"log":"\u0009at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]\n","stream":"stdout","time":"2024-10-31T10:50:56.017858301Z"}
{"log":"\u0009... 1 more\n","stream":"stdout","time":"2024-10-31T10:50:56.017861726Z"}
{"log":"Caused by: org.apache.http.client.NonRepeatableRequestException: Cannot retry request with a non-repeatable request entity\n","stream":"stdout","time":"2024-10-31T10:50:56.017865162Z"}
{"log":"\u0009at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:108) ~[flink-doris-connector-1.18-1.6.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]\n","stream":"stdout","time":"2024-10-31T10:50:56.017869318Z"}
{"log":"\u0009at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110) ~[flink-doris-connector-1.18-1.6.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]\n","stream":"stdout","time":"2024-10-31T10:50:56.017872841Z"}
{"log":"\u0009at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[flink-doris-connector-1.18-1.6.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]\n","stream":"stdout","time":"2024-10-31T10:50:56.017876293Z"}
{"log":"\u0009at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[flink-doris-connector-1.18-1.6.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]\n","stream":"stdout","time":"2024-10-31T10:50:56.017880256Z"}
{"log":"\u0009at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108) ~[flink-doris-connector-1.18-1.6.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]\n","stream":"stdout","time":"2024-10-31T10:50:56.017884404Z"}
{"log":"\u0009at org.apache.doris.flink.sink.writer.DorisStreamLoad.lambda$startLoad$0(DorisStreamLoad.java:255) ~[flink-doris-connector-1.18-1.6.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]\n","stream":"stdout","time":"2024-10-31T10:50:56.017888867Z"}
{"log":"\u0009at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]\n","stream":"stdout","time":"2024-10-31T10:50:56.017892762Z"}
{"log":"\u0009at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]\n","stream":"stdout","time":"2024-10-31T10:50:56.017896176Z"}
{"log":"\u0009at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]\n","stream":"stdout","time":"2024-10-31T10:50:56.017899887Z"}
{"log":"\u0009... 1 more\n","stream":"stdout","time":"2024-10-31T10:50:56.017903591Z"}
{"log":"Caused by: java.net.SocketException: Broken pipe (Write failed)\n","stream":"stdout","time":"2024-10-31T10:50:56.017910762Z"}
{"log":"\u0009at java.net.SocketOutputStream.socketWrite0(Native Method) ~[?:?]\n","stream":"stdout","time":"2024-10-31T10:50:56.017914193Z"}
{"log":"\u0009at java.net.SocketOutputStream.socketWrite(Unknown Source) ~[?:?]\n","stream":"stdout","time":"2024-10-31T10:50:56.017917591Z"}
{"log":"\u0009at java.net.SocketOutputStream.write(Unknown Source) ~[?:?]\n","stream":"stdout","time":"2024-10-31T10:50:56.017921048Z"}
{"log":"\u0009at org.apache.http.impl.io.SessionOutputBufferImpl.streamWrite(SessionOutputBufferImpl.java:124) ~[flink-doris-connector-1.18-1.6.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]\n","stream":"stdout","time":"2024-10-31T10:50:56.017924436Z"}
{"log":"\u0009at org.apache.http.impl.io.SessionOutputBufferImpl.flushBuffer(SessionOutputBufferImpl.java:136) ~[flink-doris-connector-1.18-1.6.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]\n","stream":"stdout","time":"2024-10-31T10:50:56.017928232Z"}
{"log":"\u0009at org.apache.http.impl.io.SessionOutputBufferImpl.write(SessionOutputBufferImpl.java:167) ~[flink-doris-connector-1.18-1.6.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]\n","stream":"stdout","time":"2024-10-31T10:50:56.017931757Z"}
{"log":"\u0009at org.apache.http.impl.io.ChunkedOutputStream.flushCacheWithAppend(ChunkedOutputStream.java:122) ~[flink-doris-connector-1.18-1.6.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]\n","stream":"stdout","time":"2024-10-31T10:50:56.017935464Z"}
{"log":"\u0009at org.apache.http.impl.io.ChunkedOutputStream.write(ChunkedOutputStream.java:179) ~[flink-doris-connector-1.18-1.6.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]\n","stream":"stdout","time":"2024-10-31T10:50:56.017939028Z"}
{"log":"\u0009at org.apache.http.entity.InputStreamEntity.writeTo(InputStreamEntity.java:134) ~[flink-doris-connector-1.18-1.6.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]\n","stream":"stdout","time":"2024-10-31T10:50:56.0179429Z"}
{"log":"\u0009at org.apache.http.impl.execchain.RequestEntityProxy.writeTo(RequestEntityProxy.java:121) ~[flink-doris-connector-1.18-1.6.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]\n","stream":"stdout","time":"2024-10-31T10:50:56.017946871Z"}
{"log":"\u0009at org.apache.http.impl.DefaultBHttpClientConnection.sendRequestEntity(DefaultBHttpClientConnection.java:156) ~[flink-doris-connector-1.18-1.6.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]\n","stream":"stdout","time":"2024-10-31T10:50:56.017950817Z"}
{"log":"\u0009at org.apache.http.impl.conn.CPoolProxy.sendRequestEntity(CPoolProxy.java:152) ~[flink-doris-connector-1.18-1.6.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]\n","stream":"stdout","time":"2024-10-31T10:50:56.017954542Z"}
{"log":"\u0009at org.apache.http.protocol.HttpRequestExecutor.doSendRequest(HttpRequestExecutor.java:238) ~[flink-doris-connector-1.18-1.6.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]\n","stream":"stdout","time":"2024-10-31T10:50:56.01795876Z"}
{"log":"\u0009at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:123) ~[flink-doris-connector-1.18-1.6.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]\n","stream":"stdout","time":"2024-10-31T10:50:56.017962639Z"}
{"log":"\u0009at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272) ~[flink-doris-connector-1.18-1.6.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]\n","stream":"stdout","time":"2024-10-31T10:50:56.017966543Z"}
{"log":"\u0009at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) ~[flink-doris-connector-1.18-1.6.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]\n","stream":"stdout","time":"2024-10-31T10:50:56.017970226Z"}
{"log":"\u0009at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89) ~[flink-doris-connector-1.18-1.6.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]\n","stream":"stdout","time":"2024-10-31T10:50:56.017973837Z"}
{"log":"\u0009at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110) ~[flink-doris-connector-1.18-1.6.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]\n","stream":"stdout","time":"2024-10-31T10:50:56.017977334Z"}
{"log":"\u0009at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[flink-doris-connector-1.18-1.6.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]\n","stream":"stdout","time":"2024-10-31T10:50:56.017981045Z"}
{"log":"\u0009at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[flink-doris-connector-1.18-1.6.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]\n","stream":"stdout","time":"2024-10-31T10:50:56.017988965Z"}
{"log":"\u0009at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108) ~[flink-doris-connector-1.18-1.6.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]\n","stream":"stdout","time":"2024-10-31T10:50:56.017992693Z"}
{"log":"\u0009at org.apache.doris.flink.sink.writer.DorisStreamLoad.lambda$startLoad$0(DorisStreamLoad.java:255) ~[flink-doris-connector-1.18-1.6.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]\n","stream":"stdout","time":"2024-10-31T10:50:56.017996225Z"}
{"log":"\u0009at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]\n","stream":"stdout","time":"2024-10-31T10:50:56.018000757Z"}
{"log":"\u0009at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]\n","stream":"stdout","time":"2024-10-31T10:50:56.018004393Z"}
{"log":"\u0009at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]\n","stream":"stdout","time":"2024-10-31T10:50:56.018007877Z"}
{"log":"\u0009... 1 more\n","stream":"stdout","time":"2024-10-31T10:50:56.018011711Z"}
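The trace above is wrapped in docker's json-file format, which makes the chain of causes hard to follow. The following is a minimal sketch, assuming every record looks like the lines above (a JSON object with a `log` field); `cause_chain` is a hypothetical helper name and not part of Flink or the Doris connector. It unwraps the records and lists only the `Caused by:` headlines, outermost first, so the last entry is the innermost cause.

```python
import json


def cause_chain(lines):
    """Return the 'Caused by:' headlines found in docker json-file log lines.

    The result is ordered outermost first, so the last element is the
    innermost (root) cause in the stack trace.
    """
    prefix = "Caused by: "
    chain = []
    for raw in lines:
        raw = raw.strip()
        if not raw:
            continue
        try:
            record = json.loads(raw)
        except json.JSONDecodeError:
            continue  # not a docker json-file record, e.g. plain forum text
        if not isinstance(record, dict):
            continue
        message = str(record.get("log", "")).rstrip("\n")
        if message.startswith(prefix):
            chain.append(message[len(prefix):])
    return chain


# Four records copied from the trace above (raw strings keep the JSON escapes).
sample = [
    r'{"log":"Caused by: org.apache.http.client.NonRepeatableRequestException: Cannot retry request with a non-repeatable request entity\n","stream":"stdout","time":"2024-10-31T10:50:56.017865162Z"}',
    r'{"log":"\u0009at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:108) ~[flink-doris-connector-1.18-1.6.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]\n","stream":"stdout","time":"2024-10-31T10:50:56.017869318Z"}',
    r'{"log":"Caused by: java.net.SocketException: Broken pipe (Write failed)\n","stream":"stdout","time":"2024-10-31T10:50:56.017910762Z"}',
    r'{"log":"\u0009at java.net.SocketOutputStream.socketWrite0(Native Method) ~[?:?]\n","stream":"stdout","time":"2024-10-31T10:50:56.017914193Z"}',
]

chain = cause_chain(sample)
for depth, cause in enumerate(chain):
    print("  " * depth + cause)
```

Applied to the full trace, the innermost entry is `java.net.SocketException: Broken pipe (Write failed)`: the connection was closed by the other side while the request body of the stream load was still being written, and the request could not be retried because its entity is non-repeatable.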
1 Answer

{"log":"2024-10-31 10:50:55,489 INFO org.apache.http.impl.execchain.RetryExec [] - I/O exception (java.net.SocketException) caught when processing request to {}-\u003ehttp://xxxxxxx:8040: Broken pipe (Write failed)\n","stream":"stdout","time":"2024-10-31T10:50:55.489755377Z"}

You could try adjusting this parameter:
image.png