Hi Ivan,

The source of your problem is quite simple:

- If you window by event time, all sources need to emit watermarks. Yours are created with WatermarkStrategy.noWatermarks().
- Watermarks are the logical clock that drives event-time processing. Without them, event time never advances and the windows never fire.
- You could either use processing-time windows or adjust the watermark strategy of your sources accordingly.
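
To illustrate the points above, here is a minimal plain-Java sketch (not Flink code) of how a watermark drives 60-second tumbling event-time windows. It assumes a bounded-out-of-orderness watermark that is updated after every event, whereas Flink emits watermarks periodically. The class name `WatermarkDemo`, the method `firedWindows`, and the sample timestamps are made up for the illustration. In Flink itself, the corresponding change would probably be to replace `WatermarkStrategy.noWatermarks()` with something like `WatermarkStrategy.<Sensor>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((sensor, recordTs) -> sensor.getTimestamp())`, where `getTimestamp()` is an assumed getter returning epoch milliseconds and the 5-second bound is an arbitrary choice.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Toy model of tumbling event-time windows driven by a watermark; not Flink code. */
class WatermarkDemo {

    static final long WINDOW_MS = 60_000L; // 60-second tumbling windows

    /**
     * Returns the start timestamps of the windows that fire, in firing order.
     * A window [start, start + WINDOW_MS) fires once the watermark reaches
     * its last timestamp, start + WINDOW_MS - 1.
     * With emitWatermarks == false the watermark stays at Long.MIN_VALUE,
     * which models a source that never emits watermarks.
     */
    static List<Long> firedWindows(long[] eventTimestamps,
                                   long maxOutOfOrdernessMs,
                                   boolean emitWatermarks) {
        List<Long> open = new ArrayList<>();  // windows holding data, not yet fired
        List<Long> fired = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;

        for (long ts : eventTimestamps) {
            // Assign the event to its tumbling window.
            long windowStart = ts - Math.floorMod(ts, WINDOW_MS);
            if (!open.contains(windowStart) && !fired.contains(windowStart)) {
                open.add(windowStart);
            }

            // Bounded out-of-orderness: the watermark trails the largest timestamp seen.
            if (emitWatermarks) {
                maxSeen = Math.max(maxSeen, ts);
                watermark = maxSeen - maxOutOfOrdernessMs - 1;
            }

            // Fire every open window whose last timestamp the watermark has passed.
            Iterator<Long> it = open.iterator();
            while (it.hasNext()) {
                long start = it.next();
                if (start + WINDOW_MS - 1 <= watermark) {
                    fired.add(start);
                    it.remove();
                }
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] events = {1_000L, 30_000L, 59_000L, 61_000L, 70_000L};
        // The window starting at 0 fires once the event at 70_000 moves the watermark to 64_999.
        System.out.println("with watermarks:    " + firedWindows(events, 5_000L, true));
        // No watermark means event time never advances, so nothing fires.
        System.out.println("without watermarks: " + firedWindows(events, 5_000L, false));
    }
}
```

Running `main` prints `[0]` for the first case and `[]` for the second: without watermarks no window ever closes, which matches the empty output file described in the original message.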
... I didn't check for other potential sources of trouble in your code.

Hope this helps

Thias

-----Original Message-----
From: Иван Борисов <ivan.s.bori...@gmail.com>
Sent: Friday, 30 June 2023 05:45
To: user@flink.apache.org
Subject: Join two streams

Hello,
please help me, I can't join two streams. The joined stream contains zero messages and I can't understand why.

Kafka topics:

1st stream, topic1:
    {'data': {'temp':25.2, 'sensore_name': 'T1', 'timestamp': 123123131}, 'compare_with': 'T2'}

2nd stream, topic2:
    {'data': {'temp':28, 'sensore_name': 'T2', 'timestamp': 53543543}, 'compare_with': 'T1'}

    DataStream<Sensor> T1_Stream = env.fromSource(
            T1_Source,
            WatermarkStrategy.noWatermarks(),
            "T1 Stream");

    DataStream<Sensor> T2_Stream = env.fromSource(
            T2_Source,
            WatermarkStrategy.noWatermarks(),
            "T2 Stream");

    DataStream<Double> comparisonStream = T1_Stream
            .join(T2_Stream)
            .where(T1 -> T1.getCompare_with())
            .equalTo(T2 -> T2.getSensor_Name())
            .window(TumblingEventTimeWindows.of(Time.seconds(60)))
            .apply((JoinFunction<Sensor, Sensor, Double>) (T1, T2) -> {
                double firstValue = T1.getTemp();
                double secondValue = T2.getTemp();
                double m = firstValue - secondValue;
                return m;
            });

    comparisonStream.writeAsText("/tmp/output_k.txt",
            org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE);

And my file is empty! What am I doing wrong?

--
Yours truly, Ivan Borisov
mob./WhatsApp: 7 913 088 8882
Telegram: @Ivan_S_Borisov
Skype: ivan.s.borisov
e-mail: ivan.s.bori...@gmail.com