Re: Record timestamp from kafka

2018-03-29 Thread Ben Yan
Hi,
Is that what you mean?
See : 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel=16377145#comment-16377145
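For illustration (an editorial sketch, not part of the original reply): FLINK-8500, linked above, is about exposing the Kafka record timestamp; a workaround that already works is that the Kafka 0.10+ consumer attaches the record timestamp as the element's event timestamp, so it can be read downstream, e.g. in a ProcessFunction. The topic name, properties, and the String element type below are placeholders.

import java.util.Properties;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class KafkaTimestampExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "my-group");                // placeholder

        // The 0.10+ consumer attaches the Kafka record timestamp to each element.
        DataStream<String> records = env.addSource(
                new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props));

        records.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) {
                // ctx.timestamp() is the timestamp the Kafka consumer attached to this record
                out.collect(ctx.timestamp() + " -> " + value);
            }
        }).print();

        env.execute("Read Kafka record timestamps");
    }
}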
 

 

Best
Ben

> On 30 Mar 2018, at 12:23 PM, Navneeth Krishnan  
> wrote:
> 
> Hi,
> 
> Is there a way to get the Kafka timestamp in the deserialization schema? All
> records are written to Kafka with a timestamp and I would like to set that
> timestamp on every record that is ingested. Thanks.



Record timestamp from kafka

2018-03-29 Thread Navneeth Krishnan
Hi,

Is there a way to get the Kafka timestamp in the deserialization schema? All
records are written to Kafka with a timestamp and I would like to set that
timestamp on every record that is ingested. Thanks.


Re: bad data output

2018-03-29 Thread 杨力
You can use a split operator, generating 2 streams.
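For illustration (an editorial sketch, not part of the original reply), the split approach could look roughly like this; the input elements and the isValid() check stand in for the real records and the expensive correctness check:

import java.util.Collections;

import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SplitGoodBadExample {

    // Placeholder for the expensive correctness check described in the thread.
    private static boolean isValid(String record) {
        return !record.isEmpty();
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> input = env.fromElements("ok-1", "", "ok-2");

        // Route each record to either the "good" or the "bad" output.
        SplitStream<String> split = input.split(new OutputSelector<String>() {
            @Override
            public Iterable<String> select(String value) {
                return Collections.singletonList(isValid(value) ? "good" : "bad");
            }
        });

        DataStream<String> goodData = split.select("good");
        DataStream<String> badData = split.select("bad");

        goodData.print();                          // continue the normal pipeline here
        badData.writeAsText("/tmp/bad-records");   // or add a Kafka sink, etc.

        env.execute("Split good and bad records");
    }
}

Side outputs (an OutputTag used from a ProcessFunction) are an alternative that achieves the same separation.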

Darshan Singh  于 2018年3月30日周五 上午2:53写道:

> Hi
>
> I have a dataset where almost 99% of the data is correct. As of now, if
> some data is bad I just ignore it, log it, and return only the correct data.
> I do this inside a map function.
>
> The part which decides whether the data is correct or not is an expensive one.
>
> Now I want to store the bad data somewhere so that I can analyze that
> data in the future.
>
> So I could run the same calculation twice and get the correct data in the
> first pass and the bad data in the second.
>
> Is there a better way where I can somehow store the bad data from inside
> the map function, e.g. send it to Kafka, a file, etc.?
>
> Also, is there a way I could create a datastream which can get the data
> from inside the map function (not sure this is feasible as of now)?
>
> Thanks
>


Anyway to read Cassandra as DataStream/DataSet in Flink?

2018-03-29 Thread James Yu
Hi,

I tried to treat Cassandra as the source of data in Flink with the
information provided in the following links:
-
https://stackoverflow.com/questions/43067681/read-data-from-cassandra-for-processing-in-flink
-
https://www.javatips.net/api/flink-master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java

I got the AsyncWaitOperator exception when I run the task. According to
the 1st link, this exception occurs due to a network problem. However, the
strange thing is that I am running Cassandra on my local VM with only 10
rows of data in the target table.

@Jicaar in the 1st link also mentions that switching from RichAsyncFunction to
RichMapFunction can avoid the AsyncWaitOperator exception. Can someone with
similar experience share how to do it with a RichMapFunction?
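For what it's worth, a rough sketch of the RichMapFunction variant (an illustration, not from the thread): the driver's Cluster/Session live in transient fields created in open(), and only plain values are returned, so Cassandra driver objects such as ResultSet never reach Flink's serializer — which is what the Kryo ConcurrentModificationException in the trace below points to. Contact point, keyspace, table, and column names are placeholders.

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class CassandraLookupMap extends RichMapFunction<String, String> {

    // transient: driver objects are created per task in open() and never serialized
    private transient Cluster cluster;
    private transient Session session;

    @Override
    public void open(Configuration parameters) throws Exception {
        cluster = Cluster.builder().addContactPoint("127.0.0.1").build(); // placeholder host
        session = cluster.connect("my_keyspace");                         // placeholder keyspace
    }

    @Override
    public String map(String id) throws Exception {
        // Synchronous lookup; only plain values leave this function.
        Row row = session.execute(
                "SELECT payload FROM my_table WHERE id = ?", id).one();   // placeholder query
        return row == null ? "" : row.getString("payload");
    }

    @Override
    public void close() throws Exception {
        if (session != null) {
            session.close();
        }
        if (cluster != null) {
            cluster.close();
        }
    }
}

The synchronous lookup trades throughput for simplicity compared to the async variant.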

AsyncWaitOperator exception trace -->
02:21:00.164 [AsyncIO-Emitter-Thread (Source: Custom Source -> async wait
operator -> (Flat Map, Sink: Unnamed) (1/1))] INFO
org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source -> async
wait operator -> (Flat Map, Sink: Unnamed) (1/1)
(2809cef511194e612b2cc65510f78c64) switched from RUNNING to FAILED.
java.lang.Exception: An async function call terminated with an exception.
Failing the AsyncWaitOperator.
  at
org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:137)
[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at
org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:85)
[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
Caused by:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
  at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:611)
~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:572)
~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at
org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:133)
[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  ... 2 common frames omitted
Caused by: com.esotericsoftware.kryo.KryoException:
java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc
(org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader)
contextClassLoader (java.lang.Thread)
threads (java.lang.ThreadGroup)
groups (java.lang.ThreadGroup)
threadGroup (io.netty.util.concurrent.DefaultThreadFactory)
val$backingThreadFactory
(com.google.common.util.concurrent.ThreadFactoryBuilder$1)
threadFactory (java.util.concurrent.ThreadPoolExecutor)
delegate
(com.google.common.util.concurrent.MoreExecutors$ListeningDecorator)
blockingExecutor (com.datastax.driver.core.Cluster$Manager)
manager (com.datastax.driver.core.Host)
triedHosts (com.datastax.driver.core.ExecutionInfo)
info (com.datastax.driver.core.ArrayBackedResultSet$SinglePage)
  at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:348)
~[kryo-2.24.0.jar:na]
  at

Re: Plain text SSL passwords in Log file

2018-03-29 Thread Szymon Szczypiński

Hi,

I have the same problem with Flink 1.3.1 and I created JIRA
https://issues.apache.org/jira/browse/FLINK-9100 (I saw that you wrote
about my JIRA in your JIRA :)).


For now, to avoid printing the password in the log, I configured logback.xml
so that the GlobalConfiguration class is logged only to a separate file, not to the
global log file. I think you could also set it to log to /dev/null.


But in my opinion it is enough to log to a separate file that is not sent
to another machine, because in that case the password is still visible in the
configuration file.


Regards
Szymon Szczypiński



On 29.03.2018 16:29, Vinay Patil wrote:
I have created FLINK-9111 as this is not
handled in the latest code of GlobalConfiguration.


Regards,
Vinay Patil

On Thu, Mar 29, 2018 at 8:33 AM, Vinay Patil > wrote:


Hi,

If this is not part of Flink 1.5 or not handled in the latest 1.4.2
release, I can open a JIRA. It should be a small change.

What do you think ?

Regards,
Vinay Patil

On Wed, Mar 28, 2018 at 4:11 PM, Vinay Patil
> wrote:

Hi Greg,

I am not concerned with flink-conf.yaml file, we have taken
care of the passwords there by replacing them with
placeholders. We are picking the passwords from our vault.

The main issue is that Flink is printing these passwords in
plain text in the log file. It should be a simple check to not print
the SSL passwords.

Regards,
Vinay Patil

On Wed, Mar 28, 2018 at 3:53 PM, Greg Hogan
> wrote:

With the current method you always have the risk, no
matter which keywords you filter on ("secret", "password",
etc.), that the key name is mistyped and inadvertently logged.

Perhaps we could implement something like TravisCI's
encryption keys
[https://docs.travis-ci.com/user/encryption-keys/
] at a
cost of added complexity.

On Wed, Mar 28, 2018 at 4:38 PM, Vinay Patil
>
wrote:

Hi,

I see plain text SSL passwords in log file (printed by
GlobalConfiguration) , because of which we cannot
deploy our pipeline to NR environment.

I am able to avoid this by setting the ERROR log level for
this class, but the security team still thinks it is a risk.

Is this taken care of in the new release? (I am using
Flink 1.3.2)

Regards,
Vinay Patil









Re: How can I set configuration of process function from job's main?

2018-03-29 Thread Timo Walther

Hi,

the configuration parameter is just a legacy API. You can simply pass any
serializable object to the constructor of your process function.
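For illustration (a minimal sketch, not from the reply), passing a serializable parameter object through the constructor could look like this; MyParams, its field, and the String input/output types are placeholders:

import java.io.Serializable;

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Any serializable holder works; it is shipped to the workers together with the function.
class MyParams implements Serializable {
    final int threshold;

    MyParams(int threshold) {
        this.threshold = threshold;
    }
}

class MyProcessFunction extends ProcessFunction<String, String> {

    private final MyParams params;

    MyProcessFunction(MyParams params) {
        this.params = params; // set from the job's main() via the constructor
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        if (value.length() > params.threshold) {
            out.collect(value);
        }
    }
}

// In the job's main():
//   DataStream<String> stream2 =
//       stream1.process(new MyProcessFunction(new MyParams(10))).name("Ingest data");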


Regards,
Timo


On 29.03.18 at 20:38, Main Frame wrote:

Hi guys! I am a newbie in Flink and I have a probably silly question about the
streaming API.

So, for instance:

I am trying to apply SomeProcessFunction to stream1:

…
DataStream stream2 = stream1.process(new 
MyProcessFunction()).name("Ingest data");
…

I have created a package-private class MyProcessFunction which extends 
ProcessFunction.

class MyProcessFunction extends ProcessFunction {

 @Override
 public void open(Configuration parameters) throws Exception {
  ...
 }
 …
}

How can I set the parameters variable before execution of MyProcessFunction?

———

Timofeev Dmitry
VoIP Engineer
Open source telephony solutions
Skype: itsroot
Linkedin: https://www.linkedin.com/in/itsroot





bad data output

2018-03-29 Thread Darshan Singh
Hi

I have a dataset where almost 99% of the data is correct. As of now, if
some data is bad I just ignore it, log it, and return only the correct data.
I do this inside a map function.

The part which decides whether the data is correct or not is an expensive one.

Now I want to store the bad data somewhere so that I can analyze that
data in the future.

So I could run the same calculation twice and get the correct data in the first
pass and the bad data in the second.

Is there a better way where I can somehow store the bad data from inside
the map function, e.g. send it to Kafka, a file, etc.?

Also, is there a way I could create a datastream which can get the data
from inside the map function (not sure this is feasible as of now)?

Thanks


How can I set configuration of process function from job's main?

2018-03-29 Thread Main Frame
Hi guys! I am a newbie in Flink and I have a probably silly question about the
streaming API.

So, for instance:

I am trying to apply SomeProcessFunction to stream1:

…
DataStream stream2 = stream1.process(new 
MyProcessFunction()).name("Ingest data");
…

I have created a package-private class MyProcessFunction which extends 
ProcessFunction.

class MyProcessFunction extends ProcessFunction {

@Override
public void open(Configuration parameters) throws Exception {
 ...
}
…
}

How can I set the parameters variable before execution of MyProcessFunction?

———

Timofeev Dmitry
VoIP Engineer
Open source telephony solutions
Skype: itsroot
Linkedin: https://www.linkedin.com/in/itsroot



Job restart hook

2018-03-29 Thread Navneeth Krishnan
Hi,

Is there a way for a script to be called whenever a job gets restarted? My
scenario is: let's say there are 20 slots and the job runs on all 20 slots.
After a while a task manager goes down and now there are only 14 slots, and
I need to readjust the parallelism of my job to ensure the job keeps running until
the lost TM comes up again. It would be great to know how others are
handling this situation.

Thanks,
Navneeth


RE: How does setMaxParallelism work

2018-03-29 Thread NEKRASSOV, ALEXEI
Is there an auto-scaling feature in Flink, where I start with a parallelism of 
(for example) 1, but Flink notices I have a high volume of data to process, and 
automatically increases the parallelism of a running job?

Thanks,
Alex

-Original Message-
From: Nico Kruber [mailto:n...@data-artisans.com] 
Sent: Wednesday, March 28, 2018 8:54 AM
To: Data Engineer 
Cc: Jörn Franke ; user@flink.apache.org
Subject: Re: How does setMaxParallelism work

Flink does not decide the parallelism based on your job.
There is a default parallelism (configured via parallelism.default [1], by 
default 1) which is used if you do not specify it yourself.
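For illustration (not part of the original reply), the two settings side by side; the values are placeholders:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Number of parallel subtasks each operator actually gets at runtime
        // (falls back to parallelism.default, i.e. 1, if not set anywhere).
        env.setParallelism(3);

        // Upper bound for later rescaling (it determines the number of key groups);
        // it does not by itself create more subtasks.
        env.setMaxParallelism(128);

        env.fromElements("a", "b", "c").print();
        env.execute("Parallelism vs. max parallelism");
    }
}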


Nico

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html#common-options

On 28/03/18 13:21, Data Engineer wrote:
> Agreed. But how did Flink decide that it should allot 1 subtask? Why 
> not
> 2 or 3?
> I am trying to understand the implications of using setMaxParallelism 
> vs setParallelism
> 
> On Wed, Mar 28, 2018 at 2:58 PM, Nico Kruber  > wrote:
> 
> Hi James,
> the number of subtasks being used is defined by the parallelism, the max
> parallelism, however, "... determines the maximum parallelism to which
> you can scale operators" [1]. That is, once set, you cannot ever (even
> after restarting your program from a savepoint) increase the operator's
> parallelism above this value. The actual parallelism can be set per job
> in your program but also in the flink client:
> flink run -p   
> 
> 
> Nico
> 
> 
> 
> [1]
> 
> https://ci.apache.org/projects/flink/flink-docs-master/ops/production_ready.html#set-maximum-parallelism-for-operators-explicitly
> 
> 
> On 28/03/18 09:25, Data Engineer wrote:
> > I have a sample application that reads around 2 GB of csv files,
> > converts each record into Avro object and sends it to kafka.
> > I use a custom FileReader that reads the files in a directory.
> > I have set taskmanager.numberOfTaskSlots to 4.
> > I see that if I use setParallelism(3), 3 subtasks are created. But if I
> > use setMaxParallelism(3), only 1 subtask is created.
> >
> > On Wed, Mar 28, 2018 at 12:29 PM, Jörn Franke  
> > >> wrote:
> >
> >     What was the input format, the size and the program that you tried
> >     to execute
> >
> >     On 28. Mar 2018, at 08:18, Data Engineer  
> >      >> wrote:
> >
> >>     I went through the explanation on MaxParallelism in the official
> >>     docs here:
> >>     
> https://ci.apache.org/projects/flink/flink-docs-master/ops/production_ready.html#set-maximum-parallelism-for-operators-explicitly
> 
> 
> >
> >>
> >>     However, I am not able to figure out how Flink decides the
> >>     parallelism value.
> >>     For instance, if I setMaxParallelism to 3, I see that for my job,
> >>     there is only 1 subtask that is created. How did Flink decide that
> >>     1 subtask was enough?
> >>
> >>     Regards,
> >>     James
> >
> >
> 
> --
> Nico Kruber | Software Engineer
> data Artisans
> 
> Follow us @dataArtisans
> --
> Join Flink Forward - The Apache Flink Conference
> Stream Processing | Event Driven | Real Time
> --
> Data Artisans GmbH | Stresemannstr. 121A,10963 Berlin, Germany
> data Artisans, Inc. | 1161 Mission Street, San Francisco, CA-94103, USA
> --
> Data Artisans GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
> 
> 

--
Nico Kruber | Software Engineer
data Artisans

Follow us @dataArtisans
--
Join Flink Forward - The Apache Flink Conference Stream Processing | Event 
Driven | Real Time
--
Data Artisans GmbH | Stresemannstr. 121A,10963 Berlin, Germany data Artisans, 
Inc. | 1161 Mission Street, San Francisco, CA-94103, USA
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B Managing Directors: Dr. 
Kostas Tzoumas, Dr. Stephan Ewen



Re: All sub-tasks stuck in CREATED state after "BlobServerConnection: GET operation failed"

2018-03-29 Thread Nico Kruber
That BlobServerConnection error is caused by a TaskManager which
requested a BLOB (a jar file) but then closed the connection. I guess
that may happen when the job is cancelled and the TaskManager processes
are terminated.

If this is not happening during that scenario, then your TaskManager
probably died from something else, but since you didn't see anything in
the logs there, I don't think this is an issue.


Nico

On 29/03/18 16:24, Gary Yao wrote:
> Hi Juho,
> 
> Thanks for the follow up. Regarding the BlobServerConnection error, Nico
> (cc'ed)
> might have an idea. 
> 
> Best,
> Gary
> 
> On Thu, Mar 29, 2018 at 4:08 PM, Juho Autio  > wrote:
> 
> Sorry, my bad. I checked the persisted jobmanager logs and can see
> that job was still being restarted at 15:31 and then at 15:36. If I
> wouldn't have terminated the cluster, I believe the flink job / yarn
> app would've eventually exited as failed.
> 
> 
> On Thu, Mar 29, 2018 at 4:49 PM, Juho Autio  > wrote:
> 
> Thanks again, Gary.
> 
> It's true that I only let the job remain in the stuck state for
> something between 10-15 minutes. Then I shut down the cluster.
> 
> But: if restart strategy is being applied, shouldn't I have seen
> those messages in job manager log? In my case it kept all quiet
> since ~2018-03-28 15:27 and I terminated it at ~28-03-2018 15:36.
> 
> Do you happen to know about what that BlobServerConnection error
> means in the code? If it may lead into some unrecoverable state
> (where neither restart is attempted, nor job is failed for good)..
> 
> On Thu, Mar 29, 2018 at 4:39 PM, Gary Yao
> > wrote:
> 
> Hi Juho,
> 
> The log message
> 
>   Could not allocate all requires slots within timeout of
> 300000 ms. Slots required: 20, slots allocated: 8
> 
> indicates that you do not have enough resources in your
> cluster left. Can you
> verify that after you started the job submission the YARN
> cluster does not reach
> its maximum capacity? You can also try submitting the job
> with a lower
> parallelism.
> 
> I think the reason why the YARN application is not
> immediately shown as failed
> is that your restart strategy attempts to start the job 3
> times. On every
> attempt the job is blocked on the slot allocation timeout
> for at least 300000 ms
> (5 minutes). I have tried submitting
> examples/streaming/WordCount.jar with the
> same restart strategy on EMR, and the CLI only returns after
> around 20 minutes.
> 
> As a side note, beginning from Flink 1.5, you do not need to
> specify -yn -ys
> because resource allocations are dynamic by default
> (FLIP-6). The parameter -yst
> is deprecated and should not be needed either.
> 
> Best,
> Gary
> 
> On Thu, Mar 29, 2018 at 8:59 AM, Juho Autio
> > wrote:
> 
> I built a new Flink distribution from release-1.5 branch
> yesterday.
> 
> The first time I tried to run a job with it ended up in
> some stalled state so that the job didn't manage to
> (re)start but what makes it worse is that it didn't exit
> as failed either.
> 
> Next time I tried running the same job (but new EMR
> cluster & all from scratch) it just worked normally.
> 
> On the problematic run, The YARN job was started and
> Flink UI was being served, but Flink UI kept showing
> status CREATED for all sub-tasks and nothing seemed to
> be happening.
> 
> I found this in Job manager log first (could be unrelated) :
> 
> 2018-03-28 15:26:17,449 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph 
>       - Job UniqueIdStream
> (43ed4ace55974d3c486452a45ee5db93) switched from state
> RUNNING to FAILING.
> 
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate all requires slots within timeout of
> 300000 ms. Slots required: 20, slots allocated: 8
> at
> 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$36(ExecutionGraph.java:984)
> at
> 
> 

Re: SSL config on Kubernetes - Dynamic IP

2018-03-29 Thread Edward Alexander Rojas Clavijo
Hi all,

I did some tests based on the PR Christophe mentioned above and by making a
change on the NettyClient to use CanonicalHostName instead of
HostNameAddress to identify the server, the SSL validation works!!

I created a PR with this change: https://github.com/apache/flink/pull/5789

Regards,
Edward

2018-03-28 17:22 GMT+02:00 Edward Alexander Rojas Clavijo <
edward.roja...@gmail.com>:

> Hi Till,
>
> I just created the JIRA ticket: https://issues.apache.org/
> jira/browse/FLINK-9103
>
> I added the JobManager and TaskManager logs, Hope this helps to resolve
> the issue.
>
> Regards,
> Edward
>
> 2018-03-27 17:48 GMT+02:00 Till Rohrmann :
>
>> Hi Edward,
>>
>> could you please file a JIRA issue for this problem. It might be as
>> simple as that the TaskManager's network stack uses the IP instead of the
>> hostname as you suggested. But we have to look into this to be sure. Also
>> the logs of the JobManager as well as the TaskManagers could be helpful.
>>
>> Cheers,
>> Till
>>
>> On Tue, Mar 27, 2018 at 5:17 PM, Christophe Jolif 
>> wrote:
>>
>>>
>>> I suspect this relates to: https://issues.apache.org/
>>> jira/browse/FLINK-5030
>>>
>>> For which there was a PR at some point but nothing has been done so far.
>>> It seems the current code explicitly uses the IP vs Hostname for Netty SSL
>>> configuration.
>>>
>>> Without that I'm really wondering how people are reasonably using SSL on
>>> a Kubernetes Flink-based cluster as every time a pod is (re-started) it can
>>> theoretically take a different IP? Or do I miss something?
>>>
>>> --
>>> Christophe
>>>
>>> On Tue, Mar 27, 2018 at 3:24 PM, Edward Alexander Rojas Clavijo <
>>> edward.roja...@gmail.com> wrote:
>>>
 Hi all,

 Currently I have a Flink 1.4 cluster running on kubernetes and with SSL
 configuration based on https://ci.apache.org/proje
 cts/flink/flink-docs-master/ops/security-ssl.html.

 However, as the IP of the nodes are dynamic (from the nature of
 kubernetes), we are using only the DNS which we can control using
 kubernetes services. So we add to the Subject Alternative Name(SAN) the
 flink-jobmanager DNS and also the DNS for the task managers
 *.flink-taskmanager-svc (each task manager has a DNS in the form
 flink-taskmanager-0.flink-taskmanager-svc).

 Additionally we set the jobmanager.rpc.address property on all the
 nodes and each task manager sets the taskmanager.host property, all
 matching the ones on the certificate.

 This is working well when using Job with Parallelism set to 1. The SSL
 validations are good and the Jobmanager can communicate with Task manager
 and vice versa.

 But when we set the parallelism to more than 1 we have exceptions on
 the SSL validation like this:

 Caused by: java.security.cert.CertificateException: No subject
 alternative names matching IP address 172.30.247.163 found
 at sun.security.util.HostnameChecker.matchIP(HostnameChecker.java:168)
 at sun.security.util.HostnameChecker.match(HostnameChecker.java:94)
 at sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509Trus
 tManagerImpl.java:455)
 at sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509Trus
 tManagerImpl.java:436)
 at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509Trust
 ManagerImpl.java:252)
 at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X50
 9TrustManagerImpl.java:136)
 at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHa
 ndshaker.java:1601)
 ... 21 more


 From the logs I see the Jobmanager is correctly registering the
 taskmanagers:

 org.apache.flink.runtime.instance.InstanceManager   - Registered
 TaskManager at flink-taskmanager-1 (akka.ssl.tcp://flink@taiga-fl
 ink-taskmanager-1.flink-taskmanager-svc.default.svc.cluster.local:6122/user/taskmanager)
 as 1a3f59693cec8b3929ed8898edcc2700. Current number of registered
 hosts is 3. Current number of alive task slots is 6.

 And also each taskmanager is correctly registered to use the hostname
 for communication:

 org.apache.flink.runtime.taskmanager.TaskManager   - TaskManager will
 use hostname/address 'flink-taskmanager-1.flink-tas
 kmanager-svc.default.svc.cluster.local' (172.30.247.163) for
 communication.
 ...
 akka.remote.Remoting   - Remoting started; listening on addresses
 :[akka.ssl.tcp://flink@flink-taskmanager-1.flink-taskmanager
 -svc.default.svc.cluster.local:6122]
 ...
 org.apache.flink.runtime.io.network.netty.NettyConfig   - NettyConfig
 [server address: flink-taskmanager-1.flink-task
 manager-svc.default.svc.cluster.local/172.30.247.163, server port:
 6121, ssl enabled: true, memory segment size (bytes): 32768, transport
 type: NIO, number of server threads: 2 (manual), number of client threads:
 2 (manual), server 

Re: Plain text SSL passwords in Log file

2018-03-29 Thread Vinay Patil
I have created FLINK-9111  as
this is not handled in the latest code of GlobalConfiguration.

Regards,
Vinay Patil

On Thu, Mar 29, 2018 at 8:33 AM, Vinay Patil 
wrote:

> Hi,
>
> If this is not part of Flink 1.5 or not handled in the latest 1.4.2 release, I
> can open a JIRA. It should be a small change.
>
> What do you think ?
>
> Regards,
> Vinay Patil
>
> On Wed, Mar 28, 2018 at 4:11 PM, Vinay Patil 
> wrote:
>
>> Hi Greg,
>>
>> I am not concerned with flink-conf.yaml file, we have taken care of the
>> passwords there by replacing them with placeholders. We are picking the
>> passwords from our vault.
>>
>> The main issue is that Flink is printing these passwords in plain text in
>> the log file. It should be a simple check to not print the SSL passwords.
>>
>> Regards,
>> Vinay Patil
>>
>> On Wed, Mar 28, 2018 at 3:53 PM, Greg Hogan  wrote:
>>
>>> With the current method you always have the risk, no matter which
>>> keywords you filter on ("secret", "password", etc.), that the key name is
>>> mistyped and inadvertently logged.
>>>
>>> Perhaps we could implement something like TravisCI's encryption keys [
>>> https://docs.travis-ci.com/user/encryption-keys/] at a cost of added
>>> complexity.
>>>
>>> On Wed, Mar 28, 2018 at 4:38 PM, Vinay Patil 
>>> wrote:
>>>
 Hi,

 I see plain text SSL passwords in log file (printed by
 GlobalConfiguration) , because of which we cannot deploy our pipeline to NR
 environment.

 I am able to avoid this by setting the ERROR log level for this class, but
 the security team still thinks it is a risk.

 Is this taken care of in the new release? (I am using Flink 1.3.2)

 Regards,
 Vinay Patil

>>>
>>>
>>
>


Re: All sub-tasks stuck in CREATED state after "BlobServerConnection: GET operation failed"

2018-03-29 Thread Gary Yao
Hi Juho,

Thanks for the follow up. Regarding the BlobServerConnection error, Nico
(cc'ed)
might have an idea.

Best,
Gary

On Thu, Mar 29, 2018 at 4:08 PM, Juho Autio  wrote:

> Sorry, my bad. I checked the persisted jobmanager logs and can see that
> job was still being restarted at 15:31 and then at 15:36. If I wouldn't
> have terminated the cluster, I believe the flink job / yarn app would've
> eventually exited as failed.
>
>
> On Thu, Mar 29, 2018 at 4:49 PM, Juho Autio  wrote:
>
>> Thanks again, Gary.
>>
>> It's true that I only let the job remain in the stuck state for something
>> between 10-15 minutes. Then I shut down the cluster.
>>
>> But: if restart strategy is being applied, shouldn't I have seen those
>> messages in job manager log? In my case it kept all quiet since ~2018-03-28
>> 15:27 and I terminated it at ~28-03-2018 15:36.
>>
>> Do you happen to know about what that BlobServerConnection error means in
>> the code? If it may lead into some unrecoverable state (where neither
>> restart is attempted, nor job is failed for good)..
>>
>> On Thu, Mar 29, 2018 at 4:39 PM, Gary Yao  wrote:
>>
>>> Hi Juho,
>>>
>>> The log message
>>>
>>>   Could not allocate all requires slots within timeout of 300000 ms.
>>> Slots required: 20, slots allocated: 8
>>>
>>> indicates that you do not have enough resources in your cluster left.
>>> Can you
>>> verify that after you started the job submission the YARN cluster does
>>> not reach
>>> its maximum capacity? You can also try submitting the job with a lower
>>> parallelism.
>>>
>>> I think the reason why the YARN application is not immediately shown as
>>> failed
>>> is that your restart strategy attempts to start the job 3 times. On every
>>> attempt the job is blocked on the slot allocation timeout for at least
>>> 300000 ms
>>> (5 minutes). I have tried submitting examples/streaming/WordCount.jar
>>> with the
>>> same restart strategy on EMR, and the CLI only returns after around 20
>>> minutes.
>>>
>>> As a side note, beginning from Flink 1.5, you do not need to specify -yn
>>> -ys
>>> because resource allocations are dynamic by default (FLIP-6). The
>>> parameter -yst
>>> is deprecated and should not be needed either.
>>>
>>> Best,
>>> Gary
>>>
>>> On Thu, Mar 29, 2018 at 8:59 AM, Juho Autio 
>>> wrote:
>>>
 I built a new Flink distribution from release-1.5 branch yesterday.

 The first time I tried to run a job with it ended up in some stalled
 state so that the job didn't manage to (re)start but what makes it worse is
 that it didn't exit as failed either.

 Next time I tried running the same job (but new EMR cluster & all from
 scratch) it just worked normally.

 On the problematic run, The YARN job was started and Flink UI was being
 served, but Flink UI kept showing status CREATED for all sub-tasks and
 nothing seemed to be happening.

 I found this in Job manager log first (could be unrelated) :

 2018-03-28 15:26:17,449 INFO  
 org.apache.flink.runtime.executiongraph.ExecutionGraph
   - Job UniqueIdStream (43ed4ace55974d3c486452a45ee5db93) switched
 from state RUNNING to FAILING.
 org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
 Could not allocate all requires slots within timeout of 300000 ms. Slots
 required: 20, slots allocated: 8
 at org.apache.flink.runtime.executiongraph.ExecutionGraph.lambd
 a$scheduleEager$36(ExecutionGraph.java:984)
 at java.util.concurrent.CompletableFuture.uniExceptionally(Comp
 letableFuture.java:870)
 at java.util.concurrent.CompletableFuture$UniExceptionally.tryF
 ire(CompletableFuture.java:852)
 at java.util.concurrent.CompletableFuture.postComplete(Completa
 bleFuture.java:474)
 at java.util.concurrent.CompletableFuture.completeExceptionally
 (CompletableFuture.java:1977)
 at org.apache.flink.runtime.concurrent.FutureUtils$ResultConjun
 ctFuture.handleCompletedFuture(FutureUtils.java:551)
 at java.util.concurrent.CompletableFuture.uniWhenComplete(Compl
 etableFuture.java:760)
 at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFi
 re(CompletableFuture.java:736)
 at java.util.concurrent.CompletableFuture.postComplete(Completa
 bleFuture.java:474)
 at java.util.concurrent.CompletableFuture.completeExceptionally
 (CompletableFuture.java:1977)
 at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete
 (FutureUtils.java:789)
 at akka.dispatch.OnComplete.internal(Future.scala:258)
 at akka.dispatch.OnComplete.internal(Future.scala:256)
 at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
 at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
 at 

Re: All sub-tasks stuck in CREATED state after "BlobServerConnection: GET operation failed"

2018-03-29 Thread Juho Autio
Sorry, my bad. I checked the persisted jobmanager logs and can see that job
was still being restarted at 15:31 and then at 15:36. If I wouldn't have
terminated the cluster, I believe the flink job / yarn app would've
eventually exited as failed.

On Thu, Mar 29, 2018 at 4:49 PM, Juho Autio  wrote:

> Thanks again, Gary.
>
> It's true that I only let the job remain in the stuck state for something
> between 10-15 minutes. Then I shut down the cluster.
>
> But: if restart strategy is being applied, shouldn't I have seen those
> messages in job manager log? In my case it kept all quiet since ~2018-03-28
> 15:27 and I terminated it at ~28-03-2018 15:36.
>
> Do you happen to know about what that BlobServerConnection error means in
> the code? If it may lead into some unrecoverable state (where neither
> restart is attempted, nor job is failed for good)..
>
> On Thu, Mar 29, 2018 at 4:39 PM, Gary Yao  wrote:
>
>> Hi Juho,
>>
>> The log message
>>
>>   Could not allocate all requires slots within timeout of 300000 ms.
>> Slots required: 20, slots allocated: 8
>>
>> indicates that you do not have enough resources in your cluster left. Can
>> you
>> verify that after you started the job submission the YARN cluster does
>> not reach
>> its maximum capacity? You can also try submitting the job with a lower
>> parallelism.
>>
>> I think the reason why the YARN application is not immediately shown as
>> failed
>> is that your restart strategy attempts to start the job 3 times. On every
>> attempt the job is blocked on the slot allocation timeout for at least
>> 300000 ms
>> (5 minutes). I have tried submitting examples/streaming/WordCount.jar
>> with the
>> same restart strategy on EMR, and the CLI only returns after around 20
>> minutes.
>>
>> As a side note, beginning from Flink 1.5, you do not need to specify -yn
>> -ys
>> because resource allocations are dynamic by default (FLIP-6). The
>> parameter -yst
>> is deprecated and should not be needed either.
>>
>> Best,
>> Gary
>>
>> On Thu, Mar 29, 2018 at 8:59 AM, Juho Autio  wrote:
>>
>>> I built a new Flink distribution from release-1.5 branch yesterday.
>>>
>>> The first time I tried to run a job with it ended up in some stalled
>>> state so that the job didn't manage to (re)start but what makes it worse is
>>> that it didn't exit as failed either.
>>>
>>> Next time I tried running the same job (but new EMR cluster & all from
>>> scratch) it just worked normally.
>>>
>>> On the problematic run, The YARN job was started and Flink UI was being
>>> served, but Flink UI kept showing status CREATED for all sub-tasks and
>>> nothing seemed to be happening.
>>>
>>> I found this in Job manager log first (could be unrelated) :
>>>
>>> 2018-03-28 15:26:17,449 INFO  
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>   - Job UniqueIdStream (43ed4ace55974d3c486452a45ee5db93) switched
>>> from state RUNNING to FAILING.
>>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>>> Could not allocate all requires slots within timeout of 300000 ms. Slots
>>> required: 20, slots allocated: 8
>>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.lambd
>>> a$scheduleEager$36(ExecutionGraph.java:984)
>>> at java.util.concurrent.CompletableFuture.uniExceptionally(Comp
>>> letableFuture.java:870)
>>> at java.util.concurrent.CompletableFuture$UniExceptionally.tryF
>>> ire(CompletableFuture.java:852)
>>> at java.util.concurrent.CompletableFuture.postComplete(Completa
>>> bleFuture.java:474)
>>> at java.util.concurrent.CompletableFuture.completeExceptionally
>>> (CompletableFuture.java:1977)
>>> at org.apache.flink.runtime.concurrent.FutureUtils$ResultConjun
>>> ctFuture.handleCompletedFuture(FutureUtils.java:551)
>>> at java.util.concurrent.CompletableFuture.uniWhenComplete(Compl
>>> etableFuture.java:760)
>>> at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFi
>>> re(CompletableFuture.java:736)
>>> at java.util.concurrent.CompletableFuture.postComplete(Completa
>>> bleFuture.java:474)
>>> at java.util.concurrent.CompletableFuture.completeExceptionally
>>> (CompletableFuture.java:1977)
>>> at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete
>>> (FutureUtils.java:789)
>>> at akka.dispatch.OnComplete.internal(Future.scala:258)
>>> at akka.dispatch.OnComplete.internal(Future.scala:256)
>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>> at org.apache.flink.runtime.concurrent.Executors$DirectExecutio
>>> nContext.execute(Executors.java:83)
>>> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Prom
>>> ise.scala:44)
>>> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Pro
>>> mise.scala:252)
>>> at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupp
>>> ort.scala:603)
>>> at 

Re: All sub-tasks stuck in CREATED state after "BlobServerConnection: GET operation failed"

2018-03-29 Thread Juho Autio
Thanks again, Gary.

It's true that I only let the job remain in the stuck state for something
between 10-15 minutes. Then I shut down the cluster.

But: if restart strategy is being applied, shouldn't I have seen those
messages in job manager log? In my case it kept all quiet since ~2018-03-28
15:27 and I terminated it at ~28-03-2018 15:36.

Do you happen to know about what that BlobServerConnection error means in
the code? If it may lead into some unrecoverable state (where neither
restart is attempted, nor job is failed for good)..

On Thu, Mar 29, 2018 at 4:39 PM, Gary Yao  wrote:

> Hi Juho,
>
> The log message
>
>   Could not allocate all requires slots within timeout of 300000 ms. Slots
> required: 20, slots allocated: 8
>
> indicates that you do not have enough resources in your cluster left. Can
> you
> verify that after you started the job submission the YARN cluster does not
> reach
> its maximum capacity? You can also try submitting the job with a lower
> parallelism.
>
> I think the reason why the YARN application is not immediately shown as
> failed
> is that your restart strategy attempts to start the job 3 times. On every
> attempt the job is blocked on the slot allocation timeout for at least
> 300000 ms
> (5 minutes). I have tried submitting examples/streaming/WordCount.jar
> with the
> same restart strategy on EMR, and the CLI only returns after around 20
> minutes.
>
> As a side note, beginning from Flink 1.5, you do not need to specify -yn
> -ys
> because resource allocations are dynamic by default (FLIP-6). The
> parameter -yst
> is deprecated and should not be needed either.
>
> Best,
> Gary
>
> On Thu, Mar 29, 2018 at 8:59 AM, Juho Autio  wrote:
>
>> I built a new Flink distribution from release-1.5 branch yesterday.
>>
>> The first time I tried to run a job with it ended up in some stalled
>> state so that the job didn't manage to (re)start but what makes it worse is
>> that it didn't exit as failed either.
>>
>> Next time I tried running the same job (but new EMR cluster & all from
>> scratch) it just worked normally.
>>
>> On the problematic run, The YARN job was started and Flink UI was being
>> served, but Flink UI kept showing status CREATED for all sub-tasks and
>> nothing seemed to be happening.
>>
>> I found this in Job manager log first (could be unrelated) :
>>
>> 2018-03-28 15:26:17,449 INFO  
>> org.apache.flink.runtime.executiongraph.ExecutionGraph
>>   - Job UniqueIdStream (43ed4ace55974d3c486452a45ee5db93) switched
>> from state RUNNING to FAILING.
>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>> Could not allocate all requires slots within timeout of 300000 ms. Slots
>> required: 20, slots allocated: 8
>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.lambd
>> a$scheduleEager$36(ExecutionGraph.java:984)
>> at java.util.concurrent.CompletableFuture.uniExceptionally(Comp
>> letableFuture.java:870)
>> at java.util.concurrent.CompletableFuture$UniExceptionally.
>> tryFire(CompletableFuture.java:852)
>> at java.util.concurrent.CompletableFuture.postComplete(Completa
>> bleFuture.java:474)
>> at java.util.concurrent.CompletableFuture.completeExceptionally
>> (CompletableFuture.java:1977)
>> at org.apache.flink.runtime.concurrent.FutureUtils$ResultConjun
>> ctFuture.handleCompletedFuture(FutureUtils.java:551)
>> at java.util.concurrent.CompletableFuture.uniWhenComplete(Compl
>> etableFuture.java:760)
>> at java.util.concurrent.CompletableFuture$UniWhenComplete.
>> tryFire(CompletableFuture.java:736)
>> at java.util.concurrent.CompletableFuture.postComplete(Completa
>> bleFuture.java:474)
>> at java.util.concurrent.CompletableFuture.completeExceptionally
>> (CompletableFuture.java:1977)
>> at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete
>> (FutureUtils.java:789)
>> at akka.dispatch.OnComplete.internal(Future.scala:258)
>> at akka.dispatch.OnComplete.internal(Future.scala:256)
>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>> at org.apache.flink.runtime.concurrent.Executors$DirectExecutio
>> nContext.execute(Executors.java:83)
>> at scala.concurrent.impl.CallbackRunnable.executeWithValue(
>> Promise.scala:44)
>> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Pro
>> mise.scala:252)
>> at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupp
>> ort.scala:603)
>> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
>> at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedE
>> xecute(Future.scala:601)
>> at scala.concurrent.BatchingExecutor$class.execute(
>> BatchingExecutor.scala:109)
>> at scala.concurrent.Future$InternalCallbackExecutor$.execute(
>> Future.scala:599)
>> at akka.actor.LightArrayRevolverScheduler$TaskHolder.
>> executeTask(LightArrayRevolverScheduler.scala:329)
>> at 

Re: All sub-tasks stuck in CREATED state after "BlobServerConnection: GET operation failed"

2018-03-29 Thread Gary Yao
Hi Juho,

The log message

  Could not allocate all requires slots within timeout of 300000 ms. Slots
required: 20, slots allocated: 8

indicates that you do not have enough resources in your cluster left. Can
you
verify that after you started the job submission the YARN cluster does not
reach
its maximum capacity? You can also try submitting the job with a lower
parallelism.

I think the reason why the YARN application is not immediately shown as
failed
is that your restart strategy attempts to start the job 3 times. On every
attempt the job is blocked on the slot allocation timeout for at least
300000 ms
(5 minutes). I have tried submitting examples/streaming/WordCount.jar with
the
same restart strategy on EMR, and the CLI only returns after around 20
minutes.

As a side note, beginning from Flink 1.5, you do not need to specify -yn -ys
because resource allocations are dynamic by default (FLIP-6). The parameter
-yst
is deprecated and should not be needed either.

Best,
Gary

On Thu, Mar 29, 2018 at 8:59 AM, Juho Autio  wrote:

> I built a new Flink distribution from release-1.5 branch yesterday.
>
> The first time I tried to run a job with it ended up in some stalled state
> so that the job didn't manage to (re)start but what makes it worse is that
> it didn't exit as failed either.
>
> Next time I tried running the same job (but new EMR cluster & all from
> scratch) it just worked normally.
>
> On the problematic run, The YARN job was started and Flink UI was being
> served, but Flink UI kept showing status CREATED for all sub-tasks and
> nothing seemed to be happening.
>
> I found this in Job manager log first (could be unrelated) :
>
> 2018-03-28 15:26:17,449 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph
>   - Job UniqueIdStream (43ed4ace55974d3c486452a45ee5db93) switched
> from state RUNNING to FAILING.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate all requires slots within timeout of 300000 ms. Slots
> required: 20, slots allocated: 8
> at org.apache.flink.runtime.executiongraph.ExecutionGraph.
> lambda$scheduleEager$36(ExecutionGraph.java:984)
> at java.util.concurrent.CompletableFuture.uniExceptionally(
> CompletableFuture.java:870)
> at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(
> CompletableFuture.java:852)
> at java.util.concurrent.CompletableFuture.postComplete(
> CompletableFuture.java:474)
> at java.util.concurrent.CompletableFuture.completeExceptionally(
> CompletableFuture.java:1977)
> at org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture.
> handleCompletedFuture(FutureUtils.java:551)
> at java.util.concurrent.CompletableFuture.uniWhenComplete(
> CompletableFuture.java:760)
> at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(
> CompletableFuture.java:736)
> at java.util.concurrent.CompletableFuture.postComplete(
> CompletableFuture.java:474)
> at java.util.concurrent.CompletableFuture.completeExceptionally(
> CompletableFuture.java:1977)
> at org.apache.flink.runtime.concurrent.FutureUtils$1.
> onComplete(FutureUtils.java:789)
> at akka.dispatch.OnComplete.internal(Future.scala:258)
> at akka.dispatch.OnComplete.internal(Future.scala:256)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.
> execute(Executors.java:83)
> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.
> scala:44)
> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(
> Promise.scala:252)
> at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(
> AskSupport.scala:603)
> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
> at scala.concurrent.Future$InternalCallbackExecutor$.
> unbatchedExecute(Future.scala:601)
> at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.
> scala:109)
> at scala.concurrent.Future$InternalCallbackExecutor$.
> execute(Future.scala:599)
> at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(
> LightArrayRevolverScheduler.scala:329)
> at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(
> LightArrayRevolverScheduler.scala:280)
> at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(
> LightArrayRevolverScheduler.scala:284)
> at akka.actor.LightArrayRevolverScheduler$$anon$4.run(
> LightArrayRevolverScheduler.scala:236)
> at java.lang.Thread.run(Thread.java:748)
>
>
> After this there was:
>
> 2018-03-28 15:26:17,521 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph
>   - Restarting the job UniqueIdStream (43ed4ace55974d3c486452a45ee5db
> 93).
>
>
> And some time after that:
>
> 2018-03-28 15:27:39,125 ERROR 
> org.apache.flink.runtime.blob.BlobServerConnection
>   - GET operation failed
> java.io.EOFException: Premature end of GET 

Re: Plain text SSL passwords in Log file

2018-03-29 Thread Vinay Patil
Hi,

If this is not part of Flink 1.5 or not handled in the latest 1.4.2 release, I
can open a JIRA. It should be a small change.

What do you think ?

Regards,
Vinay Patil

On Wed, Mar 28, 2018 at 4:11 PM, Vinay Patil 
wrote:

> Hi Greg,
>
> I am not concerned with flink-conf.yaml file, we have taken care of the
> passwords there by replacing them with placeholders. We are picking the
> passwords from our vault.
>
> The main issue is that Flink is printing these passwords in plain text in
> the log file. It should be a simple check to not print the SSL passwords.
>
> Regards,
> Vinay Patil
>
> On Wed, Mar 28, 2018 at 3:53 PM, Greg Hogan  wrote:
>
>> With the current method you always have the risk, no matter which
>> keywords you filter on ("secret", "password", etc.), that the key name is
>> mistyped and inadvertently logged.
>>
>> Perhaps we could implement something like TravisCI's encryption keys [
>> https://docs.travis-ci.com/user/encryption-keys/] at a cost of added
>> complexity.
>>
>> On Wed, Mar 28, 2018 at 4:38 PM, Vinay Patil 
>> wrote:
>>
>>> Hi,
>>>
>>> I see plain text SSL passwords in log file (printed by
>>> GlobalConfiguration) , because of which we cannot deploy our pipeline to NR
>>> environment.
>>>
>>> I am able to avoid this by setting the ERROR log level for this class, but the
>>> security team still thinks it is a risk.
>>>
>>> Is this taken care of in the new release? (I am using Flink 1.3.2)
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>
>>
>


Subject: Last chance to register for Flink Forward SF (April 10). Get 25% discount

2018-03-29 Thread Stephan Ewen
Hi all!

There are still some spots left to attend Flink Forward San Francisco, so
sign up soon before registration closes.

Use this promo code to get 25% off: MailingListFFSF

The 1-day conference takes place on Tuesday, April 10 in downtown SF. We
have a great lineup of speakers from companies working with Flink,
including Alibaba, American Express, Capital One, Comcast, eBay, Google,
Lyft, MediaMath, Netflix, Uber, Walmart Labs, and many others. See the full
program of sessions and speakers:
https://sf-2018.flink-forward.org/conference/

Also, on Monday, April 9 we'll be holding training sessions for Flink
(Standard and Advanced classes) - it's a good opportunity to learn from some
Apache Flink experts.

Hope to see you there!

Best,
Stephan


Re: Unable to load AWS credentials: Flink 1.2.1 + S3 + Kubernetes

2018-03-29 Thread Stephan Ewen
Using AWS credentials with Kubernetes is not trivial. Have you looked at
the AWS / Kubernetes docs and projects like https://github.com/jtblin/kube2iam
which bridge between containers and AWS credentials?

Also, Flink 1.2.1 is quite old, you may want to try a newer version. 1.4.x
has a bit of an overhaul of the filesystems.



On Wed, Mar 28, 2018 at 9:41 AM, dyana.rose 
wrote:

> Hiya,
>
> This sounds like it may be similar to the issue I had when running on ECS.
> Take a look at my ticket for how I got around this, and see if it's any
> help: https://issues.apache.org/jira/browse/FLINK-8439
>
> Dyana
>
> On 2018/03/28 02:15:06, "Bajaj, Abhinav"  wrote:
> > Hi,
> >
> > I am trying to use Flink 1.2.1 with RockDB as statebackend and S3 for
> checkpoints.
> > I am using Flink 1.2.1 docker images and running them in Kubernetes
> cluster.
> >
> > I have followed the steps documented in the Flink documentation -
> > https://ci.apache.org/projects/flink/flink-docs-
> release-1.2/setup/aws.html#s3-simple-storage-service
> >
> > I am using AWS IAM roles to setup access for S3.
> > The role has actions "s3:GetObject","s3:ListBucket", "s3:PutObject",
> "s3:DeleteObject" on the bucket.
> >
> > When I run a job, the jobmanager logs below exception –
> >
> > java.io.IOException: The given file URI (s3://$MY_TEST_BUCKET/checkpoints)
> points to the HDFS NameNode at $MY_TEST_BUCKET, but the File System could
> not be initialized with that address: Unable to load AWS credentials from
> any provider in the chain
> >   at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.
> initialize(HadoopFileSystem.java:334)
> >   at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(
> FileSystem.java:265)
> >   at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:304)
> >   at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
> >   at org.apache.flink.runtime.state.filesystem.
> FsCheckpointStreamFactory.(FsCheckpointStreamFactory.java:105)
> >   at org.apache.flink.runtime.state.filesystem.FsStateBackend.
> createStreamFactory(FsStateBackend.java:172)
> >   at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.
> createStreamFactory(RocksDBStateBackend.java:219)
> >   at org.apache.flink.streaming.runtime.tasks.StreamTask.
> createCheckpointStreamFactory(StreamTask.java:803)
> >   at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator.initializeState(AbstractStreamOperator.java:220)
> >   at org.apache.flink.streaming.runtime.tasks.StreamTask.
> initializeOperators(StreamTask.java:655)
> >   at org.apache.flink.streaming.runtime.tasks.StreamTask.
> initializeState(StreamTask.java:643)
> >   at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:246)
> >   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
> >   at java.lang.Thread.run(Thread.java:748)
> > Caused by: com.amazonaws.AmazonClientException: Unable to load AWS
> credentials from any provider in the chain
> >   at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(
> AWSCredentialsProviderChain.java:117)
> >   at com.amazonaws.services.s3.AmazonS3Client.invoke(
> AmazonS3Client.java:3521)
> >   at com.amazonaws.services.s3.AmazonS3Client.headBucket(
> AmazonS3Client.java:1031)
> >   at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(
> AmazonS3Client.java:994)
> >   at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(
> S3AFileSystem.java:297)
> >   at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.
> initialize(HadoopFileSystem.java:320)
> >   ... 13 more
> >
> > I checked if the jobmanager pod in the K8s cluster has the correct IAM
> role applied.
> > “curl http://169.254.169.254/latest/meta-data/iam/security-credentials/”
> returned the correct role.
> >
> > After this, I installed aws cli on the jobmanager pod and could
> download/upload to $MY_TEST_BUCKET.
> > This confirmed that the jobmanager pod has the correct IAM role
> associated with it.
> >
> > So, I am not sure why the AWS library in Flink is not able to load the
> credentials.
> > Any thoughts or suggestions to fix or troubleshoot?
> >
> > Appreciate the help.
> >
> > Regards,
> > Abhinav Bajaj
> >
> >
> >
> > Abhinav Bajaj
> > Lead Engineer
> > Open Location Platform
> > Mobile: +1 708 329 9516
> >
> > HERE Seattle
> > 701 Pike Street, suite 2000
> > Seattle, WA 98101 USA
> > 47° 36' 41" N 122° 19' 57" W
> >
> >
> >
> >
>


Re: Incremental checkpointing performance

2018-03-29 Thread Stephan Ewen
I think what happens is the following:

  - For full checkpoints, Flink iterates asynchronously over the data. That
means the whole checkpoint is a compact asynchronous operation.

  - For incremental checkpoints, RocksDB has to flush the write buffer and
create a new SSTable. That flush is synchronous, but should be very brief.
Then there is an asynchronous materialization of the SSTables that are
different from the previous checkpoint.

Because of that, you see that
  - Full checkpoints have a shorter synchronous duration than incremental
checkpoints
  - For small state, full checkpoints may actually be faster end-to-end
  - For large state, the asynchronous part of incremental checkpoints
should be faster, and with that, the end-to-end duration as well

Stephan
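For completeness (a minimal sketch, not from the reply above), incremental checkpointing is enabled through the RocksDB state backend constructor; the checkpoint URI is a placeholder and the flink-statebackend-rocksdb dependency is required:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalCheckpointExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every 10 seconds.
        env.enableCheckpointing(10_000);

        // The second constructor argument enables incremental checkpoints;
        // the checkpoint URI is a placeholder.
        env.setStateBackend(
                new RocksDBStateBackend("file:///tmp/flink-checkpoints", true));

        env.fromElements(1, 2, 3).print();
        env.execute("Incremental checkpointing example");
    }
}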

On Fri, Mar 23, 2018 at 5:25 PM, Nico Kruber  wrote:

> Hi Miyuru,
> regarding "state.backend", I was looking at version 1.5 docs and some
> things changed compared to 1.3. The "Asynchronous RocksDB snapshot ..."
> messages only occur with full snapshots, i.e. non-incremental, and I
> verified this for your program as well.
>
> There are some issues with your project though:
> 1) your Flink dependencies should all have the same version
> 2) your source does not acquire the checkpoint lock before emitting
> events (see the docs around the SourceFunction you are implementing)
>
>
> Regarding the checkpoint sizes: you can rely on the web interface
> reporting correct metrics. However, the "average" values may not be too
> useful for you since you are using a sliding count window and thus
> during ramp-up (until you get your 1 windows of the slide size) you
> will have smaller states than after that. Since you only have 2 keys,
> you will eventually have 2 window states to store and from then on
> stay with this number. So rather look at the "History" column of the web
> interface or into the JobManager log.
>
>
> Regarding the original issue: I was recently made aware of another thing
> which may influence the speed of an incremental snapshot: if done
> incrementally, we need to close and flush RocksDB's sst file so that it
> continues with a new file and we can hardlink and copy a consistent
> snapshot. For full snapshots, we simple iterate over all items to copy.
> Now this close-and-flush may be more costly (hence the higher duration)
> and since this cannot be done asynchronously (as a full snapshot) we
> also may not process as many records.
> -> Therefore, you probably did not run your program long enough to
> create the full set of windows and I'm guessing, you will eventually get
> to the same checkpoint sizes.
>
>
> TL;DR: incremental snapshots are only worth it (and are designed for...) if
> you have a lot of operator state (not just a few MB!) while only a few
> parts are actually changing between checkpoints. In these scenarios, the
> added latency for transferring such a snapshot to the checkpoint store
> over network would cover the additional cost during snapshot creation.
>
>
> Nico
>
>
> On 21/03/18 06:01, Miyuru Dayarathna wrote:
> > Hi,
> >
> > Since we could not observe log messages such as "Asynchronous RocksDB
> > snapshot" in the Flink's log files, we ran the application with Flink
> > 1.3.3 as well. But it also did not print the log message. Hence I am
> > wondering whether we ran Flink's incremental checkpointing in the
> > correct manner. I have attached the complete application with this
> > email. Could you please run this in your setup and let me know whether
> > you get the incremental checkpoint related logs printed in your Flink
> setup?
> >
> > Thanks,
> > Miyuru
> >
> >
> >
> >
> > On Monday, 19 March 2018, 23:33:37 GMT+5:30, Miyuru Dayarathna
> >  wrote:
> >
> >
> > Hi Nico,
> >
> > Thanks for the detailed explanation. The only change I have made in my
> > flink-conf.yaml file is the following.
> >
> > state.backend.fs.checkpointdir: file:///home/ubuntu/tmp-flink-rocksdb
> >
> > The default "state.backend" value is set to filesystem. Removing the
> > env.setStateBackend() method code or changing the "state.backend"
> > property to rocksdb does not change the state backend to RocksDB. I got
> > this verified by looking at the Flink log files. I have mentioned a
> > sample of the log file for your reference.
> >
> > ---
> > carbon-5th:38631/user/taskmanager) as 1ac63dfb481eab3d3165a965084115f3.
> > Current number of registered hosts is 1. Current number of alive task
> > slots is 1.
> > 2018-03-19 23:10:11,606 INFO
> > org.apache.flink.runtime.client.JobClient - Checking
> > and uploading JAR files
> > 2018-03-19 23:10:11,618 INFO
> > org.apache.flink.runtime.jobmanager.JobManager-
> > Submitting job 7c19a14f4e75149ffaa064fac7e2bf29 (Flink application.).
> > 2018-03-19 23:10:11,623 INFO
> > org.apache.flink.runtime.jobmanager.JobManager- Using
> > restart 

Re: cancel-with-savepoint: 404 Not Found

2018-03-29 Thread Gary Yao
Hi Juho,

Sorry, I should have included an example. To cancel the job:

  curl -XPOST host:port/jobs/:jobid/savepoints -d '{"cancel-job": true}'

Let me know if it works for you.
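
If you then want to see when the savepoint (and the cancellation) has
completed, you can poll with the trigger id returned by the POST, along the
lines of

  curl host:port/jobs/:jobid/savepoints/:savepointtriggerid

(:savepointtriggerid is a placeholder here; take the actual id from the POST
response body.)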

Best,
Gary

On Thu, Mar 29, 2018 at 10:39 AM, Juho Autio  wrote:

> Thanks Gary. And what if I want to match the old behaviour ie. have the
> job cancelled after savepoint has been created? Maybe I saw some optional
> field for that purpose, that could be put into JSON payload of POST.. But
> this documentation doesn't cover it:
> https://github.com/apache/flink/blob/release-1.5/flink-runti
> me/src/main/java/org/apache/flink/runtime/rest/handler/
> job/savepoints/SavepointHandlers.java#L59
>
>
> On Thu, Mar 29, 2018 at 11:25 AM, Gary Yao  wrote:
>
>> Hi Juho,
>>
>> Thank you for testing the Apache Flink 1.5 release.
>>
>> For FLIP-6 [1], the "cancel with savepoint" API was reworked.
>> Unfortunately the
>> FLIP-6 REST API documentation still needs to be re-generated [2][3].
>> Under the
>> new API, you first issue a POST request against /jobs/:jobid/savepoints,
>> and
>> then poll /jobs/:jobid/savepoints/:savepointtriggerid with HTTP GET. See
>> [4] for
>> more details.
>>
>> Best,
>> Gary
>>
>> [1] https://cwiki.apache.org/confluence/pages/viewpage.action?
>> pageId=65147077
>> [2] https://ci.apache.org/projects/flink/flink-docs-master/
>> monitoring/rest_api.html#flip-6
>> [3] https://issues.apache.org/jira/browse/FLINK-9104
>> [4] https://github.com/apache/flink/blob/release-1.5/flink-runti
>> me/src/main/java/org/apache/flink/runtime/rest/handler/
>> job/savepoints/SavepointHandlers.java#L59
>>
>> On Thu, Mar 29, 2018 at 10:04 AM, Juho Autio 
>> wrote:
>>
>>> With a fresh build from release-1.5 branch, calling
>>> /cancel-with-savepoint fails with 404 Not Found.
>>>
>>> The snapshot docs still mention /cancel-with-savepoint:
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-master/monit
>>> oring/rest_api.html#cancel-job-with-savepoint
>>>
>>> 1. How can I achieve the same result as with GET /cancel-with-savepoint
>>> before?
>>> 2. Are the docs going to be updated?
>>>
>>> It seems like it might be related to this:
>>> https://issues.apache.org/jira/browse/FLINK-8317
>>>
>>> Thanks..
>>>
>>
>>
>


Re: cancel-with-savepoint: 404 Not Found

2018-03-29 Thread Juho Autio
Thanks Gary. And what if I want to match the old behaviour, i.e. have the job
cancelled after the savepoint has been created? Maybe I saw some optional field
for that purpose that could be put into the JSON payload of the POST, but this
documentation doesn't cover it:
https://github.com/apache/flink/blob/release-1.5/flink-
runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/
SavepointHandlers.java#L59

On Thu, Mar 29, 2018 at 11:25 AM, Gary Yao  wrote:

> Hi Juho,
>
> Thank you for testing the Apache Flink 1.5 release.
>
> For FLIP-6 [1], the "cancel with savepoint" API was reworked.
> Unfortunately the
> FLIP-6 REST API documentation still needs to be re-generated [2][3]. Under
> the
> new API, you first issue a POST request against /jobs/:jobid/savepoints,
> and
> then poll /jobs/:jobid/savepoints/:savepointtriggerid with HTTP GET. See
> [4] for
> more details.
>
> Best,
> Gary
>
> [1] https://cwiki.apache.org/confluence/pages/viewpage.
> action?pageId=65147077
> [2] https://ci.apache.org/projects/flink/flink-docs-
> master/monitoring/rest_api.html#flip-6
> [3] https://issues.apache.org/jira/browse/FLINK-9104
> [4] https://github.com/apache/flink/blob/release-1.5/flink-
> runtime/src/main/java/org/apache/flink/runtime/rest/
> handler/job/savepoints/SavepointHandlers.java#L59
>
> On Thu, Mar 29, 2018 at 10:04 AM, Juho Autio  wrote:
>
>> With a fresh build from release-1.5 branch, calling
>> /cancel-with-savepoint fails with 404 Not Found.
>>
>> The snapshot docs still mention /cancel-with-savepoint:
>>
>> https://ci.apache.org/projects/flink/flink-docs-master/
>> monitoring/rest_api.html#cancel-job-with-savepoint
>>
>> 1. How can I achieve the same result as with GET /cancel-with-savepoint
>> before?
>> 2. Are the docs going to be updated?
>>
>> It seems like it might be related to this:
>> https://issues.apache.org/jira/browse/FLINK-8317
>>
>> Thanks..
>>
>
>


Re: cancel-with-savepoint: 404 Not Found

2018-03-29 Thread Gary Yao
Hi Juho,

Thank you for testing the Apache Flink 1.5 release.

For FLIP-6 [1], the "cancel with savepoint" API was reworked. Unfortunately the
FLIP-6 REST API documentation still needs to be re-generated [2][3]. Under the
new API, you first issue a POST request against /jobs/:jobid/savepoints, and
then poll /jobs/:jobid/savepoints/:savepointtriggerid with HTTP GET. See [4]
for more details.
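
To illustrate, the two calls roughly look like this (host, port, ids and the
target directory are placeholders; the request body field names are from the
1.5 branch and worth double-checking against [4]):

  # trigger a savepoint, optionally cancelling the job once it completes
  curl -XPOST http://host:port/jobs/:jobid/savepoints \
    -d '{"target-directory": "hdfs:///savepoints", "cancel-job": false}'

  # poll the status using the trigger id returned by the POST
  curl http://host:port/jobs/:jobid/savepoints/:savepointtriggerid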

Best,
Gary

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
[2] https://ci.apache.org/projects/flink/flink-docs-master/monitoring/rest_api.html#flip-6
[3] https://issues.apache.org/jira/browse/FLINK-9104
[4] https://github.com/apache/flink/blob/release-1.5/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java#L59

On Thu, Mar 29, 2018 at 10:04 AM, Juho Autio  wrote:

> With a fresh build from release-1.5 branch, calling /cancel-with-savepoint
> fails with 404 Not Found.
>
> The snapshot docs still mention /cancel-with-savepoint:
>
> https://ci.apache.org/projects/flink/flink-docs-
> master/monitoring/rest_api.html#cancel-job-with-savepoint
>
> 1. How can I achieve the same result as with GET /cancel-with-savepoint
> before?
> 2. Are the docs going to be updated?
>
> It seems like it might be related to this:
> https://issues.apache.org/jira/browse/FLINK-8317
>
> Thanks..
>


Re: Out off memory when catching up

2018-03-29 Thread Lasse Nedergaard
Hi

For sure I can share more info. We run on Flink 1.4.2 (but have the same
problems on 1.3.2) on an AWS EMR cluster, with 6 taskmanagers on each m4.xlarge
slave and the taskmanager heap set to 1850. We use the RocksDB state backend.
We have set akka.ask.timeout to 60 s in case GC should prevent the heartbeat,
and yarn.maximum-failed-containers to 1 to have some buffer before we lose
our yarn session.
One of our jobs reads data from Kinesis as a JSON string and maps it into an
object. Then we do some enrichment over a CoProcessFunction. If we can't
find the data in the coprocess stream, we make a lookup through an
AsyncDataStream. Then we merge the 2 streams so that we now have one stream
where enrichment has taken place. We then parse the binary data and create
new objects, outputting one main stream and 4 side-output streams. There
should be a 1-to-1 correspondence in the number of objects in this map function.
For some of the side-output streams we do additional enrichment before all 5
streams are stored in Kinesis.
I have now implemented a max number of records read from Kinesis, and by
doing that I can avoid losing my task manager, but now I can't catch up as
fast as I would like. I have only seen back pressure once, and that was for
another job that uses iteration and never returned from that state.
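
For reference, a rough sketch of this kind of throttling via the Kinesis
consumer properties (the stream name, region and values are placeholders, and
I'm assuming the ConsumerConfigConstants keys shipped with
flink-connector-kinesis):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class ThrottledKinesisRead {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty(ConsumerConfigConstants.AWS_REGION, "eu-west-1"); // placeholder region
        // Fetch at most 1000 records per GetRecords call per shard ...
        props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "1000");
        // ... and wait at least 1 s between calls, which bounds the catch-up rate.
        props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "1000");

        DataStream<String> raw = env.addSource(
                new FlinkKinesisConsumer<>("my-stream", new SimpleStringSchema(), props));

        raw.print();
        env.execute("Throttled Kinesis read");
    }
}

The downside is of course a lower ceiling during normal operation as well, so
the values have to be tuned against the expected peak rate.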

So yes, we create objects. I guess we create around 10-20 objects for each
input object, and I would like to understand what is going on, so I can make
an implementation that takes care of it.
But is there a way to configure Flink so it will spill to disk instead of
OOMing? I would prefer a slow system over a dead system.

Please let me know if you need additional information or if anything doesn't
make sense.

Lasse Nedergaard


2018-03-26 12:29 GMT+02:00 Timo Walther :

> Hi Lasse,
>
> in order to avoid OOM exception you should analyze your Flink job
> implementation. Are you creating a lot of objects within your Flink
> functions? Which state backend are you using? Maybe you can tell us a
> little bit more about your pipeline?
>
> Usually, there should be enough memory for the network buffers and state.
> Once the processing is not fast enough and the network buffers are filled
> up, the input is limited anyway, which results in back-pressure.
>
> Regards,
> Timo
>
>
> Am 21.03.18 um 21:21 schrieb Lasse Nedergaard:
>
> Hi.
>>
>> When our jobs are catching up they read at a factor of 10-20 times the normal
>> rate, but then we lose our task managers with OOM. We could increase the
>> memory allocation, but is there a way to figure out how high a rate we can
>> consume with the current memory and slot allocation, and a way to limit the
>> input to avoid OOM?
>>
>> Med venlig hilsen / Best regards
>> Lasse Nedergaard
>>
>
>
>


cancel-with-savepoint: 404 Not Found

2018-03-29 Thread Juho Autio
With a fresh build from release-1.5 branch, calling /cancel-with-savepoint
fails with 404 Not Found.

The snapshot docs still mention /cancel-with-savepoint:

https://ci.apache.org/projects/flink/flink-docs-master/monitoring/rest_api.html#cancel-job-with-savepoint

1. How can I achieve the same result as with GET /cancel-with-savepoint
before?
2. Are the docs going to be updated?

It seems like it might be related to this:
https://issues.apache.org/jira/browse/FLINK-8317

Thanks..


All sub-tasks stuck in CREATED state after "BlobServerConnection: GET operation failed"

2018-03-29 Thread Juho Autio
I built a new Flink distribution from release-1.5 branch yesterday.

The first time I tried to run a job with it, it ended up in a stalled state:
the job didn't manage to (re)start, but what makes it worse is that it didn't
exit as failed either.

The next time I tried running the same job (but on a new EMR cluster & all from
scratch) it just worked normally.

On the problematic run, the YARN job was started and the Flink UI was being
served, but the Flink UI kept showing status CREATED for all sub-tasks and
nothing seemed to be happening.

I found this in the Job Manager log first (could be unrelated):

2018-03-28 15:26:17,449 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
UniqueIdStream (43ed4ace55974d3c486452a45ee5db93) switched from state
RUNNING to FAILING.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Could not allocate all requires slots within timeout of 30 ms. Slots
required: 20, slots allocated: 8
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$36(ExecutionGraph.java:984)
at
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at
org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture.handleCompletedFuture(FutureUtils.java:551)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:789)
at akka.dispatch.OnComplete.internal(Future.scala:258)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
at
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
at
scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
at
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
at
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
at
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
at
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
at java.lang.Thread.run(Thread.java:748)


After this there was:

2018-03-28 15:26:17,521 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Restarting
the job UniqueIdStream (43ed4ace55974d3c486452a45ee5db93).


And some time after that:

2018-03-28 15:27:39,125 ERROR
org.apache.flink.runtime.blob.BlobServerConnection- GET
operation failed
java.io.EOFException: Premature end of GET request
at
org.apache.flink.runtime.blob.BlobServerConnection.get(BlobServerConnection.java:275)
at
org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:117)


Task manager logs didn't have any errors.


Is that error about BlobServerConnection severe enough to make the job get
stuck like this? Seems like a Flink bug, at least in that it just gets stuck
and neither restarts nor makes the YARN app exit as failed?



My launch command is basically:

flink-${FLINK_VERSION}/bin/flink run -m yarn-cluster -yn ${NODE_COUNT} -ys
${SLOT_COUNT} -yjm ${JOB_MANAGER_MEMORY} -ytm ${TASK_MANAGER_MEMORY} -yst
-yD restart-strategy=fixed-delay -yD
restart-strategy.fixed-delay.attempts=3 -yD
"restart-strategy.fixed-delay.delay=30 s" -p ${PARALLELISM} $@


I'm also setting this to fix some classloading error (with the previous
build that still works)
-yD.classloader.resolve-order=parent-first


Cluster was AWS EMR, release 5.12.0.

Thanks.


Re: NoClassDefFoundError for jersey-core on YARN

2018-03-29 Thread Juho Autio
Never mind, I'll post this new problem as a new thread.

On Wed, Mar 28, 2018 at 6:35 PM, Juho Autio  wrote:

> Thank you. The YARN job was started now, but the Flink job itself is in
> some bad state.
>
> Flink UI keeps showing status CREATED for all sub-tasks and nothing seems
> to be happening.
>
> ( For the record, this is what I did: export HADOOP_CLASSPATH=`hadoop
> classpath` – as found at https://ci.apache.org/proje
> cts/flink/flink-docs-master/ops/deployment/hadoop.html )
>
> I found this in Job manager log:
>
> 2018-03-28 15:26:17,449 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph
>   - Job UniqueIdStream (43ed4ace55974d3c486452a45ee5db93) switched
> from state RUNNING to FAILING.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate all requires slots within timeout of 30 ms. Slots
> required: 20, slots allocated: 8
> at org.apache.flink.runtime.executiongraph.ExecutionGraph.lambd
> a$scheduleEager$36(ExecutionGraph.java:984)
> at java.util.concurrent.CompletableFuture.uniExceptionally(Comp
> letableFuture.java:870)
> at java.util.concurrent.CompletableFuture$UniExceptionally.
> tryFire(CompletableFuture.java:852)
> at java.util.concurrent.CompletableFuture.postComplete(Completa
> bleFuture.java:474)
> at java.util.concurrent.CompletableFuture.completeExceptionally
> (CompletableFuture.java:1977)
> at org.apache.flink.runtime.concurrent.FutureUtils$ResultConjun
> ctFuture.handleCompletedFuture(FutureUtils.java:551)
> at java.util.concurrent.CompletableFuture.uniWhenComplete(Compl
> etableFuture.java:760)
> at java.util.concurrent.CompletableFuture$UniWhenComplete.
> tryFire(CompletableFuture.java:736)
> at java.util.concurrent.CompletableFuture.postComplete(Completa
> bleFuture.java:474)
> at java.util.concurrent.CompletableFuture.completeExceptionally
> (CompletableFuture.java:1977)
> at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete
> (FutureUtils.java:789)
> at akka.dispatch.OnComplete.internal(Future.scala:258)
> at akka.dispatch.OnComplete.internal(Future.scala:256)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> at org.apache.flink.runtime.concurrent.Executors$DirectExecutio
> nContext.execute(Executors.java:83)
> at scala.concurrent.impl.CallbackRunnable.executeWithValue(
> Promise.scala:44)
> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Pro
> mise.scala:252)
> at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupp
> ort.scala:603)
> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
> at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedE
> xecute(Future.scala:601)
> at scala.concurrent.BatchingExecutor$class.execute(
> BatchingExecutor.scala:109)
> at scala.concurrent.Future$InternalCallbackExecutor$.execute(
> Future.scala:599)
> at akka.actor.LightArrayRevolverScheduler$TaskHolder.
> executeTask(LightArrayRevolverScheduler.scala:329)
> at akka.actor.LightArrayRevolverScheduler$$anon$4.
> executeBucket$1(LightArrayRevolverScheduler.scala:280)
> at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(Ligh
> tArrayRevolverScheduler.scala:284)
> at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArra
> yRevolverScheduler.scala:236)
> at java.lang.Thread.run(Thread.java:748)
>
> After this there was:
>
> 2018-03-28 15:26:17,521 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph
>   - Restarting the job UniqueIdStream (43ed4ace55974d3c486452a45ee5d
> b93).
>
> And some time after that:
>
> 2018-03-28 15:27:39,125 ERROR 
> org.apache.flink.runtime.blob.BlobServerConnection
>   - GET operation failed
> java.io.EOFException: Premature end of GET request
> at org.apache.flink.runtime.blob.BlobServerConnection.get(BlobS
> erverConnection.java:275)
> at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobS
> erverConnection.java:117)
>
> Task manager logs don't have any errors.
>
> Is that error about BlobServerConnection severe enough to make the job get
> stuck like this? How to debug this further?
>
> Thanks!
>
> On Wed, Mar 28, 2018 at 5:56 PM, Gary Yao  wrote:
>
>> Hi Juho,
>>
>> Can you try submitting with HADOOP_CLASSPATH=`hadoop classpath` set? [1]
>> For example:
>>   HADOOP_CLASSPATH=`hadoop classpath` flink-${FLINK_VERSION}/bin/flink
>> run [...]
>>
>> Best,
>> Gary
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/ops/d
>> eployment/hadoop.html#configuring-flink-with-hadoop-classpaths
>>
>>
>> On Wed, Mar 28, 2018 at 4:26 PM, Juho Autio  wrote:
>>
>>> I built a new Flink distribution from release-1.5 branch today.
>>>
>>> I tried running a job but get this error:
>>> java.lang.NoClassDefFoundError: com/sun/jersey/core/util/Featu
>>> resAndProperties
>>>
>>> I use yarn-cluster mode.
>>>
>>> The