flink-doris-connector error: memory was leaked by query. Memory leaked


Reading from Doris with Flink fails with: memory was leaked by query. Memory leaked

Flink version: 1.18.1-Java17
Doris version: 1.2.5
Flink-Doris-Connector versions: 1.18-1.5.2, 1.18-1.6.0, 1.18-1.6.1, 1.18-1.6.2

Query SQL:

create temporary table xx_doris_table ( ... ) with ( 'connector' = 'doris', ... );
select * from xx_doris_table limit 1

(Every Doris table fails with the same error.)

PS: The error does not occur with flink-java8; it only happens with flink-java17.
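Because the error shows up only on the Java 17 build, it helps to confirm which Java version a process really runs on. The sketch below is a hypothetical helper (the class and method names are made up, not part of Flink or the connector). It reads the standard `java.specification.version` system property rather than `Runtime.version()`, because `Runtime.version()` does not exist on Java 8, which is exactly the version being compared against. Running it standalone only reports the JVM it was launched with; to check a TaskManager, the same property would need to be logged from inside the job.

```java
/** Hypothetical helper: finds the major Java version of the JVM it runs in. */
final class JvmCheck {

    /** Maps a java.specification.version string to its major version number. */
    static int majorVersion(String spec) {
        // Java 8 and earlier report "1.8", "1.7", ...; Java 9+ report "9", "11", "17", ...
        String v = spec.startsWith("1.") ? spec.substring(2) : spec;
        int dot = v.indexOf('.');
        return Integer.parseInt(dot >= 0 ? v.substring(0, dot) : v);
    }

    /** Major version of the current JVM; compiles and runs on Java 8 as well. */
    static int currentMajorVersion() {
        return majorVersion(System.getProperty("java.specification.version"));
    }

    public static void main(String[] args) {
        int major = currentMajorVersion();
        System.out.println("java.version = " + System.getProperty("java.version"));
        System.out.println("major version = " + major);
        // This thread reports the error on Java 17 but not on Java 8.
        System.out.println(major >= 17 ? "Java 17+ (setup where the error was reported)"
                                       : "Older than Java 17");
    }
}
```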


2 Answers

I'd suggest using flink-java8 for now. As for the memory leak on flink-java17, you could ask in the Flink community whether it is a known bug.

I hit the same problem, and it went away after switching to JDK 8. Below is the error I get on JDK 17.

16:44:28,847 INFO  org.apache.flink.runtime.taskmanager.Task                    [flink-pekko.actor.default-dispatcher-4] [] - Attempting to cancel task Source: doris_sink[1] (5/12)#2 (8ae60cbd6302353e73a079ebb63a83ec_bc764cd8ddf7a0cff126f51c16239658_4_2).
16:44:28,847 WARN  org.apache.flink.runtime.taskmanager.Task                    [Source: doris_sink[1] (6/12)#2] [] - Source: doris_sink[1] (6/12)#2 (8ae60cbd6302353e73a079ebb63a83ec_bc764cd8ddf7a0cff126f51c16239658_5_2) switched from RUNNING to FAILED with failure cause:
java.lang.RuntimeException: One or more fetchers have encountered exception
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:333) ~[flink-connector-base-1.20.0.jar:1.20.0]
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:228) ~[flink-connector-base-1.20.0.jar:1.20.0]
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:190) ~[flink-connector-base-1.20.0.jar:1.20.0]
	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:422) ~[flink-streaming-java-1.20.0.jar:1.20.0]
	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-streaming-java-1.20.0.jar:1.20.0]
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-streaming-java-1.20.0.jar:1.20.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638) ~[flink-streaming-java-1.20.0.jar:1.20.0]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-streaming-java-1.20.0.jar:1.20.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973) ~[flink-streaming-java-1.20.0.jar:1.20.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917) ~[flink-streaming-java-1.20.0.jar:1.20.0]
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) ~[flink-runtime-1.20.0.jar:1.20.0]
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949) ~[flink-runtime-1.20.0.jar:1.20.0]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) ~[flink-runtime-1.20.0.jar:1.20.0]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-runtime-1.20.0.jar:1.20.0]
	at java.lang.Thread.run(Thread.java:840) ~[?:?]
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168) ~[flink-connector-base-1.20.0.jar:1.20.0]
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117) ~[flink-connector-base-1.20.0.jar:1.20.0]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	... 1 more
Caused by: java.lang.IllegalStateException: Memory was leaked by query. Memory leaked: (8192)
Allocator(ROOT) 0/8192/8192/2147483647 (res/actual/peak/limit)

	at org.apache.doris.shaded.org.apache.arrow.memory.BaseAllocator.close(BaseAllocator.java:477) ~[flink-doris-connector-1.17-24.1.0.jar:24.1.0]
	at org.apache.doris.shaded.org.apache.arrow.memory.RootAllocator.close(RootAllocator.java:29) ~[flink-doris-connector-1.17-24.1.0.jar:24.1.0]
	at org.apache.doris.flink.serialization.RowBatch.close(RowBatch.java:644) ~[flink-doris-connector-1.17-24.1.0.jar:24.1.0]
	at org.apache.doris.flink.serialization.RowBatch.readArrow(RowBatch.java:197) ~[flink-doris-connector-1.17-24.1.0.jar:24.1.0]
	at org.apache.doris.flink.source.reader.DorisValueReader.hasNext(DorisValueReader.java:252) ~[flink-doris-connector-1.17-24.1.0.jar:24.1.0]
	at org.apache.doris.flink.source.reader.DorisSourceSplitReader.fetch(DorisSourceSplitReader.java:62) ~[flink-doris-connector-1.17-24.1.0.jar:24.1.0]
	at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-base-1.20.0.jar:1.20.0]
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[flink-connector-base-1.20.0.jar:1.20.0]
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117) ~[flink-connector-base-1.20.0.jar:1.20.0]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	... 1 more
16:44:28,847 INFO  org.apache.flink.runtime.taskmanager.Task                    [Source: doris_sink[1] (6/12)#2] [] - Freeing task resources for Source: doris_sink[1] (6/12)#2 (8ae60cbd6302353e73a079ebb63a83ec_bc764cd8ddf7a0cff126f51c16239658_5_2).
16:44:28,847 INFO  org.apache.flink.runtime.taskmanager.Task                    [flink-pekko.actor.default-dispatcher-4] [] - Task Source: doris_sink[1] (5/12)#2 is already in state FAILED
16:44:28,847 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [Source: gms_user[3] (3/12)#2] [] - Closing Source Reader.
16:44:28,847 WARN  org.apache.flink.runtime.taskmanager.Task                    [Source: doris_sink[1] (7/12)#2] [] - Source: doris_sink[1] (7/12)#2 (8ae60cbd6302353e73a079ebb63a83ec_bc764cd8ddf7a0cff126f51c16239658_6_2) switched from RUNNING to FAILED with failure cause:
java.lang.RuntimeException: One or more fetchers have encountered exception
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:333) ~[flink-connector-base-1.20.0.jar:1.20.0]
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:228) ~[flink-connector-base-1.20.0.jar:1.20.0]
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:190) ~[flink-connector-base-1.20.0.jar:1.20.0]
	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:422) ~[flink-streaming-java-1.20.0.jar:1.20.0]
	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-streaming-java-1.20.0.jar:1.20.0]
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-streaming-java-1.20.0.jar:1.20.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638) ~[flink-streaming-java-1.20.0.jar:1.20.0]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-streaming-java-1.20.0.jar:1.20.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973) ~[flink-streaming-java-1.20.0.jar:1.20.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917) ~[flink-streaming-java-1.20.0.jar:1.20.0]
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) ~[flink-runtime-1.20.0.jar:1.20.0]
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949) ~[flink-runtime-1.20.0.jar:1.20.0]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) ~[flink-runtime-1.20.0.jar:1.20.0]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-runtime-1.20.0.jar:1.20.0]
	at java.lang.Thread.run(Thread.java:840) ~[?:?]
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168) ~[flink-connector-base-1.20.0.jar:1.20.0]
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117) ~[flink-connector-base-1.20.0.jar:1.20.0]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	... 1 more
Caused by: java.lang.IllegalStateException: Memory was leaked by query. Memory leaked: (8192)
Allocator(ROOT) 0/8192/8192/2147483647 (res/actual/peak/limit)

	at org.apache.doris.shaded.org.apache.arrow.memory.BaseAllocator.close(BaseAllocator.java:477) ~[flink-doris-connector-1.17-24.1.0.jar:24.1.0]
	at org.apache.doris.shaded.org.apache.arrow.memory.RootAllocator.close(RootAllocator.java:29) ~[flink-doris-connector-1.17-24.1.0.jar:24.1.0]
	at org.apache.doris.flink.serialization.RowBatch.close(RowBatch.java:644) ~[flink-doris-connector-1.17-24.1.0.jar:24.1.0]
	at org.apache.doris.flink.serialization.RowBatch.readArrow(RowBatch.java:197) ~[flink-doris-connector-1.17-24.1.0.jar:24.1.0]
	at org.apache.doris.flink.source.reader.DorisValueReader.hasNext(DorisValueReader.java:252) ~[flink-doris-connector-1.17-24.1.0.jar:24.1.0]
	at org.apache.doris.flink.source.reader.DorisSourceSplitReader.fetch(DorisSourceSplitReader.java:62) ~[flink-doris-connector-1.17-24.1.0.jar:24.1.0]
	at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-base-1.20.0.jar:1.20.0]
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[flink-connector-base-1.20.0.jar:1.20.0]
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117) ~[flink-connector-base-1.20.0.jar:1.20.0]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	... 1 more
16:44:28,848 ERROR org.apache.doris.shaded.org.apache.arrow.memory.BaseAllocator [Source Data Fetcher for Source: doris_sink[1] (8/12)#2] [] - Memory was leaked by query. Memory leaked: (8192)
Allocator(ROOT) 0/8192/8192/2147483647 (res/actual/peak/limit)

16:44:28,848 INFO  org.apache.flink.runtime.taskmanager.Task                    [Source: gms_user[3] (3/12)#2] [] - Source: gms_user[3] (3/12)#2 (8ae60cbd6302353e73a079ebb63a83ec_feca28aff5a3958840bee985ee7de4d3_2_2) switched from CANCELING to CANCELED.
16:44:28,848 ERROR org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [Source Data Fetcher for Source: doris_sink[1] (8/12)#2] [] - Received uncaught exception.
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168) ~[flink-connector-base-1.20.0.jar:1.20.0]
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117) ~[flink-connector-base-1.20.0.jar:1.20.0]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at java.lang.Thread.run(Thread.java:840) ~[?:?]
Caused by: java.lang.IllegalStateException: Memory was leaked by query. Memory leaked: (8192)
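In the trace above, `RowBatch.readArrow` calls `RowBatch.close()`, which calls `RootAllocator.close()`, and Arrow's `BaseAllocator.close()` throws `IllegalStateException` because the allocator still holds memory at that point. The line `Allocator(ROOT) 0/8192/8192/2147483647 (res/actual/peak/limit)` is Arrow's allocator summary in bytes: reservation 0, 8192 bytes still allocated ("actual", which is the amount reported as leaked), a peak of 8192, and a limit of 2147483647 (`Integer.MAX_VALUE`). The sketch below is a hypothetical parser for that summary line, written only to make the four fields explicit; it is not part of Arrow or the connector.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: splits an Arrow allocator summary line into its four byte counters. */
final class AllocatorLine {
    // Matches e.g. "Allocator(ROOT) 0/8192/8192/2147483647 (res/actual/peak/limit)"
    private static final Pattern SUMMARY = Pattern.compile(
            "Allocator\\((\\w+)\\)\\s+(\\d+)/(\\d+)/(\\d+)/(\\d+)\\s+\\(res/actual/peak/limit\\)");

    final String name;   // allocator name, e.g. ROOT
    final long reserved; // reservation, in bytes
    final long actual;   // bytes still allocated when the summary was printed
    final long peak;     // highest allocation seen, in bytes
    final long limit;    // maximum bytes the allocator may hand out

    private AllocatorLine(String name, long reserved, long actual, long peak, long limit) {
        this.name = name;
        this.reserved = reserved;
        this.actual = actual;
        this.peak = peak;
        this.limit = limit;
    }

    /** Parses the first allocator summary found in the given text. */
    static AllocatorLine parse(String line) {
        Matcher m = SUMMARY.matcher(line);
        if (!m.find()) {
            throw new IllegalArgumentException("not an allocator summary: " + line);
        }
        return new AllocatorLine(m.group(1),
                Long.parseLong(m.group(2)), Long.parseLong(m.group(3)),
                Long.parseLong(m.group(4)), Long.parseLong(m.group(5)));
    }

    /** Bytes still allocated at close(), which Arrow reports as "Memory leaked". */
    long leakedBytes() {
        return actual;
    }

    public static void main(String[] args) {
        AllocatorLine s = parse("Allocator(ROOT) 0/8192/8192/2147483647 (res/actual/peak/limit)");
        System.out.println(s.name + ": reserved=" + s.reserved + " actual=" + s.actual
                + " peak=" + s.peak + " limit=" + s.limit);
        System.out.println("leaked bytes = " + s.leakedBytes());
        System.out.println("limit == Integer.MAX_VALUE: " + (s.limit == Integer.MAX_VALUE));
    }
}
```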