Re: Running a performance benchmark load test of Flink on new CPUs

2021-11-08 Thread Vijay Balakrishnan
Austin - the flink-benchmarks repo is for testing Flink on a single machine,
not on a cluster.

I did see this
https://oceanrep.geomar.de/50729/1/bsc_nico_biernat_thesis.pdf but it is
more about testing the scaling of Flink than about testing throughput and
latency.
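
For reference, a bare-bones workload generator can just be a plain Kafka
producer that pushes synthetic JSON events into a topic, which a Flink job
using the Kafka connector then consumes for a throughput/latency run. The
sketch below is only an illustration (not an existing tool); the topic name,
bootstrap servers and event shape are assumptions.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SyntheticKafkaLoadGenerator {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        long numEvents = 10_000_000L;
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (long i = 0; i < numEvents; i++) {
                // embed the send timestamp so the consuming Flink job can measure end-to-end latency
                String value = "{\"event_id\":" + i + ",\"event_timestamp\":" + System.currentTimeMillis() + "}";
                producer.send(new ProducerRecord<>("benchmark-input", Long.toString(i % 64), value));
            }
            producer.flush();
        }
    }
}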

On Mon, Nov 8, 2021 at 10:54 AM Vijay Balakrishnan 
wrote:

> Thx, Austin. I was hoping there might be a newer benchmark run similar to
> the one done by data Artisans on Flink in 2016 (old):
> https://www.ververica.com/blog/extending-the-yahoo-streaming-benchmark
>
> Looks like the Yahoo Streaming benchmark was an initial standard in 2016.
> Hoping to see something updated for late 2021.
>
> TIA,
> Vijay
>
> On Fri, Nov 5, 2021 at 3:16 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hi Vijay,
>>
>> I'm not too familiar with the subject, but maybe you could have a look at
>> the flink-faker[1], which generates fake data. I would think you could use
>> it to write to kafka in one Flink job, and then have another Flink job to
>> ingest and run your benchmarks.
>>
>> There is also this microbenchmark repo[2], perhaps that could be useful
>> to run on different CPUs.
>>
>> Hope those help,
>> Austin
>>
>> [1]: https://github.com/knaufk/flink-faker
>> [2]: https://github.com/apache/flink-benchmarks
>>
>> On Fri, Nov 5, 2021 at 5:14 PM Vijay Balakrishnan 
>> wrote:
>>
>>> Hi,
>>> I am a newbie to running a performance benchmark load test of Flink on
>>> new CPUs.
>>> Is there an *existing workload generator* that I can use with Kafka, and
>>> then ingest with the Flink Kafka connector to test performance against
>>> various new server CPUs?
>>>
>>> Measuring CPU performance, vCPU usage, latency, throughput, etc.
>>> Please pardon my ignorance on a lot of these performance-related topics.
>>>
>>> TIA,
>>> Vijay
>>>
>>


Re: Running a performance benchmark load test of Flink on new CPUs

2021-11-08 Thread Vijay Balakrishnan
Thx, Austin. I was hoping there might be a newer benchmark run similar to
the one done by data Artisans on Flink in 2016 (old):
https://www.ververica.com/blog/extending-the-yahoo-streaming-benchmark

Looks like the Yahoo Streaming benchmark was an initial standard in 2016.
Hoping to see something updated for late 2021.

TIA,
Vijay

On Fri, Nov 5, 2021 at 3:16 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hi Vijay,
>
> I'm not too familiar with the subject, but maybe you could have a look at
> the flink-faker[1], which generates fake data. I would think you could use
> it to write to kafka in one Flink job, and then have another Flink job to
> ingest and run your benchmarks.
>
> There is also this microbenchmark repo[2], perhaps that could be useful to
> run on different CPUs.
>
> Hope those help,
> Austin
>
> [1]: https://github.com/knaufk/flink-faker
> [2]: https://github.com/apache/flink-benchmarks
>
> On Fri, Nov 5, 2021 at 5:14 PM Vijay Balakrishnan 
> wrote:
>
>> Hi,
>> I am a newbie to running a performance benchmark load test of Flink on
>> new CPUs.
>> Is there an *existing workload generator* that I can use with Kafka, and
>> then ingest with the Flink Kafka connector to test performance against
>> various new server CPUs?
>>
>> Measuring CPU performance, vCPU usage, latency, throughput, etc.
>> Please pardon my ignorance on a lot of these performance-related topics.
>>
>> TIA,
>> Vijay
>>
>


Running a performance benchmark load test of Flink on new CPUs

2021-11-05 Thread Vijay Balakrishnan
Hi,
I am a newbie to running a performance benchmark load test of Flink on new
CPUs.
Is there an *existing workload generator* that I can use with Kafka, and
then ingest with the Flink Kafka connector to test performance against
various new server CPUs?

Measuring CPU performance, vCPU usage, latency, throughput, etc.
Please pardon my ignorance on a lot of these performance-related topics.

TIA,
Vijay


ConnectionPool to DB and parallelism of operator question

2020-10-05 Thread Vijay Balakrishnan
Hi,
Basic question on parallelism of operators and the ConnectionPool to a DB:
will this result in 82 * 300 connections to InfluxDB, or just 300
connections to InfluxDB?
main() {
  sink = createInfluxMonitoringSink(..);
  keyStream.addSink(sink).setParallelism(82); // will this result in 82 * 300 connections to InfluxDB or just 300?
}


private ... createInfluxMonitoringSink(...) {

  final OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder()
      .readTimeout(timeout, TimeUnit.MILLISECONDS)
      .connectTimeout(timeout, TimeUnit.MILLISECONDS)
      .writeTimeout(timeout, TimeUnit.MILLISECONDS)
      .connectionPool(new ConnectionPool(300, 60, TimeUnit.SECONDS));

  try (InfluxDB influxDB = InfluxDBFactory.connect(host, userName, pwd, okHttpClientBuilder)) { .. }
}

TIA,
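
For context on the question above: the sink function is serialized and shipped
to every parallel subtask, so a client created inside open() of a
RichSinkFunction exists once per subtask - with parallelism 82 that means 82
OkHttp connection pools, each allowing up to 300 idle connections. Whether the
code above ends up with 82 * 300 or just 300 therefore depends on where the
InfluxDB client is actually instantiated. A minimal sketch of the per-subtask
variant (field names, database name and timeout handling are assumptions):

import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;

public class InfluxMonitoringSink extends RichSinkFunction<Point> {

    private final String host;
    private final String userName;
    private final String pwd;
    private final long timeout;
    private transient InfluxDB influxDB;

    public InfluxMonitoringSink(String host, String userName, String pwd, long timeout) {
        this.host = host;
        this.userName = userName;
        this.pwd = pwd;
        this.timeout = timeout;
    }

    @Override
    public void open(Configuration parameters) {
        // runs once per parallel subtask: parallelism 82 => 82 clients, 82 connection pools
        OkHttpClient.Builder builder = new OkHttpClient.Builder()
                .readTimeout(timeout, TimeUnit.MILLISECONDS)
                .connectTimeout(timeout, TimeUnit.MILLISECONDS)
                .writeTimeout(timeout, TimeUnit.MILLISECONDS)
                .connectionPool(new ConnectionPool(300, 60, TimeUnit.SECONDS));
        influxDB = InfluxDBFactory.connect(host, userName, pwd, builder);
        influxDB.setDatabase("monitoring"); // database name is an assumption
    }

    @Override
    public void invoke(Point point, Context context) {
        influxDB.write(point);
    }

    @Override
    public void close() {
        if (influxDB != null) {
            influxDB.close();
        }
    }
}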


Get only the 1st gz file from an s3 folder

2020-09-14 Thread Vijay Balakrishnan
Hi,
I am able to read *.gz files from an s3 folder. I want to *get the 1st gz file*
from the s3 folder, sort only that file into an ordered map as below, and use
orderedMap.*firstKey() as the 1st event timestamp*. I then want to *pass this
1st event timestamp to all TaskManagers, along with a single current time* as
an epoch time.


final DataStreamSource<String> stringDataStreamSource = env.readTextFile(s3Folder);
final SingleOutputStreamOperator<Map<String, Map<String, Object>>> orderedMapOutput =
    stringDataStreamSource.map(new MapFunction<String, Map<String, Map<String, Object>>>() {
        @Override
        public Map<String, Map<String, Object>> map(String jsonStr) throws Exception {
            logger.info("record written:{}", jsonStr); // this shows the proper json string from within the gz file
            Map<String, Object> resultMap = fromJson(jsonStr); // deserialize json
            // sort by event_timestamp
            Map<String, Map<String, Object>> orderedMap = new TreeMap<>();
            if (resultMap != null) {
                Object eventTsObj = resultMap.get(EVENT_TIMESTAMP);
                if (eventTsObj != null) {
                    String eventTS = (String) eventTsObj;
                    orderedMap.put(eventTS, resultMap);
                }
            } else {
                logger.warn("Could not deserialize:{}", jsonStr);
            }
            return orderedMap;
        }
    });

TIA,
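
One possible way to hand a single "first event timestamp" to every parallel
task is to broadcast it and connect it to the main stream. The sketch below is
only an illustration of Flink's broadcast-state API under that assumption; the
class, method and variable names are made up, and it assumes the timestamp has
already been computed before the main stream is processed.

import java.util.Map;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class FirstTimestampBroadcast {

    static DataStream<Map<String, Object>> attachFirstTimestamp(
            StreamExecutionEnvironment env,
            DataStream<Map<String, Object>> mainStream,
            long firstEventTs) {

        MapStateDescriptor<String, Long> firstTsDescriptor =
                new MapStateDescriptor<>("firstEventTs", Types.STRING, Types.LONG);

        // a single element, delivered to every parallel instance of the downstream operator
        BroadcastStream<Long> broadcastTs = env.fromElements(firstEventTs).broadcast(firstTsDescriptor);

        return mainStream
                .connect(broadcastTs)
                .process(new BroadcastProcessFunction<Map<String, Object>, Long, Map<String, Object>>() {
                    @Override
                    public void processBroadcastElement(Long ts, Context ctx,
                                                        Collector<Map<String, Object>> out) throws Exception {
                        ctx.getBroadcastState(firstTsDescriptor).put("firstEventTs", ts);
                    }

                    @Override
                    public void processElement(Map<String, Object> value, ReadOnlyContext ctx,
                                               Collector<Map<String, Object>> out) throws Exception {
                        // null until the broadcast element has arrived at this subtask
                        Long firstTs = ctx.getBroadcastState(firstTsDescriptor).get("firstEventTs");
                        // use firstTs together with the record here
                        out.collect(value);
                    }
                });
    }
}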


Re: Struggling with reading the file from s3 as Source

2020-09-14 Thread Vijay Balakrishnan
My problem was that the plugin jar needs to be under plugins/s3-fs-hadoop.
Running the code with the following added to flink-conf.yaml:
s3.access-key:
s3.secret-key:

Removed all hadoop dependencies from pom.xml.

cd /
/bin/start-cluster.sh
./bin/flink run xyz..jar

Still struggling with how to get it to work with pom.xml in IntelliJ IDEA.

On Mon, Sep 14, 2020 at 12:13 PM Vijay Balakrishnan 
wrote:

> Hi Robert,
> Thanks for the link.
> Is there a simple example I can use as a starting template for using S3
> with pom.xml ?
>
> I copied the flink-s3-fs-hadoop-1.11.1.jar into the plugins/s3-fs-hadoop
> directory
> Running from flink-1.11.1/
> flink run -cp ../target/monitoring-rules-influx-1.0.jar -jar
> /Users/vkbalakr/work/flink-examples/understanding-apache-flink/03-processing-infinite-streams-of-data/monitoring-rules-influx/target/monitoring-rules-influx-1.0.jar
>
> Caused by: java.io.IOException: *Cannot find any jar files for plugin in
> directory [/Users/vkbalakr/flink/flink-1.11.1 2/plugins/s3-fs-hadoop]. *
> Please provide the jar files for the plugin or delete the directory.
> at
> org.apache.flink.core.plugin.DirectoryBasedPluginFinder.createJarURLsFromDirectory(DirectoryBasedPluginFinder.java:97)
>
> *IDEA*: (I copied the flink-s3-fs-hadoop-1.11.1.jar into the
> plugins/s3-fs-hadoop directory)
> *How do I connect that to the pom.xml to run inside IntelliJ which points
> to the Apache repo??*
> pom.xml:
> Added hadoop dependencies:
>
> <dependencies>
>     <dependency>
>         <groupId>org.apache.hadoop</groupId>
>         <artifactId>hadoop-client</artifactId>
>         <version>${hadoop.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.hadoop</groupId>
>         <artifactId>hadoop-aws</artifactId>
>         <version>${hadoop.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
> </dependencies>
>
> This gives:
> Exception in thread "main" java.lang.IllegalStateException: *No
> ExecutorFactory found to execute the application.*
> at
> org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
>
> TIA,
>
> On Fri, Sep 11, 2020 at 11:09 AM Robert Metzger 
> wrote:
>
>> Hi Vijay,
>>
>> Can you post the error you are referring to?
>> Did you properly set up an s3 plugin (
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/)
>> ?
>>
>> On Fri, Sep 11, 2020 at 8:42 AM Vijay Balakrishnan 
>> wrote:
>>
>>> Hi,
>>>
>>> I want to *get data from S3 and process and send to Kinesis.*
>>> 1. Get gzip files from an s3 folder(s3://bucket/prefix)
>>> 2. Sort each file
>>> 3. Do some map/processing on each record in the file
>>> 4. send to Kinesis
>>>
>>> Idea is:
>>> env.readTextFile(s3Folder)
>>> .sort(SortFunction)
>>> .map(MapFunction)
>>> .sink(KinesisSink)
>>>
>>> Struggling with reading the file from s3.
>>> //Assume env is setup properly
>>> //The endpoint can either be a single file or a directory -
>>> "s3:///"
>>> final DataStreamSource stringDataStreamSource = env.
>>> readTextFile(s3Folder);
>>> stringDataStreamSource.print();
>>>
>>> It keeps *erroring* saying I need some kind of *HDFS* setup ??? I don't
>>> want anything to do with HDFS.
>>> Just want to read from S3.
>>> Saw a StackOverflow mention by David Anderson I think about using the
>>> Flink SQL API.
>>> I would appreciate any decent example to get the reading from S3 working.
>>>
>>> TIA,
>>> Vijay
>>>
>>>


Re: Struggling with reading the file from s3 as Source

2020-09-14 Thread Vijay Balakrishnan
Hi Robert,
Thanks for the link.
Is there a simple example I can use as a starting template for using S3
with pom.xml ?

I copied the flink-s3-fs-hadoop-1.11.1.jar into the plugins/s3-fs-hadoop
directory
Running from flink-1.11.1/
flink run -cp ../target/monitoring-rules-influx-1.0.jar -jar
/Users/vkbalakr/work/flink-examples/understanding-apache-flink/03-processing-infinite-streams-of-data/monitoring-rules-influx/target/monitoring-rules-influx-1.0.jar

Caused by: java.io.IOException: *Cannot find any jar files for plugin in
directory [/Users/vkbalakr/flink/flink-1.11.1 2/plugins/s3-fs-hadoop]. *
Please provide the jar files for the plugin or delete the directory.
at
org.apache.flink.core.plugin.DirectoryBasedPluginFinder.createJarURLsFromDirectory(DirectoryBasedPluginFinder.java:97)

*IDEA*: (I copied the flink-s3-fs-hadoop-1.11.1.jar into the
plugins/s3-fs-hadoop directory)
*How do I connect that to the pom.xml to run inside IntelliJ which points
to the Apache repo??*
pom.xml:
Added hadoop dependencies:

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

This gives:
Exception in thread "main" java.lang.IllegalStateException: *No
ExecutorFactory found to execute the application.*
at
org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)

TIA,

On Fri, Sep 11, 2020 at 11:09 AM Robert Metzger  wrote:

> Hi Vijay,
>
> Can you post the error you are referring to?
> Did you properly set up an s3 plugin (
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/) ?
>
> On Fri, Sep 11, 2020 at 8:42 AM Vijay Balakrishnan 
> wrote:
>
>> Hi,
>>
>> I want to *get data from S3 and process and send to Kinesis.*
>> 1. Get gzip files from an s3 folder(s3://bucket/prefix)
>> 2. Sort each file
>> 3. Do some map/processing on each record in the file
>> 4. send to Kinesis
>>
>> Idea is:
>> env.readTextFile(s3Folder)
>> .sort(SortFunction)
>> .map(MapFunction)
>> .sink(KinesisSink)
>>
>> Struggling with reading the file from s3.
>> //Assume env is setup properly
>> //The endpoint can either be a single file or a directory -
>> "s3:///"
>> final DataStreamSource stringDataStreamSource = env.
>> readTextFile(s3Folder);
>> stringDataStreamSource.print();
>>
>> It keeps *erroring* saying I need some kind of *HDFS* setup ??? I don't
>> want anything to do with HDFS.
>> Just want to read from S3.
>> Saw a StackOverflow mention by David Anderson I think about using the
>> Flink SQL API.
>> I would appreciate any decent example to get the reading from S3 working.
>>
>> TIA,
>> Vijay
>>
>>


Struggling with reading the file from s3 as Source

2020-09-11 Thread Vijay Balakrishnan
Hi,

I want to *get data from S3 and process and send to Kinesis.*
1. Get gzip files from an s3 folder(s3://bucket/prefix)
2. Sort each file
3. Do some map/processing on each record in the file
4. send to Kinesis

Idea is:
env.readTextFile(s3Folder)
.sort(SortFunction)
.map(MapFunction)
.sink(KinesisSink)

Struggling with reading the file from s3.
//Assume env is setup properly
//The endpoint can either be a single file or a directory - "s3:///"
final DataStreamSource<String> stringDataStreamSource = env.readTextFile(s3Folder);
stringDataStreamSource.print();

It keeps *erroring*, saying I need some kind of *HDFS* setup??? I don't
want anything to do with HDFS; I just want to read from S3.
I saw a StackOverflow mention, by David Anderson I think, about using the
Flink SQL API.
I would appreciate any decent example to get the reading from S3 working.

TIA,
Vijay
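
A minimal end-to-end sketch of the read-from-S3-and-sink-to-Kinesis idea,
assuming the flink-s3-fs-hadoop plugin is installed under plugins/s3-fs-hadoop
and s3.access-key / s3.secret-key are set in flink-conf.yaml. The bucket,
prefix, region and stream name are placeholders, the map step is a stand-in
for the real per-record processing, and the "sort" step from the outline above
is left out.

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

public class S3ToKinesisSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // readTextFile decompresses .gz files transparently based on the file extension
        DataStream<String> lines = env.readTextFile("s3://my-bucket/my-prefix/");

        Properties producerConfig = new Properties();
        producerConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2");
        producerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");

        FlinkKinesisProducer<String> kinesis =
                new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
        kinesis.setDefaultStream("my-output-stream");
        kinesis.setDefaultPartition("0");

        lines
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String line) {
                    return line; // placeholder for the real per-record processing
                }
            })
            .addSink(kinesis);

        env.execute("s3-to-kinesis-sketch");
    }
}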


Re: Count of records coming into the Flink App from KDS and at each step through Execution Graph

2020-07-30 Thread Vijay Balakrishnan
Hi David,
Thx for your reply.

To summarize:
Use a Counter:

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue") //specify my value for
each custom event_name here- I might not know all custom event_names
in advance
  .counter("myCounter");

This MyMetricsValue will show up in Prometheus as, for example,
0.Window(TumblingWindow...)__EventTimeTrigger_MyClass.myMetricsKey.myCounter
and so on for 1.Window(TumblingWindow...)... for each parallel operator.

This will then have to be aggregated in Prometheus over 5 secs across all of
the per-subtask
Window(TumblingWindow...)__EventTimeTrigger_MyClass.myMetricsKey.myCounter
metrics.  // no task executors here - this is at the Operator level ???

This is independent of task executors, right? How does your statement -
"Flink does not support aggregating operator-level metrics across task
executors. This job is left to proper time-series databases." - relate to
my summary above?

Also, I am assuming that the Counter will get reset after every window
interval of 5 secs, or do I need to do counter.dec(counter.getCount())
in the close() method as you showed above?

TIA,
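
A minimal sketch of the counter-with-user-variable approach summarized above:
one counter is registered lazily per event_name via addGroup("event_name",
value), so the metric reporter sees event_name as a label/variable. The field
names, metric name and the "event_name" key are assumptions.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class EventNameCountingMap extends RichMapFunction<Map<String, Object>, Map<String, Object>> {

    private transient Map<String, Counter> countersByEventName;

    @Override
    public void open(Configuration parameters) {
        countersByEventName = new HashMap<>();
    }

    @Override
    public Map<String, Object> map(Map<String, Object> record) {
        String eventName = String.valueOf(record.getOrDefault("event_name", "unknown"));
        // lazily register one counter per event_name seen by this subtask
        Counter counter = countersByEventName.computeIfAbsent(eventName, name ->
                getRuntimeContext()
                        .getMetricGroup()
                        .addGroup("event_name", name)
                        .counter("eventCount"));
        counter.inc();
        return record;
    }
}

Counters registered this way are per parallel subtask; summing them across
subtasks (and computing a 5-second rate) is then done on the Prometheus side,
e.g. by aggregating over the event_name label.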




On Wed, Jul 29, 2020 at 2:53 AM Chesnay Schepler  wrote:

> I'd recommend to do the aggregation over 5 seconds in graphite/prometheus
> etc., and expose a counter in Flink for each attribute/event_name.
>
> User variables are a good choice for encoding the attribute/event_name
> values.
>
> As for your remaining questions:
>
> Flink does not support aggregating operator-level metrics across task
> executors. This job is left to proper time-series databases.
>
> A counter can be reset like this: counter.dec(counter.getCount())
> You can also create a custom implementation with whatever behavior you
> desire.
>
> The default meter implementation (MeterView) calculate the rate of events
> per second based on counts that are periodically gathered over some
> time-period (usually 1 minute). If you want to calculate the
> rate-per-second over the last 5 seconds, then new Meterview(5) should do
> the trick.
> If you want to have a rate-per-5-seconds, then you will need to implement
> a custom meter. Note that I would generally discourage this as it will not
> work properly with some metric systems which assume rates to be per-second.
>
> On 27/07/2020 19:59, Vijay Balakrishnan wrote:
>
> Hi Al,
> I am looking at the Custom User Metrics to count incoming records by an
> incoming attribute, event_name, and aggregate it over 5 secs.
> I looked at
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#reporter
> .
> I am trying to figure out which one to use Counter or Meter.
> If using Counter, how do I reset it after 5 secs.
> If using Meter which measures avg throughput, How do i specify a
> duration like 5 secs ? markEvent(long n) ???
>
> I am also trying to collect total count of events across all TaskManagers.
> Do I collect at
> flink_taskmanager_job_task__numrecordsIn  or
> flink_taskmanager_job_task_operator__numrecordsIn  ??
> (so at task or operator level
>
> Or should I use User variables like below:
>
> counter = getRuntimeContext()
>   .getMetricGroup()
>   .addGroup("MyMetricsKey", "MyMetricsValue") //specify my value for each 
> custom event_name here- I might not know all custom event_names in advance
>   .counter("myCounter");
>
>
> Pardon my confusion here.
> TIA,
>
> On Mon, Jul 27, 2020 at 10:00 AM Vijay Balakrishnan 
> wrote:
>
>> Hi David,
>> Thanks for your reply.
>> I am already using the PrometheusReporter. I am trying to figure out how
>> to dig into the application data and count grouped by an attribute called
>> event_name in the incoming application data and report to Grafana via
>> Prometheus.
>>
>> I see the following at a high level
>> task_numRecordsIn
>> task_numRecordsOut
>> ..operator_numLateRecordsDropped
>>
>> Trying to dig in deeper than this numRecordsIn to get grouped by the
>> event_name attribute coming in the input record every 5 secs.
>> TIA,
>>
>> On Sat, Jul 25, 2020 at 10:55 AM David Anderson 
>> wrote:
>>
>>> Setting up a Flink metrics dashboard in Grafana requires setting up and
>>> configuring one of Flink's metrics reporters [1] that is supported by
>>> Grafana as a data source. That means your options for a metrics reporter
>>> are Graphite, InfluxDB, Prometheus, or the Prometheus push reporter.
>>>
>>> If you want reporting every 5 seconds, with the push based reporters
>>> that's something you would configure in flink-conf.yaml, whereas with
>>> Prometheus you'll need to configure the scrape interval in the p

Count of records in the Stream for a time window of 5s

2020-07-30 Thread Vijay Balakrishnan
Hi,

I am trying to get a count of records in the stream for a time window of 5s,
but I always get a count of 1?? I sent in 10 records and expect the count to
be 10 at the end.

I tried to follow the advice here from Fabian Hueske:
https://stackoverflow.com/questions/45606999/how-to-count-the-number-of-records-processed-by-apache-flink-in-a-given-time-win

DataStream<Map<String, Object>> kinesisStream;
...//get data from Kinesis source into kinesisStream - works fine
final SingleOutputStreamOperator<Map<String, Object>> filterDroppedEvents = kinesisStream
    .filter(resultMap -> {
        long timestamp = Utils.getEventTimestampFromMap(resultMap);
        long currTimestamp = System.currentTimeMillis();
        long driftFromCurrTS = currTimestamp - timestamp;
        if (driftFromCurrTS < 0) {
            Object eventNameObj = resultMap.get(EVENT_NAME);
            String eventName = eventNameObj != null ? (String) eventNameObj : "";
            logger.debug("PMS - event_timestamp is > current timestamp by driftFromCurrTS:{} for event_name:{} and event_timestamp:{}",
                    driftFromCurrTS, eventName, timestamp);
            return true;
        } else {
            return false;
        }
    });//*called 10 times here - GOOD*

final SingleOutputStreamOperator<CountRows> droppedEventsMapToCountRows = filterDroppedEvents
    .map(mapValue -> new CountRows(mapValue, 1L,
        mapValue.get(EVENT_NAME) != null ? (String) mapValue.get(EVENT_NAME) : ""));//this is *called 10 times - GOOD*

final KeyedStream<CountRows, String> countRowsKeyedStream =
    droppedEventsMapToCountRows.keyBy(new KeySelector<CountRows, String>() {
        @Override
        public String getKey(CountRows countRows) throws Exception {
            logger.info("Inside getKey");
            return countRows.getEventName();
        }
    });//*doesn't get in here to this logger statement ??*

final AllWindowedStream<CountRows, TimeWindow> countRowsTimeWindowAllWindowedStream = countRowsKeyedStream
    .*timeWindowAll*(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));
    //.sum("count")

final SingleOutputStreamOperator<CountRows> countRowsReduceStream =
    countRowsTimeWindowAllWindowedStream.reduce((accum, input) -> {
        logger.info("Inside reduce");
        return new CountRows(input.getRow(), accum.getCount() + input.getCount(), input.getEventName());// sum 1s to count
    });//*don't see this logger statement "Inside reduce"*

DataStream<InfluxDBPoint> droppedEventsStream =
    countRowsReduceStream.flatMap(new FlatMapFunction<CountRows, InfluxDBPoint>() {
        @Override
        public void flatMap(CountRows countRows, Collector<InfluxDBPoint> out) throws Exception {
            logger.info("Inside final map"); // *only called once and countRows.getCount() is 1 - BAD - want it to be 10 ??*
            Map<String, Object> mapValue = countRows.getRow();
            //long currTimestamp = System.currentTimeMillis();
            Object eventTSObj = mapValue.get(EVENT_TIMESTAMP);
            String eventTimestamp = eventTSObj != null ? (String) eventTSObj : "";
            long eventTS = Utils.getLongFromDateStr(eventTimestamp);
            Map<String, String> tags = new HashMap<>();
            Object eventNameObj = mapValue.get(Utils.EVENT_NAME);
            String eventName = eventNameObj != null ? (String) eventNameObj : "";
            tags.put(Utils.EVENT_NAME, eventName);
            Map<String, Object> fields = new HashMap<>();
            fields.put("count", *countRows.getCount()*);
            out.collect(new InfluxDBPoint("dropped_events_count", eventTS, tags, fields));//TODO: measurement name
        }
    });
/* *Tried map but doesn't work*
reduceStream.map(countRows -> {
    logger.info("Inside final map");
    Map<String, Object> mapValue = countRows.getRow();
    //long currTimestamp = System.currentTimeMillis();
    Object eventTSObj = mapValue.get(EVENT_TIMESTAMP);
    String eventTimestamp = eventTSObj != null ? (String) eventTSObj : "";
    long eventTS = Utils.getLongFromDateStr(eventTimestamp);
    Map<String, String> tags = new HashMap<>();
    Object eventNameObj = mapValue.get(Utils.EVENT_NAME);
    String eventName = eventNameObj != null ? (String) eventNameObj : "";
    tags.put(Utils.EVENT_NAME, eventName);
    Map<String, Object> fields = new HashMap<>();
    fields.put("count", countRows.getCount());
    return new InfluxDBPoint("dropped_events_count", eventTS, tags, fields);//TODO: measurement name
});*/
droppedEventsStream.addSink(influxSink);

CountRows is a *POJO wrapper around the Map<String, Object> to add the count*:
public static class *CountRows* implements Serializable, Comparable<CountRows> {
    Map<String, Object> row;
    Long *count*;
    String eventName;
    ...



TIA,
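
For comparison, a minimal, self-contained counting sketch (not the code above):
every record is mapped to 1 and the 1s are summed per 5-second processing-time
window; the socket source is only a stand-in for the Kinesis source. Note that
windowAll runs with parallelism 1; for per-key counts, keyBy plus a keyed window
is the usual route, and if event-time windows are used instead, timestamps and
watermarks must be assigned or the window may never fire.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class CountPer5sSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> events = env.socketTextStream("localhost", 9999);

        events
            .map(e -> 1L).returns(Types.LONG)                            // each record contributes 1
            .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            .reduce(Long::sum)                                           // total records per 5s window
            .print();

        env.execute("count-per-5s-sketch");
    }
}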


Re: Count of records coming into the Flink App from KDS and at each step through Execution Graph

2020-07-27 Thread Vijay Balakrishnan
Hi Al,
I am looking at the Custom User Metrics to count incoming records by an
incoming attribute, event_name, and aggregate it over 5 secs.
I looked at
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#reporter
.
I am trying to figure out which one to use Counter or Meter.
If using Counter, how do I reset it after 5 secs.
If using Meter which measures avg throughput, How do i specify a
duration like 5 secs ? markEvent(long n) ???

I am also trying to collect total count of events across all TaskManagers.
Do I collect at flink_taskmanager_job_task__numrecordsIn
or
flink_taskmanager_job_task_operator__numrecordsIn ?? (so
at the task level or the operator level?)

Or should I use User variables like below:

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue") //specify my value for
each custom event_name here- I might not know all custom event_names
in advance
  .counter("myCounter");


Pardon my confusion here.
TIA,

On Mon, Jul 27, 2020 at 10:00 AM Vijay Balakrishnan 
wrote:

> Hi David,
> Thanks for your reply.
> I am already using the PrometheusReporter. I am trying to figure out how
> to dig into the application data and count grouped by an attribute called
> event_name in the incoming application data and report to Grafana via
> Prometheus.
>
> I see the following at a high level
> task_numRecordsIn
> task_numRecordsOut
> ..operator_numLateRecordsDropped
>
> Trying to dig in deeper than this numRecordsIn to get grouped by the event_name
> attribute coming in the input record every 5 secs.
> TIA,
>
> On Sat, Jul 25, 2020 at 10:55 AM David Anderson 
> wrote:
>
>> Setting up a Flink metrics dashboard in Grafana requires setting up and
>> configuring one of Flink's metrics reporters [1] that is supported by
>> Grafana as a data source. That means your options for a metrics reporter
>> are Graphite, InfluxDB, Prometheus, or the Prometheus push reporter.
>>
>> If you want reporting every 5 seconds, with the push based reporters
>> that's something you would configure in flink-conf.yaml, whereas with
>> Prometheus you'll need to configure the scrape interval in the prometheus
>> config file. For more on using Flink with Prometheus, see the blog post by
>> Maximilian Bode [2].
>>
>> Best,
>> David
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#reporter
>> [2]
>> https://flink.apache.org/features/2019/03/11/prometheus-monitoring.html
>>
>> On Fri, Jul 24, 2020 at 12:57 AM Vijay Balakrishnan 
>> wrote:
>>
>>> Hi,
>>> I am trying to figure out how many records came into the Flink App from
>>> KDS and how many records got moved to the next step or was dropped by the
>>> watermarks.
>>>
>>> I see on the Ui Table for *Source. Records Sent* with a total and the
>>> next step *Filter->FlatMap operator with a Records Received *total. How
>>> can I get these metric values for me to display In Grafana for eg. as I
>>> want to know a count for each 5 secs, how many records came in and how many
>>> were filtered out by the watermark or my Custom Filter operator etc  ?
>>>
>>> I looked at the breakdown of the Source__Custom_Source in Metrics as
>>> show in the attached pic. It has values like 0.NumRecordsIn and
>>> 0.NumRecordsOut and so on from 0 to 9 for the parallelism 10 I specified.
>>> It also has various breakdowns like 0.Timestamps/Watermarks.numRecordsIn
>>> and 0.Timestamps/Watermarks.numRecordsOut
>>>
>>> Attached are some screenshots of the Flink DashBoard UI.
>>>
>>> TIA,
>>>
>>>


Re: Count of records coming into the Flink App from KDS and at each step through Execution Graph

2020-07-27 Thread Vijay Balakrishnan
Hi David,
Thanks for your reply.
I am already using the PrometheusReporter. I am trying to figure out how to
dig into the application data and count grouped by an attribute called
event_name in the incoming application data and report to Grafana via
Prometheus.

I see the following at a high level
task_numRecordsIn
task_numRecordsOut
..operator_numLateRecordsDropped

Trying to dig in deeper than this numRecordsIn to get grouped by the event_name
attribute coming in the input record every 5 secs.
TIA,

On Sat, Jul 25, 2020 at 10:55 AM David Anderson 
wrote:

> Setting up a Flink metrics dashboard in Grafana requires setting up and
> configuring one of Flink's metrics reporters [1] that is supported by
> Grafana as a data source. That means your options for a metrics reporter
> are Graphite, InfluxDB, Prometheus, or the Prometheus push reporter.
>
> If you want reporting every 5 seconds, with the push based reporters
> that's something you would configure in flink-conf.yaml, whereas with
> Prometheus you'll need to configure the scrape interval in the prometheus
> config file. For more on using Flink with Prometheus, see the blog post by
> Maximilian Bode [2].
>
> Best,
> David
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#reporter
> [2]
> https://flink.apache.org/features/2019/03/11/prometheus-monitoring.html
>
> On Fri, Jul 24, 2020 at 12:57 AM Vijay Balakrishnan 
> wrote:
>
>> Hi,
>> I am trying to figure out how many records came into the Flink App from
>> KDS and how many records got moved to the next step or was dropped by the
>> watermarks.
>>
>> I see on the Ui Table for *Source. Records Sent* with a total and the
>> next step *Filter->FlatMap operator with a Records Received *total. How
>> can I get these metric values for me to display In Grafana for eg. as I
>> want to know a count for each 5 secs, how many records came in and how many
>> were filtered out by the watermark or my Custom Filter operator etc  ?
>>
>> I looked at the breakdown of the Source__Custom_Source in Metrics as show
>> in the attached pic. It has values like 0.NumRecordsIn and 0.NumRecordsOut
>> and so on from 0 to 9 for the parallelism 10 I specified. It also has
>> various breakdowns like 0.Timestamps/Watermarks.numRecordsIn and
>> 0.Timestamps/Watermarks.numRecordsOut
>>
>> Attached are some screenshots of the Flink DashBoard UI.
>>
>> TIA,
>>
>>


Re: FlinkKinesisProducer blocking ?

2020-07-23 Thread Vijay Balakrishnan
Thanks Gordon,
So, 10 (ThreadPoolSize) * 80 sub-tasks = 800 threads go to a queue (unbounded
by default). This then goes through KPL MaxConnections (24 by default) to KDS.

This suggests I need to decrease the number of sub-tasks, or setQueueLimit(800)
and increase MaxConnections to 256 (the max allowed).
Checkpointing is not currently enabled.

Please correct me if I am wrong.
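
For reference, a configuration sketch showing where the knobs discussed in this
thread live: the KPL settings (ThreadingModel, ThreadPoolSize, MaxConnections,
RequestTimeout, RecordTtl) go into the producer Properties, while the buffer
bound is set via setQueueLimit on the FlinkKinesisProducer itself. The values
below are placeholders for illustration, not recommendations.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

public class KinesisProducerConfigSketch {

    public static FlinkKinesisProducer<String> build() {
        Properties config = new Properties();
        config.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2");
        config.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");

        // KPL native-process settings, applied per parallel sink subtask
        config.setProperty("ThreadingModel", "POOLED");  // fixed thread pool instead of one thread per request
        config.setProperty("ThreadPoolSize", "10");
        config.setProperty("MaxConnections", "24");      // concurrent connections from KPL to the Kinesis backend
        config.setProperty("RequestTimeout", "6000");
        config.setProperty("RecordTtl", "30000");        // drop buffered records older than 30s

        FlinkKinesisProducer<String> producer =
                new FlinkKinesisProducer<>(new SimpleStringSchema(), config);
        producer.setDefaultStream("my-output-stream");
        producer.setDefaultPartition("0");

        // bound the internal KPL buffer so backpressure kicks in instead of unbounded buffering;
        // roughly (shards * 100 KB) / record size, per the docs quoted further down in this thread
        producer.setQueueLimit(6000);
        return producer;
    }
}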

On Tue, Jul 21, 2020 at 7:40 PM Tzu-Li (Gordon) Tai 
wrote:

> Hi Vijay,
>
> ThreadPoolSize is for per Kinesis producer, which there is one for each
> parallel subtask.
> If you are constantly hitting the 1MB per second per shard quota, then the
> records will be buffered by the FlinkKinesisProducer.
> During this process, backpressure is not applied if you have not
> configured an upper bound for the buffer queue.
>
> One other thing to note, which might explain the backpresses at regular
> intervals that you are experiencing,
> is that the FlinkKinesisProducer needs to flush all pending records in the
> buffer before the checkpoint can complete for the sink.
> That would also apply backpressure upstream.
>
> Gordon
>
> On Fri, Jul 10, 2020 at 7:02 AM Vijay Balakrishnan 
> wrote:
>
>> Hi Gordon,
>> ThreadPoolSize default is 10. I have parallelism of 80 spread out across
>> 32 nodes.
>> Could it be that the 80 threads get bottlenecked on a common ThreadPool
>> of 10 or is it spawning 80 * 10 threads in total. The Flink TaskManagers
>> run in separate slots/vCPUs and can be spread across 32 nodes in my case
>> but occupying 80 slots/vCPUs. Is my understanding correct and will this be
>> the reason that the KPL gets flooded with too many pending requests at
>> regular intervals ??
>>
>> TIA,
>>
>> On Thu, Jul 9, 2020 at 12:15 PM Vijay Balakrishnan 
>> wrote:
>>
>>> Thanks,Gordon for your reply.
>>>
>>> I do not set a queueLimit and so the default unbounded queueSize is 
>>> 2147483647.
>>> So, it should just be dropping records being produced from the
>>> 80(parallelism) * 10 (ThreadPoolSize) = 800 threads based on Recordttl. I
>>> do not want backpressure as you said it effectively blocks all upstream
>>> operators.
>>>
>>> But from what you are saying, it will apply backpressure when the number
>>> of outstanding records accumulated exceeds the default queue limit of 
>>> 2147483647
>>> or* does it also do it if it is r**ate-limited* *to 1MB per second per
>>> shard by Kinesis* ? The 2nd case of Rate Limiting by Kinesis seems more
>>> probable.
>>>
>>> So, calculating Queue Limit:
>>> Based on this, my records size = 1600 bytes. I have 96 shards
>>> Assuming - With the default RecordMaxBufferedTime of 100ms, a queue size
>>> of 100kB per shard should be sufficient.So, Queue size/shard=100KB
>>> Queue Limit with 96 shards = (96 * 10^5)/ 1600 = 6000
>>> Queue Limit with 4 shards = (4 * 10^5) / 1600 = 250
>>>
>>> Acc. to the docs:
>>>
>>> By default, FlinkKinesisProducer does not backpressure. Instead,
>>> records that cannot be sent because of the rate restriction of 1 MB per
>>> second per shard are buffered in an unbounded queue and dropped when their
>>> RecordTtl expires.
>>>
>>> To avoid data loss, you can enable backpressuring by restricting the
>>> size of the internal queue:
>>>
>>> // 200 Bytes per record, 1 shard
>>> kinesis.setQueueLimit(500);
>>>
>>>
>>> On Wed, Jul 8, 2020 at 12:20 AM Tzu-Li (Gordon) Tai 
>>> wrote:
>>>
>>>> Hi Vijay,
>>>>
>>>> The FlinkKinesisProducer does not use blocking calls to the AWS KDS API.
>>>> It does however apply backpressure (therefore effectively blocking all
>>>> upstream operators) when the number of outstanding records accumulated
>>>> exceeds a set limit, configured using the
>>>> FlinkKinesisProducer#setQueueLimit
>>>> method.
>>>>
>>>> For starters, you can maybe check if that was set appropriately.
>>>>
>>>> Cheers,
>>>> Gordon
>>>>
>>>>
>>>>
>>>> --
>>>> Sent from:
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>
>>>


Re: MaxConnections understanding on FlinkKinesisProducer via KPL

2020-07-22 Thread Vijay Balakrishnan
Hi Gordon,
Thx for your reply.
The FlinkKinesisProducer default is ThreadPool, which is what I am using. So,
does that mean only 10 threads are making calls to KDS by default ??
I see from the number of records coming into KDS that I need only 1-2
shards. So, the bottleneck is on the KPL side.
Does this mean I have to set a QueueLimit of 500 as shown in the example
below ??
From what you said, total MaxConnections would then be, by default, 24 *
number of subtasks = 24 * 80 = 1920 connections to KDS.
KPL ThreadPoolSize would be 10 threads by default - is this per subtask?
So, would it be 10 * number of subtasks = 10 * 80 = 800 threads ??

I am trying to reconcile the difference above. Somewhere I am flooding KPL with
too many requests & it gives the curl 28 error.

So, calculating Queue Limit:
Based on this, my records size = 1600 bytes. I have 96 shards
Assuming - With the default RecordMaxBufferedTime of 100ms, a queue size of
100kB per shard should be sufficient.So, Queue size/shard=100KB
Queue Limit with 96 shards = (96 * 10^5)/ 1600 = 6000
Queue Limit with 4 shards = (4 * 10^5) / 1600 = 250

Acc. to the docs:

By default, FlinkKinesisProducer does not backpressure. Instead, records
that cannot be sent because of the rate restriction of 1 MB per second per
shard are buffered in an unbounded queue and dropped when their RecordTtl
 expires.

To avoid data loss, you can enable backpressuring by restricting the size
of the internal queue:

// 200 Bytes per record, 1 shard
kinesis.setQueueLimit(500);


On Tue, Jul 21, 2020 at 8:00 PM Tzu-Li (Gordon) Tai 
wrote:

> Hi Vijay,
>
> I'm not entirely sure of the semantics between ThreadPoolSize and
> MaxConnections since they are all KPL configurations (this specific
> question would probably be better directed to AWS),
> but my guess would be that the number of concurrent requests to the KPL
> backend is capped by MaxConnections. This is per parallel
> FlinkKinesisProducer subtask.
>
> As for ThreadPoolSize, do note that the default threading model by KPL is
> PER_REQUEST, for which the KPL native process will launch a thread for each
> request.
> Under heavy load, this would of course be an issue. Since you didn't
> explicitly mention this config, make sure to set this to POOLED to actually
> make use of a fixed thread pool for requests.
>
> Overall, my suggestion is to set a reasonable queue limit for the number
> of records buffered by KPL's native process (by default it is unbounded).
> Without that in place, under high load you would easily be resource
> exhausted, and can cause more unpredictable checkpointing times since the
> FlinkKinesisProducer would need to flush pending records on checkpoints
> (which ultimately also applies backpressure upstream).
>
> BR,
> Gordon
>
> On Wed, Jul 22, 2020 at 5:21 AM Vijay Balakrishnan 
> wrote:
>
>> Hi,
>> Trying to tune the KPL and FlinkKinesisProducer for Kinesis Data
>> stream(KDS).
>> Getting following errors:
>> 1.
>> Throttling
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>
>>  
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>> at
>> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(KinesisProducer.java:536)
>>
>> 2. ERROR
>> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
>>  - [2020-06-18 15:49:24.238655] [0x0ed6][0x7fc2086c8700] [error]
>> [shard_map.cc:150] Shard map update for stream "_write" failed. Code: 
>> *LimitExceededException
>> Message: Rate exceeded for stream *..._write under account
>> 753274046439.; retrying in 1500 ms
>>
>> 3. [AWS Log: ERROR](CurlHttpClient)*Curl returned error code 28*
>>
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#backpressure
>>
>>
>> https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties
>>
>>
>> https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-flink-timeout/
>>
>> These are the KPL property changes I am planning to make.
>>
>> *RequestTimeout*: 1 //default 6000 ms
>>
>> *AggregationEnabled*: true //default is true
>>
>> *ThreadPoolSize*: *15* //default 10
>>
>> *MaxConnections*: *48* //default 24 - this might have been a bottleneck
>> when we flooded KPL with requests. Requests are sent in parallel over
>> multiple connections to the backend.
>

MaxConnections understanding on FlinkKinesisProducer via KPL

2020-07-21 Thread Vijay Balakrishnan
Hi,
Trying to tune the KPL and FlinkKinesisProducer for Kinesis Data
stream(KDS).
Getting following errors:
1.
Throttling
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(KinesisProducer.java:536)

2. ERROR
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
 - [2020-06-18 15:49:24.238655] [0x0ed6][0x7fc2086c8700] [error]
[shard_map.cc:150] Shard map update for stream "_write" failed.
Code: *LimitExceededException
Message: Rate exceeded for stream *..._write under account 753274046439.;
retrying in 1500 ms

3. [AWS Log: ERROR](CurlHttpClient)*Curl returned error code 28*


https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#backpressure

https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties

https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-flink-timeout/

These are the KPL property changes I am planning to make.

*RequestTimeout*: 1 //default 6000 ms

*AggregationEnabled*: true //default is true

*ThreadPoolSize*: *15* //default 10

*MaxConnections*: *48* //default 24 - this might have been a bottleneck
when we flooded KPL with requests. Requests are sent in parallel over
multiple connections to the backend.

*RecordTtl*: *1* //default 3 ms  - drop record after 10s.

*FailIfThrottled*: *true* //default false - so if throttled, don't retry.


We were using parallelism for sinks at 80. So each corresponds to 1
FlinkKinesisProducer. So, 80 * 10(ThreadPoolSize) = 800 threads.
MaxConnections is 24 from KPL.

I am not sure about the MaxConnections setting - what does 48 mean here? Is
it 40 (sink parallelism) * 15 (ThreadPoolSize) * 48 calls to the KDS backend
via KPL?

Any thoughts on how not to overwhelm KPL while handling real time streaming
load to the Kinesis via the FlinkKinesisProducer ?

TIA,


Re: FlinkKinesisProducer blocking ?

2020-07-09 Thread Vijay Balakrishnan
Hi Gordon,
ThreadPoolSize default is 10. I have parallelism of 80 spread out across 32
nodes.
Could it be that the 80 threads get bottlenecked on a common ThreadPool of
10 or is it spawning 80 * 10 threads in total. The Flink TaskManagers run
in separate slots/vCPUs and can be spread across 32 nodes in my case but
occupying 80 slots/vCPUs. Is my understanding correct and will this be the
reason that the KPL gets flooded with too many pending requests at regular
intervals ??

TIA,

On Thu, Jul 9, 2020 at 12:15 PM Vijay Balakrishnan 
wrote:

> Thanks,Gordon for your reply.
>
> I do not set a queueLimit and so the default unbounded queueSize is 
> 2147483647.
> So, it should just be dropping records being produced from the
> 80(parallelism) * 10 (ThreadPoolSize) = 800 threads based on Recordttl. I
> do not want backpressure as you said it effectively blocks all upstream
> operators.
>
> But from what you are saying, it will apply backpressure when the number
> of outstanding records accumulated exceeds the default queue limit of 
> 2147483647
> or* does it also do it if it is r**ate-limited* *to 1MB per second per
> shard by Kinesis* ? The 2nd case of Rate Limiting by Kinesis seems more
> probable.
>
> So, calculating Queue Limit:
> Based on this, my records size = 1600 bytes. I have 96 shards
> Assuming - With the default RecordMaxBufferedTime of 100ms, a queue size
> of 100kB per shard should be sufficient.So, Queue size/shard=100KB
> Queue Limit with 96 shards = (96 * 10^5)/ 1600 = 6000
> Queue Limit with 4 shards = (4 * 10^5) / 1600 = 250
>
> Acc. to the docs:
>
> By default, FlinkKinesisProducer does not backpressure. Instead, records
> that cannot be sent because of the rate restriction of 1 MB per second per
> shard are buffered in an unbounded queue and dropped when their RecordTtl
> expires.
>
> To avoid data loss, you can enable backpressuring by restricting the size
> of the internal queue:
>
> // 200 Bytes per record, 1 shard
> kinesis.setQueueLimit(500);
>
>
> On Wed, Jul 8, 2020 at 12:20 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Vijay,
>>
>> The FlinkKinesisProducer does not use blocking calls to the AWS KDS API.
>> It does however apply backpressure (therefore effectively blocking all
>> upstream operators) when the number of outstanding records accumulated
>> exceeds a set limit, configured using the
>> FlinkKinesisProducer#setQueueLimit
>> method.
>>
>> For starters, you can maybe check if that was set appropriately.
>>
>> Cheers,
>> Gordon
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: FlinkKinesisProducer blocking ?

2020-07-09 Thread Vijay Balakrishnan
Thanks,Gordon for your reply.

I do not set a queueLimit and so the default unbounded queueSize is 2147483647.
So, it should just be dropping records being produced from the
80(parallelism) * 10 (ThreadPoolSize) = 800 threads based on Recordttl. I
do not want backpressure as you said it effectively blocks all upstream
operators.

But from what you are saying, it will apply backpressure when the number of
outstanding records accumulated exceeds the default queue limit of 2147483647
or *does it also do it if it is rate-limited to 1MB per second per
shard by Kinesis*? The 2nd case of rate limiting by Kinesis seems more
probable.

So, calculating Queue Limit:
Based on this, my records size = 1600 bytes. I have 96 shards
Assuming - With the default RecordMaxBufferedTime of 100ms, a queue size of
100kB per shard should be sufficient.So, Queue size/shard=100KB
Queue Limit with 96 shards = (96 * 10^5)/ 1600 = 6000
Queue Limit with 4 shards = (4 * 10^5) / 1600 = 250

Acc. to the docs:

By default, FlinkKinesisProducer does not backpressure. Instead, records
that cannot be sent because of the rate restriction of 1 MB per second per
shard are buffered in an unbounded queue and dropped when their RecordTtl
expires.

To avoid data loss, you can enable backpressuring by restricting the size
of the internal queue:

// 200 Bytes per record, 1 shard
kinesis.setQueueLimit(500);


On Wed, Jul 8, 2020 at 12:20 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi Vijay,
>
> The FlinkKinesisProducer does not use blocking calls to the AWS KDS API.
> It does however apply backpressure (therefore effectively blocking all
> upstream operators) when the number of outstanding records accumulated
> exceeds a set limit, configured using the
> FlinkKinesisProducer#setQueueLimit
> method.
>
> For starters, you can maybe check if that was set appropriately.
>
> Cheers,
> Gordon
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Validating my understanding of SHARD_DISCOVERY_INTERVAL_MILLIS

2020-07-09 Thread Vijay Balakrishnan
Hi,
I see these 2 constants- SHARD_GETRECORDS_INTERVAL_MILLIS &
SHARD_DISCOVERY_INTERVAL_MILLIS.

My understanding was SHARD_GETRECORDS_INTERVAL_MILLIS defines how often
records are fetched from Kinesis Data Stream(KDS). Code seems to be doing
this in ShardConsumer.run()-->getRecords()

SHARD_DISCOVERY_INTERVAL_MILLIS defines how often the KinesisConsumer checks
if there are any changes to shards. We don't change shards during our
application run. I have changed it to a very high value to avoid this check,
as I was running into ListShards issues with LimitExceededException when
using 282 shards.
Would this be a correct understanding of these 2 constants - especially
SHARD_DISCOVERY_INTERVAL_MILLIS?

My assumption that needs to be validated:
The SHARD_DISCOVERY_INTERVAL_MILLIS should not affect the fetching of
records as defined by SHARD_GETRECORDS_INTERVAL_MILLIS.

Code below:
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
getRecsIntervalMs);//2000

/*
We do not change shards while the app is running.
So, we can increase SHARD_DISCOVERY_INTERVAL_MILLIS to a very high value to
avoid any rateLimiting issues from the AWS API with the ListShards call.
Default is 10s. We can increase this to avoid this LimitExceededException
as we don't change shards in the middle.
 */

kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
shardDiscoveryInterval);//1800 ms


TIA,
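
For reference, a minimal consumer-side sketch showing where the two constants
above are applied; the stream name, region and interval values are placeholders.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class KinesisConsumerConfigSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties consumerConfig = new Properties();
        consumerConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2");
        consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
        // how often each shard consumer calls GetRecords
        consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000");
        // how often ListShards is called to discover resharding; can be raised if shards never change
        consumerConfig.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "1800000");

        env.addSource(new FlinkKinesisConsumer<>("my-input-stream", new SimpleStringSchema(), consumerConfig))
           .print();

        env.execute("kinesis-consumer-config-sketch");
    }
}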


FlinkKinesisProducer blocking ?

2020-07-07 Thread Vijay Balakrishnan
Hi,
current setup.

Kinesis stream 1 -> Kinesis Analytics Flink -> Kinesis stream 2
|
> Firehose Delivery stream

Curl error:
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
 - [2020-07-02 15:22:32.203053] [0x07f4][0x7ffbced15700] [error]
[AWS Log: ERROR](CurlHttpClient)Curl returned error code 28

But I am still seeing tons of the curl 28 error. I use parallelism of 80
for the Sink to Kinesis Data stream(KDS). Which seems to point to KDS being
pounded with too many requests - the 80(parallelism) * 10(ThreadPool size)
= 800 requests. Is my understanding correct ? So, maybe reduce the 80
parallelism ??
*I still don't understand why the logs are stuck with just the
FlinkKinesisProducer for around 4s (blocking calls???)*, with the rest of the
Flink Analytics application not producing any logs while this happens.
*I noticed that the FlinkKinesisProducer took about 3.785s, 3.984s, and
4.223s in between other application logs in Kibana when the Kinesis
GetIteratorAge peaked*. It seemed like the FlinkKinesisProducer was blocking
for that long while the Flink app was not able to generate any other logs.

Looked at this:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#backpressure

Could use this:
producerConfig.put("RequestTimeout", "1");//from 6000

But doesn't really solve the problem when trying to maintain a real time
processing system.

TIA


Re: Insufficient number of network buffers- what does Total mean on the Flink Dashboard

2020-06-12 Thread Vijay Balakrishnan
Hi Xintong,
Just to be clear, I haven't set any -Xmx - I will check our scripts again.
Assuming no -Xmx is set, the doc above says 1/4 of physical memory, i.e. 29GB,
will be used.

So, if I can set env.java.opts: "-Xmx102g" in flink-conf.yaml, I am
assuming the max heap of 102GB will be used in the network memory calculation.
Is that the right way to set env.java.opts ??
TIA,
Vijay
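
A tiny arithmetic sketch (plain Java, not Flink code) that just replays the
network-memory equation quoted later in this thread, using the numbers
discussed there (0.48 as the effective fraction from the derivation, 29 GB max
heap).

public class NetworkMemoryCalc {
    public static void main(String[] args) {
        double minGb = 0.5;        // taskmanager.network.memory.min = 500mb (approx. in GB)
        double maxGb = 50.0;       // taskmanager.network.memory.max = 50gb
        double fraction = 0.48;    // effective network fraction used in the calculation below
        double jvmMaxHeapGb = 29.0;

        // networkMem = Min(max, Max(min, jvmMaxHeap / (1 - fraction) * fraction))
        double networkGb = Math.min(maxGb, Math.max(minGb, jvmMaxHeapGb / (1 - fraction) * fraction));
        System.out.printf("network memory ~= %.1f GB%n", networkGb); // ~26.8 GB, matching the error message
    }
}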

On Fri, Jun 12, 2020 at 1:49 AM Xintong Song  wrote:

> Flink should have calculated the heap size and set the -Xms, according to
> the equations I mentioned. So if you haven't set an customized -Xmx that
> overwrites this, it should not use the default 1.4 of physical memory.
>>
>>
>>- Standalone: jvmHeap = total * (1 - networkFraction) = 102 GB * (1 -
>>0.48) = 53 GB
>>- On Yarn: jvmHeap = (total - Max(cutoff-min, total * cutoff-ratio))
>>* (1 - networkFraction) = (102GB - Max(600MB, 102GB * 0.25)) * (1 - 0.48) 
>> =
>>40.6GB
>>
>>
> Are you running Flink on Mesos? I think Flink has not automatically set
> -Xmx on Mesos.
>
>
> BTW, from your screenshot the physical memory is 123GB, so 1/4 of that is
> much closer to 29GB if we consider there are some rounding errors and
> accuracy loss.
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Jun 12, 2020 at 4:33 PM Vijay Balakrishnan 
> wrote:
>
>> Thx, Xintong for a great answer. Much appreciated.
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html#jvm-heap
>>
>>
>> Max heap: if -Xmx is set then it is its value else ¼ of physical machine
>> memory estimated by the JVM
>>
>> No -Xmx is set.So, 1/4 of 102GB  = 25.5GB but not sure about the 29GB
>> figure.
>>
>> On Thu, Jun 11, 2020 at 9:14 PM Xintong Song 
>> wrote:
>>
>>> Hi Vijay,
>>>
>>> The memory configurations in Flink 1.9 and previous versions are indeed
>>> complicated and confusing. That is why we made significant changes to it in
>>> Flink 1.10. If possible, I would suggest upgrading to Flink 1.10, or the
>>> upcoming Flink 1.11 which is very likely to be released in this month.
>>>
>>> Regarding your questions,
>>>
>>>- "Physical Memory" displayed on the web ui stands for the total
>>>memory on your machine. This information is retrieved from your OS. It is
>>>not related to the network memory calculation. It is displayed mainly for
>>>historical reasons.
>>>- The error message means that you have about 26.8 GB network memory
>>>(877118 * 32768 bytes), and your job is trying to use more.
>>>- The "total memory" referred in network memory calculation is:
>>>   - jvm-heap + network, if managed memory is configured on-heap
>>>   (default)
>>>  - According to your screenshot, the managed memory
>>>  on-heap/off-heap configuration is not touched, so this should be 
>>> your case.
>>>   - jvm-heap + managed + network, if managed memory is configured
>>>   off-heap
>>>- The network memory size is actually derived reversely. Flink reads
>>>the max heap size from JVM (and the managed memory size from 
>>> configuration
>>>if it is configured off-heap), and derives the network memory size with 
>>> the
>>>following equation.
>>>   - networkMem = Min(networkMax, Max(networkMin, jvmMaxHeap /
>>>   (1-networkFraction) * networkFraction))
>>>   - In your case, networkMem = Min(50GB, Max(500MB, 29GB / (1-0.48)
>>>   * 0.48)) = 26.8GB
>>>
>>> One thing I don't understand is, why do you only have 29GB heap size
>>> when "taskmanager.heap.size" is configured to be "1044221m" (about 102 GB).
>>> The JVM heap size ("-Xmx" & "-Xms") is calculated as follows. I'll use
>>> "total" to represent "taskmanager.heap.size" for short. Also omitted the
>>> calculations when managed memory is configured off-heap.
>>>
>>>- Standalone: jvmHeap = total * (1 - networkFraction) = 102 GB * (1
>>>- 0.48) = 53 GB
>>>- On Yarn: jvmHeap = (total - Max(cutoff-min, total * cutoff-ratio))
>>>* (1 - networkFraction) = (102GB - Max(600MB, 102GB * 0.25)) * (1 - 
>>> 0.48) =
>>>40.6GB
>>>
>>> Have you specified a custom "-Xmx" parameter?
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Fri, Jun 12, 2020 a

Re: Insufficient number of network buffers- what does Total mean on the Flink Dashboard

2020-06-12 Thread Vijay Balakrishnan
Thx, Xintong for a great answer. Much appreciated.
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html#jvm-heap


Max heap: if -Xmx is set then it is its value else ¼ of physical machine
memory estimated by the JVM

No -Xmx is set. So, 1/4 of 102GB = 25.5GB, but I am not sure about the 29GB
figure.

On Thu, Jun 11, 2020 at 9:14 PM Xintong Song  wrote:

> Hi Vijay,
>
> The memory configurations in Flink 1.9 and previous versions are indeed
> complicated and confusing. That is why we made significant changes to it in
> Flink 1.10. If possible, I would suggest upgrading to Flink 1.10, or the
> upcoming Flink 1.11 which is very likely to be released in this month.
>
> Regarding your questions,
>
>- "Physical Memory" displayed on the web ui stands for the total
>memory on your machine. This information is retrieved from your OS. It is
>not related to the network memory calculation. It is displayed mainly for
>historical reasons.
>- The error message means that you have about 26.8 GB network memory
>(877118 * 32768 bytes), and your job is trying to use more.
>- The "total memory" referred in network memory calculation is:
>   - jvm-heap + network, if managed memory is configured on-heap
>   (default)
>  - According to your screenshot, the managed memory
>  on-heap/off-heap configuration is not touched, so this should be 
> your case.
>   - jvm-heap + managed + network, if managed memory is configured
>   off-heap
>- The network memory size is actually derived reversely. Flink reads
>the max heap size from JVM (and the managed memory size from configuration
>if it is configured off-heap), and derives the network memory size with the
>following equation.
>   - networkMem = Min(networkMax, Max(networkMin, jvmMaxHeap /
>   (1-networkFraction) * networkFraction))
>   - In your case, networkMem = Min(50GB, Max(500MB, 29GB / (1-0.48) *
>   0.48)) = 26.8GB
>
> One thing I don't understand is, why do you only have 29GB heap size when
> "taskmanager.heap.size" is configured to be "1044221m" (about 102 GB). The
> JVM heap size ("-Xmx" & "-Xms") is calculated as follows. I'll use "total"
> to represent "taskmanager.heap.size" for short. Also omitted the
> calculations when managed memory is configured off-heap.
>
>- Standalone: jvmHeap = total * (1 - networkFraction) = 102 GB * (1 -
>0.48) = 53 GB
>- On Yarn: jvmHeap = (total - Max(cutoff-min, total * cutoff-ratio)) *
>    (1 - networkFraction) = (102GB - Max(600MB, 102GB * 0.25)) * (1 - 0.48) =
>40.6GB
>
> Have you specified a custom "-Xmx" parameter?
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Jun 12, 2020 at 7:50 AM Vijay Balakrishnan 
> wrote:
>
>> Hi,
>> Get this error:
>> java.io.IOException: Insufficient number of network buffers: required 2,
>> but only 0 available. The total number of network buffers is currently set
>> to 877118 of 32768 bytes each. You can increase this number by setting the
>> configuration keys 'taskmanager.network.memory.fraction',
>> 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
>> akka.pattern.AskTimeoutException: Ask timed out on
>> [Actor[akka://flink/user/dispatcher#-1420732632]] after [1 ms]. Message
>> of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A
>> typical reason for `AskTimeoutException` is that the recipient actor didn't
>> send a reply.
>>
>>
>> Followed docs here:
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html
>>
>> network = Min(max, Max(min, fraction x total))  // what does Total mean -
>> the max JVM heap is used to derive the total memory for the calculation of
>> network buffers - can I see it in the Flink Dashboard ??? 117GB here ?
>> = Min(50G, Max(500mb, 0.48 * 117G)) = Min(50G, 56.16G) = 50G
>> 877118 of 32768 bytes each comes to 28.75GB. So, why is it failing ?
>> Used this in flink-conf.yaml:
>> taskmanager.numberOfTaskSlots: 10
>> rest.server.max-content-length: 314572800
>> taskmanager.network.memory.fraction: 0.45
>> taskmanager.network.memory.max: 50gb
>> taskmanager.network.memory.min: 500mb
>> akka.ask.timeout: 240s
>> cluster.evenly-spread-out-slots: true
>> akka.tcp.timeout: 240s
>> taskmanager.network.request-backoff.initial: 5000
>> taskmanager.network.request-backoff.max: 3
>> web.timeout:100
>> web.refresh-interval:6000
>>
>> Saw some old calc about buffers
>> (slots/Tm * slots/TM) * #TMs * 4
>> =10 * 10 * 47 * 4 = 18,800 buffers.
>>
>> What am I missing in the network buffer calc ??
>>
>> TIA,
>>
>>
>>


Re: FlinkKinesisProducer sends data to only 1 shard. Is it because I don't have "AggregationEnabled" set to false ?

2020-06-05 Thread Vijay Balakrishnan
Hi,
Resolved the issue by using a Custom Partitioner and setting RequestTimeout
properties.

kinesisProducer.setCustomPartitioner(new SerializableCustomPartitioner());

private static final class SerializableCustomPartitioner extends
        KinesisPartitioner<Map<String, Object>> {

    private static final long serialVersionUID = -5196071893997035695L;

    @Override
    public String getPartitionId(Map<String, Object> map) {
        StringBuilder stringBuilder = new StringBuilder();
        UUID uuid = UUID.randomUUID();
        stringBuilder.append(uuid);
        return stringBuilder.toString();
    }
}


On Thu, Jun 4, 2020 at 6:43 PM Vijay Balakrishnan 
wrote:

> Hi,
> Looks like I am sending a Map to Kinesis and it is being
> sent to 1 partition only. *How can I make this distribute across multiple
> partitions/shards on the Kinesis Data stream with this Map*
> data ?
>
> *Sending to Kinesis*:
> DataStream> influxToMapKinesisStream =
> enrichedMGStream.map(influxDBPoint -> {
> return new
> MonitoringGroupingToInfluxDBPoint(agg,
> groupBySetArr).fromInfluxDBPoint(influxDBPoint);
> }).returns(new TypeHint>()
> {
> }).setParallelism(dfltParallelism);
>
> FlinkKinesisProducer>
> kinesisProducer = getMonitoringFlinkKinesisProducer(kinesisTopicWrite,
> region, local, localKinesis);
>
> influxToMapKinesisStream.addSink(kinesisProducer).setParallelism(dfltParallelism);
>
> *Map used to send to Kinesis:*
>
> Map mapObj = new HashMap<>();
> mapObj.put(Utils.EVENT_TIMESTAMP, influxDBPoint.getTimestamp());
> mapObj.put(Utils.MEASUREMENT, influxDBPoint.getMeasurement());
> mapObj.put(Utils.TAGS, influxDBPoint.getTags());
> mapObj.put(Utils.FIELDS, influxDBPoint.getFields());
>
> TIA,
>
> On Thu, Jun 4, 2020 at 5:35 PM Vijay Balakrishnan 
> wrote:
>
>> Hi,
>> My FlinkKinesisProducer sends data to only 1 shard. Is it because I don't
>> have "AggregationEnabled" set to false ?
>>
>> flink_connector_kinesis_2.11 : flink version 1.9.1
>>
>> //Setup Kinesis Producer
>> Properties kinesisProducerConfig = new Properties();
>> kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_REGION,
>> region);
>>
>> kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER,
>> "AUTO");
>> //kinesisProducerConfig.setProperty("AggregationEnabled",
>> "false");
>>
>> FlinkKinesisProducer> kinesisProducer = new
>> FlinkKinesisProducer<>(
>> new MonitoringMapKinesisSchema(localKinesis),
>> kinesisProducerConfig);
>>
>> //TODO: kinesisProducer.setFailOnError(true);
>> kinesisProducer.setDefaultStream(kinesisTopicWrite);
>> kinesisProducer.setDefaultPartition("0");//TODO: why from start ?
>> return kinesisProducer;
>>
>> TIA,
>>
>


Re: FlinkKinesisProducer sends data to only 1 shard. Is it because I don't have "AggregationEnabled" set to false ?

2020-06-04 Thread Vijay Balakrishnan
Hi,
Looks like I am sending a Map<String, Object> to Kinesis and it is being
sent to 1 partition only. *How can I make this distribute across multiple
partitions/shards on the Kinesis Data stream with this Map<String, Object>*
data ?

*Sending to Kinesis*:
DataStream<Map<String, Object>> influxToMapKinesisStream =
        enrichedMGStream.map(influxDBPoint -> {
            return new MonitoringGroupingToInfluxDBPoint(agg,
                    groupBySetArr).fromInfluxDBPoint(influxDBPoint);
        }).returns(new TypeHint<Map<String, Object>>() {
        }).setParallelism(dfltParallelism);

FlinkKinesisProducer<Map<String, Object>> kinesisProducer =
        getMonitoringFlinkKinesisProducer(kinesisTopicWrite, region, local, localKinesis);

influxToMapKinesisStream.addSink(kinesisProducer).setParallelism(dfltParallelism);

*Map used to send to Kinesis:*

Map<String, Object> mapObj = new HashMap<>();
mapObj.put(Utils.EVENT_TIMESTAMP, influxDBPoint.getTimestamp());
mapObj.put(Utils.MEASUREMENT, influxDBPoint.getMeasurement());
mapObj.put(Utils.TAGS, influxDBPoint.getTags());
mapObj.put(Utils.FIELDS, influxDBPoint.getFields());

TIA,

On Thu, Jun 4, 2020 at 5:35 PM Vijay Balakrishnan 
wrote:

> Hi,
> My FlinkKinesisProducer sends data to only 1 shard. Is it because I don't
> have "AggregationEnabled" set to false ?
>
> flink_connector_kinesis_2.11 : flink version 1.9.1
>
> //Setup Kinesis Producer
> Properties kinesisProducerConfig = new Properties();
> kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_REGION,
> region);
>
> kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER,
> "AUTO");
> //kinesisProducerConfig.setProperty("AggregationEnabled", "false");
>
> FlinkKinesisProducer> kinesisProducer = new
> FlinkKinesisProducer<>(
> new MonitoringMapKinesisSchema(localKinesis),
> kinesisProducerConfig);
>
> //TODO: kinesisProducer.setFailOnError(true);
> kinesisProducer.setDefaultStream(kinesisTopicWrite);
> kinesisProducer.setDefaultPartition("0");//TODO: why from start ?
> return kinesisProducer;
>
> TIA,
>


FlinkKinesisProducer sends data to only 1 shard. Is it because I don't have "AggregationEnabled" set to false ?

2020-06-04 Thread Vijay Balakrishnan
Hi,
My FlinkKinesisProducer sends data to only 1 shard. Is it because I don't
have "AggregationEnabled" set to false ?

flink_connector_kinesis_2.11 : flink version 1.9.1

//Setup Kinesis Producer
Properties kinesisProducerConfig = new Properties();
kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_REGION,
region);

kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER,
"AUTO");
//kinesisProducerConfig.setProperty("AggregationEnabled", "false");

FlinkKinesisProducer<Map<String, Object>> kinesisProducer = new
FlinkKinesisProducer<>(
new MonitoringMapKinesisSchema(localKinesis),
kinesisProducerConfig);

//TODO: kinesisProducer.setFailOnError(true);
kinesisProducer.setDefaultStream(kinesisTopicWrite);
kinesisProducer.setDefaultPartition("0");//TODO: why from start ?
return kinesisProducer;

TIA,


Re: Flink Dashboard UI Tasks hard limit

2020-05-29 Thread Vijay Balakrishnan
> When you say "it still works", I thought that when you increased the
> parallelism the job was still executed as if the parallelism was not increased.
> From your latest reply, it seems the job's parallelism is indeed
> increased, but then it runs into failures.
>
> The reason you run into the "Insufficient number of network buffers"
> exception, is that with more tasks in your job, more inter-task data
> transmission channels, thus memory for network buffers, are needed.
>
> To increase the network memory size, the following configuration options,
> as you already found, are related.
>
>- taskmanager.network.memory.fraction
>- taskmanager.network.memory.max
>- taskmanager.network.memory.min
>
> Please be aware that `taskmanager.memory.task.off-heap.size` is not
> related to network memory, and is only available in Flink 1.10 and above
> while you're using 1.9.1 as suggested by the screenshots.
>
> The network memory size is calculated as `min(max(some_total_value *
> network_fraction, network_min), network_max)`. According to the error
> message, your current network memory size is `85922 buffers * 32KB/buffer
> = 2685MB`, smaller than your "max" (4gb). That means increasing the "max"
> does not help in your case. It is the "fraction" that you need to increase.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Thu, May 28, 2020 at 9:30 AM Vijay Balakrishnan 
> wrote:
>
>> Hi Xintong,
>> Looks like the issue is not fully resolved :( Attaching 2 screenshots of
>> the memory consumption of 1 of the TaskManagers.
>>
>> To increase the used-up direct memory off heap, do I change this:
>>  taskmanager.memory.task.off-heap.size: 5gb
>>
>> I had increased the taskmanager.network.memory.max: 24gb
>> which seems excessive.
>>
>> 1 of the errors I saw in the Flink logs:
>>
>> java.io.IOException: Insufficient number of network buffers: required 1,
>> but only 0 available. The total number of network buffers is currently set
>> to 85922 of 32768 bytes each. You can increase this number by setting the
>> configuration keys 'taskmanager.network.memory.fraction',
>> 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
>> at
>> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:281)
>> at
>> org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory$0(ResultPartitionFactory.java:191)
>>
>> TIA,
>>
>>
>> On Wed, May 27, 2020 at 9:06 AM Vijay Balakrishnan 
>> wrote:
>>
>>> Thanks so much, Xintong for guiding me through this. I looked at the
>>> Flink logs to see the errors.
>>> I had to change taskmanager.network.memory.max: 4gb
>>> and akka.ask.timeout: 240s to increase the number of tasks.
>>> Now, I am able to increase the number of Tasks/ aka Task vertices.
>>>
>>> taskmanager.network.memory.fraction: 0.15
>>> taskmanager.network.memory.max: 4gb
>>> taskmanager.network.memory.min: 500mb
>>> akka.ask.timeout: 240s
>>>
>>> On Tue, May 26, 2020 at 8:42 PM Xintong Song 
>>> wrote:
>>>
>>>> Could you also explain how you set the parallelism when getting this
>>>> execution plan?
>>>> I'm asking because this json file itself only shows the resulting
>>>> execution plan. It is not clear to me what is not working as expected in
>>>> your case. E.g., you set the parallelism for an operator to 10 but the
>>>> execution plan only shows 5.
>>>>
>>>> Thank you~
>>>>
>>>> Xintong Song
>>>>
>>>>
>>>>
>>>> On Wed, May 27, 2020 at 3:16 AM Vijay Balakrishnan 
>>>> wrote:
>>>>
>>>>> Hi Xintong,
>>>>> Thanks for the excellent clarification for tasks.
>>>>>
>>>>> I attached a sample screenshot above, and it didn't reflect the slots used
>>>>> and the tasks limit I was running into in that pic.
>>>>>
>>>>> I am attaching my Execution plan here. Please let me know how I can
>>>>> increase the number of tasks aka parallelism. As I increase the parallelism,
>>>>> I run into this bottleneck with the tasks.
>>>>>
>>>>> BTW - The https://flink.apache.org/visualizer/ is a great start to
>>>>> see this.
>>>>> TIA,
>>>>>
>>>>> On Sun, May 24, 2020 at 7:52 PM Xintong Song 
>>>>> wrote:
>&

Re: Flink Dashboard UI Tasks hard limit

2020-05-27 Thread Vijay Balakrishnan
Thanks so much, Xintong for guiding me through this. I looked at the Flink
logs to see the errors.
I had to change taskmanager.network.memory.max: 4gb and akka.ask.timeout:
240s to increase the number of tasks.
Now, I am able to increase the number of Tasks/ aka Task vertices.

taskmanager.network.memory.fraction: 0.15
taskmanager.network.memory.max: 4gb
taskmanager.network.memory.min: 500mb
akka.ask.timeout: 240s

On Tue, May 26, 2020 at 8:42 PM Xintong Song  wrote:

> Could you also explain how you set the parallelism when getting this
> execution plan?
> I'm asking because this json file itself only shows the resulting execution
> plan. It is not clear to me what is not working as expected in your case.
> E.g., you set the parallelism for an operator to 10 but the execution plan
> only shows 5.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, May 27, 2020 at 3:16 AM Vijay Balakrishnan 
> wrote:
>
>> Hi Xintong,
>> Thanks for the excellent clarification for tasks.
>>
>> I attached a sample screenshot above, and it didn't reflect the slots used and
>> the tasks limit I was running into in that pic.
>>
>> I am attaching my Execution plan here. Please let me know how I can
>> increase the number of tasks aka parallelism. As I increase the parallelism,
>> I run into this bottleneck with the tasks.
>>
>> BTW - The https://flink.apache.org/visualizer/ is a great start to see
>> this.
>> TIA,
>>
>> On Sun, May 24, 2020 at 7:52 PM Xintong Song 
>> wrote:
>>
>>> Increasing network memory buffers (fraction, min, max) seems to increase
>>>> tasks slightly.
>>>
>>> That's weird. I don't think the number of network memory buffers has
>>> anything to do with the number of tasks.
>>>
>>> Let me try to clarify a few things.
>>>
>>> Please be aware that, how many tasks a Flink job has, and how many slots
>>> a Flink cluster has, are two different things.
>>> - The number of tasks is decided by your job's parallelism and
>>> topology. E.g., if your job graph has 3 vertices A, B and C, with
>>> parallelism 2, 3 and 4 respectively, then you would have 9 (2+3+4)
>>> tasks in total.
>>> - The number of slots is decided by the number of TMs and slots-per-TM.
>>> - For streaming jobs, you have to make sure the number of slots is
>>> enough for executing all your tasks. The number of slots needed for
>>> executing your job is by default the max parallelism of your job graph
>>> vertices. Take the above example, you would need 4 slots, because it's the
>>> max among all the vertices' parallelisms (2, 3, 4).
>>>
>>> In your case, the screenshot shows that your job has 9621 tasks in total
>>> (not around 18000, the dark box shows total tasks while the green box shows
>>> running tasks), and 600 slots are in use (658 - 58) suggesting that the max
>>> parallelism of your job graph vertices is 600.
>>>
>>> If you want to increase the number of tasks, you should increase your
>>> job parallelism. There are several ways to do that.
>>>
>>>- In your job codes (assuming you are using DataStream API)
>>>   - Use `StreamExecutionEnvironment#setParallelism()` to set
>>>   parallelism for all operators.
>>>   - Use `SingleOutputStreamOperator#setParallelism()` to set
>>>   parallelism for a specific operator. (Only supported for subclasses of
>>>   `SingleOutputStreamOperator`.)
>>>- When submitting your job, use `-p ` as an argument
>>>for the `flink run` command, to set parallelism for all operators.
>>>- Set `parallelism.default` in your `flink-conf.yaml`, to set a
>>>default parallelism for your jobs. This will be used for jobs that have 
>>> not
>>>set parallelism with neither of the above methods.
>>>
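
To make the first two of those options concrete, here is a minimal, self-contained
sketch (the pipeline itself is a placeholder, not the job discussed in this thread):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Default parallelism for all operators of this job.
        env.setParallelism(4);

        env.fromElements("a", "b", "c")
                .map(String::toUpperCase)
                // Override the parallelism of this single operator only.
                .setParallelism(2)
                .print();

        env.execute("parallelism-example");
    }
}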
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Sat, May 23, 2020 at 1:11 AM Vijay Balakrishnan 
>>> wrote:
>>>
>>>> Hi Xintong,
>>>> Thx for your reply.  Increasing network memory buffers (fraction, min,
>>>> max) seems to increase tasks slightly.
>>>>
>>>> Streaming job
>>>> Standalone
>>>>
>>>> Vijay
>>>>
>>>> On Fri, May 22, 2020 at 2:49 AM Xintong Song 
>>>> wrote:
>>>>
>>>>> Hi Vijay,
>>>>>
>>>>> I don't think your problem is related to number of opening files. The
>>>>> parallelis

Re: Flink Dashboard UI Tasks hard limit

2020-05-22 Thread Vijay Balakrishnan
Hi Xintong,
Thx for your reply.  Increasing network memory buffers (fraction, min, max)
seems to increase tasks slightly.

Streaming job
Standalone

Vijay

On Fri, May 22, 2020 at 2:49 AM Xintong Song  wrote:

> Hi Vijay,
>
> I don't think your problem is related to the number of open files. The
> parallelism of your job is decided before it actually tries to open the files.
> And if the OS limit for opening files is reached, you should see a job
> execution failure, instead of a success execution with a lower parallelism.
>
> Could you share some more information about your use case?
>
>- What kind of job are your executing? Is it a streaming or batch
>processing job?
>- Which Flink deployment do you use? Standalone? Yarn?
>- It would be helpful if you can share the Flink logs.
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, May 20, 2020 at 11:50 PM Vijay Balakrishnan 
> wrote:
>
>> Hi,
>> I have increased the number of slots available but the Job is not using
>> all the slots but runs into this approximate 18000 Tasks limit. Looking
>> into the source code, it seems to be opening file -
>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java#L203
>> So, do I have to tune the ulimit or something similar at the Ubuntu O/S
>> level to increase number of tasks available ? What I am confused about is
>> the ulimit is per machine but the ExecutionGraph is across many machines ?
>> Please pardon my ignorance here. Does number of tasks equate to number of
>> open files. I am using 15 slots per TaskManager on AWS m5.4xlarge which has
>> 16 vCPUs.
>>
>> TIA.
>>
>> On Tue, May 19, 2020 at 3:22 PM Vijay Balakrishnan 
>> wrote:
>>
>>> Hi,
>>>
>>> Flink Dashboard UI seems to show tasks having a hard limit for Tasks
>>> column around 18000 on a Ubuntu Linux box.
>>> I kept increasing the number of slots per task manager to 15 and number
>>> of slots increased to 705 but the slots to tasks
>>> stayed at around 18000. Below 18000 tasks, the Flink Job is able to
>>> start up.
>>> Even though I increased the number of slots, it still works when 312
>>> slots are being used.
>>>
>>> taskmanager.numberOfTaskSlots: 15
>>>
>>> What knob can I tune to increase the number of Tasks ?
>>>
>>> Pls find attached the Flink Dashboard UI.
>>>
>>> TIA,
>>>
>>>


Re: Flink Dashboard UI Tasks hard limit

2020-05-20 Thread Vijay Balakrishnan
Hi,
I have increased the number of slots available but the Job is not using all
the slots but runs into this approximate 18000 Tasks limit. Looking into
the source code, it seems to be opening file -
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java#L203
So, do I have to tune the ulimit or something similar at the Ubuntu O/S
level to increase the number of tasks available ? What I am confused about is
that the ulimit is per machine but the ExecutionGraph spans many machines.
Please pardon my ignorance here. Does the number of tasks equate to the number of
open files ? I am using 15 slots per TaskManager on AWS m5.4xlarge, which has
16 vCPUs.

TIA.

On Tue, May 19, 2020 at 3:22 PM Vijay Balakrishnan 
wrote:

> Hi,
>
> Flink Dashboard UI seems to show tasks having a hard limit for Tasks
> column around 18000 on a Ubuntu Linux box.
> I kept increasing the number of slots per task manager to 15 and number of
> slots increased to 705 but the slots to tasks
> stayed at around 18000. Below 18000 tasks, the Flink Job is able to start
> up.
> Even though I increased the number of slots, it still works when 312 slots
> are being used.
>
> taskmanager.numberOfTaskSlots: 15
>
> What knob can I tune to increase the number of Tasks ?
>
> Pls find attached the Flink Dashboard UI.
>
> TIA,
>
>


Pre-process data before it hits the Source

2019-11-25 Thread Vijay Balakrishnan
Hi,
Need to pre-process data (transform incoming data to a different format)
before it hits the Source I have defined. How can I do that ?

I tried to use a .map on the DataStream but that is too late as the data
has already hit the Source I defined.
FlinkKinesisConsumer<Map<String, Object>> kinesisConsumer =
        getMonitoringFlinkKinesisConsumer(local, localKinesis, kinesisTopicRead,
                region, getRecsMax, getRecsIntervalMs, connectionTimeout, maxConnections,
                socketTimeout);
DataStreamSource<Map<String, Object>> monitoringDataStreamSource =
        env.addSource(kinesisConsumer);

DataStream<Map<String, Object>> kinesisStream1 =
        kinesisStream.map(new TransformFunction(...)); // too late here
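
One place this kind of transformation can happen before the records ever reach the
DataStream is the deserialization schema handed to the FlinkKinesisConsumer. A minimal
sketch of that idea (the class name, the output Map layout and the field names below are
illustrative assumptions, not the thread's actual MonitoringMapKinesisSchema):

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class TransformingKinesisSchema implements KinesisDeserializationSchema<Map<String, Object>> {

    @Override
    public Map<String, Object> deserialize(byte[] recordValue, String partitionKey, String seqNum,
                                           long approxArrivalTimestamp, String stream, String shardId) {
        // Transform the raw payload into the desired format here, before it becomes
        // an element of the DataStream.
        Map<String, Object> out = new HashMap<>();
        out.put("raw", new String(recordValue, StandardCharsets.UTF_8));
        out.put("shardId", shardId);
        return out;
    }

    @Override
    public TypeInformation<Map<String, Object>> getProducedType() {
        return TypeInformation.of(new TypeHint<Map<String, Object>>() {});
    }
}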

TIA,


currentWatermark for Event Time is not increasing fast enough to go past the window.maxTimestamp

2019-10-17 Thread Vijay Balakrishnan
Hi,
*Event Time Window: 15s*
My currentWatermark for Event Time processing is not increasing fast enough
to go past the window maxTimestamp.
I have reduced *bound* used for watermark calculation to just *10 ms*.
I have increased the parallelInput to process input from Kinesis in
parallel to 2 slots on my laptop.//env.addSource(kinesisConsumer)
.setParallelism(2);
For FlinkKinesisConsumer, I added a property from flink-1.8.0,
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.
*SHARD_IDLE_INTERVAL_MILLIS*, 25);//this didn't seem to help

//in *EventTimeTrigger*.java: if (window.maxTimestamp() <=
ctx.getCurrentWatermark()) Trigger.FIRE;
My event producer to Kinesis is producing at a delay of 2500 ms for each
record.(business requirement).
What else can I do to consume data from Kinesis faster and cross the
threshold for
currentWatermark to increase beyond the window.maxTimestamp faster ?

*MonitoringTSWAssigner* code:
public class MonitoringTSWAssigner implements
AssignerWithPunctuatedWatermarks<Map<String, Object>> {
private long bound = 5 * (long) 1000;//5 secs out of order bound in
millisecs
private long maxTimestamp = Long.MIN_VALUE;

public MonitoringTSWAssigner() {
}

public MonitoringTSWAssigner(long bound) {
this.bound = bound;
}

public long extractTimestamp(Map<String, Object> monitoring, long
previousTS) {
long extractedTS = getExtractedTS(monitoring);
if (extractedTS > maxTimestamp) {
maxTimestamp = extractedTS;
}
   return extractedTS;
//return System.currentTimeMillis();
}

public long getExtractedTS(Map<String, Object> monitoring) {
final String eventTimestamp = monitoring.get(Utils.EVENT_TIMESTAMP)
!= null ? (String) monitoring.get(Utils.EVENT_TIMESTAMP) : "";
return Utils.getLongFromDateStr(eventTimestamp);
}

@Override
public Watermark checkAndGetNextWatermark(Map<String, Object>
monitoring, long extractedTimestamp) {
long extractedTS = getExtractedTS(monitoring);
long nextWatermark = extractedTimestamp - *bound*;
return new Watermark(nextWatermark);
}
}

TIA


Re: add() method of AggregateFunction not called even though new watermark is emitted

2019-10-15 Thread Vijay Balakrishnan
Hi Theo,
You were right. For some reason (I still haven't figured it out), the
FilterFunction was causing issues. I commented it out and it started
getting into the add() method of the aggregate function.

/*kinesisStream = kinesisStream.filter((FilterFunction<Map<String, Object>>) inputMap -> {
    Object groupByValueObj = inputMap.get(groupBy);
    return groupByValueObj != null;
});*/
//String metric = Objects.requireNonNull(inputMetricSelector).getMetric();

TIA,

Vijay




On Tue, Oct 15, 2019 at 9:34 AM Vijay Balakrishnan 
wrote:

> Hi Theo,
> It gets to the FilterFunction during the creation of the ExecutionGraph
> initially but not during the runtime when recs are streaming in.So, it is
> not getting that far- seems to be stuck in the
>
> final SingleOutputStreamOperator> filteredKinesisStream = 
> kinesisStream.filter   code.
>
> Doesn't seem to get past it as it keeps incrementing watermarks but the 
> Watermark never seems to hit the end of the window.Maybe I am doing
>
> something super simple stupid.
>
> TIA,
> Vijay
>
> On Tue, Oct 15, 2019 at 12:48 AM Theo Diefenthal <
> theo.diefent...@scoop-software.de> wrote:
>
>> Hi Vijay,
>>
>> Maybe a stupid question, but according to your comments, the code works
>> fine up till a "flatMap" operation. It seems that this flatMap is directly
>> followed by a filter-Function in the method
>> createAggregatedMonitoringGroupingWindowStream1. Is ist maybe filtering out
>> all events? Or is not even the filter function itself called? (Due to your
>> comments suggesting it).
>>
>> Best regards
>> Theo
>>
>> --
>> *Von: *"Vijay Balakrishnan" 
>> *An: *"Dawid Wysakowicz" 
>> *CC: *"user" 
>> *Gesendet: *Dienstag, 15. Oktober 2019 02:01:05
>> *Betreff: *Re: add() method of AggregateFunction not called even though
>> new watermark is emitted
>>
>> Hi,
>> Thx for the replies - Congxian & Dawdi.
>> Watermarks are advancing.Not sure how to check every new generated
>> watermark is reaching end of the window 
>>
>> I did check the Flink UI for the currentInputWatermark and it is
>> increasing monotonically.
>>
>> Narrowed down the problem to not calling the windowStream.aggregate.
>> I also *added a checkpoint *to see if it was causing the issue.Didn't
>> seem to help.
>> Most of the code is reached during the creation of the ExecutionGraph on
>> the start of the program.
>>
>> I generate an incrementing sequence of timestamps(delay of 5000ms between
>> each rec) from a Producer to Kinesis and it emits a new watermark as it
>> starts receiving the input records.
>> My window size is 15s.
>> I see a WindowedStream is created with windowAssigner:
>> TumblingEventTimeWindows(15000) and trigger: EventTimeTrigger
>> but the *code never gets into the EventTimeTrigger.onElement() or
>> onEventTime() to fire the trigger*.
>> It gets into TimestampsAndPunctuatedWatermarkOperator and emitWatermark().
>> I even tried to use ProcessingTime but that also didn't help.
>>
>>
>> //code to create kinesis consumer successfully..
>> for (Rule rule : rules.getRules()) {
>> //gets in here fine
>> final SingleOutputStreamOperator>
>> filteredKinesisStream = kinesisStream.filter(mon -> {
>> boolean result;
>> String eventName = mon.get(MEASUREMENT) != null ?
>> (String) mon.get(MEASUREMENT) : "";
>> InputMetricSelector inputMetricSelector =
>> rule.getInputMetricSelector();
>> String measurement = inputMetricSelector != null ?
>> inputMetricSelector.getMeasurement() : "";
>> result = eventName.equals(measurement);
>> if (result) {
>> Map inputTags = mon.get(TAGS) != null
>> ? (Map) mon.get(TAGS) : new HashMap<>();
>> Map ruleTags = inputMetricSelector !=
>> null ? inputMetricSelector.getTags() : new HashMap<>();
>> result = matchTags(inputTags, ruleTags);
>> }
>> return result;//*<== this is true*
>> }
>> ).flatMap((FlatMapFunction, Map>)
>> (input, out) -> {
>> out.collect(input);//*< runs up till here fine*
>> }).returns(new TypeHint>() {
>> });
>> //*doesn't do anything beyond this point at runtime*
>> DataStream enrichedMGStream =
>> pms.createAggregatedMonitoringGroupingWindowStream1
>> (filteredKinesisStr

Re: add() method of AggregateFunction not called even though new watermark is emitted

2019-10-15 Thread Vijay Balakrishnan
Hi Theo,
It gets to the FilterFunction during the creation of the ExecutionGraph
initially, but not at runtime when records are streaming in. So it is
not getting that far - it seems to be stuck in the

final SingleOutputStreamOperator<Map<String, Object>>
filteredKinesisStream = kinesisStream.filter(...) code.

It doesn't seem to get past it: it keeps incrementing watermarks, but
the watermark never seems to hit the end of the window. Maybe I am
doing something really simple and stupid.

TIA,
Vijay

On Tue, Oct 15, 2019 at 12:48 AM Theo Diefenthal <
theo.diefent...@scoop-software.de> wrote:

> Hi Vijay,
>
> Maybe a stupid question, but according to your comments, the code works
> fine up till a "flatMap" operation. It seems that this flatMap is directly
> followed by a filter-Function in the method
> createAggregatedMonitoringGroupingWindowStream1. Is ist maybe filtering out
> all events? Or is not even the filter function itself called? (Due to your
> comments suggesting it).
>
> Best regards
> Theo
>
> --
> *Von: *"Vijay Balakrishnan" 
> *An: *"Dawid Wysakowicz" 
> *CC: *"user" 
> *Gesendet: *Dienstag, 15. Oktober 2019 02:01:05
> *Betreff: *Re: add() method of AggregateFunction not called even though
> new watermark is emitted
>
> Hi,
> Thx for the replies - Congxian & Dawdi.
> Watermarks are advancing.Not sure how to check every new generated
> watermark is reaching end of the window 
>
> I did check the Flink UI for the currentInputWatermark and it is
> increasing monotonically.
>
> Narrowed down the problem to not calling the windowStream.aggregate.
> I also *added a checkpoint *to see if it was causing the issue.Didn't
> seem to help.
> Most of the code is reached during the creation of the ExecutionGraph on
> the start of the program.
>
> I generate an incrementing sequence of timestamps(delay of 5000ms between
> each rec) from a Producer to Kinesis and it emits a new watermark as it
> starts receiving the input records.
> My window size is 15s.
> I see a WindowedStream is created with windowAssigner:
> TumblingEventTimeWindows(15000) and trigger: EventTimeTrigger
> but the *code never gets into the EventTimeTrigger.onElement() or
> onEventTime() to fire the trigger*.
> It gets into TimestampsAndPunctuatedWatermarkOperator and emitWatermark().
> I even tried to use ProcessingTime but that also didn't help.
>
>
> //code to create kinesis consumer successfully..
> for (Rule rule : rules.getRules()) {
> //gets in here fine
> final SingleOutputStreamOperator>
> filteredKinesisStream = kinesisStream.filter(mon -> {
> boolean result;
> String eventName = mon.get(MEASUREMENT) != null ? (String)
> mon.get(MEASUREMENT) : "";
> InputMetricSelector inputMetricSelector =
> rule.getInputMetricSelector();
> String measurement = inputMetricSelector != null ?
> inputMetricSelector.getMeasurement() : "";
> result = eventName.equals(measurement);
> if (result) {
> Map inputTags = mon.get(TAGS) != null
> ? (Map) mon.get(TAGS) : new HashMap<>();
> Map ruleTags = inputMetricSelector !=
> null ? inputMetricSelector.getTags() : new HashMap<>();
> result = matchTags(inputTags, ruleTags);
> }
> return result;//*<== this is true*
> }
> ).flatMap((FlatMapFunction, Map>)
> (input, out) -> {
> out.collect(input);//*< runs up till here fine*
> }).returns(new TypeHint>() {
> });
> //*doesn't do anything beyond this point at runtime*
> DataStream enrichedMGStream =
> pms.createAggregatedMonitoringGroupingWindowStream1
> (filteredKinesisStream, ruleFactory, rule, parallelProcess);
> enrichedMGStream.addSink(influxSink)
> .setParallelism(nbrSinks);
> }
>
> private DataStream
> createAggregatedMonitoringGroupingWindowStream1(DataStream Object>> kinesisStream, RuleFactory ruleFactory, Rule rule, int
> parallelProcess) {
> DataStream enrichedComponentInstanceStream1;
> RuleConfig ruleConfig = rule.getRuleConfig();
> String ruleType = ruleConfig != null ? ruleConfig.getRuleType() : "";
> RuleIF ruleImpl = ruleFactory.getRule(ruleType);
> Map ruleProps = ruleConfig != null ?
> ruleConfig.getRuleProps() : new HashMap<>();
> Object intervalObj = ruleProps.get("rule_eval_window");
> String timeInterval = intervalObj != null ? (String) intervalObj : "";
> org.apache.flink.streaming.api.windowing.time.Time timeWindow =
> 

Re: add() method of AggregateFunction not called even though new watermark is emitted

2019-10-14 Thread Vijay Balakrishnan
nt parallelProcess) {
long slide = 100;
final WindowedStream<Map<String, Object>, MonitoringTuple, TimeWindow>
windowStream =
windowType.equalsIgnoreCase(SLIDING) ?
monitoringTupleKeyedStream
.timeWindow(timeWindow,
org.apache.flink.streaming.api.windowing.time.Time.milliseconds(slide)) :
monitoringTupleKeyedStream
.timeWindow(timeWindow);
return windowStream.aggregate(
new MGroupingWindowAggregate(interval),//*<=== never gets into
add() here*
new MGroupingAggregateWindowProcessing(interval, ruleImpl,
rule))
.map(new MonitoringGroupingToInfluxDBPoint(rule));

}

On Mon, Oct 14, 2019 at 12:41 AM Dawid Wysakowicz 
wrote:

> Hi Vijay,
>
> Could you check if the Watermark for the aggregate operator advances? You
> should be able to check that in the Flink WebUI. Could it be that the
> Watermark does not advance for all of the upstream operators? The watermark
> for a particular operator is a minimum of watermarks received from all of
> the upstream operators. Therefore if some of them does not produce any, the
> resulting watermark will not advance.
>
> Best,
>
> Dawdi
> On 11/10/2019 21:37, Vijay Balakrishnan wrote:
>
> Hi,
> Here is my issue with *Event Processing* with the *add() method of
> MGroupingWindowAggregate not being called* even though a new watermark is
> fired
> 1. *Ingest data from Kinesis (works fine)*
> 2. *Deserialize* in MonitoringMapKinesisSchema(*works fine* and get json
> back)
> 3. I do *assign MonitoringTSWAssigner*(code below) to the source with
> bound of 10(have tried 3000, 3). *It fires a new WaterMark* with each
> incoming record but the *windowStream.aggregate method doesn't seem to
> fire* and I
> *don't see the add() method of MGroupingWindowAggregate called * I *can
> see the newWaterMark being emitted in
> TimestampsAndPunctuatedWatermarksOperator.processElement*
> 4. I have tried with timeWindow of 1m and 15s
>
> *Main* code:
>
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.*EventTime*);
>
> //Setup Kinesis Consumer
> Properties kinesisConsumerConfig = new Properties();
> ..
> kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
> ConsumerConfigConstants.InitialPosition.LATEST.name());//LATEST
> FlinkKinesisConsumer> kinesisConsumer = new
> FlinkKinesisConsumer<>(
> "kinesisTopicRead", new MonitoringMapKinesisSchema(true),
> kinesisConsumerConfig);
>
> DataStream> kinesisStream;
> RichSinkFunction influxSink;
>
> DataStreamSource> monitoringDataStreamSource =
> env.addSource(kinesisConsumer);
> kinesisStream = monitoringDataStreamSource
> .assignTimestampsAndWatermarks(new *MonitoringTSWAssigner*(bound
> ));
> influxSink = pms.createInfluxMonitoringSink();
> ..
> ...timeWindow = Time.seconds(*timeIntervalL*);//tried with
> timeIntervalL=15s, 1m
>
> KeyedStream, MonitoringTuple>
> monitoringTupleKeyedStream =
> kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric));
> final WindowedStream, MonitoringTuple, TimeWindow>
> windowStream = monitoringTupleKeyedStream.timeWindow(timeWindow);
> DataStream enrichedMGStream = 
> *windowStream.aggregate*(//*<=
> never reaches here ?*
> *new MGroupingWindowAggregate(interval)*,
> new MGroupingAggregateWindowProcessing(interval, ruleImpl, rule))
> .map(new MonitoringGroupingToInfluxDBPoint(rule));
> enrichedMGStream.addSink(influxSink);
> env.execute("Aggregation of Map data");
>
> *MonitoringTSWAssigner* code:
> public class MonitoringTSWAssigner implements
> AssignerWithPunctuatedWatermarks> {
> private long bound = 5 * (long) 1000;//5 secs out of order bound in
> millisecs
> private long maxTimestamp = Long.MIN_VALUE;
>
> public MonitoringTSWAssigner() {
> }
>
> public MonitoringTSWAssigner(long bound) {
> this.bound = bound;
> }
>
> public long extractTimestamp(Map monitoring, long
> previousTS) {
> long extractedTS = getExtractedTS(monitoring);
> if (extractedTS > maxTimestamp) {
> maxTimestamp = extractedTS;
> }
>
>return extractedTS;//return System.currentTimeMillis();
>
> }
>
> public long getExtractedTS(Map monitoring) {
> final String eventTimestamp =
> monitoring.get(Utils.EVENT_TIMESTAMP) != null ? (String)
> monitoring.get(Utils.EVENT_TIMESTAMP) : "";
> return Utils.getLongFromDateStr(eventTimestamp);
> }
>

add() method of AggregateFunction not called even though new watermark is emitted

2019-10-11 Thread Vijay Balakrishnan
Hi,
Here is my issue with *Event Processing* with the *add() method of
MGroupingWindowAggregate not being called* even though a new watermark is
fired
1. *Ingest data from Kinesis (works fine)*
2. *Deserialize* in MonitoringMapKinesisSchema(*works fine* and get json
back)
3. I do *assign MonitoringTSWAssigner*(code below) to the source with bound
of 10(have tried 3000, 3). *It fires a new WaterMark* with each
incoming record but the *windowStream.aggregate method doesn't seem to fire*
and I
*don't see the add() method of MGroupingWindowAggregate called*. I *can
see the newWaterMark being emitted in
TimestampsAndPunctuatedWatermarksOperator.processElement*
4. I have tried with timeWindow of 1m and 15s

*Main* code:

final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.*EventTime*);

//Setup Kinesis Consumer
Properties kinesisConsumerConfig = new Properties();
..
kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
ConsumerConfigConstants.InitialPosition.LATEST.name());//LATEST
FlinkKinesisConsumer<Map<String, Object>> kinesisConsumer = new
FlinkKinesisConsumer<>(
"kinesisTopicRead", new MonitoringMapKinesisSchema(true),
kinesisConsumerConfig);

DataStream<Map<String, Object>> kinesisStream;
RichSinkFunction influxSink;

DataStreamSource<Map<String, Object>> monitoringDataStreamSource =
env.addSource(kinesisConsumer);
kinesisStream = monitoringDataStreamSource
.assignTimestampsAndWatermarks(new *MonitoringTSWAssigner*(bound));
influxSink = pms.createInfluxMonitoringSink();
..
...timeWindow = Time.seconds(*timeIntervalL*);//tried with
timeIntervalL=15s, 1m

KeyedStream<Map<String, Object>, MonitoringTuple>
monitoringTupleKeyedStream =
kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric));
final WindowedStream<Map<String, Object>, MonitoringTuple, TimeWindow>
windowStream = monitoringTupleKeyedStream.timeWindow(timeWindow);
DataStream enrichedMGStream = *windowStream.aggregate*(//*<=
never reaches here ?*
*new MGroupingWindowAggregate(interval)*,
new MGroupingAggregateWindowProcessing(interval, ruleImpl, rule))
.map(new MonitoringGroupingToInfluxDBPoint(rule));
enrichedMGStream.addSink(influxSink);
env.execute("Aggregation of Map data");

*MonitoringTSWAssigner* code:
public class MonitoringTSWAssigner implements
AssignerWithPunctuatedWatermarks<Map<String, Object>> {
private long bound = 5 * (long) 1000;//5 secs out of order bound in
millisecs
private long maxTimestamp = Long.MIN_VALUE;

public MonitoringTSWAssigner() {
}

public MonitoringTSWAssigner(long bound) {
this.bound = bound;
}

public long extractTimestamp(Map<String, Object> monitoring, long
previousTS) {
long extractedTS = getExtractedTS(monitoring);
if (extractedTS > maxTimestamp) {
maxTimestamp = extractedTS;
}

   return extractedTS;//return System.currentTimeMillis();

}

public long getExtractedTS(Map<String, Object> monitoring) {
final String eventTimestamp = monitoring.get(Utils.EVENT_TIMESTAMP)
!= null ? (String) monitoring.get(Utils.EVENT_TIMESTAMP) : "";
return Utils.getLongFromDateStr(eventTimestamp);
}

@Override
public Watermark checkAndGetNextWatermark(Map<String, Object>
monitoring, long extractedTimestamp) {
long extractedTS = getExtractedTS(monitoring);
long nextWatermark = maxTimestamp - bound;
return new Watermark(nextWatermark);
}
}

*MGroupingWindowAggregate*:
public class MGroupingWindowAggregate implements
*AggregateFunction*<Map<String, Object>, Map<String, Object>, Map<String, Object>> {
private final String interval;
public MGroupingWindowAggregate(String interval) {
this.interval = interval;
}
public Map<String, Object> createAccumulator() {
return new ConcurrentHashMap<>();
}

public Map<String, Object> add(Map<String, Object> monitoring,
Map<String, Object> timedMap) {
.
}

.

}

TIA,


Re: Calculate a 4 hour time window for sum,count with sum and count output every 5 mins

2019-08-01 Thread Vijay Balakrishnan
Hi Rafi,
I tried your approach with:

> windowStream.trigger(ContinuousEventTimeTrigger.of(Time.minutes(5)));
>
I can use .trigger with ProcessWindowFunction, but it doesn't accumulate
data across windows, i.e. I want to collect data for the 4 hour window with data
sent to the output every 5 mins, with the output data getting accumulated after
every 5 mins.

@Felipe- I am using a ProcessWindowFunction and cannot find a way to use
process() & onTimer with it.
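
For what it's worth, the trigger approach can be combined with an incremental
AggregateFunction plus a ProcessWindowFunction, so that every 5-minute firing emits the
running aggregate built up so far inside the 4-hour window. A minimal, self-contained
sketch of that combination (the Tuple2 input, the sum aggregate and all names below are
illustrative placeholders, not the actual job from this thread):

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class FourHourWindowSketch {

    public static DataStream<Tuple2<String, Long>> sumWithPeriodicOutput(
            DataStream<Tuple2<String, Long>> input) {
        return input
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) {
                        return value.f0;
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.hours(4)))
                // Fires (without purging) every 5 minutes of event time, so the
                // accumulator keeps growing across firings within the same window.
                .trigger(ContinuousEventTimeTrigger.of(Time.minutes(5)))
                .aggregate(new AggregateFunction<Tuple2<String, Long>, Long, Long>() {
                    @Override public Long createAccumulator() { return 0L; }
                    @Override public Long add(Tuple2<String, Long> v, Long acc) { return acc + v.f1; }
                    @Override public Long getResult(Long acc) { return acc; }
                    @Override public Long merge(Long a, Long b) { return a + b; }
                }, new ProcessWindowFunction<Long, Tuple2<String, Long>, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context ctx, Iterable<Long> sums,
                                        Collector<Tuple2<String, Long>> out) {
                        out.collect(Tuple2.of(key, sums.iterator().next()));
                    }
                });
    }
}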

On Sun, Jun 30, 2019 at 11:45 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> No, there is no specific reason.
> I am using it because I am computing the HyperLogLog over a window.
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> <https://felipeogutierrez.blogspot.com>*
>
>
> On Mon, Jul 1, 2019 at 12:34 AM Vijay Balakrishnan 
> wrote:
>
>> Hi Felipe,
>> Thanks for the example. I will try a variation of that for mine. Is there
>> a specific reason to use the HyperLogLogState ?
>>
>> Vijay
>>
>> On Tue, Jun 18, 2019 at 3:00 AM Felipe Gutierrez <
>> felipe.o.gutier...@gmail.com> wrote:
>>
>>> Hi Vijay,
>>>
>>> I managed by using
>>> "ctx.timerService().registerProcessingTimeTimer(timeoutTime);" on the
>>> processElement method and clearing the state on the onTimer method. This is
>>> my program [1].
>>>
>>> [1]
>>> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowTwitter.java
>>>
>>> Kind Regards,
>>> Felipe
>>> *--*
>>> *-- Felipe Gutierrez*
>>>
>>> *-- skype: felipe.o.gutierrez*
>>> *--* *https://felipeogutierrez.blogspot.com
>>> <https://felipeogutierrez.blogspot.com>*
>>>
>>>
>>> On Mon, Jun 17, 2019 at 8:57 PM Rafi Aroch  wrote:
>>>
>>>> Hi Vijay,
>>>>
>>>> When using windows, you may use the 'trigger' to set a Custom Trigger
>>>> which would trigger your *ProcessWindowFunction* accordingly.
>>>>
>>>> In your case, you would probably use:
>>>>
>>>>> *.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(5)))*
>>>>>
>>>>
>>>> Thanks,
>>>> Rafi
>>>>
>>>>
>>>> On Mon, Jun 17, 2019 at 9:01 PM Vijay Balakrishnan 
>>>> wrote:
>>>>
>>>>> I am also implementing the ProcessWindowFunction and accessing the
>>>>> windowState to get data but how do i push data out every 5 mins during a 4
>>>>> hr time window ?? I am adding a globalState to handle the 4 hr window ???
>>>>> Or should I still use the context.windowState even for the 4 hr window ?
>>>>>
>>>>> public  class MGroupingAggregateClass extends
>>>>>> ProcessWindowFunction<> {
>>>>>>
>>>>>> private MapState timedGroupKeyState;
>>>>>> private MapState globalGroupKeyState;
>>>>>> private final MapStateDescriptor
>>>>>> timedMapKeyStateDescriptor =
>>>>>>new MapStateDescriptor<>("timedGroupKeyState",
>>>>>>String.class, Object.class);
>>>>>> private final MapStateDescriptor
>>>>>> globalMapKeyStateDescriptor =
>>>>>>new MapStateDescriptor<>("globalGroupKeyState",
>>>>>>String.class, Object.class);
>>>>>>
>>>>>>
>>>>>> public void open(Configuration ..) {
>>>>>> timedGroupKeyState =
>>>>>> getRuntimeContext().getMapState(timedMapKeyStateDescriptor);
>>>>>> globalGroupKeyState =
>>>>>> getRuntimeContext().getMapState(globalMapKeyStateDescriptor);
>>>>>> }
>>>>>>
>>>>>> public void process(MonitoringTuple currKey, Context context,
>>>>>> Iterable> elements,
>>>>>>Collector> out) throws
>>>>>> Exception {
>>>>>>logger.info("Entered MGroupingAggregateWindowProcessing -
>>>>>> process interval:{}, currKey:{}", interval, currKey);
>>>>>>timedGroupKeyState =
>>>>>> context.windowState().getMapState(timedMapKeyStateDescriptor);
>>>>>>

Re: Converting Metrics from a Reporter to a Custom Events mapping

2019-08-01 Thread Vijay Balakrishnan
Thanks for all your replies. Ended up using the StatsD reporter with Flink and
building a StatsD plugin to transform the data into my required output format
and dump it into a folder that the Kinesis agent can then pick up.
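
For reference, pointing Flink at a StatsD endpoint only takes a few flink-conf.yaml
entries like the following (the reporter name, host and port are placeholders for
whatever the actual setup uses):

metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
metrics.reporter.stsd.host: localhost
metrics.reporter.stsd.port: 8125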

On Tue, Jul 16, 2019 at 2:16 AM Chesnay Schepler  wrote:

> You can configure multiple reporters, so just configured both the reporter
> that the app users want and your own which does the transformation and
> sending to kinesis.
>
> On 16/07/2019 09:37, Haibo Sun wrote:
>
> Hi,  Vijay
>
> Or can you implement a Reporter that transforms the metrics and sends them
> directly to a Kinesis Stream?
>
> Best,
> Haibo
>
> At 2019-07-16 00:01:36, "Vijay Balakrishnan" 
>  wrote:
>
> Hi,
> I need to capture the Metrics sent from a Flink app to a Reporter and
> transform them to an Events API format I have designed. I have been looking
> at the Reporters(
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#list-of-all-variables)
> and have used them but what would be a best practice to capture this
> metrics data to transform it ?
>
> The folks using the Flink app still want to see their metrics in the Flink
> Dashboard using their chosen(not sure yet what they chose-assuming
> ConsoleReporter) Reporter. I need to capture those metrics, transform them
> to my Events API format and send it to a Kinesis Stream.
>
> We use Prometheus and InfluxDB in our environments for other purposes.
>
> Should I use the SLF4J Reporter to dump the metrics into a log file/folder
> and watch that with a Kinesis Agent and transform it somehow(?) and then
> send it to the Kinesis data stream ?
>
> TIA,
>
>
>


Converting Metrics from a Reporter to a Custom Events mapping

2019-07-15 Thread Vijay Balakrishnan
Hi,
I need to capture the Metrics sent from a Flink app to a Reporter and
transform them to an Events API format I have designed. I have been looking
at the Reporters(
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#list-of-all-variables)
and have used them but what would be a best practice to capture this
metrics data to transform it ?

The folks using the Flink app still want to see their metrics in the Flink
Dashboard using their chosen(not sure yet what they chose-assuming
ConsoleReporter) Reporter. I need to capture those metrics, transform them
to my Events API format and send it to a Kinesis Stream.

We use Prometheus and InfluxDB in our environments for other purposes.

Should I use the SLF4J Reporter to dump the metrics into a log file/folder
and watch that with a Kinesis Agent and transform it somehow(?) and then
send it to the Kinesis data stream ?

TIA,


Re: Calculate a 4 hour time window for sum,count with sum and count output every 5 mins

2019-06-30 Thread Vijay Balakrishnan
Hi Felipe,
Thanks for the example. I will try a variation of that for mine. Is there a
specific reason to use the HyperLogLogState ?

Vijay

On Tue, Jun 18, 2019 at 3:00 AM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi Vijay,
>
> I managed by using
> "ctx.timerService().registerProcessingTimeTimer(timeoutTime);" on the
> processElement method and clearing the state on the onTimer method. This is
> my program [1].
>
> [1]
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowTwitter.java
>
> Kind Regards,
> Felipe
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> <https://felipeogutierrez.blogspot.com>*
>
>
> On Mon, Jun 17, 2019 at 8:57 PM Rafi Aroch  wrote:
>
>> Hi Vijay,
>>
>> When using windows, you may use the 'trigger' to set a Custom Trigger
>> which would trigger your *ProcessWindowFunction* accordingly.
>>
>> In your case, you would probably use:
>>
>>> *.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(5)))*
>>>
>>
>> Thanks,
>> Rafi
>>
>>
>> On Mon, Jun 17, 2019 at 9:01 PM Vijay Balakrishnan 
>> wrote:
>>
>>> I am also implementing the ProcessWindowFunction and accessing the
>>> windowState to get data but how do i push data out every 5 mins during a 4
>>> hr time window ?? I am adding a globalState to handle the 4 hr window ???
>>> Or should I still use the context.windowState even for the 4 hr window ?
>>>
>>> public  class MGroupingAggregateClass extends
>>>> ProcessWindowFunction<> {
>>>>
>>>> private MapState timedGroupKeyState;
>>>> private MapState globalGroupKeyState;
>>>> private final MapStateDescriptor
>>>> timedMapKeyStateDescriptor =
>>>>new MapStateDescriptor<>("timedGroupKeyState",
>>>>String.class, Object.class);
>>>> private final MapStateDescriptor
>>>> globalMapKeyStateDescriptor =
>>>>new MapStateDescriptor<>("globalGroupKeyState",
>>>>String.class, Object.class);
>>>>
>>>>
>>>> public void open(Configuration ..) {
>>>> timedGroupKeyState =
>>>> getRuntimeContext().getMapState(timedMapKeyStateDescriptor);
>>>> globalGroupKeyState =
>>>> getRuntimeContext().getMapState(globalMapKeyStateDescriptor);
>>>> }
>>>>
>>>> public void process(MonitoringTuple currKey, Context context,
>>>> Iterable> elements,
>>>>Collector> out) throws
>>>> Exception {
>>>>logger.info("Entered MGroupingAggregateWindowProcessing -
>>>> process interval:{}, currKey:{}", interval, currKey);
>>>>timedGroupKeyState =
>>>> context.windowState().getMapState(timedMapKeyStateDescriptor);
>>>>globalGroupKeyState =
>>>> context.globalState().getMapState(globalMapKeyStateDescriptor);
>>>> ...
>>>> //get data fromm state
>>>> Object timedGroupStateObj = timedGroupKeyState.get(groupKey);
>>>>
>>>> //how do i push the data out every 5 mins to the sink during the 4 hr
>>>> window ??
>>>>
>>>> }
>>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Mon, Jun 17, 2019 at 10:06 AM Vijay Balakrishnan 
>>> wrote:
>>>
>>>> Hi,
>>>> Need to calculate a 4 hour time window for count, sum with current
>>>> calculated results being output every 5 mins.
>>>> How do i do that ?
>>>> Currently, I calculate results for 5 sec and 5 min time windows fine on
>>>> the KeyedStream.
>>>>
>>>> Time timeWindow = getTimeWindowFromInterval(interval);//eg: timeWindow
>>>>> = Time.seconds(timeIntervalL);
>>>>> KeyedStream, ...> monitoringTupleKeyedStream =
>>>>> kinesisStream.keyBy(...);
>>>>> final WindowedStream, , TimeWindow>
>>>>> windowStream =
>>>>> monitoringTupleKeyedStream
>>>>> .timeWindow(timeWindow);
>>>>> DataStream<> enrichedMGStream = windowStream.aggregate(
>>>>> new MGroupingWindowAggregateClass(...),
>>>>> new MGroupingAggregateClass())
>>>>> .map(new Monitoring...(...));
>>>>> enrichedMGStream.addSink(..);
>>>>>
>>>>
>>>>
>>>> TIA,
>>>> Vijay
>>>>
>>>


Re: Calculate a 4 hour time window for sum,count with sum and count output every 5 mins

2019-06-17 Thread Vijay Balakrishnan
I am also implementing the ProcessWindowFunction and accessing the
windowState to get data, but how do I push data out every 5 mins during a 4
hr time window ?? I am adding a globalState to handle the 4 hr window ???
Or should I still use the context.windowState even for the 4 hr window ?

public class MGroupingAggregateClass extends ProcessWindowFunction<...> {
>
>     private MapState<String, Object> timedGroupKeyState;
>     private MapState<String, Object> globalGroupKeyState;
>     private final MapStateDescriptor<String, Object> timedMapKeyStateDescriptor =
>             new MapStateDescriptor<>("timedGroupKeyState", String.class, Object.class);
>     private final MapStateDescriptor<String, Object> globalMapKeyStateDescriptor =
>             new MapStateDescriptor<>("globalGroupKeyState", String.class, Object.class);
>
>     public void open(Configuration ..) {
>         timedGroupKeyState = getRuntimeContext().getMapState(timedMapKeyStateDescriptor);
>         globalGroupKeyState = getRuntimeContext().getMapState(globalMapKeyStateDescriptor);
>     }
>
>     public void process(MonitoringTuple currKey, Context context,
>                         Iterable<Map<String, Object>> elements,
>                         Collector<Map<String, Object>> out) throws Exception {
>         logger.info("Entered MGroupingAggregateWindowProcessing - process interval:{}, currKey:{}",
>                 interval, currKey);
>         timedGroupKeyState = context.windowState().getMapState(timedMapKeyStateDescriptor);
>         globalGroupKeyState = context.globalState().getMapState(globalMapKeyStateDescriptor);
>         ...
>         // get data from state
>         Object timedGroupStateObj = timedGroupKeyState.get(groupKey);
>
>         // how do I push the data out every 5 mins to the sink during the 4 hr window ??
>     }
>







On Mon, Jun 17, 2019 at 10:06 AM Vijay Balakrishnan 
wrote:

> Hi,
> Need to calculate a 4 hour time window for count, sum with current
> calculated results being output every 5 mins.
> How do i do that ?
> Currently, I calculate results for 5 sec and 5 min time windows fine on
> the KeyedStream.
>
> Time timeWindow = getTimeWindowFromInterval(interval);//eg: timeWindow =
>> Time.seconds(timeIntervalL);
>> KeyedStream, ...> monitoringTupleKeyedStream =
>> kinesisStream.keyBy(...);
>> final WindowedStream, , TimeWindow> windowStream =
>> monitoringTupleKeyedStream
>> .timeWindow(timeWindow);
>> DataStream<> enrichedMGStream = windowStream.aggregate(
>> new MGroupingWindowAggregateClass(...),
>> new MGroupingAggregateClass())
>> .map(new Monitoring...(...));
>> enrichedMGStream.addSink(..);
>>
>
>
> TIA,
> Vijay
>


Re: NotSerializableException: DropwizardHistogramWrapper inside AggregateFunction

2019-06-17 Thread Vijay Balakrishnan
Thanks, Fabian.
I got around the issue by moving the logic for the
DropwizardHistogramWrapper (a non-serializable class) into the
ProcessWindowFunction's open() method.
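
In case it helps anyone else, a minimal sketch of that pattern: keep the wrapper out of the
serialized closure by declaring it transient and creating it in open(). The class name, the
metric name and the "latency" field read below are illustrative assumptions, not the
thread's actual code.

import com.codahale.metrics.SlidingTimeWindowReservoir;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.Map;
import java.util.concurrent.TimeUnit;

public class HistogramWindowFunction
        extends ProcessWindowFunction<Map<String, Object>, Map<String, Object>, String, TimeWindow> {

    // Not serialized with the function; created on the task manager in open().
    private transient Histogram latencyHistogram;

    @Override
    public void open(Configuration parameters) {
        com.codahale.metrics.Histogram dropwizardHistogram =
                new com.codahale.metrics.Histogram(new SlidingTimeWindowReservoir(5, TimeUnit.MINUTES));
        latencyHistogram = getRuntimeContext()
                .getMetricGroup()
                .histogram("latencyHistogram", new DropwizardHistogramWrapper(dropwizardHistogram));
    }

    @Override
    public void process(String key, Context ctx, Iterable<Map<String, Object>> elements,
                        Collector<Map<String, Object>> out) {
        for (Map<String, Object> element : elements) {
            Object value = element.get("latency"); // illustrative field name
            if (value instanceof Number) {
                latencyHistogram.update(((Number) value).longValue());
            }
            out.collect(element);
        }
    }
}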



On Fri, Jun 7, 2019 at 12:33 AM Fabian Hueske  wrote:

> Hi,
>
> There are two ways:
>
> 1. make the non-serializable member variable transient (meaning that it
> won't be serialized) and check in the aggregate call if it has been
> initialized or not.
> 2. implement your own serialization logic by overriding readObject() and
> writeObject() [1].
>
> Best, Fabian
>
> [1]
> https://howtodoinjava.com/java/serialization/custom-serialization-readobject-writeobject/
>
> Am Do., 6. Juni 2019 um 23:04 Uhr schrieb Vijay Balakrishnan <
> bvija...@gmail.com>:
>
>> HI,
>> I have a class defined :
>>
>> public class MGroupingWindowAggregate implements AggregateFunction.. {
>>> private final Map keyHistMap = new TreeMap<>();
>>> }
>>>
>> In the constructor, I initialize it.
>>
>>> public MGroupingWindowAggregate() {
>>> Histogram minHist = new Histogram(new
>>> SlidingTimeWindowReservoir(timeIntervalL, TimeUnit.MINUTES));
>>> org.apache.flink.metrics.Histogram minHistogram = new
>>> DropwizardHistogramWrapper(minHist);
>>> Map intervalHistMap = new
>>> TreeMap<>();
>>> intervalHistMap.putIfAbsent(interval, minHistogram);
>>> keyHistMap.putIfAbsent(operationKey, intervalHistMap);
>>> }
>>>
>> When trying to use it in the add() method of AggregateFunction, it fails
>> saying:
>> NotSerializableException:
>> org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper
>>
>> Tried to wrap DropwizardHistogramWrapper inside a serializable Object
>> with Composition but that also didn't work.
>>
>> Looked at using RichFunction open() based on Stephan's advise here.
>> https://stackoverflow.com/questions/34118469/flink-using-dagger-injections-not-serializable
>> But cannot use RichFunction with AggrgeateFunction or use
>> RichAggregateFunction
>>
>> How can I use the DropwizardHistogramWrapper -a non serializable class
>> inside my AggregateFunction ? Trying to use DropwizardHistogramWrapper to
>> get some Histogram percentile stats without re-inventing the wheel.
>>
>> TIA,
>> Vijay
>>
>


Calculate a 4 hour time window for sum,count with sum and count output every 5 mins

2019-06-17 Thread Vijay Balakrishnan
Hi,
Need to calculate a 4 hour time window for count, sum with current
calculated results being output every 5 mins.
How do i do that ?
Currently, I calculate results for 5 sec and 5 min time windows fine on the
KeyedStream.

Time timeWindow = getTimeWindowFromInterval(interval);//eg: timeWindow =
> Time.seconds(timeIntervalL);
> KeyedStream, ...> monitoringTupleKeyedStream =
> kinesisStream.keyBy(...);
> final WindowedStream, , TimeWindow> windowStream =
> monitoringTupleKeyedStream
> .timeWindow(timeWindow);
> DataStream<> enrichedMGStream = windowStream.aggregate(
> new MGroupingWindowAggregateClass(...),
> new MGroupingAggregateClass())
> .map(new Monitoring...(...));
> enrichedMGStream.addSink(..);
>


TIA,
Vijay


NotSerializableException: DropwizardHistogramWrapper inside AggregateFunction

2019-06-06 Thread Vijay Balakrishnan
HI,
I have a class defined :

public class MGroupingWindowAggregate implements AggregateFunction.. {
> private final Map keyHistMap = new TreeMap<>();
> }
>
In the constructor, I initialize it.

> public MGroupingWindowAggregate() {
> Histogram minHist = new Histogram(new
> SlidingTimeWindowReservoir(timeIntervalL, TimeUnit.MINUTES));
> org.apache.flink.metrics.Histogram minHistogram = new
> DropwizardHistogramWrapper(minHist);
> Map intervalHistMap = new
> TreeMap<>();
> intervalHistMap.putIfAbsent(interval, minHistogram);
> keyHistMap.putIfAbsent(operationKey, intervalHistMap);
> }
>
When trying to use it in the add() method of AggregateFunction, it fails
saying:
NotSerializableException:
org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper

Tried to wrap DropwizardHistogramWrapper inside a serializable Object with
Composition but that also didn't work.

Looked at using RichFunction open() based on Stephan's advise here.
https://stackoverflow.com/questions/34118469/flink-using-dagger-injections-not-serializable
But cannot use RichFunction with AggregateFunction or use
RichAggregateFunction

How can I use the DropwizardHistogramWrapper (a non-serializable class)
inside my AggregateFunction ? Trying to use DropwizardHistogramWrapper to
get some Histogram percentile stats without re-inventing the wheel.

TIA,
Vijay


FlinkKinesisConsumer not getting data from Kinesis at a constant speed -lag of about 30-55 secs

2019-05-17 Thread Vijay Balakrishnan
Hi,
In using FlinkKinesisConsumer, I am seeing a lag of about 30-55 secs in
fetching data from Kinesis after it has done 1 or 2 fetches even though
data is getting put in the Kinesis data stream at a high clip.
I used ConsumerConfigConstants.SHARD_GETRECORDS_MAX of 1 (tried with
5000, 200 etc) and ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS
of 200ms (the default is good here because of the 5-transactions-per-second limit
from AWS). Have also tried reducing the interval but I run into a read
throughput exception. How can I reduce this lag to make it pretty much
real-time? I am also using Flink processing time. Have gone from 1 to 3 shards
for the Kinesis data stream. Is there some other tuning param I need to add for
FlinkKinesisConsumer, or is it just that it doesn't have any data to pull
from Kinesis?
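
For reference, these two knobs are just consumer properties; a minimal sketch of setting
them (the region and the values shown are illustrative, not a recommendation):

import java.util.Properties;

import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class KinesisConsumerConfigSketch {
    // Illustrative values only - the right numbers depend on shard count and read quota.
    public static Properties consumerConfig(String region) {
        Properties config = new Properties();
        config.put(AWSConfigConstants.AWS_REGION, region);
        config.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
        config.put(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10000");           // records per GetRecords call
        config.put(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "200"); // ms between calls, per shard
        return config;
    }
}
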
I do 5 sec Tumbling time windows and use the window end timestamp to put
into my InfluxDB timestamp column. I see that there is a constant 35 sec-
55 sec lag in the timestamps and that corresponds to the time lag I see in
the logs where FlinkKinesisConsumer is waiting to fetch data from Kinesis.
I am seeing these log statements and not sure what to make of it to reduce
the time lag of fetching data from Kinesis.
Logs:

23:23:40,286 [shardConsumers-Source: Custom Source -> (Map -> Sink:
Unnamed, Filter) (2/8)-thread-0] DEBUG
org.apache.flink.kinesis.shaded.com.amazonaws.requestId  [] -
x-amzn-RequestId: f06409aa-d996-fb3f-a53c-5c066d509c9b
23:23:40,335 [Source: Custom Source -> (Map -> Sink: Unnamed, Filter)
(2/8)] DEBUG
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher
[] - Subtask 1 is trying to discover new shards that were created due to
resharding ...


TIA,


Re: Flink - Type Erasure Exception trying to use Tuple6 instead of Tuple

2019-05-09 Thread Vijay Balakrishnan
 I solved the problem by following another person's recommendation on the
other post about using a wrapper POJO.
So, I used a wrapper MonitoringTuple to wrap the Tuple, and that solved my
problem with a varying number of fields in the Tuple interface.

public class MonitoringTuple {
    private Tuple tuple;
    ...
}

Then, I used it like this:

> KeyedStream, MonitoringTuple>
> monitoringTupleKeyedStream = kinesisStream.keyBy(new
> MapTupleKeySelector(groupBySet));


The MapTupleKeySelector is defined below:

> public static class MapTupleKeySelector implements KeySelector Object>, MonitoringTuple> {

private final Set groupBySet;


> public MapTupleKeySelector(Set groupBySet) {

this.groupBySet = groupBySet;

}


> @Override

public MonitoringTuple getKey(Map inputMap) {

int groupBySetSize = groupBySet.size();

Tuple tuple = Tuple.newInstance(groupBySetSize);

int count = 0;

for (String groupBy : groupBySet) {

count = setTupleField(inputMap, tuple, count, groupBy);

}

return new MonitoringTuple(tuple);

}

}


> public static int setTupleField(Map inputMap, Tuple
> tuple, int count, String groupBy) {

Object groupByValueObj = inputMap.get(groupBy);

String tupleValue = Utils.convertToString(groupByValueObj);

tuple.setField(tupleValue, count++);

return count;

}

}
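
The full body of MonitoringTuple was not posted, so the following completion is
only illustrative. The one detail worth calling out is that a type used as a
keyBy key needs deterministic hashCode()/equals(); delegating both to the
wrapped Tuple (whose concrete subclasses implement them) is the simplest way
to get that:

import java.util.Objects;
import org.apache.flink.api.java.tuple.Tuple;

public class MonitoringTuple {

    private Tuple tuple;

    public MonitoringTuple() {
        // no-arg constructor keeps this a Flink POJO
    }

    public MonitoringTuple(Tuple tuple) {
        this.tuple = tuple;
    }

    public Tuple getTuple() {
        return tuple;
    }

    public void setTuple(Tuple tuple) {
        this.tuple = tuple;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof MonitoringTuple)) {
            return false;
        }
        return Objects.equals(tuple, ((MonitoringTuple) o).tuple);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(tuple);
    }
}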





TIA,

On Wed, May 1, 2019 at 1:39 PM Vijay Balakrishnan 
wrote:

> Hi,
> Had asked this questions earlier as topic - "Flink - Type Erasure
> Exception trying to use Tuple6 instead of Tuple"
>
> Having issues defining a generic Tuple instead of a specific Tuple1,Tuple2
> etc.
> Exception in thread "main"
> org.apache.flink.api.common.functions.InvalidTypesException: Usage of class
> Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1,
> Tuple2, etc.) instead.
>
> DataStream> kinesisStream = ...;
> KeyedStream, Tuple> monitoringTupleKeyedStream =
> kinesisStream.keyBy(new MapTupleKeySelector(groupBySet));//<= complains
> about Tuple type for monitoringTupleKeyedStream
> .
>
> public static class MapTupleKeySelector implements KeySelector Object>, Tuple> {
> private final Set groupBySet;
>
> public MapTupleKeySelector(Set groupBySet) {
> this.groupBySet = groupBySet;
> }
>
> @Override
> public Tuple getKey(Map inputMap) throws Exception
> {
> int groupBySetSize = groupBySet.size();
> Tuple tuple = Tuple.newInstance(groupBySetSize);
> //Tuple1 tuple = new Tuple1();
> int count = 0;
> for (String groupBy : groupBySet) {
> tuple.setField(groupByValue, count++);
> }
> return tuple;
> }
> }
>
> Abhishek had replied back in the Thread as follows: (posting in that
> thread as well creating a new thread):
> However, If you are trying to build some generic framework and for
> different streams, there would be different fields, you can follow the Map
> approach. For the latter approach, you need to write extra mapper class
> which will convert all the fields in the stream to the Map based stream.
>
> Can I get an example of how to create this extra Mapper class ?
>
> Currently, I am using deserialization to convert the incoming byte[] by
> implementing KinesisDeserializationSchema> to convert
> to a DataStream> kinesisStream.
>
> TIA,
>
> On Sun, Apr 7, 2019 at 5:40 PM abhishek sharma 
> wrote:
>
>> I agree with Timothy, POJO would be a much better approach.
>>
>> However, If you are trying to build some generic framework and for
>> different streams, there would be different fields, you can follow the Map
>> approach. For the latter approach, you need to write extra mapper class
>> which will convert all the fields in the stream to the Map based stream.
>>
>> Abhishek
>>
>> On Sun, Apr 7, 2019 at 3:07 AM Timothy Victor  wrote:
>>
>>> Could this just be solved by creating a POJO model class for your
>>> problem?
>>>
>>> That is, instead of using Tuple6 - create a class that encapsulates your
>>> data.   This, I think, would solve your problem.  But beyond that I think
>>> the code will be more understandable.  It's hard to have a Tuple6 of all
>>> Strings, and remember what each one means -- even if I wrote the code :-)
>>> Furthermore, if and when you need to add more elements to your data model,
>>> you will need to refactor your entire Flink graph.   Keeping a data model
>>> in POJO pr

Re: Type Erasure - Usage of class Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1, Tuple2, etc.

2019-05-09 Thread Vijay Balakrishnan
Hi Chesnay,
Sorry for causing the confusion. I solved the problem by following another
person's recommendation on the other post about using a wrapper POJO.
So, I used a wrapper MonitoringTuple to wrap the Tuple and that solved my
problem with varying number of fields in the Tuple interface.

public class MonitoringTuple {
> private Tuple tuple;
>
>
Then, I used it like this:

> KeyedStream, MonitoringTuple>
> monitoringTupleKeyedStream = kinesisStream.keyBy(new
> MapTupleKeySelector(groupBySet));


The MapTupleKeySelector is defined below:

> public static class MapTupleKeySelector implements KeySelector Object>, MonitoringTuple> {

private final Set groupBySet;


> public MapTupleKeySelector(Set groupBySet) {

this.groupBySet = groupBySet;

}


> @Override

public MonitoringTuple getKey(Map inputMap) {

int groupBySetSize = groupBySet.size();

Tuple tuple = Tuple.newInstance(groupBySetSize);

int count = 0;

for (String groupBy : groupBySet) {

count = setTupleField(inputMap, tuple, count, groupBy);

}

return new MonitoringTuple(tuple);

}

}


> public static int setTupleField(Map inputMap, Tuple
> tuple, int count, String groupBy) {

Object groupByValueObj = inputMap.get(groupBy);

String tupleValue = Utils.convertToString(groupByValueObj);

tuple.setField(tupleValue, count++);

return count;

}

}





TIA,

On Thu, May 2, 2019 at 4:54 AM Chesnay Schepler  wrote:

> I'm not sure what you're asking.
>
> If you have a Deserialization schema that convert the data into a Map
> you're done as I understand it, what do you believe to be missing?
>
> If, for a given job, the number/types of fields are fixed you could look
> into using Row.
>
> On 01/05/2019 22:40, Vijay Balakrishnan wrote:
>
> Hi,
> Had asked this questions earlier as topic - "Flink - Type Erasure
> Exception trying to use Tuple6 instead of Tuple"
>
> Having issues defining a generic Tuple instead of a specific Tuple1,Tuple2
> etc.
> Exception in thread "main"
> org.apache.flink.api.common.functions.InvalidTypesException: Usage of class
> Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1,
> Tuple2, etc.) instead.
>
> DataStream> kinesisStream = ...;
> KeyedStream, Tuple> monitoringTupleKeyedStream =
> kinesisStream.keyBy(new MapTupleKeySelector(groupBySet));//<= complains
> about Tuple type for monitoringTupleKeyedStream
> .
>
> public static class MapTupleKeySelector implements KeySelector Object>, Tuple> {
> private final Set groupBySet;
>
> public MapTupleKeySelector(Set groupBySet) {
> this.groupBySet = groupBySet;
> }
>
> @Override
> public Tuple getKey(Map inputMap) throws Exception
> {
> int groupBySetSize = groupBySet.size();
> Tuple tuple = Tuple.newInstance(groupBySetSize);
> //Tuple1 tuple = new Tuple1();
> int count = 0;
> for (String groupBy : groupBySet) {
> tuple.setField(groupByValue, count++);
> }
> return tuple;
> }
> }
>
> Abhishek had replied back in the Thread as follows: (posting in that
> thread as well creating a new thread):
> However, If you are trying to build some generic framework and for
> different streams, there would be different fields, you can follow the Map
> approach. For the latter approach, you need to write extra mapper class
> which will convert all the fields in the stream to the Map based stream.
>
> Can I get an example of how to create this extra Mapper class ?
>
> Currently, I am using deserialization to convert the incoming byte[] by
> implementing KinesisDeserializationSchema> to convert
> to a DataStream> kinesisStream.
>
> TIA,
>
>
>


Type Erasure - Usage of class Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1, Tuple2, etc.

2019-05-01 Thread Vijay Balakrishnan
Hi,
Had asked this questions earlier as topic - "Flink - Type Erasure Exception
trying to use Tuple6 instead of Tuple"

Having issues defining a generic Tuple instead of a specific Tuple1,Tuple2
etc.
Exception in thread "main"
org.apache.flink.api.common.functions.InvalidTypesException: Usage of class
Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1,
Tuple2, etc.) instead.

DataStream<Map<String, Object>> kinesisStream = ...;
KeyedStream<Map<String, Object>, Tuple> monitoringTupleKeyedStream =
        kinesisStream.keyBy(new MapTupleKeySelector(groupBySet)); //<= complains
        // about the Tuple type for monitoringTupleKeyedStream
...

public static class MapTupleKeySelector implements KeySelector<Map<String, Object>, Tuple> {
    private final Set<String> groupBySet;

    public MapTupleKeySelector(Set<String> groupBySet) {
        this.groupBySet = groupBySet;
    }

    @Override
    public Tuple getKey(Map<String, Object> inputMap) throws Exception {
        int groupBySetSize = groupBySet.size();
        Tuple tuple = Tuple.newInstance(groupBySetSize);
        //Tuple1<String> tuple = new Tuple1<>();
        int count = 0;
        for (String groupBy : groupBySet) {
            Object groupByValue = inputMap.get(groupBy);
            tuple.setField(groupByValue, count++);
        }
        return tuple;
    }
}

Abhishek had replied back in the Thread as follows: (posting in that thread
as well creating a new thread):
However, If you are trying to build some generic framework and for
different streams, there would be different fields, you can follow the Map
approach. For the latter approach, you need to write extra mapper class
which will convert all the fields in the stream to the Map based stream.

Can I get an example of how to create this extra Mapper class ?

Currently, I am using deserialization to convert the incoming byte[] by
implementing KinesisDeserializationSchema<Map<String, Object>> to convert
to a DataStream<Map<String, Object>> kinesisStream.
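
As for the "extra mapper class" asked about above, it is usually nothing more
than a MapFunction that flattens the POJO into a Map<String, Object>. A rough
sketch, assuming a Monitoring POJO with the getters that appear elsewhere in
these threads (which fields to copy is an assumption):

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.MapFunction;

public class MonitoringToMapFunction implements MapFunction<Monitoring, Map<String, Object>> {

    @Override
    public Map<String, Object> map(Monitoring mon) {
        Map<String, Object> out = new HashMap<>();
        // copy the fixed fields
        out.put("deployment", mon.getDeployment());
        out.put("gameId", mon.getGameId());
        out.put("eventName", mon.getEventName());
        out.put("component", mon.getComponent());
        // copy any dynamic event data so config-driven keyBy fields can find it
        if (mon.getEventDataMap() != null) {
            out.putAll(mon.getEventDataMap());
        }
        return out;
    }
}

// usage (sketch): DataStream<Map<String, Object>> mapStream = monitoringStream.map(new MonitoringToMapFunction());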

TIA,


Re: Flink - Type Erasure Exception trying to use Tuple6 instead of Tuple

2019-05-01 Thread Vijay Balakrishnan
Hi,
Had asked this questions earlier as topic - "Flink - Type Erasure Exception
trying to use Tuple6 instead of Tuple"

Having issues defining a generic Tuple instead of a specific Tuple1,Tuple2
etc.
Exception in thread "main"
org.apache.flink.api.common.functions.InvalidTypesException: Usage of class
Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1,
Tuple2, etc.) instead.

DataStream> kinesisStream = ...;
KeyedStream, Tuple> monitoringTupleKeyedStream =
kinesisStream.keyBy(new MapTupleKeySelector(groupBySet));//<= complains
about Tuple type for monitoringTupleKeyedStream
.

public static class MapTupleKeySelector implements KeySelector, Tuple> {
private final Set groupBySet;

public MapTupleKeySelector(Set groupBySet) {
this.groupBySet = groupBySet;
}

@Override
public Tuple getKey(Map inputMap) throws Exception {
int groupBySetSize = groupBySet.size();
Tuple tuple = Tuple.newInstance(groupBySetSize);
//Tuple1 tuple = new Tuple1();
int count = 0;
for (String groupBy : groupBySet) {
tuple.setField(groupByValue, count++);
}
return tuple;
}
}

Abhishek had replied back in the Thread as follows: (posting in that thread
as well creating a new thread):
However, If you are trying to build some generic framework and for
different streams, there would be different fields, you can follow the Map
approach. For the latter approach, you need to write extra mapper class
which will convert all the fields in the stream to the Map based stream.

Can I get an example of how to create this extra Mapper class ?

Currently, I am using deserialization to convert the incoming byte[] by
implementing KinesisDeserializationSchema> to convert
to a DataStream> kinesisStream.

TIA,

On Sun, Apr 7, 2019 at 5:40 PM abhishek sharma 
wrote:

> I agree with Timothy, POJO would be a much better approach.
>
> However, If you are trying to build some generic framework and for
> different streams, there would be different fields, you can follow the Map
> approach. For the latter approach, you need to write extra mapper class
> which will convert all the fields in the stream to the Map based stream.
>
> Abhishek
>
> On Sun, Apr 7, 2019 at 3:07 AM Timothy Victor  wrote:
>
>> Could this just be solved by creating a POJO model class for your problem?
>>
>> That is, instead of using Tuple6 - create a class that encapsulates your
>> data.   This, I think, would solve your problem.  But beyond that I think
>> the code will be more understandable.  It's hard to have a Tuple6 of all
>> Strings, and remember what each one means -- even if I wrote the code :-)
>> Furthermore, if and when you need to add more elements to your data model,
>> you will need to refactor your entire Flink graph.   Keeping a data model
>> in POJO protects against those things.
>>
>> The latter is just unsolicited code review feedback.   And I know I gave
>> it without much context to your problem.  So please take with a large grain
>> of salt, and if it doesn't apply just ignore it.
>>
>> Tim
>>
>>
>> On Fri, Apr 5, 2019 at 7:53 AM Chesnay Schepler 
>> wrote:
>>
>>> > I tried using  [ keyBy(KeySelector, TypeInformation) ]
>>>
>>> What was the result of this approach?
>>>
>>> On 03/04/2019 17:36, Vijay Balakrishnan wrote:
>>>
>>> Hi Tim,
>>> Thanks for your reply. I am not seeing an option to specify a
>>> .returns(new TypeHint>> String,String,String,String,String>>(){}) with KeyedStream ??
>>>
>>>> monitoringTupleKeyedStream = kinesisStream.keyBy(new
>>>> KeySelector() {
>>>> public Tuple getKey(Monitoring mon) throws Exception 
>>>> {..return
>>>> new Tuple6<>(..}})
>>>
>>> I tried using
>>> TypeInformation>
>>> info = TypeInformation.of(new TypeHint>> String, String, String>>(){});
>>>
>>>> kinesisStream.keyBy(new KeySelector() {...}, info);
>>>> //specify typeInfo through
>>>>
>>>
>>> TIA,
>>> Vijay
>>>
>>> On Tue, Apr 2, 2019 at 6:06 PM Timothy Victor  wrote:
>>>
>>>> Flink needs type information for serializing and deserializing objects,
>>>> and that is lost due to Java type erasure.   The only way to workaround
>>>> this is to specify the return type of the function called in the lambda.
>>>>
>>>> Fabian's answer here explains it well.
>>>>
>>>>
>>>> https://stackoverflow.com/

Re: Flink - Type Erasure Exception trying to use Tuple6 instead of Tuple

2019-04-11 Thread Vijay Balakrishnan
Thx for all your replies. Solved the problem by skirting the issue. I
pre-populated the incoming Monitoring Object on intake with the dynamic
runtime fields keyName and keyValue and that way, I could use the static
call as used in all the other if conditions:
  monitoringTupleKeyedStream = kinesisStream.keyBy("deployment", "gameId",
"eventName", "component", "keyName","keyValue");

The reason I wanted to use Tuple was that I was passing this
KeyedStream<Monitoring, Tuple> to a common method that could handle the Tuple accordingly.

I tried using  [ keyBy(KeySelector, TypeInformation) ] but the compiler
complained that I needed to use the concrete types (Monitoring, Tuple6<...>) in that particular case.

Vijay

On Sun, Apr 7, 2019 at 5:40 PM abhishek sharma 
wrote:

> I agree with Timothy, POJO would be a much better approach.
>
> However, If you are trying to build some generic framework and for
> different streams, there would be different fields, you can follow the Map
> approach. For the latter approach, you need to write extra mapper class
> which will convert all the fields in the stream to the Map based stream.
>
> Abhishek
>
> On Sun, Apr 7, 2019 at 3:07 AM Timothy Victor  wrote:
>
>> Could this just be solved by creating a POJO model class for your problem?
>>
>> That is, instead of using Tuple6 - create a class that encapsulates your
>> data.   This, I think, would solve your problem.  But beyond that I think
>> the code will be more understandable.  It's hard to have a Tuple6 of all
>> Strings, and remember what each one means -- even if I wrote the code :-)
>> Furthermore, if and when you need to add more elements to your data model,
>> you will need to refactor your entire Flink graph.   Keeping a data model
>> in POJO protects against those things.
>>
>> The latter is just unsolicited code review feedback.   And I know I gave
>> it without much context to your problem.  So please take with a large grain
>> of salt, and if it doesn't apply just ignore it.
>>
>> Tim
>>
>>
>> On Fri, Apr 5, 2019 at 7:53 AM Chesnay Schepler 
>> wrote:
>>
>>> > I tried using  [ keyBy(KeySelector, TypeInformation) ]
>>>
>>> What was the result of this approach?
>>>
>>> On 03/04/2019 17:36, Vijay Balakrishnan wrote:
>>>
>>> Hi Tim,
>>> Thanks for your reply. I am not seeing an option to specify a
>>> .returns(new TypeHint>> String,String,String,String,String>>(){}) with KeyedStream ??
>>>
>>>> monitoringTupleKeyedStream = kinesisStream.keyBy(new
>>>> KeySelector() {
>>>> public Tuple getKey(Monitoring mon) throws Exception 
>>>> {..return
>>>> new Tuple6<>(..}})
>>>
>>> I tried using
>>> TypeInformation>
>>> info = TypeInformation.of(new TypeHint>> String, String, String>>(){});
>>>
>>>> kinesisStream.keyBy(new KeySelector() {...}, info);
>>>> //specify typeInfo through
>>>>
>>>
>>> TIA,
>>> Vijay
>>>
>>> On Tue, Apr 2, 2019 at 6:06 PM Timothy Victor  wrote:
>>>
>>>> Flink needs type information for serializing and deserializing objects,
>>>> and that is lost due to Java type erasure.   The only way to workaround
>>>> this is to specify the return type of the function called in the lambda.
>>>>
>>>> Fabian's answer here explains it well.
>>>>
>>>>
>>>> https://stackoverflow.com/questions/50945509/apache-flink-return-type-of-function-could-not-be-determined-automatically-due/50947554
>>>>
>>>> Tim
>>>>
>>>> On Tue, Apr 2, 2019, 7:03 PM Vijay Balakrishnan 
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> I am trying to use the KeyedStream with Tuple to handle diffrent types
>>>>> of Tuples including Tuple6.
>>>>> Keep getting the Exception:
>>>>> *Exception in thread "main"
>>>>> org.apache.flink.api.common.functions.InvalidTypesException: Usage of 
>>>>> class
>>>>> Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1,
>>>>> Tuple2, etc.) instead*.
>>>>> Is there a way around Type Erasure here ?
>>>>> I want to use KeyedStream so that I can pass it on
>>>>> to treat Tuple6 as a Tuple like the monitoringTupleKeyedStream.
>>>>>
>>>>> Code below:
>>>>>
>>>>> KeyedStream monitoringTupleKeyedStream = null;
>&g

Re: Timestamp Watermark Assigner bpund question

2019-04-10 Thread Vijay Balakrishnan
Hi Guowei,
Thx for your reply.
I am trying to understand the logic behind the Point 1 i.e current
Watermark being currMaxTimestamp minus the bound.
So, does this mean the Operator processing a task has a current Event time
< current Watermark < currMaxTimestamp ??? Then the Operator progresses to
the next Watermark as a starting point for events after event time reaches
currWatermark ?
Also, I saw this comment in BoundedOutOfOrdernessTimestampExtractor.java.

// this guarantees that the watermark never goes backwards.
long potentialWM = currentMaxTimestamp - maxOutOfOrderness;


How does it guarantee that watermark never goes backwards ?
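
For context, the surrounding logic in that class is roughly the following
(paraphrased from memory as a sketch, not the literal Flink source). The
guarantee comes from two things: currentMaxTimestamp is a running maximum that
never decreases, and the emitted watermark is only updated when the new
candidate is at least the last emitted one. So the watermark always trails the
largest timestamp seen so far by the bound, and individual elements can carry
timestamps above the current watermark.

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public abstract class BoundedOutOfOrdernessSketch<T> implements AssignerWithPeriodicWatermarks<T> {

    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    protected BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
        // start low enough that the first subtraction cannot underflow
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    public abstract long extractTimestamp(T element);

    @Override
    public final long extractTimestamp(T element, long previousElementTimestamp) {
        long timestamp = extractTimestamp(element);
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;   // running max: only ever moves forward
        }
        return timestamp;
    }

    @Override
    public final Watermark getCurrentWatermark() {
        // since currentMaxTimestamp never decreases, potentialWM can never move the
        // watermark backwards; late elements below the max simply do not change it
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return new Watermark(lastEmittedWatermark);
    }
}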

TIA,

Vijay



On Tue, Apr 9, 2019 at 10:50 PM Guowei Ma  wrote:

> Hi,
> 1. From doc[1], A Watermark(t) declares that event time has reached time t
> in that stream, meaning that there should be no more elements from the
> stream with a timestamp t’ <= t (i.e. events with timestamps older or equal
> to the watermark). So I think it might be counterintuitive that generating
> a watermark, which is bigger than the timestamp of current element. At
> least you should minus the bound.
> 2. From the definition of watermark I think that watermark is not related
> with the length of window. The bound is related to your application.
> 3. In your case AssignerWithPunctuatedWatermarks might not be a good
> choice. Watermark is not free, you might send too many watermarks. If your
> source could generate some "watermark" element I think you could use the
> interface. You could choose AssignerWithPeriodicWatermarks. You can find
> the example from doc[2].
>
> 1.
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#event-time-and-watermarks
> 2.
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_timestamps_watermarks.html#timestamp-assigners--watermark-generators
> Best,
> Guowei
>
>
> Vijay Balakrishnan  于2019年4月10日周三 上午7:41写道:
>
>> Hi,
>> I have created a TimestampAssigner as follows.
>> I want to use monitoring.getEventTimestamp() with an Event Time
>> processing and collected aggregated stats over time window intervals of 5
>> secs, 5 mins etc. Is this the right way to create the TimeWaterMarkAssigner
>> with a bound ? I want to collect the stats for each eventTimestamp + window
>> intervals. My understanding - *the generated watermark which is
>> eventTimestamp + bound will collect all the eventTimestamp's which arrive
>> within that Watermark inside each eventTimestamp + 5s etc window interval.
>> Or does this bound have to be based on the windowInterval i.e
>> extractedTimestamp + windowInterval + bound *??
>>
>>
>>> *public class MonitoringTSWAssigner implements
>>> AssignerWithPunctuatedWatermarks {*
>>> * private long bound = 5 * (long) 1000; *
>>> * public long extractTimestamp(Monitoring monitoring, long previousTS) {*
>>> *return monitoring.getEventTimestamp();**}*
>>>
>>> *public Watermark checkAndGetNextWatermark(Monitoring monitoring,
>>> long extractedTimestamp) {*
>>> *return new Watermark(extractedTimestamp + bound);//< should
>>> it be - bound ?*
>>> *}**}*
>>
>>
>> Used here:
>>
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>> final DataStreamSource monitoringDataStreamSource =
>>> env.addSource();
>>> DataStream kinesisStream =
>>> monitoringDataStreamSource.assignTimestampsAndWatermarks(new
>>> MonitoringTSWAssigner());
>>> KeyedStream monitoringTupleKeyedStream =
>>> kinesisStream.keyBy("deployment", .);
>>> final WindowedStream windowStream =
>>>
>>> monitoringTupleKeyedStream.timeWindow(Time.seconds(5));//5 sec time window
>>
>>
>> TIA,
>>
>


Timestamp Watermark Assigner bpund question

2019-04-09 Thread Vijay Balakrishnan
Hi,
I have created a TimestampAssigner as follows.
I want to use monitoring.getEventTimestamp() with an Event Time processing
and collected aggregated stats over time window intervals of 5 secs, 5 mins
etc. Is this the right way to create the TimeWaterMarkAssigner with a bound
? I want to collect the stats for each eventTimestamp + window intervals.
My understanding - *the generated watermark which is eventTimestamp + bound
will collect all the eventTimestamp's which arrive within that Watermark
inside each eventTimestamp + 5s etc window interval. Or does this bound
have to be based on the windowInterval i.e extractedTimestamp +
windowInterval + bound *??


> public class MonitoringTSWAssigner implements
> AssignerWithPunctuatedWatermarks<Monitoring> {
>     private long bound = 5 * (long) 1000;
>
>     public long extractTimestamp(Monitoring monitoring, long previousTS) {
>         return monitoring.getEventTimestamp();
>     }
>
>     public Watermark checkAndGetNextWatermark(Monitoring monitoring, long extractedTimestamp) {
>         return new Watermark(extractedTimestamp + bound); //<-- should it be - bound ?
>     }
> }


Used here:

> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> final DataStreamSource monitoringDataStreamSource =
> env.addSource();
> DataStream kinesisStream =
> monitoringDataStreamSource.assignTimestampsAndWatermarks(new
> MonitoringTSWAssigner());
> KeyedStream monitoringTupleKeyedStream =
> kinesisStream.keyBy("deployment", .);
> final WindowedStream windowStream =
> monitoringTupleKeyedStream.timeWindow(Time.seconds(5));//5
> sec time window


TIA,


Re: Flink - Type Erasure Exception trying to use Tuple6 instead of Tuple

2019-04-03 Thread Vijay Balakrishnan
Hi Tim,
Thanks for your reply. I am not seeing an option to specify a .returns(new
TypeHint>(){}) with
KeyedStream ??

> monitoringTupleKeyedStream = kinesisStream.keyBy(new
> KeySelector() {
> public Tuple getKey(Monitoring mon) throws Exception {..return
> new Tuple6<>(..}})

I tried using
TypeInformation>
info = TypeInformation.of(new TypeHint>(){});

> kinesisStream.keyBy(new KeySelector() {...}, info);
> //specify typeInfo through
>

TIA,
Vijay

On Tue, Apr 2, 2019 at 6:06 PM Timothy Victor  wrote:

> Flink needs type information for serializing and deserializing objects,
> and that is lost due to Java type erasure.   The only way to workaround
> this is to specify the return type of the function called in the lambda.
>
> Fabian's answer here explains it well.
>
>
> https://stackoverflow.com/questions/50945509/apache-flink-return-type-of-function-could-not-be-determined-automatically-due/50947554
>
> Tim
>
> On Tue, Apr 2, 2019, 7:03 PM Vijay Balakrishnan 
> wrote:
>
>> Hi,
>> I am trying to use the KeyedStream with Tuple to handle diffrent types of
>> Tuples including Tuple6.
>> Keep getting the Exception:
>> *Exception in thread "main"
>> org.apache.flink.api.common.functions.InvalidTypesException: Usage of class
>> Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1,
>> Tuple2, etc.) instead*.
>> Is there a way around Type Erasure here ?
>> I want to use KeyedStream so that I can pass it on to
>> treat Tuple6 as a Tuple like the monitoringTupleKeyedStream.
>>
>> Code below:
>>
>> KeyedStream monitoringTupleKeyedStream = null;
>>> String keyOperationType = ;//provided
>>> if (StringUtils.isNotEmpty(keyOperationType)) {
>>> if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_OPERATION)) {
>>> monitoringTupleKeyedStream = kinesisStream.keyBy("deployment",
>>> "gameId", "eventName", "component");
>>> } else if
>>> (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_INSTANCE_OPERATION)) {
>>> monitoringTupleKeyedStream = kinesisStream.keyBy("deployment",
>>> "gameId", "eventName", "component", "instance");
>>> } else if
>>> (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_KEY_OPERATION)) {
>>> TypeInformation>> String>> info = TypeInformation.of(new TypeHint>> String, String, String, String>>(){});
>>> monitoringTupleKeyedStream = kinesisStream.keyBy(new
>>> KeySelector() {
>>> public Tuple getKey(Monitoring mon) throws Exception {
>>> String key = "";
>>> String keyName = "";
>>> final String eventName = mon.getEventName();
>>> if (eventName != null &&
>>> ((eventName.equalsIgnoreCase(INGRESS_FPS)))
>>> )) {
>>> keyName = PCAM_ID;
>>> key = mon.getEventDataMap() != null ? (String)
>>> mon.getEventDataMap().get(PCAM_ID) : "";
>>> } else if (eventName != null &&
>>> (eventName.equalsIgnoreCase(EGRESS_FPS))) {
>>> keyName = OUT_BITRATE;
>>> key = mon.getEventDataMap() != null ? (String)
>>> mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key to use
>>> }
>>> mon.setKeyName(keyName);
>>> mon.setKeyValue(key);
>>> return new Tuple6<>(mon.getDeployment(),
>>> mon.getGameId(), eventName, mon.getComponent(), mon.getKeyName(),
>>> mon.getKeyValue());
>>> }
>>> }); //, info)
>>> } else if
>>> (keyOperationType.equalsIgnoreCase(COMPONENT_CONTAINER_OPERATION)) {
>>> monitoringTupleKeyedStream = kinesisStream.keyBy("deployment",
>>> "gameId", "eventName", "component", "instance", "container"); //<== this is
>>> also a Tuple6 but no complaints ?
>>> }
>>> }
>>
>>
>>
>> This example below needs monitoringTupleKeyedStream  to be
>> KeyedStream> String>>
>>
>>> TypeInformation>
>>> info = TypeInformation.of(new TypeHint>> String, String, String>>(){});
>>> monitoringTupleKeyedStream = kinesisStream.keyBy(new
>>> KeySelector>> St

Flink - Type Erasure Exception trying to use Tuple6 instead of Tuple

2019-04-02 Thread Vijay Balakrishnan
Hi,
I am trying to use a KeyedStream<Monitoring, Tuple> to handle different types of
Tuples including Tuple6.
Keep getting the Exception:
Exception in thread "main"
org.apache.flink.api.common.functions.InvalidTypesException: Usage of class
Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1,
Tuple2, etc.) instead.
Is there a way around Type Erasure here ?
I want to use KeyedStream<Monitoring, Tuple> so that I can pass it on to
treat Tuple6 as a Tuple like the monitoringTupleKeyedStream.

Code below:

KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream = null;
String keyOperationType = ...; //provided
if (StringUtils.isNotEmpty(keyOperationType)) {
    if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_OPERATION)) {
        monitoringTupleKeyedStream = kinesisStream.keyBy("deployment",
                "gameId", "eventName", "component");
    } else if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_INSTANCE_OPERATION)) {
        monitoringTupleKeyedStream = kinesisStream.keyBy("deployment",
                "gameId", "eventName", "component", "instance");
    } else if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_KEY_OPERATION)) {
        TypeInformation<Tuple6<String, String, String, String, String, String>> info =
                TypeInformation.of(new TypeHint<Tuple6<String, String, String, String, String, String>>(){});
        monitoringTupleKeyedStream = kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {
            public Tuple getKey(Monitoring mon) throws Exception {
                String key = "";
                String keyName = "";
                final String eventName = mon.getEventName();
                if (eventName != null && (eventName.equalsIgnoreCase(INGRESS_FPS))) {
                    keyName = PCAM_ID;
                    key = mon.getEventDataMap() != null ? (String)
                            mon.getEventDataMap().get(PCAM_ID) : "";
                } else if (eventName != null && (eventName.equalsIgnoreCase(EGRESS_FPS))) {
                    keyName = OUT_BITRATE;
                    key = mon.getEventDataMap() != null ? (String)
                            mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key to use
                }
                mon.setKeyName(keyName);
                mon.setKeyValue(key);
                return new Tuple6<>(mon.getDeployment(), mon.getGameId(),
                        eventName, mon.getComponent(), mon.getKeyName(), mon.getKeyValue());
            }
        }); //, info)
    } else if (keyOperationType.equalsIgnoreCase(COMPONENT_CONTAINER_OPERATION)) {
        monitoringTupleKeyedStream = kinesisStream.keyBy("deployment",
                "gameId", "eventName", "component", "instance", "container"); //<== this is
                // also a Tuple6 but no complaints ?
    }
}



This example below needs monitoringTupleKeyedStream  to be
KeyedStream>

> TypeInformation>
> info = TypeInformation.of(new TypeHint String, String, String>>(){});
> monitoringTupleKeyedStream = kinesisStream.keyBy(new
> KeySelector String>>() {
> @Override
> public Tuple6 String> getKey(Monitoring mon) throws Exception {
> String key = "";
> String keyName = "";
> //TODO: extract to a method to pull key to use
> from a config file
> final String eventName = mon.getEventName();
> if (eventName != null &&
> ((eventName.equalsIgnoreCase(INGRESS_FPS)))
> )) {
> keyName = PCAM_ID;
> key = mon.getEventDataMap() != null ? (String)
> mon.getEventDataMap().get(PCAM_ID) : "";
> } else if (eventName != null &&
> (eventName.equalsIgnoreCase(EGRESS_FPS))) {
> keyName = OUT_BITRATE;
> key = mon.getEventDataMap() != null ? (String)
> mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key to use
> }
> mon.setKeyName(keyName);
> mon.setKeyValue(key);
> return new Tuple6<>(mon.getDeployment(),
> mon.getGameId(), eventName, mon.getComponent(), mon.getKeyName(),
> mon.getKeyValue());
> }
> }, info);


TIA


Re: Watermark not firing to push data

2018-12-18 Thread Vijay Balakrishnan
Hi,
After looking at the code in EventTimeTrigger, I changed the Watermark to
be System.currentMillisecs + boundSecs( 5 secs) so that the window's maxTS
was <= watermark. I was able to consume from Kinesis when I had only 50
records.

For a TumblingWindow of 5 secs, the window maxTS was usually around
currTime + 5 secs.
So, I set the watermark to System.currentMillisecs + 5 secs.
This way, the trigger fired and got into the AggregateFunction.getResult().

@Override
public TriggerResult onElement(Object element, long timestamp,
TimeWindow window, TriggerContext ctx) throws Exception {
   if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {//<== This
check had to be met
  // if the watermark is already past the window fire immediately
  return TriggerResult.FIRE;
   } else {
  ctx.registerEventTimeTimer(window.maxTimestamp());
  return TriggerResult.CONTINUE;
   }
}


On Mon, Dec 17, 2018 at 10:00 AM Vijay Balakrishnan 
wrote:

> Hi,
> Thx for your reply and pointers on the currentLowWatermark. Looks like the
> Flink UI has tab for Watermarks itself for an Operator.
>
> I dump 5 records into the Kinesis Data Stream and am trying to read the
> same record from the FlinkKinesisConsumer and am not able to.
> I am using the same monitoring.getIntervalStart() in the Watermark
> generation(intervalStart - bound) in *MonitoringAssigner* class that I
> used to generate data on the producer side. I generate intervalStart on the
> Producer side which increments on each record by 3-10 millisecs. The
> watermark is being generated with intervalStart - bound(3 secs)-so, every
> watermark generated is > than the previous one. So, why does it not push
> data out ?  It gets into the MGroupingWindowAggregate.add(..) method but
> never gets into the MGroupingWindowAggregate.getResult(..) method ?? It
> works when i produce 1000 records or so into Kinesis data stream.
>
> Here is a gist of my code-
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> //FlinkConsumer
> Properties kinesisConsumerConfig = new Properties();
> ..
> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
> "1");
> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
> "2000");//2000
> kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
> ConsumerConfigConstants.InitialPosition.*TRIM_HORIZON*.name());
> FlinkKinesisConsumer kinesisConsumer = FlinkKinesisConsumer<>(
> kinesisTopicRead, new MonitoringMapKinesisSchema(),
> kinesisConsumerConfig);
> final DataStreamSource monitoringDataStreamSource =
> env.addSource(kinesisConsumer);
> DataStream kinesisStream =
> monitoringDataStreamSource.assignTimestampsAndWatermarks(new
> *MonitoringAssigner*(3000));//code at bottom
>
> org.apache.flink.streaming.api.windowing.time.Time timeWindow =
> Time.seconds(5);
> final WindowedStream windowStream =
> kinesisStream.timeWindow(timeWindow);
> DataStream enrichedComponentInstanceStream1 =
> windowStream.*aggregate*(
> new *MGroupingWindowAggregate*(),//AggregateFunction
> impl
> new *MGroupingAggregateWindowProcessing*(...));
>
> public class MonitoringAssigner implements
> AssignerWithPunctuatedWatermarks {
> private long *bound = 3 * 1000*;//3 secs out of order bound in
> millisecs
> public MonitoringAssigner(long bound) {
> this.bound = bound;
> }
> public Watermark checkAndGetNextWatermark(Monitoring monitoring, long
> extractedTimestamp) {
> long nextWatermark = extractedTimestamp - bound;
> return new Watermark(nextWatermark);
> }
> public long extractTimestamp(Monitoring monitoring, long previousTS) {
> LocalDateTime intervalStart = Utils.getLocalDateTime(
> *monitoring.getIntervalStart()*);//2012-07-12 02:21:06.057
> long extractedTS = Utils.getLongFromLocalDateTime(intervalStart);
> return extractedTS;
> //return System.currentTimeMillis(); //this works fine.
> }
> }
>
> TIA,
> Vijay
>
> On Sat, Dec 15, 2018 at 5:42 AM Hequn Cheng  wrote:
>
>> Hi Vijay,
>>
>> Could you provide more information about your problem? For example
>> - Which kind of window do you use?
>> - What's the window size?
>> - A relatively complete code is better :-)
>>
>> As for the problem, it is probably the event time has not reached the end
>> of the window. You can monitor the watermark in the web dashboard[1].
>> Also, changing even time to processing time is another way to verify if
>&g

Re: Watermark not firing to push data

2018-12-17 Thread Vijay Balakrishnan
Hi,
Thx for your reply and pointers on the currentLowWatermark. Looks like the
Flink UI has tab for Watermarks itself for an Operator.

I dump 5 records into the Kinesis Data Stream and am trying to read the
same records from the FlinkKinesisConsumer, and am not able to.
I am using the same monitoring.getIntervalStart() in the Watermark
generation (intervalStart - bound) in the MonitoringAssigner class that I used
to generate data on the producer side. I generate intervalStart on the
Producer side, and it increments on each record by 3-10 millisecs. The
watermark is being generated with intervalStart - bound (3 secs), so every
watermark generated is greater than the previous one. So, why does it not push
data out? It gets into the MGroupingWindowAggregate.add(..) method but
never gets into the MGroupingWindowAggregate.getResult(..) method. It
works when I produce 1000 records or so into the Kinesis data stream.

Here is a gist of my code-
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//FlinkConsumer
Properties kinesisConsumerConfig = new Properties();
..
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
"1");
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
"2000");//2000
kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
ConsumerConfigConstants.InitialPosition.*TRIM_HORIZON*.name());
FlinkKinesisConsumer<Monitoring> kinesisConsumer = new FlinkKinesisConsumer<>(
        kinesisTopicRead, new MonitoringMapKinesisSchema(),
        kinesisConsumerConfig);
final DataStreamSource<Monitoring> monitoringDataStreamSource =
        env.addSource(kinesisConsumer);
DataStream<Monitoring> kinesisStream =
        monitoringDataStreamSource.assignTimestampsAndWatermarks(new
        MonitoringAssigner(3000)); //code at bottom

org.apache.flink.streaming.api.windowing.time.Time timeWindow =
        Time.seconds(5);
final WindowedStream windowStream =
        kinesisStream.timeWindow(timeWindow);
DataStream enrichedComponentInstanceStream1 =
        windowStream.aggregate(
                new MGroupingWindowAggregate(), //AggregateFunction impl
                new MGroupingAggregateWindowProcessing(...));

public class MonitoringAssigner implements
AssignerWithPunctuatedWatermarks {
private long *bound = 3 * 1000*;//3 secs out of order bound in millisecs
public MonitoringAssigner(long bound) {
this.bound = bound;
}
public Watermark checkAndGetNextWatermark(Monitoring monitoring, long
extractedTimestamp) {
long nextWatermark = extractedTimestamp - bound;
return new Watermark(nextWatermark);
}
public long extractTimestamp(Monitoring monitoring, long previousTS) {
LocalDateTime intervalStart = Utils.getLocalDateTime(
*monitoring.getIntervalStart()*);//2012-07-12 02:21:06.057
long extractedTS = Utils.getLongFromLocalDateTime(intervalStart);
return extractedTS;
//return System.currentTimeMillis(); //this works fine.
}
}

TIA,
Vijay

On Sat, Dec 15, 2018 at 5:42 AM Hequn Cheng  wrote:

> Hi Vijay,
>
> Could you provide more information about your problem? For example
> - Which kind of window do you use?
> - What's the window size?
> - A relatively complete code is better :-)
>
> As for the problem, it is probably the event time has not reached the end
> of the window. You can monitor the watermark in the web dashboard[1].
> Also, changing even time to processing time is another way to verify if it
> is a watermark problem.
>
> Best, Hequn
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_event_time.html
>
>
> On Sat, Dec 15, 2018 at 12:59 AM Vijay Balakrishnan 
> wrote:
>
>> Hi,
>> Observations on Watermarks:
>> Read this great article:
>> https://data-artisans.com/blog/watermarks-in-apache-flink-made-easy
>>
>> * Watermark means when for any event TS, when to stop waiting for arrival
>> of earlier events.
>> * Watermark t means all events with Timestamp < t have already arrived.
>> * When to push data out - When watermark with TS >= t arrives
>>
>> Only *using incrementing current time for watermark seems to be working
>> correctly* but not sure if it aligns up correctly with EventTime
>> processing.
>> *Using the incoming records intervalStart as the Watermark source  for
>> EventTime causes data to not be pushed at all* in cases when i have just
>> 5 records in the Source.
>>
>> My source generation for intervalStart has intervalStart incrementing at
>> a regular interval.
>> I tried using the intervalStart for my Watermark with a out of order late
>> boundedness of 3 secs.
>> The *AggregateFunction* I am using calls the add() fine

Watermark not firing to push data

2018-12-14 Thread Vijay Balakrishnan
Hi,
Observations on Watermarks:
Read this great article:
https://data-artisans.com/blog/watermarks-in-apache-flink-made-easy

* Watermark means when for any event TS, when to stop waiting for arrival
of earlier events.
* Watermark t means all events with Timestamp < t have already arrived.
* When to push data out - When watermark with TS >= t arrives

Only using an incrementing current time for the watermark seems to be working
correctly, but I am not sure if it aligns correctly with EventTime processing.
Using the incoming record's intervalStart as the Watermark source for
EventTime causes data to not be pushed at all in cases where I have just 5
records in the Source.

My source generation for intervalStart has intervalStart incrementing at a
regular interval.
I tried using the intervalStart for my Watermark with an out-of-order lateness
bound of 3 secs.
The AggregateFunction I am using calls add() fine but never calls
getResult().
My assumption was that the AggregateFunction I am using would push the data
to getResult()
based on the Watermark, with intervalStart incrementing beyond the
previous watermark t.
But it doesn't - is it because I have a limited number of input records, and
once intervalStart gets to the end
of the input records too fast, it stops incrementing the watermark and hence
doesn't push data ?

With System.currentTimeMillis, it happily keeps increasing and hence pushes
the data.

Created this class:
public class MonitoringAssigner implements
        AssignerWithPunctuatedWatermarks<Monitoring> {
    private long bound = 3 * 1000; //3 secs out of order bound in millisecs

    public MonitoringAssigner(long bound) {
        this.bound = bound;
    }

    public Watermark checkAndGetNextWatermark(Monitoring monitoring, long extractedTimestamp) {
        long nextWatermark = extractedTimestamp - bound;
        //simply emit a Watermark with every event
        return new Watermark(nextWatermark);
    }

    @Override
    public long extractTimestamp(Monitoring monitoring, long previousTS) {
        /*LocalDateTime intervalStart =
                Utils.getLocalDateTime(monitoring.getIntervalStart()); //2012-07-12 02:21:06.057
        long extractedTS = Utils.getLongFromLocalDateTime(intervalStart);
        // using this stopped pushing recs after a certain time
        return extractedTS;*/
        return System.currentTimeMillis(); //incrementing current time
    }
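
To make the suspicion above concrete, a small worked example of the arithmetic
(numbers are made up):

// Illustrative numbers only.
// 5-second tumbling event-time window: [10:00:00.000, 10:00:05.000)
//
// last record's intervalStart (event time)  = 10:00:04.800
// watermark emitted for it                  = 10:00:04.800 - 3s = 10:00:01.800
//
// The window only fires once a watermark >= 10:00:05.000 arrives.
// With no further records the watermark stays at 10:00:01.800 forever,
// so add() has been called but getResult() never is.
//
// Using System.currentTimeMillis() "works" only because processing time keeps
// advancing on its own, eventually producing a watermark past the window end.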


Re: Encountered the following Expired Iterator exception in getRecords using FlinkKinesisConsumer

2018-12-14 Thread Vijay Balakrishnan
I have 2 shards in the Kinesis Stream - I need to figure out how to check
from the logs whether records are being written to both shards.
Not sure if this is what you are looking for in terms of the # of shards read -
it seems like 1 from the logs below:

DEBUG org.apache.http.wire [] -
http-outgoing-12 <<
"{"took":2,"errors":false,"items":[{"index":{"_index":"mlist-5sec-comp-idx","_type":"mlist_5sec_comp_schema","_id":"66zEqWcBmN9Gpk7gdac7","_version":1,"result":"created","_
*shard*
s":{"total":2,"successful":1,"failed":0},"_seq_no":34,"_primary_term":1,"status":201}}]}"

14:51:23,842 [I/O dispatcher 36] DEBUG org.apache.http.wire
[] - http-outgoing-9 <<
"{"took":1,"errors":false,"items":[{"index":{"_index":"mlist-5sec-inst-idx","_type":"mlist_5sec_inst_schema","_id":"7KzEqWcBmN9Gpk7gdadA","_version":1,"result":"created","_
*shard*
s":{"total":2,"successful":1,"failed":0},"_seq_no":43,"_primary_term":1,"status":201}}]}"


On Fri, Dec 14, 2018 at 12:28 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi,
>
> I’m suspecting that this is the issue:
> https://issues.apache.org/jira/browse/FLINK-11164.
>
> One more thing to clarify to be sure of this:
> Do you have multiple shards in the Kinesis stream, and if yes, are some of
> them actually empty?
> Meaning that, even though you mentioned some records were written to the
> Kinesis stream, some shards actually weren’t written any records.
>
> Cheers,
> Gordon
>
>
> On 14 December 2018 at 4:10:30 AM, Vijay Balakrishnan (bvija...@gmail.com)
> wrote:
>
> Hi Gordon,
>
> My use-case was slightly different.
>
> 1.  Started a Kinesis connector source, with TRIM_HORIZON as the startup
> position.
> 2. Only a few Records were written to the Kinesis stream
> 3. The FlinkKinesisConsumer reads the records from Kinesis stream. Then
> after a period of time of not reading anymore Kinesis Stream records, it 
> received
> the “Encountered an unexpected expired iterator” warning in the logs, and
> the job failed with the misleading AmazonKinesisException?
>
> Also, in 1 with LATEST  as the startup position, I have not been able to
> read any records from the Kinesis Stream.Still trying to pinpoint what i am
> doing wrong. For sure, I am not using checkpoints and not sure if this
> causes any issues with LATEST option.
> TIA,
> Vijay
>
> On Thu, Dec 13, 2018 at 2:59 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi!
>>
>> Thanks for reporting this.
>>
>> This looks like an overlooked corner case that the Kinesis connector
>> doesn’t handle properly.
>>
>> First, let me clarify the case and how it can be reproduced. Please let
>> me know if the following is correct:
>> 1. You started a Kinesis connector source, with TRIM_HORIZON as the
>> startup position.
>> 2. No records were written to the Kinesis stream at all.
>> 3. After a period of time, you received the “Encountered an unexpected
>> expired iterator” warning in the logs, and the job failed with the
>> misleading AmazonKinesisException?
>>
>> Cheers,
>> Gordon
>>
>> On 13 December 2018 at 6:53:11 AM, Vijay Balakrishnan (bvija...@gmail.com)
>> wrote:
>>
>> Hi,
>> Using FlinkKinesisConsumer in a long running Flink Streaming app
>> consuming from a Kinesis Stream.
>> Encountered the following Expired Iterator exception in getRecords():
>>  org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer []
>> - Encountered an unexpected expired iterator
>>
>> The error on the console ends up being a misleading one: "Caused by:
>> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.AmazonKinesisException:
>> 1 validation error detected: Value 'EARLIEST_SEQUENCE_NUM' at
>> 'startingSequenceNumber' failed to satisfy constraint: Member must satisfy
>> regular expression pattern: 0|([1-9]\d{0,128}) (Service: AmazonKinesis;
>> Status Code: 400; Error Code: ValidationException; Request ID: ..)
>> "
>>
>> How do I increase the *ClientConfiguration.clientExecutiontimeout* to
>> avoid this issue or is this the right way to handle this issue ? I don't
>> want the FlinkKinesisConsumer streaming app to fail just because there
>> might be no records in the Kinesis Stream. I am using TRIM_HORIZON to read
>> from the start of the Kinesis Stream.
>>
>>  TIA,
>>
>>


Re: Encountered the following Expired Iterator exception in getRecords using FlinkKinesisConsumer

2018-12-13 Thread Vijay Balakrishnan
Hi Gordon,

My use-case was slightly different.

1.  Started a Kinesis connector source, with TRIM_HORIZON as the startup
position.
2. Only a few records were written to the Kinesis stream.
3. The FlinkKinesisConsumer reads the records from the Kinesis stream. Then,
after a period of time of not reading any more Kinesis Stream records,
it received the “Encountered an unexpected expired iterator” warning in the logs,
and the job failed with the misleading AmazonKinesisException.

Also, in step 1 with LATEST as the startup position, I have not been able to
read any records from the Kinesis Stream. Still trying to pinpoint what I am
doing wrong. For sure, I am not using checkpoints, and I am not sure if this
causes any issues with the LATEST option.
TIA,
Vijay

On Thu, Dec 13, 2018 at 2:59 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi!
>
> Thanks for reporting this.
>
> This looks like an overlooked corner case that the Kinesis connector
> doesn’t handle properly.
>
> First, let me clarify the case and how it can be reproduced. Please let me
> know if the following is correct:
> 1. You started a Kinesis connector source, with TRIM_HORIZON as the
> startup position.
> 2. No records were written to the Kinesis stream at all.
> 3. After a period of time, you received the “Encountered an unexpected
> expired iterator” warning in the logs, and the job failed with the
> misleading AmazonKinesisException?
>
> Cheers,
> Gordon
>
> On 13 December 2018 at 6:53:11 AM, Vijay Balakrishnan (bvija...@gmail.com)
> wrote:
>
> Hi,
> Using FlinkKinesisConsumer in a long running Flink Streaming app consuming
> from a Kinesis Stream.
> Encountered the following Expired Iterator exception in getRecords():
>  org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer []
> - Encountered an unexpected expired iterator
>
> The error on the console ends up being a misleading one: "Caused by:
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.AmazonKinesisException:
> 1 validation error detected: Value 'EARLIEST_SEQUENCE_NUM' at
> 'startingSequenceNumber' failed to satisfy constraint: Member must satisfy
> regular expression pattern: 0|([1-9]\d{0,128}) (Service: AmazonKinesis;
> Status Code: 400; Error Code: ValidationException; Request ID: ..)
> "
>
> How do I increase the *ClientConfiguration.clientExecutiontimeout* to
> avoid this issue or is this the right way to handle this issue ? I don't
> want the FlinkKinesisConsumer streaming app to fail just because there
> might be no records in the Kinesis Stream. I am using TRIM_HORIZON to read
> from the start of the Kinesis Stream.
>
>  TIA,
>
>


Re: Using FlinkKinesisConsumer through a proxy

2018-11-30 Thread Vijay Balakrishnan
Hi Gordon,
Finally figured out my issue. I do not need to add http:// in the proxyHost name.
String proxyHost = "proxy-chaincom"; //not http://proxy-chain...com
kinesisConsumerConfig.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX +
"proxyHost", proxyHost); //<== no http:// in proxyHost name

TIA,
Vijay


On Wed, Nov 14, 2018 at 12:50 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi Vijay,
>
> I’m pretty sure that this should work with the properties that you
> provided, unless the AWS Kinesis SDK isn’t working as expected.
>
> What I’ve tested is that with those properties, the ClientConfiguration
> used to build the Kinesis client has the proxy domain / host / ports etc.
> properly set.
> And according to [1], this should be enough to configure the constructed
> Kinesis client to connect via the proxy.
>
> Cheers,
> Gordon
>
> [1]
> https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/section-client-configuration.html
>
>
> On 7 November 2018 at 1:19:02 AM, Vijay Balakrishnan (bvija...@gmail.com)
> wrote:
>
> Hi Gordon,
> This still didn't work :(
>
> Tried a few combinations with:
> kinesisConsumerConfig.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX  +
> "proxyDomain", "...");
>
> inesisConsumerConfig.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX  +
> "proxyHost", "http://.com;);
>
> kinesisConsumerConfig.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX  +
> "proxyPort", "911");
>
> kinesisConsumerConfig.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX  +
> "proxyUsername", "...");
>
> kinesisConsumerConfig.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX  +
> "proxyPassword", "..");
>
> kinesisConsumerConfig.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX  +
> "nonProxyHosts", "
>
>
> How does the FlinkKinesisProducer work so seamlessly through a proxy ?
> TIA,
> Vijay
>
> On Thu, Oct 4, 2018 at 6:41 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi,
>>
>> Since Flink 1.5, you should be able to set all available configurations
>> on the ClientConfiguration through the consumer Properties (see FLINK-9188
>> [1]).
>>
>> The way to do that would be to prefix the configuration you want to set
>> with "aws.clientconfig" and add that to the properties, as such:
>>
>> ```
>> Properties kinesisConsumerProps = new Properties();
>> kinesisConsumerProps.setProperty("aws.clientconfig.proxyHost", ...);
>> kinesisConsumerProps.setProperty("aws.clientconfig.proxyPort", ...);
>> kinesisConsumerProps.setProperty("aws.clientconfig.proxyUsert", ...);
>> ...
>> ```
>>
>> Could you try that out and see if it works for you?
>>
>> I've also realized that this feature isn't documented very well, and have
>> opened a ticket for that [2].
>>
>> Cheers,
>> Gordon
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-9188
>> [2] https://issues.apache.org/jira/browse/FLINK-10492
>>
>> On Thu, Oct 4, 2018, 7:57 PM Aljoscha Krettek 
>> wrote:
>>
>>> Hi,
>>>
>>> I'm looping in Gordon and Thomas, they might have some idea about how to
>>> resolve this.
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 3. Oct 2018, at 17:29, Vijay Balakrishnan  wrote:
>>>
>>> I have been trying with all variations  to no avail of java
>>> -Dhttp.nonProxyHosts=..  -Dhttps.proxyHost=http://...
>>> -Dhttps.proxyPort=911 -Dhttps.proxyUser= -Dhttps.proxyPassword=..
>>> -Dhttp.proxyHost=http://.. -Dhttp.proxyPort=911 -Dhttp.proxyUser=...
>>> -Dhttp.proxyPassword=... -jar .. after looking at the code in
>>> com.amazonaws.ClientConfiguration
>>>
>>> On Tue, Oct 2, 2018 at 3:49 PM Vijay Balakrishnan 
>>> wrote:
>>>
>>>> HI,
>>>> How do I use FlinkKinesisConsumer using the Properties through a proxy
>>>> ? Getting a Connection issue through the proxy.
>>>> Works outside the proxy.
>>>>
>>>> Properties kinesisConsumerConfig = new Properties();
>>>>
>>>> kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION, region);
>>>>
>>>> if (local) {
>>>>
>>>> kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID,
>>>> accessKey);
>>>>
>>>> kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
>>>> secretKey);
>>>> } else {
>>>>
>>>> kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER,
>>>> "AUTO");
>>>> }
>>>>
>>>> //only for Consumer
>>>>
>>>> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
>>>> "1");
>>>>
>>>> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
>>>> "2000");
>>>> FlinkKinesisConsumer>
>>>> kinesisConsumer = new FlinkKinesisConsumer<>(
>>>> "kinesisTopicRead", new Tuple2KinesisSchema(),
>>>> kinesisConsumerConfig);
>>>> TIA
>>>>
>>>
>>>


Re: Never gets into ProcessWindowFunction.process()

2018-11-09 Thread Vijay Balakrishnan
Hi Gary,
Bang on the money. I did not have an assigned Watermark and once I put that
in, the code entered the process() method.
Thx a ton for your help. Life-saver!

DataStream kinesisStream = env
.addSource(kinesisConsumer)
.assignTimestampsAndWatermarks(new MonitoringAssigner())//<=



On Fri, Nov 9, 2018 at 10:02 AM Gary Yao  wrote:

> Hi,
>
> You are using event time but are you assigning watermarks [1]? I do not
> see it
> in the code.
>
> Best,
> Gary
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#event-time-for-consumed-records
>
> On Fri, Nov 9, 2018 at 6:58 PM Vijay Balakrishnan 
> wrote:
>
>> Hi,
>> Any help is appreciated.Dug into this. *I can see the deserialized
>> output log from FlinkKinesisConsumer deserialization but it keeps looping
>> to pull from Kinesis Stream but never gets into the Windowing operation for
>> process() or apply().*
>>
>> FlinkKinesisConsumer seems to be stuck in a loop calling a Kinesis Stream
>> and the deserialized output never seems to get into the apply() or
>> process() method of a Windowing operation. I can see the logs of
>> MonitoringMapKinesisSchema deserializing data back successfully from
>> Kinesis and converting into a POJO.
>>
>> Code:
>>
>> *//Create environment*:
>> StreamExecutionEnvironment env;
>> if (local) {
>> Configuration configuration = new Configuration();
>> configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
>> env = StreamExecutionEnvironment.createLocalEnvironment(1,
>> configuration);
>> } else {
>> env = StreamExecutionEnvironment.getExecutionEnvironment();
>> }
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> *//create FlinkKinesisConsumer*
>> Properties kinesisConsumerConfig = new Properties();
>> kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION, region);
>> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER,
>> "AUTO");
>> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
>> "1");
>> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
>> "2000");
>> kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
>> "TRIM_HORIZON");
>> FlinkKinesisConsumer kinesisConsumer = new FlinkKinesisConsumer<>(
>> kinesisTopicRead, new MonitoringMapKinesisSchema(),
>> kinesisConsumerConfig);*//deserialization works fine*
>> DataStream kinesisStream = env
>> .addSource(kinesisConsumer);
>> KeyedStream>
>> enrichedComponentInstanceStream1Key = kinesisStream
>> .keyBy(new KeySelector> String>>() {
>> public Tuple3
>> getKey(Monitoring mon) throws Exception {
>> return new Tuple3> String>(mon.getComponent(), mon.getInstance(), mon.getOperation());
>> }
>> });
>>
>> WindowedStream, TimeWindow>
>> enrichedComponentInstanceStream1Win = enrichedComponentInstanceStream1Key
>>
>> .timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));
>>
>> DataStream enrichedComponentInstanceStream1 =
>> enrichedComponentInstanceStream1Win
>> //.process(new Window5SecProcessing(gameId, FIVE_SECONDS,
>> COMPONENT_INSTANCE_OPERATION))
>> .process(new Window5SecProcessing());*//never gets in
>> here*
>> //Gets into Window5SecProcessing.open() method during initialization but
>> never into the process method 
>> private static class Window5SecProcessing extends
>> ProcessWindowFunction> String, String>, TimeWindow> {
>>
>> private transient String interval;
>> private transient String gameId;
>> private transient String keyType;
>> private transient org.apache.flink.metrics.Histogram
>> fiveSecHistogram;
>>
>> private transient ValueState total5SecCountState;
>> private transient ValueStateDescriptor
>> total5SecCountValueStateDescriptor;
>> public Window5SecProcessing() {
>>
>> }
>>
>> public Window5SecProcessing(String gameId, String interval,
>> String keyType) {
>> this.gameId = gameId;
>> this.interval = interval;
>> this.keyType = keyType;
>> }
>>
>> @Override
>> public

Re: Never gets into ProcessWindowFunction.process()

2018-11-09 Thread Vijay Balakrishnan
Hi,
Any help is appreciated. Dug into this. *I can see the deserialized output
log from the FlinkKinesisConsumer deserialization, but it keeps looping to pull
from the Kinesis Stream and never gets into the Windowing operation's
process() or apply().*

FlinkKinesisConsumer seems to be stuck in a loop calling a Kinesis Stream
and the deserialized output never seems to get into the apply() or
process() method of a Windowing operation. I can see the logs of
MonitoringMapKinesisSchema deserializing data back successfully from
Kinesis and converting into a POJO.

Code:

*//Create environment*:
StreamExecutionEnvironment env;
if (local) {
Configuration configuration = new Configuration();
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
env = StreamExecutionEnvironment.createLocalEnvironment(1,
configuration);
} else {
env = StreamExecutionEnvironment.getExecutionEnvironment();
}
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
*//create FlinkKinesisConsumer*
Properties kinesisConsumerConfig = new Properties();
kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION, region);
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER,
"AUTO");
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
"1");
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
"2000");
kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
"TRIM_HORIZON");
FlinkKinesisConsumer<Monitoring> kinesisConsumer = new FlinkKinesisConsumer<>(
        kinesisTopicRead, new MonitoringMapKinesisSchema(), kinesisConsumerConfig); //deserialization works fine

DataStream<Monitoring> kinesisStream = env
        .addSource(kinesisConsumer);

KeyedStream<Monitoring, Tuple3<String, String, String>> enrichedComponentInstanceStream1Key = kinesisStream
        .keyBy(new KeySelector<Monitoring, Tuple3<String, String, String>>() {
            public Tuple3<String, String, String> getKey(Monitoring mon) throws Exception {
                return new Tuple3<String, String, String>(mon.getComponent(), mon.getInstance(), mon.getOperation());
            }
        });

WindowedStream<Monitoring, Tuple3<String, String, String>, TimeWindow> enrichedComponentInstanceStream1Win =
        enrichedComponentInstanceStream1Key
                .timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));

DataStream<MonitoringGrouping> enrichedComponentInstanceStream1 = enrichedComponentInstanceStream1Win
        //.process(new Window5SecProcessing(gameId, FIVE_SECONDS, COMPONENT_INSTANCE_OPERATION))
        .process(new Window5SecProcessing()); //never gets in here

//Gets into Window5SecProcessing.open() method during initialization but never into the process() method
private static class Window5SecProcessing extends
        ProcessWindowFunction<Monitoring, MonitoringGrouping, Tuple3<String, String, String>, TimeWindow> {

private transient String interval;
private transient String gameId;
private transient String keyType;
private transient org.apache.flink.metrics.Histogram
fiveSecHistogram;

private transient ValueState<Long> total5SecCountState;
private transient ValueStateDescriptor<Long> total5SecCountValueStateDescriptor;
public Window5SecProcessing() {

}

public Window5SecProcessing(String gameId, String interval, String
keyType) {
this.gameId = gameId;
this.interval = interval;
this.keyType = keyType;
}

@Override
public void clear(Context context) throws Exception {
super.clear(context);
KeyedStateStore keyedStateStore = context.windowState();

keyedStateStore.getState(total5SecCountValueStateDescriptor).clear();
}

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
logger.debug("Gets in here fine -Window5SecProcessing -Entered
open - parameters:{}", parameters);
com.codahale.metrics.Histogram fiveSecHist =
new com.codahale.metrics.Histogram(new
SlidingTimeWindowReservoir(5, TimeUnit.SECONDS));
this.fiveSecHistogram = new
DropwizardHistogramWrapper(fiveSecHist);
total5SecCountValueStateDescriptor =
new ValueStateDescriptor("total5SecCount",
Long.class, 0L);
total5SecCountState =
getRuntimeContext().getState(total5SecCountValueStateDescriptor);
}


public void process(Tuple3<String, String, String> currentKey1,
        Context ctx, Iterable<Monitoring> input, Collector<MonitoringGrouping> out)
        throws Exception {
    logger.debug("@@never gets here@@ Window5SecProcessing - Entered process");
...
}




On Mon, Nov 5, 2018 at 4:10 PM Vijay Balakrishnan 
wrote:

> Hi,
> Running in IntelliJ IDE on a Mac with 4 vProcessors.
> Code compiles fine. It never gets into the Window5SecProcessing's
> process().I am able to get data from the Kinesis Consumer and it is
> deserialized properly when I debug the code. It gets into the
> Window5SecProce

Re: Never gets into ProcessWindowFunction.process()

2018-11-09 Thread Vijay Balakrishnan
Hi Gary,
Just posted the code. Please let me know if that clarifies the problem. I have
been digging into how the FlinkKinesisConsumer's deserialized output gets
passed into the process() or apply() method, to no avail. The coding pattern
I used matches all the flink-examples I have seen for Flink 1.6.1.
TIA,
Vijay

On Fri, Nov 9, 2018 at 9:53 AM Gary Yao  wrote:

> Hi,
>
> If the job is actually running and consuming from Kinesis, the log you
> posted
> is unrelated to your problem. To understand why the process function is not
> invoked, we would need to see more of your code, or you would need to
> provide
> an executable example. The log only shows that all offered slots are
> occupied
> by tasks of your job.
>
> Best,
> Gary
>
> On Tue, Nov 6, 2018 at 1:10 AM Vijay Balakrishnan 
> wrote:
>
>> Hi,
>> Running in IntelliJ IDE on a Mac with 4 vProcessors.
>> Code compiles fine. It never gets into the Window5SecProcessing's
>> process().I am able to get data from the Kinesis Consumer and it is
>> deserialized properly when I debug the code. It gets into the
>> Window5SecProcessing.open() method for initialization.
>>
>> Not sure if I am failing with no slots available ???
>> In main():
>>  //trimmed a lot of code
>> *FlinkKinesisConsumer kinesisConsumer =
>> getMonitoringFlinkKinesisConsumer(local, kinesisTopicRead, region, ...,
>> ...);*
>>
>> *DataStream kinesisStream = env*
>> *.addSource(kinesisConsumer)*
>> *.uid(jobName + "KinesisSource");*
>> *KeyedStream>
>> enrichedComponentInstanceStream1Key = kinesisStream*
>> *.keyBy(new KeySelector> String, String>>() {*
>> *public Tuple3
>> getKey(Monitoring mon) throws Exception {*
>> *return new Tuple3> String>(mon.getComponent(), mon.getInstance(), mon.getOperation());*
>> *}});*
>>
>> *WindowedStream,
>> TimeWindow> enrichedComponentInstanceStream1Win =
>> enrichedComponentInstanceStream1Key.timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));*
>>
>> *DataStream enrichedComponentInstanceStream1
>> = enrichedComponentInstanceStream1Win*
>> *.process(new Window5SecProcessing(gameId, FIVE_SECONDS,
>> COMPONENT_INSTANCE_OPERATION))*
>> *.uid("Component Instance Operation Key Monitoring " +
>> FIVE_SECONDS);*
>> *enrichedComponentInstanceStream1.addSink(new
>> SinkFunction() {*
>> *@Override*
>> *public void invoke(MonitoringGrouping mg, Context context)
>> throws Exception {*
>> *//TODO call ES*
>> *logger.debug("In enrichedComponentInstanceStream1 Sink
>> received mg:{}", mg);*
>> *}*
>> *});*
>> *Window processing class*:
>> private static class Window5SecProcessing extends
>> ProcessWindowFunction> String, String>, TimeWindow> {
>> private transient Histogram fiveSecHist;
>> private transient Histogram fiveMinHist;
>> private transient org.apache.flink.metrics.Histogram
>> fiveSecHistogram;
>> private transient org.apache.flink.metrics.Histogram
>> fiveMinHistogram;
>> private transient ValueState total5SecCountState;
>> private transient ValueStateDescriptor
>> total5SecCountValueStateDescriptor;
>>
>> public Window5SecProcessing(String gameId, String interval,
>> String keyType) {
>> ...
>> }
>>
>> public void open(Configuration parameters) throws Exception {
>> super.open(parameters);
>> logger.debug("Window5SecProcessing -Entered open -
>> parameters:{}", parameters);//gets here
>> com.codahale.metrics.Histogram fiveSecHist =
>> new com.codahale.metrics.Histogram(new
>> SlidingTimeWindowReservoir(5, TimeUnit.SECONDS));
>> this.fiveSecHistogram = new
>> DropwizardHistogramWrapper(fiveSecHist);
>> total5SecCountValueStateDescriptor =
>> new ValueStateDescriptor("total5SecCount",
>> Long.class, 0L);
>> total5SecCountState =
>> getRuntimeContext().getState(total5SecCountValueStateDescriptor);
>> }
>> ..
>>
>>* public void process(Tuple3 currentKey1,
>> Context ctx, Iterable input, Collector out)
>> throws Exception {*
>> *logger.debug("Window5SecProcessing 

Re: Using FlinkKinesisConsumer through a proxy

2018-11-06 Thread Vijay Balakrishnan
Hi Gordon,
This still didn't work :(

Tried a few combinations with:
kinesisConsumerConfig.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX + "proxyDomain", "...");
kinesisConsumerConfig.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX + "proxyHost", "http://...com");
kinesisConsumerConfig.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX + "proxyPort", "911");
kinesisConsumerConfig.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX + "proxyUsername", "...");
kinesisConsumerConfig.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX + "proxyPassword", "..");
kinesisConsumerConfig.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX + "nonProxyHosts", "


How does the FlinkKinesisProducer work so seamlessly through a proxy ?
TIA,
Vijay

On Thu, Oct 4, 2018 at 6:41 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi,
>
> Since Flink 1.5, you should be able to set all available configurations on
> the ClientConfiguration through the consumer Properties (see FLINK-9188
> [1]).
>
> The way to do that would be to prefix the configuration you want to set
> with "aws.clientconfig" and add that to the properties, as such:
>
> ```
> Properties kinesisConsumerProps = new Properties();
> kinesisConsumerProps.setProperty("aws.clientconfig.proxyHost", ...);
> kinesisConsumerProps.setProperty("aws.clientconfig.proxyPort", ...);
> kinesisConsumerProps.setProperty("aws.clientconfig.proxyUsert", ...);
> ...
> ```
>
> Could you try that out and see if it works for you?
>
> I've also realized that this feature isn't documented very well, and have
> opened a ticket for that [2].
>
> Cheers,
> Gordon
>
> [1] https://issues.apache.org/jira/browse/FLINK-9188
> [2] https://issues.apache.org/jira/browse/FLINK-10492
>
> On Thu, Oct 4, 2018, 7:57 PM Aljoscha Krettek  wrote:
>
>> Hi,
>>
>> I'm looping in Gordon and Thomas, they might have some idea about how to
>> resolve this.
>>
>> Best,
>> Aljoscha
>>
>> On 3. Oct 2018, at 17:29, Vijay Balakrishnan  wrote:
>>
>> I have been trying with all variations  to no avail of java
>> -Dhttp.nonProxyHosts=..  -Dhttps.proxyHost=http://...
>> -Dhttps.proxyPort=911 -Dhttps.proxyUser= -Dhttps.proxyPassword=..
>> -Dhttp.proxyHost=http://.. -Dhttp.proxyPort=911 -Dhttp.proxyUser=...
>> -Dhttp.proxyPassword=... -jar .. after looking at the code in
>> com.amazonaws.ClientConfiguration
>>
>> On Tue, Oct 2, 2018 at 3:49 PM Vijay Balakrishnan 
>> wrote:
>>
>>> HI,
>>> How do I use FlinkKinesisConsumer using the Properties through a proxy ?
>>> Getting a Connection issue through the proxy.
>>> Works outside the proxy.
>>>
>>> Properties kinesisConsumerConfig = new Properties();
>>> kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION,
>>> region);
>>>
>>> if (local) {
>>>
>>> kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID,
>>> accessKey);
>>>
>>> kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
>>> secretKey);
>>> } else {
>>>
>>> kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER,
>>> "AUTO");
>>> }
>>>
>>> //only for Consumer
>>>
>>> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
>>> "1");
>>>
>>> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
>>> "2000");
>>> FlinkKinesisConsumer>
>>> kinesisConsumer = new FlinkKinesisConsumer<>(
>>> "kinesisTopicRead", new Tuple2KinesisSchema(),
>>> kinesisConsumerConfig);
>>> TIA
>>>
>>
>>


Re: Parallelize an incoming stream into 5 streams with the same data

2018-11-06 Thread Vijay Balakrishnan
Cool, thanks, Hequn! I will try that approach.

Vijay
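
For the archive, the pattern Hequn suggests in the quoted reply below (a keyed tumbling window first, then a windowAll over its output for the global total) could look roughly like the sketch here. The Monitoring element type and the getComponent() key are placeholders borrowed from other threads on this list, not the actual job.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

// Sketch only: count per key in 5s tumbling windows, then combine the per-key
// counts into one global total per 5s window via timeWindowAll.
// Assumes inputStream is a DataStream<Monitoring>.
DataStream<Tuple2<String, Long>> perKeyCounts = inputStream
        .map(new MapFunction<Monitoring, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(Monitoring mon) {
                return new Tuple2<>(mon.getComponent(), 1L); // (key, 1)
            }
        })
        .keyBy(0)                        // group by the key field
        .timeWindow(Time.seconds(5))
        .sum(1);                         // per-key count per 5s window

DataStream<Tuple2<String, Long>> totalCount = perKeyCounts
        .timeWindowAll(Time.seconds(5))  // single global window over the per-key results
        .sum(1);                         // field 1 now holds the total count per 5s window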

On Thu, Nov 1, 2018 at 8:18 PM Hequn Cheng  wrote:

> Hi Vijay,
>
> > I was hoping to groupBy(key._1,key._2) etc and then do a tumblingWindow
> operation on the KeyedStream and then perform group operation on the
> resultant set to get total count etc.
>
> From your description, I think you can perform a TumblingEventTimeWindow
> first, something looks like:
>
>> // tumbling processing-time windows
>> input
>> .keyBy()
>> .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
>> .();
>
> then, you can perform a windowAll after the TumblingEventTimeWindow to get
> the final total count.
>
> Best,
> Hequn
>
>
>
> On Fri, Nov 2, 2018 at 6:20 AM Vijay Balakrishnan 
> wrote:
>
>> Thanks,Hequn.
>> If I have to do a TumblingWindow operation like:
>>
>> .timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE, 
>> TimeUnit.SECONDS))
>>
>> I am not able to do that on the output of keyBy(..) which is a KeyedStream.
>>
>> I was hoping to groupBy(key._1,key._2) etc and then do a tumblingWindow 
>> operation on the KeyedStream
>>
>> and then perform group operation on the resultant set to get total count etc.
>>
>> I am only able to do only 1 of keyBy or timeWindowAll as follows:
>>
>>
>> .keyBy(*d._1,d._2*)
>> .process(new KeyProcessing(FIVE_SECONDS, "componentOperation"))
>>
>> OR
>>
>> .timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE, 
>> TimeUnit.SECONDS))
>> .process(new WindowProcessing(FIVE_SECONDS))
>>
>>
>> Doing this doesn't seem to be too helpful as the keyBy KeyedStream is lost 
>> in the next step:
>>
>> .keyBy(*d._1,d._2*)
>> .timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE, 
>> TimeUnit.SECONDS))
>> .process(new WindowProcessing(FIVE_SECONDS))
>>
>>
>> TIA,
>>
>> Vijay
>>
>>
>>
>> On Thu, Oct 25, 2018 at 6:31 PM Hequn Cheng  wrote:
>>
>>> Hi Vijay,
>>>
>>> Option 1 is the right answer. `keyBy1` and `keyBy2` contain all data in
>>> `inputStream`.
>>> While option 2 replicate all data to each task and option 3 split data
>>> into smaller groups without duplication.
>>>
>>> Best, Hequn
>>>
>>> On Fri, Oct 26, 2018 at 7:01 AM Vijay Balakrishnan 
>>> wrote:
>>>
>>>> Hi,
>>>> I need to broadcast/parallelize an incoming stream(inputStream) into 5
>>>> streams with the same data. Each stream is keyed by different keys to do
>>>> various grouping operations on the set.
>>>>
>>>> Do I just use inputStream.keyBy(5 diff keys) and then just use the
>>>> DataStream to perform windowing/grouping operations ?
>>>>
>>>> *DataStream inputStream= ...*
>>>> *DataStream  keyBy1 = inputStream.keyBy((d) -> d._1);*
>>>> *DataStream  keyBy2 = inputStream.keyBy((d) -> d._2);*
>>>>
>>>> *DataStream out1Stream = keyBy1.flatMap(new Key1Function());// do
>>>> windowing/grouping operations in this function*
>>>> *DataStream out2Stream = keyBy2.flatMap(new Key2Function());// do
>>>> windowing/grouping operations in this function*
>>>>
>>>> out1Stream.print();
>>>> out2Stream.addSink(new Out2Sink());
>>>>
>>>> Will this work ?
>>>>
>>>> Or do I use the keyBy Stream with a broadcast function like this:
>>>>
>>>> *BroadcastStream broadCastStream = inputStream.broadcast(..);*
>>>> *DataSTream out1Stream = keyBy1.connect(broadCastStream)*
>>>> * .process(new KeyedBroadcastProcessFunction...)*
>>>>
>>>> *DataSTream out2Stream = keyBy2.connect(broadCastStream)*
>>>> * .process(new KeyedBroadcastProcessFunction...)*
>>>>
>>>> Or do I need to use split:
>>>>
>>>> *SplitStream source = inputStream.split(new MyOutputSelector());*
>>>> *source.select("").flatMap(new Key1Function()).addSink(out1Sink);*
>>>> source.select("").flatMap(new Key2Function()).addSink(out2Sink);
>>>>
>>>>
>>>> static final class MyOutputSelector implements OutputSelector {
>>>> List outputs = new ArrayList();
>>>> public Iterable select(Long value) {
>>>> outputs.add("");
>>>> return outputs;
>>>> }
>>>> }
>>>> TIA,
>>>>
>>>


Never gets into ProcessWindowFunction.process()

2018-11-05 Thread Vijay Balakrishnan
Hi,
Running in IntelliJ IDE on a Mac with 4 vProcessors.
Code compiles fine. It never gets into the Window5SecProcessing's
process(). I am able to get data from the Kinesis Consumer, and it is
deserialized properly when I debug the code. It gets into the
Window5SecProcessing.open() method for initialization.

Not sure if I am failing with no slots available ???
In main():
 //trimmed a lot of code
FlinkKinesisConsumer<Monitoring> kinesisConsumer =
        getMonitoringFlinkKinesisConsumer(local, kinesisTopicRead, region, ..., ...);

DataStream<Monitoring> kinesisStream = env
        .addSource(kinesisConsumer)
        .uid(jobName + "KinesisSource");

KeyedStream<Monitoring, Tuple3<String, String, String>> enrichedComponentInstanceStream1Key = kinesisStream
        .keyBy(new KeySelector<Monitoring, Tuple3<String, String, String>>() {
            public Tuple3<String, String, String> getKey(Monitoring mon) throws Exception {
                return new Tuple3<String, String, String>(mon.getComponent(), mon.getInstance(), mon.getOperation());
            }
        });

WindowedStream<Monitoring, Tuple3<String, String, String>, TimeWindow> enrichedComponentInstanceStream1Win =
        enrichedComponentInstanceStream1Key
                .timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));

DataStream<MonitoringGrouping> enrichedComponentInstanceStream1 = enrichedComponentInstanceStream1Win
        .process(new Window5SecProcessing(gameId, FIVE_SECONDS, COMPONENT_INSTANCE_OPERATION))
        .uid("Component Instance Operation Key Monitoring " + FIVE_SECONDS);

enrichedComponentInstanceStream1.addSink(new SinkFunction<MonitoringGrouping>() {
    @Override
    public void invoke(MonitoringGrouping mg, Context context) throws Exception {
        //TODO call ES
        logger.debug("In enrichedComponentInstanceStream1 Sink received mg:{}", mg);
    }
});
*Window processing class*:
private static class Window5SecProcessing extends
        ProcessWindowFunction<Monitoring, MonitoringGrouping, Tuple3<String, String, String>, TimeWindow> {

    private transient Histogram fiveSecHist;
    private transient Histogram fiveMinHist;
    private transient org.apache.flink.metrics.Histogram fiveSecHistogram;
    private transient org.apache.flink.metrics.Histogram fiveMinHistogram;
    private transient ValueState<Long> total5SecCountState;
    private transient ValueStateDescriptor<Long> total5SecCountValueStateDescriptor;

    public Window5SecProcessing(String gameId, String interval, String keyType) {
        ...
    }

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        logger.debug("Window5SecProcessing - Entered open - parameters:{}", parameters); //gets here
        com.codahale.metrics.Histogram fiveSecHist =
                new com.codahale.metrics.Histogram(new SlidingTimeWindowReservoir(5, TimeUnit.SECONDS));
        this.fiveSecHistogram = new DropwizardHistogramWrapper(fiveSecHist);
        total5SecCountValueStateDescriptor =
                new ValueStateDescriptor<>("total5SecCount", Long.class, 0L);
        total5SecCountState =
                getRuntimeContext().getState(total5SecCountValueStateDescriptor);
    }
    ..

    public void process(Tuple3<String, String, String> currentKey1,
                        Context ctx, Iterable<Monitoring> input, Collector<MonitoringGrouping> out)
            throws Exception {
        logger.debug("Window5SecProcessing - Entered process"); //never gets here
        Tuple3<String, String, String> currentKey = (Tuple3<String, String, String>) currentKey1;
        ...
    }
}
At one point in the logs, I seem to see that there are no slots available.
Is that the problem, and if so, how can I fix it to test locally on my Mac?
*Log:*
flink-akka.actor.default-dispatcher-71 DEBUG
org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Slot Pool
Status:
status: connected to
akka://flink/user/resourcemanager_466813ab-9e2c-4c88-9623-b783ebfd00cc
registered TaskManagers: [52fbcef4-6961-4b1a-96b9-bbf8dfd905ed]
*available slots: []*
allocated slots: [[AllocatedSlot
AllocationID{e13f284707cafef978a3c59f27e7f3f3} @
52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0]]
pending requests: []
sharing groups: {
 5a0ae59368145d715b3cc0d39ba6c05a 
{
groupId=5a0ae59368145d715b3cc0d39ba6c05a
unresolved={}
resolved={52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost
(dataPort=-1)=[MultiTaskSlot{requestId=SlotRequestId{a5fd4a1b7478661f62350df3bea3695f},
allocatedRequestId=SlotRequestId{c99b7aea635f1792416d239a9b135584},
groupId=null, physicalSlot=AllocatedSlot
AllocationID{e13f284707cafef978a3c59f27e7f3f3} @
52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0,
children=[SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
request=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
group=8587a27f4c92252839400ce17054b261},
SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),

Re: Parallelize an incoming stream into 5 streams with the same data

2018-11-01 Thread Vijay Balakrishnan
Thanks,Hequn.
If I have to do a TumblingWindow operation like:

.timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE,
TimeUnit.SECONDS))

I am not able to do that on the output of keyBy(..) which is a KeyedStream.

I was hoping to groupBy(key._1,key._2) etc and then do a
tumblingWindow operation on the KeyedStream

and then perform group operation on the resultant set to get total count etc.

I am able to do only one of keyBy or timeWindowAll, as follows:


.keyBy(*d._1,d._2*)
.process(new KeyProcessing(FIVE_SECONDS, "componentOperation"))

OR

.timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE,
TimeUnit.SECONDS))
.process(new WindowProcessing(FIVE_SECONDS))


Doing this doesn't seem to be too helpful as the keyBy KeyedStream is
lost in the next step:

.keyBy(*d._1,d._2*)
.timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE,
TimeUnit.SECONDS))
.process(new WindowProcessing(FIVE_SECONDS))


TIA,

Vijay



On Thu, Oct 25, 2018 at 6:31 PM Hequn Cheng  wrote:

> Hi Vijay,
>
> Option 1 is the right answer. `keyBy1` and `keyBy2` contain all data in
> `inputStream`.
> While option 2 replicate all data to each task and option 3 split data
> into smaller groups without duplication.
>
> Best, Hequn
>
> On Fri, Oct 26, 2018 at 7:01 AM Vijay Balakrishnan 
> wrote:
>
>> Hi,
>> I need to broadcast/parallelize an incoming stream(inputStream) into 5
>> streams with the same data. Each stream is keyed by different keys to do
>> various grouping operations on the set.
>>
>> Do I just use inputStream.keyBy(5 diff keys) and then just use the
>> DataStream to perform windowing/grouping operations ?
>>
>> *DataStream inputStream= ...*
>> *DataStream  keyBy1 = inputStream.keyBy((d) -> d._1);*
>> *DataStream  keyBy2 = inputStream.keyBy((d) -> d._2);*
>>
>> *DataStream out1Stream = keyBy1.flatMap(new Key1Function());// do
>> windowing/grouping operations in this function*
>> *DataStream out2Stream = keyBy2.flatMap(new Key2Function());// do
>> windowing/grouping operations in this function*
>>
>> out1Stream.print();
>> out2Stream.addSink(new Out2Sink());
>>
>> Will this work ?
>>
>> Or do I use the keyBy Stream with a broadcast function like this:
>>
>> *BroadcastStream broadCastStream = inputStream.broadcast(..);*
>> *DataSTream out1Stream = keyBy1.connect(broadCastStream)*
>> * .process(new KeyedBroadcastProcessFunction...)*
>>
>> *DataSTream out2Stream = keyBy2.connect(broadCastStream)*
>> * .process(new KeyedBroadcastProcessFunction...)*
>>
>> Or do I need to use split:
>>
>> *SplitStream source = inputStream.split(new MyOutputSelector());*
>> *source.select("").flatMap(new Key1Function()).addSink(out1Sink);*
>> source.select("").flatMap(new Key2Function()).addSink(out2Sink);
>>
>>
>> static final class MyOutputSelector implements OutputSelector {
>> List outputs = new ArrayList();
>> public Iterable select(Long value) {
>> outputs.add("");
>> return outputs;
>> }
>> }
>> TIA,
>>
>


Parallelize an incoming stream into 5 streams with the same data

2018-10-25 Thread Vijay Balakrishnan
Hi,
I need to broadcast/parallelize an incoming stream(inputStream) into 5
streams with the same data. Each stream is keyed by different keys to do
various grouping operations on the set.

Do I just use inputStream.keyBy(5 diff keys) and then just use the
DataStream to perform windowing/grouping operations ?

*DataStream inputStream= ...*
*DataStream  keyBy1 = inputStream.keyBy((d) -> d._1);*
*DataStream  keyBy2 = inputStream.keyBy((d) -> d._2);*

*DataStream out1Stream = keyBy1.flatMap(new Key1Function());// do
windowing/grouping operations in this function*
*DataStream out2Stream = keyBy2.flatMap(new Key2Function());// do
windowing/grouping operations in this function*

out1Stream.print();
out2Stream.addSink(new Out2Sink());

Will this work ?

Or do I use the keyBy Stream with a broadcast function like this:

*BroadcastStream broadCastStream = inputStream.broadcast(..);*
*DataSTream out1Stream = keyBy1.connect(broadCastStream)*
* .process(new KeyedBroadcastProcessFunction...)*

*DataSTream out2Stream = keyBy2.connect(broadCastStream)*
* .process(new KeyedBroadcastProcessFunction...)*

Or do I need to use split:

*SplitStream source = inputStream.split(new MyOutputSelector());*
*source.select("").flatMap(new Key1Function()).addSink(out1Sink);*
source.select("").flatMap(new Key2Function()).addSink(out2Sink);


static final class MyOutputSelector implements OutputSelector<Long> {
    List<String> outputs = new ArrayList<String>();
    public Iterable<String> select(Long value) {
        outputs.add("");
        return outputs;
    }
}
TIA,


Guava conflict when using flink kinesis consumer with grpc protobuf

2018-10-24 Thread Vijay Balakrishnan
Hi,
I have a dependency on guava in grpc protobuf as follows:


<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>26.0-jre</version>
</dependency>

I also use Flink Kinesis Connector in the same project:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

This Flink Kinesis connector has a dependency on an older version of guava,
and this causes issues with two versions of guava being loaded by the
classloaders. How do I avoid this issue?
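
One quick diagnostic (not a fix) is to print at runtime which jar the conflicting Guava class is actually being loaded from; ImmutableList below is just an example Guava class.

// Diagnostic sketch: shows which jar on the classpath a Guava class comes from.
public class GuavaLocation {
    public static void main(String[] args) {
        // getCodeSource() can be null for bootstrap classes, but not for a Guava jar.
        System.out.println(com.google.common.collect.ImmutableList.class
                .getProtectionDomain().getCodeSource().getLocation());
    }
}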

Details:
While building the Flink Kinesis Connector, I changed the pom.xml to try to
shade the guava library, but this didn't help. It clearly says in the Flink
Kinesis Connector pom.xml not to shade guava.



<relocation>
    <pattern>com.google.protobuf</pattern>
    <shadedPattern>org.apache.flink.kinesis.shaded.com.google.protobuf</shadedPattern>
</relocation>
...
Attempted the following in my project's pom.xml, but it didn't work:




<relocation>
    <pattern>com.google.guava</pattern>
    <shadedPattern>org.apache.flink.kinesis.shaded.com.google.guava</shadedPattern>
</relocation>
...

Using JDK 8
TIA,


Re: Using FlinkKinesisConsumer through a proxy

2018-10-03 Thread Vijay Balakrishnan
I have been trying with all variations  to no avail of java
-Dhttp.nonProxyHosts=..  -Dhttps.proxyHost=http://... -Dhttps.proxyPort=911
-Dhttps.proxyUser= -Dhttps.proxyPassword=.. -Dhttp.proxyHost=http://..
-Dhttp.proxyPort=911 -Dhttp.proxyUser=... -Dhttp.proxyPassword=... -jar ..
after looking at the code in com.amazonaws.ClientConfiguration

On Tue, Oct 2, 2018 at 3:49 PM Vijay Balakrishnan 
wrote:

> HI,
> How do I use FlinkKinesisConsumer using the Properties through a proxy ?
> Getting a Connection issue through the proxy.
> Works outside the proxy.
>
> Properties kinesisConsumerConfig = new Properties();
> kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION,
> region);
>
> if (local) {
>
> kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID,
> accessKey);
>
> kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
> secretKey);
> } else {
>
> kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER,
> "AUTO");
> }
>
> //only for Consumer
>
> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
> "1");
>
> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
> "2000");
> FlinkKinesisConsumer>
> kinesisConsumer = new FlinkKinesisConsumer<>(
> "kinesisTopicRead", new Tuple2KinesisSchema(),
> kinesisConsumerConfig);
> TIA
>


Using FlinkKinesisConsumer through a proxy

2018-10-02 Thread Vijay Balakrishnan
HI,
How do I use FlinkKinesisConsumer using the Properties through a proxy ?
Getting a Connection issue through the proxy.
Works outside the proxy.

Properties kinesisConsumerConfig = new Properties();
kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION,
region);

if (local) {

kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID,
accessKey);

kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
secretKey);
} else {

kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER,
"AUTO");
}

//only for Consumer

kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
"1");

kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
"2000");
FlinkKinesisConsumer>
kinesisConsumer = new FlinkKinesisConsumer<>(
"kinesisTopicRead", new Tuple2KinesisSchema(),
kinesisConsumerConfig);
TIA


Re: AsyncFunction used twice with Asyncdatastream.unorderedWait - 2nd function's asyncInvoke not getting called

2018-07-27 Thread Vijay Balakrishnan
Hi,

Turns out the issue was with the RichParallelSourceFunction I was using, which
resulted in the Sink not getting called after the connect/SyncLatchFunction
step. I still need to figure out the root cause there, but the two asyncInvoke
functions work fine now after I replaced the ParallelCameraSource (a
RichParallelSourceFunction) with the old CheckpointedCameraWithCubeSource.





DataStream<CameraWithCube> keyedByCamCameraStream = env
        .addSource(new CheckpointedCameraWithCubeSource(maxSeqCnt, servingSpeedMs, startTime, nbrCameras, outputFile), "TileDB Camera")
        .uid("TileDB-Camera")
        .keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null ?
                cameraWithCube.cameraKey.getCam() : new Object())
        .process(new ProcessFunction<CameraWithCube, CameraWithCube>() {
            @Override
            public void processElement(CameraWithCube value, Context ctx,
                                       Collector<CameraWithCube> out) throws Exception {
                out.collect(value);
            }
        })
        .setParallelism(parallelCamTasks);











/*
DataStream<CameraWithCube> keyedByCamCameraStream = env
        .addSource(new ParallelCameraSource(maxSeqCnt, servingSpeedMs, startTime, nbrCameras, outputFile), "TileDB Camera")
        .uid("TileDB-Camera").setParallelism(parallelCamTasks)
        .partitionCustom((Partitioner) (key, numPartitions) -> { return key % numPartitions; },
                new KeySelector<CameraWithCube, Integer>() {
                    @Override
                    public Integer getKey(CameraWithCube cameraWithCube) throws Exception {
                        ...;
                    }
                });
*/



Vijay

On Thu, Jul 26, 2018 at 10:39 PM Vijay Balakrishnan 
wrote:

> Hi,
>
> I have 2 AsyncFunctions SampleCopyAsyncFunction and
> SampleSinkAsyncFunction called with AsyncDataStream.unorderedWait. The 1st
>  AsyncDataStream.unorderedWait’s SampleCopyAsyncFunction .asyncInvoke
> gets called properly but the 2nd SampleSinkAsyncFunction.asyncInvoke never
> gets called(though open and close functions are called). Is there any way
> for me to have the 2nd asyncInvoke get called ? I have an 
> Executors.newFixedThreadPool(..)
> that I use within each AsyncFunction.
>
>
>
>
> TIA
>
>
>
>
>
> Here is the code:
>
>
>
> AsyncFunction cameraWithCubeAsyncFunction =
>
> new SampleCopyAsyncFunction(shutdownWaitTS, inputFile,
> options, nThreads);
>
> DataStream cameraWithCubeDataStreamAsync =
>
> AsyncDataStream.unorderedWait(keyedByCamCameraStream,
> cameraWithCubeAsyncFunction, timeout, TimeUnit.MILLISECONDS, nCapacity)
>
>
> .setParallelism(parallelCamTasks);//.startNewChain()
>
> DataStream cameraWithCubeDataStream =
> cameraWithCubeDataStreamAsync.keyBy((cameraWithCube) ->
> cameraWithCube.cameraKey != null ?
>
> cameraWithCube.cameraKey.getTs() : new Object());
>
> String uuid = UUID.randomUUID().toString();
>
> DataStream>
> enrichedCameraFeed = inputMetadataDataStream
>
> .connect(cameraWithCubeDataStream)
>
> .flatMap(new SyncLatchFunction(outputFile, outputPath,
> uuid))
>
> .uid("connect2Streams")
>
> .setParallelism(1);
>
> AsyncFunction,
> Tuple2> cubeSinkAsyncFunction =
>
> new SampleSinkAsyncFunction(shutdownWaitTS, outputPath,
> options, nThreads, uuid);
>
> DataStream>
> enrichedCameraFeedSinkAsync =
>
> AsyncDataStream.unorderedWait(enrichedCameraFeed,
> cubeSinkAsyncFunction, timeout, TimeUnit.MILLISECONDS, nCapacity)
>
> .setParallelism(parallelCubeTasks)
>
> .uid("Read-Image-Async");//ç== asyncInvoke never
> gets called for 2nd AsyncFunction
>
> DataStream>
> enrichedCameraFeedSinkAsyncDataStream =
> enrichedCameraFeedSinkAsync.keyBy((tuple2) -> tuple2.f0.inputMetadataKey !=
> null ?
>
> tuple2.f0.inputMetadataKey.getTs() : new Object());
>
> //enrichedCameraFeedSinkAsyncDataStream.print(); //<== this doesn't work
>
> enrichedCameraFeedSinkAsyncDataStream.addSink(new
> CubeProcessingSink(options, outputPath, uuid)) //, shutdownWaitTS
>
> .setParallelism(parallelCubeTasks)
>
> .uid("Cube-Sink");
>


AsyncFunction used twice with Asyncdatastream.unorderedWait - 2nd function's asyncInvoke not getting called

2018-07-26 Thread Vijay Balakrishnan
Hi,

I have 2 AsyncFunctions SampleCopyAsyncFunction and SampleSinkAsyncFunction
called with AsyncDataStream.unorderedWait. The 1st
AsyncDataStream.unorderedWait’s
SampleCopyAsyncFunction .asyncInvoke gets called properly but the 2nd
SampleSinkAsyncFunction.asyncInvoke never gets called(though open and close
functions are called). Is there any way for me to have the 2nd asyncInvoke
get called ? I have an Executors.newFixedThreadPool(..) that I use within
each AsyncFunction.




TIA





Here is the code:



AsyncFunction cameraWithCubeAsyncFunction =

new SampleCopyAsyncFunction(shutdownWaitTS, inputFile,
options, nThreads);

DataStream cameraWithCubeDataStreamAsync =

AsyncDataStream.unorderedWait(keyedByCamCameraStream,
cameraWithCubeAsyncFunction, timeout, TimeUnit.MILLISECONDS, nCapacity)

.setParallelism(parallelCamTasks);//.startNewChain()

DataStream cameraWithCubeDataStream =
cameraWithCubeDataStreamAsync.keyBy((cameraWithCube) ->
cameraWithCube.cameraKey != null ?

cameraWithCube.cameraKey.getTs() : new Object());

String uuid = UUID.randomUUID().toString();

DataStream>
enrichedCameraFeed = inputMetadataDataStream

.connect(cameraWithCubeDataStream)

.flatMap(new SyncLatchFunction(outputFile, outputPath,
uuid))

.uid("connect2Streams")

.setParallelism(1);

AsyncFunction,
Tuple2> cubeSinkAsyncFunction =

new SampleSinkAsyncFunction(shutdownWaitTS, outputPath,
options, nThreads, uuid);

DataStream>
enrichedCameraFeedSinkAsync =

AsyncDataStream.unorderedWait(enrichedCameraFeed,
cubeSinkAsyncFunction, timeout, TimeUnit.MILLISECONDS, nCapacity)

.setParallelism(parallelCubeTasks)

.uid("Read-Image-Async");//ç== asyncInvoke never
gets called for 2nd AsyncFunction

DataStream>
enrichedCameraFeedSinkAsyncDataStream =
enrichedCameraFeedSinkAsync.keyBy((tuple2) -> tuple2.f0.inputMetadataKey !=
null ?

tuple2.f0.inputMetadataKey.getTs() : new Object());

//enrichedCameraFeedSinkAsyncDataStream.print(); //<== this doesn't work

enrichedCameraFeedSinkAsyncDataStream.addSink(new
CubeProcessingSink(options, outputPath, uuid)) //, shutdownWaitTS

.setParallelism(parallelCubeTasks)

.uid("Cube-Sink");


Re: How to partition within same physical node in Flink

2018-06-26 Thread Vijay Balakrishnan
Hi Fabian,
Thanks once again for your reply. I need to get the data from each camera
into one partition/slot and avoid moving the gigantic video data around
while I perform other operations on it. For example, I can get seq#1
and seq#2 for cam1 in the cam1 partition/slot and then run combine, split,
parse, stitch, etc. operations on it in multiple threads within the same
cam1 partition.

I have the CameraKey defined as cam,seq# and then keyBy(cam) to get it in 1
partition(eg: cam1). The idea is to then work within the cam1 partition
with various seq#'s 1,2 etc on various threads within the same
slot/partition of TaskManager.

The data is stored in EFS keyed based on seq#/cam# folder structure.

Our actual problem is managing network bandwidth as a resource in each
partition. We want to make sure that the processing of one camera (split into
multiple seq# tasks) does not run on the same node as the processing of
another camera; in that case, the network bandwidth required for storing
the output of the process running in the partition would exceed the network
bandwidth of the hardware. Camera processing is expected to run on the same
hardware as the video decode step, which is an earlier sequential process in
the same dataflow pipeline.

I guess I might have to use a ThreadPool within each Slot(cam partition) to
work on each seq# ??

TIA
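
A rough sketch of the composite keyBy(cam, seq#) that Fabian suggests in the quoted reply below; getCam()/getTs() come from the code earlier in this thread, while the String key types are an assumption.

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.KeyedStream;

// Sketch only: key by (cam, seq#) so all records of one camera/sequence pair
// go to the same parallel instance. Flink still decides which slot/node that
// instance runs on; there is no API to pin it to particular hardware.
KeyedStream<CameraWithCube, Tuple2<String, String>> byCamAndSeq = cameraWithCubeDataStream
        .keyBy(new KeySelector<CameraWithCube, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> getKey(CameraWithCube c) throws Exception {
                return new Tuple2<>(c.getCam(), c.getTs());
            }
        });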

On Tue, Jun 26, 2018 at 1:06 AM Fabian Hueske  wrote:

> Hi,
>
> keyBy() does not work hierarchically. Each keyBy() overrides the previous
> partitioning.
> You can keyBy(cam, seq#) which guarantees that all records with the same
> (cam, seq#) are processed by the same parallel instance.
> However, Flink does not give any guarantees about how the (cam, seq#)
> partitions are distributed across slots (or even physical nodes).
>
> Btw. why is it important that all records of the same cam are processed by
> the same physical node?
>
> Fabian
>
> 2018-06-25 21:36 GMT+02:00 Vijay Balakrishnan :
>
>> I see a .slotSharingGroup for SingleOutputStreamOperator
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.html#slotSharingGroup-java.lang.String->
>>  which
>> can put parallel instances of operations in same TM slot.
>> I also see a CoLocationGroup but do not see a .coLocationGroup
>> for SingleOutputStreamOperator to put a task on the same slot.Seems
>> CoLocationGroup
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.html>
>> is defined at JobVertex level and has nothing to do with
>> for SingleOutputStreamOperator.
>> TaskManager has many slots. Slots have many threads within it.
>> I want to be able to put the cam1 partition(keyBy(cam) in 1 slot and then
>> use a keyBy(seq#) to run on many threads within that cam1 slot.
>>
>> Vijay
>>
>> On Mon, Jun 25, 2018 at 10:10 AM Vijay Balakrishnan 
>> wrote:
>>
>>> Thanks, Fabian.
>>> Been reading your excellent book on Flink Streaming.Can't wait for more
>>> chapters.
>>> Attached a pic.
>>>
>>> [image: partition-by-cam-ts.jpg]
>>>
>>> I have records with seq# 1 and cam1 and cam2. I also have records with
>>> varying seq#'s.
>>> By partitioning on cam field first(keyBy(cam)), I can get cam1 partition
>>> on the same task manager instance/slot/vCore(???)
>>> Can I then have seq# 1 and seq# 2 for cam1 partition run in different
>>> slots/threads on the same Task Manager instance(aka cam1 partition) using
>>> keyBy(seq#) & setParallelism() ? Can *forward* Strategy be used to
>>> achieve this ?
>>>
>>> TIA
>>>
>>>
>>> On Mon, Jun 25, 2018 at 1:03 AM Fabian Hueske  wrote:
>>>
>>>> Hi,
>>>>
>>>> Flink distributes task instances to slots and does not expose physical
>>>> machines.
>>>> Records are partitioned to task instances by hash partitioning. It is
>>>> also not possible to guarantee that the records in two different operators
>>>> are send to the same slot.
>>>> Sharing information by side-passing it (e.g., via a file on a machine
>>>> or in a static object) is an anti-pattern and should be avoided.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2018-06-24 20:52 GMT+02:00 Vijay Balakrishnan :
>>>>
>>>>> Hi,
>>>>>
>>>>> Need to partition by cameraWithCube.getCam() 1st using
>>>>> parallelCamTasks(passed in as args).
>>>>>
>>>>> Then within each p

Re: How to partition within same physical node in Flink

2018-06-25 Thread Vijay Balakrishnan
I see a .slotSharingGroup for SingleOutputStreamOperator
<https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.html#slotSharingGroup-java.lang.String->
which
can put parallel instances of operations in same TM slot.
I also see a CoLocationGroup but do not see a .coLocationGroup
for SingleOutputStreamOperator to put a task on the same slot.Seems
CoLocationGroup
<https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.html>
is defined at JobVertex level and has nothing to do with
for SingleOutputStreamOperator.
TaskManager has many slots. Slots have many threads within it.
I want to be able to put the cam1 partition (keyBy(cam)) in one slot and then
use a keyBy(seq#) to run on many threads within that cam1 slot.

Vijay

On Mon, Jun 25, 2018 at 10:10 AM Vijay Balakrishnan 
wrote:

> Thanks, Fabian.
> Been reading your excellent book on Flink Streaming.Can't wait for more
> chapters.
> Attached a pic.
>
> [image: partition-by-cam-ts.jpg]
>
> I have records with seq# 1 and cam1 and cam2. I also have records with
> varying seq#'s.
> By partitioning on cam field first(keyBy(cam)), I can get cam1 partition
> on the same task manager instance/slot/vCore(???)
> Can I then have seq# 1 and seq# 2 for cam1 partition run in different
> slots/threads on the same Task Manager instance(aka cam1 partition) using
> keyBy(seq#) & setParallelism() ? Can *forward* Strategy be used to
> achieve this ?
>
> TIA
>
>
> On Mon, Jun 25, 2018 at 1:03 AM Fabian Hueske  wrote:
>
>> Hi,
>>
>> Flink distributes task instances to slots and does not expose physical
>> machines.
>> Records are partitioned to task instances by hash partitioning. It is
>> also not possible to guarantee that the records in two different operators
>> are send to the same slot.
>> Sharing information by side-passing it (e.g., via a file on a machine or
>> in a static object) is an anti-pattern and should be avoided.
>>
>> Best, Fabian
>>
>> 2018-06-24 20:52 GMT+02:00 Vijay Balakrishnan :
>>
>>> Hi,
>>>
>>> Need to partition by cameraWithCube.getCam() 1st using
>>> parallelCamTasks(passed in as args).
>>>
>>> Then within each partition, need to partition again by
>>> cameraWithCube.getTs() but need to make sure each of the 2nd partition by
>>> getTS() runs on the same physical node ?
>>>
>>> How do I achieve that ?
>>>
>>> DataStream cameraWithCubeDataStream = env
>>> .addSource(new Source())
>>> .keyBy((cameraWithCube) -> cameraWithCube.getCam() )
>>> .process(new ProcessFunction() {
>>> public void processElement(CameraWithCube cameraWithCube, 
>>> Context context, Collector collector) throws Exception {
>>> //do nothing
>>> }
>>> })
>>> .slotSharingGroup("camSharingGroup")//TODO: how to add camera# 
>>> of the partition
>>> .setParallelism(parallelCamTasks)
>>> .keyBy((cameraWithCube) -> cameraWithCube.getTs())
>>> .process(new ProcessFunction() {
>>> public void processElement(CameraWithCube cameraWithCube, 
>>> Context context, Collector collector) throws Exception {
>>> //TODO: process code
>>> }
>>> })
>>> .setParallelism(noOfSlotsInEachPhysicalNode)//TODO: how many 
>>> parallel tasks within physical node
>>> .slotSharingGroup("??");//TODO: in same physical node
>>>
>>> TIA
>>>
>>
>>


Re: How to partition within same physical node in Flink

2018-06-25 Thread Vijay Balakrishnan
Thanks, Fabian.
Been reading your excellent book on Flink Streaming.Can't wait for more
chapters.
Attached a pic.

[image: partition-by-cam-ts.jpg]

I have records with seq# 1 and cam1 and cam2. I also have records with
varying seq#'s.
By partitioning on cam field first(keyBy(cam)), I can get cam1 partition on
the same task manager instance/slot/vCore(???)
Can I then have seq# 1 and seq# 2 for cam1 partition run in different
slots/threads on the same Task Manager instance(aka cam1 partition) using
keyBy(seq#) & setParallelism() ? Can *forward* Strategy be used to achieve
this ?

TIA


On Mon, Jun 25, 2018 at 1:03 AM Fabian Hueske  wrote:

> Hi,
>
> Flink distributes task instances to slots and does not expose physical
> machines.
> Records are partitioned to task instances by hash partitioning. It is also
> not possible to guarantee that the records in two different operators are
> send to the same slot.
> Sharing information by side-passing it (e.g., via a file on a machine or
> in a static object) is an anti-pattern and should be avoided.
>
> Best, Fabian
>
> 2018-06-24 20:52 GMT+02:00 Vijay Balakrishnan :
>
>> Hi,
>>
>> Need to partition by cameraWithCube.getCam() 1st using
>> parallelCamTasks(passed in as args).
>>
>> Then within each partition, need to partition again by
>> cameraWithCube.getTs() but need to make sure each of the 2nd partition by
>> getTS() runs on the same physical node ?
>>
>> How do I achieve that ?
>>
>> DataStream cameraWithCubeDataStream = env
>> .addSource(new Source())
>> .keyBy((cameraWithCube) -> cameraWithCube.getCam() )
>> .process(new ProcessFunction() {
>> public void processElement(CameraWithCube cameraWithCube, 
>> Context context, Collector collector) throws Exception {
>> //do nothing
>> }
>> })
>> .slotSharingGroup("camSharingGroup")//TODO: how to add camera# 
>> of the partition
>> .setParallelism(parallelCamTasks)
>> .keyBy((cameraWithCube) -> cameraWithCube.getTs())
>> .process(new ProcessFunction() {
>> public void processElement(CameraWithCube cameraWithCube, 
>> Context context, Collector collector) throws Exception {
>> //TODO: process code
>> }
>> })
>> .setParallelism(noOfSlotsInEachPhysicalNode)//TODO: how many 
>> parallel tasks within physical node
>> .slotSharingGroup("??");//TODO: in same physical node
>>
>> TIA
>>
>
>


How to partition within same physical node in Flink

2018-06-24 Thread Vijay Balakrishnan
Hi,

Need to partition by cameraWithCube.getCam() 1st using
parallelCamTasks(passed in as args).

Then within each partition, need to partition again by
cameraWithCube.getTs() but need to make sure each of the 2nd partition by
getTS() runs on the same physical node ?

How do I achieve that ?

DataStream<CameraWithCube> cameraWithCubeDataStream = env
        .addSource(new Source())
        .keyBy((cameraWithCube) -> cameraWithCube.getCam())
        .process(new ProcessFunction<CameraWithCube, CameraWithCube>() {
            public void processElement(CameraWithCube cameraWithCube, Context context,
                                       Collector<CameraWithCube> collector) throws Exception {
                //do nothing
            }
        })
        .slotSharingGroup("camSharingGroup") //TODO: how to add camera# of the partition
        .setParallelism(parallelCamTasks)
        .keyBy((cameraWithCube) -> cameraWithCube.getTs())
        .process(new ProcessFunction<CameraWithCube, CameraWithCube>() {
            public void processElement(CameraWithCube cameraWithCube, Context context,
                                       Collector<CameraWithCube> collector) throws Exception {
                //TODO: process code
            }
        })
        .setParallelism(noOfSlotsInEachPhysicalNode) //TODO: how many parallel tasks within physical node
        .slotSharingGroup("??"); //TODO: in same physical node

TIA


Docker Containers on YARN when using Flink on EMR

2018-06-17 Thread Vijay Balakrishnan
Hi,
Trying to have Docker containers launched from YARN when using Flink on EMR
on Ubuntu. I can't seem to launch a Docker container from the YARN Resource
Manager while starting up ./flink-yarn-session or submitting a Flink job with
./bin/flink run ...
Following the docs here:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html#yarn
,

Tried various combinations of these variables:
-Dcontainerized.master.env.YARN_CONTAINER_RUNTIME_TYPE=docker,

-Dcontainerized.master.env.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=ashahab/hadoop-trunk

Nothing seems to be working.

TIA,
Vijay


Using Flink RocksDBStateBackend with EFS as an NFS mount for large image data

2018-06-02 Thread Vijay Balakrishnan
Hi,

We have big image data(about 20 MB each) coming in at high frequency/volume
from a video stream from many cameras.

The current design thought is to store this data in the 1st step of the Flink
dataflow in EFS (NAS) and access the EFS data from the 3rd step in the
dataflow (maybe on a totally different TaskManager node) without using the
RocksDBStateBackend (i.e., avoiding the slow Hadoop-v1 pattern that Spark
solved with in-memory computation).
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/state_backends.html#the-rocksdbstatebackend

1. Can we use RocksDbStateBackend configured with
file:///efsendpoint/checkpoints to store this image data in EFS and access
it from the 3rd step ?
2. Does the checkpointing interval need to be less than the time it takes to
get to Step 3 after storing data in EFS in step 1 ? Will this allow Step3
across a different TaskManager node to get to the data stored in EFS via
RockDBStateBackend assuming Local Task storage is set ?
3. Can I use the Metrics tab of the Flink dashboard to see how long each
step in the dataflow pipeline/graph takes ?

TIA,
Vijay
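
Regarding question 1, pointing the RocksDB backend at a checkpoint URI on a mounted filesystem is configured roughly as in the sketch below. The EFS mount path and checkpoint interval are assumptions, and this only shows the configuration; it does not by itself make step 3 read what step 1 wrote.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EfsCheckpointConfigSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // RocksDB keeps working state on local TaskManager disk; checkpoints go to
        // the file:// URI (an assumed EFS mount path), with incremental checkpoints on.
        env.enableCheckpointing(60_000); // assumed 60s interval
        env.setStateBackend(new RocksDBStateBackend("file:///mnt/efs/flink/checkpoints", true));

        // ... sources/operators/sinks would be added here ...
        env.execute("efs-rocksdb-sketch");
    }
}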


Re: Recovering from 1 of the nodes/slots of a Task Manager failing without resetting entire state during Recovery

2018-05-29 Thread Vijay Balakrishnan
HI,
I found the following documentation in the code:

flink-runtime:
org.apache.flink.runtime.executiongraph.failover.RestartIndividualStrategy
Simple failover strategy that restarts each task individually.
 * This strategy is only applicable if the entire job consists unconnected
 * tasks, meaning each task is its own component.

 RestartPipelinedRegionStrategy:
 A failover strategy that restarts regions of the ExecutionGraph. A region
is defined
  * by this strategy as the weakly connected component of tasks that
communicate via pipelined
  * data exchange.


Can someone please explain to me the IndividualStrategy (entire job consists
of unconnected tasks) and the PipelinedRegionStrategy (weakly connected
component of tasks that communicate via pipelined data exchange) with an
example?

TIA,
Vijay

On Tue, May 15, 2018 at 3:16 PM Vijay Balakrishnan 
wrote:

> Hi,
> I have been going through the book "Real time streaming with Apache
> Flink".
> How do I recover state for just a single node/slot in a TaskManager
> without having the recovery reset the application state for all the Task
> Managers ?
>
> They mention the following:
> *Reset the state of the whole application to the latest checkpoint, i.e.,
> resetting the state of each task. The JobManager provides the location to
> the most recent checkpoints of task state. Note that, depending on the
> topology of the application, certain optimizations are possible and not all
> tasks need to be reset.*
>
> TIA,
> Vijay
> Flink newbie.
>


Re: How to sleep for 1 sec and then call keyBy for partitioning

2018-05-16 Thread Vijay Balakrishnan
Hi,
This worked out after looking at
https://stackoverflow.com/questions/44436401/some-puzzles-for-the-operator-parallelism-in-flink?rq=1

Why can't I use setParallelism after keyBy - is it not an operator?

DataStream cameraWithCubeDataStream = env
.addSource(new CameraWithCubeSource(cameraFile, delay,
servingSpeedFactor))
.keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null ?
cameraWithCube.cameraKey.getCam() : new Object())
//.setParallelism(parallelTasks) //??? Why cannot I use
setParallelism after keyBy-is it not an operator
//.setMaxParallelism(parallelTasks) //
https://stackoverflow.com/questions/44436401/some-puzzles-for-the-operator-parallelism-in-flink?rq=1
.process(new ProcessFunction<CameraWithCube, CameraWithCube>() {
@Override
public void processElement(CameraWithCube cameraWithCube,
Context context, Collector collector) throws Exception
{
logger.info("before thread sleep");
Thread.sleep(500);
logger.info("after thread sleep");
collector.collect(cameraWithCube);
}
})
.setParallelism(parallelTasks) //???do I need to set this or
will it take the parallelism from the earlier step ?
.setMaxParallelism(parallelTasks)
.keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null ?
cameraWithCube.cameraKey.getTs() : new Object());


TIA,

Vijay


On Wed, May 16, 2018 at 1:41 PM Jörn Franke <jornfra...@gmail.com> wrote:

> Just some advice - do not use sleep to simulate a heavy task. Use real
> data or generated data to simulate. This sleep is garbage from a software
> quality point of view. Furthermore, it is often forgotten etc.
>
> On 16. May 2018, at 22:32, Vijay Balakrishnan <bvija...@gmail.com> wrote:
>
> Hi,
> Newbie question - What I am trying to do is the following:
> CameraWithCubeSource source sends data-containing tuples of (cameraNbr,TS).
> 1. Need to partition data by cameraNbr.
> *2. Then sleep for 1 sec to simulate a heavy process in the task.*
> *3. Then need to partition data by TS and finally get the DataStream to
> connect with another DataStream.*
>
> DataStream cameraWithCubeDataStream = env
> .addSource(new CameraWithCubeSource(cameraFile, delay,
> servingSpeedFactor))
> .setParallelism(parallelTasks)
> .setMaxParallelism(parallelTasks)
> .keyBy((cameraWithCube) -> cameraWithCube.cameraKey !=
> null ? //partition by cameraNbr
> cameraWithCube.cameraKey.getCam() : new Object());
> //sleep for 1 sec  how
> *((KeyedStream)
> cameraWithCubeDataStream).timeWindow(Time.seconds(1))*
> *.apply(new WindowFunction<CameraWithCube,
> CameraWithCube, String, TimeWindow>() {*
> *@Override*
> *public void apply(String cameraKeyCam, TimeWindow
> timeWindow,*
> *  Iterable
> cameraWithCubesAssignedToWindow,*
> *  Collector
> collector) throws Exception {*
> *Thread.sleep(1000);*
> *
> cameraWithCubesAssignedToWindow.forEach(cameraWithCube ->
> collector.collect(cameraWithCube));*
>
> *}*
> *})//returning void here from apply ??*
> * //partition by TS and return DataStream*
> * .keyBy((cameraWithCube) -> cameraWithCube.cameraKey !=
> null ? //partition by cameraNbr*
> * cameraWithCube.cameraKey.getTS() : new
> Object());*
> ;
> TIA,
> Vijay
>
>


How to sleep for 1 sec and then call keyBy for partitioning

2018-05-16 Thread Vijay Balakrishnan
Hi,
Newbie question - What I am trying to do is the following:
CameraWithCubeSource source sends data-containing tuples of (cameraNbr,TS).
1. Need to partition data by cameraNbr.
*2. Then sleep for 1 sec to simulate a heavy process in the task.*
*3. Then need to partition data by TS and finally get the DataStream to
connect with another DataStream.*

DataStream<CameraWithCube> cameraWithCubeDataStream = env
        .addSource(new CameraWithCubeSource(cameraFile, delay, servingSpeedFactor))
        .setParallelism(parallelTasks)
        .setMaxParallelism(parallelTasks)
        .keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null ? //partition by cameraNbr
                cameraWithCube.cameraKey.getCam() : new Object());

//sleep for 1 sec  how
((KeyedStream) cameraWithCubeDataStream).timeWindow(Time.seconds(1))
        .apply(new WindowFunction<CameraWithCube, CameraWithCube, String, TimeWindow>() {
            @Override
            public void apply(String cameraKeyCam, TimeWindow timeWindow,
                              Iterable<CameraWithCube> cameraWithCubesAssignedToWindow,
                              Collector<CameraWithCube> collector) throws Exception {
                Thread.sleep(1000);
                cameraWithCubesAssignedToWindow.forEach(cameraWithCube ->
                        collector.collect(cameraWithCube));
            }
        }) //returning void here from apply ??
        //partition by TS and return DataStream
        .keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null ? //partition by cameraNbr
                cameraWithCube.cameraKey.getTS() : new Object());
TIA,
Vijay


Recovering from 1 of the nodes/slots of a Task Manager failing without resetting entire state during Recovery

2018-05-15 Thread Vijay Balakrishnan
Hi,
I have been going through the book "Real time streaming with Apache Flink".
How do I recover state for just a single node/slot in a TaskManager without
having the recovery reset the application state for all the Task Managers ?

They mention the following:
*Reset the state of the whole application to the latest checkpoint, i.e.,
resetting the state of each task. The JobManager provides the location to
the most recent checkpoints of task state. Note that, depending on the
topology of the application, certain optimizations are possible and not all
tasks need to be reset.*

TIA,
Vijay
Flink newbie.