Re: Broadcast to all the other operators

2017-11-08 Thread Ladhari Sadok
Thank you for the answer. I know that solution, but I don't want to stream
the rules all the time.
In my case I have the rules in Redis and they are loaded at Flink startup.

I want to broadcast changes only when they occur.

Thanks.

On 9 Nov 2017 at 7:51 AM, "Tony Wei" wrote:

> Hi Sadok,
>
> Since you want to broadcast the Rule Stream to all subtasks, it seems that it
> is not necessary to use a KeyedStream.
> How about using the broadcast partitioner, connecting the two streams to attach
> the rule to each record (or apply the rule to them directly), and doing the keyed
> operation after that?
> If you need to do the keyed operation and then apply the rules, it should work
> by changing the order.
>
> The code might be something like this, and you can change the rules' state
> in the CoFlatMapFunction.
>
> DataStream<Rule> rules = ...;
> DataStream<Record> records = ...;
> DataStream<Tuple2<Record, Rule>> recordWithRule =
> rules.broadcast().connect(records).flatMap(...);
> recordWithRule.keyBy(...).process(...);
>
> Hope this will make sense to you.
>
> Best Regards,
> Tony Wei
>
> 2017-11-09 6:25 GMT+08:00 Ladhari Sadok :
>
>> Hello,
>>
>> I'm working on a Rules Engine project with Flink 1.3. In this project I
>> want to update some keyed operator state when an external event occurs.
>>
>> I have a DataStream of updates (from Kafka) and I want to broadcast the data
>> contained in this stream to all keyed operators so I can change the state in
>> all of them.
>>
>> It is like this use case :
>> Image : https://data-artisans.com/wp-content/uploads/2017/10/streaming-in-definitions.png
>> Full article : https://data-artisans.com/blog/real-time-fraud-detection-ing-bank-apache-flink
>>
>> I found it in the DataSet API but not in the DataStream API!
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#broadcast-variables
>>
>> Can someone explain to me how to solve this problem?
>>
>> Thanks a lot.
>>
>> Flinkly regards,
>> Sadok
>>
>
>


Re: Weird performance on custom Hashjoin w.r.t. parallelism

2017-11-08 Thread m@xi
Hello!

I found out that the cause of the problem was the map that I had after the
parallel join, with parallelism 1.
When I changed it to .map(new MyMapMeter).setParallelism(p), then when I
increase the parallelism p the completion time decreases, which is
reasonable. Somehow it was a bottleneck in my parallel execution plan, but I
had it this way in order to measure a valid average throughput.

So, my question is the following: 

How can I measure the average throughput of my parallel join operation
properly?
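
In case it helps frame the question, this is roughly what I mean by the measuring
map, as a minimal sketch (the element type String, the metric name, and the 60 second
window are arbitrary placeholders); the idea would be to run it with .setParallelism(p)
and sum the per-subtask rates:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;

// Sketch: one Meter per parallel subtask of the operator it is chained to.
public class MyMapMeter extends RichMapFunction<String, String> {

    private transient Meter throughput;

    @Override
    public void open(Configuration parameters) {
        // MeterView(60) reports the rate averaged over the last 60 seconds
        throughput = getRuntimeContext()
                .getMetricGroup()
                .meter("joinThroughput", new MeterView(60));
    }

    @Override
    public String map(String value) {
        throughput.markEvent(); // count one join result passing through
        return value;
    }
}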

Best,
Max



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Broadcast to all the other operators

2017-11-08 Thread Tony Wei
Hi Sadok,

Since you want to broadcast the Rule Stream to all subtasks, it seems that it
is not necessary to use a KeyedStream.
How about using the broadcast partitioner, connecting the two streams to attach the
rule to each record (or apply the rule to them directly), and doing the keyed
operation after that?
If you need to do the keyed operation and then apply the rules, it should work by
changing the order.

The code might be something like this, and you can change the rules' state
in the CoFlatMapFunction.

DataStream<Rule> rules = ...;
DataStream<Record> records = ...;
DataStream<Tuple2<Record, Rule>> recordWithRule =
rules.broadcast().connect(records).flatMap(...);
recordWithRule.keyBy(...).process(...);
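
A rough sketch of what that CoFlatMapFunction could look like (Rule and Record are
assumed types, and the rules are simply kept in a per-subtask list here):

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

// Sketch: keep the latest broadcast rules locally and attach them to each record.
public static class AttachRulesFunction
        implements CoFlatMapFunction<Rule, Record, Tuple2<Record, Rule>> {

    private final List<Rule> currentRules = new ArrayList<>();

    @Override
    public void flatMap1(Rule rule, Collector<Tuple2<Record, Rule>> out) {
        currentRules.add(rule); // rule updates arrive on the broadcast side
    }

    @Override
    public void flatMap2(Record record, Collector<Tuple2<Record, Rule>> out) {
        for (Rule rule : currentRules) {
            out.collect(Tuple2.of(record, rule)); // pair the record with each rule
        }
    }
}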

Hope this will make sense to you.

Best Regards,
Tony Wei

2017-11-09 6:25 GMT+08:00 Ladhari Sadok :

> Hello,
>
> I'm working on a Rules Engine project with Flink 1.3. In this project I want
> to update some keyed operator state when an external event occurs.
>
> I have a DataStream of updates (from Kafka) and I want to broadcast the data
> contained in this stream to all keyed operators so I can change the state in
> all of them.
>
> It is like this use case :
> Image : https://data-artisans.com/wp-content/uploads/2017/10/streaming-in-definitions.png
> Full article : https://data-artisans.com/blog/real-time-fraud-detection-ing-bank-apache-flink
>
> I found it in the DataSet API but not in the DataStream API!
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#broadcast-variables
>
> Can someone explain to me how to solve this problem?
>
> Thanks a lot.
>
> Flinkly regards,
> Sadok
>


Re: Kafka Consumer fetch-size/rate and Producer queue timeout

2017-11-08 Thread Ashish Pokharel
Hi Gordon,

Thanks for your responses. It definitely makes sense. 

I could pull this stack from the logs; the entire log itself is pretty big - let me
know if some samples before/after this would help.
 
TimerException{org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator}
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction.apply(PassThroughWindowFunction.java:35)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:44)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:597)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:552)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:253)
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:217)
    ... 7 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    ... 18 more
Caused by: java.lang.Exception: Failed to send data to Kafka: Expiring 7 record(s) for prod.app.stats.preproc-1: 33473 ms has passed since last append
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:421)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    ... 24 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 7 record(s) for prod.app.stats.preproc-1: 33473 ms has passed since last append
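
For context, the root cause above is the producer-side request timeout: records sat
in the producer's buffer longer than allowed before they could be sent. A minimal
sketch of how such settings can be passed to the Flink Kafka producer (these are
standard Kafka producer properties; the values and broker address are illustrative
only, not taken from this job):

import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "broker:9092"); // placeholder address
producerProps.setProperty("request.timeout.ms", "120000");     // Kafka default is 30000
producerProps.setProperty("retries", "3");
producerProps.setProperty("linger.ms", "50");

FlinkKafkaProducer010<String> producer = new FlinkKafkaProducer010<>(
        "prod.app.stats.preproc",   // topic name from the stack trace above
        new SimpleStringSchema(),
        producerProps);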

Also for reference here is my ProducerConfig from logs:

INFO  

When using Flink for CEP, can the data in Cassandra database be used for state

2017-11-08 Thread shyla deshpande
Hello all,

I am new to Flink.

We have our data in a Cassandra database. We have a use case for CEP.
I am checking whether Flink fits well for us.

When processing the event data, I may want to pull data from the Cassandra
database, like the user profile, and join it with the event data.

Is there a way to do that?  I appreciate your help.
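
Something like the following is what I have in mind, as a rough sketch
(UserProfileClient and the event types are placeholders, not an actual API):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Sketch: enrich each event with a user profile looked up from Cassandra.
// UserProfileClient is a hypothetical wrapper around a Cassandra session.
public class EnrichWithProfile extends RichMapFunction<Event, EnrichedEvent> {

    private transient UserProfileClient client;

    @Override
    public void open(Configuration parameters) {
        client = new UserProfileClient("cassandra-host"); // one connection per subtask
    }

    @Override
    public EnrichedEvent map(Event event) {
        UserProfile profile = client.lookup(event.getUserId()); // point lookup by key
        return new EnrichedEvent(event, profile);
    }

    @Override
    public void close() {
        client.close();
    }
}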

Thanks


Broadcast to all the other operators

2017-11-08 Thread Ladhari Sadok
Hello,

I'm working on Rules Engine project with Flink 1.3, in this project I want
to update some keyed operator state when external event occurred.

I have a Datastream of updates (from kafka) I want to broadcast the data
contained in this stream to all keyed operator so I can change the state in
all operators.

It is like this use case :
Image :
https://data-artisans.com/wp-content/uploads/2017/10/streaming-in-definitions.png
All article :
https://data-artisans.com/blog/real-time-fraud-detection-ing-bank-apache-flink

I founded it in the DataSet API but not in the DataStream API !

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#broadcast-variables

Can some one explain to me who to solve this problem ?

Thanks a lot.

Flinkly regards,
Sadok


How to best create a bounded session window ?

2017-11-08 Thread Vishal Santoshi
I am implementing a bounded session window, but I need to short-circuit
the session if the session length (in count of events or time) goes beyond
a configured limit, a very reasonable scenario (bots etc.). I am using
the approach listed below. I am not sure though whether the Window itself is
being terminated, or if that is even feasible. Any other approach or advice?

import java.io.IOException;
import java.util.Date;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class BoundedEventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private final long maxSessionTime;

    // per-window flag remembering that the window was already force-fired and purged
    private final ValueStateDescriptor<Boolean> doneStateDescriptor =
            new ValueStateDescriptor<>("done", Boolean.class);

    private BoundedEventTimeTrigger(long maxSessionTime) {
        this.maxSessionTime = maxSessionTime;
    }

    /**
     * Creates an event-time trigger that fires once the watermark passes the end of the window,
     * or fires and purges as soon as the session grows beyond the configured maximum length.
     *
     * Once the trigger fires all elements are discarded. Elements that arrive late immediately
     * trigger window evaluation with just this one element.
     */
    public static BoundedEventTimeTrigger create(long maxSessionLength) {
        return new BoundedEventTimeTrigger(maxSessionLength);
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
            TriggerContext ctx) throws Exception {
        ValueState<Boolean> doneState = ctx.getPartitionedState(doneStateDescriptor);
        if (doneState.value() != null && doneState.value()) {
            // the bound was already exceeded and the window purged; ignore further elements
            return TriggerResult.CONTINUE;
        }

        if (timestamp - window.getStart() > maxSessionTime) {
            System.out.println(new Date(timestamp) + "\t" + new Date(window.getStart()));
            try {
                doneState.update(true);
                return TriggerResult.FIRE_AND_PURGE;
            } catch (IOException e) {
                throw new RuntimeException("Failed to update state", e);
            }
        }

        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        ctx.getPartitionedState(doneStateDescriptor).clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        ctx.registerEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public String toString() {
        return "BoundedEventTimeTrigger()";
    }
}
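
For what it's worth, a minimal sketch of how this trigger would be attached to an
event-time session window (the gap, the bound and the window function below are
illustrative placeholders):

// Sketch: session windows with a 10 minute gap, force-fired and purged once a
// session spans more than 2 hours of event time.
stream
    .keyBy(event -> event.getSessionKey())
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .trigger(BoundedEventTimeTrigger.create(Time.hours(2).toMilliseconds()))
    .apply(new MySessionWindowFunction());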


RE: Job Manager Configuration

2017-11-08 Thread Chan, Regina
Thanks for the responses!

I’m currently using 1.2.0 – going to bump it up once I have things stabilized.
I haven’t defined any slot sharing groups, but I do think that I’ve probably got
my job configured sub-optimally. I’ve refactored my code so that I can submit
subsets of the flow at a time and it seems to work. The break point between the
JobManager being able to acknowledge the job and not seems to hover somewhere
between 10-20 flows.

I guess what doesn’t make too much sense to me is if the user code is uploaded 
once to the JobManager and downloaded from each TaskManager, what exactly is 
the JobManager doing that’s keeping it busy? It’s the same code across the 
TaskManagers.

I’ll get you the logs shortly.

From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: Wednesday, November 08, 2017 10:17 AM
To: Chan, Regina [Tech]
Cc: Chesnay Schepler; user@flink.apache.org
Subject: Re: Job Manager Configuration

Quick question Regina: Which version of Flink are you running?

Cheers,
Till

On Tue, Nov 7, 2017 at 4:38 PM, Till Rohrmann wrote:
Hi Regina,

the user code is uploaded once to the `JobManager` and then downloaded from 
each `TaskManager` once when it first receives the command to execute the first 
task of your job.

As Chesnay said there is no fundamental limitation to the size of the Flink 
job. However, it might be the case that you have configured your job 
sub-optimally. You said that you have 300 parallel flows. Depending on whether 
you've defined separate slot sharing groups for them or not, it might be the 
case that parallel subtasks of all 300 parallel jobs share the same slot (if 
you haven't changed the slot sharing group). Depending on what you calculate, 
this can be inefficient because the individual tasks don't get much computation 
time. Moreover, all tasks will allocate some objects on the heap which can lead 
to more GC. Therefore, it might make sense to group some of the jobs together 
and run these jobs in batches after the previous batch completed. But this is 
hard to say without knowing the details of your job and getting a glimpse at 
the JobManager logs.
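
As a hedged illustration (the sources, operators and group names below are made up,
not taken from your job), assigning an explicit slot sharing group per flow looks
like this:

// Sketch: pin each flow to its own slot sharing group so subtasks of different
// flows are not all co-located in the same slot.
env.addSource(source1).slotSharingGroup("flow-1")
   .map(new TransformFlow1()).slotSharingGroup("flow-1")
   .addSink(sink1).slotSharingGroup("flow-1");

env.addSource(source2).slotSharingGroup("flow-2")
   .map(new TransformFlow2()).slotSharingGroup("flow-2")
   .addSink(sink2).slotSharingGroup("flow-2");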

Concerning the exception you're seeing, it would also be helpful to see the 
logs of the client and the JobManager. Actually, the scheduling of the job is 
independent of the response. Only the creation of the ExecutionGraph and making 
the JobGraph highly available in case of an HA setup are executed before the 
JobManager acknowledges the job submission. Only if this acknowledge message is 
not received in time on the client side, then the SubmissionTimeoutException is 
thrown. Therefore, I assume that somehow the JobManager is too busy or kept 
from sending the acknowledge message.

Cheers,
Till



On Thu, Nov 2, 2017 at 7:18 PM, Chan, Regina wrote:
Does it copy per TaskManager or per operator? I only gave it 10 TaskManagers 
with 2 slots. I’m perfectly fine with it queuing up and running when it has the 
resources to.



From: Chesnay Schepler [mailto:ches...@apache.org]
Sent: Wednesday, November 01, 2017 7:09 AM
To: user@flink.apache.org
Subject: Re: Job Manager Configuration

AFAIK there is no theoretical limit on the size of the plan, it just depends on 
the available resources.


The job submission times out since it takes too long to deploy all the
operators that the job defines. With 300 flows, each with 6 operators, you're
looking at potentially (1800 * parallelism) tasks that have to be deployed. For
each task Flink copies the user code of all flows to the executing TaskManager,
which the network may just not be able to handle in time.

I suggest to split your job into smaller batches or even run each of them 
independently.

On 31.10.2017 16:25, Chan, Regina wrote:
Asking an additional question, what is the largest plan that the JobManager can 
handle? Is there a limit? My flows don’t need to run in parallel and can run 
independently. I wanted them to run in one single job because it’s part of one 
logical commit on my side.

Thanks,
Regina

From: Chan, Regina [Tech]
Sent: Monday, October 30, 2017 3:22 PM
To: 'user@flink.apache.org'
Subject: Job Manager Configuration

Flink Users,

I have about 300 parallel flows in one job each with 2 inputs, 3 operators, and 
1 sink which makes for a large job. I keep getting the below timeout exception 
but I’ve already set it to a 30 minute time out with a 6GB heap on the 
JobManager? Is there a heuristic to better configure the job manager?

Caused by: 
org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job 
submission to the JobManager timed out. You may increase 'akka.client.timeout' 
in case the JobManager needs more time to configure and confirm the job 
submission.

Regina Chan
Goldman Sachs – Enterprise Platforms, Data Architecture
30 Hudson Street, 37th floor 

Do timestamps and watermarks exist after window evaluation?

2017-11-08 Thread Derek VerLee
When composing ("chaining") multiple windowing operations on the same
stream are watermarks transmitted down stream after window evaluation,
and are the records emitted from WindowFunctions given timestamps? Do I
need to or should I always assignTimestampsAndWatermarks to the
outputsof window evaluations if I want to window again? If automatically
assigned, how should I think about them in an event time context? Would
the event time of a record resulting from a WindowFunction be the
window's end time in the case of a TimeWindow?
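
For concreteness, this is the kind of re-assignment I am asking about, as a minimal
sketch (firstWindowOutput stands for the result of the first window operation; the
output type WindowResult, the extractor choice and the 10 second bound are
placeholders):

// Sketch: explicitly re-assign timestamps/watermarks to the output of a windowed
// aggregation before windowing it again.
DataStream<WindowResult> reassigned = firstWindowOutput
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<WindowResult>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(WindowResult result) {
                return result.getWindowEnd(); // treat the window's end time as event time
            }
        });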



Re: Generate watermarks per key in a KeyedStream

2017-11-08 Thread Xingcan Cui
Hi Shailesh,

actually, the watermarks are generated per partition, but all of them will
be forcibly aligned to the minimum one during processing. That is decided
by the semantics of watermarks and KeyedStream, i.e., the watermarks belong
to the whole stream, and a stream is made up of different partitions (one per
key).

If the physical devices work in different time systems due to delay, the
event streams from them should be treated separately.

Hope that helps.

Best,
Xingcan

On Wed, Nov 8, 2017 at 11:48 PM, Shailesh Jain 
wrote:

> Hi,
>
> I'm working on implementing a use case wherein different physical devices
> are sending events, and due to network/power issues, there can be a delay
> in receiving events at Flink source. One of the operators within the flink
> job is the Pattern operator, and there are certain patterns which are time
> sensitive, so I'm using Event time characteristic. But the problem comes
> when there are unpredictable delays in events from a particular device(s),
> which causes those events to be dropped (as I cannot really define a static
> bound to allow for lateness).
>
> Since I'm using a KeyedStream, keyed on the source device ID, is there a
> way to allow each CEP operator instance (one per key) to progress its time
> based on the event time in the corresponding stream partition. Or in other
> words, is there a way to generate watermarks per partition in a KeyedStream?
>
> Thanks,
> Shailesh
>


Re: ResultPartitionMetrics

2017-11-08 Thread Nico Kruber
Hi Aitozi,
the difference is the scope: the normal metrics (without 
taskmanager.net.detailed-metrics) reflect _all_ buffers of a task while the 
detailed statistics are more fine-grained and give you statistics per input (or 
output) gate - the "total" there reflects the fact that each gate has multiple 
channels and the metrics offered are the sum among all of them.


Nico

On Monday, 23 October 2017 09:31:04 CET Timo Walther wrote:
> Hi Aitozi,
> 
> I will loop in people that are more familar with the network stack and
> metrics. Maybe this is a bug?
> 
> Regards,
> Timo
> 
> On 10/22/17 at 4:36 PM, aitozi wrote:
> > Hi,
> > 
> > I see that version 1.3 adds the ResultPartitionMetrics with this issue:
> > https://issues.apache.org/jira/browse/FLINK-5090
> > 
> > but I am in doubt about what the difference is between totalQueueLen and
> > inputQueueLength in
> > https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html#network
> > 
> > I read the code where both metrics are registered in Task.java:
> > 
> > this.metrics.getIOMetricGroup().initializeBufferMetrics(this);
> > 
> > // register detailed network metrics, if configured
> > if (taskManagerConfig.getConfiguration().getBoolean(TaskManagerOptions.NETWORK_DETAILED_METRICS)) {
> >     // similar to MetricUtils.instantiateNetworkMetrics() but inside this IOMetricGroup
> >     MetricGroup networkGroup = this.metrics.getIOMetricGroup().addGroup("Network");
> > 
> >     MetricGroup outputGroup = networkGroup.addGroup("Output");
> >     MetricGroup inputGroup = networkGroup.addGroup("Input");
> > 
> >     // output metrics
> >     for (int i = 0; i < producedPartitions.length; i++) {
> >         ResultPartitionMetrics.registerQueueLengthMetrics(outputGroup.addGroup(i), producedPartitions[i]);
> >     }
> > 
> >     for (int i = 0; i < inputGates.length; i++) {
> >         InputGateMetrics.registerQueueLengthMetrics(inputGroup.addGroup(i), inputGates[i]);
> >     }
> > }
> > 
> > I think the first one, initializeBufferMetrics#InputBuffersGauge, will get all
> > the buffers of all input gates of the Task, and I think the method in
> > InputGateMetrics,
> > "group.gauge("totalQueueLen", metrics.getTotalQueueLenGauge());", does the
> > same thing. If I understand wrong, please tell me.
> > 
> > thanks,
> > Aitozi
> > 
> > 
> > 
> > --
> > Sent from:
> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Generate watermarks per key in a KeyedStream

2017-11-08 Thread Shailesh Jain
Hi,

I'm working on implementing a use case wherein different physical devices
are sending events, and due to network/power issues, there can be a delay
in receiving events at Flink source. One of the operators within the flink
job is the Pattern operator, and there are certain patterns which are time
sensitive, so I'm using Event time characteristic. But the problem comes
when there are unpredictable delays in events from a particular device(s),
which causes those events to be dropped (as I cannot really define a static
bound to allow for lateness).

Since I'm using a KeyedStream, keyed on the source device ID, is there a
way to allow each CEP operator instance (one per key) to progress its time
based on the event time in the corresponding stream partition. Or in other
words, is there a way to generate watermarks per partition in a KeyedStream?

Thanks,
Shailesh


Re: Flink memory leak

2017-11-08 Thread Piotr Nowojski
Btw, Ebru:

I don’t agree that the main suspect is the NetworkBufferPool. On your screenshots
its memory consumption was reasonable and stable: 596MB -> 602MB -> 597MB.

PoolThreadCache memory usage of ~120MB is also reasonable.

Do you experience any problems, like Out Of Memory errors/crashes/long GC
pauses? Or is the JVM process just using more memory over time? You are aware that
the JVM doesn’t like to release memory back to the OS once it has used it? So
increasing memory usage until hitting some limit (for example the JVM max heap size)
is expected behaviour.

Piotrek

> On 8 Nov 2017, at 15:48, Piotr Nowojski  wrote:
> 
> I don’t know if this is relevant to this issue, but I was constantly getting 
> failures trying to reproduce this leak using your Job, because you were using 
> non deterministic getKey function:
> @Override
> public Integer getKey(Integer event) {
>Random randomGen = new Random((new Date()).getTime());
>return randomGen.nextInt() % 8;
> }
> And quoting Java doc of KeySelector:
> 
> "If invoked multiple times on the same object, the returned key must be the 
> same.”
> 
> I’m trying to reproduce this issue with following job:
> 
> https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3 
> 
> 
> Where IntegerSource is just an infinite source, DisardingSink is well just 
> discarding incoming data. I’m cancelling the job every 5 seconds and so far 
> (after ~15 minutes) my memory consumption is stable, well below maximum java 
> heap size.
> 
> Piotrek
> 
>> On 8 Nov 2017, at 15:28, Javier Lopez > > wrote:
>> 
>> Yes, I tested with just printing the stream. But it could take a lot of time 
>> to fail. 
>> 
>> On Wednesday, 8 November 2017, Piotr Nowojski > > wrote:
>> > Thanks for quick answer. 
>> > So it will also fail after some time with `fromElements` source instead of 
>> > Kafka, right? 
>> > Did you try it also without a Kafka producer?
>> > Piotrek
>> >
>> > On 8 Nov 2017, at 14:57, Javier Lopez > > > wrote:
>> > Hi,
>> > You don't need data. With data it will die faster. I tested as well with a 
>> > small data set, using the fromElements source, but it will take some time 
>> > to die. It's better with some data.
>> > On 8 November 2017 at 14:54, Piotr Nowojski > > > wrote:
>> >>
>> >> Hi,
>> >> Thanks for sharing this job. 
>> >> Do I need to feed some data to the Kafka to reproduce this issue with 
>> >> your script?
>> >> Does this OOM issue also happen when you are not using the Kafka 
>> >> source/sink? 
>> >> Piotrek
>> >>
>> >> On 8 Nov 2017, at 14:08, Javier Lopez > >> > wrote:
>> >> Hi,
>> >> This is the test flink job we created to trigger this leak 
>> >> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6 
>> >> 
>> >> And this is the python script we are using to execute the job thousands 
>> >> of times to get the OOM problem 
>> >> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107 
>> >> 
>> >> The cluster we used for this has this configuration:
>> >>
>> >> Instance type: t2.large
>> >> Number of workers: 2
>> >> HeapMemory: 5500
>> >> Number of task slots per node: 4
>> >> TaskMangMemFraction: 0.5
>> >> NumberOfNetworkBuffers: 2000
>> >>
>> >> We have tried several things, increasing the heap, reducing the heap, 
>> >> more memory fraction, changes this value in the taskmanager.sh 
>> >> "TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to work.
>> >> Thanks for your help.
>> >> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
>> >> > 
>> >> wrote:
>> >>>
>> >>> On 2017-11-08 15:20, Piotr Nowojski wrote:
>> 
>>  Hi Ebru and Javier,
>> 
>>  Yes, if you could share this example job it would be helpful.
>> 
>>  Ebru: could you explain in a little more details how does your Job(s)
>>  look like? Could you post some code? If you are just using maps and
>>  filters there shouldn’t be any network transfers involved, aside
>>  from Source and Sink functions.
>> 
>>  Piotrek
>> 
>> > On 8 Nov 2017, at 12:54, ebru > > > wrote:
>> >
>> > Hi Javier,
>> >
>> > It would be helpful if you share your test job with us.
>> > Which configurations did you try?
>> >
>> > -Ebru
>> >
>> > On 8 Nov 2017, at 14:43, Javier Lopez > > >
>> 

Re: Job Manager Configuration

2017-11-08 Thread Till Rohrmann
Quick question Regina: Which version of Flink are you running?

Cheers,
Till

On Tue, Nov 7, 2017 at 4:38 PM, Till Rohrmann 
wrote:

> Hi Regina,
>
> the user code is uploaded once to the `JobManager` and then downloaded
> from each `TaskManager` once when it first receives the command to execute
> the first task of your job.
>
> As Chesnay said there is no fundamental limitation to the size of the
> Flink job. However, it might be the case that you have configured your job
> sub-optimally. You said that you have 300 parallel flows. Depending on
> whether you've defined separate slot sharing groups for them or not, it
> might be the case that parallel subtasks of all 300 parallel jobs share the
> same slot (if you haven't changed the slot sharing group). Depending on
> what you calculate, this can be inefficient because the individual tasks
> don't get much computation time. Moreover, all tasks will allocate some
> objects on the heap which can lead to more GC. Therefore, it might make
> sense to group some of the jobs together and run these jobs in batches
> after the previous batch completed. But this is hard to say without knowing
> the details of your job and getting a glimpse at the JobManager logs.
>
> Concerning the exception you're seeing, it would also be helpful to see
> the logs of the client and the JobManager. Actually, the scheduling of the
> job is independent of the response. Only the creation of the ExecutionGraph
> and making the JobGraph highly available in case of an HA setup are
> executed before the JobManager acknowledges the job submission. Only if
> this acknowledge message is not received in time on the client side, then
> the SubmissionTimeoutException is thrown. Therefore, I assume that somehow
> the JobManager is too busy or kept from sending the acknowledge message.
>
> Cheers,
> Till
>
>
>
> On Thu, Nov 2, 2017 at 7:18 PM, Chan, Regina  wrote:
>
>> Does it copy per TaskManager or per operator? I only gave it 10
>> TaskManagers with 2 slots. I’m perfectly fine with it queuing up and
>> running when it has the resources to.
>>
>>
>>
>>
>>
>>
>>
>> *From:* Chesnay Schepler [mailto:ches...@apache.org]
>> *Sent:* Wednesday, November 01, 2017 7:09 AM
>> *To:* user@flink.apache.org
>> *Subject:* Re: Job Manager Configuration
>>
>>
>>
>> AFAIK there is no theoretical limit on the size of the plan, it just
>> depends on the available resources.
>>
>>
>> The job submission times out since it takes too long to deploy all the
>> operators that the job defines. With 300 flows, each with 6 operators,
>> you're looking at potentially (1800 * parallelism) tasks that have to be
>> deployed. For each task Flink copies the user code of *all* flows to the
>> executing TaskManager, which the network may just not be able to handle in time.
>>
>> I suggest to split your job into smaller batches or even run each of them
>> independently.
>>
>> On 31.10.2017 16:25, Chan, Regina wrote:
>>
>> Asking an additional question, what is the largest plan that the
>> JobManager can handle? Is there a limit? My flows don’t need to run in
>> parallel and can run independently. I wanted them to run in one single job
>> because it’s part of one logical commit on my side.
>>
>>
>>
>> Thanks,
>>
>> Regina
>>
>>
>>
>> *From:* Chan, Regina [Tech]
>> *Sent:* Monday, October 30, 2017 3:22 PM
>> *To:* 'user@flink.apache.org'
>> *Subject:* Job Manager Configuration
>>
>>
>>
>> Flink Users,
>>
>>
>>
>> I have about 300 parallel flows in one job each with 2 inputs, 3
>> operators, and 1 sink which makes for a large job. I keep getting the below
>> timeout exception but I’ve already set it to a 30 minute time out with a
>> 6GB heap on the JobManager? Is there a heuristic to better configure the
>> job manager?
>>
>>
>>
>> Caused by: 
>> org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException:
>> Job submission to the JobManager timed out. You may increase
>> 'akka.client.timeout' in case the JobManager needs more time to configure
>> and confirm the job submission.
>>
>>
>>
>> *Regina Chan*
>>
>> *Goldman Sachs* *–* Enterprise Platforms, Data Architecture
>>
>> *30 Hudson Street, 37th floor | Jersey City, NY 07302
>> *
>> (  (212) 902-5697
>>
>>
>>
>>
>>
>
>


Re: Flink memory leak

2017-11-08 Thread Piotr Nowojski
I don’t know if this is relevant to this issue, but I was constantly getting
failures trying to reproduce this leak using your Job, because you were using
a non-deterministic getKey function:
@Override
public Integer getKey(Integer event) {
   Random randomGen = new Random((new Date()).getTime());
   return randomGen.nextInt() % 8;
}
And quoting Java doc of KeySelector:

"If invoked multiple times on the same object, the returned key must be the 
same.”
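
A minimal sketch of a deterministic alternative (just an illustration, deriving the
key from the element itself so repeated calls return the same value):

KeySelector<Integer, Integer> deterministicKey = new KeySelector<Integer, Integer>() {
    @Override
    public Integer getKey(Integer event) {
        return Math.floorMod(event, 8); // same input always maps to the same key
    }
};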

I’m trying to reproduce this issue with following job:

https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3 


Where IntegerSource is just an infinite source and DisardingSink is, well, just
discarding incoming data. I’m cancelling the job every 5 seconds and so far
(after ~15 minutes) my memory consumption is stable, well below the maximum Java
heap size.

Piotrek

> On 8 Nov 2017, at 15:28, Javier Lopez  wrote:
> 
> Yes, I tested with just printing the stream. But it could take a lot of time 
> to fail. 
> 
> On Wednesday, 8 November 2017, Piotr Nowojski  > wrote:
> > Thanks for quick answer. 
> > So it will also fail after some time with `fromElements` source instead of 
> > Kafka, right? 
> > Did you try it also without a Kafka producer?
> > Piotrek
> >
> > On 8 Nov 2017, at 14:57, Javier Lopez  > > wrote:
> > Hi,
> > You don't need data. With data it will die faster. I tested as well with a 
> > small data set, using the fromElements source, but it will take some time 
> > to die. It's better with some data.
> > On 8 November 2017 at 14:54, Piotr Nowojski  > > wrote:
> >>
> >> Hi,
> >> Thanks for sharing this job. 
> >> Do I need to feed some data to the Kafka to reproduce this issue with your 
> >> script?
> >> Does this OOM issue also happen when you are not using the Kafka 
> >> source/sink? 
> >> Piotrek
> >>
> >> On 8 Nov 2017, at 14:08, Javier Lopez  >> > wrote:
> >> Hi,
> >> This is the test flink job we created to trigger this leak 
> >> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6 
> >> 
> >> And this is the python script we are using to execute the job thousands of 
> >> times to get the OOM problem 
> >> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107 
> >> 
> >> The cluster we used for this has this configuration:
> >>
> >> Instance type: t2.large
> >> Number of workers: 2
> >> HeapMemory: 5500
> >> Number of task slots per node: 4
> >> TaskMangMemFraction: 0.5
> >> NumberOfNetworkBuffers: 2000
> >>
> >> We have tried several things, increasing the heap, reducing the heap, more 
> >> memory fraction, changes this value in the taskmanager.sh 
> >> "TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to work.
> >> Thanks for your help.
> >> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
> >> > 
> >> wrote:
> >>>
> >>> On 2017-11-08 15:20, Piotr Nowojski wrote:
> 
>  Hi Ebru and Javier,
> 
>  Yes, if you could share this example job it would be helpful.
> 
>  Ebru: could you explain in a little more details how does your Job(s)
>  look like? Could you post some code? If you are just using maps and
>  filters there shouldn’t be any network transfers involved, aside
>  from Source and Sink functions.
> 
>  Piotrek
> 
> > On 8 Nov 2017, at 12:54, ebru  > > wrote:
> >
> > Hi Javier,
> >
> > It would be helpful if you share your test job with us.
> > Which configurations did you try?
> >
> > -Ebru
> >
> > On 8 Nov 2017, at 14:43, Javier Lopez  > >
> > wrote:
> >
> > Hi,
> >
> > We have been facing a similar problem. We have tried some different
> > configurations, as proposed in other email thread by Flavio and
> > Kien, but it didn't work. We have a workaround similar to the one
> > that Flavio has, we restart the taskmanagers once they reach a
> > memory threshold. We created a small test to remove all of our
> > dependencies and leave only flink native libraries. This test reads
> > data from a Kafka topic and writes it back to another topic in
> > Kafka. We cancel the job and start another every 5 seconds. After
> > ~30 minutes of doing this process, the cluster reaches the OS memory
> > limit and dies.
> >
> > Currently, we have a test cluster with 8 workers and 8 task slots
> 

Re: Flink memory leak

2017-11-08 Thread Javier Lopez
Yes, I tested with just printing the stream. But it could take a lot of
time to fail.

On Wednesday, 8 November 2017, Piotr Nowojski 
wrote:
> Thanks for quick answer.
> So it will also fail after some time with `fromElements` source instead
of Kafka, right?
> Did you try it also without a Kafka producer?
> Piotrek
>
> On 8 Nov 2017, at 14:57, Javier Lopez  wrote:
> Hi,
> You don't need data. With data it will die faster. I tested as well with
a small data set, using the fromElements source, but it will take some time
to die. It's better with some data.
> On 8 November 2017 at 14:54, Piotr Nowojski 
wrote:
>>
>> Hi,
>> Thanks for sharing this job.
>> Do I need to feed some data to the Kafka to reproduce this issue with
your script?
>> Does this OOM issue also happen when you are not using the Kafka
source/sink?
>> Piotrek
>>
>> On 8 Nov 2017, at 14:08, Javier Lopez  wrote:
>> Hi,
>> This is the test flink job we created to trigger this leak
https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6
>> And this is the python script we are using to execute the job thousands
of times to get the OOM problem
https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107
>> The cluster we used for this has this configuration:
>>
>> Instance type: t2.large
>> Number of workers: 2
>> HeapMemory: 5500
>> Number of task slots per node: 4
>> TaskMangMemFraction: 0.5
>> NumberOfNetworkBuffers: 2000
>>
>> We have tried several things, increasing the heap, reducing the heap,
more memory fraction, changes this value in the taskmanager.sh
"TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to work.
>> Thanks for your help.
>> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <
b20926...@cs.hacettepe.edu.tr> wrote:
>>>
>>> On 2017-11-08 15:20, Piotr Nowojski wrote:

 Hi Ebru and Javier,

 Yes, if you could share this example job it would be helpful.

 Ebru: could you explain in a little more details how does your Job(s)
 look like? Could you post some code? If you are just using maps and
 filters there shouldn’t be any network transfers involved, aside
 from Source and Sink functions.

 Piotrek

> On 8 Nov 2017, at 12:54, ebru  wrote:
>
> Hi Javier,
>
> It would be helpful if you share your test job with us.
> Which configurations did you try?
>
> -Ebru
>
> On 8 Nov 2017, at 14:43, Javier Lopez 
> wrote:
>
> Hi,
>
> We have been facing a similar problem. We have tried some different
> configurations, as proposed in other email thread by Flavio and
> Kien, but it didn't work. We have a workaround similar to the one
> that Flavio has, we restart the taskmanagers once they reach a
> memory threshold. We created a small test to remove all of our
> dependencies and leave only flink native libraries. This test reads
> data from a Kafka topic and writes it back to another topic in
> Kafka. We cancel the job and start another every 5 seconds. After
> ~30 minutes of doing this process, the cluster reaches the OS memory
> limit and dies.
>
> Currently, we have a test cluster with 8 workers and 8 task slots
> per node. We have one job that uses 56 slots, and we cannot execute
> that job 5 times in a row because the whole cluster dies. If you
> want, we can publish our test job.
>
> Regards,
>
> On 8 November 2017 at 11:20, Aljoscha Krettek 
> wrote:
>
> @Nico & @Piotr Could you please have a look at this? You both
> recently worked on the network stack and might be most familiar with
> this.
>
> On 8. Nov 2017, at 10:25, Flavio Pompermaier 
> wrote:
>
> We also have the same problem in production. At the moment the
> solution is to restart the entire Flink cluster after every job..
> We've tried to reproduce this problem with a test (see
> https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we don't
> know whether the error produced by the test and the leak are
> correlated..
>
> Best,
> Flavio
>
> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>  wrote:
> On 2017-11-07 16:53, Ufuk Celebi wrote:
> Do you use any windowing? If yes, could you please share that code?
> If
> there is no stateful operation at all, it's strange where the list
> state instances are coming from.
>
> On Tue, Nov 7, 2017 at 2:35 PM, ebru 
> wrote:
> Hi Ufuk,
>
> We don’t explicitly define any state descriptor. We only use map
> and filters
> operator. We thought that gc handle clearing the flink’s internal
> states.
> So how can we manage the memory 

Re: Flink memory leak

2017-11-08 Thread Piotr Nowojski
Thanks for quick answer. 

So it will also fail after some time with `fromElements` source instead of 
Kafka, right? 

Did you try it also without a Kafka producer?

Piotrek

> On 8 Nov 2017, at 14:57, Javier Lopez  wrote:
> 
> Hi,
> 
> You don't need data. With data it will die faster. I tested as well with a 
> small data set, using the fromElements source, but it will take some time to 
> die. It's better with some data.
> 
> On 8 November 2017 at 14:54, Piotr Nowojski  > wrote:
> Hi,
> 
> Thanks for sharing this job. 
> 
> Do I need to feed some data to the Kafka to reproduce this issue with your 
> script?
> 
> Does this OOM issue also happen when you are not using the Kafka source/sink? 
> 
> Piotrek
> 
>> On 8 Nov 2017, at 14:08, Javier Lopez > > wrote:
>> 
>> Hi,
>> 
>> This is the test flink job we created to trigger this leak 
>> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6 
>> 
>> And this is the python script we are using to execute the job thousands of 
>> times to get the OOM problem 
>> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107 
>> 
>> 
>> The cluster we used for this has this configuration:
>> Instance type: t2.large
>> Number of workers: 2
>> HeapMemory: 5500
>> Number of task slots per node: 4
>> TaskMangMemFraction: 0.5
>> NumberOfNetworkBuffers: 2000
>> We have tried several things, increasing the heap, reducing the heap, more 
>> memory fraction, changes this value in the taskmanager.sh 
>> "TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to work.
>> 
>> Thanks for your help.
>> 
>> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
>> > wrote:
>> On 2017-11-08 15:20, Piotr Nowojski wrote:
>> Hi Ebru and Javier,
>> 
>> Yes, if you could share this example job it would be helpful.
>> 
>> Ebru: could you explain in a little more details how does your Job(s)
>> look like? Could you post some code? If you are just using maps and
>> filters there shouldn’t be any network transfers involved, aside
>> from Source and Sink functions.
>> 
>> Piotrek
>> 
>> On 8 Nov 2017, at 12:54, ebru > > wrote:
>> 
>> Hi Javier,
>> 
>> It would be helpful if you share your test job with us.
>> Which configurations did you try?
>> 
>> -Ebru
>> 
>> On 8 Nov 2017, at 14:43, Javier Lopez > >
>> wrote:
>> 
>> Hi,
>> 
>> We have been facing a similar problem. We have tried some different
>> configurations, as proposed in other email thread by Flavio and
>> Kien, but it didn't work. We have a workaround similar to the one
>> that Flavio has, we restart the taskmanagers once they reach a
>> memory threshold. We created a small test to remove all of our
>> dependencies and leave only flink native libraries. This test reads
>> data from a Kafka topic and writes it back to another topic in
>> Kafka. We cancel the job and start another every 5 seconds. After
>> ~30 minutes of doing this process, the cluster reaches the OS memory
>> limit and dies.
>> 
>> Currently, we have a test cluster with 8 workers and 8 task slots
>> per node. We have one job that uses 56 slots, and we cannot execute
>> that job 5 times in a row because the whole cluster dies. If you
>> want, we can publish our test job.
>> 
>> Regards,
>> 
>> On 8 November 2017 at 11:20, Aljoscha Krettek > >
>> wrote:
>> 
>> @Nico & @Piotr Could you please have a look at this? You both
>> recently worked on the network stack and might be most familiar with
>> this.
>> 
>> On 8. Nov 2017, at 10:25, Flavio Pompermaier > >
>> wrote:
>> 
>> We also have the same problem in production. At the moment the
>> solution is to restart the entire Flink cluster after every job..
>> We've tried to reproduce this problem with a test (see
>> https://issues.apache.org/jira/browse/FLINK-7845 
>>  [1]) but we don't
>> 
>> know whether the error produced by the test and the leak are
>> correlated..
>> 
>> Best,
>> Flavio
>> 
>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>> > wrote:
>> On 2017-11-07 16:53, Ufuk Celebi wrote:
>> Do you use any windowing? If yes, could you please share that code?
>> If
>> there is no stateful operation at all, it's strange where the list
>> state instances are coming from.
>> 
>> On Tue, Nov 7, 2017 at 2:35 PM, ebru > >
>> 

Weird performance on custom Hashjoin w.r.t. parallelism

2017-11-08 Thread m@xi
Hello everyone!

I have implemented a custom parallel hash join algorithm (without the windowing
feature) in order to calculate the join of two input streams on a common
attribute, using the CoFlatMap function and state. After the join
operator (which has parallelism p = #processors) I have a map
operation (with parallelism 1) where I am using the Meter component to
measure the average throughput of the join operation. Finally, I am using a
DiscardingSink() as I only care about the throughput and the final count of
the join's result. I maintain 2 values of the throughput: the MAX avg value
I have ever seen and the AVG of the avg values I have seen.
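
For context, this is roughly the shape of the join operator, as a stripped-down
sketch (types and field names are placeholders, not my actual code); the two inputs
are connected, keyed on the common attribute, and each side is stored in keyed list
state and probed against the other:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

// Sketch: symmetric hash join of two Tuple2<Integer, String> streams keyed on f0.
public class SymmetricHashJoin extends
        RichCoFlatMapFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<String, String>> {

    private transient ListState<String> leftState;
    private transient ListState<String> rightState;

    @Override
    public void open(Configuration parameters) {
        leftState = getRuntimeContext().getListState(
                new ListStateDescriptor<>("left", String.class));
        rightState = getRuntimeContext().getListState(
                new ListStateDescriptor<>("right", String.class));
    }

    @Override
    public void flatMap1(Tuple2<Integer, String> left, Collector<Tuple2<String, String>> out) throws Exception {
        leftState.add(left.f1);                       // store on the build side
        Iterable<String> rights = rightState.get();   // probe the other side
        if (rights != null) {
            for (String right : rights) {
                out.collect(Tuple2.of(left.f1, right));
            }
        }
    }

    @Override
    public void flatMap2(Tuple2<Integer, String> right, Collector<Tuple2<String, String>> out) throws Exception {
        rightState.add(right.f1);
        Iterable<String> lefts = leftState.get();
        if (lefts != null) {
            for (String left : lefts) {
                out.collect(Tuple2.of(left, right.f1));
            }
        }
    }
}

// usage: stream1.connect(stream2).keyBy(t -> t.f0, t -> t.f0).flatMap(new SymmetricHashJoin()).setParallelism(p)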

I am running on a server with 48 processors and I expect throughput to get
higher when the parallelism p becomes > 1. The same input stream is used in
all cases. 

However, as you can see in the Excel file I attach, not only does the throughput
not increase with the increase of p, but the time for the Flink job to execute
increases as well.

I have also read this:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkCEP-latency-throughput-td13170.html
where Kostas Kloudas implied that Flink is not optimized for
multiprocessor execution.

I am wondering if this issue has to do with 1) the way that I am measuring
throughput, or 2) Flink system internals that are not optimized for a
multiprocessor architecture.

Any ideas or comments are welcome.

Thanks in advance.

Best,
Max

experiments8_11_17.xlsx



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: FlinkCEP behaviour with time constraints not as expected

2017-11-08 Thread Federico D'Ambrosio
Thank you very much, that was really helpful

Cheers,
Federico

2017-11-08 13:51 GMT+01:00 Dawid Wysakowicz :

> Unfortunately there is a mistake in the docs: the return type should be
> DataStream rather than SingleOutputStreamOperator.
>
> The correct version should be:
>
> val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
>
> val outputTag = OutputTag[String]("side-output")
>
> val result: DataStream[ComplexEvent] = patternStream.select(outputTag){
> (pattern: Map[String, Iterable[Event]], timestamp: Long) =>
> TimeoutEvent()
> } {
> pattern: Map[String, Iterable[Event]] => ComplexEvent()
> }
>
> This syntax is only available in 1.4 though, in previous versions
> timeouted events were not returned via sideOutput.
>
>
>
> > On 8 Nov 2017, at 12:18, Federico D'Ambrosio <
> federico.dambro...@smartlab.ws> wrote:
> >
> > Thank you very much, Dawid, for your thorough explanation, really
> useful. I totally missed the distinction between timed-out events and
> complete matches.
> >
> > I'd like to ask you one more thing, about the flinkCEP scala api: in the
> documentation, there is the following code:
> >
> > val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
> >
> >
> >
> > val outputTag = OutputTag[String]("side-output")
> >
> >
> >
> > val result: SingleOutputStreamOperator[ComplexEvent] =
> patternStream.select(outputTag){
> >
> >
> > (pattern: Map[String, Iterable[Event]], timestamp: Long) =>
> TimeoutEvent()
> > } {
> >
> >
> > pattern: Map[String, Iterable[Event]] => ComplexEvent()
> > }
> >
> > where result would then be used to get outputtag side output.
> > If I paste this code I get that the select function is missing its
> parameters ("Unspecified value parameters: patternSelectFunction:
> PatternSelectFunction[ComplexEvent, NotInferredR]""),
> > while, If I add the parameters explicitly such as
> >
> > patternStream.select[TimeoutEvent, ComplexEvent]
> >
> > I get "Too many arguments for select". Am I missing something?
> >
> > Thank you very much,
> > Federico
> >
> > 2017-11-07 16:34 GMT+01:00 Dawid Wysakowicz  >:
> > Hi Federico,
> >
> > For your given input and pattern there should (and there are) only two
> timeouted patterns:
> >
> > 5> Left(Map(start -> List(Event(100,2017-11-05T03:56:02
> > 5> Left(Map(start -> List(Event(100,2017-11-05T06:00:02
> >
> > It is because your pattern says the next event after events with
> > value >= 100 should not have value >= 100. And within your timeout there is
> > no sequence of events where (>=100)+ (<100).
> >
> > But I will try to explain how it works with the same input for Pattern:
> >
> > Pattern[Event].begin("start").where(_.value >=100).oneOrMore
> > .notNext("end").where(_.value <100).within(Time.minutes(30))
> >
> > Then we have matches:
> >
> > 5> Right(Map(start -> List(Event(100,2017-11-05T03:50:02
> > 5> Right(Map(start -> List(Event(100,2017-11-05T03:50:02),
> Event(100,2017-11-05T03:52:02
> > 5> Right(Map(start -> List(Event(100,2017-11-05T03:52:02
> > 5> Right(Map(start -> List(Event(100,2017-11-05T03:50:02),
> Event(100,2017-11-05T03:52:02), Event(100,2017-11-05T03:54:02
> > 5> Right(Map(start -> List(Event(100,2017-11-05T03:52:02),
> Event(100,2017-11-05T03:54:02
> > 5> Right(Map(start -> List(Event(100,2017-11-05T03:54:02
> >
> > and timeouted partial matches:
> >
> > 5> Left(Map(start -> List(Event(100,2017-11-05T03:50:02),
> Event(100,2017-11-05T03:52:02), Event(100,2017-11-05T03:54:02),
> Event(100,2017-11-05T03:56:02
> > 5> Left(Map(start -> List(Event(100,2017-11-05T03:52:02),
> Event(100,2017-11-05T03:54:02), Event(100,2017-11-05T03:56:02
> > 5> Left(Map(start -> List(Event(100,2017-11-05T03:54:02),
> Event(100,2017-11-05T03:56:02
> > 5> Left(Map(start -> List(Event(100,2017-11-05T03:56:02
> > 5> Left(Map(start -> List(Event(100,2017-11-05T06:00:02
> >
> > Right now (in flink 1.3.2) pattern can start on each event (in 1.4 you
> will be able to specify AFTER_MATCH_SKIP strategy see:
> https://issues.apache.org/jira/browse/FLINK-7169), therefore you see
> matches starting at 2017-11-05T03:50:02, 2017-11-05T03:52:02,
> 2017-11-05T03:54:02.
> > Also right now the oneOrMore is not greedy (in 1.4 you will be able to
> alter it see: https://issues.apache.org/jira/browse/FLINK-7147),
> therefore you see matches like: List(Event(100,2017-11-05T03:50:02)) and
> List(Event(100,2017-11-05T03:50:02), Event(100,2017-11-05T03:52:02))
> rather than only one of those.
> >
> > The timeouted partial matches are returned because within the timeout
> > there was no event with value < 100 (in fact there was no event at all to be
> > checked).
> >
> > Hope this "study" helps you understand the behaviour. If you feel I
> missed something, please provide some example I could reproduce.
> >
> > Regards,
> > Dawid
> >
> > 2017-11-07 11:29 GMT+01:00 Ufuk Celebi :
> > Hey Frederico,
> >
> > let me 

Re: Flink memory leak

2017-11-08 Thread Javier Lopez
Hi,

You don't need data. With data it will die faster. I tested as well with a
small data set, using the fromElements source, but it will take some time
to die. It's better with some data.

On 8 November 2017 at 14:54, Piotr Nowojski  wrote:

> Hi,
>
> Thanks for sharing this job.
>
> Do I need to feed some data to the Kafka to reproduce this issue with your
> script?
>
> Does this OOM issue also happen when you are not using the Kafka
> source/sink?
>
> Piotrek
>
> On 8 Nov 2017, at 14:08, Javier Lopez  wrote:
>
> Hi,
>
> This is the test flink job we created to trigger this leak
> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6
> And this is the python script we are using to execute the job thousands of
> times to get the OOM problem https://gist.github.com/javieredo/
> 4825324d5d5f504e27ca6c004396a107
>
> The cluster we used for this has this configuration:
>
>- Instance type: t2.large
>- Number of workers: 2
>- HeapMemory: 5500
>- Number of task slots per node: 4
>- TaskMangMemFraction: 0.5
>- NumberOfNetworkBuffers: 2000
>
> We have tried several things, increasing the heap, reducing the heap, more
> memory fraction, changes this value in the taskmanager.sh
> "TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to work.
>
> Thanks for your help.
>
> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <
> b20926...@cs.hacettepe.edu.tr> wrote:
>
>> On 2017-11-08 15:20, Piotr Nowojski wrote:
>>
>>> Hi Ebru and Javier,
>>>
>>> Yes, if you could share this example job it would be helpful.
>>>
>>> Ebru: could you explain in a little more details how does your Job(s)
>>> look like? Could you post some code? If you are just using maps and
>>> filters there shouldn’t be any network transfers involved, aside
>>> from Source and Sink functions.
>>>
>>> Piotrek
>>>
>>> On 8 Nov 2017, at 12:54, ebru  wrote:

 Hi Javier,

 It would be helpful if you share your test job with us.
 Which configurations did you try?

 -Ebru

 On 8 Nov 2017, at 14:43, Javier Lopez 
 wrote:

 Hi,

 We have been facing a similar problem. We have tried some different
 configurations, as proposed in other email thread by Flavio and
 Kien, but it didn't work. We have a workaround similar to the one
 that Flavio has, we restart the taskmanagers once they reach a
 memory threshold. We created a small test to remove all of our
 dependencies and leave only flink native libraries. This test reads
 data from a Kafka topic and writes it back to another topic in
 Kafka. We cancel the job and start another every 5 seconds. After
 ~30 minutes of doing this process, the cluster reaches the OS memory
 limit and dies.

 Currently, we have a test cluster with 8 workers and 8 task slots
 per node. We have one job that uses 56 slots, and we cannot execute
 that job 5 times in a row because the whole cluster dies. If you
 want, we can publish our test job.

 Regards,

 On 8 November 2017 at 11:20, Aljoscha Krettek 
 wrote:

 @Nico & @Piotr Could you please have a look at this? You both
 recently worked on the network stack and might be most familiar with
 this.

 On 8. Nov 2017, at 10:25, Flavio Pompermaier 
 wrote:

 We also have the same problem in production. At the moment the
 solution is to restart the entire Flink cluster after every job..
 We've tried to reproduce this problem with a test (see
 https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we don't

 know whether the error produced by the test and the leak are
 correlated..

 Best,
 Flavio

 On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
  wrote:
 On 2017-11-07 16:53, Ufuk Celebi wrote:
 Do you use any windowing? If yes, could you please share that code?
 If
 there is no stateful operation at all, it's strange where the list
 state instances are coming from.

 On Tue, Nov 7, 2017 at 2:35 PM, ebru 
 wrote:
 Hi Ufuk,

 We don’t explicitly define any state descriptor. We only use map
 and filters
 operator. We thought that gc handle clearing the flink’s internal
 states.
 So how can we manage the memory if it is always increasing?

 - Ebru

 On 7 Nov 2017, at 16:23, Ufuk Celebi  wrote:

 Hey Ebru, the memory usage might be increasing as long as a job is
 running.
 This is expected (also in the case of multiple running jobs). The
 screenshots are not helpful in that regard. :-(

 What kind of stateful operations are you using? Depending on your
 use case,
 you have to manually call 

Re: Flink memory leak

2017-11-08 Thread Piotr Nowojski
Hi,

Thanks for sharing this job. 

Do I need to feed some data to the Kafka to reproduce this issue with your 
script?

Does this OOM issue also happen when you are not using the Kafka source/sink? 

Piotrek

> On 8 Nov 2017, at 14:08, Javier Lopez  wrote:
> 
> Hi,
> 
> This is the test flink job we created to trigger this leak 
> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6 
> 
> And this is the python script we are using to execute the job thousands of 
> times to get the OOM problem 
> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107 
> 
> 
> The cluster we used for this has this configuration:
> Instance type: t2.large
> Number of workers: 2
> HeapMemory: 5500
> Number of task slots per node: 4
> TaskMangMemFraction: 0.5
> NumberOfNetworkBuffers: 2000
> We have tried several things, increasing the heap, reducing the heap, more 
> memory fraction, changes this value in the taskmanager.sh 
> "TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to work.
> 
> Thanks for your help.
> 
> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
> > wrote:
> On 2017-11-08 15:20, Piotr Nowojski wrote:
> Hi Ebru and Javier,
> 
> Yes, if you could share this example job it would be helpful.
> 
> Ebru: could you explain in a little more details how does your Job(s)
> look like? Could you post some code? If you are just using maps and
> filters there shouldn’t be any network transfers involved, aside
> from Source and Sink functions.
> 
> Piotrek
> 
> On 8 Nov 2017, at 12:54, ebru  > wrote:
> 
> Hi Javier,
> 
> It would be helpful if you share your test job with us.
> Which configurations did you try?
> 
> -Ebru
> 
> On 8 Nov 2017, at 14:43, Javier Lopez  >
> wrote:
> 
> Hi,
> 
> We have been facing a similar problem. We have tried some different
> configurations, as proposed in other email thread by Flavio and
> Kien, but it didn't work. We have a workaround similar to the one
> that Flavio has, we restart the taskmanagers once they reach a
> memory threshold. We created a small test to remove all of our
> dependencies and leave only flink native libraries. This test reads
> data from a Kafka topic and writes it back to another topic in
> Kafka. We cancel the job and start another every 5 seconds. After
> ~30 minutes of doing this process, the cluster reaches the OS memory
> limit and dies.
> 
> Currently, we have a test cluster with 8 workers and 8 task slots
> per node. We have one job that uses 56 slots, and we cannot execute
> that job 5 times in a row because the whole cluster dies. If you
> want, we can publish our test job.
> 
> Regards,
> 
> On 8 November 2017 at 11:20, Aljoscha Krettek  >
> wrote:
> 
> @Nico & @Piotr Could you please have a look at this? You both
> recently worked on the network stack and might be most familiar with
> this.
> 
> On 8. Nov 2017, at 10:25, Flavio Pompermaier  >
> wrote:
> 
> We also have the same problem in production. At the moment the
> solution is to restart the entire Flink cluster after every job..
> We've tried to reproduce this problem with a test (see
> https://issues.apache.org/jira/browse/FLINK-7845 
>  [1]) but we don't
> 
> know whether the error produced by the test and the leak are
> correlated..
> 
> Best,
> Flavio
> 
> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
> > wrote:
> On 2017-11-07 16:53, Ufuk Celebi wrote:
> Do you use any windowing? If yes, could you please share that code?
> If
> there is no stateful operation at all, it's strange where the list
> state instances are coming from.
> 
> On Tue, Nov 7, 2017 at 2:35 PM, ebru  >
> wrote:
> Hi Ufuk,
> 
> We don’t explicitly define any state descriptor. We only use map
> and filters
> operator. We thought that gc handle clearing the flink’s internal
> states.
> So how can we manage the memory if it is always increasing?
> 
> - Ebru
> 
> On 7 Nov 2017, at 16:23, Ufuk Celebi  > wrote:
> 
> Hey Ebru, the memory usage might be increasing as long as a job is
> running.
> This is expected (also in the case of multiple running jobs). The
> screenshots are not helpful in that regard. :-(
> 
> What kind of stateful operations are you using? Depending on your
> use case,
> you have to manually call `clear()` on the state instance in order
> to
> release the managed state.
> 
> 

Re: Flink memory leak

2017-11-08 Thread Javier Lopez
Hi,

This is the test flink job we created to trigger this leak
https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6
And this is the python script we are using to execute the job thousands of
times to get the OOM problem
https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107

The cluster we used for this has this configuration:

   - Instance type: t2.large
   - Number of workers: 2
   - HeapMemory: 5500
   - Number of task slots per node: 4
   - TaskMangMemFraction: 0.5
   - NumberOfNetworkBuffers: 2000

We have tried several things: increasing the heap, reducing the heap, using a larger
memory fraction, and changing this value in taskmanager.sh:
TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to work.

Thanks for your help.

On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <
b20926...@cs.hacettepe.edu.tr> wrote:

> On 2017-11-08 15:20, Piotr Nowojski wrote:
>
>> Hi Ebru and Javier,
>>
>> Yes, if you could share this example job it would be helpful.
>>
>> Ebru: could you explain in a little more details how does your Job(s)
>> look like? Could you post some code? If you are just using maps and
>> filters there shouldn’t be any network transfers involved, aside
>> from Source and Sink functions.
>>
>> Piotrek
>>
>> On 8 Nov 2017, at 12:54, ebru  wrote:
>>>
>>> Hi Javier,
>>>
>>> It would be helpful if you share your test job with us.
>>> Which configurations did you try?
>>>
>>> -Ebru
>>>
>>> On 8 Nov 2017, at 14:43, Javier Lopez 
>>> wrote:
>>>
>>> Hi,
>>>
>>> We have been facing a similar problem. We have tried some different
>>> configurations, as proposed in other email thread by Flavio and
>>> Kien, but it didn't work. We have a workaround similar to the one
>>> that Flavio has, we restart the taskmanagers once they reach a
>>> memory threshold. We created a small test to remove all of our
>>> dependencies and leave only flink native libraries. This test reads
>>> data from a Kafka topic and writes it back to another topic in
>>> Kafka. We cancel the job and start another every 5 seconds. After
>>> ~30 minutes of doing this process, the cluster reaches the OS memory
>>> limit and dies.
>>>
>>> Currently, we have a test cluster with 8 workers and 8 task slots
>>> per node. We have one job that uses 56 slots, and we cannot execute
>>> that job 5 times in a row because the whole cluster dies. If you
>>> want, we can publish our test job.
>>>
>>> Regards,
>>>
>>> On 8 November 2017 at 11:20, Aljoscha Krettek 
>>> wrote:
>>>
>>> @Nico & @Piotr Could you please have a look at this? You both
>>> recently worked on the network stack and might be most familiar with
>>> this.
>>>
>>> On 8. Nov 2017, at 10:25, Flavio Pompermaier 
>>> wrote:
>>>
>>> We also have the same problem in production. At the moment the
>>> solution is to restart the entire Flink cluster after every job..
>>> We've tried to reproduce this problem with a test (see
>>> https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we don't
>>>
>>> know whether the error produced by the test and the leak are
>>> correlated..
>>>
>>> Best,
>>> Flavio
>>>
>>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>>  wrote:
>>> On 2017-11-07 16:53, Ufuk Celebi wrote:
>>> Do you use any windowing? If yes, could you please share that code?
>>> If
>>> there is no stateful operation at all, it's strange where the list
>>> state instances are coming from.
>>>
>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru 
>>> wrote:
>>> Hi Ufuk,
>>>
>>> We don’t explicitly define any state descriptor. We only use map
>>> and filters
>>> operator. We thought that gc handle clearing the flink’s internal
>>> states.
>>> So how can we manage the memory if it is always increasing?
>>>
>>> - Ebru
>>>
>>> On 7 Nov 2017, at 16:23, Ufuk Celebi  wrote:
>>>
>>> Hey Ebru, the memory usage might be increasing as long as a job is
>>> running.
>>> This is expected (also in the case of multiple running jobs). The
>>> screenshots are not helpful in that regard. :-(
>>>
>>> What kind of stateful operations are you using? Depending on your
>>> use case,
>>> you have to manually call `clear()` on the state instance in order
>>> to
>>> release the managed state.
>>>
>>> Best,
>>>
>>> Ufuk
>>>
>>> On Tue, Nov 7, 2017 at 12:43 PM, ebru
>>>  wrote:
>>>
>>> Begin forwarded message:
>>>
>>> From: ebru 
>>> Subject: Re: Flink memory leak
>>> Date: 7 November 2017 at 14:09:17 GMT+3
>>> To: Ufuk Celebi 
>>>
>>> Hi Ufuk,
>>>
>>> There are there snapshots of htop output.
>>> 1. snapshot is initial state.
>>> 2. snapshot is after submitted one job.
>>> 3. Snapshot is the output of the one job with 15000 EPS. And the
>>> memory
>>> usage is always increasing over time.
>>>
>>> <1.png><2.png><3.png>
>>>
>>> On 7 Nov 2017, at 

Re: FlinkCEP behaviour with time constraints not as expected

2017-11-08 Thread Dawid Wysakowicz
Unfortunately there is a mistake in the docs: the return type should be DataStream 
rather than SingleOutputStreamOperator.

The correct version should be:

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

val outputTag = OutputTag[String]("side-output")

val result: DataStream[ComplexEvent] = patternStream.select(outputTag) {
  (pattern: Map[String, Iterable[Event]], timestamp: Long) => TimeoutEvent()
} {
  pattern: Map[String, Iterable[Event]] => ComplexEvent()
}

This syntax is only available in 1.4 though; in previous versions timed-out 
events were not returned via a side output.
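
Put together with types that line up, a minimal sketch against the 1.4 Scala API could 
look like this (Event, ComplexEvent and TimeoutEvent are just the placeholder types from 
the docs, and the input/pattern definitions below are made up for the example):

import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

case class Event(value: Int)
case class ComplexEvent()
case class TimeoutEvent()

val env = StreamExecutionEnvironment.getExecutionEnvironment
val input: DataStream[Event] = env.fromElements(Event(100), Event(50))
val pattern = Pattern.begin[Event]("start").where(_.value >= 100)

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

// Tag typed to the timed-out result, so the side output can be read back with the same type.
val outputTag = OutputTag[TimeoutEvent]("timed-out")

val result: DataStream[ComplexEvent] = patternStream.select(outputTag) {
  (pattern: Map[String, Iterable[Event]], timestamp: Long) => TimeoutEvent()
} {
  pattern: Map[String, Iterable[Event]] => ComplexEvent()
}

// The timed-out partial matches are then available on the side output.
val timedOut: DataStream[TimeoutEvent] = result.getSideOutput(outputTag)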



> On 8 Nov 2017, at 12:18, Federico D'Ambrosio  
> wrote:
> 
> Thank you very much, Dawid, for your thorough explanation, really useful. I 
> totally missed the distinction between timed-out events and complete matches.
> 
> I'd like to ask you one more thing, about the flinkCEP scala api: in the 
> documentation, there is the following code:
> 
> val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
> 
> 
> 
> val outputTag = OutputTag[String]("side-output")
> 
> 
> 
> val result: SingleOutputStreamOperator[ComplexEvent] = 
> patternStream.select(outputTag){
> 
> 
> (pattern: Map[String, Iterable[Event]], timestamp: Long) => TimeoutEvent()
> } {
> 
> 
> pattern: Map[String, Iterable[Event]] => ComplexEvent()
> }
> 
> where result would then be used to get outputtag side output.
> If I paste this code I get that the select function is missing its parameters 
> ("Unspecified value parameters: patternSelectFunction: 
> PatternSelectFunction[ComplexEvent, NotInferredR]""),
> while, If I add the parameters explicitly such as
> 
> patternStream.select[TimeoutEvent, ComplexEvent]
> 
> I get "Too many arguments for select". Am I missing something?
> 
> Thank you very much,
> Federico
> 
> 2017-11-07 16:34 GMT+01:00 Dawid Wysakowicz :
> Hi Federico,
> 
> For your given input and pattern there should (and there are) only two 
> timeouted patterns:
> 
> 5> Left(Map(start -> List(Event(100,2017-11-05T03:56:02
> 5> Left(Map(start -> List(Event(100,2017-11-05T06:00:02
> 
> It is because in your patterns say the next event after events with value 
> >=100 should not have value >= 100 . And within your timeout there is no 
> sequence of events where (>=100)+ (<100).
> 
> But I will try to explain how it works with the same input for Pattern:
> 
> Pattern[Event].begin("start").where(_.value >=100).oneOrMore
> .notNext("end").where(_.value <100).within(Time.minutes(30))
> 
> Then we have matches:
> 
> 5> Right(Map(start -> List(Event(100,2017-11-05T03:50:02
> 5> Right(Map(start -> List(Event(100,2017-11-05T03:50:02), 
> Event(100,2017-11-05T03:52:02
> 5> Right(Map(start -> List(Event(100,2017-11-05T03:52:02
> 5> Right(Map(start -> List(Event(100,2017-11-05T03:50:02), 
> Event(100,2017-11-05T03:52:02), Event(100,2017-11-05T03:54:02
> 5> Right(Map(start -> List(Event(100,2017-11-05T03:52:02), 
> Event(100,2017-11-05T03:54:02
> 5> Right(Map(start -> List(Event(100,2017-11-05T03:54:02
> 
> and timeouted partial matches:
> 
> 5> Left(Map(start -> List(Event(100,2017-11-05T03:50:02), 
> Event(100,2017-11-05T03:52:02), Event(100,2017-11-05T03:54:02), 
> Event(100,2017-11-05T03:56:02
> 5> Left(Map(start -> List(Event(100,2017-11-05T03:52:02), 
> Event(100,2017-11-05T03:54:02), Event(100,2017-11-05T03:56:02
> 5> Left(Map(start -> List(Event(100,2017-11-05T03:54:02), 
> Event(100,2017-11-05T03:56:02
> 5> Left(Map(start -> List(Event(100,2017-11-05T03:56:02
> 5> Left(Map(start -> List(Event(100,2017-11-05T06:00:02
> 
> Right now (in flink 1.3.2) pattern can start on each event (in 1.4 you will 
> be able to specify AFTER_MATCH_SKIP strategy see: 
> https://issues.apache.org/jira/browse/FLINK-7169), therefore you see matches 
> starting at 2017-11-05T03:50:02, 2017-11-05T03:52:02, 2017-11-05T03:54:02.
> Also right now the oneOrMore is not greedy (in 1.4 you will be able to alter 
> it see: https://issues.apache.org/jira/browse/FLINK-7147), therefore you see 
> matches like: List(Event(100,2017-11-05T03:50:02)) and 
> List(Event(100,2017-11-05T03:50:02), Event(100,2017-11-05T03:52:02)) rather 
> than only one of those.
> 
> The timeoute partial matches are returned because within the timeout there 
> was no event with value <100 (in fact there was no event at all to be 
> checked).
> 
> Hope this "study" helps you understand the behaviour. If you feel I missed 
> something, please provide some example I could reproduce.
> 
> Regards,
> Dawid
> 
> 2017-11-07 11:29 GMT+01:00 Ufuk Celebi :
> Hey Frederico,
> 
> let me pull in Dawid (cc'd) who works on CEP. He can probably clarify
> the expected behaviour here.
> 
> Best,
> 
> Ufuk
> 
> 
> On Mon, Nov 6, 2017 at 12:06 PM, Federico D'Ambrosio
>  wrote:
> > Hi everyone,
> >
> > I wanted to ask if FlinkCEP in the 

Re: Flink memory leak

2017-11-08 Thread ÇETİNKAYA EBRU ÇETİNKAYA EBRU

On 2017-11-08 15:20, Piotr Nowojski wrote:

Hi Ebru and Javier,

Yes, if you could share this example job it would be helpful.

Ebru: could you explain in a little more details how does your Job(s)
look like? Could you post some code? If you are just using maps and
filters there shouldn’t be any network transfers involved, aside
from Source and Sink functions.

Piotrek


On 8 Nov 2017, at 12:54, ebru  wrote:

Hi Javier,

It would be helpful if you share your test job with us.
Which configurations did you try?

-Ebru

On 8 Nov 2017, at 14:43, Javier Lopez 
wrote:

Hi,

We have been facing a similar problem. We have tried some different
configurations, as proposed in other email thread by Flavio and
Kien, but it didn't work. We have a workaround similar to the one
that Flavio has, we restart the taskmanagers once they reach a
memory threshold. We created a small test to remove all of our
dependencies and leave only flink native libraries. This test reads
data from a Kafka topic and writes it back to another topic in
Kafka. We cancel the job and start another every 5 seconds. After
~30 minutes of doing this process, the cluster reaches the OS memory
limit and dies.

Currently, we have a test cluster with 8 workers and 8 task slots
per node. We have one job that uses 56 slots, and we cannot execute
that job 5 times in a row because the whole cluster dies. If you
want, we can publish our test job.

Regards,

On 8 November 2017 at 11:20, Aljoscha Krettek 
wrote:

@Nico & @Piotr Could you please have a look at this? You both
recently worked on the network stack and might be most familiar with
this.

On 8. Nov 2017, at 10:25, Flavio Pompermaier 
wrote:

We also have the same problem in production. At the moment the
solution is to restart the entire Flink cluster after every job..
We've tried to reproduce this problem with a test (see
https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we don't
know whether the error produced by the test and the leak are
correlated..

Best,
Flavio

On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
 wrote:
On 2017-11-07 16:53, Ufuk Celebi wrote:
Do you use any windowing? If yes, could you please share that code?
If
there is no stateful operation at all, it's strange where the list
state instances are coming from.

On Tue, Nov 7, 2017 at 2:35 PM, ebru 
wrote:
Hi Ufuk,

We don’t explicitly define any state descriptor. We only use map
and filters
operator. We thought that gc handle clearing the flink’s internal
states.
So how can we manage the memory if it is always increasing?

- Ebru

On 7 Nov 2017, at 16:23, Ufuk Celebi  wrote:

Hey Ebru, the memory usage might be increasing as long as a job is
running.
This is expected (also in the case of multiple running jobs). The
screenshots are not helpful in that regard. :-(

What kind of stateful operations are you using? Depending on your
use case,
you have to manually call `clear()` on the state instance in order
to
release the managed state.
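
For illustration, a minimal sketch (class, field and threshold names are made up) of
the kind of keyed operator this refers to, where clear() is what releases the managed
state for the current key:

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

class CountUntilThreshold(threshold: Long)
    extends RichFlatMapFunction[(String, Long), (String, Long)] {

  @transient private var sum: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    sum = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("sum", classOf[java.lang.Long]))
  }

  override def flatMap(in: (String, Long), out: Collector[(String, Long)]): Unit = {
    val previous: Long = if (sum.value() == null) 0L else sum.value()
    val current = previous + in._2
    if (current >= threshold) {
      out.collect((in._1, current))
      sum.clear() // without this, state for every key ever seen stays around
    } else {
      sum.update(current)
    }
  }
}

It would be used on a keyed stream, e.g. stream.keyBy(_._1).flatMap(new CountUntilThreshold(100)).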

Best,

Ufuk

On Tue, Nov 7, 2017 at 12:43 PM, ebru
 wrote:

Begin forwarded message:

From: ebru 
Subject: Re: Flink memory leak
Date: 7 November 2017 at 14:09:17 GMT+3
To: Ufuk Celebi 

Hi Ufuk,

There are there snapshots of htop output.
1. snapshot is initial state.
2. snapshot is after submitted one job.
3. Snapshot is the output of the one job with 15000 EPS. And the
memory
usage is always increasing over time.

<1.png><2.png><3.png>

On 7 Nov 2017, at 13:34, Ufuk Celebi  wrote:

Hey Ebru,

let me pull in Aljoscha (CC'd) who might have an idea what's causing
this.

Since multiple jobs are running, it will be hard to understand to
which job the state descriptors from the heap snapshot belong to.
- Is it possible to isolate the problem and reproduce the behaviour
with only a single job?

– Ufuk

On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
 wrote:

Hi,

We are using Flink 1.3.1 in production, we have one job manager and
3 task
managers in standalone mode. Recently, we've noticed that we have
memory
related problems. We use docker container to serve Flink cluster. We
have
300 slots and 20 jobs are running with parallelism of 10. Also the
job
count
may change over time. Taskmanager memory usage always increases.
After
job cancelation this memory usage doesn't decrease. We've tried to
investigate the problem and we've got the task manager jvm heap
snapshot.
According to the jvm heap analysis, the possible memory leak was the Flink
list
state descriptor. But we are not sure that is the cause of our
memory
problem. How can we solve the problem?

We have two types of Flink job. One has no stateful operator and contains only maps
and filters, and the other has a time window with

Re: Flink memory leak

2017-11-08 Thread Piotr Nowojski
Hi Ebru and Javier,

Yes, if you could share this example job it would be helpful.

Ebru: could you explain in a little more detail what your Job(s) look like? Could 
you post some code? If you are just using maps and filters, there shouldn’t be any 
network transfers involved, aside from the Source and Sink functions.

Piotrek

> On 8 Nov 2017, at 12:54, ebru  wrote:
> 
> Hi Javier,
> 
> It would be helpful if you share your test job with us. 
> Which configurations did you try?
> 
> -Ebru
>> On 8 Nov 2017, at 14:43, Javier Lopez > > wrote:
>> 
>> Hi,
>> 
>> We have been facing a similar problem. We have tried some different 
>> configurations, as proposed in other email thread by Flavio and Kien, but it 
>> didn't work. We have a workaround similar to the one that Flavio has, we 
>> restart the taskmanagers once they reach a memory threshold. We created a 
>> small test to remove all of our dependencies and leave only flink native 
>> libraries. This test reads data from a Kafka topic and writes it back to 
>> another topic in Kafka. We cancel the job and start another every 5 seconds. 
>> After ~30 minutes of doing this process, the cluster reaches the OS memory 
>> limit and dies. 
>> 
>> Currently, we have a test cluster with 8 workers and 8 task slots per node. 
>> We have one job that uses 56 slots, and we cannot execute that job 5 times 
>> in a row because the whole cluster dies. If you want, we can publish our 
>> test job.
>> 
>> Regards,
>> 
>> On 8 November 2017 at 11:20, Aljoscha Krettek > > wrote:
>> @Nico & @Piotr Could you please have a look at this? You both recently 
>> worked on the network stack and might be most familiar with this.
>> 
>>> On 8. Nov 2017, at 10:25, Flavio Pompermaier >> > wrote:
>>> 
>>> We also have the same problem in production. At the moment the solution is 
>>> to restart the entire Flink cluster after every job..
>>> We've tried to reproduce this problem with a test (see 
>>> https://issues.apache.org/jira/browse/FLINK-7845 
>>> ) but we don't know 
>>> whether the error produced by the test and the leak are correlated..
>>> 
>>> Best,
>>> Flavio
>>> 
>>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
>>> > 
>>> wrote:
>>> On 2017-11-07 16:53, Ufuk Celebi wrote:
>>> Do you use any windowing? If yes, could you please share that code? If
>>> there is no stateful operation at all, it's strange where the list
>>> state instances are coming from.
>>> 
>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru >> > wrote:
>>> Hi Ufuk,
>>> 
>>> We don’t explicitly define any state descriptor. We only use map and filters
>>> operator. We thought that gc handle clearing the flink’s internal states.
>>> So how can we manage the memory if it is always increasing?
>>> 
>>> - Ebru
>>> 
>>> On 7 Nov 2017, at 16:23, Ufuk Celebi >> > wrote:
>>> 
>>> Hey Ebru, the memory usage might be increasing as long as a job is running.
>>> This is expected (also in the case of multiple running jobs). The
>>> screenshots are not helpful in that regard. :-(
>>> 
>>> What kind of stateful operations are you using? Depending on your use case,
>>> you have to manually call `clear()` on the state instance in order to
>>> release the managed state.
>>> 
>>> Best,
>>> 
>>> Ufuk
>>> 
>>> On Tue, Nov 7, 2017 at 12:43 PM, ebru >> > wrote:
>>> 
>>> 
>>> 
>>> Begin forwarded message:
>>> 
>>> From: ebru >> >
>>> Subject: Re: Flink memory leak
>>> Date: 7 November 2017 at 14:09:17 GMT+3
>>> To: Ufuk Celebi >
>>> 
>>> Hi Ufuk,
>>> 
>>> There are there snapshots of htop output.
>>> 1. snapshot is initial state.
>>> 2. snapshot is after submitted one job.
>>> 3. Snapshot is the output of the one job with 15000 EPS. And the memory
>>> usage is always increasing over time.
>>> 
>>> 
>>> 
>>> 
>>> <1.png><2.png><3.png>
>>> 
>>> On 7 Nov 2017, at 13:34, Ufuk Celebi >> > wrote:
>>> 
>>> Hey Ebru,
>>> 
>>> let me pull in Aljoscha (CC'd) who might have an idea what's causing this.
>>> 
>>> Since multiple jobs are running, it will be hard to understand to
>>> which job the state descriptors from the heap snapshot belong to.
>>> - Is it possible to isolate the problem and reproduce the behaviour
>>> with only a single job?
>>> 
>>> – Ufuk
>>> 
>>> 
>>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>> 

Re: Flink memory leak

2017-11-08 Thread ebru
Hi Javier,

It would be helpful if you share your test job with us. 
Which configurations did you try?

-Ebru
> On 8 Nov 2017, at 14:43, Javier Lopez  wrote:
> 
> Hi,
> 
> We have been facing a similar problem. We have tried some different 
> configurations, as proposed in other email thread by Flavio and Kien, but it 
> didn't work. We have a workaround similar to the one that Flavio has, we 
> restart the taskmanagers once they reach a memory threshold. We created a 
> small test to remove all of our dependencies and leave only flink native 
> libraries. This test reads data from a Kafka topic and writes it back to 
> another topic in Kafka. We cancel the job and start another every 5 seconds. 
> After ~30 minutes of doing this process, the cluster reaches the OS memory 
> limit and dies. 
> 
> Currently, we have a test cluster with 8 workers and 8 task slots per node. 
> We have one job that uses 56 slots, and we cannot execute that job 5 times in 
> a row because the whole cluster dies. If you want, we can publish our test 
> job.
> 
> Regards,
> 
> On 8 November 2017 at 11:20, Aljoscha Krettek  > wrote:
> @Nico & @Piotr Could you please have a look at this? You both recently worked 
> on the network stack and might be most familiar with this.
> 
>> On 8. Nov 2017, at 10:25, Flavio Pompermaier > > wrote:
>> 
>> We also have the same problem in production. At the moment the solution is 
>> to restart the entire Flink cluster after every job..
>> We've tried to reproduce this problem with a test (see 
>> https://issues.apache.org/jira/browse/FLINK-7845 
>> ) but we don't know 
>> whether the error produced by the test and the leak are correlated..
>> 
>> Best,
>> Flavio
>> 
>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
>> > wrote:
>> On 2017-11-07 16:53, Ufuk Celebi wrote:
>> Do you use any windowing? If yes, could you please share that code? If
>> there is no stateful operation at all, it's strange where the list
>> state instances are coming from.
>> 
>> On Tue, Nov 7, 2017 at 2:35 PM, ebru > > wrote:
>> Hi Ufuk,
>> 
>> We don’t explicitly define any state descriptor. We only use map and filters
>> operator. We thought that gc handle clearing the flink’s internal states.
>> So how can we manage the memory if it is always increasing?
>> 
>> - Ebru
>> 
>> On 7 Nov 2017, at 16:23, Ufuk Celebi > > wrote:
>> 
>> Hey Ebru, the memory usage might be increasing as long as a job is running.
>> This is expected (also in the case of multiple running jobs). The
>> screenshots are not helpful in that regard. :-(
>> 
>> What kind of stateful operations are you using? Depending on your use case,
>> you have to manually call `clear()` on the state instance in order to
>> release the managed state.
>> 
>> Best,
>> 
>> Ufuk
>> 
>> On Tue, Nov 7, 2017 at 12:43 PM, ebru > > wrote:
>> 
>> 
>> 
>> Begin forwarded message:
>> 
>> From: ebru > >
>> Subject: Re: Flink memory leak
>> Date: 7 November 2017 at 14:09:17 GMT+3
>> To: Ufuk Celebi >
>> 
>> Hi Ufuk,
>> 
>> There are there snapshots of htop output.
>> 1. snapshot is initial state.
>> 2. snapshot is after submitted one job.
>> 3. Snapshot is the output of the one job with 15000 EPS. And the memory
>> usage is always increasing over time.
>> 
>> 
>> 
>> 
>> <1.png><2.png><3.png>
>> 
>> On 7 Nov 2017, at 13:34, Ufuk Celebi > > wrote:
>> 
>> Hey Ebru,
>> 
>> let me pull in Aljoscha (CC'd) who might have an idea what's causing this.
>> 
>> Since multiple jobs are running, it will be hard to understand to
>> which job the state descriptors from the heap snapshot belong to.
>> - Is it possible to isolate the problem and reproduce the behaviour
>> with only a single job?
>> 
>> – Ufuk
>> 
>> 
>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>> > wrote:
>> 
>> Hi,
>> 
>> We are using Flink 1.3.1 in production, we have one job manager and 3 task
>> managers in standalone mode. Recently, we've noticed that we have memory
>> related problems. We use docker container to serve Flink cluster. We have
>> 300 slots and 20 jobs are running with parallelism of 10. Also the job
>> count
>> may be change over time. Taskmanager memory usage always increases. After
>> job cancelation this memory usage doesn't decrease. We've tried to
>> investigate the problem and we've got the task manager jvm heap snapshot.

Re: Flink memory leak

2017-11-08 Thread Javier Lopez
Hi,

We have been facing a similar problem. We have tried some different
configurations, as proposed in another email thread by Flavio and Kien, but
it didn't work. We have a workaround similar to the one that Flavio has, we
restart the taskmanagers once they reach a memory threshold. We created a
small test to remove all of our dependencies and leave only flink native
libraries. This test reads data from a Kafka topic and writes it back to
another topic in Kafka. We cancel the job and start another every 5
seconds. After ~30 minutes of doing this process, the cluster reaches the
OS memory limit and dies.

Currently, we have a test cluster with 8 workers and 8 task slots per node.
We have one job that uses 56 slots, and we cannot execute that job 5 times
in a row because the whole cluster dies. If you want, we can publish our
test job.

Regards,

On 8 November 2017 at 11:20, Aljoscha Krettek  wrote:

> @Nico & @Piotr Could you please have a look at this? You both recently
> worked on the network stack and might be most familiar with this.
>
> On 8. Nov 2017, at 10:25, Flavio Pompermaier  wrote:
>
> We also have the same problem in production. At the moment the solution is
> to restart the entire Flink cluster after every job..
> We've tried to reproduce this problem with a test (see
> https://issues.apache.org/jira/browse/FLINK-7845) but we don't know
> whether the error produced by the test and the leak are correlated..
>
> Best,
> Flavio
>
> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <
> b20926...@cs.hacettepe.edu.tr> wrote:
>
>> On 2017-11-07 16:53, Ufuk Celebi wrote:
>>
>>> Do you use any windowing? If yes, could you please share that code? If
>>> there is no stateful operation at all, it's strange where the list
>>> state instances are coming from.
>>>
>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru 
>>> wrote:
>>>
 Hi Ufuk,

 We don’t explicitly define any state descriptor. We only use map and
 filters
 operator. We thought that gc handle clearing the flink’s internal
 states.
 So how can we manage the memory if it is always increasing?

 - Ebru

 On 7 Nov 2017, at 16:23, Ufuk Celebi  wrote:

 Hey Ebru, the memory usage might be increasing as long as a job is
 running.
 This is expected (also in the case of multiple running jobs). The
 screenshots are not helpful in that regard. :-(

 What kind of stateful operations are you using? Depending on your use
 case,
 you have to manually call `clear()` on the state instance in order to
 release the managed state.

 Best,

 Ufuk

 On Tue, Nov 7, 2017 at 12:43 PM, ebru 
 wrote:

>
>
>
> Begin forwarded message:
>
> From: ebru 
> Subject: Re: Flink memory leak
> Date: 7 November 2017 at 14:09:17 GMT+3
> To: Ufuk Celebi 
>
> Hi Ufuk,
>
> There are there snapshots of htop output.
> 1. snapshot is initial state.
> 2. snapshot is after submitted one job.
> 3. Snapshot is the output of the one job with 15000 EPS. And the memory
> usage is always increasing over time.
>
>
>
>
> <1.png><2.png><3.png>
>
> On 7 Nov 2017, at 13:34, Ufuk Celebi  wrote:
>
> Hey Ebru,
>
> let me pull in Aljoscha (CC'd) who might have an idea what's causing
> this.
>
> Since multiple jobs are running, it will be hard to understand to
> which job the state descriptors from the heap snapshot belong to.
> - Is it possible to isolate the problem and reproduce the behaviour
> with only a single job?
>
> – Ufuk
>
>
> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>  wrote:
>
> Hi,
>
> We are using Flink 1.3.1 in production, we have one job manager and 3
> task
> managers in standalone mode. Recently, we've noticed that we have
> memory
> related problems. We use docker container to serve Flink cluster. We
> have
> 300 slots and 20 jobs are running with parallelism of 10. Also the job
> count
> may be change over time. Taskmanager memory usage always increases.
> After
> job cancelation this memory usage doesn't decrease. We've tried to
> investigate the problem and we've got the task manager jvm heap
> snapshot.
> According to the jam heap analysis, possible memory leak was Flink list
> state descriptor. But we are not sure that is the cause of our memory
> problem. How can we solve the problem?
>
>
>
> We have two types of Flink job. One has no state full operator
 contains only maps and filters and the other has time window with count
 trigger.

>>> * We've analysed the jvm heaps 

Re: FlinkCEP behaviour with time constraints not as expected

2017-11-08 Thread Federico D'Ambrosio
Thank you very much, Dawid, for your thorough explanation, really useful. I
totally missed the distinction between timed-out events and complete
matches.

I'd like to ask you one more thing, about the flinkCEP scala api: in the
documentation, there is the following code:

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
val outputTag = OutputTag[String]("side-output")
val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.select(outputTag) {
  (pattern: Map[String, Iterable[Event]], timestamp: Long) => TimeoutEvent()
} {
  pattern: Map[String, Iterable[Event]] => ComplexEvent()
}

where result would then be used to get outputtag side output.

If I paste this code I get that the select function is missing its
parameters ("Unspecified value parameters: patternSelectFunction:
PatternSelectFunction[ComplexEvent, NotInferredR]""),
while, If I add the parameters explicitly such as

patternStream.select[TimeoutEvent, ComplexEvent]

I get "Too many arguments for select". Am I missing something?

Thank you very much,

Federico


2017-11-07 16:34 GMT+01:00 Dawid Wysakowicz :

> Hi Federico,
>
> For your given input and pattern there should (and there are) only two
> timeouted patterns:
>
> 5> Left(Map(start -> List(Event(100,2017-11-05T03:56:02
> 5> Left(Map(start -> List(Event(100,2017-11-05T06:00:02
>
> It is because in your patterns say the next event after events with value
> >=100 should not have value >= 100 . And within your timeout there is no
> sequence of events where (>=100)+ (<100).
>
> But I will try to explain how it works with the same input for Pattern:
>
> Pattern[Event].begin("start").where(_.value >=100).oneOrMore
> .notNext("end").where(_.value <100).within(Time.minutes(30))
>
> Then we have matches:
>
> 5> Right(Map(start -> List(Event(100,2017-11-05T03:50:02
> 5> Right(Map(start -> List(Event(100,2017-11-05T03:50:02),
> Event(100,2017-11-05T03:52:02
> 5> Right(Map(start -> List(Event(100,2017-11-05T03:52:02
> 5> Right(Map(start -> List(Event(100,2017-11-05T03:50:02),
> Event(100,2017-11-05T03:52:02), Event(100,2017-11-05T03:54:02
> 5> Right(Map(start -> List(Event(100,2017-11-05T03:52:02),
> Event(100,2017-11-05T03:54:02
> 5> Right(Map(start -> List(Event(100,2017-11-05T03:54:02
>
> and timeouted partial matches:
>
> 5> Left(Map(start -> List(Event(100,2017-11-05T03:50:02),
> Event(100,2017-11-05T03:52:02), Event(100,2017-11-05T03:54:02),
> Event(100,2017-11-05T03:56:02
> 5> Left(Map(start -> List(Event(100,2017-11-05T03:52:02),
> Event(100,2017-11-05T03:54:02), Event(100,2017-11-05T03:56:02
> 5> Left(Map(start -> List(Event(100,2017-11-05T03:54:02),
> Event(100,2017-11-05T03:56:02
> 5> Left(Map(start -> List(Event(100,2017-11-05T03:56:02
> 5> Left(Map(start -> List(Event(100,2017-11-05T06:00:02
>
> Right now (in flink 1.3.2) pattern can start on each event (in 1.4 you
> will be able to specify AFTER_MATCH_SKIP strategy see:
> https://issues.apache.org/jira/browse/FLINK-7169), therefore you see
> matches starting at 2017-11-05T03:50:02, 2017-11-05T03:52:02,
> 2017-11-05T03:54:02.
> Also right now the oneOrMore is not greedy (in 1.4 you will be able to
> alter it see: https://issues.apache.org/jira/browse/FLINK-7147),
> therefore you see matches like: List(Event(100,2017-11-05T03:50:02)) and
> List(Event(100,2017-11-05T03:50:02), Event(100,2017-11-05T03:52:02))
> rather than only one of those.
>
> The timeoute partial matches are returned because within the timeout there
> was no event with value <100 (in fact there was no event at all to be
> checked).
>
> Hope this "study" helps you understand the behaviour. If you feel I missed
> something, please provide some example I could reproduce.
>
> Regards,
> Dawid
>
> 2017-11-07 11:29 GMT+01:00 Ufuk Celebi :
>
>> Hey Frederico,
>>
>> let me pull in Dawid (cc'd) who works on CEP. He can probably clarify
>> the expected behaviour here.
>>
>> Best,
>>
>> Ufuk
>>
>>
>> On Mon, Nov 6, 2017 at 12:06 PM, Federico D'Ambrosio
>>  wrote:
>> > Hi everyone,
>> >
>> > I wanted to ask if FlinkCEP in the following scenario is working as it
>> > should, or I have misunderstood its functioning.
>> >
>> > I've got a keyedstream associated with the following pattern:
>> >
>> > Pattern[Event].begin("start").where(_.value >=100).oneOrMore
>> > .notNext("end").where(_.value >=100).within(Time.minutes(30))
>> >
>> > Considering a single key in the stream, for simplicity, I've got the
>> > following sequence of events (using EventTime on the "time" field of the
>> > json event):
>> >
>> > {value: 100, time: "2017-11-05 03:50:02.000"}
>> > {value: 100, time: "2017-11-05 03:52:02.000"}
>> > {value: 100, time: "2017-11-05 03:54:02.000"}
>> > {value: 100, time: "2017-11-05 03:56:02.000"} // end of events within
>> the 30
>> > minutes from the first event
>> > {value: 100, time: "2017-11-05 06:00:02.000"}
>> >
>> > 

Re: What happened if my parallelism more than kafka partitions.

2017-11-08 Thread Tzu-Li (Gordon) Tai
The `KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks)` method 
returns the index of the target subtask for a given Kafka partition.
The implementation in that method ensures that the same subtask index will 
always be returned for the same partition.

Each consumer subtask will locally invoke this assignment method for each Kafka 
partition.
If the returned subtask index doesn’t equal the subtask’s index, that partition 
will be filtered out and not be read by the subtask.
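
To illustrate (this is just a sketch, not the actual Flink source; the names below are
made up), each subtask could compute its share like this:

case class KafkaTopicPartition(topic: String, partition: Int)

def assign(p: KafkaTopicPartition, numParallelSubtasks: Int): Int = {
  // Same start index for all partitions of a topic, then offset by the partition id.
  val startIndex = ((p.topic.hashCode * 31) & 0x7FFFFFFF) % numParallelSubtasks
  (startIndex + p.partition) % numParallelSubtasks
}

def partitionsForSubtask(allPartitions: Seq[KafkaTopicPartition],
                         subtaskIndex: Int,
                         numParallelSubtasks: Int): Seq[KafkaTopicPartition] =
  // Every subtask runs the same deterministic computation and keeps only its own share;
  // a subtask whose index is never produced ends up with nothing to read.
  allPartitions.filter(p => assign(p, numParallelSubtasks) == subtaskIndex)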

On 8 November 2017 at 6:38:54 PM, yunfan123 (yunfanfight...@foxmail.com) wrote:

The code of kafka partition assign is like follows:  

public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
    int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

    // here, the assumption is that the id of Kafka partitions are always ascending
    // starting from 0, and therefore can be used directly as the offset clockwise from the start index
    return (startIndex + partition.getPartition()) % numParallelSubtasks;
}

It seems it will assign to multi sub tasks.  
I wonder how flink ensure some subtasks will simply remain idle  



--  
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
 


Re: What happened if my parallelism more than kafka partitions.

2017-11-08 Thread yunfan123
The code of the Kafka partition assignment is as follows:

public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
    int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

    // here, the assumption is that the id of Kafka partitions are always ascending
    // starting from 0, and therefore can be used directly as the offset clockwise from the start index
    return (startIndex + partition.getPartition()) % numParallelSubtasks;
}

It seems it will assign partitions to multiple subtasks.
I wonder how Flink ensures that some subtasks will simply remain idle.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink memory leak

2017-11-08 Thread Aljoscha Krettek
@Nico & @Piotr Could you please have a look at this? You both recently worked 
on the network stack and might be most familiar with this.

> On 8. Nov 2017, at 10:25, Flavio Pompermaier  wrote:
> 
> We also have the same problem in production. At the moment the solution is to 
> restart the entire Flink cluster after every job..
> We've tried to reproduce this problem with a test (see 
> https://issues.apache.org/jira/browse/FLINK-7845 
> ) but we don't know whether 
> the error produced by the test and the leak are correlated..
> 
> Best,
> Flavio
> 
> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
> > wrote:
> On 2017-11-07 16:53, Ufuk Celebi wrote:
> Do you use any windowing? If yes, could you please share that code? If
> there is no stateful operation at all, it's strange where the list
> state instances are coming from.
> 
> On Tue, Nov 7, 2017 at 2:35 PM, ebru  > wrote:
> Hi Ufuk,
> 
> We don’t explicitly define any state descriptor. We only use map and filters
> operator. We thought that gc handle clearing the flink’s internal states.
> So how can we manage the memory if it is always increasing?
> 
> - Ebru
> 
> On 7 Nov 2017, at 16:23, Ufuk Celebi  > wrote:
> 
> Hey Ebru, the memory usage might be increasing as long as a job is running.
> This is expected (also in the case of multiple running jobs). The
> screenshots are not helpful in that regard. :-(
> 
> What kind of stateful operations are you using? Depending on your use case,
> you have to manually call `clear()` on the state instance in order to
> release the managed state.
> 
> Best,
> 
> Ufuk
> 
> On Tue, Nov 7, 2017 at 12:43 PM, ebru  > wrote:
> 
> 
> 
> Begin forwarded message:
> 
> From: ebru  >
> Subject: Re: Flink memory leak
> Date: 7 November 2017 at 14:09:17 GMT+3
> To: Ufuk Celebi >
> 
> Hi Ufuk,
> 
> There are there snapshots of htop output.
> 1. snapshot is initial state.
> 2. snapshot is after submitted one job.
> 3. Snapshot is the output of the one job with 15000 EPS. And the memory
> usage is always increasing over time.
> 
> 
> 
> 
> <1.png><2.png><3.png>
> 
> On 7 Nov 2017, at 13:34, Ufuk Celebi  > wrote:
> 
> Hey Ebru,
> 
> let me pull in Aljoscha (CC'd) who might have an idea what's causing this.
> 
> Since multiple jobs are running, it will be hard to understand to
> which job the state descriptors from the heap snapshot belong to.
> - Is it possible to isolate the problem and reproduce the behaviour
> with only a single job?
> 
> – Ufuk
> 
> 
> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
> > wrote:
> 
> Hi,
> 
> We are using Flink 1.3.1 in production, we have one job manager and 3 task
> managers in standalone mode. Recently, we've noticed that we have memory
> related problems. We use docker container to serve Flink cluster. We have
> 300 slots and 20 jobs are running with parallelism of 10. Also the job
> count
> may be change over time. Taskmanager memory usage always increases. After
> job cancelation this memory usage doesn't decrease. We've tried to
> investigate the problem and we've got the task manager jvm heap snapshot.
> According to the jam heap analysis, possible memory leak was Flink list
> state descriptor. But we are not sure that is the cause of our memory
> problem. How can we solve the problem?
> 
> 
> 
> We have two types of Flink job. One has no state full operator contains only 
> maps and filters and the other has time window with count trigger.
> * We've analysed the jvm heaps again in different conditions. First we 
> analysed the snapshot when no flink jobs running on cluster. (image 1)
> * Then, we analysed the jvm heap snapshot when the flink job that has no 
> state full operator is running. And according to the results, leak suspect 
> was NetworkBufferPool (image 2)
> *   Last analys, there were both two types of jobs running and leak suspect 
> was again NetworkBufferPool. (image 3)
> In our system jobs are regularly cancelled and resubmitted so we noticed that 
> when job is submitted some amount of memory allocated and after cancelation 
> this allocated memory never freed. So over time memory usage is always 
> increasing and exceeded the limits.
> 
> 



Re: Kafka Consumer fetch-size/rate and Producer queue timeout

2017-11-08 Thread Tzu-Li (Gordon) Tai
Hi Ashish,

From your description I do not yet have much of an idea of what may be 
happening.
However, some of your observations seems reasonable. I’ll go through them one 
by one:

I did try to modify request.timeout.ms, linger.ms etc. to help with the issue if 
it were caused by a sudden burst of data or something along those lines. 
However, what it did was cause the app to build up back pressure and become slower 
and slower until that timeout was reached.

If the client is experiencing trouble in writing outstanding records to Kafka, 
and the timeout is increased, then I think increased back pressure is indeed 
the expected behavior.
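
For reference, a rough sketch of where those producer settings go (the 0.10 connector,
broker address, topic and values below are placeholders, not recommendations):

import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object ProducerTuningSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val producerProps = new Properties()
    producerProps.setProperty("bootstrap.servers", "broker:9092")
    // The settings mentioned in this thread; the values here are placeholders.
    producerProps.setProperty("request.timeout.ms", "30000")
    producerProps.setProperty("linger.ms", "5")

    env.fromElements("a", "b", "c")
      .addSink(new FlinkKafkaProducer010[String]("output-topic", new SimpleStringSchema(), producerProps))

    env.execute("producer-tuning-sketch")
  }
}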

I noticed that consumer fetch-rate drops tremendously while fetch-size grows 
exponentially BEFORE the producer actually starts to show higher response-time 
and lower rates.

Drops on fetch-rate and growth on fetch-size in the Flink Kafka consumer should 
be a natural consequence of backpressure in the job.
The fetch loop in the consumer will be blocked temporarily when backpressure is 
propagated from downstream operators, resulting in longer fetch intervals and 
larger batches on each fetch (given that the event rate is still constant).
Therefore, I think the root cause is still along the lines of the producer side.

Would you happen to have any logs that maybe show any useful information on 
the producer side?
I think we might have a better chance of finding out what is going on by 
digging there.
Also, which Flink version & Kafka version are you using?

Cheers,
Gordon
On 5 November 2017 at 11:24:49 PM, Ashish Pokharel (ashish...@yahoo.com) wrote:

All,  

I am starting to notice a strange behavior in a particular streaming app. I 
initially thought it was a Producer issue, as I was seeing timeout exceptions 
(records expiring in the queue). I did try to modify request.timeout.ms, linger.ms 
etc. to help with the issue if it were caused by a sudden burst of data or 
something along those lines. However, what it did was cause the app to build up back 
pressure and become slower and slower until that timeout was reached. With 
lower timeouts, the app would actually raise the exception and recover faster. I can 
tell it is not related to connectivity as other apps are running just fine 
around the same time frame connected to same brokers (we have at least 10 
streaming apps connected to same list of brokers) from the same data nodes. We 
have enabled Graphite Reporter in all of our applications. After deep diving 
into some of consumer and producer stats, I noticed that consumer fetch-rate 
drops tremendously while fetch-size grows exponentially BEFORE the producer 
actually start to show higher response-time and lower rates. Eventually, I 
noticed connection resets start to occur and connection counts go up 
momentarily. After which, things get back to normal. Data producer rates remain 
constant around that timeframe - we have Logstash producer sending data over. 
We checked both Logstash and Kafka metrics and they seem to be showing same 
pattern (sort of sin wave) throughout.  

It seems to point to a Kafka issue (perhaps some tuning between the Flink app and 
Kafka), but I wanted to check with the experts before I start knocking on the Kafka 
admin's door. Is there anything else I can look into? There are quite a few 
default stats in Graphite, but those were the ones that made the most sense.

Thanks, Ashish

Re: Remove the HDFS directory in org.apache.flink.util.FileUtils.deletePathIfEmpty

2017-11-08 Thread Aljoscha Krettek
Hi,

You images did not make it through to the mailing list.

Best,
Aljoscha

> On 8. Nov 2017, at 05:25, 马庆祥  wrote:
> 
> Hi,all,
> 
> I enabled checkpointing with the configuration in the figure below.
> 
> 
> It works, but I keep getting the exception below:
> 
> 
> I want to know if the commit below is meant to resolve the above problem, but the 
> exception still appears.
> [hotfix] [core] Fix FileUtils.deletePathIfEmpty
> 
> Flink version: 1.3.1
> Hadoop version: 1.x
> 
> thanks~



Re: What happened if my parallelism more than kafka partitions.

2017-11-08 Thread Tzu-Li (Gordon) Tai
Hi!

You can set the parallelism of the Flink Kafka Consumer independent of the 
number of partitions.
If there are more consumer subtasks than the number of Kafka partitions to read 
(i.e. when the parallelism of the consumer is set higher than the number of 
partitions), some subtasks will simply remain idle.
Each Kafka partition is deterministically assigned to a single consumer subtask.
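
As a concrete (made-up) example, assuming the 0.10 connector: if "my-topic" has 4 
partitions and the source parallelism is 6, two of the six consumer subtasks will 
simply have no partitions assigned:

import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object IdleSubtasksExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    props.setProperty("bootstrap.servers", "broker:9092")
    props.setProperty("group.id", "example-group")

    // "my-topic" is assumed to have 4 partitions; with parallelism 6, two subtasks stay idle.
    env.addSource(new FlinkKafkaConsumer010[String]("my-topic", new SimpleStringSchema(), props))
      .setParallelism(6)
      .print()

    env.execute("idle-subtasks-example")
  }
}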

Cheers,
Gordon


On 8 November 2017 at 4:21:54 PM, yunfan123 (yunfanfight...@foxmail.com) wrote:

It seems the same partition data will be consume multi times? 



-- 
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 


Re: Flink memory leak

2017-11-08 Thread Flavio Pompermaier
We also have the same problem in production. At the moment the solution is
to restart the entire Flink cluster after every job..
We've tried to reproduce this problem with a test (see
https://issues.apache.org/jira/browse/FLINK-7845) but we don't know whether
the error produced by the test and the leak are correlated..

Best,
Flavio

On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <
b20926...@cs.hacettepe.edu.tr> wrote:

> On 2017-11-07 16:53, Ufuk Celebi wrote:
>
>> Do you use any windowing? If yes, could you please share that code? If
>> there is no stateful operation at all, it's strange where the list
>> state instances are coming from.
>>
>> On Tue, Nov 7, 2017 at 2:35 PM, ebru 
>> wrote:
>>
>>> Hi Ufuk,
>>>
>>> We don’t explicitly define any state descriptor. We only use map and
>>> filters
>>> operator. We thought that gc handle clearing the flink’s internal states.
>>> So how can we manage the memory if it is always increasing?
>>>
>>> - Ebru
>>>
>>> On 7 Nov 2017, at 16:23, Ufuk Celebi  wrote:
>>>
>>> Hey Ebru, the memory usage might be increasing as long as a job is
>>> running.
>>> This is expected (also in the case of multiple running jobs). The
>>> screenshots are not helpful in that regard. :-(
>>>
>>> What kind of stateful operations are you using? Depending on your use
>>> case,
>>> you have to manually call `clear()` on the state instance in order to
>>> release the managed state.
>>>
>>> Best,
>>>
>>> Ufuk
>>>
>>> On Tue, Nov 7, 2017 at 12:43 PM, ebru 
>>> wrote:
>>>



 Begin forwarded message:

 From: ebru 
 Subject: Re: Flink memory leak
 Date: 7 November 2017 at 14:09:17 GMT+3
 To: Ufuk Celebi 

 Hi Ufuk,

 There are there snapshots of htop output.
 1. snapshot is initial state.
 2. snapshot is after submitted one job.
 3. Snapshot is the output of the one job with 15000 EPS. And the memory
 usage is always increasing over time.




 <1.png><2.png><3.png>

 On 7 Nov 2017, at 13:34, Ufuk Celebi  wrote:

 Hey Ebru,

 let me pull in Aljoscha (CC'd) who might have an idea what's causing
 this.

 Since multiple jobs are running, it will be hard to understand to
 which job the state descriptors from the heap snapshot belong to.
 - Is it possible to isolate the problem and reproduce the behaviour
 with only a single job?

 – Ufuk


 On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
  wrote:

 Hi,

 We are using Flink 1.3.1 in production, we have one job manager and 3
 task
 managers in standalone mode. Recently, we've noticed that we have memory
 related problems. We use docker container to serve Flink cluster. We
 have
 300 slots and 20 jobs are running with parallelism of 10. Also the job
 count
 may be change over time. Taskmanager memory usage always increases.
 After
 job cancelation this memory usage doesn't decrease. We've tried to
 investigate the problem and we've got the task manager jvm heap
 snapshot.
 According to the jam heap analysis, possible memory leak was Flink list
 state descriptor. But we are not sure that is the cause of our memory
 problem. How can we solve the problem?



 We have two types of Flink job. One has no state full operator contains
>>> only maps and filters and the other has time window with count trigger.
>>>
>> * We've analysed the jvm heaps again in different conditions. First we
> analysed the snapshot when no flink jobs running on cluster. (image 1)
> * Then, we analysed the jvm heap snapshot when the flink job that has no
> state full operator is running. And according to the results, leak suspect
> was NetworkBufferPool (image 2)
> *   Last analys, there were both two types of jobs running and leak
> suspect was again NetworkBufferPool. (image 3)
> In our system jobs are regularly cancelled and resubmitted so we noticed
> that when job is submitted some amount of memory allocated and after
> cancelation this allocated memory never freed. So over time memory usage is
> always increasing and exceeded the limits.
>
>>


What happened if my parallelism more than kafka partitions.

2017-11-08 Thread yunfan123
It seems the same partition's data will be consumed multiple times?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/