The Dataflow Runner does not currently shut down when all watermarks are at
positive infinity.

On Fri, Jun 24, 2016 at 1:06 PM, Raghu Angadi <[email protected]> wrote:

> Thanks Thomas. That makes sense. Do you know if Dataflow runner shuts down?
>
> [yep, I should have mentioned BoundedWindow.TIMESTAMP_MAX_VALUE]
>
> On Fri, Jun 24, 2016 at 11:04 AM, Thomas Groh <[email protected]> wrote:
>
>> The behavior of a runner with regards to source invocation when the
>> source emits the maximum watermark (nit: BoundedWindow.TIMESTAMP_MAX_VALUE
>> is the maximum timestamp; this is Long.MAX_VALUE in microseconds since the
>> epoch, not millis as might be assumed) is currently runner-defined. This
>> will cause watermark-based timers for the Global Window to fire, and all
>> input elements should be considered droppably late. The DirectRunner will
>> shut down by default if it reaches this state, but runners are not required
>> to shut down if all watermarks reach this value.
>>
>> On Fri, Jun 24, 2016 at 9:46 AM, Raghu Angadi <[email protected]> wrote:
>>
>>> Note that KafkaIO lets you set your own watermark for each record.
>>>
>>> On Fri, Jun 24, 2016 at 9:45 AM, Raghu Angadi <[email protected]>
>>> wrote:
>>>
>>>> So the main question here is how one can stop the unbounded pipeline at
>>>> runtime.
>>>>
>>>> You can emit a special watermark (Long.MAX_VALUE) that will flush the
>>>> entire pipeline. and will process. If that also makes runner stop reading
>>>> from source, I am not sure, I would like to know. After that, I don't know
>>>> if p.run() actually returns.
>>>>
>>>> On Thu, Jun 23, 2016 at 4:02 PM, Jesse Anderson <[email protected]>
>>>> wrote:
>>>>
>>>>> No code example that I know of. Look over the bounded read code in
>>>>> KafkaIO. Use that as a base.
>>>>>
>>>>> On Thu, Jun 23, 2016, 3:57 PM amir bahmanyari <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Thanks Jesse.
>>>>>> Any KafkaIO code example that detects that end of file pls?
>>>>>> Thanks
>>>>>>
>>>>>>
>>>>>> ------------------------------
>>>>>> *From:* Jesse Anderson <[email protected]>
>>>>>> *To:* amir bahmanyari <[email protected]>; "
>>>>>> [email protected]" <[email protected]>
>>>>>> *Sent:* Thursday, June 23, 2016 3:39 PM
>>>>>>
>>>>>> *Subject:* Re: End-of-data indicator in Unbounded KafkaIO
>>>>>>
>>>>>> You bound on an end of file message you emit at the producer. So the
>>>>>> consumer or Kafka IO read would continue to read until an end of file
>>>>>> message is reached. The number in the read method is arbitrary. You would
>>>>>> write your own.
>>>>>>
>>>>>> On Thu, Jun 23, 2016, 3:34 PM amir bahmanyari <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>> Thanks Jesse.
>>>>>> I know bounded should do it. But, bounded gets tricky when you dont
>>>>>> know how many records you may have in the data file.
>>>>>> There is an upper bound, but what if there are more records than the
>>>>>> upper-bound?
>>>>>> I can set a counter in-memory, and check for its value. But, I need a
>>>>>> way to interrupt p.run().
>>>>>> Not sure if there is something like this in Beam API...
>>>>>> I appreciate other folks' opinions on this topic as well....
>>>>>> Thanks again.
>>>>>>
>>>>>> ------------------------------
>>>>>> *From:* Jesse Anderson <[email protected]>
>>>>>> *To:* amir bahmanyari <[email protected]>; "
>>>>>> [email protected]" <[email protected]>
>>>>>> *Sent:* Thursday, June 23, 2016 3:26 PM
>>>>>> *Subject:* Re: End-of-data indicator in Unbounded KafkaIO
>>>>>>
>>>>>> You could make a bounded Kafka IO and wait for an end of file message.
>>>>>> That said, I don't know if Kafka is the right technology for what
>>>>>> you're trying to do. You might just process the files directly at that
>>>>>> point.
>>>>>>
>>>>>> On Thu, Jun 23, 2016, 3:10 PM amir bahmanyari <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>> Sorry colleagues.
>>>>>> I know "End-of-data" & Unbounded dont go hand in hand.
>>>>>> Lets say I am invoking KafkaIO  unbounded.
>>>>>> But at some point I run out of streaming data (finite number of
>>>>>> records in my data file) and p.run() keeps running/waiting for more data
>>>>>> and doesn't terminate of course.
>>>>>> How do I know there has not been any more data recently coming to
>>>>>> KafkaIo.read() for a given amount of time or any other runtime indicaor?
>>>>>> Is there a way to interrupt p.run() upon detecting such an indicator
>>>>>> so the execution can move on with the rest of the code?
>>>>>> Thanks+regards
>>>>>> Amir
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>
>>>
>>
>

Reply via email to