[GitHub] flink pull request #4607: [FLINK-6306][connectors] Sink for eventually consi...
Github user sjwiesman closed the pull request at: https://github.com/apache/flink/pull/4607 ---
[GitHub] flink issue #4607: [FLINK-6306][connectors] Sink for eventually consistent f...
Github user sjwiesman commented on the issue: https://github.com/apache/flink/pull/4607 Clicked the wrong button. ---
[GitHub] flink issue #4607: [FLINK-6306][connectors] Sink for eventually consistent f...
Github user sjwiesman commented on the issue: https://github.com/apache/flink/pull/4607 I'm going to go ahead and close this PR and issue to avoid confusion. ---
[GitHub] flink issue #4607: [FLINK-6306][connectors] Sink for eventually consistent f...
Github user sjwiesman commented on the issue: https://github.com/apache/flink/pull/4607 Would you be able to rerun Travis? The test failed on a single configuration during Kafka09ITTest due to a task manager failure, and I do not believe any of my changes touched the code paths exercised by that test. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #4607: [FLINK-6306][connectors] Sink for eventually consistent f...
Github user sjwiesman commented on the issue: https://github.com/apache/flink/pull/4607 CC: @aljoscha I screwed up the rebase, so I cherry-picked the updates into a new branch and re-opened the PR. ---
[GitHub] flink pull request #4607: [FLINK-6306][connectors] Sink for eventually consi...
GitHub user sjwiesman opened a pull request: https://github.com/apache/flink/pull/4607 [FLINK-6306][connectors] Sink for eventually consistent file systems

## What is the purpose of the change

This pull request implements a sink for writing out to an eventually consistent filesystem, such as Amazon S3, with exactly-once semantics.

## Brief change log

- The sink stages files on a consistent filesystem (local, HDFS, etc.).
- Once per checkpoint, files are copied to the eventually consistent filesystem.
- When a checkpoint completion notification is received, the files are marked consistent. Otherwise, they are left in place, because delete is not a consistent operation.
- It is up to consumers to choose their semantics: at least once by reading all files, or exactly once by reading only the files marked consistent.

## Verifying this change

This change added tests and can be verified as follows:

- Added tests based on the existing BucketingSink test suite.
- Added tests that verify the semantics under different checkpointing combinations (successful, concurrent, timed out, and failed).
- Added an integration test that verifies exactly-once output holds during failure.
- Manually verified by having run the sink in production for several months.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no

## Documentation

- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? JavaDocs

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sjwiesman/flink FLINK-6306

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4607.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4607

commit 347ea767195d74efc39964c02ace1bbe10d8aa0a
Author: Seth Wiesman <swies...@mediamath.com>
Date: 2017-08-27T21:36:04Z

[FLINK-6306][connectors] Sink for eventually consistent file systems ---
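The stage/copy/commit protocol in the change log above can be sketched as a small state machine. This is a minimal model, not the PR's actual code; the class and method names are illustrative stand-ins for the sink's checkpoint hooks.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the staging protocol: files move from a staging
// area on a consistent filesystem, to "copied" on checkpoint, to
// "consistent" on checkpoint-complete notification.
class EventuallyConsistentSinkModel {

    // Files staged on the consistent filesystem, keyed by the checkpoint
    // that will ship them to the eventually consistent store.
    private final Map<Long, List<String>> staged = new HashMap<>();

    // Uploaded to the eventually consistent filesystem, not yet marked.
    private final List<String> copied = new ArrayList<>();

    // Marked consistent: safe for exactly-once readers.
    private final List<String> consistent = new ArrayList<>();

    void write(long checkpointId, String file) {
        staged.computeIfAbsent(checkpointId, k -> new ArrayList<>()).add(file);
    }

    // Once per checkpoint: copy staged files to the eventually consistent
    // filesystem.
    void snapshotState(long checkpointId) {
        copied.addAll(staged.getOrDefault(checkpointId, List.of()));
        staged.remove(checkpointId);
    }

    // On the checkpoint-complete notification, mark the uploaded files
    // consistent. Nothing is ever deleted, since delete is not a
    // consistent operation; unmarked files simply stay behind.
    void notifyCheckpointComplete(long checkpointId) {
        consistent.addAll(copied);
        copied.clear();
    }

    List<String> consistentFiles() {
        return consistent;
    }
}
```

An at-least-once consumer would read everything uploaded; an exactly-once consumer reads only what `consistentFiles()` reports.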
[GitHub] flink pull request #3752: [FLINK-6306] [filesystem-connectors] Sink for even...
Github user sjwiesman closed the pull request at: https://github.com/apache/flink/pull/3752 ---
[GitHub] flink issue #3752: [FLINK-6306] [filesystem-connectors] Sink for eventually ...
Github user sjwiesman commented on the issue: https://github.com/apache/flink/pull/3752 @tzulitai You took interest in the JIRA ticket; would you be willing to review this? ---
[GitHub] flink pull request #3752: [FLINK-6306] [filesystem-connectors] Sink for even...
GitHub user sjwiesman opened a pull request: https://github.com/apache/flink/pull/3752 [FLINK-6306] [filesystem-connectors] Sink for eventually consistent file systems

https://issues.apache.org/jira/browse/FLINK-6306

This PR introduces a bucketer for eventually consistent file systems such as Amazon S3, guaranteeing exactly-once output across failures and concurrent checkpoints (thank you @StephanEwen). I have attempted to keep the API as similar to the BucketingSink as possible, including the shared use of writers for specifying the output format. Currently the documentation is in the form of javadoc; once the API is settled I will make another PR with updated documentation.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sjwiesman/flink FLINK-6306

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3752.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3752

commit c778ce282e60577f1d9a105e9cffa295938642b9
Author: Seth Wiesman <swies...@mediamath.com>
Date: 2017-04-14T19:11:53Z

FLINK-6306 Sink for eventually consistent file systems

https://issues.apache.org/jira/browse/FLINK-6306 ---
[GitHub] flink pull request #3729: [FLINK-6315] Notify on checkpoint timeout
Github user sjwiesman closed the pull request at: https://github.com/apache/flink/pull/3729 ---
[GitHub] flink issue #3729: [FLINK-6315] Notify on checkpoint timeout
Github user sjwiesman commented on the issue: https://github.com/apache/flink/pull/3729 @aljoscha Could you review this or point me in the direction of the correct person? I need this to land for the EventualConsistencySink ---
[GitHub] flink pull request #3729: [FLINK-6315] Notify on checkpoint timeout
GitHub user sjwiesman opened a pull request: https://github.com/apache/flink/pull/3729 [FLINK-6315] Notify on checkpoint timeout

https://issues.apache.org/jira/browse/FLINK-6315

A common use case when writing a custom operator that outputs data to some third-party location is to partially output on checkpoint and then commit on notifyCheckpointComplete. If the external system does not gracefully handle rollbacks (for example, Amazon S3 does not allow consistent delete operations), then that data needs to be handled by the next checkpoint. The idea is to add a new interface, similar to CheckpointListener, that provides a callback when the CheckpointCoordinator times out a checkpoint. This is required for the eventually consistent sink coming in FLINK-6306 to be able to differentiate between concurrent checkpoints and timed-out checkpoints.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sjwiesman/flink FLINK-6315

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3729.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3729

commit 323851929772b4c57e65b7146e96af6687d3a2eb
Author: Seth Wiesman <swies...@mediamath.com>
Date: 2017-04-15T21:13:20Z

FLINK-6315 Notify on checkpoint timeout

https://issues.apache.org/jira/browse/FLINK-6315 ---
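The proposed callback could look roughly like the sketch below, modeled on Flink's CheckpointListener. The interface name and the toy coordinator are illustrative assumptions, not the PR's actual code.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical interface mirroring CheckpointListener, but fired when a
// checkpoint is abandoned rather than completed.
interface CheckpointTimeoutListener {
    // Invoked when the coordinator abandons a checkpoint because it
    // exceeded the configured timeout.
    void notifyCheckpointTimeout(long checkpointId);
}

// Toy stand-in for the CheckpointCoordinator's timeout path.
class ToyCheckpointCoordinator {
    private final List<CheckpointTimeoutListener> listeners = new ArrayList<>();
    private final Set<Long> pending = new HashSet<>();

    void register(CheckpointTimeoutListener l) {
        listeners.add(l);
    }

    void trigger(long checkpointId) {
        pending.add(checkpointId);
    }

    // Called when a pending checkpoint's deadline passes; notifies every
    // registered listener exactly once per timed-out checkpoint.
    void expire(long checkpointId) {
        if (pending.remove(checkpointId)) {
            for (CheckpointTimeoutListener l : listeners) {
                l.notifyCheckpointTimeout(checkpointId);
            }
        }
    }
}
```

With such a callback, a sink can tell a checkpoint that is merely still in flight (concurrent) apart from one that will never complete, and reassign that checkpoint's pending files to the next one.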
[GitHub] flink issue #3479: [FLINK-5929] Allow Access to Per-Window State in ProcessW...
Github user sjwiesman commented on the issue: https://github.com/apache/flink/pull/3479 Done! Thank you for helping me get this feature merged in. This has to be one of the most painless commits I've ever made to an open source project of this size. ---
[GitHub] flink pull request #3479: [FLINK-5929] Allow Access to Per-Window State in P...
Github user sjwiesman closed the pull request at: https://github.com/apache/flink/pull/3479 ---
[GitHub] flink issue #3479: [FLINK-5929] Allow Access to Per-Window State in ProcessW...
Github user sjwiesman commented on the issue: https://github.com/apache/flink/pull/3479 Pushed the fix. I had to update SideOutputsITCase so the ProcessAllWindowFunctions had a no-op clear method. All tests passed locally; take a look. ---
[GitHub] flink issue #3479: [FLINK-5929] Allow Access to Per-Window State in ProcessW...
Github user sjwiesman commented on the issue: https://github.com/apache/flink/pull/3479 It looks like when I rebased on master I broke one of the Scala side-output tests. I'm going to push a fix right now, but it won't change any of the code surrounding this PR. ---
[GitHub] flink issue #3479: [FLINK-5929] Allow Access to Per-Window State in ProcessW...
Github user sjwiesman commented on the issue: https://github.com/apache/flink/pull/3479 Sorry for the delay, things got crazy at work. Let me know if there are any issues. ---
[GitHub] flink pull request #3479: [FLINK-5929] Allow Access to Per-Window State in P...
Github user sjwiesman commented on a diff in the pull request: https://github.com/apache/flink/pull/3479#discussion_r105763162

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java ---
@@ -53,22 +57,47 @@ public InternalAggregateProcessAllWindowFunction(
 	}

 	@Override
-	public void apply(Byte key, final W window, Iterable input, Collector out) throws Exception {
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
 		ProcessAllWindowFunction<V, R, W> wrappedFunction = this.wrappedFunction;
-		ProcessAllWindowFunction<V, R, W>.Context context = wrappedFunction.new Context() {
-			@Override
-			public W window() {
-				return window;
-			}
-		};
+		this.ctx = new InternalProcessAllWindowContext<>(wrappedFunction);
+	}

+	@Override
+	public void process(Byte aByte, final W window, final InternalWindowContext context, Iterable input, Collector out) throws Exception {
 		final ACC acc = aggFunction.createAccumulator();
 		for (T val : input) {
 			aggFunction.add(val, acc);
 		}
-		wrappedFunction.process(context, Collections.singletonList(aggFunction.getResult(acc)), out);
+		this.ctx.window = window;
+		this.ctx.internalContext = context;
+		ProcessAllWindowFunction<V, R, W> wrappedFunction = this.wrappedFunction;
+		wrappedFunction.process(ctx, Collections.singletonList(aggFunction.getResult(acc)), out);
+	}
+
+	@Override
+	public void clear(final W window, final InternalWindowContext context) throws Exception {
+		ProcessAllWindowFunction<V, R, W> wrappedFunction = this.wrappedFunction;
+		final ProcessAllWindowFunction<V, R, W>.Context ctx = wrappedFunction.new Context() {
--- End diff --

whoops 😱 ---
[GitHub] flink issue #3479: [FLINK-5929] Allow Access to Per-Window State in ProcessW...
Github user sjwiesman commented on the issue: https://github.com/apache/flink/pull/3479 @aljoscha I made the changes you asked for. Just a heads up, there are a number of files that were superficially changed when migrating from apply -> process but are otherwise untouched. ---
[GitHub] flink pull request #3479: [FLINK-5929] Allow Access to Per-Window State in P...
Github user sjwiesman commented on a diff in the pull request: https://github.com/apache/flink/pull/3479#discussion_r104492162

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java ---
@@ -39,5 +40,31 @@
 	 * @param out A collector for emitting elements.
 	 * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
 	 */
+	@Deprecated
 	void apply(KEY key, W window, IN input, Collector out) throws Exception;
--- End diff --

I noticed an issue when removing apply. The method is used inside of AccumulatingKeyedTimePanes, which takes an AbstractStreamOperator as an argument to its evaluateWindow method. When creating the context I can get the global keyed state backend from the operator, but not the partitioned state, because those methods are protected. The only two uses of this class are its subclasses, which have both been deprecated. My question is: do you think I should modify the evaluateWindow method to accept a keyed state store that wraps the operator's partitioned state, or just throw an exception on context.windowState(), since all valid uses of this method have been deprecated? ---
[GitHub] flink pull request #3479: [FLINK-5929] Allow Access to Per-Window State in P...
Github user sjwiesman commented on a diff in the pull request: https://github.com/apache/flink/pull/3479#discussion_r104458722

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java ---
@@ -39,5 +40,31 @@
 	 * @param out A collector for emitting elements.
 	 * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
 	 */
+	@Deprecated
 	void apply(KEY key, W window, IN input, Collector out) throws Exception;
--- End diff --

@aljoscha I meant to ask, should I leave this method or remove it? ---
[GitHub] flink pull request #3479: [FLINK-5929] Allow Access to Per-Window State in P...
GitHub user sjwiesman opened a pull request: https://github.com/apache/flink/pull/3479 [FLINK-5929] Allow Access to Per-Window State in ProcessWindowFunction

Right now, the state that a WindowFunction or ProcessWindowFunction can access is scoped to the key of the window but not the window itself. That is, state is global across all windows for a given key. For some use cases it is beneficial to keep state scoped to a window. For example, if you expect to have several Trigger firings (due to early and late firings), a user can keep state per window to keep some information between those firings.

@aljoscha You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sjwiesman/flink FLINK-5929

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3479.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3479

commit 623edd1fb107e8dd0aae755a7b252df1f91713bd
Author: Seth Wiesman <swies...@mediamath.com>
Date: 2017-03-06T04:07:18Z

[FLINK-5929] Allow Access to Per-Window State in ProcessWindowFunction

Right now, the state that a WindowFunction or ProcessWindowFunction can access is scoped to the key of the window but not the window itself. That is, state is global across all windows for a given key. For some use cases it is beneficial to keep state scoped to a window. For example, if you expect to have several Trigger firings (due to early and late firings), a user can keep state per window to keep some information between those firings. ---
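The key-scoped vs. (key, window)-scoped distinction described above can be illustrated with a toy model. This is a sketch only: the class below is hypothetical, standing in for the state stores the feature exposes through a ProcessWindowFunction's context.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model: global state is scoped to the key only, while per-window
// state is scoped to the (key, window) pair. Names are illustrative.
class PerWindowStateModel {
    private final Map<String, Integer> globalState = new HashMap<>();   // per key
    private final Map<String, Integer> windowState = new HashMap<>();   // per (key, window)

    // Record one trigger firing; returns the firing count for this
    // particular window, e.g. so early and late firings can coordinate.
    int fire(String key, String window) {
        globalState.merge(key, 1, Integer::sum);
        return windowState.merge(key + "/" + window, 1, Integer::sum);
    }

    // Count across all windows for the key (the only scope available
    // before this change).
    int globalCount(String key) {
        return globalState.getOrDefault(key, 0);
    }
}
```

Two firings in window w1 and one in w2 for the same key yield per-window counts of 2 and 1, while the key-global count sees all 3: exactly why window-scoped state is needed for per-window bookkeeping.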