Re: Filescheme GS not found sometimes - inconsistent exceptions for reading from GCS

2018-09-13 Thread Encho Mishinev
Hi Nico,

Unfortunately I can't share any of the data, but it isn't even data being
processed at the point of failure - the job is still in the
matching-files-from-GCS phase.

I am using Apache Beam's FileIO to match files, and the failure above occurs
during one of those match steps.
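
For reference, the matching step looks roughly like the sketch below (the
file pattern, options handling and class name are placeholders rather than my
actual code):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class MatchFilesSketch {
  public static void main(String[] args) {
    // Placeholder options; in the real job these come from the job arguments.
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // The step that fails: match a file pattern on GCS (placeholder bucket/pattern).
    PCollection<MatchResult.Metadata> matches =
        pipeline.apply("Match files from GCS",
            FileIO.match().filepattern("gs://my-bucket/input/*"));

    pipeline.run().waitUntilFinish();
  }
}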

Currently, when a taskmanager shows this error, I reset that taskmanager and
restart the job. That works well enough, since the failure only occurs at the
beginning of the job. It seems to be a problem within specific taskmanagers,
which is very odd considering that they are all created by the same
Kubernetes deployment, i.e. they should be completely identical. Sometimes I
have to restart 3-4 of them until I have a working cluster.

I will try setting the temporary directory to something other than the
default - it can't hurt.
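
Concretely, I plan to add something like this to the taskmanagers'
flink-conf.yaml (the path is just an example of an ordinary local directory):

# example only - any plain local filesystem path should work here
io.tmp.dirs: /tmp/flink-io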

Thanks for the help,
Encho

On Thu, Sep 13, 2018 at 3:41 PM Nico Kruber  wrote:

> Hi Encho,
> the SpillingAdaptiveSpanningRecordDeserializer that you see in your
> stack trace is executed while reading input records from another task.
> If the (serialized) records are too large (> 5MiB), it will write and
> assemble them in a spilling channel, i.e. on disk, instead of using
> memory. This will use the temporary directories specified via
> "io.tmp.dirs" (or "taskmanager.tmp.dirs") which defaults to
> System.getProperty("java.io.tmpdir").
> -> These paths must actually be on an ordinary file system, not in gs://
> or so.
>
> The reason you only see this sporadically may be because not all your
> records are that big. It should, however, be deterministic in that it
> should always occur for the same record. Maybe something is wrong here
> and the record length is messed up, e.g. due to a bug in the
> de/serializer or the network stack.
>
> Do you actually have a minimal working example that shows this error and
> that you can share (either privately with me, or here)?
>
>
> Nico
>
> On 29/08/18 14:19, Encho Mishinev wrote:
> > Hello,
> >
> > I am using Flink 1.5.3 and executing jobs through Apache Beam 2.6.0. One
> > of my jobs involves reading from Google Cloud Storage which uses the
> > file scheme "gs://". Everything was fine but once in a while I would get
> > an exception that the scheme is not recognised. Now I've started seeing
> > them more often. It seems to be arbitrary - the exact same job with the
> > exact same parameters may finish successfully or throw this exception
> > and fail immediately. I can't figure out why it's not deterministic.
> > Here is the full exception logged upon the job failing:
> >
> > java.lang.Exception: The data preparation for task 'GroupReduce (GroupReduce at Match files from GCS/Via MatchAll/Reshuffle.ViaRandomKey/Reshuffle/GroupByKey)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: No filesystem found for scheme gs
> >   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:479)
> >   at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> >   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
> >   at java.lang.Thread.run(Thread.java:748)
> > Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: No filesystem found for scheme gs
> >   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:650)
> >   at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1108)
> >   at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
> >   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:473)
> >   ... 3 more
> > Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: No filesystem found for scheme gs
> >   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:831)
> > Caused by: java.lang.IllegalArgumentException: No filesystem found for scheme gs
> >   at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:459)
> >   at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:529)
> >   at org.apache.beam.sdk.io.fs.ResourceIdCoder.decode(ResourceIdCoder.java:49)
> >   at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:49)
> >   at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:30)
> >   at org.apache.beam.sdk.values.Timesta

Acknowledging Pubsub messages in Flink Runner

2018-09-10 Thread Encho Mishinev
Hello,

I have a simple question - when does the Flink Runner for Apache Beam
acknowledge Pubsub messages when using PubsubIO?
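
For context, the read is set up roughly like the sketch below (the
subscription name and class name are placeholders, not my actual code):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class PubsubReadSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Unbounded read from Pubsub; the subscription name is a placeholder.
    PCollection<PubsubMessage> messages =
        pipeline.apply("Read from Pubsub",
            PubsubIO.readMessages()
                .fromSubscription("projects/my-project/subscriptions/my-subscription"));

    pipeline.run();
  }
}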

Thanks


History Server in Kubernetes

2018-08-30 Thread Encho Mishinev
Hello,

I am struggling to figure out how to run a history server in Kubernetes. The
docker image takes an argument that starts either a jobmanager or a
taskmanager, but there is no such argument for a history server. What's the
best way to set one up in K8S?
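
In case it helps frame the question, the best I can think of is overriding
the container command so that the pod runs the history server script
directly - a rough sketch only (the script path, image tag and web port are
assumptions on my part, and the history server options would still have to be
provided via the mounted flink-conf.yaml):

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-history-server
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink-history-server
  template:
    metadata:
      labels:
        app: flink-history-server
    spec:
      containers:
        - name: history-server
          image: flink:1.5.3
          # Bypass the jobmanager/taskmanager entrypoint and run the
          # history server in the foreground instead.
          command: ["/opt/flink/bin/historyserver.sh", "start-foreground"]
          ports:
            - containerPort: 8082

Is something like that the intended approach?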

Thanks,
Encho


Filescheme GS not found sometimes - inconsistent exceptions for reading from GCS

2018-08-29 Thread Encho Mishinev
Hello,

I am using Flink 1.5.3 and executing jobs through Apache Beam 2.6.0. One of
my jobs involves reading from Google Cloud Storage which uses the file
scheme "gs://". Everything was fine but once in a while I would get an
exception that the scheme is not recognised. Now I've started seeing them
more often. It seems to be arbitrary - the exact same job with the exact
same parameters may finish successfully or throw this exception and fail
immediately. I can't figure out why it's not deterministic. Here is the
full exception logged upon the job failing:

java.lang.Exception: The data preparation for task 'GroupReduce (GroupReduce at Match files from GCS/Via MatchAll/Reshuffle.ViaRandomKey/Reshuffle/GroupByKey)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: No filesystem found for scheme gs
  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:479)
  at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: No filesystem found for scheme gs
  at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:650)
  at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1108)
  at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:473)
  ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: No filesystem found for scheme gs
  at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:831)
Caused by: java.lang.IllegalArgumentException: No filesystem found for scheme gs
  at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:459)
  at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:529)
  at org.apache.beam.sdk.io.fs.ResourceIdCoder.decode(ResourceIdCoder.java:49)
  at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:49)
  at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:30)
  at org.apache.beam.sdk.values.TimestampedValue$TimestampedValueCoder.decode(TimestampedValue.java:116)
  at org.apache.beam.sdk.values.TimestampedValue$TimestampedValueCoder.decode(TimestampedValue.java:88)
  at org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:124)
  at org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:60)
  at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
  at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
  at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
  at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
  at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
  at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
  at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:90)
  at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:103)
  at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:145)
  at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
  at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
  at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:105)
  at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
  at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
  at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
  at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1066)
  at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:827)


Any ideas why the behaviour is not deterministic regarding recognising
file system schemes?


Thanks,

Encho


Re: JobGraphs not cleaned up in HA mode

2018-08-29 Thread Encho Mishinev
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
2018-08-29 11:49:20,292 INFO  org.apache.flink.runtime.jobmaster.JobMaster
- Close ResourceManager connection
827b94881bf7c94d8516907e04e3a564: JobManager is shutting down..
2018-08-29 11:49:20,292 INFO
org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Suspending
SlotPool.
2018-08-29 11:49:20,293 INFO
org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Stopping
SlotPool.
2018-08-29 11:49:20,293 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
Disconnect job manager a3dab0a0883c5f0f37943358d9104d79
@akka.tcp://flink@flink-jobmanager-1:50010/user/jobmanager_0 for job
d69b67e4d28a2d244b06d3f6d661bca1 from the resource manager.
2018-08-29 11:49:20,293 INFO
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  -
Stopping ZooKeeperLeaderElectionService
ZooKeeperLeaderElectionService{leaderPath='/leader/d69b67e4d28a2d244b06d3f6d661bca1/job_manager_lock'}.
2018-08-29 11:49:20,304 INFO
org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
Removed job graph d69b67e4d28a2d244b06d3f6d661bca1 from ZooKeeper.


---

The result is:
- HDFS has only a jobgraph and an empty "default" folder - everything else is
cleared.
- ZooKeeper still contains the jobgraph that Jobmanager-1 claims, in the last
log line above, to have removed.
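
(For reference, I check ZooKeeper with the standard CLI, roughly like this -
the quorum address is a placeholder:

bin/zkCli.sh -server zookeeper:2181
ls /flink/default/jobgraphs

and the jobgraph node of the finished job still shows up under that path.)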

On Wed, Aug 29, 2018 at 12:14 PM Till Rohrmann  wrote:

> Hi Encho,
>
> it sounds strange that the standby JobManager tries to recover a submitted
> job graph. This should only happen if it has been granted leadership. Thus,
> it seems as if the standby JobManager thinks that it is also the leader.
> Could you maybe share the logs of the two JobManagers/ClusterEntrypoints
> with us?
>
> Running only a single JobManager/ClusterEntrypoint in HA mode via a
> Kubernetes Deployment should do the trick and there is nothing wrong with
> it.
>
> Cheers,
> Till
>
> On Wed, Aug 29, 2018 at 11:05 AM Encho Mishinev 
> wrote:
>
>> Hello,
>>
>> Since two job managers don't seem to be working for me I was thinking of
>> just using a single job manager in Kubernetes in HA mode with a deployment
>> ensuring its restart whenever it fails. Is this approach viable? The
>> High-Availability page mentions that you use only one job manager in an
>> YARN cluster but does not specify such option for Kubernetes. Is there
>> anything that can go wrong with this approach?
>>
>> Thanks
>>
>> On Wed, Aug 29, 2018 at 11:10 AM Encho Mishinev 
>> wrote:
>>
>>> Hi,
>>>
>>> Unfortunately the thing I described does indeed happen every time. As
>>> mentioned in the first email, I am running on Kubernetes so certain things
>>> could be different compared to just a standalone cluster.
>>>
>>> Any ideas for workarounds are welcome, as this problem basically
>>> prevents me from using HA.
>>>
>>> Thanks,
>>> Encho
>>>
>>> On Wed, Aug 29, 2018 at 5:15 AM vino yang  wrote:
>>>
>>>> Hi Encho,
>>>>
>>>> From your description, I feel that there are extra bugs.
>>>>
>>>> About your description:
>>>>
>>>> *- Start both job managers*
>>>> *- Start a batch job in JobManager 1 and let it finish*
>>>> *The jobgraphs in both Zookeeper and HDFS remained.*
>>>>
>>>> Is it necessarily happening every time?
>>>>
>>>> In the Standalone cluster, the problems we encountered were sporadic.
>>>>
>>>> Thanks, vino.
>>>>
>>>> Encho Mishinev  于2018年8月28日周二 下午8:07写道:
>>>>
>>>>> Hello Till,
>>>>>
>>>>> I spend a few more hours testing and looking at the logs and it seems
>>>>> like there's a more general problem here. While the two job managers are
>>>>> active neither of them can properly delete jobgraphs. The above problem I
>>>>> described comes from the fact that Kubernetes gets JobManager 1 quickly
>>>>> after I manually kill it, so when I stop the job on JobManager 2 both are
>>>>> alive.
>>>>>
>>>>> I did a very simple test:
>>>>>
>>>>> - Start both job managers
>>>>> - Start a batch job in JobManager 1 and let it finish
>>>>> The jobgraphs in both Zookeeper and HDFS remained.
>>>>>
>>>>> On the other hand if we do:
>>>>>
>>>>> - Start only JobManager 1 (again in HA mod

Re: JobGraphs not cleaned up in HA mode

2018-08-29 Thread Encho Mishinev
Hello,

Since running two job managers doesn't seem to be working for me, I was
thinking of just using a single job manager in Kubernetes in HA mode, with a
Deployment ensuring it is restarted whenever it fails. Is this approach
viable? The High Availability page mentions using only one job manager in a
YARN cluster but does not describe such an option for Kubernetes. Is there
anything that can go wrong with this approach?

Thanks

On Wed, Aug 29, 2018 at 11:10 AM Encho Mishinev 
wrote:

> Hi,
>
> Unfortunately the thing I described does indeed happen every time. As
> mentioned in the first email, I am running on Kubernetes so certain things
> could be different compared to just a standalone cluster.
>
> Any ideas for workarounds are welcome, as this problem basically prevents
> me from using HA.
>
> Thanks,
> Encho
>
> On Wed, Aug 29, 2018 at 5:15 AM vino yang  wrote:
>
>> Hi Encho,
>>
>> From your description, I feel that there are extra bugs.
>>
>> About your description:
>>
>> *- Start both job managers*
>> *- Start a batch job in JobManager 1 and let it finish*
>> *The jobgraphs in both Zookeeper and HDFS remained.*
>>
>> Is it necessarily happening every time?
>>
>> In the Standalone cluster, the problems we encountered were sporadic.
>>
>> Thanks, vino.
>>
>> Encho Mishinev  于2018年8月28日周二 下午8:07写道:
>>
>>> Hello Till,
>>>
>>> I spend a few more hours testing and looking at the logs and it seems
>>> like there's a more general problem here. While the two job managers are
>>> active neither of them can properly delete jobgraphs. The above problem I
>>> described comes from the fact that Kubernetes gets JobManager 1 quickly
>>> after I manually kill it, so when I stop the job on JobManager 2 both are
>>> alive.
>>>
>>> I did a very simple test:
>>>
>>> - Start both job managers
>>> - Start a batch job in JobManager 1 and let it finish
>>> The jobgraphs in both Zookeeper and HDFS remained.
>>>
>>> On the other hand if we do:
>>>
>>> - Start only JobManager 1 (again in HA mode)
>>> - Start a batch job and let it finish
>>> The jobgraphs in both Zookeeper and HDFS are deleted fine.
>>>
>>> It seems like the standby manager still leaves some kind of lock on the
>>> jobgraphs. Do you think that's possible? Have you seen a similar problem?
>>> The only logs that appear on the standby manager while waiting are of
>>> the type:
>>>
>>> 2018-08-28 11:54:10,789 INFO
>>> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
>>> Recovered SubmittedJobGraph(9e0a109b57511930c95d3b54574a66e3, null).
>>>
>>> Note that this log appears on the standby jobmanager immediately when a
>>> new job is submitted to the active jobmanager.
>>> Also note that the blobs and checkpoints are cleared fine. The problem
>>> is only for jobgraphs both in ZooKeeper and HDFS.
>>>
>>> Trying to access the UI of the standby manager redirects to the active
>>> one, so it is not a problem of them not knowing who the leader is. Do you
>>> have any ideas?
>>>
>>> Thanks a lot,
>>> Encho
>>>
>>> On Tue, Aug 28, 2018 at 10:27 AM Till Rohrmann 
>>> wrote:
>>>
>>>> Hi Encho,
>>>>
>>>> thanks a lot for reporting this issue. The problem arises whenever the
>>>> old leader maintains the connection to ZooKeeper. If this is the case, then
>>>> ephemeral nodes which we create to protect against faulty delete operations
>>>> are not removed and consequently the new leader is not able to delete the
>>>> persisted job graph. So one thing to check is whether the old JM still has
>>>> an open connection to ZooKeeper. The next thing to check is the session
>>>> timeout of your ZooKeeper cluster. If you stop the job within the session
>>>> timeout, then it is also not guaranteed that ZooKeeper has detected that
>>>> the ephemeral nodes of the old JM must be deleted. In order to understand
>>>> this better it would be helpful if you could tell us the timing of the
>>>> different actions.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Aug 28, 2018 at 8:17 AM vino yang 
>>>> wrote:
>>>>
>>>>> Hi Encho,
>>>>>
>>>>> A temporary solution can be used to determine if it has been cleaned
>>>>> up by monitoring the specific 

Re: JobGraphs not cleaned up in HA mode

2018-08-29 Thread Encho Mishinev
Hi,

Unfortunately the behaviour I described does indeed happen every time. As
mentioned in the first email, I am running on Kubernetes, so certain things
could be different compared to a plain standalone cluster.

Any ideas for workarounds are welcome, as this problem basically prevents
me from using HA.

Thanks,
Encho

On Wed, Aug 29, 2018 at 5:15 AM vino yang  wrote:

> Hi Encho,
>
> From your description, I feel that there are extra bugs.
>
> About your description:
>
> *- Start both job managers*
> *- Start a batch job in JobManager 1 and let it finish*
> *The jobgraphs in both Zookeeper and HDFS remained.*
>
> Is it necessarily happening every time?
>
> In the Standalone cluster, the problems we encountered were sporadic.
>
> Thanks, vino.
>
> Encho Mishinev  于2018年8月28日周二 下午8:07写道:
>
>> Hello Till,
>>
>> I spend a few more hours testing and looking at the logs and it seems
>> like there's a more general problem here. While the two job managers are
>> active neither of them can properly delete jobgraphs. The above problem I
>> described comes from the fact that Kubernetes gets JobManager 1 quickly
>> after I manually kill it, so when I stop the job on JobManager 2 both are
>> alive.
>>
>> I did a very simple test:
>>
>> - Start both job managers
>> - Start a batch job in JobManager 1 and let it finish
>> The jobgraphs in both Zookeeper and HDFS remained.
>>
>> On the other hand if we do:
>>
>> - Start only JobManager 1 (again in HA mode)
>> - Start a batch job and let it finish
>> The jobgraphs in both Zookeeper and HDFS are deleted fine.
>>
>> It seems like the standby manager still leaves some kind of lock on the
>> jobgraphs. Do you think that's possible? Have you seen a similar problem?
>> The only logs that appear on the standby manager while waiting are of the
>> type:
>>
>> 2018-08-28 11:54:10,789 INFO
>> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
>> Recovered SubmittedJobGraph(9e0a109b57511930c95d3b54574a66e3, null).
>>
>> Note that this log appears on the standby jobmanager immediately when a
>> new job is submitted to the active jobmanager.
>> Also note that the blobs and checkpoints are cleared fine. The problem is
>> only for jobgraphs both in ZooKeeper and HDFS.
>>
>> Trying to access the UI of the standby manager redirects to the active
>> one, so it is not a problem of them not knowing who the leader is. Do you
>> have any ideas?
>>
>> Thanks a lot,
>> Encho
>>
>> On Tue, Aug 28, 2018 at 10:27 AM Till Rohrmann 
>> wrote:
>>
>>> Hi Encho,
>>>
>>> thanks a lot for reporting this issue. The problem arises whenever the
>>> old leader maintains the connection to ZooKeeper. If this is the case, then
>>> ephemeral nodes which we create to protect against faulty delete operations
>>> are not removed and consequently the new leader is not able to delete the
>>> persisted job graph. So one thing to check is whether the old JM still has
>>> an open connection to ZooKeeper. The next thing to check is the session
>>> timeout of your ZooKeeper cluster. If you stop the job within the session
>>> timeout, then it is also not guaranteed that ZooKeeper has detected that
>>> the ephemeral nodes of the old JM must be deleted. In order to understand
>>> this better it would be helpful if you could tell us the timing of the
>>> different actions.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Aug 28, 2018 at 8:17 AM vino yang  wrote:
>>>
>>>> Hi Encho,
>>>>
>>>> As a temporary workaround, you can determine whether it has been cleaned
>>>> up by monitoring the specific JobID under Zookeeper's "/jobgraph".
>>>> Another option is to modify the source code and crudely change the cleanup
>>>> to a synchronous form, but Flink's operations on Zookeeper's paths need to
>>>> obtain the corresponding locks, so doing this is dangerous and it is not
>>>> recommended.
>>>> I think maybe this problem can be solved in the next version. It
>>>> depends on Till.
>>>>
>>>> Thanks, vino.
>>>>
>>>> Encho Mishinev  于2018年8月28日周二 下午1:17写道:
>>>>
>>>>> Thank you very much for the info! Will keep track of the progress.
>>>>>
>>>>> In the meantime is there any viable workaround? It seems like HA
>>>>> doesn't really work due to this bug.
>>>>>
>>>

Re: JobGraphs not cleaned up in HA mode

2018-08-28 Thread Encho Mishinev
Hello Till,

I spent a few more hours testing and looking at the logs, and it seems like
there's a more general problem here: while both job managers are active,
neither of them can properly delete jobgraphs. The problem I described above
comes from the fact that Kubernetes brings JobManager 1 back up quickly
after I manually kill it, so when I stop the job on JobManager 2 both
managers are alive.

I did a very simple test:

- Start both job managers
- Start a batch job in JobManager 1 and let it finish
The jobgraphs in both Zookeeper and HDFS remained.

On the other hand if we do:

- Start only JobManager 1 (again in HA mode)
- Start a batch job and let it finish
The jobgraphs in both Zookeeper and HDFS are deleted fine.

It seems like the standby manager still leaves some kind of lock on the
jobgraphs. Do you think that's possible? Have you seen a similar problem?
The only logs that appear on the standby manager while waiting are of the
type:

2018-08-28 11:54:10,789 INFO
org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
Recovered SubmittedJobGraph(9e0a109b57511930c95d3b54574a66e3, null).

Note that this log appears on the standby jobmanager immediately when a new
job is submitted to the active jobmanager.
Also note that the blobs and checkpoints are cleaned up fine - the problem is
only with the jobgraphs, both in ZooKeeper and in HDFS.

Trying to access the UI of the standby manager redirects to the active one,
so it is not a problem of them not knowing who the leader is. Do you have
any ideas?

Thanks a lot,
Encho

On Tue, Aug 28, 2018 at 10:27 AM Till Rohrmann  wrote:

> Hi Encho,
>
> thanks a lot for reporting this issue. The problem arises whenever the old
> leader maintains the connection to ZooKeeper. If this is the case, then
> ephemeral nodes which we create to protect against faulty delete operations
> are not removed and consequently the new leader is not able to delete the
> persisted job graph. So one thing to check is whether the old JM still has
> an open connection to ZooKeeper. The next thing to check is the session
> timeout of your ZooKeeper cluster. If you stop the job within the session
> timeout, then it is also not guaranteed that ZooKeeper has detected that
> the ephemeral nodes of the old JM must be deleted. In order to understand
> this better it would be helpful if you could tell us the timing of the
> different actions.
>
> Cheers,
> Till
>
> On Tue, Aug 28, 2018 at 8:17 AM vino yang  wrote:
>
>> Hi Encho,
>>
>> As a temporary workaround, you can determine whether it has been cleaned up
>> by monitoring the specific JobID under Zookeeper's "/jobgraph".
>> Another option is to modify the source code and crudely change the cleanup
>> to a synchronous form, but Flink's operations on Zookeeper's paths need to
>> obtain the corresponding locks, so doing this is dangerous and it is not
>> recommended.
>> I think maybe this problem can be solved in the next version. It depends
>> on Till.
>>
>> Thanks, vino.
>>
>> Encho Mishinev  于2018年8月28日周二 下午1:17写道:
>>
>>> Thank you very much for the info! Will keep track of the progress.
>>>
>>> In the meantime is there any viable workaround? It seems like HA doesn't
>>> really work due to this bug.
>>>
>>> On Tue, Aug 28, 2018 at 4:52 AM vino yang  wrote:
>>>
>>>> About some implementation mechanisms.
>>>> Flink uses Zookeeper to store JobGraph (Job's description information
>>>> and metadata) as a basis for Job recovery.
>>>> However, previous implementations may cause this information to not be
>>>> properly cleaned up because it is asynchronously deleted by a background
>>>> thread.
>>>>
>>>> Thanks, vino.
>>>>
>>>> vino yang  于2018年8月28日周二 上午9:49写道:
>>>>
>>>>> Hi Encho,
>>>>>
>>>>> This is a problem already known to the Flink community, you can track
>>>>> its progress through FLINK-10011[1], and currently Till is fixing this
>>>>> issue.
>>>>>
>>>>> [1]: https://issues.apache.org/jira/browse/FLINK-10011
>>>>>
>>>>> Thanks, vino.
>>>>>
>>>>> Encho Mishinev  于2018年8月27日周一 下午10:13写道:
>>>>>
>>>>>> I am running Flink 1.5.3 with two job managers and two task managers
>>>>>> in Kubernetes along with HDFS and Zookeeper in high-availability mode.
>>>>>>
>>>>>> My problem occurs after the following actions:
>>>>>> - Upload a .jar file to jobmanager-1
>>>>>> - Run a streaming job from the jar on jobmanager-1
>>>>

Re: JobGraphs not cleaned up in HA mode

2018-08-27 Thread Encho Mishinev
Thank you very much for the info! Will keep track of the progress.

In the meantime is there any viable workaround? It seems like HA doesn't
really work due to this bug.

On Tue, Aug 28, 2018 at 4:52 AM vino yang  wrote:

> About some implementation mechanisms.
> Flink uses Zookeeper to store JobGraph (Job's description information and
> metadata) as a basis for Job recovery.
> However, previous implementations may cause this information to not be
> properly cleaned up because it is asynchronously deleted by a background
> thread.
>
> Thanks, vino.
>
> vino yang  于2018年8月28日周二 上午9:49写道:
>
>> Hi Encho,
>>
>> This is a problem already known to the Flink community, you can track its
>> progress through FLINK-10011[1], and currently Till is fixing this issue.
>>
>> [1]: https://issues.apache.org/jira/browse/FLINK-10011
>>
>> Thanks, vino.
>>
>> Encho Mishinev  于2018年8月27日周一 下午10:13写道:
>>
>>> I am running Flink 1.5.3 with two job managers and two task managers in
>>> Kubernetes along with HDFS and Zookeeper in high-availability mode.
>>>
>>> My problem occurs after the following actions:
>>> - Upload a .jar file to jobmanager-1
>>> - Run a streaming job from the jar on jobmanager-1
>>> - Wait for 1 or 2 checkpoints to succeed
>>> - Kill pod of jobmanager-1
>>> After a short delay, jobmanager-2 takes leadership and correctly
>>> restores the job and continues it
>>> - Stop job from jobmanager-2
>>>
>>> At this point all seems well, but the problem is that jobmanager-2 does
>>> not clean up anything that was left from jobmanager-1. This means that both
>>> in HDFS and in Zookeeper remain job graphs, which later on obstruct any
>>> work of both managers as after any reset they unsuccessfully try to restore
>>> a non-existent job and fail over and over again.
>>>
>>> I am quite certain that jobmanager-2 does not know about any of
>>> jobmanager-1’s files since the Zookeeper logs reveal that it tries to
>>> duplicate job folders:
>>>
>>> 2018-08-27 13:11:00,038 [myid:] - INFO  [ProcessThread(sid:0
>>> cport:2181)::PrepRequestProcessor@648] - Got user-level KeeperException
>>> when processing sessionid:0x1657aa15e480033 type:create cxid:0x46
>>> zxid:0x1ab txntype:-1 reqpath:n/a Error
>>> Path:/flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15/bbb259fd-7826-4950-bc7c-c2be23346c77
>>> Error:KeeperErrorCode = NodeExists for
>>> /flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15/bbb259fd-7826-4950-bc7c-c2be23346c77
>>>
>>> 2018-08-27 13:11:02,296 [myid:] - INFO  [ProcessThread(sid:0
>>> cport:2181)::PrepRequestProcessor@648] - Got user-level KeeperException
>>> when processing sessionid:0x1657aa15e480033 type:create cxid:0x5c
>>> zxid:0x1ac txntype:-1 reqpath:n/a Error
>>> Path:/flink/default/checkpoint-counter/83bfa359ca59ce1d4635e18e16651e15
>>> Error:KeeperErrorCode = NodeExists for
>>> /flink/default/checkpoint-counter/83bfa359ca59ce1d4635e18e16651e15
>>>
>>> Also jobmanager-2 attempts to delete the jobgraphs folder in Zookeeper
>>> when the job is stopped, but fails since there are leftover files in it
>>> from jobmanager-1:
>>>
>>> 2018-08-27 13:12:13,406 [myid:] - INFO  [ProcessThread(sid:0
>>> cport:2181)::PrepRequestProcessor@648] - Got user-level KeeperException
>>> when processing sessionid:0x1657aa15e480033 type:delete cxid:0xa8
>>> zxid:0x1bd txntype:-1 reqpath:n/a Error
>>> Path:/flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15
>>> Error:KeeperErrorCode = Directory not empty for
>>> /flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15
>>>
>>> I’ve noticed that when restoring the job, it seems like jobmanager-2
>>> does not get anything more than jobID, while it perhaps needs some
>>> metadata? Here is the log that seems suspicious to me:
>>>
>>> 2018-08-27 13:09:18,113 INFO
>>> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
>>> Recovered SubmittedJobGraph(83bfa359ca59ce1d4635e18e16651e15, null).
>>>
>>> All other logs seem fine in jobmanager-2, it doesn’t seem to be aware
>>> that it’s overwriting anything or not deleting properly.
>>>
>>> My question is - what is the intended way for the job managers to
>>> correctly exchange metadata in HA mode and why is it not working for me?
>>>
>>> Thanks in advance!
>>
>>


JobGraphs not cleaned up in HA mode

2018-08-27 Thread Encho Mishinev
I am running Flink 1.5.3 with two job managers and two task managers in 
Kubernetes along with HDFS and Zookeeper in high-availability mode.
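
For reference, the relevant HA settings in my flink-conf.yaml look roughly
like this (the quorum address and storage directory are placeholders rather
than my exact values; the ZooKeeper paths correspond to the /flink/default/...
entries seen in the logs below):

high-availability: zookeeper
high-availability.zookeeper.quorum: zookeeper:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /default
high-availability.storageDir: hdfs:///flink/ha/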

My problem occurs after the following actions:
- Upload a .jar file to jobmanager-1
- Run a streaming job from the jar on jobmanager-1
- Wait for 1 or 2 checkpoints to succeed
- Kill pod of jobmanager-1
After a short delay, jobmanager-2 takes leadership and correctly restores the 
job and continues it
- Stop job from jobmanager-2

At this point all seems well, but the problem is that jobmanager-2 does not
clean up anything that was left behind by jobmanager-1. This means that job
graphs remain in both HDFS and Zookeeper, and they later obstruct any work of
both managers: after any restart they unsuccessfully try to restore a
non-existent job and fail over and over again.

I am quite certain that jobmanager-2 does not know about any of jobmanager-1’s 
files since the Zookeeper logs reveal that it tries to duplicate job folders:

2018-08-27 13:11:00,038 [myid:] - INFO  [ProcessThread(sid:0 
cport:2181)::PrepRequestProcessor@648] - Got user-level KeeperException when 
processing sessionid:0x1657aa15e480033 type:create cxid:0x46 zxid:0x1ab 
txntype:-1 reqpath:n/a Error 
Path:/flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15/bbb259fd-7826-4950-bc7c-c2be23346c77
 Error:KeeperErrorCode = NodeExists for 
/flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15/bbb259fd-7826-4950-bc7c-c2be23346c77

2018-08-27 13:11:02,296 [myid:] - INFO  [ProcessThread(sid:0 
cport:2181)::PrepRequestProcessor@648] - Got user-level KeeperException when 
processing sessionid:0x1657aa15e480033 type:create cxid:0x5c zxid:0x1ac 
txntype:-1 reqpath:n/a Error 
Path:/flink/default/checkpoint-counter/83bfa359ca59ce1d4635e18e16651e15 
Error:KeeperErrorCode = NodeExists for 
/flink/default/checkpoint-counter/83bfa359ca59ce1d4635e18e16651e15

Also jobmanager-2 attempts to delete the jobgraphs folder in Zookeeper when the 
job is stopped, but fails since there are leftover files in it from 
jobmanager-1:

2018-08-27 13:12:13,406 [myid:] - INFO  [ProcessThread(sid:0 
cport:2181)::PrepRequestProcessor@648] - Got user-level KeeperException when 
processing sessionid:0x1657aa15e480033 type:delete cxid:0xa8 zxid:0x1bd 
txntype:-1 reqpath:n/a Error 
Path:/flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15 
Error:KeeperErrorCode = Directory not empty for 
/flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15

I’ve noticed that when restoring the job, jobmanager-2 does not seem to get
anything more than the job ID, while it perhaps needs some metadata as well.
Here is the log line that seems suspicious to me:

2018-08-27 13:09:18,113 INFO  
org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - 
Recovered SubmittedJobGraph(83bfa359ca59ce1d4635e18e16651e15, null).

All other logs from jobmanager-2 seem fine; it doesn’t seem to be aware that
it’s overwriting anything or failing to delete properly.

My question is - what is the intended way for the job managers to correctly 
exchange metadata in HA mode and why is it not working for me?

Thanks in advance!