Re: Setting flink-conf params in IDE

2019-01-16 Thread Till Rohrmann
Hi Alexandru,

you can call `StreamExecutionEnvironment#createLocalEnvironment`, to which
you can pass a Flink configuration object.
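
For illustration, a minimal (untested) sketch using the values from your mail:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration conf = new Configuration();
conf.setString("metrics.reporter.jmx.class", "org.apache.flink.metrics.jmx.JMXReporter");
conf.setString("metrics.reporter.jmx.port", "8789");

StreamExecutionEnvironment env =
    StreamExecutionEnvironment.createLocalEnvironment(
        Runtime.getRuntime().availableProcessors(), conf);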

Cheers,
Till

On Wed, Jan 16, 2019 at 1:05 PM Alexandru Gutan 
wrote:

> Hi everyone!
>
> Is there a way to set flink-conf.yaml params but when running from the
> IDE?
>
> What I'm trying to do is to setup JMX metrics:
>
> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
> metrics.reporter.jmx.port: 8789
>
> Thanks!
>
>


Re: StreamingFileSink cannot get AWS S3 credentials

2019-01-16 Thread Till Rohrmann
I haven't configured this myself but I would guess that you need to set the
parameters defined here under S3A Authentication methods [1]. If the
environment variables don't work, then I would try to set the
authentication properties.

[1]
https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A
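
For illustration, one way this could look in flink-conf.yaml; treat it as an untested sketch (the property name comes from the Hadoop S3A documentation, and if I remember correctly Flink's bundled s3 filesystems mirror fs.s3a.* keys from flink-conf.yaml into their Hadoop configuration):

fs.s3a.aws.credentials.provider: com.amazonaws.auth.profile.ProfileCredentialsProvider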

Cheers,
Till

On Wed, Jan 16, 2019 at 11:09 AM Vinay Patil 
wrote:

> Hi Till,
>
> Can you please let us know the configurations that we need to set for
> Profile based credential provider in flink-conf.yaml
>
> Exporting AWS_PROFILE property on EMR did not work.
>
> Regards,
> Vinay Patil
>
>
> On Wed, Jan 16, 2019 at 3:05 PM Till Rohrmann 
> wrote:
>
>> The old BucketingSink was using Hadoop's S3 filesystem directly whereas
>> the new StreamingFileSink uses Flink's own FileSystem which need to be
>> configured via the flink-conf.yaml.
>>
>> Cheers,
>> Till
>>
>> On Wed, Jan 16, 2019 at 10:31 AM Vinay Patil 
>> wrote:
>>
>>> Hi Till,
>>>
>>> We are not providing `fs.s3a.access.key: access_key`,
>>> `fs.s3a.secret.key: secret_key` in flink-conf.yaml as we are using Profile
>>> based credentials provider. The older BucketingSink code is able to get the
>>> credentials and write to S3. We are facing this issue only with
>>> StreamingFileSink. We tried adding fs.s3a.impl to core-site.xml when the
>>> default configurations were not working.
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>>
>>> On Wed, Jan 16, 2019 at 2:55 PM Till Rohrmann 
>>> wrote:
>>>
>>>> Hi Vinay,
>>>>
>>>> Flink's file systems are self contained and won't respect the
>>>> core-site.xml if I'm not mistaken. Instead you have to set the credentials
>>>> in the flink configuration flink-conf.yaml via `fs.s3a.access.key:
>>>> access_key`, `fs.s3a.secret.key: secret_key` and so on [1]. Have you tried
>>>> this out?
>>>>
>>>> This has been fixed with Flink 1.6.2 and 1.7.0 [2].
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems.html#built-in-file-systems
>>>> [2] https://issues.apache.org/jira/browse/FLINK-10383
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Wed, Jan 16, 2019 at 10:10 AM Kostas Kloudas <
>>>> k.klou...@da-platform.com> wrote:
>>>>
>>>>> Hi Taher,
>>>>>
>>>>> So you are using the same configuration files and everything and the
>>>>> only thing you change is the "s3://" to "s3a://" and the sink cannot find
>>>>> the credentials?
>>>>> Could you please provide the logs of the Task Managers?
>>>>>
>>>>> Cheers,
>>>>> Kostas
>>>>>
>>>>> On Wed, Jan 16, 2019 at 9:13 AM Dawid Wysakowicz <
>>>>> dwysakow...@apache.org> wrote:
>>>>>
>>>>>> Forgot to cc ;)
>>>>>> On 16/01/2019 08:51, Vinay Patil wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Can someone please help on this issue. We have even tried to set
>>>>>> fs.s3a.impl in core-site.xml, still its not working.
>>>>>>
>>>>>> Regards,
>>>>>> Vinay Patil
>>>>>>
>>>>>>
>>>>>> On Fri, Jan 11, 2019 at 5:03 PM Taher Koitawala [via Apache Flink
>>>>>> User Mailing List archive.] 
>>>>>> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>  We have implemented S3 sink in the following way:
>>>>>>>
>>>>>>> StreamingFileSink sink= StreamingFileSink.forBulkFormat(new
>>>>>>> Path("s3a://mybucket/myfolder/output/"),
>>>>>>> ParquetAvroWriters.forGenericRecord(schema))
>>>>>>> .withBucketCheckInterval(50l).withBucketAssigner(new
>>>>>>> CustomBucketAssigner()).build();
>>>>>>>
>>>>>>> The problem we are facing is that StreamingFileSink is initializing
>>>>>>> S3AFileSystem class to write to s3 and is not able to find the s3
>>>>>>> credentials to write data, However other flink application on the
>>>>>>> same cluster use "s3://" paths are able to write data to the same s3 
>>>>>>> bucket
>>>>>>> and folders, we are only facing this issue with StreamingFileSink.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Taher Koitawala
>>>>>>> GS Lab Pune
>>>>>>> +91 8407979163
>>>>>>>
>>>>>>>
>>>>>>


Re: StreamingFileSink cannot get AWS S3 credentials

2019-01-16 Thread Till Rohrmann
The old BucketingSink was using Hadoop's S3 filesystem directly, whereas the
new StreamingFileSink uses Flink's own FileSystem, which needs to be
configured via flink-conf.yaml.

Cheers,
Till

On Wed, Jan 16, 2019 at 10:31 AM Vinay Patil 
wrote:

> Hi Till,
>
> We are not providing `fs.s3a.access.key: access_key`, `fs.s3a.secret.key:
> secret_key` in flink-conf.yaml as we are using Profile based credentials
> provider. The older BucketingSink code is able to get the credentials and
> write to S3. We are facing this issue only with StreamingFileSink. We tried
> adding fs.s3a.impl to core-site.xml when the default configurations were
> not working.
>
> Regards,
> Vinay Patil
>
>
> On Wed, Jan 16, 2019 at 2:55 PM Till Rohrmann 
> wrote:
>
>> Hi Vinay,
>>
>> Flink's file systems are self contained and won't respect the
>> core-site.xml if I'm not mistaken. Instead you have to set the credentials
>> in the flink configuration flink-conf.yaml via `fs.s3a.access.key:
>> access_key`, `fs.s3a.secret.key: secret_key` and so on [1]. Have you tried
>> this out?
>>
>> This has been fixed with Flink 1.6.2 and 1.7.0 [2].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems.html#built-in-file-systems
>> [2] https://issues.apache.org/jira/browse/FLINK-10383
>>
>> Cheers,
>> Till
>>
>> On Wed, Jan 16, 2019 at 10:10 AM Kostas Kloudas <
>> k.klou...@da-platform.com> wrote:
>>
>>> Hi Taher,
>>>
>>> So you are using the same configuration files and everything and the
>>> only thing you change is the "s3://" to "s3a://" and the sink cannot find
>>> the credentials?
>>> Could you please provide the logs of the Task Managers?
>>>
>>> Cheers,
>>> Kostas
>>>
>>> On Wed, Jan 16, 2019 at 9:13 AM Dawid Wysakowicz 
>>> wrote:
>>>
>>>> Forgot to cc ;)
>>>> On 16/01/2019 08:51, Vinay Patil wrote:
>>>>
>>>> Hi,
>>>>
>>>> Can someone please help on this issue. We have even tried to set
>>>> fs.s3a.impl in core-site.xml, still its not working.
>>>>
>>>> Regards,
>>>> Vinay Patil
>>>>
>>>>
>>>> On Fri, Jan 11, 2019 at 5:03 PM Taher Koitawala [via Apache Flink User
>>>> Mailing List archive.]  wrote:
>>>>
>>>>> Hi All,
>>>>>  We have implemented S3 sink in the following way:
>>>>>
>>>>> StreamingFileSink sink= StreamingFileSink.forBulkFormat(new
>>>>> Path("s3a://mybucket/myfolder/output/"),
>>>>> ParquetAvroWriters.forGenericRecord(schema))
>>>>> .withBucketCheckInterval(50l).withBucketAssigner(new
>>>>> CustomBucketAssigner()).build();
>>>>>
>>>>> The problem we are facing is that StreamingFileSink is initializing
>>>>> S3AFileSystem class to write to s3 and is not able to find the s3
>>>>> credentials to write data, However other flink application on the
>>>>> same cluster use "s3://" paths are able to write data to the same s3 
>>>>> bucket
>>>>> and folders, we are only facing this issue with StreamingFileSink.
>>>>>
>>>>> Regards,
>>>>> Taher Koitawala
>>>>> GS Lab Pune
>>>>> +91 8407979163
>>>>>
>>>>>
>>>>


Re: Streaming Checkpoint - Could not materialize checkpoint Exception

2019-01-16 Thread Till Rohrmann
Hi Sohimankotia,

you can control Flink's failure behaviour in case of a checkpoint failure
via `ExecutionConfig#setFailTaskOnCheckpointError(boolean)`. By default it
is set to true, which means that a Flink task will fail if a checkpoint
error occurs. If you set it to false, the job won't fail when a checkpoint
fails.
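
For example, a minimal (untested) sketch:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000);
// keep the job running even if an individual checkpoint fails
env.getConfig().setFailTaskOnCheckpointError(false);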

Cheers,
Till

On Wed, Jan 16, 2019 at 3:20 AM Congxian Qiu  wrote:

> Hi, Sohi
> You can check out doc[1][2] to find out the answer.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/restart_strategies.html
>
> sohimankotia wrote on Tue, Jan 15, 2019 at 4:16 PM:
>
>> Yes. File got deleted .
>>
>> 2019-01-15 10:40:41,360 INFO FSNamesystem.audit: allowed=true   ugi=hdfs
>> (auth:SIMPLE)  ip=/192.168.3.184   cmd=delete
>> src=/pipeline/job/checkpoints/e9a08c0661a6c31b5af540cf352e1265/chk-470/5fb3a899-8c0f-45f6-a847-42cbb71e6d19
>>
>> dst=nullperm=null   proto=rpc
>>
>> Looks like file was deleted from job itself .
>>
>> Does it cause job restart then ?
>>
>> If checkpoint fails then it should try next checkpoint or restart job ?
>>
>>
>>
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>
>
> --
> Best,
> Congxian
>


Re: StreamingFileSink cannot get AWS S3 credentials

2019-01-16 Thread Till Rohrmann
Hi Vinay,

Flink's file systems are self-contained and won't respect the core-site.xml
if I'm not mistaken. Instead you have to set the credentials in the Flink
configuration (flink-conf.yaml) via `fs.s3a.access.key: access_key`,
`fs.s3a.secret.key: secret_key` and so on [1]. Have you tried this out?

This has been fixed with Flink 1.6.2 and 1.7.0 [2].

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems.html#built-in-file-systems
[2] https://issues.apache.org/jira/browse/FLINK-10383

Cheers,
Till

On Wed, Jan 16, 2019 at 10:10 AM Kostas Kloudas 
wrote:

> Hi Taher,
>
> So you are using the same configuration files and everything and the only
> thing you change is the "s3://" to "s3a://" and the sink cannot find the
> credentials?
> Could you please provide the logs of the Task Managers?
>
> Cheers,
> Kostas
>
> On Wed, Jan 16, 2019 at 9:13 AM Dawid Wysakowicz 
> wrote:
>
>> Forgot to cc ;)
>> On 16/01/2019 08:51, Vinay Patil wrote:
>>
>> Hi,
>>
>> Can someone please help on this issue. We have even tried to set
>> fs.s3a.impl in core-site.xml, still its not working.
>>
>> Regards,
>> Vinay Patil
>>
>>
>> On Fri, Jan 11, 2019 at 5:03 PM Taher Koitawala [via Apache Flink User
>> Mailing List archive.]  wrote:
>>
>>> Hi All,
>>>  We have implemented S3 sink in the following way:
>>>
>>> StreamingFileSink sink= StreamingFileSink.forBulkFormat(new
>>> Path("s3a://mybucket/myfolder/output/"),
>>> ParquetAvroWriters.forGenericRecord(schema))
>>> .withBucketCheckInterval(50l).withBucketAssigner(new
>>> CustomBucketAssigner()).build();
>>>
>>> The problem we are facing is that StreamingFileSink is initializing
>>> S3AFileSystem class to write to s3 and is not able to find the s3
>>> credentials to write data, However other flink application on the same
>>> cluster use "s3://" paths are able to write data to the same s3 bucket and
>>> folders, we are only facing this issue with StreamingFileSink.
>>>
>>> Regards,
>>> Taher Koitawala
>>> GS Lab Pune
>>> +91 8407979163
>>>
>>>
>>


Re: Parallelism questions

2019-01-15 Thread Till Rohrmann
I'm not aware of someone working on this feature right now.

On Tue, Jan 15, 2019 at 3:22 PM Alexandru Gutan 
wrote:

> Thats great news!
>
> Are there any plans to expose it in the upcoming Flink release?
>
> On Tue, 15 Jan 2019 at 12:59, Till Rohrmann  wrote:
>
>> Hi Alexandru,
>>
>> at the moment `/jobs/:jobid/rescaling` will always change the parallelism
>> for all operators. The maximum is the maximum parallelism which you have
>> defined for an operator.
>>
>> I agree that it should also be possible to rescale an individual
>> operator. The internal functionality is already implemented (see
>> JobMaster#rescaleOperators) but has not been exposed.
>>
>> Cheers,
>> Till
>>
>> On Tue, Jan 15, 2019 at 1:03 PM Alexandru Gutan 
>> wrote:
>>
>>> Thanks Till!
>>>
>>> To execute the above (using Kubernetes), one would enter the running
>>> JobManager service and execute it?
>>> The following REST API call does the same */jobs/:jobid/rescaling*?
>>>
>>> I assume it changes the base parallelism, but what it will do if I had
>>> already set the parallelism of my operators?
>>> e.g.
>>> .source(..)
>>> .setParallelism(3)
>>> .setUID(..)
>>> .map(..)
>>> .setParallelism(8)
>>> .setUID(..)
>>> .sink(..)
>>> .setParallelism(3)
>>> .setUID(..)
>>>
>>> I think it would be a good idea to have */jobs/:jobid/rescaling,* 
>>> additionally
>>> requiring the *operatorUID* as a queryParameter*, *so that the
>>> parallelism of specific operators could be changed.
>>>
>>> Best,
>>> Alex.
>>>
>>> On Tue, 15 Jan 2019 at 10:27, Till Rohrmann 
>>> wrote:
>>>
>>>> Hi Alexandru,
>>>>
>>>> you can use the `modify` command `bin/flink modify 
>>>> --parallelism ` to modify the parallelism of a job. At the
>>>> moment, it is implemented as first taking a savepoint, stopping the job and
>>>> then redeploying the job with the changed parallelism and resuming from the
>>>> savepoint.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Mon, Jan 14, 2019 at 4:21 PM Dawid Wysakowicz <
>>>> dwysakow...@apache.org> wrote:
>>>>
>>>>> Hi Alexandru
>>>>>
>>>>> As for 2, generally speaking the number of required slots depends on
>>>>> number of slot sharing groups. By default all operators belong to the
>>>>> default slot sharing group, that means a job requires as many slots as
>>>>> maximal parallelism in the job. More on the distributed runtime you can
>>>>> read here[1]
>>>>>
>>>>> As for 1 I cc'ed Gary and Till who might better answer your question.
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/concepts/runtime.html#task-slots-and-resources
>>>>>
>>>>> Best,
>>>>>
>>>>> Dawid
>>>>> On 14/01/2019 15:26, Alexandru Gutan wrote:
>>>>>
>>>>> Hi everyone!
>>>>>
>>>>> 1. Is there a way to increase the parallelism (e.g. through REST) of
>>>>> some operators in a job without re-deploying the job? I found this
>>>>> <https://stackoverflow.com/questions/50719147/apache-flink-guideliness-for-setting-parallelism>
>>>>> answer which mentions scaling at runtime on Yarn/Mesos. Is it possible?
>>>>> How? Support for Kubernetes?
>>>>> 2. What happens when the number of parallel operator instances exceeds
>>>>> the number of task slots? For example: a job with a source (parallelism 
>>>>> 3),
>>>>> a map (parallelism 8), a sink (parallelism 3), total of *14* operator
>>>>> instances and a setup with *8* task slots. Will the operators get
>>>>> chained? What if I disable operator chaining?
>>>>>
>>>>> Thank you!
>>>>>
>>>>>


Re: Subtask much slower than the others when creating checkpoints

2019-01-15 Thread Till Rohrmann
Hi Pasquale,

if you configured a checkpoint directory, then the MemoryStateBackend will
also write the checkpoint data to disk in order to persist it.
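
For reference, a checkpoint directory for the MemoryStateBackend can be configured in flink-conf.yaml along these lines (the path is only a placeholder):

state.backend: jobmanager
state.checkpoints.dir: hdfs:///flink/checkpoints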

Cheers,
Till

On Tue, Jan 15, 2019 at 1:08 PM Pasquale Vazzana  wrote:

> I can send you some debug logs and the execution plan, can I use your
> personal email? There might be sensitive info in the logs.
>
>
>
> Incoming and Outgoing records are fairly distributed across subtasks, with
> similar but alternate loads, when the checkpoint is triggered, the load
> drops to nearly zero, all the fetch requests sent to kafka (2.0.1) time out
> and often the clients disconnect from the brokers.
>
> Both source topics are 30 partitions each, they get keyed, connected and
> co-processed.
>
> I am checkpointing with EOS, as I said I’ve tried all the backend with
> either DELETE_ON_CANCELLATION or RETAIN_ON_CANCELLATION. I assume that
> using the MemoryStateBackend and CANCELLATION should remove any possibility
> of disk/IO congestions, am I wrong?.
>
>
>
> Pasquale
>
>
>
> *From:* Till Rohrmann 
> *Sent:* 15 January 2019 10:33
> *To:* Pasquale Vazzana 
> *Cc:* Bruno Aranda ; user 
> *Subject:* Re: Subtask much slower than the others when creating
> checkpoints
>
>
>
> Same here Pasquale, the logs on DEBUG log level could be helpful. My guess
> would be that the respective tasks are overloaded or there is some resource
> congestion (network, disk, etc).
>
>
>
> You should see in the web UI the number of incoming and outgoing events.
> It would be good to check that the events are similarly sized and can be
> computed in roughly the same time.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Mon, Jan 14, 2019 at 4:07 PM Pasquale Vazzana 
> wrote:
>
> I have the same problem, even more impactful. Some subtasks stall forever
> quite consistently.
> I am using Flink 1.7.1, but I've tried downgrading to 1.6.3 and it didn't
> help.
> The Backend doesn't seem to make any difference, I've tried Memory, FS and
> RocksDB back ends but nothing changes. I've also tried to change the
> medium, local spinning disk, SAN or mounted fs but nothing helps.
> Parallelism is the only thing which mitigates the stalling, when I set 1
> everything works but if I increase the number of parallelism then
> everything degrades, 10 makes it very slow 30 freezes it.
> It's always one of two subtasks, most of them does the checkpoint in few
> milliseconds but there is always at least one which stalls for minutes
> until it times out. The Alignment seems to be a problem.
> I've been wondering whether some Kafka partitions where empty but there is
> not much data skew and the keyBy uses the same key strategy as the Kafka
> partitions, I've tried to use murmur2 for hashing but it didn't help either.
> The subtask that seems causing problems seems to be a CoProcessFunction.
> I am going to debug Flink but since I'm relatively new to it, it might
> take a while so any help will be appreciated.
>
> Pasquale
>
>
> From: Till Rohrmann 
> Sent: 08 January 2019 17:35
> To: Bruno Aranda 
> Cc: user 
> Subject: Re: Subtask much slower than the others when creating checkpoints
>
> Hi Bruno,
>
> there are multiple reasons why one of the subtasks can take longer for
> checkpointing. It looks as if there is not much data skew since the state
> sizes are relatively equal. It also looks as if the individual tasks all
> start at the same time with the checkpointing which indicates that there
> mustn't be a lot of back-pressure in the DAG (or all tasks were equally
> back-pressured). This narrows the problem cause down to the asynchronous
> write operation. One potential problem could be if the external system to
> which you write your checkpoint data has some kind of I/O limit/quota.
> Maybe the sum of write accesses deplete the maximum quota you have. You
> could try whether running the job with a lower parallelism solves the
> problems.
>
> For further debugging it could be helpful to get access to the logs of the
> JobManager and the TaskManagers on DEBUG log level. It could also be
> helpful to learn which state backend you are using.
>
> Cheers,
> Till
>
> On Tue, Jan 8, 2019 at 12:52 PM Bruno Aranda <mailto:bara...@apache.org>
> wrote:
> Hi,
>
> We are using Flink 1.6.1 at the moment and we have a streaming job
> configured to create a checkpoint every 10 seconds. Looking at the
> checkpointing times in the UI, we can see that one subtask is much slower
> creating the endpoint, at least in its "End to End Duration", and seems
> caused by a longer "Checkpoint Duration (Async)".
>
> For instance, in the attach screenshot, while most of the subtasks take
> half a second, one (and it is always one) takes 2 seconds.

Re: Parallelism questions

2019-01-15 Thread Till Rohrmann
Hi Alexandru,

at the moment `/jobs/:jobid/rescaling` will always change the parallelism
for all operators. The maximum is the maximum parallelism which you have
defined for an operator.

I agree that it should also be possible to rescale an individual operator.
The internal functionality is already implemented (see
JobMaster#rescaleOperators) but has not been exposed.
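
For reference, a sketch of how that REST call can be issued (assuming the default REST port 8081; job id and trigger id are placeholders). The PATCH request returns a trigger id which can then be polled for the result:

curl -X PATCH "http://<jobmanager-host>:8081/jobs/<jobid>/rescaling?parallelism=8"
curl "http://<jobmanager-host>:8081/jobs/<jobid>/rescaling/<triggerid>"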

Cheers,
Till

On Tue, Jan 15, 2019 at 1:03 PM Alexandru Gutan 
wrote:

> Thanks Till!
>
> To execute the above (using Kubernetes), one would enter the running
> JobManager service and execute it?
> The following REST API call does the same */jobs/:jobid/rescaling*?
>
> I assume it changes the base parallelism, but what it will do if I had
> already set the parallelism of my operators?
> e.g.
> .source(..)
> .setParallelism(3)
> .setUID(..)
> .map(..)
> .setParallelism(8)
> .setUID(..)
> .sink(..)
> .setParallelism(3)
> .setUID(..)
>
> I think it would be a good idea to have */jobs/:jobid/rescaling,* additionally
> requiring the *operatorUID* as a queryParameter*, *so that the
> parallelism of specific operators could be changed.
>
> Best,
> Alex.
>
> On Tue, 15 Jan 2019 at 10:27, Till Rohrmann  wrote:
>
>> Hi Alexandru,
>>
>> you can use the `modify` command `bin/flink modify  --parallelism
>> ` to modify the parallelism of a job. At the moment, it is
>> implemented as first taking a savepoint, stopping the job and then
>> redeploying the job with the changed parallelism and resuming from the
>> savepoint.
>>
>> Cheers,
>> Till
>>
>> On Mon, Jan 14, 2019 at 4:21 PM Dawid Wysakowicz 
>> wrote:
>>
>>> Hi Alexandru
>>>
>>> As for 2, generally speaking the number of required slots depends on
>>> number of slot sharing groups. By default all operators belong to the
>>> default slot sharing group, that means a job requires as many slots as
>>> maximal parallelism in the job. More on the distributed runtime you can
>>> read here[1]
>>>
>>> As for 1 I cc'ed Gary and Till who might better answer your question.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/concepts/runtime.html#task-slots-and-resources
>>>
>>> Best,
>>>
>>> Dawid
>>> On 14/01/2019 15:26, Alexandru Gutan wrote:
>>>
>>> Hi everyone!
>>>
>>> 1. Is there a way to increase the parallelism (e.g. through REST) of
>>> some operators in a job without re-deploying the job? I found this
>>> <https://stackoverflow.com/questions/50719147/apache-flink-guideliness-for-setting-parallelism>
>>> answer which mentions scaling at runtime on Yarn/Mesos. Is it possible?
>>> How? Support for Kubernetes?
>>> 2. What happens when the number of parallel operator instances exceeds
>>> the number of task slots? For example: a job with a source (parallelism 3),
>>> a map (parallelism 8), a sink (parallelism 3), total of *14* operator
>>> instances and a setup with *8* task slots. Will the operators get
>>> chained? What if I disable operator chaining?
>>>
>>> Thank you!
>>>
>>>


Re: Subtask much slower than the others when creating checkpoints

2019-01-15 Thread Till Rohrmann
Same here Pasquale, the logs on DEBUG log level could be helpful. My guess
would be that the respective tasks are overloaded or there is some resource
congestion (network, disk, etc).

You should see in the web UI the number of incoming and outgoing events. It
would be good to check that the events are similarly sized and can be
computed in roughly the same time.

Cheers,
Till

On Mon, Jan 14, 2019 at 4:07 PM Pasquale Vazzana  wrote:

> I have the same problem, even more impactful. Some subtasks stall forever
> quite consistently.
> I am using Flink 1.7.1, but I've tried downgrading to 1.6.3 and it didn't
> help.
> The Backend doesn't seem to make any difference, I've tried Memory, FS and
> RocksDB back ends but nothing changes. I've also tried to change the
> medium, local spinning disk, SAN or mounted fs but nothing helps.
> Parallelism is the only thing which mitigates the stalling, when I set 1
> everything works but if I increase the number of parallelism then
> everything degrades, 10 makes it very slow 30 freezes it.
> It's always one of two subtasks, most of them does the checkpoint in few
> milliseconds but there is always at least one which stalls for minutes
> until it times out. The Alignment seems to be a problem.
> I've been wondering whether some Kafka partitions where empty but there is
> not much data skew and the keyBy uses the same key strategy as the Kafka
> partitions, I've tried to use murmur2 for hashing but it didn't help either.
> The subtask that seems causing problems seems to be a CoProcessFunction.
> I am going to debug Flink but since I'm relatively new to it, it might
> take a while so any help will be appreciated.
>
> Pasquale
>
>
> From: Till Rohrmann 
> Sent: 08 January 2019 17:35
> To: Bruno Aranda 
> Cc: user 
> Subject: Re: Subtask much slower than the others when creating checkpoints
>
> Hi Bruno,
>
> there are multiple reasons why one of the subtasks can take longer for
> checkpointing. It looks as if there is not much data skew since the state
> sizes are relatively equal. It also looks as if the individual tasks all
> start at the same time with the checkpointing which indicates that there
> mustn't be a lot of back-pressure in the DAG (or all tasks were equally
> back-pressured). This narrows the problem cause down to the asynchronous
> write operation. One potential problem could be if the external system to
> which you write your checkpoint data has some kind of I/O limit/quota.
> Maybe the sum of write accesses deplete the maximum quota you have. You
> could try whether running the job with a lower parallelism solves the
> problems.
>
> For further debugging it could be helpful to get access to the logs of the
> JobManager and the TaskManagers on DEBUG log level. It could also be
> helpful to learn which state backend you are using.
>
> Cheers,
> Till
>
> On Tue, Jan 8, 2019 at 12:52 PM Bruno Aranda <mailto:bara...@apache.org>
> wrote:
> Hi,
>
> We are using Flink 1.6.1 at the moment and we have a streaming job
> configured to create a checkpoint every 10 seconds. Looking at the
> checkpointing times in the UI, we can see that one subtask is much slower
> creating the endpoint, at least in its "End to End Duration", and seems
> caused by a longer "Checkpoint Duration (Async)".
>
> For instance, in the attach screenshot, while most of the subtasks take
> half a second, one (and it is always one) takes 2 seconds.
>
> But we have worse problems. We have seen cases where the checkpoint times
> out for one tasks, while most take one second, the outlier takes more than
> 5 minutes (which is the max time we allow for a checkpoint). This can
> happen if there is back pressure. We only allow one checkpoint at a time as
> well.
> Why could one subtask take more time? This jobs read from kafka partitions
> and hash by key, and we don't see any major data skew between the
> partitions. Does one partition do more work?
>
> We do have a cluster of 20 machines, in EMR, with TMs that have
> multiple slots (in legacy mode).
>
> Is this something that could have been fixed in a more recent version?
>
> Thanks for any insight!
>
> Bruno
>
>

Re: Parallelism questions

2019-01-15 Thread Till Rohrmann
Hi Alexandru,

you can use the `modify` command, `bin/flink modify <jobId> --parallelism
<newParallelism>`, to change the parallelism of a job. At the moment it is
implemented as first taking a savepoint, stopping the job, and then
redeploying the job with the changed parallelism and resuming from the
savepoint.

Cheers,
Till

On Mon, Jan 14, 2019 at 4:21 PM Dawid Wysakowicz 
wrote:

> Hi Alexandru
>
> As for 2, generally speaking the number of required slots depends on
> number of slot sharing groups. By default all operators belong to the
> default slot sharing group, that means a job requires as many slots as
> maximal parallelism in the job. More on the distributed runtime you can
> read here[1]
>
> As for 1 I cc'ed Gary and Till who might better answer your question.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/concepts/runtime.html#task-slots-and-resources
>
> Best,
>
> Dawid
> On 14/01/2019 15:26, Alexandru Gutan wrote:
>
> Hi everyone!
>
> 1. Is there a way to increase the parallelism (e.g. through REST) of some
> operators in a job without re-deploying the job? I found this
> 
> answer which mentions scaling at runtime on Yarn/Mesos. Is it possible?
> How? Support for Kubernetes?
> 2. What happens when the number of parallel operator instances exceeds the
> number of task slots? For example: a job with a source (parallelism 3), a
> map (parallelism 8), a sink (parallelism 3), total of *14* operator
> instances and a setup with *8* task slots. Will the operators get
> chained? What if I disable operator chaining?
>
> Thank you!
>
>


Re: Recovery problem 2 of 2 in Flink 1.6.3

2019-01-15 Thread Till Rohrmann
Hi John,

this looks indeed strange. How many concurrent operators do you have which
write state to s3?

After the cancellation, the JobManager should keep the slots for some time
until they are freed. This is the normal behaviour and can be controlled
with `slot.idle.timeout`. Could you maybe share the complete logs on DEBUG
log level to fully understand the problem? A thread dump of the TM process
would also be helpful to see whether there are any blocking operations
which keep the HTTP connections open.
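
For reference, this is a flink-conf.yaml setting; as a sketch (the value is in milliseconds, and 50000 should be the default if I remember correctly):

slot.idle.timeout: 50000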

Cheers,
Till

On Thu, Jan 10, 2019 at 9:35 PM John Stone  wrote:

> This is the second of two recovery problems I'm seeing running Flink in
> Kubernetes.  I'm posting them in separate messages for brevity and because
> the second is not directly related to the first.  Any advice is
> appreciated.  First problem:
> https://lists.apache.org/thread.html/a663a8ce2f697e6d207cb59eff1f77dbb8bd745e3f44aab09866ab46@%3Cuser.flink.apache.org%3E
>
>
>
> Setup:
>
> Flink 1.6.3 in Kubernetes (flink:1.6.3-hadoop28-scala_2.11).  One
> JobManager and two TaskManagers (TM_1, TM_2).  Each pod has 4 CPUs.  Each
> TaskManager has 16 task slots.  High availability is enabled.  S3 (s3a) for
> storage.  RocksDB with incremental snapshots.  It doesn't matter if local
> recover is enabled - I've managed to replicate with both local recovery
> enabled and disabled.  The value of "fs.s3a.connection.maximum" is 128.
>
>
>
> Problem:
>
> Flink + Hadoop does not either re-use existing connections to S3 or kill
> existing connections and create new ones when a job dies.
>
>
>
> Replication Steps:
>
> Create a job with a parallelism of 16 - all processing is occurring on
> TM_1.  After a checkpoint has been taken, delete TM_1.  Job is canceled on
> TM_1, deployed and restored sucessfully on TM_2, and a new TaskManager
> (TM_3) is created and successfully registers with the JobManager.  No work
> is scheduled on TM_3.  After another checkpoint is taken, delete TM_2.  The
> job is canceled on TM_2, and attempts to be deployed TM_3 but fails with
> "org.apache.flink.fs.s3hadoop.shaded.org.apache.http.conn.ConnectionPoolTimeoutException:
> Timeout waiting for connection from pool".  Flink attempts to recover by
> canceling on TM_3 and deploying on TM_4, but Flink does not does not
> release the task slots on TM_3 (TM_3 now has no free task slots).  The job
> is deployed to TM_4 which again fails with "ConnectionPoolTimeoutException:
> Timeout waiting for connection from pool".  Flink attempts to recover by
> canceling on TM_4, but does not release the task slots on TM_4 (TM_4 now
> has no free task slots).  As there are 0 available slots, the job is now
> caught in a SCHEDULED state.
>
>
>
> Actual Behavior:
>
> Shaded Hadoop does not release hold on S3 connections when job dies.
>
>
>
> Expected Behavior:
>
> Hadoop should be told to release connections when job dies, or should
> re-use existing connections.
>
>
>
> Log Snip:
>
> 2019-01-10 20:03:40,191 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Filter ->
> Map (8/16) (aaa18fa82aa555a51474d49ac14665e7) switched from RUNNING to
> FAILED.
>
> java.io.InterruptedIOException: getFileStatus on
> s3a://my-s3-bucket/stream-cluster/prod/checkpoints/83d7cb3e6d08318ef2c27878d0fe1bbd:
> org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.SdkClientException:
> Unable to execute HTTP request: Timeout waiting for connection from pool
>
> at
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:125)
>
> at
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:101)
>
> at
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1571)
>
> at
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem.innerMkdirs(S3AFileSystem.java:1507)
>
> at
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem.mkdirs(S3AFileSystem.java:1482)
>
> at
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1913)
>
> at
> org.apache.flink.fs.s3hadoop.shaded.org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:170)
>
> at
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:112)
>
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.(FsCheckpointStorage.java:83)
>
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.(FsCheckpointStorage.java:58)
>
> at
> org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:443)
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:399)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:257)
>
> at 

Re: Recovery problem 1 of 2 in Flink 1.6.3

2019-01-15 Thread Till Rohrmann
Hi John,

this is definitely not how Flink should behave in this situation and could
indicate a bug. From the logs I couldn't figure out the problem. Would it
be possible to obtain for the TMs and JM the full logs with DEBUG log
level? This would help me to further debug the problem.
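
In case it helps: with the default logging setup, DEBUG logging is a one-line change in conf/log4j.properties on the JobManager and TaskManager machines (sketch, assuming the default "file" appender):

log4j.rootLogger=DEBUG, file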

Cheers,
Till

On Mon, Jan 14, 2019 at 5:04 PM John Stone  wrote:

> Is this a known issue?  Should I create a Jira ticket?  Does anyone have
> anything they would like me to try?  I’m very lost at this point.
>
>
>
> I’ve now seen this issue happen without destroying pods, i.e. the job
> running crashes after several hours and fails to recover once all task
> slots are consumed by stale tasks.  I’m adding additional information in
> hopes of getting to the bottom of this.
>
>
>
> Timeline of crash (I do not have all logs as the log had rolled by the
> time I was able to get the following)
>
>
>
> TaskManager 1, 2019-01-12 11:32:44, throws the following exception:
>
>
>
> 2019-01-12 11:32:44,170 INFO
> org.apache.flink.runtime.taskmanager.Task - Attempting
> to fail task externally Window(SlidingEventTimeWindows(5760, 1440),
> EventTimeTrigger,
> CoalesceAndDecorateWindowedGenericRecordProcessWindowFunction) (6/16)
> (cd737fd979a849a713c5808f96d06cf1).
>
> AsynchronousException{java.lang.Exception: Could not materialize
> checkpoint 758 for operator Window(SlidingEventTimeWindows(5760,
> 1440), EventTimeTrigger,
> CoalesceAndDecorateWindowedGenericRecordProcessWindowFunction) (6/16).}
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>
> …snip…
>
> Caused by: java.lang.Exception: Could not materialize checkpoint 758 for
> operator Window(SlidingEventTimeWindows(5760, 1440),
> EventTimeTrigger,
> CoalesceAndDecorateWindowedGenericRecordProcessWindowFunction) (6/16).
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>
> ... 6 more
>
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException:
> Could not flush and close the file system output stream to
> s3a://my-bucket/stream-cluster/prod/checkpoints/00ec28e4a356a80f48269b0b5f0f5de6/shared/2c5e52d2-e362-4e3a-a9fc-170cf41f872f
> in order to obtain the stream state handle
>
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>
> at
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
>
> …snip…
>
> Caused by: java.io.IOException: Could not flush and close the file system
> output stream to
> s3a://te2-flink/stream-cluster/prod/checkpoints/00ec28e4a356a80f48269b0b5f0f5de6/shared/2c5e52d2-e362-4e3a-a9fc-170cf41f872f
> in order to obtain the stream state handle
>
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:328)
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeStateData(RocksDBKeyedStateBackend.java:2454)
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.runSnapshot(RocksDBKeyedStateBackend.java:2588)
>
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
> at
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
>
> ... 7 more
>
> Caused by:
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.AWSS3IOException:
> saving output on
> stream-cluster/prod/checkpoints/00ec28e4a356a80f48269b0b5f0f5de6/shared/2c5e52d2-e362-4e3a-a9fc-170cf41f872f:
> org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
> Your socket connection to the server was not read from or written to within
> the timeout period. Idle connections will be closed. (Service: Amazon S3;
> Status Code: 400; Error Code: RequestTimeout; Request ID:
> 379193EB634E1686), S3 Extended Request ID:
> 3hffGK+DZisRFGwTA/X8bJdruPmvRimlmedS7WLZYUMXJ5z+otVdfQdSJUwjLDtryilapjSesz0=:
> Your socket connection to the server was not read from or written to within
> the timeout period. Idle connections will be closed. (Service: Amazon S3;
> Status Code: 400; Error Code: RequestTimeout; Request ID: 379193EB634E1686)
>
> at
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:178)
>
> at
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AOutputStream.close(S3AOutputStream.java:121)
>
> at
> 

Re: [DISCUSS] Dropping flink-storm?

2019-01-09 Thread Till Rohrmann
With https://issues.apache.org/jira/browse/FLINK-10571, we will remove the
Storm topologies from Flink and keep the wrappers for the moment.

However, looking at the FlinkTopologyContext [1], it becomes quite obvious
that Flink's compatibility with Storm is really limited. Almost all of the
context methods are not supported which makes me wonder how useful these
wrappers really are. Given the additional maintenance overhead of having
them in the code base and no indication that someone is actively using
them, I would still be in favour of removing them. This will reduce our
maintenance burden in the future. What do you think?

[1]
https://github.com/apache/flink/blob/master/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java

Cheers,
Till

On Tue, Oct 9, 2018 at 10:08 AM Fabian Hueske  wrote:

> Yes, let's do it this way.
> The wrapper classes are probably not too complex and can be easily tested.
> We have the same for the Hadoop interfaces, although I think only the
> Input- and OutputFormatWrappers are actually used.
>
>
> Am Di., 9. Okt. 2018 um 09:46 Uhr schrieb Chesnay Schepler <
> ches...@apache.org>:
>
>> That sounds very good to me.
>>
>> On 08.10.2018 11:36, Till Rohrmann wrote:
>> > Good point. The initial idea of this thread was to remove the storm
>> > compatibility layer completely.
>> >
>> > During the discussion I realized that it might be useful for our users
>> > to not completely remove it in one go. Instead for those who still
>> > want to use some Bolt and Spout code in Flink, it could be nice to
>> > keep the wrappers. At least, we could remove flink-storm in a more
>> > graceful way by first removing the Topology and client parts and then
>> > the wrappers. What do you think?
>> >
>> > Cheers,
>> > Till
>> >
>> > On Mon, Oct 8, 2018 at 11:13 AM Chesnay Schepler > > <mailto:ches...@apache.org>> wrote:
>> >
>> > I don't believe that to be the consensus. For starters it is
>> > contradictory; we can't /drop /flink-storm yet still /keep //some
>> > parts/.
>> >
>> > From my understanding we drop flink-storm completely, and put a
>> > note in the docs that the bolt/spout wrappers of previous versions
>> > will continue to work.
>> >
>> > On 08.10.2018 11:04, Till Rohrmann wrote:
>> >> Thanks for opening the issue Chesnay. I think the overall
>> >> consensus is to drop flink-storm and only keep the Bolt and Spout
>> >> wrappers. Thanks for your feedback!
>> >>
>> >> Cheers,
>> >> Till
>> >>
>> >> On Mon, Oct 8, 2018 at 9:37 AM Chesnay Schepler
>> >> mailto:ches...@apache.org>> wrote:
>> >>
>> >> I've created
>> >> https://issues.apache.org/jira/browse/FLINK-10509 for
>> >> removing flink-storm.
>> >>
>> >> On 28.09.2018 15:22, Till Rohrmann wrote:
>> >> > Hi everyone,
>> >> >
>> >> > I would like to discuss how to proceed with Flink's storm
>> >> compatibility
>> >> > layer flink-strom.
>> >> >
>> >> > While working on removing Flink's legacy mode, I noticed
>> >> that some parts of
>> >> > flink-storm rely on the legacy Flink client. In fact, at
>> >> the moment
>> >> > flink-storm does not work together with Flink's new
>> distributed
>> >> > architecture.
>> >> >
>> >> > I'm also wondering how many people are actually using
>> >> Flink's Storm
>> >> > compatibility layer and whether it would be worth porting it.
>> >> >
>> >> > I see two options how to proceed:
>> >> >
>> >> > 1) Commit to maintain flink-storm and port it to Flink's
>> >> new architecture
>> >> > 2) Drop flink-storm
>> >> >
>> >> > I doubt that we can contribute it to Apache Bahir [1],
>> >> because once we
>> >> > remove the legacy mode, this module will no longer work
>> >> with all newer
>> >> > Flink versions.
>> >> >
>> >> > Therefore, I would like to hear your opinion on this and in
>> >> particular if
>> >> > you are using or planning to use flink-storm in the future.
>> >> >
>> >> > [1] https://github.com/apache/bahir-flink
>> >> >
>> >> > Cheers,
>> >> > Till
>> >> >
>> >>
>> >
>>
>>


Re: ConnectTimeoutException when createPartitionRequestClient

2019-01-09 Thread Till Rohrmann
Hi Wenrui,

I executed AutoParallelismITCase#testProgramWithAutoParallelism and set a
breakpoint in NettyClient.java:102 to see whether the configured timeout
value is correctly set. Moreover, I did the same for
AbstractNioChannel.java:207 and it looked as if the correct timeout value
was set.

What is the special uber Flink version? What patches does it include? Are
you able to run your tests with the latest vanilla Flink version?

Cheers,
Till

On Wed, Jan 9, 2019 at 10:40 AM Wenrui Meng  wrote:

> Hi Till,
>
> This job is not on AthenaX but on a special uber version Flink. I tried to
> ping the connected host from connecting host. It seems very stable. For the
> connection timeout, I do set it as 20min but it still report the timeout
> after 2 minutes. Could you let me know how do you test locally about the
> timeout setting?
>
> Thanks,
> Wenrui
>
> On Tue, Jan 8, 2019 at 7:06 AM Till Rohrmann  wrote:
>
>> Hi Wenrui,
>>
>> the exception now occurs while finishing the connection creation. I'm not
>> sure whether this is so different. Could it be that your network is
>> overloaded or not very reliable? Have you tried running your Flink job
>> outside of AthenaX?
>>
>> Cheers,
>> Till
>>
>> On Tue, Jan 8, 2019 at 2:50 PM Wenrui Meng  wrote:
>>
>>> Hi Till,
>>>
>>> Thanks for your reply. Our cluster is Yarn cluster. I found that if we
>>> decrease the total parallel the timeout issue can be avoided. But we do
>>> need that amount of taskManagers to process data. In addition, once I
>>> increase the netty server threads to 128, the error is changed to to
>>> following error. It seems the cause is different. Could you help take a
>>> look?
>>>
>>> 2b0ac47c1eb1bcbbbe4a97) switched from RUNNING to FAILED.
>>> java.io.IOException: Connecting the channel failed: Connecting to remote
>>> task manager + 'athena464-sjc1/10.70.129.13:39466' has failed. This
>>> might indicate that the remote task manager has been lost.
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:132)
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:84)
>>> at
>>> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
>>> at
>>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:156)
>>> at
>>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:480)
>>> at
>>> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.requestPartitions(UnionInputGate.java:134)
>>> at
>>> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:148)
>>> at
>>> org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:93)
>>> at
>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by:
>>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>>> Connecting to remote task manager + 'athena464-sjc1/10.70.129.13:39466'
>>> has failed. This might indicate that the remote task manager has been lost.
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:132)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java

Re: Subtask much slower than the others when creating checkpoints

2019-01-08 Thread Till Rohrmann
Hi Bruno,

there are multiple reasons why one of the subtasks can take longer for
checkpointing. It looks as if there is not much data skew since the state
sizes are relatively equal. It also looks as if the individual tasks all
start at the same time with the checkpointing which indicates that there
mustn't be a lot of back-pressure in the DAG (or all tasks were equally
back-pressured). This narrows the problem cause down to the asynchronous
write operation. One potential problem could be if the external system to
which you write your checkpoint data has some kind of I/O limit/quota.
Maybe the sum of write accesses deplete the maximum quota you have. You
could try whether running the job with a lower parallelism solves the
problems.

For further debugging it could be helpful to get access to the logs of the
JobManager and the TaskManagers on DEBUG log level. It could also be
helpful to learn which state backend you are using.

Cheers,
Till

On Tue, Jan 8, 2019 at 12:52 PM Bruno Aranda  wrote:

> Hi,
>
> We are using Flink 1.6.1 at the moment and we have a streaming job
> configured to create a checkpoint every 10 seconds. Looking at the
> checkpointing times in the UI, we can see that one subtask is much slower
> creating the endpoint, at least in its "End to End Duration", and seems
> caused by a longer "Checkpoint Duration (Async)".
>
> For instance, in the attach screenshot, while most of the subtasks take
> half a second, one (and it is always one) takes 2 seconds.
>
> But we have worse problems. We have seen cases where the checkpoint times
> out for one tasks, while most take one second, the outlier takes more than
> 5 minutes (which is the max time we allow for a checkpoint). This can
> happen if there is back pressure. We only allow one checkpoint at a time as
> well.
>
> Why could one subtask take more time? This jobs read from kafka partitions
> and hash by key, and we don't see any major data skew between the
> partitions. Does one partition do more work?
>
> We do have a cluster of 20 machines, in EMR, with TMs that have multiple
> slots (in legacy mode).
>
> Is this something that could have been fixed in a more recent version?
>
> Thanks for any insight!
>
> Bruno
>
>
>


Re: ConnectTimeoutException when createPartitionRequestClient

2019-01-08 Thread Till Rohrmann
0.129.13:39466
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:281)
> ... 6 common frames omitted
>
>
> Thanks,
> Wenrui
>
> On Mon, Jan 7, 2019 at 2:38 AM Till Rohrmann  wrote:
>
>> Hi Wenrui,
>>
>> the code to set the connect timeout looks ok to me [1]. I also tested it
>> locally and checked that the timeout is correctly registered in Netty's
>> AbstractNioChannel [2].
>>
>> Increasing the number of threads to 128 should not be necessary. But it
>> could indicate that there is some long lasting or blocking operation being
>> executed by the threads.
>>
>> How does the job submission and cluster configuration work with AthenaX?
>> Will the platform spawn for each job a new Flink cluster for which you can
>> specify the cluster configuration?
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java#L102
>> [2]
>> https://github.com/netty/netty/blob/netty-4.0.27.Final/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java#L207
>>
>> Cheers,
>> Till
>>
>> On Sat, Jan 5, 2019 at 2:22 AM Wenrui Meng  wrote:
>>
>>> Hi Till,
>>>
>>> Thanks for your reply and help on this issue.
>>>
>>> I increased taskmanager.network.netty.client.connectTimeoutSec to 1200
>>> which is 20 minutes. But it seems the connection not respects this timeout.
>>> In addition, I increase both taskmanager.network.request-backoff.max
>>> and taskmanager.registration.max-backoff to 20min.
>>>
>>> One thing I found is helpful to some extent is increasing
>>> the taskmanager.network.netty.server.numThreads. I increase it to 128
>>> threads, it can succeed sometimes. But keep increasing it doesn't solve the
>>> problem. We have 100 parallel intermediate results, so there are too many
>>> partition requests. I think that's why it timeout. The solution should let
>>> the connection timeout increase. But I think there is some issue that
>>> connection doesn't respect the timeout config.
>>>
>>> We will definitely try the latest flink version. But at Uber, there is a
>>> team who is responsible to provide a platform with Flink. They will upgrade
>>> it at the end of this Month. Meanwhile, I would like to ask some help to
>>> investigate how to increase the connection timeout and make it respected.
>>>
>>> Thanks,
>>> Wenrui
>>>
>>> On Fri, Jan 4, 2019 at 5:27 AM Till Rohrmann 
>>> wrote:
>>>
>>>> Hi Wenrui,
>>>>
>>>> from the logs I cannot spot anything suspicious. Which configuration
>>>> parameters have you changed exactly? Does the JobManager log contain
>>>> anything suspicious?
>>>>
>>>> The current Flink version changed quite a bit wrt 1.4. Thus, it might
>>>> be worth a try to run the job with the latest Flink version.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Thu, Jan 3, 2019 at 3:00 PM Wenrui Meng 
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I consistently get connection timeout issue when creating
>>>>> partitionRequestClient in flink 1.4. I tried to ping from the connecting
>>>>> host to the connected host, but the ping latency is less than 0.1 ms
>>>>> consistently. So it's probably not due to the cluster status. I also tried
>>>>> increase max backoff, nettowrk timeout and some other setting, it doesn't
>>>>> help.
>>>>>
>>>>> I enabled the debug log of flink but not find any suspicious or useful
>>>>> information to help me fix the issue. Here is the link
>>>>> <https://www.dropbox.com/sh/sul62muz5pk0bqk/AABX8QbMrNmSq3k8I289mGmSa?dl=0>
>>>>> of the jobManager and taskManager logs. The connecting host is the host
>>>>> which throw the exception. The connected host is the host the connecting
>>>>> host try to request partition from.
>>>>>
>>>>> Since our platform 

Re: Unable to restore the checkpoint on restarting the application!!

2019-01-07 Thread Till Rohrmann
Hi Puneet,

if context.isRestored returns false, then Flink did not resume from a
checkpoint/savepoint. Please make sure that you specify the correct path
to an existing checkpoint.
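
For context, a minimal (illustrative) sketch of where `isRestored()` is typically queried, inside a class implementing CheckpointedFunction (state name and types are placeholders):

// needs org.apache.flink.api.common.state.ListState / ListStateDescriptor
// and org.apache.flink.runtime.state.FunctionInitializationContext
private transient ListState<Long> checkpointedState;

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    checkpointedState = context.getOperatorStateStore()
        .getListState(new ListStateDescriptor<>("my-state", Long.class));
    if (context.isRestored()) {
        // only true when the job was resumed from a checkpoint or savepoint
        for (Long value : checkpointedState.get()) {
            // re-populate in-memory structures here
        }
    }
}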

Cheers,
Till

On Mon, Jan 7, 2019 at 11:04 AM Puneet Kinra <
puneet.ki...@customercentria.com> wrote:

> Hi Till
>
> Its Working for me know ,but *context.isRestored() **is always returning
> false.*
>
> On Fri, Jan 4, 2019 at 7:42 PM Till Rohrmann  wrote:
>
>> When starting a job from within the IDE using the LocalEnvironment, it is
>> not possible to specify a checkpoint from which to resume. That's why your
>> recovered state is empty. Flink won't automatically pick up the latest
>> checkpoint it finds in some checkpoint directory.
>>
>> You can test it though by starting a local standalone cluster and submit
>> the job via bin/flink run -s  job.jar [1].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html#restore-a-savepoint
>>
>> Cheers,
>> Till
>>
>> On Fri, Jan 4, 2019 at 2:49 PM Puneet Kinra <
>> puneet.ki...@customercentria.com> wrote:
>>
>>> The List it returns is blank
>>>
>>> On Fri, Jan 4, 2019 at 7:15 PM Till Rohrmann 
>>> wrote:
>>>
>>>> Hi Puneet,
>>>>
>>>> what exactly is the problem when you try to resume from a checkpoint?
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Fri, Jan 4, 2019 at 2:31 PM Puneet Kinra <
>>>> puneet.ki...@customercentria.com> wrote:
>>>>
>>>>> Hi All
>>>>>
>>>>> I am creating a PoC where I am trying the out-of-the-box feature of Flink
>>>>> for managed operator state. I am able to create the checkpoint while
>>>>> running my app in Eclipse, but when I try to restart the app, I am unable
>>>>> to restore the state.
>>>>>
>>>>> Please find the snippet attached below.
>>>>>
>>>>> Steps followed:
>>>>> 1) ran the application that generates tuples automatically
>>>>> 2) checkpointing is triggered as configured (able to see the data being
>>>>> written to files)
>>>>> 3) stopped the app in Eclipse
>>>>> 4) restarted the application (unable to restore)
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> *Cheers *
>>>>>
>>>>> *Puneet*
>>>>>
>>>>
>>>
>>> --
>>> *Cheers *
>>>
>>> *Puneet Kinra*
>>>
>>> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
>>> *
>>>
>>> *e-mail :puneet.ki...@customercentria.com
>>> *
>>>
>>>
>>>
>
> --
> *Cheers *
>
> *Puneet Kinra*
>
> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
> *
>
> *e-mail :puneet.ki...@customercentria.com
> *
>
>
>


Re: ConnectTimeoutException when createPartitionRequestClient

2019-01-07 Thread Till Rohrmann
Hi Wenrui,

the code to set the connect timeout looks ok to me [1]. I also tested it
locally and checked that the timeout is correctly registered in Netty's
AbstractNioChannel [2].

Increasing the number of threads to 128 should not be necessary. But it
could indicate that there is some long lasting or blocking operation being
executed by the threads.

How does the job submission and cluster configuration work with AthenaX?
Will the platform spawn for each job a new Flink cluster for which you can
specify the cluster configuration?

[1]
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java#L102
[2]
https://github.com/netty/netty/blob/netty-4.0.27.Final/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java#L207

Cheers,
Till
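
For reference, the knobs discussed in this thread all live in flink-conf.yaml.
The values below simply mirror what Wenrui reported trying, so they are examples
rather than recommendations, and exact value formats can differ between Flink
versions:

taskmanager.network.netty.client.connectTimeoutSec: 1200
# the request backoff is configured in milliseconds (20 min = 1200000 ms)
taskmanager.network.request-backoff.max: 1200000
taskmanager.registration.max-backoff: 20 min
taskmanager.network.netty.server.numThreads: 128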

On Sat, Jan 5, 2019 at 2:22 AM Wenrui Meng  wrote:

> Hi Till,
>
> Thanks for your reply and help on this issue.
>
> I increased taskmanager.network.netty.client.connectTimeoutSec to 1200,
> which is 20 minutes, but the connection does not seem to respect this timeout.
> In addition, I increased both taskmanager.network.request-backoff.max
> and taskmanager.registration.max-backoff to 20 min.
>
> One thing I found helpful to some extent is increasing
> taskmanager.network.netty.server.numThreads. After increasing it to 128
> threads, the job can succeed sometimes, but increasing it further doesn't
> solve the problem. We have 100 parallel intermediate results, so there are
> too many partition requests; I think that's why it times out. The fix should
> be to increase the connection timeout, but there seems to be an issue where
> the connection doesn't respect the timeout config.
>
> We will definitely try the latest flink version. But at Uber, there is a
> team who is responsible to provide a platform with Flink. They will upgrade
> it at the end of this Month. Meanwhile, I would like to ask some help to
> investigate how to increase the connection timeout and make it respected.
>
> Thanks,
> Wenrui
>
> On Fri, Jan 4, 2019 at 5:27 AM Till Rohrmann  wrote:
>
>> Hi Wenrui,
>>
>> from the logs I cannot spot anything suspicious. Which configuration
>> parameters have you changed exactly? Does the JobManager log contain
>> anything suspicious?
>>
>> The current Flink version changed quite a bit wrt 1.4. Thus, it might be
>> worth a try to run the job with the latest Flink version.
>>
>> Cheers,
>> Till
>>
>> On Thu, Jan 3, 2019 at 3:00 PM Wenrui Meng  wrote:
>>
>>> Hi,
>>>
>>> I consistently get a connection timeout when creating the
>>> partitionRequestClient in Flink 1.4. I tried to ping from the connecting
>>> host to the connected host, but the ping latency is consistently less than
>>> 0.1 ms, so it's probably not due to the cluster status. I also tried
>>> increasing the max backoff, the network timeout and some other settings; it
>>> doesn't help.
>>>
>>> I enabled Flink's debug logging but did not find any suspicious or useful
>>> information to help me fix the issue. Here is the link
>>> <https://www.dropbox.com/sh/sul62muz5pk0bqk/AABX8QbMrNmSq3k8I289mGmSa?dl=0>
>>> to the jobManager and taskManager logs. The connecting host is the host
>>> which throws the exception. The connected host is the host the connecting
>>> host tries to request the partition from.
>>>
>>> Since our platform is not up to date yet, the Flink version I used here
>>> is 1.4. But I noticed that there has not been much change to this code on
>>> the master branch. Any help will be appreciated.
>>>
>>> Here is stack trace of the exception
>>>
>>> from RUNNING to FAILED.
>>> java.io.IOException: Connecting the channel failed: Connecting to remote
>>> task manager + 'athena485-sjc1/10.70.132.8:34185' has failed. This
>>> might indicate that the remote task manager has been lost.
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:132)
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:84)
>>> at
>>> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
>>> at
>>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteI

Re: Unable to restore the checkpoint on restarting the application!!

2019-01-04 Thread Till Rohrmann
When starting a job from within the IDE using the LocalEnvironment, it is
not possible to specify a checkpoint from which to resume. That's why your
recovered state is empty. Flink won't automatically pick up the latest
checkpoint it finds in some checkpoint directory.

You can test it though by starting a local standalone cluster and submit
the job via bin/flink run -s  job.jar [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html#restore-a-savepoint

Cheers,
Till

On Fri, Jan 4, 2019 at 2:49 PM Puneet Kinra <
puneet.ki...@customercentria.com> wrote:

> The List it returns is blank
>
> On Fri, Jan 4, 2019 at 7:15 PM Till Rohrmann  wrote:
>
>> Hi Puneet,
>>
>> what exactly is the problem when you try to resume from a checkpoint?
>>
>> Cheers,
>> Till
>>
>> On Fri, Jan 4, 2019 at 2:31 PM Puneet Kinra <
>> puneet.ki...@customercentria.com> wrote:
>>
>>> Hi All
>>>
>>> I am creating a PoC where I am trying the out-of-the-box feature of Flink
>>> for managed operator state. I am able to create the checkpoint while
>>> running my app in Eclipse, but when I try to restart the app, I am unable
>>> to restore the state.
>>>
>>> Please find the snippet attached below.
>>>
>>> Steps followed:
>>> 1) ran the application that generates tuples automatically
>>> 2) checkpointing is triggered as configured (able to see the data being
>>> written to files)
>>> 3) stopped the app in Eclipse
>>> 4) restarted the application (unable to restore)
>>>
>>>
>>>
>>> --
>>> *Cheers *
>>>
>>> *Puneet*
>>>
>>
>
> --
> *Cheers *
>
> *Puneet Kinra*
>
> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
> *
>
> *e-mail :puneet.ki...@customercentria.com
> *
>
>
>


Re: Unable to restore the checkpoint on restarting the application!!

2019-01-04 Thread Till Rohrmann
Hi Puneet,

what exactly is the problem when you try to resume from a checkpoint?

Cheers,
Till

On Fri, Jan 4, 2019 at 2:31 PM Puneet Kinra <
puneet.ki...@customercentria.com> wrote:

> Hi All
>
> I am creating a PoC where I am trying the out-of-the-box feature of Flink
> for managed operator state. I am able to create the checkpoint while
> running my app in Eclipse, but when I try to restart the app, I am unable
> to restore the state.
>
> Please find the snippet attached below.
>
> Steps followed:
> 1) ran the application that generates tuples automatically
> 2) checkpointing is triggered as configured (able to see the data being
> written to files)
> 3) stopped the app in Eclipse
> 4) restarted the application (unable to restore)
>
>
>
> --
> *Cheers *
>
> *Puneet*
>


Re: ConnectTimeoutException when createPartitionRequestClient

2019-01-04 Thread Till Rohrmann
Hi Wenrui,

from the logs I cannot spot anything suspicious. Which configuration
parameters have you changed exactly? Does the JobManager log contain
anything suspicious?

The current Flink version changed quite a bit wrt 1.4. Thus, it might be
worth a try to run the job with the latest Flink version.

Cheers,
Till

On Thu, Jan 3, 2019 at 3:00 PM Wenrui Meng  wrote:

> Hi,
>
> I consistently get a connection timeout when creating the
> partitionRequestClient in Flink 1.4. I tried to ping from the connecting
> host to the connected host, but the ping latency is consistently less than
> 0.1 ms, so it's probably not due to the cluster status. I also tried
> increasing the max backoff, the network timeout and some other settings; it
> doesn't help.
>
> I enabled Flink's debug logging but did not find any suspicious or useful
> information to help me fix the issue. Here is the link
> 
> to the jobManager and taskManager logs. The connecting host is the host
> which throws the exception. The connected host is the host the connecting
> host tries to request the partition from.
>
> Since our platform is not up to date yet, the Flink version I used here
> is 1.4. But I noticed that there has not been much change to this code on
> the master branch. Any help will be appreciated.
>
> Here is stack trace of the exception
>
> from RUNNING to FAILED.
> java.io.IOException: Connecting the channel failed: Connecting to remote
> task manager + 'athena485-sjc1/10.70.132.8:34185' has failed. This might
> indicate that the remote task manager has been lost.
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:132)
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:84)
> at
> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
> at
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:156)
> at
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:480)
> at
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:502)
> at
> org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:93)
> at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by:
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
> Connecting to remote task manager + 'athena485-sjc1/10.70.132.8:34185'
> has failed. This might indicate that the remote task manager has been lost.
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:132)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:214)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:120)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> ... 1 common frames omitted
> Caused by:
> org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException:
> connection timed out: 

Re: same parallelism with different taskmanager and slots, skew occurs

2019-01-04 Thread Till Rohrmann
Hi,

could you tell me how exactly you started the cluster and with which
parameters (configured memory, maybe vcores, etc.)?

Cheers,
Till

On Thu, Jan 3, 2019 at 2:37 AM varuy322  wrote:

> Hi, Till
> Thank you for your kind reply. I got your point; I'm sorry I did not make my
> issue clear.
> I generated data with the streaming benchmark, as in this link:
>
> https://github.com/dataArtisans/databricks-benchmark/blob/master/src/main/scala/com/databricks/benchmark/flink/EventGenerator.scala
> .
>
> What I want to say is: keep the parallelism the same (assume 96) and only
> change the number of TMs and slots per TM. In the first test, configured
> with 3 TMs and 32 slots/TM, no data skew occurs; the three machines receive
> the same amount of data and each partition processes approximately the same
> amount. In the second test, configured with 6 TMs and 16 slots/TM, I find
> each partition still processes the same amount of data, but one machine
> processes more data than the other two machines.
>
> I wonder whether the TaskManagers (JVMs) compete on one machine? What's
> more, how does the streaming benchmark deal with backpressure? I tested on
> a cluster with 4 nodes, one master and three workers, each node with an
> Intel Xeon E5-2699 v4 @ 2.20GHz/3.60GHz, 256G memory, 88 cores and a 10Gbps
> network, and I could not find the bottleneck. It confused me!
>
> Best Regards & Thanks
>
> Rui
>
>
>
> -
> stay hungry, stay foolish.
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Question about Flink optimizer on Stream API

2019-01-03 Thread Till Rohrmann
Hi Felipe,

for streaming Flink currently does not optimize the data flow graph. I
think the best reference is actually going through the code as you've done
for the batch case.

Cheers,
Till

On Wed, Dec 19, 2018 at 3:14 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi,
>
> I was reading some FLIP documents related to the new design of the Flink
> scheduler [1] and the unification of batch and stream [2]. Then I created two
> different programs to learn how Flink optimizes the query plan in batch and
> in stream mode (and how far it goes): one using batch [3] and one
> using streaming [4]. While debugging the code, and as depicted in
> the document [2], the batch program uses the
> org.apache.flink.optimizer.Optimizer class which generates an
> "org.apache.flink.optimizer.plan.OptimizedPlan", while the stream program uses
> the "org.apache.flink.streaming.api.graph.StreamGraph" and every
> transformation inside the package
> "org.apache.flink.streaming.api.transformations".
>
> When I print the execution plan with "env.getExecutionPlan()" I see
> exactly what I have written in the Flink program (which is expected).
> However, I was looking for where I can see the optimized plan, I mean
> decisions about operator reordering based on cost or statistics. For batch I
> could find "org.apache.flink.optimizer.costs.CostEstimator" and
> "org.apache.flink.optimizer.DataStatistics", but for streaming I only found
> the creation of the plan. How can I debug that, or get a better
> understanding of what Flink is doing? Do you advise me to read some other
> reference about this?
>
> Kind Regards,
> Felipe
>
> [1] Group-aware scheduling for Flink -
> https://docs.google.com/document/d/1q7NOqt05HIN-PlKEEPB36JiuU1Iu9fnxxVGJzylhsxU/edit#heading=h.k15nfgsa5bnk
> [2] Unified Core API for Streaming and Batch -
> https://docs.google.com/document/d/1G0NUIaaNJvT6CMrNCP6dRXGv88xNhDQqZFrQEuJ0rVU/edit#
> [3]
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/batch/MatrixMultiplication.java
> [4]
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/SensorsReadingMqttJoinQEP.java
>
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> *
>


Re: How to shut down Flink Web Dashboard in detached Yarn session?

2019-01-02 Thread Till Rohrmann
You could also use `jps` or `ps` to check that no TaskExecutor and
StandaloneJobClusterEntrypoint is running. If there are no such processes,
then there should not be a Flink cluster running locally.

Cheers,
Till

On Wed, Jan 2, 2019 at 6:31 PM Sai Inampudi  wrote:

> Hey Till,
>
> If it is running on a standalone Flink cluster, wouldn't running
> stop-cluster.sh work?
>
> When I run stop-cluster.sh, I get back:
> No taskexecutor daemon to stop on host .
> No standalonesession daemon to stop on host .
>
> So I assumed that meant that it is not running on a standalone cluster
>
> On 2019/01/02 14:13:52, Till Rohrmann  wrote:
> > Hi Sai,
> >
> > could you check that the dashboard you are seeing is really running on
> Yarn
> > and not a standalone Flink cluster which you have running locally?
> >
> > Cheers,
> > Till
> >
> > On Mon, Dec 31, 2018 at 7:40 PM Sai Inampudi 
> wrote:
> >
> > > Hey Gary, thanks for reaching out.
> > >
> > > Executing "yarn application -list" does not return my flink cluster so
> I
> > > assume like in my initial post that the application must be
> terminated. My
> > > config when I ran the job did not have log aggregation enabled and that
> > > might be why when I try to look at the logs via "yarn logs
> -applicationId
> > > ", I get back nothing:
> > > (e.g. Unable to get ApplicationState. Attempting to fetch logs
> > > directly from the filesystem.
> > > /tmp/logs/si022833/logs/application_1545041832015_73428 does not
> > > exist.)
> > >
> > >
> > > (My previous reply was formatted incorrectly so I am replying back with
> > > proper formatting. Apologies for the mistake)
> > >
> > >
> > > On 2018/12/31 18:13:05, Sai Inampudi  wrote:
> > > >
> > > >
> > > > On 2018/12/31 10:53:58, Gary Yao  wrote:
> > > > > Hi,
> > > > >
> > > > > You can use the YARN client to list all applications on your YARN
> > > cluster:
> > > > >
> > > > > yarn application -list
> > > > >
> > > > > If this does not show any running applications, the Flink cluster
> must
> > > have
> > > > > somehow terminated. If you have YARN's log aggregation enabled, you
> > > should
> > > > > be
> > > > > able to view the Flink logs by running:
> > > > >
> > > > > yarn logs -applicationId 
> > > > >
> > > > > Best,
> > > > > Gary
> > > > >
> > > > > On Fri, Dec 28, 2018 at 9:42 PM Sai Inampudi <
> sai.inamp...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Hi everyone,
> > > > > >
> > > > > > I recently attempted to create a Flink cluster on YARN by
> executing
> > > the
> > > > > > following:
> > > > > > ~/flink-1.5.4/bin/yarn-session.sh -n 5 -tm 2048 -s 4 -d -nm
> > > flink_yarn
> > > > > >
> > > > > > The resulting command was not completely successful but it did
> end up
> > > > > > creating a Apache Flink Dashboard with 1 Task Manager, 1 Task
> Slot,
> > > and 1
> > > > > > Job Manager.
> > > > > >
> > > > > > When I look at my Yarn Resource Manager, I don't see my
> application
> > > > > > running. CLI calls for the application id also returned nothing.
> > > > > >
> > > > > > I would like to kill the existing web dashboard as well as the
> other
> > > > > > lingering task manager/job manager so that I can try recreating
> the
> > > yarn
> > > > > > session successfully.
> > > > > >
> > > > > > Has anyone encountered this before and has any suggestion? I
> looked
> > > > > > through documentation [1] which says to stop a yarn session, you
> > > will want
> > > > > > to use the YARN utilities (yarn application -kill ) to
> stop
> > > the YARN
> > > > > > session. However, the application id in my logs is not found in
> the
> > > > > > Resource Manager so it seems to already have been killed (due to
> the
> > > > > > original yarn session command not properly executing?).
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > [1]
> > > > > >
> > >
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/yarn_setup.html#detached-yarn-session
> > > > > >
> > > > > Hey Gary, thanks for reaching out.
> > > > Executing "yarn application -list" does not return my flink cluster
> so I
> > > assume like in my initial post that the application must be
> terminated. My
> > > config when I ran the job did not have log aggregation enabled and that
> > > might be why when I try to look at the logs via "yarn logs
> -applicationId
> > > ", I get back nothing (e.g. Unable to get
> ApplicationState.
> > > Attempting to fetch logs directly from the filesystem.
> > > > /tmp/logs/si022833/logs/application_1545041832015_73428 does not
> exist.)
> > > >
> > >
> >
>


Re: using updating shared data

2019-01-02 Thread Till Rohrmann
Yes exactly Avi.

Cheers,
Till

On Wed, Jan 2, 2019 at 5:42 PM Avi Levi  wrote:

> Thanks Till, I will definitely check it out. Just to make sure that I
> got you correctly: you are suggesting that the list I want to broadcast
> will be broadcast via the control stream and will then be kept in the
> relevant operator state, correct? And updates (CRUD) on that list will be
> performed via the control stream, correct?
> BR
> Avi
>
> On Wed, Jan 2, 2019 at 4:28 PM Till Rohrmann  wrote:
>
>> Hi Avi,
>>
>> you could use Flink's broadcast state pattern [1]. You would need to use
>> the DataStream API but it allows you to have two streams (input and control
>> stream) where the control stream is broadcasted to all sub tasks. So by
>> ingesting messages into the control stream you can send model updates to
>> all sub tasks.
>>
>> [1]
>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__ci.apache.org_projects_flink_flink-2Ddocs-2Dstable_dev_stream_state_broadcast-5Fstate.html=DwMFaQ=euGZstcaTDllvimEN8b7jXrwqOf-v5A_CdpgnVfiiMM=dpWtkT5FJRWFqDA3MAnB4-dRYGDQjgfQTYAocqGkRKo=u5UQh821Gau2wZ7S3M8IRmVpL5JxGADJaq_k7iq6sYo=uITdFlQPKLbqxkTux4nR21JhUpLIkS5Pdfi9D_ZSUwE=>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__ci.apache.org_projects_flink_flink-2Ddocs-2Dstable_dev_stream_state_broadcast-5Fstate.html=DwQFaQ=euGZstcaTDllvimEN8b7jXrwqOf-v5A_CdpgnVfiiMM=dpWtkT5FJRWFqDA3MAnB4-dRYGDQjgfQTYAocqGkRKo=u5UQh821Gau2wZ7S3M8IRmVpL5JxGADJaq_k7iq6sYo=uITdFlQPKLbqxkTux4nR21JhUpLIkS5Pdfi9D_ZSUwE=>
>>
>> Cheers,
>> Till
>>
>> On Tue, Jan 1, 2019 at 6:49 PM miki haiat  wrote:
>>
>>> I'm trying to understand your use case.
>>> What is the source of the data? FS, Kafka, or something else?
>>>
>>>
>>> On Tue, Jan 1, 2019 at 6:29 PM Avi Levi  wrote:
>>>
>>>> Hi,
>>>> I have a list (a couple of thousand text lines) that I need to use in my
>>>> map function. I read this article about broadcasting variables
>>>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__ci.apache.org_projects_flink_flink-2Ddocs-2Dstable_dev_batch_-23broadcast-2Dvariables=DwMFaQ=euGZstcaTDllvimEN8b7jXrwqOf-v5A_CdpgnVfiiMM=dpWtkT5FJRWFqDA3MAnB4-dRYGDQjgfQTYAocqGkRKo=u5UQh821Gau2wZ7S3M8IRmVpL5JxGADJaq_k7iq6sYo=U3vGeHdL9fGDfP0GNZUkGpSlcVLz9CNLg2MXNwHP0_M=>
>>>>  or
>>>> using distributed cache
>>>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__ci.apache.org_projects_flink_flink-2Ddocs-2Dstable_dev_batch_-23distributed-2Dcache=DwMFaQ=euGZstcaTDllvimEN8b7jXrwqOf-v5A_CdpgnVfiiMM=dpWtkT5FJRWFqDA3MAnB4-dRYGDQjgfQTYAocqGkRKo=u5UQh821Gau2wZ7S3M8IRmVpL5JxGADJaq_k7iq6sYo=m5IHbX1Dbz7AYERvVgyxKXmrUQQ06IkA4VCDllkR0HM=>
>>>> however, I need to update this list from time to time, and if I understood
>>>> correctly that is not possible with broadcast variables or the distributed
>>>> cache without restarting the job. Is there an idiomatic way to achieve this?
>>>> A DB seems to be overkill for that, and I want to be as cheap on IO/network
>>>> calls as possible.
>>>>
>>>> Cheers
>>>> Avi
>>>>
>>>>


Re: Are Jobs allowed to be pending when slots are not enough

2019-01-02 Thread Till Rohrmann
Hi Xinyu,

at the moment there is no such functionality in Flink. Whenever you submit
a job, Flink will try to execute the job right away. If the job cannot get
enough slots, then it will wait until the slot.request.timeout occurs and
either fail or retry if you have a RestartStrategy configured.

If you want to wait until you have enough slots before submitting a job, I
would suggest that you write yourself a small service which uses Flink's
REST API [1] to query the status and finally submit the job if there are
enough free slots.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#overview-1

Cheers,
Till
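
As a rough illustration of that "small service" idea, the sketch below polls the
cluster overview and only continues once enough slots are free. The /overview
endpoint and its "slots-available" field are part of Flink's REST API, but the
class name, REST address, polling interval and the naive JSON parsing are all
assumptions for illustration; check the REST docs of your Flink version for the
exact payload.

import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SlotGate {

    public static void main(String[] args) throws Exception {
        int requiredSlots = 500;                       // parallelism of the job to submit
        String restAddress = "http://jobmanager:8081"; // assumption: your REST endpoint

        // Wait until the cluster reports enough free slots.
        while (availableSlots(restAddress) < requiredSlots) {
            Thread.sleep(10_000); // poll every 10 seconds
        }
        // Enough slots are free: now trigger the actual submission,
        // e.g. by invoking `bin/flink run ...` or the job-submission REST call.
    }

    private static int availableSlots(String restAddress) throws IOException {
        URL url = new URL(restAddress + "/overview");
        try (InputStream in = url.openStream();
             Scanner scanner = new Scanner(in, StandardCharsets.UTF_8.name()).useDelimiter("\\A")) {
            String json = scanner.hasNext() ? scanner.next() : "";
            // naive extraction to keep the sketch dependency-free; use a JSON
            // library in real code
            Matcher m = Pattern.compile("\"slots-available\"\\s*:\\s*(\\d+)").matcher(json);
            return m.find() ? Integer.parseInt(m.group(1)) : 0;
        }
    }
}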

On Wed, Jan 2, 2019 at 2:09 PM 张馨予  wrote:

> Hi all
>
>
>
> We submit some batch jobs to a Flink cluster which has, for example, 500
> slots. The parallelism of these jobs may differ, between 1 and 500.
>
> Is there any configuration that can make jobs run in submission order
> once the cluster has enough slots? If not, how could we meet this requirement?
>
>
>
> Thanks.
>
>
>
> Xinyu Zhang
>


Re: same parallelism with different taskmanager and slots, skew occurs

2019-01-02 Thread Till Rohrmann
Hi Rui,

such a situation can occur if you have data skew in your data set
(differently sized partitions if you key by some key). Assume you have 2
TMs with 2 slots each and you key your data by some key x. The partition
assignment could look like:

TM1: slot_1 = Partition_1, slot_2 = Partition_2
TM2: slot_1 = Partition_3, slot_2 = Partition_4

Now assume that partition_1 and partition_3 are ten times bigger than
partition_2 and partition_4. From a TM perspective both TMs would process
the same amount of data.

If you now start 4 TMs with a single slot each you could get the following
assignment:

TM1: slot_1 = Partition_1
TM2: slot_1 = Partition_2
TM3: slot_1 = Partition_3
TM4: slot_1 = Partition_4

Now from a TM perspective, TM1 and TM3 would process ten times more data
than TM2 and TM4.

Does this make sense? What you could check is whether you can detect such a
data skew in your input data (e.g. by counting the occurrences of items
with a specific key).

Cheers,
Till
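
A quick way to check for such skew, as suggested above, is simply to count how
many records fall on each key before worrying about the TM/slot layout. A
minimal, self-contained sketch with the DataStream API follows; the toy source
is a placeholder for the real input:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeySkewCheck {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Replace this toy source with the real input; the point is only to
        // count records per key and print the counts.
        env.fromElements("a", "a", "a", "a", "b", "c")
            .map(new MapFunction<String, Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> map(String key) {
                    return Tuple2.of(key, 1L);
                }
            })
            .keyBy(0)
            .sum(1)   // heavily skewed keys accumulate much larger counts
            .print();

        env.execute("key skew check");
    }
}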

On Wed, Jan 2, 2019 at 6:13 AM varuy322  wrote:

> Hi, there
>
> Recently I ran the streaming benchmark with Flink 1.5.2 standalone on a
> cluster with 4 machines (1 as master and the others as workers), and it
> produced different results as below:
> (1) When I set the parallelism to 96 (source, sink and middle operator
> parallelism all set to 96) and start 3 taskmanagers with 32 slots each,
> all goes well.
> (2) When I change (1) to start 6 taskmanagers, i.e. 2 taskmanagers on each
> worker with 16 slots each, all goes well too. In this situation I find the
> subtasks on each worker process the same data size, but one worker
> processes several times more than the other workers; it seems data skew
> occurs. How could this happen?
>
> Could someone explain, for the same parallelism, the performance difference
> between multiple taskmanagers per worker with fewer slots each and a single
> taskmanager with more slots?
> Thanks a lot!
>
> Best Regards
> Rui
>
>
>
> -
> stay hungry, stay foolish.
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Problem when use kafka sink with EXACTLY_ONCE in IDEA

2019-01-02 Thread Till Rohrmann
Hi Kaibo,

which Kafka version are you running locally? When enabling exactly once
processing guarantees, you need at least Kafka >= 0.11. The
UnsupportedVersionException indicates that this constraint is not fulfilled
[1].

[1]
https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

Cheers,
Till

On Wed, Jan 2, 2019 at 5:02 AM Kaibo Zhou  wrote:

> Hi,
> I encountered an error while running the kafka sink demo in IDEA.
>
> This is the complete code:
>
> import java.util.Properties
>
> import org.apache.flink.api.common.serialization.SimpleStringSchema
> import org.apache.flink.runtime.state.filesystem.FsStateBackend
> import
> org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
> import
> org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper
>
> object kafka_test {
>
>   def main(args: Array[String]): Unit = {
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setParallelism(1)
> env.setStateBackend(new FsStateBackend("file:///tmp/checkpoint"))
>
> val config = env.getCheckpointConfig
>
> config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
> config.setCheckpointInterval(15 * 1000)
>
> val event = env.socketTextStream("localhost", )
> val propsTarget = new Properties()
> propsTarget.setProperty("bootstrap.servers", "127.0.0.1:9092")
> propsTarget.setProperty("enable.idempotence", "true")
>
> val outputProducer = new FlinkKafkaProducer011[String](
>   "test-output",
>   new KeyedSerializationSchemaWrapper[String](new
> SimpleStringSchema()),
>   propsTarget,
>   FlinkKafkaProducer011.Semantic.EXACTLY_ONCE // ok when change to
> Semantic.AT_LEAST_ONCE
> )
>
> event.addSink(outputProducer).name("sink_to_kafka")
> env.execute()
>   }
> }
>
> Start the command "nc -l " before running the above code.
> Error message:
>
> 7186 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO
> org.apache.kafka.common.utils.AppInfoParser  - Kafka version : 0.11.0.2
> 7186 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO
> org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId :
> 73be1e1168f91ee2
> 7186 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011  -
> Starting FlinkKafkaProducer (1/1) to produce into default topic test-output
> 7186 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO
> org.apache.kafka.clients.producer.internals.TransactionManager  -
> [TransactionalId Source: Socket Stream -> Sink:
> sink_to_kafka-7df19f87deec5680128845fd9a6ca18d-6] ProducerId set to -1 with
> epoch -1
> 7199 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO
> org.apache.kafka.clients.producer.KafkaProducer  - Closing the Kafka
> producer with timeoutMillis = 9223372036854775807 ms.
> 7200 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO
> org.apache.flink.runtime.taskmanager.Task  - Source: Socket Stream -> Sink:
> sink_to_kafka (1/1) (a7cea618f99152987bb4a52b4d1df0e3) switched from
> RUNNING to FAILED.
> org.apache.kafka.common.errors.UnsupportedVersionException: Cannot create
> a v0 FindCoordinator request because we require features supported only in
> 1 or later.
> 7200 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO
> org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for
> Source: Socket Stream -> Sink: sink_to_kafka (1/1)
> (a7cea618f99152987bb4a52b4d1df0e3).
> 7200 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO
> org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem
> streams are closed for task Source: Socket Stream -> Sink: sink_to_kafka
> (1/1) (a7cea618f99152987bb4a52b4d1df0e3) [FAILED]
> 7201 [flink-akka.actor.default-dispatcher-5] INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor  - Un-registering task
> and sending final execution state FAILED to JobManager for task Source:
> Socket Stream -> Sink: sink_to_kafka a7cea618f99152987bb4a52b4d1df0e3.
> 7201 [flink-akka.actor.default-dispatcher-5] INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Socket
> Stream -> Sink: sink_to_kafka (1/1) (a7cea618f99152987bb4a52b4d1df0e3)
> switched from RUNNING to FAILED.
> org.apache.kafka.common.errors.UnsupportedVersionException: Cannot create
> a v0 FindCoordinator request because we require features supported only in
> 1 or later.
> 7201 [flink-akka.actor.default-dispatcher-5] INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job Flink
> Streaming Job (e7da02d2a2ed7cd2d215e244b582b4ef) switched from state
> RUNNING to FAILING.
> org.apache.kafka.common.errors.UnsupportedVersionException: Cannot create
> a v0 FindCoordinator request because we require features supported only in
> 1 or later.
> 7202 

Re: using updating shared data

2019-01-02 Thread Till Rohrmann
Hi Avi,

you could use Flink's broadcast state pattern [1]. You would need to use
the DataStream API but it allows you to have two streams (input and control
stream) where the control stream is broadcasted to all sub tasks. So by
ingesting messages into the control stream you can send model updates to
all sub tasks.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html

Cheers,
Till
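
To illustrate the pattern described above, here is a minimal, self-contained
sketch of the broadcast state approach. The stream contents, element types and
the "shared-list" state name are illustrative assumptions, not taken from the
actual job; in practice the control stream would come from Kafka or another
source that carries the list updates.

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastListExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-ins for the real sources: `input` is the data stream, `control`
        // carries updates (CRUD) to the shared list.
        DataStream<String> input = env.fromElements("foo", "bar", "baz");
        DataStream<String> control = env.fromElements("foo", "baz");

        // State descriptor for the broadcast state holding the shared list.
        final MapStateDescriptor<String, String> listDescriptor = new MapStateDescriptor<>(
                "shared-list", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

        BroadcastStream<String> controlBroadcast = control.broadcast(listDescriptor);

        input.connect(controlBroadcast)
             .process(new BroadcastProcessFunction<String, String, String>() {

                 @Override
                 public void processElement(String value, ReadOnlyContext ctx,
                                            Collector<String> out) throws Exception {
                     // read-only access to the broadcast state on the data path
                     if (ctx.getBroadcastState(listDescriptor).contains(value)) {
                         out.collect(value);
                     }
                 }

                 @Override
                 public void processBroadcastElement(String update, Context ctx,
                                                     Collector<String> out) throws Exception {
                     // updates to the shared list arrive via the control stream
                     ctx.getBroadcastState(listDescriptor).put(update, update);
                 }
             })
             .print();

        env.execute("broadcast state sketch");
    }
}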

On Tue, Jan 1, 2019 at 6:49 PM miki haiat  wrote:

> I'm trying to understand your use case.
> What is the source of the data? FS, Kafka, or something else?
>
>
> On Tue, Jan 1, 2019 at 6:29 PM Avi Levi  wrote:
>
>> Hi,
>> I have a list (a couple of thousand text lines) that I need to use in my
>> map function. I read this article about broadcasting variables
>> 
>>  or
>> using distributed cache
>> 
>> however, I need to update this list from time to time, and if I understood
>> correctly that is not possible with broadcast variables or the distributed
>> cache without restarting the job. Is there an idiomatic way to achieve this?
>> A DB seems to be overkill for that, and I want to be as cheap on IO/network
>> calls as possible.
>>
>> Cheers
>> Avi
>>
>>


Re: RuntimeException with valve output watermark when using CoGroup

2019-01-02 Thread Till Rohrmann
Thanks for the update Taneli. Glad that you solved the problem. If you
should find out more about the more obscure case, let us know. Maybe there
is something we can still improve to prevent misleading exceptions in the
future.

Cheers,
Till

On Tue, Jan 1, 2019 at 3:01 PM Taneli Saastamoinen <
taneli.saastamoi...@gmail.com> wrote:

> To return to this old thing, this was basically user error. The second of
> the transformations was keying by a field that was sometimes null after the
> first transformation. (This was supposed to never happen, but then it did
> happen in production.)
>
> The confusing part is where the exception occurs. The NullPointerException
> happens because of, and in, the second transformation, but in my example
> here the stack trace points to the first transformation. Of course Flink
> doesn't execute the lines literally like that (i.e. there is optimisation
> going on), so the true location of the error is obscured.
>
> I tried to create a small reproducible example of this but only managed to
> get a situation where the NullPointerException instead very clearly points
> to the second transformation. I'm not sure how to reproduce the weirder
> edition of the error since it seems to depend on the query optimiser, which
> in turn might depend on data volumes, pojo structures etc.
>
> In any case, errors like this can of course be easily detected and fixed
> with proper unit tests, whereas I didn't originally have quite full
> coverage for unexpected partially-null data.
>
> Cheers,
>
>
>
> On Mon, 30 Jul 2018 at 10:21, Taneli Saastamoinen <
> taneli.saastamoi...@gmail.com> wrote:
>
>> On 27 July 2018 at 19:21, Chesnay Schepler  wrote:
>> > At first glance this looks like a bug. Is the nothing in the stack
>> trace after the NullPointerException?
>>
>> Hmm, there is actually, sorry about that:
>>
>> Caused by: java.lang.NullPointerException
>> at
>> org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignToKeyGroup(KeyGroupRangeAssignment.java:59)
>> at
>> org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:48)
>> at
>> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:63)
>> at
>> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
>> at
>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
>> at
>> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
>> at
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>> ... 22 more
>>
>> > How reliably can you reproduce this?
>>
>> 100%, though I've only run this job a handful of times. What's
>> frustrating is that I cannot reproduce this easily in a unit test (all my
>> unit tests work fine). On production data it happens every time and pretty
>> much instantly, but our data volumes are big enough that it's difficult to
>> try to dig into it further.
>>
>> For now I think I'll split the job into two, have the first aggregation
>> write to Kafka and have the second aggregation as a separate job that reads
>> its input from Kafka. When I run the first aggregation only that is fine
>> and no errors occur, so the issue seems to be the combination of
>> aggregations.
>>
>> Cheers,
>>
>>


Re: How to shut down Flink Web Dashboard in detached Yarn session?

2019-01-02 Thread Till Rohrmann
Hi Sai,

could you check that the dashboard you are seeing is really running on Yarn
and not a standalone Flink cluster which you have running locally?

Cheers,
Till

On Mon, Dec 31, 2018 at 7:40 PM Sai Inampudi  wrote:

> Hey Gary, thanks for reaching out.
>
> Executing "yarn application -list" does not return my flink cluster so I
> assume like in my initial post that the application must be terminated. My
> config when I ran the job did not have log aggregation enabled and that
> might be why when I try to look at the logs via "yarn logs -applicationId
> ", I get back nothing:
> (e.g. Unable to get ApplicationState. Attempting to fetch logs
> directly from the filesystem.
> /tmp/logs/si022833/logs/application_1545041832015_73428 does not
> exist.)
>
>
> (My previous reply was formatted incorrectly so I am replying back with
> proper formatting. Apologies for the mistake)
>
>
> On 2018/12/31 18:13:05, Sai Inampudi  wrote:
> >
> >
> > On 2018/12/31 10:53:58, Gary Yao  wrote:
> > > Hi,
> > >
> > > You can use the YARN client to list all applications on your YARN
> cluster:
> > >
> > > yarn application -list
> > >
> > > If this does not show any running applications, the Flink cluster must
> have
> > > somehow terminated. If you have YARN's log aggregation enabled, you
> should
> > > be
> > > able to view the Flink logs by running:
> > >
> > > yarn logs -applicationId 
> > >
> > > Best,
> > > Gary
> > >
> > > On Fri, Dec 28, 2018 at 9:42 PM Sai Inampudi 
> wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > I recently attempted to create a Flink cluster on YARN by executing
> the
> > > > following:
> > > > ~/flink-1.5.4/bin/yarn-session.sh -n 5 -tm 2048 -s 4 -d -nm
> flink_yarn
> > > >
> > > > The resulting command was not completely successful but it did end up
> > > > creating a Apache Flink Dashboard with 1 Task Manager, 1 Task Slot,
> and 1
> > > > Job Manager.
> > > >
> > > > When I look at my Yarn Resource Manager, I don't see my application
> > > > running. CLI calls for the application id also returned nothing.
> > > >
> > > > I would like to kill the existing web dashboard as well as the other
> > > > lingering task manager/job manager so that I can try recreating the
> yarn
> > > > session successfully.
> > > >
> > > > Has anyone encountered this before and has any suggestion? I looked
> > > > through documentation [1] which says to stop a yarn session, you
> will want
> > > > to use the YARN utilities (yarn application -kill ) to stop
> the YARN
> > > > session. However, the application id in my logs is not found in the
> > > > Resource Manager so it seems to already have been killed (due to the
> > > > original yarn session command not properly executing?).
> > > >
> > > >
> > > >
> > > >
> > > > [1]
> > > >
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/yarn_setup.html#detached-yarn-session
> > > >
> > > Hey Gary, thanks for reaching out.
> > Executing "yarn application -list" does not return my flink cluster so I
> assume like in my initial post that the application must be terminated. My
> config when I ran the job did not have log aggregation enabled and that
> might be why when I try to look at the logs via "yarn logs -applicationId
> ", I get back nothing (e.g. Unable to get ApplicationState.
> Attempting to fetch logs directly from the filesystem.
> > /tmp/logs/si022833/logs/application_1545041832015_73428 does not exist.)
> >
>


Re: no log exists in JM and TM when updated to Flink 1.7

2019-01-02 Thread Till Rohrmann
Hi Joshua,

could you check the content of the logback.xml. Maybe this file has changed
between the versions.

Cheers,
Till

On Wed, Dec 26, 2018 at 11:19 AM Joshua Fan  wrote:

> Hi,
>
> It is very weird that there is no log file for the JM and TM when running a
> Flink job on YARN after updating Flink to 1.7; on Flink 1.4.2 everything is
> OK. I checked the log directory: there were jobmanager.error and
> jobmanager.out but no jobmanager.log, and the log messages which should be
> in jobmanager.log now show up in jobmanager.error. The taskmanager has the
> same situation: no taskmanager.log, but the information exists in
> taskmanager.error.
>
> Below is the container launch command, and it seems OK.
>
> yarn 21784  0.0  0.0 109896  1480 ?Ss   15:13   0:00 /bin/bash
> -c /home/yarn/software/java8/bin/java -Xmx424m
> -Dlog.file=/data03/yarn/userlogs/application_1543893582405_1760130/container_e124_1543893582405_1760130_02_01/jobmanager.log
> -Dlogback.configurationFile=file:logback.xml
> org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint  1>
> /data03/yarn/userlogs/application_1543893582405_1760130/container_e124_1543893582405_1760130_02_01/jobmanager.out
> 2>
> /data03/yarn/userlogs/application_1543893582405_1760130/container_e124_1543893582405_1760130_02_01/
>
> Any hints? Thanks a lot.
>
> Yours
> Joshua
>


Re: 1.6 UI issues

2019-01-02 Thread Till Rohrmann
Hi Oleksandr,

the requestJob call should only take longer if either the `JobMaster` is
overloaded and too busy to respond to the request or if the
ArchivedExecutionGraph is very large (e.g. very large accumulators) and
generating it and sending it over to the RestServerEndpoint takes too long.
This is also the change which was introduced with Flink 1.5. Instead of
simply handing over a reference to the RestServerEndpoint from the
JobMaster, the ArchivedExecutionGraph now needs to be sent through the
network stack to the RestServerEndpoint.

If you did not change the akka.framesize then the maximum size of the
ArchivedExecutionGraph should only be 10 MB, though. Therefore, I would
guess that your `JobMaster` must be quite busy if the requests time out.

Cheers,
Till
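
For reference, the settings mentioned in this thread live in flink-conf.yaml.
The values below are only examples of raising them above their defaults, not
recommendations:

web.timeout: 60000            # ms; timeout for REST handlers asking the JobMaster (default 10000)
web.refresh-interval: 10000   # ms; how often the web UI polls the REST API (default 3000)
akka.framesize: 10485760b     # max message size, e.g. for the ArchivedExecutionGraph (default ~10 MB)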

On Wed, Jan 2, 2019 at 10:58 AM Oleksandr Nitavskyi 
wrote:

> Hello guys. Happy new year!
>
>
>
> Context: we started to have some trouble with the UI after bumping our Flink
> version from 1.4 to 1.6.3. The UI couldn't render the job details page, so
> inspecting jobs has become impossible for us with the new version.
>
>
>
> And it looks like we have a workaround for our UI issue.
>
> After some investigation we realized that starting from Flink 1.5 we began
> to hit a timeout on the actor call *restfulGateway.requestJob(jobId,
> timeout)* in *ExecutionGraphCache*. So we increased the *web.timeout*
> parameter and we stopped getting timeout exceptions on the JobManager
> side.
>
>
>
> Also, in *SingleJobController* on the AngularJS side we needed to tweak
> *web.refresh-interval* in order to ensure that the front end waits for the
> back-end request to finish. Otherwise the AngularJS side can make another
> request in SingleJobController, and for some reason the UI is not updated
> when the older request finishes. We will have a closer look at this
> behavior.
>
>
>
> Does it ring a bell for you probably?
>
>
>
> Thank you
>
>
>
> Kind Regards
>
> Oleksandr
>
>
>
> *From: *Till Rohrmann 
> *Date: *Wednesday 19 December 2018 at 16:52
> *To: *Juan Gentile 
> *Cc: *"dwysakow...@apache.org" , Jeff Bean <
> j...@data-artisans.com>, Oleksandr Nitavskyi 
> *Subject: *Re: 1.6 UI issues
>
>
>
> Hi Juan,
>
>
>
> thanks for the log. The log file does not contain anything suspicious. Are
> you sure that you sent me the right file? The timestamps don't seem to
> match. In the attached log, the job seems to run without problems.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Wed, Dec 19, 2018 at 10:26 AM Juan Gentile 
> wrote:
>
> Hello Till, Dawid
>
> Sorry for the late response on this issue and thank you Jeff for helping
> us with this.
>
> Yes we are using 1.6.2
>
> I attach the logs from the Job Master.
>
> Also we noticed this exception:
>
> 2018-12-19 08:50:10,497 ERROR
> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler   -
> Implementation error: Unhandled exception.
>
> java.util.concurrent.CancellationException
>
> at
> java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2263)
>
> at
> org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache.getExecutionGraph(ExecutionGraphCache.java:124)
>
> at
> org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.handleRequest(AbstractExecutionGraphHandler.java:76)
>
> at
> org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:78)
>
> at
> org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:154)
>
> at
> org.apache.flink.runtime.rest.handler.RedirectHandler.lambda$null$0(RedirectHandler.java:142)
>
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
>
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
>
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
>
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
>
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>
>

Re: [ANNOUNCE] Apache Flink 1.5.6 released

2018-12-28 Thread Till Rohrmann
Thanks a lot for being our release manager Thomas. Great work! Also thanks
to the community for making this release possible.

Cheers,
Till

On Thu, Dec 27, 2018 at 2:58 AM Jeff Zhang  wrote:

> Thanks Thomas. It's nice to have a more stable flink 1.5.x
>
> vino yang  wrote on Thu, Dec 27, 2018 at 9:43 AM:
>
>> Thomas, thanks for being the release manager.
>> And thanks to the whole community.
>> I think the release of Flink 1.5.6 makes sense for many users who are
>> currently unable to upgrade major versions.
>>
>> Best,
>> Vino
>>
>> jincheng sun  wrote on Thu, Dec 27, 2018 at 8:00 AM:
>>
>>> Thanks a lot for being our release manager, Thomas.
>>> Thanks a lot for making this release possible!
>>>
>>> Cheers,
>>> Jincheng
>>>
>>> Thomas Weise  wrote on Thu, Dec 27, 2018 at 4:03 AM:
>>>
 The Apache Flink community is very happy to announce the release of
 Apache
 Flink 1.5.6, which is the final bugfix release for the Apache Flink 1.5
 series.

 Apache Flink® is an open-source stream processing framework for
 distributed, high-performing, always-available, and accurate data
 streaming
 applications.

 The release is available for download at:
 https://flink.apache.org/downloads.html

 Please check out the release blog post for an overview of the
 improvements
 for this bugfix release:
 https://flink.apache.org/news/2018/12/22/release-1.5.6.html

 The full release notes are available in Jira:

 https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12344315

 We would like to thank all contributors of the Apache Flink community
 who
 made this release possible!

 Regards,
 Thomas

>>>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: [ANNOUNCE] Apache Flink 1.6.3 released

2018-12-24 Thread Till Rohrmann
Thanks a lot for being our release manager Gordon. Great job! And also a
big thanks to the community for making this release possible.

Cheers,
Till

On Mon, Dec 24, 2018 at 2:11 AM vino yang  wrote:

> Thanks for being the release manager Gordon. And glad to see Flink 1.6.3
> released.
>
> Best,
> Vino
>
> Tzu-Li (Gordon) Tai  wrote on Sun, Dec 23, 2018 at 9:35 PM:
>
>> Hi,
>>
>> The Apache Flink community is very happy to announce the release of
>> Apache Flink 1.6.3, which is the third bugfix release for the Apache
>> Flink 1.6 series.
>>
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data
>> streaming applications.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Please check out the release blog post for an overview of the
>> improvements for this bugfix release:
>> https://flink.apache.org/news/2018/12/22/release-1.6.3.html
>>
>> The full release notes are available in Jira:
>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12344314
>>
>> We would like to thank all contributors of the Apache Flink community
>> who made this release possible!
>>
>> Regards,
>> Gordon
>>
>>


[ANNOUNCE] Weekly community update #51

2018-12-20 Thread Till Rohrmann
Dear community,

this is the weekly community update thread #51. Please post any news and
updates you want to share with the community to this thread.

# Flink Forward China is happening

This week the Flink community meets in Beijing for the first Flink Forward
China which takes place from the 20th to the 21st of December. Find out
more about the given talks and training sessions on the website [0].

# Release voting for 1.5.6, 1.6.3 and 1.7.1

The community is currently voting on three bug fix releases Flink 1.5.6,
1.6.3 and 1.7.1 [1, 2, 3]. Please help the community by trying out the
release candidates.

# Flink SQL DDL design

The Flink SQL DDL design discussion is currently converging and the
community tries to define the scope of the MVP [4]. Join the discussion to
learn where the journey is headed to.

# Improving Flink's Kerberos integration

Rong started a discussion on how Flink's Kerberos integration could be
improved to better meet enterprise requirements [5]. If you want to share
your experiences and voice your opinion what should be supported by Flink
wrt Kerberos please join this discussion.

# Python and non-JVM language support in Flink

Xianda started a discussion on Flink's support of non-JVM languages in
particular Python [6]. The discussion revolves around three different
strategies: (1) Language portability via Apache Beam; (2) Implement own
Python API; (3) Implement own portability layer. Please chime in if you
want to make your opinion heard.

[0] https://china-2018.flink-forward.org/
[1]
https://lists.apache.org/thread.html/dfcb64406bc70d85b6d9aed34638e9099524c1c0de54c3a0e7590aa3@%3Cdev.flink.apache.org%3E
[2]
https://lists.apache.org/thread.html/4c2b101abc5b443cc16a83e918095d116250ee008ae0299d459943ca@%3Cdev.flink.apache.org%3E
[3]
https://lists.apache.org/thread.html/e7cc411d0654131d1cacd19448363ceece87f459be364e66f70982a1@%3Cdev.flink.apache.org%3E
[4]
https://lists.apache.org/thread.html/9e8eaa74b391ac21ca7268475e1179e7965ecd6389b8f5bbf9e7d6e2@%3Cdev.flink.apache.org%3E
[5]
https://lists.apache.org/thread.html/7ce36d5ecf0e5d0bfc106affc21dec11da514aac16b2f0971f53f60a@%3Cdev.flink.apache.org%3E
[6]
https://lists.apache.org/thread.html/f6f8116b4b38b0b2d70ed45b990d6bb1bcb33611fde6fdf32ec0e840@%3Cdev.flink.apache.org%3E

Cheers,
Till


Re: [SURVEY] Usage of flink-python and flink-streaming-python

2018-12-19 Thread Till Rohrmann
Thanks a lot for the feedback for this survey. I will close it now since 6
days have passed without new activity.

To me it seems that we currently don't have many users who use flink-python
or flink-streaming-python because of their limitations (mentioned in the
survey by Xianda). This information might be useful when discussing Flink's
future Python strategy and whether to continue supporting flink-python and
flink-streaming-python in the future.

Cheers,
Till

On Thu, Dec 13, 2018 at 10:50 AM Stephan Ewen  wrote:

> You are right. Let's refocus this on the python user survey and spin out
> another thread.
>
> On Thu, Dec 13, 2018 at 9:56 AM Xianda Ke  wrote:
>
> > Hi Folks,
> > To avoid polluting the survey thread with discussions, we started
> separate
> > thread and maybe we can continue the discussion over there.
> >
> > Regards,
> > Xianda
> >
> > On Wed, Dec 12, 2018 at 3:34 AM Stephan Ewen  wrote:
> >
> > > I like that we are having a general discussion about how to use Python
> > and
> > > Flink together in the future.
> > > The current python support has some shortcomings that were mentioned
> > > before, so we clearly need something better.
> > >
> > > Parts of the community have worked together with the Apache Beam
> project,
> > > which is pretty far in adding a portability layer to support Python.
> > > Before we dive deep into a design proposal for a new Python API in
> > Flink, I
> > > think we should figure out in which general direction Python support
> > should
> > > go.
> > >
> > > *Option (1): Language portability via Apache Beam*
> > >
> > > Pro:
> > >   - already exists to a large extend and already has users
> > >   - portability layer offers other languages in addition to python. Go
> is
> > > in the making, NodeJS has been speculated, etc.
> > >   - collaboration with another project / community which means more
> > > manpower and exposure. Beam currently has a strong focus on Flink as a
> > > runner for Python.
> > >   - Python API is used for existing ML libraries from the TensorFlow
> > > ecosystem
> > >
> > > Con:
> > >   - Not Flink's API. Python users need to learn the syntax of another
> API
> > > (Python API is inherently different, but even more different here).
> > >
> > > *Option (2): Implement own Python API*
> > >
> > > Pro:
> > >   - Python API will be closer to Flink Java / Scala APIs
> > >
> > > Con:
> > >   - We will only have Python.
> > >   - Need to rebuild the Python language bridge (significant work to
> > get
> > > stable)
> > >   - might lose tight collaboration with Beam and the other parties in
> > Beam
> > >   - not benefiting from Beam's ecosystem
> > >
> > > *Option (3): **Implement own portability layer*
> > >
> > > Pro
> > >   - Flexibility to align APIs across languages within Flink ecosystem
> > >
> > > Con
> > >   - A lot of work (for context, to get this feature complete, Beam has
> > > worked on that for a year now)
> > >   - Replicating work that already exists
> > >   - good chance to lose tight collaboration with Beam and parties in
> that
> > > project
> > >   - not benefiting from Beam's ecosystem
> > >
> > > Best,
> > > Stephan
> > >
> > >
> > > On Tue, Dec 11, 2018 at 3:38 PM Thomas Weise  wrote:
> > >
> > > > Did you take a look at Apache Beam? It already provides a
> comprehensive
> > > > Python SDK and can be used with Flink:
> > > > https://beam.apache.org/roadmap/portability/#python-on-flink
> > > >
> > > > We are using it at Lyft for Python streaming pipelines.
> > > >
> > > > Thomas
> > > >
> > > > On Tue, Dec 11, 2018 at 5:54 AM Xianda Ke 
> wrote:
> > > >
> > > > > Hi Till,
> > > > >
> > > > > 1. So far as I know, most of the users at Alibaba are using SQL.
> > Some
> > > of
> > > > > users at Alibaba want integrated python libraries with Flink for
> > > > streaming
> > > > > processing, and Jython is unusable.
> > > > >
> > > > > 2. Python UDFs for SQL:
> > > > > * declaring python UDF based on Alibaba's internal DDL syntax.
> > > > > * start a Python process in open()
> > > > > * communicate with JV

Re: Unbalanced Kafka consumer consumption

2018-12-19 Thread Till Rohrmann
Great to hear and thanks for letting us know.

Cheers,
Till

On Wed, Dec 19, 2018 at 5:39 PM Gerard Garcia  wrote:

> We finally figured it out. We had a large value for the Kafka consumer
> option 'max.partition.fetch.bytes', which caused the KafkaConsumer not to
> consume at a balanced rate from all partitions.
>
> Gerard
>
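For reference, a minimal sketch of how such a consumer option can be passed to a Flink Kafka source. The topic name, the FlinkKafkaConsumer011 connector version and the 1 MB value are assumptions for illustration, not details taken from this thread:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class BalancedFetchExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "balanced-fetch-example");
        // keep the per-partition fetch size moderate so a single partition cannot dominate a fetch
        props.setProperty("max.partition.fetch.bytes", "1048576"); // 1 MB, the Kafka client default

        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer011<>("my-topic", new SimpleStringSchema(), props));
        stream.print();

        env.execute("balanced-fetch-example");
    }
}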


Re: flink operator's latency metrics continues to increase.

2018-12-19 Thread Till Rohrmann
Hi Suxing Lee,

thanks for reaching out to me. I forward this mail also to the user mailing
list because it could be interesting for others as well.

Your observation could indeed be an indicator for a problem with the
latency metric. I quickly checked the code and at first glance it looks
right to me that we increase the nextTimestamp field by period in
RepeatedTriggerTask because we schedule this task at a fixed rate in
SystemProcessingTimeService#scheduleAtFixedRate. Internally this method
calls ScheduledThreadPoolExecutor#scheduleAtFixedRate which uses
System.nanoTime to schedule tasks repeatedly. In fact, the same logic will
be used by the ScheduledThreadPoolExecutor#ScheduledFutureTask. If a GC
pause or another stop-the-world event happens, this should only affect one
latency measurement and not all of them (given that System.nanoTime continues
to advance), because the next trigger will be scheduled sooner since
System.nanoTime might have progressed further in the meantime.

What could be a problem is that we compute the latency by
System.currentTimeMillis - marker.getMarkedTime. I think there is no
guarantee that System.currentTimeMillis and System.nanoTime don't drift
apart. Especially if they are executed on different machines. This is
something which we could check.

This link [1] explains the drift problem a bit more in detail.

In any case, I would suggest opening a JIRA issue to report this problem.

[1] https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6519418
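For illustration, a small toy sketch (plain Java, not Flink code) of the mechanism described above: a marked timestamp advanced by a fixed period from a scheduleAtFixedRate task (which is driven by System.nanoTime) is compared against System.currentTimeMillis, so any drift between the two clocks shows up as a slowly changing "latency":

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class LatencyDriftSketch {
    public static void main(String[] args) {
        final long period = 2_000L; // emulate the 2s latency marker interval
        final long[] markedTime = {System.currentTimeMillis()};
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

        // scheduleAtFixedRate advances on a System.nanoTime based schedule
        executor.scheduleAtFixedRate(() -> {
            markedTime[0] += period; // emulate nextTimestamp += period
            long apparentLatency = System.currentTimeMillis() - markedTime[0];
            System.out.println("apparent latency: " + apparentLatency + " ms");
        }, period, period, TimeUnit.MILLISECONDS);
    }
}

Left running for a long time, the printed value roughly tracks the difference between the two clocks (plus scheduling delays) rather than any real processing latency.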

Cheers,
Till

On Mon, Dec 17, 2018 at 2:37 PM Suxing Lee <913910...@qq.com> wrote:

> Hi Till Rohrmann,
>
> I was running flink 1.5.5,  and I use prometheus to collect metrics to
> check latency of my jobs.
> But sometimes I observed that the operator's latency metrics continue
> to increase in my job.
> The operator's latency time is increased by approximately 2.7 minutes per
> day (please see the attached screenshots)
>
> my job's logic is simple, just distributing data from a Kafka source to a
> BucketingSink.
> So I checked the consumer offsets in Kafka for the consumer group, and I also
> checked the latest data in HDFS. In fact, there is no serious latency in my
> job.
>
> I notice that the latency is computed as currentTimeMillis minus the
> LatencyMarker's markedTime.
> But the LatencyMarker's timestamp comes from RepeatedTriggerTask's
> nextTimestamp, which computes the timestamp by adding a period (the default
> value is 2s before v1.5.5). The nextTimestamp will be delayed when a JVM GC or
> Linux preemptive scheduling happens. As time goes on, the nextTimestamp ends up
> much later than the current time (I have verified this via a JVM heap
> dump).
>
> We could avoid the above situation by relying directly on the machine's
> NTP-synchronized clock to guarantee accuracy, instead of computing the timestamp in the process.
> I'm not very familiar with  SystemProcessingTimeService. Is there some
> detail I have not thought about?
>
>
> Best regards and thanks for your help.
> Suxing Lee
>
>
>
>
>


Re: Flink on Kubernetes (Minikube)

2018-12-19 Thread Till Rohrmann
Hi Alexandru,

minikube ssh 'sudo ip link set docker0 promisc on' is not supposed to solve
the problem you are seeing. It only resolves the problem if the JobMaster
wants to reach itself through the jobmanager-service name. Your problem
seems to be something else. Could you check if jobmanager-service resolves
on a pod by sshing into it and pinging this address?

Cheers,
Till

On Wed, Dec 19, 2018 at 4:08 PM Alexandru Gutan 
wrote:

> Got it working on the Google Cloud Platform Kubernetes service...
> More support for Minikube is needed.
>
> On Wed, 19 Dec 2018 at 13:44, Alexandru Gutan 
> wrote:
>
>> I've found this in the archives:
>> http://mail-archives.apache.org/mod_mbox/flink-dev/201804.mbox/%3CCALbFKXr=rp9TYpD_JA8vmuWbcjY0+Lp2mbr4Y=0fnh316hz...@mail.gmail.com%3E
>>
>> And as suggested I tried a different startup order but unsuccessful:
>>
>> kubectl create -f jobmanager-deployment.yaml
>> kubectl create -f jobmanager-service.yaml
>> kubectl create -f taskmanager-deployment.yaml
>>
>> I get the same error *java.net.UnknownHostException: flink-jobmanager: 
>> Temporary failure in name resolution*
>>
>>
>> On Wed, 19 Dec 2018 at 13:27, Dawid Wysakowicz 
>> wrote:
>>
>>> Hi Alexandru,
>>>
>>> It sounds reasonable that this might be because the minikube command
>>> failed, but I am not a kubernetes expert. I cc Till who knows more on this.
>>>
>>> Best,
>>>
>>> Dawid
>>> On 19/12/2018 14:16, Alexandru Gutan wrote:
>>>
>>> Thanks!
>>> I'm using now the *flink:1.7.0-hadoop24-scala_2.12* image.
>>> The Hadoop related error is gone, but I have a new error:
>>>
>>> Starting Task Manager
>>> config file:
>>> jobmanager.rpc.address: flink-jobmanager
>>> jobmanager.rpc.port: 6123
>>> jobmanager.heap.size: 1024m
>>> taskmanager.heap.size: 1024m
>>> taskmanager.numberOfTaskSlots: 2
>>> parallelism.default: 1
>>> rest.port: 8081
>>> blob.server.port: 6124
>>> query.server.port: 6125
>>> Starting taskexecutor as a console application on host
>>> flink-taskmanager-54b679f8bb-22b4r.
>>> 2018-12-19 13:09:38,469 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
>>> 
>>> 2018-12-19 13:09:38,470 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Starting
>>> TaskManager (Version: 1.7.0, Rev:49da9f9, Date:28.11.2018 @ 17:59:06 UTC)
>>> 2018-12-19 13:09:38,470 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  OS current
>>> user: flink
>>> 2018-12-19 13:09:38,921 WARN
>>> org.apache.hadoop.util.NativeCodeLoader   - Unable to
>>> load native-hadoop library for your platform... using builtin-java classes
>>> where applicable
>>> 2018-12-19 13:09:39,307 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Current
>>> Hadoop/Kerberos user: flink
>>> 2018-12-19 13:09:39,307 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  JVM:
>>> OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.181-b13
>>> 2018-12-19 13:09:39,307 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Maximum
>>> heap size: 922 MiBytes
>>> 2018-12-19 13:09:39,307 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  JAVA_HOME:
>>> /docker-java-home/jre
>>> 2018-12-19 13:09:39,318 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Hadoop
>>> version: 2.4.1
>>> 2018-12-19 13:09:39,318 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  JVM
>>> Options:
>>> 2018-12-19 13:09:39,319 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
>>> -XX:+UseG1GC
>>> 2018-12-19 13:09:39,319 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - -Xms922M
>>> 2018-12-19 13:09:39,320 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - -Xmx922M
>>> 2018-12-19 13:09:39,320 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
>>> -XX:MaxDirectMemorySize=8388607T
>>> 2018-12-19 13:09:39,320 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
>>> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
>>> 2018-12-19 13:09:39,320 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
>>> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
>>> 2018-12-19 13:09:39,320 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Program
>>> Arguments:
>>> 2018-12-19 13:09:39,321 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
>>> --configDir
>>> 2018-12-19 13:09:39,321 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
>>> /opt/flink/conf
>>> 2018-12-19 13:09:39,321 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Classpath:
>>> 

Re: After job cancel, leftover ZK state prevents job manager startup

2018-12-11 Thread Till Rohrmann
Hi Micah,

the problem looks indeed similar to FLINK-10255. Could you tell me your
exact cluster setup (HA with standby JobManagers?). Moreover, the logs of
all JobManagers on DEBUG level would be helpful for further debugging.

Cheers,
Till

On Tue, Dec 11, 2018 at 10:09 AM Stefan Richter 
wrote:

> Hi,
>
> Thanks for reporting the problem, I think the exception trace looks indeed
> very similar to traces in the discussion for FLINK-10184. I will pull in
> Till who worked on the fix to hear his opinion. Maybe the current fix only
> made the problem less likely to appear but is not complete, yet?
>
> Best,
> Stefan
>
> > On 11. Dec 2018, at 05:19, Micah Wylde  wrote:
> >
> > Hello,
> >
> > We've been seeing an issue with several Flink 1.5.4 clusters that looks
> like this:
> >
> > 1. Job is cancelled with a savepoint
> > 2. The jar is deleted from our HA blobstore (S3)
> > 3. The jobgraph in ZK is *not* deleted
> > 4. We restart the cluster
> > 5. Startup fails in recovery because the jar is not available, with the
> stacktrace:
> >
> > 00:13:58.486 ERROR o.a.f.r.e.ClusterEntrypoint - Fatal error occurred in
> the cluster entrypoint.
> > {{ java.lang.RuntimeException:
> org.apache.flink.runtime.client.JobExecutionException: Could not set up
> JobManager}}
> > {{ at
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)}}
> > {{ at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)}}
> > {{ at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)}}
> > {{ at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)}}
> > {{ at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)}}
> > {{ at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)}}
> > {{ at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)}}
> > {{ at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)}}Caused
> by: java.lang.Exception: Cannot set up the user code libraries: No such
> file or directory:
> s3://streamingplatform-production/{JOB_NAME}/flink/highavailability/{JOB_NAME}/blob/job_5a3fe2c00c05efd3a552a1c6707d2c10/blob_p-6d585831f5c947335ac505b400cf8f3630cc706a-42355c2885b668b0bc5e15b856141b0
> >
> > This superficially seems similar to several issues that have apparently
> been fixed in 1.5.4, like FLINK-10255 and FLINK-10184.
> >
> > Has anybody else seen this issue on 1.5.4 (or later) clusters? Or any
> advice for debugging?
> >
> > Thanks,
> > Micah
>
>


Re: Question regarding rescale api

2018-12-10 Thread Till Rohrmann
Hi Mingliang,

Aljoscha is right. At the moment Flink does not support spreading tasks out
across all TaskManagers. This is a feature which we still need to add.
Until then, you need to set the parallelism to the number of available
slots in order to guarantee that all TaskManagers are equally used.

Cheers,
Till
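A minimal sketch of that workaround, assuming 10 TaskManagers with 10 slots each; the topic name, schema and FlinkKafkaConsumer011 connector version are made up for the example:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class SpreadSourcesExample {
    public static void main(String[] args) throws Exception {
        final int totalSlots = 100; // 10 TaskManagers x 10 slots

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // run all operators, including the source, with the full slot parallelism so that
        // subtasks occupy every TaskManager; only 20 source subtasks will actually read
        // the 20 Kafka partitions, the remaining ones stay idle
        env.setParallelism(totalSlots);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "rescale-example");

        DataStream<String> events = env.addSource(
                new FlinkKafkaConsumer011<>("events", new SimpleStringSchema(), props));

        // rescale() exchanges data pointwise between neighbouring subtasks instead of a full shuffle
        events.rescale()
              .map(String::toUpperCase)
              .print();

        env.execute("spread-sources-example");
    }
}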

On Mon, Dec 10, 2018 at 3:18 PM Aljoscha Krettek 
wrote:

> Hi,
>
> I think with how the assignment of tasks to slots currently works, there is
> no way of ensuring that the source tasks are evenly spread across the
> TaskManagers (TaskExecutors). The rescale() API is from a time when
> scheduling worked a bit differently in Flink, I'm afraid.
>
> I'm cc'ing Till, who might know more about scheduling.
>
> Best,
> Aljoscha
>
>
> On 10. Dec 2018, at 13:02, 祁明良  wrote:
>
>
> Hi Aljoscha,
>
> Seems you are the committer of rescale api, any help about this question?
>
> Best,
> Mingliang
>
> --
> From: 祁明良
> Sent: December 9, 2018, 18:20
> To: user@flink.apache.org
> Subject: Question regarding rescale api
>
> Hi All,
>
> I see the rescale API allows us to somehow redistribute elements locally,
> but is it possible to have the upstream operator distributed evenly across task
> managers?
> For example, I have 10 task managers, each with 10 slots. The application
> reads data from a Kafka topic with 20 partitions, then rescales it to full
> parallelism. To me it seems that the 20 slots needed to read from Kafka
> won't be distributed evenly across the 10 task managers, which means the further
> rescale still needs to shuffle data over the network.
>
>
> Best,
> Mingliang
>
>
>
> This communication may contain privileged or other confidential
> information of Red. If you have received it in error, please advise the
> sender by reply e-mail and immediately delete the message and any
> attachments without copying or disclosing the contents. Thank you.
>
>
>


[ANNOUNCE] Weekly community update #50

2018-12-10 Thread Till Rohrmann
Dear community,

this is the weekly community update thread #50. Please post any news and
updates you want to share with the community to this thread.

# Unified core API for streaming and batch

The community started to discuss how to bring streaming and batch closer
together by implementing a common Operator abstraction on which both stream
and batch operators can run [1]. The discussion is still in its early stage
but you should subscribe to this thread if you want to stay up to date.

# Flink backward compatibility

A while ago, Thomas started a discussion about Flink's backwards
compatibility, which should cover more than just its APIs because Flink is used
by more and more third-party applications [2]. As Stephan and Chesnay
mentioned, backwards compatibility should also be guaranteed for the client
APIs and data structures (e.g. job specification).

# Enhance convenience of TableEnvironment in Table API/SQL

Jincheng started a discussion on how to improve the TableEnvironment usage
from a user's perspective. At the moment the existing inheritance structure
can be confusing to users. He, thus, proposes to change this structure to
have more meaningful names for the user [3].

# Creating Flink 1.5.6

The community discussed whether to release a last bug fix release 1.5.6 for
the 1.5.x release branch [4]. So far the unanimous feedback is positive and
in favour of creating a last 1.5.6 release.

# Usage of Flink's Python API

The community started a survey of the usage of Flink's Python APIs [5].
Please join this discussion if you want to tell how you are using Flink's
Python APIs and how it could be improved.

[1]
https://lists.apache.org/thread.html/2746759af3c92091bb743cfe028c90777f8011a064bb95e65b1fb951@%3Cdev.flink.apache.org%3E
[2]
https://lists.apache.org/thread.html/064c75c5d10f0806095b14f6d76942598917a14429c1acbddd151fe2@%3Cdev.flink.apache.org%3E
[3]
https://lists.apache.org/thread.html/99059c90a0a1b59a4f18a5a0fdb73e17071b17bbb036649a48bb233b@%3Cdev.flink.apache.org%3E
[4]
https://lists.apache.org/thread.html/b740feb190fd63db3d15bfe0399097d905ea49fad83ce9ccf4c070cd@%3Cdev.flink.apache.org%3E
[5]
https://lists.apache.org/thread.html/348366080d6b87bf390efb98e5bf268620ab04a0451f8459e2f466cd@%3Cdev.flink.apache.org%3E

Cheers,
Till


[SURVEY] Usage of flink-python and flink-streaming-python

2018-12-07 Thread Till Rohrmann
Dear Flink community,

in order to better understand the needs of our users and to plan for the
future, I wanted to reach out to you and ask how much you use Flink's
Python API, namely flink-python and flink-streaming-python.

In order to gather feedback, I would like to ask all Python users to
respond to this thread and quickly outline how you use Python in
combination with Flink. Thanks a lot for your help!

Cheers,
Till


[ANNOUNCE] Weekly community update #49

2018-12-07 Thread Till Rohrmann
Dear community,

this is the weekly community update thread #49. Please post any news and
updates you want to share with the community to this thread.

# Flink 1.7.0 has been released

The community has released Flink 1.7.0 [1].

# Flink intro slide set

Fabian has refined the slide set for an intro talk about Apache Flink [2].

# External shuffle service

The discussion about making Flink's shuffle service pluggable is
accelerating [3]. Please chime in if you have suggestions.

# Making flink-table Scala-free

Timo started a design discussion on how to make flink-table less dependent
on Scala [4]. See the corresponding Flip [5] for more information.

# Refactor source interface

The discussion about Flink's new source interface is very lively. Don't
forget to follow the discussion if you don't want to be left behind [6].

# Call for presentations for Flink Forward San Francisco 2019 has been
extended

The call for presentations for Flink Forward San Francisco 2019 has been
extended to December 7, 11:59 PT. Even if there is not much time left, try
to submit your proposal!

# Support interactive programming in Flink Table API

The discussion about interactive programming with Flink's Table API has
progressed [7]. Take a look to stay up to date.

# Flink SQL DDL design

The Flink SQL DDL design is converging and it should not take much longer
until the community can agree on a final design [8].

[1]
https://lists.apache.org/thread.html/845b5859dc3ad8f32a3af1e48dcc33fc2a4edcb09b9998cf55a6b20d@%3Cdev.flink.apache.org%3E
[2]
https://lists.apache.org/thread.html/6a4a7381968ae76ce9ee93858885874965bda9b37c2803bfe4ab7505@%3Cdev.flink.apache.org%3E
[3]
https://docs.google.com/document/d/1ssTu8QE8RnF31zal4JHM1VaVENow-PweUtXSRr68nGg/edit?usp=sharing
[4]
https://lists.apache.org/thread.html/c3b2f72520aff8e87b60030b7850c24ecbebf2e692c421d04543e8f9@%3Cdev.flink.apache.org%3E
[5]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-28%3A+Long-term+goal+of+making+flink-table+Scala-free
[6]
https://lists.apache.org/thread.html/992274ff9f1519f1ef7c9400c91346bea34c53e1b7adc5918d8faa43@%3Cdev.flink.apache.org%3E
[7]
https://lists.apache.org/thread.html/8a93d331f69ed9aa2c30dbc7793a3e8803155aa08fdaec71681aa92a@%3Cdev.flink.apache.org%3E
[8]
https://lists.apache.org/thread.html/9e8eaa74b391ac21ca7268475e1179e7965ecd6389b8f5bbf9e7d6e2@%3Cdev.flink.apache.org%3E

Cheers,
Till


Re: Flink 1.7 job cluster (restore from checkpoint error)

2018-12-06 Thread Till Rohrmann
Hi Hao,

if Flink tries to recover from a checkpoint, then the JobGraph should not
be modified and the system should be able to restore the state.

Have you changed the JobGraph and are you now trying to recover from the
latest checkpoint which is stored in ZooKeeper? If so, then you can also
start the job cluster with a different cluster id and manually pass the
path to the latest checkpoint as the savepoint path to resume from. By
specifying a new cluster id, the system will create a new ZNode in
ZooKeeper and will not use the checkpoints from the previous run.

If you did not change the JobGraph, then this sounds like a bug. For
further investigation the debug log files would be helpful.

Cheers,
Till

On Wed, Dec 5, 2018 at 7:18 PM Hao Sun  wrote:

> Till, Flink is automatically trying to recover from a checkpoint not
> savepoint. How can I get allowNonRestoredState applied in this case?
>
> Hao Sun
> Team Lead
> 1019 Market St. 7F
> San Francisco, CA 94103
>
>
> On Wed, Dec 5, 2018 at 10:09 AM Till Rohrmann 
> wrote:
>
>> Hi Hao,
>>
>> I think you need to provide a savepoint file via --fromSavepoint to
>> resume from in order to specify --allowNonRestoredState. Otherwise this
>> option will be ignored because it only works if you resume from a savepoint.
>>
>> Cheers,
>> Till
>>
>> On Wed, Dec 5, 2018 at 12:29 AM Hao Sun  wrote:
>>
>>> I am using 1.7 and job cluster on k8s.
>>>
>>> Here is how I start my job
>>> 
>>> docker-entrypoint.sh job-cluster -j
>>> com.zendesk.fraud_prevention.examples.ConnectedStreams
>>> --allowNonRestoredState
>>> 
>>>
>>> *Seems like --allowNonRestoredState is not honored*
>>>
>>> === Logs ===
>>> java","line":"1041","message":"Restoring job
>>>  from latest valid checkpoint: Checkpoint
>>> 8103 @ 0 for ."}
>>> {"timestamp":"2018-12-04
>>> 23:19:39,859","level":"ERROR","thread":"flink-akka.actor.default-dispatcher-15","file":"ClusterEntrypoint.java","line":"390","message":"Fatal
>>> error occurred in the cluster entrypoint."}
>>> java.lang.RuntimeException:
>>> org.apache.flink.runtime.client.JobExecutionException: Could not set up
>>> JobManager
>>> at
>>> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
>>> at
>>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>>> at
>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at
>>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at
>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
>>> not set up JobManager
>>> at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
>>> at
>>> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
>>> at
>>> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
>>> at
>>> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>>> ... 7 more
>>> Caused by: java.lang.IllegalStateException: There is no operator for the
>>> state 2f4bc854a18755730e14a90e1d4d7c7d
>>> at
>>> org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:569)
>>> at
>>> org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:77)
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedState(CheckpointCoordinator.java:1049)
>>> at
>>> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1152)
>>> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:296)
>>> at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
>>> ==
>>>
>>> Can somebody help out? Thanks
>>>
>>> Hao Sun
>>>
>>


Re: How to distribute subtasks evenly across taskmanagers?

2018-12-05 Thread Till Rohrmann
Hi Sunny,

this is a current limitation of Flink's scheduling. We are currently
working on extending Flink's scheduling mechanism [1], which should also help
with solving this problem. At the moment, I recommend using the per-job
mode so that you have a single cluster per job.

[1] https://issues.apache.org/jira/browse/FLINK-10407

Cheers,
Till

On Wed, Dec 5, 2018 at 2:07 AM Sunny Yun  wrote:

> Why does Flink do resource management by only slots, not by TaskManagers
> and slots?
>
> If there is one Flink cluster for submitting multiple jobs, how do I make
> the JobManager distribute subtasks evenly across all TaskManagers?
> Right now the JobManager treats the slots globally, so some jobs' operators are
> assigned slots from only one TaskManager.
>
>
> For example:
>
> 3 TaskManager (taskmanager.numberOfTaskSlots: 8) = total 24 slots
>
> env
> .setParallelism(6)
> .addSource(sourceFunction)
> .partitionCustom(partitioner, keySelector)
> .map(mapper)
> .addSink(sinkFunction);
> env.execute(job1);
>
> env
> .setParallelism(12)
> .addSource(sourceFunction)
> .partitionCustom(partitioner, keySelector)
> .map(mapper)
> .addSink(sinkFunction);
> env.execute(job2);
>
> env
> .setParallelism(6)
> .addSource(sourceFunction)
> .partitionCustom(partitioner, keySelector)
> .map(mapper)
> .addSink(sinkFunction);
> env.execute(job3);
>
>
> Intended:
>                  TM1   TM2   TM3
>                 ------------------
> job1-source       2     2     2
> job1-map-sink     2     2     2
> job2-source       4     4     4
> job2-map-sink     4     4     4
> job3-source       2     2     2
> job3-map-sink     2     2     2
>
>
> Because each job comes under stress at unpredictable times, it is
> important that each job can use all the available resources.
> We made three clusters (with 6, 6, and 12 total slots) as a temporary workaround,
> but it's not a pretty way to do it.
>
>
> Best, Sunny
>


Re: Flink 1.7 job cluster (restore from checkpoint error)

2018-12-05 Thread Till Rohrmann
Hi Hao,

I think you need to provide a savepoint file via --fromSavepoint to resume
from in order to specify --allowNonRestoredState. Otherwise this option
will be ignored because it only works if you resume from a savepoint.

Cheers,
Till

On Wed, Dec 5, 2018 at 12:29 AM Hao Sun  wrote:

> I am using 1.7 and job cluster on k8s.
>
> Here is how I start my job
> 
> docker-entrypoint.sh job-cluster -j
> com.zendesk.fraud_prevention.examples.ConnectedStreams
> --allowNonRestoredState
> 
>
> *Seems like --allowNonRestoredState is not honored*
>
> === Logs ===
> java","line":"1041","message":"Restoring job
>  from latest valid checkpoint: Checkpoint
> 8103 @ 0 for ."}
> {"timestamp":"2018-12-04
> 23:19:39,859","level":"ERROR","thread":"flink-akka.actor.default-dispatcher-15","file":"ClusterEntrypoint.java","line":"390","message":"Fatal
> error occurred in the cluster entrypoint."}
> java.lang.RuntimeException:
> org.apache.flink.runtime.client.JobExecutionException: Could not set up
> JobManager
> at
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
> not set up JobManager
> at
> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
> at
> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
> at
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
> ... 7 more
> Caused by: java.lang.IllegalStateException: There is no operator for the
> state 2f4bc854a18755730e14a90e1d4d7c7d
> at
> org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:569)
> at
> org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:77)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedState(CheckpointCoordinator.java:1049)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1152)
> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:296)
> at
> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
> ==
>
> Can somebody help out? Thanks
>
> Hao Sun
>


Re: long lived standalone job session cluster in kubernetes

2018-12-05 Thread Till Rohrmann
Hi Derek,

there is this issue [1] which tracks the active Kubernetes integration. Jin
Sun already started implementing some parts of it. There should also be
some PRs open for it. Please check them out.

[1] https://issues.apache.org/jira/browse/FLINK-9953

Cheers,
Till

On Wed, Dec 5, 2018 at 6:39 PM Derek VerLee  wrote:

> Sounds good.
>
> Is someone working on this automation today?
>
> If not, although my time is tight, I may be able to work on a PR for
> getting us started down the path Kubernetes native cluster mode.
>
>
> On 12/4/18 5:35 AM, Till Rohrmann wrote:
>
> Hi Derek,
>
what I would recommend is to trigger the cancel-with-savepoint
> command [1]. This will create a savepoint and terminate the job execution.
> Next you simply need to respawn the job cluster which you provide with the
> savepoint to resume from.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#cancel-job-with-savepoint
>
> Cheers,
> Till
>
> On Tue, Dec 4, 2018 at 10:30 AM Andrey Zagrebin 
> wrote:
>
>> Hi Derek,
>>
>> I think your automation steps look good.
>> Recreating deployments should not take long
>> and as you mention, this way you can avoid unpredictable old/new version
>> collisions.
>>
>> Best,
>> Andrey
>>
>> > On 4 Dec 2018, at 10:22, Dawid Wysakowicz 
>> wrote:
>> >
>> > Hi Derek,
>> >
>> > I am not an expert in kubernetes, so I will cc Till, who should be able
>> > to help you more.
>> >
>> > As for the automation for similar process I would recommend having a
>> > look at dA platform[1] which is built on top of kubernetes.
>> >
>> > Best,
>> >
>> > Dawid
>> >
>> > [1] https://data-artisans.com/platform-overview
>> >
>> > On 30/11/2018 02:10, Derek VerLee wrote:
>> >>
>> >> I'm looking at the job cluster mode, it looks great and I and
>> >> considering migrating our jobs off our "legacy" session cluster and
>> >> into Kubernetes.
>> >>
>> >> I do need to ask some questions because I haven't found a lot of
>> >> details in the documentation about how it works yet, and I gave up
>> >> following the DI around in the code after a while.
>> >>
>> >> Let's say I have a deployment for the job "leader" in HA with ZK, and
>> >> another deployment for the taskmanagers.
>> >>
>> >> I want to upgrade the code or configuration and start from a
>> >> savepoint, in an automated way.
>> >>
>> >> Best I can figure, I can not just update the deployment resources in
>> >> kubernetes and allow the containers to restart in an arbitrary order.
>> >>
>> >> Instead, I expect sequencing is important, something along the lines
>> >> of this:
>> >>
>> >> 1. issue savepoint command on leader
>> >> 2. wait for savepoint
>> >> 3. destroy all leader and taskmanager containers
>> >> 4. deploy new leader, with savepoint url
>> >> 5. deploy new taskmanagers
>> >>
>> >>
>> >> For example, I imagine old taskmanagers (with an old version of my
>> >> job) attaching to the new leader and causing a problem.
>> >>
>> >> Does that sound right, or am I overthinking it?
>> >>
>> >> If not, has anyone tried implementing any automation for this yet?
>> >>
>> >
>>
>>


Re: Assigning a port range to rest.port

2018-12-05 Thread Till Rohrmann
Hi Gyula and Jeff,

I think at the moment it is not possible to define a port range for the
REST client. Maybe we should add something similar to the
RestOptions#BIND_ADDRESS, namely introducing a RestOptions#BIND_PORT which
can define a port range for the binding port. RestOptions#PORT will only be
considered on the client side to learn where to connect to. I've created a
corresponding JIRA issue to add this feature [1].

[1] https://issues.apache.org/jira/browse/FLINK-11081

Cheers,
Till
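For Jeff's local-mode case specifically, the REST port of each MiniCluster can already be set through the configuration handed to the local environment, so two environments can be given distinct ports. This is only a sketch for local setups (the port numbers below are arbitrary) and does not address the client-side port range discussed above:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TwoLocalEnvironments {
    public static void main(String[] args) throws Exception {
        Configuration conf1 = new Configuration();
        conf1.setInteger(RestOptions.PORT, 8091); // first local cluster

        Configuration conf2 = new Configuration();
        conf2.setInteger(RestOptions.PORT, 8092); // second local cluster, different port

        StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createLocalEnvironment(2, conf1);
        StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createLocalEnvironment(2, conf2);

        env1.fromElements(1, 2, 3).print();
        env1.execute("local-cluster-1");

        env2.fromElements(4, 5, 6).print();
        env2.execute("local-cluster-2");
    }
}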

On Wed, Dec 5, 2018 at 1:21 PM Gyula Fóra  wrote:

> Maybe the problem is here? cc Till
>
>
> https://github.com/apache/flink/blob/44ed5ef0fc1c221f3916ab5126f1bc8ee5dfb45d/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java#L83
>
> https://github.com/apache/flink/blob/44ed5ef0fc1c221f3916ab5126f1bc8ee5dfb45d/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java#L101
>
>
>
>
> Jeff Zhang  ezt írta (időpont: 2018. dec. 5., Sze,
> 13:13):
>
>> This requirement makes sense to me. Another issue I hit due to the single
>> rest port value is that a user cannot start 2 local MiniClusters; I tried to
>> start 2 Flink scala-shells in local mode, but it fails due to a port conflict.
>>
>>
>>
>> Gyula Fóra  于2018年12月5日周三 下午8:04写道:
>>
>>> Hi!
>>> Is there any way currently to set a port range for the rest client?
>>> rest.port only takes a single number and it is anyway overwritten to 0.
>>>
>>> This seems to be necessary when running the flink client from behind a
>>> firewall where only a predefined port-range is accessible from the outside.
>>>
>>> I would assume this is a common setup in prod environments. This hasn't
>>> been a problem with the legacy execution mode.
>>>
>>> Any thoughts?
>>> Gyula
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>


Re: long lived standalone job session cluster in kubernetes

2018-12-04 Thread Till Rohrmann
Hi Derek,

> what I would recommend is to trigger the cancel-with-savepoint
command [1]. This will create a savepoint and terminate the job execution.
Next you simply need to respawn the job cluster which you provide with the
savepoint to resume from.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#cancel-job-with-savepoint

Cheers,
Till

On Tue, Dec 4, 2018 at 10:30 AM Andrey Zagrebin 
wrote:

> Hi Derek,
>
> I think your automation steps look good.
> Recreating deployments should not take long
> and as you mention, this way you can avoid unpredictable old/new version
> collisions.
>
> Best,
> Andrey
>
> > On 4 Dec 2018, at 10:22, Dawid Wysakowicz 
> wrote:
> >
> > Hi Derek,
> >
> > I am not an expert in kubernetes, so I will cc Till, who should be able
> > to help you more.
> >
> > As for the automation for similar process I would recommend having a
> > look at dA platform[1] which is built on top of kubernetes.
> >
> > Best,
> >
> > Dawid
> >
> > [1] https://data-artisans.com/platform-overview
> >
> > On 30/11/2018 02:10, Derek VerLee wrote:
> >>
> >> I'm looking at the job cluster mode, it looks great and I and
> >> considering migrating our jobs off our "legacy" session cluster and
> >> into Kubernetes.
> >>
> >> I do need to ask some questions because I haven't found a lot of
> >> details in the documentation about how it works yet, and I gave up
> >> following the DI around in the code after a while.
> >>
> >> Let's say I have a deployment for the job "leader" in HA with ZK, and
> >> another deployment for the taskmanagers.
> >>
> >> I want to upgrade the code or configuration and start from a
> >> savepoint, in an automated way.
> >>
> >> Best I can figure, I can not just update the deployment resources in
> >> kubernetes and allow the containers to restart in an arbitrary order.
> >>
> >> Instead, I expect sequencing is important, something along the lines
> >> of this:
> >>
> >> 1. issue savepoint command on leader
> >> 2. wait for savepoint
> >> 3. destroy all leader and taskmanager containers
> >> 4. deploy new leader, with savepoint url
> >> 5. deploy new taskmanagers
> >>
> >>
> >> For example, I imagine old taskmanagers (with an old version of my
> >> job) attaching to the new leader and causing a problem.
> >>
> >> Does that sound right, or am I overthinking it?
> >>
> >> If not, has anyone tried implementing any automation for this yet?
> >>
> >
>
>


Re: Weird behavior in actorSystem shutdown in akka

2018-12-03 Thread Till Rohrmann
Hi Joshua,

sorry for getting back to you so late. Personally, I haven't seen this
problem before. Without more log context I think I won't be able to help
you. This looks a bit more like an Akka problem than a Flink problem to be
honest.

One cause could be that akka.remote.flush-wait-on-shutdown is set too low.
But this should only happen when you shut down the remote ActorSystem (JM
ActorSystem) and you should see "Shutdown finished, but flushing might not
have been successful and some messages might have been dropped. Increase
akka.remote.flush-wait-on-shutdown to a larger value to avoid this." in the
logs.

I don't know which two ports you are referring to for the agent. I think it
would help to share also the logs of your agent.

Cheers,
Till

On Mon, Nov 19, 2018 at 1:49 PM Joshua Fan  wrote:

> Hi, Till and users,
>
> There is a weird behavior in actorSystem shutdown in akka of our flink
> platform.
> We use Flink 1.4.2 on YARN as our deploy mode, and we use a long-running
> agent, based on YarnClient, to submit Flink jobs to YARN. Users can
> connect to the agent to submit a job and disconnect, but the agent is always
> there. So each time a user submits a job, an ActorSystem is
> created, and after the job has been submitted successfully in detached mode, the
> ActorSystem is shut down.
> The weird thing is that there is always an akka error message showing up in the JM
> log after 2 days (2 days is the default value of akka's
> quarantine-after-silence), like below.
>
> 2018-11-19 09:30:34.212 [flink-akka.actor.default-dispatcher-2] ERROR
> akka.remote.Remoting flink-akka.remote.default-remote-dispatcher-5 -
> Association to [akka.tcp://fl...@client01v.xxx:35767] with UID
> [-1757115446] irrecoverably failed. Quarantining address.
> java.util.concurrent.TimeoutException: Remote system has been silent for
> too long. (more than 48.0 hours)
> at
> akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> In the above, client01v*** is the host node where the agent runs, and
> the above error turns up randomly. We trigger a savepoint from the agent
> every half hour, which means the ActorSystem is created and shut down
> accordingly. But only about 1 in 50 shutdowns raises an error like the
> above.
>
> I think maybe it relates to the akka system. I checked the akka code and found
> some clues, as below.
> For the cases where no error is raised within two days, the log in the JM looks like this:
>
> 2018-11-17 04:31:09.208 [flink-akka.actor.default-dispatcher-17] DEBUG
> akka.remote.transport.ProtocolStateActor
> flink-akka.remote.default-remote-dispatcher-23 - Association between local
> [tcp://flink@:29448] and remote [tcp://flink@:56906] was
> disassociated because the ProtocolStateActor failed: Shutdown
> 2018-11-17 04:31:09.208 [flink-akka.actor.default-dispatcher-17] DEBUG
> akka.remote.transport.ProtocolStateActor
> flink-akka.remote.default-remote-dispatcher-23 - Association between local
> [tcp://flink@:29448] and remote [tcp://flink@:56906] was
> disassociated because the ProtocolStateActor failed: Shutdown
> 2018-11-17 04:31:09.209 [flink-akka.actor.default-dispatcher-17] DEBUG
> akka.remote.Remoting flink-akka.remote.default-remote-dispatcher-15 -
> Remote system with address [akka.tcp://flink@:41769] has shut down.
> Address is now gated for 5000 ms, all messages to this address will be
> delivered to dead letters.
> 2018-11-17 04:31:09.209 [flink-akka.actor.default-dispatcher-17] DEBUG
> akka.remote.Remoting flink-akka.remote.default-remote-dispatcher-15 -
> Remote system with address [akka.tcp://flink@:41769] has shut down.
> Address is now gated for 5000 ms, all messages to this address will be
> delivered to dead letters.
>
> It seems the remote actor receives the shutdown proposal, the akka message
> may flow like below:
> 1. The agent shuts down the ActorSystem.
> 2. The EndpointReader in the JM receives an AssociationHandle.Shutdown and
> just throws it as a ShutDownAssociation, and the
> EndpointWriter will publishAndThrow the ShutDownAssociation again.
> 3. When the ReliableDeliverySupervisor in the JM gets an AssociationProblem
> reported by the EndpointWriter, it also throws it out.
> 4. When the

Re: Custom scheduler in Flink

2018-11-30 Thread Till Rohrmann
Hi Felipe,

https://issues.apache.org/jira/browse/FLINK-10429 might also be
interesting. The community is currently working on making the Scheduler
pluggable to make it easier to extend this component.

Cheers,
Till

On Wed, Nov 28, 2018 at 2:56 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Thanks, I'll check it out.
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> *
>
>
> On Wed, Nov 28, 2018 at 2:44 PM Chesnay Schepler 
> wrote:
>
> > There's no *reasonable* way to implement a custom Scheduler, i.e.,
> > something where you can just plug in your scheduler in a nice way. For this
> > you'll have to directly modify the source of Flink.
> >
> > The work in https://issues.apache.org/jira/browse/FLINK-8886 may also be
> > of interest, but is still in the design phase.
> >
> > On 28.11.2018 10:21, Felipe Gutierrez wrote:
> >
> > Hi,
> >
> > I want to develop a custom scheduler in Flink that is aware of which host
> > Flink must process a given task on. This post shows (using Apache Storm) the
> kind
> > of example I want to build (
> https://inside.edited.com/taking-control-of-your-apache-storm-cluster-with-tag-aware-scheduling-b605e37e
> > ).
> >
> > I saw some related question on this post from 2016 (
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/custom-scheduler-in-Flink-td7804.html
> )
> > and I wonder if there is something more up to date or better documented about
> a
> > custom scheduler.
> >
> > Thanks,
> > Felipe
> > *--*
> > *-- Felipe Gutierrez*
> >
> > *-- skype: felipe.o.gutierrez*
> > *--* *https://felipeogutierrez.blogspot.com<
> https://felipeogutierrez.blogspot.com> <
> https://felipeogutierrez.blogspot.com>*
> >
> >
> >
> >
>


Re: Apache Flink 1.7.0 jar complete ?

2018-11-30 Thread Till Rohrmann
Hi Arnaud,

I tried to set up the same testing project as you've described and it worked
for me. Could you maybe try to clear your Maven repository? Maybe not all
dependencies had been properly mirrored to Maven central.

Cheers,
Till

On Fri, Nov 30, 2018 at 2:31 PM Till Rohrmann  wrote:

> Thanks for reporting this problem, Arnaud. I will investigate.
>
> Cheers,
> Till
>
> On Fri, Nov 30, 2018 at 12:20 PM LINZ, Arnaud 
> wrote:
>
>> Hi,
>>
>>
>>
>> When trying to update to 1.7.0, a simple local cluster test fails with :
>>
>>
>>
>> 12:03:55.182 [main] DEBUG o.a.f.s.a.graph.StreamGraphGenerator -
>> Transforming SinkTransformation{id=2, name='Print to Std. Out',
>> outputType=GenericType, parallelism=1}
>>
>> 12:03:55.182 [main] DEBUG o.a.f.s.a.graph.StreamGraphGenerator -
>> Transforming SourceTransformation{id=1, name='Custom Source',
>> outputType=String, parallelism=1}
>>
>> 12:03:55.182 [main] DEBUG o.a.f.s.api.graph.StreamGraph - Vertex: 1
>>
>> 12:03:55.182 [main] DEBUG o.a.f.s.api.graph.StreamGraph - Vertex: 2
>>
>> Exception in thread "main" java.lang.NoClassDefFoundError:
>> org/apache/flink/shaded/guava18/com/google/common/hash/Hashing
>> at
>> org.apache.flink.streaming.api.graph.StreamGraphHasherV2.traverseStreamGraphAndGenerateHashes(StreamGraphHasherV2.java:80)
>> at
>> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:145)
>> at
>> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:93)
>> at
>> org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:669)
>> at
>> org.apache.flink.optimizer.plan.StreamingPlan.getJobGraph(StreamingPlan.java:40)
>> at
>> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:92)
>> at flink.flink_10832.App.testFlink10832(App.java:60)
>> at flink.flink_10832.App.main(App.java:31)
>> Caused by: java.lang.ClassNotFoundException:
>> org.apache.flink.shaded.guava18.com.google.common.hash.Hashing
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> ... 8 more
>>
>>
>>
>> Pom. is :
>>
>> http://maven.apache.org/POM/4.0.0
>> <http://maven.apache.org/POM/4.0.0>"*
>>
>> xmlns:xsi=*"http://www.w3.org/2001/XMLSchema-instance
>> <http://www.w3.org/2001/XMLSchema-instance>"*
>>
>> xsi:schemaLocation=*"http://maven.apache.org/POM/4.0.0
>> <http://maven.apache.org/POM/4.0.0>
>> http://maven.apache.org/xsd/maven-4.0.0.xsd
>> <http://maven.apache.org/xsd/maven-4.0.0.xsd>"*>
>>
>> 4.0.0
>>
>>
>>
>> flink
>>
>> flink-10832
>>
>> 0.0.1-SNAPSHOT
>>
>> jar
>>
>> flink-10832
>>
>>
>>
>> 
>>
>> UTF-8> >
>>
>> 1.7.0
>>
>> 
>>
>>
>>
>> 
>>
>> 
>>
>> 
>>
>> org.apache.flink
>>
>> flink-java
>>
>> ${flink.version}
>>
>> 
>>
>> 
>>
>> org.apache.flink
>>
>> flink-streaming-java_2.11
>>
>> ${flink.version}
>>
>> 
>>
>>
>>
>>
>>
>> 
>>
>> ch.qos.logback
>>
>> logback-classic
>>
>> 1.0.13
>>
>> 
>>
>> 
>>
>> org.slf4j
>>
>> slf4j-api
>>
>> 1.7.5
>>
>> 
>>
>>
>>
>> 
>>
>> 
>>
>>
>>
>> Code is :
>>
>> *public* *static* *void* testFlink10832() *throws* Exception {
>>
>> // get the execution environment
>>
>> *final* StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.*getExecutionEnvironment*();
>>
>> // get

Re: Apache Flink 1.7.0 jar complete ?

2018-11-30 Thread Till Rohrmann
Thanks for reporting this problem, Arnaud. I will investigate.

Cheers,
Till

On Fri, Nov 30, 2018 at 12:20 PM LINZ, Arnaud 
wrote:

> Hi,
>
>
>
> When trying to update to 1.7.0, a simple local cluster test fails with :
>
>
>
> 12:03:55.182 [main] DEBUG o.a.f.s.a.graph.StreamGraphGenerator -
> Transforming SinkTransformation{id=2, name='Print to Std. Out',
> outputType=GenericType, parallelism=1}
>
> 12:03:55.182 [main] DEBUG o.a.f.s.a.graph.StreamGraphGenerator -
> Transforming SourceTransformation{id=1, name='Custom Source',
> outputType=String, parallelism=1}
>
> 12:03:55.182 [main] DEBUG o.a.f.s.api.graph.StreamGraph - Vertex: 1
>
> 12:03:55.182 [main] DEBUG o.a.f.s.api.graph.StreamGraph - Vertex: 2
>
> Exception in thread "main" java.lang.NoClassDefFoundError:
> org/apache/flink/shaded/guava18/com/google/common/hash/Hashing
> at
> org.apache.flink.streaming.api.graph.StreamGraphHasherV2.traverseStreamGraphAndGenerateHashes(StreamGraphHasherV2.java:80)
> at
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:145)
> at
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:93)
> at
> org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:669)
> at
> org.apache.flink.optimizer.plan.StreamingPlan.getJobGraph(StreamingPlan.java:40)
> at
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:92)
> at flink.flink_10832.App.testFlink10832(App.java:60)
> at flink.flink_10832.App.main(App.java:31)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.shaded.guava18.com.google.common.hash.Hashing
> at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 8 more
>
>
>
> Pom. is :
>
> <project xmlns="http://maven.apache.org/POM/4.0.0"
>          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
>   <modelVersion>4.0.0</modelVersion>
>
>   <groupId>flink</groupId>
>   <artifactId>flink-10832</artifactId>
>   <version>0.0.1-SNAPSHOT</version>
>   <packaging>jar</packaging>
>   <name>flink-10832</name>
>
>   <properties>
>     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>     <flink.version>1.7.0</flink.version>
>   </properties>
>
>   <dependencies>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-java</artifactId>
>       <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-streaming-java_2.11</artifactId>
>       <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>       <groupId>ch.qos.logback</groupId>
>       <artifactId>logback-classic</artifactId>
>       <version>1.0.13</version>
>     </dependency>
>     <dependency>
>       <groupId>org.slf4j</groupId>
>       <artifactId>slf4j-api</artifactId>
>       <version>1.7.5</version>
>     </dependency>
>   </dependencies>
> </project>
>
> 
>
>
>
> 
>
> 
>
>
>
> Code is :
>
> public static void testFlink10832() throws Exception {
>     // get the execution environment
>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>     // get input data by connecting to the socket
>     @SuppressWarnings("serial")
>     final DataStreamSource<String> text = env.addSource(new SourceFunction<String>() {
>         @Override
>         public void run(final SourceContext<String> ctx) throws Exception {
>             for (int count = 0; count < 5; count++) {
>                 ctx.collect(String.valueOf(count));
>             }
>         }
>
>         @Override
>         public void cancel() {
>         }
>     });
>
>     text.print().setParallelism(1);
>     env.execute("Simple Test");
>     System.out.println("If you see this the issue is resolved!");
> }
>
>
>
> Any idea why ?
>
>
>
> Regards,
>
> Arnaud

[ANNOUNCE] Apache Flink 1.7.0 released

2018-11-30 Thread Till Rohrmann
The Apache Flink community is very happy to announce the release of Apache
Flink 1.7.0, which is the next major release.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the new features
and improvements for this release:
https://flink.apache.org/news/2018/11/30/release-1.7.0.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12343585

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Cheers,
Till


Re: Logging Kafka during exceptions

2018-11-22 Thread Till Rohrmann
I can see the benefit for other users as well. One could include it as part
of some development/debugging tools, for example. It would not strictly
need to go into Flink but it would have the benefit of better/increased
visibility I guess. In that sense, opening a JIRA issue and posting on dev
might be a good idea to check how much interest is there.

Cheers,
Till

On Thu, Nov 22, 2018 at 5:17 PM Scott Sue  wrote:

> Hi Till,
>
> Yeah, I think that would work, especially knowing this isn't something that
> is available out of the box at the moment.  Do you think it's worth raising this as a
> feature request at all?  One thing from my experience with
> Flink is that it's quite hard to debug what is going on when there is an
> unexpected exception.
>
>
> Regards,
> Scott
>
> SCOTT SUE
> CHIEF TECHNOLOGY OFFICER
>
> Support Line : +44(0) 2031 371 603
> Mobile : +852 9611 3969
>
> 9/F, 33 Lockhart Road, Wan Chai, Hong Kong
> www.celer-tech.com
>
>
>
>
>
>
>
> On 23 Nov 2018, at 00:12, Till Rohrmann  wrote:
>
> Hi Scott,
>
> I think you could write some Wrappers for the different user function
> types which could contain the logging logic. That way you would still need
> to wrap your actual business logic but don't have to duplicate the logic
> over and over again.
>
> If you also want to log the state, then you would need to wrap the
> RuntimeContext to intercept all state registering calls so that you can
> keep track of them.
>
> Would that work for you?
>
> Cheers,
> Till
>
> On Thu, Nov 22, 2018 at 8:44 AM Scott Sue 
> wrote:
>
>> Yeah, I think that would work for incorrect data being consumed, but not if
>> deserialization passes correctly and one of my custom functions
>> downstream of deserialization generates an error?
>>
>>
>> Regards,
>> Scott
>>
>> SCOTT SUE
>> CHIEF TECHNOLOGY OFFICER
>>
>> Support Line : +44(0) 2031 371 603
>> Mobile : +852 9611 3969
>>
>> 9/F, 33 Lockhart Road, Wan Chai, Hong Kong
>> www.celer-tech.com
>>
>>
>>
>>
>>
>>
>>
>> On 22 Nov 2018, at 15:15, miki haiat  wrote:
>>
>> If so, then you can implement your own deserializer [1] with custom
>> logic and error handling
>>
>>
>>
>> 1.
>> https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.html
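A hedged sketch of that idea: a decorating DeserializationSchema that logs the raw bytes of the record that failed to deserialize before the failure propagates. The class name is made up; it is not an existing Flink utility:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingDeserializationSchema<T> implements DeserializationSchema<T> {
    private static final Logger LOG = LoggerFactory.getLogger(LoggingDeserializationSchema.class);

    private final DeserializationSchema<T> wrapped;

    public LoggingDeserializationSchema(DeserializationSchema<T> wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public T deserialize(byte[] message) throws IOException {
        try {
            return wrapped.deserialize(message);
        } catch (Exception e) {
            // log the raw payload of the offending record before failing
            LOG.error("Failed to deserialize record: {}", new String(message, StandardCharsets.UTF_8), e);
            throw e;
        }
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return wrapped.isEndOfStream(nextElement);
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return wrapped.getProducedType();
    }
}

It can wrap whatever schema the job already uses, e.g. new LoggingDeserializationSchema<>(new SimpleStringSchema()) passed to the Kafka consumer.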
>>
>>
>> On Thu, Nov 22, 2018 at 8:57 AM Scott Sue 
>> wrote:
>>
>>> Json is sent into Kafka
>>>
>>>
>>> Regards,
>>> Scott
>>>
>>> SCOTT SUE
>>> CHIEF TECHNOLOGY OFFICER
>>>
>>> Support Line : +44(0) 2031 371 603
>>> Mobile : +852 9611 3969
>>>
>>> 9/F, 33 Lockhart Road, Wan Chai, Hong Kong
>>> www.celer-tech.com
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On 22 Nov 2018, at 14:55, miki haiat  wrote:
>>>
>>> Which data format   is sent to kafka ?
>>> Json Avro Other ?
>>>
>>>
>>>
>>> On Thu, Nov 22, 2018 at 7:36 AM Scott Sue 
>>> wrote:
>>>
>>>> Unexpected data meaning business level data that I didn’t expect to
>>>> receive. So business level data that doesn’t quite conform
>>>>
>>>> On Thu, 22 Nov 2018 at 13:30, miki haiat  wrote:
>>>>
>>>>>  Unexpected data you mean parsing error ?
>>>>> Which format is sent to Kafka ?
>>>>>
>>>>>
>>>>>
>>>>> On Thu, 22 Nov 2018, 6:59 Scott Sue >>>>
>>>>>> Hi all,
>>>>>>
>>>>>> When I'm running my jobs I am consuming data from Kafka to process in
>>>>>> my
>>>>>> job.  Unfortunately my job receives unexpected data from time to time
>>>>>> which
>>>>>> I'm trying to find the root cause of the issue.
>>>>>>
>>>>>> Ideally, I want to be able to have a way to know when the job has
>>>>>> failed due
>>>>>> to an exception, to then log to file the last message that it was
>>>>>> consuming
>>>>>> at the time to help track down the offending message consumed.  How
>>>>>> is this
>>>>>> possible within Flink?
>>>>>>
>>>>>> Thinking about this more, it may not be a consumed message that
>>>>>> killed the
>>>>>> job

Re: Flink restart strategy on specific exception

2018-11-22 Thread Till Rohrmann
Hi Kasif,

I think in this situation it is best if you define your own custom
RestartStrategy by specifying a class which has a `RestartStrategyFactory
createFactory(Configuration configuration)` method as `restart-strategy:
MyRestartStrategyFactoryFactory` in `flink-conf.yaml`.

Cheers,
Till
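A hedged sketch of what such a class could look like for the setup described above (registered as restart-strategy: com.example.MyRestartStrategyFactoryFactory in flink-conf.yaml). The package name, the config key and the simple attempt-counting rule are assumptions, and the runtime interfaces used here are internal APIs that may change between Flink versions:

package com.example;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;

public class MyRestartStrategyFactoryFactory {

    // static factory method that is looked up when the restart-strategy value is a class name
    public static RestartStrategyFactory createFactory(Configuration configuration) {
        final int maxAttempts = configuration.getInteger("my.restart-strategy.max-attempts", 3);
        return new RestartStrategyFactory() {
            @Override
            public RestartStrategy createRestartStrategy() {
                return new MyRestartStrategy(maxAttempts);
            }
        };
    }

    private static class MyRestartStrategy implements RestartStrategy {
        private final int maxAttempts;
        private int attempts;

        MyRestartStrategy(int maxAttempts) {
            this.maxAttempts = maxAttempts;
        }

        @Override
        public boolean canRestart() {
            // the application-specific rules go here
            return attempts < maxAttempts;
        }

        @Override
        public void restart(RestartCallback restarter, ScheduledExecutor executor) {
            attempts++;
            restarter.triggerFullRecovery();
        }
    }
}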

On Thu, Nov 22, 2018 at 7:18 AM Ali, Kasif  wrote:

> Hello,
>
>
>
> Looking at existing restart strategies they are kind of generic. We have a
> requirement to restart the job only in case of specific exception/issues.
>
> What would be the best way to have a restart strategy which is based on a
> few rules, like looking at a particular type of exception or some extra
> condition checks which are application specific?
>
>
>
> Just as background, one specific issue which prompted this requirement is
> slots not getting released when the job finishes. In our applications, we
> keep track of jobs submitted with the amount of parallelism allotted to
> it.  Once the job finishes we assume that the slots are free and try to
> submit next set of jobs which at times fail with error  “not enough slots
> available”.
>
>
>
> So we think a job restart can solve this issue, but we want to
> restart only if this particular situation is encountered.
>
>
>
> Please let us know If there are better ways to solve this problem other
> than re start strategy.
>
>
>
> Thanks,
>
> Kasif
>
>
>
> --
>
> Your Personal Data: We may collect and process information about you that
> may be subject to data protection laws. For more information about how we
> use and disclose your personal data, how we protect your information, our
> legal basis to use your information, your rights and who you can contact,
> please refer to: www.gs.com/privacy-notices
>


Re: Logging Kafka during exceptions

2018-11-22 Thread Till Rohrmann
Hi Scott,

I think you could write some wrappers for the different user function types
which contain the logging logic. That way you would still need to wrap your
actual business logic, but you don't have to duplicate the logging logic over
and over again.

If you also want to log the state, then you would need to wrap the
RuntimeContext to intercept all state-registering calls so that you can
keep track of them.
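
As an illustration, a minimal sketch of such a wrapper for MapFunction could
look like this (the class names are made up; the same pattern can be repeated
for FlatMapFunction, ProcessFunction, etc.):

import org.apache.flink.api.common.functions.MapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Delegates to the actual business function and logs the offending element
// before re-throwing, so the last processed record ends up in the logs.
public class LoggingMapWrapper<IN, OUT> implements MapFunction<IN, OUT> {

    private static final Logger LOG = LoggerFactory.getLogger(LoggingMapWrapper.class);

    private final MapFunction<IN, OUT> delegate;

    public LoggingMapWrapper(MapFunction<IN, OUT> delegate) {
        this.delegate = delegate;
    }

    @Override
    public OUT map(IN value) throws Exception {
        try {
            return delegate.map(value);
        } catch (Exception e) {
            LOG.error("Failed to process element: {}", value, e);
            throw e;
        }
    }
}

// Usage (MyBusinessFunction is a placeholder for the actual logic):
// stream.map(new LoggingMapWrapper<>(new MyBusinessFunction()));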

Would that work for you?

Cheers,
Till

On Thu, Nov 22, 2018 at 8:44 AM Scott Sue  wrote:

> Yeah I think that would work for incorrect data consumed, but not for if
> deserialization passes correctly, but one of my custom functions
> post deserialization generates an error?
>
>
> Regards,
> Scott
>
> SCOTT SUE
> CHIEF TECHNOLOGY OFFICER
>
> Support Line : +44(0) 2031 371 603
> Mobile : +852 9611 3969
>
> 9/F, 33 Lockhart Road, Wan Chai, Hong Kong
> www.celer-tech.com
>
>
>
>
>
>
>
> On 22 Nov 2018, at 15:15, miki haiat  wrote:
>
> If so, then you can implement your own deserializer [1] with custom
> logic and error handling
>
>
>
> 1.
> https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.html
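
For illustration, a sketch of such an error-tolerant JSON
DeserializationSchema might look like the following. The MyEvent POJO and the
Jackson usage are placeholders for the actual message format; returning null
lets the Flink Kafka consumer skip the corrupted record instead of failing:

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class TolerantJsonDeserializer
        implements DeserializationSchema<TolerantJsonDeserializer.MyEvent> {

    // Placeholder for the real event type.
    public static class MyEvent {
        public String id;
        public String payload;
    }

    private static final Logger LOG = LoggerFactory.getLogger(TolerantJsonDeserializer.class);

    private transient ObjectMapper mapper;

    @Override
    public MyEvent deserialize(byte[] message) throws IOException {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        try {
            return mapper.readValue(message, MyEvent.class);
        } catch (IOException e) {
            // Log the raw payload and skip the record instead of failing the job.
            LOG.error("Could not parse record: {}", new String(message, StandardCharsets.UTF_8), e);
            return null;
        }
    }

    @Override
    public boolean isEndOfStream(MyEvent nextElement) {
        return false;
    }

    @Override
    public TypeInformation<MyEvent> getProducedType() {
        return TypeInformation.of(MyEvent.class);
    }
}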
>
>
> On Thu, Nov 22, 2018 at 8:57 AM Scott Sue 
> wrote:
>
>> Json is sent into Kafka
>>
>>
>> Regards,
>> Scott
>>
>> SCOTT SUE
>> CHIEF TECHNOLOGY OFFICER
>>
>> Support Line : +44(0) 2031 371 603
>> Mobile : +852 9611 3969
>>
>> 9/F, 33 Lockhart Road, Wan Chai, Hong Kong
>> www.celer-tech.com
>>
>>
>>
>>
>>
>>
>>
>> On 22 Nov 2018, at 14:55, miki haiat  wrote:
>>
>> Which data format   is sent to kafka ?
>> Json Avro Other ?
>>
>>
>>
>> On Thu, Nov 22, 2018 at 7:36 AM Scott Sue 
>> wrote:
>>
>>> Unexpected data meaning business level data that I didn’t expect to
>>> receive. So business level data that doesn’t quite conform
>>>
>>> On Thu, 22 Nov 2018 at 13:30, miki haiat  wrote:
>>>
  Unexpected data you mean parsing error ?
 Which format is sent to Kafka ?



 On Thu, 22 Nov 2018, 6:59 Scott Sue  wrote:
> Hi all,
>
> When I'm running my jobs I am consuming data from Kafka to process in
> my
> job.  Unfortunately my job receives unexpected data from time to time
> which
> I'm trying to find the root cause of the issue.
>
> Ideally, I want to be able to have a way to know when the job has
> failed due
> to an exception, to then log to file the last message that it was
> consuming
> at the time to help track down the offending message consumed.  How is
> this
> possible within Flink?
>
> Thinking about this more, it may not be a consumed message that killed
> the
> job, but maybe a transformation within the job itself and it died in a
> downstream Operator.  In this case, is there a way to log to file the
> message that an Operator was processing at the time that caused the
> exception?
>
>
> Thanks in advance!
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
 --
>>>
>>>
>>> Regards,
>>> Scott
>>>
>>> SCOTT SUE
>>> CHIEF TECHNOLOGY OFFICER
>>>
>>> Support Line : +44(0) 2031 371 603 <+44%2020%203137%201603>
>>> Mobile : +852 9611 3969 <9611%203969>
>>>
>>> 9/F, 33 Lockhart Road, Wanchai, Hong Kong
>>> www.celer-tech.com
>>>
>>>
>>
>>
>>
>
>

Re: elasticsearch sink can't connect to elastic cluster with BasicAuth

2018-11-22 Thread Till Rohrmann
Hi,

I think you need to provide a custom `RestClientFactory` which enables basic auth
on the Elasticsearch RestClient according to this documentation [1]. You
can set the RestClientFactory on the ElasticsearchSink.Builder.

[1]
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_basic_authentication.html
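
For illustration, a sketch with the REST-based elasticsearch6 connector could
look like the following. Host, credentials and the sink function are
placeholders, and note that the ES 5.x TransportClient-based sink in the code
quoted below does not go through a RestClient, so this requires moving to the
REST connector:

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;

import java.util.Collections;
import java.util.List;

public class BasicAuthEsSinkSketch {

    // RestClientFactory that installs basic-auth credentials on the underlying RestClient.
    static RestClientFactory basicAuth(String user, String password) {
        return restClientBuilder -> {
            CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(
                    AuthScope.ANY, new UsernamePasswordCredentials(user, password));
            restClientBuilder.setHttpClientConfigCallback(
                    httpClientBuilder ->
                            httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
        };
    }

    static <T> ElasticsearchSink<T> buildSink(ElasticsearchSinkFunction<T> sinkFunction) {
        // REST port 9200 instead of the 9300 transport port; values are placeholders.
        List<HttpHost> httpHosts =
                Collections.singletonList(new HttpHost("101.206.91.118", 9200, "http"));
        ElasticsearchSink.Builder<T> builder =
                new ElasticsearchSink.Builder<>(httpHosts, sinkFunction);
        builder.setRestClientFactory(basicAuth("elastic", "changeme"));
        return builder.build();
    }
}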

Cheers,
Till

On Thu, Nov 22, 2018 at 9:36 AM hzyuemeng1 
wrote:

>
> After I installed X-Pack in my Elasticsearch cluster and enabled basic auth,
> the Elasticsearch sink can't connect to the cluster.
>
> The code looks like:
>
> DataStream<Tuple2<Boolean, Row>> esSink27 =
> tableEnv13.toRetractStream(esTable26, Row.class).filter(tuple -> tuple.f0);
> //generate user config map
> java.util.Map<String, String> userConfigMap22 =
> com.google.common.collect.Maps.newHashMap();
> userConfigMap22.put("cluster.name", "test-magina");
> userConfigMap22.put("bulk.flush.max.actions", "1");
> //userConfigMap22.put("shield.user", "elastic:magina1001password");
>
> //generate transports list
> Splitter commaSplitter24 = Splitter.on(",");
> Splitter colonSplitter25 = Splitter.on(":");
> List<InetSocketAddress> transportsList23 = Lists.newArrayList();
> for (String transport : commaSplitter24.split("101.206.91.118:9300")) {
> List<String> ipAndPort = colonSplitter25.splitToList(transport);
> transportsList23.add(new
> InetSocketAddress(InetAddress.getByName(ipAndPort.get(0)),
> Integer.parseInt(ipAndPort.get(1))));
> }
> esSink27.addSink(new ElasticsearchSink<Tuple2<Boolean, Row>>(userConfigMap22, transportsList23, new
> MaginaES5SinkFunction(esTable26.getSchema().getColumnNames(), "userid",
> "test-au", "test-au", "action,num"), new
> RetryRejectedExecutionFailureHandler())).name("elasticsearch_4068").setParallelism(2);
>
>
>
> Any help will be greatly appreciated
>
>
> hzyuemeng1
> hzyueme...@corp.netease.com
>
> 
>


Re: OutOfMemoryError while doing join operation in flink

2018-11-22 Thread Till Rohrmann
Hi Akshay,

Flink currently does not support to automatically distribute hot keys
across different JVMs. What you can do is to adapt the parallelism/number
of partitions manually if you encounter that one partition contains a lot
of hot keys. This might mitigate the problem by partitioning the hot keys
into different partitions.

Apart from that, the problem seems to be, as Zhijiang indicated, that your
join result is quite large: a single serialized record exceeds 1 GB. Try to
decrease the record size or give more memory to your TMs.

Cheers,
Till

On Thu, Nov 22, 2018 at 1:08 PM Akshay Mendole 
wrote:

> Hi Zhijiang,
>  Thanks for the quick reply. My concern is more about
> how Flink performs joins of two *skewed* datasets. Pig
> and Spark seem to support joins
> of skewed datasets. The record size that you are mentioning in your
> reply is the size after the join operation takes place, which in my use case
> is definitely too large to fit in a task manager's task slot JVM. We
> want to know if there is a way in Flink to handle such skewed keys by
> distributing their values across different JVMs. Let me know if you need
> more clarity on the issue.
> Thanks,
> Akshay
>
> On Thu, Nov 22, 2018 at 2:38 PM zhijiang 
> wrote:
>
>> Hi Akshay,
>>
>> You have hit an existing issue where serializing large records causes an
>> OOM.
>>
>> Previously, every subpartition created a separate serializer, and each
>> serializer maintained an internal byte array for storing intermediate
>> serialization results. The key point is that these internal byte arrays
>> are not managed by the framework, and their size grows dynamically with
>> the record size. If your job has many subpartitions with large records,
>> this can cause an OOM.
>>
>> I have already improved this to some extent by sharing a single
>> serializer across all subpartitions [1], which means there is at most one
>> such byte-array overhead. This improvement is included in release 1.7.
>> Currently the best option is to reduce your record size if possible, or
>> to increase the heap size of the task manager container.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-9913
>>
>> Best,
>> Zhijiang
>>
>> --
>> From: Akshay Mendole 
>> Sent: Thursday, 22 Nov 2018, 13:43
>> To: user 
>> Subject: OutOfMemoryError while doing join operation in flink
>>
>> Hi,
>> We are converting one of our pig pipelines to flink using apache
>> beam. The pig pipeline reads two different data sets (R1 & R2)  from hdfs,
>> enriches them, joins them and dumps back to hdfs. The data set R1 is
>> skewed. In a sense, it has few keys with lot of records. When we converted
>> the pig pipeline to apache beam and ran it using flink on a production yarn
>> cluster, we got the following error
>>
>> 2018-11-21 16:52:25,307 ERROR
>> org.apache.flink.runtime.operators.BatchTask  - Error in
>> task code:  GroupReduce (GroupReduce at CoGBK/GBK) (25/100)
>> java.lang.RuntimeException: Emitting the record caused an I/O exception:
>> Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds
>> JVM heap space
>> at
>> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
>> at
>> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>> at
>> org.apache.beam.runners.flink.translation.functions.SortingFlinkCombineRunner.combine(SortingFlinkCombineRunner.java:140)
>> at
>> org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction.reduce(FlinkReduceFunction.java:85)
>> at
>> org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:111)
>> at
>> org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
>> at
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>> at
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.IOException: Failed to serialize element. Serialized
>> size (> 1136656562 bytes) exceeds JVM heap space
>> at
>> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:323)
>> at
>> org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
>> at
>> org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
>> at java.io.DataOutputStream.write(DataOutputStream.java:107)
>> at
>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>>  

Re: DataSet - Broadcast set in output format

2018-11-22 Thread Till Rohrmann
Hi Bastien,

the OutputFormat specifies how a given record is written to an external
system. The DataSinks using these formats do not support broadcast
variables. This is currently a limitation of Flink.

What you could do is to introduce a mapper before your sink which enriches
the records with respect to the broadcast variable. The OutputFormat could
then react to this additional information.
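
For illustration, a sketch of such an enrichment mapper could look like this
(the record type, the broadcast set name and the flag semantics are
placeholders):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

import java.util.List;

public class BroadcastBeforeSinkSketch {

    // Placeholder for the actual record type written by the OutputFormat.
    public static class MyRecord { }

    public static DataSet<Tuple2<Boolean, MyRecord>> enrich(
            DataSet<MyRecord> records, DataSet<String> control) {

        return records
                .map(new RichMapFunction<MyRecord, Tuple2<Boolean, MyRecord>>() {
                    private boolean writeEnabled;

                    @Override
                    public void open(Configuration parameters) {
                        // Broadcast variables are available via the runtime context in open().
                        List<String> flags = getRuntimeContext().getBroadcastVariable("control");
                        writeEnabled = flags.contains("write-enabled");
                    }

                    @Override
                    public Tuple2<Boolean, MyRecord> map(MyRecord value) {
                        return Tuple2.of(writeEnabled, value);
                    }
                })
                .withBroadcastSet(control, "control");
        // The OutputFormat behind .output(...) can then react to the Boolean field f0.
    }
}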

Cheers,
Till

On Thu, Nov 22, 2018 at 2:57 PM bastien dine  wrote:

> Hello,
>
> I would like to use a broadcast variable in my outputformat (to pass some
> information, and control execution flow)
> How would I do it ?
> .output does not have a .withBroadcast function as it does not extends
> SingleInputUdfOperator
>
>
> --
>
> Bastien DINE
> Data Architect / Software Engineer / Sysadmin
> bastiendine.io
>


Re: Passing application configuring to Flink uber jar

2018-11-22 Thread Till Rohrmann
Hi Krishna,

I think the problem is that you are trying to pass in dynamic properties
(-Dconfig.file=dev.conf) to an already started cluster. The Flink
cluster components or their JVMs need to know the env.java.opts at cluster
start-up time and not when the Flink job is submitted. You can check this
by looking into the logs. They contain the parameters with which the JVMs
are started.

You could, however, use the per job mode where you start a cluster per job
(-m yarn-cluster).
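
For example, a submission along these lines might work. The paths and class
name are taken from your command and are placeholders; -yD passes a dynamic
property to the per-job cluster, and dev.conf would additionally have to be
shipped to the containers, e.g. via -yt:

$FLINK_HOME/bin/flink run -m yarn-cluster \
  -yD env.java.opts="-Dconfig.file=dev.conf" \
  -yt /path/to/conf-dir \
  --class "com.test" \
  /opt/resources/iot-flink-assembly-0.1.0.jar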

Cheers,
Till

On Thu, Nov 22, 2018 at 3:27 PM Krishna Kalyan 
wrote:

> Hello All,
>
> I have a Flink application that inherits configuration from
> application.conf in the resources folder.
>
> Now, I want to run this application on a cluster. I begin by creating a fat
> jar with "mvn clean assembly".
>
> This jar file is then run with the following command:
>
> $FLINK_HOME/bin/flink run --class "com.test" -D 
> "env.java.opts=-Dconfig.file=dev.conf" 
> /opt/resources/iot-flink-assembly-0.1.0.jar
>
>
> This causes an error that the configuration key was not found. Could
> someone please let me know how to pass a custom application.conf environment
> file to the cluster? I could not find a lot of information online on how to do
> this.
>
> Regards,
> Krishna
>
> Reference:
> [1]
> https://stackoverflow.com/questions/46157479/apache-flink-how-do-i-specify-an-external-application-conf-file?rq=1
> [2] https://github.com/lightbend/config/issues/84
>
>
>
> --
>
> Krishna Kalyan
>
> M +49 151 44159906 <+49%20151%2044159906>
>
>


[ANNOUNCE] Weekly community update #47

2018-11-20 Thread Till Rohrmann
Dear community,

this is the weekly community update thread #47. Please post any news and
updates you want to share with the community to this thread.

# Updates on sharing state between subtasks

Jamie opened a first PR to add a first version of sharing state between
tasks. It works by using the JobMaster as the point of synchronization [1].

# Refactor source interface

There is a lively discussion about refactoring Flink's source interface to
make it future proof [2]. Join the discussion if you want to learn more.

# Task speculative execution for batch jobs

Tao Yangyu started a discussion about executing batch tasks speculatively
in order to mitigate the straggler problem [3]. If you have good ideas for
this problem, then please chime in.

# 2nd release candidate for Flink 1.7.0

The community just published the second release candidate for Flink 1.7.0
[4]. Please help the community by testing the latest release candidate and
report any encountered problems. Thanks a lot!

# Embracing Table API in Flink ML

Weihua kicked off a discussion about how to build Flink's next Machine
Learning pipelines [5]. He drafted a design document with his proposal.
Join the discussion to learn more about it and share your opinion.

# Support for interactive programming in Flink Table API

Jiangjie started a discussion about interactive programming with Flink's
Table API [6]. The idea is to make results of previous jobs accessible to
successive Flink jobs to better support a REPL like job execution.

[1]
https://lists.apache.org/thread.html/b6eca694eaf7ee19386f4fb407098ae4b58df788b539e0666e28c37c@%3Cdev.flink.apache.org%3E
[2]
https://lists.apache.org/thread.html/70484d6aa4b8e7121181ed8d5857a94bfb7d5a76334b9c8fcc59700c@%3Cdev.flink.apache.org%3E
[3]
https://lists.apache.org/thread.html/54514523379d3768159312a6f25071547e7e63f3b9bf9e19eb4f3937@%3Cdev.flink.apache.org%3E
[4]
https://lists.apache.org/thread.html/10913c242452d018840ba541d29323314732e4498777f77b002e30ad@%3Cdev.flink.apache.org%3E
[5]
https://lists.apache.org/thread.html/cf83aea5bb5ff7a719fe4dc082325469969e5cfc49786646ffc0c8f2@%3Cdev.flink.apache.org%3E
[6]
https://lists.apache.org/thread.html/8a93d331f69ed9aa2c30dbc7793a3e8803155aa08fdaec71681aa92a@%3Cdev.flink.apache.org%3E

Cheers,
Till


Re: Task Manager allocation issue when upgrading 1.6.0 to 1.6.2

2018-11-13 Thread Till Rohrmann
Good to hear Cliff.

You're right that it's not a nice user experience. The problem with
queryable state is that one would need to take a look at the actual user
job to decide whether the user uses queryable state or not. But then it's
already too late for starting the respective infrastructure needed for
querying the state. You're right, though, that we should at least take a
random port per default. I've created a corresponding issue for this:
https://issues.apache.org/jira/browse/FLINK-10866.

Cheers,
Till

On Mon, Nov 12, 2018 at 11:16 PM Cliff Resnick  wrote:

> Hi Till,
>
> Yes, it turns out the problem was
> having flink-queryable-state-runtime_2.11-1.6.2.jar in flink/lib. I guess
> Queryable State bootstraps itself and, in my situation, it brought the task
> manager down when it found no available ports. What's a little troubling is
> that I had not configured Queryable State at all, so I would not expect it
> to get in the way. I haven't looked further into it but I think that if
> Queryable State wants to enable itself then it should at worst take an
> unused port by default, especially since many folks will be running in
> shared environments like YARN.
>
> But anyway, thanks for that! I'm now up with 1.6.2.
>
> Cliff
>
> On Mon, Nov 12, 2018 at 6:04 AM Till Rohrmann 
> wrote:
>
>> Hi Cliff,
>>
>> the TaskManger fail to start with exit code 31 which indicates an
>> initialization error on startup. If you check the TaskManager logs via
>> `yarn logs -applicationId ` you should see the problem why the TMs
>> don't start up.
>>
>> Cheers,
>> Till
>>
>> On Fri, Nov 9, 2018 at 8:32 PM Cliff Resnick  wrote:
>>
>>> Hi Till,
>>>
>>> Here are Job Manager logs, same job in both 1.6.0 and 1.6.2 at DEBUG
>>> level. I saw several errors in 1.6.2, hope it's informative!
>>>
>>> Cliff
>>>
>>> On Fri, Nov 9, 2018 at 8:34 AM Till Rohrmann 
>>> wrote:
>>>
>>>> Hi Cliff,
>>>>
>>>> this sounds not right. Could you share the logs of the Yarn cluster
>>>> entrypoint with the community for further debugging? Ideally on DEBUG
>>>> level. The Yarn logs would also be helpful to fully understand the problem.
>>>> Thanks a lot!
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Thu, Nov 8, 2018 at 9:59 PM Cliff Resnick  wrote:
>>>>
>>>>> I'm running a YARN cluster of 8 * 4 core instances = 32 cores, with a
>>>>> configuration of 3 slots per TM. The cluster is dedicated to a single job
>>>>> that runs at full capacity in "FLIP6" mode. So in this cluster, the
>>>>> parallelism is 21 (7 TMs * 3, one container dedicated for Job Manager).
>>>>>
>>>>> When I run the job in 1.6.0, seven Task Managers are spun up as
>>>>> expected. But if I run with 1.6.2 only four Task Managers spin up and the
>>>>> job hangs waiting for more resources.
>>>>>
>>>>> Our Flink distribution is set up by script after building from source.
>>>>> So aside from flink jars, both 1.6.0 and 1.6.2 directories are identical.
>>>>> The job is the same, restarting from savepoint. The problem is repeatable.
>>>>>
>>>>> Has something changed in 1.6.2, and if so can it be remedied with a
>>>>> config change?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>


Re: Queryable state when key is UUID - getting Kyro Exception

2018-11-13 Thread Till Rohrmann
a.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>> at
>>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>> at
>>> org.apache.flink.queryablestate.network.Client$EstablishedConnection.onRequestFailure(Client.java:563)
>>> at
>>> org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:84)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>> Jayant Ameta
>>>
>>>
>>> On Fri, Nov 9, 2018 at 5:14 PM Till Rohrmann 
>>> wrote:
>>>
>>>> Could you send us a small example program which we can use to reproduce
>>>> the problem?
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Fri, Nov 9, 2018 at 6:57 AM Jayant Ameta 
>>>> wrote:
>>>>
>>>>> Yeah, it IS using Kryo serializer.
>>>>>
>>>>> Jayant Ameta
>>>>>
>>>>>
>>>>> On Wed, Nov 7, 2018 at 9:57 PM Till Rohrmann 
>>>>> wrote:
>>>>>
>>>>>> Hi Jayant, could you check that the UUID key on the TM is actually
>>>>>> serialized using a Kryo serializer? You can do this by setting a 
>>>>>> breakpoint
>>>>>> in the constructor of the `AbstractKeyedStateBackend`.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Tue, Oct 30, 2018 at 9:44 AM bupt_ljy  wrote:
>>>>>>
>>>>>>> Hi, Jayant
>>>>>>>
>>>>>>> Your code looks good to me. And I’ve tried the
>>>>>>> serialize/deserialize of Kryo on UUID class, it all looks okay.
>>>>>>>
>>>>>>> I’m not very sure about this problem. Maybe you can write a very
>>>>>>> simple demo to try if it works.
>>>>>>>
>>>>>>>
>>>>>>> Jiayi Liao, Best
>>>>>>>
>>>>>>>  Original Message
>>>>>>> *Sender:* Jayant Ameta
>>>>>>> *Recipient:* bupt_ljy
>>>>>>> *Cc:* Tzu-Li (Gordon) Tai; user<
>>>>>>> user@flink.apache.org>
>>>>>>> *Date:* Monday, Oct 29, 2018 11:53
>>>>>>> *Subject:* Re: Queryable state when ke

Re: Implementation error: Unhandled exception - "Implementation error: Unhandled exception."

2018-11-13 Thread Till Rohrmann
Hi Richard,

could you share with us the complete logs to better debug the problem. What
do you mean exactly with upgrading your job? Cancel with savepoint and then
resuming the new job from the savepoint? Thanks a lot.

Cheers,
Till

On Mon, Nov 12, 2018 at 5:08 PM Timo Walther  wrote:

> Hi Richard,
>
> this sounds like a bug to me. I will loop in Till (in CC) who might know
> more about this.
>
> Regards,
> Timo
>
>
> Am 07.11.18 um 20:35 schrieb Richard Deurwaarder:
>
> Hello,
>
> We have a flink job / cluster running in kubernetes. Flink 1.6.2 (but the
> same happens in 1.6.0 and 1.6.1) To upgrade our job we use the REST API.
>
> Every so often the jobmanager seems to be stuck in a crashing state and
> the logs show me this stack trace:
>
> 2018-11-07 18:43:05,815 [flink-scheduler-1] ERROR
> org.apache.flink.runtime.rest.handler.cluster.ClusterOverviewHandler -
> Implementation error: Unhandled exception.
> akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/dispatcher#1016927511]] after [1 ms].
> Sender[null] sent message of type
> "org.apache.flink.runtime.rpc.messages.Implementation error: Unhandled
> exception.".
> at
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
> at
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
> at
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> at
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
> at
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
> at java.lang.Thread.run(Thread.java:748)
>
> If I restart the jobmanager everything is fine afterwards, but the
> jobmanager will not restart by itself.
>
> What might've caused this and is this something we can prevent?
>
> Richard
>
>
>


Re: Task Manager allocation issue when upgrading 1.6.0 to 1.6.2

2018-11-12 Thread Till Rohrmann
Hi Cliff,

the TaskManger fail to start with exit code 31 which indicates an
initialization error on startup. If you check the TaskManager logs via
`yarn logs -applicationId ` you should see the problem why the TMs
don't start up.

Cheers,
Till

On Fri, Nov 9, 2018 at 8:32 PM Cliff Resnick  wrote:

> Hi Till,
>
> Here are Job Manager logs, same job in both 1.6.0 and 1.6.2 at DEBUG
> level. I saw several errors in 1.6.2, hope it's informative!
>
> Cliff
>
> On Fri, Nov 9, 2018 at 8:34 AM Till Rohrmann  wrote:
>
>> Hi Cliff,
>>
>> this sounds not right. Could you share the logs of the Yarn cluster
>> entrypoint with the community for further debugging? Ideally on DEBUG
>> level. The Yarn logs would also be helpful to fully understand the problem.
>> Thanks a lot!
>>
>> Cheers,
>> Till
>>
>> On Thu, Nov 8, 2018 at 9:59 PM Cliff Resnick  wrote:
>>
>>> I'm running a YARN cluster of 8 * 4 core instances = 32 cores, with a
>>> configuration of 3 slots per TM. The cluster is dedicated to a single job
>>> that runs at full capacity in "FLIP6" mode. So in this cluster, the
>>> parallelism is 21 (7 TMs * 3, one container dedicated for Job Manager).
>>>
>>> When I run the job in 1.6.0, seven Task Managers are spun up as
>>> expected. But if I run with 1.6.2 only four Task Managers spin up and the
>>> job hangs waiting for more resources.
>>>
>>> Our Flink distribution is set up by script after building from source.
>>> So aside from flink jars, both 1.6.0 and 1.6.2 directories are identical.
>>> The job is the same, restarting from savepoint. The problem is repeatable.
>>>
>>> Has something changed in 1.6.2, and if so can it be remedied with a
>>> config change?
>>>
>>>
>>>
>>>
>>>
>>>


Re: flink-1.6.1 :: job deployment :: detached mode

2018-11-09 Thread Till Rohrmann
Hi Mike,

the job seems to run. It might indeed only be a problem with shutting down
the ZooKeeper utils on the client side after the job has been submitted. I
will try to reproduce it locally. Keep us posted on the state of
CURATOR-466 if something should change.

Cheers,
Till

On Thu, Nov 8, 2018 at 11:17 PM Mikhail Pryakhin 
wrote:

> Hi Till.
> Of course, please find the job bootstrap and YarnJobClusterEntrypoint logs
> attached.
>
> The stacktrace below resembles the bug in Apache Curator
> https://issues.apache.org/jira/browse/CURATOR-466.
>
> java.lang.IllegalStateException: instance must be started before calling
> this method
> at
> org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:176)
> at
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.getData(CuratorFrameworkImpl.java:375)
> at
> org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.NodeCache.processBackgroundResult(NodeCache.java:288)
> at
> org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.NodeCache.access$300(NodeCache.java:56)
> at
> org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.NodeCache$3.processResult(NodeCache.java:122)
> at
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.sendToBackgroundCallback(CuratorFrameworkImpl.java:749)
> at
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.processBackgroundOperation(CuratorFrameworkImpl.java:522)
> at
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.ExistsBuilderImpl$1.processResult(ExistsBuilderImpl.java:137)
> at
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:554)
> at
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:505)
>
>
> Kind Regards,
> Mike Pryakhin
>
> On 8 Nov 2018, at 12:12, Till Rohrmann  wrote:
>
> Hi Mike,
>
> could you also send me the YarnJobClusterEntrypoint logs. Thanks!
>
> Cheers,
> Till
>
> On Wed, Nov 7, 2018 at 9:27 PM Mikhail Pryakhin 
> wrote:
>
>> Hi Till,
>> Thank you for your reply.
>> Yes, I’ve upgraded to the latest Flink-1.6.2 and the problem is still
>> there, please find the log file attached.
>>
>>
>> Kind Regards,
>> Mike Pryakhin
>>
>> On 7 Nov 2018, at 18:46, Till Rohrmann  wrote:
>>
>> Hi Mike,
>>
>> have you tried whether the problem also occurs with Flink 1.6.2? If yes,
>> then please share with us the Flink logs with DEBUG log level to further
>> debug the problem.
>>
>> Cheers,
>> Till
>>
>> On Fri, Oct 26, 2018 at 5:46 PM Mikhail Pryakhin 
>> wrote:
>>
>>> Hi community!
>>>
>>> Righ after I've upgraded flink up to flink-1.6.1 I get an exception
>>> during job deployment as a YARN cluster.
>>> The job is submitted with zookeper HA enabled, in detached mode.
>>>
>>> The flink yaml contains the following properties:
>>>
>>> high-availability: zookeeper
>>> high-availability.zookeeper.quorum: 
>>> high-availability.zookeeper.storageDir: hdfs:///
>>> high-availability.zookeeper.path.root: 
>>> high-availability.zookeeper.path.namespace: 
>>>
>>> the job is deployed via flink CLI command like the following:
>>>
>>> "${FLINK_HOME}/bin/flink" run \
>>> -m yarn-cluster \
>>> -ynm "${JOB_NAME}-${JOB_VERSION}" \
>>> -yn "${tm_containers}" \
>>> -ys "${tm_slots}" \
>>> -ytm "${tm_memory}" \
>>> -yjm "${jm_memory}" \
>>> -p "${parallelism}" \
>>> -yqu "${queue}" \
>>> -yt "${YARN_APP_PATH}" \
>>> -c "${MAIN_CLASS}" \
>>> -yst \
>>> -yd \
>>> ${class_path} \
>>> "${YARN_APP_PATH}"/"${APP_JAR}"
>>>
>>>
>>> After the job has been successfully deplyed, I've got an exception:
>>>
>>> 2018-10-26 18:29:17,781 | ERROR | Curator-Framework-0 |
>>> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl
>>> | Background exception was not retry-able or retry gave up
>>> java.lang.InterruptedException
>>> at java.lang.Object.wait(Native Method)
>>> 

Re: Task Manager allocation issue when upgrading 1.6.0 to 1.6.2

2018-11-09 Thread Till Rohrmann
Hi Cliff,

this sounds not right. Could you share the logs of the Yarn cluster
entrypoint with the community for further debugging? Ideally on DEBUG
level. The Yarn logs would also be helpful to fully understand the problem.
Thanks a lot!

Cheers,
Till

On Thu, Nov 8, 2018 at 9:59 PM Cliff Resnick  wrote:

> I'm running a YARN cluster of 8 * 4 core instances = 32 cores, with a
> configuration of 3 slots per TM. The cluster is dedicated to a single job
> that runs at full capacity in "FLIP6" mode. So in this cluster, the
> parallelism is 21 (7 TMs * 3, one container dedicated for Job Manager).
>
> When I run the job in 1.6.0, seven Task Managers are spun up as expected.
> But if I run with 1.6.2 only four Task Managers spin up and the job hangs
> waiting for more resources.
>
> Our Flink distribution is set up by script after building from source. So
> aside from flink jars, both 1.6.0 and 1.6.2 directories are identical. The
> job is the same, restarting from savepoint. The problem is repeatable.
>
> Has something changed in 1.6.2, and if so can it be remedied with a config
> change?
>
>
>
>
>
>


Re: Queryable state when key is UUID - getting Kyro Exception

2018-11-09 Thread Till Rohrmann
Could you send us a small example program which we can use to reproduce the
problem?

Cheers,
Till

On Fri, Nov 9, 2018 at 6:57 AM Jayant Ameta  wrote:

> Yeah, it IS using Kryo serializer.
>
> Jayant Ameta
>
>
> On Wed, Nov 7, 2018 at 9:57 PM Till Rohrmann  wrote:
>
>> Hi Jayant, could you check that the UUID key on the TM is actually
>> serialized using a Kryo serializer? You can do this by setting a breakpoint
>> in the constructor of the `AbstractKeyedStateBackend`.
>>
>> Cheers,
>> Till
>>
>> On Tue, Oct 30, 2018 at 9:44 AM bupt_ljy  wrote:
>>
>>> Hi, Jayant
>>>
>>> Your code looks good to me. And I’ve tried the serialize/deserialize
>>> of Kryo on UUID class, it all looks okay.
>>>
>>> I’m not very sure about this problem. Maybe you can write a very
>>> simple demo to try if it works.
>>>
>>>
>>> Jiayi Liao, Best
>>>
>>>  Original Message
>>> *Sender:* Jayant Ameta
>>> *Recipient:* bupt_ljy
>>> *Cc:* Tzu-Li (Gordon) Tai; user<
>>> user@flink.apache.org>
>>> *Date:* Monday, Oct 29, 2018 11:53
>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>>>
>>> Hi Jiayi,
>>> Any further help on this?
>>>
>>> Jayant Ameta
>>>
>>>
>>> On Fri, Oct 26, 2018 at 9:22 AM Jayant Ameta 
>>> wrote:
>>>
>>>> MapStateDescriptor descriptor = new 
>>>> MapStateDescriptor<>("rulePatterns", UUID.class,
>>>> String.class);
>>>>
>>>> Jayant Ameta
>>>>
>>>>
>>>> On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy  wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>>Can you show us the descriptor in the codes below?
>>>>>
>>>>> client.getKvState(JobID.fromHexString(
>>>>> "c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
>>>>>
>>>>> UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
>>>>>> TypeInformation.of(new TypeHint() {}), descriptor);
>>>>>>
>>>>>>
>>>>> Jiayi Liao, Best
>>>>>
>>>>>
>>>>>  Original Message
>>>>> *Sender:* Jayant Ameta
>>>>> *Recipient:* bupt_ljy
>>>>> *Cc:* Tzu-Li (Gordon) Tai; user<
>>>>> user@flink.apache.org>
>>>>> *Date:* Friday, Oct 26, 2018 02:26
>>>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro
>>>>> Exception
>>>>>
>>>>> Also, I haven't provided any custom serializer in my flink job.
>>>>> Shouldn't the same configuration work for queryable state client?
>>>>>
>>>>> Jayant Ameta
>>>>>
>>>>>
>>>>> On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta 
>>>>> wrote:
>>>>>
>>>>>> Hi Gordon,
>>>>>> Following is the stack trace that I'm getting:
>>>>>>
>>>>>> *Exception in thread "main" java.util.concurrent.ExecutionException:
>>>>>> java.lang.RuntimeException: Failed request 0.*
>>>>>> * Caused by: java.lang.RuntimeException: Failed request 0.*
>>>>>> * Caused by: java.lang.RuntimeException: Error while processing
>>>>>> request with ID 0. Caused by: com.esotericsoftware.kryo.KryoException:
>>>>>> Encountered unregistered class ID: -985346241*
>>>>>> *Serialization trace:*
>>>>>> *$outer (scala.collection.convert.Wrappers$SeqWrapper)*
>>>>>> * at
>>>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)*
>>>>>> * at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)*
>>>>>> * at
>>>>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)*
>>>>>> * at
>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)*
>>>>>> * at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)*
>>>>>> * at
>>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)*
>>>>>> * at
>>>>>> org.apache.flink.api.java.typeutils.runtime.T

Re: HA jobmanagers redirect to ip address of leader instead of hostname

2018-11-09 Thread Till Rohrmann
Thanks for opening this issue Jeroen. I think we need to look into this.
Could you maybe share the Flink logs with the community? Ideally you attach
them to the JIRA issue.

Cheers,
Till

On Thu, Nov 8, 2018 at 10:51 PM Jeroen Steggink | knowsy 
wrote:

> Hi Till,
>
> Thanks for your reply. We are running version 1.5.4. We can't upgrade to
> 1.6.x because we are using Apache Beam which doesn't support 1.6.x yet.
>
> I have also made a Jira issue about this:
> https://issues.apache.org/jira/projects/FLINK/issues/FLINK-10748
> Best regards,
> Jeroen Steggink
>
> On 07-Nov-18 16:06, Till Rohrmann wrote:
>
> Hi Jeroen,
>
> this sounds like a bug in Flink that we return sometimes IP addresses
> instead of hostnames. Could you tell me which Flink version you are using?
> In the current version, the redirect address and the address retrieved from
> ZooKeeper should actually be the same.
>
> In the future, we plan to remove the redirect message and simply forward
> the request to the current leader. This should hopefully avoid these kind
> of problems.
>
> Cheers,
> Till
>
> On Fri, Oct 26, 2018 at 1:40 PM Jeroen Steggink | knowsy 
> wrote:
>
>> Hi,
>>
>> I'm having some troubles with Flink jobmanagers in a HA setup within
>> OpenShift.
>>
>> I have three jobmanagers, a Zookeeper cluster and a loadbalancer
>> (Openshift/Kubernetes Route) for the web ui / rest server on the
>> jobmanagers. Everything works fine, as long as the loadbalancer connects
>> to the leader. However, when the leader changes and the loadbalancer
>> connects to a non-leader, the jobmanager redirects to a leader using the
>> ip address of the host. Since the routing in our network is done using
>> hostnames, it doesn't know how to find the node using the ip address and
>> results in a timeout.
>>
>> So I have a few questions:
>> 1. Why is Flink using the ip addresses instead of the hostname which are
>> configured in the config? Other times it does use the hostname, like the
>> info send to Zookeeper.
>> 2. Is there another way of coping with connections to non-leaders
>> instead of redirects? Maybe proxying through a non-leader to the leader?
>>
>> Cheers,
>> Jeroen
>>
>>
>
>


Re: flink-1.6.1 :: job deployment :: detached mode

2018-11-08 Thread Till Rohrmann
Hi Mike,

could you also send me the YarnJobClusterEntrypoint logs. Thanks!

Cheers,
Till

On Wed, Nov 7, 2018 at 9:27 PM Mikhail Pryakhin  wrote:

> Hi Till,
> Thank you for your reply.
> Yes, I’ve upgraded to the latest Flink-1.6.2 and the problem is still
> there, please find the log file attached.
>
>
> Kind Regards,
> Mike Pryakhin
>
> On 7 Nov 2018, at 18:46, Till Rohrmann  wrote:
>
> Hi Mike,
>
> have you tried whether the problem also occurs with Flink 1.6.2? If yes,
> then please share with us the Flink logs with DEBUG log level to further
> debug the problem.
>
> Cheers,
> Till
>
> On Fri, Oct 26, 2018 at 5:46 PM Mikhail Pryakhin 
> wrote:
>
>> Hi community!
>>
>> Righ after I've upgraded flink up to flink-1.6.1 I get an exception
>> during job deployment as a YARN cluster.
>> The job is submitted with zookeper HA enabled, in detached mode.
>>
>> The flink yaml contains the following properties:
>>
>> high-availability: zookeeper
>> high-availability.zookeeper.quorum: 
>> high-availability.zookeeper.storageDir: hdfs:///
>> high-availability.zookeeper.path.root: 
>> high-availability.zookeeper.path.namespace: 
>>
>> the job is deployed via flink CLI command like the following:
>>
>> "${FLINK_HOME}/bin/flink" run \
>> -m yarn-cluster \
>> -ynm "${JOB_NAME}-${JOB_VERSION}" \
>> -yn "${tm_containers}" \
>> -ys "${tm_slots}" \
>> -ytm "${tm_memory}" \
>> -yjm "${jm_memory}" \
>> -p "${parallelism}" \
>> -yqu "${queue}" \
>> -yt "${YARN_APP_PATH}" \
>> -c "${MAIN_CLASS}" \
>> -yst \
>> -yd \
>> ${class_path} \
>> "${YARN_APP_PATH}"/"${APP_JAR}"
>>
>>
>> After the job has been successfully deplyed, I've got an exception:
>>
>> 2018-10-26 18:29:17,781 | ERROR | Curator-Framework-0 |
>> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl
>> | Background exception was not retry-able or retry gave up
>> java.lang.InterruptedException
>> at java.lang.Object.wait(Native Method)
>> at java.lang.Object.wait(Object.java:502)
>> at
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1406)
>> at
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1097)
>> at
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1130)
>> at
>> org.apache.flink.shaded.curator.org.apache.curator.utils.ZKPaths.mkdirs(ZKPaths.java:274)
>> at
>> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CreateBuilderImpl$7.performBackgroundOperation(CreateBuilderImpl.java:561)
>> at
>> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.OperationAndData.callPerformBackgroundOperation(OperationAndData.java:72)
>> at
>> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:831)
>> at
>> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
>> at
>> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
>> at
>> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> If the job is deployed in "attached mode" everything goes fine.
>>
>>
>>
>>
>>
>> Kind Regards,
>> Mike Pryakhin
>>
>>
>


Re: FlinkCEP, circular references and checkpointing failures

2018-11-07 Thread Till Rohrmann
Really good finding Stefan!

On Wed, Nov 7, 2018 at 5:28 PM Stefan Richter 
wrote:

> Hi,
>
> I think I can already spot the problem: LockableTypeSerializer.duplicate()
> is not properly implemented because it also has to call duplicate() on the
> element serialiser that is passed into the constructor of the new instance.
> I will open an issue and fix the problem.
>
> Best,
> Stefan
>
> On 7. Nov 2018, at 17:17, Till Rohrmann  wrote:
>
> Hi Shailesh,
>
> could you maybe provide us with an example program which is able to
> reproduce this problem? This would help the community to better debug the
> problem. It looks not right and might point towards a bug in Flink. Thanks
> a lot!
>
> Cheers,
> Till
>
> On Tue, Oct 30, 2018 at 9:10 AM Dawid Wysakowicz 
> wrote:
>
>> This is some problem with serializing your events using Kryo. I'm adding
>> Gordon to cc, as he was recently working with serializers. He might give
>> you more insights what is going wrong.
>>
>> Best,
>>
>> Dawid
>> On 25/10/2018 05:41, Shailesh Jain wrote:
>>
>> Hi Dawid,
>>
>> I've upgraded to flink 1.6.1 and rebased by changes against the tag
>> 1.6.1, the only commit on top of 1.6 is this:
>> https://github.com/jainshailesh/flink/commit/797e3c4af5b28263fd98fb79daaba97cabf3392c
>>
>> I ran two separate identical jobs (with and without checkpointing
>> enabled), I'm hitting a ArrayIndexOutOfBoundsException (and sometimes NPE) 
>> *only
>> when checkpointing (HDFS backend) is enabled*, with the below stack
>> trace.
>>
>> I did see a similar problem with different operators here (
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/IndexOutOfBoundsException-on-deserialization-after-updating-to-1-6-1-td23933.html).
>> Is this a known issue which is getting addressed?
>>
>> Any ideas on what could be causing this?
>>
>> Thanks,
>> Shailesh
>>
>>
>> 2018-10-24 17:04:13,365 INFO
>> org.apache.flink.runtime.taskmanager.Task -
>> SelectCepOperatorMixedTime (1/1) - SelectCepOperatorMixedTime (1/1)
>> (3d984b7919342a3886593401088ca2cd) switched from RUNNING to FAILED.
>> org.apache.flink.util.FlinkRuntimeException: Failure happened in filter
>> function.
>> at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
>> at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
>> at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
>> at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
>> at
>> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
>> at
>> org.apache.flink.cep.operator.AbstractKeyedCEPPatternMixedTimeApproachOperator.processElement(AbstractKeyedCEPPatternMixedTimeApproachOperator.java:45)
>> at
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>> at
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.util.WrappingRuntimeException:
>> java.lang.ArrayIndexOutOfBoundsException: -1
>> at
>> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:305)
>> at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
>> at
>> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:301)
>> at
>> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:291)
>> at
>> org.apache.flink.cep.nfa.NFA$ConditionContext.getEventsForPattern(NFA.java:811)
>> at
>> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:70)
>> at
>> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:62)
>> at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
>> at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
>> ... 10 more
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>> at com.esotericsoftware.kryo.util.In

Re: Error after upgrading to Flink 1.6.2

2018-11-07 Thread Till Rohrmann
Hi Flavio,

I haven't seen this problem before. Are you using Flink's HBase connector?
According to similar problems with Spark one needs to make sure that the
hbase jars are on the classpath [1, 2]. If not, then it might be a problem
with the MR1 version 2.6.0-mr1-cdh5.11.2, which caused problems for CDH 5.2
[3]. It could also be worthwhile to try it out with the latest CDH version.

[1]
https://stackoverflow.com/questions/34901331/spark-hbase-error-java-lang-illegalstateexception-unread-block-data
[2]
https://mapr.com/community/s/question/0D50L6BIthGSAT/javalangillegalstateexception-unread-block-data-when-running-spark-with-yarn
[3]
https://issues.apache.org/jira/browse/SPARK-1867?focusedCommentId=14322647=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-14322647

Cheers,
Till

On Wed, Nov 7, 2018 at 12:05 PM Flavio Pompermaier 
wrote:

> I forgot to mention that I'm using Flink 1.6.2 compiled for cloudera CDH
> 5.11.2:
>
> /opt/shared/devel/apache-maven-3.3.9/bin/mvn clean install
> -Dhadoop.version=2.6.0-cdh5.11.2 -Dhbase.version=1.2.0-cdh5.11.2
> -Dhadoop.core.version=2.6.0-mr1-cdh5.11.2 -DskipTests -Pvendor-repos
>
> On Wed, Nov 7, 2018 at 11:48 AM Flavio Pompermaier 
> wrote:
>
>> Hi to all,
>> we tried to upgrade our jobs to Flink 1.6.2 but now we get the following
>> error (we saw a similar issue with spark that was caused by different java
>> version on the cluster servers so we checked them and they are all to the
>> same version - oracle-8-191):
>>
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot 
>> initialize task 'DataSink (Parquet write: 
>> hdfs:/rivela/1/1/0_staging/parquet)': Deserializing the OutputFormat 
>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@54a4c7c8) 
>> failed: unread block data
>>  at 
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
>>  at 
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
>>  at 
>> org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1151)
>>  at 
>> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1131)
>>  at 
>> org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:294)
>>  at 
>> org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:157)
>>  ... 10 more
>> Caused by: java.lang.Exception: Deserializing the OutputFormat 
>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@54a4c7c8) 
>> failed: unread block data
>>  at 
>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
>>  at 
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
>>  ... 15 more
>> Caused by: java.lang.IllegalStateException: unread block data
>>  at 
>> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2783)
>>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1605)
>>  at 
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>>  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>>  at 
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>>  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>>  at 
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:502)
>>  at 
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:489)
>>  at 
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:477)
>>  at 
>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:438)
>>  at 
>> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
>>  at 
>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
>>  ... 16 more
>>
>>
>> Has anyone faced this problem before? How can we try to solve it?
>> Best,Flavio
>>
>
>


Re: flink-1.6.2 in standalone-job mode | Cluster initialization failed.

2018-11-07 Thread Till Rohrmann
Hi Zavalit,

the AbstractMethodError indicates that there must be some kind of version
conflict. From Flink 1.6.1 to 1.6.2 we modified the signature of
`ClusterEntrypoint#createResourceManager` which causes the problem if you
mix up versions. Could you check that you don't mix Flink 1.6.1 and 1.6.2
classes? Please also make sure that you don't bundle Flink runtime classes
in your job jar. If you do, then please recompile the job against the new
Flink version or remove these classes.
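
A quick way to check for accidentally bundled runtime classes, for example
(the jar name is a placeholder):

jar tf target/my-flink-job.jar | grep "org/apache/flink/runtime/entrypoint"
# If this prints anything, mark the Flink dependencies as 'provided' (or exclude
# them in the shade/assembly configuration) and rebuild against Flink 1.6.2.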

Cheers,
Till

On Tue, Oct 30, 2018 at 12:21 PM zavalit  wrote:

> Hi,
> just tried to launch flink app in flink-1.6.2 and get
>
> 2018-10-30 11:07:19,961 ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Cluster
> initialization failed.
> java.lang.AbstractMethodError:
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createResourceManager(Lorg/apache/flink/configuration/Configuration;Lorg/apache/flink/runtime/clusterframework/types/ResourceID;Lorg/apache/flink/runtime/rpc/RpcService;Lorg/apache/flink/runtime/highavailability/HighAvailabilityServices;Lorg/apache/flink/runtime/heartbeat/HeartbeatServices;Lorg/apache/flink/runtime/metrics/MetricRegistry;Lorg/apache/flink/runtime/rpc/FatalErrorHandler;Lorg/apache/flink/runtime/entrypoint/ClusterInformation;Ljava/lang/String;)Lorg/apache/flink/runtime/resourcemanager/ResourceManager;
> at
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startClusterComponents(ClusterEntrypoint.java:338)
> at
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:232)
> at
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:190)
> at
>
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:189)
> at
>
> org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint.main(StandaloneJobClusterEntryPoint.java:176)
>
> complete log is here:
> https://gist.github.com/zavalit/4dba49cdea45c6f56f947a7dcec1a666
>
> job manager is started with:
> ./bin/standalone-job.sh start-foreground --configDir conf --job-classname
> MyEntryClass
>
> the same app runs as it is in flink-1.6.1, the only thing that have changed
> is a flink version
>
> thx in advance, for any insides
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Unbalanced Kafka consumer consumption

2018-11-07 Thread Till Rohrmann
Hi Gerard,

the behaviour you are describing sounds odd to me. I have a couple of
questions:

1. Which Flink and Kafka version are you using?
2. How many partitions do you have? --> Try to set the parallelism of your
job to the number of partitions. That way, you will have one partition per
source task.
3. How are the source operators distributed? Are they running on different
nodes?
4. What do you mean with "until it (the blue one) was finished consuming
the partition"? I assume that you don't ingest into the Kafka topic live
but want to read persisted data.
5. Are you using Flink's metrics to monitor the different source tasks?
Check what the source operator's output rate is (should be visible from the
web UI).

Cheers,
Till

On Tue, Oct 30, 2018 at 10:27 AM Gerard Garcia  wrote:

> I think my problem is not the same: yours is that you want to consume faster
> from partitions with more data, to avoid first draining the ones with fewer
> elements, which could advance the event time too fast. Mine is that the Kafka
> consumer only reads from some partitions even though it seems to have the
> resources to read and process all of them at the same time.
>
> Gerard
>
> On Tue, Oct 30, 2018 at 9:36 AM bupt_ljy  wrote:
>
>> Hi,
>>
>>If I understand your problem correctly, there is a similar JIRA
>> issue FLINK-10348, reported by me. Maybe you can take a look at it.
>>
>>
>> Jiayi Liao, Best
>>
>>  Original Message
>> *Sender:* Gerard Garcia
>> *Recipient:* fearsome.lucidity
>> *Cc:* user
>> *Date:* Monday, Oct 29, 2018 17:50
>> *Subject:* Re: Unbalanced Kafka consumer consumption
>>
>> The stream is partitioned by key after ingestion at the finest
>> granularity that we can (which is finer than how the stream is partitioned
>> when produced to Kafka). It is not perfectly balanced, but still not so
>> unbalanced as to show this behavior (more balanced than what the lag images
>> show).
>>
>> Anyway, let's assume that the problem is that the stream is so unbalanced
>> that one operator subtask can't handle the ingestion rate. Is it expected
>> then that all the other operator subtasks reduce their ingestion rate even
>> if they have resources to spare? The task is configured with processing time
>> and there are no windows. If that is the case, is there a way to let operator
>> subtasks process freely even if one of them is causing back pressure
>> upstream?
>>
>> The attached images show how Kafka lag increases while the throughput is
>> stable until some operator subtasks finish.
>>
>> Thanks,
>>
>> Gerard
>>
>> On Fri, Oct 26, 2018 at 8:09 PM Elias Levy 
>> wrote:
>>
>>> You can always shuffle the stream generated by the Kafka source
>>> (dataStream.shuffle()) to evenly distribute records downstream.
>>>
>>> On Fri, Oct 26, 2018 at 2:08 AM gerardg  wrote:
>>>
 Hi,

 We are experiencing issues scaling our Flink application and we have
 observed that it may be because Kafka message consumption is not balanced
 across partitions. The attached image (lag per partition) shows how only one
 partition consumes messages (the blue one in the back) and it wasn't until
 it finished that the other ones started to consume at a good rate (the total
 throughput actually multiplied by 4 when these started). Also, when those
 started to consume, one partition just stopped and accumulated messages
 again until they finished.

 We don't see any resource (CPU, network, disk..) struggling in our
 cluster
 so we are not sure what could be causing this behavior. I can only
 assume
 that somehow Flink or the Kafka consumer is artificially slowing down
 the
 other partitions. Maybe due to how back pressure is handled?

 <
 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1007/consumer_max_lag.png>


 Gerard






>>>


Re: Queryable state when key is UUID - getting Kyro Exception

2018-11-07 Thread Till Rohrmann
Hi Jayant, could you check that the UUID key on the TM is actually
serialized using a Kryo serializer? You can do this by setting a breakpoint
in the constructor of the `AbstractKeyedStateBackend`.
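
If the key is indeed handled by Kryo on the TM side, the query client should
use the matching type information for the key. The following Java sketch only
illustrates that idea (it mirrors Jiayi's Scala suggestion further down in
this thread; host, port, job id, state name and key are placeholders taken
from this thread, not verified values):

import java.util.UUID;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.queryablestate.client.QueryableStateClient;

public class UuidKeyQuery {
    public static void main(String[] args) throws Exception {
        // Host and port of the queryable state proxy (placeholders).
        QueryableStateClient client = new QueryableStateClient("taskmanager-host", 9069);

        MapStateDescriptor<UUID, String> descriptor =
                new MapStateDescriptor<>("rulePatterns", UUID.class, String.class);

        // Generic (Kryo-backed) type information for the UUID key, so the client
        // serializes the key the same way as the keyed state backend on the TM.
        TypeInformation<UUID> keyTypeInfo = new GenericTypeInfo<>(UUID.class);

        UUID key = UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d");

        MapState<UUID, String> state = client.getKvState(
                JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"),
                "rule",
                key,
                keyTypeInfo,
                descriptor)
            .join();

        System.out.println(state.get(key));
    }
}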

Cheers,
Till

On Tue, Oct 30, 2018 at 9:44 AM bupt_ljy  wrote:

> Hi, Jayant
>
> Your code looks good to me. And I’ve tried Kryo's serialization and
> deserialization of the UUID class; it all looks okay.
>
> I’m not very sure about this problem. Maybe you can write a very
> simple demo to see if it works.
>
>
> Jiayi Liao, Best
>
>  Original Message
> *Sender:* Jayant Ameta
> *Recipient:* bupt_ljy
> *Cc:* Tzu-Li (Gordon) Tai; user >
> *Date:* Monday, Oct 29, 2018 11:53
> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>
> Hi Jiayi,
> Any further help on this?
>
> Jayant Ameta
>
>
> On Fri, Oct 26, 2018 at 9:22 AM Jayant Ameta  wrote:
>
>> MapStateDescriptor<UUID, String> descriptor = new
>> MapStateDescriptor<>("rulePatterns", UUID.class,
>> String.class);
>>
>> Jayant Ameta
>>
>>
>> On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy  wrote:
>>
>>> Hi,
>>>
>>>Can you show us the descriptor in the codes below?
>>>
>>> client.getKvState(JobID.fromHexString(
>>> "c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
>>>
>>> UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
 TypeInformation.of(new TypeHint() {}), descriptor);


>>> Jiayi Liao, Best
>>>
>>>
>>>  Original Message
>>> *Sender:* Jayant Ameta
>>> *Recipient:* bupt_ljy
>>> *Cc:* Tzu-Li (Gordon) Tai; user<
>>> user@flink.apache.org>
>>> *Date:* Friday, Oct 26, 2018 02:26
>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>>>
>>> Also, I haven't provided any custom serializer in my flink job.
>>> Shouldn't the same configuration work for queryable state client?
>>>
>>> Jayant Ameta
>>>
>>>
>>> On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta 
>>> wrote:
>>>
 Hi Gordon,
 Following is the stack trace that I'm getting:

 *Exception in thread "main" java.util.concurrent.ExecutionException:
 java.lang.RuntimeException: Failed request 0.*
 * Caused by: java.lang.RuntimeException: Failed request 0.*
 * Caused by: java.lang.RuntimeException: Error while processing request
 with ID 0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered
 unregistered class ID: -985346241*
 *Serialization trace:*
 *$outer (scala.collection.convert.Wrappers$SeqWrapper)*
 * at
 com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)*
 * at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)*
 * at
 com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)*
 * at
 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)*
 * at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)*
 * at
 org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)*
 * at
 org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)*
 * at
 org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)*
 * at
 org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)*
 * at
 org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)*
 * at
 org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)*
 * at
 org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)*
 * at
 org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)*
 * at
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)*
 * at java.util.concurrent.FutureTask.run(FutureTask.java:266)*
 * at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)*
 * at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)*
 * at java.lang.Thread.run(Thread.java:748)*

 I am not using any custom serialize as mentioned by Jiayi.

 Jayant Ameta


 On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy  wrote:

> Hi  Jayant,
>
>   There should be a Serializer parameter in the constructor of the
> StateDescriptor, you should create a new serializer like this:
>
>
>new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)
>
>
>  By the way, can you show us your kryo exception like what Gordon said?
>
>
> Jiayi Liao, Best
>
>
>
>  Original Message
> *Sender:* Tzu-Li (Gordon) Tai
> *Recipient:* Jayant Ameta; bupt_ljy<
> bupt_...@163.com>
> *Cc:* user
> *Date:* Thursday, Oct 

Re: FlinkCEP, circular references and checkpointing failures

2018-11-07 Thread Till Rohrmann
Hi Shailesh,

could you maybe provide us with an example program which is able to
reproduce this problem? This would help the community to better debug it.
This does not look right and might point towards a bug in Flink. Thanks
a lot!

Cheers,
Till

On Tue, Oct 30, 2018 at 9:10 AM Dawid Wysakowicz 
wrote:

> This is some problem with serializing your events using Kryo. I'm adding
> Gordon to cc, as he was recently working with serializers. He might give
> you more insight into what is going wrong.
>
> Best,
>
> Dawid
> On 25/10/2018 05:41, Shailesh Jain wrote:
>
> Hi Dawid,
>
> I've upgraded to Flink 1.6.1 and rebased my changes against the tag 1.6.1,
> the only commit on top of 1.6 is this:
> https://github.com/jainshailesh/flink/commit/797e3c4af5b28263fd98fb79daaba97cabf3392c
>
> I ran two separate identical jobs (with and without checkpointing
> enabled), and I'm hitting an ArrayIndexOutOfBoundsException (and sometimes an
> NPE) *only when checkpointing (HDFS backend) is enabled*, with the below
> stack trace.
>
> I did see a similar problem with different operators here (
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/IndexOutOfBoundsException-on-deserialization-after-updating-to-1-6-1-td23933.html).
> Is this a known issue which is getting addressed?
>
> Any ideas on what could be causing this?
>
> Thanks,
> Shailesh
>
>
> 2018-10-24 17:04:13,365 INFO
> org.apache.flink.runtime.taskmanager.Task -
> SelectCepOperatorMixedTime (1/1) - SelectCepOperatorMixedTime (1/1)
> (3d984b7919342a3886593401088ca2cd) switched from RUNNING to FAILED.
> org.apache.flink.util.FlinkRuntimeException: Failure happened in filter
> function.
> at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
> at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
> at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
> at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
> at
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
> at
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternMixedTimeApproachOperator.processElement(AbstractKeyedCEPPatternMixedTimeApproachOperator.java:45)
> at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.WrappingRuntimeException:
> java.lang.ArrayIndexOutOfBoundsException: -1
> at
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:305)
> at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
> at
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:301)
> at
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:291)
> at
> org.apache.flink.cep.nfa.NFA$ConditionContext.getEventsForPattern(NFA.java:811)
> at
> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:70)
> at
> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:62)
> at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
> at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
> ... 10 more
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
> at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
> at
> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
> at
> org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:120)
> at
> org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:95)
> at
> org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:113)
> at
> org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:49)
> at
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
> at
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
> at
> org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
> at
> 

Re: InterruptedException when async function is cancelled

2018-11-07 Thread Till Rohrmann
Hi Anil,

as Stephan stated, the fix is not included in Flink 1.4.2 but in later
versions of Flink. Can you upgrade to Flink 1.5.5 or Flink 1.6.2 to check
whether the problem still occurs?

Cheers,
Till

On Sun, Oct 28, 2018 at 8:55 AM Anil  wrote:

> I do see the same error, but in a different situation. I'm not cancelling
> the job. Below is my error stack trace. SwiglobeZoneFromLatLong is my UDF
> name. Is this error something I can ignore? I'm using Flink 1.4.2.
> Thanks in advance.
>
> ```
> {"debug_level":"ERROR","debug_timestamp":"2018-10-28
> 06:40:20,838","debug_thread":"Source: Custom Source -> from: (event,
> proctime) -> select: (proctime, CityFromLatLong1(event.latLong.lat,
> event.latLong.lng) AS $f1, SwiglobeZoneFromLatLong(event.latLong.lat,
> event.latLong.lng) AS $f2, event.listingDataEventStats.serviceableRestCount
> AS $f3) (1/8)","debug_file":"StreamTask.java",
> "debug_line":"326","debug_message":"Could not shut down timer service",
> "job_name": "7bb4d4a4-f85a-429e-92e8-0c887f9b8cbd" }
> java.lang.InterruptedException
> at
>
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2067)
> at
>
> java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1475)
> at
>
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> ```
>
>
>
>


Re: RocksDB checkpointing dir per TM

2018-11-07 Thread Till Rohrmann
This is a very good point, Elias. We actually forgot to add these options to
the configuration documentation after a refactoring. I will fix it.
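
For reference, a sketch of how the options discussed further down in this
thread could be set in flink-conf.yaml (the paths are placeholders for your
NVME mount points and your checkpoint file system):

io.tmp.dirs: /mnt/nvme/flink-tmp
state.backend.rocksdb.localdir: /mnt/nvme/rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints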

Cheers,
Till

On Fri, Oct 26, 2018 at 8:27 PM Elias Levy 
wrote:

> There is also state.backend.rocksdb.localdir. Oddly, I can find the
> documentation for it in the 1.5 docs, but not in the 1.6 docs.
> The option is still in master, and it is used.
>
> On Fri, Oct 26, 2018 at 3:01 AM Andrey Zagrebin 
> wrote:
>
>> Hi Taher,
>>
>> TMs keep state locally while running, in this case RocksDB files already
>> belong to TM.
>> You can point it to the same NVME disk location on each node, relevant
>> Flink options here are:
>> - io.tmp.dirs
>> - taskmanager.state.local.root-dirs
>> This data is transient and has temporary nature. It does not survive a
>> job failure.
>>
>> The checkpoint is a logical snapshot of the operator state for all
>> involved TMs, so it belongs to the job and is usually uploaded to a
>> distributed file system available on all TMs.
>> The location is set in Flink option ‘state.checkpoints.dir'.
>> This way job can restore from it with different set of TMs.
>>
>> Best,
>> Andrey
>>
>> > On 26 Oct 2018, at 08:29, Taher Koitawala 
>> wrote:
>> >
>> > Hi All,
>> >   Our current cluster configuration uses one HDD which is
>> mainly for root, and another NVME disk per node. [1] We want to make sure all
>> TMs write their own RocksDB files to the NVME disk only; how do we do that?
>> >
>> > [2] Is it also possible to specify multiple directories per TM so that
>> we have an even spread when the RocksDB files are written?
>> >
>> > Thanks,
>> > Taher Koitawala
>>
>>


Re: flink-1.6.1 :: job deployment :: detached mode

2018-11-07 Thread Till Rohrmann
Hi Mike,

have you tried whether the problem also occurs with Flink 1.6.2? If yes,
then please share with us the Flink logs with DEBUG log level to further
debug the problem.

Cheers,
Till

On Fri, Oct 26, 2018 at 5:46 PM Mikhail Pryakhin 
wrote:

> Hi community!
>
> Right after I upgraded Flink to flink-1.6.1 I get an exception during
> job deployment as a YARN cluster.
> The job is submitted with ZooKeeper HA enabled, in detached mode.
>
> The flink yaml contains the following properties:
>
> high-availability: zookeeper
> high-availability.zookeeper.quorum: 
> high-availability.zookeeper.storageDir: hdfs:///
> high-availability.zookeeper.path.root: 
> high-availability.zookeeper.path.namespace: 
>
> the job is deployed via flink CLI command like the following:
>
> "${FLINK_HOME}/bin/flink" run \
> -m yarn-cluster \
> -ynm "${JOB_NAME}-${JOB_VERSION}" \
> -yn "${tm_containers}" \
> -ys "${tm_slots}" \
> -ytm "${tm_memory}" \
> -yjm "${jm_memory}" \
> -p "${parallelism}" \
> -yqu "${queue}" \
> -yt "${YARN_APP_PATH}" \
> -c "${MAIN_CLASS}" \
> -yst \
> -yd \
> ${class_path} \
> "${YARN_APP_PATH}"/"${APP_JAR}"
>
>
> After the job has been successfully deployed, I get an exception:
>
> 2018-10-26 18:29:17,781 | ERROR | Curator-Framework-0 |
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl
> | Background exception was not retry-able or retry gave up
> java.lang.InterruptedException
> at java.lang.Object.wait(Native Method)
> at java.lang.Object.wait(Object.java:502)
> at
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1406)
> at
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1097)
> at
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1130)
> at
> org.apache.flink.shaded.curator.org.apache.curator.utils.ZKPaths.mkdirs(ZKPaths.java:274)
> at
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CreateBuilderImpl$7.performBackgroundOperation(CreateBuilderImpl.java:561)
> at
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.OperationAndData.callPerformBackgroundOperation(OperationAndData.java:72)
> at
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:831)
> at
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
> at
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
> at
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> If the job is deployed in "attached mode" everything goes fine.
>
>
>
>
>
> Kind Regards,
> Mike Pryakhin
>
>


Re: Reply: Flink1.6.0 submit job and got "No content to map due to end-of-input" Error

2018-11-07 Thread Till Rohrmann
Hi Jeroen,

could you maybe share the Flink logs with us to further debug the problem?

Cheers,
Till

On Fri, Oct 26, 2018 at 3:56 PM Jeroen Steggink | knowsy 
wrote:

> Hi,
>
> I'm running Flink 1.5.4 and all dependencies in the job rely on 1.5.4.
> However, I still get this error. According to the JIRA issue it should be
> fixed in 1.5.4 as well.
>
> Since I'm using Apache Beam to build the jar, I can't move to version
> 1.6.x.
>
> What could it be?
>
> Cheers,
>
> Jeroen
> On 07-Sep-18 17:52, Till Rohrmann wrote:
>
> Hi Gongsen,
>
> Chesnay found and fixed the problem:
> https://issues.apache.org/jira/browse/FLINK-10293.
>
> Cheers,
> Till
>
>
> On Wed, Sep 5, 2018 at 10:00 AM 潘 功森  wrote:
>
>> Hi Chesnay,
>>
>> I can confirm the client and cluster are all upgraded to 1.6.0, because if I
>> use “./flink run XXX.jar” to submit a job it works fine. You can see the UI
>> below.
>>
>> But when I use createRemoteEnvironment locally, it
>> fails. It confuses me a lot.
>>
>>
>>
>>
>> --
>> *From:* Chesnay Schepler 
>> *Sent:* Wednesday, September 5, 2018 3:23:23 PM
>> *To:* 潘 功森; vino yang; d...@flink.apache.org
>> *Cc:* user 
>> *Subject:* Re: Reply: Flink1.6.0 submit job and got "No content to map due to
>> end-of-input" Error
>>
>> Did you upgrade both the client and cluster to 1.6.0? The server returned
>> a completely empty response which shouldn't be possible if it runs 1.6.0.
>>
>> On 05.09.2018 07:27, 潘 功森 wrote:
>>
>> Hi  Vino,
>>
>>
>>
>> Below are the dependencies I used, please have a look.
>>
>>
>>
>> I found it also includes flink-connector-kafka-0.10_2.11-1.6.0.jar and
>> flink-connector-kafka-0.9_2.11-1.6.0.jar, and I don’t know if that has any
>> effect.
>>
>>
>>
>> yours,
>>
>> Gongsen
>>
>>
>>
>> Sent from Mail <https://go.microsoft.com/fwlink/?LinkId=550986> for Windows 10
>>
>>
>> --
>> *From:* vino yang 
>> *Sent:* Wednesday, September 5, 2018 10:35:59 AM
>> *To:* d...@flink.apache.org
>> *Cc:* user
>> *Subject:* Re: Flink1.6.0 submit job and got "No content to map due to
>> end-of-input" Error
>>
>> Hi Pangongsen,
>>
>> Do you upgrade the Flink-related dependencies you use at the same time?
>> In other words, is the dependency consistent with the flink version?
>>
>> Thanks, vino.
>>
>> 潘 功森 wrote on Tue, Sep 4, 2018 at 10:07 PM:
>>
>>> Hi all,
>>>  I use the following way to submit a jar to Flink:
>>>
>>> StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.createRemoteEnvironment(config.clusterIp,
>>>
>>> config.clusterPort,
>>>
>>> config.clusterFlinkJar);
>>>
>>>
>>> I used Flink 1.3.2 before, and it worked fine. But after I upgraded it to
>>> 1.6.0, I got the error below:
>>>
>>> 2018-09-04 21:38:32.039 [ERROR] [flink-rest-client-netty-19-1]
>>> org.apache.flink.runtime.rest.RestClient - Unexpected plain-text response:
>>>
>>> 2018-09-04 21:38:32.137 [ERROR] [flink-rest-client-netty-18-1]
>>> org.apache.flink.runtime.rest.RestClient - Response was not valid JSON.
>>>
>>> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException:
>>> No content to map due to end-of-input
>>>
>>>
>>> Could you give me some advice to fix it?
>>>
>>> yours,
>>> Gongsen
>>>
>>>
>>
>


Re: Flink Task Allocation on Nodes

2018-11-07 Thread Till Rohrmann
Hi Sayat,

at the moment it is not possible to control the scheduling behaviour of
Flink. In the future, we plan to add some kind of hints which control
whether the tasks of a job get spread out or packed onto as few nodes as
possible.

Cheers,
Till

On Fri, Oct 26, 2018 at 2:06 PM Kien Truong  wrote:

> Hi,
>
> There are a couple of reasons:
>
> - Easier resource allocation and isolation: one faulty job doesn't affect
> another.
>
> - Mix and match of Flink version: you can leave the old stable jobs run
> with the old Flink version, and use the latest version of Flink for new
> jobs.
>
> - Faster metrics collection: Flink generates a lot of metrics; by keeping
> each cluster small, our Prometheus instance can scrape their metrics a lot
> faster.
>
>
> Regards,
>
> Kien
>
>
> On 10/26/2018 2:50 PM, Sayat Satybaldiyev wrote:
>
> Thanks for the advice, Kien. Could you please share more details on why it's
> best to allocate a separate cluster for each job?
>
> On Wed, Oct 24, 2018 at 3:23 PM Kien Truong 
> wrote:
>
>> Hi,
>>
>> You can have multiple Flink clusters on the same set of physical
>> machines. In our experience, it's best to deploy a separate Flink
>> cluster for each job and adjust the resource accordingly.
>>
>> Best regards,
>> Kien
>>
>> On Oct 24, 2018 at 20:17, >
>> wrote:
>>
>> Flink Cluster in standalone with HA configuration. It has 6 Task managers
>> and each has 8 slots. Overall, 48 slots for the cluster.
>>
>> >>If your cluster only has one task manager with one slot on each node,
>> then the job should be spread evenly.
>> Agreed, this will solve the issue. However, the cluster is running other
>> jobs, and in this case it won't have hardware resources for those other jobs.
>>
>> On Wed, Oct 24, 2018 at 2:20 PM Kien Truong 
>> wrote:
>>
>>> Hi,
>>>
>>> How are your task managers deployed?
>>>
>>> If your cluster only has one task manager with one slot on each node,
>>> then the job should be spread evenly.
>>>
>>> Regards,
>>>
>>> Kien
>>>
>>> On 10/24/2018 4:35 PM, Sayat Satybaldiyev wrote:
>>> > Is there any way to tell Flink not to allocate all parallel tasks
>>> > on one node?  We have a stateless Flink job that reads from a
>>> > 10-partition topic and has a parallelism of 6. The Flink job manager
>>> > allocates all 6 parallel operators to one machine, causing all traffic
>>> > from Kafka to be handled by only one machine. We have a cluster of 6 nodes
>>> > and ideally would spread one parallel operator per machine. Is there a
>>> > way to do that in Flink?
>>>
>>


Re: HA jobmanagers redirect to ip address of leader instead of hostname

2018-11-07 Thread Till Rohrmann
Hi Jeroen,

this sounds like a bug in Flink that we sometimes return IP addresses
instead of hostnames. Could you tell me which Flink version you are using?
In the current version, the redirect address and the address retrieved from
ZooKeeper should actually be the same.

In the future, we plan to remove the redirect message and simply forward
the request to the current leader. This should hopefully avoid this kind
of problem.

Cheers,
Till

On Fri, Oct 26, 2018 at 1:40 PM Jeroen Steggink | knowsy 
wrote:

> Hi,
>
> I'm having some trouble with Flink jobmanagers in an HA setup within
> OpenShift.
>
> I have three jobmanagers, a Zookeeper cluster and a loadbalancer
> (Openshift/Kubernetes Route) for the web ui / rest server on the
> jobmanagers. Everything works fine, as long as the loadbalancer connects
> to the leader. However, when the leader changes and the loadbalancer
> connects to a non-leader, the jobmanager redirects to a leader using the
> IP address of the host. Since the routing in our network is done using
> hostnames, it doesn't know how to find the node using the IP address, and
> this results in a timeout.
>
> So I have a few questions:
> 1. Why is Flink using the IP addresses instead of the hostnames which are
> configured in the config? At other times it does use the hostname, like for
> the info sent to ZooKeeper.
> 2. Is there another way of coping with connections to non-leaders
> instead of redirects? Maybe proxying through a non-leader to the leader?
>
> Cheers,
> Jeroen
>
>


Re: RichInputFormat working differently in eclipse and in flink cluster

2018-11-07 Thread Till Rohrmann
Hi Teena,

which Flink version are you using? Have you tried whether this happens with
the latest release 1.6.2 as well?

Cheers,
Till

On Fri, Oct 26, 2018 at 1:17 PM Teena Kappen // BPRISE <
teena.kap...@bprise.com> wrote:

> Hi all,
>
>
>
> I have implemented RichInputFormat for reading the result of aggregation
> queries in Elasticsearch. There are around 10 buckets, which are of
> type JSON array. Note: this is a one-time response.
>
>
>
> My idea here is to iterate these arrays in parallel. Here is the pseudo
> code.
>
>
>
> public void configure(Configuration parameters) {
>
> System.out.println("configure");
>
> }
>
>
>
> public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
>
> }
>
>
>
> public ResponseInputSplit[] createInputSplits(int minNumSplits){
>
> System.out.println("createInputSplits");
>
>
>
> //read from elastic
>
> // add buckets to array
>
> }
>
>
>
> public InputSplitAssigner getInputSplitAssigner(ResponseInputSplit[]
> inputSplits) {
>
> //this is default
>
> System.out.println("getInputSplitAssigner");
>
> return new DefaultInputSplitAssigner(inputSplits);
>
> }
>
>
>
> public void open(ResponseInputSplit split) {
>
> //read buckets
>
> }
>
>
>
> public boolean reachedEnd(){
>
> System.out.println("reachedEnd");
>
> }
>
>
>
> public Bounce nextRecord(Bounce reuse) {
>
> }
>
>
>
> public void close(){
>
> }
>
>
>
> // my main method,
>
> ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
>
>
> DataSet bounce_data_set = env.createInput(new
> MyInputDataSetInputFormat());
>
>
>
> When running in Eclipse, it executes createInputSplits and the results
> look fine. Logs are given below.
>
> Output is:
>
> configure
>
> Connected to JobManager at
> Actor[akka://flink/user/jobmanager_1#-1685591882] with leader session id...
>
> configure
>
> createInputSplits
>
>
>
> When submitting the job to the Flink cluster, it doesn’t execute the ‘configure’
> and ‘createInputSplits’ methods. Instead it goes directly to the nextRecord
> function. Logs are given below.
>
> Output is:
>
> Starting execution of program
>
> configure
>
> Submitting job with JobID: 47526660fc9a463cad4bee04a4ba99d9. Waiting for
> job completion.
>
> Connected to JobManager at Actor[akka.tcp://flink@:xxx
> /user/jobmanager#1219973491] with leader session id...
>
> 10/26/2018 15:05:57 Job execution switched to status RUNNING.
>
> 10/26/2018 15:05:57 DataSource (at
> createInput(ExecutionEnvironment.java:547) ())(1/1) switched to SCHEDULED
>
> 10/26/2018 15:05:57 DataSource (at
> createInput(ExecutionEnvironment.java:547) ())(1/1) switched to DEPLOYING
>
> 10/26/2018 15:06:00 DataSource (at
> createInput(ExecutionEnvironment.java:547) ())(1/1) switched to RUNNING
>
> 10/26/2018 15:06:00 DataSource (at
> createInput(ExecutionEnvironment.java:547) ())(1/1) switched to FAILED
>
> java.lang.NullPointerException
>
>at com.xxx.test.
> MyInputDataSetInputFormat.nextRecord(MyInputDataSetInputFormat.java:143)
>
>
>
> Regards,
>
> Teena
>
>
>


[ANNOUNCE] Weekly community update #45

2018-11-06 Thread Till Rohrmann
Dear community,

this is the weekly community update thread #45. Please post any news and
updates you want to share with the community to this thread.

# First release candidate for Flink 1.7.0

The community has published the first release candidate for Flink 1.7.0
[0]. Please help the community by giving the RC some exposure and report
any problems you might encounter.

# External shuffle service

Zhijiang started a proposal to add support for an external shuffle service
to Flink [1]. Please join the discussion to learn more.

# Flink operators for Kubernetes

Anand from Lyft shared his work on a Flink Kubernetes operator with the
community. If you want to learn more visit this document [2].

# Flink SQL DDL Design

Shuyi kicked off a discussion about adding support for SQL DDL to Flink
[3]. Chime in if you want to voice your opinion.

# Enhancing flink scheduler by implementing blacklist mechanism

Yingjie proposed to add a blacklisting mechanism to Flink [4]. It will
allow blacklisting unreliable TaskManagers, which then won't be used for
further scheduling. That way, job execution will be more reliable.

# Flip-23: Model serving

Boris published his work on the model serving [5] library on Github [6].
Check it out!

# Flink web UI based on Angular 7

Yadong shared a web UI used at Alibaba with the community [7]. It is a
rework of the existing web UI based on Angular 7.

# Flip-27: Refactor source interface

Aljoscha started discussing the design of the new source interface [8].

# Enhance Table API functionality

Xiaowei and Jincheng started discussing potential improvements to Flink's
Table API and streaming SQL [9, 10]. If you want to learn more about the
Table API and streaming SQL's future, then join this discussion.

[0]
https://lists.apache.org/thread.html/5383a813b21b67655f1982a48a5d131b213596c20343954f9ef53209@%3Cdev.flink.apache.org%3E
[1]
https://lists.apache.org/thread.html/a64497d24b839a4c84dc29ddcf8bf43a34f13984ffa79a9bd64e858c@%3Cdev.flink.apache.org%3E
[2]
https://docs.google.com/document/d/1_AITTq0fay-NwUIqNfqN0pum-erOmS_4kYZnlMxw6_o/edit?usp=gmail#heading=h.ge413lh374xj
[3]
https://lists.apache.org/thread.html/cb696438e9bf7ff2a44953c438135db3b68ff7f96cff59847df0867d@%3Cdev.flink.apache.org%3E
[4]
https://lists.apache.org/thread.html/c0a2057f8171e75734c15a3a45ca5177fce1e04a19b9a02ba064706c@%3Cdev.flink.apache.org%3E
[5]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-23+-+Model+Serving
[6] https://github.com/FlinkML/flink-modelServer
[7] https://github.com/vthinkxie/flink-runtime-web
[8]
https://lists.apache.org/thread.html/70484d6aa4b8e7121181ed8d5857a94bfb7d5a76334b9c8fcc59700c@%3Cdev.flink.apache.org%3E
[9]
https://lists.apache.org/thread.html/a75f5d0a938333503a0f1881f800d37ba0ec662b44624d4be9c6fdd9@%3Cdev.flink.apache.org%3E
[10]
https://lists.apache.org/thread.html/881b34fe79991870c099132b4723dde882cffcfff8e9a1f5bbe92bee@%3Cdev.flink.apache.org%3E


Cheers,
Till


Re: Why don't we have a csv formatter for kafka table source

2018-11-02 Thread Till Rohrmann
Hi Jocean,

this kind of issue should go to the user mailing list. I've cross-posted
it there and put dev in bcc.

Cheers,
Till

On Fri, Nov 2, 2018 at 6:43 AM Jocean shi  wrote:

> Hi all,
>  I have encountered an error when I want to register a table from Kafka
> using the CSV format.
>  The error is "Could not find a suitable table factory for
> 'org.apache.flink.table.factories.DeserializationSchemaFactory"
>
> Jocean
>


[ANNOUNCE] Weekly community update #44

2018-10-30 Thread Till Rohrmann
Dear community,

this is the weekly community update thread #44. Please post any news and
updates you want to share with the community to this thread.

# Flink 1.5.5 and 1.6.2 have been released

The Flink community is proud to announce that the two bugfix releases Flink
1.5.5 [1] and Flink 1.6.2 [2] have been released.

# Integrate Flink SQL with Hive ecosystem

Xuefu proposed to integrate Flink more tightly with the Hive ecosystem. The
discussion has been concluded with a resulting design document. If you want
to learn more about these efforts please check out the ML thread [3] or the
corresponding JIRA issue [4].

# News on API changes with Scala 2.12

The community is dedicated to adding Scala 2.12 support with the upcoming
major release 1.7. Aljoscha laid out the required changes from an API
perspective when upgrading to Scala 2.12 [5].

# Changing our web technology stack

Fabian kicked off a discussion about Flink's web technology stack [6]. He
noted that the stack is quite dusty and proposed to change technologies to
React or Angular 2-7. If you have an opinion on this topic or want to help
with this effort, then please join the discussion.

# Feature freeze for 1.7

The community is working hard towards the Flink 1.7 feature freeze. Currently,
the community is trying to finish the last set of features (Scala 2.12 support,
state evolution and fixing local recovery). It looks promising that these
threads will be finished by the middle of this week.

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ANNOUNCE-Apache-Flink-1-5-5-released-tp24158.html
[2]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ANNOUNCE-Apache-Flink-1-6-2-released-tp24159.html
[3]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DISCUSS-Integrate-Flink-SQL-well-with-Hive-ecosystem-tp23721.html
[4] https://issues.apache.org/jira/browse/FLINK-10556
[5]
https://lists.apache.org/thread.html/75d12228e4a1125f17cd31ff5e7e9b95509978613f2a88ebad55e042@%3Cdev.flink.apache.org%3E
[6]
https://lists.apache.org/thread.html/68416f3dc8c18433f427bb1f1b1a3c482aebbcdeee0f4e8a2540d147@%3Cdev.flink.apache.org%3E

Cheers,
Till


Re: Job fails to restore from checkpoint in Kubernetes with FileNotFoundException

2018-10-30 Thread Till Rohrmann
As Vino pointed out, you need to configure a checkpoint directory which is
accessible from all TMs. Otherwise you won't be able to recover the state
if the task gets scheduled to a different TaskManager. Usually, people use
HDFS or S3 for that.
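
For illustration, the corresponding flink-conf.yaml entries could look roughly
like this (the HDFS path is a placeholder; any file system reachable from all
TaskManagers and the JobManager works):

state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: hdfs:///flink/checkpoints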

Cheers,
Till

On Tue, Oct 30, 2018 at 9:50 AM vino yang  wrote:

> Hi John,
>
> Is the file system configured by RocksDBStateBackend HDFS?[1]
>
> Thanks, vino.
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/state_backends.html#the-rocksdbstatebackend
>
> John Stone  于2018年10月30日周二 上午2:54写道:
>
>> I am testing Flink in a Kubernetes cluster and am finding that a job gets
>> caught in a recovery loop.  Logs show that the issue is that a checkpoint
>> cannot be found although checkpoints are being taken per the Flink web UI.
>> Any advice on how to resolve this is most appreciated.
>>
>> Note on below: I can easily replicate this with a single TaskManager (>1
>> slots) and a job parallelism of 1, or two TaskManagers (4 slots each) and a
>> job parallelism of 8.
>>
>> Setup:
>> - Flink 1.6.0
>> - Kubernetes cluster.
>> - 1 JobManager node, 2 TaskManager nodes.
>> - RocksDB backend with incremental checkpointing.
>> - There is not a persistent volume mounted on any of the three nodes.  In
>> production, we would obviously need a persistent volume on the JobManager.
>> - Job submitted and running such that the job is parallelized over both
>> nodes (i.e. each TM has 4 task slots; job parallelism = 5).
>>
>> Test:
>> - Let the job collect a few checkpoints, say, 9 checkpoints.
>> - Kill one of the two TMs (kubectl delete pods ).
>> - New TM pod starts.
>>
>> Result:
>> - After the new TM starts, the job will cycle through FAILING -> RUNNING
>> -> FAILING -> RUNNING ->...
>>
>> Relevant Information
>> Contents of JobManager's pod:
>> user@host:~$ kubectl exec -it flink-jobmanager-5bbfcb567-v299g -- /bin/sh
>> # ls flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28
>> chk-9  shared  taskowned
>> # ls
>> flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9
>> _metadata
>> # ls
>> flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/_metadata
>>
>> flink-data/dwell-sliding-window-demo/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/_metadata
>> # ls -al
>> flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/_metadata
>> -rw-r--r-- 1 flink flink 23665 Oct 29 18:07
>> flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/_metadata
>> # ls
>> flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/taskowned
>> # ls
>> flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/shared
>>
>> Stacktrace
>> A similar message is repeated over and over in the logs:
>> - "Could not restore operator state backend for
>> CoBroadcastWithNonKeyedOperator_b0eed879993a32985f1fde75e55fe3e3_(5/5)"
>> - "Could not restore operator state backend for
>> WindowOperator_31dd93ebcd1f26006d3b41a7b50b5d82_(3/5)"
>> - "Could not restore operator state backend for
>> StreamSource_ab0f3d44654e8df0b68f0b30a956403c_(2/5)"
>>
>> 2018-10-29 18:12:48,965 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
>> (f786f0c2e3a4405fe81a1eed720d5c28) switched from state RUNNING to FAILING.
>> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>> at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.util.FlinkException: Could not restore
>> operator state backend for
>> CoBroadcastWithNonKeyedOperator_b0eed879993a32985f1fde75e55fe3e3_(5/5) from
>> any of the 1 provided restore options.
>> at
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>> at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:242)
>> at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:140)
>> ... 5 more
>> Caused by: java.io.FileNotFoundException:
>> /opt/flink/flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/b8865e25-3761-4ddf-a466-f035b639184b
>> (No such file or directory)
>> at java.io.FileInputStream.open0(Native Method)
>> at 

Re: [ANNOUNCE] Apache Flink 1.6.2 released

2018-10-29 Thread Till Rohrmann
Awesome! Thanks a lot to you Chesnay for being our release manager and to
the community for making this release happen.

Cheers,
Till

On Mon, Oct 29, 2018 at 8:37 AM Chesnay Schepler  wrote:

> The Apache Flink community is very happy to announce the release of
> Apache Flink 1.6.2, which is the second bugfix release for the Apache
> Flink 1.6 series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data
> streaming applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the
> improvements for this bugfix release:
> https://flink.apache.org/news/2018/10/29/release-1.6.2.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12344110
>
> We would like to thank all contributors of the Apache Flink community
> who made this release possible!
>
> Regards,
> Chesnay
>
>


Re: [ANNOUNCE] Apache Flink 1.5.5 released

2018-10-29 Thread Till Rohrmann
Great news. Thanks a lot to you Chesnay for being our release manager and
the community for making this release possible.

Cheers,
Till

On Mon, Oct 29, 2018 at 8:36 AM Chesnay Schepler  wrote:

> The Apache Flink community is very happy to announce the release of
> Apache Flink 1.5.5, which is the fifth bugfix release for the Apache
> Flink 1.5 series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data
> streaming applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the
> improvements for this bugfix release:
> https://flink.apache.org/news/2018/10/29/release-1.5.5.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12344112
>
> We would like to thank all contributors of the Apache Flink community
> who made this release possible!
>
> Regards,
> Chesnay
>
>


Re: Flink-1.6.1 :: HighAvailability :: ZooKeeperRunningJobsRegistry

2018-10-26 Thread Till Rohrmann
Hi Mike,

thanks for reporting this issue. I think you're right that Flink leaves
some empty nodes in ZooKeeper. It seems that we don't delete the namespace
node with all its children in
ZooKeeperHaServices#closeAndCleanupAllData.

Could you please open a JIRA issue in order to fix it? Thanks a lot!

Cheers,
Till

On Fri, Oct 26, 2018 at 4:31 PM Mikhail Pryakhin 
wrote:

> Hi Andrey, Thanks a lot for your reply!
>
> What was the full job life cycle?
>
>
> 1. The job is deployed as a YARN cluster with the following properties set
>
> high-availability: zookeeper
> high-availability.zookeeper.quorum: 
> high-availability.zookeeper.storageDir: hdfs:///
> high-availability.zookeeper.path.root: 
> high-availability.zookeeper.path.namespace: 
>
> 2. The job is cancelled via flink cancel  command.
>
>What I've noticed:
> when the job is running the following directory structure is created in
> zookeeper
>
> ///leader/resource_manager_lock
> ///leader/rest_server_lock
> ///leader/dispatcher_lock
>
> ///leader/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
> ///leaderlatch/resource_manager_lock
> ///leaderlatch/rest_server_lock
> ///leaderlatch/dispatcher_lock
>
> ///leaderlatch/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
>
> ///checkpoints/5c21f00b9162becf5ce25a1cf0e67cde/041
>
> ///checkpoint-counter/5c21f00b9162becf5ce25a1cf0e67cde
>
> ///running_job_registry/5c21f00b9162becf5ce25a1cf0e67cde
>
>
> when the job is cancelled some ephemeral nodes disappear, but most of
> them are still there:
>
> ///leader/5c21f00b9162becf5ce25a1cf0e67cde
> ///leaderlatch/resource_manager_lock
> ///leaderlatch/rest_server_lock
> ///leaderlatch/dispatcher_lock
>
> ///leaderlatch/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
> ///checkpoints/
> ///checkpoint-counter/
> ///running_job_registry/
>
> Did you start it with Flink 1.6.1 or canceled job running with 1.6.0?
>
>
> I start the job with Flink-1.6.1
>
>
> Was there a failover of Job Master while running before the cancelation?
>
> no there was no failover, as the job is deployed as a YARN cluster,  (YARN
> Cluster High Availability guide states that no failover is required)
>
> What version of Zookeeper do you use?
>
> ZooKeeper 3.4.10
>
> In general, it should not be the case and all job related data should be
> cleaned from Zookeeper upon cancellation.
>
> as far as I understood, that issue concerns a JobManager failover process,
> and my question is about a manual, intentional cancellation of a job.
>
> Here is the method [1] responsible for cleaning zookeeper folders up [1]
> which is called when the job manager has stopped [2].
> And it seems it only cleans up the folder *running_job_registry,* other
> folders stay untouched. I supposed that everything under the 
> *///
> *folder is cleaned up when the job is cancelled.
>
>
> [1]
> https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java#L107
> [2]
> https://github.com/apache/flink/blob/f087f57749004790b6f5b823d66822c36ae09927/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L332
>
>
> Kind Regards,
> Mike Pryakhin
>
> On 26 Oct 2018, at 12:39, Andrey Zagrebin 
> wrote:
>
> Hi Mike,
>
> What was the full job life cycle?
> Did you start it with Flink 1.6.1 or canceled job running with 1.6.0?
> Was there a failover of Job Master while running before the cancelation?
> What version of Zookeeper do you use?
>
> Flink creates child nodes to create a lock for the job in Zookeeper.
> Lock is removed by removing child node (ephemeral).
> A persistent node can be a problem because if the job dies and does not
> remove it, the persistent node will not time out and disappear like an
> ephemeral one, and the next job instance will not delete it because it is
> supposed to be locked by the previous one.
>
> There was a recent fix in 1.6.1 where the job data was not properly
> deleted from Zookeeper [1].
> In general, it should not be the case and all job related data should be
> cleaned from Zookeeper upon cancellation.
>
> Best,
> Andrey
>
> [1] https://issues.apache.org/jira/browse/FLINK-10011
>
> On 25 Oct 2018, at 15:30, Mikhail Pryakhin  wrote:
>
> Hi Flink experts!
>
> When a streaming job with Zookeeper-HA enabled gets cancelled all the
> job-related Zookeeper nodes are not removed. Is there a reason behind that?
> I noticed that Zookeeper paths are created of type "Container Node" (an
> Ephemeral node that can have nested nodes) and fall back to Persistent node
> type in case Zookeeper doesn't support this sort of nodes.
> But anyway, it is worth removing the job Zookeeper node when a job is
> cancelled, isn't it?
>
> Thank you in advance!
>
> Kind Regards,
> Mike Pryakhin
>
>
>
>


Re: Task manager count goes the expand then converge process when running flink on YARN

2018-10-25 Thread Till Rohrmann
Hi Henry,

since version 1.5 you don't need to specify the number of TaskManagers to
start, because the system will figure this out. Moreover, in versions 1.5.x
and 1.6.x it is recommended to set the number of slots per TaskManager to 1,
since we did not support multi-slot TaskManagers properly. The problem
was that we started a separate TaskManager for every incoming slot request,
even though there might still be some free slots left. This has been fixed
by FLINK-9455 [1]. The fix will be released with the upcoming major
Flink release 1.7.

[1] https://issues.apache.org/jira/browse/FLINK-9455
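
In flink-conf.yaml terms, the recommendation for 1.5.x/1.6.x boils down to
something like the following sketch (the parallelism value is just taken from
your example):

taskmanager.numberOfTaskSlots: 1
parallelism.default: 16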

Cheers,
Till

On Thu, Oct 25, 2018 at 5:58 AM vino yang  wrote:

> Hi Henry,
>
> The phenomenon you describe does exist; this is a bug, but I can't remember
> its JIRA number.
>
> Thanks, vino.
>
> 徐涛  于2018年10月24日周三 下午11:27写道:
>
>> Hi experts
>> I am running flink job on YARN in job cluster mode, the job is divided
>> into 2 tasks, the following are some configs of the job:
>> parallelism.default => 16
>> taskmanager.numberOfTaskSlots => 8
>> -yn => 2
>>
>> when the program starts, I found that the count of task managers is not
>> settled immediately, but first expands and then converges; I recorded the
>> numbers during the process:
>>     Task Managers   Task Slots   Available Task Slots
>> 1.       14            104                88
>> 2.       15            120               104
>> 3.       16            128               112
>> 4.        6             48                32
>> 5.        3             24                 8
>> 6.        2             16                 0
>>
>> The final state is correct. There are 2 tasks, 32 subtasks in total; due
>> to slot sharing, 16 slots are enough, and with 8 task slots per TM,
>> 2 TMs are needed.
>> I have the following questions:
>> *Since I specify yn=2, why doesn't it directly allocate 2 TMs, but instead
>> goes through the expand-then-converge process?  Why does it request up to 16
>> task managers? If it is not a must, how can I avoid it?*
>>
>> Thanks a lot!
>>
>> Best
>> Henry
>>
>


[ANNOUNCE] Flink Forward San Francisco 2019 - Call for Presentations is now open

2018-10-24 Thread Till Rohrmann
Hi everybody,

the Call for Presentations for Flink Forward San Francisco 2019 is now
open! Apply by November 30 to share your compelling Flink use case, best
practices, and latest developments with the community on April 1-2 in San
Francisco, CA.

Submit your proposal:
https://flink-forward.org/call-for-presentations-submit-talk

Cheers,
Till


[ANNOUNCE] Weekly community update #43

2018-10-23 Thread Till Rohrmann
Dear community,

this is the weekly community update thread #43. Please post any news and
updates you want to share with the community to this thread.

# Release vote for Flink 1.5.5 and 1.6.2

The community is currently voting on the first release candidates for Flink
1.5.5 [1] and Flink 1.6.2 [2]. It would be tremendously helpful if you
could try the RCs out and report any problems.

# Discussion how to improve broadcast serialization

Zhijiang and Piotr are discussing how to improve Flink's broadcast logic by
reducing the serialization overhead. Join the discussion here [3].

# Enable slot resource profile logic for resource management

Tony started a discussion to make the resource profile logic finally work
[4]. The ResourceProfiles are intended to specify the resource requirements
per operator. This would allow the Flink scheduler to better match slots
which have certain resources assigned with the requirements of operators.

# Feature freeze Flink 1.7

The Flink community is finalizing the last set of features for the upcoming
next major Flink release 1.7. Expect that this work will be finished by the
end of this week so that we can cut the release branch and freeze features
for this release.

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-1-5-5-release-candidate-1-tp24717.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-1-6-2-release-candidate-1-tp24718.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improve-broadcast-serialization-tp23295.html
[4]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/jira-Created-FLINK-10640-Enable-Slot-Resource-Profile-for-Resource-Management-tp24786.html

Cheers,
Till


[ANNOUNCE] Weekly community update #42

2018-10-17 Thread Till Rohrmann
Dear community,

this is the weekly community update thread #42. Please post any news and
updates you want to share with the community to this thread.

# Discussion about Flink SQL integration with Hive

Xuefu started a discussion about how to integrate Flink SQL with the Hive
ecosystem [1]. If that's of your interest, then please join the discussion.

# Flink intro slide set

Fabian started to prepare a Flink intro slide set for the community [2]. If
you want to help with the preparation of the slide set, reach out to him.

# Discussion how to share state between tasks

Thomas started a discussion about how to share state/information between
multiple tasks of an operator [3]. That way it would be possible to control
the ingestion rate of sources depending on how advanced they are with respect
to event time, for example.

# Releasing Flink 1.5.5 and 1.6.2

Chesnay kicked off a discussion about releasing the next bug fix releases
for Flink 1.5 and 1.6 [4]. He is currently creating the RCs which will be
published on the ML for testing soon. Please help the community with
testing them.

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Integrate-Flink-SQL-well-with-Hive-ecosystem-td24538.html#a24568
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Creating-a-slide-set-for-a-Flink-intro-talk-td24605.html#a24643
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Sharing-state-between-subtasks-td24489.html
[4]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Release-1-5-5-and-1-6-2-td24626.html

Cheers,
Till


Re: Flink1.6 Confirm that flink supports the plan function?

2018-10-15 Thread Till Rohrmann
Hi,

1) you currently cannot merge multiple jobs into one after they have been
submitted. What you can do, though, is combine multiple pipelines in your
Flink program before you submit it, as sketched below.
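
For illustration, a minimal sketch of that idea (the pipelines and the job
name are made up):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CombinedPipelines {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Pipeline 1: doubles some numbers and prints them.
        env.fromElements(1, 2, 3)
           .map(new MapFunction<Integer, Integer>() {
               @Override
               public Integer map(Integer value) {
                   return value * 2;
               }
           })
           .print();

        // Pipeline 2: upper-cases some strings and prints them.
        env.fromElements("a", "b", "c")
           .map(new MapFunction<String, String>() {
               @Override
               public String map(String value) {
                   return value.toUpperCase();
               }
           })
           .print();

        // Both pipelines are part of the same job graph and are submitted together,
        // so they can be managed (cancelled, monitored) as a single job.
        env.execute("combined-pipelines");
    }
}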

2) you can pass program arguments when you submit your job. After it
has been submitted, it is no longer possible to change the command line
arguments.

Cheers,
Till

On Mon, Oct 15, 2018 at 9:11 AM wangziyu <2375900...@qq.com> wrote:

> Dear Friend:
>   I have been learning Flink for 20 days now. I would like to trouble
> you to help solve two problems.
> Questions are as follows:
>   1. If I have some jobs, how can I merge those jobs into one,
> so that it is convenient for me to manage them?
> I have looked at the REST API in
> "
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html
> ". I
> see "/jars/:jarid/plan"; it seems to say "Returns the dataflow plan of a job
> contained in a jar previously uploaded via '/jars/upload'." I think that is
> not what I need.
>   2. When I run a job, I need to pass in several parameters. For
> example "./flink run -d -c streaming.Kafka010NumCountConsumer
> /ziyu/flink/kafkaFlink-1.0-SNAPSHOT.jar h1 /ziyu/h1.txt". Now, if I
> know the JobId, can I get the parameters the job was started with via Java?
> I think there is some interface I can use, but I can't find it.
>  That is all. Can you give me some
> information? Thanks so much.
>
>
>
>
>
>
>


Re: org.apache.flink.runtime.rpc.exceptions.FencingTokenException:

2018-10-15 Thread Till Rohrmann
This means that the Dispatcher has not set its leader session id, which it
obtains once it gains leadership. This can also happen if the Dispatcher
just lost its leadership after you sent the message. This problem should
resolve itself once the new leadership information has been propagated.

Cheers,
Till

On Fri, Oct 12, 2018 at 9:04 PM Samir Tusharbhai Chauhan <
samir.tusharbhai.chau...@prudential.com.sg> wrote:

> Hi Till,
>
>
>
> Can you tell when do I receive below error message?
>
>
>
> 2018-10-13 03:02:01,337 ERROR
> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler  -
> Could not retrieve the redirect address.
>
> java.util.concurrent.CompletionException:
> org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing
> token not set: Ignoring message
> LocalFencedMessage(8b79d4540b45b3e622748b813d3a464b,
> LocalRpcInvocation(requestRestAddress(Time))) sent to akka.tcp://
> flink@127.0.0.1:50010/user/dispatcher because the fencing token is null.
>
>
>
> Warm Regards,
>
> *Samir Chauhan*
>
>
>
> *From:* Till Rohrmann [mailto:trohrm...@apache.org]
> *Sent:* Sunday, October 07, 2018 1:24 AM
> *To:* Samir Tusharbhai Chauhan  >
> *Cc:* user 
> *Subject:* Re:
> org.apache.flink.runtime.rpc.exceptions.FencingTokenException:
>
>
>
> Hi Samir,
>
>
>
> 1. In your setup (not running on top of Yarn or Mesos) you need to set the
> jobmanager.rpc.address such that the JM process knows where to bind to. The
> other components use ZooKeeper to find out the addresses. The other
> properties should not be needed.
>
> 3. You can take a look at the ZooKeeper leader latch node. Alternatively,
> you can take a look at the address to which you are redirected when
> accessing the web UI.
>
> 4.
> https://ci.apache.org/projects/flink/flink-docs-master/ops/security-ssl.html
>
>
>
> Cheers,
>
> Till
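
To illustrate point 1 above, a minimal flink-conf.yaml sketch for such a
ZooKeeper-based HA setup; the hostnames and quorum are placeholders, and
jobmanager.rpc.address has to be set individually on each JobManager machine:

# set per JobManager machine so the process knows where to bind
jobmanager.rpc.address: jm-host-1
jobmanager.rpc.port: 6123

# TaskManagers and clients look up the current leader via ZooKeeper
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: file:///sharedflink/state_dir/ha/
high-availability.cluster-id: /cluster_one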
>
>
>
> On Sat, Oct 6, 2018 at 5:57 PM Samir Tusharbhai Chauhan <
> samir.tusharbhai.chau...@prudential.com.sg> wrote:
>
> Hi Till,
>
>
>
> Thanks for identifying the issue. My cluster is up and running now.
>
>
>
> I have a few queries. Could you help answer them?
>
>
>
>1. Do I need to set below properties in my cluster?
>
> jobmanager.rpc.address
>
> rest.address
>
> rest.bind-address
>
> jobmanager.web.address
>
>    2. Is there anything I should take care of while setting it up?
>    3. How do I know which job manager is active?
>    4. How do I secure my cluster?
>
>
>
> Samir Chauhan
>
>
>
> *From:* Till Rohrmann [mailto:trohrm...@apache.org]
> *Sent:* Friday, October 05, 2018 11:09 PM
> *To:* Samir Tusharbhai Chauhan  >
> *Cc:* user 
> *Subject:* Re:
> org.apache.flink.runtime.rpc.exceptions.FencingTokenException:
>
>
>
> Hi Samir,
>
>
>
> could you share the logs of the two JMs and the log where you saw the
> FencingTokenException with us?
>
>
>
> It looks to me as if the TM had an outdated fencing token (an outdated
> leader session id) with which it contacted the ResourceManager. This can
> happen and the TM should try to reconnect to the RM once it learns about
> the new leader session id via ZooKeeper. You could, for example check in
> ZooKeeper that it contains the valid leader information.
>
>
>
> Cheers,
>
> Till
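
To inspect the leader information in ZooKeeper, a ZooKeeper client can be used
along these lines; the znode paths below assume the default
high-availability.zookeeper.path.root (/flink) and cluster-id, so the exact
node names may differ in a concrete setup:

# connect to one of the ZooKeeper servers
bin/zkCli.sh -server zk1:2181

# then, at the zkCli prompt, list the Flink HA znodes and look at the leader entries
ls /flink
ls /flink/default/leader
get /flink/default/leader/resource_manager_lock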
>
>
>
> On Fri, Oct 5, 2018 at 9:58 AM Samir Tusharbhai Chauhan <
> samir.tusharbhai.chau...@prudential.com.sg> wrote:
>
> Hi,
>
>
>
> I am having an issue setting up a Flink cluster. I have 2 nodes for the Job
> Manager and 2 nodes for the Task Manager.
>
>
>
> My configuration file looks like this.
>
>
>
> jobmanager.rpc.port: 6123
>
> jobmanager.heap.size: 2048m
>
> taskmanager.heap.size: 2048m
>
> taskmanager.numberOfTaskSlots: 64
>
> parallelism.default: 1
>
> rest.port: 8081
>
> high-availability.jobmanager.port: 50010
>
> high-availability: zookeeper
>
> high-availability.storageDir: file:///sharedflink/state_dir/ha/
>
>

Re: Taskmanager times out continuously for registration with Jobmanager

2018-10-15 Thread Till Rohrmann
Hi Abdul,

in Flink 1.4 we use Akka's death watch to detect hosts that are no longer
reachable. The downside of the death watch mechanism is that hosts detected
as dead are quarantined. Once in this state, you need to restart the
ActorSystem in order to receive messages again. The idea behind this is to
keep the system from going into an inconsistent state. You can mitigate the
problem by setting the death watch settings to higher values.

Cheers,
Till
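
As an illustration, a flink-conf.yaml sketch that raises the death watch
settings; the values are only examples and should be tuned against the
defaults documented for Flink 1.4:

# tolerate longer periods of unresponsiveness before a remote host is
# declared dead and quarantined
akka.watch.heartbeat.interval: 30 s
akka.watch.heartbeat.pause: 300 s
akka.watch.threshold: 12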

On Fri, Oct 12, 2018 at 11:27 PM Abdul Qadeer  wrote:

> We were able to fix it by passing IP address instead of hostname for actor
> system listen address when starting taskmanager:
>
> def runTaskManager(
>     taskManagerHostname: String,
>     resourceID: ResourceID,
>     actorSystemPort: Int,
>     configuration: Configuration,
>     highAvailabilityServices: HighAvailabilityServices)
>   : Unit = {
>
>
> The following log message at jobmanager gave some clue:
>
> {"timeMillis":1539297842333,"thread":"jobmanager-future-thread-2","level":"DEBUG","loggerName":"org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher","message":"Could
>  not retrieve 
> QueryServiceGateway.","thrown":{"commonElementCount":0,"localizedMessage":"akka.actor.ActorNotFound:
>  Actor not found for: 
> ActorSelection[Anchor(akka.tcp://flink@taskmgr-6b59f97748-fmgwn:8070/), 
> Path(/user/MetricQueryService_5261ccab66b86b53a4edd64f26c1f282)]"...
>
> ...
>
>
> We figured there is some problem with hostname resolution after the actor is
> quarantined; would you know why this happens? Is it some caching problem in
> the Flink or Akka code that the JobManager is using?
>
>
> On Fri, Oct 12, 2018 at 1:05 AM Till Rohrmann 
> wrote:
>
>> It is hard to tell without all logs but it could easily be a K8s setup
>> problem. Also problematic is that you are running a Flink version which is
>> no longer actively supported. Try at least to use the latest bug fix
>> release for 1.4.
>>
>> Cheers,
>> Till
>>
>> On Fri, Oct 12, 2018, 09:43 Abdul Qadeer  wrote:
>>
>>> Hi Till,
>>>
>>> A few more data points:
>>>
>>> In a rerun of the same versions with fresh deployment, I see
>>> log.debug(s"RegisterTaskManager: $msg") in JobManager, however the
>>> *AcknowledgeRegistration/AlreadyRegistered *messages are never sent, I
>>> have taken tcpdump for the taskmanager which doesn't recover and compared
>>> it with another taskmanager which recovers after restart (i.e. receives
>>> *AcknowledgeRegistration *message).
>>>
>>> Restarting the docker container of the bad taskmanager doesn't work. The
>>> only workaround right now is to delete the kubernetes pod holding the bad
>>> taskmanager container. Does it have something to do with the akka address
>>> the jobmanager stores for a taskmanager? The only variable I see between
>>> restarting the container vs the pod is the change in the akka address.
>>>
>>> Also, the infinite retries for registration start after the taskmanager
>>> container restarts with Jobmanager actor system quarantined:
>>>
>>> {"timeMillis":1539282282329,"thread":"flink-akka.actor.default-dispatcher-3","level":"ERROR","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"The
>>> actor system akka.tcp://flink@taskmgr-6b59f97748-fmgwn:8070 has
>>> quarantined the remote actor system akka.tcp://flink@192.168.83.52:6123.
>>> Shutting the actor system down to be able to reestablish a
>>> connection!","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":49,"threadPriority":5}
>>>
>>>
>>> A manual restart by docker restart or killing the JVM doesn't reproduce
>>> this problem.
>>>
>>> On Thu, Oct 11, 2018 at 11:15 AM Abdul Qadeer 
>>> wrote:
>>>
>>>> Hi Till,
>>>>
>>>> I didn't try with newer versions as it is not possible to update the
>>>> Flink version atm.
>>>> If you could give any pointers for debugging that would be great.
>>>>
>>>> On Thu, Oct 11, 2018 at 2:44 AM Till Rohrmann 
>>>> wrote:
>>>>
>>>>> Hi Abdul,
>>>>>
>>>>> have you tried whether this problem also occurs with 
