I recently noticed that DirectRunner was leaking readers, which eventually
crashed my pipeline; in my case it ran out of file descriptors. This is fixed
in master (PR 4658 <https://github.com/apache/beam/pull/4658>, version
2.4.0-SNAPSHOT). Can you try that?
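
If you would rather not build from source, one (untested) way to try it is to
point your pom at the Apache snapshots repository and bump the Beam artifacts
to 2.4.0-SNAPSHOT, roughly like this:

<repositories>
  <repository>
    <id>apache.snapshots</id>
    <url>https://repository.apache.org/content/repositories/snapshots/</url>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
  </repository>
</repositories>

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-direct-java</artifactId>
  <version>2.4.0-SNAPSHOT</version>
  <scope>runtime</scope>
</dependency>

(The other Beam dependencies in your pom would need the same version bump.)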

Note that DirectRunner is not particularly optimized for runtime performance;
it is mainly intended for initial testing. Since the performance was acceptable
for you initially, trying your pipeline against master might help.

Note that wrapping an element in TimestampedValue<> does not actually change
the element's event timestamp. By default, KafkaIO uses processing time as the
event time. Please see the KafkaIO JavaDoc for more options.
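
For example, here is a rough sketch of pulling the event time out of your JSON
payload at read time with KafkaIO's withTimestampFn() (untested; the
"Timestamp" field, its format, and the rest of the read configuration are
simply copied from your code below):

KafkaIO.<String, String>read()
    .withBootstrapServers("127.0.0.1:9092")
    .withTopic("kafkasink")
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    // Derive the event time from the "Timestamp" field in the JSON value
    // instead of relying on KafkaIO's default processing-time timestamps.
    .withTimestampFn(new SerializableFunction<KV<String, String>, Instant>() {
      @Override
      public Instant apply(KV<String, String> record) {
        try {
          JsonNode arrNode = new ObjectMapper().readTree(record.getValue());
          String t = arrNode.path("v").findValue("Timestamp").textValue();
          LocalDateTime dateTime = LocalDateTime.parse(t,
              DateTimeFormatter.ofPattern("MM/dd/uuuu HH:mm:ss.SSSS"));
          return new Instant(
              dateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
        } catch (IOException e) {
          throw new RuntimeException("Could not parse event timestamp", e);
        }
      }
    })
    .withoutMetadata()

With the timestamp attached at read time, the fixed windows are based on the
time carried in the data rather than on when the runner happens to read each
record, and you can window the plain KV<String, String> elements directly
instead of wrapping them in TimestampedValue<>. (The only extra import over
your code is org.apache.beam.sdk.transforms.SerializableFunction.)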


On Wed, Feb 28, 2018 at 6:59 PM <linr...@itri.org.tw> wrote:

> Dear all,
>
>
>
> I am using the KafkaIO SDK in my project (Beam 2.0.0 with the Direct runner).
>
>
>
> While using this SDK, I run into a situation of *data latency*, described in
> the following.
>
>
>
> The data come from Kafka at a fixed rate: 100 records per second.
>
>
>
> I create a fixed window of 1 second without delay. I found that the number of
> records per window is 70, 80, 104, or sometimes more than 104.
>
>
>
> After one day of running, the data latency appears, and each window contains
> only about 10 records.
>
>
>
> *In order to explain it clearly, I also provide my code in the following.*
>
> " PipelineOptions readOptions = PipelineOptionsFactory.*create*();
>
> *final* Pipeline p = Pipeline.*create*(readOptions);
>
>
>
> PCollection<TimestampedValue<KV<String, String>>> readData =
>
>   p.apply(KafkaIO.<String, String>*read*()
>
>      .withBootstrapServers("127.0.0.1:9092")
>
>      .withTopic("kafkasink")
>
>      .withKeyDeserializer(StringDeserializer.*class*)
>
>      .withValueDeserializer(StringDeserializer.*class*)
>
>      .withoutMetadata())
>
>      .apply(ParDo.*of*(*new* *DoFn<KV<String, String>,
> TimestampedValue<KV<String, String>>>()* {
>
>         @ProcessElement
>
>         *public* *void* test(ProcessContext c) *throws* ParseException {
>
>             String element = c.element().getValue();
>
>             *try* {
>
>               JsonNode arrNode = *new* ObjectMapper().readTree(element);
>
>               String t = arrNode.path("v").findValue("Timestamp"
> ).textValue();
>
>               DateTimeFormatter formatter = 
> DateTimeFormatter.*ofPattern*("MM/dd/uuuu
> HH:mm:ss.SSSS");
>
>              LocalDateTime dateTime = LocalDateTime.*parse*(t, formatter);
>
>              java.time.Instant java_instant = dateTime.atZone(ZoneId.
> *systemDefault*()).toInstant();
>
>              Instant timestamp  = *new* Instant(java_instant.toEpochMilli());
>
>
>               c.output(TimestampedValue.*of*(c.element(), timestamp));
>
>             } *catch* (JsonGenerationException e) {
>
>                 e.printStackTrace();
>
>             } *catch* (JsonMappingException e) {
>
>                 e.printStackTrace();
>
>           } *catch* (IOException e) {
>
>                 e.printStackTrace();
>
>           }
>
>         }}));
>
>
>
> PCollection<TimestampedValue<KV<String, String>>> readDivideData =
> readData.apply(
>
>       Window.<TimestampedValue<KV<String, String>>> *into*(FixedWindows.
> *of*(Duration.*standardSeconds*(1))
>
>           .withOffset(Duration.*ZERO*))
>
>           .triggering(AfterWatermark.*pastEndOfWindow*()
>
>              .withLateFirings(AfterProcessingTime.*pastFirstElementInPane*
> ()
>
>                .plusDelayOf(Duration.*ZERO*)))
>
>           .withAllowedLateness(Duration.*ZERO*)
>
>           .discardingFiredPanes());"
>
>
>
> *In addition, the running result is shown in the following.*
>
> "data-size=104
>
> coming-data-time=2018-02-27 02:00:49.117
>
> window-time=2018-02-27 02:00:49.999
>
>
>
> data-size=70
>
> coming-data-time=2018-02-27 02:00:50.318
>
> window-time=2018-02-27 02:00:50.999
>
>
>
> data-size=104
>
> coming-data-time=2018-02-27 02:00:51.102
>
> window-time=2018-02-27 02:00:51.999
>
>
>
> After one day:
>
> data-size=10
>
> coming-data-time=2018-02-28 02:05:48.217
>
> window-time=2018-03-01 10:35:16.999 "
>
>
>
> *To reproduce my situation, my running environment is:*
>
> OS: Ubuntu 14.04.3 LTS
> Java: JDK 1.7
> Beam 2.0.0 (with the Direct runner)
> Kafka 2.10-0.10.1.1
> Maven 3.5.0, in which dependencies are listed in pom.xml:
>
> <dependency>
>   <groupId>org.apache.beam</groupId>
>   <artifactId>beam-sdks-java-core</artifactId>
>   <version>2.0.0</version>
> </dependency>
>
> <dependency>
>   <groupId>org.apache.beam</groupId>
>   <artifactId>beam-runners-direct-java</artifactId>
>   <version>2.0.0</version>
>   <scope>runtime</scope>
> </dependency>
>
> <dependency>
>   <groupId>org.apache.beam</groupId>
>   <artifactId>beam-sdks-java-io-kafka</artifactId>
>   <version>2.0.0</version>
> </dependency>
>
> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka-clients</artifactId>
>   <version>0.10.0.1</version>
> </dependency>
>
>
>
> If you have any idea about this problem (the data latency), I look forward to
> hearing from you.
>
>
>
> Thanks
>
>
>
> Rick
>
>
> --
> This email may contain confidential information. Please do not use or disclose
> it in any way and delete it if you are not the intended recipient.
>
