Re: Combining multiple stages into a multi-stage processing pipeline

2024-04-07 Thread Mark Petronic
Thank you Yunfeng. Your comments gave me some insights to explore how to
use consecutive windows. So, I coded up a version that looks like this and
works well for me:

KafkaSource => Keyby => TumblingWindows => ProcessWindowFn => WindowAll =>
ProcessWindowFn => (Here I will repeat keyed and windowAll windows in
additional stages)

The missing connection for me was not understanding that I could connect
windows to windows in the same data stream. That understanding made all the
difference. So now the keyed tumbling windows for the 21 keys each
process N records per key, create a score over that data, and output a
POJO containing the score and a List of the window's records. Then the
WindowAll gets those 21 POJOs of N records and iterates over all 21 * N
records to calculate the overall score. Now that it has in hand the overall
score and the 21 keyed scores from the prior windows, it can compare each
of the 21 scores to the overall score and conditionally out.collect() only
the record Lists for the record sets below the threshold. Then, subsequent
stages can rinse and repeat this process in one clean job graph.
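
In case it helps anyone else following the thread, here is a stripped-down
sketch of that chaining, using toy Tuple2<String, Double> records and
made-up scoring/survival logic in place of my real POJOs (and assuming
watermarks are already assigned at the source):

// "records" stands in for the KafkaSource stream of (key, value) pairs.
DataStream<Tuple2<String, Double>> keyedScores = records
    .keyBy(r -> r.f0)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .process(new ProcessWindowFunction<Tuple2<String, Double>, Tuple2<String, Double>, String, TimeWindow>() {
        @Override
        public void process(String key, Context ctx, Iterable<Tuple2<String, Double>> in,
                            Collector<Tuple2<String, Double>> out) {
            double sum = 0;
            int n = 0;
            for (Tuple2<String, Double> r : in) { sum += r.f1; n++; }
            out.collect(Tuple2.of(key, sum / Math.max(n, 1)));   // one score per key per window
        }
    });

// Consecutive window: the windowAll sees every per-key score for the same minute, computes the
// overall score, and only emits the keys that pass the (made-up) survival rule.
keyedScores
    .windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))
    .process(new ProcessAllWindowFunction<Tuple2<String, Double>, Tuple2<String, Double>, TimeWindow>() {
        @Override
        public void process(Context ctx, Iterable<Tuple2<String, Double>> scores,
                            Collector<Tuple2<String, Double>> out) {
            List<Tuple2<String, Double>> all = new ArrayList<>();
            double overall = 0;
            for (Tuple2<String, Double> s : scores) { all.add(s); overall += s.f1; }
            overall /= Math.max(all.size(), 1);
            for (Tuple2<String, Double> s : all) {
                if (s.f1 < overall) {                   // made-up survival rule
                    out.collect(s);
                }
            }
        }
    });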

Thanks again for your thoughts. They really helped light the light bulb for
me :)
Mark


On Sat, Apr 6, 2024 at 11:24 PM Yunfeng Zhou wrote:

> Hi Mark,
>
> IMHO, your design of the Flink application is generally feasible. In
> Flink ML, I once came across a similar design in the ChiSqTest operator,
> where the input data is first aggregated to generate some results that
> are then broadcast and connected with other result streams derived from
> the same input. You may refer to this algorithm for more details
> when designing your applications.
>
> https://github.com/apache/flink-ml/blob/master/flink-ml-lib/src/main/java/org/apache/flink/ml/stats/chisqtest/ChiSqTest.java
>
> Besides, side outputs are typically used when you want to split an
> output stream into different categories. Given that the
> ProcessWindowFn before each SideOutput-x only has one downstream, it
> would be enough to directly pass the resulting DataStream to session
> windows instead of introducing side outputs.
>
> Best,
> Yunfeng

Combining multiple stages into a multi-stage processing pipeline

2024-04-06 Thread Mark Petronic
I am looking for some design advice for a new Flink application and I am
relatively new to Flink - I have one, fairly straightforward Flink
application in production so far.

For this new application, I want to create a three-stage processing
pipeline. Functionally, I see this as ONE long datastream. But I have to
evaluate the STAGE-1 data in a special manner and then pass that evaluation
on to STAGE-2, which performs its own special evaluation shaped by the
STAGE-1 results. The same thing happens again in STAGE-3, using the STAGE-2
evaluation results. Finally, the end result is published to Kafka. The
stages functionally look like this:

STAGE-1
KafkaSource  |=> Keyby => TumblingWindows1 => ProcessWindowFn => SideOutput-1
             |=> WindowAll => ProcessWindowFn => SideOutput-1
SideOutput-1 => SessionWindow1 => ProcessWindowFn => (SideOutput-2[WindowRecords], KafkaSink[EvalResult])

STAGE-2
SideOutput-2 => Keyby => TumblingWindows2 => ProcessWindowFn => SideOutput-3
SideOutput-3 => SessionWindow2 => ProcessWindowFn => (SideOutput-4[WindowRecords], KafkaSink[EvalResult])

STAGE-3
SideOutput-4 => Keyby => TumblingWindows3 => ProcessWindowFn => SideOutput-5
SideOutput-5 => SessionWindow3 => ProcessWindowFn => KafkaSink

DESCRIPTION

In STAGE-1, there are a fixed number of known keys so I will only see at
most about 21 distinct keys and therefore up to 21 tumbling one-minute
windows. I also need to aggregate all data in a global window to get
an overall non-keyed result. I need to bring the 21 results from those 21
tumbling windows AND the one global result into one place where I can
compare each of the 21 windows results to the one global result. Based on
this evaluation, only some of the 21 windows results will survive that
test. I want to then take the data records from those, say 3 surviving
windows, and make them the "source" for STAGE-2 processing as well as
publish some intermediate evaluation results to a KafkaSink. STAGE-2 will
reprocess the same data records that the three STAGE-1 surviving windows
processed, only keying them by different dimensions. I expect there to be
around 4000 fairly small records per each of the 21 STAGE-1 windows so, in
this example, I would be sending 4000 x 3 = 12000 records in SideOutput-2
to form the new "source" datastream for STAGE-2.

Where I am struggling is:

   1. Trying to figure out how best to connect the output of the 21 STAGE-1
   windows and the one WindowAll window's records into a single point (I propose
   SessionWindow1) to be able to compare each of the 21 windows' data results
   with the WindowAll non-keyed results.
   2. The best way to connect together these multiple stages.

Looking at the STAGE-1 approach illustrated above, this is my attempt at an
approach using side outputs to:

   1. Form a new "source" data stream that contains the outputs of each of
   the 21 windows and the WindowAll data
   2. Consume that into a single session window
   3. Do the evaluations between the 21 keyed windows against the overall
   WindowAll data
   4. Then emit only the 3 surviving windows' record sets from the
   ProcessWindowFn to SideOutput-2, and the evaluation results to Kafka
   (roughly sketched after this list)
   5. Finally, SideOutput-2 will then form the new data stream "source" for
   STAGE-2 where a similar process will repeat, passing data to a STAGE-3,
   again similar processing, to finally obtain the desired result that will be
   published to Kafka.
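
To make item 4 above concrete, this is roughly how I picture the
SideOutput-2 hand-off. Every type and variable name below is a placeholder
and the session gap is a guess; this is the part I am asking about:

final OutputTag<StatRecord> sideOutput2 = new OutputTag<StatRecord>("stage-2-records") {};

// sideOutput1Stream stands in for the merged outputs of the 21 keyed windows and the WindowAll.
SingleOutputStreamOperator<EvalResult> stage1Eval = sideOutput1Stream
    .windowAll(EventTimeSessionWindows.withGap(Time.seconds(30)))   // SessionWindow1; gap is a guess
    .process(new ProcessAllWindowFunction<StatRecord, EvalResult, TimeWindow>() {
        @Override
        public void process(Context ctx, Iterable<StatRecord> records, Collector<EvalResult> out) {
            // Compare each of the 21 keyed results against the overall WindowAll result here.
            for (StatRecord r : records) {
                if (survivesComparison(r)) {            // placeholder for the threshold test
                    ctx.output(sideOutput2, r);         // records that become the STAGE-2 "source"
                }
            }
            out.collect(new EvalResult());              // intermediate evaluation result for Kafka
        }
    });

stage1Eval.sinkTo(evalResultKafkaSink);                                       // KafkaSink[EvalResult]
DataStream<StatRecord> stage2Source = stage1Eval.getSideOutput(sideOutput2);  // feeds STAGE-2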

I would greatly appreciate the following:

   1. Comments on whether this is a valid approach - am I on the right track
   here?
   2. Could you suggest an alternate approach that I could investigate if
   this is problematic?

I am trying to build a Flink application that follows intended best
practices so I am just looking for some confirmation that I am heading down
a reasonable path for this design.

Thank you in advance,
Mark


Guidance on general design approach for Flink

2024-01-30 Thread Mark Petronic
I am working on a new application to perform real time anomaly detection
using Flink. I am relatively new to Flink but already have one application
in production that is fairly basic and of low throughput. This next one
will be more complex and much higher throughput.

My query is about handling late arriving data. For this application, the
source data is like this:

   - zip files containing a single JSON file each are pushed to an S3
   bucket by many servers that I need to monitor, where N servers are grouped
   into M pools
   - An SQS event is then published from S3 for each new zip file that
   arrives
   - I write a Flink source to read the event and pull the S3 file as it
   arrives, stream-unzip it, deserialize the JSON, flat-map its contents into
   a datastream, and then process the data in tumbling 60-second windows

Each file can contain up to either 300 seconds' worth of metrics or 1000
time series records. When a server is processing a lot of connections, the
files grow faster, so the JSON file is closed as soon as the 1000-sample
threshold is hit. When a server's traffic is low, it emits the file when
the 300-second elapsed time threshold hits, regardless of how many samples
are in the file (so in this case the file will have between 0 and 1000
samples).

It is this pattern that I am struggling with. I need to use one-minute
tumbling windows to aggregate these metrics. However, I may have to wait
300 seconds for a slow-traffic file to be uploaded to S3 while other
files (on higher-traffic servers) are showing up in S3 maybe every 10 or 20
seconds (each filled with the 1000 samples that trigger the file closure and
upload). Any of these files can contain a portion of the data that
would align into the same 60-second time window. All of the data from all
these servers needs to be aggregated and grouped, because the servers are
part of pools of servers, so I need to group the records by POOL and not
just by the records from each server.

So, my questions given that context are:

   1. Is having a 60 second tumbling window with 300 seconds of allowed
   lateness a pattern that is fairly common and typically implemented in Flink
   - where the lateness is not a fraction of the window size but a multiple of
   it? My sense is that this IS a reasonable problem that Flink can deal with.
   2. I believe with such an approach, there will potentially be 5-6
   60-second windows in flight for each grouping key to accommodate such
   allowed lateness and that means a lot more resources/state for the cluster
   to support. This impacts the resources needed for a given cluster scale. Do
   I have that assumption correct?

I just want to make sure I am considering the right tool for this job and
appreciate any inputs on this.
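
To put question 1 in code form, the shape I have in mind is roughly the
following (PoolMetric, PoolAggregator, and the getters are placeholders;
how much of the 300 seconds belongs in the watermark bound versus
allowedLateness is exactly what I am unsure about):

// "samples" stands in for the flat-mapped records from the unzipped JSON files.
samples
    .assignTimestampsAndWatermarks(
        WatermarkStrategy.<PoolMetric>forBoundedOutOfOrderness(Duration.ofSeconds(300))
            .withTimestampAssigner((m, ts) -> m.getEventTimeMillis()))
    .keyBy(PoolMetric::getPoolId)                           // group by POOL, not by server
    .window(TumblingEventTimeWindows.of(Time.seconds(60)))  // one-minute tumbling windows
    .allowedLateness(Time.seconds(300))                     // lateness = 5x the window size
    .aggregate(new PoolAggregator());                       // pool-level aggregation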


Best practice way to conditionally discard a window and not serialize the results

2023-10-30 Thread Mark Petronic
I am reading stats from Kinesis, deserializing them into a stat POJO, and
then doing something like this using an aggregating window with no
ProcessWindowFunction defined:

timestampedStats
    .keyBy(v -> v.groupKey())
    .window(TumblingEventTimeWindows.of(Time.seconds(appCfg.getWindowSize())))
    .aggregate(new ImpactAggregator(appCfg.getSmoothingRange(), appCfg.getThreshold()))
    .sinkTo(getKinesisProducer(appCfg.getAcuWindowsStreamName(), appCfg.getAwsRegion()))
    .name("Kinesis Producer");

As part of the aggregation logic, I am looking for certain threshold
violations where some field in each metric is tested against some fixed
threshold. I then increment a counter in an accumulator for each stat
violation for the duration of the window (300 seconds) along with some
other metadata associated with that stat that violated the threshold. If
there are violations, then I want to process the window by serializing its
contents to JSON and publishing to Kinesis. What I want to do is NOT
serialize a window that has NO violations in its accumulator. There is no
need to send a message when no bad conditions are observed.

   - Could you please tell me how I can just throw away a window and NOT
   serialize it when it is ready to be processed?
   - How do I hook into some event that allows me to do that?
   - Do I need to implement a ProcessWindowFunction along with my
   AggregateFunction and somehow handle this as part of the process window
   function (roughly sketched below)?

I have created a class that implements SerializationSchema to do that
serialization, but the serialize() function requires a valid JSON byte[] to
be returned. I think the solution is somewhere else, where I can elect to
NOT process the window at all so that serialize() will NOT get called.
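
If the answer to the third bullet is yes, I imagine it would look something
like the sketch below, where ImpactSummary stands in for whatever my
ImpactAggregator's result type is, and the String key type is an assumption:

timestampedStats
    .keyBy(v -> v.groupKey())
    .window(TumblingEventTimeWindows.of(Time.seconds(appCfg.getWindowSize())))
    .aggregate(
        new ImpactAggregator(appCfg.getSmoothingRange(), appCfg.getThreshold()),
        new ProcessWindowFunction<ImpactSummary, ImpactSummary, String, TimeWindow>() {
            @Override
            public void process(String key, Context ctx, Iterable<ImpactSummary> results,
                                Collector<ImpactSummary> out) {
                // With the AggregateFunction variant, the iterable holds exactly one
                // pre-aggregated result for this key and window.
                ImpactSummary summary = results.iterator().next();
                if (summary.getViolationCount() > 0) {   // placeholder accessor
                    out.collect(summary);                // only violating windows get serialized
                }
                // Emitting nothing here effectively discards the window.
            }
        })
    .sinkTo(getKinesisProducer(appCfg.getAcuWindowsStreamName(), appCfg.getAwsRegion()))
    .name("Kinesis Producer");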

Thank you


Checkpoint/Savepoint restore of S3 file reads using continuous read mode

2023-10-06 Thread Mark Petronic
I am trying to understand the Flink design pattern for consuming files from
S3 continuously as they appear. I have written the below minimal program to
do that and it works as expected wrt detecting newly-uploaded S3 files
within the configured 5 second monitoring poll period. Then it just prints
the file contents to stdout as a simple task.

Where I am having difficulty is understanding the design pattern to
maintain state so that upon restart, the Flink app will NOT reprocess files
that it already processed.

I can see that Flink is retaining state in my configured state location at
file:///var/tmp/flink/checkpoints/ and, when inspecting state files at paths
like
/var/tmp/flink/checkpoints/6d3c9a96c13be31760416212bd3fd33d/chk-5/_metadata,
I see that the S3 paths to the files already processed by Flink show up in
the _metadata file. So, I know the state of each file is being captured.

Now I am trying to understand a few concepts:

   1. How much state will Flink retain? If the files in the bucket are
   retained for a long time then there could be a lot of files piling up in
   the bucket with say, a life cycle delete policy of 30 days. It seems that
   Flink would have to retain the complete list to be able to avoid
   reprocessing existing files and that would be quite a lot of state.
   2. I understand from the docs that you can restart Flink using state
   from either a savepoint or a checkpoint. I was trying to restart my test
   application standalone using the following command from my dev environment
   but, upon startup, it still reprocesses the files that are in the _metadata
   state captured from the previous run. Is the "--fromSavepoint" option
   the correct way to specify the savepoint file to be read at startup?

/usr/bin/env \
ASP_USE_LOCAL_ENV=1 \
ASP_VERBOSE_LOGGING=true \
ASP_CHECKPOINT_INTERVAL_MILLISECONDS=5000 \
/usr/lib/jvm/java-11-openjdk-amd64/bin/java \
@./startup.argfile \
com.myapp.weatherstreamprocessor.WeatherStreamProcessor \
--fromSavepoint
/var/tmp/flink/checkpoints/6d3c9a96c13be31760416212bd3fd33d/chk-5/_metadata

I am using the Flink operator to deploy my Flink application to EKS and
already have one production Flink application that is consuming from and
writing to Kinesis, so I have some initial Flink experience doing that. So,
I realize that, when deployed in my EKS cluster, checkpointing is meant for
recovery of the task managers by the job manager should the task managers
need to be restarted. And, for managed restarts (like code updates), I
should be using an explicitly created savepoint. But I am just trying to
prove the behavior in my test environment.
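
For what it's worth, the behavior I was hoping for in the local environment
is something like the snippet below, though I do not know whether
execution.savepoint.path is actually honored when the environment is created
this way - that uncertainty is really the heart of my question:

// Unverified sketch: point the job at the retained checkpoint directory before building the
// local environment, instead of passing --fromSavepoint on the command line.
Configuration conf = new Configuration();
conf.setString("execution.savepoint.path",
        "/var/tmp/flink/checkpoints/6d3c9a96c13be31760416212bd3fd33d/chk-5");
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);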

Could someone kindly direct me to the right approach to be able to restart
in my test environment, read the checkpoint, and NOT have Flink reprocess
the files already seen in the previous running instance?

Thanks!




# ==========================================================================
# My Test Application
# ==========================================================================

package com.myapp.weatherstreamprocessor;

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WeatherStreamProcessor {
public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();

conf.set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, true);
conf.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, MemorySize.ofMebiBytes(1024)); // 1 GiB
conf.set(JobManagerOptions.TOTAL_FLINK_MEMORY, MemorySize.ofMebiBytes(1024)); // 1 GiB
conf.set(TaskManagerOptions.CPU_CORES, 4.0);

final StreamExecutionEnvironment env;
Config appCfg = new Config();

if (appCfg.getUseLocalEnv()) {
env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
} else {
env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
}

env.setParallelism(appCfg.getParallelism());
if (appCfg.getCheckpointInterval() > 0) {
env.enableCheckpointing(appCfg.getCheckpointInterval());
}
CheckpointConfig config = env.getCheckpointConfig();

config.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
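
// Continuation sketch of the rest of main() as described above (the s3:// path is a
// placeholder): a FileSource that monitors the bucket every 5 seconds and prints each line.
FileSource<String> source = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("s3://my-bucket/incoming/"))
    .monitorContinuously(Duration.ofSeconds(5))
    .build();

DataStream<String> lines =
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "s3-file-source");
lines.print();

env.execute("WeatherStreamProcessor");
    }
}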


Web UI not working with createLocalEnvironmentWithWebUI()

2023-04-16 Thread Mark Petronic
I am learning Flink for a new project. I am trying to understand the
development/debug environment to help me step through my code to better
learn Flink. I am using the Intellij community edition for my IDE and Flink
1.17.0.

I am using this simple Flink app to demonstrate my issue.

//===
package streamwindowprocessor;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SimpleFlink {

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();
final StreamExecutionEnvironment env =
    StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
// final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> values = env.socketTextStream("localhost", 9999); // placeholder port
values.print();

env.execute("Alarm Stream Processor");
}
}
//===

Before I run this from the IDE, I start up a socket listener on a terminal:

nc -lkp 9999

Then I open a web browser to localhost:8081 and get this output

{
  "errors": [
    "Not found: /"
  ]
}

If instead, I use ./start-cluster.sh to start a standalone cluster, rebuild
my jar using getExecutionEnvironment(), and submit that same simple jar
using ./flink.sh run, then I can open the browser to
localhost:8081 and I do see my app running, as expected, and it processes
strings I send via the running netcat.

Someone on SO noted that you should add this dependency, which I did, and it
made no difference.


<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-runtime-web</artifactId>
  <version>${flink.version}</version>
</dependency>


*Can anyone help me understand why the web UI does not work
in createLocalEnvironmentWithWebUI()?*

Thanks,
Mark