Might it be that your initial source never stops? A loop will only
terminate if both the original source stops and the loop timeout is reached.

On Mon, 21 Nov 2016 at 07:58 Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com> wrote:

> Hi,
>
> I wrote a proof of concept for a Java version of mapWithState with
> time-based state eviction
> https://github.com/juanrh/flink-state-eviction/blob/a6bb0d4ca0908d2f4350209a4a41e381e99c76c5/src/main/java/com/github/juanrh/streaming/MapWithStateIterPoC.java.
> The idea is:
>
>  - Convert an input KeyedStream with key K and value V into a KeyedStream
> of Either<V, K>, with the original values as Left.
>  - Replace a ValueState<S> by a ValueState for a POJO that besides S it
> stores the timestamp of the last time that state was accessed.
>  - Define a IterativeStream from the Either stream, and apply a
> transformation function that periorically sends "tombstone" events as Right
> events in the closeWith of the IterativeStream. When a tombstone is
> received, delete the state with clear if it the time since it was last
> accessed is bigger than a configured time to live.
>
> This seems to work so far, but there are some things that look weird to
> me:
>
>  - The program never seems to stop, event though I Ihave defined the
> IterativeStream with
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#iterate-long-
> . The value of seems to be ignored. I'm using a custom source function, but
> it seems like the method SourceFunction.cancel() it's not being called.
>
>  - I'm getting several messages "WARN MetricGroup: Name collision: Group
> already contains a Metric with the name 'numRecordsOut'. Metric will not be
> reported. (null)". What does that mean?
>
> Thanks,
>
> Juan
>

Reply via email to