Re: [External] Re: Memory Issue When Running Beam On Flink

2020-08-24 Thread Maximilian Michels
I'd suggest a modified option (2) which does not use a timer to perform 
the cleanup (as mentioned, this will cause problems with migrating state).


Instead, whenever we receive a watermark which closes the global window, 
we enumerate all keys and cleanup the associated state.


This is the cleanest and simplest option.
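To make the idea concrete, here is a minimal plain-Java model of it
(not actual Beam/Flink API; the keyed state backend is stood in for by
an in-memory map, and GLOBAL_WINDOW_MAX_TS is an illustrative constant):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: when the watermark closes the global window, enumerate all
// keys in one pass and clear their state, instead of firing one
// cleanup timer per key.
public class GlobalWindowCleanupSketch {
    // Stand-in for the end-of-global-window timestamp.
    static final long GLOBAL_WINDOW_MAX_TS = Long.MAX_VALUE - 1;

    // key -> user state; stands in for the runner's keyed state backend,
    // which in Flink could enumerate keys via KeyedStateBackend#getKeys.
    final Map<String, String> keyedState = new HashMap<>();

    void onWatermark(long watermark) {
        if (watermark >= GLOBAL_WINDOW_MAX_TS) {
            // Single pass over all keys of the closed window.
            keyedState.clear();
        }
    }

    public static void main(String[] args) {
        GlobalWindowCleanupSketch op = new GlobalWindowCleanupSketch();
        op.keyedState.put("user-1", "a");
        op.keyedState.put("user-2", "b");
        op.onWatermark(1000L);                // ordinary watermark: state kept
        assert op.keyedState.size() == 2;
        op.onWatermark(GLOBAL_WINDOW_MAX_TS); // final watermark: state dropped
        assert op.keyedState.isEmpty();
        System.out.println("ok");
    }
}
```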

-Max

On 24.08.20 20:47, Thomas Weise wrote:


On Mon, Aug 24, 2020 at 11:35 AM Jan Lukavský wrote:


 > The most general solution would be 3), given it can be agnostic
to window types and does not assume extra runner capabilities.

Agree, 2) is an optimization of that. It might be questionable whether
this is premature optimization, but in general, querying multiple
states on each clear operation of any state might be prohibitive,
especially when the state is stored in an external database (in the
case of Flink, that would be RocksDB).

For the use case I'm looking at, we are using the heap state backend. I 
have not checked RocksDB, but would assume that the incremental cost of 
isEmpty() for the other states under the same key is negligible?


 > 3) wouldn't require any state migration.

Actually, it would, as we would (ideally) like to migrate users'
pipelines that already contain timers for the end of the global
window, which might never expire.

Good catch. This could potentially be addressed by upgrading the timer 
in the per-record path.


On 8/24/20 7:44 PM, Thomas Weise wrote:


On Fri, Aug 21, 2020 at 12:32 AM Jan Lukavský <je...@seznam.cz> wrote:

If there are runners that are unable to efficiently enumerate
keys in state, then there probably isn't a runner-agnostic
solution to this. If we focus on Flink, we can provide a
specific implementation of CleanupTimer, which might then do
anything from the mentioned options. I'd be +1 for option 2)
for key-aligned windows (all currently supported) and option
3) for unaligned windows in the future.

The most general solution would be 3), given it can be agnostic to
window types and does not assume extra runner capabilities. It
would require introspecting all user states for a given key on
state.clear. That assumes an efficient implementation of
isEmpty(). If all states are empty (have been cleared), then we
can remove the cleanup timer. And add it back on state.add. I'm
planning to give that a shot (for Flink/portable/streaming) to see
how it performs.
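The bookkeeping described above (timer added on state.add, removed on
state.clear once every state under the key is empty) can be sketched as
a plain-Java model for a single key; all names here are illustrative,
not Beam runner API:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch of option 3: keep the end-of-window cleanup timer registered
// only while at least one user state for the key is non-empty.
public class LazyCleanupTimerSketch {
    // stateId -> bag contents for a single key (stand-in for user states)
    final Map<String, Set<String>> statesForKey = new HashMap<>();
    boolean cleanupTimerSet = false;

    void add(String stateId, String value) {
        statesForKey.computeIfAbsent(stateId, k -> new HashSet<>()).add(value);
        cleanupTimerSet = true; // state.add: (re)register the cleanup timer
    }

    void clear(String stateId) {
        statesForKey.remove(stateId);
        // state.clear: if every remaining state for this key is empty,
        // the cleanup timer is no longer needed. This check is where an
        // efficient isEmpty() per state matters.
        if (statesForKey.values().stream().allMatch(Set::isEmpty)) {
            cleanupTimerSet = false;
        }
    }

    public static void main(String[] args) {
        LazyCleanupTimerSketch key = new LazyCleanupTimerSketch();
        key.add("bag1", "x");
        assert key.cleanupTimerSet;   // timer set on first add
        key.clear("bag1");
        assert !key.cleanupTimerSet;  // timer dropped once all states empty
        System.out.println("ok");
    }
}
```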

We should also consider how we migrate users from the current
state to any future implementation. In the case of option 2), it
should be possible to do this when the state is loaded from a
savepoint, but I'm not 100% sure about that.

3) wouldn't require any state migration.

Jan

On 8/21/20 6:25 AM, Thomas Weise wrote:

Thanks for the clarification.

Here are a few potential options to address the issue, based
on the discussion so far:

1) Optionally skip cleanup timer for global window
(user-controlled via pipeline option)

2) Instead of setting a cleanup timer for every key, handle
all keys for a given window with a single timer. This would
be runner specific and depend on if/how a given
runner supports key enumeration. Flink's keyed state backend
supports enumerating keys for a namespace (Beam window) and
state tag. [1]

3) Set the cleanup timer only when there is actually state
associated with a key. This could be accomplished by
intercepting append and clear in BagUserStateHandler [2] and
adding/removing the timer appropriately.

4) See if TTL support in the runner is applicable; for Flink,
see [3]

[1]

https://github.com/apache/flink/blob/release-1.10/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java#L76

[2]

https://github.com/apache/beam/blob/release-2.23.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java#L315

[3]

https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
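For reference on option 4), Flink's state TTL ([3]) is configured per
state descriptor on the Flink side. A sketch of that configuration (the
7-day TTL and state name are arbitrary examples; whether this composes
cleanly with Beam's state model would need to be verified):

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

// Flink-side TTL configuration: expired state is cleaned up lazily by
// the backend, with no cleanup timers involved.
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(7))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

ValueStateDescriptor<String> descriptor =
    new ValueStateDescriptor<>("user-state", String.class);
descriptor.enableTimeToLive(ttlConfig);
```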


On Thu, Aug 20, 2020 at 8:08 AM Reuven Lax <re...@google.com> wrote:

Also +1 to what Jan said. Streaming pipelines can process
bounded PCollections on some paths, so the global window
will terminate for those paths. This is also true for the
direct runner tests, where PCollections pretend to be
unbounded, but we then advance the watermark to +inf to
terminate the pipeline.

On Thu, Aug 20, 2020 at 8:06 AM Reuven Lax <re...@google.com> wrote:

It is not Dataflow specific, but I think Dataflow is
the only runner that currently 
