Just to let you know, this is how I set up the KafkaIO read:

p
    .apply("Read_From_Kafka", KafkaIO.<String, byte[]>read()
        .withBootstrapServers(servers)
        .withTopics(Arrays.asList("topic1", "topic2"))
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class)
        .withConsumerConfigUpdates(kafkaConsumerProperties)
        .withConsumerFactoryFn(consumerFactoryObj)
        .commitOffsetsInFinalize())
    .apply("Applying_Fixed_Window", Window.<KafkaRecord<String, byte[]>>into(
            FixedWindows.of(Duration.standardSeconds(10)))
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(Duration.standardSeconds(360))
        .discardingFiredPanes())
    .apply("Convert_KafkaRecord_To_PCollection<POJO>",
        ParDo.of(new ParseLogs()));
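
For reference, ParseLogs is a DoFn shaped roughly like the sketch below; the Log POJO and the fromJsonBytes(...) call are placeholders for our actual type and parsing logic:

import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;

// Turns each KafkaRecord into our log POJO. "Log" and fromJsonBytes(...)
// are placeholders for the real POJO and parsing code.
static class ParseLogs extends DoFn<KafkaRecord<String, byte[]>, Log> {

    @ProcessElement
    public void processElement(@Element KafkaRecord<String, byte[]> record,
                               OutputReceiver<Log> out) {
        byte[] payload = record.getKV().getValue();
        out.output(Log.fromJsonBytes(payload)); // placeholder parsing
    }
}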



where kafkaConsumerProperties is the following map:


kafkaConsumerProperties.put("security.protocol", "SSL");

kafkaConsumerProperties.put("auto.offset.reset", "latest");

kafkaConsumerProperties.put("group.id", “consumer1”);

kafkaConsumerProperties.put("default.api.timeout.ms", 180000);


And inside consumerFactoryObj I set up the SSL keystores.
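
Roughly, the consumer factory is a SerializableFunction that copies the config KafkaIO passes in, layers the SSL keystore/truststore properties on top, and builds the consumer; the keystore paths and passwords below are placeholders for the real values:

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Adds SSL keystore/truststore settings to the config from KafkaIO and
// creates the raw-bytes consumer. Paths and passwords are placeholders.
static class SslConsumerFactoryFn
        implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {

    @Override
    public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
        Map<String, Object> sslConfig = new HashMap<>(config);
        sslConfig.put("ssl.keystore.location", "/path/to/client.keystore.jks");     // placeholder
        sslConfig.put("ssl.keystore.password", "<keystore-password>");              // placeholder
        sslConfig.put("ssl.truststore.location", "/path/to/client.truststore.jks"); // placeholder
        sslConfig.put("ssl.truststore.password", "<truststore-password>");          // placeholder
        return new KafkaConsumer<>(sslConfig);
    }
}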


Thanks and Regards

Mohil

On Wed, Aug 5, 2020 at 12:59 PM Mohil Khare <[email protected]> wrote:

> Hello,
>
> I am using Beam Java SDK 2.19 on Dataflow. We have a system where a log
> shipper continuously emits logs to Kafka and Beam reads the logs using KafkaIO.
>
> Sometimes I see slowness in the KafkaIO read with one of the topics
> (probably during peak traffic periods), where there is a 2-3 minute gap between
> the record timestamp and the time when Beam reads the log. For instance:
>
> 2020-08-05 12:46:23.826 PDT
>
>  {"@timestamp":1596656684.274594,"time":"2020-08-05T19:44:44.274594282Z”,
> “data” : data}, offset: 2148857. timestamp: 1596656685005
>
>
>
> If you convert the record timestamp (1596656685005), which is in epoch ms, to
> PDT, you will see an approx 2 minute difference between it and 2020-08-05
> 12:46:23.826 PDT (the time when Beam actually read the data).
>
> One way of achieving horizontal scaling here is to increase the number of
> partitions on the Kafka broker. What can be done on the Beam side, i.e. the
> KafkaIO side, to tackle this slowness? Any suggestions?
>
> Thanks and regards
> Mohil
>
>
>
>
