[jira] [Commented] (FLINK-7999) Variable Join Window Boundaries
[ https://issues.apache.org/jira/browse/FLINK-7999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645931#comment-16645931 ]

Hequn Cheng commented on FLINK-7999:

I think both [FLINK-8478|https://issues.apache.org/jira/browse/FLINK-8478] and [FLINK-5725|https://issues.apache.org/jira/browse/FLINK-5725] can solve this problem, so I will close this task.

> Variable Join Window Boundaries
> -------------------------------
>
> Key: FLINK-7999
> URL: https://issues.apache.org/jira/browse/FLINK-7999
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Seth Wiesman
> Priority: Major
>
> Allow window joins with variable length based on row attributes.
> Consider two streams joined on an id, where one of them has start and end dates: it
> would be useful to be able to join each row during its live duration. Today
> this can be expressed in the DataStream API using a CoProcessFunction.
> left.id = right.id AND (left.time > right.start AND left.time < right.end)

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
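The join condition from the issue description can be written as a plain predicate, which makes the "variable" part explicit: each right-hand row carries its own (start, end) interval, so the window length differs from row to row rather than being a constant offset from the left timestamp. This is a non-streaming sketch for illustrating the semantics only; the row types LeftRow and RightRow, the use of Long timestamps, and the sample values are hypothetical and not part of Flink.

{code:scala}
// Hypothetical row types: LeftRow carries an event time, RightRow carries the
// per-row validity interval (start, end) that defines its own join window.
final case class LeftRow(id: Int, time: Long)
final case class RightRow(id: Int, start: Long, end: Long)

object VariableWindowPredicate {
  // left.id = right.id AND (left.time > right.start AND left.time < right.end)
  // Both bounds are exclusive, as written in the issue.
  def matches(l: LeftRow, r: RightRow): Boolean =
    l.id == r.id && l.time > r.start && l.time < r.end

  // Naive nested-loop evaluation; only meant to illustrate which pairs qualify.
  def join(lefts: Seq[LeftRow], rights: Seq[RightRow]): Seq[(LeftRow, RightRow)] =
    for {
      l <- lefts
      r <- rights
      if matches(l, r)
    } yield (l, r)

  def main(args: Array[String]): Unit = {
    // Two right rows whose windows have different lengths (made-up values).
    val rights = Seq(RightRow(1, 100L, 200L), RightRow(2, 100L, 1000L))
    val lefts = Seq(LeftRow(1, 150L), LeftRow(1, 500L), LeftRow(2, 500L), LeftRow(1, 100L))
    join(lefts, rights).foreach(println)
  }
}
{code}

Here LeftRow(1, 500) is rejected because row 1's window closes at 200, while LeftRow(2, 500) is accepted because row 2's window runs until 1000; LeftRow(1, 100) is rejected because the lower bound is exclusive.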
[jira] [Commented] (FLINK-7999) Variable Join Window Boundaries
[ https://issues.apache.org/jira/browse/FLINK-7999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16534562#comment-16534562 ]

Stephan Ewen commented on FLINK-7999:

Would FLINK-8478 give you the desired functionality?
[jira] [Commented] (FLINK-7999) Variable Join Window Boundaries
[ https://issues.apache.org/jira/browse/FLINK-7999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247674#comment-16247674 ]

Seth Wiesman commented on FLINK-7999:

Sure, I will use the example of my own use case.

I am using Flink to aggregate spend for the campaigns that I am running. Each record in the main stream contains both a campaign id and information about a single financial transaction; this stream is analogous to the fact table in a traditional data warehouse. However, each campaign runs under a different currency, so I need to join with metadata containing the currency code for that campaign. Campaigns have both start and end dates, and a campaign can last anywhere from one day to several months.

Events:
||timestamp||id||spend||
|2017-11-11 03:00|1|0.25|
|2017-11-11 03:02|2|0.03|
|2017-11-11 03:05|1|0.11|

Metadata:
||id||start_date||end_date||currency_code||
|1|2017-11-10|2017-11-12|"USD"|
|2|2017-04-02|2019-12-31|"EUR"|

Each event has a valid window in which it can be joined, but that window varies by id. Today I implement this join with the following `CoProcessFunction`.
{code:scala}
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Event and CampaignMetadata are case classes defined elsewhere. CampaignMetadata.start/end
// are java.util.Date values (hence getTime), and Event has a `meta` field for the join result.
// Both inputs must be keyed by campaign id so that the ValueState holds one campaign per key.
class CampaignJoin(allowedLateness: Long)
    extends CoProcessFunction[Event, CampaignMetadata, Event] {

  // Latest metadata for the current key (campaign id).
  @transient private lazy val descriptor =
    new ValueStateDescriptor[CampaignMetadata]("campaign", createTypeInformation[CampaignMetadata])

  // Event side: emit the event enriched with its campaign if it falls inside [start, end).
  override def processElement1(
      value: Event,
      ctx: CoProcessFunction[Event, CampaignMetadata, Event]#Context,
      out: Collector[Event]): Unit = {
    val campaign = getRuntimeContext.getState(descriptor).value()
    val ts: Long = ctx.timestamp()
    // Without the upper-bound check, events stamped in [end, end + allowedLateness)
    // would still be joined until the cleanup timer fires.
    if (campaign != null && campaign.start.getTime <= ts && ts < campaign.end.getTime) {
      out.collect(value.copy(meta = campaign))
    }
  }

  // Metadata side: store it and schedule cleanup at end + allowedLateness,
  // unless the watermark has already passed that point.
  override def processElement2(
      value: CampaignMetadata,
      ctx: CoProcessFunction[Event, CampaignMetadata, Event]#Context,
      out: Collector[Event]): Unit = {
    val end = value.end.getTime + allowedLateness
    if (end >= ctx.timerService().currentWatermark()) {
      ctx.timerService().registerEventTimeTimer(end)
      getRuntimeContext.getState(descriptor).update(value)
    }
  }

  // Cleanup: clear the state once the watermark passes end + allowedLateness, but only
  // if the stored metadata still has that end date (newer metadata may have extended it).
  override def onTimer(
      timestamp: Long,
      ctx: CoProcessFunction[Event, CampaignMetadata, Event]#OnTimerContext,
      out: Collector[Event]): Unit = {
    val state = getRuntimeContext.getState(descriptor)
    val campaign = state.value()
    if (campaign != null) {
      val end = campaign.end.getTime + allowedLateness
      if (end == timestamp) {
        state.clear()
      }
    }
  }
}
{code}
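As a cross-check for the streaming implementation, the expected result of this join on the example Events and Metadata tables can be computed with a plain batch nested loop. This is a sketch under stated assumptions: SpendEvent, Campaign and ReferenceCampaignJoin are hypothetical names (not the Event/CampaignMetadata types used by CampaignJoin), each date is taken as midnight at the start of that day, and both bounds are exclusive as in the issue's predicate.

{code:scala}
import java.time.{LocalDate, LocalDateTime}

// Hypothetical row shapes mirroring the Events and Metadata tables.
final case class SpendEvent(timestamp: LocalDateTime, id: Int, spend: Double)
final case class Campaign(id: Int, start: LocalDate, end: LocalDate, currencyCode: String)

object ReferenceCampaignJoin {
  // Batch evaluation of: event.id = campaign.id AND start < event.time < end.
  // Assumption: a date means midnight at the start of that day.
  def join(events: Seq[SpendEvent], campaigns: Seq[Campaign]): Seq[(SpendEvent, String)] =
    for {
      e <- events
      c <- campaigns
      if e.id == c.id
      if e.timestamp.isAfter(c.start.atStartOfDay()) && e.timestamp.isBefore(c.end.atStartOfDay())
    } yield (e, c.currencyCode)

  // The example rows from the comment above.
  val events = Seq(
    SpendEvent(LocalDateTime.of(2017, 11, 11, 3, 0), 1, 0.25),
    SpendEvent(LocalDateTime.of(2017, 11, 11, 3, 2), 2, 0.03),
    SpendEvent(LocalDateTime.of(2017, 11, 11, 3, 5), 1, 0.11)
  )
  val campaigns = Seq(
    Campaign(1, LocalDate.of(2017, 11, 10), LocalDate.of(2017, 11, 12), "USD"),
    Campaign(2, LocalDate.of(2017, 4, 2), LocalDate.of(2019, 12, 31), "EUR")
  )

  def main(args: Array[String]): Unit =
    join(events, campaigns).foreach { case (e, currency) =>
      println(s"${e.timestamp} id=${e.id} spend=${e.spend} $currency")
    }
}
{code}

Every example event falls inside its campaign's window, so the three events come out tagged USD, EUR and USD respectively. A nested loop like this is only practical for checking small samples; the streaming version instead keeps one metadata row per key in state and relies on a timer to drop it.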
[jira] [Commented] (FLINK-7999) Variable Join Window Boundaries
[ https://issues.apache.org/jira/browse/FLINK-7999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241303#comment-16241303 ]

Xingcan Cui commented on FLINK-7999:

Hi [~sjwiesman], thanks for opening this. Could you give some more specific explanations about this kind of join?

Thanks, Xingcan