Excessive stdout is causing Java heap out of memory

2017-05-19 Thread Fritz Budiyanto
Hi,

I noticed that when I enabled DataStreamSink’s print() for debugging (kinda
excessive printing), it caused a Java heap out-of-memory error.
Possibly the TaskManager is buffering all stdout for the web interface? I
haven’t spent time debugging it, but I wonder if it is expected that massive
printing will exhaust the Java heap. I’m using standalone mode.

Is there a way to disable this in-memory capture for the web interface, and
just redirect stdout to a file instead, with file rotation?
What is the suggested method of logging?

—
Fritz
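
A minimal sketch of one common alternative, assuming the goal is rotated file
output rather than stdout: log through SLF4J instead of print(), so records go
through the TaskManager's log4j configuration, which supports rolling-file
appenders. The class name and message format below are hypothetical.

import org.apache.flink.api.common.functions.MapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Pass-through mapper that logs each element instead of printing to stdout.
public class DebugLog<T> implements MapFunction<T, T> {
    private static final Logger LOG = LoggerFactory.getLogger(DebugLog.class);

    @Override
    public T map(T value) {
        LOG.debug("element: {}", value);
        return value;
    }
}

Usage would be stream.map(new DebugLog<>()) in place of stream.print().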

Re: Best practices to maintain reference data for Flink Jobs

2017-05-19 Thread Sand Stone
Also, I took a quick read of the side input proposal; it's unclear to me how
side inputs would solve this issue better.

At a high level, this is what I have in mind (sketch):

flatMap(byte[] value, Collector<byte[]> out) {
    // range scan over a key prefix held in some state store
    Iterable<byte[]> iter = someStoreStateObject.seek(aKeyPrefix);
    // or: someStoreStateObject.seek(aKeyPrefix, aKeySuffix);
    for (byte[] key : iter) {
        // ... enrich and emit via out
    }
}

Thanks for your time!


On Fri, May 19, 2017 at 10:03 AM, Sand Stone  wrote:
> Thanks Gordon and Fabian.
>
> The enriching data is really reference data, e.g. the reverseIP
> database. It's hard to key it with the main data stream, as the "ip
> address" in the event is not a primary key of the main data stream.
>
> QueryableState is close, but it does not support range scans as far as
> I can tell. The remote datastore has clean semantics (a logical single
> copy that supports range scans), but the RPC to another cluster is not
> optimal.
>
> I assume this is a quite common stream-processing pattern for
> Flink-based services.
>
>
> On Fri, May 19, 2017 at 2:08 AM, Fabian Hueske  wrote:
>> +1 to what Gordon said.
>>
>> Queryable state is meant as an external interface to streaming jobs
>> rather than for lookups within jobs.
>> Accessing co-located state should give you better performance and is
>> probably easier to implement and maintain.
>>
>> Cheers,
>> Fabian
>>
>> 2017-05-19 7:43 GMT+02:00 Tzu-Li (Gordon) Tai :
>>>
>>> Hi,
>>>
>>> Can the enriching data be keyed? Or is it something that has to be
>>> broadcast to each operator?
>>> Either way, I think Side Inputs (an upcoming feature) are the
>>> best fit for this. You can take a look at
>>> https://issues.apache.org/jira/browse/FLINK-6131.
>>>
>>> Regarding the 3 options you listed:
>>>
>>> By using QueryableState in option B, you mean that you want to
>>> feed the enriching data stream to a separate job, enable queryable state
>>> on that job, and query that state from the actual application job's
>>> operators, correct? If so, I think options A and B amount to the same
>>> thing; i.e., they require accessing data external to the job.
>>>
>>> If the enriching data can somehow be keyed with the stream that requires
>>> it, I would go for option C using connected streams, with the enriching data
>>> as one input and the actual data as the other. Instead of just “caching the
>>> enriching data in memory”, you should register it as managed Flink state
>>> for the CoMapFunction / CoFlatMapFunction. The actual input stream records
>>> can just access that registered state locally.
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>> On 19 May 2017 at 7:11:07 AM, Sand Stone (sand.m.st...@gmail.com) wrote:
>>>
>>> Hi. Say I have a few reference data sets that need to be used for a
>>> streaming job. The sizes range between 10M and 10GB. The data is not
>>> static; it will be refreshed at minute and/or day intervals.
>>>
>>> With the new advancements in Flink, it seems there are quite a few
>>> options.
>>> A. Store all the data in an external (kv) database cluster. And use
>>> async io calls
>>> * data refresh can be done in a few different ways
>>> B. Use the new Queryable State feature
>>> * it seems there is no "easy" API to discover the
>>> queryable state at the moment; you need to use the REST API to figure
>>> out the job id.
>>> C. Ingest the reference data into the job and cache them in memory
>>> Any other option?
>>>
>>> On paper, it seems option B with the Queryable State is the cleanest
>>> solution.
>>>
>>> Any comment/suggestion is greatly appreciated, in particular regarding
>>> robustness and consistent recovery.
>>>
>>> Thanks much!
>>
>>
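
For illustration, a minimal sketch of Gordon's option C (connected streams
with the enrichment data held in managed keyed state). The Event, RefUpdate,
and EnrichedEvent types and their fields are hypothetical:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

// Key both streams on the lookup field so each parallel subtask sees the
// matching slices of events and reference updates, then keep the reference
// record in local managed state.
DataStream<EnrichedEvent> enriched = events
    .keyBy(e -> e.ip)
    .connect(refUpdates.keyBy(r -> r.ip))
    .flatMap(new RichCoFlatMapFunction<Event, RefUpdate, EnrichedEvent>() {
        private transient ValueState<RefUpdate> ref;

        @Override
        public void open(Configuration conf) {
            ref = getRuntimeContext().getState(
                new ValueStateDescriptor<>("ref", RefUpdate.class));
        }

        @Override
        public void flatMap1(Event e, Collector<EnrichedEvent> out) throws Exception {
            RefUpdate r = ref.value();           // purely local lookup, no RPC
            if (r != null) {
                out.collect(new EnrichedEvent(e, r));
            }
        }

        @Override
        public void flatMap2(RefUpdate u, Collector<EnrichedEvent> out) throws Exception {
            ref.update(u);                       // refresh the reference data
        }
    });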


Re: Best practices to maintain reference data for Flink Jobs

2017-05-19 Thread Sand Stone
Thanks Gordon and Fabian.

The enriching data is really reference data, e.g. the reverseIP
database. It's hard to key it with the main data stream, as the "ip
address" in the event is not a primary key of the main data stream.

QueryableState is close, but it does not support range scans as far as
I can tell. The remote datastore has clean semantics (a logical single
copy that supports range scans), but the RPC to another cluster is not
optimal.

I assume this is a quite common stream-processing pattern for
Flink-based services.
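
If the remote store stays in the picture, Flink's async I/O (available since
1.2) can at least hide the RPC latency. A hedged sketch against the 1.2-era
API, with an entirely hypothetical async key-value client (KvClient):

import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;

// Up to 100 lookups in flight per subtask; results may arrive out of order.
DataStream<EnrichedEvent> enriched = AsyncDataStream.unorderedWait(
    events,
    new RichAsyncFunction<Event, EnrichedEvent>() {
        private transient KvClient client;       // hypothetical async client

        @Override
        public void open(Configuration conf) {
            client = KvClient.connect("kv-cluster:7000");
        }

        @Override
        public void asyncInvoke(Event e, AsyncCollector<EnrichedEvent> collector) {
            // hypothetical CompletableFuture-based get()
            client.get(e.ip).thenAccept(ref ->
                collector.collect(
                    Collections.singletonList(new EnrichedEvent(e, ref))));
        }
    },
    1000, TimeUnit.MILLISECONDS, 100);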


On Fri, May 19, 2017 at 2:08 AM, Fabian Hueske  wrote:
> +1 to what Gordon said.
>
> Queryable state is meant as an external interface to streaming jobs
> rather than for lookups within jobs.
> Accessing co-located state should give you better performance and is
> probably easier to implement and maintain.
>
> Cheers,
> Fabian
>
> 2017-05-19 7:43 GMT+02:00 Tzu-Li (Gordon) Tai :
>>
>> Hi,
>>
>> Can the enriching data be keyed? Or is it something that has to be
>> broadcast to each operator?
>> Either way, I think Side Inputs (an upcoming feature) are the
>> best fit for this. You can take a look at
>> https://issues.apache.org/jira/browse/FLINK-6131.
>>
>> Regarding the 3 options you listed:
>>
>> By using QueryableState in option B, you mean that you want to
>> feed the enriching data stream to a separate job, enable queryable state
>> on that job, and query that state from the actual application job's
>> operators, correct? If so, I think options A and B amount to the same
>> thing; i.e., they require accessing data external to the job.
>>
>> If the enriching data can somehow be keyed with the stream that requires
>> it, I would go for option C using connected streams, with the enriching data
>> as one input and the actual data as the other. Instead of just “caching the
>> enriching data in memory”, you should register it as managed Flink state
>> for the CoMapFunction / CoFlatMapFunction. The actual input stream records
>> can just access that registered state locally.
>>
>> Cheers,
>> Gordon
>>
>>
>> On 19 May 2017 at 7:11:07 AM, Sand Stone (sand.m.st...@gmail.com) wrote:
>>
>> Hi. Say I have a few reference data sets that need to be used for a
>> streaming job. The sizes range between 10M and 10GB. The data is not
>> static; it will be refreshed at minute and/or day intervals.
>>
>> With the new advancements in Flink, it seems there are quite a few
>> options.
>> A. Store all the data in an external (kv) database cluster. And use
>> async io calls
>> * data refresh can be done in a few different ways
>> B. Use the new Queryable State feature
>> * it seems there is no "easy" API to discover the
>> queryable state at the moment; you need to use the REST API to figure
>> out the job id.
>> C. Ingest the reference data into the job and cache them in memory
>> Any other option?
>>
>> On paper, it seems option B with the Queryable State is the cleanest
>> solution.
>>
>> Any comment/suggestion is greatly appreciated, in particular regarding
>> robustness and consistent recovery.
>>
>> Thanks much!
>
>


Re: ConnectedStream keyby issues

2017-05-19 Thread Renjie Liu
Even if you increase the operator parallelism, you can still use the
state-based solution.
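
A hedged illustration of why: after a keyBy, all records with the same key
are routed to the same parallel subtask, so keyed state stays consistent at
any parallelism. The types, field names, and co-flatmap class below are
hypothetical:

// Both inputs are partitioned by the same key, so for any given deviceId
// the events and the rules meet on the same subtask and share keyed state.
events
    .keyBy(e -> e.deviceId)
    .connect(rules.keyBy(r -> r.deviceId))
    .flatMap(new SharedStateCoFlatMap())   // keeps its state keyed
    .setParallelism(4);                    // safe: routing is by key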

On Fri, May 19, 2017 at 7:47 PM Tarek khal 
wrote:

> If I increase the operator parallelism, do I risk losing the shared-state
> solution, or does it have no effect?
> And if it is an advantage, what are its limits?
>
> I am new to this framework and find some of its notions difficult.
>
> Best Regards,
>
>
>
-- 
Liu, Renjie
Software Engineer, MVAD


Re: ConnectedStream keyby issues

2017-05-19 Thread Tarek khal
If I increase the operator parallelism, do I risk losing the shared-state
solution, or does it have no effect?
And if it is an advantage, what are its limits?

I am new to this framework and find some of its notions difficult.

Best Regards,





Re: ConnectedStream keyby issues

2017-05-19 Thread Renjie Liu
Jason's solution is right; I'm just clarifying a mistake in the
explanation.

Tarek khal wrote on Fri, May 19, 2017 at 7:11 PM:

> Hello Renjie,
>
> Yes, the parallelism is 1. What should I do, please?
>
> Regards,
>
>
>
-- 
Liu, Renjie
Software Engineer, MVAD


Re: ConnectedStream keyby issues

2017-05-19 Thread Tarek khal
Hello Renjie,

Yes, the parallelism is 1. What should I do, please?

Regards,





Re: ConnectedStream keyby issues

2017-05-19 Thread Renjie Liu
@Jason I think there's a mistake in your explanation: each task in the
TaskManager has its own copy of the operator instance, so the tuple may not
be shared. State is a great solution, but I think that's not the root cause.

@Tarek What's the parallelism of your data stream? I think the reason may
be that the parallelism is 1.

On Thu, May 4, 2017 at 10:39 PM Tarek khal 
wrote:

> Hi Jason,
>
> Thank you very much for your help, it solves my problem.
>
> Best regards,
>
>
>
-- 
Liu, Renjie
Software Engineer, MVAD


Re: Flink metrics related problems/questions

2017-05-19 Thread Chesnay Schepler
Point 2 isn't quite accurate, actually: metrics on the TaskManager are not
persisted across restarts.


On 19.05.2017 11:21, Chesnay Schepler wrote:
1. This shouldn't happen. Do you access the counter from different
threads?

2. Metrics in general are not persisted across restarts, and there is
no way to configure Flink to do so at the moment.

3. Counters are sent as gauges since, as far as I know, StatsD counters
are not allowed to be decremented.

On 19.05.2017 08:56, jaxbihani wrote:
Background: We are running a job using ProcessFunction which reads data
from Kafka, fires ~5-10K timers per second, and sends matched events to a
Kafka sink. We collect metrics for the number of active timers, the number
of timers scheduled, etc. We use the StatsD reporter, monitor with a
Grafana dashboard, and use RocksDBStateBackend backed by HDFS as the state
backend.

Observations/Problems:
1. *Counter value suddenly got reset:* While the job was running fine, at
one point the metric of a monotonically increasing counter (a Counter on
which we only ever called inc()) suddenly became 0 and then resumed from
there onwards. The only exceptions in the logs were related to transient
connectivity issues to datanodes. There was also no other indicator of any
failure after inspecting system metrics/checkpoint metrics. It happened
just once across multiple runs of the same job.
2. *Counters not retained during Flink restart with savepoint:* We
cancelled the job with the -s option, taking a savepoint, and then
restarted the job using the savepoint. After the restart, metrics started
from 0. I was expecting the metric values of a given operator to also be
part of the state.
3. *Counter metrics getting sent as Gauge:* Using tcpdump I inspected the
format in which metrics are sent to StatsD. I observed that even the
metrics which in my code were counters were sent as gauges. I didn't get
why that was so.

Can anyone please add more insight into why the above behaviors would have
happened?
Also, does Flink store metric values as part of the state for stateful
operators? Is there any way to configure that?


Re: FlinkCEP latency/throughput

2017-05-19 Thread Dawid Wysakowicz
Hello Alfred,

Just some considerations from my side regarding latency. I think the first
step should be defining what "latency" for a CEP library really means.
The first thing that comes to my mind is the time period between the
arrival of the event that triggers a match (the ending pattern) and the
actual time when the match is emitted (a select function is a good place
to measure that, I think).
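
For instance, a hedged sketch of that measurement against the 1.2-era CEP
API, assuming each event carries an ingestion timestamp (ingestTimeMillis,
hypothetical) and the pattern's ending state is named "end":

import java.util.Map;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.streaming.api.datastream.DataStream;

PatternStream<Event> matches = CEP.pattern(input, pattern);

// Latency = wall-clock time at emission minus the ingestion time of the
// event that completed the match.
DataStream<Long> latencies = matches.select(new PatternSelectFunction<Event, Long>() {
    @Override
    public Long select(Map<String, Event> match) {
        Event last = match.get("end");   // the ending pattern state
        return System.currentTimeMillis() - last.ingestTimeMillis;
    }
});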

I think Kostas was also referring to a similar kind of issue.

Hope this helps.

Z pozdrowieniami! / Cheers!

Dawid Wysakowicz

*Data/Software Engineer*

Skype: dawid_wys | Twitter: @OneMoreCoder



2017-05-19 10:59 GMT+02:00 Sonex :

> Hello Kostas,
>
> thanks for your response. Regarding throughput, it makes sense.
>
> But there is still one question remaining. How can I measure the latency
> of my FlinkCEP application?
>
> Maybe you answered it, but I didn't quite get it. As for your second
> question about measuring latency, the answer is yes: the first element in
> the matching pattern will inevitably wait longer than the last one.
>
> Thank you for your time!!!
>
>
>


Re: Flink metrics related problems/questions

2017-05-19 Thread Chesnay Schepler

1. This shouldn't happen. Do you access the counter from different threads?

2. Metrics in general are not persisted across restarts, and there is no
way to configure Flink to do so at the moment.


3. Counters are sent as gauges since, as far as I know, StatsD counters
are not allowed to be decremented.
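
As context for point 2: metrics are registered fresh each time an operator
starts. A minimal sketch of the usual registration pattern (class name
hypothetical); after any restart the reporter naturally sees the counter
begin again from zero unless you mirror the count in managed state yourself:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class CountingMapper extends RichMapFunction<String, String> {
    private transient Counter counter;

    @Override
    public void open(Configuration config) {
        // Re-created on every (re)start, so it starts counting from 0.
        counter = getRuntimeContext().getMetricGroup().counter("myCounter");
    }

    @Override
    public String map(String value) {
        counter.inc();
        return value;
    }
}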


On 19.05.2017 08:56, jaxbihani wrote:

Background: We are running a job using ProcessFunction which reads data from
Kafka, fires ~5-10K timers per second, and sends matched events to a Kafka
sink. We collect metrics for the number of active timers, the number of
timers scheduled, etc. We use the StatsD reporter, monitor with a Grafana
dashboard, and use RocksDBStateBackend backed by HDFS as the state backend.

Observations/Problems:
1. *Counter value suddenly got reset:* While the job was running fine, at one
point the metric of a monotonically increasing counter (a Counter on which we
only ever called inc()) suddenly became 0 and then resumed from there
onwards. The only exceptions in the logs were related to transient
connectivity issues to datanodes. There was also no other indicator of any
failure after inspecting system metrics/checkpoint metrics. It happened just
once across multiple runs of the same job.
2. *Counters not retained during Flink restart with savepoint:* We cancelled
the job with the -s option, taking a savepoint, and then restarted the job
using the savepoint. After the restart, metrics started from 0. I was
expecting the metric values of a given operator to also be part of the state.
3. *Counter metrics getting sent as Gauge:* Using tcpdump I inspected the
format in which metrics are sent to StatsD. I observed that even the metrics
which in my code were counters were sent as gauges. I didn't get why that
was so.

Can anyone please add more insight into why the above behaviors would have
happened?
Also, does Flink store metric values as part of the state for stateful
operators? Is there any way to configure that?









Re: Best practices to maintain reference data for Flink Jobs

2017-05-19 Thread Fabian Hueske
+1 to what Gordon said.

Queryable state is meant as an external interface to streaming jobs
rather than for lookups within jobs.
Accessing co-located state should give you better performance and is
probably easier to implement and maintain.

Cheers,
Fabian

2017-05-19 7:43 GMT+02:00 Tzu-Li (Gordon) Tai :

> Hi,
>
> Can the enriching data be keyed? Or is it something that has to be
> broadcast to each operator?
> Either way, I think Side Inputs (an upcoming feature) are the
> best fit for this. You can take a look at https://issues.apache.org/
> jira/browse/FLINK-6131.
>
> Regarding the 3 options you listed:
>
> By using QueryableState in option B, you mean that you want to
> feed the enriching data stream to a separate job, enable queryable state
> on that job, and query that state from the actual application job's
> operators, correct? If so, I think options A and B amount to the same
> thing; i.e., they require accessing data external to the job.
>
> If the enriching data can somehow be keyed with the stream that requires
> it, I would go for option C using connected streams, with the enriching
> data as one input and the actual data as the other. Instead of just
> “caching the enriching data in memory”, you should register it as managed
> Flink state for the CoMapFunction / CoFlatMapFunction. The actual input
> stream records can just access that registered state locally.
>
> Cheers,
> Gordon
>
>
> On 19 May 2017 at 7:11:07 AM, Sand Stone (sand.m.st...@gmail.com) wrote:
>
> Hi. Say I have a few reference data sets that need to be used for a
> streaming job. The sizes range between 10M and 10GB. The data is not
> static; it will be refreshed at minute and/or day intervals.
>
> With the new advancements in Flink, it seems there are quite a few
> options.
> A. Store all the data in an external (kv) database cluster. And use
> async io calls
> * data refresh can be done in a few different ways
> B. Use the new Queryable State feature
> * it seems there is no "easy" API to discover the
> queryable state at the moment; you need to use the REST API to figure
> out the job id.
> C. Ingest the reference data into the job and cache them in memory
> Any other option?
>
> On paper, it seems option B with the Queryable State is the cleanest
> solution.
>
> Any comment/suggestion is greatly appreciated, in particular regarding
> robustness and consistent recovery.
>
> Thanks much!
>
>


Re: ConnectedStream keyby issues

2017-05-19 Thread gaurav
Hello,

I am a little confused about when state will be garbage collected. For
example:

Example 1:

class Abc extends RichProcessFunction<Tuple2<...>, Tuple2<...>> {
    public void processElement(...) {
        if (/* timer never set */) {
            ctx.timerService().registerEventTimeTimer(...);
        }
    }

    public void onTimer(...) {
        // do some work
        ctx.timerService().registerEventTimeTimer(...);
    }
}

In example 1, will the state ever be garbage collected? Also, in example 1,
processElement registers the event-time timer only once. Will the state be
collected when the second event arrives?

And if we have:

Example 2:

public void onTimer(...) {
    // do some work
    // no timer registration
}

Will the state be garbage collected after onTimer completes?
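
A hedged note on what actually happens: keyed state is not garbage collected
on its own; it lives in the state backend until clear() is called on it, and
a timer fires once and is then gone from the timer service. Explicit cleanup
would look roughly like this (the state handle myState is hypothetical):

@Override
public void onTimer(long timestamp, OnTimerContext ctx,
        Collector<Tuple2<String, Long>> out) throws Exception {
    // do some work ...

    // Explicitly release the keyed state for the current key; without this,
    // the entry stays in the state backend indefinitely.
    myState.clear();

    // No new registerEventTimeTimer(...) call: this timer fired once and is
    // gone, so nothing further fires for this key.
}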







Flink metrics related problems/questions

2017-05-19 Thread jaxbihani
Background: We are running a job using ProcessFunction which reads data from
Kafka, fires ~5-10K timers per second, and sends matched events to a Kafka
sink. We collect metrics for the number of active timers, the number of
timers scheduled, etc. We use the StatsD reporter, monitor with a Grafana
dashboard, and use RocksDBStateBackend backed by HDFS as the state backend.

Observations/Problems:
1. *Counter value suddenly got reset:* While the job was running fine, at one
point the metric of a monotonically increasing counter (a Counter on which we
only ever called inc()) suddenly became 0 and then resumed from there
onwards. The only exceptions in the logs were related to transient
connectivity issues to datanodes. There was also no other indicator of any
failure after inspecting system metrics/checkpoint metrics. It happened just
once across multiple runs of the same job.
2. *Counters not retained during Flink restart with savepoint:* We cancelled
the job with the -s option, taking a savepoint, and then restarted the job
using the savepoint. After the restart, metrics started from 0. I was
expecting the metric values of a given operator to also be part of the state.
3. *Counter metrics getting sent as Gauge:* Using tcpdump I inspected the
format in which metrics are sent to StatsD. I observed that even the metrics
which in my code were counters were sent as gauges. I didn't get why that
was so.

Can anyone please add more insight into why the above behaviors would have
happened?
Also, does Flink store metric values as part of the state for stateful
operators? Is there any way to configure that?



