Unsubscribe
Re: Combining multiple stages into a multi-stage processing pipeline
Thank you, Yunfeng. Your comments gave me some insight into how to use consecutive windows. So I coded up a version that looks like this, and it works well for me:

KafkaSource => Keyby => TumblingWindows => ProcessWindowFn => WindowAll => ProcessWindowFn => (here I will repeat the keyed and WindowAll steps in additional stages)

The missing connection for me was not understanding that I could connect windows to windows in the same data stream. That understanding made all the difference. So now the keyed tumbling windows for the 21 keys each process N records per key, compute a score over that data, and output a POJO containing the score and a List of those records. Then the WindowAll receives those 21 POJOs of N records each and iterates over all 21 * N records to calculate the overall score. Now that it has the overall score and the 21 keyed scores from the prior windows in hand, it can compare each of the 21 scores to the overall score and conditionally out.collect() only the Lists for the record sets below the threshold. Subsequent stages can then rinse and repeat this process in one clean job graph.

Thanks again for your thoughts. They really helped the light bulb go on for me :)

Mark

On Sat, Apr 6, 2024 at 11:24 PM Yunfeng Zhou wrote:
> Hi Mark,
>
> IMHO, your design of the Flink application is generally feasible. In
> Flink ML, I once came across a similar design in the ChiSqTest operator,
> where the input data is first aggregated to generate some results, which
> are then broadcast and connected with other result streams from the same
> input. You may refer to this algorithm for more details when designing
> your application.
>
> https://github.com/apache/flink-ml/blob/master/flink-ml-lib/src/main/java/org/apache/flink/ml/stats/chisqtest/ChiSqTest.java
>
> Besides, side outputs are typically used when you want to split an
> output stream into different categories.
> Given that the ProcessWindowFn before each SideOutput-x has only one
> downstream consumer, it would be enough to pass the resulting DataStream
> directly to the session windows instead of introducing side outputs.
>
> Best,
> Yunfeng
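The WindowAll evaluation in Mark's working design can be modeled in plain Java to make the data flow concrete. This is a minimal sketch, not the job's actual code: the class names, the use of a simple mean as the "score", and the rule "keep a key whose score is below the overall score" are hypothetical stand-ins. In a Flink job, keyedWindow() would correspond to the ProcessWindowFunction after keyBy(...).window(...), and evaluate() to the body of a ProcessAllWindowFunction after windowAll(...), with each surviving list passed to out.collect(). Two Flink behaviors make this chaining work: the results of an event-time window carry the window's maximum timestamp, so a downstream tumbling window of the same size groups the 21 keyed results of the same minute together; and windowAll() is non-parallel, which is fine for 21 * N small records but worth remembering as volume grows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class WindowAllEvaluator {

    /** Hypothetical POJO emitted by each keyed tumbling window: key, score, and its records. */
    static final class KeyedResult {
        final String key;
        final double score;
        final List<Double> records;

        KeyedResult(String key, double score, List<Double> records) {
            this.key = key;
            this.score = score;
            this.records = records;
        }
    }

    /** Hypothetical scoring function: the mean of the record values (0.0 for no records). */
    static double score(List<Double> values) {
        double sum = 0.0;
        for (double v : values) {
            sum += v;
        }
        return values.isEmpty() ? 0.0 : sum / values.size();
    }

    /** Keyed-window step: score one key's records and wrap them in a KeyedResult. */
    static KeyedResult keyedWindow(String key, List<Double> records) {
        return new KeyedResult(key, score(records), records);
    }

    /**
     * WindowAll step: score all records from every key together, then keep only
     * the record lists whose keyed score falls below that overall score.
     */
    static List<List<Double>> evaluate(List<KeyedResult> keyedResults) {
        List<Double> allRecords = new ArrayList<>();
        for (KeyedResult r : keyedResults) {
            allRecords.addAll(r.records);
        }
        double overall = score(allRecords);

        List<List<Double>> survivors = new ArrayList<>();
        for (KeyedResult r : keyedResults) {
            if (r.score < overall) {
                survivors.add(r.records); // in Flink: out.collect(r.records)
            }
        }
        return survivors;
    }

    public static void main(String[] args) {
        List<KeyedResult> results = Arrays.asList(
                keyedWindow("a", Arrays.asList(1.0, 2.0)),
                keyedWindow("b", Arrays.asList(9.0, 11.0)),
                keyedWindow("c", Arrays.asList(4.0, 6.0)));
        System.out.println(evaluate(results)); // prints [[1.0, 2.0], [4.0, 6.0]]
    }
}
```

Here the overall mean is 5.5, so keys "a" (1.5) and "c" (5.0) survive and "b" (10.0) is dropped.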
Combining multiple stages into a multi-stage processing pipeline
I am looking for some design advice for a new Flink application. I am relatively new to Flink; I have one fairly straightforward Flink application in production so far.

For this new application, I want to create a three-stage processing pipeline. Functionally, I see this as ONE long datastream. But I have to evaluate the STAGE-1 data in a special manner and then pass that evaluation on to STAGE-2, which will do its own special evaluation, using the STAGE-1 evaluation results to shape it. The same thing happens again in STAGE-3, using the STAGE-2 evaluation results. Finally, the end result is published to Kafka. The stages functionally look like this:

STAGE-1
KafkaSource |=> Keyby => TumblingWindows1 => ProcessWindowFn => SideOutput-1 |=> SessionWindow1 => ProcessWindowFn => (SideOutput-2[WindowRecords], KafkaSink[EvalResult])
            |=> WindowAll => ProcessWindowFn => SideOutput-1 ^

STAGE-2
SideOutput-2 => Keyby => TumblingWindows2 => ProcessWindowFn => SideOutput-3 => SessionWindow2 => ProcessWindowFn => (SideOutput-4[WindowRecords], KafkaSink[EvalResult])

STAGE-3
SideOutput-4 => Keyby => TumblingWindows3 => ProcessWindowFn => SideOutput-5 => SessionWindow3 => ProcessWindowFn => KafkaSink

DESCRIPTION

In STAGE-1, there is a fixed number of known keys, so I will see at most about 21 distinct keys and therefore up to 21 one-minute tumbling windows. I also need to aggregate all the data in a global window to get an overall non-keyed result. I need to bring the 21 results from those 21 tumbling windows AND the one global result into one place where I can compare each of the 21 window results to the global result. Based on this evaluation, only some of the 21 window results will survive the test. I then want to take the data records from the surviving windows, say 3 of them, and make them the "source" for STAGE-2 processing, as well as publish some intermediate evaluation results to a KafkaSink.
STAGE-2 will reprocess the same data records that the three surviving STAGE-1 windows processed, only keyed by different dimensions. I expect around 4000 fairly small records in each of the 21 STAGE-1 windows, so in this example I would be sending 4000 x 3 = 12000 records in SideOutput-2 to form the new "source" datastream for STAGE-2.

Where I am struggling is:

1. Figuring out how best to connect the output of the 21 STAGE-1 windows and the one WindowAll window's records into a single point (I propose SessionWindow1), so that I can compare each of the 21 windows' results with the non-keyed WindowAll result.
2. The best way to connect these multiple stages together.

Looking at the STAGE-1 approach illustrated above, this is my attempt at an approach that uses side outputs to:

1. Form a new "source" data stream that contains the outputs of each of the 21 windows and the WindowAll data.
2. Consume that into a single session window.
3. Evaluate each of the 21 keyed windows against the overall WindowAll data.
4. Emit only the 3 surviving sets of data from the 3 tumbling windows' outputs from the ProcessWindowFn to SideOutput-2, and the evaluation results to Kafka.
5. Finally, SideOutput-2 then forms the new data stream "source" for STAGE-2, where a similar process repeats, passing data to STAGE-3 for similar processing, to finally obtain the desired result that is published to Kafka.

I would greatly appreciate the following:

1. Comments on whether this is a valid approach. Am I on the right track here?
2. If this is problematic, could you suggest an alternate approach that I could investigate?

I am trying to build a Flink application that follows intended best practices, so I am just looking for some confirmation that I am heading down a reasonable path with this design.

Thank you in advance,
Mark
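The "rinse and repeat" structure described above, where the survivors of one stage become the source of the next stage keyed by a different dimension, can be sketched in plain Java as one reusable stage function applied twice. Everything here is hypothetical for illustration: the Rec fields pool and host, the mean-based score, and the below-overall-mean survival rule. In Flink terms, the list returned by stage() plays the role of the DataStream emitted by one stage's final window function; that DataStream can be passed straight into the next keyBy(...), while side outputs are mainly useful when one operator has to split its output into several separately routed streams.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

final class StagePipeline {

    /** Hypothetical input record with two keying dimensions. */
    static final class Rec {
        final String pool;
        final String host;
        final double value;

        Rec(String pool, String host, double value) {
            this.pool = pool;
            this.host = host;
            this.value = value;
        }

        @Override
        public String toString() {
            return pool + "/" + host + "=" + value;
        }
    }

    /** Hypothetical score: mean value of the records (0.0 for none). */
    static double mean(List<Rec> recs) {
        return recs.stream().mapToDouble(r -> r.value).average().orElse(0.0);
    }

    /**
     * One stage: group by the given key, compare each group's mean with the
     * overall mean, and pass on only the records of groups scoring below it.
     */
    static List<Rec> stage(List<Rec> input, Function<Rec, String> keyFn) {
        double overall = mean(input);
        Map<String, List<Rec>> groups = input.stream()
                .collect(Collectors.groupingBy(keyFn, LinkedHashMap::new, Collectors.toList()));
        List<Rec> survivors = new ArrayList<>();
        for (List<Rec> group : groups.values()) {
            if (mean(group) < overall) {
                survivors.addAll(group);
            }
        }
        return survivors;
    }

    public static void main(String[] args) {
        List<Rec> source = Arrays.asList(
                new Rec("p1", "h1", 1.0), new Rec("p1", "h2", 3.0),
                new Rec("p2", "h3", 10.0), new Rec("p2", "h4", 12.0));
        List<Rec> afterStage1 = stage(source, r -> r.pool);      // STAGE-1: key by pool
        List<Rec> afterStage2 = stage(afterStage1, r -> r.host); // STAGE-2: key by host
        System.out.println(afterStage2); // prints [p1/h1=1.0]
    }
}
```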
Guidance on general design approach for Flink
I am working on a new application to perform real-time anomaly detection using Flink. I am relatively new to Flink but already have one application in production that is fairly basic and low-throughput. This next one will be more complex and much higher throughput. My question is about handling late-arriving data. For this application, the source data looks like this:

- Zip files, each containing a single JSON file, are pushed to an S3 bucket by the many servers I need to monitor, where N servers are spread across M pools.
- S3 then publishes an SQS event for each new zip file that arrives.
- I write a Flink source that reads the event, pulls the S3 file as it arrives, stream-unzips it, deserializes the JSON, flat-maps its contents into a DataStream, and then processes the data in 60-second tumbling windows.

Each file contains at most 300 seconds' worth of metrics or 1000 time series records, whichever limit is reached first. When a server is handling a lot of connections, its file fills up faster, so the JSON file is closed as soon as the 1000-sample threshold is hit. When a server's traffic is low, it emits the file when the 300-second elapsed-time threshold is hit, regardless of how many samples are in the file (so in this case the file contains 0 <= samples <= 1000). It is this pattern that I am struggling with.

I need to use one-minute tumbling windows to aggregate these metrics. However, I may have to wait 300 seconds for a low-traffic server's file to be uploaded to S3, while files from higher-traffic servers show up in S3 perhaps every 10 or 20 seconds (each filled with the 1000 samples that trigger the file closure and upload). Any of these files can contain a portion of the data that falls into the same 60-second window. All of the data from all these servers needs to be aggregated and grouped; because the servers belong to pools, I need to group the records by POOL rather than just by server.

So, given that context, my questions are:

1. Is having a 60-second tumbling window with 300 seconds of allowed lateness a fairly common pattern that is typically implemented in Flink, where the lateness is not a fraction of the window size but a multiple of it? My sense is that this IS a reasonable problem for Flink to deal with.
2. I believe that with such an approach there will potentially be 5-6 60-second windows in flight for each grouping key to accommodate the allowed lateness, which means a lot more resources/state for the cluster to support. This impacts the resources needed for a given cluster scale. Do I have that assumption correct?

I just want to make sure I am considering the right tool for this job and would appreciate any input on this.
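The "5-6 windows in flight" estimate in question 2 can be checked with a little arithmetic. Flink keeps a window's state until the watermark passes the window's maximum timestamp (end - 1 ms) plus the allowed lateness, so with 60 s windows and 300 s of lateness each key holds 5 or 6 windows, depending on where the watermark sits within the current minute. The sketch below counts them for a given watermark; it is a plain-Java model that assumes every window in range actually received data and ignores any windows created for records timestamped ahead of the watermark. How much state each window costs depends on the window function: with an incremental AggregateFunction each window holds one accumulator per key, while a ProcessWindowFunction on its own buffers every element. Also, with the default event-time trigger, each late record that arrives within the allowed lateness makes its window fire again, so downstream operators should expect updated results.

```java
final class LatenessWindows {

    /**
     * Counts tumbling windows (starting at or before the watermark) whose state is
     * still retained, using the rule: a window is cleaned up once
     * watermark >= (windowEnd - 1) + allowedLateness.
     */
    static long retainedWindows(long watermarkMs, long windowSizeMs, long allowedLatenessMs) {
        long count = 0;
        long latestStart = Math.floorDiv(watermarkMs, windowSizeMs) * windowSizeMs;
        for (long start = latestStart; start >= 0; start -= windowSizeMs) {
            long maxTimestamp = start + windowSizeMs - 1;
            if (maxTimestamp + allowedLatenessMs > watermarkMs) {
                count++;
            } else {
                break; // every older window has already been cleaned up
            }
        }
        return count;
    }

    public static void main(String[] args) {
        long window = 60_000L;    // 60-second tumbling windows
        long lateness = 300_000L; // 300 seconds of allowed lateness
        System.out.println(retainedWindows(600_000L, window, lateness)); // prints 6
        System.out.println(retainedWindows(599_999L, window, lateness)); // prints 5
    }
}
```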
Best practice way to conditionally discard a window and not serialize the results
I am reading stats from Kinesis, deserializing them into a stat POJO, and then doing something like this using an aggregated window with no ProcessWindowFunction defined:

timestampedStats
    .keyBy(v -> v.groupKey())
    .window(TumblingEventTimeWindows.of(Time.seconds(appCfg.getWindowSize())))
    .aggregate(new ImpactAggregator(appCfg.getSmoothingRange(), appCfg.getThreshold()))
    .sinkTo(getKinesisProducer(appCfg.getAcuWindowsStreamName(), appCfg.getAwsRegion()))
    .name("Kinesis Producer");

As part of the aggregation logic, I am looking for threshold violations, where a field in each metric is tested against a fixed threshold. For each stat that violates the threshold, I increment a counter in the accumulator for the duration of the window (300 seconds), along with some other metadata associated with that stat. If there are violations, then I want to process the window by serializing its contents to JSON and publishing it to Kinesis. What I want to do is NOT serialize a window that has NO violations in its accumulator; there is no need to send a message when no bad conditions are observed.

- Could you please tell me how I can just throw away a window and NOT serialize it when it is ready to be processed?
- How do I hook into some event that allows me to do that?
- Do I need to implement a ProcessKeyedWindowFunction along with my AggregateFunction and somehow handle this as part of the process window function?

I have created a class that implements SerializationSchema to do that serialization, but its serialize() function has to return a valid JSON byte[]. I think the solution is somewhere else, where I can elect NOT to process the window at all so that serialize() never gets called.

Thank you
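One common way to avoid serializing empty windows is to drop them before the sink, so the SerializationSchema never sees them. Two options, stated as a sketch against Flink's DataStream API: insert a filter such as .filter(r -> r.getViolationCount() > 0) between .aggregate(...) and .sinkTo(...), or use the two-argument aggregate(aggregateFunction, processWindowFunction) overload and call out.collect(...) in the ProcessWindowFunction only when the count is non-zero. The plain-Java model below shows the filter variant; WindowResult, violationCount, and getViolationCount() are hypothetical names, since the real ImpactAggregator output type isn't shown.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class ViolationFilter {

    /** Hypothetical stand-in for the per-window output of the AggregateFunction. */
    static final class WindowResult {
        final String groupKey;
        final int violationCount;

        WindowResult(String groupKey, int violationCount) {
            this.groupKey = groupKey;
            this.violationCount = violationCount;
        }
    }

    /** The predicate a Flink filter() step would apply between aggregate() and the sink. */
    static boolean hasViolations(WindowResult r) {
        return r.violationCount > 0;
    }

    /** Models aggregate(...) -> filter(...) -> sink: only surviving results reach serialize(). */
    static List<String> keysToPublish(List<WindowResult> results) {
        return results.stream()
                .filter(ViolationFilter::hasViolations)
                .map(r -> r.groupKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<WindowResult> results = Arrays.asList(
                new WindowResult("pool-a", 0),
                new WindowResult("pool-b", 3),
                new WindowResult("pool-c", 0));
        System.out.println(keysToPublish(results)); // prints [pool-b]
    }
}
```

The filter keeps the AggregateFunction unchanged; the ProcessWindowFunction variant is handy when the window's key or time bounds also need to go into the published message.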
Checkpoint/Savepoint restore of S3 file reads using continuous read mode
I am trying to understand the Flink design pattern for consuming files from S3 continuously as they appear. I have written the minimal program below to do that, and it works as expected with respect to detecting newly uploaded S3 files within the configured 5-second monitoring poll period. It then just prints the file contents to stdout as a simple task.

Where I am having difficulty is understanding the design pattern for maintaining state so that, upon restart, the Flink app will NOT reprocess files it has already processed. I can see that Flink is retaining state in my configured state location at file:///var/tmp/flink/checkpoints/, and when inspecting state files at paths like /var/tmp/flink/checkpoints/6d3c9a96c13be31760416212bd3fd33d/chk-5/_metadata, I see that the S3 paths of the files already processed by Flink show up in the _metadata file. So I know the state of each file is being captured. Now I am trying to understand a few concepts:

1. How much state will Flink retain? If files are kept in the bucket for a long time, say with a 30-day lifecycle delete policy, a lot of files could pile up. It seems that Flink would have to retain the complete list to avoid reprocessing existing files, and that would be quite a lot of state.
2. I understand from the docs that you can restart Flink using state from either a savepoint or a checkpoint. I tried to restart my test application standalone from my dev environment using the following command, but upon startup it still reprocesses the files that are in the _metadata state captured from the previous run. Is the "--fromSavepoint" option the correct way to specify the savepoint file to be read at startup?
/usr/bin/env \
    ASP_USE_LOCAL_ENV=1 \
    ASP_VERBOSE_LOGGING=true \
    ASP_CHECKPOINT_INTERVAL_MILLISECONDS=5000 \
    /usr/lib/jvm/java-11-openjdk-amd64/bin/java \
    @./startup.argfile \
    com.myapp.weatherstreamprocessor.WeatherStreamProcessor \
    --fromSavepoint /var/tmp/flink/checkpoints/6d3c9a96c13be31760416212bd3fd33d/chk-5/_metadata

I am using the Flink operator to deploy my Flink application to EKS, and I already have one production Flink application that consumes from and writes to Kinesis, so I have some initial Flink experience there. I realize that, when deployed in my EKS cluster, checkpointing is meant for the job manager to recover the task managers should they need to be restarted, and that for managed restarts (like code updates) I should be using an explicitly created savepoint. But I am just trying to prove the behavior in my test environment.

Could someone kindly direct me to the right approach so that I can restart in my test environment, read the checkpoint, and NOT have Flink reprocess the files already seen by the previous running instance?

Thanks!
# ==
# My Test Application
# ==
package com.myapp.weatherstreamprocessor;

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WeatherStreamProcessor {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, true);
        // 1 GiB. Note: 1024 ^ 3 in Java is bitwise XOR (= 1027 bytes), not exponentiation.
        conf.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, new MemorySize(1024L * 1024 * 1024));
        conf.set(JobManagerOptions.TOTAL_FLINK_MEMORY, new MemorySize(1024L * 1024 * 1024));
        conf.set(TaskManagerOptions.CPU_CORES, 4.0);

        final StreamExecutionEnvironment env;
        Config appCfg = new Config();
        if (appCfg.getUseLocalEnv()) {
            env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        } else {
            env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        }
        env.setParallelism(appCfg.getParallelism());
        if (appCfg.getCheckpointInterval() > 0) {
            env.enableCheckpointing(appCfg.getCheckpointInterval());
        }
        CheckpointConfig config = env.getCheckpointConfig();
        config.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
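When the job is launched with plain java, the --fromSavepoint argument is simply passed to main(String[] args). It is a flag of the flink run CLI (short form -s), and unless the program reads it itself (nothing in the visible part of main does), the job starts without restoring any state. A minimal sketch of one way to handle this in a local run, assuming a Flink 1.x release where the restore path is the configuration option execution.savepoint.path (SavepointConfigOptions.SAVEPOINT_PATH; newer releases rename this key, so check your version's docs): parse the argument yourself and copy it into the Configuration before the environment is created, e.g. conf.set(SavepointConfigOptions.SAVEPOINT_PATH, path). The helper below covers only the plain-Java argument handling, and savepointSettings() is a hypothetical name. After wiring it in, the job's log should show whether it actually restored from the checkpoint.

```java
import java.util.HashMap;
import java.util.Map;

final class SavepointArgs {

    /** Assumed Flink 1.x key behind SavepointConfigOptions.SAVEPOINT_PATH. */
    static final String SAVEPOINT_PATH_KEY = "execution.savepoint.path";

    /**
     * Copies a "--fromSavepoint <path>" program argument into configuration
     * entries that the caller can then set on Flink's Configuration.
     * A trailing "--fromSavepoint" with no value is ignored.
     */
    static Map<String, String> savepointSettings(String[] args) {
        Map<String, String> settings = new HashMap<>();
        for (int i = 0; i + 1 < args.length; i++) {
            if ("--fromSavepoint".equals(args[i])) {
                settings.put(SAVEPOINT_PATH_KEY, args[i + 1]);
            }
        }
        return settings;
    }

    public static void main(String[] args) {
        String[] demo = {
            "--fromSavepoint",
            "/var/tmp/flink/checkpoints/6d3c9a96c13be31760416212bd3fd33d/chk-5"
        };
        System.out.println(savepointSettings(demo));
    }
}
```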
Web UI not working with createLocalEnvironmentWithWebUI()
I am learning Flink for a new project. I am trying to understand the development/debug environment so I can step through my code and learn Flink better. I am using IntelliJ Community Edition as my IDE, with Flink 1.17.0. I am using this simple Flink app to demonstrate my issue.

//===
package streamwindowprocessor;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SimpleFlink {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        //final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> values = env.socketTextStream("localhost", );
        values.print();
        env.execute("Alarm Stream Processor");
    }
}
//===

Before I run this from the IDE, I start a socket listener in a terminal:

nc -lkp

Then I open a web browser to localhost:8081 and get this output:

{"errors": ["Not found: /"]}

If instead I use ./start-cluster.sh to start a standalone cluster, rebuild my jar using getExecutionEnvironment(), and submit that same simple jar using ./flink.sh run, then I can open the browser to localhost:8081 and I do see my app running, as expected, and it processes the strings I send via the running netcat.

Someone on Stack Overflow noted that you should add this dependency, which I did, and it made no difference:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web</artifactId>
    <version>${flink.version}</version>
</dependency>

*Can anyone help me understand why the web UI does not work with createLocalEnvironmentWithWebUI()?*

Thanks,
Mark
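One possible cause worth ruling out: a "Not found: /" response suggests the REST endpoint is up but the web UI's static content is not being served, which can happen when flink-runtime-web is declared in the pom but is not on the IDE's run classpath, for example because it has provided scope and the IntelliJ run configuration option to add "provided"-scope dependencies to the classpath is off. The plain-Java check below can be run from the same run configuration to see what is actually on the classpath. The class name it probes, org.apache.flink.runtime.webmonitor.WebSubmissionExtension, is an assumption about what the flink-runtime-web artifact contains and should be verified against your Flink version.

```java
final class WebUiClasspathCheck {

    /** Returns true if the named class can be found on the current classpath (without initializing it). */
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, WebUiClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: this class ships in flink-runtime-web; verify for your Flink version.
        String webClass = "org.apache.flink.runtime.webmonitor.WebSubmissionExtension";
        System.out.println("flink-runtime-web on classpath: " + isOnClasspath(webClass));
    }
}
```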