Re: Status.JVM.Memory.Heap.Used metric shows only a few megabytes but profiling shows gigabytes (as expected)

2019-03-20 Thread Gerard Garcia
Thanks, next time I'll check Jira more thoroughly first.

On Tue, Mar 19, 2019 at 6:27 PM Chesnay Schepler  wrote:

> Known issue, fixed in 1.7.3/1.8.0:
> https://issues.apache.org/jira/browse/FLINK-11183
>
> On 19.03.2019 15:03, gerardg wrote:
> > Hi,
> >
> > Before Flink 1.7.0 we were getting correct values from the
> > Status.JVM.Memory.Heap.Used metric. Since updating, we just see a constant
> > small value (just a few megabytes). Did something change?
> >
> > Gerard
> >
> >
> >
> > --
> > Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> >
>
>


Re: Timestamp synchronized message consumption across kafka partitions

2019-03-07 Thread Gerard Garcia
I'll answer myself. I guess the most viable option for now is to wait for
the work in
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Sharing-state-between-subtasks-td24489.html
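
In the meantime, a minimal sketch (topic, broker, group id, and the 10 s bound
are illustrative assumptions, and the universal FlinkKafkaConsumer may need to
be swapped for the versioned connector you actually use) of assigning
timestamps and watermarks directly on the Kafka source, so that watermarks are
tracked per partition and the overall watermark follows the slowest partition.
Note this only bounds the event-time skew seen downstream; it does not throttle
how fast the emptier partitions are read:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object PerPartitionWatermarksSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092") // placeholder
    props.setProperty("group.id", "example-group")           // placeholder

    val consumer = new FlinkKafkaConsumer[String]("events", new SimpleStringSchema(), props)
    // Assigning the watermark extractor on the consumer itself makes Flink
    // track watermarks per Kafka partition and emit their minimum downstream.
    consumer.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(10)) {
        // Assumption: each record carries its event timestamp in milliseconds.
        override def extractTimestamp(element: String): Long = element.toLong
      })

    env.addSource(consumer).print()
    env.execute("per-partition watermarks sketch")
  }
}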

On Thu, Mar 7, 2019, 3:24 PM gerardg  wrote:

> I'm wondering if there is a way to avoid consuming too fast from partitions
> that do not have as much data as the others in the same topic, by keeping
> them more or less synchronized on their ingestion timestamps, similar to what
> Kafka Streams does:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization
>
> We are having an issue where partitions with less data are consumed very
> fast, which creates a lot of windows that can't be triggered until the
> partitions with more data are consumed and the watermark advances. This
> issue seems like it should be quite common, but we can't find any standard
> solution to it. Maybe it is just that our partitions are too unbalanced,
> but still, without a way to bound the skew between partitions (for example
> when processing accumulated data) it seems like a potential source of
> problems.
>
> Anyone have an idea or suggestion to deal with this issue?
>
> Thanks,
>
> Gerard
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Unbalanced Kafka consumer consumption

2018-12-19 Thread Gerard Garcia
We finally figured it out. We had set a large value for the Kafka consumer
option 'max.partition.fetch.bytes', which made the KafkaConsumer consume at
an unbalanced rate across partitions.
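
For reference, a minimal sketch of how the property is passed to the Flink
source (broker address, group id, topic, and the 1 MiB value are illustrative
placeholders, not our actual settings):

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object FetchSizeSketch {
  def buildConsumer(): FlinkKafkaConsumer011[String] = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092") // placeholder
    props.setProperty("group.id", "example-group")           // placeholder
    // 'max.partition.fetch.bytes' is a plain Kafka consumer property; 1048576
    // (1 MiB) is the Kafka default, much lower than the value we had set.
    props.setProperty("max.partition.fetch.bytes", "1048576")
    new FlinkKafkaConsumer011[String]("events", new SimpleStringSchema(), props)
  }
}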

Gerard


Re: Unbalanced Kafka consumer consumption

2018-11-22 Thread Gerard Garcia
Hi Till,

Sorry for the late reply, I was on holiday and couldn't follow up on the
issue.

1. Flink 1.6.1, Kafka 1.1.0
2. The topic has 64 partitions. We don't have that many slots available, but
we could try this.
3. Yes, they are running on different nodes.
4. I meant that until the operator represented by its lag (meaning the
number of messages left to consume) with a blue line (in the background)
finishes consuming all its pending messages, the rate does not increase.
 Yes, the problem appears when there is pending (persisted) data, because the
job can't keep up with the incoming rate or because the app has been stopped.
5. Yes. I guess you mean numRecordsOutPerSecond. We will monitor this so we
have data the next time it happens.

I'll try to reduce the number of partitions so they can be assigned one per
source task and see how it behaves.
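
For reference, a minimal sketch of the suggestion quoted below (one source
subtask per partition by matching the source parallelism to the 64 partitions);
broker, topic, and group id are illustrative placeholders:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object OnePartitionPerSubtaskSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092") // placeholder
    props.setProperty("group.id", "example-group")           // placeholder

    env
      .addSource(new FlinkKafkaConsumer011[String]("events", new SimpleStringSchema(), props))
      .setParallelism(64) // one subtask per partition for a 64-partition topic
      .print()

    env.execute("one partition per subtask sketch")
  }
}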

Thanks,

Gerard

On Wed, Nov 7, 2018 at 5:38 PM Till Rohrmann  wrote:

> Hi Gerard,
>
> the behaviour you are describing sounds odd to me. I have a couple of
> questions:
>
> 1. Which Flink and Kafka version are you using?
> 2. How many partitions do you have? --> Try to set the parallelism of your
> job to the number of partitions. That way, you will have one partition per
> source task.
> 3. How are the source operators distributed? Are they running on different
> nodes?
> 4. What do you mean with "until it (the blue one) was finished consuming
> the partition"? I assume that you don't ingest into the Kafka topic live
> but want to read persisted data.
> 5. Are you using Flink's metrics to monitor the different source tasks?
> Check what the source operator's output rate is (should be visible from the
> web UI).
>
> Cheers,
> Till
>
> On Tue, Oct 30, 2018 at 10:27 AM Gerard Garcia  wrote:
>
>> I think my problem is not the same: yours is that you want to consume
>> faster from partitions with more data, to avoid first consuming the ones
>> with fewer elements, which could advance the event time too fast. Mine is
>> that the Kafka consumer only reads from some partitions even though it seems
>> to have the resources to read and process from all of them at the same time.
>>
>> Gerard
>>
>> On Tue, Oct 30, 2018 at 9:36 AM bupt_ljy  wrote:
>>
>>> Hi,
>>>
>>>If I understand your problem correctly, there is a similar JIRA
>>> issue FLINK-10348, reported by me. Maybe you can take a look at it.
>>>
>>>
>>> Jiayi Liao,Best
>>>
>>>  Original Message
>>> *Sender:* Gerard Garcia
>>> *Recipient:* fearsome.lucidity
>>> *Cc:* user
>>> *Date:* Monday, Oct 29, 2018 17:50
>>> *Subject:* Re: Unbalanced Kafka consumer consumption
>>>
>>> The stream is partitioned by key after ingestion at the finest
>>> granularity that we can (which is finer than how the stream is partitioned
>>> when produced to Kafka). It is not perfectly balanced, but still not so
>>> unbalanced as to show this behavior (more balanced than what the lag images
>>> show).
>>>
>>> Anyway, let's assume that the problem is that the stream is so
>>> unbalanced that one operator subtask can't handle the ingestion rate. Is it
>>> expected then that all the other operator subtasks reduce their ingestion
>>> rate even if they have resources to spare? The task is configured with
>>> processing time and there are no windows. If that is the case, is there a
>>> way to let operator subtasks process freely even if one of them is causing
>>> back pressure upstream?
>>>
>>> The attached image shows how the Kafka lag increases while the throughput
>>> is stable until some operator subtasks finish.
>>>
>>> Thanks,
>>>
>>> Gerard
>>>
>>> On Fri, Oct 26, 2018 at 8:09 PM Elias Levy 
>>> wrote:
>>>
>>>> You can always shuffle the stream generated by the Kafka source
>>>> (dataStream.shuffle()) to evenly distribute records downstream.
>>>>
>>>> On Fri, Oct 26, 2018 at 2:08 AM gerardg  wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> We are experiencing issues scaling our Flink application and we have
>>>>> observed
>>>>> that it may be because Kafka message consumption is not balanced
>>>>> across
>>>>> partitions. The attached image (lag per partition) shows how only one
>>>>> partition consumes messages (the blue one in the back) and it wasn't
>>>>> until
>>>>> it finished that the other ones started to consume at a good rate
>>>>> (actually
>>>>> the total throughput multiplied b

Re: Unbalanced Kafka consumer consumption

2018-10-30 Thread Gerard Garcia
I think my problem is not the same: yours is that you want to consume faster
from partitions with more data, to avoid first consuming the ones with fewer
elements, which could advance the event time too fast. Mine is that the Kafka
consumer only reads from some partitions even though it seems to have the
resources to read and process from all of them at the same time.
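
For reference, a minimal sketch contrasting what we do today (keying by the
finest-grained field we have) with the shuffle() redistribution suggested
further down in this thread; the Event type and its key field are hypothetical:

import org.apache.flink.streaming.api.scala._

object RedistributionSketch {
  case class Event(key: String, payload: String)

  def partition(events: DataStream[Event]): Unit = {
    // What we currently do: partition by key right after ingestion.
    val keyed = events.keyBy(_.key)
    // The alternative suggested below: spread records evenly, ignoring the key.
    val shuffled = events.shuffle
  }
}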

Gerard

On Tue, Oct 30, 2018 at 9:36 AM bupt_ljy  wrote:

> Hi,
>
>If I understand your problem correctly, there is a similar JIRA
> issue FLINK-10348, reported by me. Maybe you can take a look at it.
>
>
> Jiayi Liao,Best
>
>  Original Message
> *Sender:* Gerard Garcia
> *Recipient:* fearsome.lucidity
> *Cc:* user
> *Date:* Monday, Oct 29, 2018 17:50
> *Subject:* Re: Unbalanced Kafka consumer consumption
>
> The stream is partitioned by key after ingestion at the finest granularity
> that we can (which is finer than how the stream is partitioned when produced
> to Kafka). It is not perfectly balanced, but still not so unbalanced as to
> show this behavior (more balanced than what the lag images show).
>
> Anyway, let's assume that the problem is that the stream is so unbalanced
> that one operator subtask can't handle the ingestion rate. Is it expected
> then that all the other operator subtasks reduce their ingestion rate even if
> they have resources to spare? The task is configured with processing time and
> there are no windows. If that is the case, is there a way to let operator
> subtasks process freely even if one of them is causing back pressure
> upstream?
>
> The attached image shows how the Kafka lag increases while the throughput is
> stable until some operator subtasks finish.
>
> Thanks,
>
> Gerard
>
> On Fri, Oct 26, 2018 at 8:09 PM Elias Levy 
> wrote:
>
>> You can always shuffle the stream generated by the Kafka source
>> (dataStream.shuffle()) to evenly distribute records downstream.
>>
>> On Fri, Oct 26, 2018 at 2:08 AM gerardg  wrote:
>>
>>> Hi,
>>>
>>> We are experiencing issues scaling our Flink application and we have
>>> observed
>>> that it may be because Kafka message consumption is not balanced across
>>> partitions. The attached image (lag per partition) shows how only one
>>> partition consumes messages (the blue one in the back) and it wasn't
>>> until
>>> it finished that the other ones started to consume at a good rate
>>> (actually
>>> the total throughput multiplied by 4 when these started). Also, when those
>>> started to consume, one partition just stopped and accumulated messages
>>> again until they finished.
>>>
>>> We don't see any resource (CPU, network, disk..) struggling in our
>>> cluster
>>> so we are not sure what could be causing this behavior. I can only assume
>>> that somehow Flink or the Kafka consumer is artificially slowing down the
>>> other partitions. Maybe due to how back pressure is handled?
>>>
>>> <
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1007/consumer_max_lag.png>
>>>
>>>
>>> Gerard
>>>
>>>
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>


Re: When a jobmanager fails, it doesn't restart because it tries to restart non existing tasks

2018-08-29 Thread Gerard Garcia
Hi Till,

Sorry for the late reply, I was waiting to update to Flink 1.6.0 to see if
the problem got fixed, but I still experience the first issue (jobgraph not
deleted from ZooKeeper when a task is canceled). The second issue
(taskmanagers unable to register with the newly elected jobmanager) was
actually a configuration error: all jobmanagers had the
"jobmanager.rpc.address" option set to point to the same jobmanager, so I
guess the new one was not registering its URL correctly in ZooKeeper.

And for the first issue, it seems to be a known bug:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Old-job-resurrected-during-HA-failover-td22000.html
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/JobGraphs-not-cleaned-up-in-HA-mode-td22600.html
https://issues.apache.org/jira/browse/FLINK-10011

so I guess I'll have to wait until there is a fix, as I have not seen any
workaround (other than removing the jobgraph from ZooKeeper after
cancelling the task).

Thanks,

Gerard


On Tue, Jul 24, 2018 at 1:46 PM Till Rohrmann  wrote:

> Hi Gerard,
>
> the first log snippet from the client does not show anything suspicious.
> The warning just says that you cannot use the Yarn CLI because it lacks the
> Hadoop dependencies in the classpath.
>
> The second snippet is indeed more interesting. If the TaskExecutors are
> not notified about the changed leader, then this might indicate a problem
> with the ZooKeeper connection or the ZooKeeper cluster itself. This might
> also explain why the job deletion from ZooKeeper does not succeed.
>
> One thing you could check is whether the leader ZNode under
> `/flink/default/leader/dispatcher_lock` (if you are using the defaults)
> actually contains the address of the newly elected leader. The leader path
> should also be logged in the cluster entrypoint logs. You can use the
> ZooKeeper cli for accessing the ZNodes.
>
> Cheers,
> Till
>
> On Mon, Jul 23, 2018 at 4:07 PM Gerard Garcia  wrote:
>
>> We have just started experiencing a different problem that could be
>> related; maybe it helps to diagnose the issue.
>>
>> In the last 24h the jobmanager lost its connection to ZooKeeper a couple of
>> times. Each time, a new jobmanager (on a different node) was elected leader
>> correctly, but the taskmanagers kept trying to connect to the old
>> jobmanager. These are the final log messages before the taskmanager shut
>> itself down.
>>
>> 12:06:41.747 [flink-akka.actor.default-dispatcher-5] WARN
>> akka.remote.transport.netty.NettyTransport New I/O boss #3 - Remote
>> connection to [null] failed with java.net.ConnectException: Connection
>> refused: (...)1/192.168.1.9:35605
>> 12:06:41.748 [flink-akka.actor.default-dispatcher-2] INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor  - Could not resolve
>> ResourceManager address akka.tcp://flink@(...)1:35605/user/resourcemanager,
>> retrying in 1 ms: Could not connect to rpc endpoint under address
>> akka.tcp://flink@(...)1:35605/user/resourcemanager..
>> 12:06:41.748 [flink-akka.actor.default-dispatcher-5] WARN
>> akka.remote.ReliableDeliverySupervisor
>> flink-akka.remote.default-remote-dispatcher-15 - Association with remote
>> system [akka.tcp://flink@(...)1:35605] has failed, address is now gated
>> for [50] ms. Reason: [Association failed with 
>> [akka.tcp://flink@(...)1:35605]]
>> Caused by: [Connection refused: (...)1/192.168.1.9:35605]
>> 12:06:51.766 [flink-akka.actor.default-dispatcher-5] WARN
>> akka.remote.transport.netty.NettyTransport New I/O boss #3 - Remote
>> connection to [null] failed with java.net.ConnectException: Connection
>> refused: (...)1/192.168.1.9:35605
>> 12:06:51.767 [flink-akka.actor.default-dispatcher-2] INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor  - Could not resolve
>> ResourceManager address akka.tcp://flink@(...)1:35605/user/resourcemanager,
>> retrying in 1 ms: Could not connect to rpc endpoint under address
>> akka.tcp://flink@(...)1:35605/user/resourcemanager..
>> 12:06:51.767 [flink-akka.actor.default-dispatcher-5] WARN
>> akka.remote.ReliableDeliverySupervisor
>> flink-akka.remote.default-remote-dispatcher-7 - Association with remote
>> system [akka.tcp://flink@(...)1:35605] has failed, address is now gated
>> for [50] ms. Reason: [Association failed with 
>> [akka.tcp://flink@(...)1:35605]]
>> Caused by: [Connection refused: (...)1/192.168.1.9:35605]
>> 12:07:01.123 [flink-akka.actor.default-dispatcher-5] ERROR
>> org.apache.flink.runtime.taskexecutor.TaskExecutor  - Fatal error occurred
>> in TaskExecutor akka.tcp://flink@(...)2:33455/user/taskmanager_0.
>> org.apache.flink.r

Re: Override CaseClassSerializer with custom serializer

2018-08-20 Thread Gerard Garcia
Hi Timo,

I see. Yes, we have already enabled the "Object Reuse" option. It was a nice
performance improvement when we first set it!

I guess another option we can try is to somehow make things "easier" for
Flink so it can chain operators together. Most of them are not chained; I
think it's because they have a control stream as a source together with the
main stream. I'll need to check that and see if we can re-architect them.
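
For reference, a minimal sketch of the two knobs mentioned here (values and
stream names are illustrative; chaining is normally automatic, the hints only
matter when restructuring a job):

import org.apache.flink.streaming.api.scala._

object ObjectReuseSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Object reuse: Flink may pass the same object instances between chained
    // operators instead of copying them.
    env.getConfig.enableObjectReuse()
    // Chaining hints that can help while re-architecting operators:
    //   stream.map(f).startNewChain()   // begin a new chain at this operator
    //   stream.map(f).disableChaining() // keep this operator out of any chain
  }
}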

Thanks,

Gerard

On Fri, Aug 17, 2018 at 11:21 PM Timo Walther  wrote:

> Hi Gerard,
>
> you are correct, Kryo serializers are only used when no built-in Flink
> serializer is available.
>
> Actually, the tuple and case class serializers are one of the most
> performant serializers in Flink (due to their fixed length, no null
> support). If you really want to reduce the serialization overhead you
> could look into the object reuse mode. We had this topic on the mailing
> list recently, I will just copy it here:
>
> If you want to improve the performance of a collect() between operators,
> you could also enable object reuse. You can read more about this here
> [1] (section "Issue 2: Object Reuse"), but make sure your implementation
> is correct because an operator could modify the objects of following
> operators.
>
> I hope this helps.
>
> Regards,
> Timo
>
> [1]
>
> https://data-artisans.com/blog/curious-case-broken-benchmark-revisiting-apache-flink-vs-databricks-runtime
>
> Am 17.08.18 um 17:29 schrieb gerardg:
> > Hello,
> >
> > I can't seem to be able to override the CaseClassSerializer with my
> custom
> > serializer. I'm using env.getConfig.addDefaultKryoSerializer() to add the
> > custom serializer but I don't see it being used. I guess it is because it
> > only uses Kryo based serializers if it can't find a Flink serializer?
> >
> > Is it then worth it to replace the CaseClassSerializer with a custom
> > serializer? (When I profile, the CaseClassSerializer.(de)serialize method
> > appears as the most used, so I wanted to give it a try.) If so, how can I
> > do
> > it?
> >
> > Thanks,
> >
> > Gerard
> >
> >
> >
> > --
> > Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>
>


Re: When a jobmanager fails, it doesn't restart because it tries to restart non existing tasks

2018-07-23 Thread Gerard Garcia
)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
12:07:01.128 [flink-akka.actor.default-dispatcher-5] INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor  - Stopping TaskExecutor
akka.tcp://flink@(...)2:33455/user/taskmanager_0.
12:07:01.128 [flink-akka.actor.default-dispatcher-5] INFO
o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - Stopping
ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
12:07:01.128 [flink-akka.actor.default-dispatcher-5] INFO
o.a.flink.runtime.state.TaskExecutorLocalStateStoresManager  - Shutting
down TaskExecutorLocalStateStoresManager.
12:07:01.130 [flink-akka.actor.default-dispatcher-5] INFO
org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O manager removed
spill file directory /home/tmp/flink-io-cf83b38e-53f1-4802-a097-e6db95b46084
12:07:01.130 [flink-akka.actor.default-dispatcher-5] INFO
org.apache.flink.runtime.io.network.NetworkEnvironment  - Shutting down the
network environment and its components.
12:07:01.131 [flink-akka.actor.default-dispatcher-5] INFO
org.apache.flink.runtime.io.network.netty.NettyClient  - Successful
shutdown (took 1 ms).
12:07:01.132 [flink-akka.actor.default-dispatcher-5] INFO
org.apache.flink.runtime.io.network.netty.NettyServer  - Successful
shutdown (took 1 ms).
12:07:01.141 [flink-akka.actor.default-dispatcher-5] INFO
org.apache.flink.runtime.taskexecutor.JobLeaderService  - Stop job leader
service.
12:07:01.142 [flink-akka.actor.default-dispatcher-5] INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor  - Stopped TaskExecutor
akka.tcp://flink@(...)2:33455/user/taskmanager_0.
12:07:01.143 [flink-akka.actor.default-dispatcher-5] INFO
org.apache.flink.runtime.blob.PermanentBlobCache  - Shutting down BLOB cache
12:07:01.143 [flink-akka.actor.default-dispatcher-5] INFO
org.apache.flink.runtime.blob.TransientBlobCache  - Shutting down BLOB cache
12:07:01.148 [Curator-Framework-0] INFO
o.a.f.s.c.o.a.curator.framework.imps.CuratorFrameworkImpl  -
backgroundOperationsLoop exiting
12:07:01.173 [flink-akka.actor.default-dispatcher-5] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Session:
0x3011955028f00f4 closed
12:07:01.173 [main-EventThread] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - EventThread
shut down for session: 0x3011955028f00f4
12:07:01.173 [flink-akka.actor.default-dispatcher-5] INFO
org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Stopping Akka RPC
service.
12:07:01.178 [flink-akka.actor.default-dispatcher-3] INFO
akka.remote.RemoteActorRefProvider$RemotingTerminator
flink-akka.remote.default-remote-dispatcher-16 - Shutting down remote
daemon.
12:07:01.178 [flink-akka.actor.default-dispatcher-3] INFO
akka.remote.RemoteActorRefProvider$RemotingTerminator
flink-akka.remote.default-remote-dispatcher-16 - Remote daemon shut down;
proceeding with flushing remote transports.
12:07:01.190 [flink-akka.actor.default-dispatcher-5] INFO
akka.remote.RemoteActorRefProvider$RemotingTerminator
flink-akka.remote.default-remote-dispatcher-7 - Remoting shut down.

It seems like there are problems updating ZooKeeper. I have also noticed
these messages in the ZooKeeper log:

 WARN  [SyncThread:2:FileTxnLog@378] - fsync-ing the write ahead log in
SyncThread:2 took 1259ms which will adversely effect operation latency. See
the ZooKeeper troubleshooting guide

Maybe Flink hits some timeout?

Gerard

On Mon, Jul 23, 2018 at 11:57 AM Gerard Garcia  wrote:

> Hi Till,
>
> I can't post the full log (as there is internal info in them) but I've
> found this. Is that what you are looking for?
>
> 11:29:17.351 [main] INFO  org.apache.flink.client.cli.CliFrontend  -
> 
> 11:29:17.372 [main] INFO  org.apache.flink.client.cli.CliFrontend  -
> Starting Command Line Client (Version: 1.5-SNAPSHOT, Rev:a4fc4c6,
> Date:05.06.2018 @ 10:22:30 CEST)
> 11:29:17.372 [main] INFO  org.apache.flink.client.cli.CliFrontend  -  OS
> current user: (...)
> 11:29:17.372 [main] INFO  org.apache.fl

Re: Flink job hangs/deadlocks (possibly related to out of memory)

2018-07-23 Thread Gerard Garcia
Thanks Zhijiang,

Yes, I guess our best option right now is to just reduce the structure of
the output record and see if that solves the problem.
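
For reference, the checkpoint timeout and minimum pause discussed further down
in this thread can be set like this (a minimal sketch; the 5-minute values are
just the ones floated in the discussion, not a recommendation):

import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._

object CheckpointPauseSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Trigger a checkpoint at most every 5 minutes.
    env.enableCheckpointing(5 * 60 * 1000, CheckpointingMode.EXACTLY_ONCE)
    val conf = env.getCheckpointConfig
    conf.setCheckpointTimeout(5 * 60 * 1000)          // give up on a checkpoint after 5 min
    conf.setMinPauseBetweenCheckpoints(5 * 60 * 1000) // wait 5 min after one completes
  }
}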

Gerard

On Tue, Jul 17, 2018 at 4:56 PM Zhijiang(wangzhijiang999) <
wangzhijiang...@aliyun.com> wrote:

> Hi Gerard,
>
> From the jstack you provided, the task is serializing the output record,
> and during this process it will not process any more input data.
> This stack does not indicate an out-of-memory issue. And if the output
> buffer were exhausted, the task would be blocked in the
> requestBufferBlocking process instead.
>
> I think the key point is that your output record is too large and has a
> complicated structure: every field and collection in this complicated class
> will be traversed during serialization, which costs a lot of time and CPU.
> Furthermore, the checkpoint cannot be completed because it is waiting for a
> lock that is also held by the task output process.
>
> As you mentioned, it makes sense to check the data structure of the output
> record and reduce its size or make it more lightweight to handle.
>
> Best,
>
> Zhijiang
>
> ------
> From: Gerard Garcia 
> Sent: Tuesday, July 17, 2018 21:53
> To: piotr 
> Cc: fhueske ; wangzhijiang999 <
> wangzhijiang...@aliyun.com>; user ; nico <
> n...@data-artisans.com>
> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
>
> Yes, I'm using Flink 1.5.0 and what I'm serializing is a really big record
> (probably too big, we have already started working to reduce its size)
> which consists of several case classes which have (among others) fields of
> type String.
>
> I attach a CPU profile of the thread stuck serializing. I also attach the
> memory and GC telemetry that the profiler shows (which maybe is more
> informative than the one recorded from the JVM metrics). Only one node was
> actually "doing something" all others had CPU usage near zero.
>
> The task is at the same time trying to perform a checkpoint but keeps
> failing. Would it make sense that the problem is that there is not enough
> memory available to perform the checkpoint so all operators are stuck
> waiting for it to finish, and at the same time, the operator stuck
> serializing is keeping all the memory so neither it nor the checkpoint can
> advance?
>
> I realized that I don't have a minimum pause between checkpoints so it is
> continuously trying. Maybe I can reduce the checkpoint timeout from the 10m
> default and introduce a minimum pause (e.g. 5m timeout and 5m minimum
> pause) and this way I could break the deadlock.
>
> Gerard
>
>
> On Tue, Jul 17, 2018 at 9:00 AM Piotr Nowojski 
> wrote:
> Hi,
>
> Thanks for the additional data. Just to make sure, are you using Flink
> 1.5.0?
>
> There are a couple of threads that seem to be looping in serialisation,
> while others are blocked and either waiting for new data or waiting for
> someone to consume some data. Could you debug or CPU profile the code, in
> particular focusing on threads with a stack trace like the one below [1]?
> Aren't you trying to serialise some gigantic String?
>
> Piotrek
>
> [1]:
>
> "(...) (19/26)" #2737 prio=5 os_prio=0 tid=0x7f52584d2800 nid=0x6819
> runnable [0x7f451a843000]
>java.lang.Thread.State: RUNNABLE
> at
> org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:133)
> at org.apache.flink.types.StringValue.writeString(StringValue.java:812)
> at
> org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:64)
> at
> org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.se

Re: When a jobmanager fails, it doesn't restart because it tries to restart non existing tasks

2018-07-23 Thread Gerard Garcia
o.a.f.s.c.o.a.curator.framework.state.ConnectionStateManager  - State
change: CONNECTED
11:29:17.764 [main] INFO  org.apache.flink.runtime.rest.RestClient  - Rest
client endpoint started.
11:29:17.766 [main] INFO
o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - Starting
ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
11:29:17.812 [main] INFO
o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - Starting
ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
11:29:18.007 [main] INFO  org.apache.flink.runtime.rest.RestClient  -
Shutting down rest endpoint.
11:29:18.008 [main] INFO  org.apache.flink.runtime.rest.RestClient  - Rest
endpoint shutdown complete.
11:29:18.008 [main] INFO
o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - Stopping
ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
11:29:18.009 [main] INFO
o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - Stopping
ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
11:29:18.010 [Curator-Framework-0] INFO
o.a.f.s.c.o.a.curator.framework.imps.CuratorFrameworkImpl  -
backgroundOperationsLoop exiting
11:29:18.030 [main] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Session:
0x100571bda1903c3 closed
11:29:18.030 [main-EventThread] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - EventThread
shut down for session: 0x100571bda1903c3
11:29:18.030 [main] INFO  org.apache.flink.client.cli.CliFrontend  -
Cancelled job e403893e5208ca47ace886a77e405291.

Gerard

On Fri, Jul 20, 2018 at 5:14 AM vino yang  wrote:

> Hi Till,
>
> You are right, we also saw the problem you described. Curator removes the
> specific job graph path asynchronously. But it's the only source of truth
> when recovering, right? Is there any plan to improve this?
>
> Thanks, vino.
>
> 2018-07-19 21:58 GMT+08:00 Till Rohrmann :
>
>> Hi Gerard,
>>
>> the logging statement `Removed job graph ... from ZooKeeper` is actually
>> not 100% accurate. The actual deletion is executed as an asynchronous
>> background task and the log statement is not printed in the callback (which
>> it should be). Therefore, the deletion could still have failed. In order to
>> see this, the full jobmanager/cluster entry point logs would be
>> tremendously helpful.
>>
>> Cheers,
>> Till
>>
>> On Thu, Jul 19, 2018 at 1:33 PM Gerard Garcia  wrote:
>>
>>> Thanks Andrey,
>>>
>>> That is the log from the jobmanager just after it has finished
>>> cancelling the task:
>>>
>>> 11:29:18.716 [flink-akka.actor.default-dispatcher-15695] INFO
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Stopping
>>> checkpoint coordinator for job e403893e5208ca47ace886a77e405291.
>>> 11:29:18.716 [flink-akka.actor.default-dispatcher-15695] INFO
>>> o.a.f.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Shutting down
>>> 11:29:18.738 [flink-akka.actor.default-dispatcher-15695] INFO
>>> o.a.f.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Removing
>>> /flink-eur/default/checkpoints/e403893e5208ca47ace886a77e405291 from
>>> ZooKeeper
>>> 11:29:18.780 [flink-akka.actor.default-dispatcher-15695] INFO
>>> o.a.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter  - Shutting down.
>>> 11:29:18.780 [flink-akka.actor.default-dispatcher-15695] INFO
>>> o.a.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter  - Removing
>>> /checkpoint-counter/e403893e5208ca47ace886a77e405291 from ZooKeeper
>>> 11:29:18.827 [flink-akka.actor.default-dispatcher-15695] INFO
>>> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Job
>>> e403893e5208ca47ace886a77e405291 reached globally terminal state CANCELED.
>>> 11:29:18.846 [flink-akka.actor.default-dispatcher-15675] INFO
>>> org.apache.flink.runtime.jobmaster.JobMaster  - Stopping the JobMaster for
>>> job (...)(e403893e5208ca47ace886a77e405291).
>>> 11:29:18.848 [flink-akka.actor.default-dispatcher-15675] INFO
>>> o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - Stopping
>>> ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
>>> 11:29:18.864 [flink-akka.actor.default-dispatcher-15675] INFO
>>> org.apache.flink.runtime.jobmaster.JobMaster  - Close ResourceManager
>>> connection d5fbc30a895066054e29fb2fd60fb0f1: JobManager is shutting down..
>>> 11:29:18.864 [flink-akka.actor.default-dispatcher-15695] INFO
>>> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Suspending SlotPool.
>>> 11:29:18.864 [flink-akka.actor.default-dispatcher-15695] INFO
>>> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Stopping SlotPool.
>>> 11:29:18.864 [flink-akka.actor.default-dispatcher-1

Re: When a jobmanager fails, it doesn't restart because it tries to restart non existing tasks

2018-07-19 Thread Gerard Garcia
Thanks Andrey,

That is the log from the jobmanager just after it has finished cancelling
the task:

11:29:18.716 [flink-akka.actor.default-dispatcher-15695] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Stopping
checkpoint coordinator for job e403893e5208ca47ace886a77e405291.
11:29:18.716 [flink-akka.actor.default-dispatcher-15695] INFO
o.a.f.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Shutting down
11:29:18.738 [flink-akka.actor.default-dispatcher-15695] INFO
o.a.f.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Removing
/flink-eur/default/checkpoints/e403893e5208ca47ace886a77e405291 from
ZooKeeper
11:29:18.780 [flink-akka.actor.default-dispatcher-15695] INFO
o.a.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter  - Shutting down.
11:29:18.780 [flink-akka.actor.default-dispatcher-15695] INFO
o.a.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter  - Removing
/checkpoint-counter/e403893e5208ca47ace886a77e405291 from ZooKeeper
11:29:18.827 [flink-akka.actor.default-dispatcher-15695] INFO
org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Job
e403893e5208ca47ace886a77e405291 reached globally terminal state CANCELED.
11:29:18.846 [flink-akka.actor.default-dispatcher-15675] INFO
org.apache.flink.runtime.jobmaster.JobMaster  - Stopping the JobMaster for
job (...)(e403893e5208ca47ace886a77e405291).
11:29:18.848 [flink-akka.actor.default-dispatcher-15675] INFO
o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - Stopping
ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
11:29:18.864 [flink-akka.actor.default-dispatcher-15675] INFO
org.apache.flink.runtime.jobmaster.JobMaster  - Close ResourceManager
connection d5fbc30a895066054e29fb2fd60fb0f1: JobManager is shutting down..
11:29:18.864 [flink-akka.actor.default-dispatcher-15695] INFO
org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Suspending SlotPool.
11:29:18.864 [flink-akka.actor.default-dispatcher-15695] INFO
org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Stopping SlotPool.
11:29:18.864 [flink-akka.actor.default-dispatcher-15688] INFO
o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Disconnect
job manager 
9cf221e2340597629fb932c03aa14...@akka.tcp://flink@(...):33827/user/jobmanager_9
for job e403893e5208ca47ace886a77e405291 from the resource manager.
11:29:18.864 [flink-akka.actor.default-dispatcher-15675] INFO
o.a.f.runtime.leaderelection.ZooKeeperLeaderElectionService  - Stopping
ZooKeeperLeaderElectionService
ZooKeeperLeaderElectionService{leaderPath='/leader/e403893e5208ca47ace886a77e405291/job_manager_lock'}.
11:29:18.980 [flink-akka.actor.default-dispatcher-15695] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed
checkpoint 31154 for job 5d8c376b10d358b9c9470b3e70113626 (132520 bytes in
411 ms).
11:29:19.025 [flink-akka.actor.default-dispatcher-15683] INFO
o.a.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - Removed job
graph e403893e5208ca47ace886a77e405291 from ZooKeeper.


At the end it says it removed job graph e403893e5208ca47ace886a77e405291 from
ZooKeeper, but I can still see it at /flink/default/jobgraphs:

[zk: localhost:2181(CONNECTED) 14] ls
/flink/default/jobgraphs/e403893e5208ca47ace886a77e405291
[3fe9c3c8-5bec-404e-a720-75f9b188124f, 36208299-0f6d-462c-bae4-2e3d53f50e8c]

Gerard

On Wed, Jul 18, 2018 at 4:24 PM Andrey Zagrebin 
wrote:

> Hi Gerard,
>
> There is an issue recently fixed for 1.5.2, 1.6.0:
> https://issues.apache.org/jira/browse/FLINK-9575
> It might have caused your problem.
>
> Can you please provide log from JobManager/Entry point for further
> investigation?
>
> Cheers,
> Andrey
>
> On 18 Jul 2018, at 10:16, Gerard Garcia  wrote:
>
> Hi vino,
>
> It seems that job ids stay in /jobgraphs when we cancel jobs manually. For
> example, after cancelling the job with id 75e16686cb4fe0d33ead8e29af131d09
> the entry is still in ZooKeeper's path /flink/default/jobgraphs, but the
> job disappeared from /home/nas/flink/ha/default/blob/.
>
> That is the client log:
>
> 09:20:58.492 [main] INFO  org.apache.flink.client.cli.CliFrontend  -
> Cancelling job 75e16686cb4fe0d33ead8e29af131d09.
> 09:20:58.503 [main] INFO
> org.apache.flink.runtime.blob.FileSystemBlobStore  - Creating highly
> available BLOB storage directory at
> file:///home/nas/flink/ha//default/blob
> 09:20:58.505 [main] INFO  org.apache.flink.runtime.util.ZooKeeperUtils  -
> Enforcing default ACL for ZK connections
> 09:20:58.505 [main] INFO  org.apache.flink.runtime.util.ZooKeeperUtils  -
> Using '/flink-eur/default' as Zookeeper namespace.
> 09:20:58.539 [main] INFO
> o.a.f.s.c.o.a.curator.framework.imps.CuratorFrameworkImpl  - Starting
> 09:20:58.543 [main] INFO
> o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client
> environment:zookeeper.version=
> 3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, b

Re: When a jobmanager fails, it doesn't restart because it tries to restart non existing tasks

2018-07-18 Thread Gerard Garcia
Hi vino,

It seems that job ids stay in /jobgraphs when we cancel jobs manually. For
example, after cancelling the job with id 75e16686cb4fe0d33ead8e29af131d09
the entry is still in ZooKeeper's path /flink/default/jobgraphs, but the
job disappeared from /home/nas/flink/ha/default/blob/.

That is the client log:

09:20:58.492 [main] INFO  org.apache.flink.client.cli.CliFrontend  -
Cancelling job 75e16686cb4fe0d33ead8e29af131d09.
09:20:58.503 [main] INFO
org.apache.flink.runtime.blob.FileSystemBlobStore  - Creating highly
available BLOB storage directory at file:///home/nas/flink/ha//default/blob
09:20:58.505 [main] INFO  org.apache.flink.runtime.util.ZooKeeperUtils  -
Enforcing default ACL for ZK connections
09:20:58.505 [main] INFO  org.apache.flink.runtime.util.ZooKeeperUtils  -
Using '/flink-eur/default' as Zookeeper namespace.
09:20:58.539 [main] INFO
o.a.f.s.c.o.a.curator.framework.imps.CuratorFrameworkImpl  - Starting
09:20:58.543 [main] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client
environment:zookeeper.version=
3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13
GMT
09:20:58.543 [main] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client
environment:host.name=flink-eur-production1
09:20:58.543 [main] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client
environment:java.version=1.8.0_131
09:20:58.544 [main] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client
environment:java.vendor=Oracle Corporation
09:20:58.546 [main] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client
environment:java.home=/opt/jdk/jdk1.8.0_131/jre
09:20:58.546 [main] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client
environment:java.class.path=/opt/flink/flink-1.5.0/lib/commons-httpclient-3.1.jar:/opt/flink/flink-1.5.0/lib/flink-metrics-statsd-1.5.0.jar:/opt/flink/flink-1.5.0/lib/flink-python_2.11-1.5.0.jar:/opt/flink/flink-1.5.0/lib/fluency-1.8.0.jar:/opt/flink/flink-1.5.0/lib/gcs-connector-latest-hadoop2.jar:/opt/flink/flink-1.5.0/lib/hadoop-openstack-2.7.1.jar:/opt/flink/flink-1.5.0/lib/jackson-annotations-2.8.0.jar:/opt/flink/flink-1.5.0/lib/jackson-core-2.8.10.jar:/opt/flink/flink-1.5.0/lib/jackson-databind-2.8.11.1.jar:/opt/flink/flink-1.5.0/lib/jackson-dataformat-msgpack-0.8.15.jar:/opt/flink/flink-1.5.0/lib/log4j-1.2.17.jar:/opt/flink/flink-1.5.0/lib/log4j-over-slf4j-1.7.25.jar:/opt/flink/flink-1.5.0/lib/logback-classic-1.2.3.jar:/opt/flink/flink-1.5.0/lib/logback-core-1.2.3.jar:/opt/flink/flink-1.5.0/lib/logback-more-appenders-1.4.2.jar:/opt/flink/flink-1.5.0/lib/msgpack-0.6.12.jar:/opt/flink/flink-1.5.0/lib/msgpack-core-0.8.15.jar:/opt/flink/flink-1.5.0/lib/phi-accural-failure-detector-0.0.4.jar:/opt/flink/flink-1.5.0/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/flink-1.5.0/lib/flink-dist_2.11-1.5.0.jar:::
09:20:58.546 [main] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client
environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
09:20:58.546 [main] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client
environment:java.io.tmpdir=/tmp
09:20:58.546 [main] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client
environment:java.compiler=
09:20:58.547 [main] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client
environment:os.name=Linux
09:20:58.547 [main] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client
environment:os.arch=amd64
09:20:58.547 [main] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client
environment:os.version=4.9.87--std-ipv6-64
09:20:58.547 [main] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client
environment:user.name=root
09:20:58.547 [main] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client
environment:user.home=/root
09:20:58.547 [main] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client
environment:user.dir=/opt/flink/flink-1.5.0/bin
09:20:58.548 [main] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Initiating
client connection, connectString=10.1.1.5:2181,10.1.1.6:2181,10.1.1.7:2181
sessionTimeout=6
watcher=org.apache.flink.shaded.curator.org.apache.curator.ConnectionState@4a003cbe
09:20:58.555 [main-SendThread(10.1.1.5:2181)] WARN
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - SASL
configuration failed: javax.security.auth.login.LoginException: No JAAS
configuration section named 'Client' was found in specified JAAS
configuration file: '/tmp/jaas-9143038863636945274.conf'. Will continue
connection to Zookeeper server without SASL authentication, if Zookeeper
server allows it.
09:20:58.556 [main-SendThread(10.1.1.5:2181)] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening
socket connection to server 10.1.1.5/10.1.1.5:2181
09:20:58.556 [main-EventThread] ERROR

Re: Flink job hangs/deadlocks (possibly related to out of memory)

2018-07-13 Thread Gerard Garcia
Hi Zhijiang,

The problem is that no other task failed first. We have a task that
sometimes just stops processing data, and when we cancel it, we see the
log messages saying:

" Task (...) did not react to cancelling signal for 30 seconds, but is
stuck in method:
org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:133)
org.apache.flink.types.StringValue.writeString(StringValue.java:802)
(...)"

That is why we suspect that it hangs forever at that point and that is why
it stops processing data. I don't see any increase in memory use in the
heap (I guess because these buffers are managed by Flink), so I'm not sure
if that is really the problem.

Gerard

On Tue, Jul 3, 2018 at 6:15 AM Zhijiang(wangzhijiang999) <
wangzhijiang...@aliyun.com> wrote:

> Hi Gerard,
>
> I think you can check the job manager log to find which task failed
> first, and then trace the task manager log containing the failed task to
> find the initial reason.
> The failed task will trigger canceling all the other tasks, and during the
> canceling process, the blocked task that is waiting for an output buffer
> cannot be interrupted by the
> canceler thread, which is what is shown in your description. So I think the
> cancel process is not the key point and is expected. Maybe it did not cause
> an OOM at all.
> If the task cannot be interrupted during canceling, the task manager process
> will finally be exited to trigger restarting the job.
>
> Zhijiang
>
> ----------
> From: Gerard Garcia 
> Sent: Monday, July 2, 2018 18:29
> To: wangzhijiang999 
> Cc: user 
> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
>
> Thanks Zhijiang,
>
> We haven't found any other relevant log messages anywhere. These traces
> belong to the unresponsive task; that is why we suspect that at some point
> it did not have enough memory to serialize the message and it blocked. I've
> also found that when it hung several output buffers were full (see the
> attached image buffers.outPoolUsage.png), so I guess the traces just reflect
> that.
>
> Probably the task hung for some other reason and that is what filled the
> output buffers upstream of the blocked operator. I'll have to continue
> investigating to find the real cause.
>
> Gerard
>
>
>
>
> On Mon, Jul 2, 2018 at 9:50 AM Zhijiang(wangzhijiang999) <
> wangzhijiang...@aliyun.com> wrote:
>  Hi Gerard,
>
> The below stack only indicates that the task was canceled, which
> may have been triggered by the job manager because of another task's failure.
> If the task cannot be interrupted within the timeout config, the task manager
> process will be exited. Do you see any OutOfMemory messages in the task
> manager log?
> Normally the output serialization buffer is managed by the task manager
> framework and will not cause an OOM; on the input deserialization side,
> there is a temporary byte array on each channel for holding partial
> records which is not managed by the framework. I think you can confirm
> whether and where the OOM occurred. Maybe check the task failure logs.
>
> Zhijiang
>
> --
> From: gerardg 
> Sent: Saturday, June 30, 2018 00:12
> To: user 
> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
>
> (fixed formatting)
>
> Hello,
>
> We have experienced some problems where a task just hangs without showing
> any kind of log error while other tasks running in the same task manager
> continue without problems. When these tasks are restarted the task manager
> gets killed and shows several errors similar to these ones:
>
> [Canceler/Interrupts for (...)' did not react to cancelling signal for 30
> seconds, but is stuck in method:
> java.nio.ByteBuffer.wrap(ByteBuffer.java:373)
> java.nio.ByteBuffer.wrap(ByteBuffer.java:396)
> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:330)
> org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212)
> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerialize

Re: Get which key groups are assigned to an operator

2018-02-20 Thread Gerard Garcia
You are right that probably the best solution would be to be able to use
different state backends for different operators; I hope it gets
implemented at some point. Meanwhile I'll take a look at the methods in
org.apache.flink.runtime.state.KeyGroupRangeAssignment; maybe I can find a
workaround that is good enough for me.
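
For reference, a minimal sketch of the kind of lookup those methods allow
(maxParallelism, parallelism, and the key are illustrative values):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment

object KeyGroupSketch {
  def main(args: Array[String]): Unit = {
    val maxParallelism = 128 // illustrative
    val parallelism = 8      // illustrative
    val key = "some-key"     // illustrative

    // Which key group does this key hash into?
    val keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism)
    // Which operator subtask owns that key group?
    val subtask = KeyGroupRangeAssignment
      .computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup)
    // The full range of key groups owned by that subtask.
    val range = KeyGroupRangeAssignment
      .computeKeyGroupRangeForOperatorIndex(maxParallelism, parallelism, subtask)

    println(s"key group $keyGroup -> subtask $subtask, owning $range")
  }
}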

Thanks,

Gerard

On Tue, Feb 20, 2018 at 3:56 PM, Stefan Richter <s.rich...@data-artisans.com
> wrote:

> Hi,
>
> ok, now I understand your goal a bit better. I would still like to point
> out that it may take a bit more effort than it looks. Just to name one
> example, you probably also want to support asynchronous snapshots, which is
> most likely difficult when using just a hashmap. I think the proper
> solution for you (and also something that we are considering to support in
> the future) is that different backends could be supported for different
> operators in a job. But that is currently not possible. I still want to
> answer your other question: you could currently compute all things about
> key-groups and their assignment to operators by using the methods
> from org.apache.flink.runtime.state.KeyGroupRangeAssignment.
>
> Best,
> Stefan
>
>
> Am 20.02.2018 um 14:52 schrieb Gerard Garcia <ger...@talaia.io>:
>
> Hi Stefan, thanks
>
> Yes, we are also using keyed state in other operators. The problem is that
> serialization is quite expensive, and in some of them we would prefer to
> avoid it by storing the state in memory (for our use case, one specific
> operator with in-memory state gives at least a 30% throughput improvement).
> When we are not operating on a keyed stream it is easy: basically all the
> operators have the same in-memory state. What we would like to do is the
> same but when we are operating on a keyed stream. Does it make more sense
> now?
>
> We are using RocksDB as the state backend, and as far as I know elements
> always get serialized when stored in the state, and I'm not sure if there is
> even some disk access (maybe not synchronous) that could hurt performance.
>
> Gerard
>
> On Tue, Feb 20, 2018 at 2:42 PM, Stefan Richter <
> s.rich...@data-artisans.com> wrote:
>
>> Hi,
>>
>> from what I read, I get the impression that you are attempting to implement
>> your own "keyed state" with a hashmap? Why not use the keyed state that is
>> already provided by Flink and gives you efficient rescaling etc. out of the
>> box? Please see [1] for the details.
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/
>> dev/stream/state/state.html#using-managed-keyed-state
>>
>> Am 20.02.2018 um 13:44 schrieb gerardg <ger...@talaia.io>:
>>
>> Hello,
>>
>> To improve performance we keep "keyed state" in the operator's memory;
>> basically we keep a Map which contains the state for each of the keys. The
>> problem comes when we want to restore the state after a failure or after
>> rescaling the operator. What we are doing is sending the concatenation of
>> all the state to every operator using a union redistribution, and then we
>> restore the "in-memory state" every time we see a new key. Then, after a
>> while, we just clear the redistributed state. This is somewhat complex and
>> prone to errors, so we would like to find an alternative way of doing this.
>>
>> As far as I know, Flink knows which keys belong to each operator
>> (by distributing key groups), so I guess it would be possible to somehow
>> calculate the key group id from each of the stored keys and restore the in
>> memory
>> state at once if we could access the key group mapping. Is that
>> possible? We could patch Flink if necessary to access that information.
>>
>> Thanks,
>>
>> Gerard
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>>
>>
>
>


Re: Get which key groups are assigned to an operator

2018-02-20 Thread Gerard Garcia
Hi Stefan, thanks

Yes, we are also using keyed state in other operators. The problem is that
serialization is quite expensive, and in some of them we would prefer to
avoid it by storing the state in memory (for our use case, one specific
operator with in-memory state gives at least a 30% throughput improvement).
When we are not operating on a keyed stream it is easy: basically all the
operators have the same in-memory state. What we would like to do is the
same but when we are operating on a keyed stream. Does it make more sense
now?

We are using RocksDB as the state backend, and as far as I know elements
always get serialized when stored in the state, and I'm not sure if there is
even some disk access (maybe not synchronous) that could hurt performance.
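
For comparison, a minimal sketch of the managed keyed state Stefan points to in
his reply below, assuming a hypothetical (String, Long) input keyed by the
first field; with a heap-based state backend the stored objects stay on the
heap, while RocksDB (de)serializes them on every access:

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Counts elements per key; used as stream.keyBy(_._1).flatMap(new CountPerKey).
class CountPerKey extends RichFlatMapFunction[(String, Long), (String, Long)] {
  @transient private var count: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    count = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("count", classOf[java.lang.Long]))
  }

  override def flatMap(in: (String, Long), out: Collector[(String, Long)]): Unit = {
    // Empty state returns null; start from zero in that case.
    val next = (if (count.value() == null) 0L else count.value().longValue()) + 1
    count.update(next)
    out.collect((in._1, next))
  }
}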

Gerard

On Tue, Feb 20, 2018 at 2:42 PM, Stefan Richter  wrote:

> Hi,
>
> from what I read, I get the impression that you are attempting to implement
> your own "keyed state" with a hashmap? Why not use the keyed state that is
> already provided by Flink and gives you efficient rescaling etc. out of the
> box? Please see [1] for the details.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> master/dev/stream/state/state.html#using-managed-keyed-state
>
> Am 20.02.2018 um 13:44 schrieb gerardg :
>
> Hello,
>
> To improve performance we keep "keyed state" in the operator's memory;
> basically we keep a Map which contains the state for each of the keys. The
> problem comes when we want to restore the state after a failure or after
> rescaling the operator. What we are doing is sending the concatenation of
> all the state to every operator using a union redistribution, and then we
> restore the "in-memory state" every time we see a new key. Then, after a
> while, we just clear the redistributed state. This is somewhat complex and
> prone to errors, so we would like to find an alternative way of doing this.
>
> As far as I know, Flink knows which keys belong to each operator
> (by distributing key groups), so I guess it would be possible to somehow
> calculate the key group id from each of the stored keys and restore the
> in-memory state at once if we could access the key group mapping. Is that
> possible? We could patch Flink if necessary to access that information.
>
> Thanks,
>
> Gerard
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>
>
>


Re: Kafka topic partition skewness causes watermark not being emitted

2017-12-13 Thread Gerard Garcia
Thanks Gordon.

Don't worry, I'll be careful not to have empty partitions until the next
release.
Also, I'll keep an eye on FLINK-5479, and if at some point I see that there
is a fix and the issue bothers us too much, I'll try to apply the patch
myself to the latest stable release.
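
For reference, a minimal sketch of the "marker event" idea from the reply
quoted below: a hypothetical Record type where markers carry only a timestamp,
and a punctuated assigner that turns every marker into a watermark (producing
the markers into every partition is a separate piece, not shown here):

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Hypothetical record type: isMarker = true means "watermark carrier only".
case class Record(isMarker: Boolean, eventTime: Long)

class MarkerAwareAssigner extends AssignerWithPunctuatedWatermarks[Record] {
  override def extractTimestamp(element: Record, previousTimestamp: Long): Long =
    element.eventTime

  // Emit a watermark whenever a marker is seen, so otherwise-idle partitions
  // still advance event time.
  override def checkAndGetNextWatermark(lastElement: Record, extractedTimestamp: Long): Watermark =
    if (lastElement.isMarker) new Watermark(extractedTimestamp) else null
}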

Gerard

On Wed, Dec 13, 2017 at 10:31 AM, Tzu-Li (Gordon) Tai 
wrote:

> Hi,
>
> I've just elevated FLINK-5479 to BLOCKER for 1.5.
>
> Unfortunately, AFAIK there is no easy workaround solution for this issue
> yet in the releases so far.
> The min watermark logic that controls per-partition watermark emission is
> hidden inside the consumer, making it hard to work around it.
>
> One possible solution I can imagine, but perhaps not that trivial, is to
> inject some special marker event into all partitions periodically.
> The watermark assigner should be able to recognize this special marker and
> try to provide some watermark for it.
> Another option is that I can provide some patch you can apply for a custom
> build of the Kafka connector that handles partition idleness properly.
> However, given that we're aiming for a faster release cycle for Flink 1.5
> (proposed release date is Feb. 2018), it might not be worth the extra
> maintenance effort on your side of a custom build.
>
> Best,
> Gordon
>
>
> On Tue, Dec 12, 2017 at 9:28 PM, gerardg  wrote:
>
>> I'm also affected by this behavior. There are no updates in FLINK-5479 but
>> did you manage to find a way to work around this?
>>
>> Thanks,
>>
>> Gerard
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>
>


Re: Missing checkpoint when restarting failed job

2017-11-28 Thread Gerard Garcia
I've been monitoring the task and checkpoint 1 never gets deleted. Right
now we have:

chk-1  chk-1222  chk-326  chk-329  chk-357  chk-358  chk-8945  chk-8999
chk-9525  chk-9788  chk-9789  chk-9790  chk-9791

I made the task fail and it recovered without problems, so for now I would
say that the problem was with the distributed file system or that somehow the
chk-1 folder got deleted by something external to Flink. If I see the
problem again I will try to get more information.
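
For context, incremental checkpointing (which is what makes later checkpoints
reference files from earlier ones) is enabled on the RocksDB backend roughly
like this; the checkpoint URI and interval are placeholders and this is just a
sketch, not our actual configuration:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala._

object IncrementalCheckpointsSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // The second constructor argument enables incremental checkpoints.
    env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink-checkpoints", true))
    env.enableCheckpointing(60 * 1000) // illustrative interval
  }
}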

Thanks,

Gerard

On Tue, Nov 21, 2017 at 4:27 PM, Stefan Richter  wrote:

> Ok, thanks for trying to reproduce this. If possible, could you also
> activate trace-level logging for class 
> org.apache.flink.runtime.state.SharedStateRegistry?
> In case the problem occurs, this would greatly help to understand what was
> going on.
>
> > Am 21.11.2017 um 15:16 schrieb gerardg :
> >
> >> where exactly did you read many times that incremental checkpoints
> cannot
> > reference files from previous
> >> checkpoints, because we would have to correct that information. In fact,
> >> this is how incremental checkpoints work.
> >
> > My fault, I read it in some other posts in the mailing list but now that
> I
> > read it carefully it meant savepoints not checkpoints.
> >
> >> Now for this case, I would consider it extremely unlikely that a
> >> checkpoint 1620 would still reference a checkpoint 1,
> >> in particular if the files for that checkpoint are already deleted,
> which
> >> should only happen if it is no longer
> >> referenced. Which version of Flink are you using and what is your
> >> distributed filesystem? Is there any way to
> >> reproduce the problem?
> >
> > We are using Flink version 1.3.2 and GlusterFS.  There are usually a few
> > checkpoints around at the same time, for example right now:
> >
> > chk-1  chk-26  chk-27  chk-28  chk-29  chk-30  chk-31
> >
> > I'm not sure how to reproduce the problem but I'll monitor the folder to
> see
> > when chk-1 gets deleted and try to make the task fail when that happens.
> >
> > Gerard
> >
> > Gerard
> >
> >
> >
> >
> > --
> > Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>
>