Thank You!
Here is an example:
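import java.util.ArrayList;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Execution environment
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

// Input source: (productID, action, userID)
DataStreamSource<Tuple3<String, String, String>> source =
    env.fromElements(
        new Tuple3<>("productID1", "click", "user_1"),
        new Tuple3<>("productID1", "click", "user_2"),
        new Tuple3<>("productID1", "browse", "user_1"),
        new Tuple3<>("productID2", "browse", "user_1"),
        new Tuple3<>("productID2", "click", "user_2"),
        new Tuple3<>("productID2", "click", "user_1"),
        // ....
        new Tuple3<>("productID50", "click", "user_1"),
        // ....
        new Tuple3<>("productID90", "click", "user_1"),
        // ....
        new Tuple3<>("productID100", "click", "user_1")
    );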

// Split stream: tag each record with its product ID (field f0)
SplitStream<Tuple3<String, String, String>> splitStream =
    source.split(new OutputSelector<Tuple3<String, String, String>>() {
        @Override
        public Iterable<String> select(Tuple3<String, String, String> value) {
            ArrayList<String> output = new ArrayList<>();
            output.add(value.f0);
            return output;
        }
    });

// Select Stream
// Here: I want to select products 1 to 100 and then process each one.
// How should I do it?
splitStream.select("productID1").print();
env.execute();
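
One direction that might work, as a rough sketch rather than a tested Flink job: build the output names in a loop and call splitStream.select(name) once per name, before env.execute(). The class ProductIds and the helper productIds below are hypothetical names made up for illustration, and the sketch assumes the IDs really follow the "productID" + number pattern used above. It runs without Flink and only prints the names; the comment marks where the select call would go.

```java
import java.util.ArrayList;
import java.util.List;

public class ProductIds {
    // Hypothetical helper: builds "productID<first>" .. "productID<last>", inclusive.
    static List<String> productIds(int first, int last) {
        List<String> ids = new ArrayList<>();
        for (int i = first; i <= last; i++) {
            ids.add("productID" + i);
        }
        return ids;
    }

    public static void main(String[] args) {
        for (String id : productIds(1, 100)) {
            // In the Flink job, each sub-stream would be wired up here, e.g.
            //   splitStream.select(id).print();
            // This sketch only prints the name so it runs without Flink.
            System.out.println(id);
        }
    }
}
```

This only covers a fixed, known range of IDs. If the set of product IDs is not known in advance, a loop like this would not be enough.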
Regards.
On Tue, Sep 17, 2019 at 10:05 AM, Wesley Peng <[email protected]> wrote:
>
>
> on 2019/9/17 9:55, 王佩 wrote:
> > I want to split a stream into any number of streams according to a field,
> > and then process the split stream one by one.
>
> I think that should be easy to do. Refer to:
>
> https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream
>
> regards.
>