The Dataflow Runner does not currently shut down when all watermarks are at positive infinity.
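
For anyone finding this thread later: one concrete way to get that final watermark out of KafkaIO is the per-record watermark function Raghu mentions below. This is only a rough, untested sketch (the builder methods vary across KafkaIO/Beam versions, and the "EOF" marker value, topic name, and broker address are made-up placeholders): the producer writes a final "EOF" record, and the watermark function jumps to BoundedWindow.TIMESTAMP_MAX_VALUE when it sees it, which fires the GlobalWindow timers and, on runners that support it, lets the pipeline drain.

    // Rough, untested sketch; names and values are placeholders.
    // (imports for the snippet below)
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.values.KV;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.joda.time.Instant;

    KafkaIO.Read<String, String> read =
        KafkaIO.<String, String>read()
            .withBootstrapServers("localhost:9092")
            .withTopic("mytopic")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // Element timestamps: processing time, purely as a stand-in here.
            .withTimestampFn((KV<String, String> kv) -> Instant.now())
            // Watermark: advance to the maximum once the end-of-data marker
            // record is seen; until then, track processing time.
            .withWatermarkFn((KV<String, String> kv) ->
                "EOF".equals(kv.getValue())
                    ? BoundedWindow.TIMESTAMP_MAX_VALUE
                    : Instant.now());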
On Fri, Jun 24, 2016 at 1:06 PM, Raghu Angadi <[email protected]> wrote:

> Thanks Thomas. That makes sense. Do you know if the Dataflow runner shuts
> down?
>
> [yep, I should have mentioned BoundedWindow.TIMESTAMP_MAX_VALUE]
>
> On Fri, Jun 24, 2016 at 11:04 AM, Thomas Groh <[email protected]> wrote:
>
>> The behavior of a runner with regards to source invocation when the
>> source emits the maximum watermark (nit: BoundedWindow.TIMESTAMP_MAX_VALUE
>> is the maximum timestamp; this is Long.MAX_VALUE in microseconds since the
>> epoch, not millis as might be assumed) is currently runner-defined. This
>> will cause watermark-based timers for the Global Window to fire, and all
>> input elements should be considered droppably late. The DirectRunner will
>> shut down by default if it reaches this state, but runners are not required
>> to shut down if all watermarks reach this value.
>>
>> On Fri, Jun 24, 2016 at 9:46 AM, Raghu Angadi <[email protected]> wrote:
>>
>>> Note that KafkaIO lets you set your own watermark for each record.
>>>
>>> On Fri, Jun 24, 2016 at 9:45 AM, Raghu Angadi <[email protected]> wrote:
>>>
>>>> So the main question here is how one can stop the unbounded pipeline at
>>>> runtime.
>>>>
>>>> You can emit a special watermark (Long.MAX_VALUE) that will flush the
>>>> entire pipeline and process all pending data. Whether that also makes the
>>>> runner stop reading from the source, I am not sure; I would like to know.
>>>> After that, I don't know if p.run() actually returns.
>>>>
>>>> On Thu, Jun 23, 2016 at 4:02 PM, Jesse Anderson <[email protected]> wrote:
>>>>
>>>>> No code example that I know of. Look over the bounded read code in
>>>>> KafkaIO. Use that as a base.
>>>>>
>>>>> On Thu, Jun 23, 2016, 3:57 PM amir bahmanyari <[email protected]> wrote:
>>>>>
>>>>>> Thanks Jesse.
>>>>>> Any KafkaIO code example that detects that end of file, please?
>>>>>> Thanks
>>>>>>
>>>>>> ------------------------------
>>>>>> *From:* Jesse Anderson <[email protected]>
>>>>>> *To:* amir bahmanyari <[email protected]>; "[email protected]" <[email protected]>
>>>>>> *Sent:* Thursday, June 23, 2016 3:39 PM
>>>>>> *Subject:* Re: End-of-data indicator in Unbounded KafkaIO
>>>>>>
>>>>>> You bound on an end-of-file message you emit at the producer. So the
>>>>>> consumer or KafkaIO read would continue to read until an end-of-file
>>>>>> message is reached. The number in the read method is arbitrary. You would
>>>>>> write your own.
>>>>>>
>>>>>> On Thu, Jun 23, 2016, 3:34 PM amir bahmanyari <[email protected]> wrote:
>>>>>>
>>>>>> Thanks Jesse.
>>>>>> I know bounded should do it. But bounded gets tricky when you don't
>>>>>> know how many records you may have in the data file.
>>>>>> There is an upper bound, but what if there are more records than the
>>>>>> upper bound?
>>>>>> I can set a counter in memory and check its value. But I need a way
>>>>>> to interrupt p.run().
>>>>>> Not sure if there is something like this in the Beam API...
>>>>>> I appreciate other folks' opinions on this topic as well.
>>>>>> Thanks again.
>>>>>>
>>>>>> ------------------------------
>>>>>> *From:* Jesse Anderson <[email protected]>
>>>>>> *To:* amir bahmanyari <[email protected]>; "[email protected]" <[email protected]>
>>>>>> *Sent:* Thursday, June 23, 2016 3:26 PM
>>>>>> *Subject:* Re: End-of-data indicator in Unbounded KafkaIO
>>>>>>
>>>>>> You could make a bounded KafkaIO read and wait for an end-of-file message.
>>>>>> That said, I don't know if Kafka is the right technology for what
>>>>>> you're trying to do. You might just process the files directly at that
>>>>>> point.
>>>>>>
>>>>>> On Thu, Jun 23, 2016, 3:10 PM amir bahmanyari <[email protected]> wrote:
>>>>>>
>>>>>> Sorry colleagues.
>>>>>> I know "end-of-data" & unbounded don't go hand in hand.
>>>>>> Let's say I am invoking KafkaIO unbounded.
>>>>>> But at some point I run out of streaming data (finite number of
>>>>>> records in my data file), and p.run() keeps running/waiting for more data
>>>>>> and doesn't terminate, of course.
>>>>>> How do I know there has not been any more data recently coming to
>>>>>> KafkaIO.read() for a given amount of time, or any other runtime indicator?
>>>>>> Is there a way to interrupt p.run() upon detecting such an indicator
>>>>>> so the execution can move on with the rest of the code?
>>>>>> Thanks+regards
>>>>>> Amir
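
Since the bounded-read idea comes up a couple of times above: KafkaIO.Read already has withMaxNumRecords and withMaxReadTime, which turn the unbounded Kafka source into a bounded one so p.run() can complete on its own. A rough sketch (topic, servers, and both limits are placeholders, not recommendations; Duration is org.joda.time.Duration):

    // Rough sketch: cap the Kafka read so the pipeline finishes by itself.
    PCollection<KV<String, String>> records =
        p.apply(KafkaIO.<String, String>read()
            .withBootstrapServers("localhost:9092")
            .withTopic("mytopic")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withMaxNumRecords(1000000)                     // stop after this many records...
            .withMaxReadTime(Duration.standardMinutes(10))  // ...or after this much read time
            .withoutMetadata());                            // plain KV<K, V> output

If the record count is genuinely unknown, withMaxReadTime (with a generous withMaxNumRecords as a safety cap) is probably the more practical of the two.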

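And on the narrower question of interrupting p.run(): depending on the runner and Beam version, the PipelineResult returned by p.run() can be waited on with a timeout and cancelled. Something along these lines (the 30-minute timeout is arbitrary, and whether reaching the final watermark counts as "finished" is runner-specific, per Thomas's note above):

    // Rough sketch: wait with a timeout, then cancel if the job is still running.
    PipelineResult result = p.run();
    PipelineResult.State state = result.waitUntilFinish(Duration.standardMinutes(30));
    if (state == null || !state.isTerminal()) {
      try {
        result.cancel();  // ask the runner to stop the job
      } catch (IOException e) {
        // Cancellation is best-effort; log and move on.
      }
    }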