Hi Peihui, 確認下你想調用的方法是不是 connect?因為看起來 stream1.collect(stream2) 不像是 DataStream 支援的 API 如果是的話,想請問你 ConfigSource() 有沒有配置 WatermarkStrategy?connect 後的算子是透過上游兩個算子的 watermark 取最小作為輸出。 因此,如果只定義其中一邊的 WatermarkStrategy 會導致這個算子的 watermark 無法推進。
詳細可以參考這個章節 https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/time/#watermarks-in-parallel-streams Best regards, Peihui He <peihu...@gmail.com> 於 2022年11月14日 週一 下午2:40寫道: > Hi, > > 如题,代码大概如下: > > stream1 = env.fromSource(kafkaSource, wartermarkStrategy) > stream2 = env.addSource(ConfigSource()) > > stream1.collect(stream2).process(ProcessFunction()).print() > > 这种情况下在collect时没有watermark, 是什么原因呢? >