[jira] [Commented] (BEAM-3414) AfterProcessingTime trigger issue with Flink Runner
[ https://issues.apache.org/jira/browse/BEAM-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406119#comment-16406119 ] Dawid Wysakowicz commented on BEAM-3414: I think it might be the same problem as noticed in BEAM-3863 > AfterProcessingTime trigger issue with Flink Runner > --- > > Key: BEAM-3414 > URL: https://issues.apache.org/jira/browse/BEAM-3414 > Project: Beam > Issue Type: Bug > Components: runner-core, runner-flink >Affects Versions: 2.2.0 > Environment: idea, ubuntu 16.04, FlinkRunner >Reporter: huangjianhuang >Assignee: Dawid Wysakowicz >Priority: Major > > in my demo, I read data from kafka and count globally, finally output the > total count of recieved data, as follow: > {code:java} > FlinkPipelineOptions options = > PipelineOptionsFactory.fromArgs(args).withValidation() > .as(FlinkPipelineOptions.class); > options.setStreaming(true); > options.setRunner(FlinkRunner.class); > Pipeline pipeline = Pipeline.create(options); > pipeline > .apply("Read from kafka", > KafkaIO.read() > //.withTimestampFn(kafkaData -> > TimeUtil.timeMillisToInstant(kafkaData.getKey())) > .withBootstrapServers("localhost:9092") > .withTopic("recharge") > .withKeyDeserializer(StringDeserializer.class) > > .withValueDeserializer(StringDeserializer.class) > .withoutMetadata() > ) > .apply(Values.create()) > .apply(Window.into(new GlobalWindows()) > .triggering(Repeatedly.forever( > > AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5 > .accumulatingFiredPanes() > ) > .apply(Count.globally()) > .apply("output", > ParDo.of(new DoFn () { > @ProcessElement > public void process(ProcessContext context) { > System.out.println("---get at: " + > Instant.now() + "--"); > System.out.println(context.element()); > } > })); > {code} > the result should be displayed after (5s) I sent first data, but sometimes > there were nothing display after I sent data. the pic shows the outputs i got > in a test: > (cant upload a pic, desc as text) > {code:java} > Send 681Msg at: 2018-01-05T06:34:31.436 > ---get at: 2018-01-05T06:34:36.668Z-- > 681 > Send 681Msg at: 2018-01-05T06:34:47.166 > ---get at: 2018-01-05T06:34:52.284Z-- > 1362 > Send 681Msg at: 2018-01-05T06:34:55.505 > Send 681Msg at: 2018-01-05T06:35:22.068 > ---get at: 2018-01-05T06:35:22.112Z-- > 2044 > {code} > btw, the code works fine with direct runner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-3414) AfterProcessingTime trigger issue with Flink Runner
[ https://issues.apache.org/jira/browse/BEAM-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352808#comment-16352808 ] Kenneth Knowles commented on BEAM-3414: --- Those instructions are not quite right. The JStorm runner is a work in progress on a development branch, and not released with the nightly build either. You will need to check out that branch and install it locally. I should say that the JStorm runner has not been updated in a very long time. > AfterProcessingTime trigger issue with Flink Runner > --- > > Key: BEAM-3414 > URL: https://issues.apache.org/jira/browse/BEAM-3414 > Project: Beam > Issue Type: Bug > Components: runner-core, runner-flink >Affects Versions: 2.2.0 > Environment: idea, ubuntu 16.04, FlinkRunner >Reporter: huangjianhuang >Assignee: Aljoscha Krettek >Priority: Major > > in my demo, I read data from kafka and count globally, finally output the > total count of recieved data, as follow: > {code:java} > FlinkPipelineOptions options = > PipelineOptionsFactory.fromArgs(args).withValidation() > .as(FlinkPipelineOptions.class); > options.setStreaming(true); > options.setRunner(FlinkRunner.class); > Pipeline pipeline = Pipeline.create(options); > pipeline > .apply("Read from kafka", > KafkaIO.read() > //.withTimestampFn(kafkaData -> > TimeUtil.timeMillisToInstant(kafkaData.getKey())) > .withBootstrapServers("localhost:9092") > .withTopic("recharge") > .withKeyDeserializer(StringDeserializer.class) > > .withValueDeserializer(StringDeserializer.class) > .withoutMetadata() > ) > .apply(Values.create()) > .apply(Window.into(new GlobalWindows()) > .triggering(Repeatedly.forever( > > AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5 > .accumulatingFiredPanes() > ) > .apply(Count.globally()) > .apply("output", > ParDo.of(new DoFn () { > @ProcessElement > public void process(ProcessContext context) { > System.out.println("---get at: " + > Instant.now() + "--"); > System.out.println(context.element()); > } > })); > {code} > the result should be displayed after (5s) I sent first data, but sometimes > there were nothing display after I sent data. the pic shows the outputs i got > in a test: > (cant upload a pic, desc as text) > {code:java} > Send 681Msg at: 2018-01-05T06:34:31.436 > ---get at: 2018-01-05T06:34:36.668Z-- > 681 > Send 681Msg at: 2018-01-05T06:34:47.166 > ---get at: 2018-01-05T06:34:52.284Z-- > 1362 > Send 681Msg at: 2018-01-05T06:34:55.505 > Send 681Msg at: 2018-01-05T06:35:22.068 > ---get at: 2018-01-05T06:35:22.112Z-- > 2044 > {code} > btw, the code works fine with direct runner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-3414) AfterProcessingTime trigger issue with Flink Runner
[ https://issues.apache.org/jira/browse/BEAM-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352010#comment-16352010 ] huangjianhuang commented on BEAM-3414: -- [~kenn] Is jstorm-runner released? [Official document|https://beam.apache.org/documentation/runners/jstorm/] says we can add it by: {code:java} // code placeholder org.apache.beam beam-runners-jstorm 2.2.0 {code} But the dependency seems not available. > AfterProcessingTime trigger issue with Flink Runner > --- > > Key: BEAM-3414 > URL: https://issues.apache.org/jira/browse/BEAM-3414 > Project: Beam > Issue Type: Bug > Components: runner-core, runner-flink >Affects Versions: 2.2.0 > Environment: idea, ubuntu 16.04, FlinkRunner >Reporter: huangjianhuang >Assignee: Aljoscha Krettek >Priority: Major > > in my demo, I read data from kafka and count globally, finally output the > total count of recieved data, as follow: > {code:java} > FlinkPipelineOptions options = > PipelineOptionsFactory.fromArgs(args).withValidation() > .as(FlinkPipelineOptions.class); > options.setStreaming(true); > options.setRunner(FlinkRunner.class); > Pipeline pipeline = Pipeline.create(options); > pipeline > .apply("Read from kafka", > KafkaIO.read() > //.withTimestampFn(kafkaData -> > TimeUtil.timeMillisToInstant(kafkaData.getKey())) > .withBootstrapServers("localhost:9092") > .withTopic("recharge") > .withKeyDeserializer(StringDeserializer.class) > > .withValueDeserializer(StringDeserializer.class) > .withoutMetadata() > ) > .apply(Values.create()) > .apply(Window.into(new GlobalWindows()) > .triggering(Repeatedly.forever( > > AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5 > .accumulatingFiredPanes() > ) > .apply(Count.globally()) > .apply("output", > ParDo.of(new DoFn () { > @ProcessElement > public void process(ProcessContext context) { > System.out.println("---get at: " + > Instant.now() + "--"); > System.out.println(context.element()); > } > })); > {code} > the result should be displayed after (5s) I sent first data, but sometimes > there were nothing display after I sent data. the pic shows the outputs i got > in a test: > (cant upload a pic, desc as text) > {code:java} > Send 681Msg at: 2018-01-05T06:34:31.436 > ---get at: 2018-01-05T06:34:36.668Z-- > 681 > Send 681Msg at: 2018-01-05T06:34:47.166 > ---get at: 2018-01-05T06:34:52.284Z-- > 1362 > Send 681Msg at: 2018-01-05T06:34:55.505 > Send 681Msg at: 2018-01-05T06:35:22.068 > ---get at: 2018-01-05T06:35:22.112Z-- > 2044 > {code} > btw, the code works fine with direct runner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-3414) AfterProcessingTime trigger issue with Flink Runner
[ https://issues.apache.org/jira/browse/BEAM-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352008#comment-16352008 ] huangjianhuang commented on BEAM-3414: -- [~aljoscha] thanks, can it be fixed this month? Our project was delayed by this issue for a long time, if not, we have to try other frameworks. > AfterProcessingTime trigger issue with Flink Runner > --- > > Key: BEAM-3414 > URL: https://issues.apache.org/jira/browse/BEAM-3414 > Project: Beam > Issue Type: Bug > Components: runner-core, runner-flink >Affects Versions: 2.2.0 > Environment: idea, ubuntu 16.04, FlinkRunner >Reporter: huangjianhuang >Assignee: Aljoscha Krettek >Priority: Major > > in my demo, I read data from kafka and count globally, finally output the > total count of recieved data, as follow: > {code:java} > FlinkPipelineOptions options = > PipelineOptionsFactory.fromArgs(args).withValidation() > .as(FlinkPipelineOptions.class); > options.setStreaming(true); > options.setRunner(FlinkRunner.class); > Pipeline pipeline = Pipeline.create(options); > pipeline > .apply("Read from kafka", > KafkaIO.read() > //.withTimestampFn(kafkaData -> > TimeUtil.timeMillisToInstant(kafkaData.getKey())) > .withBootstrapServers("localhost:9092") > .withTopic("recharge") > .withKeyDeserializer(StringDeserializer.class) > > .withValueDeserializer(StringDeserializer.class) > .withoutMetadata() > ) > .apply(Values.create()) > .apply(Window.into(new GlobalWindows()) > .triggering(Repeatedly.forever( > > AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5 > .accumulatingFiredPanes() > ) > .apply(Count.globally()) > .apply("output", > ParDo.of(new DoFn () { > @ProcessElement > public void process(ProcessContext context) { > System.out.println("---get at: " + > Instant.now() + "--"); > System.out.println(context.element()); > } > })); > {code} > the result should be displayed after (5s) I sent first data, but sometimes > there were nothing display after I sent data. the pic shows the outputs i got > in a test: > (cant upload a pic, desc as text) > {code:java} > Send 681Msg at: 2018-01-05T06:34:31.436 > ---get at: 2018-01-05T06:34:36.668Z-- > 681 > Send 681Msg at: 2018-01-05T06:34:47.166 > ---get at: 2018-01-05T06:34:52.284Z-- > 1362 > Send 681Msg at: 2018-01-05T06:34:55.505 > Send 681Msg at: 2018-01-05T06:35:22.068 > ---get at: 2018-01-05T06:35:22.112Z-- > 2044 > {code} > btw, the code works fine with direct runner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-3414) AfterProcessingTime trigger issue with Flink Runner
[ https://issues.apache.org/jira/browse/BEAM-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348542#comment-16348542 ] Aljoscha Krettek commented on BEAM-3414: Still working on getting that PR in, yes. > AfterProcessingTime trigger issue with Flink Runner > --- > > Key: BEAM-3414 > URL: https://issues.apache.org/jira/browse/BEAM-3414 > Project: Beam > Issue Type: Bug > Components: runner-core, runner-flink >Affects Versions: 2.2.0 > Environment: idea, ubuntu 16.04, FlinkRunner >Reporter: huangjianhuang >Assignee: Aljoscha Krettek >Priority: Major > > in my demo, I read data from kafka and count globally, finally output the > total count of recieved data, as follow: > {code:java} > FlinkPipelineOptions options = > PipelineOptionsFactory.fromArgs(args).withValidation() > .as(FlinkPipelineOptions.class); > options.setStreaming(true); > options.setRunner(FlinkRunner.class); > Pipeline pipeline = Pipeline.create(options); > pipeline > .apply("Read from kafka", > KafkaIO.read() > //.withTimestampFn(kafkaData -> > TimeUtil.timeMillisToInstant(kafkaData.getKey())) > .withBootstrapServers("localhost:9092") > .withTopic("recharge") > .withKeyDeserializer(StringDeserializer.class) > > .withValueDeserializer(StringDeserializer.class) > .withoutMetadata() > ) > .apply(Values.create()) > .apply(Window.into(new GlobalWindows()) > .triggering(Repeatedly.forever( > > AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5 > .accumulatingFiredPanes() > ) > .apply(Count.globally()) > .apply("output", > ParDo.of(new DoFn () { > @ProcessElement > public void process(ProcessContext context) { > System.out.println("---get at: " + > Instant.now() + "--"); > System.out.println(context.element()); > } > })); > {code} > the result should be displayed after (5s) I sent first data, but sometimes > there were nothing display after I sent data. the pic shows the outputs i got > in a test: > (cant upload a pic, desc as text) > {code:java} > Send 681Msg at: 2018-01-05T06:34:31.436 > ---get at: 2018-01-05T06:34:36.668Z-- > 681 > Send 681Msg at: 2018-01-05T06:34:47.166 > ---get at: 2018-01-05T06:34:52.284Z-- > 1362 > Send 681Msg at: 2018-01-05T06:34:55.505 > Send 681Msg at: 2018-01-05T06:35:22.068 > ---get at: 2018-01-05T06:35:22.112Z-- > 2044 > {code} > btw, the code works fine with direct runner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-3414) AfterProcessingTime trigger issue with Flink Runner
[ https://issues.apache.org/jira/browse/BEAM-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342837#comment-16342837 ] huangjianhuang commented on BEAM-3414: -- [~kenn] [~aljoscha] are you still fixing this issue? I‘m distressed on this issue. > AfterProcessingTime trigger issue with Flink Runner > --- > > Key: BEAM-3414 > URL: https://issues.apache.org/jira/browse/BEAM-3414 > Project: Beam > Issue Type: Bug > Components: runner-core, runner-flink >Affects Versions: 2.2.0 > Environment: idea, ubuntu 16.04, FlinkRunner >Reporter: huangjianhuang >Assignee: Aljoscha Krettek >Priority: Major > > in my demo, I read data from kafka and count globally, finally output the > total count of recieved data, as follow: > {code:java} > FlinkPipelineOptions options = > PipelineOptionsFactory.fromArgs(args).withValidation() > .as(FlinkPipelineOptions.class); > options.setStreaming(true); > options.setRunner(FlinkRunner.class); > Pipeline pipeline = Pipeline.create(options); > pipeline > .apply("Read from kafka", > KafkaIO.read() > //.withTimestampFn(kafkaData -> > TimeUtil.timeMillisToInstant(kafkaData.getKey())) > .withBootstrapServers("localhost:9092") > .withTopic("recharge") > .withKeyDeserializer(StringDeserializer.class) > > .withValueDeserializer(StringDeserializer.class) > .withoutMetadata() > ) > .apply(Values.create()) > .apply(Window.into(new GlobalWindows()) > .triggering(Repeatedly.forever( > > AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5 > .accumulatingFiredPanes() > ) > .apply(Count.globally()) > .apply("output", > ParDo.of(new DoFn () { > @ProcessElement > public void process(ProcessContext context) { > System.out.println("---get at: " + > Instant.now() + "--"); > System.out.println(context.element()); > } > })); > {code} > the result should be displayed after (5s) I sent first data, but sometimes > there were nothing display after I sent data. the pic shows the outputs i got > in a test: > (cant upload a pic, desc as text) > {code:java} > Send 681Msg at: 2018-01-05T06:34:31.436 > ---get at: 2018-01-05T06:34:36.668Z-- > 681 > Send 681Msg at: 2018-01-05T06:34:47.166 > ---get at: 2018-01-05T06:34:52.284Z-- > 1362 > Send 681Msg at: 2018-01-05T06:34:55.505 > Send 681Msg at: 2018-01-05T06:35:22.068 > ---get at: 2018-01-05T06:35:22.112Z-- > 2044 > {code} > btw, the code works fine with direct runner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-3414) AfterProcessingTime trigger issue with Flink Runner
[ https://issues.apache.org/jira/browse/BEAM-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16329998#comment-16329998 ] Kenneth Knowles commented on BEAM-3414: --- Can I give it to you Aljoscha, since you have that PR just about in? > AfterProcessingTime trigger issue with Flink Runner > --- > > Key: BEAM-3414 > URL: https://issues.apache.org/jira/browse/BEAM-3414 > Project: Beam > Issue Type: Bug > Components: runner-core, runner-flink >Affects Versions: 2.2.0 > Environment: idea, ubuntu 16.04, FlinkRunner >Reporter: huangjianhuang >Assignee: Aljoscha Krettek >Priority: Major > > in my demo, I read data from kafka and count globally, finally output the > total count of recieved data, as follow: > {code:java} > FlinkPipelineOptions options = > PipelineOptionsFactory.fromArgs(args).withValidation() > .as(FlinkPipelineOptions.class); > options.setStreaming(true); > options.setRunner(FlinkRunner.class); > Pipeline pipeline = Pipeline.create(options); > pipeline > .apply("Read from kafka", > KafkaIO.read() > //.withTimestampFn(kafkaData -> > TimeUtil.timeMillisToInstant(kafkaData.getKey())) > .withBootstrapServers("localhost:9092") > .withTopic("recharge") > .withKeyDeserializer(StringDeserializer.class) > > .withValueDeserializer(StringDeserializer.class) > .withoutMetadata() > ) > .apply(Values.create()) > .apply(Window.into(new GlobalWindows()) > .triggering(Repeatedly.forever( > > AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5 > .accumulatingFiredPanes() > ) > .apply(Count.globally()) > .apply("output", > ParDo.of(new DoFn () { > @ProcessElement > public void process(ProcessContext context) { > System.out.println("---get at: " + > Instant.now() + "--"); > System.out.println(context.element()); > } > })); > {code} > the result should be displayed after (5s) I sent first data, but sometimes > there were nothing display after I sent data. the pic shows the outputs i got > in a test: > (cant upload a pic, desc as text) > {code:java} > Send 681Msg at: 2018-01-05T06:34:31.436 > ---get at: 2018-01-05T06:34:36.668Z-- > 681 > Send 681Msg at: 2018-01-05T06:34:47.166 > ---get at: 2018-01-05T06:34:52.284Z-- > 1362 > Send 681Msg at: 2018-01-05T06:34:55.505 > Send 681Msg at: 2018-01-05T06:35:22.068 > ---get at: 2018-01-05T06:35:22.112Z-- > 2044 > {code} > btw, the code works fine with direct runner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-3414) AfterProcessingTime trigger issue with Flink Runner
[ https://issues.apache.org/jira/browse/BEAM-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16314517#comment-16314517 ] huangjianhuang commented on BEAM-3414: -- Thanks for help. Does this bug appear in other Runner? Or can you give me some advice which Runner is more close to the DirectRunner. My codes works fine with DirectRunner but got so many problems with FlinkRunner:( > AfterProcessingTime trigger issue with Flink Runner > --- > > Key: BEAM-3414 > URL: https://issues.apache.org/jira/browse/BEAM-3414 > Project: Beam > Issue Type: Bug > Components: runner-core, runner-flink >Affects Versions: 2.2.0 > Environment: idea, ubuntu 16.04, FlinkRunner >Reporter: huangjianhuang >Assignee: Kenneth Knowles > > in my demo, I read data from kafka and count globally, finally output the > total count of recieved data, as follow: > {code:java} > FlinkPipelineOptions options = > PipelineOptionsFactory.fromArgs(args).withValidation() > .as(FlinkPipelineOptions.class); > options.setStreaming(true); > options.setRunner(FlinkRunner.class); > Pipeline pipeline = Pipeline.create(options); > pipeline > .apply("Read from kafka", > KafkaIO.read() > //.withTimestampFn(kafkaData -> > TimeUtil.timeMillisToInstant(kafkaData.getKey())) > .withBootstrapServers("localhost:9092") > .withTopic("recharge") > .withKeyDeserializer(StringDeserializer.class) > > .withValueDeserializer(StringDeserializer.class) > .withoutMetadata() > ) > .apply(Values.create()) > .apply(Window.into(new GlobalWindows()) > .triggering(Repeatedly.forever( > > AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5 > .accumulatingFiredPanes() > ) > .apply(Count.globally()) > .apply("output", > ParDo.of(new DoFn () { > @ProcessElement > public void process(ProcessContext context) { > System.out.println("---get at: " + > Instant.now() + "--"); > System.out.println(context.element()); > } > })); > {code} > the result should be displayed after (5s) I sent first data, but sometimes > there were nothing display after I sent data. the pic shows the outputs i got > in a test: > (cant upload a pic, desc as text) > {code:java} > Send 681Msg at: 2018-01-05T06:34:31.436 > ---get at: 2018-01-05T06:34:36.668Z-- > 681 > Send 681Msg at: 2018-01-05T06:34:47.166 > ---get at: 2018-01-05T06:34:52.284Z-- > 1362 > Send 681Msg at: 2018-01-05T06:34:55.505 > Send 681Msg at: 2018-01-05T06:35:22.068 > ---get at: 2018-01-05T06:35:22.112Z-- > 2044 > {code} > btw, the code works fine with direct runner. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (BEAM-3414) AfterProcessingTime trigger issue with Flink Runner
[ https://issues.apache.org/jira/browse/BEAM-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313329#comment-16313329 ] Aljoscha Krettek commented on BEAM-3414: I openen a PR for a related bug and I think this issue should also be fixed by the change: https://github.com/apache/beam/pull/4348 The reason for the issue here is that processing-time timers don't block shutdown of a pipeline. > AfterProcessingTime trigger issue with Flink Runner > --- > > Key: BEAM-3414 > URL: https://issues.apache.org/jira/browse/BEAM-3414 > Project: Beam > Issue Type: Bug > Components: runner-core, runner-flink >Affects Versions: 2.2.0 > Environment: idea, ubuntu 16.04, FlinkRunner >Reporter: huangjianhuang >Assignee: Kenneth Knowles > > in my demo, I read data from kafka and count globally, finally output the > total count of recieved data, as follow: > {code:java} > FlinkPipelineOptions options = > PipelineOptionsFactory.fromArgs(args).withValidation() > .as(FlinkPipelineOptions.class); > options.setStreaming(true); > options.setRunner(FlinkRunner.class); > Pipeline pipeline = Pipeline.create(options); > pipeline > .apply("Read from kafka", > KafkaIO.read() > //.withTimestampFn(kafkaData -> > TimeUtil.timeMillisToInstant(kafkaData.getKey())) > .withBootstrapServers("localhost:9092") > .withTopic("recharge") > .withKeyDeserializer(StringDeserializer.class) > > .withValueDeserializer(StringDeserializer.class) > .withoutMetadata() > ) > .apply(Values.create()) > .apply(Window.into(new GlobalWindows()) > .triggering(Repeatedly.forever( > > AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5 > .accumulatingFiredPanes() > ) > .apply(Count.globally()) > .apply("output", > ParDo.of(new DoFn () { > @ProcessElement > public void process(ProcessContext context) { > System.out.println("---get at: " + > Instant.now() + "--"); > System.out.println(context.element()); > } > })); > {code} > the result should be displayed after (5s) I sent first data, but sometimes > there were nothing display after I sent data. the pic shows the outputs i got > in a test: > (cant upload a pic, desc as text) > {code:java} > Send 681Msg at: 2018-01-05T06:34:31.436 > ---get at: 2018-01-05T06:34:36.668Z-- > 681 > Send 681Msg at: 2018-01-05T06:34:47.166 > ---get at: 2018-01-05T06:34:52.284Z-- > 1362 > Send 681Msg at: 2018-01-05T06:34:55.505 > Send 681Msg at: 2018-01-05T06:35:22.068 > ---get at: 2018-01-05T06:35:22.112Z-- > 2044 > {code} > btw, the code works fine with direct runner. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (BEAM-3414) AfterProcessingTime trigger issue with Flink Runner
[ https://issues.apache.org/jira/browse/BEAM-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312593#comment-16312593 ] huangjianhuang commented on BEAM-3414: -- the trigger would never fire after 06:34:55.505 if i didnot send new data. how could i fix it without new input? > AfterProcessingTime trigger issue with Flink Runner > --- > > Key: BEAM-3414 > URL: https://issues.apache.org/jira/browse/BEAM-3414 > Project: Beam > Issue Type: Bug > Components: runner-core, runner-flink >Affects Versions: 2.2.0 > Environment: idea, ubuntu 16.04, FlinkRunner >Reporter: huangjianhuang >Assignee: Kenneth Knowles > > in my demo, I read data from kafka and count globally, finally output the > total count of recieved data, as follow: > {code:java} > FlinkPipelineOptions options = > PipelineOptionsFactory.fromArgs(args).withValidation() > .as(FlinkPipelineOptions.class); > options.setStreaming(true); > options.setRunner(FlinkRunner.class); > Pipeline pipeline = Pipeline.create(options); > pipeline > .apply("Read from kafka", > KafkaIO.read() > //.withTimestampFn(kafkaData -> > TimeUtil.timeMillisToInstant(kafkaData.getKey())) > .withBootstrapServers("localhost:9092") > .withTopic("recharge") > .withKeyDeserializer(StringDeserializer.class) > > .withValueDeserializer(StringDeserializer.class) > .withoutMetadata() > ) > .apply(Values.create()) > .apply(Window.into(new GlobalWindows()) > .triggering(Repeatedly.forever( > > AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5 > .accumulatingFiredPanes() > ) > .apply(Count.globally()) > .apply("output", > ParDo.of(new DoFn () { > @ProcessElement > public void process(ProcessContext context) { > System.out.println("---get at: " + > Instant.now() + "--"); > System.out.println(context.element()); > } > })); > {code} > the result should be displayed after (5s) I sent first data, but sometimes > there were nothing display after I sent data. the pic shows the outputs i got > in a test: > (cant upload a pic, desc as text) > {code:java} > Send 681Msg at: 2018-01-05T06:34:31.436 > ---get at: 2018-01-05T06:34:36.668Z-- > 681 > Send 681Msg at: 2018-01-05T06:34:47.166 > ---get at: 2018-01-05T06:34:52.284Z-- > 1362 > Send 681Msg at: 2018-01-05T06:34:55.505 > Send 681Msg at: 2018-01-05T06:35:22.068 > ---get at: 2018-01-05T06:35:22.112Z-- > 2044 > {code} > btw, the code works fine with direct runner. -- This message was sent by Atlassian JIRA (v6.4.14#64029)