[ https://issues.apache.org/jira/browse/BEAM-3770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Raghu Angadi resolved BEAM-3770.
--------------------------------
    Resolution: Not A Bug

> The problem of KafkaIO SDK for data latency
> -------------------------------------------
>
>                 Key: BEAM-3770
>                 URL: https://issues.apache.org/jira/browse/BEAM-3770
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-kafka
>    Affects Versions: 2.0.0
>         Environment: To reproduce my situation, my running environment is:
> OS: Ubuntu 14.04.3 LTS
> Java: JDK 1.7
> Beam 2.0.0 (with the Direct runner)
> Kafka 2.10-0.10.1.1
> Maven 3.5.0, with the following dependencies in pom.xml:
>
> <dependency>
>   <groupId>org.apache.beam</groupId>
>   <artifactId>beam-sdks-java-core</artifactId>
>   <version>2.0.0</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.beam</groupId>
>   <artifactId>beam-runners-direct-java</artifactId>
>   <version>2.0.0</version>
>   <scope>runtime</scope>
> </dependency>
> <dependency>
>   <groupId>org.apache.beam</groupId>
>   <artifactId>beam-sdks-java-io-kafka</artifactId>
>   <version>2.0.0</version>
> </dependency>
> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka-clients</artifactId>
>   <version>0.10.0.1</version>
> </dependency>
>
>            Reporter: Rick Lin
>            Assignee: Raghu Angadi
>            Priority: Major
>             Fix For: 2.0.0
>
> Dear all,
> I am using the KafkaIO SDK in my project (Beam 2.0.0 with the Direct runner).
> While using this SDK, I have run into a situation of *data latency*,
> described below.
> The data arrive from Kafka at a fixed rate of 100 records per second.
> I create a fixed 1-second window without delay, and I found that each window
> holds 70, 80, 104, or sometimes more than 104 records.
> After one day of running, however, data latency appears, and each window
> holds only about 10 records.
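[Note on the windowing described above: a 1-second fixed window with zero offset simply buckets each element timestamp by its whole-second boundary, which is why the reported window times end at the .999 millisecond. A minimal, Beam-free sketch of that bucketing arithmetic follows; the class and method names are illustrative only, not Beam's actual implementation.]

```java
import java.time.Instant;

public class FixedWindowDemo {
    // Window size used in the pipeline described above: 1 second.
    static final long WINDOW_MILLIS = 1000L;

    // Returns the (inclusive) maximum timestamp of the 1-second fixed
    // window containing the given epoch-millisecond timestamp, mirroring
    // FixedWindows.of(Duration.standardSeconds(1)) with a zero offset.
    static long windowMaxTimestamp(long epochMillis) {
        long windowStart = epochMillis - Math.floorMod(epochMillis, WINDOW_MILLIS);
        return windowStart + WINDOW_MILLIS - 1; // window ends at ...999 ms
    }

    public static void main(String[] args) {
        long t = Instant.parse("2018-02-27T02:00:49.117Z").toEpochMilli();
        // Every element maps to a window ending on a ...999 boundary.
        System.out.println(windowMaxTimestamp(t) % 1000); // 999
    }
}
```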
> *In order to clearly explain it, I also provide my code in the following.*
>
> PipelineOptions readOptions = PipelineOptionsFactory.create();
> final Pipeline p = Pipeline.create(readOptions);
>
> PCollection<TimestampedValue<KV<String, String>>> readData =
>     p.apply(KafkaIO.<String, String>read()
>             .withBootstrapServers("127.0.0.1:9092")
>             .withTopic("kafkasink")
>             .withKeyDeserializer(StringDeserializer.class)
>             .withValueDeserializer(StringDeserializer.class)
>             .withoutMetadata())
>         .apply(ParDo.of(
>             new DoFn<KV<String, String>, TimestampedValue<KV<String, String>>>() {
>               @ProcessElement
>               public void test(ProcessContext c) throws ParseException {
>                 String element = c.element().getValue();
>                 try {
>                   JsonNode arrNode = new ObjectMapper().readTree(element);
>                   String t = arrNode.path("v").findValue("Timestamp").textValue();
>                   DateTimeFormatter formatter =
>                       DateTimeFormatter.ofPattern("MM/dd/uuuu HH:mm:ss.SSSS");
>                   LocalDateTime dateTime = LocalDateTime.parse(t, formatter);
>                   java.time.Instant javaInstant =
>                       dateTime.atZone(ZoneId.systemDefault()).toInstant();
>                   Instant timestamp = new Instant(javaInstant.toEpochMilli());
>                   c.output(TimestampedValue.of(c.element(), timestamp));
>                 } catch (JsonGenerationException e) {
>                   e.printStackTrace();
>                 } catch (JsonMappingException e) {
>                   e.printStackTrace();
>                 } catch (IOException e) {
>                   e.printStackTrace();
>                 }
>               }
>             }));
>
> PCollection<TimestampedValue<KV<String, String>>> readDivideData =
>     readData.apply(
>         Window.<TimestampedValue<KV<String, String>>>into(
>                 FixedWindows.of(Duration.standardSeconds(1))
>                     .withOffset(Duration.ZERO))
>             .triggering(AfterWatermark.pastEndOfWindow()
>                 .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
>                     .plusDelayOf(Duration.ZERO)))
>             .withAllowedLateness(Duration.ZERO)
>             .discardingFiredPanes());
>
> *In addition, the running result is as shown in the following.*
>
> data-size=104
> coming-data-time=2018-02-27
> 02:00:49.117
> window-time=2018-02-27 02:00:49.999
> data-size=78
> coming-data-time=2018-02-27 02:00:50.318
> window-time=2018-02-27 02:00:50.999
> data-size=104
> coming-data-time=2018-02-27 02:00:51.102
> window-time=2018-02-27 02:00:51.999
>
> After one day:
> data-size=10
> coming-data-time=2018-02-28 02:05:48.217
> window-time=2018-03-01 10:35:16.999
>
> If you have any idea about the problem (data latency), I am looking forward
> to hearing from you.
> Thanks,
> Rick

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
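[Footnote on the quoted code: the timestamp-parsing step in the DoFn can be exercised on its own with the plain JDK, which is a cheap way to rule parsing out as the source of the skewed window times. A minimal sketch follows; UTC is assumed here for reproducibility, whereas the original code uses ZoneId.systemDefault(), and the sample input string is hypothetical.]

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class TimestampParseDemo {
    // Same pattern used in the pipeline's DoFn: 4-digit fraction of second.
    static final DateTimeFormatter FMT =
        DateTimeFormatter.ofPattern("MM/dd/uuuu HH:mm:ss.SSSS");

    // Parses a record timestamp string into epoch milliseconds,
    // fixing the zone to UTC (the original uses the system zone).
    static long toEpochMillis(String text) {
        return LocalDateTime.parse(text, FMT)
            .toInstant(ZoneOffset.UTC)
            .toEpochMilli();
    }

    public static void main(String[] args) {
        // Hypothetical record timestamp in the pipeline's format.
        System.out.println(toEpochMillis("02/27/2018 02:00:49.1170")); // 1519696849117
    }
}
```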