[PipelineOutputEmitter] kryo.serialize java.lang.OutOfMemoryError #498

Open
huhao0926 opened this issue Apr 11, 2025 · 7 comments

@huhao0926

huhao0926 commented Apr 11, 2025

Describe the bug
Running in LocalMode and consuming a Kafka stream, the job exits with an OutOfMemoryError:
2025-04-10 14:16:44,143 [shuffle-writer-14-Message] ERROR ComponentUncaughtExceptionHandler:30 - FATAL exception in thread: shuffle-writer-14-Message
com.antgroup.geaflow.common.exception.GeaflowRuntimeException: java.lang.OutOfMemoryError
at com.antgroup.geaflow.cluster.collector.PipelineOutputEmitter$EmitterTask.run(PipelineOutputEmitter.java:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.OutOfMemoryError
at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:185)
at com.esotericsoftware.kryo.io.Output.require(Output.java:164)
at com.esotericsoftware.kryo.io.Output.writeInt(Output.java:259)
at com.antgroup.geaflow.dsl.runtime.traversal.data.FieldAlignEdge$FieldAlignEdgeSerializer.write(FieldAlignEdge.java:177)
at com.antgroup.geaflow.dsl.runtime.traversal.data.FieldAlignEdge$FieldAlignEdgeSerializer.write(FieldAlignEdge.java:169)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
at com.antgroup.geaflow.dsl.runtime.traversal.path.EdgeTreePath$EdgeTreePathSerializer.write(EdgeTreePath.java:172)
at com.antgroup.geaflow.dsl.runtime.traversal.path.EdgeTreePath$EdgeTreePathSerializer.write(EdgeTreePath.java:166)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
at com.antgroup.geaflow.dsl.runtime.traversal.path.VertexTreePath$VertexTreePathSerializer.write(VertexTreePath.java:163)
at com.antgroup.geaflow.dsl.runtime.traversal.path.VertexTreePath$VertexTreePathSerializer.write(VertexTreePath.java:158)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
at com.antgroup.geaflow.dsl.runtime.traversal.path.EdgeTreePath$EdgeTreePathSerializer.write(EdgeTreePath.java:171)
at com.antgroup.geaflow.dsl.runtime.traversal.path.EdgeTreePath$EdgeTreePathSerializer.write(EdgeTreePath.java:166)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
at com.antgroup.geaflow.dsl.runtime.traversal.path.VertexTreePath$VertexTreePathSerializer.write(VertexTreePath.java:163)
at com.antgroup.geaflow.dsl.runtime.traversal.path.VertexTreePath$VertexTreePathSerializer.write(VertexTreePath.java:158)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
at com.antgroup.geaflow.dsl.runtime.traversal.path.VertexTreePath$VertexTreePathSerializer.write(VertexTreePath.java:163)
at com.antgroup.geaflow.dsl.runtime.traversal.path.VertexTreePath$VertexTreePathSerializer.write(VertexTreePath.java:158)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
at com.antgroup.geaflow.dsl.runtime.traversal.path.UnionTreePath$UnionTreePathSerializer.write(UnionTreePath.java:334)
at com.antgroup.geaflow.dsl.runtime.traversal.path.UnionTreePath$UnionTreePathSerializer.write(UnionTreePath.java:330)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:361)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:302)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:113)
at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:39)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
at com.antgroup.geaflow.shuffle.serialize.RecordSerializer.doSerialize(RecordSerializer.java:39)
at com.antgroup.geaflow.shuffle.serialize.AbstractRecordSerializer.serialize(AbstractRecordSerializer.java:23)
at com.antgroup.geaflow.shuffle.api.writer.ShardWriter.emit(ShardWriter.java:135)
at com.antgroup.geaflow.shuffle.api.writer.PipelineShardWriter.emit(PipelineShardWriter.java:74)
at com.antgroup.geaflow.shuffle.api.writer.PipelineWriter.emit(PipelineWriter.java:49)
at com.antgroup.geaflow.cluster.collector.PipelineOutputEmitter$EmitterTask.execute(PipelineOutputEmitter.java:217)
at com.antgroup.geaflow.cluster.collector.PipelineOutputEmitter$EmitterTask.run(PipelineOutputEmitter.java:196)
... 3 more

@qingwen220
Contributor

qingwen220 commented Apr 15, 2025

@huhao0926, which version or branch code do you use?

I'm not sure if this issue happens in a batch or stream job. The issue occurs when the output bucket buffer is more than 2GB.

@huhao0926
Author

@qingwen220

It's a stream job.

The related code is here:
public void emit(long windowId, List<T> data, int channel) throws IOException {
    BufferBuilder outBuffer = this.buffers[channel];
    for (T datum : data) {
        this.recordSerializer.serialize(datum, false, outBuffer); // OutOfMemory here
    }
    if (outBuffer.getBufferSize() >= this.maxBufferSize) {
        this.sendBuffer(channel, outBuffer, windowId);
    }
}

@qingwen220
Contributor

Normally, once outBuffer exceeds 128 MB, the underlying buffer is cleared and a new outBuffer is rebuilt.
For it to grow to 2 GB here, one possibility is that the incoming ListData is too large. Check your source function
to see whether a single batch is writing too much data.
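To illustrate the difference being discussed, here is a minimal, self-contained sketch (all names are hypothetical, not GeaFlow's actual classes) of why a size check that runs only after the whole batch is serialized can let the buffer grow far past the intended threshold, while a per-record check flushes as soon as the threshold is crossed:

```java
import java.io.ByteArrayOutputStream;
import java.util.List;

// Hypothetical sketch: compares a per-batch flush check with a per-record one.
// A tiny threshold stands in for the real 128 MB maxBufferSize.
public class BufferGrowthSketch {
    static final int MAX_BUFFER_SIZE = 8;

    // Per-batch check: the flush decision is made once, after the loop,
    // so the buffer already holds the entire batch at its peak.
    static int emitPerBatch(List<byte[]> batch) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] record : batch) {
            out.write(record, 0, record.length);
        }
        int peak = out.size();
        if (out.size() >= MAX_BUFFER_SIZE) {
            out.reset(); // stands in for sendBuffer(...)
        }
        return peak;
    }

    // Per-record check: flush as soon as the threshold is crossed,
    // so the peak buffer size stays near the threshold.
    static int emitPerRecord(List<byte[]> batch) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int peak = 0;
        for (byte[] record : batch) {
            out.write(record, 0, record.length);
            peak = Math.max(peak, out.size());
            if (out.size() >= MAX_BUFFER_SIZE) {
                out.reset();
            }
        }
        return peak;
    }

    public static void main(String[] args) {
        List<byte[]> batch = List.of(new byte[4], new byte[4], new byte[4], new byte[4]);
        System.out.println(emitPerBatch(batch));  // 16: whole batch buffered before the check
        System.out.println(emitPerRecord(batch)); // 8: flushed at the threshold
    }
}
```

With a per-batch check, one oversized batch can push the underlying byte array toward the JVM's ~2 GB array limit (the `ByteArrayOutputStream.hugeCapacity` frame in the stack trace) before any flush happens.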

@huhao0926
Author

huhao0926 commented Apr 16, 2025

Regarding "normally, once outBuffer exceeds 128 MB, the underlying buffer is cleared and a new outBuffer is rebuilt": where in the code is this logic guaranteed?
As for the incoming ListData being too large: I wrote the job in SQL, and the source data is new records consumed from Kafka, so I'm not very familiar with the source function.
My current local change is below. I'm not sure whether it is reasonable; please take a look when you have time, thanks.
    for (T datum : data) {
        this.recordSerializer.serialize(datum, false, outBuffer); // OutOfMemory
        if (outBuffer.getBufferSize() >= this.maxBufferSize) { // moved the check inside the loop
            this.sendBuffer(channel, outBuffer, windowId);
        }
    }
}
@qingwen220

@qingwen220
Contributor

qingwen220 commented Apr 18, 2025


I don't understand what you mean by a local change. The code you posted is from ShardWriter; the check outBuffer.getBufferSize() >= this.maxBufferSize is exactly what clears the buffer once it exceeds the threshold, and maxBufferSize defaults to 128 MB. Is the Kafka Source you use the one that ships with the DSL? If so, the Kafka source configuration has two parameters that control the amount of data per batch:
geaflow.dsl.time.window.size (time-based window, in seconds; defaults to -1, meaning all time) and geaflow.dsl.window.size (count-based window; defaults to 1). If geaflow.dsl.start.time is configured, the job runs in time-window mode and you additionally need to set geaflow.dsl.time.window.size.
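As a minimal sketch of the above, the batch size could be bounded with a config fragment like the following (the parameter names come from this comment; the values are purely illustrative, not recommendations):

```properties
# Count-based window: cap each batch at e.g. 10000 records (default is 1).
geaflow.dsl.window.size = 10000
# Time-based window in seconds; -1 (the default) means a single all-time window.
# Required in addition if geaflow.dsl.start.time is set (time-window mode).
geaflow.dsl.time.window.size = 10
```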

@huhao0926
Author

I just temporarily modified that piece of code in my local copy and redeployed. The Kafka Source is the one that ships with the DSL. I'll try adjusting that configuration.

@Loognqiang
Collaborator

Hello. Are there any other questions?
