Hi,

What's the Flink version that you're using? Could you also separate the two
steps (switching from the old Kafka interfaces to the new ones and modifying
the serializers) to determine which of the two causes the problem?
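
One way to isolate the serializer change is to keep the pre-migration value
SerializationSchema and only switch the connector, since the
KafkaRecordSerializationSchema builder can wrap an existing SerializationSchema.
A minimal sketch, assuming the UserSTState type from your code below and a
placeholder name oldValueSchema for the old schema:

```
// Sketch only: reuse the pre-migration SerializationSchema[UserSTState] with the
// new KafkaSink, so the connector switch can be tested without the hand-written
// KafkaRecordSerializationSchema. `oldValueSchema` is a hypothetical name.
import org.apache.flink.api.common.serialization.SerializationSchema
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
import com.appier.rt.short_term_score.model.UserSTState

def wrapOldSchema(topic: String,
                  oldValueSchema: SerializationSchema[UserSTState]): KafkaRecordSerializationSchema[UserSTState] =
  KafkaRecordSerializationSchema.builder[UserSTState]()
    .setTopic(topic)
    .setValueSerializationSchema(oldValueSchema)
    .build()
```

If the job stays stable with the old schema plugged in like this, the new
serializer is the more likely suspect; if not, it points at the source/sink
switch itself.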

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser


On Mon, 25 Apr 2022 at 17:11, Hua Wei Chen <oscar.chen....@gmail.com> wrote:

> Hi Huweihua,
>
> Thanks for the reply. Yes, we increased the memory first.
> But we are still curious why memory usage increases with the new Kafka
> APIs/serializers.
>
>
> On Mon, Apr 25, 2022 at 8:38 PM huweihua <huweihua....@gmail.com> wrote:
>
>> Hi,
>>
>> You can try increasing the TaskManager memory.
>> If the OOM persists, you can take a heap dump and check which part is
>> taking up the memory.
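>>
>> For example (a sketch only, with a placeholder value), the TaskManager
>> memory can be raised in flink-conf.yaml, and the heap of a running
>> TaskManager can be dumped with jmap and inspected in a tool such as
>> Eclipse MAT to see which part dominates:
>>
>> ```
>> # flink-conf.yaml -- example value only, adjust to your workload
>> taskmanager.memory.process.size: 4096m
>>
>> # heap dump of a running TaskManager (<pid> = the TaskManager JVM pid):
>> #   jmap -dump:live,format=b,file=tm-heap.hprof <pid>
>> ```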
>>
>>
>> On Apr 25, 2022, at 11:44 AM, Hua Wei Chen <oscar.chen....@gmail.com> wrote:
>>
>> Hi all,
>>
>> Since FlinkKafkaConsumer and FlinkKafkaProducer will be deprecated in
>> Flink 1.15 [1], we are trying to migrate to KafkaSource and KafkaSink [2].
>> At the same time, we also modified the serializers [3].
>> Our Kafka settings are not changed [4].
>>
>> The services were very stable before the migration. However, we get OOM
>> errors [5] after the API migration.
>>
>> Has anyone encountered the same issue? Or can anyone give us suggestions
>> about the settings?
>>
>> Many Thanks!
>>
>> [1] Kafka | Apache Flink
>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#kafka-sourcefunction>
>> [2] new Kafka APIs
>> ```
>>
>> def getKafkaSource[T: TypeInformation](config: Config,
>>                                        topic: String,
>>                                        parallelism: Int,
>>                                        uid: String,
>>                                        env: StreamExecutionEnvironment,
>>                                        deserializer: DeserializationSchema[T]): DataStream[T] = {
>>   val properties = getKafkaCommonProperties(config)
>>
>>   properties.put(ConsumerConfig.GROUP_ID_CONFIG, config.getString("kafka.group.id"))
>>   properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, config.getString("kafka.session.timeout.ms"))
>>   properties.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, config.getString("kafka.receive.buffer.bytes"))
>>
>>   properties.put(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "3600000")
>>
>>   val source = KafkaSource.builder[T]()
>>     .setProperties(properties)
>>     .setTopics(topic)
>>     .setValueOnlyDeserializer(deserializer)
>>     .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>>     .build()
>>
>>   env
>>     .fromSource(source, WatermarkStrategy.noWatermarks[T], uid)
>>     .uid(uid)
>>     .setParallelism(math.min(parallelism, env.getParallelism))
>>     .setMaxParallelism(parallelism)
>> }
>>
>> def getKafkaSink[T: TypeInformation](config: Config,
>>                                      serializer: KafkaRecordSerializationSchema[T]): KafkaSink[T] = {
>>   val properties = getKafkaCommonProperties(config)
>>
>>   properties.put(ProducerConfig.LINGER_MS_CONFIG, config.getString("kafka.linger.ms"))
>>   properties.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getString("kafka.batch.size"))
>>   properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.getString("kafka.compression.type"))
>>
>>   KafkaSink.builder[T]()
>>     .setKafkaProducerConfig(properties)
>>     .setBootstrapServers(config.getString("kafka.bootstrap.servers"))
>>     .setRecordSerializer(serializer)
>>     .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>>     .build()
>> }
>>
>> ```
>> [3] New Serializer
>>
>> import java.lang
>> import java.nio.charset.StandardCharsets
>> import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
>> import org.apache.kafka.clients.producer.ProducerRecord
>> import com.appier.rt.short_term_score.model.UserSTState
>>
>> class UserSTStateSerializer(topic: String) extends KafkaRecordSerializationSchema[UserSTState] {
>>   override def serialize(element: UserSTState,
>>                          context: KafkaRecordSerializationSchema.KafkaSinkContext,
>>                          timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
>>     new ProducerRecord(topic, element.toString.getBytes(StandardCharsets.UTF_8))
>>   }
>> }
>>
>> [4] Kafka Settings
>>
>> # Common
>> retries = "15"
>> retry.backoff.ms = "500"
>> reconnect.backoff.ms = "1000"
>>
>> # Producer
>> linger.ms = "5"
>> batch.size = "1048576"
>> compression.type = "gzip"
>>
>> # Consumer
>> group.id = "<censored>"
>> session.timeout.ms = "100000"
>> receive.buffer.bytes = "8388608"
>>
>> [5] Error Message
>> ```
>> java.lang.OutOfMemoryError
>>      at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
>>      at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source)
>>      at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
>>      at java.base/java.io.ByteArrayOutputStream.write(Unknown Source)
>>      at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
>>      at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
>>      at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
>>      at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
>>      at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
>>      at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62)
>>      at org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation.<init>(RemoteRpcInvocation.java:55)
>>      at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:302)
>>      at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:217)
>>      at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:138)
>>      at com.sun.proxy.$Proxy64.submitTask(Unknown Source)
>>      at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:60)
>>      at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$4(Execution.java:589)
>>      at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
>>      at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>>      at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
>>      at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
>>      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>>      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>>      at java.base/java.lang.Thread.run(Unknown Source)
>>      Suppressed: java.lang.OutOfMemoryError
>>              at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
>>              at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source)
>>              at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
>>              at java.base/java.io.ByteArrayOutputStream.write(Unknown Source)
>>              at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
>>              at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.flush(Unknown Source)
>>              at java.base/java.io.ObjectOutputStream.flush(Unknown Source)
>>              at java.base/java.io.ObjectOutputStream.close(Unknown Source)
>>              at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:635)
>>              ... 15 more
>>
>> ```
>>
>> --
>> Regards,
>> Oscar / Chen Hua Wei
>>
>>
>>
