Hi all,

Since FlinkKafkaConsumer and FlinkKafkaProducer will be deprecated in Flink 1.15 [1], we are migrating to the new KafkaSource and KafkaSink APIs [2]. As part of the migration, we also rewrote the serializers [3]. Our Kafka settings are unchanged [4].
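For context, the pre-migration code used the legacy connectors in the usual way. Roughly (a simplified sketch, not our exact code; the topic names and SimpleStringSchema are placeholders):

```
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}

// Simplified sketch of the legacy connector usage we are migrating away from.
// "events-in" / "events-out" are placeholder topics.
def legacyPipeline(env: StreamExecutionEnvironment, properties: Properties): Unit = {
  val consumer = new FlinkKafkaConsumer[String]("events-in", new SimpleStringSchema(), properties)
  val producer = new FlinkKafkaProducer[String]("events-out", new SimpleStringSchema(), properties)

  env.addSource(consumer).addSink(producer)
}
```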
The services were very stable before the migration. However, we have been getting OOM errors [5] since the API migration. Has anyone encountered the same issue, or can anyone give us suggestions about the settings? Many thanks!

[1] Kafka | Apache Flink
<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#kafka-sourcefunction>

[2] New Kafka APIs

```
def getKafkaSource[T: TypeInformation](config: Config,
                                       topic: String,
                                       parallelism: Int,
                                       uid: String,
                                       env: StreamExecutionEnvironment,
                                       deserializer: DeserializationSchema[T]): DataStream[T] = {
  val properties = getKafkaCommonProperties(config)
  properties.put(ConsumerConfig.GROUP_ID_CONFIG, config.getString("kafka.group.id"))
  properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, config.getString("kafka.session.timeout.ms"))
  properties.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, config.getString("kafka.receive.buffer.bytes"))
  properties.put(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "3600000")

  val source = KafkaSource.builder[T]()
    .setProperties(properties)
    .setTopics(topic)
    .setValueOnlyDeserializer(deserializer)
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    .build()

  env
    .fromSource(source, WatermarkStrategy.noWatermarks[T], uid)
    .uid(uid)
    .setParallelism(math.min(parallelism, env.getParallelism))
    .setMaxParallelism(parallelism)
}

def getKafkaSink[T: TypeInformation](config: Config,
                                     serializer: KafkaRecordSerializationSchema[T]): KafkaSink[T] = {
  val properties = getKafkaCommonProperties(config)
  properties.put(ProducerConfig.LINGER_MS_CONFIG, config.getString("kafka.linger.ms"))
  properties.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getString("kafka.batch.size"))
  properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.getString("kafka.compression.type"))

  KafkaSink.builder[T]()
    .setKafkaProducerConfig(properties)
    .setBootstrapServers(config.getString("kafka.bootstrap.servers"))
    .setRecordSerializer(serializer)
    .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .build()
}
```

[3] New Serializer

```
import java.lang
import java.nio.charset.StandardCharsets

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord

import com.appier.rt.short_term_score.model.UserSTState

class UserSTStateSerializer(topic: String) extends KafkaRecordSerializationSchema[UserSTState] {
  override def serialize(element: UserSTState,
                         context: KafkaRecordSerializationSchema.KafkaSinkContext,
                         timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
    // Value-only record: the payload is the UTF-8 encoding of the state's string form.
    new ProducerRecord(topic, element.toString.getBytes(StandardCharsets.UTF_8))
  }
}
```
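For completeness, the helpers above are wired into the job roughly as follows (a minimal sketch; the topic names, parallelism, uid, and deserializer are placeholders, not our exact production values):

```
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.streaming.api.scala._

// Hypothetical wiring of getKafkaSource/getKafkaSink; all literals are placeholders.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val deserializer: DeserializationSchema[UserSTState] = ??? // our actual deserializer

val states: DataStream[UserSTState] =
  getKafkaSource[UserSTState](config, "user-st-state-in", 8, "st-state-source", env, deserializer)

states.sinkTo(getKafkaSink[UserSTState](config, new UserSTStateSerializer("user-st-state-out")))
```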
[4] Kafka Settings

```
# Common
retries = "15"
retry.backoff.ms = "500"
reconnect.backoff.ms = "1000"

# Producer
linger.ms = "5"
batch.size = "1048576"
compression.type = "gzip"

# Consumer
group.id = "<censored>"
session.timeout.ms = "100000"
receive.buffer.bytes = "8388608"
```

[5] Error Message

```
java.lang.OutOfMemoryError
    at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
    at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source)
    at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
    at java.base/java.io.ByteArrayOutputStream.write(Unknown Source)
    at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
    at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
    at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
    at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62)
    at org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation.<init>(RemoteRpcInvocation.java:55)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:302)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:217)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:138)
    at com.sun.proxy.$Proxy64.submitTask(Unknown Source)
    at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:60)
    at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$4(Execution.java:589)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
    Suppressed: java.lang.OutOfMemoryError
        at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
        at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source)
        at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
        at java.base/java.io.ByteArrayOutputStream.write(Unknown Source)
        at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
        at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.flush(Unknown Source)
        at java.base/java.io.ObjectOutputStream.flush(Unknown Source)
        at java.base/java.io.ObjectOutputStream.close(Unknown Source)
        at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:635)
        ... 15 more
```

--
Regards,
Oscar / Chen Hua Wei
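P.S. The trace shows the OutOfMemoryError being raised while Flink Java-serializes the task payload for RPC submission (InstantiationUtil.serializeObject inside RemoteRpcInvocation). In case it helps the discussion, this is the kind of check we can run to see how large the new sink becomes under plain Java serialization (a rough diagnostic sketch only; the topic name is a placeholder):

```
import org.apache.flink.util.InstantiationUtil

// Rough diagnostic: serialize the sink the same way the stack trace above does
// (plain Java serialization via InstantiationUtil) and report the payload size.
// "user-st-state-out" is a placeholder topic name.
val sink = getKafkaSink[UserSTState](config, new UserSTStateSerializer("user-st-state-out"))
val payload: Array[Byte] = InstantiationUtil.serializeObject(sink)
println(s"Serialized KafkaSink size: ${payload.length} bytes")
```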