[jira] [Commented] (FLINK-3109) Join two streams with two different buffer time

2016-12-15 Thread Anton Solovev (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15750880#comment-15750880
 ] 

Anton Solovev commented on FLINK-3109:
--

Hi [~yangjunpro], [~StephanEwen], is this issue still relevant?

> Join two streams with two different buffer time
> ---
>
> Key: FLINK-3109
> URL: https://issues.apache.org/jira/browse/FLINK-3109
> Project: Flink
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 0.10.1
> Reporter: Wang Yangjun
> Assignee: Anton Solovev
> Labels: easyfix, patch
> Fix For: 0.10.2
>
> Original Estimate: 48h
> Remaining Estimate: 48h
>
> Current Flink streaming only supports joining two streams on the same window. 
> How to solve this problem?
> For example, there are two streams. One is advertisements shown to users. 
> Each tuple in it could be described as (id, shown timestamp). The other 
> one is a click stream -- (id, clicked timestamp). We want to get a joined stream 
> that includes all the advertisements that are clicked by a user within 20 minutes 
> after being shown.
> It is possible that after an advertisement is shown, some user clicks it 
> immediately. It is possible that the "click" message arrives at the server earlier 
> than the "show" message because of Internet delay. We assume that the maximum 
> delay is one minute.
> Then the need is that we should always keep a buffer (20 mins) of the "show" stream 
> and another buffer (1 min) of the "click" stream.
> It would be great if there were an API like:
> showStream.join(clickStream)
> .where(keySelector)
> .buffer(Time.of(20, TimeUnit.MINUTES))
> .equalTo(keySelector)
> .buffer(Time.of(1, TimeUnit.MINUTES))
> .apply(JoinFunction)
> http://stackoverflow.com/questions/33849462/how-to-avoid-repeated-tuples-in-flink-slide-window-join/34024149#34024149
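
To make the requested semantics concrete, here is a minimal plain-Java sketch of a two-buffer join. It is only an illustration under stated assumptions: the class `TwoBufferJoinSketch` and its methods `onShow`, `onClick`, `evict`, and `results` are hypothetical names, not Flink API and not the code from any pull request. It assumes a click matches a show when the ids are equal and the click timestamp is between 0 and 20 minutes after the show timestamp, and that eviction is driven by an explicit "current time" passed in by the caller.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of the requested semantics; every name here is
// hypothetical and none of it is Flink API.
public class TwoBufferJoinSketch {
    static final long SHOW_BUFFER_MS = 20 * 60 * 1000L;  // keep shows for 20 minutes
    static final long CLICK_BUFFER_MS = 60 * 1000L;      // keep clicks for 1 minute

    private final Map<String, List<Long>> shows = new HashMap<>();
    private final Map<String, List<Long>> clicks = new HashMap<>();
    private final List<String> joined = new ArrayList<>();

    // A "show" arrives: match it against clicks that arrived early, then buffer it.
    public void onShow(String id, long showTs) {
        for (long clickTs : clicks.getOrDefault(id, new ArrayList<>())) {
            emitIfMatch(id, showTs, clickTs);
        }
        shows.computeIfAbsent(id, k -> new ArrayList<>()).add(showTs);
    }

    // A "click" arrives: match it against buffered shows, then buffer it.
    public void onClick(String id, long clickTs) {
        for (long showTs : shows.getOrDefault(id, new ArrayList<>())) {
            emitIfMatch(id, showTs, clickTs);
        }
        clicks.computeIfAbsent(id, k -> new ArrayList<>()).add(clickTs);
    }

    // Drop buffered elements that are older than their buffer length at time nowMs.
    public void evict(long nowMs) {
        for (List<Long> l : shows.values()) {
            l.removeIf(ts -> ts < nowMs - SHOW_BUFFER_MS);
        }
        for (List<Long> l : clicks.values()) {
            l.removeIf(ts -> ts < nowMs - CLICK_BUFFER_MS);
        }
    }

    // Assumed join condition: the click happened 0 to 20 minutes after the show.
    private void emitIfMatch(String id, long showTs, long clickTs) {
        long delta = clickTs - showTs;
        if (delta >= 0 && delta <= SHOW_BUFFER_MS) {
            joined.add(id + ":" + showTs + "->" + clickTs);
        }
    }

    public List<String> results() {
        return joined;
    }
}
```

The asymmetry is the point of the request: the "show" side has to be retained for the full 20-minute match interval, while the "click" side only has to survive the assumed one-minute arrival skew.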



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3109) Join two streams with two different buffer time

2016-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15203543#comment-15203543
 ] 

ASF GitHub Bot commented on FLINK-3109:
---

Github user wangyangjun commented on the pull request:

https://github.com/apache/flink/pull/1527#issuecomment-199043205
  
Hi StephanEwen, sorry for replying this late.

This will join two window streams as long as the slide sizes of the two 
windows are equal. It can be two SlidingTimeWindows, or one SlidingTimeWindow 
and one TumblingTimeWindow. If the slide sizes of the windows are not equal, an 
exception will be thrown. 

It doesn't support generic windows and triggers yet. Only TimeWindow is 
allowed in the API.  
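
As a rough illustration of the rule described above, the check could look like the following plain-Java sketch. All names (`SlideCheckSketch`, `WindowSpec`, `requireSameSlide`) are hypothetical and are not taken from the pull request or from Flink. It also assumes that a tumbling window is treated as a sliding window whose slide equals its size, which is one reading of why a SlidingTimeWindow can be paired with a TumblingTimeWindow.

```java
// Hypothetical sketch of the "equal slide size" precondition; not Flink API.
public class SlideCheckSketch {

    public static final class WindowSpec {
        public final long sizeMs;
        public final long slideMs;

        private WindowSpec(long sizeMs, long slideMs) {
            this.sizeMs = sizeMs;
            this.slideMs = slideMs;
        }

        public static WindowSpec sliding(long sizeMs, long slideMs) {
            return new WindowSpec(sizeMs, slideMs);
        }

        // Assumption: a tumbling window slides by exactly its own size.
        public static WindowSpec tumbling(long sizeMs) {
            return new WindowSpec(sizeMs, sizeMs);
        }
    }

    // Rejects window pairs whose slide sizes differ; sizes may differ freely.
    public static void requireSameSlide(WindowSpec a, WindowSpec b) {
        if (a.slideMs != b.slideMs) {
            throw new IllegalArgumentException(
                "Slide sizes differ: " + a.slideMs + " ms vs " + b.slideMs + " ms");
        }
    }
}
```

Under this assumption, a 20-minute window sliding every minute can be joined with a 1-minute tumbling window, while two windows that slide at different rates are rejected up front.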





[jira] [Commented] (FLINK-3109) Join two streams with two different buffer time

2016-03-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15191368#comment-15191368
 ] 

ASF GitHub Bot commented on FLINK-3109:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1527#issuecomment-195495184
  
I just saw that you updated this pull request (actually a few weeks ago 
already).

A lot of it looks very good; some things we need to check a bit deeper 
(like how triggers actually behave on the two separate windows, and how windows 
are matched).

Can you give a high-level summary of how this should behave?
Especially given that you allow for custom triggers and window assigners 
here, how are windows matched against each other (to determine that their 
elements should be joined/co-grouped)?
For tumbling time windows, the behavior is well defined and as discussed 
in the JIRA issue, but for generic windows and triggers, how is it defined?




[jira] [Commented] (FLINK-3109) Join two streams with two different buffer time

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15139097#comment-15139097
 ] 

ASF GitHub Bot commented on FLINK-3109:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1527#issuecomment-181922703
  
@wangyangjun We actually merged all the changes concerning the state 
abstraction.

To make this window join work seamlessly on Flink's state backends (memory, 
or key/value stores, managed memory, ...) you would need to implement it 
against the key/value state. That means that whenever you store data in the 
operator, the data should go into the partitioned state that you can obtain 
from the `AbstractStreamOperator` or the `RuntimeContext`.

I think that for this window operator, the `ListState` is a good choice, 
where you can add values to a key and retrieve the list as a whole once the 
windows are evaluated.

Please write back if you need some more pointers on the state abstraction.
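
The access pattern described above (append values under a key, then read the whole list back when the window is evaluated) can be pictured with a small plain-Java stand-in. This is not Flink's `ListState` and does not use any state backend; `KeyedListStateSketch` and its methods are hypothetical names used only to show the shape of the interaction, with an in-memory map playing the role of the backend.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a key-partitioned list state; not Flink's ListState.
public class KeyedListStateSketch<K, V> {
    // In a real job the state backend would own this storage.
    private final Map<K, List<V>> backing = new HashMap<>();

    // Append one value to the list stored under the given key.
    public void add(K key, V value) {
        backing.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    // Retrieve the list for a key as a whole, e.g. when a window is evaluated.
    public List<V> get(K key) {
        return Collections.unmodifiableList(
            backing.getOrDefault(key, Collections.<V>emptyList()));
    }

    // Discard the list for a key once its window has been evaluated.
    public void clear(K key) {
        backing.remove(key);
    }
}
```

Because the operator only ever appends and reads per key, it does not need to know where the data physically lives, which is what lets the state backend decide that.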





[jira] [Commented] (FLINK-3109) Join two streams with two different buffer time

2016-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15125480#comment-15125480
 ] 

ASF GitHub Bot commented on FLINK-3109:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1527#issuecomment-177599020
  
Concerning the data management: @aljoscha and I are currently heavily 
reworking that.
All window operations need to go onto the "state" interfaces. Before we 
merge this one, we should also do that, so please do not spend much time on 
optimizing how the buffers for the two inputs are implemented.

The interfaces for that will go into the code in a few days (they are in 
this pull request: https://github.com/apache/flink/pull/1562)

For now, I would focus on the API, and we will look into the buffers in a few 
days.

BTW: how exactly the buffered data is held (managed memory, external 
databases, etc) depends on the "state backend" of the job. Memory behavior can 
be changed that way and the operators need not worry about that.




[jira] [Commented] (FLINK-3109) Join two streams with two different buffer time

2016-01-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15123809#comment-15123809
 ] 

ASF GitHub Bot commented on FLINK-3109:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1527#issuecomment-176869415
  
Hi @wangyangjun,
sorry for the long wait but I think we can get this PR in after some 
modifications. I'd like to change the API a bit to separate it from the other 
Join implementation since that class is already quite crowded. What I would 
propose is to add a method `timeJoin()` on DataStream and a new class 
`TimeJoinedStreams` that is similar to `JoinedStreams` but specific to the 
two-buffer time join.

Could you also please add support for the Scala API? We try to keep the two 
APIs in sync. If you need help with that, please let me know.




[jira] [Commented] (FLINK-3109) Join two streams with two different buffer time

2016-01-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15124398#comment-15124398
 ] 

ASF GitHub Bot commented on FLINK-3109:
---

Github user wangyangjun commented on the pull request:

https://github.com/apache/flink/pull/1527#issuecomment-177007794
  
Hello @aljoscha,
As I mentioned in my last comment, I will reimplement it with [Guava 
CacheBuilder](http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/cache/CacheBuilder.html).
 One question is about Flink's own memory management: does CacheBuilder get 
memory from Flink or from the JVM directly? As I understand it, HeapWindowBuffer 
allocates memory from Flink. Is there any data structure like cachedMap in Flink?




[jira] [Commented] (FLINK-3109) Join two streams with two different buffer time

2016-01-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15123979#comment-15123979
 ] 

ASF GitHub Bot commented on FLINK-3109:
---

Github user wangyangjun commented on the pull request:

https://github.com/apache/flink/pull/1527#issuecomment-176912402
  
Hello @aljoscha,
Thanks for your comment and suggestion. I will update the API. In my 
benchmark tests of timeJoin, the functionality of this implementation works well, 
but the performance is poor. I will reimplement it with 
[Guava](http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/collect/MapMaker.html).

Yes, I could add support for the Scala API.

Thanks




[jira] [Commented] (FLINK-3109) Join two streams with two different buffer time

2016-01-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15110057#comment-15110057
 ] 

ASF GitHub Bot commented on FLINK-3109:
---

Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1527#discussion_r50360393
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
 ---
@@ -234,6 +234,109 @@ public void invoke(String value) throws Exception {
Assert.assertEquals(expectedResult, testResults);
}
 
+
+   // TODO: design buffer join test
--- End diff --

@wangyangjun Don't worry. The failing tests seem unrelated to your changes. 
There are some flaky tests in Flink.




[jira] [Commented] (FLINK-3109) Join two streams with two different buffer time

2016-01-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15108657#comment-15108657
 ] 

ASF GitHub Bot commented on FLINK-3109:
---

GitHub user wangyangjun opened a pull request:

https://github.com/apache/flink/pull/1527

[FLINK-3109]Join two streams with two different buffer time -- Java i…

Java implementation of jira 
[FLINK-3109](https://issues.apache.org/jira/browse/FLINK-3109)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wangyangjun/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1527.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1527


commit a521c83eb31b653f0a4bfc9da58837a587a378c4
Author: Yangjun Wang 
Date:   2015-12-05T01:16:49Z

[FLINK-3109]Join two streams with two different buffer time -- Java 
implementation






[jira] [Commented] (FLINK-3109) Join two streams with two different buffer time

2016-01-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15108998#comment-15108998
 ] 

ASF GitHub Bot commented on FLINK-3109:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1527#discussion_r50288042
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamJoinOperator.java
 ---
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+
+public class StreamJoinOperator
+   extends AbstractUdfStreamOperator>
+   implements TwoInputStreamOperator {
+
+   private static final long serialVersionUID = 8650694601687319011L;
+   private static final Logger LOG = LoggerFactory.getLogger(StreamJoinOperator.class);
+
+   private HeapWindowBuffer stream1Buffer;
+   private HeapWindowBuffer stream2Buffer;
+   private final KeySelector keySelector1;
+   private final KeySelector keySelector2;
+   private long stream1WindowLength;
+   private long stream2WindowLength;
+
+   protected transient long currentWatermark1 = -1L;
+   protected transient long currentWatermark2 = -1L;
+   protected transient long currentWatermark = -1L;
+
+   private TypeSerializer inputSerializer1;
+   private TypeSerializer inputSerializer2;
+   /**
+* If this is true. The current processing time is set as the timestamp of incoming elements.
+* This for use with a {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor}
+* if eviction should happen based on processing time.
+*/
+   private boolean setProcessingTime = false;
+
+   public StreamJoinOperator(JoinFunction userFunction,
+   KeySelector keySelector1,
+   KeySelector keySelector2,
+   long stream1WindowLength,
+   long stream2WindowLength,
+   TypeSerializer inputSerializer1,
+   TypeSerializer inputSerializer2) {
+   super(userFunction);
+   this.keySelector1 = requireNonNull(keySelector1);
+   this.keySelector2 = requireNonNull(keySelector2);
+
+   this.stream1WindowLength = requireNonNull(stream1WindowLength);
+   this.stream2WindowLength = requireNonNull(stream2WindowLength);
+
+   this.inputSerializer1 = requireNonNull(inputSerializer1);
+   this.inputSerializer2 = requireNonNull(inputSerializer2);
+   }
+
+   @Override
+   public void open() throws Exception {
+   super.open();
+   if (null == inputSerializer1 || null == inputSerializer2) {
+   throw new IllegalStateException("Input serializer was not set.");
+   }
+
+   

[jira] [Commented] (FLINK-3109) Join two streams with two different buffer time

2016-01-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15109407#comment-15109407
 ] 

ASF GitHub Bot commented on FLINK-3109:
---

Github user wangyangjun commented on a diff in the pull request:

https://github.com/apache/flink/pull/1527#discussion_r50318604
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
 ---
@@ -234,6 +234,109 @@ public void invoke(String value) throws Exception {
Assert.assertEquals(expectedResult, testResults);
}
 
+
+   // TODO: design buffer join test
--- End diff --

@tillrohrmann Do you know why the checks have failed? There are 5 build 
jobs, and only 3 of them passed. This is my first time contributing to an open 
source project. I have no idea how my code affects the failed tests.




[jira] [Commented] (FLINK-3109) Join two streams with two different buffer time

2016-01-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15109390#comment-15109390
 ] 

ASF GitHub Bot commented on FLINK-3109:
---

Github user wangyangjun commented on a diff in the pull request:

https://github.com/apache/flink/pull/1527#discussion_r50318034
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamJoinOperator.java
 ---
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+
+public class StreamJoinOperator<K, IN1, IN2, OUT>
+   extends AbstractUdfStreamOperator<OUT, JoinFunction<IN1, IN2, OUT>>
+   implements TwoInputStreamOperator<IN1, IN2, OUT> {
+
+   private static final long serialVersionUID = 8650694601687319011L;
+   private static final Logger LOG = LoggerFactory.getLogger(StreamJoinOperator.class);
+
+   private HeapWindowBuffer stream1Buffer;
+   private HeapWindowBuffer stream2Buffer;
+   private final KeySelector keySelector1;
+   private final KeySelector keySelector2;
+   private long stream1WindowLength;
+   private long stream2WindowLength;
+
+   protected transient long currentWatermark1 = -1L;
+   protected transient long currentWatermark2 = -1L;
+   protected transient long currentWatermark = -1L;
+
+   private TypeSerializer inputSerializer1;
+   private TypeSerializer inputSerializer2;
+   /**
+    * If this is true, the current processing time is set as the timestamp of incoming elements.
+    * This is for use with a {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor}
+    * if eviction should happen based on processing time.
+    */
+   private boolean setProcessingTime = false;
+
+   public StreamJoinOperator(JoinFunction userFunction,
+   KeySelector keySelector1,
+   KeySelector keySelector2,
+   long stream1WindowLength,
+   long stream2WindowLength,
+   TypeSerializer inputSerializer1,
+   TypeSerializer inputSerializer2) {
+   super(userFunction);
+   this.keySelector1 = requireNonNull(keySelector1);
+   this.keySelector2 = requireNonNull(keySelector2);
+
+   this.stream1WindowLength = requireNonNull(stream1WindowLength);
+   this.stream2WindowLength = requireNonNull(stream2WindowLength);
+
+   this.inputSerializer1 = requireNonNull(inputSerializer1);
+   this.inputSerializer2 = requireNonNull(inputSerializer2);
+   }
+
+   @Override
+   public void open() throws Exception {
+   super.open();
+   if (null == inputSerializer1 || null == inputSerializer2) {
+   throw new IllegalStateException("Input serializer was not set.");
+   }
+
+   

[jira] [Commented] (FLINK-3109) Join two streams with two different buffer time

2016-01-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15109388#comment-15109388
 ] 

ASF GitHub Bot commented on FLINK-3109:
---

Github user wangyangjun commented on a diff in the pull request:

https://github.com/apache/flink/pull/1527#discussion_r50318008
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
 ---
@@ -234,6 +234,109 @@ public void invoke(String value) throws Exception {
Assert.assertEquals(expectedResult, testResults);
}
 
+
+   // TODO: design buffer join test
--- End diff --

As you can see in the code, the test case is implemented. I forgot to 
remove this comment.




[jira] [Commented] (FLINK-3109) Join two streams with two different buffer time

2016-01-11 Thread Wang Yangjun (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15091769#comment-15091769
 ] 

Wang Yangjun commented on FLINK-3109:
-

Over the last month, this implementation has worked well in my work. I think 
the feature could be contributed.



[jira] [Commented] (FLINK-3109) Join two streams with two different buffer time

2016-01-10 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15091162#comment-15091162
 ] 

Stephan Ewen commented on FLINK-3109:
-

[~yangjun.wan...@gmail.com] How is your experience with that implementation? Do 
you think it is in a state that it could be contributed?
