Thank you for your answer!
It's work right now, one more question:
I've got few streams from few Kafka topics (if it possible to do other
way and easier I could make one topic or any other modifications) with
sensors measurements into JSON messages:
topic1: {'data': {'temp':25.2, 'sensore_name': 'T1', 'timestamp':
123123131}, 'compare_with': 'T2'}
topic2: {'data': {'temp':28, 'sensore_name': 'T2', 'timestamp':
53543543}, 'compare_with': 'T1'}
topic3: {'data': {'temp':32, 'sensore_name': 'T3', 'timestamp':
6757575}, 'compare_with': 'T2'}
topic4: {'data': {'temp':12, 'sensore_name': 'T3', 'timestamp':
67856222}, 'compare_with': 'T1'}
I need to compare T1.data.temp - T2.data.temp (I need to compare it
with EXACTLY last measurement of other sensor (Shown in compare_with),
because measurements could come with different frequency: T1 1 message
per sec, T2 1 message per 5 sec., T3 3 message per sec.) calculate AVG
from this difference in 1 hour window, and if this difference more
than AVG, then make Alarm to somewhere... Don't understand how to do
it?
I did like that:
DataStream<MergedSensors> comparisonStream = T1_Stream
.join(T2_Stream)
.where(T1 -> T1.getArbitraged_with())
.equalTo(T2 -> T2.getTicker_symbol())
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply((JoinFunction<Sensor, Sensor, MergedSensors>) (T1, T2) -> {
Tuple2<Double, Double> spread;
if (T1.getData().getTemp().isEmpty() || T2.getData().getTemp().isEmpty()) {
spread = new Tuple2<>(1.23, 4.56);
} else {
double a = T1.getData().getTemp();
double b = T2.getData().getTemp();
//return 33.22;
spread = new Tuple2<>(a, b);
}
return new MergedSensors(T1.getTimestamp(), T1.getMs_timestamp(), spread);
});
пт, 30 июн. 2023 г. в 12:40, Schwalbe Matthias <[email protected]>:
>
> Привет Иван,
>
> The source of your problem is quite easy:
> - If you do windowing by event time, all the sources need to emit watermarks.
> - watermarks are the logical clock used when event-time timing
> - you could use either processing time windows, or adjust watermark strategy
> of your sources accordingly
>
> ... didn't check other potential sources of troubles in your code
>
> Hope this helps
>
> Thias
>
>
> -----Original Message-----
> From: Иван Борисов <[email protected]>
> Sent: Freitag, 30. Juni 2023 05:45
> To: [email protected]
> Subject: Join two streams
>
> Hello,
> plz help me, I can't join two streams. In the joined stream I've got zero
> messages and can't understand why?
>
> Kafka Topics:
> 1st stream
> topic1: {'data': {'temp':25.2, 'sensore_name': 'T1', 'timestamp':
> 123123131}, 'compare_with': 'T2'}
> 2nd stream
> topic2: {'data': {'temp':28, 'sensore_name': 'T2', 'timestamp':
> 53543543}, 'compare_with': 'T1'}
>
>
> DataStream<Sensor>T1_Stream = env.fromSource( T1_Source,
> WatermarkStrategy.noWatermarks(),
> "T1 Stream");
>
> DataStream<Sensor> T2_Stream = env.fromSource( T2_Source,
> WatermarkStrategy.noWatermarks(),
> "T2 Stream");
>
> DataStream<Double> comparisonStream = T1_Stream
> .join(T2_Stream)
> .where(T1 -> T1.getCompare_with())
> .equalTo(T2 -> T2.getSensor_Name())
> .window(TumblingEventTimeWindows.of(Time.seconds(60)))
> .apply((JoinFunction<Sensor, Sensor, Double>) (T1, T2) -> { double firstValue
> = T1.getTemp(); double secondValue = T2.getTemp(); double m =
> firstValue-secondValue; return m; });
> comparisonStream.writeAsText("/tmp/output_k.txt",
> org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE);
>
> And my file is empty!
> What am I do wrong?
>
> --
> Yours truly, Ivan Borisov | С уважением, Иван Борисов
> mob./WhatsApp: 7 913 088 8882
> Telegram: @Ivan_S_Borisov
> Skype: ivan.s.borisov
> e-mail: [email protected]
> Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und
> beinhaltet unter Umständen vertrauliche Mitteilungen. Da die Vertraulichkeit
> von e-Mail-Nachrichten nicht gewährleistet werden kann, übernehmen wir keine
> Haftung für die Gewährung der Vertraulichkeit und Unversehrtheit dieser
> Mitteilung. Bei irrtümlicher Zustellung bitten wir Sie um Benachrichtigung
> per e-Mail und um Löschung dieser Nachricht sowie eventueller Anhänge.
> Jegliche unberechtigte Verwendung oder Verbreitung dieser Informationen ist
> streng verboten.
>
> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of email
> communication cannot be guaranteed, we do not accept any responsibility for
> the confidentiality and the intactness of this message. If you have received
> it in error, please advise the sender by return e-mail and delete this
> message and any attachments. Any unauthorised use or dissemination of this
> information is strictly prohibited.
--
Yours truly, Ivan Borisov | С уважением, Иван Борисов
mob./WhatsApp: 7 913 088 8882
Telegram: @Ivan_S_Borisov
Skype: ivan.s.borisov
e-mail: [email protected]