[GitHub] flink issue #2982: [FLINK-4460] Side Outputs in Flink

2017-03-18 Thread chenqin
Github user chenqin commented on the issue:

https://github.com/apache/flink/pull/2982
  
Wow, finally in!

It was fun, and thanks for your help along every step of the journey!

Chen


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2982: [FLINK-4460] Side Outputs in Flink

2017-03-18 Thread chenqin
Github user chenqin closed the pull request at:

https://github.com/apache/flink/pull/2982




[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-09 Thread chenqin
Github user chenqin commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105235710
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -419,6 +435,14 @@ public void merge(W mergeResult,
registerCleanupTimer(window);
}
}
+
+   // side output input event if
+   // element not handled by any window
+   // late arriving tag has been set
+   // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
+   if(isSkippedElement && lateDataOutputTag != null && isLate(element)) {
--- End diff --

Thanks @kl0u Good catch! 

I put `isLate` there with the intention of filtering out events that are dropped for other reasons I may not be aware of. `lateArrivingEvents` really contains events that are both late-arriving and dropped.

@aljoscha If that is a redundant check, we can just remove `isLate`. What do you think?
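To make the question concrete, here is a minimal, self-contained model of the condition in the diff. It assumes the element-level lateness rule is "event-time assigner and `timestamp + allowedLateness <= currentWatermark`"; the class and method names (`LateDataCheckSketch`, `shouldSideOutput`) are hypothetical and this is not the `WindowOperator` code from the PR.

```java
/**
 * Hypothetical model of the late-data side-output decision discussed above.
 * Not the WindowOperator code from the PR; names only mirror the diff.
 */
final class LateDataCheckSketch {
    private final boolean eventTime;      // assumed: does the window assigner use event time?
    private final long allowedLateness;   // assumed: allowed lateness in milliseconds
    private final boolean hasLateDataTag; // stands in for "lateDataOutputTag != null"

    LateDataCheckSketch(boolean eventTime, long allowedLateness, boolean hasLateDataTag) {
        this.eventTime = eventTime;
        this.allowedLateness = allowedLateness;
        this.hasLateDataTag = hasLateDataTag;
    }

    /** Assumed element-level lateness rule; always false for processing-time assigners. */
    boolean isLate(long elementTimestamp, long currentWatermark) {
        return eventTime && elementTimestamp + allowedLateness <= currentWatermark;
    }

    /** Mirrors the diff: isSkippedElement && lateDataOutputTag != null && isLate(element). */
    boolean shouldSideOutput(boolean isSkippedElement, long elementTimestamp, long currentWatermark) {
        return isSkippedElement && hasLateDataTag && isLate(elementTimestamp, currentWatermark);
    }
}
```

In this model, removing `isLate` only changes the outcome for elements that were skipped while `isLate` is false, which is exactly the "dropped for other reasons" case the check was meant to filter.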
 




[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-08 Thread chenqin
Github user chenqin commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r104997566
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 ---
@@ -60,6 +60,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import com.google.common.collect.Iterables;
--- End diff --

That sounds right, good catch! 
Thanks for fixing!




[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-08 Thread chenqin
Github user chenqin commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r104996349
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 ---
@@ -85,6 +86,7 @@
private Set sources;
private Set sinks;
private Map<Integer, Tuple2<Integer, List>> virtualSelectNodes;
+   private Map<Integer, Tuple2<Integer, OutputTag>> virtualOutputNodes;
--- End diff --

sounds good




[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-08 Thread chenqin
Github user chenqin commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r104995971
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 ---
@@ -333,32 +356,41 @@ public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int ty
downStreamVertexID,
typeNumber,
null,
-   new ArrayList());
+   new ArrayList(), null);
 
}
 
private void addEdgeInternal(Integer upStreamVertexID,
Integer downStreamVertexID,
int typeNumber,
StreamPartitioner partitioner,
-   List outputNames) {
-
+   List outputNames,
+   OutputTag outputTag) {
 
-   if (virtualSelectNodes.containsKey(upStreamVertexID)) {
+   if (virtualOutputNodes.containsKey(upStreamVertexID)) {
+   int virtualId = upStreamVertexID;
+   upStreamVertexID = virtualOutputNodes.get(virtualId).f0;
+   if (outputTag == null) {
+   // selections that happen downstream override earlier selections
--- End diff --

sounds good to me!




[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-07 Thread chenqin
Github user chenqin commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r104846393
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
 ---
@@ -416,4 +418,35 @@ private boolean canBeParallel() {
transformation.setSlotSharingGroup(slotSharingGroup);
return this;
}
+
+   /**
+* Gets the {@link DataStream} that contains the elements that are emitted from an operation
+* into the side output with the given {@link OutputTag}.
+*
+* Example:
+* {@code
+* static final OutputTag sideOutputTag = new OutputTag("side-output") {};
+*
+* public void flatMap(X value, Collector out) throws Exception {
--- End diff --

The comments seem out of date; I think we already decided to get rid of `CollectorWrapper`.




[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-07 Thread chenqin
Github user chenqin commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r104847407
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 ---
@@ -333,32 +356,41 @@ public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int ty
downStreamVertexID,
typeNumber,
null,
-   new ArrayList());
+   new ArrayList(), null);
 
}
 
private void addEdgeInternal(Integer upStreamVertexID,
Integer downStreamVertexID,
int typeNumber,
StreamPartitioner partitioner,
-   List outputNames) {
-
+   List outputNames,
+   OutputTag outputTag) {
 
-   if (virtualSelectNodes.containsKey(upStreamVertexID)) {
+   if (virtualOutputNodes.containsKey(upStreamVertexID)) {
+   int virtualId = upStreamVertexID;
+   upStreamVertexID = virtualOutputNodes.get(virtualId).f0;
+   if (outputTag == null) {
+   // selections that happen downstream override earlier selections
--- End diff --

We may want to call out this behavior in the `getSideOutput` comments.
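The override rule in the diff comment can be modelled in a few lines. This sketch assumes virtual side-output nodes are stored as "virtual id -> (original id, tag)" and that a tag already set on the edge is kept while walking upstream; `VirtualNodeResolverSketch`, `addVirtualSideOutputNode` and `resolve` are hypothetical names, tags are plain strings for brevity, and a loop replaces the recursive `addEdgeInternal` call.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified model of resolving virtual side-output nodes when adding an edge. */
final class VirtualNodeResolverSketch {
    /** Resolved edge: the physical upstream node id plus its tag (null means main output). */
    static final class ResolvedEdge {
        final int upstreamId;
        final String outputTag;

        ResolvedEdge(int upstreamId, String outputTag) {
            this.upstreamId = upstreamId;
            this.outputTag = outputTag;
        }
    }

    // virtual node id -> original node id, and virtual node id -> tag
    private final Map<Integer, Integer> virtualToOriginal = new HashMap<>();
    private final Map<Integer, String> virtualToTag = new HashMap<>();

    void addVirtualSideOutputNode(int originalId, int virtualId, String tag) {
        virtualToOriginal.put(virtualId, originalId);
        virtualToTag.put(virtualId, tag);
    }

    /** Walks virtual nodes up to a physical node; the tag closest to the consumer wins. */
    ResolvedEdge resolve(int upstreamId, String outputTag) {
        while (virtualToOriginal.containsKey(upstreamId)) {
            int virtualId = upstreamId;
            upstreamId = virtualToOriginal.get(virtualId);
            if (outputTag == null) {
                // only adopt this node's tag if nothing further downstream selected one
                outputTag = virtualToTag.get(virtualId);
            }
        }
        return new ResolvedEdge(upstreamId, outputTag);
    }
}
```

With two stacked virtual nodes, node 1 -> 100 ("late") -> 101 ("errors"), resolving 101 yields physical node 1 with tag "errors", which is the "downstream overrides earlier selections" behavior worth documenting on `getSideOutput`.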




[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-07 Thread chenqin
Github user chenqin commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r104847832
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 ---
@@ -439,6 +450,7 @@ public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
&& (edge.getPartitioner() instanceof ForwardPartitioner)
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
+   && edge.getOutputTag() == null // disable chaining for side outputs
--- End diff --

I remember you mentioned that in the latest version side outputs work with chaining; is that right?
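For reference, the effect of the new condition can be reduced to a tiny predicate. This is a hypothetical sketch, not the real `isChainable`: it only keeps the forward-partitioner, parallelism and output-tag checks from the diff and ignores chaining strategies, slot sharing groups and the other conditions the real method inspects.

```java
/** Hypothetical, reduced model of the chaining check in the diff above. */
final class ChainingSketch {
    /** Only the edge properties this sketch looks at; the real check inspects more. */
    static final class Edge {
        final boolean forwardPartitioner;
        final int upstreamParallelism;
        final int downstreamParallelism;
        final String outputTag; // null means the edge carries the main output

        Edge(boolean forwardPartitioner, int upstreamParallelism, int downstreamParallelism, String outputTag) {
            this.forwardPartitioner = forwardPartitioner;
            this.upstreamParallelism = upstreamParallelism;
            this.downstreamParallelism = downstreamParallelism;
            this.outputTag = outputTag;
        }
    }

    static boolean isChainable(Edge edge) {
        return edge.forwardPartitioner
                && edge.upstreamParallelism == edge.downstreamParallelism
                && edge.outputTag == null; // the new condition: never chain a side-output edge
    }
}
```

If side outputs do work with chaining in the latest version, the last clause is what would be dropped.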




[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-07 Thread chenqin
Github user chenqin commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r104847733
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 ---
@@ -60,6 +60,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import com.google.common.collect.Iterables;
--- End diff --

Do you think introducing this dependency is a good idea or a bad idea? Up to you :)




[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-07 Thread chenqin
Github user chenqin commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r104847005
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 ---
@@ -85,6 +86,7 @@
private Set sources;
private Set sinks;
private Map<Integer, Tuple2<Integer, List>> virtualSelectNodes;
+   private Map<Integer, Tuple2<Integer, OutputTag>> virtualOutputNodes;
--- End diff --

We might consider using `addVirtualSideOutputNode` and `virtualSideOutputNodes`, unless we want to refactor away from the current single-output `operator` assumption toward `<<tag1,IN1>...<tagX,INX> operator <<taga,OUTa>...<tagx,OUTX>`.




[GitHub] flink issue #2982: [FLINK-4460] Side Outputs in Flink

2017-03-01 Thread chenqin
Github user chenqin commented on the issue:

https://github.com/apache/flink/pull/2982
  
@aljoscha 
Nice! Let's do "Chen Qin qinnc...@gmail.com"




[GitHub] flink issue #2982: [FLINK-4460] Side Outputs in Flink

2017-02-17 Thread chenqin
Github user chenqin commented on the issue:

https://github.com/apache/flink/pull/2982
  
@aljoscha Looks good to me 👍

I briefly looked at your git branch. One minor comment: it would help to add documentation to `sideOutputLateData` so users get a better idea of what they opt into when they enable the late-arriving event stream.

Currently, whether an event is late is decided by comparing the watermark with the event time. Do you think there is a need to let users pass a kind of `Evaluator`, so that they can route any kind of element to a side output?

`window.sideOutput(OutputTag, Evaluator)`

`interface Evaluator{ MergedWindows, key, watermark}`
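One hypothetical way to flesh out that `Evaluator` idea is a functional interface over the same three inputs (merged windows, key, watermark). Nothing like this exists in the PR; `LateElementEvaluator`, `LateElementEvaluators.watermarkBased` and the use of window max timestamps instead of window objects are all assumptions made for the sketch.

```java
import java.util.List;

/** Hypothetical shape of the proposed Evaluator; not part of this PR or of Flink. */
@FunctionalInterface
interface LateElementEvaluator<K> {
    /**
     * @param windowMaxTimestamps max timestamps of the (merged) windows the element was assigned to
     * @param key                 key of the element
     * @param watermark           current event-time watermark of the operator
     * @return true if the element should go to the side output
     */
    boolean shouldSideOutput(List<Long> windowMaxTimestamps, K key, long watermark);
}

final class LateElementEvaluators {
    /** Default rule: side-output only if every assigned window is already past its allowed lateness. */
    static <K> LateElementEvaluator<K> watermarkBased(long allowedLateness) {
        return (windowMaxTimestamps, key, watermark) ->
                windowMaxTimestamps.stream().allMatch(end -> end + allowedLateness <= watermark);
    }
}
```

The default evaluator reproduces the watermark comparison, while a user-supplied lambda (for example one that inspects only the key) shows what "any kind of side output" could mean.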

- Regarding `split`/`select`, I think there is a chance to consolidate `select` and build it on top of `OutputTag`, but that might be out of this PR's scope.
- Regarding `WindowedStream`, I am a bit confused about what happens if I use `allowedLateness` and `sideOutputLateData` at the same time.
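A minimal model of how the two settings could interact for an element assigned to a single window, assuming the default event-time trigger (the window fires once the watermark reaches its max timestamp) and that window state is kept until `maxTimestamp + allowedLateness`. `LatenessSketch` and `Outcome` are hypothetical names.

```java
/** Hypothetical model of how allowed lateness and a late-data side output interact for one window. */
final class LatenessSketch {
    enum Outcome { ON_TIME, LATE_WITHIN_ALLOWED_LATENESS, SIDE_OUTPUT }

    /**
     * @param windowMaxTimestamp last timestamp that belongs to the window (window end - 1)
     * @param allowedLateness    value passed to allowedLateness(...), in milliseconds
     * @param watermark          current event-time watermark when the element arrives
     */
    static Outcome classify(long windowMaxTimestamp, long allowedLateness, long watermark) {
        if (watermark < windowMaxTimestamp) {
            // window has not fired yet
            return Outcome.ON_TIME;
        }
        if (windowMaxTimestamp + allowedLateness > watermark) {
            // window already fired but its state is still kept: element is added, window may fire again
            return Outcome.LATE_WITHIN_ALLOWED_LATENESS;
        }
        // window state is gone: element is dropped, or emitted to the late-data tag if one is set
        return Outcome.SIDE_OUTPUT;
    }
}
```

Under these assumptions the two settings do not conflict: `allowedLateness` widens the middle band, and only elements that fall past it reach the side output.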

Thanks,
Chen







[GitHub] flink issue #2982: [FLINK-4460] Side Outputs in Flink

2017-02-15 Thread chenqin
Github user chenqin commented on the issue:

https://github.com/apache/flink/pull/2982
  
@aljoscha, 

The diff has been updated and merged to support `apply(Function, lateElementsOutput);`.
The `Collector` refactor does not seem to be a viable option, as stated above.

It seems Travis CI timed out here; is there a way to increase the timeout setting?
https://travis-ci.org/chenqin/flink/builds/201329246

Chen





[GitHub] flink issue #2982: [FLINK-4460] Side Outputs in Flink

2016-12-13 Thread chenqin
Github user chenqin commented on the issue:

https://github.com/apache/flink/pull/2982
  
@aljoscha Thanks for your time. We can chat more after 1.2 release!

I think it makes sense to extend `Collector`, even though we may not be able to remove `collect(T obj)` due to API compatibility concerns in 1.x (per @fhueske's comments in the [FLIP-13 email thread](http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discuss-FLIP-13-Side-Outputs-in-Flink-td14204.html)).
 

The only point I would like to add: replacing the underlying output chains with `collect(tag, element)` seems to require a decent amount of refactoring, but it looks like a reasonable investment moving forward (multiple inputs / multiple outputs).
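The "extend `Collector` but keep `collect(T)`" idea can be sketched without Flink. Everything here is a hypothetical stand-in: `SimpleCollector` replaces Flink's `Collector`, `SideTag` replaces `OutputTag`, and `BufferingTaggedCollector` just buffers records in memory so the routing is visible.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Stand-in for Flink's Collector<T>, reduced to what this sketch needs. */
interface SimpleCollector<T> {
    void collect(T record);
    void close();
}

/** Hypothetical typed tag; the type parameter ties a tag to its element type at compile time. */
final class SideTag<X> {
    final String id;

    SideTag(String id) {
        this.id = id;
    }
}

/** Hypothetical extension: collect(T) stays for compatibility, a tagged overload is added. */
interface TaggedCollector<T> extends SimpleCollector<T> {
    <X> void collect(SideTag<X> tag, X record);
}

/** Buffers main and side outputs in memory so the routing can be inspected. */
final class BufferingTaggedCollector<T> implements TaggedCollector<T> {
    final List<T> mainOutput = new ArrayList<>();
    final Map<String, List<Object>> sideOutputs = new HashMap<>();

    @Override
    public void collect(T record) {
        mainOutput.add(record);
    }

    @Override
    public <X> void collect(SideTag<X> tag, X record) {
        sideOutputs.computeIfAbsent(tag.id, id -> new ArrayList<>()).add(record);
    }

    @Override
    public void close() {}
}
```

Existing user functions that only call `collect(T)` keep compiling, while the tagged overload carries the side-output element type in its signature; the refactoring cost mentioned above is in threading such an interface through every operator's output chain.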

The `tooLateEvents()` method was added for users' convenience; it should be fine to remove it if it doesn't bring much benefit. `LateArrivingTag` shares the same type as the input (which is already fixed once the input type is defined), so adding a late-arriving tag within the `apply` method seems redundant. In fact, without any changes to this diff, users can already access late-arriving events in the following way:

```java
OutputTag lateElementsOutput = new LateArrivingOutputTag();
DataStream input = ...
SingleOutputStreamOperator windowed = input
  .keyBy(...)
  .window(...)
  .apply(Function);

DataStream lateElements = windowed.getSideOutput(lateElementsOutput);
```

Thanks,
Chen





[GitHub] flink issue #2982: [FLINK-4460] Side Outputs in Flink

2016-12-09 Thread chenqin
Github user chenqin commented on the issue:

https://github.com/apache/flink/pull/2982
  
cc @aljoscha 




[GitHub] flink pull request #2982: [FLINK-4460] Side Outputs in Flink

2016-12-09 Thread chenqin
GitHub user chenqin opened a pull request:

https://github.com/apache/flink/pull/2982

[FLINK-4460] Side Outputs in Flink


[FLIP-13](https://cwiki.apache.org/confluence/display/FLINK/FLIP-13+Side+Outputs+in+Flink): expose side outputs via `OutputTag`.

For user functions that take a `Collector collector` parameter:
 - The PR offers a utility class, `CollectorWrapper wrapper = new CollectorWrapper(collector);`, which can write side-output elements via `wrapper.collect(OutputTag tag, sideout)`. Calling `getSideOutput(OutputTag tag)` on the `SingleOutputStreamOperator` then returns the side-output `DataStream`.
 - OutputTags of the same type can have different values; `getSideOutput` only exposes elements whose `OutputTag` has exactly the same type and value.
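The matching rule in the second bullet can be modelled with a tag whose equality covers both its id (the "value") and its element type. This is a hypothetical sketch of the behavior described in this PR, not of how any released `OutputTag` compares tags; `OutputTagSketch` and `SideOutputRouterSketch` are illustrative names.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for OutputTag: equal only if both the id and the element type match. */
final class OutputTagSketch<T> {
    final String id;
    final Class<T> type;

    OutputTagSketch(String id, Class<T> type) {
        this.id = Objects.requireNonNull(id);
        this.type = Objects.requireNonNull(type);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof OutputTagSketch)) {
            return false;
        }
        OutputTagSketch<?> other = (OutputTagSketch<?>) o;
        return id.equals(other.id) && type.equals(other.type);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, type);
    }
}

/** Records (tag, value) pairs and filters them the way getSideOutput is described to. */
final class SideOutputRouterSketch {
    private final List<Map.Entry<OutputTagSketch<?>, Object>> emitted = new ArrayList<>();

    <X> void collect(OutputTagSketch<X> tag, X value) {
        emitted.add(new AbstractMap.SimpleImmutableEntry<OutputTagSketch<?>, Object>(tag, value));
    }

    <X> List<X> getSideOutput(OutputTagSketch<X> tag) {
        List<X> result = new ArrayList<>();
        for (Map.Entry<OutputTagSketch<?>, Object> entry : emitted) {
            if (tag.equals(entry.getKey())) {
                // safe: equal tags share the same element type
                result.add(tag.type.cast(entry.getValue()));
            }
        }
        return result;
    }
}
```

Two `String` tags named "late" and "errors" produce separate streams, and a second instance created as `new OutputTagSketch<>("late", String.class)` selects the same elements as the first, because matching is by value rather than by object identity.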

Late-arriving events are sent to the side output if:
- the time characteristic is set to event time,
- `isLate` returns true for all assigned windows, and
- the event timestamp is no later than `currentWatermark + allowedLateness`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chenqin/flink flip

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2982.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2982


commit de674f19fcbe9955cb4208ef0938fe5b0f7adc90
Author: Chen Qin <qinnc...@fgmail.com>
Date:   2016-10-21T19:38:04Z

allow mutpile output stream

commit 3d91e6c69dbfbcb2c73dcc37ac2d8ed637a374eb
Author: Chen Qin <c...@uber.com>
Date:   2016-11-29T21:24:09Z

Merge branch 'master' into flip

commit 977b2d7fc54e1f9663a5ceb8a62ed2af5a955ca6
Author: Chen Qin <c...@uber.com>
Date:   2016-12-01T22:19:56Z

allow mutiple OutputTag with same type
implement windowopeator late arriving events
add unit/integration tests



