Re: join two streams with pyflink

2024-04-02 Thread Biao Geng
Hi Thierry,

Your case is not very complex, and I believe all of Flink's language
interfaces (e.g. Java, Python, SQL) can handle it.
With PyFlink, you can use the PyFlink DataStream, Table, or SQL API.
Here are some examples of using the PyFlink Table API:
https://nightlies.apache.org/flink/flink-docs-master/api/python/examples/table/basic_operations.html
https://github.com/apache/flink/blob/master/flink-python/pyflink/table/tests/test_join.py
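A note on the two sample records: the vehicle timestamp carries a "Z" (UTC) suffix, but the weather timestamp has no zone at all. Both need to be normalized to one time base before any timestamp-based join. The plain-Java sketch below is not PyFlink or Flink API code. It shows one possible reading of "merge based on timestamp": pair each vehicle event with the most recent weather reading at or before it. This is an "as-of" lookup, similar in spirit to an event-time temporal join in Flink SQL. Treating the zone-less weather timestamps as UTC is an assumption, and the class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of an "as-of" join by timestamp (not Flink/PyFlink API).
class AsOfJoinSketch {

    // Vehicle timestamps carry an explicit UTC marker, e.g. "2024-04-02T06:20:00Z".
    static long vehicleMillis(String ts) {
        return Instant.parse(ts).toEpochMilli();
    }

    // Weather timestamps have no zone, e.g. "2024-04-02T02:19:39.281768".
    // ASSUMPTION: they are UTC; adjust the offset if the producer uses local time.
    static long weatherMillis(String ts) {
        return LocalDateTime.parse(ts).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Each row is {timestamp, label}. Returns "label -> weather" per vehicle row,
    // using the latest weather reading at or before the vehicle timestamp.
    static List<String> join(String[][] vehicles, String[][] weather) {
        TreeMap<Long, String> weatherByTime = new TreeMap<>();
        for (String[] w : weather) {
            weatherByTime.put(weatherMillis(w[0]), w[1]);
        }
        List<String> out = new ArrayList<>();
        for (String[] v : vehicles) {
            // floorEntry: greatest key <= the vehicle timestamp, or null if none.
            Map.Entry<Long, String> latest = weatherByTime.floorEntry(vehicleMillis(v[0]));
            out.add(v[1] + " -> " + (latest == null ? "no weather yet" : latest.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        String[][] vehicles = {{"2024-04-02T06:20:00Z", "vehicle 39006"}};
        String[][] weather = {{"2024-04-02T02:19:39.281768", "clear sky"}};
        System.out.println(join(vehicles, weather)); // [vehicle 39006 -> clear sky]
    }
}
```

The sketch makes two simplifications. In a real streaming job, the weather side would be state that is updated as readings arrive. Flink would also have to wait for watermarks before it can be sure that no earlier reading is still in flight. The sketch ignores both.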

Hope it helps!

Best,
Biao Geng



Fokou Toukam, Thierry wrote on Tuesday, 2 April 2024 at
15:41:

> Hi,
>
> I have 2 streams, as seen in this example (*6> {"tripId": "275118740",
> "timestamp": "2024-04-02T06:20:00Z", "stopSequence": 13, "stopId": "61261",
> "bearing": 0.0, "speed": 0.0, "vehicleId": "39006", "routeId": "747"}*
> *1> {"visibility": 1, "weather_conditions": "clear sky", "timestamp":
> "2024-04-02T02:19:39.281768"}*), and I want to merge them based on the
> timestamp value. Which part of Flink can I use, or how can I do it?
>
> Thanks!
>
> *Thierry FOKOU *| * IT M.A.Sc  Student*


Re: Join two streams

2023-06-30 Thread Иван Борисов
Thank you for your answer!

It works now. One more question:

I have several streams from several Kafka topics (if there is an easier
way, I could use a single topic or make other modifications) carrying
sensor measurements as JSON messages:

topic1: {'data': {'temp':25.2, 'sensore_name': 'T1', 'timestamp': 123123131}, 'compare_with': 'T2'}
topic2: {'data': {'temp':28, 'sensore_name': 'T2', 'timestamp': 53543543}, 'compare_with': 'T1'}
topic3: {'data': {'temp':32, 'sensore_name': 'T3', 'timestamp': 6757575}, 'compare_with': 'T2'}
topic4: {'data': {'temp':12, 'sensore_name': 'T3', 'timestamp': 67856222}, 'compare_with': 'T1'}

I need to compute T1.data.temp - T2.data.temp, comparing each measurement
with EXACTLY the last measurement of the other sensor (given in
compare_with), because measurements arrive at different frequencies: T1
sends 1 message per second, T2 1 message per 5 seconds, T3 3 messages per
second. Then I need to calculate the AVG of this difference over a 1-hour
window, and if a difference is greater than the AVG, raise an alarm
somewhere. I don't understand how to do it.
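The logic described above may not need a window join at all. That logic is: compare each reading with the latest reading of the other sensor, average the differences per hour, and raise an alarm when a difference exceeds the average. A minimal plain-Java sketch follows. It is not Flink API code, and it rests on several assumptions: readings are processed in arrival order, timestamps are epoch milliseconds, and each difference is compared with the average of the earlier differences in the same hour (the full-hour average is only known once the hour ends). In Flink, the per-sensor "latest value" map would typically live in managed state, for example in a process function over connected streams. The class, fields and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (not Flink API): compare each reading with the latest reading
// of the sensor named in compare_with, average the differences per 1-hour
// tumbling window, and record an alarm when a difference exceeds that average.
class LatestValueCompare {
    static final long HOUR_MS = 3_600_000L;

    final Map<String, Double> latestTemp = new HashMap<>(); // last temp per sensor
    final List<String> alarms = new ArrayList<>();          // stand-in for an alarm sink
    long currentHour = Long.MIN_VALUE; // start of the hour being averaged
    double sum = 0.0;                  // sum of differences in currentHour
    long count = 0;                    // number of differences in currentHour

    // Processes one reading (timestamps assumed to be epoch millis). Returns the
    // difference, or null if the other sensor has not reported yet.
    Double onReading(String sensor, double temp, long tsMillis, String compareWith) {
        latestTemp.put(sensor, temp);
        Double other = latestTemp.get(compareWith);
        if (other == null) {
            return null;
        }
        double diff = temp - other;

        long hour = tsMillis - Math.floorMod(tsMillis, HOUR_MS);
        if (hour != currentHour) { // a new hour starts: reset the average
            currentHour = hour;
            sum = 0.0;
            count = 0;
        }
        // ASSUMPTION: compare with the average of the earlier differences in this hour.
        if (count > 0 && diff > sum / count) {
            alarms.add(sensor + " vs " + compareWith + ": " + diff);
        }
        sum += diff;
        count++;
        return diff;
    }

    public static void main(String[] args) {
        LatestValueCompare c = new LatestValueCompare();
        c.onReading("T2", 28.0, 0L, "T1");     // T1 not seen yet -> null
        c.onReading("T1", 25.0, 1_000L, "T2"); // diff -3.0, first in this hour
        c.onReading("T1", 30.0, 2_000L, "T2"); // diff 2.0 > average -3.0 -> alarm
        System.out.println(c.alarms);          // [T1 vs T2: 2.0]
    }
}
```

Because every reading is compared with whatever the other sensor last reported, the different sending frequencies (1/s, 1/5 s, 3/s) do not matter in this design.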

I did it like this:

DataStream comparisonStream = T1_Stream
        .join(T2_Stream)
        .where(T1 -> T1.getArbitraged_with())
        .equalTo(T2 -> T2.getTicker_symbol())
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .apply((JoinFunction) (T1, T2) -> {
            Tuple2 spread;
            if (T1.getData().getTemp().isEmpty() || T2.getData().getTemp().isEmpty()) {
                spread = new Tuple2<>(1.23, 4.56);
            } else {
                double a = T1.getData().getTemp();
                double b = T2.getData().getTemp();
                //return 33.22;
                spread = new Tuple2<>(a, b);
            }
            return new MergedSensors(T1.getTimestamp(), T1.getMs_timestamp(), spread);
        });
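Even with watermarks in place, a window join may not give "the exact last measurement of the other sensor". Flink's window join passes every pairwise combination of elements that share a key and a window to the JoinFunction. It behaves like an inner join, so elements in a window where the other side has nothing produce no output. The plain-Java sketch below mimics that pairing. It is not Flink API code. Reading, windowStart and the epoch-millisecond timestamps are illustrative assumptions, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (not Flink API) of tumbling-window inner-join semantics.
class WindowJoinSketch {

    // Hypothetical record type: join key, event time (epoch millis), value.
    static final class Reading {
        final String joinKey;
        final long ts;
        final double temp;

        Reading(String joinKey, long ts, double temp) {
            this.joinKey = joinKey;
            this.ts = ts;
            this.temp = temp;
        }
    }

    // Start of the tumbling window (no offset) containing ts.
    static long windowStart(long ts, long sizeMs) {
        return ts - Math.floorMod(ts, sizeMs);
    }

    // Emits every (left, right) pair with equal keys in the same window,
    // like an inner join: unmatched elements produce nothing.
    static List<double[]> join(List<Reading> left, List<Reading> right, long sizeMs) {
        List<double[]> pairs = new ArrayList<>();
        for (Reading l : left) {
            for (Reading r : right) {
                if (l.joinKey.equals(r.joinKey)
                        && windowStart(l.ts, sizeMs) == windowStart(r.ts, sizeMs)) {
                    pairs.add(new double[] {l.temp, r.temp});
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        // T1 readings (1 per second), keyed by their compare_with value "T2".
        List<Reading> t1 = new ArrayList<>();
        for (long t = 0; t <= 5_000; t += 1_000) {
            t1.add(new Reading("T2", t, 20.0 + t / 1_000));
        }
        // A single T2 reading in the first 5-second window.
        List<Reading> t2 = List.of(new Reading("T2", 4_000, 28.0));
        // T1 at 0..4000 all pair with the same T2 reading; T1 at 5000 pairs with nothing.
        System.out.println(join(t1, t2, 5_000).size()); // 5
    }
}
```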




-- 
Yours truly, Ivan Borisov  |  С уважением, Иван Борисов
mob./WhatsApp: 7 913  088 8882
Telegram: @Ivan_S_Borisov
Skype: ivan.s.borisov
e-mail: ivan.s.bori...@gmail.com


RE: Join two streams

2023-06-29 Thread Schwalbe Matthias
Hi Иван,

The cause of your problem is quite simple:
- If you use event-time windows, all the sources need to emit watermarks.
- Watermarks are the logical clock used for event-time timing.
- You can either use processing-time windows or adjust the watermark strategy
of your sources accordingly.

... I didn't check your code for other potential sources of trouble.
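To illustrate the point above, here is a plain-Java simulation of an event-time tumbling window. It is not Flink API code. A window fires only once the watermark reaches its last millisecond. With noWatermarks(), the watermark never advances, so nothing is ever emitted. The "with watermarks" branch is modelled loosely on a bounded-out-of-orderness strategy such as WatermarkStrategy.forBoundedOutOfOrderness: the watermark is the largest timestamp seen, minus the allowed delay, minus 1 ms. In a real job, a two-input operator such as a join uses the minimum of its inputs' watermarks, which is why all sources need to emit them. The method and parameter names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Plain-Java simulation (not Flink API) of event-time tumbling windows.
class WatermarkSketch {

    // Returns the start times of windows that fired while processing eventTs in order.
    // withWatermarks=false mimics WatermarkStrategy.noWatermarks().
    static List<Long> firedWindows(long[] eventTs, long windowMs, long maxDelayMs,
                                   boolean withWatermarks) {
        TreeMap<Long, Integer> pending = new TreeMap<>(); // window start -> element count
        List<Long> fired = new ArrayList<>();
        long maxTs = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE; // the event-time clock; never moves without watermarks
        for (long ts : eventTs) {
            pending.merge(ts - Math.floorMod(ts, windowMs), 1, Integer::sum);
            maxTs = Math.max(maxTs, ts);
            if (withWatermarks) {
                // Bounded out-of-orderness: tolerate events up to maxDelayMs late.
                watermark = maxTs - maxDelayMs - 1;
            }
            // Window [start, start + windowMs) fires once the watermark reaches its last millisecond.
            while (!pending.isEmpty() && watermark >= pending.firstKey() + windowMs - 1) {
                fired.add(pending.pollFirstEntry().getKey());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] ts = {1_000, 30_000, 61_000, 125_000};
        System.out.println(firedWindows(ts, 60_000, 0, false)); // []
        System.out.println(firedWindows(ts, 60_000, 0, true));  // [0, 60000]
    }
}
```

The last window [120000, 180000) does not fire even with watermarks. No later event arrives to push the watermark past its end.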

Hope this helps

Thias


-----Original Message-----
From: Иван Борисов
Sent: Friday, 30 June 2023 05:45
To: user@flink.apache.org
Subject: Join two streams

Hello,
Please help me: I can't join two streams. The joined stream contains zero
messages, and I can't understand why.

Kafka Topics:
1st stream
topic1: {'data': {'temp':25.2, 'sensore_name': 'T1', 'timestamp': 123123131}, 'compare_with': 'T2'}
2nd stream
topic2: {'data': {'temp':28, 'sensore_name': 'T2', 'timestamp': 53543543}, 'compare_with': 'T1'}


DataStream T1_Stream = env.fromSource(
        T1_Source,
        WatermarkStrategy.noWatermarks(),
        "T1 Stream");

DataStream T2_Stream = env.fromSource(
        T2_Source,
        WatermarkStrategy.noWatermarks(),
        "T2 Stream");

DataStream comparisonStream = T1_Stream
        .join(T2_Stream)
        .where(T1 -> T1.getCompare_with())
        .equalTo(T2 -> T2.getSensor_Name())
        .window(TumblingEventTimeWindows.of(Time.seconds(60)))
        .apply((JoinFunction) (T1, T2) -> {
            double firstValue = T1.getTemp();
            double secondValue = T2.getTemp();
            double m = firstValue - secondValue;
            return m;
        });
comparisonStream.writeAsText("/tmp/output_k.txt",
        org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE);

And my file is empty!
What am I doing wrong?



Re: Join two streams

2023-06-29 Thread Shrihari R
Hey Иван,

Use *TumblingProcessingTimeWindows* instead of TumblingEventTimeWindows.
Event-time windows only fire when watermarks advance, so
TumblingEventTimeWindows requires a watermark strategy on the sources.
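Switching to processing time also changes which records meet. They are grouped by arrival time on the machine, not by any timestamp in the data. The plain-Java sketch below is not Flink API code. It computes which 60-second tumbling window a timestamp falls into, using windows aligned to the epoch with no offset. One related observation about the sample data, not raised in the thread: suppose a timestamp assigner were configured to read the JSON "timestamp" fields as epoch milliseconds. Then T1 (123123131) and T2 (53543543) would land in different windows, and the event-time join could not pair them even with watermarks. The method names and the arrival times are hypothetical.

```java
// Plain-Java sketch (not Flink API) of tumbling-window assignment with no offset.
class TumblingWindowAssign {

    // Start of the window [start, start + size) that contains tsMillis.
    static long windowStart(long tsMillis, long sizeMillis) {
        return tsMillis - Math.floorMod(tsMillis, sizeMillis);
    }

    static boolean sameWindow(long a, long b, long sizeMillis) {
        return windowStart(a, sizeMillis) == windowStart(b, sizeMillis);
    }

    public static void main(String[] args) {
        long size = 60_000; // Time.seconds(60)
        // Event time: the records' own 'timestamp' fields decide the window
        // (ASSUMPTION: the sample values are epoch milliseconds).
        System.out.println(windowStart(123_123_131L, size)); // 123120000
        System.out.println(windowStart(53_543_543L, size));  // 53520000
        System.out.println(sameWindow(123_123_131L, 53_543_543L, size)); // false
        // Processing time: hypothetical arrival times 2 s apart share a window.
        System.out.println(sameWindow(1_000_000L, 1_002_000L, size)); // true
    }
}
```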


*Ref*
https://stackoverflow.com/questions/72291659/flink-tumbling-window-is-not-triggered-no-watermark-strategy

*Regards*
Shrihari




Re: Join two streams from Kafka

2021-02-11 Thread Arvid Heise
Hi Shamit,

unless you have some temporal relationship between the records to be
joined, you have to use a regular join over stream 1 and stream 2.
Since you cannot define any window, all data will be held in Flink's state,
which is not an issue for a few millions but probably means you have to use
rocksdb statebackend [1] or else you may run out of main memory.

I recommend using Flink SQL or the Table API, which will also prune all
unnecessary columns from your data. If you want to use the DataStream API
instead, I recommend dropping all unrelated columns before the join.
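A regular streaming join can be pictured as in the plain-Java sketch below. It is not Flink API code. Both inputs are kept in state indefinitely, and every arriving record probes the other side's state. This is why a stream1 record arriving today can still match a stream2 record from a year ago, and why the state grows with both inputs. The HashMaps stand in for Flink's keyed state, which the RocksDB backend can keep on disk. Records are assumed to be projected to just the join key and the needed value before they are stored, as recommended above. Class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (not Flink API) of an unwindowed streaming equi-join.
class RegularJoinSketch {
    // Stand-ins for Flink keyed state: join key -> all records seen so far.
    // Nothing is ever evicted, which is why state grows with both inputs.
    final Map<String, List<String>> leftState = new HashMap<>();
    final Map<String, List<String>> rightState = new HashMap<>();

    // stream1 record, already projected to (key, needed value).
    List<String> onLeft(String key, String value) {
        leftState.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        List<String> out = new ArrayList<>();
        for (String r : rightState.getOrDefault(key, List.of())) {
            out.add(value + "|" + r);
        }
        return out;
    }

    // stream2 record, already projected to (key, needed value).
    List<String> onRight(String key, String value) {
        rightState.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        List<String> out = new ArrayList<>();
        for (String l : leftState.getOrDefault(key, List.of())) {
            out.add(l + "|" + value);
        }
        return out;
    }

    public static void main(String[] args) {
        RegularJoinSketch join = new RegularJoinSketch();
        // A stream2 record from a year ago: stored, no match yet.
        System.out.println(join.onRight("key-1", "stream2-old")); // []
        // A stream1 record arriving today still finds it in state.
        System.out.println(join.onLeft("key-1", "stream1-new"));  // [stream1-new|stream2-old]
    }
}
```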

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#choose-the-right-state-backend

On Tue, Feb 9, 2021 at 10:47 PM Shamit  wrote:

> Hello Flink Users,
>
> I am a newbie and have a question about joining two streams (stream1 and
> stream2) from Kafka topics based on some key.
>
> In my use case, I need to join with stream2 data that might be a year old
> or more.
>
> If data arrives on stream1 today and I need to join it with stream2 based
> on some key, how can I do this efficiently?
>
> stream2 might have lots of records (millions).
>
> Please help.
>
> Regards,
> Shamit Jain
>


Re: Join two streams using a count-based window

2016-06-10 Thread Nikos R. Katsipoulakis
Thank you very much Matthias! Also, the link you provided is very helpful.

Cheers,
Nikos



-- 
Nikos R. Katsipoulakis,
Department of Computer Science
University of Pittsburgh


Re: Join two streams using a count-based window

2016-06-10 Thread Matthias J. Sax
I just posted an answer on SO.

Regarding your other question: Flink processes data tuple by tuple, with
some internal buffering. You might be interested in
https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks
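Here is a rough plain-Java picture of "tuple by tuple with internal buffering". It is a simplification, not Flink's actual network stack. The operator hands records over one at a time, but they are shipped downstream in buffers. A buffer is flushed when it is full or when a timeout has elapsed. Flink exposes such a timeout via StreamExecutionEnvironment#setBufferTimeout (100 ms by default). Two details differ from Flink: the sketch counts capacity in records, whereas Flink sizes buffers in bytes, and the sketch checks the timeout on each emit, whereas Flink flushes periodically in the background. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (not Flink's network stack) of record-at-a-time hand-over
// with buffered shipping: flush when the buffer is full or the timeout elapsed.
class BufferedSender {
    final int capacity;    // records per buffer (Flink sizes buffers in bytes)
    final long timeoutMs;  // analogous to Flink's buffer timeout
    final List<String> buffer = new ArrayList<>();
    final List<List<String>> sent = new ArrayList<>(); // buffers shipped downstream
    long lastFlushMs;

    BufferedSender(int capacity, long timeoutMs, long startMs) {
        this.capacity = capacity;
        this.timeoutMs = timeoutMs;
        this.lastFlushMs = startMs;
    }

    // The operator emits one record at a time.
    void emit(String record, long nowMs) {
        buffer.add(record);
        // Simplification: the timeout is only checked when a record is emitted.
        if (buffer.size() >= capacity || nowMs - lastFlushMs >= timeoutMs) {
            flush(nowMs);
        }
    }

    void flush(long nowMs) {
        if (!buffer.isEmpty()) {
            sent.add(new ArrayList<>(buffer));
            buffer.clear();
        }
        lastFlushMs = nowMs;
    }

    public static void main(String[] args) {
        BufferedSender s = new BufferedSender(3, 100, 0);
        s.emit("a", 0);
        s.emit("b", 10);
        s.emit("c", 20);  // buffer full -> flushed
        s.emit("d", 50);
        s.emit("e", 130); // 110 ms since last flush -> flushed by timeout
        System.out.println(s.sent); // [[a, b, c], [d, e]]
    }
}
```

A smaller timeout lowers latency at the cost of throughput, since buffers leave half empty more often.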

-Matthias

On 06/09/2016 08:13 PM, Nikos R. Katsipoulakis wrote:
> Hello all,
> 
> First, I have posted a question on Stack Overflow
> (http://stackoverflow.com/questions/37732978/join-two-streams-using-a-count-based-window).
> I am re-posting it on the mailing list in case some of you are not
> on SO.
> 
> In addition, I would like to know how Flink differs from other
> streaming engines in the granularity of data transport and
> processing. To be more precise, I am aware that Storm sends tuples using
> Netty (by filling up queues) and that a Bolt's logic is executed per tuple.
> Spark employs micro-batches to simulate streaming, and (I am not
> entirely certain) each task processes one micro-batch at a time. What
> about Flink? How are tuples transferred and processed? Any explanation
> and/or article/blog post/link is more than welcome.
> 
> Thanks
> 
> -- 
> Nikos R. Katsipoulakis, 
> Department of Computer Science 
> University of Pittsburgh


