Hi all,

Because FlinkKafkaConsumer and FlinkKafkaProducer will be deprecated in
Flink 1.15 [1], we are migrating to the new KafkaSource and KafkaSink
APIs [2]. At the same time, we also modified the serializers [3]. Our
Kafka settings are unchanged [4].

The services were very stable before the migration. However, after
migrating the APIs we started getting OOM errors [5].

Has anyone encountered the same issue? Can anyone give us suggestions
about the settings?

Many Thanks!

[1] Kafka | Apache Flink: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#kafka-sourcefunction
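For context, the pre-migration job used the deprecated connector from [1] roughly
like this (a sketch only, not our exact code; topics, group id and the String
schema are placeholders):
```
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}

// Placeholder client properties, not our production values.
val props = new Properties()
props.put("bootstrap.servers", "broker:9092")
props.put("group.id", "example-group")

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Old-style source/sink that [1] marks as deprecated.
val input: DataStream[String] =
  env.addSource(new FlinkKafkaConsumer[String]("input-topic", new SimpleStringSchema(), props))
input.addSink(new FlinkKafkaProducer[String]("output-topic", new SimpleStringSchema(), props))
```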
[2] New Kafka APIs
```

import com.typesafe.config.Config // assumed config library, based on the config.getString usage
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.connector.base.DeliveryGuarantee
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetResetStrategy}
import org.apache.kafka.clients.producer.ProducerConfig

def getKafkaSource[T: TypeInformation](config: Config,
                                       topic: String,
                                       parallelism: Int,
                                       uid: String,
                                       env: StreamExecutionEnvironment,
                                       deserializer: DeserializationSchema[T]): DataStream[T] = {
  val properties = getKafkaCommonProperties(config)

  // Consumer-specific settings on top of the common properties
  properties.put(ConsumerConfig.GROUP_ID_CONFIG, config.getString("kafka.group.id"))
  properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, config.getString("kafka.session.timeout.ms"))
  properties.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, config.getString("kafka.receive.buffer.bytes"))

  properties.put(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "3600000")

  val source = KafkaSource.builder[T]()
    .setProperties(properties)
    .setTopics(topic)
    .setValueOnlyDeserializer(deserializer)
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    .build()

  env
    .fromSource(source, WatermarkStrategy.noWatermarks[T], uid)
    .uid(uid)
    .setParallelism(math.min(parallelism, env.getParallelism))
    .setMaxParallelism(parallelism)
}

def getKafkaSink[T: TypeInformation](config: Config,
                                     serializer: KafkaRecordSerializationSchema[T]): KafkaSink[T] = {
  val properties = getKafkaCommonProperties(config)

  // Producer-specific settings on top of the common properties
  properties.put(ProducerConfig.LINGER_MS_CONFIG, config.getString("kafka.linger.ms"))
  properties.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getString("kafka.batch.size"))
  properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.getString("kafka.compression.type"))

  KafkaSink.builder[T]()
    .setKafkaProducerConfig(properties)
    .setBootstrapServers(config.getString("kafka.bootstrap.servers"))
    .setRecordSerializer(serializer)
    .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .build()
}

```
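For reference, this is roughly how the two helpers above are wired into a job (a
minimal sketch; the topic names, uid, parallelism and the String payload are
placeholders, and loading the config via Typesafe ConfigFactory is an assumption):
```
import com.typesafe.config.ConfigFactory
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
import org.apache.flink.streaming.api.scala._

// Assumption: the kafka.* keys from [4] live in the application config.
val config = ConfigFactory.load()
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Source side: value-only String deserialization, parallelism 8, uid "kafka-source".
val lines: DataStream[String] =
  getKafkaSource[String](config, "input-topic", 8, "kafka-source", env, new SimpleStringSchema())

// Sink side: a value-only record serializer built with the connector's builder.
val sink = getKafkaSink[String](config,
  KafkaRecordSerializationSchema.builder[String]()
    .setTopic("output-topic")
    .setValueSerializationSchema(new SimpleStringSchema())
    .build())

lines.sinkTo(sink)
env.execute("kafka-migration-example")
```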
[3] New Serializer
```
import java.lang
import java.nio.charset.StandardCharsets
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord
import com.appier.rt.short_term_score.model.UserSTState

// Value-only serializer: writes the element's toString as the record value (no key).
class UserSTStateSerializer(topic: String) extends KafkaRecordSerializationSchema[UserSTState] {
  override def serialize(element: UserSTState,
                         context: KafkaRecordSerializationSchema.KafkaSinkContext,
                         timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
    new ProducerRecord(topic, element.toString.getBytes(StandardCharsets.UTF_8))
  }
}
```
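Wired into the helper from [2], the serializer above is attached to the sink
roughly like this (a sketch; the upstream stream, topic name and uid are placeholders):
```
// Sketch only: userStates is assumed to be a DataStream[UserSTState] produced upstream,
// and config is the same Config instance passed to the helpers in [2].
userStates
  .sinkTo(getKafkaSink[UserSTState](config, new UserSTStateSerializer("user-st-state-topic")))
  .uid("user-st-state-sink")
  .name("user-st-state-sink")
```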

[4] Kafka Settings
```
# Common
retries = "15"
retry.backoff.ms = "500"
reconnect.backoff.ms = "1000"

# Producer
linger.ms = "5"
batch.size = "1048576"
compression.type = "gzip"

# Consumer
group.id = "<censored>"
session.timeout.ms = "100000"
receive.buffer.bytes = "8388608"
```
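getKafkaCommonProperties itself is not shown above; a hypothetical sketch of what
it is assumed to do with the "Common" keys in [4] (the real implementation and
config paths may differ):
```
import java.util.Properties
import com.typesafe.config.Config

// Hypothetical sketch only: maps the "Common" settings from [4] onto client Properties.
// Whether the real helper also sets bootstrap.servers is not shown in this mail.
def getKafkaCommonProperties(config: Config): Properties = {
  val properties = new Properties()
  properties.put("bootstrap.servers", config.getString("kafka.bootstrap.servers"))
  properties.put("retries", config.getString("kafka.retries"))
  properties.put("retry.backoff.ms", config.getString("kafka.retry.backoff.ms"))
  properties.put("reconnect.backoff.ms", config.getString("kafka.reconnect.backoff.ms"))
  properties
}
```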

[5] Error Message
```
java.lang.OutOfMemoryError

        at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
        at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source)
        at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
        at java.base/java.io.ByteArrayOutputStream.write(Unknown Source)
        at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
        at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
        at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
        at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
        at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
        at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62)
        at org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation.<init>(RemoteRpcInvocation.java:55)
        at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:302)
        at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:217)
        at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:138)
        at com.sun.proxy.$Proxy64.submitTask(Unknown Source)
        at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:60)
        at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$4(Execution.java:589)
        at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)
        Suppressed: java.lang.OutOfMemoryError
                at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
                at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source)
                at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
                at java.base/java.io.ByteArrayOutputStream.write(Unknown Source)
                at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
                at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.flush(Unknown Source)
                at java.base/java.io.ObjectOutputStream.flush(Unknown Source)
                at java.base/java.io.ObjectOutputStream.close(Unknown Source)
                at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:635)
                ... 15 more

```

--
Regards,
Oscar / Chen Hua Wei
