Hi Hua Wei,

Have you built your own Flink version? Flink doesn't support Scala
2.12.12; the latest Scala version that Flink 1.14 supports is Scala 2.12.7.
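
If you build with sbt (just an assumption about your toolchain), pinning the
supported version would look like:

```
scalaVersion := "2.12.7"
```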

> Because the new Kafka API needs the new
> serializer (KafkaRecordSerializationSchema) and it seems it cannot use the
> old one (KafkaSerializationSchema), we cannot separate the change into two
> steps.

But you can still use FlinkKafkaConsumer and FlinkKafkaProducer in Flink
1.15, right? My idea was to first upgrade to Flink 1.15 and see if your OOM
already appears there. If it does, it's most likely something else that
causes the OOM. If the upgrade runs smoothly, you can then try the migration
to KafkaSource and KafkaSink to validate whether the OOM appears at that
point.
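
If the serializer interface is the only blocker for splitting the migration,
you might also be able to keep the old schema behind a small adapter. A
rough, untested sketch (this adapter class is hypothetical, not part of the
Flink API):

```
import java.lang
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord

// Hypothetical adapter: delegates to an existing KafkaSerializationSchema
// so the old schema can be passed to KafkaSink.setRecordSerializer(...).
class LegacySchemaAdapter[T](legacy: KafkaSerializationSchema[T])
    extends KafkaRecordSerializationSchema[T] {
  override def serialize(element: T,
                         context: KafkaRecordSerializationSchema.KafkaSinkContext,
                         timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] =
    legacy.serialize(element, timestamp)
}
```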

Best regards,

Martijn

On Tue, 10 May 2022 at 04:29, Hua Wei Chen <oscar.chen....@gmail.com> wrote:

> Hi Martijn,
>
> Thanks for your response.
>
> > What's the Flink version that you're using?
> Our Flink version is 1.14.4 and the Scala version is 2.12.12.
>
> > Could you also separate the two steps (switching from the old Kafka
> > interfaces to the new ones + modifying serializers) to determine which of
> > the two steps causes the problem?
> Because the new Kafka API needs the new
> serializer (KafkaRecordSerializationSchema) and it seems it cannot use the
> old one (KafkaSerializationSchema), we cannot separate the change into two
> steps.
>
> Best Regards,
> Hua Wei
>
> On Tue, Apr 26, 2022 at 5:03 PM Martijn Visser <martijnvis...@apache.org>
> wrote:
>
>> Hi,
>>
>> What's the Flink version that you're using? Could you also separate the
>> two steps (switching from the old Kafka interfaces to the new ones +
>> modifying serializers) to determine which of the two steps causes the
>> problem?
>>
>> Best regards,
>>
>> Martijn Visser
>> https://twitter.com/MartijnVisser82
>> https://github.com/MartijnVisser
>>
>>
>> On Mon, 25 Apr 2022 at 17:11, Hua Wei Chen <oscar.chen....@gmail.com>
>> wrote:
>>
>>> Hi Huweihua,
>>>
>>> Thanks for the reply. Yes, we increased the memory first.
>>> But we are still curious about the memory increase with the new Kafka
>>> APIs/serializers.
>>>
>>>
>>> On Mon, Apr 25, 2022 at 8:38 PM huweihua <huweihua....@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> You can try to increase the memory of the TaskManager.
>>>> If the OOM persists, you can take a heap dump and check which
>>>> part is taking up the memory.
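>>>>
>>>> For example (the dump path is a placeholder, and this assumes you can
>>>> edit flink-conf.yaml), the JVM can write a dump automatically on OOM:
>>>>
>>>> ```
>>>> env.java.opts: "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dumps"
>>>> ```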
>>>>
>>>>
>>>> On Mon, Apr 25, 2022 at 11:44 AM, Hua Wei Chen <oscar.chen....@gmail.com> wrote:
>>>>
>>>> Hi all,
>>>>
>>>> Since FlinkKafkaConsumer and FlinkKafkaProducer will be deprecated in
>>>> Flink 1.15 [1], we are trying to migrate the APIs to KafkaSource and
>>>> KafkaSink [2]. At the same time, we also modified the serializers [3].
>>>> Our Kafka settings are unchanged [4].
>>>>
>>>> The services were very stable before the migration. However, we get OOM
>>>> errors [5] after the API migration.
>>>>
>>>> Has anyone encountered the same issue? Or can anyone give us suggestions
>>>> about the settings?
>>>>
>>>> Many Thanks!
>>>>
>>>> [1] Kafka | Apache Flink
>>>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#kafka-sourcefunction>
>>>> [2] new Kafka APIs
>>>> ```
>>>> def getKafkaSource[T: TypeInformation](config: Config,
>>>>                                        topic: String,
>>>>                                        parallelism: Int,
>>>>                                        uid: String,
>>>>                                        env: StreamExecutionEnvironment,
>>>>                                        deserializer: DeserializationSchema[T]): DataStream[T] = {
>>>>   val properties = getKafkaCommonProperties(config)
>>>>
>>>>   properties.put(ConsumerConfig.GROUP_ID_CONFIG, config.getString("kafka.group.id"))
>>>>   properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, config.getString("kafka.session.timeout.ms"))
>>>>   properties.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, config.getString("kafka.receive.buffer.bytes"))
>>>>
>>>>   properties.put(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "3600000")
>>>>
>>>>   val source = KafkaSource.builder[T]()
>>>>     .setProperties(properties)
>>>>     .setTopics(topic)
>>>>     .setValueOnlyDeserializer(deserializer)
>>>>     .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>>>>     .build()
>>>>
>>>>   env
>>>>     .fromSource(source, WatermarkStrategy.noWatermarks[T], uid)
>>>>     .uid(uid)
>>>>     .setParallelism(math.min(parallelism, env.getParallelism))
>>>>     .setMaxParallelism(parallelism)
>>>> }
>>>>
>>>> def getKafkaSink[T: TypeInformation](config: Config,
>>>>                                      serializer: KafkaRecordSerializationSchema[T]): KafkaSink[T] = {
>>>>   val properties = getKafkaCommonProperties(config)
>>>>
>>>>   properties.put(ProducerConfig.LINGER_MS_CONFIG, config.getString("kafka.linger.ms"))
>>>>   properties.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getString("kafka.batch.size"))
>>>>   properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.getString("kafka.compression.type"))
>>>>
>>>>   KafkaSink.builder[T]()
>>>>     .setKafkaProducerConfig(properties)
>>>>     .setBootstrapServers(config.getString("kafka.bootstrap.servers"))
>>>>     .setRecordSerializer(serializer)
>>>>     .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>>>>     .build()
>>>> }
>>>> ```
>>>> [3] New Serializer
>>>>
>>>> ```
>>>> import java.lang
>>>> import java.nio.charset.StandardCharsets
>>>> import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
>>>> import org.apache.kafka.clients.producer.ProducerRecord
>>>> import com.appier.rt.short_term_score.model.UserSTState
>>>>
>>>> class UserSTStateSerializer(topic: String) extends KafkaRecordSerializationSchema[UserSTState] {
>>>>   override def serialize(element: UserSTState,
>>>>                          context: KafkaRecordSerializationSchema.KafkaSinkContext,
>>>>                          timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
>>>>     new ProducerRecord(topic, element.toString.getBytes(StandardCharsets.UTF_8))
>>>>   }
>>>> }
>>>> ```
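>>>>
>>>> For context, the serializer is wired into the sink roughly like this
>>>> (the topic name and uid below are placeholders):
>>>>
>>>> ```
>>>> val sink = getKafkaSink(config, new UserSTStateSerializer("output-topic"))
>>>> stream.sinkTo(sink).uid("kafka-sink")
>>>> ```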
>>>>
>>>> [4] Kafka Settings
>>>>
>>>> # Common
>>>> retries = "15"
>>>> retry.backoff.ms = "500"
>>>> reconnect.backoff.ms = "1000"
>>>>
>>>> # Producer
>>>> linger.ms = "5"
>>>> batch.size = "1048576"
>>>> compression.type = "gzip"
>>>>
>>>> # Consumer
>>>> group.id = "<censored>"
>>>> session.timeout.ms = "100000"
>>>> receive.buffer.bytes = "8388608"
>>>>
>>>> [5] Error Message
>>>> ```
>>>> java.lang.OutOfMemoryError
>>>>    at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
>>>>    at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source)
>>>>    at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
>>>>    at java.base/java.io.ByteArrayOutputStream.write(Unknown Source)
>>>>    at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
>>>>    at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
>>>>    at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
>>>>    at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
>>>>    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
>>>>    at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62)
>>>>    at org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation.<init>(RemoteRpcInvocation.java:55)
>>>>    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:302)
>>>>    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:217)
>>>>    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:138)
>>>>    at com.sun.proxy.$Proxy64.submitTask(Unknown Source)
>>>>    at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:60)
>>>>    at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$4(Execution.java:589)
>>>>    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
>>>>    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>>>>    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
>>>>    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
>>>>    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>>>>    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>>>>    at java.base/java.lang.Thread.run(Unknown Source)
>>>>    Suppressed: java.lang.OutOfMemoryError
>>>>            at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
>>>>            at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source)
>>>>            at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
>>>>            at java.base/java.io.ByteArrayOutputStream.write(Unknown Source)
>>>>            at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
>>>>            at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.flush(Unknown Source)
>>>>            at java.base/java.io.ObjectOutputStream.flush(Unknown Source)
>>>>            at java.base/java.io.ObjectOutputStream.close(Unknown Source)
>>>>            at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:635)
>>>>            ... 15 more
>>>> ```
>>>>
>>>> --
>>>> Regards,
>>>> Oscar / Chen Hua Wei
>>>>
>>>>
>>>>
