Just to let you know, this is how I set up the KafkaIO read:
p
    .apply("Read_From_Kafka", KafkaIO.<String, byte[]>read()
        .withBootstrapServers(servers)
        .withTopics(Arrays.asList("topic1", "topic2"))
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class)
        .withConsumerConfigUpdates(kafkaConsumerProperties)
        .withConsumerFactoryFn(consumerFactoryObj)
        .commitOffsetsInFinalize())
    .apply("Applying_Fixed_Window",
        Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .withAllowedLateness(Duration.standardSeconds(360))
            .discardingFiredPanes())
    .apply("Convert_KafkaRecord_To_PCollection<POJO>",
        ParDo.of(new ParseLogs()));
where kafkaConsumerProperties is the following map:
kafkaConsumerProperties.put("security.protocol", "SSL");
kafkaConsumerProperties.put("auto.offset.reset", "latest");
kafkaConsumerProperties.put("group.id", "consumer1");
kafkaConsumerProperties.put("default.api.timeout.ms", 180000);
And inside consumerFactoryObj I set up the SSL keystores.
Thanks and Regards
Mohil
On Wed, Aug 5, 2020 at 12:59 PM Mohil Khare <[email protected]> wrote:
> Hello,
>
> I am using the Beam Java SDK 2.19 on Dataflow. We have a system where a log
> shipper continuously emits logs to Kafka, and Beam reads them using KafkaIO.
>
> Sometimes I see slowness in the KafkaIO read on one of the topics
> (probably during peak traffic), with a lag of 2-3 minutes between the
> record timestamp and the time when Beam reads the log. For instance:
>
> 2020-08-05 12:46:23.826 PDT
>
> {"@timestamp":1596656684.274594,"time":"2020-08-05T19:44:44.274594282Z",
> "data" : data}, offset: 2148857. timestamp: 1596656685005
>
> If you convert the record timestamp (1596656685005, in epoch ms) to PDT,
> you will see a difference of approximately 2 minutes between it and
> 2020-08-05 12:46:23.826 PDT (the time when Beam actually reads the data).
>
> One way of achieving horizontal scaling here is to increase the number of
> partitions on the Kafka broker. What can be done on the Beam side, i.e. in
> KafkaIO, to tackle this slowness? Any suggestions?
>
> Thanks and regards
> Mohil
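
For reference, the lag in the example from the quoted message can be checked with a small standalone sketch. It uses only java.time and is not part of the pipeline; the class name KafkaReadLag and its helper methods are made up for illustration, and "PDT" is assumed to mean the America/Los_Angeles zone.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of the pipeline: checks the read lag from the log sample.
public class KafkaReadLag {
    // Assumption: "PDT" in the log line refers to the America/Los_Angeles zone.
    static final ZoneId PACIFIC = ZoneId.of("America/Los_Angeles");
    static final DateTimeFormatter FMT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Formats an epoch-millisecond timestamp as Pacific local time.
    static String toPacific(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).atZone(PACIFIC).format(FMT);
    }

    // Time elapsed between the Kafka record timestamp and the moment the record was read.
    static Duration lag(long recordEpochMillis, ZonedDateTime readTime) {
        return Duration.between(Instant.ofEpochMilli(recordEpochMillis), readTime.toInstant());
    }

    public static void main(String[] args) {
        long recordTs = 1596656685005L; // "timestamp" field from the log sample
        // Read time from the log sample: 2020-08-05 12:46:23.826 PDT
        ZonedDateTime readTime =
            ZonedDateTime.of(2020, 8, 5, 12, 46, 23, 826_000_000, PACIFIC);

        System.out.println("record time (Pacific): " + toPacific(recordTs));
        System.out.println("lag in ms: " + lag(recordTs, readTime).toMillis());
    }
}
```

Under these assumptions the record timestamp corresponds to 2020-08-05 12:44:45.005 Pacific time and the lag is 98821 ms, i.e. roughly 1 minute 39 seconds.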