[GitHub] flink issue #2982: [FLINK-4460] Side Outputs in Flink
Github user jgrier commented on the issue: https://github.com/apache/flink/pull/2982 Nice :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Github user chenqin commented on the issue: https://github.com/apache/flink/pull/2982 Wow, finally in! It was fun, and thanks for your help along every step of the journey! Chen
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2982 @chenqin I finally merged it. Could you please also close this PR? And thanks again for working on this for so long!
Github user chenqin commented on the issue: https://github.com/apache/flink/pull/2982 @aljoscha Nice! Let's do "Chen Qin qinnc...@gmail.com"
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2982 @chenqin Would you like your commits to be attributed to "Chen Qin" or "Chen Qin "? I see both in your set of commits. I'm finally putting everything together and will hopefully merge soon, since the mailing list discussion seems to favour our approach.
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2982 Thanks for looking at that! I'll open a new discussion thread on the mailing lists to discuss side outputs and `split`/`select` and how we're going to proceed with them. Regarding your other questions: I think we might add such an `Evaluator` interface in the future, but for now I would like to keep it simple and see if that works for people. And yes, a user who wants a non-zero allowed lateness and also wants the late data would have to use `allowedLateness` and `sideOutputLateData` at the same time. Alternatively, they can keep the default allowed lateness of zero and still get the late data as a side output.
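To make the interplay between allowed lateness and the late-data side output concrete, here is a minimal Flink-free sketch. It assumes tumbling event-time windows and treats an element as late once the watermark has reached its window's last timestamp plus the allowed lateness; `LateDataModel`, `process` and `advanceWatermark` are hypothetical names for illustration, not Flink's `WindowOperator`.

```java
import java.util.ArrayList;
import java.util.List;

// Flink-free model of a tumbling event-time window that routes late elements
// to a side output instead of dropping them. All names are hypothetical.
class LateDataModel {
    final long windowSize;      // length of each tumbling window, in ms
    final long allowedLateness; // how long a window stays usable after the watermark passes its end
    long watermark = Long.MIN_VALUE;

    final List<Long> mainOutput = new ArrayList<>(); // timestamps accepted into a window
    final List<Long> lateOutput = new ArrayList<>(); // timestamps sent to the late-data side output

    LateDataModel(long windowSize, long allowedLateness) {
        this.windowSize = windowSize;
        this.allowedLateness = allowedLateness;
    }

    void advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark); // watermarks never move backwards
    }

    void process(long timestamp) {
        long windowStart = timestamp - Math.floorMod(timestamp, windowSize);
        long windowMaxTimestamp = windowStart + windowSize - 1;
        // Once the watermark reaches this point, the window's state is assumed to be gone.
        long cleanupTime = windowMaxTimestamp + allowedLateness;
        if (cleanupTime <= watermark) {
            lateOutput.add(timestamp);
        } else {
            mainOutput.add(timestamp);
        }
    }

    public static void main(String[] args) {
        // Default allowed lateness of zero: an element for a finished window is late.
        LateDataModel strict = new LateDataModel(10, 0);
        strict.advanceWatermark(10);
        strict.process(5);  // window [0, 10): 9 + 0 <= 10, so late
        strict.process(12); // window [10, 20): still open

        // With allowed lateness, the same element is still accepted.
        LateDataModel lenient = new LateDataModel(10, 5);
        lenient.advanceWatermark(10);
        lenient.process(5); // 9 + 5 > 10, so on time
        lenient.advanceWatermark(20);
        lenient.process(3); // 9 + 5 <= 20, so late

        System.out.println("strict late: " + strict.lateOutput + ", lenient late: " + lenient.lateOutput);
    }
}
```

In this model the allowed lateness only moves the boundary between "on time" and "late"; the side output is what keeps the elements past that boundary from being silently discarded.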
Github user chenqin commented on the issue: https://github.com/apache/flink/pull/2982 @aljoscha Looks good to me. I briefly looked at your git branch; one minor comment would be to add comments to `sideOutputLateData` so users get a better idea of what they opt into when requesting the stream of late-arriving events.

Currently, whether an event counts as late is decided by comparing the watermark with its event time. Do you think there is a need to let users pass some kind of `Evaluator`, so they can emit any kind of side output? `window.sideOutput(OutputTag, Evaluator)` `interface Evaluator{ MergedWindows, key, watermark}`

- Regarding `split`/`select`, I think there is a chance to consolidate `select` and build it on top of `OutputTag`, but that might be out of this PR's scope.
- Regarding `WindowedStream`, I am a bit confused about whether I should use `allowedLateness` and `sideOutputLateData` at the same time.

Thanks, Chen
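The proposed `Evaluator` could look roughly like the following. This is a hypothetical sketch: `EvaluatorSketch`, `Evaluator.matches`, and plain string tags standing in for `OutputTag` are all assumptions, and elements are represented only by their key. It expresses the default late-data check as one evaluator next to a custom one, with the first matching evaluator deciding which side output an element goes to.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the proposed Evaluator; none of these types exist in Flink.
class EvaluatorSketch {

    /** Decides, per element, whether it belongs in a particular side output. */
    interface Evaluator<K> {
        boolean matches(K key, long windowStart, long windowEnd, long watermark);
    }

    // Evaluators registered per side-output tag, checked in insertion order.
    final Map<String, Evaluator<String>> evaluators = new LinkedHashMap<>();
    final Map<String, List<String>> sideOutputs = new LinkedHashMap<>();
    final List<String> mainOutput = new ArrayList<>();

    // Stand-in for the proposed window.sideOutput(OutputTag, Evaluator).
    void sideOutput(String tag, Evaluator<String> evaluator) {
        evaluators.put(tag, evaluator);
        sideOutputs.put(tag, new ArrayList<>());
    }

    void route(String key, long windowStart, long windowEnd, long watermark) {
        for (Map.Entry<String, Evaluator<String>> e : evaluators.entrySet()) {
            if (e.getValue().matches(key, windowStart, windowEnd, watermark)) {
                sideOutputs.get(e.getKey()).add(key);
                return; // the first matching evaluator wins
            }
        }
        mainOutput.add(key);
    }

    public static void main(String[] args) {
        EvaluatorSketch window = new EvaluatorSketch();
        // The default late-data rule: the watermark has passed the window's last timestamp.
        window.sideOutput("late", (key, start, end, wm) -> end - 1 <= wm);
        // A custom rule an Evaluator would enable, e.g. isolating one hot key.
        window.sideOutput("hot-key", (key, start, end, wm) -> key.equals("user-42"));

        window.route("user-1", 0, 10, 20);   // window already closed: late
        window.route("user-42", 20, 30, 20); // window open, but matches the hot-key rule
        window.route("user-7", 20, 30, 20);  // no rule matches: main output

        System.out.println(window.sideOutputs + " " + window.mainOutput);
    }
}
```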
Github user chenqin commented on the issue: https://github.com/apache/flink/pull/2982 @aljoscha, the diff has been updated and merged to support `apply(Function, lateElementsOutput)`. The `Collector` refactor does not seem to be a viable option, as stated earlier. Travis CI seems to have timed out here; is there a way to increase the timeout setting? https://travis-ci.org/chenqin/flink/builds/201329246 Chen
Github user chenqin commented on the issue: https://github.com/apache/flink/pull/2982 @aljoscha Thanks for your time. We can chat more after the 1.2 release! I think it makes sense to extend `Collector`, even though we may not be able to remove `collect(T obj)` because of API compatibility in 1.x, as per @fhueske's comments in the [FLIP-13 email thread](http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discuss-FLIP-13-Side-Outputs-in-Flink-td14204.html).

The only point I would like to add: replacing the underlying output chains with `collect(tag, element)` seems to require a decent amount of refactoring, but it looks like a reasonable investment moving forward (multiple inputs / multiple outputs).

The `tooLateEvents()` method was added for users' convenience; it should be fine to remove it if it doesn't bring much benefit.

`LateArrivingTag` shares the same type as the input (which is already fixed once the input type is defined), so adding the late-arriving tag to the `apply` method seems redundant. In fact, even without any changes to this diff, users can already access late-arriving events as follows:

```
OutputTag lateElementsOutput = new LateArrivingOutputTag();
DataStream input = ...
SingleOutputStreamOperator windowed = input
    .keyBy(...)
    .window(...)
    .apply(Function);
DataStream lateElements = windowed.getSideOutput(lateElementsOutput);
```

Thanks, Chen
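A freshly created `LateArrivingOutputTag` can only retrieve the late stream if the lookup depends on tag equality rather than object identity. A minimal sketch of that idea, assuming tags compare by a string id and that every `LateArrivingOutputTag` shares one fixed id; the `OutputTag`, `LateArrivingOutputTag` and `WindowedResult` classes below are simplified stand-ins, not the classes from this PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

class LateTagDemo {
    public static void main(String[] args) {
        WindowedResult windowed = new WindowedResult();
        // The (simulated) window operator emits a late element under its own tag instance...
        windowed.emit(new LateArrivingOutputTag<Long>(), 3L);
        // ...and the user retrieves it with a fresh instance, without passing a tag to apply().
        List<Long> late = windowed.getSideOutput(new LateArrivingOutputTag<Long>());
        System.out.println(late);
    }
}

// Simplified stand-in: a tag whose identity is its id.
class OutputTag<T> {
    final String id;

    OutputTag(String id) { this.id = Objects.requireNonNull(id); }

    @Override public boolean equals(Object o) {
        return o instanceof OutputTag && ((OutputTag<?>) o).id.equals(id);
    }

    @Override public int hashCode() { return id.hashCode(); }
}

// Every instance shares one fixed (hypothetical) id, so all instances are equal.
class LateArrivingOutputTag<T> extends OutputTag<T> {
    LateArrivingOutputTag() { super("late-arriving"); }
}

class WindowedResult {
    private final Map<OutputTag<?>, List<Object>> sideOutputs = new HashMap<>();

    <T> void emit(OutputTag<T> tag, T value) {
        sideOutputs.computeIfAbsent(tag, t -> new ArrayList<>()).add(value);
    }

    // The unchecked cast is safe only if all writers use one element type per tag id.
    @SuppressWarnings("unchecked")
    <T> List<T> getSideOutput(OutputTag<T> tag) {
        List<Object> values = sideOutputs.getOrDefault(tag, Collections.emptyList());
        return (List<T>) (List<?>) values;
    }
}
```

The trade-off is that a fixed id makes the element type an unchecked convention, which is why the type being "already fixed once the input type is defined" matters for this shortcut.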
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2982 @chenqin I had a quick look at the implementation and it looks quite good. I'll look at it in more detail once the 1.2 release is out, and then I'll also have more thorough comments. These are some quick comments off the top of my head:

- I think we can extend `Collector` with a `collect(OutputTag, T)` method. Then we wouldn't need the extra `RichCollector` and `CollectorWrapper` to work around that.
- For `WindowedStream` I would like to have something like this:

```
OutputTag lateElementsOutput = ...;
DataStream input = ...
SingleOutputStreamOperator windowed = input
  .keyBy(...)
  .window(...)
  .apply(Function, lateElementsOutput);

DataStream lateElements = windowed.getSideOutput(lateElementsOutput);
```

or maybe something else if we find a better idea. With `WindowedStream.tooLateElements()`, we would instantiate an extra `WindowOperator` just for getting late elements, while another window operator would be responsible for processing the actual elements. That seems wasteful.

What do you think?
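The first suggestion, a single collector with a tagged `collect` method, can be sketched as follows. This assumes a simplified `Collector` interface (not Flink's actual `org.apache.flink.util.Collector`) that keeps `collect(T)` for the main output and adds a generic `collect(Tag, X)` for side outputs; `Tag`, `BufferingCollector` and `splitByParity` are hypothetical names. Because the user function writes both outputs through one collector, no separate wrapper type is needed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a Collector extended with a tagged collect method.
class TaggedCollectorSketch {

    /** Names one side output; the type parameter documents its element type. */
    static final class Tag<T> {
        final String id;
        Tag(String id) { this.id = id; }
    }

    interface Collector<T> {
        void collect(T record);                 // main output, unchanged for compatibility
        <X> void collect(Tag<X> tag, X record); // side output addressed by tag
    }

    /** Buffers the main output and each side output in memory. */
    static final class BufferingCollector<T> implements Collector<T> {
        final List<T> main = new ArrayList<>();
        final Map<String, List<Object>> side = new HashMap<>();

        @Override public void collect(T record) { main.add(record); }

        @Override public <X> void collect(Tag<X> tag, X record) {
            side.computeIfAbsent(tag.id, k -> new ArrayList<>()).add(record);
        }
    }

    static final Tag<Integer> ODDS = new Tag<>("odds");

    // A user function that sends even values to the main output and odd values to a side output.
    static void splitByParity(int value, Collector<Integer> out) {
        if (value % 2 == 0) {
            out.collect(value);
        } else {
            out.collect(ODDS, value);
        }
    }

    public static void main(String[] args) {
        BufferingCollector<Integer> out = new BufferingCollector<>();
        for (int i = 1; i <= 5; i++) {
            splitByParity(i, out);
        }
        System.out.println(out.main + " " + out.side.get("odds"));
    }
}
```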
Github user chenqin commented on the issue: https://github.com/apache/flink/pull/2982 cc @aljoscha