Re: Combining multiple stages into a multi-stage processing pipeline

2024-04-07 Thread Mark Petronic
Thank you Yunfeng. Your comments gave me some insights to explore how to
use consecutive windows. So, I coded up a version that looks like this and
works well for me:

KafkaSource => Keyby => TumblingWindows => ProcessWindowFn => WindowAll =>
ProcessWindowFn => (here I repeat the keyed and windowAll steps in
additional stages)

The missing connection for me was not understanding that I could connect
windows to windows in the same data stream. That understanding made all the
difference. So now the keyed tumbling windows for the 21 keys each
process N records per key, create a score over that data, and output a
POJO containing the score and a List of the window's records. Then the
WindowAll gets those 21 POJOs and iterates over all 21 * N records to
calculate the overall score. With the overall score and the 21 keyed
scores from the prior windows in hand, it can compare each of the 21
scores to the overall score and conditionally out.collect() only the
record Lists for the sets below threshold. Then, subsequent stages can
rinse and repeat this process in one clean job graph.
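
In skeleton form, the chain looks roughly like this (a minimal sketch,
not the production code: Event, KeyScore, and the averaging are
placeholder stand-ins for my real types and scoring, and only the
window-to-window wiring mirrors the actual job):

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class PipelineSketch {
    public static class Event { public String key; public double value; }
    public static class KeyScore {
        public String key;
        public double score;
        public List<Event> records = new ArrayList<>();
    }

    // Assumes 'source' already carries event-time watermarks from the
    // KafkaSource. Returns the surviving record lists for the next stage.
    public static DataStream<List<Event>> wire(DataStream<Event> source) {
        // Keyed tumbling windows: one score per key per minute, with the
        // window's records carried along in the output POJO.
        DataStream<KeyScore> perKey = source
            .keyBy(e -> e.key)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .process(new ProcessWindowFunction<Event, KeyScore, String, TimeWindow>() {
                @Override
                public void process(String key, Context ctx, Iterable<Event> events,
                                    Collector<KeyScore> out) {
                    KeyScore ks = new KeyScore();
                    ks.key = key;
                    double sum = 0;
                    int n = 0;
                    for (Event e : events) { ks.records.add(e); sum += e.value; n++; }
                    ks.score = n == 0 ? 0 : sum / n; // placeholder scoring
                    out.collect(ks);
                }
            });

        // WindowAll chained directly on the keyed windows' output: iterate
        // all 21 * N records for the overall score, then collect only the
        // record lists whose key score falls below it.
        return perKey
            .windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))
            .process(new ProcessAllWindowFunction<KeyScore, List<Event>, TimeWindow>() {
                @Override
                public void process(Context ctx, Iterable<KeyScore> scores,
                                    Collector<List<Event>> out) {
                    double sum = 0;
                    int n = 0;
                    for (KeyScore ks : scores) {
                        for (Event e : ks.records) { sum += e.value; n++; }
                    }
                    double overall = n == 0 ? 0 : sum / n; // placeholder overall score
                    for (KeyScore ks : scores) {
                        if (ks.score < overall) {
                            out.collect(ks.records);
                        }
                    }
                }
            });
    }
}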

Thanks again for your thoughts. They really turned on the light bulb for
me :)
Mark


Re: Combining multiple stages into a multi-stage processing pipeline

2024-04-06 Thread Yunfeng Zhou
Hi Mark,

IMHO, your design of the Flink application is generally feasible. In
Flink ML, I once came across a similar design in the ChiSqTest operator,
where the input data is first aggregated to produce intermediate results
that are then broadcast and connected with other result streams derived
from the same input. You may refer to that algorithm for more details
when designing your application.
https://github.com/apache/flink-ml/blob/master/flink-ml-lib/src/main/java/org/apache/flink/ml/stats/chisqtest/ChiSqTest.java
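
The core of that pattern, reduced to a skeleton, looks something like the
sketch below (an illustration only, not the actual ChiSqTest code; the
KeyScore type, the state name, and the below-threshold rule are
assumptions I am making about your job):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastSketch {
    public static class KeyScore { public String key; public double score; }

    // Broadcast the single global result and connect it with the per-key
    // results so every parallel instance can compare against it.
    public static DataStream<KeyScore> belowGlobal(
            DataStream<KeyScore> keyScores, DataStream<Double> globalScore) {
        final MapStateDescriptor<String, Double> desc =
            new MapStateDescriptor<>("global-score", Types.STRING, Types.DOUBLE);
        BroadcastStream<Double> broadcast = globalScore.broadcast(desc);

        return keyScores
            .connect(broadcast)
            .process(new BroadcastProcessFunction<KeyScore, Double, KeyScore>() {
                @Override
                public void processElement(KeyScore ks, ReadOnlyContext ctx,
                                           Collector<KeyScore> out) throws Exception {
                    Double global = ctx.getBroadcastState(desc).get("global");
                    // Note: elements arriving before the first broadcast
                    // value are dropped in this simplified sketch.
                    if (global != null && ks.score < global) {
                        out.collect(ks);
                    }
                }

                @Override
                public void processBroadcastElement(Double global, Context ctx,
                                                    Collector<KeyScore> out) throws Exception {
                    ctx.getBroadcastState(desc).put("global", global);
                }
            });
    }
}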

Besides, side outputs are typically used when you want to split an
output stream into different categories. Given that the
ProcessWindowFn before each SideOutput-x has only one downstream
consumer, it would be enough to pass the resulting DataStream directly
to the session windows instead of introducing side outputs.
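
For example, the merge point in your STAGE-1 could simply union the two
result streams into the session window, with no side output in between (a
sketch under assumptions about your types; Score and the session gap are
placeholders):

import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class MergeSketch {
    public static class Score { public String key; public double value; }

    // Both ProcessWindowFns emit their main output as ordinary streams;
    // union replaces SideOutput-1 as the single collection point.
    public static AllWindowedStream<Score, TimeWindow> merge(
            DataStream<Score> keyedResults,   // from the 21 tumbling windows
            DataStream<Score> globalResult) { // from the WindowAll
        return keyedResults
            .union(globalResult)
            .windowAll(EventTimeSessionWindows.withGap(Time.seconds(10)));
    }
}

A side output only becomes necessary at the SessionWindow1 ProcessWindowFn,
where the stream genuinely splits into the records for STAGE-2 and the
evaluation results for Kafka.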

Best,
Yunfeng


Combining multiple stages into a multi-stage processing pipeline

2024-04-06 Thread Mark Petronic
I am looking for some design advice for a new Flink application and I am
relatively new to Flink - I have one, fairly straightforward Flink
application in production so far.

For this new application, I want to create a three-stage processing
pipeline. Functionally, I see this as ONE long datastream, but I have to
evaluate the STAGE-1 data in a special manner and then pass that
evaluation on to STAGE-2, which performs its own evaluation shaped by the
STAGE-1 results. The same thing happens again in STAGE-3, using the
STAGE-2 results. Finally, the end result is published to Kafka. The
stages functionally look like this:

STAGE-1
KafkaSource |=> Keyby => TumblingWindows1 => ProcessWindowFn => SideOutput-1
            |=> WindowAll => ProcessWindowFn => SideOutput-1
SideOutput-1 => SessionWindow1 => ProcessWindowFn => (SideOutput-2[WindowRecords], KafkaSink[EvalResult])

STAGE-2
SideOutput-2 => Keyby => TumblingWindows2 => ProcessWindowFn => SideOutput-3 => SessionWindow2 => ProcessWindowFn => (SideOutput-4[WindowRecords], KafkaSink[EvalResult])

STAGE-3
SideOutput-4 => Keyby => TumblingWindows3 => ProcessWindowFn => SideOutput-5 => SessionWindow3 => ProcessWindowFn => KafkaSink

DESCRIPTION

In STAGE-1, there are a fixed number of known keys so I will only see at
most about 21 distinct keys and therefore up to 21 tumbling one-minute
windows. I also need to aggregate all data in a global window to get
an overall non-keyed result. I need to bring the 21 results from those 21
tumbling windows AND the one global result into one place where I can
compare each of the 21 windows results to the one global result. Based on
this evaluation, only some of the 21 windows results will survive that
test. I want to then take the data records from those, say 3 surviving
windows, and make them the "source" for STAGE-2 processing as well as
publish some intermediate evaluation results to a KafkaSink. STAGE-2 will
reprocess the same data records that the three STAGE-1 surviving windows
processed, only keying them by different dimensions. I expect
around 4000 fairly small records for each of the 21 STAGE-1 windows so, in
this example, I would be sending 4000 x 3 = 12000 records in SideOutput-2
to form the new "source" datastream for STAGE-2.

Where I am struggling is:

   1. Trying to figure out how to best connect the output of the 21 STAGE-1
   windows and the one WindowAll window's records into a single point (I
   propose SessionWindow1) to be able to compare each of the 21 windows'
   data results with the WindowAll non-keyed results.
   2. The best way to connect together these multiple stages.

Looking at the STAGE-1 approach illustrated above, this is my attempt at an
approach using side outputs to:

   1. Form a new "source" data stream that contains the outputs of each of
   the 21 windows and the WindowAll data
   2. Consume that into a single session window
   3. Do the evaluation of each of the 21 keyed windows against the
   overall WindowAll data
   4. Then emit, from the ProcessWindowFn, only the 3 surviving sets of
   tumbling-window records to SideOutput-2 and the evaluation results to
   Kafka
   5. Finally, SideOutput-2 will form the new data stream "source" for
   STAGE-2, where a similar process will repeat, passing data to STAGE-3
   for similar processing, to finally obtain the desired result that will
   be published to Kafka.

I would greatly appreciate the following:

   1. Comments on whether this is a valid approach - am I on the right
   track here?
   2. Could you suggest an alternate approach that I could investigate if
   this is problematic?

I am trying to build a Flink application that follows established best
practices, so I am just looking for some confirmation that I am heading
down a reasonable path with this design.

Thank you in advance,
Mark