I also have a question: could a wrong windowing setup be messing up the received timestamps?
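In case it is related, here is a minimal sketch of what I was thinking of trying next: stamping elements with Kafka's log-append time through withTimestampPolicyFactory, instead of relying on the default processing-time policy. The broker address and topic names are placeholders, and I have not validated this:

    import java.util.Arrays;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class LogAppendTimeRead {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        p.apply("Read_From_Kafka",
            KafkaIO.<String, byte[]>read()
                .withBootstrapServers("broker1:9092")  // placeholder
                .withTopics(Arrays.asList("topic1", "topic2"))
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(ByteArrayDeserializer.class)
                // Stamp each element with the broker's log-append time so
                // window assignment tracks broker arrival rather than read
                // time (assumes message.timestamp.type=LogAppendTime on the
                // topic), instead of the default processing-time policy:
                .withTimestampPolicyFactory(TimestampPolicyFactory.withLogAppendTime()));
        p.run().waitUntilFinish();
      }
    }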
Thanks and regards
Mohil

On Wed, Aug 5, 2020 at 1:19 PM Mohil Khare <[email protected]> wrote:

> Just to let you know, this is how I set up the KafkaIO read:
>
>     p.apply("Read_From_Kafka", KafkaIO.<String, byte[]>read()
>             .withBootstrapServers(servers)
>             .withTopics(Arrays.asList("topic1", "topic2"))
>             .withKeyDeserializer(StringDeserializer.class)
>             .withValueDeserializer(ByteArrayDeserializer.class)
>             .withConsumerConfigUpdates(kafkaConsumerProperties)
>             .withConsumerFactoryFn(consumerFactoryObj)
>             .commitOffsetsInFinalize())
>         .apply("Applying_Fixed_Window",
>             Window.<KafkaRecord<String, byte[]>>into(
>                     FixedWindows.of(Duration.standardSeconds(10)))
>                 .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>                 .withAllowedLateness(Duration.standardSeconds(360))
>                 .discardingFiredPanes())
>         .apply("Convert_KafkaRecord_To_PCollection<POJO>",
>             ParDo.of(new ParseLogs()));
>
> where kafkaConsumerProperties is the following map:
>
>     kafkaConsumerProperties.put("security.protocol", "SSL");
>     kafkaConsumerProperties.put("auto.offset.reset", "latest");
>     kafkaConsumerProperties.put("group.id", "consumer1");
>     kafkaConsumerProperties.put("default.api.timeout.ms", 180000);
>
> and inside consumerFactoryObj I set up the SSL keystores.
>
> Thanks and regards
> Mohil
>
> On Wed, Aug 5, 2020 at 12:59 PM Mohil Khare <[email protected]> wrote:
>
>> Hello,
>>
>> I am using Beam Java SDK 2.19 on Dataflow. We have a system where a log
>> shipper continuously emits logs to Kafka, and Beam reads them using
>> KafkaIO.
>>
>> Sometimes I see slowness in the KafkaIO read for one of the topics
>> (probably during peak traffic periods), with a 2-3 minute gap between a
>> record's timestamp and the time when Beam reads it. For instance:
>>
>>     2020-08-05 12:46:23.826 PDT
>>     {"@timestamp":1596656684.274594,"time":"2020-08-05T19:44:44.274594282Z",
>>     "data": data}, offset: 2148857, timestamp: 1596656685005
>>
>> If you convert the record timestamp (1596656685005), which is in epoch
>> milliseconds, to PDT, you will see an approximately 2-minute difference
>> between it and 2020-08-05 12:46:23.826 PDT (the time when Beam actually
>> read the data).
>>
>> One way of achieving horizontal scaling here is to increase the number of
>> partitions on the Kafka broker. What can be done on the Beam side, i.e.
>> the KafkaIO side, to tackle this slowness? Any suggestions?
>>
>> Thanks and regards
>> Mohil
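For reference, the roughly 2-minute gap in the quoted log line can be double-checked with plain java.time, nothing Beam-specific:

    import java.time.Instant;
    import java.time.ZoneId;

    public class GapCheck {
      public static void main(String[] args) {
        // Record timestamp from the quoted log line, in epoch milliseconds:
        System.out.println(Instant.ofEpochMilli(1596656685005L)
            .atZone(ZoneId.of("America/Los_Angeles")));
        // -> 2020-08-05T12:44:45.005-07:00[America/Los_Angeles],
        //    i.e. roughly two minutes before the 12:46:23.826 PDT read time.
      }
    }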
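And on what can be done on the KafkaIO side short of adding partitions: one option is to raise the consumer's fetch throughput through withConsumerConfigUpdates. These are standard Kafka consumer settings, but the values below are untested guesses meant only as a starting point for tuning, and the broker address and topics are again placeholders:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class FetchTuningRead {
      public static void main(String[] args) {
        // Illustrative overrides; tune against actual broker/record sizes:
        Map<String, Object> tuning = new HashMap<>();
        tuning.put("max.poll.records", 1000);                      // default 500
        tuning.put("fetch.min.bytes", 64 * 1024);                  // default 1
        tuning.put("max.partition.fetch.bytes", 4 * 1024 * 1024);  // default 1 MB
        tuning.put("receive.buffer.bytes", 1024 * 1024);           // default 64 KB

        Pipeline p = Pipeline.create();
        p.apply("Read_From_Kafka",
            KafkaIO.<String, byte[]>read()
                .withBootstrapServers("broker1:9092")  // placeholder
                .withTopics(Arrays.asList("topic1", "topic2"))
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(ByteArrayDeserializer.class)
                .withConsumerConfigUpdates(tuning));
        p.run().waitUntilFinish();
      }
    }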
