[ https://issues.apache.org/jira/browse/FLINK-24709?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

XiaShengSheng resolved FLINK-24709.
-----------------------------------
    Resolution: Fixed

> Fix the issue of interval join java case content in the official document case
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-24709
>                 URL: https://issues.apache.org/jira/browse/FLINK-24709
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 1.8.0, 1.9.0, 1.10.0, 1.11.0, 1.12.0, 1.13.0, 1.14.0
>            Reporter: XiaShengSheng
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 1.14.0
>
>         Attachments: case.png
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> Fix the interval join Java example in the official documentation.
> Taking the Flink 1.14.0 documentation as an example:
> [https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/joining/#interval-join]
>
> 1. The current example is:
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
> import org.apache.flink.streaming.api.windowing.time.Time;
> ...
> DataStream<Integer> orangeStream = ...
> DataStream<Integer> greenStream = ...
> orangeStream
>     .keyBy(<KeySelector>)
>     .intervalJoin(greenStream.keyBy(<KeySelector>))
>     .between(Time.milliseconds(-2), Time.milliseconds(1))
>     .process (new ProcessJoinFunction<Integer, Integer, String(){
>         @Override
>         public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
>             out.collect(first + "," + second);
>         }
>     });
>
> 2. After the fix:
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
> import org.apache.flink.streaming.api.windowing.time.Time;
> ...
> DataStream<Integer> orangeStream = ...
> DataStream<Integer> greenStream = ...
> orangeStream
>     .keyBy(<KeySelector>)
>     .intervalJoin(greenStream.keyBy(<KeySelector>))
>     .between(Time.milliseconds(-2), Time.milliseconds(1))
>     .process (new ProcessJoinFunction<Integer, Integer, String>(){
>         @Override
>         public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
>             out.collect(left + "," + right);
>         }
>     });



--
This message was sent by Atlassian Jira
(v8.3.4#803005)