Hi,

What's the Flink version that you're using? Could you also separate the two steps (switching from the old Kafka interfaces to the new ones, and modifying the serializers) to determine which of the two causes the problem?
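For the first step in isolation, you could keep your serializers unchanged and only swap the connector. A minimal sketch of the legacy-consumer variant to compare against, assuming the `getKafkaCommonProperties` helper and config keys from your code below:

```scala
import java.util.Properties

import com.typesafe.config.Config // assuming Typesafe Config, as in your helpers
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.clients.consumer.ConsumerConfig

// Sketch: the pre-migration source wiring. FlinkKafkaConsumer is deprecated in
// 1.15 but still usable for an A/B run against the KafkaSource version with
// identical deserializers.
def getLegacyKafkaSource[T: TypeInformation](config: Config,
                                             topic: String,
                                             env: StreamExecutionEnvironment,
                                             deserializer: DeserializationSchema[T]): DataStream[T] = {
  val properties: Properties = getKafkaCommonProperties(config)
  properties.put(ConsumerConfig.GROUP_ID_CONFIG, config.getString("kafka.group.id"))

  env.addSource(new FlinkKafkaConsumer[T](topic, deserializer, properties))
}
```

If the legacy variant stays stable while the KafkaSource variant OOMs, the connector switch is the culprit; otherwise, look at the serializer change.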
Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser

On Mon, 25 Apr 2022 at 17:11, Hua Wei Chen <oscar.chen....@gmail.com> wrote:

> Hi Huweihua,
>
> Thanks for the reply. Yes, we increased the memory first.
> But we are still curious about why memory usage increases with the new
> Kafka APIs/serializers.
>
> On Mon, Apr 25, 2022 at 8:38 PM huweihua <huweihua....@gmail.com> wrote:
>
>> Hi,
>>
>> You can try to increase the memory of the TaskManager.
>> If the OOM persists, you can dump the memory and check which part
>> is taking it up.
>>
>> On 25 Apr 2022, at 11:44 AM, Hua Wei Chen <oscar.chen....@gmail.com> wrote:
>>
>> Hi all,
>>
>> Since FlinkKafkaConsumer and FlinkKafkaProducer will be deprecated in
>> Flink 1.15 [1], we are trying to migrate the APIs to KafkaSource and
>> KafkaSink [2]. At the same time, we also modified the serializers [3].
>> Our Kafka settings are unchanged [4].
>>
>> The services were very stable before the migration. However, after the
>> API migration we get OOM errors [5].
>>
>> Has anyone encountered the same issue? Or can anyone give us suggestions
>> about the settings?
>>
>> Many thanks!
>>
>> [1] Kafka | Apache Flink
>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#kafka-sourcefunction>
>>
>> [2] New Kafka APIs

```scala
def getKafkaSource[T: TypeInformation](config: Config,
                                       topic: String,
                                       parallelism: Int,
                                       uid: String,
                                       env: StreamExecutionEnvironment,
                                       deserializer: DeserializationSchema[T]): DataStream[T] = {
  val properties = getKafkaCommonProperties(config)

  properties.put(ConsumerConfig.GROUP_ID_CONFIG, config.getString("kafka.group.id"))
  properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, config.getString("kafka.session.timeout.ms"))
  properties.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, config.getString("kafka.receive.buffer.bytes"))
  properties.put(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "3600000")

  val source = KafkaSource.builder[T]()
    .setProperties(properties)
    .setTopics(topic)
    .setValueOnlyDeserializer(deserializer)
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    .build()

  env
    .fromSource(source, WatermarkStrategy.noWatermarks[T], uid)
    .uid(uid)
    .setParallelism(math.min(parallelism, env.getParallelism))
    .setMaxParallelism(parallelism)
}

def getKafkaSink[T: TypeInformation](config: Config,
                                     serializer: KafkaRecordSerializationSchema[T]): KafkaSink[T] = {
  val properties = getKafkaCommonProperties(config)

  properties.put(ProducerConfig.LINGER_MS_CONFIG, config.getString("kafka.linger.ms"))
  properties.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getString("kafka.batch.size"))
  properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.getString("kafka.compression.type"))

  KafkaSink.builder[T]()
    .setKafkaProducerConfig(properties)
    .setBootstrapServers(config.getString("kafka.bootstrap.servers"))
    .setRecordSerializer(serializer)
    .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .build()
}
```
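>> For context, the two helpers are wired into the job roughly as follows (a
>> sketch; the topic names, parallelism, and `UserSTStateDeserializer` are
>> illustrative stand-ins):

```scala
// Sketch: end-to-end wiring of the helpers above (names are illustrative).
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Arguments: (config, topic, parallelism, uid, env, deserializer)
val states: DataStream[UserSTState] =
  getKafkaSource[UserSTState](config, "user-st-state-input", 8, "kafka-source",
    env, new UserSTStateDeserializer)

// UserSTStateSerializer is defined under [3] below
states.sinkTo(getKafkaSink[UserSTState](config,
  new UserSTStateSerializer("user-st-state-output")))

env.execute("short-term-score")
```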
>> [3] New Serializer

```scala
import java.lang
import java.nio.charset.StandardCharsets

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord

import com.appier.rt.short_term_score.model.UserSTState

class UserSTStateSerializer(topic: String) extends KafkaRecordSerializationSchema[UserSTState] {
  override def serialize(element: UserSTState,
                         context: KafkaRecordSerializationSchema.KafkaSinkContext,
                         timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
    new ProducerRecord(topic, element.toString.getBytes(StandardCharsets.UTF_8))
  }
}
```

>> [4] Kafka Settings

```
# Common
retries = "15"
retry.backoff.ms = "500"
reconnect.backoff.ms = "1000"

# Producer
linger.ms = "5"
batch.size = "1048576"
compression.type = "gzip"

# Consumer
group.id = "<censored>"
session.timeout.ms = "100000"
receive.buffer.bytes = "8388608"
```

>> [5] Error Message

```
java.lang.OutOfMemoryError
    at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
    at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source)
    at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
    at java.base/java.io.ByteArrayOutputStream.write(Unknown Source)
    at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
    at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
    at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
    at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62)
    at org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation.<init>(RemoteRpcInvocation.java:55)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:302)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:217)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:138)
    at com.sun.proxy.$Proxy64.submitTask(Unknown Source)
    at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:60)
    at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$4(Execution.java:589)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
    Suppressed: java.lang.OutOfMemoryError
        at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
        at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source)
        at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
        at java.base/java.io.ByteArrayOutputStream.write(Unknown Source)
        at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
        at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.flush(Unknown Source)
        at java.base/java.io.ObjectOutputStream.flush(Unknown Source)
        at java.base/java.io.ObjectOutputStream.close(Unknown Source)
        at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:635)
        ... 15 more
```
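>> For the heap dump suggested earlier in the thread, the JVM can write one
>> automatically when the OOM hits. A minimal sketch, assuming access to
>> flink-conf.yaml (the dump path is illustrative):

```
# flink-conf.yaml: write a heap dump on OutOfMemoryError (applies to JM and TM JVMs)
env.java.opts: -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/flink-heap.hprof
```

>> The resulting .hprof file can then be opened in Eclipse MAT or VisualVM to
>> see which objects dominate the heap.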
>> --
>> Regards,
>> Oscar / Chen Hua Wei