I also have a question: could wrong windowing be messing up the received
timestamps?
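
(My understanding is that the element timestamp is assigned by KafkaIO's
timestamp policy rather than by the windowing itself, but I want to confirm.
For example, something like the following sketch, not our exact setup, would
pin the timestamp to the broker's log-append time instead of the default
processing time:

// Sketch only: record timestamps come from KafkaIO, not from the window;
// this variant uses the broker's log-append time as the event time.
KafkaIO.<String, byte[]>read()
    .withBootstrapServers(servers)
    .withTopics(Arrays.asList("topic1", "topic2"))
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(ByteArrayDeserializer.class)
    .withLogAppendTime();
)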

Thanks and regards
Mohil

On Wed, Aug 5, 2020 at 1:19 PM Mohil Khare <[email protected]> wrote:

> Just to let you know, this is how I set up the KafkaIO read:
>
> p
>     .apply("Read_From_Kafka", KafkaIO.<String, byte[]>read()
>         .withBootstrapServers(servers)
>         .withTopics(Arrays.asList("topic1", "topic2"))
>         .withKeyDeserializer(StringDeserializer.class)
>         .withValueDeserializer(ByteArrayDeserializer.class)
>         .withConsumerConfigUpdates(kafkaConsumerProperties)
>         .withConsumerFactoryFn(consumerFactoryObj)
>         .commitOffsetsInFinalize())
>     .apply("Applying_Fixed_Window",
>         Window.<KafkaRecord<String, byte[]>>into(
>                 FixedWindows.of(Duration.standardSeconds(10)))
>             .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>             .withAllowedLateness(Duration.standardSeconds(360))
>             .discardingFiredPanes())
>     .apply("Convert_KafkaRecord_To_PCollection<POJO>",
>         ParDo.of(new ParseLogs()));
>
> Where kafkaConsumerProperties is the following map:
>
> kafkaConsumerProperties.put("security.protocol", "SSL");
> kafkaConsumerProperties.put("auto.offset.reset", "latest");
> kafkaConsumerProperties.put("group.id", "consumer1");
> kafkaConsumerProperties.put("default.api.timeout.ms", 180000);
>
> and inside consumerFactoryObj I set up the SSL keystores.
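>
> Roughly, the factory looks like this (keystore paths and passwords below
> are placeholders for what we actually pass in):
>
> SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
>     consumerFactoryObj = config -> {
>         Map<String, Object> props = new HashMap<>(config);
>         // SSL keystores; real values come from our deployment secrets
>         props.put("ssl.truststore.location", "/path/to/truststore.jks");
>         props.put("ssl.truststore.password", "changeit");
>         props.put("ssl.keystore.location", "/path/to/keystore.jks");
>         props.put("ssl.keystore.password", "changeit");
>         return new KafkaConsumer<>(props);
>     };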
>
>
> Thanks and Regards
>
> Mohil
>
> On Wed, Aug 5, 2020 at 12:59 PM Mohil Khare <[email protected]> wrote:
>
>> Hello,
>>
>> I am using Beam Java SDK 2.19 on Dataflow. We have a system where a log
>> shipper continuously emits logs to Kafka and Beam reads them using
>> KafkaIO.
>>
>> Sometimes I see slowness in the KafkaIO read for one of the topics
>> (probably during peak traffic periods), with a 2-3 minute gap between
>> the record timestamp and the time when Beam reads the log. For instance:
>>
>> 2020-08-05 12:46:23.826 PDT
>>
>> {"@timestamp":1596656684.274594,"time":"2020-08-05T19:44:44.274594282Z",
>> "data": data}, offset: 2148857, timestamp: 1596656685005
>>
>> If you convert the record timestamp (1596656685005), which is in epoch
>> ms, to PDT, you will see an approximately 2 minute difference between it
>> and 2020-08-05 12:46:23.826 PDT (the time when Beam actually read the
>> data).
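>>
>> A quick way to check the conversion in Java:
>>
>> // Convert the Kafka record timestamp (epoch millis) to PDT.
>> System.out.println(Instant.ofEpochMilli(1596656685005L)
>>     .atZone(ZoneId.of("America/Los_Angeles")));
>> // Prints 2020-08-05T12:44:45.005-07:00[America/Los_Angeles],
>> // i.e. roughly 2 minutes before 12:46:23.826 PDT.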
>>
>> One way of achieving horizontal scaling here is to increase the number
>> of partitions on the Kafka broker, e.g. via the AdminClient sketch below.
>> What can be done on the Beam side, i.e. the KafkaIO side, to tackle this
>> slowness? Any suggestions?
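>>
>> For reference, the partition increase could look like this (broker
>> address, topic name, and partition count below are placeholders):
>>
>> // Kafka AdminClient sketch; KafkaIO's read parallelism is bounded by
>> // the topic's partition count, so more partitions allow more readers.
>> Properties props = new Properties();
>> props.put("bootstrap.servers", "broker:9092");  // placeholder
>> try (AdminClient admin = AdminClient.create(props)) {
>>     admin.createPartitions(
>>             Collections.singletonMap("topic1", NewPartitions.increaseTo(12)))
>>         .all().get();  // example: raise topic1 to 12 partitions
>> }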
>>
>> Thanks and regards
>> Mohil
>>