Re: Flink upgrade to Flink-1.12

2021-01-25 Thread Ufuk Celebi
Thanks for reaching out. Semi-asynchronous does *not* refer to incremental 
checkpoints and Savepoints are always triggered as full snapshots (not 
incremental).

Earlier versions of the RocksDB state backend supported two snapshotting modes, 
fully and semi-asynchronous snapshots. Semi-asynchronous state snapshots for 
RocksDB were removed a long time ago by Aljoscha in 
https://github.com/apache/flink/pull/2345 (FLINK-4340). The notes you are 
referencing were added around that time and I'm afraid they might have become 
mostly obsolete.

I'm pulling in Aljoscha who should be able to give a definitive answer here.

To make a long story short, it should simply work for you to upgrade from 1.11 
to 1.12 via a Savepoint.
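
A minimal sketch of that savepoint-based upgrade, written as the two CLI invocations it boils down to: stop the 1.11 job with a savepoint, then start the job on 1.12 from that savepoint. The install paths, job ID, jar name and savepoint directory are placeholders (assumptions, not taken from this thread). The helpers only build the argument lists; they do not run anything.

```python
from typing import List


def stop_with_savepoint_cmd(flink_home: str, job_id: str, target_dir: str) -> List[str]:
    """Build 'flink stop' arguments that take a savepoint while stopping the job."""
    return [f"{flink_home}/bin/flink", "stop", "--savepointPath", target_dir, job_id]


def run_from_savepoint_cmd(flink_home: str, savepoint_path: str, job_jar: str) -> List[str]:
    """Build 'flink run' arguments that restore the job from an existing savepoint."""
    return [f"{flink_home}/bin/flink", "run", "--fromSavepoint", savepoint_path, job_jar]


# Hypothetical values for illustration only.
stop_cmd = stop_with_savepoint_cmd("/opt/flink-1.11", "<jobId>", "s3://bucket/savepoints")
run_cmd = run_from_savepoint_cmd("/opt/flink-1.12", "<savepointPath>", "my-job.jar")
print(" ".join(stop_cmd))
print(" ".join(run_cmd))
```

The savepoint path printed by the first command is what you would pass to the second one.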

Cheers,

Ufuk

On Wed, Jan 20, 2021, at 3:58 AM, 耿延杰 wrote:
> Hi all,
> 
> As flink doc says:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/upgrading.html#preconditions
> 
>> We do not support migration for state in RocksDB that was checkpointed using 
>> `semi-asynchronous` mode. In case your old job was using this mode, you can 
>> still change your job to use `fully-asynchronous` mode before taking the 
>> savepoint that is used as the basis for the migration.
> 
> So, my first question:
> Does "semi-asynchronous" mean "incremental checkpoint"?
> 
> And second question:
> If so, assume I'm using flink-1.11 and RocksDB with incremental asynchronous 
> checkpoint as state backend. 
> I should: 
> 1. take a savepoint for old version(flink-1.11), 
> 2. and change job to use "full asynchronous checkpoint" ,
> 3. restart old version(flink-1.11) job with new config (full asynchronous 
> checkpoint),
> 4. then, take a savepoint
> 5. and finally, stop old version(flink-1.11) and upgrade to flink-1.12
> 
> Do I understand correctly?
> 
> Best regards


Re: Flink Jobmanager HA deployment on k8s

2021-01-21 Thread Ufuk Celebi
@Yang: I think this would be valuable to document. I think it's a natural 
question to ask whether you can have standby JMs with Kubernetes. What do you 
think? If you agree, we could create a JIRA ticket and work on the "official" 
docs for this.
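
Until there are official docs, here is a rough sketch of the suggestion quoted below: each JobManager replica sets jobmanager.rpc.address to its own pod IP instead of a shared service name, and the leader is then discovered via ZooKeeper. The POD_IP environment variable is an assumption (for example, injected through the Kubernetes Downward API); the remaining values are copied from the ConfigMap quoted further down.

```python
import os


def render_ha_flink_conf(pod_ip: str, zk_quorum: str, cluster_id: str, storage_dir: str) -> str:
    """Render flink-conf.yaml entries with the rpc address bound to the pod IP."""
    settings = {
        # Pod IP instead of a Service name, so the leader address written to ZooKeeper is reachable.
        "jobmanager.rpc.address": pod_ip,
        "high-availability": "zookeeper",
        "high-availability.zookeeper.quorum": zk_quorum,
        "high-availability.cluster-id": cluster_id,
        "high-availability.storageDir": storage_dir,
    }
    return "\n".join(f"{key}: {value}" for key, value in settings.items()) + "\n"


# POD_IP is a hypothetical variable name; the fallback is only there for local experiments.
pod_ip = os.environ.get("POD_IP", "10.0.0.1")
print(render_ha_flink_conf(pod_ip, "eric-data-coordinator-zk:2181",
                           "/haclusterzk1", "file:///opt/flink/recovery/"))
```

Something like this would run in the container entrypoint before the JobManager process starts.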

On Thu, Jan 21, 2021, at 5:05 AM, Yang Wang wrote:
> Hi Amit Bhatia
> 
> > What is the correct way to start three jobmanager replicas with zk? Is 
> > there any link which explains this deployment scenario and configuration?
> Please find more information in the previous mail. Unfortunately, we do not have 
> any documentation yet that guides users through this setup.
> 
> > How we'll identify that out of three replicas, which Job Manager replica is 
> > the leader?
> As I said, using a K8s service for the jobmanager rpc 
> address is not a good practice.
> TaskManager/Client cannot know which replica is the leader. Instead, we 
> should bind the rpc address
> to the pod ip. After that, TaskManager/Client can find the leader address (pod 
> ip) via ZooKeeper.
> 
> Could you please update your yaml files and deploy again? I think you will 
> have different results then.
> 
> Best,
> Yang
> 
> On Thu, Jan 21, 2021 at 11:59 AM Yang Wang wrote:
>> Hi Chirag Dewan,
>> 
>> Yes, we could have multiple replicas with ZK HA in K8s as well. Multiple 
>> JobManagers will contend for
>> leadership, and the leader then writes its rpc address to the ZooKeeper nodes. You could 
>> find more information on how the
>> HA service works here[1]. It is about the KubernetesHAService, but the 
>> ZooKeeperHAService has the same
>> mechanism.
>> 
>> In such a case, I strongly suggest not using the service as the JobManager 
>> rpc address. Otherwise, we
>> will have the issue you have mentioned. There are 3 replicas behind the same 
>> service endpoint and only
>> one of them is the leader. TaskManager/Client do not know how to contact the 
>> leader.
>> 
>> Instead, I suggest not creating the internal service and binding the pod ip to 
>> the JobManager rpc address.
>> After that, TaskManager/Client will retrieve the leader address (pod ip + 
>> port) and contact the leader via that address.
>> 
>> Please find more information and the example here[2].
>> 
>> [1]. 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>> [2]. 
>> https://issues.apache.org/jira/browse/FLINK-20982?focusedCommentId=17265715=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17265715
>> 
>> Best,
>> Yang
>> 
>> 
>>> On Wed, Jan 20, 2021 at 12:27 PM Amit Bhatia wrote:
>>>   Hi Yang,
>>> 
>>> I tried deploying Flink with three JobManager replicas to test a 
>>> faster job recovery scenario. Below is my deployment:
>>> 
>>>  $ kubectl get po -namit | grep zk
>>> eric-data-coordinator-zk-0                 1/1   Running   0   6d21h
>>> eric-data-coordinator-zk-1                 1/1   Running   0   6d21h
>>> eric-data-coordinator-zk-2                 1/1   Running   0   6d21h
>>> flink-jobmanager-ha-zk-1-5d58dc469-8bjpb   1/1   Running   0   19h
>>> flink-jobmanager-ha-zk-1-5d58dc469-klg5p   1/1   Running   0   19h
>>> flink-jobmanager-ha-zk-1-5d58dc469-kvwzk   1/1   Running   0   19h
>>> 
>>> 
>>>  $ kubectl get svc -namit | grep zk
>>> flink-jobmanager-ha-rest-zk1                NodePort    10.100.118.186   8081:32115/TCP                21h
>>> flink-jobmanager-ha-zk1                     ClusterIP   10.111.135.174   6123/TCP,6124/TCP,8081/TCP    21h
>>> eric-data-coordinator-zk                    ClusterIP   10.105.139.167   2181/TCP,8080/TCP,21007/TCP   7d20h
>>> eric-data-coordinator-zk-ensemble-service   ClusterIP   None             2888/TCP,3888/TCP             7d20h
>>> 
>>> Flink Configmap:
>>> 
>>> apiVersion: v1
>>> kind: ConfigMap
>>> metadata:
>>>   name: flink-config-ha-zk-1
>>>   namespace: amit
>>>   labels:
>>>     app: flink
>>> data:
>>>   flink-conf.yaml: |+
>>>     jobmanager.rpc.address: flink-jobmanager-ha-zk1
>>>     taskmanager.numberOfTaskSlots: 2
>>>     blob.server.port: 6124
>>>     jobmanager.rpc.port: 6123
>>>     taskmanager.rpc.port: 6122
>>>     queryable-state.proxy.ports: 6125
>>>     jobmanager.memory.process.size: 1600m
>>>     taskmanager.memory.process.size: 1728m
>>>     parallelism.default: 2
>>>     # High Availability parameters
>>>     high-availability: zookeeper
>>>     high-availability.cluster-id: /haclusterzk1
>>>     high-availability.storageDir: file:///opt/flink/recovery/
>>>     high-availability.zookeeper.path.root: /flinkhazk
>>>     high-availability.zookeeper.quorum: eric-data-coordinator-zk:2181
>>> 

Re: Savepoint Location from Flink REST API

2020-04-02 Thread Ufuk Celebi
Sorry for the copy & paste error in my earlier message.  I agree with
Robert.
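
For anyone consuming this endpoint, here is a small sketch of reading the response structure Aaron reports below, where the payload sits under "operation". The function name and the returned tuple are illustrative choices, not part of Flink. The status id is compared case-insensitively because the thread shows both "completed" and 'COMPLETED'.

```python
import json
from typing import Optional, Tuple


def parse_savepoint_status(body: str) -> Tuple[str, Optional[str], Optional[str]]:
    """Return (status id, savepoint location, failure class) from a status response body."""
    doc = json.loads(body)
    status = doc["status"]["id"].upper()
    operation = doc.get("operation") or {}
    location = operation.get("location")
    failure = operation.get("failure-cause")
    failure_class = failure.get("class") if failure else None
    return status, location, failure_class


# Hypothetical response bodies shaped like the two cases in this thread.
success = '{"status": {"id": "completed"}, "operation": {"location": "s3://bucket/savepoint-1"}}'
failure = ('{"status": {"id": "completed"}, "operation": {"failure-cause": '
           '{"class": "java.io.IOException", "stack-trace": "...", "serialized-throwable": "..."}}}')
print(parse_savepoint_status(success))
print(parse_savepoint_status(failure))
```

A completed status alone does not mean success; the caller has to check whether a location or a failure cause came back.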

On 2. Apr 2020, at 11:06, Robert Metzger  wrote:

Good catch! Yes, you can add this to FLINK-16696.

On Wed, Apr 1, 2020 at 10:59 PM Aaron Langford 
wrote:

> All, it looks like the actual return structure from the API is:
>
> 1. Success
>
>> {
>>   "status": {
>>     "id": "completed"
>>   },
>>   *"operation"*: {
>>     "location": "string"
>>   }
>> }
>
>
> 2. Failure
>
>> {
>>   "status": {
>>     "id": "completed"
>>   },
>>   *"operation"*: {
>>     "failure-cause": {
>>       "class": "string",
>>       "stack-trace": "string",
>>       "serialized-throwable": "string"
>>     }
>>   }
>> }
>
>
> The difference is that "operation" is the key in the response, not
> "savepoint". Should this go into the FLINK-16696 ticket or a separate one?
>
> Aaron
>
>
> On Fri, Mar 20, 2020 at 1:33 PM Ufuk Celebi  wrote:
>
>> Hey Aaron,
>>
>> you can expect one of the two responses for COMPLETED savepoints [1, 2].
>>
>> 1. Success
>>
>> {
>>   "status": {
>>     "id": "completed"
>>   },
>>   "savepoint": {
>>     "location": "string"
>>   }
>> }
>>
>> 2. Failure
>>
>> {
>>   "status": {
>>     "id": "completed"
>>   },
>>   "savepoint": {
>>     "failure-cause": {
>>       "class": "string",
>>       "stack-trace": "string",
>>       "serialized-throwable": "string"
>>     }
>>   }
>> }
>>
>> – Ufuk
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java#L209-L217
>> [2]
>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializer.java
>>
>> On Fri, Mar 20, 2020 at 7:22 PM Aaron Langford <
>> aaron.langfor...@gmail.com> wrote:
>> >
>> > Roman,
>> >
>> > Thanks for the info. That's super helpful. I'd be interested in picking
>> that ticket up.
>> >
>> > One additional question: the states that can return from this API are
>> only described as 'COMPLETED' or 'IN_PROGRESS'. How are failures
>> represented for this endpoint?
>> >
>> > Aaron
>> >
>> > On Fri, Mar 20, 2020 at 2:29 AM Khachatryan Roman <
>> khachatryan.ro...@gmail.com> wrote:
>> >>
>> >> Hey Aaron,
>> >>
>> >> You can use /jobs/:jobid/savepoints/:triggerid to get the location
>> when the checkpoint is completed.
>> >>
>> >> Please see
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/index.html?org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.html
>> >>
>> >> Meanwhile, I've created an issue to update the docs:
>> https://issues.apache.org/jira/browse/FLINK-16696
>> >>
>> >> Regards,
>> >> Roman
>> >>
>> >>
>> >> On Fri, Mar 20, 2020 at 5:09 AM Aaron Langford <
>> aaron.langfor...@gmail.com> wrote:
>> >>>
>> >>> Hey Flink Community,
>> >>>
>> >>> I'm combing through docs right now, and I don't see that a savepoint
>> location is returned or surfaced anywhere. When I do this in the CLI, I get
>> a nice message that tells me where in S3 it put my savepoint (unique
>> savepoint ID included). I'm looking for that same result to be available
>> via the REST API. Does this exist today?
>> >>>
>> >>> Aaron
>>
>


Re: Deploying native Kubernetes via yaml files?

2020-03-24 Thread Ufuk Celebi
PS: See also
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/kubernetes.html#flink-job-cluster-on-kubernetes

On Tue, Mar 24, 2020 at 2:49 PM Ufuk Celebi  wrote:

> Hey Niels,
>
> you can check out the README with example configuration files here:
> https://github.com/apache/flink/tree/master/flink-container/kubernetes
>
> Is that what you were looking for?
>
> Best,
>
> Ufuk
>
> On Tue, Mar 24, 2020 at 2:42 PM Niels Basjes  wrote:
>
>> Hi,
>>
>> As clearly documented here
>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html
>>  the
>> current way of deploying Flink natively on Kubernetes is by running the 
>> ./bin/kubernetes-session.sh
>> script that runs some Java code that does "magic" to deploy it on the
>> cluster.
>> This works.
>>
>> I was wondering: Is it with the current code base already possible to
>> craft a set of Yaml files (perhaps even with a special Docker image) so
>> that I can deploy it using the 'normal' Kubernetes way of doing a kubectl
>> apply -f foo.yaml ?
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>>
>


Re: Deploying native Kubernetes via yaml files?

2020-03-24 Thread Ufuk Celebi
Hey Niels,

you can check out the README with example configuration files here:
https://github.com/apache/flink/tree/master/flink-container/kubernetes

Is that what you were looking for?

Best,

Ufuk

On Tue, Mar 24, 2020 at 2:42 PM Niels Basjes  wrote:

> Hi,
>
> As clearly documented here
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html
>  the
> current way of deploying Flink natively on Kubernetes is by running the 
> ./bin/kubernetes-session.sh
> script that runs some Java code that does "magic" to deploy it on the
> cluster.
> This works.
>
> I was wondering: Is it with the current code base already possible to
> craft a set of Yaml files (perhaps even with a special Docker image) so
> that I can deploy it using the 'normal' Kubernetes way of doing a kubectl
> apply -f foo.yaml ?
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>


Re: Savepoint Location from Flink REST API

2020-03-20 Thread Ufuk Celebi
Hey Aaron,

you can expect one of the two responses for COMPLETED savepoints [1, 2].

1. Success

{
  "status": {
    "id": "completed"
  },
  "savepoint": {
    "location": "string"
  }
}

2. Failure

{
  "status": {
    "id": "completed"
  },
  "savepoint": {
    "failure-cause": {
      "class": "string",
      "stack-trace": "string",
      "serialized-throwable": "string"
    }
  }
}

– Ufuk

[1]
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java#L209-L217
[2]
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializer.java
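
To illustrate how a client might use these two response shapes, here is a hedged sketch of a polling loop. It assumes the caller supplies a fetch_status callable that performs the GET on /jobs/:jobid/savepoints/:triggerid and returns the decoded JSON; that callable, the function name and the retry parameters are all hypothetical. It accepts the payload under either "savepoint" (as written above) or "operation" (the key a later message in this thread reports the API actually returns).

```python
import time
from typing import Any, Callable, Dict


def wait_for_savepoint(fetch_status: Callable[[], Dict[str, Any]],
                       poll_interval_s: float = 1.0,
                       max_polls: int = 60,
                       sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll until the savepoint is COMPLETED; return its location or raise on failure."""
    for _ in range(max_polls):
        doc = fetch_status()
        if doc["status"]["id"].upper() != "COMPLETED":
            sleep(poll_interval_s)  # still IN_PROGRESS, try again later
            continue
        payload = doc.get("operation") or doc.get("savepoint") or {}
        if "failure-cause" in payload:
            # COMPLETED with a failure-cause means the savepoint did not succeed.
            raise RuntimeError(payload["failure-cause"].get("class", "unknown failure"))
        return payload["location"]
    raise TimeoutError("savepoint still in progress after %d polls" % max_polls)


# Canned responses standing in for real HTTP calls.
canned = iter([
    {"status": {"id": "IN_PROGRESS"}},
    {"status": {"id": "COMPLETED"}, "operation": {"location": "s3://bucket/savepoint-1"}},
])
print(wait_for_savepoint(lambda: next(canned), sleep=lambda _s: None))
```

Passing the sleep function in makes the loop easy to exercise without real delays.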

On Fri, Mar 20, 2020 at 7:22 PM Aaron Langford 
wrote:
>
> Roman,
>
> Thanks for the info. That's super helpful. I'd be interested in picking
that ticket up.
>
> One additional question: the states that can return from this API are
only described as 'COMPLETED' or 'IN_PROGRESS'. How are failures
represented for this endpoint?
>
> Aaron
>
> On Fri, Mar 20, 2020 at 2:29 AM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:
>>
>> Hey Aaron,
>>
>> You can use /jobs/:jobid/savepoints/:triggerid to get the location when
the checkpoint is completed.
>>
>> Please see
https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/index.html?org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.html
>>
>> Meanwhile, I've created an issue to update the docs:
https://issues.apache.org/jira/browse/FLINK-16696
>>
>> Regards,
>> Roman
>>
>>
>> On Fri, Mar 20, 2020 at 5:09 AM Aaron Langford <
aaron.langfor...@gmail.com> wrote:
>>>
>>> Hey Flink Community,
>>>
>>> I'm combing through docs right now, and I don't see that a savepoint
location is returned or surfaced anywhere. When I do this in the CLI, I get
a nice message that tells me where in S3 it put my savepoint (unique
savepoint ID included). I'm looking for that same result to be available
via the REST API. Does this exist today?
>>>
>>> Aaron


Re: [DISCUSS] Drop Savepoint Compatibility with Flink 1.2

2020-02-22 Thread Ufuk Celebi
Hey Stephan,

+1.

Reading over the linked ticket and your description here, I think it makes
a lot of sense to go ahead with this. Since it's possible to upgrade via
intermediate Flink releases as a fail-safe I don't have any concerns.

– Ufuk


On Fri, Feb 21, 2020 at 4:34 PM Till Rohrmann  wrote:
>
> +1 for dropping savepoint compatibility with Flink 1.2.
>
> Cheers,
> Till
>
> On Thu, Feb 20, 2020 at 6:55 PM Stephan Ewen  wrote:
>>
>> Thank you for the feedback.
>>
>> Here is the JIRA issue with some more explanation also about the
background and implications:
>> https://jira.apache.org/jira/browse/FLINK-16192
>>
>> Best,
>> Stephan
>>
>>
>> On Thu, Feb 20, 2020 at 2:26 PM vino yang  wrote:
>>>
>>> +1 for dropping Savepoint compatibility with Flink 1.2
>>>
>>> Flink 1.2 is quite far away from the latest 1.10. Especially after the
release of Flink 1.9 and 1.10, the code and architecture have undergone
major changes.
>>>
>>> Currently, I am updating state migration tests for Flink 1.10. I can
still see some binary snapshot files of version 1.2. If we agree on this
topic, we may be able to alleviate some of the burdens(remove those binary
files) when the migration tests would be updated later.
>>>
>>> Best,
>>> Vino
>>>
>>> On Thu, Feb 20, 2020 at 9:04 PM Theo Diefenthal wrote:

 +1 for dropping compatibility.

 I personally think that it is very important for a project to keep a
good development pace, which means old legacy stuff must be dropped from time to
time. As long as there is an upgrade path (via an intermediate Flink
release), that's fine.

 
 From: "Stephan Ewen" 
 To: "dev" , "user" 
 Sent: Thursday, February 20, 2020 11:11:43
 Subject: [DISCUSS] Drop Savepoint Compatibility with Flink 1.2

 Hi all!
 For some cleanup and simplifications, it would be helpful to drop
Savepoint compatibility with Flink version 1.2. That version was released
almost three years ago.

 I would expect that no one uses that old version any more in a way
that they actively want to upgrade directly to 1.11.

 Even if someone does, there is still the option to first upgrade to another version
(like 1.9) and then upgrade to 1.11 from there.

 Any concerns about dropping that support?

 Best,
 Stephan


 --
 SCOOP Software GmbH - Gut Maarhausen - Eiler Straße 3 P - D-51107 Köln
 Theo Diefenthal

 T +49 221 801916-196 - F +49 221 801916-17 - M +49 160 90506575
 theo.diefent...@scoop-software.de - www.scoop-software.de
 Registered office: Cologne, Commercial register: Cologne,
 Registration number: HRB 36625
 Management: Dr. Oleg Balovnev, Frank Heinen,
 Martin Müller-Rohde, Dr. Wolfgang Reddig, Roland Scheel

>


Re: [ANNOUNCE] Andrey Zagrebin becomes a Flink committer

2019-08-19 Thread Ufuk Celebi
I'm late to the party... Welcome and congrats! :-)

– Ufuk


On Mon, Aug 19, 2019 at 9:26 AM Andrey Zagrebin 
wrote:

> Hi Everybody!
>
> Thanks a lot for the warm welcome!
> I am really happy about joining the Flink committer team and hope to help the
> project grow further.
>
> Cheers,
> Andrey
>
> On Fri, Aug 16, 2019 at 11:10 AM Terry Wang  wrote:
>
> > Congratulations Andrey!
> >
> > Best,
> > Terry Wang
> >
> >
> >
> > On Aug 15, 2019, at 9:27 PM, Hequn Cheng wrote:
> >
> > Congratulations Andrey!
> >
> > On Thu, Aug 15, 2019 at 3:30 PM Fabian Hueske  wrote:
> >
> >> Congrats Andrey!
> >>
> >> On Thu, Aug 15, 2019 at 07:58, Gary Yao wrote:
> >>
> >> > Congratulations Andrey, well deserved!
> >> >
> >> > Best,
> >> > Gary
> >> >
> >> > On Thu, Aug 15, 2019 at 7:50 AM Bowen Li  wrote:
> >> >
> >> > > Congratulations Andrey!
> >> > >
> >> > > On Wed, Aug 14, 2019 at 10:18 PM Rong Rong 
> >> wrote:
> >> > >
> >> > >> Congratulations Andrey!
> >> > >>
> >> > >> On Wed, Aug 14, 2019 at 10:14 PM chaojianok 
> >> wrote:
> >> > >>
> >> > >> > Congratulations Andrey!
> >> > >> > At 2019-08-14 21:26:37, "Till Rohrmann" 
> >> wrote:
> >> > >> > >Hi everyone,
> >> > >> > >
> >> > >> > >I'm very happy to announce that Andrey Zagrebin accepted the
> >> offer of
> >> > >> the
> >> > >> > >Flink PMC to become a committer of the Flink project.
> >> > >> > >
> >> > >> > >Andrey has been an active community member for more than 15
> >> months.
> >> > He
> >> > >> has
> >> > >> > >helped shape numerous features such as State TTL, FRocksDB
> >> release,
> >> > >> > >Shuffle service abstraction, FLIP-1, result partition management
> >> and
> >> > >> > >various fixes/improvements. He's also frequently helping out on
> >> the
> >> > >> > >user@f.a.o mailing lists.
> >> > >> > >
> >> > >> > >Congratulations Andrey!
> >> > >> > >
> >> > >> > >Best, Till
> >> > >> > >(on behalf of the Flink PMC)
> >> > >> >
> >> > >>
> >> > >
> >> >
> >>
> >
> >
>


Re: Flink 1.8.1: Seeing {"errors":["Not found."]} when trying to access the Jobmanagers web interface

2019-08-06 Thread Ufuk Celebi
Hey Tobias,

out of curiosity: were you using the job/application cluster (as documented
here:
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/docker.html#flink-job-cluster
)?

– Ufuk


On Tue, Aug 6, 2019 at 1:50 PM Kaymak, Tobias 
wrote:

> I was using Apache Beam and in the lib folder I had a JAR that was using
> Flink 1.7 in its POM. After bumping that to 1.8 it works :)
>
> On Tue, Aug 6, 2019 at 11:58 AM Kaymak, Tobias 
> wrote:
>
>> It completely works when using the docker image tag 1.7.2 - I just bumped
>> back and the web interface was there.
>>
>> On Tue, Aug 6, 2019 at 10:21 AM Kaymak, Tobias 
>> wrote:
>>
>>> Hello,
>>>
>>> after upgrading the docker image from version 1.7.2  to 1.8.1 and wiping
>>> out zookeeper completely I see
>>>
>>> {"errors":["Not found."]}
>>>
>>> when trying to access the webinterface of Flink. I can launch jobs from
>>> the cmdline and I can't spot any error in the logs (so far on level INFO).
>>> I tried adding the flink-runtime-web_2.12-1.8.1.jar as a dependency
>>> into the lib folder when building the Docker container, but this did not
>>> help either.
>>>
>>> Has anyone experienced this problem? Is my Flink config faulty or what
>>> could be the reason?
>>>
>>


Re: No yarn option in self-built flink version

2019-06-12 Thread Ufuk Celebi
@Arnaud: Turns out those examples are on purpose. As Chesnay pointed out in
the ticket, there are also cases where you don't necessarily want to bundle
the Hadoop dependency, but still want to set a version like that.


On Wed, Jun 12, 2019 at 9:32 AM Ufuk Celebi  wrote:

> I created https://issues.apache.org/jira/browse/FLINK-12813 for
> this. @Arnaud: Would you be interested in opening a PR with a fix?
>
> – Ufuk
>
>
> On Tue, Jun 11, 2019 at 11:10 AM LINZ, Arnaud 
> wrote:
>
>> Hello,
>>
>>
>>
>> Thanks a lot, it works. However, may I suggest that you update the
>> documentation page :
>>
>>
>>
>> mvn clean install -DskipTests -Pvendor-repos 
>> -Dhadoop.version=2.6.0-cdh5.16.1
>>
>> is of no use if you don’t include hadoop, that’s why I
>> thought that -Pvendor-repos was including the -Pinclude-hadoop stated in
>> the above paragraph…
>>
>>
>>
>> Arnaud
>>
>>
>>
>> *From:* Ufuk Celebi 
>> *Sent:* Friday, June 7, 2019 12:00
>> *To:* LINZ, Arnaud 
>> *Cc:* user ; ches...@apache.org
>> *Subject:* Re: No yarn option in self-built flink version
>>
>>
>>
>> Hey Arnaud,
>>
>>
>>
>> I think you need to activate the Hadoop profile via -Pinclude-hadoop (the
>> default was changed to not include Hadoop as far as I know).
>>
>>
>>
>> For more details, check out:
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/flinkDev/building.html#packaging-hadoop-into-the-flink-distribution
>>
>>
>>
>>
>>
>> If this does not work for you, I would wait for Chesnay's input (cc'd).
>>
>>
>>
>> – Ufuk
>>
>>
>>
>>
>>
>> On Fri, Jun 7, 2019 at 11:04 AM LINZ, Arnaud 
>> wrote:
>>
>> Hello,
>>
>> I am trying to build my own flink distribution with proper Cloudera
>> dependencies.
>> Reading
>> https://ci.apache.org/projects/flink/flink-docs-stable/flinkDev/building.html
>> I've done :
>> git clone https://github.com/apache/flink
>> cd flink
>> git checkout tags/release-1.8.0
>> $MAVEN_HOME/bin/mvn clean install -DskipTests -Pvendor-repos
>> -Dhadoop.version=2.6.0-cdh5.16.1 -Dfast
>> cd flink-dist
>> $MAVEN_HOME/bin/mvn install -DskipTests -Pvendor-repos
>> -Dhadoop.version=2.6.0-cdh5.16.1
>>
>> Everything was successful.
>>
>> However when running using flink-dist/target/flink-1.8.0-bin/flink-1.8.0
>> Running /bin/flink -h prints no yarn/Hadoop options.
>> And running
>> bin/flink run -m yarn-cluster -yn 4 -yjm 1024m -ytm 4096m
>> ./examples/batch/WordCount.jar
>> Prints :
>> Could not build the program from JAR file.
>>
>> Am I missing something?
>>
>> Best regards,
>> Arnaud
>>
>>
>> 
>>
>> The integrity of this message cannot be guaranteed on the Internet. The
>> company that sent this message cannot therefore be held liable for its
>> content nor attachments. Any unauthorized use or dissemination is
>> prohibited. If you are not the intended recipient of this message, then
>> please delete it and notify the sender.
>>
>>


Re: No yarn option in self-built flink version

2019-06-12 Thread Ufuk Celebi
I created https://issues.apache.org/jira/browse/FLINK-12813 for
this. @Arnaud: Would you be interested in opening a PR with a fix?

– Ufuk


On Tue, Jun 11, 2019 at 11:10 AM LINZ, Arnaud 
wrote:

> Hello,
>
>
>
> Thanks a lot, it works. However, may I suggest that you update the
> documentation page :
>
>
>
> mvn clean install -DskipTests -Pvendor-repos 
> -Dhadoop.version=2.6.0-cdh5.16.1
>
> is of no use if you don’t include hadoop, that’s why I
> thought that -Pvendor-repos was including the -Pinclude-hadoop stated in
> the above paragraph…
>
>
>
> Arnaud
>
>
>
> *From:* Ufuk Celebi 
> *Sent:* Friday, June 7, 2019 12:00
> *To:* LINZ, Arnaud 
> *Cc:* user ; ches...@apache.org
> *Subject:* Re: No yarn option in self-built flink version
>
>
>
> Hey Arnaud,
>
>
>
> I think you need to activate the Hadoop profile via -Pinclude-hadoop (the
> default was changed to not include Hadoop as far as I know).
>
>
>
> For more details, check out:
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/flinkDev/building.html#packaging-hadoop-into-the-flink-distribution
>
>
>
>
>
> If this does not work for you, I would wait for Chesnay's input (cc'd).
>
>
>
> – Ufuk
>
>
>
>
>
> On Fri, Jun 7, 2019 at 11:04 AM LINZ, Arnaud 
> wrote:
>
> Hello,
>
> I am trying to build my own flink distribution with proper Cloudera
> dependencies.
> Reading
> https://ci.apache.org/projects/flink/flink-docs-stable/flinkDev/building.html
> I've done :
> git clone https://github.com/apache/flink
> cd flink
> git checkout tags/release-1.8.0
> $MAVEN_HOME/bin/mvn clean install -DskipTests -Pvendor-repos
> -Dhadoop.version=2.6.0-cdh5.16.1 -Dfast
> cd flink-dist
> $MAVEN_HOME/bin/mvn install -DskipTests -Pvendor-repos
> -Dhadoop.version=2.6.0-cdh5.16.1
>
> Everything was successful.
>
> However when running using flink-dist/target/flink-1.8.0-bin/flink-1.8.0
> Running /bin/flink -h prints no yarn/Hadoop options.
> And running
> bin/flink run -m yarn-cluster -yn 4 -yjm 1024m -ytm 4096m
> ./examples/batch/WordCount.jar
> Prints :
> Could not build the program from JAR file.
>
> Am I missing something?
>
> Best regards,
> Arnaud
>
>
> 
>
> The integrity of this message cannot be guaranteed on the Internet. The
> company that sent this message cannot therefore be held liable for its
> content nor attachments. Any unauthorized use or dissemination is
> prohibited. If you are not the intended recipient of this message, then
> please delete it and notify the sender.
>
>


Re: No yarn option in self-built flink version

2019-06-07 Thread Ufuk Celebi
Hey Arnaud,

I think you need to activate the Hadoop profile via -Pinclude-hadoop (the
default was changed to not include Hadoop as far as I know).

For more details, check out:
https://ci.apache.org/projects/flink/flink-docs-release-1.8/flinkDev/building.html#packaging-hadoop-into-the-flink-distribution
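
As a rough illustration, the sketch below assembles the Maven invocation from the quoted mail with the include-hadoop profile added. It only builds the argument list and does not run Maven; the helper name and its parameters are made up for this example, while the flags themselves are the ones mentioned in this thread.

```python
from typing import List


def maven_build_args(hadoop_version: str,
                     include_hadoop: bool = True,
                     vendor_repos: bool = True) -> List[str]:
    """Assemble the mvn arguments for building a Flink distribution."""
    args = ["mvn", "clean", "install", "-DskipTests"]
    if include_hadoop:
        args.append("-Pinclude-hadoop")  # package Hadoop into the distribution
    if vendor_repos:
        args.append("-Pvendor-repos")    # resolve vendor (e.g. Cloudera) artifacts
    args.append(f"-Dhadoop.version={hadoop_version}")
    return args


print(" ".join(maven_build_args("2.6.0-cdh5.16.1")))
```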


If this does not work for you, I would wait for Chesnay's input (cc'd).

– Ufuk


On Fri, Jun 7, 2019 at 11:04 AM LINZ, Arnaud 
wrote:

> Hello,
>
> I am trying to build my own flink distribution with proper Cloudera
> dependencies.
> Reading
> https://ci.apache.org/projects/flink/flink-docs-stable/flinkDev/building.html
> I've done :
> git clone https://github.com/apache/flink
> cd flink
> git checkout tags/release-1.8.0
> $MAVEN_HOME/bin/mvn clean install -DskipTests -Pvendor-repos
> -Dhadoop.version=2.6.0-cdh5.16.1 -Dfast
> cd flink-dist
> $MAVEN_HOME/bin/mvn install -DskipTests -Pvendor-repos
> -Dhadoop.version=2.6.0-cdh5.16.1
>
> Everything was successful.
>
> However when running using flink-dist/target/flink-1.8.0-bin/flink-1.8.0
> Running /bin/flink -h prints no yarn/Hadoop options.
> And running
> bin/flink run -m yarn-cluster -yn 4 -yjm 1024m -ytm 4096m
> ./examples/batch/WordCount.jar
> Prints :
> Could not build the program from JAR file.
>
> Am I missing something?
>
> Best regards,
> Arnaud
>
>
> 
>
> The integrity of this message cannot be guaranteed on the Internet. The
> company that sent this message cannot therefore be held liable for its
> content nor attachments. Any unauthorized use or dissemination is
> prohibited. If you are not the intended recipient of this message, then
> please delete it and notify the sender.
>


Re: QueryableState startup regression in 1.8.0 ( migration from 1.7.2 )

2019-04-29 Thread Ufuk Celebi
Actually, I couldn't even find a mention of this flag in the docs here:
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/queryable_state.html

– Ufuk


On Mon, Apr 29, 2019 at 8:45 AM Ufuk Celebi  wrote:

> I didn't find this as part of the
> https://flink.apache.org/news/2019/04/09/release-1.8.0.html notes.
>
> I think an update to the Important Changes section would be valuable for
> users upgrading to 1.8 from earlier releases. Also, logging that the
> library is on the classpath but the feature flag is set to false would be
> helpful.
>
> – Ufuk
>
>
> On Thu, Apr 25, 2019 at 4:13 PM Vishal Santoshi 
> wrote:
>
>> Ditto that, setting queryable-state.enable to true works.
>>
>> Thanks everyone.
>>
>> On Thu, Apr 25, 2019 at 6:28 AM Dawid Wysakowicz 
>> wrote:
>>
>>> Hi Vishal,
>>>
>>> As Guowei mentioned you have to enable the Queryable state. The default
>>> setting was changed in 1.8.0. There is an open JIRA[1] for changing the
>>> documentation accordingly.
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-12274
>>> On 25/04/2019 03:27, Guowei Ma wrote:
>>>
>>> You could try to set queryable-state.enable to true. And check again.
>>>
>>> On Thu, Apr 25, 2019 at 1:40 AM Vishal Santoshi wrote:
>>>
>>>> Anyone?
>>>>
>>>> On Wed, Apr 24, 2019 at 12:02 PM Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> Hello folks,
>>>>>
>>>>>  Following
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#querying-state
>>>>>  .
>>>>> for setting up the Queryable State server and proxy, my classpath (the
>>>>> lib directory) has the required jar, but I do not see the mentioned
>>>>> log and of course am not able to set up the QS server/proxy. This
>>>>> worked on 1.7.2 and I think I have everything as advised, see the logs
>>>>> below. I do not see this log: "Started the Queryable State Proxy
>>>>> Server @ ...". Has anyone else run into this issue?
>>>>>
>>>>>
>>>>>
>>>>> 2019-04-24 15:54:26,296 INFO  
>>>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>>>>>   - -Dtaskmanager.numberOfTaskSlots=1
>>>>>
>>>>> 2019-04-24 15:54:26,296 INFO  
>>>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>>>>>   - --configDir
>>>>>
>>>>> 2019-04-24 15:54:26,296 INFO  
>>>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>>>>>   - /usr/local/flink/conf
>>>>>
>>>>> 2019-04-24 15:54:26,296 INFO  
>>>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>>>>>   -  Classpath:
>>>>> /usr/local/flink/lib/flink-metrics-prometheus-1.8.0.jar:
>>>>> */usr/local/flink/lib/flink-queryable-state-runtime_2.11-1.8.0.jar*
>>>>> :/usr/local/flink/lib/hadoop.jar:/usr/local/flink/lib/jobs.jar:/usr/local/flink/lib/log4j-1.2.17.jar:/usr/local/flink/lib/slf4j-log4j12-1.7.15.jar:/usr/local/flink/lib/flink-dist_2.11-1.8.0.jar:::
>>>>>
>>>>> 2019-04-24 15:54:26,296 INFO  
>>>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>>>>>   -
>>>>> 
>>>>>
>>>>> 2019-04-24 15:54:26,298 INFO  
>>>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>>>>>   - Registered UNIX signal handlers for [TERM, HUP, INT]
>>>>>
>>>>> 2019-04-24 15:54:26,300 INFO  
>>>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>>>>>   - Maximum number of open file descriptors is 65536.
>>>>>
>>>>> 2019-04-24 15:54:26,305 INFO
>>>>> org.apache.flink.configuration.GlobalConfiguration-
>>>>> Loading configuration property: state.backend.fs.checkpointdir,
>>>>> hdfs:///flink-checkpoints_k8s_test/prod
>>>>>
>>>>> 2
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>> Best,
>>> Guowei
>>>
>>>


Re: QueryableState startup regression in 1.8.0 ( migration from 1.7.2 )

2019-04-29 Thread Ufuk Celebi
I didn't find this as part of the
https://flink.apache.org/news/2019/04/09/release-1.8.0.html notes.

I think an update to the Important Changes section would be valuable for
users upgrading to 1.8 from earlier releases. Also, logging that the
library is on the classpath but the feature flag is set to false would be
helpful.
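
A minimal sketch of the kind of startup check suggested above, assuming the check only needs to know whether a marker class is loadable and whether the flag is set. The class `QueryableStateStartupCheck`, its method names, the message text and the marker class name are all hypothetical and not part of Flink.

```java
import java.util.Optional;

/** Hypothetical helper, not part of Flink. */
final class QueryableStateStartupCheck {

    /** Returns true if the named class can be found by the given class loader. */
    static boolean isOnClasspath(String className, ClassLoader loader) {
        try {
            // initialize=false: only look the class up, do not run static initializers
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Returns a warning when the library is present but the feature flag is off. */
    static Optional<String> startupWarning(boolean libraryPresent, boolean enabled) {
        if (libraryPresent && !enabled) {
            return Optional.of(
                "Queryable state library found on the classpath, but "
                    + "queryable-state.enable is false; proxy and server are not started.");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        ClassLoader loader = QueryableStateStartupCheck.class.getClassLoader();
        // Assumption: this class name stands in for whatever the runtime jar provides.
        boolean present =
            isOnClasspath("org.apache.flink.queryablestate.server.KvStateServerImpl", loader);
        startupWarning(present, false).ifPresent(System.out::println);
    }
}
```

With a message like this in the TaskManager log, the situation reported in this thread (jar in lib, flag defaulting to false) would be visible right at startup.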

– Ufuk


On Thu, Apr 25, 2019 at 4:13 PM Vishal Santoshi 
wrote:

> Ditto that, setting queryable-state.enable to true works.
>
> Thanks everyone.
>
> On Thu, Apr 25, 2019 at 6:28 AM Dawid Wysakowicz 
> wrote:
>
>> Hi Vishal,
>>
>> As Guowei mentioned you have to enable the Queryable state. The default
>> setting was changed in 1.8.0. There is an open JIRA[1] for changing the
>> documentation accordingly.
>>
>> Best,
>>
>> Dawid
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-12274
>> On 25/04/2019 03:27, Guowei Ma wrote:
>>
>> You could try to set queryable-state.enable to true. And check again.
>>
>> On Thu, Apr 25, 2019 at 1:40 AM Vishal Santoshi wrote:
>>
>>> Anyone?
>>>
>>> On Wed, Apr 24, 2019 at 12:02 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> Hello folks,
>>>>
>>>>  Following
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#querying-state
>>>>  .
>>>> for setting up the Queryable Server and proxy, I have my classpath ( the
>>>> lib directory ) that has the  required jar, But I do not see the mentioned
>>>> log and of course am not able to set up the QS server/Proxy . This has
>>>> worked on 1.7.2 and I think I have everything as advised, see the logs
>>>> below. I do not  see this log  "Started the Queryable State Proxy
>>>> Server @ ...".  Any one with this issue...
>>>>
>>>>
>>>>
>>>> 2019-04-24 15:54:26,296 INFO
>>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>>>>   - -Dtaskmanager.numberOfTaskSlots=1
>>>>
>>>> 2019-04-24 15:54:26,296 INFO
>>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>>>>   - --configDir
>>>>
>>>> 2019-04-24 15:54:26,296 INFO
>>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>>>>   - /usr/local/flink/conf
>>>>
>>>> 2019-04-24 15:54:26,296 INFO
>>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>>>>   -  Classpath:
>>>> /usr/local/flink/lib/flink-metrics-prometheus-1.8.0.jar:
>>>> */usr/local/flink/lib/flink-queryable-state-runtime_2.11-1.8.0.jar*
>>>> :/usr/local/flink/lib/hadoop.jar:/usr/local/flink/lib/jobs.jar:/usr/local/flink/lib/log4j-1.2.17.jar:/usr/local/flink/lib/slf4j-log4j12-1.7.15.jar:/usr/local/flink/lib/flink-dist_2.11-1.8.0.jar:::
>>>>
>>>> 2019-04-24 15:54:26,296 INFO
>>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>>>>   -
>>>>
>>>>
>>>> 2019-04-24 15:54:26,298 INFO
>>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>>>>   - Registered UNIX signal handlers for [TERM, HUP, INT]
>>>>
>>>> 2019-04-24 15:54:26,300 INFO
>>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>>>>   - Maximum number of open file descriptors is 65536.
>>>>
>>>> 2019-04-24 15:54:26,305 INFO
>>>> org.apache.flink.configuration.GlobalConfiguration-
>>>> Loading configuration property: state.backend.fs.checkpointdir,
>>>> hdfs:///flink-checkpoints_k8s_test/prod
>>>>
>>>> 2
>>>>
>>>>
>>>>
>>>>
>>>> --
>> Best,
>> Guowei
>>
>>


Re: Flink: How to use zookeeper when deployed in k8s?

2019-04-01 Thread Ufuk Celebi
You can set

jobmanager.rpc.address: jmServiceName
high-availability.jobmanager.port: 6123

in flink-conf.yaml and expose the port in the JobManager service.
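
For illustration, a Service that exposes this port could look roughly like the fragment below. This is a sketch under assumptions: the Service name `flink-jobmanager` (which would then be the value of `jobmanager.rpc.address` in place of the jmServiceName placeholder) and the pod labels in the selector are made up and need to match your own deployment.

```yaml
# Hypothetical Service manifest; name and labels are assumptions.
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager        # use this value for jobmanager.rpc.address
spec:
  selector:
    app: flink                  # assumed labels on the JobManager pod
    component: jobmanager
  ports:
    - name: rpc
      port: 6123                # matches high-availability.jobmanager.port
      targetPort: 6123
```

Pinning the port in flink-conf.yaml is what makes this possible: a Service can only forward ports that are known up front, not a randomly chosen one.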

– Ufuk

On Mon, Apr 1, 2019 at 9:29 AM sora  wrote:
>
> Hello,
> I encountered a problem when deploying flink to k8s.
> When high-availability: zookeeper is set in flink-conf.yaml, a random port 
> will be used instead of the configured port.
> But this port is not exposed in k8s, so none of the TaskManagers can connect to 
> the JobManager.
> How can I solve this?
> One solution is to use the HostNetwork, but this is not exactly what I want.
>
> Looking forward to your kind reply.
> Thanks.
>


Re: What are savepoint state manipulation support plans

2019-03-28 Thread Ufuk Celebi
Thanks Gordon. We already have 5 people watching it. :-)

On Thu, Mar 28, 2019 at 10:23 AM Tzu-Li (Gordon) Tai
 wrote:
>
> @Ufuk
>
> Yes, creating a JIRA now already to track this makes sense.
>
> I've proceeded to open one:  https://issues.apache.org/jira/browse/FLINK-12047
> Let's move any further discussions there.
>
> Cheers,
> Gordon
>
> On Thu, Mar 28, 2019 at 5:01 PM Ufuk Celebi  wrote:
>>
>> I think such a tool would be really valuable to users.
>>
>> @Gordon: What do you think about creating an umbrella ticket for this
>> and linking it in this thread? That way, it's easier to follow this
>> effort. You could also link Bravo and Seth's tool in the ticket as
>> starting points.
>>
>> – Ufuk


Re: What are savepoint state manipulation support plans

2019-03-28 Thread Ufuk Celebi
I think such a tool would be really valuable to users.

@Gordon: What do you think about creating an umbrella ticket for this
and linking it in this thread? That way, it's easier to follow this
effort. You could also link Bravo and Seth's tool in the ticket as
starting points.

– Ufuk


Re: [DISCUSS] Create a Flink ecosystem website

2019-03-06 Thread Ufuk Celebi
I like Shaoxuan's idea to keep this a static site first. We could then
iterate on this and make it a dynamic thing. Of course, if we have the
resources in the community to quickly start with a dynamic site, I'm
not opposed.

– Ufuk

On Wed, Mar 6, 2019 at 2:31 PM Robert Metzger  wrote:
>
> Awesome! Thanks a lot for looking into this Becket! The VMs hosted by Infra
> look suitable.
>
> @Shaoxuan: There is actually already a static page. It used to be linked,
> but has been removed from the navigation bar for some reason. This is the
> page: https://flink.apache.org/ecosystem.html
> We could update the page and add it back to the navigation bar for the
> coming weeks. What do you think?
>
> I would actually like to push for a dynamic page right away.
>
> I know it's kind of a bold move, but how do you feel about sending the
> owners of spark-packages.org a short note, if they are interested in
> sharing the source? We could maintain the code together in a public repo.
> If they are not interested in sharing, or we decide not to ask in the first
> place, I'm happy to write down a short description of the requirements,
> maybe some mockups. We could then see if we find somebody here in the
> community who's willing to implement it.
> Given the number of people who are eager to contribute, I believe we will
> be able to find somebody pretty soon.
>
>
> On Wed, Mar 6, 2019 at 3:49 AM Becket Qin  wrote:
>
> > Forgot to provide the link...
> >
> > [1] https://www.apache.org/dev/services.html#blogs (Apache infra services)
> > [2] https://www.apache.org/dev/freebsd-jails (FreeBSD Jail provided by
> > Apache Infra)
> >
> > On Wed, Mar 6, 2019 at 10:46 AM Becket Qin  wrote:
> >
> >> Hi Robert,
> >>
> >> Thanks for the feedback. These are good points. We should absolutely
> >> shoot for a dynamic website to support more interactions in the community.
> >> There might be a few things to solve:
> >> 1. The website code itself. An open source solution would be great. TBH,
> >> I do not have much experience on building a website. It'll be great if
> >> someone could help comment on the solution here.
> >> 2. The hardware to host the website. Apache Infra provides a few
> >> services[1] that Apache projects can leverage. I did not see database
> >> service, but maybe we can run a simple MySQL db in FreeBSD jail[2].
> >>
> >> @Bowen & vino, thanks for the positive feedback!
> >>
> >> @Shaoxuan Wang 
> >> Thanks for the suggestion. That sounds reasonable to me. We probably need
> >> a page on the official Flink site anyway, even if it just links to
> >> the ecosystem website. So listing the connectors on that static page seems
> >> like something we could start with while we work on the dynamic pages.
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On Wed, Mar 6, 2019 at 10:40 AM Shaoxuan Wang 
> >> wrote:
> >>
> >>> Hi Becket and Robert,
> >>>
> >>> I like this idea!  Let us roll this out with Flink connectors at the
> >>> very beginning. We can start with a static page, and upgrade it when we
> >>> find a better solution for a dynamic one with rich functions.
> >>>
> >>> Regards,
> >>> Shaoxuan
> >>>
> >>>
> >>> On Wed, Mar 6, 2019 at 1:36 AM Robert Metzger 
> >>> wrote:
> >>>
> >>>> Hey Becket,
> >>>>
> >>>> This is a great idea!
> >>>> For this to be successful, we need to make sure the page is placed
> >>>> prominently so that the people submitting something will get attention
> >>>> for their contributions.
> >>>> I think a dynamic site would probably be better, if we want features
> >>>> such as up and downvoting or comments.
> >>>> I would also like this to be hosted on Apache infra, and endorsed by
> >>>> the community.
> >>>>
> >>>> Does anybody here know any existing software that we could use?
> >>>> The only thing I was able to find is AUR: https://aur.archlinux.org/
> >>>> (which is a community packages site for Arch Linux. The source code of
> >>>> this portal is open source, but the layout and structure is not an ideal
> >>>> fit for our requirements)
> >>>>
> >>>> Best,
> >>>> Robert
> >>>>
> >>>>
> >>>>
> >>>> On Tue, Mar 5, 2019 at 12:03 PM Becket Qin
> >>>> wrote:
> >>>>
> >>>>> Hi folks,
> >>>>>
> >>>>> I would like to start a discussion thread about creating a Flink
> >>>>> ecosystem website. The website aims to help contributors who have
> >>>>> developed projects around Flink share their work with the community.
> >>>>>
> >>>>> Please see the following doc for more details.
> >>>>>
> >>>>> https://docs.google.com/document/d/12oCItoLbKrLGuwEUFcCfigezIR2hW3925j1hh3kGp4A/edit#
> >>>>>
> >>>>> Thanks,
> >>>>>
> >>>>> Jiangjie (Becket) Qin
> >>>>>
> >>>>


Re: `env.java.opts` not persisting after job canceled or failed and then restarted

2019-01-29 Thread Ufuk Celebi
Hey Aaron,

I'm glad to hear that you resolved the issue.

I think a docs contribution for this would be very helpful and could
update this page:
https://github.com/apache/flink/blob/master/docs/monitoring/debugging_classloading.md.

If you want to create a separate JIRA ticket for this, ping me with
your JIRA username and I'll add you to the list of contributors (which
gives you permissions to create tickets).

I'll think a bit more about the other points you mentioned and get
back to you if I have another idea.

Best,

Ufuk

On Tue, Jan 29, 2019 at 10:48 PM Aaron Levin  wrote:
>
> Hi Ufuk,
>
> I'll answer your question, but first I'll give you an update on how we 
> resolved the issue:
>
> * adding `org.apache.hadoop.io.compress.SnappyCodec` to 
> `classloader.parent-first-patterns.additional` in `flink-conf.yaml` (though, 
> putting `org.apache.hadoop.util.NativeCodeLoader` also worked)
> * putting a jar with `hadoop-common` + its transitive dependencies, then 
> using jarjar[0] to `keep org.apache.hadoop.io.compress.SnappyCodec` (and its 
> transitive dependencies). So we end up with jar that has `SnappyCodec` and 
> whatever it needs to call transitively. We put this jar on the task manager 
> classpath.
>
> I believe `SnappyCodec` was being called via our code. This worked the first 
> time but deploying a second time caused `libhadoop.so` to be loaded in a 
> second class loader. By putting a jar with `SnappyCodec` and its transitive 
> dependencies on the task manager classpath and specifying that `SnappyCodec` 
> needs to be loaded from the parent classloader, we ensure that only one 
> classloader loads `libhadoop.so`. I don't think this is the best way to 
> achieve what we want, but it works for now.
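
Some background on why this works: the JVM allows a given native library to be loaded by only one class loader at a time, so a second user-code class loader that triggers loading of `libhadoop.so` again runs into an `UnsatisfiedLinkError`. The sketch below illustrates the parent-first decision, assuming the configured patterns are matched as prefixes of the fully qualified class name. `ParentFirstDecision` is a hypothetical class for illustration, not Flink's class loader.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of a prefix-based parent-first decision. */
final class ParentFirstDecision {

    private final List<String> parentFirstPrefixes;

    ParentFirstDecision(String... prefixes) {
        this.parentFirstPrefixes = Arrays.asList(prefixes);
    }

    /** True if the class should be resolved by the parent (framework) class loader. */
    boolean loadFromParent(String className) {
        for (String prefix : parentFirstPrefixes) {
            if (className.startsWith(prefix)) {
                return true;
            }
        }
        // Everything else would be resolved child-first, from the user jar.
        return false;
    }

    public static void main(String[] args) {
        ParentFirstDecision decision =
            new ParentFirstDecision("org.apache.hadoop.io.compress.SnappyCodec");
        System.out.println(decision.loadFromParent("org.apache.hadoop.io.compress.SnappyCodec"));
        System.out.println(decision.loadFromParent("com.example.MyJob"));
    }
}
```

Classes resolved by the parent are loaded once per TaskManager process and survive job restarts, which is why the native library is then loaded only once.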
>
> Next steps: if no one is on it, I can take a stab at updating the 
> documentation to clarify how to debug and resolve Native library loading. 
> This was a nice learning experience and I think it'll be helpful to have this 
> in the docs for those who aren't well-versed in how classloading on the JVM 
> works!
>
> To answer your questions:
>
> 1. We install hadoop on our machines and tell flink task managers to access 
> it via `env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native` in 
> `flink-conf.yaml`
> 2. We put flink's shaded hadoop-fs-s3 on both the task manager and job 
> manager classpath (I believe this is only used by the Job Managers when they 
> interact with S3 for checkpoints etc. I don't believe any user code is using 
> this).
> 3. Our flink applications consist of a "fat jar" that has some 
> `org.apache.hadoop` dependencies bundled with it. I believe this is the 
> source of why we're loading `SnappyCodec` twice and triggering this issue.
> 4. For example code: we have a small wrapper around 
> `org.apache.flink.api.common.io.FileInputFormat` which does the work with 
> sequence files. It looks like (after removing some stuff to make it more 
> clear):
>
> ```
> abstract class FlinkSequenceFileInputFormat[T, K <: Writable, V <: Writable](
>     typeInformation: TypeInformation[T]
> ) extends FileInputFormat[T]
>     with ResultTypeQueryable[T] {
>   @transient private var bufferedNextRecord: T = _
>   @transient private var hadoopStream: HadoopFSDataInputStream = _
>   @transient private var sequenceFileReader: SequenceFile.Reader = _
>
>   unsplittable = true
>   enumerateNestedFiles = true
>
>   // *
>   // This is where we'd see exceptions.
>   // *
>   override def open(fileSplit: FileInputSplit): Unit = {
>     super.open(fileSplit)
>     val config = new Configuration()
>     hadoopStream = WrappedHadoopInputStream.wrap(stream)
>     sequenceFileReader =
>       new SequenceFile.Reader(config, SequenceFile.Reader.stream(hadoopStream))
>     bufferNextRecord()
>   }
> ...
> }
>
> // AND
>
> class WrappedHadoopInputStream(underlying: FlinkFSDataInputStream)
>     extends InputStream
>     with Seekable
>     with PositionedReadable {
>
>   def read(): Int = underlying.read()
>   def seek(pos: Long): Unit = underlying.seek(pos)
>   def getPos: Long = underlying.getPos
> }
> ...
> ```
>
> Thanks for all your help, I appreciate it! I wouldn't have been able to debug 
> and resolve this if it wasn't for you filing the ticket. Thank you so much!
>
> [0] https://github.com/pantsbuild/jarjar
>
> Aaron Levin
>
> On Mon, Jan 28, 2019 at 4:16 AM Ufuk Celebi  wrote:
>>
>> Hey Aaron,
>>
>> sorry for the late reply (again).
>>
>> (1) I think that your final result is in line with what I have
>> reproduced in https://issues.apache.org/jira/browse/FLINK-11402.

Re: `env.java.opts` not persisting after job canceled or failed and then restarted

2019-01-28 Thread Ufuk Celebi
> Any ideas?
>>>
>>> On Wed, Jan 23, 2019 at 7:13 PM Aaron Levin  wrote:
>>>>
>>>> Hi Ufuk,
>>>>
>>>> One more update: I tried copying all the hadoop native `.so` files (mainly 
>>>> `libhadoop.so`) into `/lib` and I am still experiencing the issue I 
>>>> reported. I also tried naively adding the `.so` files to the jar with the 
>>>> flink application and am still experiencing the issue I reported (however, 
>>>> I'm going to investigate this further as I might not have done it 
>>>> correctly).
>>>>
>>>> Best,
>>>>
>>>> Aaron Levin
>>>>
>>>> On Wed, Jan 23, 2019 at 3:18 PM Aaron Levin  wrote:
>>>>>
>>>>> Hi Ufuk,
>>>>>
>>>>> Two updates:
>>>>>
>>>>> 1. As suggested in the ticket, I naively copied every `.so` in 
>>>>> `hadoop-3.0.0/lib/native/` into `/lib/` and this did not seem to help. My 
>>>>> knowledge of how shared libs get picked up is hazy, so I'm not sure if 
>>>>> blindly copying them like that should work. I did check what 
>>>>> `System.getProperty("java.library.path")` returns at the call-site and 
>>>>> it's: 
>>>>> java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
>>>>> 2. The exception I see comes from 
>>>>> `hadoop.util.NativeCodeLoader.buildSupportsSnappy` (stack-trace below). 
>>>>> This uses `System.loadLibrary("hadoop")`.
>>>>>
>>>>> [2019-01-23 19:52:33.081216] java.lang.UnsatisfiedLinkError: 
>>>>> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
>>>>> [2019-01-23 19:52:33.081376]  at 
>>>>> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)
>>>>> [2019-01-23 19:52:33.081406]  at 
>>>>> org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
>>>>> [2019-01-23 19:52:33.081429]  at 
>>>>> org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195)
>>>>> [2019-01-23 19:52:33.081457]  at 
>>>>> org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:181)
>>>>> [2019-01-23 19:52:33.081494]  at 
>>>>> org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:2037)
>>>>> [2019-01-23 19:52:33.081517]  at 
>>>>> org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1923)
>>>>> [2019-01-23 19:52:33.081549]  at 
>>>>> org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1872)
>>>>> ... (redacted) ...
>>>>> [2019-01-23 19:52:33.081728]  at 
>>>>> scala.collection.immutable.List.foreach(List.scala:392)
>>>>> ... (redacted) ...
>>>>> [2019-01-23 19:52:33.081832]  at 
>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
>>>>> [2019-01-23 19:52:33.081854]  at 
>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>>>> [2019-01-23 19:52:33.081882]  at 
>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>>>>> [2019-01-23 19:52:33.081904]  at 
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>>>> [2019-01-23 19:52:33.081946]  at 
>>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>>> [2019-01-23 19:52:33.081967]  at java.lang.Thread.run(Thread.java:748)
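
To see which directories on `java.library.path` actually contain the file that `System.loadLibrary("hadoop")` would look for, a small check along the following lines can help. It is a sketch: `NativeLibraryLocator` is a hypothetical helper, and it only inspects the path; it does not load anything and says nothing about which class loader already holds the library.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that lists where a native library could be picked up from. */
final class NativeLibraryLocator {

    /** Returns the files on the given library path that match the platform name of libName. */
    static List<File> candidates(String libName, String libraryPath) {
        // Maps e.g. "hadoop" to "libhadoop.so" on Linux.
        String fileName = System.mapLibraryName(libName);
        List<File> hits = new ArrayList<>();
        for (String dir : libraryPath.split(File.pathSeparator)) {
            if (dir.isEmpty()) {
                continue;
            }
            File candidate = new File(dir, fileName);
            if (candidate.isFile()) {
                hits.add(candidate);
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        String path = System.getProperty("java.library.path", "");
        System.out.println("java.library.path=" + path);
        System.out.println("candidates for hadoop: " + candidates("hadoop", path));
    }
}
```

If the list is non-empty and the `UnsatisfiedLinkError` still appears after a job restart, the path itself is probably not the problem.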
>>>>>
>>>>> On Tue, Jan 22, 2019 at 2:31 PM Aaron Levin  wrote:
>>>>>>
>>>>>> Hey Ufuk,
>>>>>>
>>>>>> So, I looked into this a little bit:
>>>>>>
>>>>>> 1. clarification: my issues are with the hadoop-related snappy libraries 
>>>>>> and not libsnappy itself (this is my bad for not being clearer, sorry!). 
>>>>>> I already have `libsnappy` on my classpath, but I am looking into 
>>>>>> including the hadoop snappy libraries.
>>>>>> 2. exception: I don't see the class loading error. I'm going to try to 
>>>>>> put some more instrumentation and see if I can get a clearer stacktrace 
>>>>>> (right now I get an NPE on

Re: [DISCUSS] Towards a leaner flink-dist

2019-01-23 Thread Ufuk Celebi
On Wed, Jan 23, 2019 at 11:01 AM Timo Walther  wrote:
> I think what is more important than a big dist bundle is a helpful
> "Downloads" page where users can easily find available filesystems,
> connectors, metric reporters. Not everyone checks Maven central for
> available JAR files. I just saw that we added an "Optional components"
> section recently [1], we just need to make it more prominent. This is
> also done for the SQL connectors and formats [2].

+1 I fully agree with the importance of the Downloads page. We
definitely need to make any optional dependencies that users need to
download easy to find.


Re: [DISCUSS] Towards a leaner flink-dist

2019-01-23 Thread Ufuk Celebi
I like the idea of a leaner binary distribution. At the same time I
agree with Jamie that the current binary is quite convenient and
connection speeds should not be that big of a deal. Since the binary
distribution is one of the first entry points for users, I'd like to
keep it as user-friendly as possible.

What do you think about building a lean distribution by default and a
"full" distribution that still bundles all the optional dependencies
for releases? (If you don't think that's feasible I'm still +1 to only
go with the "lean dist" approach.)

– Ufuk

On Wed, Jan 23, 2019 at 9:36 AM Stephan Ewen  wrote:
>
> There are some points where a leaner approach could help.
> There are many libraries and connectors that are currently being added to
> Flink, which makes the "include all" approach not completely feasible in
> the long run:
>
>   - Connectors: For a proper experience with the Shell/CLI (for example for
> SQL) we need a lot of fat connector jars.
> These come often for multiple versions, which alone accounts for 100s
> of MBs of connector jars.
>   - The pre-bundled FileSystems are also on the verge of adding 100s of MBs
> themselves.
>   - The metric reporters are bit by bit growing as well.
>
> The following could be a compromise:
>
> The flink-dist would include
>   - the core flink libraries (core, apis, runtime, etc.)
>   - yarn / mesos  etc. adapters
>   - examples (the examples should be a small set of self-contained programs
> without additional dependencies)
>   - default logging
>   - default metric reporter (jmx)
>   - shells (scala, sql)
>
> The flink-dist would NOT include the following libs (and these would be
> offered for individual download)
>   - Hadoop libs
>   - the pre-shaded file systems
>   - the pre-packaged SQL connectors
>   - additional metric reporters
>
>
> On Tue, Jan 22, 2019 at 3:19 AM Jeff Zhang  wrote:
>
> > Thanks Chesnay for raising this discussion thread.  I think there are 3
> > major use scenarios for flink binary distribution.
> >
> > 1. Use it to set up standalone cluster
> > 2. Use it to experience features of flink, such as via scala-shell,
> > sql-client
> > 3. Downstream project use it to integrate with their system
> >
> > I did a size estimation of the flink dist folder: the lib folder takes around
> > 100M and the opt folder takes around 200M. Overall I agree with making flink
> > dist thinner. So the next problem is which components to drop. I checked the
> > opt folder, and I think the filesystem components and metrics components could
> > be moved out, because they are pluggable components and are only used in
> > scenario 1, I think (setting up a standalone cluster). Other components like
> > flink-table, flink-ml and flink-gelly we should still keep IMHO, because new
> > users may still use them to try the features of flink. For me, scala-shell is
> > the first option to try new features of flink.
> >
> >
> >
> > On Fri, Jan 18, 2019 at 7:34 PM Fabian Hueske wrote:
> >
> >> Hi Chesnay,
> >>
> >> Thank you for the proposal.
> >> I think this is a good idea.
> >> We follow a similar approach already for Hadoop dependencies and
> >> connectors (although in application space).
> >>
> >> +1
> >>
> >> Fabian
> >>
> >> On Fri, Jan 18, 2019 at 10:59 AM Chesnay Schepler <
> >> ches...@apache.org> wrote:
> >>
> >>> Hello,
> >>>
> >>> the binary distribution that we release by now contains quite a lot of
> >>> optional components, including various filesystems, metric reporters and
> >>> libraries. Most users will only use a fraction of these, and as such
> >>> pretty much only increase the size of flink-dist.
> >>>
> >>> With Flink growing more and more in scope I don't believe it to be
> >>> feasible to ship everything we have with every distribution, and instead
> >>> suggest more of a "pick-what-you-need" model, where flink-dist is rather
> >>> lean and additional components are downloaded separately and added by
> >>> the user.
> >>>
> >>> This would primarily affect the /opt directory, but could also be
> >>> extended to cover flink-dist. For example, the yarn and mesos code could
> >>> be spliced out into separate jars that could be added to lib manually.
> >>>
> >>> Let me know what you think.
> >>>
> >>> Regards,
> >>>
> >>> Chesnay
> >>>
> >>>
> >
> > --
> > Best Regards
> >
> > Jeff Zhang
> >


Re: `env.java.opts` not persisting after job canceled or failed and then restarted

2019-01-21 Thread Ufuk Celebi
Hey Aaron,

sorry for the late reply.

(1) I think I was able to reproduce this issue using snappy-java. I've
filed a ticket here:
https://issues.apache.org/jira/browse/FLINK-11402. Can you check whether
the ticket description is in line with what you are
experiencing? Most importantly, do you see the same Exception being
reported after cancelling and re-starting the job?

(2) I don't think it's caused by the environment options not being
picked up. You can check the head of the log files of the JobManager
or TaskManager to verify that your provided option is picked up as
expected. You should see something similar to this:

2019-01-21 22:53:49,863 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -

2019-01-21 22:53:49,864 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
Starting StandaloneSessionClusterEntrypoint (Version: 1.7.0,
Rev:49da9f9, Date:28.11.2018 @ 17:59:06 UTC)
...
2019-01-21 22:53:49,865 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  JVM
Options:
2019-01-21 22:53:49,865 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Xms1024m
2019-01-21 22:53:49,865 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Xmx1024m
You are looking for this line > 2019-01-21 22:53:49,865 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Djava.library.path=/.../org/xerial/snappy/native/Mac/x86_64/ <
2019-01-21 22:53:49,865 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Dlog.file=/.../flink-1.7.0/log/flink-standalonesession-0.local.log
...
2019-01-21 22:53:49,866 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
Program Arguments:
2019-01-21 22:53:49,866 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
--configDir
2019-01-21 22:53:49,866 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
/.../flink-1.7.0/conf
2019-01-21 22:53:49,866 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
--executionMode
2019-01-21 22:53:49,866 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
cluster
...
2019-01-21 22:53:49,866 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -


Can you verify that you see the log messages as expected?
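
If reading the log head is inconvenient, the same information can be obtained from inside the JVM. The snippet below is a sketch using the standard `RuntimeMXBean`. `JvmOptionCheck` is a hypothetical helper, and calling it from your job (for example in an `open()` method) is an assumption about where you would want to check.

```java
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.Optional;

/** Hypothetical helper that looks for a JVM option among the arguments the JVM was started with. */
final class JvmOptionCheck {

    /** Returns the first JVM argument starting with the given prefix, if any. */
    static Optional<String> findOption(List<String> jvmArgs, String prefix) {
        return jvmArgs.stream().filter(arg -> arg.startsWith(prefix)).findFirst();
    }

    public static void main(String[] args) {
        // The arguments passed to the JVM itself, such as -Xmx or -D options.
        List<String> jvmArgs = ManagementFactory.getRuntimeMXBean().getInputArguments();
        System.out.println(
            findOption(jvmArgs, "-Djava.library.path=")
                .orElse("java.library.path was not passed on the command line"));
    }
}
```

Because these arguments are fixed when the process starts, they cannot change between a job cancellation and a restart on the same TaskManager.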

(3) As noted in FLINK-11402, is it possible to package the snappy library
as part of your user code instead of loading the library via
java.library.path? In my example, that seems to work fine.

– Ufuk

On Thu, Jan 17, 2019 at 5:53 PM Aaron Levin  wrote:
>
> Hello!
>
> *tl;dr*: settings in `env.java.opts` seem to stop having impact when a job is 
> canceled or fails and then is restarted (with or without 
> savepoint/checkpoints). If I restart the task-managers, the `env.java.opts` 
> seem to start having impact again and our job will run without failure. More 
> below.
>
> We consume Snappy-compressed sequence files in our flink job. This 
> requires access to the hadoop native libraries. In our `flink-conf.yaml` for 
> both the task manager and the job manager, we put:
>
> ```
> env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native
> ```
>
> If I launch our job on freshly-restarted task managers, the job operates 
> fine. If at some point I cancel the job or if the job restarts for some other 
> reason, the job will begin to crashloop because it tries to open a 
> Snappy-compressed file but doesn't have access to the codec from the native 
> hadoop libraries in `/usr/local/hadoop/lib/native`. If I then restart the 
> task manager while the job is crashlooping, the job starts running without 
> any codec failures.
>
> The only reason I can conjure that would cause the Snappy compression to fail 
> is if the `env.java.opts` were not being passed through to the job on restart 
> for some reason.
>
> Does anyone know what's going on? Am I missing some additional configuration? 
> I really appreciate any help!
>
> About our setup:
>
> - Flink Version: 1.7.0
> - Deployment: Standalone in HA
> - Hadoop/S3 setup: we do *not* set `HADOOP_CLASSPATH`. We use Flink’s shaded 
> jars to access our files in S3. We do not use the `bundled-with-hadoop` 
> distribution of Flink.
>
> Best,
>
> Aaron Levin


Re: [DISCUSS] Dropping flink-storm?

2019-01-10 Thread Ufuk Celebi
+1 to drop.

I totally agree with your reasoning. I like that we tried to keep it,
but I don't think the maintenance overhead would be justified.

– Ufuk

On Wed, Jan 9, 2019 at 4:09 PM Till Rohrmann  wrote:
>
> With https://issues.apache.org/jira/browse/FLINK-10571, we will remove the
> Storm topologies from Flink and keep the wrappers for the moment.
>
> However, looking at the FlinkTopologyContext [1], it becomes quite obvious
> that Flink's compatibility with Storm is really limited. Almost all of the
> context methods are not supported which makes me wonder how useful these
> wrappers really are. Given the additional maintenance overhead of having
> them in the code base and no indication that someone is actively using
> them, I would still be in favour of removing them. This will reduce our
> maintenance burden in the future. What do you think?
>
> [1]
> https://github.com/apache/flink/blob/master/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
>
> Cheers,
> Till
>
> On Tue, Oct 9, 2018 at 10:08 AM Fabian Hueske  wrote:
>
> > Yes, let's do it this way.
> > The wrapper classes are probably not too complex and can be easily tested.
> > We have the same for the Hadoop interfaces, although I think only the
> > Input- and OutputFormatWrappers are actually used.
> >
> >
> > On Tue, Oct 9, 2018 at 9:46 AM Chesnay Schepler <
> > ches...@apache.org> wrote:
> >
> >> That sounds very good to me.
> >>
> >> On 08.10.2018 11:36, Till Rohrmann wrote:
> >> > Good point. The initial idea of this thread was to remove the storm
> >> > compatibility layer completely.
> >> >
> >> > During the discussion I realized that it might be useful for our users
> >> > to not completely remove it in one go. Instead for those who still
> >> > want to use some Bolt and Spout code in Flink, it could be nice to
> >> > keep the wrappers. At least, we could remove flink-storm in a more
> >> > graceful way by first removing the Topology and client parts and then
> >> > the wrappers. What do you think?
> >> >
> >> > Cheers,
> >> > Till
> >> >
> >> > On Mon, Oct 8, 2018 at 11:13 AM Chesnay Schepler wrote:
> >> >
> >> > I don't believe that to be the consensus. For starters it is
> >> > contradictory; we can't /drop/ flink-storm yet still /keep some
> >> > parts/.
> >> >
> >> > From my understanding we drop flink-storm completely, and put a
> >> > note in the docs that the bolt/spout wrappers of previous versions
> >> > will continue to work.
> >> >
> >> > On 08.10.2018 11:04, Till Rohrmann wrote:
> >> >> Thanks for opening the issue Chesnay. I think the overall
> >> >> consensus is to drop flink-storm and only keep the Bolt and Spout
> >> >> wrappers. Thanks for your feedback!
> >> >>
> >> >> Cheers,
> >> >> Till
> >> >>
> >> >> On Mon, Oct 8, 2018 at 9:37 AM Chesnay Schepler
> >> >> <ches...@apache.org> wrote:
> >> >>
> >> >> I've created
> >> >> https://issues.apache.org/jira/browse/FLINK-10509 for
> >> >> removing flink-storm.
> >> >>
> >> >> On 28.09.2018 15:22, Till Rohrmann wrote:
> >> >> > Hi everyone,
> >> >> >
> >> >> > I would like to discuss how to proceed with Flink's storm
> >> >> compatibility
> >> >> > layer flink-storm.
> >> >> >
> >> >> > While working on removing Flink's legacy mode, I noticed
> >> >> that some parts of
> >> >> > flink-storm rely on the legacy Flink client. In fact, at
> >> >> the moment
> >> >> > flink-storm does not work together with Flink's new
> >> distributed
> >> >> > architecture.
> >> >> >
> >> >> > I'm also wondering how many people are actually using
> >> >> Flink's Storm
> >> >> > compatibility layer and whether it would be worth porting it.
> >> >> >
> >> >> > I see two options how to proceed:
> >> >> >
> >> >> > 1) Commit to maintain flink-storm and port it to Flink's
> >> >> new architecture
> >> >> > 2) Drop flink-storm
> >> >> >
> >> >> > I doubt that we can contribute it to Apache Bahir [1],
> >> >> because once we
> >> >> > remove the legacy mode, this module will no longer work
> >> >> with all newer
> >> >> > Flink versions.
> >> >> >
> >> >> > Therefore, I would like to hear your opinion on this and in
> >> >> particular if
> >> >> > you are using or planning to use flink-storm in the future.
> >> >> >
> >> >> > [1] https://github.com/apache/bahir-flink
> >> >> >
> >> >> > Cheers,
> >> >> > Till
> >> >> >
> >> >>
> >> >
> >>
> >>


Re: Per job cluster doesn't shut down after the job is canceled

2018-11-14 Thread Ufuk Celebi
Hey Paul,

It might be related to this: https://github.com/apache/flink/pull/7004 (see 
linked issue for details).

Best,

Ufuk

> On Nov 14, 2018, at 09:46, Paul Lam  wrote:
> 
> Hi Gary,
> 
> Thanks for your reply and sorry for the delay. The attachment is the 
> jobmanager logs after invoking the cancel command.
> 
> I think it might be related to the custom source, because the jobmanager 
> keeps trying to trigger a checkpoint for it, 
> but in fact it’s already canceled. The source implementation is using a 
> running flag to denote it’s running, and the 
> cancel method is simply setting the flag to false, which I think is a common 
> way of implementing a custom source.
> In addition, the cluster finally shut down because I killed it with yarn 
> commands.
> 
> And also thank you for the pointer, I’ll keep tracking this problem.
> 
> Best,
> Paul Lam
> 
> 
> 
>> On Nov 10, 2018, at 02:10, Gary Yao wrote:
>> 
>> Hi Paul,
>> 
>> Can you share the complete logs, or at least the logs after invoking the
>> cancel command? 
>> 
>> If you want to debug it yourself, check if
>> MiniDispatcher#jobReachedGloballyTerminalState [1] is invoked, and see how 
>> the
>> jobTerminationFuture is used.
>> 
>> Best,
>> Gary
>> 
>> [1] 
>> https://github.com/apache/flink/blob/091cff3299aed4bb143619324f6ec8165348d3ae/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java#L141
>> 
>> 
>> On Wed, Nov 7, 2018 at 3:27 AM Paul Lam  wrote:
>> Hi, 
>> 
>> I’m using Flink 1.5.3, and I’ve seen several times that the detached YARN 
>> cluster doesn’t shut down after the job is canceled successfully. The only 
>> errors I found in jobmanager’s log are as below (the second one appears 
>> multiple times):
>> 
>> ```
>> 2018-11-07 09:48:38,663 WARN  
>> org.apache.flink.runtime.executiongraph.ExecutionGraph- Error while 
>> notifying JobStatusListener
>> java.lang.IllegalStateException: Incremented the completed number of 
>> checkpoints without incrementing the in progress checkpoints before.
>>  at 
>> org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.incrementFailedCheckpoints(CheckpointStatsCounts.java:165)
>>  at 
>> org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.reportFailedCheckpoint(CheckpointStatsTracker.java:270)
>>  at 
>> org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.access$100(CheckpointStatsTracker.java:55)
>>  at 
>> org.apache.flink.runtime.checkpoint.CheckpointStatsTracker$PendingCheckpointStatsCallback.reportFailedCheckpoint(CheckpointStatsTracker.java:314)
>>  at 
>> org.apache.flink.runtime.checkpoint.PendingCheckpointStats.reportFailedCheckpoint(PendingCheckpointStats.java:184)
>>  at 
>> org.apache.flink.runtime.checkpoint.PendingCheckpoint.reportFailedCheckpoint(PendingCheckpoint.java:517)
>>  at 
>> org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortError(PendingCheckpoint.java:454)
>>  at 
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1200)
>>  at 
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46)
>>  at 
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.notifyJobStatusChange(ExecutionGraph.java:1713)
>>  at 
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1370)
>>  at 
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1354)
>>  at 
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.cancel(ExecutionGraph.java:1000)
>>  at 
>> org.apache.flink.runtime.jobmaster.JobMaster.cancel(JobMaster.java:389)
>>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>  at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>  at 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>  at java.lang.reflect.Method.invoke(Method.java:498)
>>  at 
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>>  at 
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>>  at 
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>>  at 
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>  at 
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>  at 
>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>  at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>  at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>  at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>  at 

Re: Where is the "Latest Savepoint" information saved?

2018-11-11 Thread Ufuk Celebi
I think there should be a log message, but I don't know what the exact
format is (you would need to look through it and search for something
related to CompletedCheckpointStore).

An alternative is the web UI checkpointing tab. It shows the latest
checkpoint used for restore of the job. You should see your savepoint
there.

Best,

Ufuk

On Sun, Nov 11, 2018 at 7:45 PM Hao Sun  wrote:
>
> This is great, I will try option 3 and let you know.
> Can I log some message so I know job is recovered from the latest savepoint?
>
> On Sun, Nov 11, 2018 at 10:42 AM Ufuk Celebi  wrote:
>>
>> Hey Hao and Paul,
>>
>> 1) Fetch checkpoint info manually from ZK (problematic, not recommended)
>> - As Paul pointed out, this is problematic as the node is a serialized
>> pointer (StateHandle) to a CompletedCheckpoint in the HA storage
>> directory and not a path [1].
>> - I would not recommend this approach at the moment
>>
>> 2) Using the HTTP API to fetch the latest savepoint pointer (possible,
>> but cumbersome)
>> - As Paul proposed, you could use /jobs/:jobid/checkpoints to fetch
>> checkpoint statistics about the latest savepoint
>> - The latest savepoint is available as a separate entry under
>> `latest.savepoint` (If I'm reading the docs [2] correctly)
>> - You would need to manually do this before shutting down (requires
>> custom tooling to automate)
>>
>> 3) Use cancelWithSavepoint
>> - If you keep `high-availability.cluster-id` consistent between
>> executions of your job cluster, using cancelWithSavepoint [3] should
>> add the savepoint to ZK before cancelling the job
>> - On the next execution of your job cluster, Flink should
>> automatically pick it up (no need to attach a savepoint restore path
>> manually)
>>
>> I've not tried 3) myself yet, but believe it should work. If you have
>> time to try it out, I'd be happy to hear whether it works as expected
>> for you.
>>
>> – Ufuk
>>
>> [1] I believe this is overly complicated and should be simplified in the 
>> future.
>> [2] Search /jobs/:jobid/checkpoints in
>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html
>> [3] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html#job-cancellation
>>
>> On Fri, Nov 9, 2018 at 5:03 PM Hao Sun  wrote:
>> >
>> > Can we add an option to allow job cluster mode to start from the latest 
>> > save point? Otherwise I have to somehow get the info from ZK, before job 
>> > cluster's container started by K8s.
>> >
>> > On Fri, Nov 9, 2018, 01:29 Paul Lam  wrote:
>> >>
>> >> Hi Hao,
>> >>
>> >> The savepoint path is stored in ZK, but it’s in binary format, so in 
>> >> order to retrieve the path you have to deserialize it back to some Flink 
>> >> internal object.
>> >>
>> >> A better approach would be using REST api to get the path. You could find 
>> >> it here[1].
>> >>
>> >> [1] 
>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html
>> >>
>> >> Best,
>> >> Paul Lam
>> >>
>> >>
>> >> On Nov 9, 2018, at 13:55, Hao Sun wrote:
>> >>
>> >> Since this save point path is very useful to application updates, where 
>> >> is this information stored? Can we keep it in ZK or S3 for retrieval?
>> >>
>> >> 
>> >>
>> >>


Re: java.io.IOException: NSS is already initialized

2018-11-11 Thread Ufuk Celebi
Hey Hao,

1) Regarding Hadoop S3: are you using the repackaged Hadoop S3
dependency from the /opt folder of the Flink distribution? Or the
actual Hadoop implementation? If the latter, would you mind also running
it with the one that comes packaged with Flink? For this you can
remove all Hadoop-related configuration in your flink-conf.yaml and
copy the Hadoop S3 dependency from /opt to /lib and configure it [1].

2) Could you please share your complete Flink configuration for when
you tried to run with Presto S3? If you don't want to share this
publicly, feel free to share it privately with me. I'm curious to see
whether we can reproduce this.

– Ufuk

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/deployment/aws.html#shaded-hadooppresto-s3-file-systems-recommended
On Sat, Nov 10, 2018 at 4:07 PM Hao Sun  wrote:
>
> Hi Ufuk, thanks for checking. I am using openJDK 1.8_171, I still have the 
> same issue with presto.
>
> - why checkpoint is not starting from 1? old chk stored in ZK caused it, I 
> cleaned it up, but not very helpful
> - I switched to Flink + Hadoop28, and used hadoop s3, with no other changes, 
> check pointing is working with the hadoop flavour.
>
> On Fri, Nov 9, 2018 at 2:02 PM Ufuk Celebi  wrote:
>>
>> Hey Hao Sun,
>>
>> - Is this an intermittent failure or permanent? The logs indicate that
>> some checkpoints completed before the error occurs (e.g. checkpoint
>> numbers are greater than 1).
>>
>> - Which Java versions are you using? And which Java image? I've
>> Googled similar issues that seem to be related to the JVM, e.g. [1].
>>
>> Best,
>>
>> Ufuk
>>
>> [1] 
>> https://dev.lucee.org/t/could-not-initialize-class-sun-security-ssl-sslcontextimp/3972
>>
>>
>> On Thu, Nov 8, 2018 at 8:55 PM Hao Sun  wrote:
>> >
>> > Thanks, any insight/help here is appreciated.
>> >
>> > On Thu, Nov 8, 2018 at 4:38 AM Dawid Wysakowicz  
>> > wrote:
>> >>
>> >> Hi Hao,
>> >>
>> >> I am not sure, what might be wrong, but I've cc'ed Gary and Kostas who 
>> >> were recently working with S3, maybe they will have some ideas.
>> >>
>> >> Best,
>> >>
>> >> Dawid
>> >>
>> >> On 03/11/2018 03:09, Hao Sun wrote:
>> >>
>> >> Same environment, new error.
>> >>
>> >> I can run the same docker image with my local Mac, but on K8S, this gives 
>> >> me this error.
>> >> I can not think of any difference between local Docker and K8S Docker.
>> >>
>> >> Any hint will be helpful. Thanks
>> >>
>> >> 
>> >>
>> >> 2018-11-02 23:29:32,981 INFO 
>> >> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job 
>> >> ConnectedStreams maxwell.accounts () 
>> >> switched from state RUNNING to FAILING.
>> >> AsynchronousException{java.lang.Exception: Could not materialize 
>> >> checkpoint 235 for operator Source: KafkaSource(maxwell.accounts) -> 
>> >> MaxwellFilter->Maxwell(maxwell.accounts) -> 
>> >> FixedDelayWatermark(maxwell.accounts) -> 
>> >> MaxwellFPSEvent->InfluxDBData(maxwell.accounts) -> Sink: 
>> >> influxdbSink(maxwell.accounts) (1/1).}
>> >> at 
>> >> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>> >> at 
>> >> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>> >> at 
>> >> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>> >> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> >> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> >> at 
>> >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> >> at 
>> >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> >> at java.lang.Thread.run(Thread.java:748)
>> >> Caused by: java.lang.Exception: Could not materialize checkpoint 235 for 
>> >> operator Source: KafkaSource(maxwell.accounts) -> 
>> >> MaxwellFilter->Maxwell(maxwell.accounts) -> 
>> >> FixedDelayWatermark(maxwell.accounts) -> 
>> >> MaxwellFPSEvent->InfluxDBData(maxwell.accounts) -> Sink: 
>> >> influxdbSink(maxwel

Re: Where is the "Latest Savepoint" information saved?

2018-11-11 Thread Ufuk Celebi
Hey Hao and Paul,

1) Fetch checkpoint info manually from ZK (problematic, not recommended)
- As Paul pointed out, this is problematic as the node is a serialized
pointer (StateHandle) to a CompletedCheckpoint in the HA storage
directory and not a path [1].
- I would not recommend this approach at the moment

2) Using the HTTP API to fetch the latest savepoint pointer (possible,
but cumbersome)
- As Paul proposed, you could use /jobs/:jobid/checkpoints to fetch
checkpoint statistics about the latest savepoint
- The latest savepoint is available as a separate entry under
`latest.savepoint` (If I'm reading the docs [2] correctly)
- You would need to manually do this before shutting down (requires
custom tooling to automate)

3) Use cancelWithSavepoint
- If you keep `high-availability.cluster-id` consistent between
executions of your job cluster, using cancelWithSavepoint [3] should
add the savepoint to ZK before cancelling the job
- On the next execution of your job cluster, Flink should
automatically pick it up (no need to attach a savepoint restore path
manually)

I've not tried 3) myself yet, but believe it should work. If you have
time to try it out, I'd be happy to hear whether it works as expected
for you.
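
For option 2), a minimal sketch of such custom tooling could look like the following. It assumes the response of /jobs/:jobid/checkpoints carries the latest savepoint under `latest.savepoint` with an `external_path` field (that is how I read the docs [2]). The helper names, the base URL handling and the sample payload are made up for illustration.

```python
import json
from typing import Optional
from urllib.request import urlopen


def latest_savepoint_path(stats: dict) -> Optional[str]:
    """Return the path of the latest savepoint, or None if there is none.

    Assumes the layout {"latest": {"savepoint": {"external_path": ...}}}.
    """
    latest = stats.get("latest") or {}
    savepoint = latest.get("savepoint") or {}
    return savepoint.get("external_path")


def fetch_checkpoint_stats(base_url: str, job_id: str) -> dict:
    """Fetch checkpoint statistics from the JobManager REST endpoint."""
    with urlopen(f"{base_url}/jobs/{job_id}/checkpoints") as response:
        return json.load(response)


# Hypothetical payload, trimmed to the fields used above.
sample = {
    "latest": {
        "completed": None,
        "savepoint": {"id": 42, "external_path": "s3://bucket/savepoints/savepoint-abc"},
    }
}
print(latest_savepoint_path(sample))
```

You would call `fetch_checkpoint_stats` before shutting the job down and store the returned path somewhere your deployment tooling can read it.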

– Ufuk

[1] I believe this is overly complicated and should be simplified in the future.
[2] Search /jobs/:jobid/checkpoints in
https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html#job-cancellation
On Fri, Nov 9, 2018 at 5:03 PM Hao Sun  wrote:
>
> Can we add an option to allow job cluster mode to start from the latest save 
> point? Otherwise I have to somehow get the info from ZK, before job cluster's 
> container started by K8s.
>
> On Fri, Nov 9, 2018, 01:29 Paul Lam  wrote:
>>
>> Hi Hao,
>>
>> The savepoint path is stored in ZK, but it’s in binary format, so in order 
>> to retrieve the path you have to deserialize it back to some Flink internal 
>> object.
>>
>> A better approach would be using REST api to get the path. You could find it 
>> here[1].
>>
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html
>>
>> Best,
>> Paul Lam
>>
>>
>> On Nov 9, 2018, at 13:55, Hao Sun wrote:
>>
>> Since this save point path is very useful to application updates, where is 
>> this information stored? Can we keep it in ZK or S3 for retrieval?
>>
>> 
>>
>>


Re: java.io.IOException: NSS is already initialized

2018-11-09 Thread Ufuk Celebi
Hey Hao Sun,

- Is this an intermittent failure or permanent? The logs indicate that
some checkpoints completed before the error occurs (e.g. checkpoint
numbers are greater than 1).

- Which Java versions are you using? And which Java image? I've
Googled similar issues that seem to be related to the JVM, e.g. [1].

Best,

Ufuk

[1] 
https://dev.lucee.org/t/could-not-initialize-class-sun-security-ssl-sslcontextimp/3972

On Thu, Nov 8, 2018 at 8:55 PM Hao Sun  wrote:
>
> Thanks, any insight/help here is appreciated.
>
> On Thu, Nov 8, 2018 at 4:38 AM Dawid Wysakowicz  
> wrote:
>>
>> Hi Hao,
>>
>> I am not sure, what might be wrong, but I've cc'ed Gary and Kostas who were 
>> recently working with S3, maybe they will have some ideas.
>>
>> Best,
>>
>> Dawid
>>
>> On 03/11/2018 03:09, Hao Sun wrote:
>>
>> Same environment, new error.
>>
>> I can run the same docker image with my local Mac, but on K8S, this gives me 
>> this error.
>> I can not think of any difference between local Docker and K8S Docker.
>>
>> Any hint will be helpful. Thanks
>>
>> 
>>
>> 2018-11-02 23:29:32,981 INFO  
>> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job 
>> ConnectedStreams maxwell.accounts () 
>> switched from state RUNNING to FAILING.
>> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 
>> 235 for operator Source: KafkaSource(maxwell.accounts) -> 
>> MaxwellFilter->Maxwell(maxwell.accounts) -> 
>> FixedDelayWatermark(maxwell.accounts) -> 
>> MaxwellFPSEvent->InfluxDBData(maxwell.accounts) -> Sink: 
>> influxdbSink(maxwell.accounts) (1/1).}
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.Exception: Could not materialize checkpoint 235 for 
>> operator Source: KafkaSource(maxwell.accounts) -> 
>> MaxwellFilter->Maxwell(maxwell.accounts) -> 
>> FixedDelayWatermark(maxwell.accounts) -> 
>> MaxwellFPSEvent->InfluxDBData(maxwell.accounts) -> Sink: 
>> influxdbSink(maxwell.accounts) (1/1).
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>> ... 6 more
>> Caused by: java.util.concurrent.ExecutionException: 
>> java.lang.NoClassDefFoundError: Could not initialize class 
>> sun.security.ssl.SSLSessionImpl
>> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
>>
>> at 
>> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>>
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>> ... 5 more
>> Caused by: java.lang.NoClassDefFoundError: Could not initialize class 
>> sun.security.ssl.SSLSessionImpl
>> at sun.security.ssl.SSLSocketImpl.init(SSLSocketImpl.java:604)
>>
>> at sun.security.ssl.SSLSocketImpl.<init>(SSLSocketImpl.java:572)
>>
>> at 
>> sun.security.ssl.SSLSocketFactoryImpl.createSocket(SSLSocketFactoryImpl.java:110)
>> at 
>> org.apache.flink.fs.s3presto.shaded.org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:365)
>> at 
>> org.apache.flink.fs.s3presto.shaded.org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:355)
>> at 
>> org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:132)
>> at 
>> org.apache.flink.fs.s3presto.shaded.org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
>> at 
>> org.apache.flink.fs.s3presto.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:359)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at 
>> 

Re: Flink 1.6 job cluster mode, job_id is always 00000000000000000000000000000000

2018-11-04 Thread Ufuk Celebi
Hey Hao Sun,

this has been changed recently [1] in order to properly support
failover in job cluster mode.

A workaround for you would be to add an application identifier to the
checkpoint path of each application, resulting in S3 paths like
application-/00...00/chk-64.
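
To illustrate the workaround, here is a minimal sketch of how such paths could be derived. The bucket name, the `application-` prefix and the helper names are assumptions for illustration, not Flink settings. The directory returned by `checkpoint_dir` is what you would configure as the checkpoint directory of each job cluster.

```python
FIXED_JOB_ID = "0" * 32  # job ID that every job cluster uses in this mode


def checkpoint_dir(base: str, application_id: str) -> str:
    """Build a checkpoint directory that is unique per application."""
    if not application_id:
        raise ValueError("application_id must not be empty")
    return f"{base.rstrip('/')}/application-{application_id}"


def checkpoint_path(base: str, application_id: str, checkpoint_id: int) -> str:
    """Path of a single checkpoint below the per-application directory."""
    return f"{checkpoint_dir(base, application_id)}/{FIXED_JOB_ID}/chk-{checkpoint_id}"


print(checkpoint_path("s3://bucket/checkpoints", "orders", 64))
```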

Is that a feasible solution?

As a side note: It was considered to keep the job ID fixed, but make
it configurable (e.g. by providing a --job-id argument) which would
also help to avoid this situation, but I'm not aware of any concrete
plans to move forward with that approach.

Best,

Ufuk

[1] https://issues.apache.org/jira/projects/FLINK/issues/FLINK-10291
On Sun, Nov 4, 2018 at 3:39 AM Hao Sun  wrote:
>
> I am wondering if I can customize job_id for job cluster mode. Currently it 
> is always . I am running multiple job 
> clusters and sharing s3, it means checkpoints will be shared by different 
> jobs as well e.g. /chk-64, how can I avoid 
> this?
>
> Thanks


Re: Default zookeeper

2018-05-14 Thread Ufuk Celebi
No, there is no difference if the version in your distro is part of
the ZooKeeper 3.4.x series. The script is there for convenience during
local testing/dev.

– Ufuk


On Sun, May 13, 2018 at 3:49 PM, miki haiat  wrote:
> When downloading the Flink source in order to run it locally, there is a
> zookeeper script and a start-zookeeper-quorum script.
>
> Is there any difference between the default ZooKeeper installation, let's say
> in Ubuntu, and the ZooKeeper that comes with Flink?
>
> thanks,
>
> MIki
>


Re: PartitionNotFoundException after deployment

2018-05-04 Thread Ufuk Celebi
Hey Gyula!

I'm including Piotr and Nico (cc'd) who have worked on the network
stack in the last releases.

Registering the network structures including the intermediate results
actually happens **before** any state is restored. I'm not sure why
this reproducibly happens when you restore state. @Nico, Piotr: any
ideas here?

In general I think what happens here is the following:
- a task requests the result of a local upstream producer, but that
one has not registered its intermediate result yet
- this should result in a retry of the request with some backoff
(controlled via the config params you mention
taskmanager.network.request-backoff.max,
taskmanager.network.request-backoff.initial)

As a first step I would set logging to DEBUG and check the TM logs for
messages like "Retriggering partition request {}:{}."

You can also check the SingleInputGate code which has the logic for
retriggering requests.
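
To make the retry behaviour more concrete, here is a simplified sketch of how a request backoff bounded by an initial and a maximum value can evolve. It is only an illustration under the assumption that the delay doubles per attempt and that the request is given up once the maximum has been used. The function name and the values 100 ms and 10000 ms are mine, so please check the SingleInputGate code and your configuration for the real behaviour.

```python
from typing import List


def backoff_schedule(initial_ms: int, max_ms: int) -> List[int]:
    """Delays (in ms) before each retry until the maximum has been used."""
    if initial_ms <= 0 or max_ms < initial_ms:
        raise ValueError("expected 0 < initial_ms <= max_ms")
    delays = []
    current = initial_ms
    while True:
        delays.append(current)
        if current >= max_ms:
            # The maximum has been used; the next failure is final.
            return delays
        current = min(current * 2, max_ms)


schedule = backoff_schedule(100, 10000)
print(schedule, "total wait:", sum(schedule), "ms")
```

Under these assumptions a consumer waits at most the sum of the delays before it fails, so raising the maximum only helps if the producer registers its result within that total time.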

– Ufuk


On Fri, May 4, 2018 at 10:27 AM, Gyula Fóra  wrote:
> Hi Ufuk,
>
> Do you have any quick idea what could cause this problem in Flink 1.4.2?
> It seems like one operator takes too long to deploy and downstream tasks error
> out on partition not found. This only seems to happen when the job is
> restored from state and in fact that operator has some keyed and operator
> state as well.
>
> Deploying the same job from empty state works well. We tried increasing
> taskmanager.network.request-backoff.max, but that didn't help.
>
> It would be great if you have some pointers where to look further, I haven't
> seen this happening before.
>
> Thank you!
> Gyula
>
> The error:
> org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: Partition
> 4c5e9cd5dd410331103f51127996068a@b35ef4ffe25e3d17c5d6051ebe2860cd not found.
> at
> org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:77)
> at
> org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:115)
> at
> org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel$1.run(LocalInputChannel.java:159)
> at java.util.TimerThread.mainLoop(Timer.java:555)
> at java.util.TimerThread.run(Timer.java:505)





Re: Beam quickstart

2018-04-25 Thread Ufuk Celebi
Hey Gyula,

including Aljoscha (cc) here who is a committer at the Beam project.
Did you also ask on the Beam mailing list?

– Ufuk


On Wed, Apr 25, 2018 at 3:32 PM, Gyula Fóra  wrote:
> Hey,
> Is there somewhere an end to end guide how to run a simple beam-on-flink
> application (preferrably using Gradle)? I want to run it using the standard
> per-job yarn cluster setup but I cant seem to get it to work.
>
> I always end up having strange NoSuchMethod errors from protobuf and have
> spent hours trying to find a setup that works but I still fail.
>
> Thank you!
> Gyula


Re: Does web ui hang for 1.4 for job submission ?

2018-01-24 Thread Ufuk Celebi
It's not a stupid question at all! Try the following please:
1) Use something like Chrome's Developer Tools to check the responses
you get from the web UI. If you see an error there, that should point
you to what's going on.
2) Enable DEBUG logging for the JobManager and check the logs (if 1
doesn't help)

We just merged a change into master that does a better job of
reporting any errors that occur during submission.

– Ufuk


On Wed, Jan 24, 2018 at 5:24 PM, Vishal Santoshi
 wrote:
> We have had 2 different installations and when we go to submit a job, it gives
> us the spinning wheel and the request never reaches the JM. The CLI works
> without issues. Is this just us or has someone else seen this?
>
> Again, 1.3.2 does not have the issue and we have the same nodes, n/w etc., so
> we are sure nothing should have changed on our side.
>
> Apologies if this is a stupid question with an obvious solution.
>
> Regards
>
> Vishal


Re: Back-pressure Status shows OK but records are backed up in kafka

2018-01-07 Thread Ufuk Celebi
Hey Jins,

our current back pressure tracking mechanism does not work with Kafka
sources. To gather back pressure indicators we sample the main task
thread of a subtask. For most tasks, this is the thread that emits
records downstream (e.g. if you have a map function) and everything
works as expected. In case of the Kafka source though there is a
separate thread that consumes from Kafka and emits the records.
Therefore we sample the "wrong" thread and don't observe any
indicators for back pressure. :-( Unfortunately, this was not taken
into account when back pressure sampling was implemented.

There is this old issue to track this:
https://issues.apache.org/jira/browse/FLINK-3456

I'm not aware of any other way to track this situation. Maybe others
can chime in here...

– Ufuk


On Mon, Jan 8, 2018 at 8:16 AM, Jins George  wrote:
> I have a Beam pipeline consuming records from Kafka, doing some
> transformations and writing them to HBase. I faced an issue in which records
> were being written to HBase at a slower rate than the incoming messages to Kafka
> due to a temporary surge in the incoming traffic.
>
> From the flink UI, if I check the back pressure status, it shows OK. I have
> one task which has all the operators including source.
>
> Any idea why backpressure indicator would show OK, but messages are backed
> up in Kafka.
>
> Is there any other mechanism/metrics by which I can identify this situation
> ?
>
> I'm running Flink 1.2/w beam 2.0.
>
> Thanks,
> Jins George


Re: How to stop FlinkKafkaConsumer and make job finished?

2017-12-29 Thread Ufuk Celebi
Yes, that sounds like what Jaxon is looking for. :-) Thanks for the
pointer Eron.

– Ufuk

On Thu, Dec 28, 2017 at 8:13 PM, Eron Wright <eronwri...@gmail.com> wrote:
> I believe you can extend the `KeyedDeserializationSchema` that you pass to
> the consumer to check for end-of-stream markers.
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.html#isEndOfStream-T-
>
> Eron
>
> On Wed, Dec 27, 2017 at 12:35 AM, Ufuk Celebi <u...@apache.org> wrote:
>>
>> Hey Jaxon,
>>
>> I don't think it's possible to control this via the life-cycle methods
>> of your functions.
>>
>> Note that Flink currently does not support graceful stop in a
>> meaningful manner and you can only cancel running jobs. What comes to
>> my mind to cancel on EOF:
>>
>> 1) Extend Kafka consumer to stop emitting records after your EOF
>> record. Look at the flink-connector-kafka-base module. This is
>> probably not feasible, as it takes some work to get familiar with the code.
>> Just putting it out there.
>>
>> 2) Throw a "SuccessException" that fails the job. Easy, but not nice.
>>
>> 3) Use an Http client and cancel your job via the Http endpoint
>>
>> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/rest_api.html#job-cancellation).
>> Easy, but not nice, since you need quite some logic in your function
>> (e.g. ignore records after EOF record until cancellation, etc.).
>>
>> Maybe Aljoscha (cc'd) has an idea how to do this in a better way.
>>
>> – Ufuk
>>
>>
>> On Mon, Dec 25, 2017 at 8:59 AM, Jaxon Hu <hujiaxu...@gmail.com> wrote:
>> > I would like to stop FlinkKafkaConsumer consuming data from Kafka
>> > manually.
>> > But I find it won't close when I invoke the "cancel()" method. What I am
>> > trying to do is add an EOF symbol marking the end of my Kafka data, and
>> > when
>> > the FlatMap operator reads the symbol it will invoke the FlinkKafkaConsumer
>> > "cancel()" method. It doesn't work. A Flink streaming job won't finish
>> > unless
>> > it gets canceled or fails when I use Kafka as the source.
>> >
>> > If somebody knows how to do this, please give me some help, thx~~
>
>


Re: RichAsyncFunction in scala

2017-12-28 Thread Ufuk Celebi
Hey Antoine,

isn't it possible to use the Java RichAsyncFunction from Scala like this:

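import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.async.{ResultFuture, RichAsyncFunction}

class Test extends RichAsyncFunction[Int, Int] {

  // Register custom metrics here via getRuntimeContext.getMetricGroup.
  override def open(parameters: Configuration): Unit = super.open(parameters)

  // Complete resultFuture asynchronously once the result is available.
  override def asyncInvoke(input: Int, resultFuture: ResultFuture[Int]): Unit = ???
}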

– Ufuk



On Thu, Dec 28, 2017 at 4:37 PM, Antoine Philippot
 wrote:
> Hi,
>
> It lacks a version of RichAsyncFunction class in the scala API or the
> possibility to handle a class which extends AbstractRichFunction and
> implements AsyncFunction (from the scala API).
>
> I made a small dev on our current flink fork because we need to use the open
> method to add our custom metrics from getRuntimeContext.getMetricGroup
> method.
> https://github.com/aphilippot/flink/commit/acbd49cf2d64163040f2f954ce55917155b408ba
>
> Do you already plan to release this feature soon ? Do you want me to create
> a new Jira ticket, propose a pull request ?
>
> Antoine


Re: keyby() issue

2017-12-28 Thread Ufuk Celebi
Hey Jinhua,

On Thu, Dec 28, 2017 at 9:57 AM, Jinhua Luo  wrote:
> The keyby() upon the field would generate unique key as the field
> value, so if the number of the uniqueness is huge, flink would have
> trouble both on cpu and memory. Is it considered in the design of
> flink?

Yes, keyBy hash partitions the data across the nodes of your Flink
application and thus you can easily scale your application up if you
need more processing power.

I'm not sure that this is the problem in your case though. Can you
provide some more details what you are doing exactly? Are you
aggregating by time (for the keyBy you mention no windowing, but then
you mention windowAll)? What kind of aggregation are you doing? If
possible, feel free to share some code.

> Since windowsAll() could be set parallelism, so I try to use key
> selector to use field hash but not value, that I hope it would
> decrease the number of the keys, but the flink throws key out-of-range
> exception. How to use key selector in correct way?

Can you paste the exact exception you get? I think this might indicate
that you don't correctly extract the key from your record, e.g. you
extract a different key on sender and receiver.
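
To show why an inconsistent key leads to an out-of-range error, here is a simplified sketch modelled on hash partitioning with key groups. The hash is a stand-in (crc32, not the hash Flink uses), and the function names and parallelism values are assumptions for illustration only.

```python
import zlib


def key_group(key: str, max_parallelism: int) -> int:
    """Map a key to a key group. crc32 is a stand-in, not Flink's hash."""
    return zlib.crc32(key.encode("utf-8")) % max_parallelism


def operator_index(group: int, parallelism: int, max_parallelism: int) -> int:
    """Parallel subtask that owns the given key group."""
    return group * parallelism // max_parallelism


def key_group_range(index: int, parallelism: int, max_parallelism: int) -> range:
    """Key groups owned by the subtask with the given index."""
    start = (index * max_parallelism + parallelism - 1) // parallelism
    end = ((index + 1) * max_parallelism - 1) // parallelism
    return range(start, end + 1)


def check_key(key: str, index: int, parallelism: int, max_parallelism: int) -> None:
    """Receiver-side check: fail if the key does not belong to this subtask."""
    group = key_group(key, max_parallelism)
    if group not in key_group_range(index, parallelism, max_parallelism):
        raise ValueError(f"key group {group} is out of range for subtask {index}")
```

The sender routes a record with `operator_index(key_group(key, ...), ...)` and the receiver runs `check_key` on the key it extracts itself. If both sides extract the same key the check always passes. If the key selector is not deterministic (for example, it depends on a random value or an object identity hash), the two sides disagree and the check fails.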

I'm sure we can figure this out after you provide more context. :-)

– Ufuk


Re: org.apache.zookeeper.ClientCnxn, Client session timed out

2017-12-28 Thread Ufuk Celebi
On Thu, Dec 28, 2017 at 12:11 AM, Hao Sun  wrote:
> Thanks! Great to know I do not have to worry about duplicates inside Flink.
>
> One more question, why does this happen? Because TM and JM both check leadership
> at different intervals?

Yes, it's not deterministic how this happens. There will also be cases
when the JM notices before the TM.


Re: org.apache.zookeeper.ClientCnxn, Client session timed out

2017-12-27 Thread Ufuk Celebi
On Wed, Dec 27, 2017 at 4:41 PM, Hao Sun  wrote:

> Somehow TM detected JM leadership loss from ZK and self disconnected?
> And couple of seconds later, JM failed to connect to ZK?
>

Yes, exactly as you describe. The TM noticed the loss of leadership before
the JM did.


> After all the cluster recovered nicely by its own, but I am wondering does
> this break the exactly-once semantics? If yes, what should I take care?
>

Great :-) It does not break exactly-once guarantees *within* the Flink
pipeline as the state of the latest completed checkpoint will be restored
after recovery. This rewinds your job and might result in duplicate or
changed output if you don't use an exactly once or idempotent sink.
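
As a rough illustration of what "idempotent sink" means here, the following plain-Java sketch uses an in-memory map as a stand-in for an external store with upsert semantics. The class, the record IDs, and the values are hypothetical; a real sink would write to an external system.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for an external store with upsert semantics.
final class IdempotentSinkSketch {
    private final Map<String, Long> store = new HashMap<>();

    // Writing the same (id, value) pair twice leaves the store unchanged,
    // so records replayed after a restore do not create duplicates.
    void write(String recordId, long value) {
        store.put(recordId, value);
    }

    int size() {
        return store.size();
    }

    Long get(String recordId) {
        return store.get(recordId);
    }

    public static void main(String[] args) {
        IdempotentSinkSketch sink = new IdempotentSinkSketch();
        sink.write("order-1", 10L);
        sink.write("order-2", 20L);
        // Simulated replay after recovery from the latest completed checkpoint.
        sink.write("order-2", 20L);
        System.out.println(sink.size());
    }
}
```

An append-only sink would have stored the replayed record a second time; keying writes by a stable record ID is one common way to avoid that.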

– Ufuk


Re: MergingWindow

2017-12-27 Thread Ufuk Celebi
Please check your email before sending it the next time as three
emails for the same message is a little spammy ;-)

This is internal code that is used to implement session windows as far
as I can tell. The idea is to not merge the new window as it never had
any state associated with it. The general idea of merging windows is
to keep one of the original windows as the state window, i.e. the
window that is used as namespace to store the window elements.
Elements from the state windows of merged windows must be merged into
this one state window.
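
The condition from the quoted snippet can be illustrated with a simplified plain-Java sketch. This is not the MergingWindowSet implementation: the Window class and the helper methods are made up for illustration, and the sketch assumes that the already existing windows do not intersect each other.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Simplified sketch; this is NOT the MergingWindowSet implementation.
final class SessionMergeSketch {

    // A window [start, end]; stands in for a session window.
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        boolean intersects(Window other) {
            return start <= other.end && end >= other.start;
        }

        Window cover(Window other) {
            return new Window(Math.min(start, other.start), Math.max(end, other.end));
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Window && ((Window) o).start == start && ((Window) o).end == end;
        }

        @Override
        public int hashCode() {
            return Objects.hash(start, end);
        }
    }

    // Collects the windows that collapse into one when `added` arrives.
    // Assumes the existing windows are pairwise non-intersecting.
    static List<Window> mergedWindows(List<Window> existing, Window added) {
        List<Window> merged = new ArrayList<>();
        merged.add(added);
        for (Window w : existing) {
            if (w.intersects(added)) {
                merged.add(w);
            }
        }
        return merged;
    }

    // The smallest window covering all merged windows.
    static Window mergeResult(List<Window> merged) {
        Window result = merged.get(0);
        for (Window w : merged) {
            result = result.cover(w);
        }
        return result;
    }

    // Same condition as in the quoted snippet: skip the callback when the
    // "merge" consists only of the result window itself.
    static boolean needsMergeCallback(List<Window> merged, Window result) {
        return !(merged.contains(result) && merged.size() == 1);
    }

    public static void main(String[] args) {
        List<Window> existing = new ArrayList<>();
        existing.add(new Window(0, 10));

        // A new window that overlaps nothing: it "merges" only with itself.
        List<Window> m1 = mergedWindows(existing, new Window(20, 30));
        System.out.println(needsMergeCallback(m1, mergeResult(m1)));

        // A new window that overlaps [0, 10]: state has to be merged.
        List<Window> m2 = mergedWindows(existing, new Window(5, 15));
        System.out.println(needsMergeCallback(m2, mergeResult(m2)));
    }
}
```

In the first case nothing was actually merged, so there is no state to move; in the second case the elements of the merged windows have to end up in a single state window.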

For more details, this should be directed to the dev mailing list.

– Ufuk

On Tue, Dec 26, 2017 at 4:58 AM, aitozi  wrote:
> Hi,
>
> I can't understand the purpose of this snippet of code in
> MergingWindowSet.java. Can anyone explain it to me?
>
>
> if (!(mergedWindows.contains(mergeResult) && mergedWindows.size() == 1)) {
>     mergeFunction.merge(mergeResult,
>             mergedWindows,
>             this.mapping.get(mergeResult),
>             mergedStateWindows);
> }
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Apache Flink - broadcasting DataStream

2017-12-27 Thread Ufuk Celebi
Hey Mans!

This refers to how sub tasks are connected to each other in your
program. If you have a single sub task A1 and three sub tasks B1, B2,
B3, broadcast will emit each incoming record at A1 to all B1, B2, B3:

A1 --+-> B1
 +-> B2
 +-> B3

Does this help?

On Mon, Dec 25, 2017 at 7:12 PM, M Singh  wrote:
> 1. What elements get broadcast to the partitions?

Each incoming element is broadcasted.

> 2. What happens as new elements are added to the stream ? Are only the new
> elements broadcast ?

Yes, each incoming element is broadcasted separately without any history.

> 3. Since the broadcast operation returns a DataStream can it be used in join
> how do new (and old) elements affect the join results ?

Yes, should be. Every element is broadcasted only once.

> 4. Similarly how does broadcast work with connected streams ?

Similar to non connected streams. The incoming records are emitted to
every downstream partition.
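
The broadcast pattern described above (one upstream sub task, three downstream sub tasks) can be sketched in plain Java. This only models the record distribution; the class and method names are made up and nothing here uses the Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Models only the record distribution of a broadcast; not Flink code.
final class BroadcastSketch {

    // Emits every incoming record to every downstream sub task and returns
    // what each sub task received, in arrival order.
    static <T> List<List<T>> broadcast(List<T> records, int downstreamSubtasks) {
        List<List<T>> received = new ArrayList<>();
        for (int i = 0; i < downstreamSubtasks; i++) {
            received.add(new ArrayList<>());
        }
        for (T record : records) {
            for (List<T> subtask : received) {
                subtask.add(record); // each record is emitted once per sub task
            }
        }
        return received;
    }

    public static void main(String[] args) {
        // A1 emits "x" and "y"; B1, B2 and B3 each receive both records.
        System.out.println(broadcast(Arrays.asList("x", "y"), 3));
    }
}
```

Records that arrive later are distributed the same way; nothing that was emitted earlier is sent again.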

– Ufuk


Re: flink yarn-cluster run job --files

2017-12-27 Thread Ufuk Celebi
The file URL needs to be accessible from all nodes, e.g. something
like S3://... or hdfs://...

From the CLI:

```
Adds a URL to each user code classloader on all nodes in the cluster.
The paths must specify a protocol (e.g. file://) and be accessible on
all nodes (e.g. by means of a NFS share). You can use this option
multiple times for specifying more than one URL. The protocol must be
supported by the {@link java.net.URLClassLoader}.
```

Is this the case?

I don't know whether this would work to access any file you provide though...
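
A quick way to check the "must specify a protocol" part before submitting is sketched below in plain Java. The helper name and the sample paths are made up, and the check says nothing about whether the path is actually reachable from all nodes.

```java
import java.net.URI;

// Hypothetical pre-submission check; it only looks for a protocol (scheme).
final class ClasspathUrlCheckSketch {

    // Returns true if the given string parses as a URI with a scheme
    // such as file:// or hdfs://.
    static boolean hasProtocol(String url) {
        try {
            return URI.create(url).getScheme() != null;
        } catch (IllegalArgumentException e) {
            return false; // not a syntactically valid URI at all
        }
    }

    public static void main(String[] args) {
        System.out.println(hasProtocol("file:///tmp/config.json"));
        System.out.println(hasProtocol("/tmp/config.json"));
    }
}
```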



On Mon, Dec 25, 2017 at 2:13 PM,   wrote:
> Hi all,
>
> In Spark, job submission supports --files, which means "Comma-separated list
> of files to be placed in the working directory of each executor."
>
> Is there an equivalent in Flink? I used --classpath file:///, but the job
> failed with an error because the file was not there.
>


Re: Fetching TaskManager log failed

2017-12-27 Thread Ufuk Celebi
Thanks for reporting this issue. A few questions:

- Which version of Flink are you using?
- Does it work up to the point that the Exception is thrown? e.g. for
smaller files it's OK?

Let me pull in Chesnay (cc'd) who has worked on log fetching for the
web runtime.

– Ufuk


On Tue, Dec 26, 2017 at 8:50 AM,   wrote:
> I run a flink job,when run one hour,there have a error:
>
>  ERROR org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler  -
> Fetching TaskManager log failed.
> java.util.NoSuchElementException: None.get
>


Re: How to stop FlinkKafkaConsumer and make job finished?

2017-12-27 Thread Ufuk Celebi
Hey Jaxon,

I don't think it's possible to control this via the life-cycle methods
of your functions.

Note that Flink currently does not support graceful stop in a
meaningful manner and you can only cancel running jobs. What comes to
my mind to cancel on EOF:

1) Extend Kafka consumer to stop emitting records after your EOF
record. Look at the flink-connector-kafka-base module. This is
probably not feasible and some work to get familiar with the code.
Just putting it out there.

2) Throw a "SuccessException" that fails the job. Easy, but not nice.

3) Use an Http client and cancel your job via the Http endpoint
(https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/rest_api.html#job-cancellation).
Easy, but not nice, since you need quite some logic in your function
(e.g. ignore records after EOF record until cancellation, etc.).
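
Option 2 can be sketched in plain Java as follows. The SuccessException class, the "EOF" marker value, and the driver logic are all assumptions made for illustration; the sketch only mimics how a runtime might wrap the user exception and does not use the Flink API.

```java
import java.util.Arrays;

// Plain-Java sketch of the "fail the job on purpose" idea; not Flink code.
final class EofCancelSketch {

    // Hypothetical marker exception that only signals "input is complete".
    static final class SuccessException extends RuntimeException {
        SuccessException() {
            super("EOF record seen");
        }
    }

    static final String EOF = "EOF"; // assumed end-of-input marker

    // Stand-in for a user function: counts records until the EOF marker.
    static int process(Iterable<String> records) {
        int processed = 0;
        for (String record : records) {
            if (EOF.equals(record)) {
                throw new SuccessException();
            }
            processed++;
        }
        return processed;
    }

    // Walks the cause chain, because the exception that reaches the caller
    // is typically a wrapper around the one thrown in user code.
    static boolean endedSuccessfully(Throwable t) {
        for (Throwable cause = t; cause != null; cause = cause.getCause()) {
            if (cause instanceof SuccessException) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        try {
            try {
                process(Arrays.asList("a", "b", EOF, "ignored"));
            } catch (RuntimeException e) {
                throw new RuntimeException("Job failed", e); // simulated wrapping
            }
        } catch (RuntimeException e) {
            System.out.println(endedSuccessfully(e));
        }
    }
}
```

The caller then has to tell an intentional failure apart from a real one, which is part of why this approach is easy but not nice.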

Maybe Aljoscha (cc'd) has an idea how to do this in a better way.

– Ufuk


On Mon, Dec 25, 2017 at 8:59 AM, Jaxon Hu  wrote:
> I would like to manually stop FlinkKafkaConsumer from consuming data from Kafka.
> But I find it won't close when I invoke the "cancel()" method. What I am
> trying to do is add an EOF symbol marking the end of my Kafka data, and when
> the FlatMap operator reads the symbol it invokes the FlinkKafkaConsumer
> "cancel()" method. It doesn't work. A Flink streaming job won't finish unless
> it gets canceled or fails when I use Kafka as the source.
>
> If somebody knows how to do this, please give me some help, thx~~


Re: Flink network access control documentation

2017-12-27 Thread Ufuk Celebi
Hey Elias,

thanks for opening a ticket (for reference:
https://issues.apache.org/jira/browse/FLINK-8311). I fully agree with
adding docs for this. I will try to write something down this week.

Your point about JobManagers only coordinating via ZK is correct
though. I had a look into the JobManager code (as of 1.4) and the
leader election service only reads and writes leader information into
ZK which is then picked up by the TaskManagers.

What you are seeing here is related to the web UI which is attached to
every JM. The UI tries to connect to the leading JM in order to access
runtime information of the leading JM. This is not documented anywhere
as far as I can tell and might have changed between 1.3 and 1.4. The
port should not be critical to the functioning of your Flink cluster,
but only for accessing the web UI on a non-leading JM.

– Ufuk


On Fri, Dec 22, 2017 at 8:36 PM, Elias Levy  wrote:
> There is a need for better documentation on what connects to what over which
> ports in a Flink cluster to allow users to configure network access control
> rules.
>
> I was under the impression that in a ZK HA configuration the Job Managers
> were essentially independent and only coordinated via ZK.  But starting
> multiple JMs in HA with the JM RPC port blocked between JMs shows that the
> second JM's Akka subsystem is trying to connect to the leading JM:
>
> INFO  akka.remote.transport.ProtocolStateActor  - No
> response from remote for outbound association. Associate timed out after
> [2 ms].
> WARN  akka.remote.ReliableDeliverySupervisor-
> Association with remote system [akka.tcp://flink@10.210.210.127:6123] has
> failed, address is now gated for [5000] ms. Reason: [Association failed with
> [akka.tcp://flink@10.210.210.127:6123]] Caused by: [No response from remote
> for outbound association. Associate timed out after [2 ms].]
> WARN  akka.remote.transport.netty.NettyTransport- Remote
> connection to [null] failed with
> org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException:
> connection timed out: /10.210.210.127:6123
>


Re: state.checkpoints.dir not configured

2017-12-21 Thread Ufuk Celebi
Could you please share the complete logs of the initial failure? What
you describe in your second email should not happen ;-) If the
JobManager cannot bind to the port it should simply die and not
complain about checkpoint configuration.

– Ufuk

On Thu, Dec 21, 2017 at 1:21 PM, Plamen Paskov
<plamen.pas...@next-stream.com> wrote:
> I inspected the log as you suggested and found that port 6123 was used by
> another process. I freed the port and restarted the job manager. Now
> everything looks fine. The error message is a little misleading, as the real
> cause is that 6123 is already bound, but it says that state.checkpoints.dir is
> not set.
>
> Thanks
>
>
>
> On 19.12.2017 17:55, Ufuk Celebi wrote:
>>
>> When the JobManager/TaskManager are starting up they log what config
>> they are loading. Look for lines like
>>
>> "Loading configuration property: {}, {}"
>>
>> Do you find the required configuration as part of these messages?
>>
>> – Ufuk
>>
>>
>> On Tue, Dec 19, 2017 at 3:45 PM, Plamen Paskov
>> <plamen.pas...@next-stream.com> wrote:
>>>
>>> Hi,
>>> I'm trying to enable externalized checkpoints like this:
>>>
>>> env.enableCheckpointing(1000);
>>> CheckpointConfig checkpointConfig = env.getCheckpointConfig();
>>>
>>> checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>> checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>> env.setStateBackend(new
>>> FsStateBackend("file:///tmp/flink-checkpoints-data/", true));
>>>
>>> in flink-conf.yaml i set:
>>> state.checkpoints.dir: file:///tmp/flink-checkpoints-meta/
>>>
>>> but when i run the application i get this error:
>>> java.lang.IllegalStateException: CheckpointConfig says to persist
>>> periodic
>>> checkpoints, but no checkpoint directory has been configured. You can
>>> configure configure one via key 'state.checkpoints.dir'.
>>>
>>> Any suggestions?
>>>
>>> Thanks
>
>


Re: Add new slave to running cluster?

2017-12-20 Thread Ufuk Celebi
Hey Jinhua,

- The `slaves` file is only relevant for the startup scripts. You can
add as many task managers as you like by starting them manually.
- You can check the logs of the JobManager or its web UI
(jobmanager-host:8081) to see how many TMs have registered.
- The registration time out looks more like a misconfiguration to me.
Please verify that all task managers have the same configuration. If
the configuration looks good to you, it might be a network issue.

Does this help?

– Ufuk


On Wed, Dec 20, 2017 at 8:22 AM, Jinhua Luo  wrote:
> Hi All,
>
> If I add a new slave (start a taskmanager on a new host) which is not
> included in conf/slaves, I see the logs below continuously printed:
> ...Trying to register at JobManager...(attempt 147,
>  timeout: 3 milliseconds)
>
> Is this normal? And was the new slave successfully added to the cluster?


Re: state.checkpoints.dir not configured

2017-12-19 Thread Ufuk Celebi
When the JobManager/TaskManager are starting up they log what config
they are loading. Look for lines like

"Loading configuration property: {}, {}"

Do you find the required configuration as part of these messages?
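
If the logs are long, a small helper can do the search. The sketch below is plain Java and assumes the "Loading configuration property: key, value" format quoted above; the class name and the sample log lines in main are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical helper for scanning startup logs; not part of Flink.
final class ConfigLogScanSketch {
    static final String PREFIX = "Loading configuration property: ";

    // Returns the value that was logged for `key`, if such a line exists.
    static Optional<String> loadedValue(List<String> logLines, String key) {
        for (String line : logLines) {
            int idx = line.indexOf(PREFIX);
            if (idx < 0) {
                continue; // not a configuration line
            }
            String rest = line.substring(idx + PREFIX.length());
            int comma = rest.indexOf(", ");
            if (comma > 0 && rest.substring(0, comma).equals(key)) {
                return Optional.of(rest.substring(comma + 2));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Made-up sample lines in the assumed format.
        List<String> lines = Arrays.asList(
                "INFO  Loading configuration property: jobmanager.rpc.port, 6123",
                "INFO  Loading configuration property: state.checkpoints.dir, file:///tmp/flink-checkpoints-meta/");
        System.out.println(loadedValue(lines, "state.checkpoints.dir"));
    }
}
```

An empty result for state.checkpoints.dir would suggest that the process is reading a different flink-conf.yaml than the one that was edited.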

– Ufuk


On Tue, Dec 19, 2017 at 3:45 PM, Plamen Paskov
 wrote:
> Hi,
> I'm trying to enable externalized checkpoints like this:
>
> env.enableCheckpointing(1000);
> CheckpointConfig checkpointConfig = env.getCheckpointConfig();
> checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> env.setStateBackend(new
> FsStateBackend("file:///tmp/flink-checkpoints-data/", true));
>
> in flink-conf.yaml i set:
> state.checkpoints.dir: file:///tmp/flink-checkpoints-meta/
>
> but when i run the application i get this error:
> java.lang.IllegalStateException: CheckpointConfig says to persist periodic
> checkpoints, but no checkpoint directory has been configured. You can
> configure configure one via key 'state.checkpoints.dir'.
>
> Any suggestions?
>
> Thanks


Re: consecutive stream aggregations

2017-12-18 Thread Ufuk Celebi
You can do this by first doing a keyBy userId and then emitting the
value you want to average (session length). The output of this you
feed into the aggregateFunction that does a grouping by time and emits
the average per time.

input.keyBy(user).flatMap(extractSessionLength()).timeWindowAll(time).aggregate(averageAggregate())

timeWindowAll is a non-parallel operator (it runs with a parallelism of 1), but
that is fine as long as you don't have huge throughput requirements.
If that becomes a problem, we would have to think about
pre-aggregating in parallel.
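
The two stages can be sketched in plain Java, ignoring time windows and using the sample rows from the original question (userId, sessionId, length, timestamp). The class and method names are made up, and this is not Flink API code; it only shows the data flow.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java sketch of the two stages; time windows are left out.
final class TwoStageAverageSketch {

    // Stage 1 (the keyed part): keep the largest length per user session.
    static Map<String, Long> sessionLengths(String[] rows) {
        Map<String, Long> maxPerSession = new LinkedHashMap<>();
        for (String row : rows) {
            String[] fields = row.split(",");
            String key = fields[0] + "/" + fields[1]; // userId/sessionId
            maxPerSession.merge(key, Long.parseLong(fields[2]), Long::max);
        }
        return maxPerSession;
    }

    // Stage 2 (the non-parallel part): average over all session lengths.
    static double average(Iterable<Long> lengths) {
        long sum = 0;
        long count = 0;
        for (long length : lengths) {
            sum += length;
            count++;
        }
        return count == 0 ? 0.0 : (double) sum / count;
    }

    public static void main(String[] args) {
        String[] rows = {
            "1,s1,100,2017-12-13 11:58:01",
            "1,s1,150,2017-12-13 11:58:02",
            "1,s1,160,2017-12-13 11:58:03",
            "2,s1,100,2017-12-13 11:58:04"
        };
        System.out.println(average(sessionLengths(rows).values()));
    }
}
```

Only one value per session reaches the second stage, so the non-parallel step sees far fewer records than the raw input.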

Does this help?

– Ufuk


On Fri, Dec 15, 2017 at 4:56 PM, Plamen Paskov
<plamen.pas...@next-stream.com> wrote:
> In my case i have a lot of users with one session per user. What i'm
> thinking is to evenly distribute the users then accumulate and finally merge
> all accumulators. The problem is that i don't know how to achieve this.
>
>
>
> On 15.12.2017 17:52, Ufuk Celebi wrote:
>>
>> You can first aggregate the length per user and emit it downstream.
>> Then you do the all window and average all lengths. Does that make
>> sense?
>>
>> On Fri, Dec 15, 2017 at 4:48 PM, Plamen Paskov
>> <plamen.pas...@next-stream.com> wrote:
>>>
>>> I think i got your point.
>>> What happens now: in order to use aggregate() i need an window but the
>>> window requires keyBy() if i want to parallelize the data. In my case it
>>> will not work because if i create keyBy("userId") then the average
>>> will be calculated per userId  but i want average across all users. What
>>> would be the solution in this case?
>>>
>>> Thanks
>>>
>>>
>>> On 15.12.2017 15:46, Ufuk Celebi wrote:
>>>>
>>>> Hey Plamen,
>>>>
>>>> I think what you are looking for is the AggregateFunction. This you
>>>> can use on keyed streams. The Javadoc [1] contains an example for your
>>>> use case (averaging).
>>>>
>>>> – Ufuk
>>>>
>>>> [1]
>>>>
>>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
>>>>
>>>> On Fri, Dec 15, 2017 at 11:55 AM, Plamen Paskov
>>>> <plamen.pas...@next-stream.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I'm trying to calculate the running average of session length and i
>>>>> want
>>>>> to
>>>>> trigger the computation on a regular let's say 2 minutes interval. I'm
>>>>> trying to do it like this:
>>>>>
>>>>> package flink;
>>>>>
>>>>> import lombok.AllArgsConstructor;
>>>>> import lombok.NoArgsConstructor;
>>>>> import lombok.ToString;
>>>>> import org.apache.flink.api.common.functions.MapFunction;
>>>>> import org.apache.flink.streaming.api.TimeCharacteristic;
>>>>> import
>>>>> org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>>>>> import
>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>> import
>>>>>
>>>>>
>>>>> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
>>>>> import
>>>>> org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
>>>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>>>>> import org.apache.flink.util.Collector;
>>>>>
>>>>> import java.sql.Timestamp;
>>>>> import java.time.Instant;
>>>>> import java.time.LocalDateTime;
>>>>> import java.util.TimeZone;
>>>>>
>>>>>
>>>>> public class StreamingJob {
>>>>>   public static void main(String[] args) throws Exception {
>>>>>   StreamExecutionEnvironment env =
>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>
>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>>
>>>>>   SingleOutputStreamOperator sessions = env
>>>>>   .socketTextStream("localhost", 9000, "\n")
>>>>>   .map(new MapFunction<String, Event>() {
>>>>>   @Override
>>>>> 

Re: consecutive stream aggregations

2017-12-15 Thread Ufuk Celebi
You can first aggregate the length per user and emit it downstream.
Then you do the all window and average all lengths. Does that make
sense?

On Fri, Dec 15, 2017 at 4:48 PM, Plamen Paskov
<plamen.pas...@next-stream.com> wrote:
> I think i got your point.
> What happens now: in order to use aggregate() i need an window but the
> window requires keyBy() if i want to parallelize the data. In my case it
> will not work because if i create keyBy("userId") then the average
> will be calculated per userId  but i want average across all users. What
> would be the solution in this case?
>
> Thanks
>
>
> On 15.12.2017 15:46, Ufuk Celebi wrote:
>>
>> Hey Plamen,
>>
>> I think what you are looking for is the AggregateFunction. This you
>> can use on keyed streams. The Javadoc [1] contains an example for your
>> use case (averaging).
>>
>> – Ufuk
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
>>
>> On Fri, Dec 15, 2017 at 11:55 AM, Plamen Paskov
>> <plamen.pas...@next-stream.com> wrote:
>>>
>>> Hi,
>>>
>>> I'm trying to calculate the running average of session length and i want
>>> to
>>> trigger the computation on a regular let's say 2 minutes interval. I'm
>>> trying to do it like this:
>>>
>>> package flink;
>>>
>>> import lombok.AllArgsConstructor;
>>> import lombok.NoArgsConstructor;
>>> import lombok.ToString;
>>> import org.apache.flink.api.common.functions.MapFunction;
>>> import org.apache.flink.streaming.api.TimeCharacteristic;
>>> import
>>> org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>>> import
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import
>>>
>>> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
>>> import
>>> org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>>> import org.apache.flink.util.Collector;
>>>
>>> import java.sql.Timestamp;
>>> import java.time.Instant;
>>> import java.time.LocalDateTime;
>>> import java.util.TimeZone;
>>>
>>>
>>> public class StreamingJob {
>>>  public static void main(String[] args) throws Exception {
>>>  StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>
>>>  SingleOutputStreamOperator sessions = env
>>>  .socketTextStream("localhost", 9000, "\n")
>>>  .map(new MapFunction<String, Event>() {
>>>  @Override
>>>  public Event map(String value) throws Exception {
>>>  String[] row = value.split(",");
>>>  return new Event(Long.valueOf(row[0]), row[1],
>>> Long.valueOf(row[2]), Timestamp.valueOf(row[3]).getTime());
>>>  }
>>>  })
>>>  .assignTimestampsAndWatermarks(new
>>> BoundedOutOfOrdernessTimestampExtractor(Time.seconds(10)) {
>>>  @Override
>>>  public long extractTimestamp(Event element) {
>>>  return element.timestamp;
>>>  }
>>>  })
>>>  .keyBy("userId", "sessionId")
>>>  .maxBy("length");
>>>
>>>
>>>  sessions
>>>  .timeWindowAll(Time.seconds(60), Time.seconds(30))
>>>  .apply(new AllWindowFunction<Event, Avg, TimeWindow>() {
>>>  @Override
>>>  public void apply(TimeWindow window, Iterable
>>> values, Collector out) throws Exception {
>>>  long sum = 0;
>>>  int count = 0;
>>>
>>>  for (Event event : values) {
>>>  sum += event.length;
>>>  count++;
>>>  }
>>>
>>>  

Re: consecutive stream aggregations

2017-12-15 Thread Ufuk Celebi
Hey Plamen,

I think what you are looking for is the AggregateFunction. This you
can use on keyed streams. The Javadoc [1] contains an example for your
use case (averaging).
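
For readers without the Javadoc at hand, the idea behind an averaging aggregate is sketched below in plain Java. The method names mirror those of AggregateFunction, but the class deliberately does not implement the Flink interface (exact signatures may differ between Flink versions), and the accumulator layout is an assumption of this sketch.

```java
// Plain-Java sketch of an averaging accumulator; it does not implement
// Flink's AggregateFunction interface.
final class AverageAccumulatorSketch {

    // Accumulator layout: [0] = sum of values, [1] = number of values.
    static long[] createAccumulator() {
        return new long[] {0L, 0L};
    }

    // Folds one value into the accumulator.
    static long[] add(long value, long[] acc) {
        acc[0] += value;
        acc[1] += 1;
        return acc;
    }

    // Combines two partial accumulators, e.g. from merged windows.
    static long[] merge(long[] a, long[] b) {
        return new long[] {a[0] + b[0], a[1] + b[1]};
    }

    // Produces the final average; floating-point division on purpose.
    static double getResult(long[] acc) {
        return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
    }

    public static void main(String[] args) {
        long[] acc = createAccumulator();
        add(160L, acc);
        add(100L, acc);
        System.out.println(getResult(acc));
    }
}
```

Because only the sum and the count are kept, the window does not need to buffer all elements before computing the average.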

– Ufuk

[1] 
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java

On Fri, Dec 15, 2017 at 11:55 AM, Plamen Paskov
 wrote:
> Hi,
>
> I'm trying to calculate the running average of session length and i want to
> trigger the computation on a regular let's say 2 minutes interval. I'm
> trying to do it like this:
>
> package flink;
>
> import lombok.AllArgsConstructor;
> import lombok.NoArgsConstructor;
> import lombok.ToString;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import
> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
> import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.util.Collector;
>
> import java.sql.Timestamp;
> import java.time.Instant;
> import java.time.LocalDateTime;
> import java.util.TimeZone;
>
>
> public class StreamingJob {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>         SingleOutputStreamOperator<Event> sessions = env
>                 .socketTextStream("localhost", 9000, "\n")
>                 .map(new MapFunction<String, Event>() {
>                     @Override
>                     public Event map(String value) throws Exception {
>                         String[] row = value.split(",");
>                         return new Event(Long.valueOf(row[0]), row[1],
>                                 Long.valueOf(row[2]), Timestamp.valueOf(row[3]).getTime());
>                     }
>                 })
>                 .assignTimestampsAndWatermarks(
>                         new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
>                             @Override
>                             public long extractTimestamp(Event element) {
>                                 return element.timestamp;
>                             }
>                         })
>                 .keyBy("userId", "sessionId")
>                 .maxBy("length");
>
>
>         sessions
>                 .timeWindowAll(Time.seconds(60), Time.seconds(30))
>                 .apply(new AllWindowFunction<Event, Avg, TimeWindow>() {
>                     @Override
>                     public void apply(TimeWindow window, Iterable<Event> values,
>                             Collector<Avg> out) throws Exception {
>                         long sum = 0;
>                         int count = 0;
>
>                         for (Event event : values) {
>                             sum += event.length;
>                             count++;
>                         }
>
>                         // Cast first, otherwise this is an integer division.
>                         double avg = (double) sum / count;
>                         LocalDateTime windowStart =
>                                 LocalDateTime.ofInstant(Instant.ofEpochMilli(window.getStart()),
>                                         TimeZone.getDefault().toZoneId());
>                         LocalDateTime windowEnd =
>                                 LocalDateTime.ofInstant(Instant.ofEpochMilli(window.getEnd()),
>                                         TimeZone.getDefault().toZoneId());
>                         out.collect(new Avg(avg, windowStart.toString(),
>                                 windowEnd.toString()));
>                     }
>                 });
>
>         env.execute();
>     }
>
>     @AllArgsConstructor
>     @NoArgsConstructor
>     @ToString
>     public static class Avg {
>         public double length;
>         public String windowStart;
>         public String windowEnd;
>     }
>
>     @AllArgsConstructor
>     @NoArgsConstructor
>     @ToString
>     public static class Event {
>         public long userId;
>         public String sessionId;
>         public long length;
>         public long timestamp;
>     }
> }
>
> First i want to extract the last session event for every user-session
> because it contains the total session length. Then i want to calculate the
> average session length based on the data from
> previous operation (based on the sessions variable).
>
> Example:
>
> 1,s1,100,2017-12-13 11:58:01
> 1,s1,150,2017-12-13 11:58:02
> 1,s1,160,2017-12-13 11:58:03
> 2,s1,100,2017-12-13 11:58:04
>
> sessions variable should contain those rows:
> 1,s1,160,2017-12-13 11:58:03
> 2,s1,100,2017-12-13 11:58:04
>
> but it's returning the max length row only for the corresponding event.
>
> Questions:
> - how to collect the data for all groups in sessions variable?
> - is there another way to achieve this functionality because with my
> implementation the average will be 

Re: Flink 1.4.0 keytab is unreadable

2017-12-15 Thread Ufuk Celebi
Hey 杨光,

thanks for looking into this in such a detail. Unfortunately, I'm not
sure what the expected behaviour is (whether the change in behaviour
was accidental or on purpose).

Let me pull in Gordon who has worked quite a bit on the Kerberos
related components in Flink.

@Gordon:
1) Do you know what the expected behaviour is here?
2) How can he work around this issue in 1.4?

– Ufuk

On Fri, Dec 15, 2017 at 11:34 AM, 杨光  wrote:
> Hi,
> I am using Flink single-job mode on YARN to read data from a Kafka
> cluster installation configured for Kerberos. When I upgrade Flink to
> 1.4.0, the YARN application cannot run normally and logs an error
> like this:
>
> Exception in thread "main" java.lang.RuntimeException:
> org.apache.flink.configuration.IllegalConfigurationException: Kerberos
> login configuration is invalid; keytab is unreadable
> at 
> org.apache.flink.yarn.YarnTaskManagerRunner.runYarnTaskManager(YarnTaskManagerRunner.java:160)
> at 
> org.apache.flink.yarn.YarnTaskManager$.main(YarnTaskManager.scala:65)
> at org.apache.flink.yarn.YarnTaskManager.main(YarnTaskManager.scala)
> Caused by: org.apache.flink.configuration.IllegalConfigurationException:
> Kerberos login configuration is invalid; keytab is unreadable
> at 
> org.apache.flink.runtime.security.SecurityConfiguration.validate(SecurityConfiguration.java:139)
> at 
> org.apache.flink.runtime.security.SecurityConfiguration.<init>(SecurityConfiguration.java:90)
> at
> org.apache.flink.runtime.security.SecurityConfiguration.<init>(SecurityConfiguration.java:71)
> at
> org.apache.flink.yarn.YarnTaskManagerRunner.runYarnTaskManager(YarnTaskManagerRunner.java:139)
>
>
> So i add some logs for the method "SecurityConfiguration.validate()"
> and rebuild the flink  package.
>
> private void validate() {
>if (!StringUtils.isBlank(keytab)) {
>   // principal is required
>   if (StringUtils.isBlank(principal)) {
>  throw new IllegalConfigurationException("Kerberos login
> configuration is invalid; keytab requires a principal.");
>   }
>
>   // check the keytab is readable
>   File keytabFile = new File(keytab);
>
>   if (!keytabFile.exists()) {
>  throw new IllegalConfigurationException("WTF! keytabFile is
> not exist ! keytab:" + keytab);
>   }
>
>   if (!keytabFile.isFile()) {
>  throw new IllegalConfigurationException("WTF! keytabFile is
> not file ! keytab:" + keytab);
>   }
>
>   if (!keytabFile.canRead()) {
>  throw new IllegalConfigurationException("WTF! keytabFile is
> not readalbe ! keytab:" + keytab);
>   }
>
>   if (!keytabFile.exists() || !keytabFile.isFile() ||
> !keytabFile.canRead()) {
>  throw new IllegalConfigurationException("Kerberos login
> configuration is invalid; keytab is unreadable");
>   }
>}
> }
>
> After that, the YARN logs show an error like this:
> 2017-12-15 17:14:36,314 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner   -
> localKeytabPath:
> /data1/yarn/nm/usercache/hadoop/appcache/application_1513310528578_0009/container_e05_1513310528578_0009_01_02/krb5.keytab
> 2017-12-15 17:14:36,315 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner   - YARN
> daemon is running as: hadoop Yarn client user obtainer: hadoop
> 2017-12-15 17:14:36,315 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner   -
> ResourceID assigned for this container:
> container_e05_1513310528578_0009_01_02
> 2017-12-15 17:14:36,321 ERROR
> org.apache.flink.yarn.YarnTaskManagerRunner   -
> Exception occurred while launching Task Manager
> org.apache.flink.configuration.IllegalConfigurationException: WTF!
> keytabFile is not exist !
> keytab:/data1/yarn/nm/usercache/hadoop/appcache/application_1513310528578_0009/container_e05_1513310528578_0009_01_01/krb5.keytab
> at 
> org.apache.flink.runtime.security.SecurityConfiguration.validate(SecurityConfiguration.java:140)
> at 
> org.apache.flink.runtime.security.SecurityConfiguration.<init>(SecurityConfiguration.java:90)
> at
> org.apache.flink.runtime.security.SecurityConfiguration.<init>(SecurityConfiguration.java:71)
> at 
> org.apache.flink.yarn.YarnTaskManagerRunner.runYarnTaskManager(YarnTaskManagerRunner.java:139)
> at org.apache.flink.yarn.YarnTaskManager$.main(YarnTaskManager.scala:65)
> at org.apache.flink.yarn.YarnTaskManager.main(YarnTaskManager.scala)
>
>
> These logs show that the "keytabFile" value is different from the
> "localKeytabPath". I searched the
> "org.apache.flink.yarn.YarnTaskManagerRunner" class source code and
> found that there are some differences between 1.3.2 and 1.4.0:
>
> 1.3.2
>
> //To support Yarn Secure Integration Test Scenario
> File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
>
> if (krb5Conf.exists() && krb5Conf.canRead()) {
>String krb5Path = krb5Conf.getAbsolutePath();
>LOG.info("KRB5 Conf: {}", krb5Path);
>hadoopConfiguration 

Re: Python API not working

2017-12-15 Thread Ufuk Celebi
Hey Yassine,

let me include Chesnay (cc'd) who worked on the Python API.

I'm not familiar with the API and what it expects, but try entering
`streaming` or `batch` for the mode. Chesnay probably has the details.

– Ufuk


On Fri, Dec 15, 2017 at 11:05 AM, Yassine MARZOUGUI
 wrote:
> Hi All,
>
> I'm trying to use Flink with the Python API, and started with the wordcount
> example from the documentation. I'm using Flink 1.4 and Python 2.7.
> When running env.execute(local=True), the command doesn't execute and keeps
> waiting for input. If I hit enter again I get the following error from
> Environment.py : ValueError("Invalid mode specified: " + mode)
> Looking at the source code, it looks like there are a bunch of
> sys.stdin.readline().rstrip('\n') where an input is expected from the user.
> Any idea how to run the job? Thank you.
>
> Best,
> Yassine
>


Re: docker-flink images and CI

2017-12-15 Thread Ufuk Celebi
I agree with Patrick's (cc'd) comment from the linked issue. What I
understand from the linked issue is that Patrick will take care of the
Docker image update for 1.4 manually. Is that ok with you Patrick? :-)

Regarding the Flink release process question: I fully agree with the
idea to integrate this into the release process. As it stands the
Flink release process is pretty manual though [1]... Let me include
the release manager of the 1.4 release Aljoscha (cc'd) to get his
opinion on the issue.

In any case, I'm in favour of this. +1

– Ufuk

[1] https://cwiki.apache.org/confluence/display/FLINK/Creating+a+Flink+Release

On Fri, Dec 15, 2017 at 2:31 AM, Colin Williams
 wrote:
> I created the following issue on here:
> https://github.com/docker-flink/docker-flink/issues/29 where it was
> suggested I should bring this up on the list.
>
> 1.4 has been released. Hurray!
>
> But there isn't yet a dockerfile / image for the release. Furthermore, it
> would be nice to have dockerfile / images for each RC release, so people can
> incrementally test features using docker before release.
>
> Is it possible to use CI to generate images and add Dockerfiles for each
> release?


Re: Flink State monitoring

2017-12-15 Thread Ufuk Celebi
Hey Liron,

unfortunately, there are no built-in metrics related to state. In
general, exposing the actual values as metrics is problematic, but
exposing summary statistics would be a good idea. I'm not aware of a
good work around at the moment that would
work in the general case (taking into account state restore, etc.).

Let me pull in Aljoscha (cc'd) who knows the state backend internals well.

@Aljoscha:
1) Are there any plans to expose keyed state related metrics (like
number of keys)?
2) Is there a way to work around the lack of these metrics in 1.3?

– Ufuk

On Thu, Dec 14, 2017 at 10:55 AM, Netzer, Liron  wrote:
> Hi group,
>
>
>
> We are using Flink keyed state in several operators.
>
> Is there an easy way to expose the data that is stored in the state, i.e.
> the key and the values?
>
> This is needed for both monitoring as well as debugging. We would like to
> understand how many key+values are stored in each state and also to view the
> data itself.
>
> I know that there is the "Queryable state" option, but this is still in
> Beta, and doesn't really give us what we want easily.
>
>
>
>
>
> *We are using Flink 1.3.2 with Java.
>
>
>
> Thanks,
>
> Liron


Re: S3 Access in eu-central-1

2017-11-29 Thread Ufuk Celebi
Hey Dominik,

yes, we should definitely add this to the docs.

@Nico: You recently updated the Flink S3 setup docs. Would you mind
adding these hints for eu-central-1 from Steve? I think that would be
super helpful!
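
For reference, a minimal sketch of what such a hint could look like. It
assumes that the option Dominik refers to is the AWS SDK system property
`com.amazonaws.services.s3.enableV4` from the linked Stack Overflow answer;
the thread itself does not spell the option out. The class and method names
are made up for illustration.

```java
/** Illustrative only: switches the AWS SDK for Java (1.x) to Signature Version 4 for S3. */
final class S3SigV4Sketch {

    // Assumption: this is the property name meant in the thread (taken from the SO answer).
    static final String ENABLE_V4 = "com.amazonaws.services.s3.enableV4";

    /** Must run before the first S3 client is created in this JVM. */
    static void enableSigV4() {
        System.setProperty(ENABLE_V4, "true");
    }

    public static void main(String[] args) {
        enableSigV4();
        System.out.println(ENABLE_V4 + "=" + System.getProperty(ENABLE_V4));
    }
}
```

The same property can presumably be passed as a JVM flag,
`-Dcom.amazonaws.services.s3.enableV4=true`, for example via `env.java.opts`
in flink-conf.yaml, so that the JobManager and TaskManager JVMs both pick it
up.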

Best,

Ufuk

On Tue, Nov 28, 2017 at 10:00 PM, Dominik Bruhn  wrote:
> Hey Stephan, Hey Steve,
> that was the right hint, adding that option to the Java-Options fixed the
> problem. Maybe we should add this somehow to our Flink Wiki?
>
> Thanks!
>
> Dominik
>
> On 28/11/17 11:55, Stephan Ewen wrote:
>>
>> Got a pointer from Steve that this is answered on Stack Overflow here:
>> https://stackoverflow.com/questions/36154484/aws-java-sdk-manually-set-signature-version
>> 
>>
>> Flink 1.4 contains a specially bundled "fs-s3-hadoop" with a smaller
>> footprint, compatible across Hadoop versions, and based on a later s3a and
>> AWS sdk. In that connector, it should work out of the box because it uses a
>> later AWS SDK. You can also use it with earlier Hadoop versions because
>> dependencies are relocated, so it should not clash/conflict.
>>
>>
>>
>>
>> On Mon, Nov 27, 2017 at 8:58 PM, Stephan Ewen wrote:
>>
>> Hi!
>>
>> The endpoint config entry looks correct.
>> I was looking at this issue to see if there are pointers to anything
>> else, but it looks like the explicit endpoint entry is the most
>> important thing: https://issues.apache.org/jira/browse/HADOOP-13324
>> 
>>
>> I cc-ed Steve Loughran, who is Hadoop's S3 expert (sorry Steve for
>> pulling you in again - listening and learning still about the subtle
>> bits and pieces of S3).
>> @Steve are S3 V4 endpoints supported in Hadoop 2.7.x already, or
>> only in Hadoop 2.8?
>>
>> Best,
>> Stephan
>>
>>
>> On Mon, Nov 27, 2017 at 9:47 AM, Dominik Bruhn wrote:
>>
>> Hey,
>> can anyone give a hint? Does anyone have flink running with an
>> S3 Bucket in Frankfurt/eu-central-1 and can share his config and
>> setup?
>>
>> Thanks,
>> Dominik
>>
>> On 22. Nov 2017, at 17:52, domi...@dbruhn.de
>>  wrote:
>>
>>> Hey everyone,
>>> I've been trying for hours to get Flink 1.3.2 (downloaded for
>>> hadoop 2.7) to snapshot/checkpoint to an S3 bucket which is
>>> hosted in the eu-central-1 region. Everything works fine for
>>> other regions. I'm running my job on a JobTracker in local
>>> mode. I googled the internet and found several hints, most of
>>> them telling that setting the `fs.s3a.endpoint` should solve
>>> it. It doesn't. I'm also sure that the core-site.xml (see
>>> below) is picked up, if I put garbage into the endpoint then I
>>> receive a hostname not found error.
>>>
>>> The exception I'm getting is:
>>> com.amazonaws.services.s3.model.AmazonS3Exception: Status
>>> Code: 400, AWS Service: Amazon S3, AWS Request ID:
>>> 432415098B0994BC, AWS Error Code: null, AWS Error Message: Bad
>>> Request, S3 Extended Request ID:
>>>
>>> 1PSDe4EOh7zvfNPdWrwoBKKOtsS/gf9atn5movRzcpvIH2WsR+ptXvXyFyEHXjDb3F9AniXgsBQ=
>>>
>>> I read the AWS FAQ but I don't think that
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/aws.html#ioexception-400-bad-request
>>>
>>> 
>>> applies to me as I'm not running the NativeFileSystem.
>>>
>>> I suspect this is related to the v4 signing protocol which is
>>> required for S3 in Frankfurt. Could it be that the aws-sdk
>>> version is just too old? I tried to play around with it but
>>> the hadoop adapter is incompatible with newer versions.
>>>
>>> I have the following core-site.xml:
>>>
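>>> <configuration>
>>>
>>> <property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3a.S3AFileSystem</value></property>
>>>
>>> <property><name>fs.s3a.buffer.dir</name><value>/tmp</value></property>
>>>
>>> <property><name>fs.s3a.access.key</name><value>something</value></property>
>>>
>>> <property><name>fs.s3a.secret.key</name><value>wont-tell</value></property>
>>>
>>> <property><name>fs.s3a.endpoint</name><value>s3.eu-central-1.amazonaws.com</value></property>
>>> </configuration>
>>>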
>>> Here is my lib folder with the versions of the aws-sdk and the
>>> hadoop-aws integration:
>>> -rw---1 root root   11.4M Mar 20  2014
>>> aws-java-sdk-1.7.4.jar
>>> -rw-r--r--1 1005 1006   70.0M Aug  3 12:10
>>> flink-dist_2.11-1.3.2.jar
>>> -rw-rw-r--1 1005 1006   98.3K Aug  3 12:07
>>> flink-python_2.11-1.3.2.jar
>>> -rw-r--r--1 1005 1006   34.9M Aug  3 11:58
>>> flink-shaded-hadoop2-uber-1.3.2.jar
>>> -rw---   

Re: Issue with back pressure and AsyncFunction

2017-11-10 Thread Ufuk Celebi
Hey Ken,

thanks for your message. Both your comments are correct (see inline).

On Fri, Nov 10, 2017 at 10:31 PM, Ken Krugler
 wrote:
> 1. A downstream function in the iteration was (significantly) increasing the
> number of tuples - it would get one in, and sometimes emit 100+.
>
> The output would loop back as input via the iteration.
>
> This eventually caused the network buffers to fill up, and that’s why the
> job got stuck.
>
> I had to add my own tracking/throttling in one of my custom functions, to
> avoid having too many “active” tuples.
>
> So maybe something to note in documentation on iterations, if it’s not there
> already.

Yes, iterations are prone to deadlock due to the way that data is
exchanged between the sink and head nodes. There have been multiple
attempts to fix these shortcomings, but I don't know what the latest
state is. Maybe Aljoscha (CC'd) has some input...
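
A rough sketch of the kind of tracking/throttling Ken describes, assuming a
simple cap on the number of "active" (in-flight) tuples. It uses only
java.util.concurrent; the class name, method names and the limit are
hypothetical, and this is neither Ken's actual code nor a Flink API.

```java
import java.util.concurrent.Semaphore;

/** Illustrative only: bounds the number of tuples that are "active" at the same time. */
final class InFlightThrottleSketch {

    private final Semaphore permits;

    InFlightThrottleSketch(int maxActive) {
        this.permits = new Semaphore(maxActive);
    }

    /** Non-blocking: returns false if the cap is reached and the tuple should wait. */
    boolean tryAdmit() {
        return permits.tryAcquire();
    }

    /** Call exactly once for every admitted tuple when it leaves the loop. */
    void complete() {
        permits.release();
    }

    public static void main(String[] args) {
        InFlightThrottleSketch throttle = new InFlightThrottleSketch(2);
        System.out.println(throttle.tryAdmit()); // first tuple is admitted
        System.out.println(throttle.tryAdmit()); // second tuple is admitted
        System.out.println(throttle.tryAdmit()); // cap of 2 reached
        throttle.complete();                     // one tuple finished
        System.out.println(throttle.tryAdmit()); // room for one more
    }
}
```

The non-blocking variant is used on purpose: blocking inside an operator that
is part of the iteration could itself stall the loop, so rejected tuples
would have to be buffered or dropped by the caller.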

> 2. The back pressure calculation doesn’t take into account AsyncIO

Correct, the back pressure monitoring only takes the main task thread
into account. Every operator that uses a separate thread to emit
records (like Async I/O or the Kafka source) is therefore not covered by
the back pressure monitoring.

– Ufuk


Re: Flink memory leak

2017-11-07 Thread Ufuk Celebi
Do you use any windowing? If yes, could you please share that code? If
there is no stateful operation at all, it's strange where the list
state instances are coming from.

On Tue, Nov 7, 2017 at 2:35 PM, ebru <b20926...@cs.hacettepe.edu.tr> wrote:
> Hi Ufuk,
>
> We don’t explicitly define any state descriptor. We only use map and filter
> operators. We thought that GC handles clearing Flink’s internal state.
> So how can we manage the memory if it is always increasing?
>
> - Ebru
>
> On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org> wrote:
>
> Hey Ebru, the memory usage might be increasing as long as a job is running.
> This is expected (also in the case of multiple running jobs). The
> screenshots are not helpful in that regard. :-(
>
> What kind of stateful operations are you using? Depending on your use case,
> you have to manually call `clear()` on the state instance in order to
> release the managed state.
>
> Best,
>
> Ufuk
>
> On Tue, Nov 7, 2017 at 12:43 PM, ebru <b20926...@cs.hacettepe.edu.tr> wrote:
>>
>>
>>
>> Begin forwarded message:
>>
>> From: ebru <b20926...@cs.hacettepe.edu.tr>
>> Subject: Re: Flink memory leak
>> Date: 7 November 2017 at 14:09:17 GMT+3
>> To: Ufuk Celebi <u...@apache.org>
>>
>> Hi Ufuk,
>>
>> Here are three snapshots of htop output.
>> 1. snapshot is initial state.
>> 2. snapshot is after submitted one job.
>> 3. Snapshot is the output of the one job with 15000 EPS. And the memory
>> usage is always increasing over time.
>>
>>
>>
>>
>> <1.png><2.png><3.png>
>>
>> On 7 Nov 2017, at 13:34, Ufuk Celebi <u...@apache.org> wrote:
>>
>> Hey Ebru,
>>
>> let me pull in Aljoscha (CC'd) who might have an idea what's causing this.
>>
>> Since multiple jobs are running, it will be hard to understand which
>> job the state descriptors from the heap snapshot belong to.
>> - Is it possible to isolate the problem and reproduce the behaviour
>> with only a single job?
>>
>> – Ufuk
>>
>>
>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>
>> Hi,
>>
>> We are using Flink 1.3.1 in production, we have one job manager and 3 task
>> managers in standalone mode. Recently, we've noticed that we have memory
>> related problems. We use docker container to serve Flink cluster. We have
>> 300 slots and 20 jobs are running with parallelism of 10. Also the job
>> count may change over time. Taskmanager memory usage always increases. After
>> job cancelation this memory usage doesn't decrease. We've tried to
>> investigate the problem and we've got the task manager jvm heap snapshot.
>> According to the JVM heap analysis, possible memory leak was Flink list
>> state descriptor. But we are not sure that is the cause of our memory
>> problem. How can we solve the problem?
>>
>>
>>
>
>


Re: MODERATE for d...@flink.apache.org

2017-11-07 Thread Ufuk Celebi
Hey Jordan,

yeah, that should just work. Check out the state backend configuration
here: 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/state_backends.html

– Ufuk

On Tue, Nov 7, 2017 at 11:44 AM, Jordan Kuan <jordan.k...@gmail.com> wrote:
> Dear Ufuk,
>
> Thank you for your reply.
>
> All the Flink servers in the cluster are able to access the network drive,
> and it is mapped as drive Y on all nodes.
> Do I need to provide more information?
>
> Thanks,
> Jordan
>
>
>> On 7 Nov 2017, at 6:36 PM, Ufuk Celebi <u...@apache.org> wrote:
>>
>> As answered by David on SO, the files need to be accessible by all
>> nodes. In your setup this seems not to be the case, therefore it won't
>> work. You need a distributed file system (e.g. NFS or HDFS) or object
>> store (e.g. S3) that is accessible from all nodes.
>>
>> – Ufuk
>>
>>
>> On Tue, Nov 7, 2017 at 3:34 AM, Jordan Kuan <jordan.k...@gmail.com> wrote:
>>> Dear Flink Dev Team
>>>
>>> I have encountered a problem and can't find any solution in Google.
>>> And I have created a thread in stackoverflow.com but no response.
>>> https://stackoverflow.com/questions/47123371/flink-windows-ha
>>>
>>> I would really appreciate it if you could give some suggestions to me.
>>>
>>> Thanks,
>>>
>>> Jordan
>>>
>>> On Tue, Nov 7, 2017 at 1:53 AM, Robert Metzger <rmetz...@apache.org> wrote:
>>>>
>>>> Hi,
>>>> I would suggest to send your question to the user@flink.apache.org list
>>>> (make sure to subscribe first)
>>>>
>>>> -- Forwarded message --
>>>> From: <dev-reject-1509985381.14933.cigohlkhhjaclimeg...@flink.apache.org>
>>>> Date: Mon, Nov 6, 2017 at 5:23 PM
>>>> Subject: MODERATE for d...@flink.apache.org
>>>> To:
>>>> Cc:
>>>> dev-allow-tc.1509985381.dcccgaimcaiiefbkiapi-jordan.kuan=gmail@flink.apache.org
>>>>
>>>>
>>>>
>>>> To approve:
>>>>   dev-accept-1509985381.14933.cigohlkhhjaclimeg...@flink.apache.org
>>>> To reject:
>>>>   dev-reject-1509985381.14933.cigohlkhhjaclimeg...@flink.apache.org
>>>> To give a reason to reject:
>>>> %%% Start comment
>>>> %%% End comment
>>>>
>>>>
>>>>
>>>> -- Forwarded message --
>>>> From: Jordan Kuan <jordan.k...@gmail.com>
>>>> To: d...@flink.apache.org
>>>> Cc:
>>>> Bcc:
>>>> Date: Tue, 7 Nov 2017 00:22:53 +0800
>>>> Subject: Flink Windows HA Issue.
>>>> Dear Flink Dev Team
>>>>
>>>> I have encountered a problem and can't find any solution in Google.
>>>> And I have created a thread in stackoverflow.com but no response.
>>>> https://stackoverflow.com/questions/47123371/flink-windows-ha
>>>>
>>>> I would really appreciate it if you could give some suggestions to me.
>>>>
>>>> Thanks,
>>>>
>>>> Jordan
>>>>
>>>>
>>>>
>>>
>>>
>>>
>>> --
>>> Best Regards,
>>> Jordan Kuan
>


Re: MODERATE for d...@flink.apache.org

2017-11-07 Thread Ufuk Celebi
As answered by David on SO, the files need to be accessible by all
nodes. In your setup this seems not to be the case, therefore it won't
work. You need a distributed file system (e.g. NFS or HDFS) or object
store (e.g. S3) that is accessible from all nodes.

– Ufuk


On Tue, Nov 7, 2017 at 3:34 AM, Jordan Kuan  wrote:
> Dear Flink Dev Team
>
> I have encountered a problem and can't find any solution in Google.
> And I have created a thread in stackoverflow.com but no response.
> https://stackoverflow.com/questions/47123371/flink-windows-ha
>
> I would really appreciate it if you could give some suggestions to me.
>
> Thanks,
>
> Jordan
>
> On Tue, Nov 7, 2017 at 1:53 AM, Robert Metzger  wrote:
>>
>> Hi,
>> I would suggest to send your question to the user@flink.apache.org list
>> (make sure to subscribe first)
>>
>> -- Forwarded message --
>> From: 
>> Date: Mon, Nov 6, 2017 at 5:23 PM
>> Subject: MODERATE for d...@flink.apache.org
>> To:
>> Cc:
>> dev-allow-tc.1509985381.dcccgaimcaiiefbkiapi-jordan.kuan=gmail@flink.apache.org
>>
>>
>>
>> To approve:
>>dev-accept-1509985381.14933.cigohlkhhjaclimeg...@flink.apache.org
>> To reject:
>>dev-reject-1509985381.14933.cigohlkhhjaclimeg...@flink.apache.org
>> To give a reason to reject:
>> %%% Start comment
>> %%% End comment
>>
>>
>>
>> -- Forwarded message --
>> From: Jordan Kuan 
>> To: d...@flink.apache.org
>> Cc:
>> Bcc:
>> Date: Tue, 7 Nov 2017 00:22:53 +0800
>> Subject: Flink Windows HA Issue.
>> Dear Flink Dev Team
>>
>> I have encountered a problem and can't find any solution in Google.
>> And I have created a thread in stackoverflow.com but no response.
>> https://stackoverflow.com/questions/47123371/flink-windows-ha
>>
>> I would really appreciate it if you could give some suggestions to me.
>>
>> Thanks,
>>
>> Jordan
>>
>>
>>
>
>
>
> --
> Best Regards,
> Jordan Kuan


Re: Flink memory leak

2017-11-07 Thread Ufuk Celebi
Hey Ebru,

let me pull in Aljoscha (CC'd) who might have an idea what's causing this.

Since multiple jobs are running, it will be hard to understand which
job the state descriptors from the heap snapshot belong to.
- Is it possible to isolate the problem and reproduce the behaviour
with only a single job?

– Ufuk


On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
 wrote:
> Hi,
>
> We are using Flink 1.3.1 in production, we have one job manager and 3 task
> managers in standalone mode. Recently, we've noticed that we have memory
> related problems. We use docker container to serve Flink cluster. We have
> 300 slots and 20 jobs are running with parallelism of 10. Also the job count
> may change over time. Taskmanager memory usage always increases. After
> job cancelation this memory usage doesn't decrease. We've tried to
> investigate the problem and we've got the task manager jvm heap snapshot.
> According to the JVM heap analysis, possible memory leak was Flink list
> state descriptor. But we are not sure that is the cause of our memory
> problem. How can we solve the problem?


Re: FlinkCEP behaviour with time constraints not as expected

2017-11-07 Thread Ufuk Celebi
Hey Frederico,

let me pull in Dawid (cc'd) who works on CEP. He can probably clarify
the expected behaviour here.

Best,

Ufuk


On Mon, Nov 6, 2017 at 12:06 PM, Federico D'Ambrosio
 wrote:
> Hi everyone,
>
> I wanted to ask if FlinkCEP in the following scenario is working as it
> should, or I have misunderstood its functioning.
>
> I've got a keyedstream associated with the following pattern:
>
> Pattern[Event].begin("start").where(_.value >=100).oneOrMore
> .notNext("end").where(_.value >=100).within(Time.minutes(30))
>
> Considering a single key in the stream, for simplicity, I've got the
> following sequence of events (using EventTime on the "time" field of the
> json event):
>
> {value: 100, time: "2017-11-05 03:50:02.000"}
> {value: 100, time: "2017-11-05 03:52:02.000"}
> {value: 100, time: "2017-11-05 03:54:02.000"}
> {value: 100, time: "2017-11-05 03:56:02.000"} // end of events within the 30
> minutes from the first event
> {value: 100, time: "2017-11-05 06:00:02.000"}
>
> Now, when it comes to the select/flatselect function, I tried printing the
> content of the pattern map and what I noticed is that, for example, the
> first 2 events weren't considered in the same pattern as the map was like
> the following:
>
> {start=[{value: 100, time: 2017-11-05 03:50:02.000}]}
> {start=[{value: 100, time: 2017-11-05 03:52:02.000}]}
>
> Now, shouldn't they be in the same List, as they belong to the same
> iterative pattern, defined with the oneOrMore clause?
>
> Thank you for your insight,
> Federico D'Ambrosio


Re: PartitionNotFoundException when running in yarn-session.

2017-10-12 Thread Ufuk Celebi
Hey Niels,

Flink currently restarts the complete job if you have a restart
strategy configured:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/restart_strategies.html.

I agree that only restarting the required parts of the pipeline is an
important optimization. Flink has not implemented this (fully) yet but
it's on the agenda [1] and work has already started [2].

In this particular case, everything is just slow and we don't need the
restart at all if you give the consumer a higher max timeout.

Please report back when you have more info :-)

– Ufuk

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures

[2] https://issues.apache.org/jira/browse/FLINK-4256

On Thu, Oct 12, 2017 at 10:17 AM, Niels Basjes <ni...@basjes.nl> wrote:
> Hi,
>
> I'm currently doing some tests to see if this info helps.
> I was running a different high CPU task on one of the nodes outside Yarn, so
> I took that one out of the cluster to see if that helps.
>
> What I do find strange is that in this kind of error scenario the entire job
> fails.
> I would have expected something similar as with 'good old' MapReduce: The
> missing task is simply resubmitted and ran again.
> Why doesn't that happen?
>
>
> Niels
>
> On Wed, Oct 11, 2017 at 8:49 AM, Ufuk Celebi <u...@apache.org> wrote:
>>
>> Hey Niels,
>>
>> any update on this?
>>
>> – Ufuk
>>
>>
>> On Mon, Oct 9, 2017 at 10:16 PM, Ufuk Celebi <u...@apache.org> wrote:
>> > Hey Niels,
>> >
>> > thanks for the detailed report. I don't think that it is related to
>> > the Hadoop or Scala version. I think the following happens:
>> >
>> > - Occasionally, one of your tasks seems to be extremely slow in
>> > registering its produced intermediate result (the data shuffled
>> > between TaskManagers)
>> > - Another task is already requesting to consume data from this task
>> > but cannot find it (after multiple retries) and it fails the complete
>> > job (your stack trace)
>> >
>> > That happens only occasionally probably due to load in your cluster.
>> > The slow down could have multiple reasons...
>> > - Is your Hadoop cluster resource constrained and the tasks are slow to
>> > deploy?
>> > - Is your application JAR very large and needs a lot of time
>> > downloading?
>> >
>> > We have two options at this point:
>> > 1) You can increase the maximum retries via the config option:
>> > "taskmanager.network.request-backoff.max". The default is 10000
>> > (milliseconds) and specifies what the maximum request back off is [1].
>> > Increasing this to 30000 would give you two extra retries with pretty
>> > long delays (see [1]).
>> >
>> > 2) To be sure that this is really what is happening we could increase
>> > the log level of certain classes and check whether they have
>> > registered their results or not. If you want to do this, I'm more than
>> > happy to provide you with some classes to enable DEBUG logging for.
>> >
>> > What do you think?
>> >
>> > – Ufuk
>> >
>> > DETAILS
>> > ===
>> >
>> > - The TaskManagers produce and consume intermediate results
>> > - When a TaskManager wants to consume a result, it directly queries
>> > the producing TaskManager for it
>> > - An intermediate result becomes ready for consumption during initial
>> > task setup (state DEPLOYING)
>> > - When a TaskManager is slow to register its intermediate result and
>> > the consumer requests the result before it is ready, it can happen
>> > that a requested partition is "not found"
>> >
>> > This is what is also happening here. We retry to request the
>> > intermediate result multiple times with timed backoff [1] and only
>> > fail the request (your stack trace) if the partition is still not
>> > ready although we expect it to be ready (that is there was no failure
>> > at the producing task).
>> >
>> > [1] Starting by default at 100 millis and going up to 10_000 millis by
>> > doubling that time (100, 200, 400, 800, 1600, 3200, 6400, 10000)
>> >
>> >
>> > On Mon, Oct 9, 2017 at 10:51 AM, Niels Basjes <ni...@basjes.nl> wrote:
>> >> Hi,
>> >>
>> >> I'm having some trouble running a java based Flink job in a
>> >> yarn-session.
>> &g

Re: Flink 1.3.2 Netty Exception

2017-10-11 Thread Ufuk Celebi
@Chesnay: Recycling of network resources happens after the tasks go
into state FINISHED. Since we are submitting new jobs in a local loop
here it can easily happen that the new job is submitted before enough
buffers are available again. At least, previously that was the case.

I'm CC'ing Nico who refactored the network buffer distribution
recently and who might have more details about this specific error
message.

@Nico: another question is why there seem to be more buffers available
but we don't assign them. I'm referring to this part of the error
message "5691 of 32768 bytes...".

On Wed, Oct 11, 2017 at 2:54 PM, Chesnay Schepler  wrote:
> I can confirm that the issue is reproducible with the given test, from the
> command-line and IDE.
>
> While cutting down the test case, by replacing the outputformat with a
> DiscardingOutputFormat and the JDBCInputFormat with a simple collection, I
> stumbled onto a new Exception after ~200 iterations:
>
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>
> Caused by: java.io.IOException: Insufficient number of network buffers:
> required 4, but only 1 available. The total number of network buffers is
> currently set to 5691 of 32768 bytes each. You can increase this number by
> setting the configuration keys 'taskmanager.network.memory.fraction',
> 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
>   at
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:195)
>   at
> org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:186)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:602)
>   at java.lang.Thread.run(Thread.java:745)
>
>
> On 11.10.2017 12:48, Flavio Pompermaier wrote:
>
> Hi to all,
> we wrote a small JUnit test to reproduce a memory issue we have in a Flink
> job (that seems related to Netty). At some point, usually around the 28th
> loop, the job fails with the following exception (actually we never faced
> that in production but maybe it is related to the memory issue somehow...):
>
> Caused by: java.lang.IllegalAccessError:
> org/apache/flink/runtime/io/network/netty/NettyMessage
> at
> io.netty.util.internal.__matchers__.org.apache.flink.runtime.io.network.netty.NettyMessageMatcher.match(NoOpTypeParameterMatcher.java)
> at
> io.netty.channel.SimpleChannelInboundHandler.acceptInboundMessage(SimpleChannelInboundHandler.java:95)
> at
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:102)
> ... 16 more
>
> The github project is https://github.com/okkam-it/flink-memory-leak and the
> JUnit test is contained in the MemoryLeakTest class (within src/main/test).
>
> Thanks in advance for any support,
> Flavio
>
>


Re: PartitionNotFoundException when running in yarn-session.

2017-10-11 Thread Ufuk Celebi
Hey Niels,

any update on this?

– Ufuk


On Mon, Oct 9, 2017 at 10:16 PM, Ufuk Celebi <u...@apache.org> wrote:
> Hey Niels,
>
> thanks for the detailed report. I don't think that it is related to
> the Hadoop or Scala version. I think the following happens:
>
> - Occasionally, one of your tasks seems to be extremely slow in
> registering its produced intermediate result (the data shuffled
> between TaskManagers)
> - Another task is already requesting to consume data from this task
> but cannot find it (after multiple retries) and it fails the complete
> job (your stack trace)
>
> That happens only occasionally probably due to load in your cluster.
> The slow down could have multiple reasons...
> - Is your Hadoop cluster resource constrained and the tasks are slow to 
> deploy?
> - Is your application JAR very large and needs a lot of time downloading?
>
> We have two options at this point:
> 1) You can increase the maximum retries via the config option:
> "taskmanager.network.request-backoff.max". The default is 10000
> (milliseconds) and specifies what the maximum request back off is [1].
> Increasing this to 30000 would give you two extra retries with pretty
> long delays (see [1]).
>
> 2) To be sure that this is really what is happening we could increase
> the log level of certain classes and check whether they have
> registered their results or not. If you want to do this, I'm more than
> happy to provide you with some classes to enable DEBUG logging for.
>
> What do you think?
>
> – Ufuk
>
> DETAILS
> ===
>
> - The TaskManagers produce and consume intermediate results
> - When a TaskManager wants to consume a result, it directly queries
> the producing TaskManager for it
> - An intermediate result becomes ready for consumption during initial
> task setup (state DEPLOYING)
> - When a TaskManager is slow to register its intermediate result and
> the consumer requests the result before it is ready, it can happen
> that a requested partition is "not found"
>
> This is what is also happening here. We retry to request the
> intermediate result multiple times with timed backoff [1] and only
> fail the request (your stack trace) if the partition is still not
> ready although we expect it to be ready (that is there was no failure
> at the producing task).
>
> [1] Starting by default at 100 millis and going up to 10_000 millis by
> doubling that time (100, 200, 400, 800, 1600, 3200, 6400, 10000)
>
>
> On Mon, Oct 9, 2017 at 10:51 AM, Niels Basjes <ni...@basjes.nl> wrote:
>> Hi,
>>
>> I'm having some trouble running a java based Flink job in a yarn-session.
>>
>> The job itself consists of reading a set of files resulting in a DataStream
>> (I use DataStream because in the future I intend to change the file with a
>> Kafka feed), then does some parsing and eventually writes the data into
>> HBase.
>>
>> Most of the time running this works fine yet sometimes it fails with this
>> exception:
>>
>> org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
>> Partition 794b5ce385c296b7943fa4c1f072d6b9@13aa7ef02a5d9e0898204ec8ce283363
>> not found.
>>   at
>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:203)
>>   at
>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:128)
>>   at
>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:345)
>>   at
>> org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:1286)
>>   at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1123)
>>   at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1118)
>>   at
>> org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272)
>>   at akka.dispatch.OnComplete.internal(Future.scala:248)
>>   at akka.dispatch.OnComplete.internal(Future.scala:245)
>>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
>>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
>>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>>   at
>> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>>   at
>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>>   at
>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecuto

Re: PartitionNotFoundException when running in yarn-session.

2017-10-09 Thread Ufuk Celebi
Hey Niels,

thanks for the detailed report. I don't think that it is related to
the Hadoop or Scala version. I think the following happens:

- Occasionally, one of your tasks seems to be extremely slow in
registering its produced intermediate result (the data shuffled
between TaskManagers)
- Another task is already requesting to consume data from this task
but cannot find it (after multiple retries) and it fails the complete
job (your stack trace)

That happens only occasionally probably due to load in your cluster.
The slow down could have multiple reasons...
- Is your Hadoop cluster resource constrained and the tasks are slow to deploy?
- Is your application JAR very large and needs a lot of time downloading?

We have two options at this point:
1) You can increase the maximum retries via the config option:
"taskmanager.network.request-backoff.max". The default is 10000
(milliseconds) and specifies what the maximum request back off is [1].
Increasing this to 30000 would give you two extra retries with pretty
long delays (see [1]).

2) To be sure that this is really what is happening we could increase
the log level of certain classes and check whether they have
registered their results or not. If you want to do this, I'm more than
happy to provide you with some classes to enable DEBUG logging for.

What do you think?

– Ufuk

DETAILS
===

- The TaskManagers produce and consume intermediate results
- When a TaskManager wants to consume a result, it directly queries
the producing TaskManager for it
- An intermediate result becomes ready for consumption during initial
task setup (state DEPLOYING)
- When a TaskManager is slow to register its intermediate result and
the consumer requests the result before it is ready, it can happen
that a requested partition is "not found"

This is what is also happening here. We retry to request the
intermediate result multiple times with timed backoff [1] and only
fail the request (your stack trace) if the partition is still not
ready although we expect it to be ready (that is there was no failure
at the producing task).

[1] Starting by default at 100 millis and going up to 10_000 millis by
doubling that time (100, 200, 400, 800, 1600, 3200, 6400, 10000)
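
To make the back-off in [1] concrete, here is a small stand-alone sketch that
computes the delays for a given maximum. It assumes the doubling-with-cap
behaviour described above (start at 100 millis, double, never exceed the
configured maximum); the class and method names are made up and this is not
the actual Flink code.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: mirrors the doubling request back-off described in [1]. */
final class RequestBackoffSketch {

    /** Returns the delays in millis, doubling from initialMs and capped at maxMs. */
    static List<Integer> schedule(int initialMs, int maxMs) {
        if (initialMs <= 0 || maxMs < initialMs) {
            throw new IllegalArgumentException("need 0 < initialMs <= maxMs");
        }
        List<Integer> delays = new ArrayList<>();
        long current = initialMs; // long avoids overflow while doubling
        while (current < maxMs) {
            delays.add((int) current);
            current *= 2;
        }
        delays.add(maxMs); // the last retry waits for the configured maximum
        return delays;
    }

    public static void main(String[] args) {
        System.out.println(schedule(100, 10_000)); // maximum of 10_000 millis as in [1]
        System.out.println(schedule(100, 30_000)); // a larger, hypothetical maximum
    }
}
```

With a maximum of 30_000 millis the schedule gains two entries (12800 and
25600) before ending at the cap. In flink-conf.yaml that would be a line such
as `taskmanager.network.request-backoff.max: 30000`.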


On Mon, Oct 9, 2017 at 10:51 AM, Niels Basjes  wrote:
> Hi,
>
> I'm having some trouble running a java based Flink job in a yarn-session.
>
> The job itself consists of reading a set of files resulting in a DataStream
> (I use DataStream because in the future I intend to change the file with a
> Kafka feed), then does some parsing and eventually writes the data into
> HBase.
>
> Most of the time running this works fine yet sometimes it fails with this
> exception:
>
> org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
> Partition 794b5ce385c296b7943fa4c1f072d6b9@13aa7ef02a5d9e0898204ec8ce283363
> not found.
>   at
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:203)
>   at
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:128)
>   at
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:345)
>   at
> org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:1286)
>   at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1123)
>   at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1118)
>   at
> org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272)
>   at akka.dispatch.OnComplete.internal(Future.scala:248)
>   at akka.dispatch.OnComplete.internal(Future.scala:245)
>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>   at
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>   at
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>   at
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>   at
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>   at 
> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>   at
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> 

Re: question on sideoutput from ProcessWindow function

2017-09-23 Thread Ufuk Celebi
+1

Created an issue here: https://issues.apache.org/jira/browse/FLINK-7677


On Thu, Sep 14, 2017 at 11:51 AM, Aljoscha Krettek  wrote:
> Hi,
>
> Chen is correct! I think it would be nice, though, to also add that
> functionality for ProcessWindowFunction and I think this should be easy to
> do since the interface is very similar to ProcessFunction and we could also
> add that to the Context.
>
> Best,
> Aljoscha
>
> On 9. Sep 2017, at 06:22, Chen Qin  wrote:
>
> Hi Prabhu,
>
> That is a good question. The short answer is not yet: at the moment, only
> ProcessFunction supports custom side outputs.
> Window functions weren't given that flexibility, partly because side outputs
> initially targeted late-arriving events for window use cases.
>
> +@Aljoscha might have a better picture on this question.
>
> Thanks,
> Chen
>
> On Fri, Sep 8, 2017 at 7:19 PM, Prabhu V  wrote:
>>
>> Hi,
>>
>> Can we have a side output from a process window function ?
>>
>> I am currently generating a side output from a process function. The main
>> output of the process function is then windowed and a ProcessWindowFunction
>> is applied on the windows.
>>
>> Can I add to the side output stream from the ProcessWindowFunction? I am
>> unable to find any API that enables this functionality.
>>
>> Thanks,
>> Prabhu
>
>
>


Re: Noisy org.apache.flink.configuration.GlobalConfiguration

2017-09-19 Thread Ufuk Celebi
I saw this too recently when using HadoopFileSystem for checkpoints
(HDFS or S3). I thought I had opened an issue for this, but I didn't.
Here it is: https://issues.apache.org/jira/browse/FLINK-7643


On Tue, Sep 19, 2017 at 1:28 PM, Till Rohrmann  wrote:
> Hi Elias,
>
> which version of Flink and which state backend are you running? I tried to
> reproduce it and wasn't successful so far.
>
> We recently changed a bit how we load the GlobalConfiguration in combination
> with dynamic properties [1]. Maybe this has affected what you've reported as
> well.
>
> [1] https://issues.apache.org/jira/browse/FLINK-7269
>
> Cheers,
> Till
>
> On Tue, Sep 19, 2017 at 2:44 AM, Elias Levy 
> wrote:
>>
>> Is there a particular reason that GlobalConfiguration is so noisy?
>>
>> The task manager log is full of "Loading configuration property" messages
>> from GlobalConfiguration each time there is a checkpoint.  Why isn't the
>> configuration read once?
>
>


Re: Flink flick cancel vs stop

2017-09-14 Thread Ufuk Celebi
Hey Elias,

sorry for the delay here. No, stop is not deprecated but not fully
implemented yet. One missing part is migration of the existing source
functions as you say.

Let me pull in Till for more details on this. @Till: Is there more
missing than migrating the sources?

Here is the PR and discussion for reference:
https://github.com/apache/flink/pull/750

I would also really love to see this fully implemented in Flink. I
don't expect this to happen for the upcoming 1.4 release though.

– Ufuk


On Wed, Sep 13, 2017 at 7:07 PM, Elias Levy  wrote:
> Anyone?
>
> On Mon, Sep 11, 2017 at 6:17 PM, Elias Levy 
> wrote:
>>
>> I was wondering about the status of the flink stop command.  At first
>> blush it would seem to be the preferable way to shut down a Flink job, but it
>> depends on StoppableFunction being implemented by sources and I notice that
>> the Kafka source does not seem to implement it.  In addition, the command
>> does not support -s / --withSavepoint like cancel does.
>>
>> Is stop deprecated?
>
>


Re: Flink 1.2.1 JobManager Election Deadlock

2017-09-07 Thread Ufuk Celebi
Thanks for looking into this and finding out that it is (probably)
related to Curator. Very valuable information!

On Thu, Sep 7, 2017 at 3:09 PM, Timo Walther  wrote:
> Thanks for informing us. As far as I know, we were not aware of any deadlock
> in the JobManager election. Let's hope that the updated Curator version
> fixed the problem. We will definitely keep an eye on this. Feel free to
> contact the dev@ mailing list, if the problem still exists in 1.3.2.
>
> Regards,
> Timo
>
> On 06.09.17 at 22:58, James Bucher wrote:
>
> It seems I forgot to attach the full logs. They should be attached now.
>
> From: James Bucher 
> Date: Wednesday, September 6, 2017 at 10:43 AM
> To: "user@flink.apache.org" 
> Subject: Flink 1.2.1 JobManager Election Deadlock
>
> Hey all,
>
> Just wanted to report this for posterity in case someone else sees something
> similar. We were running Flink 1.2.1 in Kubernetes. We use an HA setup with
> an external Zookeeper and S3 for checkpointing. We recently noticed a job
> that appears to have deadlocked on JobManager Leader election. We think the
> issue happened something like the following:
>
> 1. Job was up and running normally.
> 2. Some cluster event caused the JobManager Pod (process) to get restarted.
> 3. JobManager came up again but got stuck on LeaderElection. At this time the
> JobManager UI sent back a response with "Service temporarily unavailable due
> to an ongoing leader election. Please refresh."
> 4. JobManager never exited this state.
> 5. This state persisted across Pod restarts/deletes.
>
> In order to try to pin down this problem we brought up the Job in another
> Flink Cluster and started debugging this issue. As a first step I upped the
> logging level on the JobManager and applied the change. This resulted in the
> following log (Full logs attached to this email):
>
> 2017-09-05 18:50:55,072 TRACE
> org.apache.flink.shaded.org.apache.curator.utils.DefaultTracerDriver  -
> Trace: GetDataBuilderImpl-Background - 4 ms
>
> 2017-09-05 18:50:55,075 DEBUG
> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
> Received CHILD_ADDED event (path: /4e4cdc8fb8c1437c620cd4063bd265e1)
>
> 2017-09-05 18:50:55,088 DEBUG
> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
> Received CHILD_ADDED event notification for job
> 4e4cdc8fb8c1437c620cd4063bd265e1
>
> 2017-09-05 18:50:56,072 DEBUG org.apache.zookeeper.ClientCnxn
> - Reading reply sessionid:0x15e3df8b5a57b4b, packet:: clientPath:null
> serverPath:null finished:false header:: 1,3  replyHeader:: 1,21476396435,0
> request:: '/traveler-profile.us-west-2c.test.expedia.com,F  response::
> s{4305881524,4305881524,1497456679255,1497456679255,0,3,0,0,0,3,21475572278}
>
> 2017-09-05 18:50:56,078 DEBUG org.apache.zookeeper.ClientCnxn
> - Reading reply sessionid:0x15e3df8b5a57b4b, packet:: clientPath:null
> serverPath:null finished:false header:: 2,3  replyHeader:: 2,21476396435,0
> request:: '/traveler-profile.us-west-2c.test.expedia.com/krazyglue,F
> response::
> s{4305881525,4305881525,1497456679260,1497456679260,0,1,0,0,0,1,4305881526}
>
> 2017-09-05 18:50:56,079 DEBUG org.apache.zookeeper.ClientCnxn
> - Reading reply sessionid:0x15e3df8b5a57b4b, packet:: clientPath:null
> serverPath:null finished:false header:: 3,3  replyHeader:: 3,21476396435,0
> request:: '/traveler-profile.us-west-2c.test.expedia.com/krazyglue/mip,F
> response::
> s{4305881526,4305881526,1497456679263,1497456679263,0,1,0,0,0,1,4305881527}
>
> 2017-09-05 18:50:56,080 DEBUG org.apache.zookeeper.ClientCnxn
> - Reading reply sessionid:0x15e3df8b5a57b4b, packet:: clientPath:null
> serverPath:null finished:false header:: 4,3  replyHeader:: 4,21476396435,0
> request::
> '/traveler-profile.us-west-2c.test.expedia.com/krazyglue/mip/flink,F
> response::
> s{4305881527,4305881527,1497456679267,1497456679267,0,1,0,0,0,1,4305881528}
>
> 2017-09-05 18:50:56,081 DEBUG org.apache.zookeeper.ClientCnxn
> - Reading reply sessionid:0x15e3df8b5a57b4b, packet:: clientPath:null
> serverPath:null finished:false header:: 5,3  replyHeader:: 5,21476396435,0
> request::
> '/traveler-profile.us-west-2c.test.expedia.com/krazyglue/mip/flink/default,F
> response::
> s{4305881528,4305881528,1497456679270,1497456679270,0,27,0,0,0,5,21475449005}
>
> 2017-09-05 18:50:56,085 INFO
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
> Starting ZooKeeperLeaderRetrievalService.
>
> 2017-09-05 18:50:56,087 DEBUG org.apache.zookeeper.ClientCnxn
> - Reading reply sessionid:0x15e3df8b5a57b4b, packet:: clientPath:null
> serverPath:null finished:false header:: 6,3  replyHeader:: 6,21476396435,0
> request:: '/traveler-profile.us-west-2c.test.expedia.com,F  response::
> s{4305881524,4305881524,1497456679255,1497456679255,0,3,0,0,0,3,21475572278}
>
> 2017-09-05 18:50:56,087 DEBUG org.apache.zookeeper.ClientCnxn
> - Reading reply sessionid:0x15e3df8b5a57b4b, packet:: clientPath:null
> 

Re: Flink HA with Kubernetes, without Zookeeper

2017-08-23 Thread Ufuk Celebi
Thanks James for sharing your experience. I find it very interesting :-)


On Tue, Aug 22, 2017 at 9:50 PM, Hao Sun  wrote:
> Great suggestions, the etcd operator is very interesting, thanks James.
>
>
> On Tue, Aug 22, 2017, 12:42 James Bucher  wrote:
>>
>> Just wanted to throw in a couple more details here from what I have
>> learned from working with Kubernetes.
>>
>> All processes restart (a lost JobManager restarts eventually). Should be
>> given in Kubernetes:
>>
>> This works very well, we run multiple jobs with a single Jobmanager and
>> Flink/Kubernetes recovers quite well.
>>
>> A way for TaskManagers to discover the restarted JobManager. Should work
>> via Kubernetes as well (restarted containers retain the external hostname):
>>
>> We use StatefulSets which provide a DNS based discovery mechanism.
>> Provided DNS is set up correctly with TTLs this works well. You could also
>> leverage the built-in Kubernetes services if you are only running a single
>> Job Manager. Kubernetes will just route the traffic to the single pod. This
>> works fine with a single Job Manager (I have tested it). However multiple
>> Job Managers won’t work because Kubernetes will route this round-robin to
>> the Job Managers
>>
>> A way to isolate different "leader sessions" against each other. Flink
>> currently uses ZooKeeper to also attach a "leader session ID" to leader
>> election, which is a fencing token to avoid that processes talk to each
>> other despite having different views on who is the leader, or whether the
>> leader lost and re-gained leadership:
>>
>> This is probably the most difficult thing. You could leverage the built-in
>> ETCD cluster, although connecting directly to the Kubernetes ETCD database
>> is probably a bad idea. Instead, you should be able to create a counter using
>> the PATCH API that Kubernetes supplies, which follows
>> https://tools.ietf.org/html/rfc6902. You could probably leverage
>> https://tools.ietf.org/html/rfc6902#section-4.6 to allow for atomic updates
>> to counters. Combining this with
>> https://kubernetes.io/docs/concepts/api-extension/custom-resources/#custom-resources
>> should give a good way to work with ETCD without actually connecting to the
>> Kubernetes ETCD directly. This integration would require
>> modifying the Job Manager leader election code.
>>
>> A distributed atomic counter for the checkpoint ID. This is crucial to
>> ensure correctness of checkpoints in the presence of JobManager failures and
>> re-elections or split-brain situations.
>>
>> This is very similar to the above, we should be able to accomplish that
>> through the PATCH API combined with update if condition.
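
As a rough illustration of the "update if condition" idea for the checkpoint ID counter, here is a small in-memory sketch. It is an assumption-laden stand-in: the class and method names are hypothetical, and an AtomicLong plays the role of the external store. In a real setup the conditional write would go to etcd or the Kubernetes API instead.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for an externally stored counter. The AtomicLong
// only simulates a store that supports conditional ("test then set") updates.
class CheckpointIdCounter {

    private final AtomicLong stored = new AtomicLong(0);

    long get() {
        return stored.get();
    }

    // Conditional update: succeeds only if the stored value still equals
    // the value the caller last observed.
    boolean updateIf(long expected, long next) {
        return stored.compareAndSet(expected, next);
    }

    // Retries the conditional update until it wins, so every caller gets a
    // unique, strictly increasing ID even with concurrent JobManagers.
    long getAndIncrement() {
        while (true) {
            long current = get();
            if (updateIf(current, current + 1)) {
                return current;
            }
        }
    }

    public static void main(String[] args) {
        CheckpointIdCounter counter = new CheckpointIdCounter();
        System.out.println(counter.getAndIncrement());
        System.out.println(counter.getAndIncrement());
        // A writer that still believes the counter is 0 is rejected.
        System.out.println(counter.updateIf(0, 5));
    }
}
```

The point of the conditional write is that a JobManager with a stale view (for example after a split-brain) cannot overwrite the counter; it has to re-read and retry.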
>>
>> If you don’t want to actually rip way into the code for the Job Manager
>> the ETCD Operator would be a good way to bring up an ETCD cluster that is
>> separate from the core Kubernetes ETCD database. Combined with zetcd you
>> could probably have that up and running quickly.
>>
>> Thanks,
>> James Bucher
>>
>> From: Hao Sun 
>> Date: Monday, August 21, 2017 at 9:45 AM
>> To: Stephan Ewen , Shannon Carey 
>> Cc: "user@flink.apache.org" 
>> Subject: Re: Flink HA with Kubernetes, without Zookeeper
>>
>> Thanks Shannon for the https://github.com/coreos/zetcd tips, I will check
>> that out and share my results if we proceed on that path.
>> Thanks Stephan for the details, this is very useful, I was about to ask
>> what exactly is stored into zookeeper, haha.
>>
>> On Mon, Aug 21, 2017 at 9:31 AM Stephan Ewen  wrote:
>>>
>>> Hi!
>>>
>>> That is a very interesting proposition. In cases where you have a single
>>> master only, you may get away with quite good guarantees without ZK. In
>>> fact, Flink does not store significant data in ZK at all, it only uses locks
>>> and counters.
>>>
>>> You can have a setup without ZK, provided you have the following:
>>>
>>>   - All processes restart (a lost JobManager restarts eventually). Should
>>> be given in Kubernetes.
>>>
>>>   - A way for TaskManagers to discover the restarted JobManager. Should
>>> work via Kubernetes as well (restarted containers retain the external
>>> hostname)
>>>
>>>   - A way to isolate different "leader sessions" against each other.
>>> Flink currently uses ZooKeeper to also attach a "leader session ID" to
>>> leader election, which is a fencing token to avoid that processes talk to
>>> each other despite having different views on who is the leader, or whether
>>> the leader lost and re-gained leadership.
>>>
>>>   - An atomic marker for what is the latest completed checkpoint.
>>>
>>>   - A distributed atomic counter for the checkpoint ID. This is crucial
>>> to ensure correctness of checkpoints in the presence of JobManager failures
>>> and re-elections or split-brain situations.
>>>
>>> I would assume that etcd can provide all of those services. The best way

Re: Great number of jobs and numberOfBuffers

2017-08-17 Thread Ufuk Celebi
PS: Also pulling in Nico (CC'd) who is working on the network stack.

On Thu, Aug 17, 2017 at 11:23 AM, Ufuk Celebi <u...@apache.org> wrote:
> Hey Gwenhael,
>
> the network buffers are recycled automatically after a job terminates.
> If this does not happen, it would be quite a major bug.
>
> To help debug this:
>
> - Which version of Flink are you using?
> - Does the job fail immediately after submission or later during execution?
> - Is the following correct: the batch job that eventually fails
> because of missing network buffers runs without problems if you submit
> it to a fresh cluster with the same memory
>
> The network buffers are recycled after the task managers report the
> task being finished. If you immediately submit the next batch there is
> a slight chance that the buffers are not recycled yet. As a possible
> temporary work around, could you try waiting for a short amount of
> time before submitting the next batch?
>
> I think we should also be able to run the job without splitting it up
> after increasing the network memory configuration. Did you already try
> this?
>
> Best,
>
> Ufuk
>
>
> On Thu, Aug 17, 2017 at 10:38 AM, Gwenhael Pasquiers
> <gwenhael.pasqui...@ericsson.com> wrote:
>> Hello,
>>
>>
>>
>> We’re hitting a limit with the numberOfBuffers.
>>
>>
>>
>> In a quite complex job we do a lot of operations, with a lot of operators,
>> on a lot of folders (datehours).
>>
>>
>>
>> In order to split the job into smaller “batches” (to limit the necessary
>> “numberOfBuffers”) I’ve done a loop over the batches (handle the datehours 3
>> by 3), for each batch I create a new env then call the execute() method.
>>
>>
>>
>> However it looks like there is no cleanup: after a while, if the number of
>> batches is too big, there is an error saying that the numberOfBuffers isn’t
>> high enough. It kind of looks like a leak. Is there a way to clean them
>> up?


Re: Great number of jobs and numberOfBuffers

2017-08-17 Thread Ufuk Celebi
Hey Gwenhael,

the network buffers are recycled automatically after a job terminates.
If this does not happen, it would be quite a major bug.

To help debug this:

- Which version of Flink are you using?
- Does the job fail immediately after submission or later during execution?
- Is the following correct: the batch job that eventually fails
because of missing network buffers runs without problems if you submit
it to a fresh cluster with the same memory

The network buffers are recycled after the task managers report the
task being finished. If you immediately submit the next batch there is
a slight chance that the buffers are not recycled yet. As a possible
temporary work around, could you try waiting for a short amount of
time before submitting the next batch?

I think we should also be able to run the job without splitting it up
after increasing the network memory configuration. Did you already try
this?

Best,

Ufuk


On Thu, Aug 17, 2017 at 10:38 AM, Gwenhael Pasquiers
 wrote:
> Hello,
>
>
>
> We’re hitting a limit with the numberOfBuffers.
>
>
>
> In a quite complex job we do a lot of operations, with a lot of operators,
> on a lot of folders (datehours).
>
>
>
> In order to split the job into smaller “batches” (to limit the necessary
> “numberOfBuffers”) I’ve done a loop over the batches (handle the datehours 3
> by 3), for each batch I create a new env then call the execute() method.
>
>
>
> However it looks like there is no cleanup: after a while, if the number of
> batches is too big, there is an error saying that the numberOfBuffers isn’t
> high enough. It kind of looks like a leak. Is there a way to clean them
> up?


Re: Queryable State max number of clients

2017-08-14 Thread Ufuk Celebi
This is as Aljoscha describes. Each thread can handle many different
clients at the same time. You shouldn't need to change the defaults in
most cases.

The network threads handle the TCP connections and dispatch query
tasks to the query threads which do the actual querying of the state
backend. In case of the RocksDB backend for example this might involve
blocking I/O calls.
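
A minimal sketch of that hand-off, using plain Java executors rather than Flink's actual KvStateServer classes (the class name, method name and response strings below are invented for illustration): a "network" pool accepts requests and immediately dispatches the possibly blocking lookup to a separate "query" pool, so even a single thread in each pool can serve many clients.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of the two-pool design, not Flink code.
class QueryDispatchSketch {

    static List<String> serve(int clients, int networkThreads, int queryThreads) {
        ExecutorService network = Executors.newFixedThreadPool(networkThreads);
        ExecutorService query = Executors.newFixedThreadPool(queryThreads);
        try {
            List<CompletableFuture<String>> responses = new ArrayList<>();
            for (int i = 0; i < clients; i++) {
                final int clientId = i;
                responses.add(CompletableFuture
                        // Network thread: accept the request, then it is free again.
                        .supplyAsync(() -> clientId, network)
                        // Query thread: do the (possibly blocking) state lookup.
                        .thenApplyAsync(id -> "state-for-client-" + id, query));
            }
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> response : responses) {
                results.add(response.join());
            }
            return results;
        } finally {
            network.shutdown();
            query.shutdown();
        }
    }

    public static void main(String[] args) {
        // Ten clients are served although each pool has only one thread.
        System.out.println(serve(10, 1, 1).size());
    }
}
```

The thread counts therefore bound how many lookups run at the same instant, not how many clients can be connected.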

– Ufuk

On Mon, Aug 14, 2017 at 12:35 PM, Aljoscha Krettek  wrote:
> Hi,
>
> I think the number of network threads and number of query threads only
> roughly correlate with the number of clients that can query in parallel
> since this is using asynchronous communication via Akka/Netty. Of course,
> increasing that number means there can be more connections but I think even
> just 1 thread for each of those should be able to easily handle multiple
> queries at the same time.
>
> I'm cc'ing Ufuk and Kostas who might know more about this.
>
> Best,
> Aljoscha
>
> On 9. Aug 2017, at 17:19, Ziyad Muhammed  wrote:
>
> Hi all,
>
> I'm trying to understand how many parallel clients will be supported by the
> queryable state.
>
> query.server.network-threads: number of network (event loop) threads for the
> KvStateServer (0 => #slots)
> query.server.query-threads: number of asynchronous query threads for the
> KvStateServerHandler (0 => #slots).
>
> so, if I choose 0 for both these parameters, what will be the maximum number
> of parallel clients supported?
>
> I tried more parallel clients than number of slots, but all of them were
> able to query the state in parallel. Can someone help me to understand the
> logic here?
>
> Thanks in advance.
>
> Best
> Ziyad
>
>


Re: [POLL] Dropping savepoint format compatibility for 1.1.x in the Flink 1.4.0 release

2017-07-28 Thread Ufuk Celebi
On Fri, Jul 28, 2017 at 4:03 PM, Stephan Ewen  wrote:
> Seems like no one raised a concern so far about dropping the savepoint
> format compatibility for 1.1 in 1.4.
>
> Leaving this thread open for some more days, but from the sentiment, it
> seems like we should go ahead?

+1


Re: mirror links don't work

2017-07-04 Thread Ufuk Celebi
Thanks for reporting this. Did you find these pages by Googling for
the Flink docs? They are definitely very outdated versions of Flink.

On Tue, Jul 4, 2017 at 4:46 PM, AndreaKinn  wrote:
> I found it clicking on "download flink for hadoop 1.2" button:
> https://ci.apache.org/projects/flink/flink-docs-release-0.8/setup_quickstart.html
>
>
>


Re: External checkpoints not getting cleaned up/discarded - potentially causing high load

2017-07-03 Thread Ufuk Celebi
On Mon, Jul 3, 2017 at 12:02 PM, Stefan Richter
 wrote:
> Another thing that could be really helpful: if possible, can you attach a
> profiler/sampler to your job manager and figure out the hotspot methods
> where most time is spent? This would be very helpful as a starting point
> for finding where the problem is caused.

A stack trace will also be helpful to see whether some threads are stuck.

If it is possible to run on Mesos without HA mode (@Till: is that
possible?), it might be worthwhile to re-run this without HA to get a
hint whether it is related to HA mode.

– Ufuk


Re: Flink 1.2.0 - Flink Job restart config not using Flink Cluster restart config.

2017-06-29 Thread Ufuk Celebi
On Thu, Jun 29, 2017 at 8:59 AM, Ufuk Celebi <u...@apache.org> wrote:
> @Vera: As a work around you could enable checkpointing and afterwards
> explicitly disable restarts via
> ExecutionConfig.setRestartStrategy(null). Then the cluster default
> should be picked up.

@Vera: Sorry, just checked Gordon's code reference and saw that my
suggestion wouldn't work there. So I don't think that there is a good
work around here...


Re: Flink 1.2.0 - Flink Job restart config not using Flink Cluster restart config.

2017-06-29 Thread Ufuk Celebi
Hey Vera and Gordon,

I agree that this behaviour is confusing.

If we want to split hairs here, we wouldn't call it a bug, because the
restart strategy docs say that "Default restart strategy to use in
case no restart strategy has been specified for the job". The
confusing part is that enabling checkpoints sets a restart strategy
that overwrites the default configuration.

We would need to specify the restart strategy on the job manager,
because the client who runs the job graph generator doesn't have
access to the cluster config. If we change this, we have to think
about how to do it without breaking behaviour of existing programs.

@Vera: As a work around you could enable checkpointing and afterwards
explicitly disable restarts via
ExecutionConfig.setRestartStrategy(null). Then the cluster default
should be picked up.

– Ufuk

On Thu, Jun 29, 2017 at 8:37 AM, Tzu-Li (Gordon) Tai
 wrote:
> Hi Vera,
>
> Apparently, if there is no job-specific restart strategy, an infinite
> FixedDelayRestartStrategy is always used for the job submission:
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L571-L576
>
> IMO, this seems to be a bug, as the global restart strategy config should be
> respected. I’ll get back to this once I confirm this.
>
> Regards,
> Gordon
>
> On 28 June 2017 at 10:22:37 PM, Vera Coberley (veracober...@gmail.com)
> wrote:
>
> Hi all,
>
> We are running Flink 1.2.0. Our flink-conf.yaml is configured to use a
> default restart-strategy of fixed-delay, with 3 attempts:
>
> restart-strategy: fixed-delay
> restart-strategy.fixed-delay.attempts: 3
>
> These settings are echoed by the GlobalConfiguration (see first set of log
> statements). However, the job is submitted with a maxNumberRestartAttempts
> of Max INT instead of 3 (see second set of log statements)
>
> The job is enabled for checkpointing, and it does not have any job-specific
> restart strategy defined:
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.enableCheckpointing(params.getLong("checkpoint.interval", 3000L));
>
> I assumed the default restart configuration would carry over to the job. Am
> I mistaken in my assumption, do I have a configuration error, or is this a
> bug?
>
> -- Vera
>
> 2017-06-27 19:52:11.288 [main] INFO
> org.apache.flink.configuration.GlobalConfiguration  - Loading configuration
> property: restart-strategy, fixed-delay
> 2017-06-27 19:52:11.288 [main] INFO
> org.apache.flink.configuration.GlobalConfiguration  - Loading configuration
> property: restart-strategy.fixed-delay.attempts, 3
>
> 2017-06-27 19:52:17.642 [flink-akka.actor.default-dispatcher-16] INFO
> org.apache.flink.yarn.YarnJobManager  - Submitting job XYZ
> 2017-06-27 19:52:17.652 [flink-akka.actor.default-dispatcher-16] INFO
> org.apache.flink.yarn.YarnJobManager  - Using restart strategy
> FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647,
> delayBetweenRestartAttempts=1) for XYZ


Re: UnknownKvStateKeyGroupLocation

2017-05-16 Thread Ufuk Celebi
Hey Joe! This sounds odd... are there any failures (JobManager or
TaskManager) or leader elections being reported? You should see such
events in the JobManager/TaskManager logs.

On Tue, May 16, 2017 at 2:28 PM, Joe Olson  wrote:
> When running Flink in high availability mode, I've been seeing a high number
> of UnknownKvStateKeyGroupLocation errors being returned when using queryable
> state calls.
>
>
> If I put a simple getKvState call into a loop executing every second, and
> call it repeatedly, sometimes I will get the expected results, sometimes I
> will get UnknownKvStateKeyGroupLocation thrown. This is not associated with
> a query timeout (network issue).
>
>
> From looking at the Flink source code, this problem stems from
> lookup.getKvStateServerAddress returning null. I know all the task managers
> are registering state with the job manager, because I see the "Key value
> state registered for job xx under name yy" messages in the job server log.
>
>
> Anything else I should be looking for? I have several jobs I am querying
> state on, and this seems isolated to only one. I've gone over very closely
> the difference between the jobs, but they are all built from the same template.
>
>
> What would cause a lookup.getKvStateServerAddress to sometimes succeed, and
> sometimes to fail?
>
>
>


Re: Queryable State

2017-05-04 Thread Ufuk Celebi
Could you try KvStateRegistry#registerKvState please?

In the JM logs you should see something about the number of connected
task managers and in the task manager logs that each one connects to a
JM.

– Ufuk


On Tue, May 2, 2017 at 2:53 PM, Chet Masterson
<chet.master...@yandex.com> wrote:
> Can do. Any advice on where the trace prints should go in the task manager
> source code?
>
> BTW - How do I know I have a correctly configured cluster? Is there a set of
> messages in the job / task manager logs that indicate all required
> connectivity is present? I know I use the UI to make sure all the task
> managers are present, and that the job is running on all of them, but is
> there some verbiage in the logs that indicates the job manager can talk to
> all the task managers, and vice versa?
>
> Thanks!
>
>
> 02.05.2017, 06:03, "Ufuk Celebi" <u...@apache.org>:
>
> Hey Chet! I'm wondering why you are only seeing 2 registration
> messages for 3 task managers. Unfortunately, there is no log message
> at the task managers when they send out the notification. Is it
> possible for you to run a remote debugger with the task managers or
> build a custom Flink version with the appropriate log messages on the
> task manager side?
> – Ufuk
>
>
> On Fri, Apr 28, 2017 at 2:20 PM, Chet Masterson
> <chet.master...@yandex.com> wrote:
>
>
>
>  Any insight here? I've got a situation where a key value state on a task
>  manager is being registered with the job manager, but when I try to query
>  it, the job manager responds it doesn't know the location of the key value
>  state...
>
>
>  26.04.2017, 12:11, "Chet Masterson" <chet.master...@yandex.com>:
>
>  After setting the logging to DEBUG on the job manager, I learned four
>  things:
>
>  (On the message formatting below, I have the Flink logs formatted into JSON
>  so I can import them into Kibana)
>
>  1. The appropriate key value state is registered in both parallelism = 1 and
>  parallelism = 3 environments. In parallelism = 1, I saw one registration
>  message in the log, in the parallelism = 3, I saw two registration messages:
>  {"level":"DEBUG","time":"2017-04-26 15:54:55,254","class":"org.apache.flink.runtime.jobmanager.JobManager","ndc":"",
>  "msg":"Key value state registered for job  under name "}
>
>  2. When I issued the query in both parallelism = 1 and parallelism = 3
>  environments, I saw "Lookup key-value state for job  with
>  registration name ". In parallelism = 1, I saw 1 log message, in
>  parallelism = 3, I saw two identical messages.
>
>  3. I saw no other messages in the job manager log that seemed relevant.
>
>  4. When issuing the query in parallelism = 3, I continued to get the error:
>  org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation with a message
>  of null.
>
>  Thanks!
>
>
>
>
>
>  26.04.2017, 09:52, "Ufuk Celebi" <u...@apache.org>:
>
>  Thanks! Your config looks good to me.
>
>  Could you please set the log level org.apache.flink.runtime.jobmanager to
>  DEBUG?
>
>  log4j.logger.org.apache.flink.runtime.jobmanager=DEBUG
>
>  Then we can check whether the JobManager logs the registration of the
>  state instance with the respective name in the case of parallelism >
>  1?
>
>  Expected output is something like this: "Key value state registered
>  for job ${msg.getJobId} under name ${msg.getRegistrationName}."
>
>  – Ufuk
>
>  On Wed, Apr 26, 2017 at 3:06 PM, Chet Masterson
>  <chet.master...@yandex.com> wrote:
>
>   Ok...more information.
>
>   1. Built a fresh cluster from the ground up. Started testing queryable state
>   at each step.
>   2. When running under any configuration of task managers and job managers
>   where parallelism = 1, the queries execute as expected.
>   3. As soon as I cross over to parallelism = 3 with 3 task managers (1 job
>   manager) feeding off a kafka topic partitioned three ways, queries will
>   always fail, returning error
>   (org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation) with an
>   error message of null.
>   4. I do know my state is as expected on the cluster. Liberal use of trace
>   prints show my state managed on the jobs is as I expect. However, I cannot
>   query them externally.
>   5. I am sending the query to jobmanager.rpc.port = 6123, which I confirmed
>   is configured by using the job manager UI.
>   6. My flink-conf.yaml:
>
>   jobmanager.rpc.address: flink01
>   jobmanager.rpc.port: 6123
>   jobmanager.heap.mb: 256

Re: Multiple consumers on a subpartition

2017-04-26 Thread Ufuk Celebi
Adding to what Zhijiang said: I think the way to go would be to create
multiple "read views" over the pipelined subpartition. You would have
to make sure that the initial reference count of the partition buffers
is incremented accordingly. The producer will be back pressured by
both consumers now. This could be undesired in some scenarios.
Currently, both consumers are independent of each other by creating
multiple partitions (with their own subpartitions) for each consumer.
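
To illustrate the reference counting part, here is a small sketch in plain Java. It does not use Flink's buffer classes; SharedBuffer and its methods are hypothetical. The buffer starts with one reference per read view and only returns to the pool once every view has released it.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical buffer shared by several read views, not Flink's Buffer API.
class SharedBuffer {

    private final AtomicInteger refCount;
    private volatile boolean recycled;

    // The initial reference count equals the number of read views (consumers).
    SharedBuffer(int numReadViews) {
        if (numReadViews < 1) {
            throw new IllegalArgumentException("need at least one read view");
        }
        this.refCount = new AtomicInteger(numReadViews);
    }

    // Called by a read view once its consumer is done with the buffer.
    void release() {
        int remaining = refCount.decrementAndGet();
        if (remaining < 0) {
            throw new IllegalStateException("released more often than referenced");
        }
        if (remaining == 0) {
            // Only now can the memory go back to the producer's buffer pool.
            recycled = true;
        }
    }

    boolean isRecycled() {
        return recycled;
    }

    public static void main(String[] args) {
        SharedBuffer buffer = new SharedBuffer(2);
        buffer.release();
        System.out.println(buffer.isRecycled()); // first consumer done, still in use
        buffer.release();
        System.out.println(buffer.isRecycled()); // last consumer done, recycled
    }
}
```

This also shows where the back pressure comes from: the producer gets its buffer back only after the slowest consumer has released it.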



On Wed, Apr 26, 2017 at 5:58 AM, Zhijiang(wangzhijiang999)
 wrote:
> Hi albert,
>
> As far as I know, if the upstream data will be consumed by multiple consumers,
> it will generate multiple subpartitions, and each subpartition will
> correspond to one input channel consumer.
> So it is one-to-one correspondence among subpartition -> subpartition view
> -> input channel.
>
> cheers,
> zhijiang
>
> --
> From: albertjonathan
> Sent: Wednesday, April 26, 2017, 02:37
> To: user
> Subject: Multiple consumers on a subpartition
>
> Hello,
>
> Is there a way for Flink to allow a (pipelined) subpartition to be consumed by
> multiple consumers? If not, would it make more sense to implement it as
> multiple input channels for a single subpartition or multiple subpartition
> views for each input channel?
>
> Any suggestion is appreciated.
>
> Thanks
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Multiple-consumers-on-a-subpartition-tp12809.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
>
>


Re: in-memory optimization

2017-04-24 Thread Ufuk Celebi
Loop-invariant data should be kept in Flink's managed memory in
serialized form (in a custom hash table). That means it is not
read back from the CSV file in each iteration, but it is kept in
serialized form and needs to be deserialized again on access.

CC'ing Fabian to double check...

On Mon, Apr 24, 2017 at 4:20 PM, Robert Schwarzenberg
 wrote:
> Hello,
>
> I have a question regarding the loop-awareness of Flink wrt invariant
> datasets.
>
> Does Flink serialize the DataSet 'points' in line 85
>
> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
>
> each iteration or are there in-memory optimization procedures in place?
>
> Thanks for your help!
>
> Regards,
> Robert


Re: Flink on Ignite - Collocation?

2017-04-24 Thread Ufuk Celebi
Hey Matt,

in general, Flink doesn't put much work into co-locating sources
(it doesn't happen for Kafka, etc. either). I think the only local
assignments happen in the DataSet API for files in HDFS.

Often this is of limited help anyway. Your approach sounds like it
could work, but I would generally not recommend such custom solutions
if you don't really need them. Have you tried running your program with
remote reads? What's the bottleneck for you there?

– Ufuk

On Mon, Apr 24, 2017 at 11:47 AM, Matt  wrote:
> Hi all,
>
> I've been playing around with Apache Ignite and I want to run Flink on top
> of it but there's something I'm not getting.
>
> Ignite has its own support for clustering, and data is distributed on
> different nodes using a partitioned key. Then, we are able to run a closure
> and do some computation on the nodes that own the data (collocation of
> computation [1]), that way saving time and bandwidth. It all looks good, but
> I'm not sure how it would play with Flink's own clustering capability.
>
> My initial idea -which I haven't tried yet- is to use collocation to run a
> closure where the data resides, and use that closure to execute a Flink
> pipeline locally on that node (running it using a local environment), then
> using a custom made data source I should be able to plug the data from the
> local Ignite cache to the Flink pipeline and back into a cache using an
> Ignite sink.
>
> I'm not sure it's a good idea to disable Flink distribution and run it
> in a local environment so the data is not transferred to another node. I
> think it's the same problem with Kafka, if it partitions the data on
> different nodes, how do you guarantee that Flink jobs are executed where the
> data resides? In case there's no way to guarantee that unless you enable
> local environment, what do you think of that approach (in terms of
> performance)?
>
> Any additional insight regarding stream processing on Ignite or any other
> distributed storage is very welcome!
>
> Best regards,
> Matt
>
> [1] https://apacheignite.readme.io/docs/collocate-compute-and-data


Re: Disk I/O in Flink

2017-04-24 Thread Ufuk Celebi
Hey Robert,

for batch that should cover the relevant spilling code. If the records
are >= 5 MB, the SpillingAdaptiveSpanningRecordDeserializer will spill
incoming records as well. But that should be covered by the
FileChannel instrumentation as well?

– Ufuk


On Tue, Apr 18, 2017 at 3:57 PM, Robert Schmidtke
 wrote:
> Hi,
>
> I have already looked at the UnilateralSortMerger, concluding that all I/O
> eventually goes via SegmentReadRequest and SegmentWriteRequest (which in
> turn use java.nio.channels.FileChannel) in AsynchronousFileIOChannel. Are
> there more interaction points between Flink and the underlying file system
> that I might want to consider?
>
> Thanks!
> Robert
>
> On Fri, Apr 7, 2017 at 5:02 PM, Kurt Young  wrote:
>>
>> Hi,
>>
>> You probably want to check out UnilateralSortMerger.java, the class
>> responsible for external sorting in Flink. Here is a short description of
>> how it works: three threads work together, one for reading, one for sorting
>> partial data in memory, and one for spilling. Flink first figures out how
>> much memory it can use for the in-memory sort and manages it as
>> MemorySegments. Once this memory runs out, the sorting thread takes over
>> the segments and sorts them in memory (for more details about in-memory
>> sorting, see NormalizedKeySorter). After this, the spilling thread writes
>> the sorted data to disk and makes the memory available again for reading.
>> This is repeated until all data has been processed.
>> Normally, the data is read twice (once from the source and once from disk)
>> and written once, but if too many files were spilled, Flink first merges
>> some of the files to make sure the last merge step does not exceed a
>> fan-in limit (default 128). Hope this helps.
>>
>> Best,
>> Kurt
>>
>> On Fri, Apr 7, 2017 at 4:20 PM, Robert Schmidtke 
>> wrote:
>>>
>>> Hi,
>>>
>>> I'm currently examining the I/O patterns of Flink, and I'd like to know
>>> when/how Flink goes to disk. Let me give an introduction of what I have done
>>> so far.
>>>
>>> I am running TeraGen (from the Hadoop examples package) + TeraSort
>>> (https://github.com/robert-schmidtke/terasort) on a 16 node cluster, each
>>> node with 64 GiB of memory, 2x32 cores, and roughly half a terabyte of disk.
>>> I'm using YARN and HDFS. The underlying file system is XFS.
>>>
>>> Now before running TeraGen and TeraSort, I reset the XFS counters to
>>> zero, and after TeraGen + TeraSort are finished, I dump the XFS counters
>>> again. Accumulated over the entire cluster I get 3 TiB of writes and 3.2 TiB
>>> of reads. What I'd have expected would be 2 TiB of writes (1 for TeraGen, 1
>>> for TeraSort) and 1 TiB of reads (during TeraSort).
>>>
>>> Unsatisfied by the coarseness of these numbers I developed an HDFS
>>> wrapper that logs file system statistics for each call to hdfs://..., such
>>> as start time/end time, no. of bytes read/written etc. I can plot these
>>> numbers and see what I expect: during TeraGen I have 1 TiB of writes to
>>> hdfs://..., during TeraSort I have 1 TiB of reads from and 1 TiB of writes
>>> to hdfs://... So far, so good.
>>>
>>> Now this still did not explain the disk I/O, so I added bytecode
>>> instrumentation to a range of Java classes, like FileIn/OutputStream,
>>> RandomAccessFile, FileChannel, ZipFile, multiple *Buffer classes for memory
>>> mapped files etc., and have the same statistics: start/end of a read
>>> from/write to disk, no. of bytes involved and such. I can plot these numbers
>>> too and see that the HDFS JVMs write 1 TiB of data to disk during TeraGen
>>> (expected) and read and write 1 TiB from and to disk during TeraSort
>>> (expected).
>>>
>>> Sorry for the enormous introduction, but now there's finally the
>>> interesting part: Flink's JVMs read from and write to disk 1 TiB of data
>>> each during TeraSort. I'm suspecting there is some sort of spilling
>>> involved, potentially because I have not done the setup properly. But that
>>> is not the crucial point: my statistics give a total of 3 TiB of writes to
>>> disk (2 TiB for HDFS, 1 TiB for Flink), which agrees with the XFS counters
>>> from above. However, my statistics only give 2 TiB of reads from disk (1 TiB
>>> for HDFS, 1 TiB for Flink), so I'm missing an entire TiB of reads from disk
>>> somewhere. I have done the same with Hadoop TeraSort, and there I'm not
>>> missing any data, meaning my statistics agree with XFS for TeraSort on
>>> Hadoop, which is why I suspect there are some cases where Flink goes to disk
>>> without me noticing it.
>>>
>>> Therefore here finally the question: in which cases does Flink go to
>>> disk, and how does it do so (meaning precisely which Java classes are
>>> involved, so I can check my bytecode instrumentation)? This would also
>>> include any kind of resource distribution via HDFS/YARN I 

Re: Queryable State

2017-04-24 Thread Ufuk Celebi
You should be able to use queryable state w/o any changes to the
default config. The `query.server.port` option defines the port of the
queryable state server that serves the state on the task managers and
it is enabled by default.

The important thing is to configure the client to discover the
JobManager and everything else should work out of the box. Can you
please

1) Use the default config and verify in the JobManager logs that the
JobManager listens on port 6123 (the default JM port) and that all
expected TaskManagers connect to it?

2) Share the code for how you configure the QueryableStateClient?
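
For step 1, a quick way to confirm from the client machine that something accepts connections on the JobManager port is a plain TCP probe. The sketch below is only an illustration: the PortProbe class is hypothetical, it uses nothing but the JDK, and the default host "localhost" is an assumption. A successful connect only shows that the port is open, not that the process behind it is a JobManager, so the JobManager logs remain the authoritative check.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper, not part of Flink: checks whether a TCP port accepts connections. */
final class PortProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, unknown host, etc.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are assumptions: local host and the default JobManager RPC port 6123.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 6123;
        System.out.println(host + ":" + port + " listening: " + isListening(host, port, 2000));
    }
}
```

Running it against the JobManager host from the machine that runs the client also rules out firewall issues between the two.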

– Ufuk


On Mon, Apr 24, 2017 at 1:45 PM, Chet Masterson
 wrote:
> I moved up from running queryable state on a standalone Flink instance to a
> several node cluster. My queries don't seem to be responding when I execute
> them on the cluster. A few questions:
>
> 1. The error I am getting:
> WARN [ReliableDeliverySupervisor] Association with remote system
> [akka.tcp://flink@x.x.x.x:6123] has failed, address is now gated for [5000]
> ms. Reason: [Association failed with [akka.tcp://flink@x.x.x.x:6123]] Caused
> by: [Connection refused: /x.x.x.x:6123]
> 2017/04/23 20:19:01.016 ERROR Actor not found for:
> ActorSelection[Anchor(akka.tcp://flink@x.x.x.x:6123/),
> Path(/user/jobmanager)]
>
> I assume this is because Flink is not servicing requests on port :6123. I am
> using the default RPC ports defined in flink-conf.yaml. I confirm nothing is
> listening on port 6123 by using netstat on the flink nodes.
>
> 2. I make the following changes on all nodes to flink-conf.yaml, then
> restart the cluster
>
> #jobmanager.rpc.port: 6123
> query.server.port: 6123
> query.server.enable: true
>
> 3. Now port 6123 is open, as viewed from netstat.
>
> My question: what is the proper configuration for servicing external queries
> when running in a cluster? Can I use jobmanager.rpc.port: 6123 which works
> standalone, do I have to add query.server.port and query.server.enable?
> Which port should I be using?
>
>


Re: Checkpoints very slow with high backpressure

2017-04-24 Thread Ufuk Celebi
@Yassine: no, there is no way to disable the back pressure mechanism. Do
you have more details about the two last operators? What do you mean by
"the process function is slow on purpose"?

@Rune: with 1.3, Flink will configure its internal buffers so that not
too much data is buffered in them
(https://issues.apache.org/jira/browse/FLINK-4545). You could try the
current master and check whether it improves the checkpointing behaviour
under back pressure. Out of curiosity, are you using the async I/O API for
the communication with the external REST service
(https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html)?

– Ufuk


On Mon, Apr 24, 2017 at 11:08 AM, Rune Skou Larsen  wrote:

> Sorry I can't help you, but we're also experiencing slow checkpointing
> when there is backpressure from the sink.
>
> I tried HDFS, S3, and RocksDB state backends, but to no avail -
> checkpointing always times out with backpressure.
>
> Can we somehow reduce Flink's internal buffer sizes, so checkpointing with
> backpressure becomes faster?
>
> - Rune
>
> ---
>
> Our current setup (improvement suggestions welcome!):
>
> Flink 1.2.0,  yarn@AWS EMR, 1 master + 3 slaves, m4.xlarge
>
> program_parallelism: 12
> taskmanagers: 6
> slotsPerTaskManager: 4
> taskmanager_heap_mb: 4096
> jobmanager_heap_mb: 1024
>
> Basic program structure:
>
> 1) read batch from Kinesis
>
> 2) Split batch and shuffle using custom partitioner (consistent hashing).
>
> 3) enrich using external REST service
>
> 4) Write to database (This step is the bottleneck)
> On 24-04-2017 09:32, Yassine MARZOUGUI wrote:
>
> I'm sorry if you received multiple copies of this mail. I kept
> trying to send it yesterday, but it looks like the mailing list was stuck and
> didn't dispatch it until now. Sorry for the disturbance.
> On Apr 23, 2017 20:53, "Yassine MARZOUGUI" 
> wrote:
>>
>> Hi all,
>> I have a streaming pipeline as follows:
>> 1 - read a folder continuously from HDFS
>> 2 - filter duplicates (using keyBy(x->x) and keeping a state per key
>> indicating whether it has been seen)
>> 3 - schedule some future actions on the stream using ProcessFunction and
>> processing time timers (elements are kept in a MapState)
>> 4 - write results back to HDFS using a BucketingSink.
>> I am using RocksDBStateBackend and Flink 1.3-SNAPSHOT (Commit: 9fb074c).
>> Currently the source contains just one file of 1 GB, so that's the
>> maximum state that the job might hold. I noticed that the backpressure on
>> operators #1 and #2 is high, and the split reader has only read 60 MB
>> out of the 1 GB source file. I suspect this is because the
>> ProcessFunction is slow (on purpose). However, it looks like this affected
>> the checkpoints, which are failing after the timeout (which is set to
>> 2 hours), see attached screenshot.
>>
>> In the job manager logs I keep getting warnings :
>>
>> 2017-04-23 19:32:38,827 WARN  
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received 
>> late message for now expired checkpoint attempt 8 from 
>> 210769a077c67841d980776d8caece0a of job 6c7e44d205d738fc8a6cb4da181d2d86.
>>
>> Is the high backpressure the cause of the slow checkpoints? If
>> so, is there a way to disable the backpressure mechanism, since the records
>> will be buffered in the RocksDB state anyway, which is backed by the disk?
>> Thank you.
>> Best,
>> Yassine
>>
> --
>
> Venlig hilsen/Best regards *Rune Skou Larsen*
>
> [image: goto] Trifork Public A/S Dyssen 1 · DK-8200 Aarhus N · Denmark
> Phone +45 3160 2497 <+45%2031%2060%2024%2097> Skype: rsltrifork Twitter:
> RuneSkouLarsen
>


Re: Monitoring REST API and YARN session

2017-03-31 Thread Ufuk Celebi
In this case they are proxied through YARN. You can check the list of
running applications and click on the Flink app master UI link. Then
you have the host and port for the REST calls. Does this work?

On Fri, Mar 31, 2017 at 1:51 AM, Mohammad Kargar  wrote:
> How can I access the REST APIs for monitoring when cluster launched in a
> yarn session?


Re: Async Functions and Scala async-client for mySql/MariaDB database connection

2017-03-31 Thread Ufuk Celebi
I'm not too familiar with what's happening here, but maybe Klou (cc'd) can help?

On Thu, Mar 30, 2017 at 6:50 PM, Andrea Spina
 wrote:
> Dear Flink community,
>
> I started to use Async Functions in Scala with Flink 1.2.0 in order to
> retrieve enrichment data from a MariaDB database. To do that, I first
> used the classical JDBC library (org.mariadb.jdbc), and it worked as
> expected.
>
> Due to the blocking behavior of jdbc, I'm trying to use this library
> https://github.com/mauricio/postgresql-async/tree/master/mysql-async
> which promises to offer a subset of features in a non-blocking fashion.
>
> Sadly I'm not able to use it.
>
> The async function code follows.
>
> *
> object AsyncEnricher {
>   case class OutputType(field1: FieldType, field2: FieldType)
> }
>
> class AsyncEnricher(configuration: MariaDBConfig)
> extends AsyncFunction[InputType, OutputType]
> with Serializable
> with AutoCloseable
> with LazyLogging {
>
>   private val queryString = s"SELECT  FROM [table] WHERE
>  = ;"
>
>   implicit lazy val executor =
> ExecutionContext.fromExecutor(Executors.directExecutor())
>
>   private lazy val mariaDBClient: Connection = {
> val config = createConfiguration(configuration)
> val connection = new MySQLConnection(config)
> Await.result(connection.connect, 5 seconds)
>   }
>
>   override def asyncInvoke(input: InputType, collector:
> AsyncCollector[OutputType]): Unit = {
>
> val queryResult = mariaDBClient.sendPreparedStatement(queryString,
> Seq(input.fieldToSearch))
>
> queryResult.map(_.rows) onSuccess {
>   case Some(resultSet) =>
> Try {
>   resultSet.head(0).asInstanceOf[FieldType]
> } match {
>   case Success(value) =>
> collector.collect(Iterable(OutputType(value, value)))
>   case Failure(e) =>
> logger.error(s"retrieving value from MariaDB raised $e:
> $queryString executed")
> }
>   case _ => logger.error(s"value not found: $queryString executed")
> }
>
> queryResult onFailure {
>   case e: Throwable =>
> logger.error(s"retrieving location volume from MariaDB raised $e:
> $queryString executed")
> }
>
>   }
>
>   override def close(): Unit = {
> Try(mariaDBClient.disconnect).recover {
>   case t: Throwable => logger.info(s"MariaDB cannot be closed -
> ${t.getMessage}")
> }
>   }
>
> }
> *
>
> The stack trace follows.
>
> /
> TimerException{java.lang.IllegalStateException: Timer service is shut down}
> at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalStateException: Timer service is shut down
> at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.registerTimer(SystemProcessingTimeService.java:118)
> at
> org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.onProcessingTime(TimestampsAndPeriodicWatermarksOperator.java:82)
> at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:218)
> ... 7 more
>
> java.lang.NullPointerException
> at
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.stopResources(AsyncWaitOperator.java:343)
> at
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.dispose(AsyncWaitOperator.java:320)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
> /
>
> I think it has to do with connection.connect returning a Future, hence
> the Await. This is different from the JDBC driver, which worked like a
> charm. I tried to move the Await out of the lazy val.
>
> Can't wait for your opinion. Thank you so much in advance.
>
> Andrea
>
>
>
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Async-Functions-and-Scala-async-client-for-mySql-MariaDB-database-connection-tp12469.html
> Sent from the 

Re: flink one transformation end,the next transformation start

2017-03-31 Thread Ufuk Celebi
What is the error message/stack trace you get here?

On Thu, Mar 30, 2017 at 9:33 AM,   wrote:
> Hi all,
> I run a job that looks like this:
> -
> val data = env.readTextFile("hdfs:///")//DataSet[(String,Array[String])]
> val dataVec =
> computeDataVect(data)//DataSet[(String,Int,Array[(Int,Double)])]
> val rescomm = computeCosSims
> (dataVec)//DataSet[(String,Array[(String,Double)])]
>
> but when run on the YARN cluster, the result is wrong even though the job
> succeeds; when run locally, in Eclipse on my computer, the result is
> correct.
>
> So I ran it in two steps,
> first:
> val data = env.readTextFile("hdfs:///")//DataSet[(String,Array[String])]
> val dataVec =
> computeDataVect(data)//DataSet[(String,Int,Array[(Int,Double)])]
> dataVec.writeAsText("hdfs///vec")//the vector is correct,
>
> second:
> val readVec =
> env.readTextFile("hdfs:///vec").map(...)//DataSet[(String,Int,Array[(Int,Double)])]
> val rescomm = computeCosSims
> (dataVec)//DataSet[(String,Array[(String,Double)])]
> and the result is correct, the same as when run locally in Eclipse.
> --
> Can someone help solve the problem?
>
>


Re: ClassNotFoundException upon Savepoint Disposal

2017-03-29 Thread Ufuk Celebi
Thanks for providing the logs. It looks like the JARs are uploaded
(modulo any bugs ;-))... could you double check that the class is
actually part of the JAR and has not been moved around via

jar -tf  | grep EventDataRecord
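
If running the jar tool is inconvenient, roughly the same check can be done with the JDK's java.util.jar.JarFile. The sketch below is an illustration only: JarClassFinder is a hypothetical helper name, and the JAR path and class name fragment are passed in by the caller.

```java
import java.io.IOException;
import java.util.List;
import java.util.jar.JarFile;
import java.util.stream.Collectors;

/** Hypothetical helper: lists JAR entries whose name contains a fragment, similar to `jar -tf ... | grep ...`. */
final class JarClassFinder {

    static List<String> findEntries(String jarPath, String fragment) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.stream()
                    .map(entry -> entry.getName())
                    .filter(name -> name.contains(fragment))
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.err.println("usage: JarClassFinder <path-to-jar> <name-fragment>");
            return;
        }
        // Example fragment from this thread: EventDataRecord
        findEntries(args[0], args[1]).forEach(System.out::println);
    }
}
```

The printed entry names include the package path, which makes it easy to see whether the class has been moved to a different package.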


If everything looks good in the JAR, I could write a short tool that
prints the referenced file paths and you could manually delete the files
as a workaround. Sorry for all the trouble with this.

In version >1.2 we don't need the user code any more to dispose savepoints.

– Ufuk


On Wed, Mar 29, 2017 at 11:50 AM, Konstantin Gregor
 wrote:
> Hi Ufuk, hi Stefan,
>
>
> thanks a lot for your replies.
>
> Ufuk, we are using the HDFS state backend.
>
> Stefan, I installed 1.1.5 on our machines and built our software with the
> Flink 1.1.5 dependency, but the problem remains. Below are the logs for
> savepoint creation [1] and savepoint disposal [2] as well as the logs from
> the start of the job [3]. There were not many more log lines when I set
> org.apache.flink.client to DEBUG, so I set the whole package
> org.apache.flink to DEBUG in the hope of some findings. But I couldn't
> really find anything suspicious.
>
> Again, thanks a lot for your help!
>
> Best regards
>
>
> Konstantin
>
>
> [1]
>
> 2017-03-28 12:21:32,033 INFO  org.apache.flink.client.CliFrontend
> -
> 
> 2017-03-28 12:21:32,034 INFO  org.apache.flink.client.CliFrontend
> -  Starting Command Line Client (Version: 1.1.3, Rev:a56d810,
> Date:10.11.2016 @ 13:25:34 CET)
> 2017-03-28 12:21:32,035 INFO  org.apache.flink.client.CliFrontend
> -  Current user: our_user
> 2017-03-28 12:21:32,035 INFO  org.apache.flink.client.CliFrontend
> -  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation -
> 1.7/24.51-b03
> 2017-03-28 12:21:32,035 INFO  org.apache.flink.client.CliFrontend
> -  Maximum heap size: 1749 MiBytes
> 2017-03-28 12:21:32,035 INFO  org.apache.flink.client.CliFrontend
> -  JAVA_HOME: /usr/java/default
> 2017-03-28 12:21:32,037 INFO  org.apache.flink.client.CliFrontend
> -  Hadoop version: 2.3.0
> 2017-03-28 12:21:32,038 INFO  org.apache.flink.client.CliFrontend
> -  JVM Options:
> 2017-03-28 12:21:32,038 INFO  org.apache.flink.client.CliFrontend
> -
> -Dlog.file=/path/to/our/lib/flink-1.1.3/log/flink-our_user-client-ourserver.log
> 2017-03-28 12:21:32,038 INFO  org.apache.flink.client.CliFrontend
> -
> -Dlog4j.configuration=file:/path/to/our/lib/flink-1.1.3/conf/log4j-cli.properties
> 2017-03-28 12:21:32,038 INFO  org.apache.flink.client.CliFrontend
> -
> -Dlogback.configurationFile=file:/path/to/our/lib/flink-1.1.3/conf/logback.xml
> 2017-03-28 12:21:32,038 INFO  org.apache.flink.client.CliFrontend
> -  Program Arguments:
> 2017-03-28 12:21:32,038 INFO  org.apache.flink.client.CliFrontend
> - savepoint
> 2017-03-28 12:21:32,038 INFO  org.apache.flink.client.CliFrontend
> - 7e865198e220bea8a2203ebdb0827b6f
> 2017-03-28 12:21:32,039 INFO  org.apache.flink.client.CliFrontend
> - -j
> 2017-03-28 12:21:32,039 INFO  org.apache.flink.client.CliFrontend
> - /path/to/our/lib/our_program/lib/our_program-6.2.6-SNAPSHOT-all.jar
> 2017-03-28 12:21:32,039 INFO  org.apache.flink.client.CliFrontend
> -  Classpath:
> /path/to/our/lib/flink-1.1.3/lib/flink-dist_2.10-1.1.3.1.jar:/path/to/our/lib/flink-1.1.3/lib/flink-python_2.10-1.1.3.jar:/path/to/our/lib/flink-1.1.3/lib/flink-reporter-1.0.2-20161206.140111-118.jar:/path/to/our/lib/flink-1.1.3/lib/flink-table_2.10-1.1.3.jar:/path/to/our/lib/flink-1.1.3/lib/log4j-1.2.17.jar:/path/to/our/lib/flink-1.1.3/lib/ojdbc6-11.2.0.3.jar:/path/to/our/lib/flink-1.1.3/lib/slf4j-log4j12-1.7.7.jar::/etc/hadoop/conf:
> 2017-03-28 12:21:32,039 INFO  org.apache.flink.client.CliFrontend
> -
> 
> 2017-03-28 12:21:32,039 INFO  org.apache.flink.client.CliFrontend
> - Using configuration directory /path/to/our/lib/flink-1.1.3/conf
> 2017-03-28 12:21:32,039 INFO  org.apache.flink.client.CliFrontend
> - Trying to load configuration file
> 2017-03-28 12:21:32,050 DEBUG
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: env.java.opts,
> -Djavax.net.ssl.trustStore=/path/to/our/cacerts
> -XX:HeapDumpPath=/path/to/our/hadoop/yarn/log
> -XX:+HeapDumpOnOutOfMemoryError -XX:MaxPermSize=192m
> 2017-03-28 12:21:32,050 DEBUG
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.rpc.address, localhost
> 2017-03-28 12:21:32,050 DEBUG
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.rpc.port, 6123
> 2017-03-28 12:21:32,051 DEBUG
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.heap.mb, 256
> 2017-03-28 12:21:32,051 DEBUG
> org.apache.flink.configuration.GlobalConfiguration- 

Re: Job fails to start with S3 savepoint

2017-03-20 Thread Ufuk Celebi
Hey Abhinav,

the Exception is thrown if the S3 object does not exist.

Can you double check that it actually does exist (no typos, etc.)?

Could this be related to accessing a different region than expected?

– Ufuk


On Mon, Mar 20, 2017 at 9:38 AM, Timo Walther  wrote:

> Hi Abhinav,
>
> can you check if you have configured your AWS setup correctly? The S3
> configuration might be missing.
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/aws.html#missing-s3-filesystem-configuration
>
> Regards,
> Timo
>
>
> Am 18/03/17 um 01:33 schrieb Bajaj, Abhinav:
>
> Hi,
>
>
>
> I am trying to explore using S3 for storing checkpoints and savepoints.
>
> I can get Flink to store the checkpoints and savepoints in s3.
>
>
>
> However, when I try to submit the same Job using the stored savepoint, it
> fails with below exception.
>
> I am using Flink 1.2 and submitted the job from the UI dashboard.
>
>
>
> Can anyone guide me through this issue?
>
>
>
> Thanks,
>
> Abhinav
>
>
>
> *Jobmanager logs with exception* –
>
>
>
> 2017-03-18 00:10:09,193 INFO  org.apache.flink.runtime.blob.
> BlobClient   - Blob client connecting to
> akka://flink/user/jobmanager
>
> 2017-03-18 00:10:09,348 INFO  org.apache.flink.runtime.
> client.JobClient  - Checking and uploading JAR files
>
> 2017-03-18 00:10:09,348 INFO  org.apache.flink.runtime.blob.
> BlobClient   - Blob client connecting to
> akka://flink/user/jobmanager
>
> 2017-03-18 00:10:09,501 INFO  org.apache.flink.yarn.
> YarnJobManager   - Submitting job
> 4425245091bea9ad103dd3ff338244bb (Session Counter Example).
>
> 2017-03-18 00:10:09,502 INFO  org.apache.flink.yarn.
> YarnJobManager   - Using restart strategy
> NoRestartStrategy for 4425245091bea9ad103dd3ff338244bb.
>
> 2017-03-18 00:10:09,502 INFO  org.apache.flink.yarn.
> YarnJobManager   - Running initialization on
> master for job Session Counter Example (4425245091bea9ad103dd3ff338244bb).
>
> 2017-03-18 00:10:09,502 INFO  org.apache.flink.yarn.
> YarnJobManager   - Successfully ran
> initialization on master in 0 ms.
>
> 2017-03-18 00:10:09,503 INFO  org.apache.flink.yarn.
> YarnJobManager   - Starting job from savepoint
> 's3://flink-bucket/flink-savepoints/savepoint-0eba6ba712d2'.
>
> 2017-03-18 00:10:09,636 INFO  org.apache.flink.runtime.
> executiongraph.ExecutionGraph - Job Session Counter Example (
> 4425245091bea9ad103dd3ff338244bb) switched from state CREATED to FAILING.
>
> org.apache.flink.runtime.execution.SuppressRestartsException:
> Unrecoverable failure. This suppresses job restarts. Please check the stack
> trace for the root cause.
>
> at org.apache.flink.runtime.jobmanager.JobManager$$
> anonfun$org$apache$flink$runtime$jobmanager$JobManager$
> $submitJob$1.apply$mcV$sp(JobManager.scala:1369)
>
> at org.apache.flink.runtime.jobmanager.JobManager$$
> anonfun$org$apache$flink$runtime$jobmanager$JobManager$
> $submitJob$1.apply(JobManager.scala:1330)
>
> at org.apache.flink.runtime.jobmanager.JobManager$$
> anonfun$org$apache$flink$runtime$jobmanager$JobManager$
> $submitJob$1.apply(JobManager.scala:1330)
>
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.
> liftedTree1$1(Future.scala:24)
>
> at scala.concurrent.impl.Future$
> PromiseCompletingRunnable.run(Future.scala:24)
>
> at akka.dispatch.TaskInvocation.
> run(AbstractDispatcher.scala:40)
>
> at akka.dispatch.ForkJoinExecutorConfigurator$
> AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(
> ForkJoinTask.java:260)
>
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
> runTask(ForkJoinPool.java:1339)
>
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> ForkJoinPool.java:1979)
>
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
>
> Caused by: java.lang.IllegalArgumentException: Invalid path
> 's3://flink-bucket/flink-savepoints/savepoint-0eba6ba712d2'.
>
> at org.apache.flink.runtime.checkpoint.savepoint.
> SavepointStore.createFsInputStream(SavepointStore.java:182)
>
> at org.apache.flink.runtime.checkpoint.savepoint.
> SavepointStore.loadSavepoint(SavepointStore.java:131)
>
> at org.apache.flink.runtime.checkpoint.savepoint.
> SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:64)
>
> at org.apache.flink.runtime.jobmanager.JobManager$$
> anonfun$org$apache$flink$runtime$jobmanager$JobManager$
> $submitJob$1.apply$mcV$sp(JobManager.scala:1348)
>
> ... 10 more
>
> 2017-03-18 00:10:09,638 INFO  org.apache.flink.runtime.
> 

Re: Telling if a job has caught up with Kafka

2017-03-17 Thread Ufuk Celebi
@Gordon: What's your take on integrating this directly into the
consumer? Can't we poll the latest offset via the Offset API [1] and
report a consumer lag metric for the consumer group of the
application? We could also display this in the web frontend.

In the first version, users would have to poll this metric manually.
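
To make the proposed metric concrete: consumer lag per partition would be the latest offset in the partition minus the offset the consumer has reached. The sketch below only shows that arithmetic. ConsumerLag is a hypothetical class, fetching the offsets from Kafka is left out entirely, and treating a partition without a committed offset as unread from offset 0 is an assumption made for this example.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper showing the lag arithmetic; it does not talk to Kafka. */
final class ConsumerLag {

    /** Per-partition lag = latest offset in the log minus the consumer's committed offset. */
    static Map<Integer, Long> lagPerPartition(Map<Integer, Long> latestOffsets,
                                              Map<Integer, Long> committedOffsets) {
        Map<Integer, Long> lag = new HashMap<>();
        for (Map.Entry<Integer, Long> entry : latestOffsets.entrySet()) {
            // Assumption: no committed offset means nothing has been read yet (offset 0).
            long committed = committedOffsets.getOrDefault(entry.getKey(), 0L);
            lag.put(entry.getKey(), Math.max(0L, entry.getValue() - committed));
        }
        return lag;
    }

    static long totalLag(Map<Integer, Long> lag) {
        long sum = 0;
        for (long value : lag.values()) {
            sum += value;
        }
        return sum;
    }

    /** A job counts as caught up when its total lag is at or below the given threshold. */
    static boolean isCaughtUp(Map<Integer, Long> lag, long threshold) {
        return totalLag(lag) <= threshold;
    }

    public static void main(String[] args) {
        Map<Integer, Long> latest = new HashMap<>();
        latest.put(0, 100L);
        latest.put(1, 50L);
        Map<Integer, Long> committed = new HashMap<>();
        committed.put(0, 90L);

        Map<Integer, Long> lag = lagPerPartition(latest, committed);
        System.out.println("lag per partition: " + lag + ", total: " + totalLag(lag));
    }
}
```

For Gyula's use case, job2 could be considered caught up once isCaughtUp returns true for a small threshold, keeping in mind that the result is only as fresh as the offsets that were polled.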

[1] 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-TopicMetadataRequest

On Fri, Mar 17, 2017 at 11:23 AM, Bruno Aranda  wrote:
> Hi,
>
> We are interested in this too. So far we flag the records with timestamps at
> different points of the pipeline and use metric gauges to measure latency
> between the different components, but it would be good to know if there is
> something more specific to Kafka that we can do out of the box in Flink.
>
> Cheers,
>
> Bruno
>
> On Fri, 17 Mar 2017 at 10:07 Florian König 
> wrote:
>>
>> Hi,
>>
>> thank you Gyula for posting that question. I’d also be interested in how
>> this could be done.
>>
>> You mentioned the dependency on the commit frequency. I’m using
>> https://github.com/quantifind/KafkaOffsetMonitor. With the 08 Kafka consumer
>> a job's offsets as shown in the diagrams updated a lot more regularly than
>> the checkpointing interval. With the 10 consumer a commit is only made after
>> a successful checkpoint (or so it seems).
>>
>> Why is that so? The checkpoint contains the Kafka offset and would be able
>> to start reading wherever it left off, regardless of any offset stored in
>> Kafka or Zookeeper. Why is the offset not committed regularly, independently
>> from the checkpointing? Or did I misconfigure anything?
>>
>> Thanks
>> Florian
>>
>> > On 17.03.2017 at 10:26, Gyula Fóra wrote:
>> >
>> > Hi All,
>> >
>> > I am wondering if anyone has some nice suggestions on what would be the
>> > simplest/best way of telling if a job is caught up with the Kafka input.
>> > An alternative question would be how to tell if a job is caught up to
>> > another job reading from the same topic.
>> >
>> > The first thing that comes to my mind is looking at the offsets Flink
>> > commits to Kafka. However this will only work if every job uses a different
>> > group id and even then it is not very reliable depending on the commit
>> > frequency.
>> >
>> > The use case I am trying to solve is fault tolerant update of a job, by
>> > taking a savepoint for job1, starting job2 from the savepoint, waiting until
>> > it catches up and then killing job1.
>> >
>> > Thanks for your input!
>> > Gyula
>>
>>
>


Re: Return of Flink shading problems in 1.2.0

2017-03-17 Thread Ufuk Celebi
Pulling in Robert and Stephan who know the project's shading setup the best.

On Fri, Mar 17, 2017 at 6:52 AM, Foster, Craig  wrote:
> Hi:
>
> A few months ago, I was building Flink and ran into shading issues for
> flink-dist as described in your docs. We resolved this in BigTop by adding
> the correct way to build flink-dist in the do-component-build script and
> everything was fine after that.
>
>
>
> Now, I’m running into issues doing the same in Flink 1.2.0 and I’m
> trying to figure out what’s changed and how to fix it. Here’s how the
> flink-dist jar looks with proper shading:
>
>
>
> jar -tvf /usr/lib/flink/lib/flink-dist_2.10-1.1.4.jar | grep
> HttpConnectionParams
> 2485 Tue Jan 01 00:00:00 UTC 1980
> org/apache/flink/hadoop/shaded/org/apache/commons/httpclient/params/HttpConnectionParams.class
> 3479 Tue Jan 01 00:00:00 UTC 1980
> org/apache/flink/hadoop/shaded/org/apache/http/params/HttpConnectionParams.class
>
>
>
> When I build Flink 1.2.0 in BigTop, here’s shading for the jar found in the
> RPM:
>
>
>
> jar -tvf flink-dist_2.10-1.2.0.jar | grep HttpConnectionParams
> 2392 Tue Jan 01 00:00:00 GMT 1980
> org/apache/commons/httpclient/params/HttpConnectionParams.class
> 2485 Tue Jan 01 00:00:00 GMT 1980
> org/apache/flink/hadoop/shaded/org/apache/commons/httpclient/params/HttpConnectionParams.class
> 3479 Tue Jan 01 00:00:00 GMT 1980
> org/apache/flink/hadoop/shaded/org/apache/http/params/HttpConnectionParams.class
> 2868 Tue Jan 01 00:00:00 GMT 1980
> org/apache/http/params/HttpConnectionParams.class
>
>
>
> I thought maybe it was some strange thing going on with BigTop, so then I
> tried just straight building Flink 1.2.0 (outside BigTop) and get the same
> shading:
>
>
>
> jar -tvf flink-dist_2.10-1.2.0.jar | grep HttpConnectionParams
>
>   2485 Fri Mar 17 05:41:16 GMT 2017
> org/apache/flink/hadoop/shaded/org/apache/commons/httpclient/params/HttpConnectionParams.class
>
>   3479 Fri Mar 17 05:41:16 GMT 2017
> org/apache/flink/hadoop/shaded/org/apache/http/params/HttpConnectionParams.class
>
>   2392 Fri Mar 17 05:41:24 GMT 2017
> org/apache/commons/httpclient/params/HttpConnectionParams.class
>
>   2868 Fri Mar 17 05:41:24 GMT 2017
> org/apache/http/params/HttpConnectionParams.class
>
>
>
> And, yes, this is after going into flink-dist and running mvn clean install
> again since I am using Maven 3.3.x.
>
>
>
> Here’s a snippet from my Maven version:
>
> mvn -version
>
> Apache Maven 3.3.9 (bb52d8502b132ec0a5a3f4c09453c07478323dc5;
> 2015-11-10T16:41:47+00:00)
>
> Maven home: /usr/local/apache-maven
>
> Java version: 1.8.0_121, vendor: Oracle Corporation
>
>
>
> Any ideas on what my problem might be here?
>
>
>
> Thanks,
>
> Craig
>
>


Re: Questions regarding queryable state

2017-03-16 Thread Ufuk Celebi
On Thu, Mar 16, 2017 at 10:00 AM, Kathleen Sharp
 wrote:
> Hi,
>
> I have some questions regarding the Queryable State feature:
>
> Is it possible to use the QueryClient to get a list of keys for a given State?

No, this is not possible at the moment. You would have to trigger a
query for each key and then gather all results (for example via
Future.sequence [1]).
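
For Java users, the same per-key fan-out can be sketched with CompletableFuture.allOf in place of Scala's Future.sequence. This is only an illustration: the class name and the queryOneKey function are hypothetical stand-ins for whatever call issues a single queryable state lookup, and no Flink client API is used here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Sketch: issue one query per key and gather all results into one future. */
final class QueryAllKeysSketch {

    static <K, V> CompletableFuture<Map<K, V>> queryAll(
            List<K> keys, Function<K, CompletableFuture<V>> queryOneKey) {
        // Trigger all queries up front so they can run concurrently.
        List<CompletableFuture<V>> futures = new ArrayList<>();
        for (K key : keys) {
            futures.add(queryOneKey.apply(key));
        }
        // allOf completes once every query has completed (or fails if any of them failed).
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    Map<K, V> results = new LinkedHashMap<>();
                    for (int i = 0; i < keys.size(); i++) {
                        // join() does not block here: every future is already complete.
                        results.put(keys.get(i), futures.get(i).join());
                    }
                    return results;
                });
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for a real lookup: the "state" of a key is its length.
        Function<String, CompletableFuture<Integer>> fakeQuery =
                key -> CompletableFuture.completedFuture(key.length());
        System.out.println(queryAll(Arrays.asList("a", "bb", "ccc"), fakeQuery).join());
    }
}
```

Note that the caller still has to know the set of keys in advance, since the client cannot list them.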

> At the moment it is not possible to use ListState - will this ever be
> introduced?

This is not exposed via the asQueryableState API, because it is not
possible to ever clear the list in that case. You can write a custom
function and create the list state yourself, like this (pseudo code):

stream.keyBy().flatMap({
   ListStateDescriptor listStateDesc = ...
   listStateDesc.setQueryable(name); // make queryable

   // pass the descriptor (not the state) to obtain the list state
   ListState state = getRuntimeContext().getListState(listStateDesc);
});

I would only use this if the list is also cleared at some point in
time (otherwise it will grow without bounds).

>
> My first impression is that I would need one of these 2 to be able to
> use Queryable state.
>
> I would then probably need to build on top of the queryable state
> client to allow filtering, pagination etc of results. Is the intention
> to enrich the client at some point with this (assuming list state
> becomes supported)?

I can imagine supporting queries for multiple keys at once; with list
state I'm not sure.

> The queryable state client needs a job id, is there any recommended
> way of getting ahold of this?

No, this is a major shortcoming. :-( I would like to go over the
queryable state client for the next release and make sure to get some
of these annoyances out of the way, like using the job name instead of
the JobID or requiring another query name for the job if you want to
make it queryable via name.

[1] 
http://www.scala-lang.org/api/current/scala/concurrent/Future$.html#sequence[A,M[X]<:TraversableOnce[X]](in:M[scala.concurrent.Future[A]])(implicitcbf:scala.collection.generic.CanBuildFrom[M[scala.concurrent.Future[A]],A,M[A]],implicitexecutor:scala.concurrent.ExecutionContext):scala.concurrent.Future[M[A]]


Re: Remove Accumulators at runtime

2017-03-09 Thread Ufuk Celebi
I see, this is not possible with accumulators. You could wrap all
counts in a single metric and update that one. Check out Flink's
metrics:

https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/metrics.html
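
A minimal sketch of the "single metric wrapping all counts" idea, in plain Java: one holder keeps a count per rule and lets a rule's count be dropped when the rule is removed. The class and method names are hypothetical. In a Flink job the snapshot could be exposed through a Gauge registered on the operator's metric group; that wiring is left out here, and each parallel instance would keep its own counts.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/** Sketch: per-rule match counts held in one object, with removable entries. */
final class RuleMatchCounts {

    private final Map<String, LongAdder> counts = new ConcurrentHashMap<>();

    /** Increments the count for a rule, creating the entry on first match. */
    void recordMatch(String ruleId) {
        counts.computeIfAbsent(ruleId, id -> new LongAdder()).increment();
    }

    /** Drops the count together with the rule, which accumulators do not allow. */
    void removeRule(String ruleId) {
        counts.remove(ruleId);
    }

    /** Point-in-time copy of all counts; this is the value a single gauge would report. */
    Map<String, Long> snapshot() {
        Map<String, Long> copy = new HashMap<>();
        counts.forEach((rule, adder) -> copy.put(rule, adder.sum()));
        return copy;
    }

    public static void main(String[] args) {
        RuleMatchCounts matches = new RuleMatchCounts();
        matches.recordMatch("rule-1");
        matches.recordMatch("rule-1");
        matches.recordMatch("rule-2");
        matches.removeRule("rule-2");
        System.out.println(matches.snapshot()); // prints {rule-1=2}
    }
}
```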


On Wed, Mar 8, 2017 at 5:04 PM, PedroMrChaves  wrote:
> Hi,
>
> I'm building a system that maintains a set of rules that can be dynamically
> added/removed. I wanted to count every element that matched each rule in an
> accumulator ( I have several parallel instances). If the rule is removed so
> should the accumulator.
>
>
>
>
>
> -
> Best Regards,
> Pedro Chaves
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Remove-Accumulators-at-runtime-tp12106p12119.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.


Re: flink/cancel & shutdown hooks

2017-03-08 Thread Ufuk Celebi
OK, the thing is that the JVMs are not shut down when you cancel the
task. Therefore no shutdown hook is executed when you cancel.

You would have to execute bin/stop-cluster.sh to stop the JVM. Does
that make sense?


On Wed, Mar 8, 2017 at 3:34 PM, Dominik Safaric
<dominiksafa...@gmail.com> wrote:
> I’m not using YARN; instead I’m starting the cluster using
> bin/start-cluster.sh
>
>> On 8 Mar 2017, at 15:32, Ufuk Celebi <u...@apache.org> wrote:
>>
>> On Wed, Mar 8, 2017 at 3:19 PM, Dominik Safaric
>> <dominiksafa...@gmail.com> wrote:
>>> The cluster consists of 4 workers and a master node.
>>
>> Are you starting the cluster via bin/start-cluster.sh or are you using
>> YARN etc.?
>


Re: flink/cancel & shutdown hooks

2017-03-08 Thread Ufuk Celebi
On Wed, Mar 8, 2017 at 3:19 PM, Dominik Safaric
 wrote:
> The cluster consists of 4 workers and a master node.

Are you starting the cluster via bin/start-cluster.sh or are you using
YARN etc.?


Re: flink/cancel & shutdown hooks

2017-03-08 Thread Ufuk Celebi
How are you deploying your job?

Shutdown hooks are executed when the JVM terminates whereas the cancel
command only cancels the Flink job and the JVM process potentially
keeps running. For example, running a standalone cluster would keep
the JVMs running.
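
The difference can be reproduced in plain Java without Flink. In this sketch (all names hypothetical) a worker thread stands in for the job and an interrupt stands in for the cancel command; the shutdown hook has not run after the "cancel" because the JVM is still alive.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Sketch: cancelling a task inside a JVM does not trigger JVM shutdown hooks. */
final class ShutdownHookSketch {

    static final AtomicBoolean hookRan = new AtomicBoolean(false);

    /** Registers a hook, starts and cancels a task, and reports whether the hook ran. */
    static boolean hookRanAfterCancel() {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> hookRan.set(true)));

        Thread task = new Thread(() -> {
            try {
                Thread.sleep(60_000); // stand-in for a long-running job
            } catch (InterruptedException e) {
                // Cancelled: the task ends here, but the JVM keeps running.
            }
        });
        task.start();
        task.interrupt(); // stand-in for cancelling the job
        try {
            task.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the task", e);
        }
        return hookRan.get();
    }

    public static void main(String[] args) {
        System.out.println("hook ran after cancel: " + hookRanAfterCancel());
        // The hook only runs once the JVM itself exits, i.e. after main returns.
    }
}
```

Cleanup that has to happen on cancel therefore needs to be tied to the task's own lifecycle rather than to JVM termination.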

On Wed, Mar 8, 2017 at 9:36 AM, Timo Walther  wrote:
> Hi Dominik,
>
> did you take a look into the logs? Maybe the exception is not shown in the
> CLI but in the logs.
>
> Timo
>
> On 07/03/17 at 23:58, Dominik Safaric wrote:
>
>> Hi all,
>>
>> I would appreciate any help or advice regarding default Java
>> runtime shutdown hooks and canceling Flink jobs.
>>
>> Namely, as part of my Flink application I am using a Kafka interceptor class
>> that defines a shutdown hook thread. When stopping the Flink streaming job
>> on my local machine the shutdown hook gets executed, however I do not see
>> the same behaviour when stopping the Flink application using bin/flink
>> cancel .
>>
>> Considering there are no exceptions thrown from the shutdown thread, what
>> could the root cause of this be?
>>
>> Thanks,
>> Dominik
>
>
>

