[jira] [Commented] (FLINK-7999) Variable Join Window Boundaries

2018-10-10 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645931#comment-16645931
 ] 

Hequn Cheng commented on FLINK-7999:


I think [FLINK-8478|https://issues.apache.org/jira/browse/FLINK-8478] and 
[FLINK-5725|https://issues.apache.org/jira/browse/FLINK-5725] can both solve 
the problem, so I will close this task.

> Variable Join Window Boundaries
> ---
>
> Key: FLINK-7999
> URL: https://issues.apache.org/jira/browse/FLINK-7999
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Seth Wiesman
>Priority: Major
>
> Allow window joins with variable length based on row attributes. 
> Consider two streams joined on an id, where one has start and end dates; it 
> would be useful to be able to join each row during its live duration. Today 
> this can be expressed in the DataStream API using a CoProcessFunction. 
>  left.id = right.id AND (left.time > right.start and left.time < right.end)
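To pin down the semantics of the requested join, here is a minimal, non-streaming reference sketch of the condition above evaluated over finite collections. The names `LeftRow`, `RightRow` and `VariableWindowJoin` are hypothetical and not part of any Flink API; timestamps are plain `Long`s, the bounds are strict exactly as in the condition, and the nested loop is only meant to show which pairs should match, not how a streaming engine would evaluate it.

```scala
// Hypothetical row types for illustration only (not Flink classes).
final case class LeftRow(id: Int, time: Long)
final case class RightRow(id: Int, start: Long, end: Long, payload: String)

object VariableWindowJoin {
  // The predicate from the issue: same id, and left.time strictly inside
  // the (start, end) interval carried by the right-hand row itself.
  def matches(l: LeftRow, r: RightRow): Boolean =
    l.id == r.id && l.time > r.start && l.time < r.end

  // Naive nested-loop reference join; each right row defines its own window.
  def join(lefts: Seq[LeftRow], rights: Seq[RightRow]): Seq[(LeftRow, RightRow)] =
    for {
      l <- lefts
      r <- rights
      if matches(l, r)
    } yield (l, r)

  def main(args: Array[String]): Unit = {
    val rights = Seq(RightRow(1, 10L, 20L, "a"), RightRow(2, 0L, 100L, "b"))
    val lefts  = Seq(LeftRow(1, 15L), LeftRow(1, 25L), LeftRow(2, 50L), LeftRow(1, 20L))
    join(lefts, rights).foreach { case (l, r) => println(s"$l -> ${r.payload}") }
  }
}
```

The window length differs per key (10 time units for id 1, 100 for id 2), which is what a fixed-bound interval join cannot express directly.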



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7999) Variable Join Window Boundaries

2018-07-06 Thread Stephan Ewen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16534562#comment-16534562
 ] 

Stephan Ewen commented on FLINK-7999:

Would FLINK-8478 give you the desired functionality?




[jira] [Commented] (FLINK-7999) Variable Join Window Boundaries

2017-11-10 Thread Seth Wiesman (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247674#comment-16247674
 ] 

Seth Wiesman commented on FLINK-7999:

Sure, I will use my own use case as an example. 

I am using Flink to aggregate spend for campaigns that I am running. Each 
record in the main stream contains both a campaign id and information about 
a single financial transaction; this stream is analogous to the fact table in 
a traditional data warehouse. However, each campaign runs under its own 
currency, so I need to join with metadata containing the currency code for 
that campaign. Campaigns have both start and end dates and can span anywhere 
from one day to several months. 

Events:
timestamp        | id | spend
-----------------+----+------
2017-11-11 03:00 | 1  | 0.25
2017-11-11 03:02 | 2  | 0.03
2017-11-11 03:05 | 1  | 0.11

Metadata:
id | start_date | end_date   | currency_code
---+------------+------------+--------------
1  | 2017-11-10 | 2017-11-12 | "USD"
2  | 2017-04-02 | 2019-12-31 | "EUR"
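For the sample tables above, the intended result is simply each event enriched with the currency of its campaign. The sketch below computes that result in memory, as a sanity check of what the streaming join should produce. The case classes, field names and `CampaignEnrichment` object are hypothetical, and it assumes a campaign is live from `start_date` 00:00 (inclusive) through the end of `end_date`; the tables do not say whether `end_date` itself is inclusive.

```scala
import java.time.{LocalDate, LocalDateTime}

// Hypothetical row types mirroring the sample tables; field names are assumptions.
final case class SpendEvent(timestamp: LocalDateTime, id: Int, spend: BigDecimal)
final case class Campaign(id: Int, startDate: LocalDate, endDate: LocalDate, currencyCode: String)

object CampaignEnrichment {
  // Assumption: live from start_date 00:00 (inclusive) until end_date + 1 day 00:00 (exclusive).
  def isLive(c: Campaign, t: LocalDateTime): Boolean =
    !t.isBefore(c.startDate.atStartOfDay()) && t.isBefore(c.endDate.plusDays(1).atStartOfDay())

  // Attach the currency code of the matching, currently live campaign.
  def enrich(events: Seq[SpendEvent], campaigns: Seq[Campaign]): Seq[(SpendEvent, String)] =
    for {
      e <- events
      c <- campaigns
      if c.id == e.id && isLive(c, e.timestamp)
    } yield (e, c.currencyCode)

  val events: Seq[SpendEvent] = Seq(
    SpendEvent(LocalDateTime.of(2017, 11, 11, 3, 0), 1, BigDecimal("0.25")),
    SpendEvent(LocalDateTime.of(2017, 11, 11, 3, 2), 2, BigDecimal("0.03")),
    SpendEvent(LocalDateTime.of(2017, 11, 11, 3, 5), 1, BigDecimal("0.11")))

  val campaigns: Seq[Campaign] = Seq(
    Campaign(1, LocalDate.of(2017, 11, 10), LocalDate.of(2017, 11, 12), "USD"),
    Campaign(2, LocalDate.of(2017, 4, 2), LocalDate.of(2019, 12, 31), "EUR"))

  def main(args: Array[String]): Unit =
    enrich(events, campaigns).foreach { case (e, code) =>
      println(s"${e.timestamp} | ${e.id} | ${e.spend} $code")
    }
}
```

All three events fall inside their campaign's window, so the two events for id 1 pick up "USD" and the event for id 2 picks up "EUR".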

Each event has a valid window during which it can be joined, but that window 
varies by id. Today I implement this join with the following `CoProcessFunction`. 

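{code:scala}
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Joins each Event with the metadata of its campaign while the event time lies
// inside [start, end]. Both inputs must be keyed by campaign id.
// Event and CampaignMetadata are my own case classes (not shown here).
class CampaignJoin(allowedLateness: Long)
    extends CoProcessFunction[Event, CampaignMetadata, Event] {

  @transient private lazy val descriptor =
    new ValueStateDescriptor[CampaignMetadata]("campaign", createTypeInformation[CampaignMetadata])

  override def processElement1(
      value: Event,
      ctx: CoProcessFunction[Event, CampaignMetadata, Event]#Context,
      out: Collector[Event]): Unit = {
    val campaign = getRuntimeContext.getState(descriptor).value()
    val ts: Long = ctx.timestamp()

    // Emit only if the campaign is known and the event lies inside its live window.
    if (campaign != null && campaign.start.getTime <= ts && ts <= campaign.end.getTime) {
      out.collect(value.copy(meta = campaign))
    }
  }

  override def processElement2(
      value: CampaignMetadata,
      ctx: CoProcessFunction[Event, CampaignMetadata, Event]#Context,
      out: Collector[Event]): Unit = {
    val end = value.end.getTime + allowedLateness

    // Ignore metadata whose window (plus lateness) is already behind the watermark;
    // otherwise store it and schedule its cleanup at end + allowedLateness.
    if (end >= ctx.timerService().currentWatermark()) {
      ctx.timerService().registerEventTimeTimer(end)
      getRuntimeContext.getState(descriptor).update(value)
    }
  }

  override def onTimer(
      timestamp: Long,
      ctx: CoProcessFunction[Event, CampaignMetadata, Event]#OnTimerContext,
      out: Collector[Event]): Unit = {
    val state = getRuntimeContext.getState(descriptor)
    val campaign = state.value()

    // Clear only if this timer belongs to the metadata currently stored;
    // timers registered for superseded metadata are ignored.
    if (campaign != null && campaign.end.getTime + allowedLateness == timestamp) {
      state.clear()
    }
  }
}
{code}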
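To make the state and timer lifecycle of this function easier to follow, here is a simplified, Flink-free model of it for a single key. It is a sketch under stated assumptions, not Flink code: `Meta`, `CampaignJoinModel`, `onEvent`, `onMeta` and `advanceWatermark` are hypothetical names, plain `Long` timestamps stand in for `java.util.Date`, an `Option` stands in for the value state, and timers fire once the watermark reaches their timestamp, as event-time timers do.

```scala
import scala.collection.mutable

// Hypothetical stand-in for CampaignMetadata, with Long timestamps.
final case class Meta(start: Long, end: Long, currency: String)

// Single-key model of CampaignJoin; no Flink classes are used.
final class CampaignJoinModel(allowedLateness: Long) {
  private var state: Option[Meta] = None               // plays the role of ValueState
  private val timers = mutable.SortedSet.empty[Long]   // registered event-time timers
  private var watermark = Long.MinValue

  // Like processElement1: join an event timestamp against the stored metadata.
  def onEvent(ts: Long): Option[String] =
    state.filter(m => m.start <= ts && ts <= m.end).map(_.currency)

  // Like processElement2: store metadata and register a cleanup timer,
  // unless end + lateness is already behind the watermark.
  def onMeta(m: Meta): Unit = {
    val cleanupAt = m.end + allowedLateness
    if (cleanupAt >= watermark) {
      timers += cleanupAt
      state = Some(m)
    }
  }

  // Advancing the watermark fires every timer <= wm, like onTimer.
  def advanceWatermark(wm: Long): Unit = {
    watermark = wm
    val due = timers.takeWhile(_ <= wm).toList
    timers --= due
    due.foreach { t =>
      // Clear only if the timer belongs to the currently stored metadata.
      if (state.exists(m => m.end + allowedLateness == t)) state = None
    }
  }

  def hasState: Boolean = state.isDefined
}
```

The equality check in the timer handler matters when a campaign is extended: the timer registered for the old end date still fires, but it no longer matches the stored metadata, so the state survives until the new cleanup time.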




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7999) Variable Join Window Boundaries

2017-11-06 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241303#comment-16241303
 ] 

Xingcan Cui commented on FLINK-7999:


Hi [~sjwiesman], thanks for opening this. Could you give some more specific 
explanations about this kind of join?

Thanks, Xingcan
