Re: Flink on Kubernetes - Session vs Job cluster mode and storage

2020-02-28 Thread Hao Sun
Sounds good. Thank you!
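
For anyone following along, the HA setup Yang describes in the quoted replies below comes down to a handful of flink-conf.yaml entries. Here is a minimal Python sketch that renders them, assuming ZooKeeper-based HA with both the HA metadata and the checkpoints on S3. The option keys are the ones I know from the Flink 1.x configuration docs; the quorum address, bucket paths, cluster id and the helper name render_ha_config are made-up placeholders.

```python
def render_ha_config(zk_quorum, storage_dir, cluster_id, checkpoint_dir):
    """Render flink-conf.yaml lines for ZooKeeper-based HA (Flink 1.x option keys)."""
    options = {
        # ZooKeeper handles leader election and stores pointers to the HA data.
        "high-availability": "zookeeper",
        "high-availability.zookeeper.quorum": zk_quorum,
        # The actual HA files live in durable storage, here assumed to be S3.
        "high-availability.storageDir": storage_dir,
        # Must stay the same across restarts so the new JM finds the old state.
        "high-availability.cluster-id": cluster_id,
        "state.checkpoints.dir": checkpoint_dir,
    }
    return "\n".join(f"{key}: {value}" for key, value in options.items())


# Placeholder values, for illustration only.
print(render_ha_config(
    zk_quorum="zk-0.zk:2181,zk-1.zk:2181",
    storage_dir="s3://my-bucket/flink/ha",
    cluster_id="/my-job-cluster",
    checkpoint_dir="s3://my-bucket/flink/checkpoints",
))
```

With everything on S3 and ZooKeeper, nothing here needs a persistent volume on the pods.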

Hao Sun


On Thu, Feb 27, 2020 at 6:52 PM Yang Wang  wrote:

> Hi Hao Sun,
>
> I am posting the explanation to the user ML so that others who have the same
> problem can find it.
>
> Given the job graph is fetched from the jar, do we still need Zookeeper for
>> HA? Maybe we still need it for checkpoint locations?
>
>
> Yes, we still need ZooKeeper for complete recovery (maybe in the future we
> will have a native K8s HA based on etcd). You are right: we still need it
> to find the checkpoint locations. ZooKeeper is also used for leader
> election and leader retrieval.
>
>
> Best,
> Yang
>
> On Fri, Feb 28, 2020 at 1:41 AM Hao Sun  wrote:
>
>> Hi Yang, given the job graph is fetched from the jar, do we still need
>> Zookeeper for HA? Maybe we still need it for checkpoint locations?
>>
>> Hao Sun
>>
>>
>> On Thu, Feb 27, 2020 at 5:13 AM Yang Wang  wrote:
>>
>>> Hi Jin Yi,
>>>
>>> For a standalone per-job cluster, recovery works a little differently.
>>> As you say, the user jar is built into the image. When the JobManager
>>> fails and is relaunched by K8s, the user `main()` is executed again to
>>> build the job graph, unlike a session cluster, which gets the job graph
>>> from high-availability storage.
>>> The job is then submitted again and can recover from the latest
>>> checkpoint (assuming that you have configured high availability).
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> On Thu, Feb 27, 2020 at 2:50 PM Jin Yi  wrote:
>>>
>>>> Hi Yang,
>>>>
>>>> regarding your statement below:
>>>>
>>>> Since you are starting the JM/TM with a K8s deployment, new JM/TM pods
>>>> will be created when they fail. If you do not set the high-availability
>>>> configuration, your jobs can recover when a TM fails, but not when the
>>>> JM fails. With HA configured, the jobs can always be recovered and you
>>>> do not need to re-submit them.
>>>>
>>>> Does this also apply to a Flink Job Cluster? When the JM pod is restarted
>>>> by Kubernetes, the image also contains the application jar, so if the
>>>> statement also applies to the Flink Job Cluster mode, can you please
>>>> elaborate why?
>>>>
>>>> Thanks a lot!
>>>> Eleanore
>>>>
>>>> On Mon, Feb 24, 2020 at 6:36 PM Yang Wang 
>>>> wrote:
>>>>
>>>>> Hi M Singh,
>>>>>
>>>>> > Mans - If we use the session based deployment option for K8 - I
>>>>>> thought K8 will automatically restart any failed TM or JM.
>>>>>> In the case of failed TM - the job will probably recover, but in the
>>>>>> case of failed JM - perhaps we need to resubmit all jobs.
>>>>>> Let me know if I have misunderstood anything.
>>>>>
>>>>>
>>>>> Since you are starting the JM/TM with a K8s deployment, new JM/TM pods
>>>>> will be created when they fail. If you do not set the high-availability
>>>>> configuration, your jobs can recover when a TM fails, but not when the
>>>>> JM fails. With HA configured, the jobs can always be recovered and you
>>>>> do not need to re-submit them.
>>>>>
>>>>> > Mans - Is there any safe way of passing creds?
>>>>>
>>>>>
>>>>> Yes, you are right. Using a ConfigMap to pass the credentials is not
>>>>> safe. On K8s, I think you could use Secrets instead [1].
>>>>>
>>>>> > Mans - Does a task manager failure cause the job to fail?  My
>>>>>> understanding is that JM failures are catastrophic while TM failures are
>>>>>> recoverable.
>>>>>
>>>>>
>>>>> What I mean is that the job fails, and it can then be restarted by your
>>>>> configured restart strategy [2].
>>>>>
>>>>> > Mans - So if we are saving checkpoint in S3 then there is no need
>>>>>> for disks - should we use emptyDir ?
>>>>>
>>>>>
>>>>> Yes, if you are saving the checkpoints in S3 and have also set
>>>>> `high-availability.storageDir` to S3, then you do not need a persistent
>>>>> volume. Since
>>>>> the l

Re: Flink 1.8.0 S3 Checkpoints fail with java.security.ProviderException: Could not initialize NSS

2019-10-10 Thread Hao Sun
I saw a similar issue when using Alpine Linux.
https://pkgs.alpinelinux.org/package/v3.3/main/x86/nss

Installing this package fixed my problem.

Hao Sun


On Thu, Oct 10, 2019 at 3:46 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hi there,
>
> I'm getting the following error message on a Flink 1.8 cluster deployed on
> Kubernetes. I've already confirmed that the pod has access to S3 and write
> permissions to the bucket, but I can't understand what the SSL issue is and
> if it is related to S3 or not. I have tried both with the default state
> backend and with rocksdb. It happens immediately upon triggering a
> savepoint. Has anyone seen errors like this?
>
> Thank you!
> Austin
>
>
> 2019-10-10T22:21:36.496009042Z 2019-10-10 22:21:36,495 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> Triggering checkpoint 1 @ 1570746096485 for job
> 32a9a430038d440cbfee808101dcccd1.
> 2019-10-10T22:21:36.871364673Z 2019-10-10 22:21:36,858 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
> Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator,
> PassThroughWindowFunction) -> Sink: Print to Std. Out (1/1)
> (6cb94a0782212895ac1e062b4124e425) switched from RUNNING to FAILED.
> 2019-10-10T22:21:36.87141613Z java.lang.ExceptionInInitializerError: null
> 2019-10-10T22:21:36.871422053Z  at sun.security.ssl.SSLSessionImpl.(SSLSessionImpl.java:183)
> ~[?:1.8.0_181]
> 2019-10-10T22:21:36.87142623Z   at sun.security.ssl.SSLSessionImpl.(SSLSessionImpl.java:148)
> ~[?:1.8.0_181]
> 2019-10-10T22:21:36.871429991Z  at
> sun.security.ssl.SSLSessionImpl.(SSLSessionImpl.java:79)
> ~[?:1.8.0_181]
> 2019-10-10T22:21:36.871434248Z  at
> sun.security.ssl.SSLSocketImpl.init(SSLSocketImpl.java:604) ~[?:1.8.0_181]
> 2019-10-10T22:21:36.871450145Z  at sun.security.ssl.SSLSocketImpl.(SSLSocketImpl.java:572)
> ~[?:1.8.0_181]
> 2019-10-10T22:21:36.871454174Z  at
> sun.security.ssl.SSLSocketFactoryImpl.createSocket(SSLSocketFactoryImpl.java:110)
> ~[?:1.8.0_181]
> 2019-10-10T22:21:36.871457724Z  at
> org.apache.flink.fs.s3base.shaded.org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:365)
> ~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
> 2019-10-10T22:21:36.871461103Z  at
> org.apache.flink.fs.s3base.shaded.org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:355)
> ~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
> 2019-10-10T22:21:36.871465705Z  at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:132)
> ~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
> 2019-10-10T22:21:36.87146981Z   at
> org.apache.flink.fs.s3base.shaded.org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
> ~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
> 2019-10-10T22:21:36.871474533Z  at
> org.apache.flink.fs.s3base.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:359)
> ~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
> 2019-10-10T22:21:36.871494299Z  at
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_181]
> 2019-10-10T22:21:36.87149796Z   at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> ~[?:1.8.0_181]
> 2019-10-10T22:21:36.871502637Z  at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_181]
> 2019-10-10T22:21:36.871506464Z  at
> java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_181]
> 2019-10-10T22:21:36.871510239Z  at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
> ~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
> 2019-10-10T22:21:36.871513871Z  at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.conn.$Proxy19.connect(Unknown
> Source) ~[?:1.8.0-stream1]
> 2019-10-10T22:21:36.871516965Z  at
> org.apache.flink.fs.s3base.shaded.org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:381)
> ~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
> 2019-10-10T22:21:36.871520624Z  at
> org.apache.flink.fs.s3base.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:237)
> ~[f

Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-10-10 Thread Hao Sun
Yep, I know that option. That is what confuses me as well. In an HA
setup, where do I supply this option (allowNonRestoredState)?
As I remember, this option requires a savepoint path when I start a Flink job,
and HA does not require the path.
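
For reference, this is how the option is passed on the regular CLI path, written as a small Python sketch that builds the `flink run` argument list. The `-s` and `--allowNonRestoredState` flags are the ones from the savepoint docs linked in the quoted mail below; the helper name and the paths are placeholders. Following my recollection above, the sketch refuses the flag when no savepoint path is given (that check is my own choice, not something the CLI is confirmed to enforce), and it does not answer where the flag belongs in an HA setup.

```python
def build_run_command(jar_path, savepoint_path=None, allow_non_restored_state=False):
    """Build the argv for `flink run`, optionally restoring from a savepoint."""
    command = ["flink", "run"]
    if savepoint_path is not None:
        # -s <savepointPath> tells the client which savepoint to restore from.
        command += ["-s", savepoint_path]
        if allow_non_restored_state:
            # Skip savepoint state that no longer maps to an operator in the job.
            command.append("--allowNonRestoredState")
    elif allow_non_restored_state:
        # Assumption of this sketch: the flag is meaningless without a savepoint.
        raise ValueError("--allowNonRestoredState needs -s <savepointPath>")
    command.append(jar_path)
    return command


# Placeholder jar and savepoint path, for illustration only.
print(" ".join(build_run_command("my-job.jar", "s3://my-bucket/savepoints/savepoint-1", True)))
```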

Hao Sun


On Thu, Oct 10, 2019 at 11:16 AM Yun Tang  wrote:

> Just a minor supplement @Hao Sun: if you decide to
> drop an operator, don't forget to add the --allowNonRestoredState (short: -n)
> option [1]
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#allowing-non-restored-state
>
> Best
> Yun Tang
>
> --
> *From:* Vijay Bhaskar 
> *Sent:* Thursday, October 10, 2019 19:24
> *To:* Yang Wang 
> *Cc:* Sean Hester ; Aleksandar Mastilovic <
> amastilo...@sightmachine.com>; Yun Tang ; Hao Sun <
> ha...@zendesk.com>; Yuval Itzchakov ; user <
> user@flink.apache.org>
> *Subject:* Re: Challenges Deploying Flink With Savepoints On Kubernetes
>
> Thanks Yang. We will try and let you know if any issues arise
>
> Regards
> Bhaskar
>
> On Thu, Oct 10, 2019 at 1:53 PM Yang Wang  wrote:
>
> @ Hao Sun,
> I have confirmed that even if we change the parallelism and/or modify
> operators or add new operators, the Flink cluster can still recover from
> the latest checkpoint.
>
> @ Vijay
> a) If some individual jobmanager/taskmanager crashes unexpectedly (while
> other jobmanagers and taskmanagers are alive), it can recover from the
> latest checkpoint.
> b) If all jobmanagers and taskmanagers fail, it can still recover from the
> latest checkpoint if the cluster-id is not changed.
>
> When HA is enabled, the metadata of the job graph and checkpoints is saved
> in ZooKeeper and the actual files are saved on high-availability storage
> (HDFS). So when the Flink application is submitted again with the same
> cluster-id, it can recover jobs and checkpoints from ZooKeeper. I think
> this has been supported for a long time. Maybe you could try it with
> flink-1.8 or 1.9.
>
> Best,
> Yang
>
>
> On Thu, Oct 10, 2019 at 2:26 PM Vijay Bhaskar  wrote:
>
> Thanks Yang and Sean. I have a couple of questions:
>
> 1) Suppose we need to bring back the entire cluster:
>  a) In that case, at least one job manager out of the HA group should be
> up and running, right? Or
>  b) does this also work when all the job managers fail? In that case
> please let me know the procedure or share the documentation.
>  How do we start from the previous checkpoint?
>  From which Flink version onwards is this feature stable?
>
> Regards
> Bhaskar
>
>
> On Wed, Oct 9, 2019 at 8:51 AM Yang Wang  wrote:
>
> Hi Vijay,
>
> If you are using the HA solution, I think you do not need to specify the
> savepoint. Instead, the checkpoint is used.
> Checkpoints are taken automatically and periodically based on your
> configuration. When the jobmanager/taskmanager fails or the whole cluster
> crashes, it can always recover from the latest checkpoint. Does this meet
> your requirement?
>
> Best,
> Yang
>
> On Tue, Oct 1, 2019 at 1:47 AM Sean Hester  wrote:
>
> Vijay,
>
> That is my understanding as well: the HA solution only solves the problem
> up to the point all job managers fail/restart at the same time. That's
> where my original concern was.
>
> But to Aleksandar and Yun's point, running in HA with 2 or 3 Job Managers
> per cluster--as long as they are all deployed to separate GKE nodes--would
> provide a very high uptime/low failure rate, at least on paper. It's a
> promising enough option that we're going to run in HA for a month or two
> and monitor results before we put in any extra work to customize the
> savepoint start-up behavior.
>
> On Fri, Sep 27, 2019 at 2:24 AM Vijay Bhaskar 
> wrote:
>
> I don't think HA will help to recover from a cluster crash; for that we
> should take periodic savepoints, right? Please correct me in case I am wrong.
>
> Regards
> Bhaskar
>
> On Fri, Sep 27, 2019 at 11:48 AM Vijay Bhaskar 
> wrote:
>
> Suppose my cluster crashes and I need to bring the entire cluster back up.
> Does HA still help to run the cluster from the latest savepoint?
>
> Regards
> Bhaskar
>
> On Thu, Sep 26, 2019 at 7:44 PM Sean Hester 
> wrote:
>
> thanks to everyone for all the replies.
>
> i think the original concern here with "just" relying on the HA option is
> that there are some disaster recovery and data center migration use cases
> where the continuity of the job managers is difficult to preserve. but
> those are admittedly very

Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-09-24 Thread Hao Sun
I think I overlooked it. Good point. I am using Redis to save the path to
my savepoint; I might be able to set a TTL to avoid this issue.
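
To make the TTL idea concrete, here is a minimal Python sketch. It uses an in-memory stand-in instead of a real Redis client (in Redis itself this would be a key written with an expiry, e.g. `SET key value EX seconds`). The class name, the paths and the injected clock are all hypothetical; the point is only that a restart after the TTL no longer sees the stale savepoint path and can fall back to the latest checkpoint.

```python
import time


class ExpiringSavepointPointer:
    """In-memory stand-in for a Redis key that holds a savepoint path with a TTL."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so the expiry can be tested without sleeping
        self._path = None
        self._expires_at = None

    def set(self, savepoint_path):
        """Store the path and start the TTL countdown."""
        self._path = savepoint_path
        self._expires_at = self._clock() + self._ttl

    def get(self):
        """Return the savepoint path, or None once the TTL has elapsed."""
        if self._path is None or self._clock() >= self._expires_at:
            self._path = None
            return None
        return self._path


# A planned deploy writes the pointer; a restart much later finds nothing
# and would resume from the latest checkpoint instead.
pointer = ExpiringSavepointPointer(ttl_seconds=600)
pointer.set("s3://my-bucket/savepoints/savepoint-1")
print(pointer.get())
```

The TTL has to be longer than a normal deployment takes and shorter than the time after which the savepoint counts as stale, which is a judgment call per job.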

Hao Sun


On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov  wrote:

> Hi Hao,
>
> I think he's exactly talking about the usecase where the JM/TM restart and
> they come back up from the latest savepoint which might be stale by that
> time.
>
> On Tue, 24 Sep 2019, 19:24 Hao Sun,  wrote:
>
>> We always make a savepoint before we shut down the job-cluster, so the
>> savepoint is always the latest. When we fix a bug or change the job graph,
>> it can resume well.
>> We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM,
>> uncaught exception, etc.
>>
>> Maybe I do not understand your use case well; I do not see a need to
>> start from a checkpoint after a bug fix.
>> From what I know, you can currently use a checkpoint as a savepoint as well.
>> Hao Sun
>>
>>
>> On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov 
>> wrote:
>>
>>> AFAIK there's currently nothing implemented to solve this problem, but
>>> a possible fix could be implemented on top of
>>> https://github.com/lyft/flinkk8soperator, which
>>> already has a pretty fancy state machine for rolling upgrades. I'd love to
>>> be involved as this is an issue I've been thinking about as well.
>>>
>>> Yuval
>>>
>>> On Tue, Sep 24, 2019 at 5:02 PM Sean Hester 
>>> wrote:
>>>
>>>> hi all--we've run into a gap (knowledge? design? tbd?) for our use
>>>> cases when deploying Flink jobs to start from savepoints using the
>>>> job-cluster mode in Kubernetes.
>>>>
>>>> we're running ~15 different jobs, all in job-cluster mode, using a
>>>> mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these
>>>> are all long-running streaming jobs, all essentially acting as
>>>> microservices. we're using Helm charts to configure all of our deployments.
>>>>
>>>> we have a number of use cases where we want to restart jobs from a
>>>> savepoint to replay recent events, i.e. when we've enhanced the job logic
>>>> or fixed a bug. but after the deployment we want to have the job resume
>>>> its "long-running" behavior, where any unplanned restarts resume from the
>>>> latest checkpoint.
>>>>
>>>> the issue we run into is that any obvious/standard/idiomatic Kubernetes
>>>> deployment includes the savepoint argument in the configuration. if the Job
>>>> Manager container(s) have an unplanned restart, when they come back up they
>>>> will start from the savepoint instead of resuming from the latest
>>>> checkpoint. everything is working as configured, but that's not exactly
>>>> what we want. we want the savepoint argument to be transient somehow (only
>>>> used during the initial deployment), but Kubernetes doesn't really support
>>>> the concept of transient configuration.
>>>>
>>>> i can see a couple of potential solutions that either involve custom
>>>> code in the jobs or custom logic in the container (i.e. a custom entrypoint
>>>> script that records that the configured savepoint has already been used in
>>>> a file on a persistent volume or GCS, and potentially when/why/by which
>>>> deployment). but these seem like unexpected and hacky solutions. before we
>>>> head down that road i wanted to ask:
>>>>
>>>>- is this already a solved problem that i've missed?
>>>>- is this issue already on the community's radar?
>>>>
>>>> thanks in advance!
>>>>
>>>> --
>>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>>>
>>>>
>>>
>>> --
>>> Best Regards,
>>> Yuval Itzchakov.
>>>
>>


Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-09-24 Thread Hao Sun
We always make a savepoint before we shut down the job-cluster, so the
savepoint is always the latest. When we fix a bug or change the job graph,
it can resume well.
We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM,
uncaught exception, etc.

Maybe I do not understand your use case well; I do not see a need to start
from a checkpoint after a bug fix.
From what I know, you can currently use a checkpoint as a savepoint as well.
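
The "transient savepoint argument" idea from Sean's quoted message below (a custom entrypoint that records that the configured savepoint has already been used) could look roughly like this in Python. This is only a sketch under assumptions: the marker file is assumed to live on a persistent volume or similar durable location, the function name is made up, and `--fromSavepoint` is the long form of the CLI's `-s` option.

```python
import os


def savepoint_args_once(savepoint_path, marker_file):
    """Return savepoint arguments on the first start only.

    On later restarts with the same configured savepoint the result is empty,
    so the job is left to recover from the latest checkpoint via HA.
    """
    if not savepoint_path:
        return []
    if os.path.exists(marker_file):
        with open(marker_file) as f:
            if f.read().strip() == savepoint_path:
                return []  # this savepoint was already consumed by an earlier start
    # Record the savepoint before handing it to the job (see note below).
    with open(marker_file, "w") as f:
        f.write(savepoint_path)
    return ["--fromSavepoint", savepoint_path]
```

One design caveat: the marker is written before the job has actually restored, so a crash between the two steps would skip the savepoint on the next start. Writing the marker only after the first successful checkpoint would be safer, but needs more plumbing than an entrypoint script has.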

Hao Sun


On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov  wrote:

> AFAIK there's currently nothing implemented to solve this problem, but
> a possible fix could be implemented on top of
> https://github.com/lyft/flinkk8soperator, which already
> has a pretty fancy state machine for rolling upgrades. I'd love to be
> involved as this is an issue I've been thinking about as well.
>
> Yuval
>
> On Tue, Sep 24, 2019 at 5:02 PM Sean Hester 
> wrote:
>
>> hi all--we've run into a gap (knowledge? design? tbd?) for our use cases
>> when deploying Flink jobs to start from savepoints using the job-cluster
>> mode in Kubernetes.
>>
>> we're running ~15 different jobs, all in job-cluster mode, using a mix
>> of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are
>> all long-running streaming jobs, all essentially acting as microservices.
>> we're using Helm charts to configure all of our deployments.
>>
>> we have a number of use cases where we want to restart jobs from a
>> savepoint to replay recent events, i.e. when we've enhanced the job logic
>> or fixed a bug. but after the deployment we want to have the job resume
>> its "long-running" behavior, where any unplanned restarts resume from the
>> latest checkpoint.
>>
>> the issue we run into is that any obvious/standard/idiomatic Kubernetes
>> deployment includes the savepoint argument in the configuration. if the Job
>> Manager container(s) have an unplanned restart, when they come back up they
>> will start from the savepoint instead of resuming from the latest
>> checkpoint. everything is working as configured, but that's not exactly
>> what we want. we want the savepoint argument to be transient somehow (only
>> used during the initial deployment), but Kubernetes doesn't really support
>> the concept of transient configuration.
>>
>> i can see a couple of potential solutions that either involve custom code
>> in the jobs or custom logic in the container (i.e. a custom entrypoint
>> script that records that the configured savepoint has already been used in
>> a file on a persistent volume or GCS, and potentially when/why/by which
>> deployment). but these seem like unexpected and hacky solutions. before we
>> head down that road i wanted to ask:
>>
>>- is this already a solved problem that i've missed?
>>- is this issue already on the community's radar?
>>
>> thanks in advance!
>>
>> --
>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>
>>
>
> --
> Best Regards,
> Yuval Itzchakov.
>


Re: [ANNOUNCE] Rong Rong becomes a Flink committer

2019-07-11 Thread Hao Sun
Congratulations Rong.

On Thu, Jul 11, 2019, 11:39 Xuefu Z  wrote:

> Congratulations, Rong!
>
> On Thu, Jul 11, 2019 at 10:59 AM Bowen Li  wrote:
>
>> Congrats, Rong!
>>
>>
>> On Thu, Jul 11, 2019 at 10:48 AM Oytun Tez  wrote:
>>
>> > Congratulations Rong!
>> >
>> > ---
>> > Oytun Tez
>> >
>> 
>> >
>> >
>> > On Thu, Jul 11, 2019 at 1:44 PM Peter Huang > >
>> > wrote:
>> >
>> >> Congrats Rong!
>> >>
>> >> On Thu, Jul 11, 2019 at 10:40 AM Becket Qin 
>> wrote:
>> >>
>> >>> Congrats, Rong!
>> >>>
>> >>> On Fri, Jul 12, 2019 at 1:13 AM Xingcan Cui 
>> wrote:
>> >>>
>>  Congrats Rong!
>> 
>>  Best,
>>  Xingcan
>> 
>>  On Jul 11, 2019, at 1:08 PM, Shuyi Chen  wrote:
>> 
>>  Congratulations, Rong!
>> 
>>  On Thu, Jul 11, 2019 at 8:26 AM Yu Li  wrote:
>> 
>> > Congratulations Rong!
>> >
>> > Best Regards,
>> > Yu
>> >
>> >
>> > On Thu, 11 Jul 2019 at 22:54, zhijiang 
>> > wrote:
>> >
>> >> Congratulations Rong!
>> >>
>> >> Best,
>> >> Zhijiang
>> >>
>> >> --
>> >> From:Kurt Young 
>> >> Send Time: Thursday, July 11, 2019 22:54
>> >> To:Kostas Kloudas 
>> >> Cc:Jark Wu ; Fabian Hueske ;
>> >> dev ; user 
>> >> Subject:Re: [ANNOUNCE] Rong Rong becomes a Flink committer
>> >>
>> >> Congratulations Rong!
>> >>
>> >> Best,
>> >> Kurt
>> >>
>> >>
>> >> On Thu, Jul 11, 2019 at 10:53 PM Kostas Kloudas <
>> kklou...@gmail.com>
>> >> wrote:
>> >> Congratulations Rong!
>> >>
>> >> On Thu, Jul 11, 2019 at 4:40 PM Jark Wu  wrote:
>> >> Congratulations Rong Rong!
>> >> Welcome on board!
>> >>
>> >> On Thu, 11 Jul 2019 at 22:25, Fabian Hueske 
>> >> wrote:
>> >> Hi everyone,
>> >>
>> >> I'm very happy to announce that Rong Rong accepted the offer of the
>> >> Flink PMC to become a committer of the Flink project.
>> >>
>> >> Rong has been contributing to Flink for many years, mainly working
>> on
>> >> SQL and Yarn security features. He's also frequently helping out
>> on the
>> >> user@f.a.o mailing lists.
>> >>
>> >> Congratulations Rong!
>> >>
>> >> Best, Fabian
>> >> (on behalf of the Flink PMC)
>> >>
>> >>
>> >>
>> 
>>
>
>
> --
> Xuefu Zhang
>
> "In Honey We Trust!"
>


Re: Graceful Task Manager Termination and Replacement

2019-07-11 Thread Hao Sun
I share an interest in this topic. My K8s cluster recycles hosts, and I am
facing the same issue. Flink can tolerate this situation, but I am
wondering if I can do better.

On Thu, Jul 11, 2019, 12:39 Aaron Levin  wrote:

> Hello,
>
> Is there a way to gracefully terminate a Task Manager beyond just killing
> it (this seems to be what `./taskmanager.sh stop` does)? Specifically I'm
> interested in a way to replace a Task Manager that has currently-running
> tasks. It would be great if it was possible to terminate a Task Manager
> without restarting the job, though I'm not sure if this is possible.
>
> Context: at my work we regularly cycle our hosts for maintenance and
> security. Each time we do this we stop the task manager running on the host
> being cycled. This causes the entire job to restart, resulting in downtime
> for the job. I'd love to decrease this downtime if at all possible.
>
> Thanks! Any insight is appreciated!
>
> Best,
>
> Aaron Levin
>


Re: [VOTE] How to Deal with Split/Select in DataStream API

2019-07-04 Thread Hao Sun
Personally I prefer 3) to keep split/select and correct the behavior. I
feel side output is kind of overkill for such a primitive function, and I
prefer simple APIs like split/select.

Hao Sun


On Thu, Jul 4, 2019 at 11:20 AM Xingcan Cui  wrote:

> Hi folks,
>
> Two weeks ago, I started a thread [1] discussing whether we should discard
> the split/select methods (which have been marked as deprecated since v1.7)
> in DataStream API.
>
> The fact is, these methods will cause "unexpected" results when used
> consecutively (e.g., ds.split(a).select(b).split(c).select(d)) or
> multiple times on the same target (e.g., ds.split(a).select(b),
> ds.split(c).select(d)). The reason is that following the initial design,
> the new split/select logic will always override the existing one on the
> same target operator, rather than append to it. Some users may not be
> aware of that, but if you are, a current solution would be to use the more
> powerful side output feature [2].
>
> FLINK-11084 <https://issues.apache.org/jira/browse/FLINK-11084> added
> some restrictions to the existing split/select logic and suggests
> replacing it with side output in the future. However, considering that the
> side output is currently only available in the process function layer and
> the split/select could have been widely used in many real-world
> applications, we'd like to start a vote and listen to the community on how
> to deal with them.
>
> In the discussion thread [1], we proposed three solutions as follows. All
> of them are feasible but have different impacts on the public API.
>
> 1) Port the side output feature to DataStream API's flatMap and replace
> split/select with it.
>
> 2) Introduce a dedicated function in DataStream API (with the "correct"
> behavior but a different name) that can be used to replace the existing
> split/select.
>
> 3) Keep split/select but change the behavior/semantic to be "correct".
>
> Note that this is just a vote for gathering information, so feel free to
> participate and share your opinions.
>
> The voting time will end on *July 7th 17:00 EDT*.
>
> Thanks,
> Xingcan
>
> [1]
> https://lists.apache.org/thread.html/f94ea5c97f96c705527dcc809b0e2b69e87a4c5d400cb7c61859e1f4@%3Cdev.flink.apache.org%3E
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/side_output.html
>


Re: status on FLINK-7129

2019-04-23 Thread Hao Sun
+1

On Tue, Apr 23, 2019, 05:18 Vishal Santoshi 
wrote:

> +1
>
> On Tue, Apr 23, 2019, 4:57 AM kant kodali  wrote:
>
>> Thanks all for the reply. I believe this is one of the most important
>> features that differentiates Flink from other stream processing engines, as
>> others don't even have CEP yet. So it would be great if this issue could get
>> more attention, as I don't think anyone wants to restart the job every
>> time they want to detect a new pattern.
>>
>> On Mon, Apr 22, 2019 at 11:30 PM Dawid Wysakowicz 
>> wrote:
>>
>>> Hi Kant,
>>>
>>> I'm afraid Konstantin is right. Unfortunately AFAIK there is no active
>>> development on that issue.
>>>
>>> Best,
>>>
>>> Dawid
>>> On 22/04/2019 18:20, Konstantin Knauf wrote:
>>>
>>> Hi Kant,
>>>
>>> as far as I know, no one is currently working on this. Dawid (cc) maybe
>>> knows more.
>>>
>>> Cheers,
>>>
>>> Konstantin
>>>
>>> On Sat, Apr 20, 2019 at 12:12 PM kant kodali  wrote:
>>>
 Hi All,

 There seems to be a lot of interest for
 https://issues.apache.org/jira/browse/FLINK-7129
 

 Any rough idea on the status of this issue?

 Thanks!

>>>
>>>
>>> --
>>>
>>> Konstantin Knauf | Solutions Architect
>>>
>>>
>>>


Re: java.io.IOException: NSS is already initialized

2019-04-17 Thread Hao Sun
I think I found the root cause

https://bugs.alpinelinux.org/issues/10126

I have to re-install nss after apk update/upgrade
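
Since the nss package can silently go missing after an image rebuild, a startup check in the container makes the problem visible before the first checkpoint fails. Below is a rough Python sketch of such a check. It assumes the NSS shared library is named libnss3.so and sits in one of the usual library directories; both the file name pattern and the directory list are assumptions to adjust for your image, and the function name is made up.

```python
import glob
import os


def find_nss_libraries(search_dirs=("/usr/lib", "/usr/lib64", "/lib")):
    """Return the paths of libnss3 shared objects found in the given directories."""
    found = []
    for directory in search_dirs:
        # Match libnss3.so as well as versioned variants such as libnss3.so.41.
        found.extend(sorted(glob.glob(os.path.join(directory, "libnss3.so*"))))
    return found


if not find_nss_libraries():
    print("warning: no libnss3.so found, the nss package may need to be re-installed")
```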

Hao Sun


On Sun, Nov 11, 2018 at 10:50 AM Ufuk Celebi  wrote:

> Hey Hao,
>
> 1) Regarding Hadoop S3: are you using the repackaged Hadoop S3
> dependency from the /opt folder of the Flink distribution? Or the
> actual Hadoop implementation? If latter, would you mind also running
> it with the one that comes packaged with Flink? For this you can
> remove all Hadoop-related configuration in your flink-conf.yaml and
> copy the Hadoop S3 dependency from /opt to /lib and configure it [1].
>
> 2) Could you please share your complete Flink configuration for when
> you tried to run with Presto S3? If you don't want to share this
> publicly, feel free to share it privately with me. I'm curious to see
> whether we can reproduce this.
>
> – Ufuk
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/deployment/aws.html#shaded-hadooppresto-s3-file-systems-recommended
> On Sat, Nov 10, 2018 at 4:07 PM Hao Sun  wrote:
> >
> > Hi Ufuk, thanks for checking. I am using openJDK 1.8_171, I still have
> the same issue with presto.
> >
> > - Why is the checkpoint not starting from 1? Old checkpoints stored in ZK
> caused it; I cleaned them up, but that did not help much.
> > - I switched to Flink + Hadoop28 and used Hadoop S3; with no other
> changes, checkpointing works with the Hadoop flavour.
> >
> > On Fri, Nov 9, 2018 at 2:02 PM Ufuk Celebi  wrote:
> >>
> >> Hey Hao Sun,
> >>
> >> - Is this an intermittent failure or permanent? The logs indicate that
> >> some checkpoints completed before the error occurs (e.g. checkpoint
> >> numbers are greater than 1).
> >>
> >> - Which Java versions are you using? And which Java image? I've
> >> Googled similar issues that seem to be related to the JVM, e.g. [1].
> >>
> >> Best,
> >>
> >> Ufuk
> >>
> >> [1]
> https://dev.lucee.org/t/could-not-initialize-class-sun-security-ssl-sslcontextimp/3972
> >>
> >>
> >> On Thu, Nov 8, 2018 at 8:55 PM Hao Sun  wrote:
> >> >
> >> > Thanks, any insight/help here is appreciated.
> >> >
> >> > On Thu, Nov 8, 2018 at 4:38 AM Dawid Wysakowicz <
> dwysakow...@apache.org> wrote:
> >> >>
> >> >> Hi Hao,
> >> >>
> >> >> I am not sure, what might be wrong, but I've cc'ed Gary and Kostas
> who were recently working with S3, maybe they will have some ideas.
> >> >>
> >> >> Best,
> >> >>
> >> >> Dawid
> >> >>
> >> >> On 03/11/2018 03:09, Hao Sun wrote:
> >> >>
> >> >> Same environment, new error.
> >> >>
> >> >> I can run the same docker image with my local Mac, but on K8S, this
> gives me this error.
> >> >> I can not think of any difference between local Docker and K8S
> Docker.
> >> >>
> >> >> Any hint will be helpful. Thanks
> >> >>
> >> >> 
> >> >>
> >> >> 2018-11-02 23:29:32,981 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job
> ConnectedStreams maxwell.accounts ()
> switched from state RUNNING to FAILING.
> >> >> AsynchronousException{java.lang.Exception: Could not materialize
> checkpoint 235 for operator Source: KafkaSource(maxwell.accounts) ->
> MaxwellFilter->Maxwell(maxwell.accounts) ->
> FixedDelayWatermark(maxwell.accounts) ->
> MaxwellFPSEvent->InfluxDBData(maxwell.accounts) -> Sink:
> influxdbSink(maxwell.accounts) (1/1).}
> >> >> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
> >> >> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
> >> >> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
> >> >> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >> >> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >> >> at
> jav

Re: inconsistent module descriptor for hadoop uber jar (Flink 1.8)

2019-04-16 Thread Hao Sun
I am using sbt and sbt-assembly.
In build.sbt

libraryDependencies ++= Seq("org.apache.flink" %
"flink-shaded-hadoop2-uber" % "2.8.3-1.8.0")


Hao Sun


On Tue, Apr 16, 2019 at 12:07 AM Gary Yao  wrote:

> Hi,
>
> Can you describe how to reproduce this?
>
> Best,
> Gary
>
> On Mon, Apr 15, 2019 at 9:26 PM Hao Sun  wrote:
>
>> Hi, I cannot find the root cause of this; I think the Hadoop version is
>> mixed up between libs somehow.
>>
>> --- ERROR ---
>> java.text.ParseException: inconsistent module descriptor file found in '
>> https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop2-uber/2.8.3-1.8.0/flink-shaded-hadoop2-uber-2.8.3-1.8.0.pom':
>> bad revision: expected='2.8.3-1.8.0' found='2.4.1-1.8.0';
>>
>> Is this a bug?
>>
>> Hao Sun
>>
>


inconsistent module descriptor for hadoop uber jar (Flink 1.8)

2019-04-15 Thread Hao Sun
Hi, I cannot find the root cause of this; I think the Hadoop version is mixed
up between libs somehow.

--- ERROR ---
java.text.ParseException: inconsistent module descriptor file found in '
https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop2-uber/2.8.3-1.8.0/flink-shaded-hadoop2-uber-2.8.3-1.8.0.pom':
bad revision: expected='2.8.3-1.8.0' found='2.4.1-1.8.0';

Is this a bug?

Hao Sun


Re: Is there a better way to debug ClassNotFoundException from FlinkUserCodeClassLoaders?

2019-01-04 Thread Hao Sun
Thanks Congxian for the tip. Arthas looks great

Hao Sun
Team Lead
1019 Market St. 7F
San Francisco, CA 94103


On Fri, Jan 4, 2019 at 5:42 PM Congxian Qiu  wrote:

> Hi, Hao Sun
>
> For debugging the `ClassNotFoundException`, maybe the Arthas[1] tool can
> help.
>
> [1] Arthas: https://github.com/alibaba/arthas
>
> Hao Sun  于2019年1月3日周四 下午10:08写道:
>
>>
>> I am on Flink 1.7.1 and K8S.
>> I said "suddenly" because my program worked fine until I added a new
>> MapFunction.
>> I do not know the details, but I think I know what is causing it
>>
>> === Start of Program ===
>> val stream: DataStream[MaxwellEvent] = 
>> stream.map(new ProblemFunction()) will cause the issue
>> class ProblemFunction(stringParam: String)(implicit datadog:
>> DatadogClient) extends MapFunction[MaxwellEvent, MaxwellEvent]
>> === End of Program ===
>>
>> I think the class taking curried parameters caused the issue; after I gave
>> up on the curried form, the error disappeared.
>>
>> I am using https://github.com/sbt/sbt-assembly to assemble the fat jar.
>> There might be some issue, or config issue, with that as well.
>>
>> I am reading this article; it is a good start for me as well:
>> https://heapanalytics.com/blog/engineering/missing-scala-class-noclassdeffounderror
>>
>>
>> Hao Sun
>> Team Lead
>> 1019 Market St. 7F
>> San Francisco, CA 94103
>>
>>
>> On Thu, Jan 3, 2019 at 1:08 AM Timo Walther  wrote:
>>
>>> Hi Hao,
>>>
>>> Which Flink version are you using? What do you mean by "suddenly"? Did
>>> it work before?
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> Am 03.01.19 um 07:13 schrieb Hao Sun:
>>>
>>> Yep, javap shows the class is there, but FlinkUserCodeClassLoaders
>>> somehow could not find it suddenly
>>>
>>> javap -cp /opt/flink/lib/zendesk-fps-core-assembly-0.1.0.jar
>>> 'com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90$$anon$45'
>>> Compiled from "ConnectedStreams.scala"
>>> public final class
>>> com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90$$anon$45
>>> extends
>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer
>>> {
>>> public com.zendesk.fraudprevention.datatypes.MaxwellEvent
>>> createInstance(java.lang.Object[]);
>>> public
>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer
>>> createSerializerInstance(java.lang.Class,
>>> org.apache.flink.api.common.typeutils.TypeSerializer[]);
>>> public org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase
>>> createSerializerInstance(java.lang.Class,
>>> org.apache.flink.api.common.typeutils.TypeSerializer[]);
>>> public java.lang.Object createInstance(java.lang.Object[]);
>>> public
>>> com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90$$anon$45(com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90,
>>> org.apache.flink.api.common.typeutils.TypeSerializer[]);
>>> }
>>>
>>> Hao Sun
>>> Team Lead
>>> 1019 Market St. 7F
>>> San Francisco, CA 94103
>>>
>>>
>>> On Wed, Jan 2, 2019 at 6:04 PM qi luo  wrote:
>>>
>>>> Hi Hao,
>>>>
>>>> Since Flink is using a child-first class loader, you may try searching for
>>>> the class
>>>> "com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90$$anon$45"
>>>> in your fat JAR. Is that an inner class?
>>>>
>>>> Best,
>>>> Qi
>>>>
>>>> On Jan 3, 2019, at 7:01 AM, Hao Sun  wrote:
>>>>
>>>> Hi,
>>>>
>>>> I am wondering if there are any protips to figure out what class is not
>>>> found?
>>>>
>>>> = Logs 
>>>> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not
>>>> instantiate chained outputs.
>>>> at
>>>> org.apache.flink.streaming.api.graph.StreamConfig.getChainedOutputs(StreamConfig.java:324)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:292)
>>>> at org.apache.flink.stre

Re: Is there a better way to debug ClassNotFoundException from FlinkUserCodeClassLoaders?

2019-01-03 Thread Hao Sun
I am on Flink 1.7.1 and K8S.
I said "suddenly" because my program worked fine until I added a new
MapFunction.
I do not know the details, but I think I know what is causing it

=== Start of Program ===
val stream: DataStream[MaxwellEvent] = 
stream.map(new ProblemFunction()) will cause the issue
class ProblemFunction(stringParam: String)(implicit datadog: DatadogClient)
extends MapFunction[MaxwellEvent, MaxwellEvent]
=== End of Program ===

I think the class taking curried parameters caused the issue; after I gave up
on the curried form, the error disappeared.

I am using https://github.com/sbt/sbt-assembly to assemble the fat jar.
There might be some issue, or config issue, with that as well.

I am reading this article; it is a good start for me as well:
https://heapanalytics.com/blog/engineering/missing-scala-class-noclassdeffounderror


Hao Sun
Team Lead
1019 Market St. 7F
San Francisco, CA 94103


On Thu, Jan 3, 2019 at 1:08 AM Timo Walther  wrote:

> Hi Hao,
>
> Which Flink version are you using? What do you mean by "suddenly"? Did
> it work before?
>
> Regards,
> Timo
>
>
> Am 03.01.19 um 07:13 schrieb Hao Sun:
>
> Yep, javap shows the class is there, but FlinkUserCodeClassLoaders somehow
> could not find it suddenly
>
> javap -cp /opt/flink/lib/zendesk-fps-core-assembly-0.1.0.jar
> 'com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90$$anon$45'
> Compiled from "ConnectedStreams.scala"
> public final class
> com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90$$anon$45
> extends
> org.apache.flink.api.scala.typeutils.CaseClassSerializer
> {
> public com.zendesk.fraudprevention.datatypes.MaxwellEvent
> createInstance(java.lang.Object[]);
> public
> org.apache.flink.api.scala.typeutils.CaseClassSerializer
> createSerializerInstance(java.lang.Class,
> org.apache.flink.api.common.typeutils.TypeSerializer[]);
> public org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase
> createSerializerInstance(java.lang.Class,
> org.apache.flink.api.common.typeutils.TypeSerializer[]);
> public java.lang.Object createInstance(java.lang.Object[]);
> public
> com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90$$anon$45(com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90,
> org.apache.flink.api.common.typeutils.TypeSerializer[]);
> }
>
> Hao Sun
> Team Lead
> 1019 Market St. 7F
> San Francisco, CA 94103
>
>
> On Wed, Jan 2, 2019 at 6:04 PM qi luo  wrote:
>
>> Hi Hao,
>>
>> Since Flink is using a child-first class loader, you may try searching for
>> the class
>> "com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90$$anon$45"
>> in your fat JAR. Is that an inner class?
>>
>> Best,
>> Qi
>>
>> On Jan 3, 2019, at 7:01 AM, Hao Sun  wrote:
>>
>> Hi,
>>
>> I am wondering if there are any protips to figure out what class is not
>> found?
>>
>> = Logs 
>> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not
>> instantiate chained outputs.
>> at
>> org.apache.flink.streaming.api.graph.StreamConfig.getChainedOutputs(StreamConfig.java:324)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:292)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:133)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>> at java.lang.Thread.run(Thread.java:748)
>> *Caused by: java.lang.ClassNotFoundException:
>> com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90$$anon$45*
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at
>> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:348)
>> at
>> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:77)
>> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1866)
>> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1749)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2040)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
>> at
>> java.io.ObjectInputStream.defaultRea

Re: Is there a better way to debug ClassNotFoundException from FlinkUserCodeClassLoaders?

2019-01-02 Thread Hao Sun
Yep, javap shows the class is there, but FlinkUserCodeClassLoaders somehow
could not find it suddenly

javap -cp /opt/flink/lib/zendesk-fps-core-assembly-0.1.0.jar
'com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90$$anon$45'
Compiled from "ConnectedStreams.scala"
public final class
com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90$$anon$45
extends
org.apache.flink.api.scala.typeutils.CaseClassSerializer
{
  public com.zendesk.fraudprevention.datatypes.MaxwellEvent
createInstance(java.lang.Object[]);
  public
org.apache.flink.api.scala.typeutils.CaseClassSerializer
createSerializerInstance(java.lang.Class,
org.apache.flink.api.common.typeutils.TypeSerializer[]);
  public org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase
createSerializerInstance(java.lang.Class,
org.apache.flink.api.common.typeutils.TypeSerializer[]);
  public java.lang.Object createInstance(java.lang.Object[]);
  public
com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90$$anon$45(com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90,
org.apache.flink.api.common.typeutils.TypeSerializer[]);
}

Hao Sun
Team Lead
1019 Market St. 7F
San Francisco, CA 94103


On Wed, Jan 2, 2019 at 6:04 PM qi luo  wrote:

> Hi Hao,
>
> Since Flink is using a child-first class loader, you may try searching for
> the class
> "com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90$$anon$45"
> in your fat JAR. Is that an inner class?
>
> Best,
> Qi
>
> On Jan 3, 2019, at 7:01 AM, Hao Sun  wrote:
>
> Hi,
>
> I am wondering if there are any protips to figure out what class is not
> found?
>
> = Logs 
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not
> instantiate chained outputs.
> at
> org.apache.flink.streaming.api.graph.StreamConfig.getChainedOutputs(StreamConfig.java:324)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:292)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:133)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:748)
> *Caused by: java.lang.ClassNotFoundException:
> com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90$$anon$45*
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:77)
> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1866)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1749)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2040)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
> at java.util.ArrayList.readObject(ArrayList.java:797)
> at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2176)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:524)
> at
> org.apache.flink.util.Instant

Is there a better way to debug ClassNotFoundException from FlinkUserCodeClassLoaders?

2019-01-02 Thread Hao Sun
Hi,

I am wondering if there are any protips to figure out what class is not
found?

= Logs 
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not
instantiate chained outputs.
at
org.apache.flink.streaming.api.graph.StreamConfig.getChainedOutputs(StreamConfig.java:324)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:292)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:133)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
*Caused by: java.lang.ClassNotFoundException:
com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90$$anon$45*
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:77)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1866)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1749)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2040)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at java.util.ArrayList.readObject(ArrayList.java:797)
at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2176)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:524)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:510)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:498)
at
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:459)
at
org.apache.flink.streaming.api.graph.StreamConfig.getChainedOutputs(StreamConfig.java:321)
... 5 more


Hao Sun
Team Lead
1019 Market St. 7F
San Francisco, CA 94103
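
One quick check for a question like this is whether the class named in the
ClassNotFoundException is actually packaged in the fat JAR. The sketch below
is only an illustration using the JDK's java.util.jar API: the class
JarClassCheck, its methods, and the demo class name demo.Job$$anon$1 are
made up for the example and are not part of Flink. Note that a hit only
proves the class is in the archive; as the thread shows, the user-code class
loader can still fail to resolve it for other reasons.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;

public class JarClassCheck {
    // Binary class name -> entry path inside a jar, e.g. a.b.C$1 -> a/b/C$1.class
    public static String entryName(String className) {
        return className.replace('.', '/') + ".class";
    }

    // True if the jar has an entry for the given binary class name.
    public static boolean containsClass(Path jar, String className) throws IOException {
        try (JarFile jarFile = new JarFile(jar.toFile())) {
            return jarFile.getEntry(entryName(className)) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Build a throwaway jar with one (empty) entry so the demo is self-contained.
        Path jar = Files.createTempFile("demo", ".jar");
        try (JarOutputStream out = new JarOutputStream(Files.newOutputStream(jar))) {
            out.putNextEntry(new JarEntry(entryName("demo.Job$$anon$1")));
            out.closeEntry();
        }
        System.out.println(containsClass(jar, "demo.Job$$anon$1"));
        System.out.println(containsClass(jar, "demo.Missing"));
        Files.delete(jar);
    }
}
```

Against a real build you would point containsClass at the assembled jar and
pass the exact name from the "Caused by" line, including the $$anon$ parts.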


Re: Kafka consumer, is there a way to filter out messages using key only?

2018-12-27 Thread Hao Sun
Cool, thanks! I used the option value approach, worked well.

On Thu, Dec 27, 2018, 03:49 Dominik Wosiński  wrote:

> Hey,
> AFAIK, returning null from deserialize function in FlinkKafkaConsumer will
> indeed filter the message out and it won't be further processed.
>
> Best Regards,
> Dom.
>
> śr., 19 gru 2018 o 11:06 Dawid Wysakowicz 
> napisał(a):
>
>> Hi,
>>
>> I'm afraid that there is no out-of-the-box solution for this, but what
>> you could do is emit some marker from KeyedDeserializationSchema
>> (Optional, null value...) based on the message key, which would allow you
>> to filter it out later. So, assuming the Optional solution, the result of
>> KeyedDeserializationSchema#deserialize could be Optional.empty() for
>> invalid keys and Optional.of(deserializedValue) for valid keys.
>>
>> Best,
>>
>> Dawid
>> On 18/12/2018 20:22, Hao Sun wrote:
>>
>> Hi, I am using 1.7 on K8S.
>>
>> I have a huge amount of data in kafka, but I only need a tiny portion of
>> it.
>> It is a keyed stream, and the value is JSON encoded. I want to avoid
>> deserializing the value, since it is very expensive. Can I filter based
>> on the key only?
>> I know there is a KeyedDeserializationSchema, but can I use it to filter
>> data?
>>
>> Hao Sun
>> Team Lead
>> 1019 Market St. 7F
>> San Francisco, CA 94103
>>
>>
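
The Optional approach described above can be sketched in plain Java as
follows. This is a stand-alone illustration, not Flink code: the class
KeyOnlyFilter, the "accounts:" key prefix, and the String result type are
assumptions, and the static method only mimics the shape of
KeyedDeserializationSchema#deserialize (key bytes plus value bytes in, one
record out). The point is that the value bytes are left untouched unless the
key passes the check.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

public class KeyOnlyFilter {
    // Hypothetical key prefix marking the records worth keeping.
    public static final String WANTED_PREFIX = "accounts:";

    // Stand-in for KeyedDeserializationSchema#deserialize: only the key is
    // decoded up front; the value bytes are decoded only for wanted keys.
    public static Optional<String> deserialize(byte[] messageKey, byte[] message) {
        if (messageKey == null || message == null) {
            return Optional.empty();
        }
        String key = new String(messageKey, StandardCharsets.UTF_8);
        if (!key.startsWith(WANTED_PREFIX)) {
            return Optional.empty(); // marker for "filter this out later"
        }
        // In a real job the expensive JSON parsing would happen here.
        return Optional.of(new String(message, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        byte[] value = "{\"id\": 1}".getBytes(StandardCharsets.UTF_8);
        byte[] wanted = "accounts:1".getBytes(StandardCharsets.UTF_8);
        byte[] other = "tickets:7".getBytes(StandardCharsets.UTF_8);

        System.out.println(deserialize(wanted, value).isPresent());
        System.out.println(deserialize(other, value).isPresent());
    }
}
```

A later filter step in the job can then drop the empty results, which keeps
the costly value parsing off the path for the bulk of the topic.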


Kafka consumer, is there a way to filter out messages using key only?

2018-12-18 Thread Hao Sun
Hi, I am using 1.7 on K8S.

I have a huge amount of data in kafka, but I only need a tiny portion of it.
It is a keyed stream, and the value is JSON encoded. I want to avoid
deserializing the value, since it is very expensive. Can I filter based on
the key only?
I know there is a KeyedDeserializationSchema, but can I use it to filter
data?

Hao Sun
Team Lead
1019 Market St. 7F
San Francisco, CA 94103


Re: How many times Flink initialize an operator?

2018-12-11 Thread Hao Sun
Ok, thanks for the clarification.
Hao Sun
Team Lead
1019 Market St. 7F
San Francisco, CA 94103


On Tue, Dec 11, 2018 at 2:38 PM Ken Krugler 
wrote:

> It’s based on the parallelism of that operator, not the number of
> TaskManagers.
>
> E.g. you can have an operator with a parallelism of one, and your cluster
> has 10 TaskManagers, and you’ll only get a single instance of the operator.
>
> — Ken
>
>
> On Dec 11, 2018, at 2:01 PM, Hao Sun  wrote:
>
> I am using Flink 1.7 on K8S. This might not matter :D.
>
> I think Flink only initializes the MapFunction once per TaskManager, right?
> Because Flink will serialize the execution graph and distribute it to
> TaskManagers.
>
> Or does it create a new MapFunction for every element?
> stream.map(new MapFunction[I,O]).addSink(discard)
>
> Hao Sun
> Team Lead
> 1019 Market St. 7F
> San Francisco, CA 94103
>
>
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
>
>
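
To make the "one instance per parallel subtask" answer concrete, here is a
small model in plain Java. It is a sketch under assumptions, not Flink code:
the class OperatorInstances, the UpperCase function, and the deploy helper
are invented for illustration. It imitates what happens to a user function:
the object is serialized once, each parallel subtask gets its own
deserialized copy, and that copy is then reused for every element the
subtask sees.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class OperatorInstances {
    // Hypothetical user function standing in for a MapFunction.
    public static class UpperCase implements Serializable {
        private static final long serialVersionUID = 1L;
        public int seen; // elements handled by this particular copy

        public String map(String in) {
            seen++;
            return in.toUpperCase();
        }
    }

    // Serialize once, then deserialize one independent copy per parallel subtask.
    public static List<UpperCase> deploy(UpperCase fn, int parallelism) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(fn);
            }
            byte[] shipped = buffer.toByteArray();

            List<UpperCase> copies = new ArrayList<>();
            for (int i = 0; i < parallelism; i++) {
                try (ObjectInputStream in =
                        new ObjectInputStream(new ByteArrayInputStream(shipped))) {
                    copies.add((UpperCase) in.readObject());
                }
            }
            return copies;
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("could not ship the function", e);
        }
    }

    public static void main(String[] args) {
        List<UpperCase> subtasks = deploy(new UpperCase(), 3);
        // Nine elements spread round-robin over three subtasks.
        for (int i = 0; i < 9; i++) {
            subtasks.get(i % 3).map("event-" + i);
        }
        for (UpperCase subtask : subtasks) {
            System.out.println(subtask.seen);
        }
    }
}
```

With a parallelism of 3 there are three copies regardless of how many
machines host them, and no new object is created per element.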


How many times Flink initialize an operator?

2018-12-11 Thread Hao Sun
I am using Flink 1.7 on K8S. This might not matter :D.

I think Flink only initializes the MapFunction once per TaskManager, right?
Because Flink will serialize the execution graph and distribute it to
TaskManagers.

Or does it create a new MapFunction for every element?
stream.map(new MapFunction[I,O]).addSink(discard)

Hao Sun
Team Lead
1019 Market St. 7F
San Francisco, CA 94103


Re: Flink 1.7 job cluster (restore from checkpoint error)

2018-12-06 Thread Hao Sun
Thanks for the tip! I did change the jobGraph this time.

Hao Sun
Team Lead
1019 Market St. 7F
San Francisco, CA 94103


On Thu, Dec 6, 2018 at 2:47 AM Till Rohrmann  wrote:

> Hi Hao,
>
> if Flink tries to recover from a checkpoint, then the JobGraph should not
> be modified and the system should be able to restore the state.
>
> Have you changed the JobGraph and are you now trying to recover from the
> latest checkpoint which is stored in ZooKeeper? If so, then you can also
> start the job cluster with a different cluster id and manually pass the
> path to the latest checkpoint as the savepoint path to resume from. By
> specifying a new cluster id, the system will create a new ZNode in
> ZooKeeper and won't use the checkpoints from the previous run.
>
> If you did not change the JobGraph, then this sounds like a bug. For
> further investigation the debug log files would be helpful.
>
> Cheers,
> Till
>
> On Wed, Dec 5, 2018 at 7:18 PM Hao Sun  wrote:
>
>> Till, Flink is automatically trying to recover from a checkpoint, not a
>> savepoint. How can I get allowNonRestoredState applied in this case?
>>
>> Hao Sun
>> Team Lead
>> 1019 Market St. 7F
>> San Francisco, CA 94103
>>
>>
>> On Wed, Dec 5, 2018 at 10:09 AM Till Rohrmann 
>> wrote:
>>
>>> Hi Hao,
>>>
>>> I think you need to provide a savepoint file via --fromSavepoint to
>>> resume from in order to specify --allowNonRestoredState. Otherwise this
>>> option will be ignored because it only works if you resume from a savepoint.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Dec 5, 2018 at 12:29 AM Hao Sun  wrote:
>>>
>>>> I am using 1.7 and job cluster on k8s.
>>>>
>>>> Here is how I start my job
>>>> 
>>>> docker-entrypoint.sh job-cluster -j
>>>> com.zendesk.fraud_prevention.examples.ConnectedStreams
>>>> --allowNonRestoredState
>>>> 
>>>>
>>>> *Seems like --allowNonRestoredState is not honored*
>>>>
>>>> === Logs ===
>>>> java","line":"1041","message":"Restoring job
>>>>  from latest valid checkpoint: Checkpoint
>>>> 8103 @ 0 for ."}
>>>> {"timestamp":"2018-12-04
>>>> 23:19:39,859","level":"ERROR","thread":"flink-akka.actor.default-dispatcher-15","file":"ClusterEntrypoint.java","line":"390","message":"Fatal
>>>> error occurred in the cluster entrypoint."}
>>>> java.lang.RuntimeException:
>>>> org.apache.flink.runtime.client.JobExecutionException: Could not set up
>>>> JobManager
>>>> at
>>>> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
>>>> at
>>>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>>>> at
>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>> at
>>>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>> at
>>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
>>>> not set up JobManager
>>>> at
>>>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
>>>> at
>>>> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
>>>> at
>>>> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
>>>> at
>>>> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>>>> ... 7 more
>>>> Caused by: java.lang.IllegalStateException: There is no operator for
>>>> the state 2f4bc854a18755730e14a90e1d4d7c7d
>>>> at
>>>> org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:569)
>>>> at
>>>> org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:77)
>>>> at
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedState(CheckpointCoordinator.java:1049)
>>>> at
>>>> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1152)
>>>> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:296)
>>>> at
>>>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
>>>> ==
>>>>
>>>> Can somebody help out? Thanks
>>>>
>>>> Hao Sun
>>>>
>>>
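
For readers wondering why a changed JobGraph breaks the restore, the check
behind "There is no operator for the state ..." can be modelled roughly as
below. This is a simplified sketch and not Flink's implementation: the class
StateMappingCheck, the method assignStates, and the operator IDs "op-a",
"op-b", "op-c" are hypothetical, and the real logic lives in
StateAssignmentOperation as the stack trace shows. It only illustrates the
idea that state whose operator no longer exists fails the restore unless
non-restored state is explicitly allowed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StateMappingCheck {
    // Returns the checkpointed state IDs that can be matched to an operator.
    // Unmatched state fails the restore unless allowNonRestoredState is set.
    public static List<String> assignStates(
            List<String> checkpointedStateIds,
            List<String> jobOperatorIds,
            boolean allowNonRestoredState) {
        List<String> restored = new ArrayList<>();
        for (String stateId : checkpointedStateIds) {
            if (jobOperatorIds.contains(stateId)) {
                restored.add(stateId);
            } else if (!allowNonRestoredState) {
                throw new IllegalStateException(
                        "There is no operator for the state " + stateId);
            }
            // Otherwise the orphaned state is skipped.
        }
        return restored;
    }

    public static void main(String[] args) {
        // Hypothetical operator IDs; "op-b" was removed from the new JobGraph.
        List<String> checkpoint = Arrays.asList("op-a", "op-b");
        List<String> newJob = Arrays.asList("op-a", "op-c");

        System.out.println(assignStates(checkpoint, newJob, true));
        try {
            assignStates(checkpoint, newJob, false);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

As Till notes above, the flag only takes effect when resuming from a
savepoint path, which is why automatic recovery from the latest checkpoint
behaves like the strict branch here.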


Re: Flink 1.7 job cluster (restore from checkpoint error)

2018-12-05 Thread Hao Sun
Till, Flink is automatically trying to recover from a checkpoint, not a
savepoint. How can I get allowNonRestoredState applied in this case?

Hao Sun
Team Lead
1019 Market St. 7F
San Francisco, CA 94103


On Wed, Dec 5, 2018 at 10:09 AM Till Rohrmann  wrote:

> Hi Hao,
>
> I think you need to provide a savepoint file via --fromSavepoint to resume
> from in order to specify --allowNonRestoredState. Otherwise this option
> will be ignored because it only works if you resume from a savepoint.
>
> Cheers,
> Till
>
> On Wed, Dec 5, 2018 at 12:29 AM Hao Sun  wrote:
>
>> I am using 1.7 and job cluster on k8s.
>>
>> Here is how I start my job
>> 
>> docker-entrypoint.sh job-cluster -j
>> com.zendesk.fraud_prevention.examples.ConnectedStreams
>> --allowNonRestoredState
>> 
>>
>> *Seems like --allowNonRestoredState is not honored*
>>
>> === Logs ===
>> java","line":"1041","message":"Restoring job
>>  from latest valid checkpoint: Checkpoint
>> 8103 @ 0 for ."}
>> {"timestamp":"2018-12-04
>> 23:19:39,859","level":"ERROR","thread":"flink-akka.actor.default-dispatcher-15","file":"ClusterEntrypoint.java","line":"390","message":"Fatal
>> error occurred in the cluster entrypoint."}
>> java.lang.RuntimeException:
>> org.apache.flink.runtime.client.JobExecutionException: Could not set up
>> JobManager
>> at
>> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
>> at
>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
>> not set up JobManager
>> at
>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
>> at
>> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>> ... 7 more
>> Caused by: java.lang.IllegalStateException: There is no operator for the
>> state 2f4bc854a18755730e14a90e1d4d7c7d
>> at
>> org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:569)
>> at
>> org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:77)
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedState(CheckpointCoordinator.java:1049)
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1152)
>> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:296)
>> at
>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
>> ==
>>
>> Can somebody help out? Thanks
>>
>> Hao Sun
>>
>


Flink 1.7 job cluster (restore from checkpoint error)

2018-12-04 Thread Hao Sun
I am using 1.7 and job cluster on k8s.

Here is how I start my job

docker-entrypoint.sh job-cluster -j
com.zendesk.fraud_prevention.examples.ConnectedStreams
--allowNonRestoredState


*Seems like --allowNonRestoredState is not honored*

=== Logs ===
java","line":"1041","message":"Restoring job
 from latest valid checkpoint: Checkpoint
8103 @ 0 for ."}
{"timestamp":"2018-12-04
23:19:39,859","level":"ERROR","thread":"flink-akka.actor.default-dispatcher-15","file":"ClusterEntrypoint.java","line":"390","message":"Fatal
error occurred in the cluster entrypoint."}
java.lang.RuntimeException:
org.apache.flink.runtime.client.JobExecutionException: Could not set up
JobManager
at
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
at
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not
set up JobManager
at
org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
at
org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
at
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
... 7 more
Caused by: java.lang.IllegalStateException: There is no operator for the
state 2f4bc854a18755730e14a90e1d4d7c7d
at
org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:569)
at
org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:77)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedState(CheckpointCoordinator.java:1049)
at
org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1152)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:296)
at
org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
==

Can somebody help out? Thanks

Hao Sun


Re: Flink 1.7 RC missing flink-scala-shell jar

2018-11-14 Thread Hao Sun
Ok, thanks.

On Wed, Nov 14, 2018, 01:22 Chesnay Schepler  wrote:

> This is intended. Increasing the scala version basically broke the
> scala-shell and we haven't had the time to fix it. It is thus only
> available with scala 2.11. I agree that the error message could be better
> though.
>
>
> On 14.11.2018 03:44, Hao Sun wrote:
>
> I do not see flink-scala-shell jar under flink opt directory. To run
> scala shell, do I have to include the flink-scala-shell jar in my program
> jar?
> Why the error is saying Could not find or load main class
> org.apache.flink.api.scala.FlinkShell
>
> On Tue, Nov 13, 2018 at 4:48 PM Tzu-Li Chen  wrote:
>
>> Hi,
>>
>> Till is the release manager for 1.7, so ping him here.
>>
>> Best,
>> tison.
>>
>>
>> On Wed, Nov 14, 2018 at 3:07 AM Hao Sun wrote:
>>
>>> Sorry I mean the scala-2.12 version is missing
>>>
>>> On Tue, Nov 13, 2018 at 10:58 AM Hao Sun  wrote:
>>>
>>>> I can not find the jar here:
>>>>
>>>> https://repository.apache.org/content/repositories/orgapacheflink-1191/org/apache/flink/
>>>>
>>>> Here is the error:
>>>> bash-4.4# ./bin/start-scala-shell.sh local
>>>> Error: Could not find or load main class
>>>> org.apache.flink.api.scala.FlinkShell
>>>>
>>>> I think somehow I have to include the flink-scala-shell jar under flink
>>>> lib.
>>>>
>>>> Any help will be appreciated.
>>>>
>>>
>


Re: Flink 1.7 RC missing flink-scala-shell jar

2018-11-13 Thread Hao Sun
I do not see the flink-scala-shell jar under the Flink opt directory. To run
the Scala shell, do I have to include the flink-scala-shell jar in my program
jar? Why does the error say "Could not find or load main class
org.apache.flink.api.scala.FlinkShell"?

On Tue, Nov 13, 2018 at 4:48 PM Tzu-Li Chen  wrote:

> Hi,
>
> Till is the release manager for 1.7, so ping him here.
>
> Best,
> tison.
>
>
> On Wed, Nov 14, 2018 at 3:07 AM Hao Sun wrote:
>
>> Sorry I mean the scala-2.12 version is missing
>>
>> On Tue, Nov 13, 2018 at 10:58 AM Hao Sun  wrote:
>>
>>> I can not find the jar here:
>>>
>>> https://repository.apache.org/content/repositories/orgapacheflink-1191/org/apache/flink/
>>>
>>> Here is the error:
>>> bash-4.4# ./bin/start-scala-shell.sh local
>>> Error: Could not find or load main class
>>> org.apache.flink.api.scala.FlinkShell
>>>
>>> I think somehow I have to include the flink-scala-shell jar under flink
>>> lib.
>>>
>>> Any help will be appreciated.
>>>
>>


Re: Flink 1.7 RC missing flink-scala-shell jar

2018-11-13 Thread Hao Sun
Sorry, I mean the Scala 2.12 version is missing.

On Tue, Nov 13, 2018 at 10:58 AM Hao Sun  wrote:

> I can not find the jar here:
>
> https://repository.apache.org/content/repositories/orgapacheflink-1191/org/apache/flink/
>
> Here is the error:
> bash-4.4# ./bin/start-scala-shell.sh local
> Error: Could not find or load main class
> org.apache.flink.api.scala.FlinkShell
>
> I think somehow I have to include the flink-scala-shell jar under flink
> lib.
>
> Any help will be appreciated.
>


Flink 1.7 RC missing flink-scala-shell jar

2018-11-13 Thread Hao Sun
I can not find the jar here:
https://repository.apache.org/content/repositories/orgapacheflink-1191/org/apache/flink/

Here is the error:
bash-4.4# ./bin/start-scala-shell.sh local
Error: Could not find or load main class
org.apache.flink.api.scala.FlinkShell

I think I somehow have to put the flink-scala-shell jar under Flink's lib directory.

Any help will be appreciated.


Re: How to run Flink 1.6 job cluster in "standalone" mode?

2018-11-12 Thread Hao Sun
Hi Timo, I am trying to debug this issue:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/java-io-IOException-NSS-is-already-initialized-td24247.html

More generally, how to debug code in a distributed JM/TM architecture is
a very interesting topic to me.
Any hints will be appreciated. Thanks

On Mon, Nov 12, 2018 at 7:12 AM Timo Walther  wrote:

> Hi,
>
> a session cluster does not imply that JM + TM are always executed in the
> same JVM. Debugging a job running on different JVMs might be a bit more
> difficult, but it should still be straightforward.
>
> Maybe you can tell us what wrong behavior you observe?
>
> Btw. Flink's metrics can also already be quite helpful.
>
> Regards,
> Timo
>
> Am 07.11.18 um 14:15 schrieb Hao Sun:
> > "Standalone" here I mean job-manager + taskmanager on the same JVM. I
> > have an issue to debug on our K8S environment, I can not reproduce it
> > in local docker env or Intellij. If JM and TM are running in different
> > VMs, it makes things harder to debug.
> >
> > Or is there a way to debug a job running on JM + TM on different VMs?
> > Is reverting to session cluster the only way to get JM + TM on the
> > same VM?
>
>
>


Re: Where is the "Latest Savepoint" information saved?

2018-11-11 Thread Hao Sun
Thanks, I'll check it out.

On Sun, Nov 11, 2018 at 10:52 AM Ufuk Celebi  wrote:

> I think there should be a log message, but I don't know what the exact
> format is (you would need to look through it and search for something
> related to CompletedCheckpointStore).
>
> An alternative is the web UI checkpointing tab. It shows the latest
> checkpoint used for restore of the job. You should see your savepoint
> there.
>
> Best,
>
> Ufuk
>
>
> On Sun, Nov 11, 2018 at 7:45 PM Hao Sun  wrote:
> >
> > This is great, I will try option 3 and let you know.
> > Can I log some message so I know job is recovered from the latest
> savepoint?
> >
> > On Sun, Nov 11, 2018 at 10:42 AM Ufuk Celebi  wrote:
> >>
> >> Hey Hao and Paul,
> >>
> >> 1) Fetch checkpoint info manually from ZK (problematic, not recommended)
> >> - As Paul pointed out, this is problematic as the node is a serialized
> >> pointer (StateHandle) to a CompletedCheckpoint in the HA storage
> >> directory and not a path [1].
> >> - I would not recommend this approach at the moment
> >>
> >> 2) Using the HTTP API to fetch the latest savepoint pointer (possible,
> >> but cumbersome)
> >> - As Paul proposed, you could use /jobs/:jobid/checkpoints to fetch
> >> checkpoint statistics about the latest savepoint
> >> - The latest savepoint is available as a separate entry under
> >> `latest.savepoint` (If I'm reading the docs [2] correctly)
> >> - You would need to manually do this before shutting down (requires
> >> custom tooling to automate)
> >>
> >> 3) Use cancelWithSavepoint
> >> - If you keep `high-availability.cluster-id` consistent between
> >> executions of your job cluster, using cancelWithSavepoint [3] should
> >> add the savepoint to ZK before cancelling the job
> >> - On the next execution of your job cluster, Flink should
> >> automatically pick it up (no need to attach a savepoint restore path
> >> manually)
> >>
> >> I've not tried 3) myself yet, but believe it should work. If you have
> >> time to try it out, I'd be happy to hear whether it works as expected
> >> for you.
> >>
> >> – Ufuk
> >>
> >> [1] I believe this is overly complicated and should be simplified in
> the future.
> >> [2] Search /jobs/:jobid/checkpoints in
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html
> >> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html#job-cancellation
> >>
> >> On Fri, Nov 9, 2018 at 5:03 PM Hao Sun  wrote:
> >> >
> >> > Can we add an option to allow job cluster mode to start from the
> latest save point? Otherwise I have to somehow get the info from ZK, before
> job cluster's container started by K8s.
> >> >
> >> > On Fri, Nov 9, 2018, 01:29 Paul Lam  wrote:
> >> >>
> >> >> Hi Hao,
> >> >>
> >> >> The savepoint path is stored in ZK, but it’s in binary format, so in
> order to retrieve the path you have to deserialize it back to some Flink
> internal object.
> >> >>
> >> >> A better approach would be using REST api to get the path. You could
> find it here[1].
> >> >>
> >> >> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html
> >> >>
> >> >> Best,
> >> >> Paul Lam
> >> >>
> >> >>
> >> >> On Nov 9, 2018, at 13:55, Hao Sun wrote:
> >> >>
> >> >> Since this save point path is very useful to application updates,
> where is this information stored? Can we keep it in ZK or S3 for retrieval?
> >> >>
> >> >> 
> >> >>
> >> >>
>


Re: Where is the "Latest Savepoint" information saved?

2018-11-11 Thread Hao Sun
This is great, I will try option 3 and let you know.
Can I log a message so I know the job was recovered from the latest savepoint?

On Sun, Nov 11, 2018 at 10:42 AM Ufuk Celebi  wrote:

> Hey Hao and Paul,
>
> 1) Fetch checkpoint info manually from ZK (problematic, not recommended)
> - As Paul pointed out, this is problematic as the node is a serialized
> pointer (StateHandle) to a CompletedCheckpoint in the HA storage
> directory and not a path [1].
> - I would not recommend this approach at the moment
>
> 2) Using the HTTP API to fetch the latest savepoint pointer (possible,
> but cumbersome)
> - As Paul proposed, you could use /jobs/:jobid/checkpoints to fetch
> checkpoint statistics about the latest savepoint
> - The latest savepoint is available as a separate entry under
> `latest.savepoint` (If I'm reading the docs [2] correctly)
> - You would need to manually do this before shutting down (requires
> custom tooling to automate)
>
> 3) Use cancelWithSavepoint
> - If you keep `high-availability.cluster-id` consistent between
> executions of your job cluster, using cancelWithSavepoint [3] should
> add the savepoint to ZK before cancelling the job
> - On the next execution of your job cluster, Flink should
> automatically pick it up (no need to attach a savepoint restore path
> manually)
>
> I've not tried 3) myself yet, but believe it should work. If you have
> time to try it out, I'd be happy to hear whether it works as expected
> for you.
>
> – Ufuk
>
> [1] I believe this is overly complicated and should be simplified in the
> future.
> [2] Search /jobs/:jobid/checkpoints in
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html#job-cancellation
>
> On Fri, Nov 9, 2018 at 5:03 PM Hao Sun  wrote:
> >
> > Can we add an option to allow job cluster mode to start from the latest
> save point? Otherwise I have to somehow get the info from ZK, before job
> cluster's container started by K8s.
> >
> > On Fri, Nov 9, 2018, 01:29 Paul Lam  wrote:
> >>
> >> Hi Hao,
> >>
> >> The savepoint path is stored in ZK, but it’s in binary format, so in
> order to retrieve the path you have to deserialize it back to some Flink
> internal object.
> >>
> >> A better approach would be using REST api to get the path. You could
> find it here[1].
> >>
> >> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html
> >>
> >> Best,
> >> Paul Lam
> >>
> >>
> >> On Nov 9, 2018, at 13:55, Hao Sun wrote:
> >>
> >> Since this save point path is very useful to application updates, where
> is this information stored? Can we keep it in ZK or S3 for retrieval?
> >>
> >> 
> >>
> >>
>
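
Option 3 in the quoted message could be scripted roughly as follows. This is only a sketch under assumptions: it assumes the cluster exposes the asynchronous savepoint trigger `POST /jobs/:jobid/savepoints` with the body fields `target-directory` and `cancel-job`, which is my reading of the REST docs and should be checked against the docs for the Flink version in use. The base URL, job ID and target directory in the usage note are hypothetical placeholders. The helper only builds the request; nothing is sent until it is passed to `urlopen`.

```python
import json
from urllib.request import Request


def cancel_with_savepoint_request(base_url: str, job_id: str, target_dir: str) -> Request:
    """Build (but do not send) a request that triggers a savepoint and cancels the job.

    Assumption: the endpoint path and the JSON field names below match the
    REST API of the Flink version in use.
    """
    body = json.dumps({"target-directory": target_dir, "cancel-job": True}).encode("utf-8")
    return Request(
        url=f"{base_url}/jobs/{job_id}/savepoints",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

Usage would look like `urlopen(cancel_with_savepoint_request("http://jobmanager:8081", job_id, "s3://bucket/savepoints"))`. Keeping `high-availability.cluster-id` stable between runs is still what lets the next job cluster find the savepoint, as described above.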


Re: java.io.IOException: NSS is already initialized

2018-11-10 Thread Hao Sun
Hi Ufuk, thanks for checking. I am using OpenJDK 1.8_171 and I still have the
same issue with Presto.

- Why is the checkpoint number not starting from 1? Old checkpoints stored in
ZK caused that. I cleaned them up, but it did not help much.
- I switched to Flink + Hadoop 2.8 and used the Hadoop S3 filesystem with no
other changes; checkpointing works with the Hadoop flavour.

On Fri, Nov 9, 2018 at 2:02 PM Ufuk Celebi  wrote:

> Hey Hao Sun,
>
> - Is this an intermittent failure or permanent? The logs indicate that
> some checkpoints completed before the error occurs (e.g. checkpoint
> numbers are greater than 1).
>
> - Which Java versions are you using? And which Java image? I've
> Googled similar issues that seem to be related to the JVM, e.g. [1].
>
> Best,
>
> Ufuk
>
> [1]
> https://dev.lucee.org/t/could-not-initialize-class-sun-security-ssl-sslcontextimp/3972
>
>
> On Thu, Nov 8, 2018 at 8:55 PM Hao Sun  wrote:
> >
> > Thanks, any insight/help here is appreciated.
> >
> > On Thu, Nov 8, 2018 at 4:38 AM Dawid Wysakowicz 
> wrote:
> >>
> >> Hi Hao,
> >>
> >> I am not sure, what might be wrong, but I've cc'ed Gary and Kostas who
> were recently working with S3, maybe they will have some ideas.
> >>
> >> Best,
> >>
> >> Dawid
> >>
> >> On 03/11/2018 03:09, Hao Sun wrote:
> >>
> >> Same environment, new error.
> >>
> >> I can run the same docker image with my local Mac, but on K8S, this
> gives me this error.
> >> I can not think of any difference between local Docker and K8S Docker.
> >>
> >> Any hint will be helpful. Thanks
> >>
> >> 
> >>
> >> 2018-11-02 23:29:32,981 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job
> ConnectedStreams maxwell.accounts ()
> switched from state RUNNING to FAILING.
> >> AsynchronousException{java.lang.Exception: Could not materialize
> checkpoint 235 for operator Source: KafkaSource(maxwell.accounts) ->
> MaxwellFilter->Maxwell(maxwell.accounts) ->
> FixedDelayWatermark(maxwell.accounts) ->
> MaxwellFPSEvent->InfluxDBData(maxwell.accounts) -> Sink:
> influxdbSink(maxwell.accounts) (1/1).}
> >> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
> >> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
> >> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
> >> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >> at java.lang.Thread.run(Thread.java:748)
> >> Caused by: java.lang.Exception: Could not materialize checkpoint 235
> for operator Source: KafkaSource(maxwell.accounts) ->
> MaxwellFilter->Maxwell(maxwell.accounts) ->
> FixedDelayWatermark(maxwell.accounts) ->
> MaxwellFPSEvent->InfluxDBData(maxwell.accounts) -> Sink:
> influxdbSink(maxwell.accounts) (1/1).
> >> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
> >> ... 6 more
> >> Caused by: java.util.concurrent.ExecutionException:
> java.lang.NoClassDefFoundError: Could not initialize class
> sun.security.ssl.SSLSessionImpl
> >> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> >> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> >> at
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
> >>
> >> at
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
> >>
> >> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
> >> ... 5 more
> >> Caused by: java.lang.NoClassDefFoundError: Could not initialize class
> sun.security.ssl.SSLSessionImpl
> >> at sun.security.ssl.SSLSocketImpl.init(SSLSocketImpl.java:604)
> >>
> >

Re: Where is the "Latest Savepoint" information saved?

2018-11-09 Thread Hao Sun
Can we add an option that lets job cluster mode start from the latest
savepoint? Otherwise I have to somehow get the info from ZK before the job
cluster's container is started by K8s.

On Fri, Nov 9, 2018, 01:29 Paul Lam  wrote:

> Hi Hao,
>
> The savepoint path is stored in ZK, but it’s in binary format, so in order
> to retrieve the path you have to deserialize it back to some Flink internal
> object.
>
> A better approach would be using REST api to get the path. You could find
> it here[1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html
>
> Best,
> Paul Lam
>
>
> On Nov 9, 2018, at 13:55, Hao Sun wrote:
>
> Since this save point path is very useful to application updates, where is
> this information stored? Can we keep it in ZK or S3 for retrieval?
>
> 
>
>
>
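
The REST lookup Paul suggests could be sketched like this. It assumes the response of `/jobs/:jobid/checkpoints` carries the most recent savepoint under `latest.savepoint` and that the entry has an `external_path` field; both are my reading of the REST docs rather than something confirmed in this thread. The helper names are made up for illustration.

```python
import json
from typing import Optional
from urllib.request import urlopen


def latest_savepoint_path(stats: dict) -> Optional[str]:
    """Return the external path of the latest savepoint, or None if there is none.

    Assumption: `stats` is the parsed JSON body of /jobs/:jobid/checkpoints
    and uses the keys `latest`, `savepoint` and `external_path`.
    """
    latest = stats.get("latest") or {}
    savepoint = latest.get("savepoint") or {}
    return savepoint.get("external_path")


def fetch_checkpoint_stats(base_url: str, job_id: str) -> dict:
    """GET the checkpoint statistics of a job from the JobManager REST endpoint."""
    with urlopen(f"{base_url}/jobs/{job_id}/checkpoints") as response:
        return json.load(response)
```

A wrapper script would call `latest_savepoint_path(fetch_checkpoint_stats(base_url, job_id))` before shutting the job down and store the result somewhere the next deployment can read it.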


The heartbeat of TaskManager with id ... timed out.

2018-11-08 Thread Hao Sun
I am running Flink 1.7 on K8S and am not sure how to debug this issue. I
turned on debug logging on the JM and TMs.

I am not sure whether this part is related. How could an actor suddenly
disappear?

=
2018-11-09 04:47:19,480 DEBUG
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher  - Query
metrics for akka://flink-metrics/user/MetricQueryService.
2018-11-09 04:47:19,577 DEBUG
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher  -
Retrieve metric query service gateway for
akka.tcp://flink-metrics@fps-flink-taskmanager-6f8f687fc8-b5qmm
:34429/user/MetricQueryService_0cfaa7b8f193a8002f121282298c58ac
2018-11-09 04:47:19,577 DEBUG
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher  -
Retrieve metric query service gateway for
akka.tcp://flink-metrics@fps-flink-taskmanager-6f8f687fc8-ckwzp
:43062/user/MetricQueryService_1930889e5b6c51cbe57428f9a664e4dc
2018-11-09 04:47:19,578 DEBUG
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher  -
Retrieve metric query service gateway for
akka.tcp://flink-metrics@fps-flink-taskmanager-6f8f687fc8-bzq2w
:39393/user/MetricQueryService_fcaf62301df0a6aeb29a65470cfe1e7a
2018-11-09 04:47:19,585 TRACE
org.apache.flink.runtime.rest.FileUploadHandler   - Received
request. URL:/jobs/ Method:GET
2018-11-09 04:47:19,613 DEBUG
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher  - Could
not retrieve QueryServiceGateway.
java.util.concurrent.CompletionException: akka.actor.ActorNotFound: Actor
not found for:
ActorSelection[Anchor(akka.tcp://flink-metrics@fps-flink-taskmanager-6f8f687fc8-ckwzp:43062/),
Path(/user/MetricQueryService_1930889e5b6c51cbe57428f9a664e4dc)]
at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
at akka.dispatch.OnComplete.internal(Future.scala:258)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at scala.concurrent.Promise.complete(Promise.scala:49)
at scala.concurrent.Promise.complete$(Promise.scala:48)
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:183)
at scala.concurrent.Promise.failure(Promise.scala:100)
at scala.concurrent.Promise.failure$(Promise.scala:100)
at scala.concurrent.impl.Promise$DefaultPromise.failure(Promise.scala:183)
at akka.actor.ActorSelection.$anonfun$resolveOne$1(ActorSelection.scala:68)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
at
akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:76)
at akka.dispatch.BatchingExecutor.execute(BatchingExecutor.scala:120)
at akka.dispatch.BatchingExecutor.execute$(BatchingExecutor.scala:114)
at
akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:75)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:538)
at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:558)
at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:595)
at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:584)
at
akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:98)
at akka.remote.EndpointWriter.postStop(Endpoint.scala:593)
at akka.actor.Actor.aroundPostStop(Actor.scala:515)
at akka.actor.Actor.aroundPostStop$(Actor.scala:515)
at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:446)
at akka.actor.du

Re: java.io.IOException: NSS is already initialized

2018-11-08 Thread Hao Sun
Thanks, any insight/help here is appreciated.

On Thu, Nov 8, 2018 at 4:38 AM Dawid Wysakowicz 
wrote:

> Hi Hao,
>
> I am not sure, what might be wrong, but I've cc'ed Gary and Kostas who
> were recently working with S3, maybe they will have some ideas.
>
> Best,
>
> Dawid
> On 03/11/2018 03:09, Hao Sun wrote:
>
> Same environment, new error.
>
> I can run the same Docker image on my local Mac, but on K8S it gives
> me this error.
> I cannot think of any difference between local Docker and K8S Docker.
>
> Any hint will be helpful. Thanks
>
> 
>
> 2018-11-02 23:29:32,981 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
> ConnectedStreams maxwell.accounts ()
> switched from state RUNNING to FAILING.
> AsynchronousException{java.lang.Exception: Could not materialize
> checkpoint 235 for operator Source: KafkaSource(maxwell.accounts) ->
> MaxwellFilter->Maxwell(maxwell.accounts) ->
> FixedDelayWatermark(maxwell.accounts) ->
> MaxwellFPSEvent->InfluxDBData(maxwell.accounts) -> Sink:
> influxdbSink(maxwell.accounts) (1/1).}
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> *Caused by: java.lang.Exception: Could not materialize checkpoint 235 for
> operator Source*: KafkaSource(maxwell.accounts) ->
> MaxwellFilter->Maxwell(maxwell.accounts) ->
> FixedDelayWatermark(maxwell.accounts) ->
> MaxwellFPSEvent->InfluxDBData(maxwell.accounts) -> Sink:
> influxdbSink(maxwell.accounts) (1/1).
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
> ... 6 more
> *Caused by: java.util.concurrent.ExecutionException:
> java.lang.NoClassDefFoundError: Could not initialize class
> sun.security.ssl.SSLSessionImpl*
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
>
> at
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
> ... 5 more
> *Caused by: java.lang.NoClassDefFoundError: Could not initialize class
> sun.security.ssl.SSLSessionImpl*
> at sun.security.ssl.SSLSocketImpl.init(SSLSocketImpl.java:604)
>
> at sun.security.ssl.SSLSocketImpl.<init>(SSLSocketImpl.java:572)
>
> at
> sun.security.ssl.SSLSocketFactoryImpl.createSocket(SSLSocketFactoryImpl.java:110)
> at
> org.apache.flink.fs.s3presto.shaded.org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:365)
> at
> org.apache.flink.fs.s3presto.shaded.org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:355)
> at
> org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:132)
> at
> org.apache.flink.fs.s3presto.shaded.org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
> at
> org.apache.flink.fs.s3presto.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:359)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
> at
> org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.conn.$Proxy4.connect(Unknown
> Source)
> at
> org.apache.

How to run Flink 1.6 job cluster in "standalone" mode?

2018-11-07 Thread Hao Sun
By "standalone" I mean job-manager + taskmanager in the same JVM. I have an
issue to debug in our K8S environment that I cannot reproduce in a local
Docker env or in IntelliJ. If JM and TM are running in different VMs, it makes
things harder to debug.

Or is there a way to debug a job running on JM + TM on different VMs?
Is reverting to session cluster the only way to get JM + TM on the same VM?


Re: Flink 1.6 job cluster mode, job_id is always 00000000000000000000000000000000

2018-11-05 Thread Hao Sun
Thanks all.

On Mon, Nov 5, 2018 at 2:05 AM Ufuk Celebi  wrote:

> On Sun, Nov 4, 2018 at 10:34 PM Hao Sun  wrote:
> > Thanks that also works. To avoid same issue with zookeeper, I assume I
> have to do the same trick?
>
> Yes, exactly. The following configuration [1] entry takes care of this:
>
> high-availability.cluster-id: application-1
>
> This will result in ZooKeeper entries as follows:
> /flink/application-1/[...].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/jobmanager_high_availability.html#config-file-flink-confyaml
>
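
To keep several job clusters apart, the two tricks from this thread (an application identifier in the checkpoint path, and a distinct `high-availability.cluster-id`) can be derived from one application name. The sketch below is illustrative only: the helper names are invented, `state.checkpoints.dir` is assumed to be the key that holds the checkpoint directory, and `/flink` is assumed to be the ZooKeeper root in use.

```python
def ha_settings(app_id: str, checkpoint_root: str) -> dict:
    """Per-application flink-conf.yaml entries (hypothetical helper).

    Each application gets its own cluster-id and its own checkpoint
    directory, so the fixed all-zero job ID no longer causes collisions.
    """
    return {
        "high-availability.cluster-id": app_id,
        "state.checkpoints.dir": f"{checkpoint_root.rstrip('/')}/{app_id}",
    }


def zookeeper_prefix(app_id: str, zk_root: str = "/flink") -> str:
    """ZooKeeper prefix that results from the cluster-id, e.g. /flink/application-1."""
    return f"{zk_root.rstrip('/')}/{app_id}"
```

For `application-1` and a checkpoint root of `s3://bucket/checkpoints`, this yields the checkpoint directory `s3://bucket/checkpoints/application-1` and the ZooKeeper prefix `/flink/application-1`.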


Re: Flink 1.6 job cluster mode, job_id is always 00000000000000000000000000000000

2018-11-04 Thread Hao Sun
Thanks, that also works. To avoid the same issue with ZooKeeper, I assume I
have to do the same trick?

On Sun, Nov 4, 2018, 03:34 Ufuk Celebi  wrote:

> Hey Hao Sun,
>
> this has been changed recently [1] in order to properly support
> failover in job cluster mode.
>
> A workaround for you would be to add an application identifier to the
> checkpoint path of each application, resulting in S3 paths like
> application-/00...00/chk-64.
>
> Is that a feasible solution?
>
> As a side note: It was considered to keep the job ID fixed, but make
> it configurable (e.g. by providing a --job-id argument) which would
> also help to avoid this situation, but I'm not aware of any concrete
> plans to move forward with that approach.
>
> Best,
>
> Ufuk
>
> [1] https://issues.apache.org/jira/projects/FLINK/issues/FLINK-10291
>
> On Sun, Nov 4, 2018 at 3:39 AM Hao Sun  wrote:
> >
> > I am wondering if I can customize job_id for job cluster mode. Currently
> it is always . I am running multiple job
> clusters and sharing s3, it means checkpoints will be shared by different
> jobs as well e.g. /chk-64, how can I avoid
> this?
> >
> > Thanks
>


Flink 1.6 job cluster mode, job_id is always 00000000000000000000000000000000

2018-11-03 Thread Hao Sun
I am wondering if I can customize the job_id in job cluster mode. Currently it
is always the all-zero ID shown in the subject line. I am running multiple job
clusters that share S3, which means checkpoints will be shared by different
jobs as well, e.g. /chk-64. How can I avoid
this?

Thanks


Re: java.io.IOException: NSS is already initialized

2018-11-02 Thread Hao Sun
ob ConnectedStreams maxwell.accounts
() if no longer possible.
=====

On Thu, Nov 1, 2018 at 9:22 PM Hao Sun  wrote:

> I am on Flink 1.6.2 (no Hadoop, in Docker + K8s), using RocksDB and S3
> (presto).
> I got this error when Flink was creating a checkpoint:
>
>
> ===
> 2018-11-02 04:00:55,011 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job
> ConnectedStreams maxwell.accounts ()
> switched from state RUNNING to FAILING.
> AsynchronousException{java.lang.Exception: Could not materialize
> checkpoint 130 for operator Source: KafkaSource(maxwell.accounts) ->
> MaxwellFilter->Maxwell(maxwell.accounts) ->
> FixedDelayWatermark(maxwell.accounts) ->
> MaxwellFPSEvent->InfluxDBData(maxwell.accounts) -> Sink:
> influxdbSink(maxwell.accounts) (1/1).}
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 130 for
> operator Source: KafkaSource(maxwell.accounts) ->
> MaxwellFilter->Maxwell(maxwell.accounts) ->
> FixedDelayWatermark(maxwell.accounts) ->
> MaxwellFPSEvent->InfluxDBData(maxwell.accounts) -> Sink:
> influxdbSink(maxwell.accounts) (1/1).
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
> ... 6 more
> *Caused by: java.util.concurrent.ExecutionException:
> java.lang.ExceptionInInitializerError*
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
> at
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:53)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
> ... 5 more
> *Caused by: java.lang.ExceptionInInitializerError*
> at sun.security.ssl.SSLSessionImpl.(SSLSessionImpl.java:183)
> at sun.security.ssl.SSLSessionImpl.(SSLSessionImpl.java:148)
> at sun.security.ssl.SSLSessionImpl.(SSLSessionImpl.java:79)
> at sun.security.ssl.SSLSocketImpl.init(SSLSocketImpl.java:604)
> at sun.security.ssl.SSLSocketImpl.(SSLSocketImpl.java:572)
> at
> sun.security.ssl.SSLSocketFactoryImpl.createSocket(SSLSocketFactoryImpl.java:110)
> at
> org.apache.flink.fs.s3presto.shaded.org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:365)
> at
> org.apache.flink.fs.s3presto.shaded.org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:355)
> at
> org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:132)
> at
> org.apache.flink.fs.s3presto.shaded.org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
> at
> org.apache.flink.fs.s3presto.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:359)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
> at
> org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.conn.$Proxy4.connect(Unknown
> Source)
> at
> org.apache.flink.fs.s3presto.shaded.org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:381)
> at
> org.apache.flink.fs.s3presto.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:237)
> at
> org.apache.flink.fs.s3presto.shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
> at
> org.apache.flink.fs.s

java.io.IOException: NSS is already initialized

2018-11-01 Thread Hao Sun
I am on Flink 1.6.2 (no Hadoop, in Docker + K8s), using RocksDB and S3
(presto).
I got this error when Flink was creating a checkpoint:


===
2018-11-02 04:00:55,011 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Job
ConnectedStreams maxwell.accounts ()
switched from state RUNNING to FAILING.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint
130 for operator Source: KafkaSource(maxwell.accounts) ->
MaxwellFilter->Maxwell(maxwell.accounts) ->
FixedDelayWatermark(maxwell.accounts) ->
MaxwellFPSEvent->InfluxDBData(maxwell.accounts) -> Sink:
influxdbSink(maxwell.accounts) (1/1).}
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 130 for
operator Source: KafkaSource(maxwell.accounts) ->
MaxwellFilter->Maxwell(maxwell.accounts) ->
FixedDelayWatermark(maxwell.accounts) ->
MaxwellFPSEvent->InfluxDBData(maxwell.accounts) -> Sink:
influxdbSink(maxwell.accounts) (1/1).
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
... 6 more
*Caused by: java.util.concurrent.ExecutionException:
java.lang.ExceptionInInitializerError*
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
at
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:53)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
... 5 more
*Caused by: java.lang.ExceptionInInitializerError*
at sun.security.ssl.SSLSessionImpl.(SSLSessionImpl.java:183)
at sun.security.ssl.SSLSessionImpl.(SSLSessionImpl.java:148)
at sun.security.ssl.SSLSessionImpl.(SSLSessionImpl.java:79)
at sun.security.ssl.SSLSocketImpl.init(SSLSocketImpl.java:604)
at sun.security.ssl.SSLSocketImpl.(SSLSocketImpl.java:572)
at
sun.security.ssl.SSLSocketFactoryImpl.createSocket(SSLSocketFactoryImpl.java:110)
at
org.apache.flink.fs.s3presto.shaded.org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:365)
at
org.apache.flink.fs.s3presto.shaded.org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:355)
at
org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:132)
at
org.apache.flink.fs.s3presto.shaded.org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
at
org.apache.flink.fs.s3presto.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:359)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
at
org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.conn.$Proxy4.connect(Unknown
Source)
at
org.apache.flink.fs.s3presto.shaded.org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:381)
at
org.apache.flink.fs.s3presto.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:237)
at
org.apache.flink.fs.s3presto.shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
at
org.apache.flink.fs.s3presto.shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
at
org.apache.flink.fs.s3presto.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
at
org.apache.flink.fs.s3presto.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
at
org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
at
org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.ja

Re: anybody can start flink with job mode?

2018-08-28 Thread Hao Sun
Thanks Till for the follow up, I can run my job now.

On Tue, Aug 28, 2018, 00:57 Till Rohrmann  wrote:

> Hi Hao,
>
> Vino is right: you need to specify the -j/--job-classname option, which
> specifies the class name of the job you want to execute. Please make sure
> that the jar containing this class is on the class path.
>
> I recently pushed some fixes which generate a better error message than
> the one you've received. If you check out the latest master branch, then it
> should work better.
>
> Let me know if you should run into other problems.
>
> Cheers,
> Till
>
> On Sat, Aug 25, 2018 at 5:11 AM Hao Sun  wrote:
>
>> Thanks, I'll look into it.
>>
>> On Fri, Aug 24, 2018, 19:44 vino yang  wrote:
>>
>>> Hi Hao Sun,
>>>
>>> From the error log, it seems that the jar package for the job was not
>>> found.
>>> You must make sure your Jar is in the classpath.
>>> Related documentation may not be up-to-date, and there is a discussion
>>> on this issue on this mailing list. [1]
>>>
>>> I see that the status of FLINK-10001 [2] is closed and it will be
>>> updated with the release of 1.6.1 and 1.7.0.
>>>
>>> [1]:
>>> http://mail-archives.apache.org/mod_mbox/flink-dev/201808.mbox/%3CCAC27z=OaohMbmcryB-+m3GBmZP=xpha8mihv7zs1grgsekk...@mail.gmail.com%3E
>>> [2]: https://issues.apache.org/jira/browse/FLINK-10001
>>>
>>> Thanks, vino.
>>>
>>>
>>> On Sat, Aug 25, 2018 at 6:37 AM Hao Sun wrote:
>>>
>>>> I got an error like this.
>>>>
>>>> $ docker run -it flink-job:latest job-cluster
>>>> Starting the job-cluster
>>>> config file:
>>>> jobmanager.rpc.address: localhost
>>>> jobmanager.rpc.port: 6123
>>>> jobmanager.heap.size: 1024m
>>>> taskmanager.heap.size: 1024m
>>>> taskmanager.numberOfTaskSlots: 1
>>>> parallelism.default: 1
>>>> rest.port: 8081
>>>> Starting standalonejob as a console application on host cf9bd047082c.
>>>> 2018-08-24 22:33:00,773 INFO
>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>>>> 
>>>> 2018-08-24 22:33:00,774 INFO
>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Starting
>>>> StandaloneJobClusterEntryPoint (Version: 1.6.0, Rev:ff472b4,
>>>> Date:07.08.2018 @ 13:31:13 UTC)
>>>> 2018-08-24 22:33:00,775 INFO
>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - OS current user:
>>>> flink
>>>> 2018-08-24 22:33:01,168 WARN org.apache.hadoop.util.NativeCodeLoader -
>>>> Unable to load native-hadoop library for your platform... using
>>>> builtin-java classes where applicable
>>>> 2018-08-24 22:33:01,232 INFO
>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Current
>>>> Hadoop/Kerberos user: flink
>>>> 2018-08-24 22:33:01,232 INFO
>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - JVM: OpenJDK 64-Bit
>>>> Server VM - Oracle Corporation - 1.8/25.111-b14
>>>> 2018-08-24 22:33:01,232 INFO
>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Maximum heap size:
>>>> 981 MiBytes
>>>> 2018-08-24 22:33:01,232 INFO
>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - JAVA_HOME:
>>>> /usr/lib/jvm/java-1.8-openjdk/jre
>>>> 2018-08-24 22:33:01,236 INFO
>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Hadoop version:
>>>> 2.8.3
>>>> 2018-08-24 22:33:01,236 INFO
>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - JVM Options:
>>>> 2018-08-24 22:33:01,236 INFO
>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Xms1024m
>>>> 2018-08-24 22:33:01,236 INFO
>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Xmx1024m
>>>> 2018-08-24 22:33:01,237 INFO
>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>>>> -Dlog4j.configuration=file:/opt/flink-1.6.0/conf/log4j-console.properties
>>>> 2018-08-24 22:33:01,237 INFO
>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>>>> -Dlogback.configurationFile=file:/opt/flink-1.6.0/conf/l

Re: anybody can start flink with job mode?

2018-08-24 Thread Hao Sun
Thanks, I'll look into it.

On Fri, Aug 24, 2018, 19:44 vino yang  wrote:

> Hi Hao Sun,
>
> From the error log, it seems that the jar package for the job was not
> found.
> You must make sure your Jar is in the classpath.
> Related documentation may not be up-to-date, and there is a discussion on
> this issue on this mailing list. [1]
>
> I see that the status of FLINK-10001 [2] is closed and it will be updated
> with the release of 1.6.1 and 1.7.0.
>
> [1]:
> http://mail-archives.apache.org/mod_mbox/flink-dev/201808.mbox/%3CCAC27z=OaohMbmcryB-+m3GBmZP=xpha8mihv7zs1grgsekk...@mail.gmail.com%3E
> [2]: https://issues.apache.org/jira/browse/FLINK-10001
>
> Thanks, vino.
>
>
> On Sat, Aug 25, 2018 at 6:37 AM Hao Sun wrote:
>
>> I got an error like this.
>>
>> $ docker run -it flink-job:latest job-cluster
>> Starting the job-cluster
>> config file:
>> jobmanager.rpc.address: localhost
>> jobmanager.rpc.port: 6123
>> jobmanager.heap.size: 1024m
>> taskmanager.heap.size: 1024m
>> taskmanager.numberOfTaskSlots: 1
>> parallelism.default: 1
>> rest.port: 8081
>> Starting standalonejob as a console application on host cf9bd047082c.
>> 2018-08-24 22:33:00,773 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>> 
>> 2018-08-24 22:33:00,774 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Starting
>> StandaloneJobClusterEntryPoint (Version: 1.6.0, Rev:ff472b4,
>> Date:07.08.2018 @ 13:31:13 UTC)
>> 2018-08-24 22:33:00,775 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - OS current user:
>> flink
>> 2018-08-24 22:33:01,168 WARN org.apache.hadoop.util.NativeCodeLoader -
>> Unable to load native-hadoop library for your platform... using
>> builtin-java classes where applicable
>> 2018-08-24 22:33:01,232 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Current
>> Hadoop/Kerberos user: flink
>> 2018-08-24 22:33:01,232 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - JVM: OpenJDK 64-Bit
>> Server VM - Oracle Corporation - 1.8/25.111-b14
>> 2018-08-24 22:33:01,232 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Maximum heap size:
>> 981 MiBytes
>> 2018-08-24 22:33:01,232 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - JAVA_HOME:
>> /usr/lib/jvm/java-1.8-openjdk/jre
>> 2018-08-24 22:33:01,236 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Hadoop version:
>> 2.8.3
>> 2018-08-24 22:33:01,236 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - JVM Options:
>> 2018-08-24 22:33:01,236 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Xms1024m
>> 2018-08-24 22:33:01,236 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Xmx1024m
>> 2018-08-24 22:33:01,237 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>> -Dlog4j.configuration=file:/opt/flink-1.6.0/conf/log4j-console.properties
>> 2018-08-24 22:33:01,237 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>> -Dlogback.configurationFile=file:/opt/flink-1.6.0/conf/logback-console.xml
>> 2018-08-24 22:33:01,237 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Program Arguments:
>> 2018-08-24 22:33:01,237 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - --configDir
>> 2018-08-24 22:33:01,238 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>> /opt/flink-1.6.0/conf
>> 2018-08-24 22:33:01,238 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Classpath:
>> /opt/flink-1.6.0/lib/flink-python_2.11-1.6.0.jar:/opt/flink-1.6.0/lib/flink-shaded-hadoop2-uber-1.6.0.jar:/opt/flink-1.6.0/lib/job.jar:/opt/flink-1.6.0/lib/log4j-1.2.17.jar:/opt/flink-1.6.0/lib/slf4j-log4j12-1.7.7.jar:/opt/flink-1.6.0/lib/flink-dist_2.11-1.6.0.jar:::
>> 2018-08-24 22:33:01,238 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>> 
>> 2018-08-24 22:33:01,240 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Registered UNIX
>> signal handlers for [TERM, HUP, INT]
>> 2018-08-24 22:33:01,248 ERROR 
>> *org.apache.flink.runtime.entrypoint.ClusterEntrypoint
>> 

anybody can start flink with job mode?

2018-08-24 Thread Hao Sun
I got an error like this.

$ docker run -it flink-job:latest job-cluster
Starting the job-cluster
config file:
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
rest.port: 8081
Starting standalonejob as a console application on host cf9bd047082c.
2018-08-24 22:33:00,773 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -

2018-08-24 22:33:00,774 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Starting
StandaloneJobClusterEntryPoint (Version: 1.6.0, Rev:ff472b4,
Date:07.08.2018 @ 13:31:13 UTC)
2018-08-24 22:33:00,775 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - OS current user:
flink
2018-08-24 22:33:01,168 WARN org.apache.hadoop.util.NativeCodeLoader -
Unable to load native-hadoop library for your platform... using
builtin-java classes where applicable
2018-08-24 22:33:01,232 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Current
Hadoop/Kerberos user: flink
2018-08-24 22:33:01,232 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - JVM: OpenJDK 64-Bit
Server VM - Oracle Corporation - 1.8/25.111-b14
2018-08-24 22:33:01,232 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Maximum heap size:
981 MiBytes
2018-08-24 22:33:01,232 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - JAVA_HOME:
/usr/lib/jvm/java-1.8-openjdk/jre
2018-08-24 22:33:01,236 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Hadoop version:
2.8.3
2018-08-24 22:33:01,236 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - JVM Options:
2018-08-24 22:33:01,236 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Xms1024m
2018-08-24 22:33:01,236 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Xmx1024m
2018-08-24 22:33:01,237 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Dlog4j.configuration=file:/opt/flink-1.6.0/conf/log4j-console.properties
2018-08-24 22:33:01,237 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Dlogback.configurationFile=file:/opt/flink-1.6.0/conf/logback-console.xml
2018-08-24 22:33:01,237 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Program Arguments:
2018-08-24 22:33:01,237 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - --configDir
2018-08-24 22:33:01,238 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
/opt/flink-1.6.0/conf
2018-08-24 22:33:01,238 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Classpath:
/opt/flink-1.6.0/lib/flink-python_2.11-1.6.0.jar:/opt/flink-1.6.0/lib/flink-shaded-hadoop2-uber-1.6.0.jar:/opt/flink-1.6.0/lib/job.jar:/opt/flink-1.6.0/lib/log4j-1.2.17.jar:/opt/flink-1.6.0/lib/slf4j-log4j12-1.7.7.jar:/opt/flink-1.6.0/lib/flink-dist_2.11-1.6.0.jar:::
2018-08-24 22:33:01,238 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -

2018-08-24 22:33:01,240 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Registered UNIX
signal handlers for [TERM, HUP, INT]
2018-08-24 22:33:01,248 ERROR
*org.apache.flink.runtime.entrypoint.ClusterEntrypoint
- Could not parse command line arguments [--configDir,
/opt/flink-1.6.0/conf].*
org.apache.flink.runtime.entrypoint.FlinkParseException: Failed to parse
the command line arguments.
at
org.apache.flink.runtime.entrypoint.parser.CommandLineParser.parse(CommandLineParser.java:52)
at
org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint.main(StandaloneJobClusterEntryPoint.java:143)
Caused by: org.apache.commons.cli.MissingOptionException: *Missing required
option: j*
at
org.apache.commons.cli.DefaultParser.checkRequiredOptions(DefaultParser.java:199)
at org.apache.commons.cli.DefaultParser.parse(DefaultParser.java:130)
at org.apache.commons.cli.DefaultParser.parse(DefaultParser.java:81)
at
org.apache.flink.runtime.entrypoint.parser.CommandLineParser.parse(CommandLineParser.java:50)
... 1 more
Exception in thread "main" java.lang.IllegalArgumentException:
cmdLineSyntax not provided
at org.apache.commons.cli.HelpFormatter.printHelp(HelpFormatter.java:546)
at org.apache.commons.cli.HelpFormatter.printHelp(HelpFormatter.java:492)
at org.apache.commons.cli.HelpFormatter.printHelp(HelpFormatter.java:408)
at
org.apache.flink.runtime.entrypoint.parser.CommandLineParser.printHelp(CommandLineParser.java:60)
at
org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint.main(StandaloneJobClusterEntryPoint.java:146)
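
Editorial sketch: the `Missing required option: j` error above corresponds to the `-j/--job-classname` option that the replies in this thread point to. The snippet below only assembles an argument list that includes it. The class name `com.example.MyJob` is hypothetical, and whether the image's entrypoint forwards extra arguments after `job-cluster` is an assumption, not something this thread confirms.

```python
# Sketch only: the job class name is hypothetical, and argument forwarding
# by the container entrypoint is assumed rather than confirmed.
from typing import List, Sequence


def job_cluster_args(job_classname: str, extra_args: Sequence[str] = ()) -> List[str]:
    """Build entrypoint arguments that include the required job class name."""
    if not job_classname:
        raise ValueError("job_classname is required (option -j/--job-classname)")
    return ["job-cluster", "--job-classname", job_classname, *extra_args]


if __name__ == "__main__":
    cmd = ["docker", "run", "-it", "flink-job:latest"]
    cmd += job_cluster_args("com.example.MyJob")
    print(" ".join(cmd))
```

Failing early on an empty class name mirrors the parser's behaviour in the log: the option is mandatory, so it is cheaper to catch its absence before the container starts.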


Re: [DISCUSS] Flink 1.6 features

2018-06-05 Thread Hao Sun
Adding my vote for K8s job mode. Maybe it is covered by this item?
> Smoothen the integration in Container environment, like "Flink as a
> Library", and easier integration with Kubernetes services and other proxies.



On Mon, Jun 4, 2018 at 11:01 PM Ben Yan  wrote:

> Hi Stephan,
>
> Will [ https://issues.apache.org/jira/browse/FLINK-5479 ]
> (Per-partition watermarks in FlinkKafkaConsumer should consider idle
> partitions) be included in 1.6? As we are seeing more users with this
> issue on the mailing lists.
>
> Thanks.
> Ben
>
> 2018-06-05 5:29 GMT+08:00 Che Lui Shum :
>
>> Hi Stephan,
>>
>> Will FLINK-7129 (Support dynamically changing CEP patterns) be included
>> in 1.6? There were discussions about possibly including it in 1.6:
>>
>> http://mail-archives.apache.org/mod_mbox/flink-user/201803.mbox/%3cCAMq=ou7gru2o9jtowxn1lc1f7nkcxayn6a3e58kxctb4b50...@mail.gmail.com%3e
>> 
>>
>> Thanks,
>> Shirley Shum
>>
>> From: Stephan Ewen 
>> To: d...@flink.apache.org, user 
>> Date: 06/04/2018 02:21 AM
>> Subject: [DISCUSS] Flink 1.6 features
>> --
>>
>>
>>
>> Hi Flink Community!
>>
>> The release of Apache Flink 1.5 has happened (yay!) - so it is a good
>> time to start talking about what to do for release 1.6.
>>
>> *== Suggested release timeline ==*
>>
>> I would propose to release around *end of July* (that is 8-9 weeks from
>> now).
>>
>> The rationale behind that: There was a lot of effort in release testing
>> automation (end-to-end tests, scripted stress tests) as part of release
>> 1.5. You may have noticed the big set of new modules under
>> "flink-end-to-end-tests" in the Flink repository. It delayed the 1.5
>> release a bit, and needs to continue as part of the coming release cycle,
>> but should help make releasing more lightweight from now on.
>>
>> (Side note: There are also some nightly stress tests that we created and
>> run at data Artisans, and where we are looking whether and in which way it
>> would make sense to contribute them to Flink.)
>>
>> *== Features and focus areas ==*
>>
>> We had a lot of big and heavy features in Flink 1.5, with FLIP-6, the new
>> network stack, recovery, SQL joins and client, ... Following something like
>> a "tick-tock-model", I would suggest to focus the next release more on
>> integrations, tooling, and reducing user friction.
>>
>> Of course, this does not mean that no other pull request gets reviewed,
>> and no other topic will be examined - it is simply meant as a help to
>> understand where to expect more activity during the next release cycle.
>> Note that these are really the coarse focus areas - don't read this as a
>> comprehensive list.
>>
>> This list is my first suggestion, based on discussions with committers,
>> users, and mailing list questions.
>>
>>   - Support Java 9 and Scala 2.12
>>
>>   - Smoothen the integration in Container environment, like "Flink as a
>> Library", and easier integration with Kubernetes services and other proxies.
>>
>>   - Polish the remaining parts of the FLIP-6 rewrite
>>
>>   - Improve state backends with asynchronous timer snapshots, efficient
>> timer deletes, state TTL, and broadcast state support in RocksDB.
>>
>>   - Extend Streaming Sinks:
>>  - Bucketing Sink should support S3 properly (compensate for eventual
>> consistency), work with Flink's shaded S3 file systems, and efficiently
>> support formats that compress/index across individual rows (Parquet, ORC,
>> ...)
>>  - Support ElasticSearch's new REST API
>>
>>   - Smoothen State Evolution to support type conversion on snapshot
>> restore
>>
>>   - Enhance Stream SQL and CEP
>>  - Add support for "update by key" Table Sources
>>  - Add more table sources and sinks (Kafka, Kinesis, Files, K/V
>> stores)
>>  - Expand SQL client
>>  - Integrate CEP and SQL, through MATCH_RECOGNIZE clause
>>  - Improve CEP Performance of SharedBuffer on RocksDB
>>
>>
>>
>>
>


Re: Do I still need hadoop-aws libs when using Flink 1.5 and Presto?

2018-06-05 Thread Hao Sun
After I added these to my flink-conf.yaml, everything works now.

s3.sse.enabled: true
s3.sse.type: S3

Thanks for the help!
In general, I would also like to know which config keys I can use for presto-s3.
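
Editorial sketch: judging only from this thread, the two keys above look like Presto's `hive.s3.sse.enabled` and `hive.s3.sse.type` with the `hive.` prefix dropped. The helper below applies that renaming to a small set of settings. The prefix rule is an inference from this thread rather than a documented contract, the function names are hypothetical, and nothing here confirms that every Presto property is honoured.

```python
# Sketch only: the "drop the hive. prefix" rule is inferred from this thread.
from typing import Dict

PRESTO_PREFIX = "hive."


def presto_to_flink_key(presto_key: str) -> str:
    """Turn a Presto key such as hive.s3.sse.enabled into s3.sse.enabled."""
    if not presto_key.startswith(PRESTO_PREFIX + "s3."):
        raise ValueError(f"not a Presto S3 property: {presto_key!r}")
    return presto_key[len(PRESTO_PREFIX):]


def render_flink_conf(presto_settings: Dict[str, str]) -> str:
    """Render the renamed settings as 'key: value' lines for the Flink config."""
    lines = [f"{presto_to_flink_key(k)}: {v}" for k, v in presto_settings.items()]
    return "\n".join(lines)


if __name__ == "__main__":
    print(render_flink_conf({
        "hive.s3.sse.enabled": "true",
        "hive.s3.sse.type": "S3",
    }))
```

Rejecting keys outside `hive.s3.` keeps unrelated Hive connector properties from being copied into the Flink configuration by accident.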


On Tue, Jun 5, 2018 at 11:43 AM Hao Sun  wrote:

> Also, a follow-up question: can I use all of the properties listed here?
> Should I remove the `hive.` prefix from all the keys?
>
> https://prestodb.io/docs/current/connector/hive.html#hive-configuration-properties
>
> More specifically, how do I configure SSE for S3?
>
> On Tue, Jun 5, 2018 at 11:33 AM Hao Sun  wrote:
>
>> I do not have the S3A lib requirement anymore, but I got a new error.
>>
>> org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>> Access Denied
>>
>> Here are more logs:
>> https://gist.github.com/zenhao/5f22ca084717bae3194555398dad332d
>>
>> Thanks
>>
>> On Tue, Jun 5, 2018 at 9:39 AM Aljoscha Krettek 
>> wrote:
>>
>>> Hi,
>>>
>>> sorry, yes, you don't have to add any of the Hadoop dependencies.
>>> Everything that's needed comes in the presto s3 jar.
>>>
>>> You should use "s3:" as the prefix, the Presto S3 filesystem will not be
>>> used if you use s3a. And yes, you add config values to the flink config as
>>> s3.xxx.
>>>
>>> Best,
>>> Aljoscha
>>>
>>>
>>> On 5. Jun 2018, at 18:23, Hao Sun  wrote:
>>>
>>> Thanks for picking up my question. I had s3a in the config; I have now
>>> removed it.
>>> I will post a full trace soon, but want to get some questions answered
>>> to help me understand this better.
>>>
>>> 1. Can I use the presto lib with Flink 1.5 without bundled hdp? Can I
>>> use this?
>>> http://www.apache.org/dyn/closer.lua/flink/flink-1.5.0/flink-1.5.0-bin-scala_2.11.tgz
>>> 2. How do I configure presto for endpoints, encryption? The S3A file
>>> system needed core-site.xml to configure such things and S3 V4 signature.
>>> Do I have to do it for presto?
>>> 3. If yes, how to do it? Just add s3.xxx to flink-config, like this?
>>> s3.server-side-encryption-algorithm: AES256
>>> s3.endpoint: 's3.amazonaws.com'
>>> (or other values for France regions, etc.)
>>>
>>> I will post more logs when I get one. Thanks
>>>
>>> On Tue, Jun 5, 2018 at 9:09 AM Aljoscha Krettek 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> what are you using as the FileSystem scheme? s3 or s3a?
>>>>
>>>> Also, could you also post the full stack trace, please?
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>>
>>>> On 2. Jun 2018, at 07:34, Hao Sun  wrote:
>>>>
>>>> I am trying to figure out how to use S3 as state storage.
>>>> The recommended way is
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html#shaded-hadooppresto-s3-file-systems-recommended
>>>>
>>>> Seems like I only have to do two things:
>>>> *1. Put flink-s3-fs-presto into the lib folder*
>>>> *2. Configure*
>>>>
>>>> s3.access-key: your-access-key
>>>> s3.secret-key: your-secret-key
>>>>
>>>>
>>>> But I see this exception: ClassNotFoundException:
>>>> NativeS3FileSystem/S3AFileSystem Not Found
>>>>
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html#provide-s3-filesystem-dependency
>>>>
>>>> And it is suggested to add more libs.
>>>> So I am confused here: is a step 3 needed? Isn't the presto jar
>>>> fully self-contained?
>>>>
>>>> Thanks
>>>>
>>>>
>>>>
>>>
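
Editorial sketch: the reply quoted above says the Presto S3 filesystem is only used with the `s3:` prefix, not with `s3a`. A small pre-flight check on the configured URI can catch the wrong scheme before a job is deployed. The function name and the example bucket are hypothetical.

```python
# Sketch only: function name and bucket are hypothetical.
from urllib.parse import urlparse


def uses_presto_scheme(uri: str) -> bool:
    """Return True if the URI uses the plain 's3' scheme named in this thread."""
    return urlparse(uri).scheme == "s3"


if __name__ == "__main__":
    for uri in ("s3://example-bucket/checkpoints", "s3a://example-bucket/checkpoints"):
        status = "ok" if uses_presto_scheme(uri) else "not the s3: scheme"
        print(f"{uri}: {status}")
```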


Re: Do I still need hadoop-aws libs when using Flink 1.5 and Presto?

2018-06-05 Thread Hao Sun
Also, a follow-up question: can I use all of the properties listed here?
Should I remove the `hive.` prefix from all the keys?
https://prestodb.io/docs/current/connector/hive.html#hive-configuration-properties

More specifically, how do I configure SSE for S3?

On Tue, Jun 5, 2018 at 11:33 AM Hao Sun  wrote:

> I do not have the S3A lib requirement anymore, but I got a new error.
>
> org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
> Access Denied
>
> Here are more logs:
> https://gist.github.com/zenhao/5f22ca084717bae3194555398dad332d
>
> Thanks
>
> On Tue, Jun 5, 2018 at 9:39 AM Aljoscha Krettek 
> wrote:
>
>> Hi,
>>
>> sorry, yes, you don't have to add any of the Hadoop dependencies.
>> Everything that's needed comes in the presto s3 jar.
>>
>> You should use "s3:" as the prefix, the Presto S3 filesystem will not be
>> used if you use s3a. And yes, you add config values to the flink config as
>> s3.xxx.
>>
>> Best,
>> Aljoscha
>>
>>
>> On 5. Jun 2018, at 18:23, Hao Sun  wrote:
>>
>> Thanks for picking up my question. I had s3a in the config; I have now
>> removed it.
>> I will post a full trace soon, but want to get some questions answered to
>> help me understand this better.
>>
>> 1. Can I use the presto lib with Flink 1.5 without bundled hdp? Can I use
>> this?
>> http://www.apache.org/dyn/closer.lua/flink/flink-1.5.0/flink-1.5.0-bin-scala_2.11.tgz
>> 2. How do I configure presto for endpoints, encryption? The S3A file
>> system needed core-site.yml to configure such things and S3 V4 signature.
>> Do I have to do it for presto?
>> 3. If yes, how to do it? Just add s3.xxx to flink-config?
>> like s3.server-side-encryption-algorithm: AES256
>> s3.endpoint: 's3.amazonaws.com
>> <http://s3.amazonaws.com/>' other
>> values for France regions, etc
>>
>> I will post more logs when I get one. Thanks
>>
>> On Tue, Jun 5, 2018 at 9:09 AM Aljoscha Krettek 
>> wrote:
>>
>>> Hi,
>>>
>>> what are you using as the FileSystem scheme? s3 or s3a?
>>>
>>> Also, could you also post the full stack trace, please?
>>>
>>> Best,
>>> Aljoscha
>>>
>>>
>>> On 2. Jun 2018, at 07:34, Hao Sun  wrote:
>>>
>>> I am trying to figure out how to use S3 as state storage.
>>> The recommended way is
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html#shaded-hadooppresto-s3-file-systems-recommended
>>>
>>> Seems like I only have to do two things:
>>> *1. Put flink-s3-fs-presto into the lib folder*
>>> *2. Configure*
>>>
>>> s3.access-key: your-access-key
>>> s3.secret-key: your-secret-key
>>>
>>>
>>> But I see this exception: ClassNotFoundException:
>>> NativeS3FileSystem/S3AFileSystem Not Found
>>>
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html#provide-s3-filesystem-dependency
>>>
>>> And there it is suggested to add more libs.
>>> So I am confused here: is a step 3 needed? Isn't the presto jar
>>> fully self-contained?
>>>
>>> Thanks
>>>
>>>
>>>
>>


Re: Do I still need hadoop-aws libs when using Flink 1.5 and Presto?

2018-06-05 Thread Hao Sun
I do not have the S3A lib requirement anymore, but I got a new error.

org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
Access Denied

Here are more logs:
https://gist.github.com/zenhao/5f22ca084717bae3194555398dad332d

Thanks

On Tue, Jun 5, 2018 at 9:39 AM Aljoscha Krettek  wrote:

> Hi,
>
> sorry, yes, you don't have to add any of the Hadoop dependencies.
> Everything that's needed comes in the presto s3 jar.
>
> You should use "s3:" as the prefix, the Presto S3 filesystem will not be
> used if you use s3a. And yes, you add config values to the flink config as
> s3.xxx.
>
> Best,
> Aljoscha
>
>
> On 5. Jun 2018, at 18:23, Hao Sun  wrote:
>
> Thanks for picking up my question. I had s3a in the config; I have now
> removed it. I will post a full trace soon, but I want to get some questions
> answered to help me understand this better.
>
> 1. Can I use the presto lib with Flink 1.5 without bundled Hadoop? Can I use
> this?
> http://www.apache.org/dyn/closer.lua/flink/flink-1.5.0/flink-1.5.0-bin-scala_2.11.tgz
> 2. How do I configure presto for endpoints and encryption? The S3A file
> system needed core-site.xml to configure such things, and the S3 V4 signature.
> Do I have to do the same for presto?
> 3. If yes, how do I do it? Just add s3.xxx to the Flink config, like this?
> s3.server-side-encryption-algorithm: AES256
> s3.endpoint: 's3.amazonaws.com'
> (with other values for the France regions, etc.)
>
> I will post more logs when I get one. Thanks
>
> On Tue, Jun 5, 2018 at 9:09 AM Aljoscha Krettek 
> wrote:
>
>> Hi,
>>
>> what are you using as the FileSystem scheme? s3 or s3a?
>>
>> Also, could you also post the full stack trace, please?
>>
>> Best,
>> Aljoscha
>>
>>
>> On 2. Jun 2018, at 07:34, Hao Sun  wrote:
>>
>> I am trying to figure out how to use S3 as state storage.
>> The recommended way is
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html#shaded-hadooppresto-s3-file-systems-recommended
>>
>> Seems like I only have to do two things:
>> *1. Put flink-s3-fs-presto into the lib folder*
>> *2. Configure*
>>
>> s3.access-key: your-access-key
>> s3.secret-key: your-secret-key
>>
>>
>> But I see this exception: ClassNotFoundException:
>> NativeS3FileSystem/S3AFileSystem Not Found
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html#provide-s3-filesystem-dependency
>>
>> And there it is suggested to add more libs.
>> So I am confused here: is a step 3 needed? Isn't the presto jar
>> fully self-contained?
>>
>> Thanks
>>
>>
>>
>


Re: Do I still need hadoop-aws libs when using Flink 1.5 and Presto?

2018-06-05 Thread Hao Sun
Thanks for picking up my question. I had s3a in the config; I have now
removed it. I will post a full trace soon, but I want to get some questions
answered to help me understand this better.

1. Can I use the presto lib with Flink 1.5 without bundled Hadoop? Can I use
this?
http://www.apache.org/dyn/closer.lua/flink/flink-1.5.0/flink-1.5.0-bin-scala_2.11.tgz
2. How do I configure presto for endpoints and encryption? The S3A file system
needed core-site.xml to configure such things, and the S3 V4 signature. Do I
have to do the same for presto?
3. If yes, how do I do it? Just add s3.xxx to the Flink config, like this?
s3.server-side-encryption-algorithm: AES256
s3.endpoint: 's3.amazonaws.com'
(with other values for the France regions, etc.)

I will post more logs when I get one. Thanks

On Tue, Jun 5, 2018 at 9:09 AM Aljoscha Krettek  wrote:

> Hi,
>
> what are you using as the FileSystem scheme? s3 or s3a?
>
> Also, could you also post the full stack trace, please?
>
> Best,
> Aljoscha
>
>
> On 2. Jun 2018, at 07:34, Hao Sun  wrote:
>
> I am trying to figure out how to use S3 as state storage.
> The recommended way is
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html#shaded-hadooppresto-s3-file-systems-recommended
>
> Seems like I only have to do two things:
> *1. Put flink-s3-fs-presto into the lib folder*
> *2. Configure*
>
> s3.access-key: your-access-key
> s3.secret-key: your-secret-key
>
>
> But I see this exception: ClassNotFoundException:
> NativeS3FileSystem/S3AFileSystem Not Found
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html#provide-s3-filesystem-dependency
>
> And there it is suggested to add more libs.
> So I am confused here: is a step 3 needed? Isn't the presto jar
> fully self-contained?
>
> Thanks
>
>
>


Re: Flink 1.5, failed to instantiate S3 FS

2018-06-02 Thread Hao Sun
Thanks, Amit, for checking. I do not use Hadoop, but I am using the Flink
binary with bundled Hadoop 2.8. I think this article is right: I somehow
mixed a 2.7 lib with the 2.8 binary.

On Sat, Jun 2, 2018 at 1:05 AM Amit Jain  wrote:

> Hi Hao,
>
> Have a look at
> https://issues.apache.org/jira/browse/HADOOP-13811?focusedCommentId=15703276&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15703276
> 
>
> What version of Hadoop are you using? Could you provide classpath used
> by Flink Job Manager, it is present in jobmanager.log file.
>
> --
> Cheers,
> Amit
>


Do I still need hadoop-aws libs when using Flink 1.5 and Presto?

2018-06-01 Thread Hao Sun
I am trying to figure out how to use S3 as state storage.
The recommended way is
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html#shaded-hadooppresto-s3-file-systems-recommended

Seems like I only have to do two things:
*1. Put flink-s3-fs-presto into the lib folder*
*2. Configure*

s3.access-key: your-access-key
s3.secret-key: your-secret-key


But I see this exception: ClassNotFoundException:
NativeS3FileSystem/S3AFileSystem Not Found

https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html#provide-s3-filesystem-dependency

And there it is suggested to add more libs.
So I am confused here: is a step 3 needed? Isn't the presto jar
fully self-contained?

Thanks


Flink 1.5, failed to instantiate S3 FS

2018-06-01 Thread Hao Sun
I cannot find anywhere that I set 100M, so I am not sure why I get this failure.
This is in my dev Docker environment. The same configuration file worked well
with 1.3.2.



===== Log =====
Caused by: org.apache.flink.util.FlinkException: Failed to submit job aa75905062dd0487034bb9d8b6617dc2.
    at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    ... 4 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
    at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
    at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
    at org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
    at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
    at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
    at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
    ... 21 more
Caused by: java.lang.RuntimeException: Failed to start checkpoint ID counter: Cannot instantiate file system for URI: s3a://zendesk-dev-orca-fps/pod0/checkpoints-meta
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:253)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:495)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:345)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
    at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
    at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
    at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
    ... 26 more
Caused by: java.io.IOException: Cannot instantiate file system for URI: s3a://zendesk-dev-orca-fps/pod0/checkpoints-meta
    at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:61)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:441)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:379)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:247)
    ... 33 more
*Caused by: java.lang.NumberFormatException: For input string: "100M"*
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Long.parseLong(Long.java:589)
    at java.lang.Long.parseLong(Long.java:631)
    at org.apache.hadoop.conf.Configuration.getLong(Configuration.java:1429)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:248)
    at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:159)
    ... 40 more
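
The innermost cause in this trace can be reproduced without Flink or Hadoop: `Long.parseLong` rejects any value that carries a size suffix such as "100M". One plausible reading, consistent with the follow-up in this thread about mixed 2.7 and 2.8 Hadoop jars, is that a newer default value written with a suffix (possibly `fs.s3a.multipart.size`; that property name is an assumption, the trace does not show it) is being read by older S3A code that expects a plain number. The sketch below only illustrates the parsing difference. `SizeSuffixDemo`, `plainParseFails` and `parseSize` are hypothetical names, not Hadoop or Flink APIs.

```java
import java.util.Locale;

public class SizeSuffixDemo {

    // Returns true when plain Long.parseLong rejects the value,
    // which is what happens to "100M" in the trace above.
    public static boolean plainParseFails(String value) {
        try {
            Long.parseLong(value);
            return false;
        } catch (NumberFormatException e) {
            return true;
        }
    }

    // Hypothetical suffix-aware parser for this sketch only.
    // Handles K, M and G as binary multiples; anything else is parsed as a plain long.
    public static long parseSize(String value) {
        String v = value.trim().toUpperCase(Locale.ROOT);
        if (v.isEmpty()) {
            throw new NumberFormatException("empty size value");
        }
        long multiplier = 1L;
        char last = v.charAt(v.length() - 1);
        if (last == 'K') {
            multiplier = 1024L;
        } else if (last == 'M') {
            multiplier = 1024L * 1024L;
        } else if (last == 'G') {
            multiplier = 1024L * 1024L * 1024L;
        }
        if (multiplier != 1L) {
            v = v.substring(0, v.length() - 1);
        }
        return Long.parseLong(v) * multiplier;
    }

    public static void main(String[] args) {
        System.out.println("plain parse fails: " + plainParseFails("100M"));
        System.out.println("suffix-aware parse: " + parseSize("100M"));
    }
}
```

If this reading is right, the fix is on the classpath rather than in the Flink configuration: the Hadoop jars visible to the JobManager should all come from the same Hadoop version.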


Re: [ANNOUNCE] Apache Flink 1.5.0 release

2018-05-25 Thread Hao Sun
This is great. Thanks for the effort to get this out!

On Fri, May 25, 2018 at 9:47 AM Till Rohrmann  wrote:

> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.5.0.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
>
> https://flink.apache.org/downloads.html
> 
>
> Please check out the release blog post for an overview of the new features
> and improvements and the list of contributors:
>
> http://flink.apache.org/news/2018/05/18/release-1.5.0.html
> 
>
> The full release notes are available in Jira:
>
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12341764
> 
>
> I would like to thank all contributors for working very hard on making
> this release a success!
>
> Best,
> Till
>


Re: keyBy and parallelism

2018-04-11 Thread Hao Sun
From what I have learnt, you have to control parallelism yourself. You can set
the parallelism on an operator or set the default through flink-conf.yaml.
I might be wrong.

On Wed, Apr 11, 2018 at 2:16 PM Christophe Jolif  wrote:

> Hi all,
>
> Imagine I have a default parallelism of 16 and I do something like
>
> stream.keyBy("something").flatMap()
>
> Now let's imagine I have less than 16 keys, maybe 8.
>
> How many parallel executions of the flatMap function will I get? 8 because
> I have 8 keys, or 16 because I have default parallelism at 16?
>
> (and I will have follow up questions depending on the answer I suspect ;))
>
> Thanks,
> --
> Christophe
>


Re: java.lang.Exception: TaskManager was lost/killed

2018-04-09 Thread Hao Sun
Same story here, 1.3.2 on K8s. It is very hard to find out why a TM is
killed. It is not likely caused by a memory leak. If there is a logger I
should turn on, please let me know.

On Mon, Apr 9, 2018, 13:41 Lasse Nedergaard 
wrote:

> We see the same running 1.4.2 on YARN, hosted on an AWS EMR cluster. The only
> thing I can find in the logs is a SIGTERM with the code 15 or -100.
> Today our simple job reading from Kinesis and writing to Cassandra was
> killed. The other day, in another job, I identified a map state.remove
> command that caused a lost task manager without an exception.
> I find it frustrating that it is so hard to find the root cause.
> If I look at historical metrics for CPU, heap and non-heap memory, I can't see
> anything that should cause a problem.
> So any ideas about how to debug this kind of exception are much
> appreciated.
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>
> On 9 Apr 2018 at 21:48, Chesnay Schepler wrote:
>
> We will need more information to offer any solution. The exception simply
> means that a TaskManager shut down, for which there are a myriad of
> possible explanations.
>
> Please have a look at the TaskManager logs, they may contain a hint as to
> why it shut down.
>
> On 09.04.2018 16:01, Javier Lopez wrote:
>
> Hi,
>
> "are you moving the job  jar to  the ~/flink-1.4.2/lib path ?  " -> Yes,
> to every node in the cluster.
>
> On 9 April 2018 at 15:37, miki haiat  wrote:
>
>> Javier
>> "adding the jar file to the /lib path of every task manager"
>> are you moving the job  jar to  the* ~/flink-1.4.2/lib path* ?
>>
>> On Mon, Apr 9, 2018 at 12:23 PM, Javier Lopez 
>> wrote:
>>
>>> Hi,
>>>
>>> We had the same metaspace problem; it was solved by adding the jar file
>>> to the /lib path of every task manager, as explained here:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/debugging_classloading.html#avoiding-dynamic-classloading
>>> We also added these Java options: "-XX:CompressedClassSpaceSize=100M
>>> -XX:MaxMetaspaceSize=300M -XX:MetaspaceSize=200M"
>>>
>>> From time to time we have the same problem with TaskManagers
>>> disconnecting, but the logs are not useful. We are using 1.3.2.
>>>
>>> On 9 April 2018 at 10:41, Alexander Smirnov <
>>> alexander.smirn...@gmail.com> wrote:
>>>
>>>> I've seen a similar problem, but it was not the heap size, it was Metaspace.
>>>> It was caused by a job restarting in a loop. It looks like, for each
>>>> restart, Flink loads a new instance of the classes, and very soon it runs
>>>> out of metaspace.
>>>>
>>>> I've created a JIRA issue for this problem, but got no response from
>>>> the development team on it:
>>>> https://issues.apache.org/jira/browse/FLINK-9132
>>>>
>>>> On Mon, Apr 9, 2018 at 11:36 AM 王凯 wrote:
>>>>
>>>>> Thanks a lot, I will try it.
>>>>>
>>>>> On 2018-04-09 00:06:02, "TechnoMage" wrote:
>>>>>
>>>>> I have seen this when my task manager ran out of RAM. Increase the
>>>>> heap size.
>>>>>
>>>>> flink-conf.yaml:
>>>>> taskmanager.heap.mb
>>>>> jobmanager.heap.mb
>>>>>
>>>>> Michael
>>>>>
>>>>> On Apr 8, 2018, at 2:36 AM, 王凯 wrote:
>>>>>
>>>>> Hi all, recently I found a problem: the job runs well at the start, but
>>>>> after a long run the exception shown above appears. How can I resolve it?
>>>>>
>>>>
>>>
>>
>
>


Re: Temporary failure in name resolution

2018-04-03 Thread Hao Sun
Hi Timo, we have a similar issue: a TM got killed by a job. Is there a way
to monitor the JVM status? If it is through the monitoring metrics, which
metric should I look at?
We are running Flink on K8s. Is it possible that a job consumes so much
network bandwidth that the JM and TM cannot connect?

On Tue, Apr 3, 2018 at 3:11 AM Timo Walther  wrote:

> Hi Miki,
>
> for me this sounds like your job has a resource leak such that your memory
> fills up and the JVM of the TaskManager is killed at some point. What does
> your job look like? I see a WindowedStream.apply which might not be
> appropriate if you have big/frequent windows where the evaluation happens
> too late such that the state becomes too big.
>
> Regards,
> Timo
>
>
> On 03.04.18 at 08:26, miki haiat wrote:
>
> I tried to run Flink on Kubernetes and as a standalone HA cluster, and in
> both cases the task manager got lost/killed after a few hours/days.
> I'm using Ubuntu and Flink 1.4.2.
>
>
> This is part of the log; I also attached the full log.
>
>>
>> org.tlv.esb.StreamingJob$EsbTraceEvictor@20ffca60,
>> WindowedStream.apply(WindowedStream.java:1061)) -> Sink: Unnamed (1/1)
>> (91b27853aa30be93322d9c516ec266bf) switched from RUNNING to FAILED.
>> java.lang.Exception: TaskManager was lost/killed:
>> 6dc6cd5c15588b49da39a31b6480b2e3 @ beam2 (dataPort=42587)
>> at
>> org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
>> at
>> org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
>> at
>> org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
>> at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
>> at
>> org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
>> at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
>> at
>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>> at
>> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
>> at
>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>> at
>> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>> at
>> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> at
>> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>> at
>> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>> at
>> akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
>> at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
>> at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:494)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> 2018-04-02 13:09:01,727 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink
>> Streaming esb correlate msg (0db04ff29124f59a123d4743d89473ed) switched
>> from state RUNNING to FAILING.
>> java.lang.Exception: TaskManager was lost/killed:
>> 6dc6cd5c15588b49da39a31b6480b2e3 @ beam2 (dataPort=42587)
>> at
>> org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
>> at
>> org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
>> at
>> org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
>> at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
>> at
>> org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
>> at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
>> at
>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36

Re: Flink and Docker ?

2018-04-03 Thread Hao Sun
Hi, we are using this Docker image on K8s with S3:
https://github.com/docker-flink/docker-flink

It works fine for us.

On Tue, Apr 3, 2018 at 1:00 AM Christophe Salperwyck <
christophe.salperw...@gmail.com> wrote:

> Hi,
>
> I didn't try docker with Flink but I know that those guys did:
> https://github.com/big-data-europe/docker-flink
> 
>
> Perhaps it is worth having a look there.
>
> BR,
> Christophe
>
> 2018-04-03 9:29 GMT+02:00 Esa Heikkinen :
>
>> Hi
>>
>>
>>
>> I have noticed that Flink can be pretty tedious to install and build
>> first applications from scratch. Especially if the application is little
>> bit complex. There are also little bit different development and run time
>> environments, which require different software components with correct
>> versions.
>>
>>
>>
>> I found Docker could help with this problem:
>>
>> https://flink.apache.org/news/2017/05/16/official-docker-image.html
>> 
>>
>>
>>
>> Has anyone used Flink with Docker and what are the experiences about
>> using it ?
>>
>>
>>
>> Do you recommend to use Flink with Docker ?
>>
>>
>>
>> Can there be a problem with different versions, if some software
>> component is not correct or latest in the Docker image ?
>>
>>
>>
>> Best, Esa
>>
>>
>>
>
>


Re: How can I confirm a savepoint is used for a new job?

2018-03-26 Thread Hao Sun
Thanks, Timo.

On Mon, Mar 26, 2018, 03:04 Timo Walther  wrote:

> Hi Hao,
>
> I quickly checked that manually. There should be a message similar to
> the one below in the JobManager log:
>
> INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  -
> Starting job from savepoint ...
>
> Regards,
> Timo
>
> On 22.03.18 at 06:45, Hao Sun wrote:
> > Are there any logs in the JM/TM that indicate the job is using the
> > savepoint I passed in when I submitted the job?
> >
> > Thanks
>
>
>
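
The check described above can be scripted. The following is a minimal sketch, assuming the JobManager log is available as a local file and contains the "Starting job from savepoint" message quoted in this thread; the class and method names are hypothetical, and the exact log wording may differ between Flink versions.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class SavepointLogCheck {

    // Message text taken from the reply above; treat it as version-dependent.
    static final String MARKER = "Starting job from savepoint";

    // Returns true if any line of the given log file contains the marker.
    public static boolean startedFromSavepoint(Path jobManagerLog) throws IOException {
        try (Stream<String> lines = Files.lines(jobManagerLog)) {
            return lines.anyMatch(line -> line.contains(MARKER));
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: SavepointLogCheck <path-to-jobmanager-log>");
            return;
        }
        Path log = Paths.get(args[0]);
        System.out.println("started from savepoint: " + startedFromSavepoint(log));
    }
}
```

On Kubernetes the log usually has to be fetched from the JobManager pod first; how to do that depends on the deployment and is left out here.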


How can I confirm a savepoint is used for a new job?

2018-03-21 Thread Hao Sun
Are there any logs in the JM/TM that indicate the job is using the savepoint
I passed in when I submitted the job?

Thanks


Re: Can not cancel with savepoint with Flink 1.3.2

2018-03-15 Thread Hao Sun
Is this related?

2018-03-16 03:43:42,557 INFO  akka.actor.EmptyLocalActorRef
 - Message
[org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricSerializationResult]
from 
Actor[akka.tcp://flink@fps-flink-taskmanager-120318156-9sw8l:43048/user/MetricQueryService_dff29d22e5adee13e761c957283d30ce#1096154549]
to Actor[akka://flink/temp/$mc] was not delivered. [156] dead letters
encountered. This logging can be turned off or adjusted with configuration
settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.
2018-03-16 03:43:45,460 INFO  akka.actor.EmptyLocalActorRef
 - Message
[org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricSerializationResult]
from 
Actor[akka.tcp://flink@fps-flink-taskmanager-120318156-2vvkr:36163/user/MetricQueryService_eb07cde4f1affbbe48023c1a40516c1c#1874500433]
to Actor[akka://flink/temp/$nc] was not delivered. [157] dead letters
encountered. This logging can be turned off or adjusted with configuration
settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.
2018-03-16 03:43:50,412 INFO  akka.actor.EmptyLocalActorRef
 - Message
[org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricSerializationResult]
from 
Actor[akka.tcp://flink@fps-flink-taskmanager-120318156-2vvkr:36163/user/MetricQueryService_eb07cde4f1affbbe48023c1a40516c1c#1874500433]
to Actor[akka://flink/temp/$pc] was not delivered. [158] dead letters
encountered. This logging can be turned off or adjusted with configuration
settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.
2018-03-16 03:44:05,430 INFO  akka.actor.EmptyLocalActorRef
 - Message
[org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricSerializationResult]
from 
Actor[akka.tcp://flink@fps-flink-taskmanager-120318156-2vvkr:36163/user/MetricQueryService_eb07cde4f1affbbe48023c1a40516c1c#1874500433]
to Actor[akka://flink/temp/$sc] was not delivered. [159] dead letters
encountered. This logging can be turned off or adjusted with configuration
settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.

On Thu, Mar 15, 2018 at 8:38 PM Hao Sun  wrote:

> Hi, I am running Flink on K8s and store state in S3 with the RocksDB backend.
>
> I used to be able to cancel with a savepoint through the REST API.
> But sometimes the process never finishes, no matter how many times I try.
>
> Is there a way to figure out what is going wrong?
> Why "isStoppable"=>false?
>
> Thanks
>
> ==
> [cancel_with_savepoint] progress: {"status"=>"in-progress",
> "request-id"=>1}, job_id: 1392811585ca8cda779511008bce3046
> ==
> [cancel_with_savepoint] job_status:
> {"jid"=>"1392811585ca8cda779511008bce3046", "name"=>"KafkaDemo
> maxwell.accounts (env:staging)", "isStoppable"=>false, "state"=>"RUNNING",
> "start-time"=>1521169404370, "end-time"=>-1, "duration"=>1559274,
> "now"=>1521170963644, "timestamps"=>{"CREATED"=>1521169404370,
> "RUNNING"=>1521169404506, "FAILING"=>0, "FAILED"=>0, "CANCELLING"=>0,
> "CANCELED"=>0, "FINISHED"=>0, "RESTARTING"=>1521168804370, "SUSPENDED"=>0,
> "RECONCILING"=>0}, "vertices"=>[{"id"=>"2f4bc854a18755730e14a90e1d4d7c7d",
> "name"=>"Source: KafkaSource(maxwell.accounts) ->
> MaxwellFilter->Maxwell(maxwell.accounts) ->
> FixedDelayWatermark(maxwell.accounts) ->
> MaxwellFPSEvent->InfluxDBData(maxwell.accounts) -> Sink:
> influxdbSink(maxwell.accounts)", "parallelism"=>1, "status"=>"RUNNING",
> "start-time"=>1521169404506, "end-time"=>-1, "duration"=>1559138,
> "tasks"=>{"CREATED"=>0, "SCHEDULED"=>0, "DEPLOYING"=>0, "RUNNING"=>1,
> "FINISHED"=>0, "CANCELING"=>0, "CANCELED"=>0, "FAILED"=>0,
> "RECONCILING"=>0}, "metrics"=>{"read-bytes"=>0, "write-bytes"=>0,
> "read-records"=>0, "write-records"=>0}}], "status-counts"=>{"CREATED"=>0,
> "SCHEDULED"=>0, "DEPLOYING"=>0, "RUNNING"=>1, "FINISHED"=>0,
> "CANCELING"=>0, "CANCELED"=>0, "FAILED"=>0, "RECONCILING"=>0},
> "plan"=>{"jid"=>"1392811585ca8cda779511008bce3046", "name"=>"KafkaDemo
> maxwell.accounts (env:staging)",
> "nodes"=>[{"id"=>"2f4bc854a18755730e14a90e1d4d7c7d", "parallelism"=>1,
> "operator"=>"", "operator_strategy"=>"", "description"=>"Source:
> KafkaSource(maxwell.accounts) ->
> MaxwellFilter->Maxwell(maxwell.accounts) ->
> FixedDelayWatermark(maxwell.accounts) ->
> MaxwellFPSEvent->InfluxDBData(maxwell.accounts) -> Sink:
> influxdbSink(maxwell.accounts)", "optimizer_properties"=>{}}]}}, job_id:
> 1392811585ca8cda779511008bce3046
>


Can not cancel with savepoint with Flink 1.3.2

2018-03-15 Thread Hao Sun
Hi, I am running Flink on K8s and store state in S3 with the RocksDB backend.

I used to be able to cancel with a savepoint through the REST API.
But sometimes the process never finishes, no matter how many times I try.

Is there a way to figure out what is going wrong?
Why "isStoppable"=>false?

Thanks

==
[cancel_with_savepoint] progress: {"status"=>"in-progress",
"request-id"=>1}, job_id: 1392811585ca8cda779511008bce3046
==
[cancel_with_savepoint] job_status:
{"jid"=>"1392811585ca8cda779511008bce3046", "name"=>"KafkaDemo
maxwell.accounts (env:staging)", "isStoppable"=>false, "state"=>"RUNNING",
"start-time"=>1521169404370, "end-time"=>-1, "duration"=>1559274,
"now"=>1521170963644, "timestamps"=>{"CREATED"=>1521169404370,
"RUNNING"=>1521169404506, "FAILING"=>0, "FAILED"=>0, "CANCELLING"=>0,
"CANCELED"=>0, "FINISHED"=>0, "RESTARTING"=>1521168804370, "SUSPENDED"=>0,
"RECONCILING"=>0}, "vertices"=>[{"id"=>"2f4bc854a18755730e14a90e1d4d7c7d",
"name"=>"Source: KafkaSource(maxwell.accounts) ->
MaxwellFilter->Maxwell(maxwell.accounts) ->
FixedDelayWatermark(maxwell.accounts) ->
MaxwellFPSEvent->InfluxDBData(maxwell.accounts) -> Sink:
influxdbSink(maxwell.accounts)", "parallelism"=>1, "status"=>"RUNNING",
"start-time"=>1521169404506, "end-time"=>-1, "duration"=>1559138,
"tasks"=>{"CREATED"=>0, "SCHEDULED"=>0, "DEPLOYING"=>0, "RUNNING"=>1,
"FINISHED"=>0, "CANCELING"=>0, "CANCELED"=>0, "FAILED"=>0,
"RECONCILING"=>0}, "metrics"=>{"read-bytes"=>0, "write-bytes"=>0,
"read-records"=>0, "write-records"=>0}}], "status-counts"=>{"CREATED"=>0,
"SCHEDULED"=>0, "DEPLOYING"=>0, "RUNNING"=>1, "FINISHED"=>0,
"CANCELING"=>0, "CANCELED"=>0, "FAILED"=>0, "RECONCILING"=>0},
"plan"=>{"jid"=>"1392811585ca8cda779511008bce3046", "name"=>"KafkaDemo
maxwell.accounts (env:staging)",
"nodes"=>[{"id"=>"2f4bc854a18755730e14a90e1d4d7c7d", "parallelism"=>1,
"operator"=>"", "operator_strategy"=>"", "description"=>"Source:
KafkaSource(maxwell.accounts) ->
MaxwellFilter->Maxwell(maxwell.accounts) ->
FixedDelayWatermark(maxwell.accounts) ->
MaxwellFPSEvent->InfluxDBData(maxwell.accounts) -> Sink:
influxdbSink(maxwell.accounts)", "optimizer_properties"=>{}}]}}, job_id:
1392811585ca8cda779511008bce3046
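
Since the progress response above can stay at "in-progress" indefinitely, a client that polls it probably wants a deadline. The sketch below shows one way to bound the wait. It is not a Flink API: the status source is a plain `Supplier<String>` standing in for the HTTP call, and `SavepointPoller`, `pollUntilDone` and the "timed-out" marker are hypothetical names. As far as I know, `isStoppable` only reflects whether the job's sources support the separate stop command, so a value of false would not by itself explain a hanging cancel with savepoint.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

public class SavepointPoller {

    // Polls statusSource until it reports something other than "in-progress"
    // or the timeout elapses. "timed-out" is a marker invented for this
    // sketch, not a status returned by Flink.
    public static String pollUntilDone(Supplier<String> statusSource,
                                       Duration timeout,
                                       Duration interval) throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            String status = statusSource.get();
            if (!"in-progress".equals(status)) {
                return status;
            }
            if (System.nanoTime() >= deadline) {
                return "timed-out";
            }
            Thread.sleep(interval.toMillis());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the real HTTP call: two "in-progress" answers, then a final one.
        Iterator<String> fake =
                Arrays.asList("in-progress", "in-progress", "success").iterator();
        String result = pollUntilDone(fake::next, Duration.ofSeconds(5), Duration.ofMillis(10));
        System.out.println("final status: " + result);
    }
}
```

When the poll times out, the JobManager and TaskManager logs around the savepoint trigger time are the next place to look, since the REST response itself carries no failure reason while the request is still in progress.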


Re: [ANNOUNCE] Apache Flink 1.4.1 released

2018-02-15 Thread Hao Sun
This is great!

On Thu, Feb 15, 2018 at 2:50 PM Bowen Li  wrote:

> Congratulations everyone!
>
> On Thu, Feb 15, 2018 at 10:04 AM, Tzu-Li (Gordon) Tai wrote:
>
>> The Apache Flink community is very happy to announce the release of
>> Apache Flink 1.4.1, which is the first bugfix release for the Apache Flink
>> 1.4 series.
>>
>>
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data streaming
>> applications.
>>
>>
>> The release is available for download at:
>>
>> https://flink.apache.org/downloads.html
>> 
>>
>>
>> Please check out the release blog post for an overview of the
>> improvements for this bugfix release:
>>
>> https://flink.apache.org/news/2018/02/15/release-1.4.1.html
>> 
>>
>>
>> The full release notes are available in Jira:
>>
>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12342212
>> 
>>
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>>
>>
>> Cheers,
>>
>> Gordon
>>
>>
>


Re: state.checkpoints.dir

2018-01-22 Thread Hao Sun
We generate flink-conf.yaml on the fly, so we can use different values
depending on the environment.
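As a rough illustration of generating the config per environment, here is a
minimal Python sketch. The environment names, the bucket paths, and the
APP_ENV variable are made up for the example; only the
state.checkpoints.dir key comes from this thread. It is not how any
particular deployment does it, just one way it could look.

```python
import os

# Hypothetical per-environment defaults; the bucket names are placeholders.
DEFAULTS = {
    "staging": {"state.checkpoints.dir": "s3://example-staging/checkpoints"},
    "production": {"state.checkpoints.dir": "s3://example-production/checkpoints"},
}


def render_flink_conf(environment, overrides=None):
    """Return the text of a flink-conf.yaml for the given environment."""
    settings = dict(DEFAULTS[environment])
    settings.update(overrides or {})
    # The config file is written as flat "key: value" lines, sorted by key.
    return "".join(f"{key}: {value}\n" for key, value in sorted(settings.items()))


if __name__ == "__main__":
    # APP_ENV is an assumed variable name, not something Flink reads itself.
    env = os.environ.get("APP_ENV", "staging")
    print(render_flink_conf(env), end="")
```

The output would be written to the conf directory before the JobManager or
TaskManager process starts, so no change to Flink itself is needed.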

On Mon, Jan 22, 2018 at 12:53 PM Biswajit Das  wrote:

> Hello ,
>
>  Is there any hack to supply *state.checkpoints.*dir as argument or JVM
> parameter when running locally .  I can change the source
> *CheckpointCoordinator* and make it work , trying to find if there is any
> shortcuts ??
>
> Thank you
> ~ Biswajit
>


akka.remote.ShutDownAssociation: Shut down address: akka.tcp://flink@fps-flink-jobmanager:45652

2018-01-07 Thread Hao Sun
I am running Flink 1.3.2 in my local Docker environment.

I see this error and am not sure how to find the root cause.
The message confuses me: why is the JM trying to connect to the JM, from a
random port to the RPC port 6123?


2018-01-08 05:38:03,294 ERROR akka.remote.EndpointWriter - AssociationError
[akka.tcp://flink@fps-flink-jobmanager:6123] <-
[akka.tcp://flink@fps-flink-jobmanager:45652]: Error [Shut down address:
akka.tcp://flink@fps-flink-jobmanager:45652] [
akka.remote.ShutDownAssociation: Shut down address:
akka.tcp://flink@fps-flink-jobmanager:45652
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The
remote system terminated the association because it is shutting down.
]


Here you can find the full log
https://gist.github.com/zenhao/c8d13cce8601e321dd706a2ac53f5032


Re: scala 2.12 support/cross-compile

2018-01-03 Thread Hao Sun
Thanks Stephan and Aljoscha for the info!

On Wed, Jan 3, 2018 at 2:41 AM Aljoscha Krettek  wrote:

> Hi,
>
> This is the umbrella issue for Scala 2.12 support. As Stephan pointed out,
> the ClosureCleaner and SAMs are currently the main problems. The first is
> also a problem for Spark, which track their respective progress here:
> https://issues.apache.org/jira/browse/SPARK-14540
>
> Best,
> Aljoscha
>
>
> On 3. Jan 2018, at 10:39, Stephan Ewen  wrote:
>
> Hi Hao Sun!
>
> This is work in progress, but Scala 2.12 is a bit tricky. I think the
> Scala folks have messed this version up a bit, to be honest.
>
> The main blockers is that Scala 2.12 breaks some classes through its
> addition of SAM interface lambdas (similar to Java). Many of the DataStream
> API classes have two method variants (one with a Scala Function, one with a
> Java SAM interface) which now become ambiguously overloaded methods in
> Scala 2.12.
>
> In addition, Scala 2.12 also needs a different closure cleaner, because
> Scala 2.12 compiles differently.
>
> I am adding Aljoscha, who has started working on this...
>
> Best,
> Stephan
>
>
> On Wed, Jan 3, 2018 at 4:13 AM, Hao Sun  wrote:
>
>> Hi team, I am wondering if there is a schedule to support scala 2.12?
>> If I need flink 1.3+ with scala 2.12, do I just have to cross compile
>> myself? Is there anything blocking us from using scala 2.12?
>>
>> Thanks
>>
>
>
>


scala 2.12 support/cross-compile

2018-01-02 Thread Hao Sun
Hi team, I am wondering if there is a schedule to support scala 2.12?
If I need flink 1.3+ with scala 2.12, do I just have to cross compile
myself? Is there anything blocking us from using scala 2.12?

Thanks


Re: org.apache.zookeeper.ClientCnxn, Client session timed out

2017-12-28 Thread Hao Sun
Ok, thanks for the clarification.

On Thu, Dec 28, 2017 at 1:05 AM Ufuk Celebi  wrote:

> On Thu, Dec 28, 2017 at 12:11 AM, Hao Sun  wrote:
> > Thanks! Great to know I do not have to worry duplicates inside Flink.
> >
> > One more question, why this happens? Because TM and JM both check
> leadership
> > in different interval?
>
> Yes, it's not deterministic how this happens. There will also be cases
> when the JM notices before the TM.
>
>


Re: org.apache.zookeeper.ClientCnxn, Client session timed out

2017-12-27 Thread Hao Sun
Thanks! Great to know I do not have to worry about duplicates inside Flink.

One more question: why does this happen? Is it because the TM and JM check
leadership at different intervals?
> The TM noticed the loss of leadership before the JM did.

On Wed, Dec 27, 2017, 13:52 Ufuk Celebi  wrote:

> On Wed, Dec 27, 2017 at 4:41 PM, Hao Sun  wrote:
>
>> Somehow TM detected JM leadership loss from ZK and self disconnected?
>> And couple of seconds later, JM failed to connect to ZK?
>>
>
> Yes, exactly as you describe. The TM noticed the loss of leadership before
> the JM did.
>
>
>> After all the cluster recovered nicely by its own, but I am wondering
>> does this break the exactly-once semantics? If yes, what should I take care?
>>
>
> Great :-) It does not break exactly-once guarantees *within* the Flink
> pipeline as the state of the latest completed checkpoint will be restored
> after recovery. This rewinds your job and might result in duplicate or
> changed output if you don't use an exactly once or idempotent sink.
>
> – Ufuk
>
>
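To make the point about idempotent sinks concrete, here is a small Python
sketch. It is a toy in-memory store, not a Flink sink, and the event IDs are
invented. It only shows why replaying records after a restore is harmless
when every write is an upsert keyed by a stable ID.

```python
class IdempotentStore:
    """Toy key-value sink: writing the same key twice leaves a single row."""

    def __init__(self):
        self.rows = {}

    def upsert(self, key, value):
        # Overwrites any earlier write for the same key instead of appending.
        self.rows[key] = value


def replay(store, events):
    """Write every (event_id, payload) pair into the store."""
    for event_id, payload in events:
        store.upsert(event_id, payload)


store = IdempotentStore()
events = [("e1", 10), ("e2", 20), ("e3", 30)]

replay(store, events)      # first attempt, before the failure
replay(store, events[1:])  # records re-emitted after rewinding to a checkpoint
```

With an append-only sink the second call would have produced duplicates of
e2 and e3; with keyed upserts the store ends up with exactly three rows.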


org.apache.zookeeper.ClientCnxn, Client session timed out

2017-12-27 Thread Hao Sun
Hi, I need some help figuring out the root cause of this error.
I am running Flink 1.3.2 on K8S.

My cluster had been up and running for almost two weeks when, all of a sudden,
I saw this familiar error again: my task manager was killed/lost. There are
many possible causes for this error, and I need help figuring out the root
cause this time.

From JM.log

*2017-12-26 14:57:08,624* INFO org.apache.zookeeper.ClientCnxn - Client
session timed out, have not heard from server in 85001ms for sessionid
0x25ddcdec0ef77af, closing socket connection and attempting reconnect
2017-12-26 14:57:23,621 WARN akka.remote.RemoteWatcher - Detected
unreachable: [akka.tcp://flink@fps-flink-taskmanager-960711320-vx0hj:39249]
2017-12-26 14:57:23,623 INFO org.apache.flink.runtime.jobmanager.JobManager
- Task manager 
akka.tcp://flink@fps-flink-taskmanager-960711320-vx0hj:39249/user/taskmanager
terminated.
2017-12-26 14:57:23,624 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source:
KafkaSource(maxwell.users) -> MaxwellFilter->Maxwell(maxwell.users) ->
FixedDelayWatermark(maxwell.users) ->
MaxwellFPSEvent->InfluxDBData(maxwell.users) -> Sink:
influxdbSink(maxwell.users) (1/1) (ddca953ae90906daaae08791e1fde729)
switched from RUNNING to FAILED.
java.lang.Exception: TaskManager was lost/killed:
e14186b8fd22699210273f887570a172 @ fps-flink-taskmanager-960711320-vx0hj
(dataPort=37353)


From TM.log
*2017-12-26 14:56:26,019 INFO* org.apache.flink.runtime.taskmanager.Task -
Source: KafkaSource(maxwell.tickets) ->
MaxwellFilter->Maxwell(maxwell.tickets) ->
FixedDelayWatermark(maxwell.tickets) ->
MaxwellFPSEvent->InfluxDBData(maxwell.tickets) -> Sink:
influxdbSink(maxwell.tickets) (1/1) (44b4caf2010bb2b061b67d4f6c8dbc3f)
switched from RUNNING to FAILED.java.lang.Exception: TaskManager
akka://flink/user/taskmanager disconnects from JobManager
akka.tcp://flink@fps-flink-jobmanager:6123/user/jobmanager: *Old JobManager
lost its leadership.*

Did the TM somehow detect the JM's loss of leadership from ZK and disconnect
itself? And did the JM then fail to connect to ZK a couple of seconds later?

In the end the cluster recovered nicely on its own, but I am wondering whether
this breaks the exactly-once semantics. If so, what should I take care of?

Thanks team!

Full log:
https://gist.github.com/zenhao/e2f9b929f4eaee32f99948d462db7359


Could not flush and close the file system output stream to s3a, is this fixed?

2017-12-12 Thread Hao Sun
https://issues.apache.org/jira/browse/FLINK-7590

I have a similar situation with Flink 1.3.2 on K8S

=
2017-12-13 00:57:12,403 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: KafkaSource(maxwell.tickets) -> MaxwellFilter->Maxwell(maxwell.tickets) -> FixedDelayWatermark(maxwell.tickets) -> MaxwellFPSEvent->InfluxDBData(maxwell.tickets) -> Sink: influxdbSink(maxwell.tickets) (1/3) (6ad009755a6009975d197e75afa05e14) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 803 for operator Source: KafkaSource(maxwell.tickets) -> MaxwellFilter->Maxwell(maxwell.tickets) -> FixedDelayWatermark(maxwell.tickets) -> MaxwellFPSEvent->InfluxDBData(maxwell.tickets) -> Sink: influxdbSink(maxwell.tickets) (1/3).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 803 for operator Source: KafkaSource(maxwell.tickets) -> MaxwellFilter->Maxwell(maxwell.tickets) -> FixedDelayWatermark(maxwell.tickets) -> MaxwellFPSEvent->InfluxDBData(maxwell.tickets) -> Sink: influxdbSink(maxwell.tickets) (1/3).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc in order to obtain the stream state handle
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:906)
    ... 5 more
Suppressed: java.lang.Exception: Could not properly cancel managed operator state future.
    at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:98)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
    ... 5 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc in order to obtain the stream state handle
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:96)
    ... 7 more
Caused by: java.io.IOException: Could not flush and close the file system output stream to s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc in order to obtain the stream state handle
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
    at org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:270)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:233)
    at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:906)
    ... 5 more
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: 751174B20E6C6A0A, AWS Error Code: RequestTimeout, AWS Error Message: Your socket connection to the server was not read from or written to within the timeout period. Idle connections will be closed., S3 Extended Request ID: dADBPVGflB29xtFb7ydxD2SU3LzHw2cBkumOK5EX4TYgt+LVErSOShxPkZmGrCvmT39FHDbIryc=
    at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
    at com.amazonaws.http.AmazonHttpClient

Re: [ANNOUNCE] Apache Flink 1.4.0 released

2017-12-12 Thread Hao Sun
Congratulations! Awesome work.
Two quick questions about the HDFS-free feature.
I am using S3 to store checkpoints and savepoints, and I know this is done
through hadoop-aws.

- Do I have to include a hadoop-aws jar in my flatjar AND Flink's lib
directory to make it work for 1.4? Both, or just the lib directory?
- Am I free to choose the latest version of hadoop-aws?

On Tue, Dec 12, 2017 at 4:43 AM Flavio Pompermaier 
wrote:

> Thanks Aljoscha! Just one question: is there any upgrade guideline?
> Or is the upgrade from 1.3.1 to 1.4 almost frictionless?
>
> On Tue, Dec 12, 2017 at 1:39 PM, Fabian Hueske  wrote:
>
>> Thank you Aljoscha for managing the release!
>>
>> 2017-12-12 12:46 GMT+01:00 Aljoscha Krettek :
>>
>>> The Apache Flink community is very happy to announce the release of
>>> Apache Flink 1.4.0.
>>>
>>> Apache Flink® is an open-source stream processing framework for
>>> distributed, high-performing, always-available, and accurate data streaming
>>> applications.
>>>
>>>
>>> The release is available for download at:
>>>
>>>https://flink.apache.org/downloads.html
>>> 
>>>
>>> Please check out the release blog post for an overview of the new
>>> features and improvements and the list of contributors:
>>>
>>>   https://flink.apache.org/news/2017/12/12/release-1.4.0.html
>>> 
>>>
>>> The full release notes are available in Jira:
>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12340533
>>> 
>>>
>>>
>>> I would like to thank all contributors for working very hard on making
>>> this release a success!
>>>
>>> Best,
>>> Aljoscha
>>
>>
>>
>
>
> --
> Flavio Pompermaier
> Development Department
>
> OKKAM S.r.l.
> Tel. +(39) 0461 041809
>


Re: Trace jar file name from jobId, is that possible?

2017-12-07 Thread Hao Sun
Let me check the details. Off the top of my head, I remember the job ID
changing, but I might be wrong.

On Thu, Dec 7, 2017, 08:48 Fabian Hueske  wrote:

> AFAIK, a job keeps its ID in case of a recovery.
> Did you observe something else?
>
> 2017-12-07 17:32 GMT+01:00 Hao Sun :
>
>> I mean restarted during failure recovery
>>
>> On Thu, Dec 7, 2017 at 8:29 AM Fabian Hueske  wrote:
>>
>>> What do you mean by rescheduled?
>>> Started from a savepoint or restarted during failure recovery?
>>>
>>>
>>> 2017-12-07 16:59 GMT+01:00 Hao Sun :
>>>
>>>> Anything I can do for the job reschedule case? Thanks.
>>>> Or is there a way to add job lifecycle hooks to trace it?
>>>>
>>>> On Mon, Dec 4, 2017 at 12:01 PM Hao Sun  wrote:
>>>>
>>>>> Thanks Fabian, there is one case can not be covered by the REST API.
>>>>> When a job rescheduled to run, but jobid will change, and I wont be able to
>>>>> backtrace the jar name. Why not keep the jar name stored somewhere and
>>>>> expose it through the api as well?
>>>>>
>>>>> On Mon, Dec 4, 2017 at 4:52 AM Fabian Hueske 
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> you can submit jar files and start jobs via the REST interface [1].
>>>>>> When starting a job, you get the jobId. You can link jar files and
>>>>>> savepoints via the jobId.
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html#submitting-programs
>>>>>>
>>>>>> 2017-12-02 0:28 GMT+01:00 Hao Sun :
>>>>>>
>>>>>>> Hi I am using Flink 1.3.2 on K8S, and need a deployment strategy for
>>>>>>> my app.
>>>>>>>
>>>>>>> I want to use savepoints to resume a job after each deployment.
>>>>>>> As you know I need jar file name and path to savepoints to resume a
>>>>>>> task.
>>>>>>>
>>>>>>> Currently `flink list` command only gives me job ids, not jar file
>>>>>>> names.
>>>>>>> And REST API does not have that information as well. If I have
>>>>>>> multiple jar files how can I map the savepoints back to jars, so I can
>>>>>>> resume my task?
>>>>>>>
>>>>>>> I thought about save the jar to jid map somewhere, but Flink can
>>>>>>> reschedule a task on failures, so the map will be stale.
>>>>>>>
>>>>>>> Any thoughts is appreciated. Many thanks.
>>>>>>>
>>>>>>>
>>>>>>
>>>
>


Re: Trace jar file name from jobId, is that possible?

2017-12-07 Thread Hao Sun
I mean restarted during failure recovery

On Thu, Dec 7, 2017 at 8:29 AM Fabian Hueske  wrote:

> What do you mean by rescheduled?
> Started from a savepoint or restarted during failure recovery?
>
>
> 2017-12-07 16:59 GMT+01:00 Hao Sun :
>
>> Anything I can do for the job reschedule case? Thanks.
>> Or is there a way to add job lifecycle hooks to trace it?
>>
>> On Mon, Dec 4, 2017 at 12:01 PM Hao Sun  wrote:
>>
>>> Thanks Fabian, there is one case can not be covered by the REST API.
>>> When a job rescheduled to run, but jobid will change, and I wont be able to
>>> backtrace the jar name. Why not keep the jar name stored somewhere and
>>> expose it through the api as well?
>>>
>>> On Mon, Dec 4, 2017 at 4:52 AM Fabian Hueske  wrote:
>>>
>>>> Hi,
>>>>
>>>> you can submit jar files and start jobs via the REST interface [1].
>>>> When starting a job, you get the jobId. You can link jar files and
>>>> savepoints via the jobId.
>>>>
>>>> Best, Fabian
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html#submitting-programs
>>>>
>>>> 2017-12-02 0:28 GMT+01:00 Hao Sun :
>>>>
>>>>> Hi I am using Flink 1.3.2 on K8S, and need a deployment strategy for
>>>>> my app.
>>>>>
>>>>> I want to use savepoints to resume a job after each deployment.
>>>>> As you know I need jar file name and path to savepoints to resume a
>>>>> task.
>>>>>
>>>>> Currently `flink list` command only gives me job ids, not jar file
>>>>> names.
>>>>> And REST API does not have that information as well. If I have
>>>>> multiple jar files how can I map the savepoints back to jars, so I can
>>>>> resume my task?
>>>>>
>>>>> I thought about save the jar to jid map somewhere, but Flink can
>>>>> reschedule a task on failures, so the map will be stale.
>>>>>
>>>>> Any thoughts is appreciated. Many thanks.
>>>>>
>>>>>
>>>>
>


Re: Trace jar file name from jobId, is that possible?

2017-12-07 Thread Hao Sun
Anything I can do for the job reschedule case? Thanks.
Or is there a way to add job lifecycle hooks to trace it?

On Mon, Dec 4, 2017 at 12:01 PM Hao Sun  wrote:

> Thanks Fabian, there is one case can not be covered by the REST API. When
> a job rescheduled to run, but jobid will change, and I wont be able to
> backtrace the jar name. Why not keep the jar name stored somewhere and
> expose it through the api as well?
>
> On Mon, Dec 4, 2017 at 4:52 AM Fabian Hueske  wrote:
>
>> Hi,
>>
>> you can submit jar files and start jobs via the REST interface [1].
>> When starting a job, you get the jobId. You can link jar files and
>> savepoints via the jobId.
>>
>> Best, Fabian
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html#submitting-programs
>>
>> 2017-12-02 0:28 GMT+01:00 Hao Sun :
>>
>>> Hi I am using Flink 1.3.2 on K8S, and need a deployment strategy for my
>>> app.
>>>
>>> I want to use savepoints to resume a job after each deployment.
>>> As you know I need jar file name and path to savepoints to resume a task.
>>>
>>> Currently `flink list` command only gives me job ids, not jar file names.
>>> And REST API does not have that information as well. If I have multiple
>>> jar files how can I map the savepoints back to jars, so I can resume my
>>> task?
>>>
>>> I thought about save the jar to jid map somewhere, but Flink can
>>> reschedule a task on failures, so the map will be stale.
>>>
>>> Any thoughts is appreciated. Many thanks.
>>>
>>>
>>


Re: non-shared TaskManager-specific config file

2017-12-04 Thread Hao Sun
https://issues.apache.org/jira/browse/FLINK-8197, here is the JIRA link for
xref.

On Mon, Dec 4, 2017 at 7:35 AM Hao Sun  wrote:

> Sure, I will do that.
>
> On Mon, Dec 4, 2017, 07:26 Fabian Hueske  wrote:
>
>> Can you create a JIRA issue to propose the feature?
>>
>> Thank you,
>> Fabian
>>
>> 2017-12-04 16:15 GMT+01:00 Hao Sun :
>>
>>> Thanks. If we can support include configuration dir that will be very
>>> helpful.
>>>
>>> On Mon, Dec 4, 2017, 00:50 Chesnay Schepler  wrote:
>>>
>>>> You will have to create a separate config for each TaskManager.
>>>>
>>>>
>>>> On 01.12.2017 23:14, Hao Sun wrote:
>>>>
>>>> Hi team, I am wondering how can I create a non-shared config file and
>>>> let Flink read it. Can I use include in the config? Or I have to prepare a
>>>> different config for each TM?
>>>>
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html
>>>>
>>>>
>>>>-
>>>>
>>>>taskmanager.hostname: The hostname of the network interface that
>>>>the TaskManager binds to. By default, the TaskManager searches for network
>>>>interfaces that can connect to the JobManager and other TaskManagers. This
>>>>option can be used to define a hostname if that strategy fails for some
>>>>reason. Because different TaskManagers need different values for this
>>>>option, it usually is specified in an additional non-shared
>>>>TaskManager-specific config file.
>>>>
>>>>
>>>>
>>>>
>>


Re: Trace jar file name from jobId, is that possible?

2017-12-04 Thread Hao Sun
Thanks Fabian, there is one case that cannot be covered by the REST API. When
a job is rescheduled, its job ID will change, and I won't be able to trace
back the jar name. Why not keep the jar name stored somewhere and expose it
through the API as well?

On Mon, Dec 4, 2017 at 4:52 AM Fabian Hueske  wrote:

> Hi,
>
> you can submit jar files and start jobs via the REST interface [1].
> When starting a job, you get the jobId. You can link jar files and
> savepoints via the jobId.
>
> Best, Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html#submitting-programs
>
> 2017-12-02 0:28 GMT+01:00 Hao Sun :
>
>> Hi I am using Flink 1.3.2 on K8S, and need a deployment strategy for my
>> app.
>>
>> I want to use savepoints to resume a job after each deployment.
>> As you know I need jar file name and path to savepoints to resume a task.
>>
>> Currently `flink list` command only gives me job ids, not jar file names.
>> And REST API does not have that information as well. If I have multiple
>> jar files how can I map the savepoints back to jars, so I can resume my
>> task?
>>
>> I thought about save the jar to jid map somewhere, but Flink can
>> reschedule a task on failures, so the map will be stale.
>>
>> Any thoughts is appreciated. Many thanks.
>>
>>
>


Re: non-shared TaskManager-specific config file

2017-12-04 Thread Hao Sun
Sure, I will do that.

On Mon, Dec 4, 2017, 07:26 Fabian Hueske  wrote:

> Can you create a JIRA issue to propose the feature?
>
> Thank you,
> Fabian
>
> 2017-12-04 16:15 GMT+01:00 Hao Sun :
>
>> Thanks. If we can support include configuration dir that will be very
>> helpful.
>>
>> On Mon, Dec 4, 2017, 00:50 Chesnay Schepler  wrote:
>>
>>> You will have to create a separate config for each TaskManager.
>>>
>>>
>>> On 01.12.2017 23:14, Hao Sun wrote:
>>>
>>> Hi team, I am wondering how can I create a non-shared config file and
>>> let Flink read it. Can I use include in the config? Or I have to prepare a
>>> different config for each TM?
>>>
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html
>>>
>>>
>>>-
>>>
>>>taskmanager.hostname: The hostname of the network interface that the
>>>TaskManager binds to. By default, the TaskManager searches for network
>>>interfaces that can connect to the JobManager and other TaskManagers. This
>>>option can be used to define a hostname if that strategy fails for some
>>>reason. Because different TaskManagers need different values for this
>>>option, it usually is specified in an additional non-shared
>>>TaskManager-specific config file.
>>>
>>>
>>>
>>>
>


Re: non-shared TaskManager-specific config file

2017-12-04 Thread Hao Sun
Thanks. If we could support including a configuration directory, that would
be very helpful.

On Mon, Dec 4, 2017, 00:50 Chesnay Schepler  wrote:

> You will have to create a separate config for each TaskManager.
>
>
> On 01.12.2017 23:14, Hao Sun wrote:
>
> Hi team, I am wondering how can I create a non-shared config file and let
> Flink read it. Can I use include in the config? Or I have to prepare a
> different config for each TM?
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html
>
>
>-
>
>taskmanager.hostname: The hostname of the network interface that the
>TaskManager binds to. By default, the TaskManager searches for network
>interfaces that can connect to the JobManager and other TaskManagers. This
>option can be used to define a hostname if that strategy fails for some
>reason. Because different TaskManagers need different values for this
>option, it usually is specified in an additional non-shared
>TaskManager-specific config file.
>
>
>
>


Trace jar file name from jobId, is that possible?

2017-12-01 Thread Hao Sun
Hi, I am using Flink 1.3.2 on K8S and need a deployment strategy for my app.

I want to use savepoints to resume a job after each deployment.
As you know, I need the jar file name and the path to the savepoint to resume
a task.

Currently the `flink list` command only gives me job IDs, not jar file names,
and the REST API does not have that information either. If I have multiple jar
files, how can I map the savepoints back to jars so I can resume my task?

I thought about saving the jar-to-jid map somewhere, but Flink can reschedule
a task on failures, so the map would become stale.

Any thoughts are appreciated. Many thanks.
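For what it is worth, the jar-to-jid map idea could be sketched like this in
Python. JarRegistry and the JSON file are hypothetical helpers, not part of
Flink or its REST API. The sketch assumes the job ID is known at submission
time and stays the same across failure recovery, which is what the replies
in this thread suggest but which I have not verified.

```python
import json
from pathlib import Path


class JarRegistry:
    """Persist a job-id -> jar-name map in a JSON file (hypothetical helper)."""

    def __init__(self, path):
        self.path = Path(path)

    def _load(self):
        # A missing file simply means nothing has been recorded yet.
        if self.path.exists():
            return json.loads(self.path.read_text())
        return {}

    def record(self, job_id, jar_name):
        """Remember which jar a job was started from; call right after submit."""
        mapping = self._load()
        mapping[job_id] = jar_name
        self.path.write_text(json.dumps(mapping, indent=2, sort_keys=True))

    def jar_for(self, job_id):
        """Return the jar name recorded for job_id, or None if unknown."""
        return self._load().get(job_id)
```

A deployment script would call record() with the job ID returned at
submission and later use jar_for() together with the savepoint path when
resuming.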


non-shared TaskManager-specific config file

2017-12-01 Thread Hao Sun
Hi team, I am wondering how I can create a non-shared config file and let
Flink read it. Can I use an include in the config? Or do I have to prepare a
different config for each TM?

https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html


   - taskmanager.hostname: The hostname of the network interface that the
     TaskManager binds to. By default, the TaskManager searches for network
     interfaces that can connect to the JobManager and other TaskManagers. This
     option can be used to define a hostname if that strategy fails for some
     reason. Because different TaskManagers need different values for this
     option, it usually is specified in an additional non-shared
     TaskManager-specific config file.
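Absent an include mechanism, one workaround is to derive each TaskManager's
file from the shared one at startup. A minimal Python sketch of that idea
follows. The function name and the tm-1 hostname are made up; the
JobManager address and port are the ones from my own setup, used here only
as sample values.

```python
def taskmanager_conf(shared_conf, hostname):
    """Return config text: the shared settings plus a TM-specific hostname."""
    lines = [
        line
        for line in shared_conf.splitlines()
        # Drop blank lines and any hostname already present in the shared file.
        if line.strip() and not line.startswith("taskmanager.hostname:")
    ]
    lines.append(f"taskmanager.hostname: {hostname}")
    return "\n".join(lines) + "\n"


# Sample shared settings; every TaskManager gets these unchanged.
SHARED = "jobmanager.rpc.address: fps-flink-jobmanager\njobmanager.rpc.port: 6123\n"

if __name__ == "__main__":
    print(taskmanager_conf(SHARED, "tm-1"), end="")
```

Each TaskManager container would run this once with its own hostname and
write the result to its local conf directory before starting.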


Re: Questions about checkpoints/savepoints

2017-11-30 Thread Hao Sun
Hi team, I have a similar use case. Do we have any answers on this?
When we trigger a savepoint, can we store that information in ZK as well,
so that I can avoid S3 file listing and do not have to use other external
services?

On Wed, Oct 25, 2017 at 11:19 PM vipul singh  wrote:

> As a followup to above, is there a way to get the last checkpoint metadata
> location inside *notifyCheckpointComplete*  method? I tried poking
> around, but didnt see a way to achieve this. Or incase there is any other
> way to save the actual checkpoint metadata location information into a
> datastore(dynamodb etc)?
>
> We are looking to save the savepoint/externalized checkpoint metadata
> location in some storage space, so that we can pass this information to
> flink run command during recovery(thereby removing the possibility of any
> read after write consistency arising out of listing file paths etc).
>
> Thanks,
> Vipul
>
> On Tue, Oct 24, 2017 at 11:53 PM, vipul singh  wrote:
>
>> Thanks Aljoscha for the explanations. I was able to recover from the
>> last externalized checkpoint, by using flink run -s 
>> 
>>
>> I am curious, are there any options to save the metadata file name to
>> some other place like dynamo etc at the moment? The reason why I am asking
>> is,
>> for the end launcher code we are writing, we want to ensure if a flink
>> job crashes, we can just start it from last known externalized checkpoint.
>> In the present senario, we have to list the contents of the s3 bucket
>> which saves the metadata, to see the last metadata before failure, and
>> there might a window where
>> we might run into read after write consistency of s3. Thoughts?
>>
>> On Tue, Oct 24, 2017 at 2:13 AM, Aljoscha Krettek 
>> wrote:
>>
>>> Hi,
>>>
>>> That distinction with externalised checkpoints is a bit of a pitfall and
>>> I'm hoping that we can actually get rid of that distinction in the next
>>> version or the version after that. With that change, all checkpoints would
>>> always be externalised, since it's not really any noticeable overhead.
>>>
>>> Regarding read-after-write consistency, you should be fine since an the
>>> "externalised checkpoint", i.e. the metadata, is only one file. If you know
>>> the file-path (either from the Flink dashboard or by looking at the S3
>>> bucket) you can restore from it.
>>>
>>> Best,
>>> Aljoscha
>>>
>>>
>>> On 24. Oct 2017, at 08:22, vipul singh  wrote:
>>>
>>> Thanks Tony, that was the issue. I was thinking that when we use Rocksdb
>>> and provide an s3 path, it uses externalized checkpoints by default. Thanks
>>> so much!
>>>
>>> I have one followup question. Say in above case, I terminate the
>>> cluster, and since the metadata is on s3, and not on local storage, does
>>> flink avoid read after write consistency of s3? Would it be a valid
>>> concern, or we handle that case in externalized checkpoints as well, and
>>> dont deal with file system operations while dealing with retrieving
>>> externalized checkpoints on s3.
>>>
>>> Thanks,
>>> Vipul
>>>
>>>
>>>
>>> On Mon, Oct 23, 2017 at 11:00 PM, Tony Wei 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Did you enable externalized checkpoints? [1]
>>>>
>>>> Best,
>>>> Tony Wei
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/checkpoints.html#externalized-checkpoints
>>>>
>>>> 2017-10-24 13:07 GMT+08:00 vipul singh :

> Thanks Aljoscha for the answer above.
>
> I am experimenting with savepoints and checkpoints on my end, so that
> we can build a fault-tolerant application with exactly-once semantics.
>
> I have been able to test various scenarios, but have doubts about one
> use case.
>
> My app is running on an EMR cluster, and I am trying to test the case
> where the EMR cluster is terminated. I have read that
> *state.checkpoints.dir* is responsible for storing metadata
> information, and links to data files in
> *state.backend.fs.checkpointdir*.
>
> For my application I have configured both
> *state.backend.fs.checkpointdir* and *state.checkpoints.dir*
>
> Also I have the following in my main app:
>
> env.enableCheckpointing(CHECKPOINT_TIME_MS)
>
> val CHECKPOINT_LOCATION = 
> s"s3://${config.s3Bucket}/${config.s3BasePath}/${config.s3ExtensionPath}/checkpoints/rocksdb"
>
> val backend:RocksDBStateBackend =
>   new RocksDBStateBackend(CHECKPOINT_LOCATION)
>
> env.setStateBackend(backend)
> env.getCheckpointConfig.setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE)
> env.getCheckpointConfig.setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS)
> env.getCheckpointConfig.setMaxConcurrentCheckpoints(CHECKPOINT_MAX_CONCURRENT)
>
>
> In the application startup logs I can see the
> *state.backend.fs.checkpointdir* and *state.checkpoints.dir* values
> being loaded. However when the checkpoint ha

Re: [EXTERNAL] difference between checkpoints & savepoints

2017-11-30 Thread Hao Sun
Hi team, I have one follow up question on this.

There is a discussion on resuming jobs from *a saved external checkpoint*,
I feel there are two aspects of that topic.
*1. I do not have changes to the job, just want to resume the job from a
failure.*
I can see this automatically happen with ZK enabled. I do not have to
manually do anything.
==
2017-12-01 05:02:26,603 DEBUG
org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore -
Recovering job graph f824eabe58d180d79416d9637ac6aa32 from
fraud_prevention_service/flink/jobgraphs/f824eabe58d180d79416d9637ac6aa32.
==

*2. I want to submit a new job and resume the previous process for whatever
reason. e.g. JobGraph changed, need to change parallelism, etc.*
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html#faq
I am wondering, for Flink 1.3.2, 1.4 and 1.5, is an externalized checkpoint
identical to a savepoint? Does everything in the FAQ section also
apply to externalized checkpoints? *How about allowNonRestoredState,
do we have something like this for externalized checkpoints?*

I am running Flink 1.3.2 on K8S, so I am wondering what is the best
practice to do the deployment for new code releases. And Flip6 is awesome,
can't wait to use it.

Thanks as always.


On Wed, Aug 16, 2017 at 5:23 PM Raja.Aravapalli 
wrote:

>
>
> Thanks very much for the detailed explanation Stefan.
>
>
>
>
>
> Regards,
>
> Raja.
>
>
>
> *From: *Stefan Richter 
> *Date: *Monday, August 14, 2017 at 7:47 AM
> *To: *Raja Aravapalli 
> *Cc: *"user@flink.apache.org" 
> *Subject: *Re: [EXTERNAL] difference between checkpoints & savepoints
>
>
>
> Just noticed that I forgot to include also a reference to the
> documentation about externalized checkpoints:
> https://ci.apache.org/projects/flink/flink-docs-master/ops/state/checkpoints.html
> 
>
>
>
> On 14.08.2017 at 14:17, Stefan Richter wrote:
>
>
>
>
>
> Hi,
>
>
>
>
>
> Also, in the same line, can someone detail the difference between State
> Backend & External checkpoint?
>
>
>
>
>
> Those are two very different things. If we talk about state backends in
> Flink, we mean the entity that is responsible for storing and managing the
> state inside an operator. This could for example be something like the
> FsStateBackend that is based on hash maps and keeps state on the heap, or
> the RocksDBStateBackend which is using RocksDB as a store internally and
> operates on native memory and disk.
>
>
>
> An externalized checkpoint, like a normal checkpoint, is the collection of
> all state in a job persisted to stable storage for recovery. More
> concretely, this typically means writing out the contents of the state
> backends to a safe place so that we can restore them from there.
>
>
>
> Also, regarding the programmatic API, through which methods can we configure those?
>
>
>
> This explains how to set the backend programmatically:
>
>
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/ops/state/state_backends.html
> 
>
>
>
> To activate externalized checkpoints, you activate normal checkpoints,
> plus the following line:
>
>
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>
>
>
> where env is your StreamExecutionEnvironment.
>
>
>
> If you need an example, please take a look at
> org.apache.flink.test.checkpointing.ExternalizedCheckpointITCase. This
> class configures everything you asked about programmatically.
>
>
>
> Best,
>
> Stefan
>
>
>
>
>
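
The distinction Stefan draws above (state backend versus externalized checkpoint) can be put side by side in one job setup. This is only a sketch against the Flink 1.3-era API discussed in this thread; the class name, the bucket path and the 60-second interval are made up for illustration. As far as the 1.3 documentation states, the target directory for the externalized checkpoint metadata is taken from `state.checkpoints.dir` in flink-conf.yaml and is not set in code.

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExternalizedCheckpointSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // State backend: decides how operator state is held while the job runs
        // and where checkpoint data files go. The S3 path is a hypothetical example.
        env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/checkpoints/rocksdb"));

        // Normal periodic checkpoints; 60 seconds is an arbitrary interval.
        env.enableCheckpointing(60_000L);

        // Externalized checkpoints: keep the checkpoint metadata after the job
        // is cancelled so it can be used for a manual restore later.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Placeholder pipeline so that the sketch has something to execute.
        env.fromElements(1, 2, 3).print();
        env.execute("externalized-checkpoint-sketch");
    }
}
```

Setting only the state backend path, as in the RocksDB snippet quoted earlier in this digest, does not by itself enable externalized checkpoints; the `enableExternalizedCheckpoints` call is the separate switch.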


Task manager suddenly lost connection to JM

2017-11-16 Thread Hao Sun
Hi team, I see a weird issue: one of my TMs suddenly loses its connection to
the JM.
Once the job running on the TM is relocated to a new TM, it can reconnect to
the JM again.
After a while, the new TM running the same job repeats the same
process.
It is not guaranteed that the troubled TMs can reconnect to the JM in a
reasonable time frame, like minutes. Sometimes it takes days to reconnect
successfully.

I am using Flink 1.3.2 and Kubernetes. Is this because of network
congestion?

Thanks!

= Logs from JM ==

*2017-11-16 19:14:40,216 WARN  akka.remote.RemoteWatcher*
   - Detected unreachable:
[akka.tcp://flink@fps-flink-taskmanager-701246518-rb4k6:46416]
2017-11-16 19:14:40,218 INFO
org.apache.flink.runtime.jobmanager.JobManager- Task
manager 
akka.tcp://flink@fps-flink-taskmanager-701246518-rb4k6:46416/user/taskmanager
terminated.
2017-11-16 19:14:40,219 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph-
Source: KafkaSource(maxwell.tickets) ->
MaxwellFilter->Maxwell(maxwell.tickets) ->
FixedDelayWatermark(maxwell.tickets) ->
MaxwellFPSEvent->InfluxDBData(maxwell.tickets) (1/1)
(484ebabbd13dce5e8503d88005bcdb6c) switched from RUNNING to FAILED.
java.lang.Exception: TaskManager was lost/killed:
50cae001c1d97e55889a6051319f4746 @
fps-flink-taskmanager-701246518-rb4k6 (dataPort=43871)
at 
org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
at 
org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
at 
org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
at 
org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
at 
org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
at 
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1228)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1131)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at 
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at 
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at 
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at 
akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:44)
at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
at akka.actor.ActorCell.invoke(ActorCell.scala:486)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
*2017-11-16 19:14:40,219 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
KafkaDemo (env:production) (3de8f35a1af689237e3c5c94023aba3f) switched
from state RUNNING to FAILING.
java.lang.Exception: TaskManager was lost/killed:
50cae001c1d97e55889a6051319f4746 @
fps-flink-taskmanager-701246518-rb4k6 (dataPort=43871)*
at 
org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
at 
org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
at 
org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
at 
org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
at 
org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
at 
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1228)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.s

Re: org.apache.flink.runtime.io.network.NetworkEnvironment causing memory leak?

2017-11-16 Thread Hao Sun
Sorry, by "killed" I mean that the JM lost the TM. The TM instance is still
running inside Kubernetes, but it is not responding to any requests,
probably due to high load. On the JM side, the JM lost heartbeat tracking of
the TM, so it marked the TM as dead.

By the "volume" of Kafka topics I mean the volume of messages for a topic,
e.g. 1 msg/sec. I have not checked the size of the messages yet.
But overall, as you suggested, I think I need more tuning of my TM params,
so it can maintain a reasonable load. I am not sure which params to look
at, but I will do my research first.

Thanks as always for your help, Stefan.

On Thu, Nov 16, 2017 at 8:27 AM Stefan Richter 
wrote:

> Hi,
>
> In addition to your comments, what are the items retained by
> NetworkEnvironment? They seem to grow indefinitely; do they ever shrink?
>
>
> Mostly the network buffers, which should be ok. They are always recycled
> and should not be released until the network environment is GCed.
>
> I think there is a GC issue because my task manager is killed somehow
> after a job run. The duration correlates with the volume of the Kafka topics:
> with more volume, the TM dies more quickly. Do you have any tips to debug it?
>
> What killed your task manager? For example, do you see a
> java.lang.OutOfMemoryError, or is the process killed by the OS's OOM killer?
> In case of the OOM killer, you might need to grant more process memory or
> reduce the memory that you have configured for Flink to stay below the
> configured threshold that would kill the process. What exactly do you mean
> by the "volume" of Kafka topics?
>
> To debug, I suggest that you first figure out why the process is killed;
> maybe your thresholds are simply too low and the consumption can go beyond
> them with your configuration of Flink. Then you should figure out what is
> actually growing more than you expect, e.g. is the problem triggered by
> heap space or native memory? Depending on the answer, heap dumps, for
> example, could help to spot the problematic objects.
>
> Best,
> Stefan
>


Re: org.apache.flink.runtime.io.network.NetworkEnvironment causing memory leak?

2017-11-16 Thread Hao Sun
Thanks a lot! This is very helpful.
In addition to your comments, what are the items retained by
NetworkEnvironment? They seem to grow indefinitely; do they ever shrink?

I think there is a GC issue because my task manager is killed somehow after
a job run. The duration correlates with the volume of the Kafka topics: with
more volume, the TM dies more quickly. Do you have any tips to debug it?

On Thu, Nov 16, 2017, 01:35 Stefan Richter 
wrote:

> Hi,
>
> I cannot spot anything that indicates a leak from your screenshots. Maybe
> you misinterpret the numbers? In your heap dump, there is only a single
> instance of org.apache.flink.runtime.io.network.NetworkEnvironment and it
> retains about 400,000,000 bytes from being GCed because it holds references
> to the network buffers. This is perfectly normal because the buffer
> pool is part of this object. For as long as it lives, the referenced
> buffers should not be GCed, and the current size of all your buffers is
> around 400 million bytes.
>
> Your heap space is also not growing without bounds, but always goes down
> after a GC was performed. Looks fine to me.
>
> Last, I think the number of G1_Young_Generation is a counter of how many
> gc cycles have been performed and the time is a sum. So naturally, those
> values would always increase.
>
> Best,
> Stefan
>
> > On 15.11.2017 at 18:35, Hao Sun wrote:
> >
> > Hi team, I am looking at some memory/GC issues in my Flink setup. I am
> > running Flink 1.3.2 in Docker for my development environment and using
> > Kubernetes for production.
> > I see instances of org.apache.flink.runtime.io.network.NetworkEnvironment
> > increasing dramatically and not being GC-ed very well for my application.
> > My simple app collects Kafka events, transforms the information and
> > logs the results.
> >
> > Is this expected? I am new to Java memory analysis and not sure what is
> > actually wrong.
> >
> > 
> > 
> > 
> > 
>
>
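
Stefan's remark above that the G1_Young_Generation numbers are running totals can be checked outside Flink. The sketch below assumes that those metrics are backed by the JVM's standard GarbageCollectorMXBean counters (an assumption, not something confirmed in this thread); the class name GcCounterDemo and the allocation loop are made up for illustration. It reads the counters before and after creating garbage and shows that they only ever go up, so a steadily rising value is not by itself a sign of a leak.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;

public class GcCounterDemo {

    /** Sums the cumulative collection count over all collectors of this JVM. */
    public static long totalCollections() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long count = gc.getCollectionCount(); // -1 if the collector reports no count
            if (count > 0) {
                total += count;
            }
        }
        return total;
    }

    /** Sums the cumulative collection time (milliseconds) over all collectors. */
    public static long totalCollectionMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long millis = gc.getCollectionTime(); // -1 if the collector reports no time
            if (millis > 0) {
                total += millis;
            }
        }
        return total;
    }

    /** Allocates short-lived objects so that the collector has something to do. */
    public static void churn() {
        for (int i = 0; i < 200; i++) {
            List<byte[]> garbage = new ArrayList<>();
            for (int j = 0; j < 100; j++) {
                garbage.add(new byte[64 * 1024]);
            }
        }
    }

    public static void main(String[] args) {
        long countBefore = totalCollections();
        long millisBefore = totalCollectionMillis();
        churn();
        long countAfter = totalCollections();
        long millisAfter = totalCollectionMillis();
        // Both values are totals since JVM start; the difference is the recent activity.
        System.out.println("collections: " + countBefore + " -> " + countAfter);
        System.out.println("gc millis:   " + millisBefore + " -> " + millisAfter);
    }
}
```

For judging GC pressure, the difference between two readings over a known time span is the more useful number, together with the heap level after each collection that Stefan points to.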


Re: R/W traffic estimation between Flink and Zookeeper

2017-11-16 Thread Hao Sun
Great, thanks for the info, Stefan.

On Thu, Nov 16, 2017, 01:59 Stefan Richter 
wrote:

> Hi,
>
> I think Zookeeper is only used as a meta data store in HA mode.
> Interactions with ZK are not part of the per-record stream processing code
> paths of Flink. Things that are written to ZK can (also depending on your
> job) include e.g. the job graph, Kafka offsets, or the meta data about
> available checkpoints to recover from. Some of those interactions happen
> only once per job, others happen periodically. In the big picture,
> interactions with ZK happen rather rarely, but of course this also depends
> on configuration parameters like your checkpointing interval. For a typical
> job, I would estimate that ZK interactions occur less than once per second.
> As for typical message sizes, I would estimate something between a few
> bytes and a few kilobytes for most messages and somewhere in the low two-digit
> megabytes as a typical max size.
>
> Best,
> Stefan
>
> On 15.11.2017 at 18:41, Hao Sun wrote:
>
> Thanks Piotr. Does Flink read from or write to ZooKeeper every time it
> processes a record?
> I thought only the JM uses ZK to keep some meta-level data, so I am not sure
> why `it depends on many things like state backend used, state size, complexity of
> your application, size of the records, number of machines, their hardware
> and the network.`
>
> On Thu, Oct 12, 2017 at 1:35 AM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> Are you asking how to measure records/s, or whether it is possible to
>> achieve it? To measure it you can check the numRecordsInPerSecond metric.
>>
>> As for whether 1000 records/s is possible, it depends on many things like
>> state backend used, state size, complexity of your application, size of the
>> records, number of machines, their hardware and the network. In the very
>> simplest cases it is possible to achieve millions of records per second per
>> machine. It would be best to try it out in your particular use case on some
>> small scale.
>>
>> Piotrek
>>
>> > On 11 Oct 2017, at 19:58, Hao Sun  wrote:
>> >
>> > Hi, is there a way to estimate read/write traffic between Flink and ZK?
>> > I am looking for something like 1000 reads/sec or 1000 writes/sec. And
>> the size of the message.
>> >
>> > Thanks
>>
>>
>
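
Stefan's estimate above can be turned into a back-of-envelope calculation. The sketch below assumes that checkpointing is the only periodic source of ZooKeeper writes; the inputs (a 10-second checkpoint interval, two writes per checkpoint, 1 KiB per write) are invented for illustration and are not measurements from any job in this thread. The class and method names are likewise hypothetical.

```java
import java.util.Locale;

public class ZkTrafficEstimate {

    /** Average ZooKeeper writes per second caused by periodic checkpoints. */
    public static double writesPerSecond(long checkpointIntervalMs, int writesPerCheckpoint) {
        return writesPerCheckpoint * 1000.0 / checkpointIntervalMs;
    }

    /** Average bytes per second written to ZooKeeper by periodic checkpoints. */
    public static double bytesPerSecond(long checkpointIntervalMs,
                                        int writesPerCheckpoint,
                                        long bytesPerWrite) {
        return writesPerSecond(checkpointIntervalMs, writesPerCheckpoint) * bytesPerWrite;
    }

    public static void main(String[] args) {
        long checkpointIntervalMs = 10_000L; // hypothetical: one checkpoint every 10 s
        int writesPerCheckpoint = 2;         // hypothetical: ZK writes per completed checkpoint
        long bytesPerWrite = 1_024L;         // hypothetical: 1 KiB per write

        System.out.println(String.format(Locale.ROOT,
                "writes/s: %.2f, bytes/s: %.1f",
                writesPerSecond(checkpointIntervalMs, writesPerCheckpoint),
                bytesPerSecond(checkpointIntervalMs, writesPerCheckpoint, bytesPerWrite)));
    }
}
```

With these made-up inputs the result is about 0.2 writes per second, which is in line with the "less than once per second" figure quoted above. The number scales with the checkpoint interval and not with the record rate of the job.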


Re: R/W traffic estimation between Flink and Zookeeper

2017-11-15 Thread Hao Sun
Thanks Piotr. Does Flink read from or write to ZooKeeper every time it
processes a record?
I thought only the JM uses ZK to keep some meta-level data, so I am not sure
why `it depends on many things like state backend used, state size, complexity of
your application, size of the records, number of machines, their hardware
and the network.`

On Thu, Oct 12, 2017 at 1:35 AM Piotr Nowojski 
wrote:

> Hi,
>
> Are you asking how to measure records/s, or whether it is possible to
> achieve it? To measure it you can check the numRecordsInPerSecond metric.
>
> As for whether 1000 records/s is possible, it depends on many things like state
> backend used, state size, complexity of your application, size of the
> records, number of machines, their hardware and the network. In the very
> simplest cases it is possible to achieve millions of records per second per
> machine. It would be best to try it out in your particular use case on some
> small scale.
>
> Piotrek
>
> > On 11 Oct 2017, at 19:58, Hao Sun  wrote:
> >
> > Hi, is there a way to estimate read/write traffic between Flink and ZK?
> > I am looking for something like 1000 reads/sec or 1000 writes/sec. And
> the size of the message.
> >
> > Thanks
>
>


R/W traffic estimation between Flink and Zookeeper

2017-10-11 Thread Hao Sun
Hi, is there a way to estimate read/write traffic between Flink and ZK?
I am looking for something like 1000 reads/sec or 1000 writes/sec, and the
size of the messages.

Thanks


Re: How to make my execution graph prettier?

2017-10-10 Thread Hao Sun
Great, thanks!

On Tue, Oct 10, 2017 at 7:52 AM Aljoscha Krettek 
wrote:

> Hi,
>
> The execution graph looks like this because Flink optimises your graph to
> fit all operations within a single Task. This operation is called chaining.
> The operation can be applied when there is no shuffle between operations
> and when the parallelism is the same (roughly speaking).
>
> If you want the graph to have separate tasks, you can disable chaining on
> the Flink ExecutionConfig. This can lead to worse performance, though.
>
> Best,
> Aljoscha
>
> On 10. Oct 2017, at 06:36, Hao Sun  wrote:
>
> Hi, my execution graph looks like the following, with everything stuffed
> into one tile.
>
> How can I get something like this?
>
>
>
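
For anyone trying Aljoscha's suggestion above, here is a minimal sketch of turning chaining off for a whole job. It is written against the Flink 1.3-era DataStream API; to my knowledge the switch is exposed as `disableOperatorChaining()` on StreamExecutionEnvironment. The class name and the toy pipeline are made up for illustration.

```java
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnchainedGraphSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Turn off operator chaining for the whole job, so each operator
        // becomes its own task (and its own box in the web UI).
        env.disableOperatorChaining();

        env.fromElements("a", "b", "c")
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();
                    }
                })
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) {
                        return !value.isEmpty();
                    }
                })
                .print();

        env.execute("unchained-graph-sketch");
    }
}
```

As Aljoscha notes, this trades performance for a more readable graph. Calling `disableChaining()` on a single operator limits the effect to that operator, which may be enough while debugging.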


How to make my execution graph prettier?

2017-10-09 Thread Hao Sun
Hi, my execution graph looks like the following, with everything stuffed into
one tile. [image: image.png]
How can I get something like this?


TM get killed/disconnected after a while

2017-10-06 Thread Hao Sun
Hi, I am running Flink 1.3.2 on Kubernetes. I am not sure why one of my TMs
is sometimes killed. Is there a way to debug this? Thanks

= Logs 

*2017-10-05 22:36:42,631 INFO
org.apache.flink.runtime.instance.InstanceManager - Registered
TaskManager at fps-flink-taskmanager-2384273947-9n4kc
(akka.tcp://flink@fps-flink-taskmanager-2384273947-9n4kc:40274/user/taskmanager)
as 330ff7eeaabfe2b7289fee4a0e36c4b2. Current number of registered hosts is
2. Current number of alive task slots is 2.*
2017-10-05 22:37:04,974 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Deploying
Source: KafkaSource(maxwell.users) -> MaxwellFilter->Maxwell(maxwell.users)
-> FixedDelayWatermark(maxwell.users) ->
MaxwellFPSEvent->InfluxDBData(maxwell.users) -> (Sink:
influxdbSink(maxwell.users), Sink: PrintSink(maxwell.users)) (1/1) (attempt
#0) to fps-flink-taskmanager-2384273947-9n4kc
*2017-10-06 06:08:55,657 WARN  akka.remote.ReliableDeliverySupervisor
  - Association with remote system
[akka.tcp://flink@fps-flink-taskmanager-2384273947-9n4kc:40274] has failed,
address is now gated for [5000] ms. Reason: [Disassociated]*
2017-10-06 06:08:55,832 WARN  Remoting
- Tried to associate with unreachable remote address
[akka.tcp://flink@fps-flink-taskmanager-2384273947-9n4kc:40274]. Address is
now gated for 5000 ms, all messages to this address will be delivered to
dead letters. Reason: [The remote system has quarantined this system. No
further associations to the remote system are possible until this system is
restarted.]
2017-10-06 06:09:01,232 WARN  akka.remote.ReliableDeliverySupervisor
- Association with remote system
[akka.tcp://flink@fps-flink-taskmanager-2384273947-9n4kc:40274] has failed,
address is now gated for [5000] ms. Reason: [Association failed with
[akka.tcp://flink@fps-flink-taskmanager-2384273947-9n4kc:40274]] Caused by:
[fps-flink-taskmanager-2384273947-9n4kc: Name does not resolve]
2017-10-06 06:09:03,416 WARN  akka.remote.ReliableDeliverySupervisor
- Association with remote system
[akka.tcp://flink@fps-flink-taskmanager-2384273947-9n4kc:40274] has failed,
address is now gated for [5000] ms. Reason: [Association failed with
[akka.tcp://flink@fps-flink-taskmanager-2384273947-9n4kc:40274]] Caused by:
[fps-flink-taskmanager-2384273947-9n4kc]
2017-10-06 06:09:11,174 WARN  akka.remote.ReliableDeliverySupervisor
- Association with remote system
[akka.tcp://flink@fps-flink-taskmanager-2384273947-9n4kc:40274] has failed,
address is now gated for [5000] ms. Reason: [Association failed with
[akka.tcp://flink@fps-flink-taskmanager-2384273947-9n4kc:40274]] Caused by:
[fps-flink-taskmanager-2384273947-9n4kc]
2017-10-06 06:09:11,440 WARN  Remoting
- Tried to associate with unreachable remote address
[akka.tcp://flink@fps-flink-taskmanager-2384273947-9n4kc:40274]. Address is
now gated for 5000 ms, all messages to this address will be delivered to
dead letters. Reason: [The remote system has quarantined this system. No
further associations to the remote system are possible until this system is
restarted.]
2017-10-06 06:09:21,232 WARN  akka.remote.ReliableDeliverySupervisor
- Association with remote system
[akka.tcp://flink@fps-flink-taskmanager-2384273947-9n4kc:40274] has failed,
address is now gated for [5000] ms. Reason: [Association failed with
[akka.tcp://flink@fps-flink-taskmanager-2384273947-9n4kc:40274]] Caused by:
[fps-flink-taskmanager-2384273947-9n4kc: Name does not resolve]
2017-10-06 06:09:27,460 WARN  Remoting
- Tried to associate with unreachable remote address
[akka.tcp://flink@fps-flink-taskmanager-2384273947-9n4kc:40274]. Address is
now gated for 5000 ms, all messages to this address will be delivered to
dead letters. Reason: [The remote system has quarantined this system. No
further associations to the remote system are possible until this system is
restarted.]
2017-10-06 06:09:31,173 WARN  akka.remote.ReliableDeliverySupervisor
- Association with remote system
[akka.tcp://flink@fps-flink-taskmanager-2384273947-9n4kc:40274] has failed,
address is now gated for [5000] ms. Reason: [Association failed with
[akka.tcp://flink@fps-flink-taskmanager-2384273947-9n4kc:40274]] Caused by:
[fps-flink-taskmanager-2384273947-9n4kc]
2017-10-06 06:09:41,179 WARN  akka.remote.ReliableDeliverySupervisor
- Association with remote system
[akka.tcp://flink@fps-flink-taskmanager-2384273947-9n4kc:40274] has failed,
address is now gated for [5000] ms. Reason: [Association failed with
[akka.tcp://flink@fps-flink-taskmanager-2384273947-9n4kc:40274]] Caused by:
[fps-flink-taskmanager-2384273947-9n4kc: Name does not resolve]
2017-10-06 06:09:51,174 WARN  akka.remote.ReliableDeliverySupervisor
- Association with remote system
[akka.tcp://flink@fps-flink-taskmanager-2384273947-9n4kc:40274] has failed,
address is now gated for [5000] ms. R

Re: javax.net.ssl.SSLPeerUnverifiedException: peer not authenticated for S3 access

2017-10-04 Thread Hao Sun
Here is what my docker file says:

ENV FLINK_VERSION=1.3.2 \
HADOOP_VERSION=27 \
SCALA_VERSION=2.11 \


On Wed, Oct 4, 2017 at 8:23 AM Hao Sun  wrote:

> I am running Flink 1.3.2 with Docker on Kubernetes. My Docker image uses
> openjdk-8. I do not run Hadoop; the Flink binary is the one built for
> Hadoop 2.7, and Scala is 2.11. Thanks!
>
> FROM openjdk:8-jre-alpine
>
>
> On Wed, Oct 4, 2017 at 8:11 AM Chesnay Schepler 
> wrote:
>
>> I've found a few threads where an outdated jdk version on the
>> server/client may be the cause.
>>
>> Which Flink binary (specifically, for which hadoop version) are you using?
>>
>>
>> On 03.10.2017 20:48, Hao Sun wrote:
>>
>> com.amazonaws.http.AmazonHttpClient   - Unable to 
>> execute HTTP request: peer not authenticated
>> javax.net.ssl.SSLPeerUnverifiedException: peer not authenticated
>>  at 
>> sun.security.ssl.SSLSessionImpl.getPeerCertificates(SSLSessionImpl.java:431)
>>
>>
>>


Re: javax.net.ssl.SSLPeerUnverifiedException: peer not authenticated for S3 access

2017-10-04 Thread Hao Sun
I am running Flink 1.3.2 with Docker on Kubernetes. My Docker image uses
openjdk-8. I do not run Hadoop; the Flink binary is the one built for
Hadoop 2.7, and Scala is 2.11. Thanks!

FROM openjdk:8-jre-alpine


On Wed, Oct 4, 2017 at 8:11 AM Chesnay Schepler  wrote:

> I've found a few threads where an outdated jdk version on the
> server/client may be the cause.
>
> Which Flink binary (specifically, for which hadoop version) are you using?
>
>
> On 03.10.2017 20:48, Hao Sun wrote:
>
> com.amazonaws.http.AmazonHttpClient   - Unable to 
> execute HTTP request: peer not authenticated
> javax.net.ssl.SSLPeerUnverifiedException: peer not authenticated
>   at 
> sun.security.ssl.SSLSessionImpl.getPeerCertificates(SSLSessionImpl.java:431)
>
>
>


javax.net.ssl.SSLPeerUnverifiedException: peer not authenticated for S3 access

2017-10-03 Thread Hao Sun
I am using S3 for checkpointing and for externalized checkpoints as well.

s3a://bucket/checkpoints/e58d369f5a181842768610b5ab6a500b


I get this exception and am not sure what I can do about it.
Do I need to configure Hadoop to use some SSLFactory?

I am not using Hadoop; I am on Kubernetes (in AWS) with S3.


Thanks!

= Logs =

2017-10-03 17:52:27,452 INFO  com.amazonaws.http.AmazonHttpClient
 - Unable to execute HTTP request: The target
server failed to respond
org.apache.http.NoHttpResponseException: The target server failed to respond
at 
org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:95)
at 
org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:62)
at 
org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:254)
at 
org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:289)
at 
org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:252)
at 
org.apache.http.impl.conn.ManagedClientConnectionImpl.receiveResponseHeader(ManagedClientConnectionImpl.java:191)
at 
org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:300)
at 
com.amazonaws.http.protocol.SdkHttpRequestExecutor.doReceiveResponse(SdkHttpRequestExecutor.java:66)
at 
org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:127)
at 
org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:715)
at 
org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:520)
at 
org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:906)
at 
org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:805)
at 
com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:384)
at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
at 
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
at 
com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:976)
at 
com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:956)
at 
org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:892)
at org.apache.hadoop.fs.s3a.S3AFileSystem.mkdirs(S3AFileSystem.java:859)
at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1877)
at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:356)
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:228)
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:203)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at 
org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:41)
at 
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at 
java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at 
java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
at 
java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
at 
java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1294)
at 
java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at 
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:323)
at 
org.apache.flink.runtime.state.JavaSerializer.serialize(JavaSerializer.java:70)
at 
org.apache.flink.runtime.state.JavaSerializer.serialize(JavaSerializer.java:33)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend$PartitionableListState.write(DefaultOperatorStateBackend.java:463)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:263)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:233)
at 
org.apache.flink.runtime.i

Re: CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured.

2017-09-26 Thread Hao Sun
Thanks, I will try that.

On Tue, Sep 26, 2017 at 8:24 AM Aljoscha Krettek 
wrote:

> I'm not sure whether the JM is reading it or not. But you can manually set
> the values on the Configuration using the setter methods.
>
>
> On 26. Sep 2017, at 16:58, Hao Sun  wrote:
>
> Thanks Aljoscha, I still have questions.
> Do I have to parse the YAML into a Configuration object? If the JM is not
> reading the config, who is reading it? According to the logs the thread is [main].
> Why does the JM not read the config file by default?
>
> def createLocalEnvironment(parallelism: Int = JavaEnv.getDefaultLocalParallelism):
>     StreamExecutionEnvironment = {
>   new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism))
> }
>
> @PublicEvolving
> def createLocalEnvironmentWithWebUI(config: Configuration = null): StreamExecutionEnvironment = {
>   val conf: Configuration = if (config == null) new Configuration() else config
>   new StreamExecutionEnvironment(JavaEnv.createLocalEnvironmentWithWebUI(conf))
> }
>
>
> On Tue, Sep 26, 2017 at 6:25 AM Aljoscha Krettek 
> wrote:
>
>> Hi,
>>
>> I think the GlobalConfiguration is not necessarily read by the (local)
>> JobManager. You could try using
>> StreamExecutionEnvironment.createLocalEnvironment(int, Configuration) to
>> manually specify a configuration.
>>
>> Best,
>> Aljoscha
>>
>> On 26. Sep 2017, at 05:49, Hao Sun  wrote:
>>
>> Hi, I am running Flink in dev mode through IntelliJ. I have
>> flink-conf.yaml correctly configured, and from the log you can see that the
>> job manager is reading it.
>>
>> 2017-09-25 20:41:52.255 [main] INFO
>>  org.apache.flink.configuration.GlobalConfiguration  - *Loading
>> configuration property: state.backend, rocksdb*
>> 2017-09-25 20:41:52.256 [main] INFO
>>  org.apache.flink.configuration.GlobalConfiguration  - Loading
>> configuration property: state.backend.fs.checkpointdir,
>> /tmp/flink/checkpoints/
>> 2017-09-25 20:41:52.256 [main] INFO
>>  org.apache.flink.configuration.GlobalConfiguration  - *Loading
>> configuration property: state.checkpoints.dir, /tmp/flink/checkpoints-meta/*
>> 2017-09-25 20:41:52.256 [main] INFO
>>  org.apache.flink.configuration.GlobalConfiguration  - Loading
>> configuration property: state.savepoints.dir, /tmp/flink/savepoints/
>>
>> *But I still somehow get this error*
>> java.lang.IllegalStateException: CheckpointConfig says to persist
>> periodic checkpoints, but no checkpoint directory has been configured. You
>> can configure configure one via key 'state.checkpoints.dir'.
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:209)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:451)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:278)
>> at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1315)
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
>> at
>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>> at
>> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
>> at
>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>> at
>> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>> at
>> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> at
>> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>> at
>> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at

Re: CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured.

2017-09-26 Thread Hao Sun
Thanks Aljoscha, I still have questions.
Do I have to parse the yaml into a Configuration myself? If the JM is not
reading the config, who is reading it? The thread is [main] in the logs.
Why does the JM not read the config file by default?

def createLocalEnvironment(parallelism: Int = JavaEnv.getDefaultLocalParallelism):
    StreamExecutionEnvironment = {
  new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism))
}

@PublicEvolving
def createLocalEnvironmentWithWebUI(config: Configuration = null):
    StreamExecutionEnvironment = {
  val conf: Configuration = if (config == null) new Configuration() else config
  new StreamExecutionEnvironment(JavaEnv.createLocalEnvironmentWithWebUI(conf))
}
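
For reference, a minimal sketch of what "setting the values manually" could look like. flink-conf.yaml holds flat `key: value` pairs, so they can be read into a map and then copied onto a Configuration with its setters. The object name FlatConfSketch and the helper parseFlatConf are made up for illustration and are not part of Flink. The Flink calls appear only in the trailing comment, which assumes the createLocalEnvironment(int, Configuration) overload mentioned in the quoted reply below and the wrapper constructor shown in the pasted source above.

```scala
object FlatConfSketch {
  /** Parses flat "key: value" lines; blank lines and '#' comments are skipped. */
  def parseFlatConf(lines: Seq[String]): Map[String, String] =
    lines.iterator
      .map(_.trim)
      .filter(line => line.nonEmpty && !line.startsWith("#"))
      // Split on the first ": " only, so values such as file:///tmp keep their colons.
      .map(_.split(": ", 2))
      .collect { case Array(k, v) if k.trim.nonEmpty => k.trim -> v.trim }
      .toMap

  def main(args: Array[String]): Unit = {
    val settings = parseFlatConf(Seq(
      "# checkpointing",
      "state.backend: rocksdb",
      "state.checkpoints.dir: file:///tmp/flink/checkpoints-meta/"
    ))
    settings.foreach { case (k, v) => println(s"$k -> $v") }

    // Hypothetical follow-up with Flink on the classpath (not compiled here):
    //   val conf = new Configuration()
    //   settings.foreach { case (k, v) => conf.setString(k, v) }
    //   val env = new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(1, conf))
  }
}
```

Whether the local JobManager then picks up state.checkpoints.dir is exactly what the thread is trying to establish, so treat this as something to try rather than a confirmed fix.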


On Tue, Sep 26, 2017 at 6:25 AM Aljoscha Krettek 
wrote:

> Hi,
>
> I think the GlobalConfiguration is not necessarily read by the (local)
> JobManager. You could try using
> StreamExecutionEnvironment.createLocalEnvironment(int, Configuration) to
> manually specify a configuration.
>
> Best,
> Aljoscha
>
> On 26. Sep 2017, at 05:49, Hao Sun  wrote:
>
> Hi, I am running Flink in dev mode through IntelliJ. I have flink-conf.yaml
> correctly configured, and from the log you can see the job manager is reading it.
>
> 2017-09-25 20:41:52.255 [main] INFO
>  org.apache.flink.configuration.GlobalConfiguration  - *Loading
> configuration property: state.backend, rocksdb*
> 2017-09-25 20:41:52.256 [main] INFO
>  org.apache.flink.configuration.GlobalConfiguration  - Loading
> configuration property: state.backend.fs.checkpointdir,
> /tmp/flink/checkpoints/
> 2017-09-25 20:41:52.256 [main] INFO
>  org.apache.flink.configuration.GlobalConfiguration  - *Loading
> configuration property: state.checkpoints.dir, /tmp/flink/checkpoints-meta/*
> 2017-09-25 20:41:52.256 [main] INFO
>  org.apache.flink.configuration.GlobalConfiguration  - Loading
> configuration property: state.savepoints.dir, /tmp/flink/savepoints/
>
> *But I still somehow get this error*
> java.lang.IllegalStateException: CheckpointConfig says to persist periodic
> checkpoints, but no checkpoint directory has been configured. You can
> configure configure one via key 'state.checkpoints.dir'.
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:209)
> at
> org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:451)
> at
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:278)
> at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1315)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
> at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
> at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
> at
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> *My program only has this related to checkpointing*
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
> env.enableCheckpointing(2 * 60 * 1000)
>
>
> Need some help to dig through this. Thanks
>
> === Full log =
>
> 2017-09-25 20:41:51.466 [ForkJoinPool-1-worker-13] INFO
>  com.zendesk.consul.Consul  - Collecting kafka nodes from
> Consul(con

CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured.

2017-09-25 Thread Hao Sun
Hi, I am running Flink in dev mode through IntelliJ. I have flink-conf.yaml
correctly configured, and from the log you can see the job manager is reading it.

2017-09-25 20:41:52.255 [main] INFO
 org.apache.flink.configuration.GlobalConfiguration  - *Loading
configuration property: state.backend, rocksdb*
2017-09-25 20:41:52.256 [main] INFO
 org.apache.flink.configuration.GlobalConfiguration  - Loading
configuration property: state.backend.fs.checkpointdir,
/tmp/flink/checkpoints/
2017-09-25 20:41:52.256 [main] INFO
 org.apache.flink.configuration.GlobalConfiguration  - *Loading
configuration property: state.checkpoints.dir, /tmp/flink/checkpoints-meta/*
2017-09-25 20:41:52.256 [main] INFO
 org.apache.flink.configuration.GlobalConfiguration  - Loading
configuration property: state.savepoints.dir, /tmp/flink/savepoints/

*But I still somehow get this error*
java.lang.IllegalStateException: CheckpointConfig says to persist periodic
checkpoints, but no checkpoint directory has been configured. You can
configure configure one via key 'state.checkpoints.dir'.
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:209)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:451)
at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:278)
at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1315)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


*My program only has this related to checkpointing*

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
env.enableCheckpointing(2 * 60 * 1000)


Need some help to dig through this. Thanks

=== Full log =

2017-09-25 20:41:51.466 [ForkJoinPool-1-worker-13] INFO
 com.zendesk.consul.Consul  - Collecting kafka nodes from
Consul(consul.docker:8500) for tags=List(dev)
2017-09-25 20:41:51.946 [main] INFO
 org.apache.flink.api.java.typeutils.TypeExtractor  - class
com.zendesk.fraud_prevention.data_types.MaxwellFilterEvents does not
contain a setter for field events
2017-09-25 20:41:51.946 [main] INFO
 org.apache.flink.api.java.typeutils.TypeExtractor  - class
com.zendesk.fraud_prevention.data_types.MaxwellFilterEvents is not a valid
POJO type because not all fields are valid POJO fields.
2017-09-25 20:41:51.985 [main] INFO
 org.apache.flink.api.java.typeutils.TypeExtractor  - class
com.zendesk.fraud_prevention.data_types.MaxwellFPSEvent does not contain a
setter for field accountId
2017-09-25 20:41:51.985 [main] INFO
 org.apache.flink.api.java.typeutils.TypeExtractor  - class
com.zendesk.fraud_prevention.data_types.MaxwellFPSEvent is not a valid POJO
type because not all fields are valid POJO fields.
2017-09-25 20:41:52.017 [ForkJoinPool-1-worker-13] INFO
 com.zendesk.consul.Consul  - Collecting kafka nodes from
Consul(consul.docker:8500) for tags=List(dev)
2017-09-25 20:41:52.198 [main] INFO
 o.a.flink.streaming.api.environment.LocalStreamEnvironment  - Running job
on local embedded Flink mini cluster
2017-09-25 20:41:52.253 [main] INFO
 org.apache.flink.configuration.GlobalConfiguration  - Loading
configuration property: jobmanager.rpc.address, localhost
2017-09-25 20:41:52.255 [main] INFO
 org.apache.flink.configuration.GlobalConfiguration  - Loading
configuration property: jobmanager.rpc.port, 6123
2017-09-25 20:41:52.255 [main] INFO
 org.ap

Re: Flink HA with Kubernetes, without Zookeeper

2017-08-22 Thread Hao Sun
Great suggestions, the etcd operator is very interesting, thanks James.
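
For anyone reading along, here is a minimal in-memory sketch of the conditional-update idea from James's reply quoted below (the JSON Patch "test" operation combined with an update). It does not call Kubernetes or etcd. ConditionalCounterSketch, Store, Versioned, updateIf and nextCheckpointId are all hypothetical names, and the version field merely stands in for whatever the real API would compare against.

```scala
import scala.annotation.tailrec

object ConditionalCounterSketch {
  /** Stored counter value plus a version that changes on every write. */
  final case class Versioned(version: Long, value: Long)

  /** In-memory stand-in for a store that supports "update only if unchanged". */
  final class Store {
    private var current = Versioned(0L, 0L)

    def read(): Versioned = synchronized { current }

    /** Writes newValue only if nobody has written since expectedVersion was read. */
    def updateIf(expectedVersion: Long, newValue: Long): Boolean = synchronized {
      if (current.version == expectedVersion) {
        current = Versioned(expectedVersion + 1, newValue)
        true
      } else false
    }
  }

  /** Read, increment, conditionally write; retry when another writer got in first. */
  @tailrec
  def nextCheckpointId(store: Store): Long = {
    val seen = store.read()
    val next = seen.value + 1
    if (store.updateIf(seen.version, next)) next else nextCheckpointId(store)
  }

  def main(args: Array[String]): Unit = {
    val store = new Store
    println(nextCheckpointId(store))
    println(nextCheckpointId(store))
  }
}
```

The point of the retry loop is that two JobManagers racing for the next checkpoint ID can never both succeed with the same value: the loser's write is rejected and it has to re-read.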

On Tue, Aug 22, 2017, 12:42 James Bucher  wrote:

> Just wanted to throw in a couple more details here from what I have
> learned from working with Kubernetes.
>
> *All processes restart (a lost JobManager restarts eventually). Should be
> given in Kubernetes*:
>
>- This works very well, we run multiple jobs with a single Jobmanager
>and Flink/Kubernetes recovers quite well.
>
> *A way for TaskManagers to discover the restarted JobManager. Should work
> via Kubernetes as well (restarted containers retain the external hostname)*
> :
>
>- We use StatefulSets which provide a DNS based discovery mechanism.
>Provided DNS is set up correctly with TTLs this works well. You could also
>leverage the built-in Kubernetes services if you are only running a single
>Job Manager. Kubernetes will just route the traffic to the single pod. This
>works fine with a single Job Manager (I have tested it). However multiple
>Job Managers won’t work because Kubernetes will route this round-robin to
>the Job Managers
>
> *A way to isolate different "leader sessions" against each other. Flink
> currently uses ZooKeeper to also attach a "leader session ID" to leader
> election, which is a fencing token to avoid that processes talk to each
> other despite having different views on who is the leader, or whether the
> leader lost and re-gained leadership:*
>
>- This is probably the most difficult thing. You could leverage the
>built-in ETCD cluster. Connecting directly to the Kubernetes ETCD database
>is probably a bad idea, however. You should be able to create a counter
>using the PATCH API that Kubernetes supplies, which follows
>https://tools.ietf.org/html/rfc6902. You could probably leverage
>https://tools.ietf.org/html/rfc6902#section-4.6 to allow for atomic
>updates to counters. Combining this with
>https://kubernetes.io/docs/concepts/api-extension/custom-resources/#custom-resources
>should give a good way to work with ETCD without actually connecting to
>the Kubernetes ETCD directly. This integration would require modifying the
>Job Manager leader election code.
>
> *A distributed atomic counter for the checkpoint ID. This is crucial to
> ensure correctness of checkpoints in the presence of JobManager failures
> and re-elections or split-brain situations*.
>
>- This is very similar to the above; we should be able to accomplish
>that through the PATCH API combined with a conditional update.
>
> If you don’t want to actually rip way into the code for the Job Manager
> the ETCD Operator <https://github.com/coreos/etcd-operator> would
> be a good way to bring up an ETCD cluster that is separate from the core
> Kubernetes ETCD database. Combined with zetcd you could probably have that
> up and running quickly.
>
> Thanks,
> James Bucher
>
> From: Hao Sun 
> Date: Monday, August 21, 2017 at 9:45 AM
> To: Stephan Ewen , Shannon Carey 
> Cc: "user@flink.apache.org" 
> Subject: Re: Flink HA with Kubernetes, without Zookeeper
>
> Thanks Shannon for the https://github.com/coreos/zetcd tips, I will check
> that out and share my results if we proceed on that path.
> Thanks Stephan for the details, this is very useful, I was about to ask
> what exactly is stored into zookeeper, haha.
>
> On Mon, Aug 21, 2017 at 9:31 AM Stephan Ewen  wrote:
>
>> Hi!
>>
>> That is a very interesting proposition. In cases where you have a single
>> master only, you may get away with quite good guarantees without ZK. In
>> fact, Flink does not store significant data in ZK at all, it only uses
>> locks and counters.
>>
>> You can have a setup without ZK, provided you have the following:
>>
>>   - All processes restart (a lost JobManager restarts eventually). Should
>> be given in Kubernetes.
>>
>>   - A way for TaskManagers to discover the restarted JobManager. Should
>> work via Kubernetes as well (restarted containers retain the external
>> hostname)
>>
>>   - A way to isolate different "leader sessions" against each other.
>> Flink currently uses ZooKeeper to also attach a "leader session ID" to
>> leader election, which is a fencing token to avoid that processes talk to
>> each other despite having different views on who is the

Re: StandaloneResourceManager failed to associate with JobManager leader

2017-08-22 Thread Hao Sun
Thanks Till, the DEBUG log level is a good idea. I figured it out. I made a
mistake with `-` and `_`.
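
In case it helps someone else: the quoted config below uses flink_jobmanager, while the telnet output reports the host as flink-jobmanager. Underscores are not valid in hostnames under the usual RFC 1123 rules, so a quick check like the following sketch can catch this kind of mix-up before the cluster starts. HostnameCheckSketch and isValidHostname are hypothetical names, and this is only a syntactic check, not a DNS lookup.

```scala
object HostnameCheckSketch {
  // One DNS label: letters, digits and hyphens, no leading or trailing hyphen, at most 63 chars.
  private val Label = "[A-Za-z0-9]([A-Za-z0-9-]{0,61}[A-Za-z0-9])?".r

  /** True if every dot-separated label of host is a syntactically valid hostname label. */
  def isValidHostname(host: String): Boolean =
    host.nonEmpty && host.length <= 253 &&
      host.split("\\.", -1).forall(label => Label.pattern.matcher(label).matches())

  def main(args: Array[String]): Unit = {
    Seq("flink-jobmanager", "flink_jobmanager").foreach { host =>
      println(s"$host valid=${isValidHostname(host)}")
    }
  }
}
```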

On Tue, Aug 22, 2017 at 1:39 AM Till Rohrmann  wrote:

> Hi Hao Sun,
>
> have you checked that one can resolve the hostname flink_jobmanager from
> within the container? This is required to connect to the JobManager. If
> this is the case, then log files with DEBUG log level would be helpful to
> track down the problem.
>
> Cheers,
> Till
>
> On Wed, Aug 16, 2017 at 5:35 AM, Hao Sun  wrote:
>
>> Hi,
>>
>> I am trying to run a cluster of job-manager and task-manager in docker.
>> One of each for now. I got a StandaloneResourceManager error, stating
>> that it can not associate with job-manager. I do not know what was wrong.
>>
>> I am sure that job-manager can be connected.
>> ===
>> root@flink-jobmanager:/opt/flink# telnet flink_jobmanager 32929
>> Trying 172.18.0.3...
>> Connected to flink-jobmanager.
>> Escape character is '^]'.
>> Connection closed by foreign host.
>> ===
>>
>> Here is my config:
>> ===
>> Starting Job Manager
>> config file:
>> jobmanager.rpc.address: flink_jobmanager
>> jobmanager.rpc.port: 6123
>> jobmanager.web.port: 8081
>> jobmanager.heap.mb: 1024
>> taskmanager.heap.mb: 1024
>> taskmanager.numberOfTaskSlots: 1
>> taskmanager.memory.preallocate: false
>> parallelism.default: 1
>> jobmanager.archive.fs.dir: file:///flink_data/completed-jobs/
>> historyserver.archive.fs.dir: file:///flink_data/completed-jobs/
>> state.backend: rocksdb
>> state.backend.fs.checkpointdir: file:///flink_data/checkpoints
>> taskmanager.tmp.dirs: /flink_data/tmp
>> blob.storage.directory: /flink_data/tmp
>> jobmanager.web.tmpdir: /flink_data/tmp
>> env.log.dir: /flink_data/logs
>> high-availability: zookeeper
>> high-availability.storageDir: file:///flink_data/ha/
>> high-availability.zookeeper.quorum: kafka:2181
>> blob.server.port: 6124
>> query.server.port: 6125
>> ===
>>
>> Here is the major error I see:
>> ===
>> 2017-08-16 02:46:23,586 INFO
>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
>> Starting ZooKeeperLeaderRetrievalService.
>> 2017-08-16 02:46:23,612 INFO
>> org.apache.flink.runtime.jobmanager.JobManager - JobManager
>> akka.tcp://flink@flink_jobmanager:32929/user/jobmanager was granted
>> leadership with leader session ID
>> Some(06abc8f5-c1b9-44b2-bb7f-771c74981552).
>> 2017-08-16 02:46:23,627 INFO
>> org.apache.flink.runtime.jobmanager.JobManager - Delaying recovery of all
>> jobs by 1 milliseconds.
>> 2017-08-16 02:46:23,638 INFO
>> org.apache.flink.runtime.webmonitor.JobManagerRetriever - New leader
>> reachable under akka.tcp://flink@flink_jobmanager
>> :32929/user/jobmanager:06abc8f5-c1b9-44b2-bb7f-771c74981552.
>> 2017-08-16 02:46:23,640 INFO
>> org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
>> - Trying to associate with JobManager leader
>> akka.tcp://flink@flink_jobmanager:32929/user/jobmanager
>> 2017-08-16 02:46:23,653 WARN
>> org.apache.flink.runtime.webmonitor.JobManagerRetriever - Failed to
>> retrieve leader gateway and port.
>> akka.actor.ActorNotFound: Actor not found for:
>> ActorSelection[Anchor(akka://flink/deadLetters), Path(/)]
>> at
>> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
>> at
>> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>> at
>> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>> at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
>> at
>> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
>> at
>> akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
>> at
>> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
>> at
>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>> at
>> scala.concurrent.impl.Promise$DefaultPromise.scala$concurrent$impl$Promise$DefaultPromise$$dispatchOrAddCallback(Promise.scala:280)
>> at
>> scala.concurrent.impl.Promise$DefaultPromise.onComplete(Promise.scala:270)
>> at akka.actor.ActorSelection.resolveOne(ActorSelection.scala:63)
>

Re: Flink HA with Kubernetes, without Zookeeper

2017-08-21 Thread Hao Sun
Thanks Shannon for the https://github.com/coreos/zetcd tips, I will check that
out and share my results if we proceed on that path.
Thanks Stephan for the details, this is very useful, I was about to ask
what exactly is stored into zookeeper, haha.

On Mon, Aug 21, 2017 at 9:31 AM Stephan Ewen  wrote:

> Hi!
>
> That is a very interesting proposition. In cases where you have a single
> master only, you may get away with quite good guarantees without ZK. In
> fact, Flink does not store significant data in ZK at all, it only uses
> locks and counters.
>
> You can have a setup without ZK, provided you have the following:
>
>   - All processes restart (a lost JobManager restarts eventually). Should
> be given in Kubernetes.
>
>   - A way for TaskManagers to discover the restarted JobManager. Should
> work via Kubernetes as well (restarted containers retain the external
> hostname)
>
>   - A way to isolate different "leader sessions" against each other. Flink
> currently uses ZooKeeper to also attach a "leader session ID" to leader
> election, which is a fencing token to avoid that processes talk to each
> other despite having different views on who is the leader, or whether the
> leader lost and re-gained leadership.
>
>   - An atomic marker for what is the latest completed checkpoint.
>
>   - A distributed atomic counter for the checkpoint ID. This is crucial to
> ensure correctness of checkpoints in the presence of JobManager failures
> and re-elections or split-brain situations.
>
> I would assume that etcd can provide all of those services. The best way
> to integrate it would probably be to add an implementation of Flink's
> "HighAvailabilityServices" based on etcd.
>
> Have a look at this class:
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
>
> If you want to contribute an extension of Flink using etcd, that would be
> awesome.
> This should have a FLIP though, and a plan on how to set up rigorous unit
> testing of that implementation (because its correctness is very crucial to
> Flink's HA resilience).
>
> Best,
> Stephan
>
>
> On Mon, Aug 21, 2017 at 4:15 PM, Shannon Carey  wrote:
>
>> Zookeeper should still be necessary even in that case, because it is
>> where the JobManager stores information which needs to be recovered after
>> the JobManager fails.
>>
>> We're eyeing https://github.com/coreos/zetcd as a way to run
>> Zookeeper on top of Kubernetes' etcd cluster so that we don't have to rely
>> on a separate Zookeeper cluster. However, we haven't tried it yet.
>>
>> -Shannon
>>
>> From: Hao Sun 
>> Date: Sunday, August 20, 2017 at 9:04 PM
>> To: "user@flink.apache.org" 
>> Subject: Flink HA with Kubernetes, without Zookeeper
>>
>> Hi, I am new to Flink and trying to bring up a Flink cluster on top of
>> Kubernetes.
>>
>> For HA setup, with kubernetes, I think I just need one job manager and do
>> not need Zookeeper? I will store all states to S3 buckets. So in case of
>> failure, kubernetes can just bring up a new job manager without losing
>> anything?
>>
>> I want to confirm my assumptions above make sense. Thanks
>>
>
>
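
The "leader session ID" fencing token in Stephan's list above can be illustrated with a small in-memory sketch: every message carries the session ID its sender believes is current, and the receiver drops anything stamped with a stale ID. FencingSketch, Fenced and Receiver are hypothetical names for illustration only and do not correspond to Flink classes.

```scala
import java.util.UUID

object FencingSketch {
  /** A message stamped with the leader session ID the sender believes is current. */
  final case class Fenced[A](leaderSessionId: UUID, payload: A)

  /** Receiver that only accepts messages carrying its current leader session ID. */
  final class Receiver(private var current: UUID) {
    /** Called when a new leader is announced; tokens from older sessions become stale. */
    def onNewLeader(id: UUID): Unit = { current = id }

    /** Returns the payload if the token matches, None if the sender is out of date. */
    def receive[A](msg: Fenced[A]): Option[A] =
      if (msg.leaderSessionId == current) Some(msg.payload) else None
  }

  def main(args: Array[String]): Unit = {
    val oldSession = UUID.randomUUID()
    val newSession = UUID.randomUUID()
    val receiver = new Receiver(oldSession)
    receiver.onNewLeader(newSession)
    // A process that still believes in the old session is ignored.
    println(receiver.receive(Fenced(oldSession, "trigger checkpoint")))
    // The current leader's message is accepted.
    println(receiver.receive(Fenced(newSession, "trigger checkpoint")))
  }
}
```

Any replacement for ZooKeeper would need to hand out such IDs in a way that two processes can never both believe they hold the current one, which is the hard part this sketch leaves out.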


Flink HA with Kubernetes, without Zookeeper

2017-08-20 Thread Hao Sun
Hi, I am new to Flink and trying to bring up a Flink cluster on top of
Kubernetes.

For HA setup, with kubernetes, I think I just need one job manager and do
not need Zookeeper? I will store all states to S3 buckets. So in case of
failure, kubernetes can just bring up a new job manager without losing
anything?

I want to confirm my assumptions above make sense. Thanks

