Hi Ivan,

The source of your problem is quite simple:
- If you do windowing by event time, all sources need to emit watermarks.
- Watermarks are the logical clock used for event-time processing. With 
WatermarkStrategy.noWatermarks() that clock never advances, so the event-time 
windows never fire and the join emits nothing.
- You could either use processing-time windows or adjust the watermark 
strategy of your sources accordingly.
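
To illustrate the watermark points above, here is a toy model in plain Java. 
It is NOT Flink code, and the class and method names are made up for this 
sketch. It only shows the mechanism: elements are buffered in the window, and 
output is produced only once a watermark reaches the end of the window.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a single tumbling event-time window. This is NOT Flink code;
// the class and method names are hypothetical and only illustrate the idea.
final class EventTimeWindowSketch {
    private final long windowEnd;                  // exclusive end of the window, in ms
    private final List<Double> buffered = new ArrayList<>();
    private long watermark = Long.MIN_VALUE;       // "no watermark seen yet"

    EventTimeWindowSketch(long windowEnd) {
        this.windowEnd = windowEnd;
    }

    // Elements are only buffered; their arrival alone never produces output.
    void onElement(double temp) {
        buffered.add(temp);
    }

    // The window fires once the watermark reaches the last timestamp
    // that still belongs to the window (windowEnd - 1).
    List<Double> onWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        if (watermark >= windowEnd - 1 && !buffered.isEmpty()) {
            List<Double> out = new ArrayList<>(buffered);
            buffered.clear();
            return out;
        }
        return new ArrayList<>();
    }

    public static void main(String[] args) {
        EventTimeWindowSketch window = new EventTimeWindowSketch(60_000L);
        window.onElement(25.2);
        window.onElement(28.0);
        // The clock has not advanced, so nothing is emitted.
        System.out.println(window.onWatermark(Long.MIN_VALUE).size()); // prints 0
        // The watermark has passed the window end, so both elements are released.
        System.out.println(window.onWatermark(60_000L).size());        // prints 2
    }
}
```

In your job, the corresponding change would be to replace 
WatermarkStrategy.noWatermarks() with something like 
WatermarkStrategy.<Sensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((sensor, ts) -> sensor.getTimestamp()). 
This assumes that Sensor has a getTimestamp() getter returning epoch 
milliseconds, which I can't see from your snippet, and the 5 seconds are just 
a placeholder. The other option is to keep the sources as they are and use 
TumblingProcessingTimeWindows instead of TumblingEventTimeWindows.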

I didn't check for other potential sources of trouble in your code.

Hope this helps

Thias


-----Original Message-----
From: Иван Борисов <ivan.s.bori...@gmail.com> 
Sent: Friday, 30 June 2023 05:45
To: user@flink.apache.org
Subject: Join two streams

Hello,
Please help me, I can't join two streams. The joined stream contains zero 
messages and I can't understand why.

Kafka Topics:
1st stream
topic1: {'data': {'temp':25.2, 'sensore_name': 'T1', 'timestamp': 123123131}, 'compare_with': 'T2'}
2nd stream
topic2: {'data': {'temp':28, 'sensore_name': 'T2', 'timestamp': 53543543}, 'compare_with': 'T1'}


DataStream<Sensor> T1_Stream = env.fromSource(
        T1_Source,
        WatermarkStrategy.noWatermarks(),
        "T1 Stream");

DataStream<Sensor> T2_Stream = env.fromSource(
        T2_Source,
        WatermarkStrategy.noWatermarks(),
        "T2 Stream");

DataStream<Double> comparisonStream = T1_Stream
        .join(T2_Stream)
        .where(T1 -> T1.getCompare_with())
        .equalTo(T2 -> T2.getSensor_Name())
        .window(TumblingEventTimeWindows.of(Time.seconds(60)))
        .apply((JoinFunction<Sensor, Sensor, Double>) (T1, T2) -> {
            double firstValue = T1.getTemp();
            double secondValue = T2.getTemp();
            double m = firstValue - secondValue;
            return m;
        });

comparisonStream.writeAsText("/tmp/output_k.txt",
        org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE);

And my file is empty!
What am I doing wrong?

--
Yours truly, Ivan Borisov  |  С уважением, Иван Борисов
mob./WhatsApp: 7 913  088 8882
Telegram: @Ivan_S_Borisov
Skype: ivan.s.borisov
e-mail: ivan.s.bori...@gmail.com
