Hi Peihui,

確認下你想調用的方法是不是 connect?因為看起來 stream1.collect(stream2) 不像是 DataStream 支援的 API
如果是的話,想請問你 ConfigSource() 有沒有配置 WatermarkStrategy?connect 後的算子是透過上游兩個算子的
watermark 取最小作為輸出。
因此,如果只定義其中一邊的 WatermarkStrategy 會導致這個算子的 watermark 無法推進。

詳細可以參考這個章節
https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/time/#watermarks-in-parallel-streams

Best regards,

Peihui He <peihu...@gmail.com> 於 2022年11月14日 週一 下午2:40寫道:

> Hi,
>
> 如题,代码大概如下:
>
> stream1  = env.fromSource(kafkaSource, wartermarkStrategy)
> stream2 = env.addSource(ConfigSource())
>
> stream1.collect(stream2).process(ProcessFunction()).print()
>
> 这种情况下在collect时没有watermark, 是什么原因呢?
>

回复