Re: Never gets into ProcessWindowFunction.process()
Hi Gary,

Bang on the money. I had not assigned a watermark, and once I put that in, the code entered the process() method. Thanks a ton for your help. Life-saver!

    DataStream<Monitoring> kinesisStream = env
            .addSource(kinesisConsumer)
            .assignTimestampsAndWatermarks(new MonitoringAssigner()); // <= this was missing

On Fri, Nov 9, 2018 at 10:02 AM Gary Yao wrote:
> Hi,
>
> You are using event time but are you assigning watermarks [1]? I do not
> see it in the code.
>
> Best,
> Gary
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#event-time-for-consumed-records
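For readers wondering why the missing watermark kept process() from ever being called: with event time, a tumbling window is evaluated only once the watermark reaches the window's last timestamp. Without an assigner no watermark advances, so elements are buffered and nothing fires. The following is a minimal plain-Java sketch of that rule, not Flink code; the class EventTimeWindowSketch and its methods are hypothetical names invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model (not Flink code) of tumbling event-time windows driven by watermarks. */
public class EventTimeWindowSketch {
    private final long windowSizeMs;
    // window start -> timestamps of the elements buffered for that window
    private final TreeMap<Long, List<Long>> buffers = new TreeMap<>();
    private final List<String> fired = new ArrayList<>();

    public EventTimeWindowSketch(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** Buffers an element in its window; nothing fires here. Assumes non-negative timestamps. */
    public void onElement(long timestampMs) {
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        buffers.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(timestampMs);
    }

    /** Fires every window whose last timestamp (end - 1) is at or before the watermark. */
    public void onWatermark(long watermarkMs) {
        while (!buffers.isEmpty() && buffers.firstKey() + windowSizeMs - 1 <= watermarkMs) {
            Map.Entry<Long, List<Long>> w = buffers.pollFirstEntry();
            long start = w.getKey();
            fired.add("[" + start + "," + (start + windowSizeMs) + ") count=" + w.getValue().size());
        }
    }

    public List<String> fired() {
        return fired;
    }

    public static void main(String[] args) {
        EventTimeWindowSketch windows = new EventTimeWindowSketch(5000);
        windows.onElement(1000);
        windows.onElement(4000);
        windows.onElement(6000);
        System.out.println(windows.fired()); // prints [] : elements arrived, no watermark yet
        windows.onWatermark(4999);
        System.out.println(windows.fired()); // prints [[0,5000) count=2]
    }
}
```

In this model the first window stays silent no matter how many elements arrive, which matches the symptom in the thread: the source keeps reading and deserializing, open() runs, but process() is never reached until a watermark passes the end of the window.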
Re: Never gets into ProcessWindowFunction.process()
Hi,

You are using event time but are you assigning watermarks [1]? I do not see it in the code.

Best,
Gary

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#event-time-for-consumed-records

On Fri, Nov 9, 2018 at 6:58 PM Vijay Balakrishnan wrote:
> Hi,
> Any help is appreciated. Dug into this. *I can see the deserialized output
> log from FlinkKinesisConsumer deserialization but it keeps looping to pull
> from Kinesis Stream but never gets into the Windowing operation for
> process() or apply().*
>
> FlinkKinesisConsumer seems to be stuck in a loop calling a Kinesis Stream
> and the deserialized output never seems to get into the apply() or
> process() method of a Windowing operation. I can see the logs of
> MonitoringMapKinesisSchema deserializing data back successfully from
> Kinesis and converting into a POJO.
>
> [...]
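The thread does not show how the assigner that fixed the problem was implemented. One common strategy is to let the watermark trail the largest event timestamp seen so far by a fixed bound. The following plain-Java sketch models only that arithmetic; it is not Flink's assigner interface, and the class name BoundedOutOfOrdernessSketch, its methods, and the 2-second bound are assumptions made for illustration.

```java
/** Toy model (not Flink code) of a watermark that trails the largest seen timestamp. */
public class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampMs;
    private boolean seenAny = false;

    public BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Records the element's event timestamp and returns it unchanged. */
    public long extractTimestamp(long eventTimestampMs) {
        if (!seenAny || eventTimestampMs > maxTimestampMs) {
            maxTimestampMs = eventTimestampMs;
            seenAny = true;
        }
        return eventTimestampMs;
    }

    /** Watermark = largest timestamp seen minus the allowed lateness; MIN_VALUE before any element. */
    public long currentWatermark() {
        return seenAny ? maxTimestampMs - maxOutOfOrdernessMs : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch assigner = new BoundedOutOfOrdernessSketch(2000);
        assigner.extractTimestamp(10000);
        System.out.println(assigner.currentWatermark()); // prints 8000
        assigner.extractTimestamp(9000);                 // late element, watermark does not move back
        System.out.println(assigner.currentWatermark()); // prints 8000
    }
}
```

The bound trades latency for completeness: a larger value tolerates more out-of-order records but delays the point at which each window is evaluated.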
Re: Never gets into ProcessWindowFunction.process()
Hi,

Any help is appreciated. I dug into this. *I can see the deserialized output log from FlinkKinesisConsumer deserialization, but it keeps looping to pull from the Kinesis stream and never gets into the windowing operation for process() or apply().*

FlinkKinesisConsumer seems to be stuck in a loop calling the Kinesis stream, and the deserialized output never seems to get into the apply() or process() method of a windowing operation. I can see the logs of MonitoringMapKinesisSchema deserializing data back successfully from Kinesis and converting it into a POJO.

Code:

    // Create environment
    StreamExecutionEnvironment env;
    if (local) {
        Configuration configuration = new Configuration();
        configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
        env = StreamExecutionEnvironment.createLocalEnvironment(1, configuration);
    } else {
        env = StreamExecutionEnvironment.getExecutionEnvironment();
    }
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // Create FlinkKinesisConsumer
    Properties kinesisConsumerConfig = new Properties();
    kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION, region);
    kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
    kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "1");
    kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000");
    kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
    FlinkKinesisConsumer<Monitoring> kinesisConsumer = new FlinkKinesisConsumer<>(
            kinesisTopicRead, new MonitoringMapKinesisSchema(), kinesisConsumerConfig); // deserialization works fine
    DataStream<Monitoring> kinesisStream = env
            .addSource(kinesisConsumer);
    KeyedStream<Monitoring, Tuple3<String, String, String>> enrichedComponentInstanceStream1Key = kinesisStream
            .keyBy(new KeySelector<Monitoring, Tuple3<String, String, String>>() {
                public Tuple3<String, String, String> getKey(Monitoring mon) throws Exception {
                    return new Tuple3<String, String, String>(mon.getComponent(), mon.getInstance(), mon.getOperation());
                }
            });

    WindowedStream<Monitoring, Tuple3<String, String, String>, TimeWindow> enrichedComponentInstanceStream1Win =
            enrichedComponentInstanceStream1Key
                    .timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));

    DataStream<MonitoringGrouping> enrichedComponentInstanceStream1 = enrichedComponentInstanceStream1Win
            //.process(new Window5SecProcessing(gameId, FIVE_SECONDS, COMPONENT_INSTANCE_OPERATION))
            .process(new Window5SecProcessing()); // never gets in here

    // Gets into Window5SecProcessing.open() during initialization but never into the process method
    private static class Window5SecProcessing extends
            ProcessWindowFunction<Monitoring, MonitoringGrouping, Tuple3<String, String, String>, TimeWindow> {

        private transient String interval;
        private transient String gameId;
        private transient String keyType;
        private transient org.apache.flink.metrics.Histogram fiveSecHistogram;

        private transient ValueState<Long> total5SecCountState;
        private transient ValueStateDescriptor<Long> total5SecCountValueStateDescriptor;

        public Window5SecProcessing() {
        }

        public Window5SecProcessing(String gameId, String interval, String keyType) {
            this.gameId = gameId;
            this.interval = interval;
            this.keyType = keyType;
        }

        @Override
        public void clear(Context context) throws Exception {
            super.clear(context);
            KeyedStateStore keyedStateStore = context.windowState();
            keyedStateStore.getState(total5SecCountValueStateDescriptor).clear();
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            logger.debug("Gets in here fine -Window5SecProcessing -Entered open - parameters:{}", parameters);
            com.codahale.metrics.Histogram fiveSecHist =
                    new com.codahale.metrics.Histogram(new SlidingTimeWindowReservoir(5, TimeUnit.SECONDS));
            this.fiveSecHistogram = new DropwizardHistogramWrapper(fiveSecHist);
            total5SecCountValueStateDescriptor =
                    new ValueStateDescriptor<>("total5SecCount", Long.class, 0L);
            total5SecCountState = getRuntimeContext().getState(total5SecCountValueStateDescriptor);
        }

        public void process(Tuple3<String, String, String> currentKey1, Context ctx,
                Iterable<Monitoring> input, Collector<MonitoringGrouping> out) throws Exception {
            logger.debug("@@never gets here@@Window5SecProcessing - Entered process ");
            // ...
    }

On Mon, Nov 5, 2018 at 4:10 PM Vijay Balakrishnan wrote:
> Hi,
> Running in IntelliJ IDE on a Mac with 4 vProcessors.
> Code compiles fine. It never gets into the Window5SecProcessing's
> process(). I am able to get data from the Kinesis Consumer and it is
> deserialized properly when I debug the code. It gets into the
> Window5SecProcessing.open() method for initialization.
>
> Not sure if I am failing with no slots available ???
> In main():
> ...
Re: Never gets into ProcessWindowFunction.process()
Hi Gary,

Just posted the code. Please let me know if that clarifies the problem. I have been digging into how the FlinkKinesisConsumer deserialized output gets passed into the process() or apply() method, to no avail. The coding pattern I used matches all the flink-examples I have seen for Flink 1.6.1.

TIA,
Vijay

On Fri, Nov 9, 2018 at 9:53 AM Gary Yao wrote:
> Hi,
>
> If the job is actually running and consuming from Kinesis, the log you
> posted is unrelated to your problem. To understand why the process
> function is not invoked, we would need to see more of your code, or you
> would need to provide an executable example. The log only shows that all
> offered slots are occupied by tasks of your job.
>
> Best,
> Gary
Re: Never gets into ProcessWindowFunction.process()
Hi,

If the job is actually running and consuming from Kinesis, the log you posted is unrelated to your problem. To understand why the process function is not invoked, we would need to see more of your code, or you would need to provide an executable example. The log only shows that all offered slots are occupied by tasks of your job.

Best,
Gary

On Tue, Nov 6, 2018 at 1:10 AM Vijay Balakrishnan wrote:
> Hi,
> Running in IntelliJ IDE on a Mac with 4 vProcessors.
> Code compiles fine. It never gets into the Window5SecProcessing's
> process(). I am able to get data from the Kinesis Consumer and it is
> deserialized properly when I debug the code. It gets into the
> Window5SecProcessing.open() method for initialization.
>
> Not sure if I am failing with no slots available ???
>
> [...]
Never gets into ProcessWindowFunction.process()
Hi,

Running in IntelliJ IDE on a Mac with 4 vProcessors. Code compiles fine. It never gets into the Window5SecProcessing's process(). I am able to get data from the Kinesis Consumer and it is deserialized properly when I debug the code. It gets into the Window5SecProcessing.open() method for initialization.

Not sure if I am failing with no slots available?

In main():

    // trimmed a lot of code
    FlinkKinesisConsumer<Monitoring> kinesisConsumer =
            getMonitoringFlinkKinesisConsumer(local, kinesisTopicRead, region, ..., ...);

    DataStream<Monitoring> kinesisStream = env
            .addSource(kinesisConsumer)
            .uid(jobName + "KinesisSource");
    KeyedStream<Monitoring, Tuple3<String, String, String>> enrichedComponentInstanceStream1Key = kinesisStream
            .keyBy(new KeySelector<Monitoring, Tuple3<String, String, String>>() {
                public Tuple3<String, String, String> getKey(Monitoring mon) throws Exception {
                    return new Tuple3<String, String, String>(mon.getComponent(), mon.getInstance(), mon.getOperation());
                }
            });

    WindowedStream<Monitoring, Tuple3<String, String, String>, TimeWindow> enrichedComponentInstanceStream1Win =
            enrichedComponentInstanceStream1Key.timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));

    DataStream<MonitoringGrouping> enrichedComponentInstanceStream1 = enrichedComponentInstanceStream1Win
            .process(new Window5SecProcessing(gameId, FIVE_SECONDS, COMPONENT_INSTANCE_OPERATION))
            .uid("Component Instance Operation Key Monitoring " + FIVE_SECONDS);
    enrichedComponentInstanceStream1.addSink(new SinkFunction<MonitoringGrouping>() {
        @Override
        public void invoke(MonitoringGrouping mg, Context context) throws Exception {
            // TODO call ES
            logger.debug("In enrichedComponentInstanceStream1 Sink received mg:{}", mg);
        }
    });

Window processing class:

    private static class Window5SecProcessing extends
            ProcessWindowFunction<Monitoring, MonitoringGrouping, Tuple3<String, String, String>, TimeWindow> {
        private transient Histogram fiveSecHist;
        private transient Histogram fiveMinHist;
        private transient org.apache.flink.metrics.Histogram fiveSecHistogram;
        private transient org.apache.flink.metrics.Histogram fiveMinHistogram;
        private transient ValueState<Long> total5SecCountState;
        private transient ValueStateDescriptor<Long> total5SecCountValueStateDescriptor;

        public Window5SecProcessing(String gameId, String interval, String keyType) {
            ...
        }

        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            logger.debug("Window5SecProcessing -Entered open - parameters:{}", parameters); // gets here
            com.codahale.metrics.Histogram fiveSecHist =
                    new com.codahale.metrics.Histogram(new SlidingTimeWindowReservoir(5, TimeUnit.SECONDS));
            this.fiveSecHistogram = new DropwizardHistogramWrapper(fiveSecHist);
            total5SecCountValueStateDescriptor =
                    new ValueStateDescriptor<>("total5SecCount", Long.class, 0L);
            total5SecCountState = getRuntimeContext().getState(total5SecCountValueStateDescriptor);
        }
        ..

        public void process(Tuple3<String, String, String> currentKey1, Context ctx,
                Iterable<Monitoring> input, Collector<MonitoringGrouping> out) throws Exception {
            logger.debug("Window5SecProcessing - Entered process "); // never gets here
            Tuple3<String, String, String> currentKey = (Tuple3<String, String, String>) currentKey1;
        }
    }

At one point in the logs, I seem to see that there are no slots available. Is that the problem? If so, how can I fix it to test locally on my Mac?

Log:

    flink-akka.actor.default-dispatcher-71 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Slot Pool Status:
    status: connected to akka://flink/user/resourcemanager_466813ab-9e2c-4c88-9623-b783ebfd00cc
    registered TaskManagers: [52fbcef4-6961-4b1a-96b9-bbf8dfd905ed]
    *available slots: []*
    allocated slots: [[AllocatedSlot AllocationID{e13f284707cafef978a3c59f27e7f3f3} @ 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0]]
    pending requests: []
    sharing groups: {
    5a0ae59368145d715b3cc0d39ba6c05a
    {
    groupId=5a0ae59368145d715b3cc0d39ba6c05a
    unresolved={}
    resolved={52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1)=[MultiTaskSlot{requestId=SlotRequestId{a5fd4a1b7478661f62350df3bea3695f}, allocatedRequestId=SlotRequestId{c99b7aea635f1792416d239a9b135584}, groupId=null, physicalSlot=AllocatedSlot AllocationID{e13f284707cafef978a3c59f27e7f3f3} @ 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0, children=[SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6}, allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}), request=SlotRequestId{a3176498368d1123639f3ee94a9798b6}, group=8587a27f4c92252839400ce17054b261}, SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}, allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}), request=SlotRequestId{7b1ed3f0c53a4fe35