你好,对于DataSet中不按照key进行全量聚合/排序的API(例如,sortPartition/mapPartition),DataStream上目前没有直接提供相同的API,但可以通过组合DataStream上现有的API实现相同的功能。 以mapPartition为例,可以通过以下三个步骤实现相同的功能: 1. dataStream.map(record -> (subtaskIndex, record)),为每个Record增加处理该record时子任务编号。 2. dataStream.assignTimestampsAndWatermarks,为每个Record增加相同的时间戳。 3. dataStream.window(TumblingEventTimeWindows.of(Time.seconds(1))).apply(mapPartition udf),基于固定时间窗口开窗,收集全量数据进行并使用和mapPartition相同的逻辑进行处理。
以下链接中是相关的示例代码,其中的第一步和第二步,在DataSet API被移除之前后续会考虑为DataStream API提供相关的工具方法: https://netcut.cn/p/dc693599e9031cd7