Hi Martijn,

Thanks for your response.
> What's the Flink version that you're using?

Our Flink version is 1.14.4 and the Scala version is 2.12.12.

> Could you also separate the two steps (switching from the old Kafka
> interfaces to the new ones + modifying serializers) to determine which of
> the two steps causes the problem?

Because the new Kafka API requires the new serializer
(KafkaRecordSerializationSchema) and does not seem to accept the old one
(KafkaSerializationSchema), we cannot separate the change into two steps.
(A possible workaround is sketched further down in this mail.)

Best Regards,
Hua Wei

On Tue, Apr 26, 2022 at 5:03 PM Martijn Visser <martijnvis...@apache.org> wrote:

> Hi,
>
> What's the Flink version that you're using? Could you also separate the
> two steps (switching from the old Kafka interfaces to the new ones +
> modifying serializers) to determine which of the two steps causes the
> problem?
>
> Best regards,
>
> Martijn Visser
> https://twitter.com/MartijnVisser82
> https://github.com/MartijnVisser
>
>
> On Mon, 25 Apr 2022 at 17:11, Hua Wei Chen <oscar.chen....@gmail.com> wrote:
>
>> Hi Huweihua,
>>
>> Thanks for the reply. Yes, we increased the memory first, but we are
>> still curious about why memory usage grows with the new Kafka
>> APIs/serializers.
>>
>>
>> On Mon, Apr 25, 2022 at 8:38 PM huweihua <huweihua....@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> You can try to increase the memory of the TaskManager.
>>> If the OOM persists, you can dump the memory and check which part is
>>> using it.
>>>
>>>
>>> On Apr 25, 2022, at 11:44 AM, Hua Wei Chen <oscar.chen....@gmail.com> wrote:
>>>
>>> Hi all,
>>>
>>> Since FlinkKafkaConsumer and FlinkKafkaProducer will be deprecated in
>>> Flink 1.15 [1], we are trying to migrate these APIs to KafkaSource and
>>> KafkaSink [2]. At the same time, we also modified the serializers [3].
>>> Our Kafka settings are unchanged [4].
>>>
>>> The services were very stable before the migration. However, after
>>> migrating the APIs we get OOM errors [5].
>>>
>>> Has anyone encountered the same issue? Can anyone give us suggestions
>>> about the settings?
>>>
>>> Many Thanks!
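For reference, the two migration steps may still be separable with a thin
shim. The following is a minimal, untested sketch (`LegacySchemaAdapter` is a
hypothetical name, not something from the thread) that forwards the new
KafkaRecordSerializationSchema calls to an existing old-style
KafkaSerializationSchema, so the source/sink switch could be deployed first
and the serializer rewrite tested separately:

```
import java.lang

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord

// Untested sketch: wrap an old-style schema so it can run on the new KafkaSink.
// Caveat: any open()/initialization logic of the legacy schema is not wired up here.
class LegacySchemaAdapter[T](legacy: KafkaSerializationSchema[T])
    extends KafkaRecordSerializationSchema[T] {

  override def serialize(element: T,
                         context: KafkaRecordSerializationSchema.KafkaSinkContext,
                         timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] =
    legacy.serialize(element, timestamp) // delegate to the old interface
}
```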
>>>
>>> [1] Kafka | Apache Flink:
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#kafka-sourcefunction
>>>
>>> [2] new Kafka APIs
>>> ```
>>> def getKafkaSource[T: TypeInformation](config: Config,
>>>                                        topic: String,
>>>                                        parallelism: Int,
>>>                                        uid: String,
>>>                                        env: StreamExecutionEnvironment,
>>>                                        deserializer: DeserializationSchema[T]): DataStream[T] = {
>>>   val properties = getKafkaCommonProperties(config)
>>>
>>>   properties.put(ConsumerConfig.GROUP_ID_CONFIG, config.getString("kafka.group.id"))
>>>   properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, config.getString("kafka.session.timeout.ms"))
>>>   properties.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, config.getString("kafka.receive.buffer.bytes"))
>>>   properties.put(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "3600000")
>>>
>>>   val source = KafkaSource.builder[T]()
>>>     .setProperties(properties)
>>>     .setTopics(topic)
>>>     .setValueOnlyDeserializer(deserializer)
>>>     .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>>>     .build()
>>>
>>>   env
>>>     .fromSource(source, WatermarkStrategy.noWatermarks[T], uid)
>>>     .uid(uid)
>>>     .setParallelism(math.min(parallelism, env.getParallelism))
>>>     .setMaxParallelism(parallelism)
>>> }
>>>
>>> def getKafkaSink[T: TypeInformation](config: Config,
>>>                                      serializer: KafkaRecordSerializationSchema[T]): KafkaSink[T] = {
>>>   val properties = getKafkaCommonProperties(config)
>>>
>>>   properties.put(ProducerConfig.LINGER_MS_CONFIG, config.getString("kafka.linger.ms"))
>>>   properties.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getString("kafka.batch.size"))
>>>   properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.getString("kafka.compression.type"))
>>>
>>>   KafkaSink.builder[T]()
>>>     .setKafkaProducerConfig(properties)
>>>     .setBootstrapServers(config.getString("kafka.bootstrap.servers"))
>>>     .setRecordSerializer(serializer)
>>>     .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>>>     .build()
>>> }
>>> ```
>>>
>>> [3] New Serializer
>>> ```
>>> import java.lang
>>> import java.nio.charset.StandardCharsets
>>> import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
>>> import org.apache.kafka.clients.producer.ProducerRecord
>>> import com.appier.rt.short_term_score.model.UserSTState
>>>
>>> class UserSTStateSerializer(topic: String) extends KafkaRecordSerializationSchema[UserSTState] {
>>>   override def serialize(element: UserSTState,
>>>                          context: KafkaRecordSerializationSchema.KafkaSinkContext,
>>>                          timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
>>>     new ProducerRecord(topic, element.toString.getBytes(StandardCharsets.UTF_8))
>>>   }
>>> }
>>> ```
>>>
>>> [4] Kafka Settings
>>> ```
>>> # Common
>>> retries = "15"
>>> retry.backoff.ms = "500"
>>> reconnect.backoff.ms = "1000"
>>>
>>> # Producer
>>> linger.ms = "5"
>>> batch.size = "1048576"
>>> compression.type = "gzip"
>>>
>>> # Consumer
>>> group.id = "<censored>"
>>> session.timeout.ms = "100000"
>>> receive.buffer.bytes = "8388608"
>>> ```
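For context, this is roughly how the helper in [2] and the serializer in [3]
plug together (a sketch: the `Config` is assumed to be Typesafe's, and the
topic name, uid, and surrounding stream are placeholders):

```
import com.typesafe.config.Config
import org.apache.flink.connector.kafka.sink.KafkaSink
import org.apache.flink.streaming.api.scala._ // implicit TypeInformation derivation
import com.appier.rt.short_term_score.model.UserSTState

// Sketch: attach the KafkaSink built by getKafkaSink ([2]) with the
// UserSTStateSerializer ([3]) to a stream of UserSTState records.
def attachUserSTStateSink(config: Config,
                          states: DataStream[UserSTState]): Unit = {
  val sink: KafkaSink[UserSTState] =
    getKafkaSink(config, new UserSTStateSerializer("user-st-state-topic")) // placeholder topic

  states
    .sinkTo(sink)              // DataStream#sinkTo is the entry point for the new sink API
    .uid("user-st-state-sink") // placeholder uid
}
```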
>>>
>>> [5] Error Message
>>> ```
>>> java.lang.OutOfMemoryError
>>>     at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
>>>     at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source)
>>>     at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
>>>     at java.base/java.io.ByteArrayOutputStream.write(Unknown Source)
>>>     at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
>>>     at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
>>>     at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
>>>     at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
>>>     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
>>>     at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62)
>>>     at org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation.<init>(RemoteRpcInvocation.java:55)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:302)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:217)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:138)
>>>     at com.sun.proxy.$Proxy64.submitTask(Unknown Source)
>>>     at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:60)
>>>     at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$4(Execution.java:589)
>>>     at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
>>>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>>>     at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
>>>     at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
>>>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>>>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>>>     at java.base/java.lang.Thread.run(Unknown Source)
>>>     Suppressed: java.lang.OutOfMemoryError
>>>         at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
>>>         at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source)
>>>         at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
>>>         at java.base/java.io.ByteArrayOutputStream.write(Unknown Source)
>>>         at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
>>>         at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.flush(Unknown Source)
>>>         at java.base/java.io.ObjectOutputStream.flush(Unknown Source)
>>>         at java.base/java.io.ObjectOutputStream.close(Unknown Source)
>>>         at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:635)
>>>         ... 15 more
>>> ```
>>>
>>> --
>>> Regards,
>>> Oscar / Chen Hua Wei
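P.S. One note on the trace in [5]: the OOM is thrown while Flink
java-serializes the RPC payload for submitTask
(InstantiationUtil.serializeObject), which suggests that something in the new
source/sink operator graph serializes to a very large byte array. A cheap
offline check (a sketch; it reuses getKafkaSink and UserSTStateSerializer
from above, and the topic name is a placeholder) is to measure that size
directly:

```
import org.apache.flink.streaming.api.scala._ // implicit TypeInformation derivation
import org.apache.flink.util.InstantiationUtil

// Sketch: run inside the job setup, where `config` is in scope. Comparing this
// number for the old FlinkKafkaProducer vs. the new KafkaSink may show what grew.
val sink = getKafkaSink(config, new UserSTStateSerializer("user-st-state-topic"))
val payload: Array[Byte] = InstantiationUtil.serializeObject(sink)
println(s"KafkaSink serializes to ${payload.length} bytes")
```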