Hi Gary,
Bang on the money. I did not have a watermark assigner, and once I put that
in, the code entered the process() method.
Thanks a ton for your help. Life-saver!

DataStream<Monitoring> kinesisStream = env
        .addSource(kinesisConsumer)
        .assignTimestampsAndWatermarks(new MonitoringAssigner()); // <============= the missing piece
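
For anyone who finds this thread later: MonitoringAssigner is just a timestamp/watermark assigner along these lines (a rough sketch only; the Monitoring.getEventTimestamp() getter and the 3-second out-of-orderness bound are placeholders for whatever field the record actually carries and however late events may arrive):

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class MonitoringAssigner extends BoundedOutOfOrdernessTimestampExtractor<Monitoring> {

    public MonitoringAssigner() {
        super(Time.seconds(3)); // allow events to arrive up to 3 seconds out of order
    }

    @Override
    public long extractTimestamp(Monitoring mon) {
        // placeholder getter: the event time of the record in epoch milliseconds
        return mon.getEventTimestamp();
    }
}

With a periodic assigner like this in place, watermarks advance as records flow in, so the 5-second event-time windows can close and call process().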



On Fri, Nov 9, 2018 at 10:02 AM Gary Yao <g...@data-artisans.com> wrote:

> Hi,
>
> You are using event time but are you assigning watermarks [1]? I do not
> see it
> in the code.
>
> Best,
> Gary
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#event-time-for-consumed-records
>
> On Fri, Nov 9, 2018 at 6:58 PM Vijay Balakrishnan <bvija...@gmail.com>
> wrote:
>
>> Hi,
>> Any help is appreciated. I dug into this further. *I can see the deserialized
>> output logged by the FlinkKinesisConsumer, but it keeps looping to pull from
>> the Kinesis stream and never gets into the windowing operation's process() or
>> apply().*
>>
>> The FlinkKinesisConsumer seems to be stuck in a loop polling the Kinesis stream,
>> and the deserialized output never reaches the apply() or process() method of the
>> windowing operation. I can see MonitoringMapKinesisSchema's logs deserializing
>> the data from Kinesis and converting it into a POJO successfully.
>>
>> Code:
>>
>> *//Create environment*:
>> StreamExecutionEnvironment env;
>> if (local) {
>>     Configuration configuration = new Configuration();
>>     configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
>>     env = StreamExecutionEnvironment.createLocalEnvironment(1, configuration);
>> } else {
>>     env = StreamExecutionEnvironment.getExecutionEnvironment();
>> }
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>
>> *//create FlinkKinesisConsumer*
>> Properties kinesisConsumerConfig = new Properties();
>> kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION, region);
>> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
>> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10000");
>> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000");
>> kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
>> FlinkKinesisConsumer<Monitoring> kinesisConsumer = new FlinkKinesisConsumer<>(
>>         kinesisTopicRead, new MonitoringMapKinesisSchema(), kinesisConsumerConfig); *//deserialization works fine*
>>
>> DataStream<Monitoring> kinesisStream = env
>>         .addSource(kinesisConsumer);
>>
>> KeyedStream<Monitoring, Tuple3<String, String, String>> enrichedComponentInstanceStream1Key = kinesisStream
>>         .keyBy(new KeySelector<Monitoring, Tuple3<String, String, String>>() {
>>             public Tuple3<String, String, String> getKey(Monitoring mon) throws Exception {
>>                 return new Tuple3<String, String, String>(mon.getComponent(), mon.getInstance(), mon.getOperation());
>>             }
>>         });
>>
>> WindowedStream<Monitoring, Tuple3<String, String, String>, TimeWindow> enrichedComponentInstanceStream1Win =
>>         enrichedComponentInstanceStream1Key
>>                 .timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));
>>
>> DataStream<MonitoringGrouping> enrichedComponentInstanceStream1 = enrichedComponentInstanceStream1Win
>>         //.process(new Window5SecProcessing(gameId, FIVE_SECONDS, COMPONENT_INSTANCE_OPERATION))
>>         .process(new Window5SecProcessing()); *//never gets in here*
>>
>> //Gets into the Window5SecProcessing.open() method during initialization, but never into the process() method ????????
>> private static class Window5SecProcessing extends
>>         ProcessWindowFunction<Monitoring, MonitoringGrouping, Tuple3<String, String, String>, TimeWindow> {
>>
>>     private transient String interval;
>>     private transient String gameId;
>>     private transient String keyType;
>>     private transient org.apache.flink.metrics.Histogram fiveSecHistogram;
>>
>>     private transient ValueState<Long> total5SecCountState;
>>     private transient ValueStateDescriptor<Long> total5SecCountValueStateDescriptor;
>>
>>     public Window5SecProcessing() {
>>     }
>>
>>     public Window5SecProcessing(String gameId, String interval, String keyType) {
>>         this.gameId = gameId;
>>         this.interval = interval;
>>         this.keyType = keyType;
>>     }
>>
>>     @Override
>>     public void clear(Context context) throws Exception {
>>         super.clear(context);
>>         KeyedStateStore keyedStateStore = context.windowState();
>>         keyedStateStore.getState(total5SecCountValueStateDescriptor).clear();
>>     }
>>
>>     @Override
>>     public void open(Configuration parameters) throws Exception {
>>         super.open(parameters);
>>         logger.debug("Gets in here fine - Window5SecProcessing - Entered open - parameters:{}", parameters);
>>         com.codahale.metrics.Histogram fiveSecHist =
>>                 new com.codahale.metrics.Histogram(new SlidingTimeWindowReservoir(5, TimeUnit.SECONDS));
>>         this.fiveSecHistogram = new DropwizardHistogramWrapper(fiveSecHist);
>>         total5SecCountValueStateDescriptor =
>>                 new ValueStateDescriptor<Long>("total5SecCount", Long.class, 0L);
>>         total5SecCountState = getRuntimeContext().getState(total5SecCountValueStateDescriptor);
>>     }
>>
>>     public void process(Tuple3<String, String, String> currentKey1, Context ctx,
>>             Iterable<Monitoring> input, Collector<MonitoringGrouping> out) throws Exception {
>>         logger.debug("@@never gets here@@ Window5SecProcessing - Entered process");
>>         ...
>>     }
>> }
>>
>>
>>
>>
>> On Mon, Nov 5, 2018 at 4:10 PM Vijay Balakrishnan <bvija...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> I am running in the IntelliJ IDE on a Mac with 4 virtual processors.
>>> The code compiles fine, but it never gets into Window5SecProcessing's
>>> process(). I am able to get data from the Kinesis consumer, and it is
>>> deserialized properly when I debug the code. It gets into the
>>> Window5SecProcessing.open() method during initialization.
>>>
>>> Not sure if I am failing because no slots are available?
>>> In main():
>>> ........ //trimmed a lot of code
>>> FlinkKinesisConsumer<Monitoring> kinesisConsumer =
>>>         getMonitoringFlinkKinesisConsumer(local, kinesisTopicRead, region, ..., ...);
>>>
>>> DataStream<Monitoring> kinesisStream = env
>>>         .addSource(kinesisConsumer)
>>>         .uid(jobName + "KinesisSource");
>>>
>>> KeyedStream<Monitoring, Tuple3<String, String, String>> enrichedComponentInstanceStream1Key = kinesisStream
>>>         .keyBy(new KeySelector<Monitoring, Tuple3<String, String, String>>() {
>>>             public Tuple3<String, String, String> getKey(Monitoring mon) throws Exception {
>>>                 return new Tuple3<String, String, String>(mon.getComponent(), mon.getInstance(), mon.getOperation());
>>>             }
>>>         });
>>>
>>> WindowedStream<Monitoring, Tuple3<String, String, String>, TimeWindow> enrichedComponentInstanceStream1Win =
>>>         enrichedComponentInstanceStream1Key
>>>                 .timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));
>>>
>>> DataStream<MonitoringGrouping> enrichedComponentInstanceStream1 = enrichedComponentInstanceStream1Win
>>>         .process(new Window5SecProcessing(gameId, FIVE_SECONDS, COMPONENT_INSTANCE_OPERATION))
>>>         .uid("Component Instance Operation Key Monitoring " + FIVE_SECONDS);
>>>
>>> enrichedComponentInstanceStream1.addSink(new SinkFunction<MonitoringGrouping>() {
>>>     @Override
>>>     public void invoke(MonitoringGrouping mg, Context context) throws Exception {
>>>         //TODO call ES
>>>         logger.debug("In enrichedComponentInstanceStream1 Sink received mg:{}", mg);
>>>     }
>>> });
>>>
>>> *Window processing class*:
>>> private static class Window5SecProcessing extends
>>>         ProcessWindowFunction<Monitoring, MonitoringGrouping, Tuple3<String, String, String>, TimeWindow> {
>>>
>>>     private transient Histogram fiveSecHist;
>>>     private transient Histogram fiveMinHist;
>>>     private transient org.apache.flink.metrics.Histogram fiveSecHistogram;
>>>     private transient org.apache.flink.metrics.Histogram fiveMinHistogram;
>>>     private transient ValueState<Long> total5SecCountState;
>>>     private transient ValueStateDescriptor<Long> total5SecCountValueStateDescriptor;
>>>
>>>     public Window5SecProcessing(String gameId, String interval, String keyType) {
>>>         ...
>>>     }
>>>
>>>     public void open(Configuration parameters) throws Exception {
>>>         super.open(parameters);
>>>         logger.debug("Window5SecProcessing - Entered open - parameters:{}", parameters); //gets here
>>>         com.codahale.metrics.Histogram fiveSecHist =
>>>                 new com.codahale.metrics.Histogram(new SlidingTimeWindowReservoir(5, TimeUnit.SECONDS));
>>>         this.fiveSecHistogram = new DropwizardHistogramWrapper(fiveSecHist);
>>>         total5SecCountValueStateDescriptor =
>>>                 new ValueStateDescriptor<Long>("total5SecCount", Long.class, 0L);
>>>         total5SecCountState = getRuntimeContext().getState(total5SecCountValueStateDescriptor);
>>>     }
>>>     ......
>>>
>>>     public void process(Tuple3<String, String, String> currentKey1, Context ctx,
>>>             Iterable<Monitoring> input, Collector<MonitoringGrouping> out) throws Exception {
>>>         logger.debug("Window5SecProcessing - Entered process"); //never gets here
>>>         Tuple3<String, String, String> currentKey = (Tuple3<String, String, String>) currentKey1;
>>>         ....
>>>     }
>>>
>>> }
>>> At one point in the logs, I seem to see that there are no slots available.
>>> Is that the problem, and if so, how can I fix it to test locally on my Mac?
>>> *Log:*
>>> flink-akka.actor.default-dispatcher-71 DEBUG
>>> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Slot Pool
>>> Status:
>>> status: connected to
>>> akka://flink/user/resourcemanager_466813ab-9e2c-4c88-9623-b783ebfd00cc
>>> registered TaskManagers: [52fbcef4-6961-4b1a-96b9-bbf8dfd905ed]
>>> *available slots: []*
>>> allocated slots: [[AllocatedSlot
>>> AllocationID{e13f284707cafef978a3c59f27e7f3f3} @
>>> 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0]]
>>> pending requests: []
>>> sharing groups: {
>>> -------- 5a0ae59368145d715b3cc0d39ba6c05a --------
>>> {
>>> groupId=5a0ae59368145d715b3cc0d39ba6c05a
>>> unresolved={}
>>> resolved={52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost
>>> (dataPort=-1)=[MultiTaskSlot{requestId=SlotRequestId{a5fd4a1b7478661f62350df3bea3695f},
>>> allocatedRequestId=SlotRequestId{c99b7aea635f1792416d239a9b135584},
>>> groupId=null, physicalSlot=AllocatedSlot
>>> AllocationID{e13f284707cafef978a3c59f27e7f3f3} @
>>> 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0,
>>> children=[SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
>>> allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
>>> request=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
>>> group=8587a27f4c92252839400ce17054b261},
>>> SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
>>> allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
>>> request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
>>> group=a43726daeecb466da4d91c7b1adefb1d}]}]}
>>> all={SlotRequestId{a3176498368d1123639f3ee94a9798b6}=SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
>>> allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
>>> request=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
>>> group=8587a27f4c92252839400ce17054b261},
>>> SlotRequestId{a5fd4a1b7478661f62350df3bea3695f}=MultiTaskSlot{requestId=SlotRequestId{a5fd4a1b7478661f62350df3bea3695f},
>>> allocatedRequestId=SlotRequestId{c99b7aea635f1792416d239a9b135584},
>>> groupId=null, physicalSlot=AllocatedSlot
>>> AllocationID{e13f284707cafef978a3c59f27e7f3f3} @
>>> 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0,
>>> children=[SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
>>> allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
>>> request=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
>>> group=8587a27f4c92252839400ce17054b261},
>>> SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
>>> allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
>>> request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
>>> group=a43726daeecb466da4d91c7b1adefb1d}]},
>>> SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}=SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
>>> allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
>>> request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
>>> group=a43726daeecb466da4d91c7b1adefb1d}}
>>> } }
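>>>
>>> If the missing slots do turn out to be the issue, the knob I would try for the
>>> local run is the one already used in main() above, just with a higher value
>>> (a sketch only; 4 is an arbitrary number picked for my 4-core Mac):
>>>
>>> // give the local mini-cluster more task slots than the job's operators need
>>> Configuration configuration = new Configuration();
>>> configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 4);
>>> StreamExecutionEnvironment env =
>>>         StreamExecutionEnvironment.createLocalEnvironment(1, configuration);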
>>>
>>> TIA,
>>>
>>>
