Thank you!
Here is an example:
      // Execution environment
      StreamExecutionEnvironment env =
              StreamExecutionEnvironment.getExecutionEnvironment();

      // Input source
      DataStreamSource<Tuple3<String, String, String>> source = env.fromElements(
              new Tuple3<>("productID1", "click", "user_1"),
              new Tuple3<>("productID1", "click", "user_2"),
              new Tuple3<>("productID1", "browse", "user_1"),
              new Tuple3<>("productID2", "browse", "user_1"),
              new Tuple3<>("productID2", "click", "user_2"),
              new Tuple3<>("productID2", "click", "user_1"),
              // ....
              new Tuple3<>("productID50", "click", "user_1"),
              // ....
              new Tuple3<>("productID90", "click", "user_1"),
              // ....
              new Tuple3<>("productID100", "click", "user_1")
      );

      // Split Stream
      SplitStream<Tuple3<String, String, String>> splitStream =
              source.split(new OutputSelector<Tuple3<String, String, String>>() {
          @Override
          public Iterable<String> select(Tuple3<String, String, String> value) {

              ArrayList<String> output = new ArrayList<>();
              output.add(value.f0);
              return output;

          }
      });

      // Select Stream
      // Here: I want to select products 1 to 100 and then process each one.
      // How should I do it?
      splitStream.select("productID1").print();

      env.execute();
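
For reference, a minimal sketch of one way the selection step might be written, assuming the product IDs really are the contiguous names productID1 through productID100: build the tag names in a loop and call select on each one. ProductTags and range are hypothetical names used only for illustration, and the Flink call appears only as a comment so that the sketch runs without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Flink): builds the tag names that the
// OutputSelector above emits via value.f0, e.g. "productID1" .. "productID100".
class ProductTags {

    // Returns prefix + i for every i in [from, to], in ascending order.
    // An empty list is returned when from > to.
    static List<String> range(String prefix, int from, int to) {
        List<String> tags = new ArrayList<>();
        for (int i = from; i <= to; i++) {
            tags.add(prefix + i);
        }
        return tags;
    }

    public static void main(String[] args) {
        List<String> tags = range("productID", 1, 100);
        for (String tag : tags) {
            // In the job above, this is where each per-product stream would be
            // selected and processed, for example:
            //   splitStream.select(tag).print();
            System.out.println(tag);
        }
    }
}
```

This only works when the set of product IDs is known before the job starts; if the IDs are only known at runtime, this approach does not apply as written.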

Regards.

On Tue, Sep 17, 2019 at 10:05 AM, Wesley Peng <[email protected]> wrote:

>
>
> On 2019/9/17 9:55, 王佩 wrote:
> > I want to split a stream into any number of streams according to a field,
> > and then process the split stream one by one.
>
> I think that should be easy to do. Refer to:
>
> https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream
>
> regards.
>
