Re: Memory usage increases on every job restart resulting in eventual OOMKill

2021-02-03 Thread Randal Pitt
Thanks everyone for the responses.

I tried out the JeMalloc suggestion from FLINK-19125 using a patched 1.11.3
image and so far it appears to be working well. I see it's included in 1.12.1
and Docker images are available, so I'll look at upgrading too.

Best regards,

Randal.





Re: Memory usage increases on every job restart resulting in eventual OOMKill

2021-02-02 Thread Randal Pitt
Hi Xintong Song,

Correct, we are using standalone k8s. Task managers are deployed as a
statefulset so they have consistent pod names. We tried using native k8s (in
fact I'd prefer to) but got persistent
"io.fabric8.kubernetes.client.KubernetesClientException: too old resource
version: 242214695 (242413759)" errors, which resulted in jobs being
restarted every 30-60 minutes.

We are using Prometheus Node Exporter to capture memory usage. The graph
shows the metric:

sum(container_memory_usage_bytes{container_name="taskmanager", pod_name=~"$flink_task_manager"}) by (pod_name)

I've attached the original graph so Nabble doesn't shrink it.

Best regards,

Randal.







Memory usage increases on every job restart resulting in eventual OOMKill

2021-02-02 Thread Randal Pitt
Hi,

We're running Flink 1.11.3 on Kubernetes. We have a job with a parallelism
of 10 running on 10 task managers, each with 1 task slot. The job has 4 time
windows across 2 different keys: 2 windows use reducers and 2 are processed
by window functions. State is stored in RocksDB.
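
To give a rough idea of the shape of the job, something like the following
(this is an illustrative sketch only: the Event type, key selectors, window
sizes and window functions are placeholders, not our actual code):

// "source" and "Event" are placeholders for our real source and record type.
DataStream<Event> events = env.addSource(source);

// Two of the four windows use reducers (one key shown here).
events.keyBy(Event::getKeyA)
      .window(TumblingEventTimeWindows.of(Time.minutes(5)))
      .reduce((a, b) -> a.merge(b));

// The other two windows are handled by window functions (second key shown).
events.keyBy(Event::getKeyB)
      .window(TumblingEventTimeWindows.of(Time.hours(1)))
      .process(new MyProcessWindowFunction());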

We've noticed that when a pod is restarted (say, if the node it was on is
restarted), the job restarts and the memory usage of the remaining 9 pods
increases by roughly 1GB over the next 1-2 hours, then stays at that level.
If another pod restarts, the remaining 9 increase in memory usage again.
Eventually one or more pods reach the 6GB limit and are OOMKilled, leading
to the job restarting and memory usage increasing again.

If left alone, this can lead to a situation where one OOMKill directly
triggers another, which directly triggers another. At that point manual
intervention is required to resolve it.

I think it's exceedingly likely the excessive memory usage is in RocksDB
rather than Flink itself. My question is: is there anything we can do about
the increase in memory usage after a failure?

Best regards,

Randal.





Re: Event time issues when Kinesis consumer receives batched messages

2020-12-09 Thread Randal Pitt
Thanks Roman, I'll look into how I go about doing that.





Re: Event time issues when Kinesis consumer receives batched messages

2020-12-08 Thread Randal Pitt
Hi Roman,

We're using a custom watermarker that uses a histogram to calculate a "best
fit" event time, as the data we receive can be heavily out of order.

As you can see below, we're using the timestamp from the first event in each
batch, so we're essentially sampling the timestamps rather than using them all.

// Batch is the message type read from Kinesis; HistogramWatermarker is our
// custom periodic watermark assigner (histogram-based).
FlinkKinesisConsumer<Batch> consumer = new FlinkKinesisConsumer<>(...);

consumer.setPeriodicWatermarkAssigner(
    new HistogramWatermarker<Batch>(Time.minutes(30), 100) {
        @Override
        public long extractTimestamp(final Batch element) {
            // Sample the timestamp of the first event in the batch.
            return element.getBatch().get(0).getDate().getTime();
        }
    }
);

Cheers,
Randal.





Event time issues when Kinesis consumer receives batched messages

2020-12-07 Thread Randal Pitt
Hi there,

We're using Flink to read from a Kinesis stream. The stream contains
messages that themselves contain lists of events, and we want our Flink jobs
(using the event time characteristic) to process those events individually.
We have this working using a flatMap on the DataStream, but we're having
trouble correctly assigning timestamps to the events.

We have been using FlinkKinesisConsumer.setPeriodicWatermarkAssigner(), as
that should mean the watermarks are generated correctly, but it results in
all events in one message sharing a timestamp, so some events are assigned
to the wrong window.

Using DataStream.assignTimestampsAndWatermarks() after the flatMap means we
can assign the correct timestamps, but the watermarks may not necessarily be
correct with respect to the Kinesis shards.
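
For example, something along these lines (this is only a sketch: Batch,
Event, their getters and the BoundedOutOfOrdernessTimestampExtractor are
stand-ins based on our data model, not our exact code):

// streamName, deserializationSchema and consumerConfig are placeholders.
FlinkKinesisConsumer<Batch> consumer =
    new FlinkKinesisConsumer<>(streamName, deserializationSchema, consumerConfig);
DataStream<Batch> batches = env.addSource(consumer);

DataStream<Event> events = batches
    .flatMap(new FlatMapFunction<Batch, Event>() {
        @Override
        public void flatMap(Batch batch, Collector<Event> out) {
            // Split each Kinesis message into its individual events.
            for (Event event : batch.getBatch()) {
                out.collect(event);
            }
        }
    })
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.minutes(30)) {
            @Override
            public long extractTimestamp(Event event) {
                // Per-event timestamp, assigned downstream of the source.
                return event.getDate().getTime();
            }
        });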

Is there a strategy we can use that gets us both correct watermarks from the
Kinesis consumer and correct timestamps for individual events?

Best regards,

Randal.


