Hi Peter,

That's a good idea, but it may not be applicable to an iteration operator.
The operator cannot determine when to generate the "end-of-stream message"
for the feedback stream.
The provided function (e.g., filter(_ > 0).map(_ - 1)) is stateless and has
no side effects.
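
To make the timeout behaviour discussed in this thread concrete, here is a
minimal sketch in plain Scala, without Flink. It is not Flink's actual
implementation: FeedbackLoopSketch, run and maxWaitMillis are hypothetical
names chosen for illustration. It only assumes what was said earlier in the
thread, namely that the iteration head waits on a blocking queue and gives up
once no feedback record arrives within the maximum waiting time.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for an iteration head; NOT Flink's actual class.
object FeedbackLoopSketch {

  /** Applies filter(_ > 0).map(_ - 1) as the feedback step and stops only
    * when the feedback queue stays empty for maxWaitMillis.
    * Returns every value that was fed back, in order.
    */
  def run(initial: Seq[Long], maxWaitMillis: Long): Seq[Long] = {
    val feedback = new LinkedBlockingQueue[java.lang.Long]()
    initial.foreach(v => feedback.put(v))

    val fedBack = ArrayBuffer.empty[Long]
    var running = true
    while (running) {
      // poll returns null if no record arrives within the timeout.
      // The loop cannot tell "empty for now" from "empty forever",
      // so the timeout is its only termination signal.
      val next = feedback.poll(maxWaitMillis, TimeUnit.MILLISECONDS)
      if (next == null) {
        running = false
      } else {
        val v: Long = next
        if (v > 0) {            // filter(_ > 0)
          val out = v - 1       // map(_ - 1)
          fedBack += out
          feedback.put(out)     // send the record back into the loop
        }
      }
    }
    fedBack.toSeq
  }
}
```

Calling FeedbackLoopSketch.run(Seq(1L, 2L, 3L, 4L), 50L) returns only after
the queue has been empty for 50 ms. The step function itself never signals
that it is done, which is why a trailing end-of-stream record from the source
would not help the loop here.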

Best,
Xingcan



On Mon, Sep 4, 2017 at 4:40 AM, Peter Ertl <peter.e...@gmx.net> wrote:

> Hi Xingcan!
>
> If a _finite_ stream emitted, at its end, a special trailing
> "End-Of-Stream Message" that flows down the operator stream, wouldn't
> this enable us to deterministically end the iteration without needing a
> timeout?
>
> Having an arbitrary timeout that must be longer than any iteration step
> takes seems really awkward.
>
> What do you think?
>
> Best regards
> Peter
>
>
> On 02.09.2017 at 17:16, Xingcan Cui <xingc...@gmail.com> wrote:
>
> Hi Peter,
>
> I just omitted the filter part. Sorry for that.
>
> Actually, as the javadoc explains, by default a DataStream with iteration
> will never terminate. That's because, in a stream environment with
> iteration, the operator can never know whether the feedback stream has
> reached its end (even though the data source has terminated, *there may
> be unknowable subsequent data*). That's why it needs a timeout value to
> make the judgement, just like many function calls over network
> connections. In other words, you know the feedback stream will be empty
> in the future, but the operator doesn't. Thus we provide it with a maximum
> waiting time for the next record.
>
> Internally, this mechanism is implemented via a blocking queue (the
> related code can be found here
> <https://github.com/apache/flink/blob/12b4185c6c09101b64e12a84c33dc4d28f95cff9/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java#L80>
> ).
>
> Hope everything is considered this time : )
>
> Best,
> Xingcan
>
> On Sat, Sep 2, 2017 at 10:08 PM, Peter Ertl <peter.e...@gmx.net> wrote:
>
>>
>> On 02.09.2017 at 04:45, Xingcan Cui <xingc...@gmail.com> wrote:
>>
>> In your code, all the long values will have 1 subtracted and be sent back
>> to the iterate operator, endlessly.
>>
>>
>>
>> Is this true? Shouldn't
>>
>>   val iterationResult2 = env.generateSequence(1, 4).iterate(it => {
>>     // dump meaningless 'y' chars just to do anything
>>     (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'y'))
>>   })
>>   iterationResult2.print()
>>
>>
>> produce the following _feedback_ streams?
>>
>> initial input to #iterate(): [1 2 3 4]
>>
>> iteration #1 : [1 2 3]
>> iteration #2 : [1 2]
>> iteration #3 : [1]
>> iteration #4 : []  => empty feedback stream => cause termination? (which
>> actually only happens when setting a timeout value)
>>
>> Best regards
>> Peter
>>
>
>
