Reset Kafka Consumer using Flink Consumer 10 API

2017-08-22 Thread sohimankotia
Hi,

I am trying to replay Kafka logs from a specific offset, but I am not able to
make it work.


Using ref:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
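
For reference, the example on that page configures the start position with
setStartFromSpecificOffsets() (a sketch following the shape of the docs
example; note this setter only exists from Flink 1.3 on):

Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("test", 0), 2L);
consumer.setStartFromSpecificOffsets(specificStartOffsets);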

My code:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.*;

public class ReplayTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        final FlinkKafkaConsumer010<String> kafkaSource = getKafkaSource();
        final DataStreamSource<String> in = env.addSource(kafkaSource);

        in.addSink(new PrintSinkFunction<>());
        in.addSink(getKafkaSink());

        env.execute();
    }

    private static FlinkKafkaConsumer010<String> getKafkaSource() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:8081");
        properties.setProperty("group.id", "test11");
        final FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("test", new SimpleStringSchema(), properties);
        HashMap<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
        specificStartOffsets.put(new KafkaTopicPartition("test", 0), 2L);
        consumer.restoreState(specificStartOffsets);
        return consumer;
    }

    private static FlinkKafkaProducer010<String> getKafkaSink() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        return new FlinkKafkaProducer010<>("test2", new SimpleStringSchema(), properties);
    }
}


I am using 1.2.1 for all Flink dependencies.

When I run the code in the IDE or on a local Flink setup, I get:


org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error.
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:545)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
at 
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
at
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
at
org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
Caused by: java.lang.IllegalStateException: The runtime context has not been
initialized.
at
org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.restoreState(FlinkKafkaConsumerBase.java:388)
at
in.dailyhunt.cis.enrichment.pings.simulate.ReplayTest.getKafkaSource(ReplayTest.java:50)
at
in.dailyhunt.cis.enrichment.pings.simulate.ReplayTest.main(ReplayTest.java:27)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
... 13 more


Thanks and Regards
Sohanvir

Question about windowing

2017-08-22 Thread Jerry Peng
Hello,

I have a question regarding windowing and triggering. I am trying to
connect the dots between the simple windowing API, e.g.

stream.countWindow(1000, 100)

to the underlying representation using the triggers and evictors API:

stream.window(GlobalWindows.create())
  .evictor(CountEvictor.of(1000))
  .trigger(CountTrigger.of(100))


How is the above equivalent to the semantics of a window with a length of
1000 tuples and a sliding interval of 100 tuples?

And for time duration windows:

stream.timeWindow(Time.seconds(5), Time.seconds(1))

which maps to:

stream.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)))
  .trigger(ProcessingTimeTrigger.create())

why isn't it mapped to something like:

stream.window(SlidingProcessingTimeWindows.create())
  .trigger(ProcessingTimeTrigger.of(1))
  .evictor(TimeEvictor.of(5))

?

Thanks for any help in advance!

Best,

Jerry


Job submission timeout

2017-08-22 Thread Vishnu Viswanath
Hi,

After I submit the job, the client times out after 10 seconds (I guess the
JobManager is taking a long time to build the graph; it is a pretty big
JobGraph).

*Caused by: org.apache.flink.runtime.client.JobTimeoutException: JobManager
did not respond within 1 milliseconds*
* at
org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:441)*
* at
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleJsonRequest(JarRunHandler.java:69)*
* ... 34 more*
*Caused by: java.util.concurrent.TimeoutException: Futures timed out after
[1 milliseconds]*
* at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)*
* at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)*
* at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)*
* at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)*
* at scala.concurrent.Await$.result(package.scala:190)*
* at scala.concurrent.Await.result(package.scala)*
* at
org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:437)*
* ... 35 more*

I tried increasing akka.ask.timeout to 1 min in flink-conf.yaml, but it
didn't help. Can someone point me to the correct property?
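
One candidate I have not tried yet is the client-side RPC timeout, e.g. in
flink-conf.yaml (the value is just a guess on my part):

akka.client.timeout: 600 s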

Thanks,
Vishnu


Re: Flink HA with Kubernetes, without Zookeeper

2017-08-22 Thread Hao Sun
Great suggestions, the etcd operator is very interesting, thanks James.

On Tue, Aug 22, 2017, 12:42 James Bucher  wrote:

> Just wanted to throw in a couple more details here from what I have
> learned from working with Kubernetes.
>
> *All processes restart (a lost JobManager restarts eventually). Should be
> given in Kubernetes*:
>
>- This works very well, we run multiple jobs with a single Jobmanager
>and Flink/Kubernetes recovers quite well.
>
> *A way for TaskManagers to discover the restarted JobManager. Should work
> via Kubernetes as well (restarted containers retain the external hostname)*
> :
>
>- We use StatefulSets which provide a DNS based discovery mechanism.
>Provided DNS is set up correctly with TTLs this works well. You could also
>leverage the built-in Kubernetes services if you are only running a single
>Job Manager. Kubernetes will just route the traffic to the single pod. This
>works fine with a single Job Manager (I have tested it). However multiple
>Job Managers won’t work because Kubernetes will route this round-robin to
>the Job Managers
>
> *A way to isolate different "leader sessions" against each other. Flink
> currently uses ZooKeeper to also attach a "leader session ID" to leader
> election, which is a fencing token to avoid that processes talk to each
> other despite having different views on who is the leader, or whether the
> leader lost and re-gained leadership:*
>
> - This is probably the most difficult thing. You could leverage the
> built-in ETCD cluster. Connecting directly to the Kubernetes ETCD database
> is probably a bad idea however. You should be able to create a counter
> using the PATCH API that Kubernetes supplies, which follows
> https://tools.ietf.org/html/rfc6902; you could probably leverage
> https://tools.ietf.org/html/rfc6902#section-4.6 to allow for atomic
> updates to counters. Combining this with
> https://kubernetes.io/docs/concepts/api-extension/custom-resources/#custom-resources
> should give a good way to work with ETCD without actually connecting
> directly to the Kubernetes ETCD. This integration would require modifying
> the Job Manager leader election code.
>
> *A distributed atomic counter for the checkpoint ID. This is crucial to
> ensure correctness of checkpoints in the presence of JobManager failures
> and re-elections or split-brain situations*.
>
>- This is very similar to the above, we should be able to accomplish
>that through the PATCH API combined with update if condition.
>
> If you don’t want to actually rip into the code for the Job Manager,
> the ETCD Operator would be a good way to bring up an ETCD cluster that is
> separate from the core Kubernetes ETCD database. Combined with zetcd you
> could probably have that up and running quickly.
>
> Thanks,
> James Bucher
>
> From: Hao Sun 
> Date: Monday, August 21, 2017 at 9:45 AM
> To: Stephan Ewen , Shannon Carey 
> Cc: "user@flink.apache.org" 
> Subject: Re: Flink HA with Kubernetes, without Zookeeper
>
> Thanks Shannon for the https://github.com/coreos/zetcd tips, I will check
> that out and share my results if we proceed on that path.
> Thanks Stephan for the details, this is very useful, I was about to ask
> what exactly is stored into zookeeper, haha.
>
> On Mon, Aug 21, 2017 at 9:31 AM Stephan Ewen  wrote:
>
>> Hi!
>>
>> That is a very interesting proposition. In cases where you have a single
>> master only, you may get away with quite good guarantees without ZK. In
>> fact, Flink does not store significant data in ZK at all, it only uses
>> locks and counters.
>>
>> You can have a setup without ZK, provided you have the following:
>>
>>   - All processes restart (a lost JobManager restarts eventually). Should
>> be given in Kubernetes.
>>
>>   - A way for TaskManagers to discover the restarted JobManager. Should
>> work via Kubernetes as well (restarted containers retain the external
>> hostname)
>>
>>   - A way to isolate different "leader sessions" against each other.
>> Flink currently uses ZooKeeper to also attach a "leader session ID" to
>> leader election, which is a fencing token to avoid that processes talk to
>> each other despite having different views on who is the leader, or whether
>> the leader lost and re-gained leadership.
>>
>>   - An atomic marker for what is the latest completed checkpoint.
>>
>>   - A distributed atomic counter for the checkpoint ID. This is crucial
>> to ensure correctness of checkpoints in the presence of JobManager failures
>> and 

Re: Flink HA with Kubernetes, without Zookeeper

2017-08-22 Thread James Bucher
Just wanted to throw in a couple more details here from what I have learned 
from working with Kubernetes.

All processes restart (a lost JobManager restarts eventually). Should be given 
in Kubernetes:

  *   This works very well, we run multiple jobs with a single Jobmanager and 
Flink/Kubernetes recovers quite well.

A way for TaskManagers to discover the restarted JobManager. Should work via 
Kubernetes as well (restarted containers retain the external hostname):

  *   We use StatefulSets which provide a DNS based discovery mechanism. 
Provided DNS is set up correctly with TTLs this works well. You could also 
leverage the built-in Kubernetes services if you are only running a single Job 
Manager. Kubernetes will just route the traffic to the single pod. This works 
fine with a single Job Manager (I have tested it). However multiple Job 
Managers won’t work because Kubernetes will route this round-robin to the Job 
Managers

A way to isolate different "leader sessions" against each other. Flink 
currently uses ZooKeeper to also attach a "leader session ID" to leader 
election, which is a fencing token to avoid that processes talk to each other 
despite having different views on who is the leader, or whether the leader lost 
and re-gained leadership:

  *   This is probably the most difficult thing. You could leverage the built 
in ETCD cluster. Connecting directly to the Kubernetes ETCD database 
is probably a bad idea however. You should be able to create a counter using 
the PATCH API that Kubernetes supplies in the API which follows: 
https://tools.ietf.org/html/rfc6902 you could probably leverage 
https://tools.ietf.org/html/rfc6902#section-4.6 to allow for atomic updates to 
counters. Combining this with: 
https://kubernetes.io/docs/concepts/api-extension/custom-resources/#custom-resources
 should give a good way to work with ETCD without actually connecting directly 
to the Kubernetes ETCD directly. This integration would require modifying the 
Job Manager leader election code.

A distributed atomic counter for the checkpoint ID. This is crucial to ensure 
correctness of checkpoints in the presence of JobManager failures and 
re-elections or split-brain situations.

  *   This is very similar to the above, we should be able to accomplish that 
through the PATCH API combined with update if condition.
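
For illustration, an RFC 6902 test-then-replace pair is roughly what such a
guarded update could look like (the ConfigMap name and field below are made
up):

PATCH /api/v1/namespaces/default/configmaps/flink-ha
Content-Type: application/json-patch+json

[
  { "op": "test",    "path": "/data/checkpoint-counter", "value": "41" },
  { "op": "replace", "path": "/data/checkpoint-counter", "value": "42" }
]

If another process bumped the counter first, the "test" operation fails and
the whole patch is rejected, which is what makes the increment atomic.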

If you don’t want to actually rip into the code for the Job Manager, the 
ETCD Operator would be a good way to 
bring up an ETCD cluster that is separate from the core Kubernetes ETCD 
database. Combined with zetcd you could probably have that up and running 
quickly.

Thanks,
James Bucher

From: Hao Sun
Date: Monday, August 21, 2017 at 9:45 AM
To: Stephan Ewen, Shannon Carey
Cc: "user@flink.apache.org"
Subject: Re: Flink HA with Kubernetes, without Zookeeper

Thanks Shannon for the https://github.com/coreos/zetcd tips, I will check that 
out and share my results if we proceed on that path.
Thanks Stephan for the details, this is very useful, I was about to ask what 
exactly is stored into zookeeper, haha.

On Mon, Aug 21, 2017 at 9:31 AM Stephan Ewen wrote:
Hi!

That is a very interesting proposition. In cases where you have a single master 
only, you may get away with quite good guarantees without ZK. In fact, Flink 
does not store significant data in ZK at all, it only uses locks and counters.

You can have a setup without ZK, provided you have the following:

  - All processes restart (a lost JobManager restarts eventually). Should be 
given in Kubernetes.

  - A way for TaskManagers to discover the restarted JobManager. Should work 
via Kubernetes as well (restarted containers retain the external hostname)

  - A way to isolate different "leader sessions" against each other. Flink 
currently uses ZooKeeper to also attach a "leader session ID" to leader 
election, which is a fencing token to avoid that processes talk to each other 
despite having different views on who is the leader, or whether the leader lost 
and re-gained leadership.

  - An atomic marker for what is the latest completed checkpoint.

  - A distributed atomic counter for the checkpoint ID. This is crucial to 
ensure correctness of checkpoints in the presence of JobManager failures and 
re-elections or split-brain situations.

I would assume that etcd can provide all of those services. The best way to 
integrate it would probably be to add an implementation of Flink's 
"HighAvailabilityServices" based on etcd.

Have a look at this class: 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java

If you want to contribute 

Re: Exception when trying to run flink twitter example

2017-08-22 Thread Krishnanand Khambadkone
Till, thank you for the prompt response. Yes, including the build.properties
(version = 2.2.0) made the exception go away. Now there is no exception, but
also no tweets output. The program just sits there doing nothing. I have not
specified an output directory, so the tweets are sent to stdout.

On Tuesday, August 22, 2017, 9:27:07 AM PDT, Till Rohrmann 
 wrote:

Hi Krishnanand,
could you check that you have the build.properties file in your fat jar
containing the field version=?
Cheers, Till
On Tue, Aug 22, 2017 at 6:19 PM, Krishnanand Khambadkone 
 wrote:

Hi, I have created a fat jar with my twitterexample classes and am running it
like this:

 ~/flink-1.3.2/build-target/bin/flink run -c TwitterExample ./flinktwitter.jar
 --twitter-source.consumerKey  --twitter-source.consumerSecret
 --twitter-source.token  --twitter-source.tokenSecret

I am providing all the right twitter credentials. I am however seeing the
following exception. Has anyone else seen this before?

08/22/2017 08:52:40 Job execution switched to status RUNNING.
08/22/2017 08:52:40 Source: Custom Source -> Flat Map(1/1) switched to SCHEDULED
08/22/2017 08:52:40 Keyed Aggregation -> Sink: Unnamed(1/1) switched to SCHEDULED
08/22/2017 08:52:40 Source: Custom Source -> Flat Map(1/1) switched to DEPLOYING
08/22/2017 08:52:40 Keyed Aggregation -> Sink: Unnamed(1/1) switched to DEPLOYING
08/22/2017 08:52:40 Keyed Aggregation -> Sink: Unnamed(1/1) switched to RUNNING
08/22/2017 08:52:40 Source: Custom Source -> Flat Map(1/1) switched to RUNNING
08/22/2017 08:52:41 Source: Custom Source -> Flat Map(1/1) switched to FAILED

java.lang.ExceptionInInitializerError
 at org.apache.flink.streaming.connectors.twitter.TwitterSource.run(TwitterSource.java:134)
 at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
 at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
 at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
 at org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:39)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
 at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
 at com.twitter.hbc.ClientBuilder.loadVersion(ClientBuilder.java:71)
 at com.twitter.hbc.ClientBuilder.<init>(ClientBuilder.java:79)
 ... 8 more

08/22/2017 08:52:41 Job execution switched to status FAILING.

java.lang.ExceptionInInitializerError
 at org.apache.flink.streaming.connectors.twitter.TwitterSource.run(TwitterSource.java:134)
 at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
 at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
 at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
 at org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:39)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
 at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
 at com.twitter.hbc.ClientBuilder.loadVersion(ClientBuilder.java:71)
 at com.twitter.hbc.ClientBuilder.<init>(ClientBuilder.java:79)
 ... 8 more

08/22/2017 08:52:41 Keyed Aggregation -> Sink: Unnamed(1/1) switched to CANCELING
08/22/2017 08:52:41 Keyed Aggregation -> Sink: Unnamed(1/1) switched to CANCELED
08/22/2017 08:52:41 Job execution switched to status FAILED.

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program
execution failed: Job execution failed.
 at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
 at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
 at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
 at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:73)
 at TwitterExample.main(TwitterExample.java:68)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
 at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)

Re: StandaloneResourceManager failed to associate with JobManager leader

2017-08-22 Thread Hao Sun
Thanks Till, the DEBUG log level is a good idea. I figured it out. I made a
mistake with `-` and `_`.

On Tue, Aug 22, 2017 at 1:39 AM Till Rohrmann  wrote:

> Hi Hao Sun,
>
> have you checked that one can resolve the hostname flink_jobmanager from
> within the container? This is required to connect to the JobManager. If
> this is the case, then log files with DEBUG log level would be helpful to
> track down the problem.
>
> Cheers,
> Till
>
> On Wed, Aug 16, 2017 at 5:35 AM, Hao Sun  wrote:
>
>> Hi,
>>
>> I am trying to run a cluster of job-manager and task-manager in docker.
>> One of each for now. I got a StandaloneResourceManager error, stating
>> that it can not associate with job-manager. I do not know what was wrong.
>>
>> I am sure that job-manager can be connected.
>> ===
>> root@flink-jobmanager:/opt/flink# telnet flink_jobmanager 32929
>> Trying 172.18.0.3...
>> Connected to flink-jobmanager.
>> Escape character is '^]'.
>> Connection closed by foreign host.
>> ===
>>
>> Here is my config:
>> ===
>> Starting Job Manager
>> config file:
>> jobmanager.rpc.address: flink_jobmanager
>> jobmanager.rpc.port: 6123
>> jobmanager.web.port: 8081
>> jobmanager.heap.mb: 1024
>> taskmanager.heap.mb: 1024
>> taskmanager.numberOfTaskSlots: 1
>> taskmanager.memory.preallocate: false
>> parallelism.default: 1
>> jobmanager.archive.fs.dir: file:///flink_data/completed-jobs/
>> historyserver.archive.fs.dir: file:///flink_data/completed-jobs/
>> state.backend: rocksdb
>> state.backend.fs.checkpointdir: file:///flink_data/checkpoints
>> taskmanager.tmp.dirs: /flink_data/tmp
>> blob.storage.directory: /flink_data/tmp
>> jobmanager.web.tmpdir: /flink_data/tmp
>> env.log.dir: /flink_data/logs
>> high-availability: zookeeper
>> high-availability.storageDir: file:///flink_data/ha/
>> high-availability.zookeeper.quorum: kafka:2181
>> blob.server.port: 6124
>> query.server.port: 6125
>> ===
>>
>> Here is the major error I see:
>> ===
>> 2017-08-16 02:46:23,586 INFO
>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
>> Starting ZooKeeperLeaderRetrievalService.
>> 2017-08-16 02:46:23,612 INFO
>> org.apache.flink.runtime.jobmanager.JobManager - JobManager
>> akka.tcp://flink@flink_jobmanager:32929/user/jobmanager was granted
>> leadership with leader session ID
>> Some(06abc8f5-c1b9-44b2-bb7f-771c74981552).
>> 2017-08-16 02:46:23,627 INFO
>> org.apache.flink.runtime.jobmanager.JobManager - Delaying recovery of all
>> jobs by 1 milliseconds.
>> 2017-08-16 02:46:23,638 INFO
>> org.apache.flink.runtime.webmonitor.JobManagerRetriever - New leader
>> reachable under akka.tcp://flink@flink_jobmanager
>> :32929/user/jobmanager:06abc8f5-c1b9-44b2-bb7f-771c74981552.
>> 2017-08-16 02:46:23,640 INFO
>> org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
>> - Trying to associate with JobManager leader
>> akka.tcp://flink@flink_jobmanager:32929/user/jobmanager
>> 2017-08-16 02:46:23,653 WARN
>> org.apache.flink.runtime.webmonitor.JobManagerRetriever - Failed to
>> retrieve leader gateway and port.
>> akka.actor.ActorNotFound: Actor not found for:
>> ActorSelection[Anchor(akka://flink/deadLetters), Path(/)]
>> at
>> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
>> at
>> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>> at
>> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>> at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
>> at
>> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
>> at
>> akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
>> at
>> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
>> at
>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>> at
>> scala.concurrent.impl.Promise$DefaultPromise.scala$concurrent$impl$Promise$DefaultPromise$$dispatchOrAddCallback(Promise.scala:280)
>> at
>> scala.concurrent.impl.Promise$DefaultPromise.onComplete(Promise.scala:270)
>> at akka.actor.ActorSelection.resolveOne(ActorSelection.scala:63)
>> at
>> org.apache.flink.runtime.akka.AkkaUtils$.getActorRefFuture(AkkaUtils.scala:498)
>> at
>> org.apache.flink.runtime.akka.AkkaUtils.getActorRefFuture(AkkaUtils.scala)
>> at
>> org.apache.flink.runtime.webmonitor.JobManagerRetriever.notifyLeaderAddress(JobManagerRetriever.java:141)
>> at
>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService.nodeChanged(ZooKeeperLeaderRetrievalService.java:168)
>> at org.apache.flink.shaded.org
>> 
>> 

Re: Exception when trying to run flink twitter example

2017-08-22 Thread Till Rohrmann
Hi Krishnanand,

could you check that you have the build.properties file in your fat jar
containing the field version=?
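
A quick way to check (jar name taken from your command line):

jar tf flinktwitter.jar | grep build.properties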

Cheers,
Till

On Tue, Aug 22, 2017 at 6:19 PM, Krishnanand Khambadkone <
kkhambadk...@yahoo.com> wrote:

> Hi,  I have created a fat jar with my twitterexample classes and am
> running it like this,
>
>  ~/flink-1.3.2/build-target/bin/flink run -c TwitterExample
> ./flinktwitter.jar  --twitter-source.consumerKey  
> --twitter-source.consumerSecret
>  --twitter-source.token  --twitter-source.tokenSecret
> 
> I am providing all the right twitter credentials.  I am however seeing the
> following exception.  Has anyone else seen this before.
>
>
> 08/22/2017 08:52:40 Job execution switched to status RUNNING.
>
> 08/22/2017 08:52:40 Source: Custom Source -> Flat Map(1/1) switched to
> SCHEDULED
>
> 08/22/2017 08:52:40 Keyed Aggregation -> Sink: Unnamed(1/1) switched to
> SCHEDULED
>
> 08/22/2017 08:52:40 Source: Custom Source -> Flat Map(1/1) switched to
> DEPLOYING
>
> 08/22/2017 08:52:40 Keyed Aggregation -> Sink: Unnamed(1/1) switched to
> DEPLOYING
>
> 08/22/2017 08:52:40 Keyed Aggregation -> Sink: Unnamed(1/1) switched to
> RUNNING
>
> 08/22/2017 08:52:40 Source: Custom Source -> Flat Map(1/1) switched to
> RUNNING
>
> 08/22/2017 08:52:41 Source: Custom Source -> Flat Map(1/1) switched to
> FAILED
>
> java.lang.ExceptionInInitializerError
>
> at org.apache.flink.streaming.connectors.twitter.TwitterSource.run(
> TwitterSource.java:134)
>
> at org.apache.flink.streaming.api.operators.StreamSource.
> run(StreamSource.java:87)
>
> at org.apache.flink.streaming.api.operators.StreamSource.
> run(StreamSource.java:55)
>
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(
> SourceStreamTask.java:95)
>
> at org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(
> StoppableSourceStreamTask.java:39)
>
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:263)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>
> at java.lang.Thread.run(Thread.java:745)
>
> Caused by: java.lang.NullPointerException
>
> at com.twitter.hbc.ClientBuilder.loadVersion(ClientBuilder.java:71)
>
> at com.twitter.hbc.ClientBuilder.<init>(ClientBuilder.java:79)
>
> ... 8 more
>
>
> 08/22/2017 08:52:41 Job execution switched to status FAILING.
>
> java.lang.ExceptionInInitializerError
>
> at org.apache.flink.streaming.connectors.twitter.TwitterSource.run(
> TwitterSource.java:134)
>
> at org.apache.flink.streaming.api.operators.StreamSource.
> run(StreamSource.java:87)
>
> at org.apache.flink.streaming.api.operators.StreamSource.
> run(StreamSource.java:55)
>
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(
> SourceStreamTask.java:95)
>
> at org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(
> StoppableSourceStreamTask.java:39)
>
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:263)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>
> at java.lang.Thread.run(Thread.java:745)
>
> Caused by: java.lang.NullPointerException
>
> at com.twitter.hbc.ClientBuilder.loadVersion(ClientBuilder.java:71)
>
> at com.twitter.hbc.ClientBuilder.<init>(ClientBuilder.java:79)
>
> ... 8 more
>
> 08/22/2017 08:52:41 Keyed Aggregation -> Sink: Unnamed(1/1) switched to
> CANCELING
>
> 08/22/2017 08:52:41 Keyed Aggregation -> Sink: Unnamed(1/1) switched to
> CANCELED
>
> 08/22/2017 08:52:41 Job execution switched to status FAILED.
>
>
> 
>
>  The program finished with the following exception:
>
>
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Job execution failed.
>
> at org.apache.flink.client.program.ClusterClient.run(
> ClusterClient.java:478)
>
> at org.apache.flink.client.program.StandaloneClusterClient.submitJob(
> StandaloneClusterClient.java:105)
>
> at org.apache.flink.client.program.ClusterClient.run(
> ClusterClient.java:442)
>
> at org.apache.flink.streaming.api.environment.StreamContextEnvironment.
> execute(StreamContextEnvironment.java:73)
>
> at TwitterExample.main(TwitterExample.java:68)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
>
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(
> PackagedProgram.java:528)
>
> at org.apache.flink.client.program.PackagedProgram.
> invokeInteractiveModeForExecution(PackagedProgram.java:419)
>
> at org.apache.flink.client.program.ClusterClient.run(
> ClusterClient.java:381)
>
> at org.apache.flink.client.CliFrontend.executeProgram(
> CliFrontend.java:838)
>
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:259)
>
> at 

Re: Exception with Avro Serialization on RocksDBStateBackend

2017-08-22 Thread Till Rohrmann
Hi Biplob,

have you told Avro to allow null for fields in your schema? If yes, then
could you share the Avro schema, the version of Flink as well as the Avro
version with us? This would help with further understanding the problem.
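
For reference, with the reflect-based Avro serializer one way to allow null
is the @Nullable annotation on the field; a sketch using the field named in
your stack trace (for xjc-generated classes, a schema union with "null"
would be the equivalent):

import org.apache.avro.reflect.Nullable;

public class RawTransactionItemType {
    @Nullable                 // marks this field as nullable in the reflected Avro schema
    private String reversalIndicator;
}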

Cheers,
Till

On Tue, Aug 22, 2017 at 5:42 PM, Biplob Biswas 
wrote:

> Hi,
>
> I am getting the following exception in my code, I can observe that there's
> something wrong while serializing my Object, the class of which looks
> something like this:
>
> https://gist.github.com/revolutionisme/1eea5ccf5e1d4a5452f27a1fd5c05ff1
>
> The exact cause, it seems, is some field inside my nested object which is
> null (reversalIndicator), but it's not exactly clear why this exception is
> thrown. One interesting thing to note: when I serialized with Kryo before,
> it serialized properly without any issues. Is it some requirement of the
> Avro serializer, some bug, or some problem on my end?
>
>
>
> 2017-08-22 17:21:48,892 ERROR
> com.airplus.poc.flink.statefulFunctions.UpdateTxnState- Something
> unexpected happened - probably malformed event
> java.lang.RuntimeException: Error while adding data to RocksDB
> at
> org.apache.flink.contrib.streaming.state.RocksDBValueState.update(
> RocksDBValueState.java:102)
> at
> com.airplus.poc.flink.statefulFunctions.UpdateTxnState.processElement(
> UpdateTxnState.java:98)
> at
> com.airplus.poc.flink.statefulFunctions.UpdateTxnState.processElement(
> UpdateTxnState.java:1)
> at
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.
> processElement(KeyedProcessOperator.java:94)
> at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(
> StreamInputProcessor.java:206)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(
> OneInputStreamTask.java:69)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:263)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Unknown Source)
> Caused by: java.lang.NullPointerException: in
> com.airplus.poc.flink.model.TransactionStateModel in
> com.airplus.poc.generated.xjc.RecordReadEventType in
> com.airplus.poc.generated.xjc.RawTransactionItemType in string null of
> string in field reversalIndicator of
> com.airplus.poc.generated.xjc.RawTransactionItemType in field
> rawTransactionItem of com.airplus.poc.generated.xjc.RecordReadEventType in
> field recordReadEvent of com.airplus.poc.flink.model.TransactionStateModel
> at
> org.apache.avro.reflect.ReflectDatumWriter.write(
> ReflectDatumWriter.java:145)
> at
> org.apache.avro.generic.GenericDatumWriter.write(
> GenericDatumWriter.java:58)
> at
> org.apache.flink.api.java.typeutils.runtime.AvroSerializer.serialize(
> AvroSerializer.java:135)
> at
> org.apache.flink.contrib.streaming.state.RocksDBValueState.update(
> RocksDBValueState.java:99)
> ... 8 more
> Caused by: java.lang.NullPointerException
>
>
>
> Thanks & Regards,
> Biplob
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Expception-with-
> Avro-Serialization-on-RocksDBStateBackend-tp15067.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Exception when trying to run flink twitter example

2017-08-22 Thread Krishnanand Khambadkone
Hi,  I have created a fat jar with my twitterexample classes and am running it 
like this,

 ~/flink-1.3.2/build-target/bin/flink run -c TwitterExample ./flinktwitter.jar  
--twitter-source.consumerKey  --twitter-source.consumerSecret  
--twitter-source.token  --twitter-source.tokenSecret 
I am providing all the right twitter credentials. I am however seeing the
following exception. Has anyone else seen this before?


08/22/2017 08:52:40 Job execution switched to status RUNNING.

08/22/2017 08:52:40 Source: Custom Source -> Flat Map(1/1) switched to 
SCHEDULED 

08/22/2017 08:52:40 Keyed Aggregation -> Sink: Unnamed(1/1) switched to 
SCHEDULED 

08/22/2017 08:52:40 Source: Custom Source -> Flat Map(1/1) switched to 
DEPLOYING 

08/22/2017 08:52:40 Keyed Aggregation -> Sink: Unnamed(1/1) switched to 
DEPLOYING 

08/22/2017 08:52:40 Keyed Aggregation -> Sink: Unnamed(1/1) switched to RUNNING 

08/22/2017 08:52:40 Source: Custom Source -> Flat Map(1/1) switched to RUNNING 

08/22/2017 08:52:41 Source: Custom Source -> Flat Map(1/1) switched to FAILED 

java.lang.ExceptionInInitializerError

 at 
org.apache.flink.streaming.connectors.twitter.TwitterSource.run(TwitterSource.java:134)

 at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)

 at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)

 at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)

 at 
org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:39)

 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)

 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)

 at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.NullPointerException

 at com.twitter.hbc.ClientBuilder.loadVersion(ClientBuilder.java:71)

 at com.twitter.hbc.ClientBuilder.<init>(ClientBuilder.java:79)

 ... 8 more




08/22/2017 08:52:41 Job execution switched to status FAILING.

java.lang.ExceptionInInitializerError

 at 
org.apache.flink.streaming.connectors.twitter.TwitterSource.run(TwitterSource.java:134)

 at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)

 at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)

 at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)

 at 
org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:39)

 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)

 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)

 at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.NullPointerException

 at com.twitter.hbc.ClientBuilder.loadVersion(ClientBuilder.java:71)

 at com.twitter.hbc.ClientBuilder.<init>(ClientBuilder.java:79)

 ... 8 more

08/22/2017 08:52:41 Keyed Aggregation -> Sink: Unnamed(1/1) switched to 
CANCELING 

08/22/2017 08:52:41 Keyed Aggregation -> Sink: Unnamed(1/1) switched to 
CANCELED 

08/22/2017 08:52:41 Job execution switched to status FAILED.






 The program finished with the following exception:




org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: Job execution failed.

 at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)

 at 
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)

 at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)

 at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:73)

 at TwitterExample.main(TwitterExample.java:68)

 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

 at java.lang.reflect.Method.invoke(Method.java:498)

 at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)

 at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)

 at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:381)

 at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:838)

 at org.apache.flink.client.CliFrontend.run(CliFrontend.java:259)

 at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1086)

 at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1133)

 at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1130)

 at 
org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)

 at java.security.AccessController.doPrivileged(Native Method)

 at javax.security.auth.Subject.doAs(Subject.java:422)

 at 

Exception with Avro Serialization on RocksDBStateBackend

2017-08-22 Thread Biplob Biswas
Hi,

I am getting the following exception in my code. I can observe that there's
something wrong while serializing my object, the class of which looks
something like this:
 
https://gist.github.com/revolutionisme/1eea5ccf5e1d4a5452f27a1fd5c05ff1

The exact cause, it seems, is some field inside my nested object which is null
(reversalIndicator), but it's not exactly clear why this exception is thrown.
One interesting thing to note: when I serialized with Kryo before, it
serialized properly without any issues. Is it some requirement of the Avro
serializer, some bug, or some problem on my end?



2017-08-22 17:21:48,892 ERROR
com.airplus.poc.flink.statefulFunctions.UpdateTxnState- Something
unexpected happened - probably malformed event
java.lang.RuntimeException: Error while adding data to RocksDB
at
org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:102)
at
com.airplus.poc.flink.statefulFunctions.UpdateTxnState.processElement(UpdateTxnState.java:98)
at
com.airplus.poc.flink.statefulFunctions.UpdateTxnState.processElement(UpdateTxnState.java:1)
at
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:94)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NullPointerException: in
com.airplus.poc.flink.model.TransactionStateModel in
com.airplus.poc.generated.xjc.RecordReadEventType in
com.airplus.poc.generated.xjc.RawTransactionItemType in string null of
string in field reversalIndicator of
com.airplus.poc.generated.xjc.RawTransactionItemType in field
rawTransactionItem of com.airplus.poc.generated.xjc.RecordReadEventType in
field recordReadEvent of com.airplus.poc.flink.model.TransactionStateModel
at
org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:145)
at
org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
at
org.apache.flink.api.java.typeutils.runtime.AvroSerializer.serialize(AvroSerializer.java:135)
at
org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:99)
... 8 more
Caused by: java.lang.NullPointerException



Thanks & Regards,
Biplob



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Expception-with-Avro-Serialization-on-RocksDBStateBackend-tp15067.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: HBase connection problems on Flink 1.3.1

2017-08-22 Thread Till Rohrmann
Thanks for sharing your solution with the community Flavio.

Cheers,
Till

On Tue, Aug 22, 2017 at 2:34 PM, Flavio Pompermaier 
wrote:

> I was able to fix the problem by adding the following line within
> bin/config.sh:
>
> HBASE_CONF_DIR="/etc/hbase/conf"
>
> Indeed, Cloudera 5.9 doesn't set HBASE_CONF_DIR env variable automatically.
> Another possible solution could be to set this parameter manually into
> .bash_profile or .profile (not .bashrc because it's read only when the
> shell is interactive).
> I don't know whether this is the best solution or not but it works...
>
> Best,
> Flavio
>
> On Tue, Aug 22, 2017 at 12:06 PM, Flavio Pompermaier  > wrote:
>
>> Hi to all,
>> I'm trying to connect to HBase on Flink 1.3.1 but it seems that
>> *HBaseConfiguration.create()* doesn't work correctly (because ZooKeeper
>> properties are not read from hbase-site.xml).
>> I've also tried to put the hbase-site.xml in the flink conf folder but it
>> didn't work..
>>
>> What should I do? I didn't have any problem before with Flink 1.2
>>
>> Thanks in advance,
>> Flavio
>>
>
>


Re: HBase connection problems on Flink 1.3.1

2017-08-22 Thread Flavio Pompermaier
I was able to fix the problem by adding the following line within
bin/config.sh:

HBASE_CONF_DIR="/etc/hbase/conf"

Indeed, Cloudera 5.9 doesn't set HBASE_CONF_DIR env variable automatically.
Another possible solution could be to set this parameter manually into
.bash_profile or .profile (not .bashrc because it's read only when the
shell is interactive).
I don't know whether this is the best solution or not but it works...

Best,
Flavio

On Tue, Aug 22, 2017 at 12:06 PM, Flavio Pompermaier 
wrote:

> Hi to all,
> I'm trying to connect to HBase on Flink 1.3.1 but it seems that
> *HBaseConfiguration.create()* doesn't work correctly (because ZooKeeper
> properties are not read from hbase-site.xml).
> I've also tried to put the hbase-site.xml in the flink conf folder but it
> didn't work..
>
> What should I do? I didn't have any problem before with Flink 1.2
>
> Thanks in advance,
> Flavio
>


HBase connection problems on Flink 1.3.1

2017-08-22 Thread Flavio Pompermaier
Hi to all,
I'm trying to connect to HBase on Flink 1.3.1 but it seems that
*HBaseConfiguration.create()* doesn't work correctly (because ZooKeeper
properties are not read from hbase-site.xml).
I've also tried to put the hbase-site.xml in the Flink conf folder but it
didn't work.

What should I do? I didn't have any problem before with Flink 1.2.

Thanks in advance,
Flavio


Re: Flink doesn't free YARN slots after restarting

2017-08-22 Thread Till Rohrmann
Hi Bowen,

sorry for my late answer. I dug through some of the logs and it seems that
you have the following problem:

   1. Once in a while the Kinesis producer fails with a
   UserRecordFailedException saying “Expired while waiting in HttpClient
   queue. Record has reached expiration”. This seems to be a problem on the
   Kinesis side. This will trigger the task failure and the cancellation of
   all other tasks as well.

   2. Somehow Flink does not manage to cancel all tasks within a period of
   180 seconds. This value is configurable via task.cancellation.timeout
   (unit ms) in the Flink configuration. It looks a bit like you have a lot
   of logging going on, because the code is waiting, for example, on
   Category.java:204 and other log4j methods. This could, however, also
   cover the true issue. What you could do is try out a different logging
   backend such as logback [1], for example.

   3. The failing cancellation is a fatal error which leads to the
   termination of the TaskManager. This will be noticed by the
   YarnResourceManager and it will restart the container. This goes on until
   it reaches the number of maximum failed containers. This value can be
   configured via yarn.maximum-failed-containers. Per default it is the
   number of initial containers you requested. If you set this value to -1,
   then it will never fail and always restart failed containers. Once the
   maximum is reached, Flink terminates the Yarn application. (Both settings
   are sketched below.)

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_practices.html#using-logback-instead-of-log4j
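
Both knobs go into flink-conf.yaml; a sketch with illustrative values:

task.cancellation.timeout: 300000   # in ms; the default corresponds to 180 s
yarn.maximum-failed-containers: -1  # -1 = keep restarting failed containers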

In order to further debug the problem: which version of Flink are you using,
and could you maybe provide us with the DEBUG log level logs of the
TaskManagers?

Cheers,
Till
​

On Fri, Aug 11, 2017 at 5:37 AM, Bowen Li  wrote:

> Hi Till,
> Any idea why it happened? I've tried different configurations for
> configuring our Flink cluster, but the cluster always fails after 4 or 5
> hours.
>
> According to the log, it looks like the total number of slots becomes 0
> at the end, and YarnClusterClient shuts down the application master as a
> result. Why are the slots not released? Or did they actually crash and are
> thus no longer available?
>
> I'm trying to deploy the first Flink cluster within our company. And this
> issue is slowing us down in proving that Flink actually works for us.
> We'd appreciate your help on it!
>
> Thanks,
> Bowen
>
> On Wed, Aug 9, 2017 at 1:33 PM, Bowen Li  wrote:
>
>> Hi Till,
>> Thanks for taking this issue.
>>
>> We are not comfortable sending logs to a email list which is this
>> open. I'll send logs to you.
>>
>> Thanks,
>> Bowen
>>
>>
>> On Wed, Aug 9, 2017 at 2:46 AM, Till Rohrmann 
>> wrote:
>>
>>> Hi Bowen,
>>>
>>> if I'm not mistaken, then Flink's current Yarn implementation does not
>>> actively releases containers. The `YarnFlinkResourceManager` is started
>>> with a fixed number of containers it always tries to acquire. If a
>>> container should die, then it will request a new one.
>>>
>>> In case of a failure all slots should be freed and then they should be
>>> subject to rescheduling the new tasks. Thus, it is not necessarily the case
>>> that 12 new slots will be used unless the old slots are no longer available
>>> (failure of a TM). Therefore, it sounds like a bug what you are describing.
>>> Could you share the logs with us?
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Aug 9, 2017 at 9:32 AM, Bowen Li 
>>> wrote:
>>>
 Hi guys,
 I was running a Flink job (12 parallelism) on an EMR cluster with
 48 YARN slots. When the job starts, I can see from Flink UI that the job
 took 12 slots, and 36 slots were left available.

 I would expect that when the job fails, it would restart from
 checkpointing by taking another 12 slots and freeing the original 12
 slots. Well, I observed that the job took new slots but never freed the
 original slots. The Flink job ended up killed by YARN because there were
 no available slots anymore.

  Here's the command I ran the Flink job with:

  ```
  flink run -m yarn-cluster -yn 6 -ys 8 -ytm 4  xxx.jar
  ```

  Does anyone know what's going wrong?

 Thanks,
 Bowen

>>>
>>>
>>
>


Re: StandaloneResourceManager failed to associate with JobManager leader

2017-08-22 Thread Till Rohrmann
Hi Hao Sun,

have you checked that one can resolve the hostname flink_jobmanager from
within the container? This is required to connect to the JobManager. If
this is the case, then log files with DEBUG log level would be helpful to
track down the problem.

Cheers,
Till

On Wed, Aug 16, 2017 at 5:35 AM, Hao Sun  wrote:

> Hi,
>
> I am trying to run a cluster of job-manager and task-manager in docker.
> One of each for now. I got a StandaloneResourceManager error, stating that
> it can not associate with job-manager. I do not know what was wrong.
>
> I am sure that job-manager can be connected.
> ===
> root@flink-jobmanager:/opt/flink# telnet flink_jobmanager 32929
> Trying 172.18.0.3...
> Connected to flink-jobmanager.
> Escape character is '^]'.
> Connection closed by foreign host.
> ===
>
> Here is my config:
> ===
> Starting Job Manager
> config file:
> jobmanager.rpc.address: flink_jobmanager
> jobmanager.rpc.port: 6123
> jobmanager.web.port: 8081
> jobmanager.heap.mb: 1024
> taskmanager.heap.mb: 1024
> taskmanager.numberOfTaskSlots: 1
> taskmanager.memory.preallocate: false
> parallelism.default: 1
> jobmanager.archive.fs.dir: file:///flink_data/completed-jobs/
> historyserver.archive.fs.dir: file:///flink_data/completed-jobs/
> state.backend: rocksdb
> state.backend.fs.checkpointdir: file:///flink_data/checkpoints
> taskmanager.tmp.dirs: /flink_data/tmp
> blob.storage.directory: /flink_data/tmp
> jobmanager.web.tmpdir: /flink_data/tmp
> env.log.dir: /flink_data/logs
> high-availability: zookeeper
> high-availability.storageDir: file:///flink_data/ha/
> high-availability.zookeeper.quorum: kafka:2181
> blob.server.port: 6124
> query.server.port: 6125
> ===
>
> Here is the major error I see:
> ===
> 2017-08-16 02:46:23,586 INFO org.apache.flink.runtime.leaderretrieval.
> ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalServic
> e.
> 2017-08-16 02:46:23,612 INFO org.apache.flink.runtime.jobmanager.JobManager
> - JobManager akka.tcp://flink@flink_jobmanager:32929/user/jobmanager was
> granted leadership with leader session ID Some(06abc8f5-c1b9-44b2-bb7f-
> 771c74981552).
> 2017-08-16 02:46:23,627 INFO org.apache.flink.runtime.jobmanager.JobManager
> - Delaying recovery of all jobs by 1 milliseconds.
> 2017-08-16 02:46:23,638 INFO 
> org.apache.flink.runtime.webmonitor.JobManagerRetriever
> - New leader reachable under akka.tcp://flink@flink_jobmanager:32929/user/
> jobmanager:06abc8f5-c1b9-44b2-bb7f-771c74981552.
> 2017-08-16 02:46:23,640 INFO org.apache.flink.runtime.
> clusterframework.standalone.StandaloneResourceManager - Trying to
> associate with JobManager leader akka.tcp://flink@flink_
> jobmanager:32929/user/jobmanager
> 2017-08-16 02:46:23,653 WARN 
> org.apache.flink.runtime.webmonitor.JobManagerRetriever
> - Failed to retrieve leader gateway and port.
> akka.actor.ActorNotFound: Actor not found for:
> ActorSelection[Anchor(akka://flink/deadLetters), Path(/)]
> at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(
> ActorSelection.scala:65)
> at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(
> ActorSelection.scala:63)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(
> BatchingExecutor.scala:55)
> at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
> at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.
> unbatchedExecute(Future.scala:74)
> at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.
> scala:120)
> at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.
> execute(Future.scala:73)
> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.
> scala:40)
> at scala.concurrent.impl.Promise$DefaultPromise.scala$
> concurrent$impl$Promise$DefaultPromise$$dispatchOrAddCallback(Promise.
> scala:280)
> at scala.concurrent.impl.Promise$DefaultPromise.onComplete(
> Promise.scala:270)
> at akka.actor.ActorSelection.resolveOne(ActorSelection.scala:63)
> at org.apache.flink.runtime.akka.AkkaUtils$.getActorRefFuture(
> AkkaUtils.scala:498)
> at org.apache.flink.runtime.akka.AkkaUtils.getActorRefFuture(
> AkkaUtils.scala)
> at org.apache.flink.runtime.webmonitor.JobManagerRetriever.
> notifyLeaderAddress(JobManagerRetriever.java:141)
> at org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalServic
> e.nodeChanged(ZooKeeperLeaderRetrievalService.java:168)
> at org.apache.flink.shaded.org.apache.curator.framework.
> recipes.cache.NodeCache$4.apply(NodeCache.java:310)
> at org.apache.flink.shaded.org.apache.curator.framework.
> recipes.cache.NodeCache$4.apply(NodeCache.java:304)
> at org.apache.flink.shaded.org.apache.curator.framework.
> listen.ListenerContainer$1.run(ListenerContainer.java:93)
> at org.apache.flink.shaded.org.apache.curator.shaded.com.
> 

Re: akka timeout

2017-08-22 Thread Till Rohrmann
Hi Steven,

quick correction for Flink 1.2. Indeed the MetricFetcher does not pick up
the right timeout value from the configuration. Instead it uses a hardcoded
10s timeout. This has only been changed recently and is already committed
in the master. So with the next release 1.4 it will properly pick up the
right timeout settings.

Just out of curiosity, what's the instability issue you're observing?

Cheers,
Till

On Fri, Aug 18, 2017 at 7:07 PM, Steven Wu  wrote:

> Till/Chesnay, thanks for the answers. Looks like this is a result/symptom
> of an underlying stability issue that I am trying to track down.
>
> It is Flink 1.2.
>
> On Fri, Aug 18, 2017 at 12:24 AM, Chesnay Schepler 
> wrote:
>
>> The MetricFetcher always use the default akka timeout value.
>>
>>
>> On 18.08.2017 09:07, Till Rohrmann wrote:
>>
>> Hi Steven,
>>
>> I thought that the MetricFetcher picks up the right timeout from the
>> configuration. Which version of Flink are you using?
>>
>> The timeout is not a critical problem for the job health.
>>
>> Cheers,
>> Till
>>
>> On Fri, Aug 18, 2017 at 7:22 AM, Steven Wu  wrote:
>>
>>>
>>> We have set akka.ask.timeout to 60 s in yaml file. I also confirmed the
>>> setting in Flink UI. But I saw akka timeout of 10 s for metric query
>>> service. two questions
>>> 1) why doesn't metric query use the 60 s value configured in yaml file?
>>> does it always use default 10 s value?
>>> 2) could this cause heartbeat failure between task manager and job
>>> manager? or is this jut non-critical failure that won't affect job health?
>>>
>>> Thanks,
>>> Steven
>>>
>>> 2017-08-17 23:34:33,421 WARN 
>>> org.apache.flink.runtime.webmonitor.metrics.MetricFetcher
>>> - Fetching metrics failed. akka.pattern.AskTimeoutException: Ask timed
>>> out on [Actor[akka.tcp://flink@1.2.3.4:39139/user/MetricQueryServic
>>> e_23cd9db754bb7d123d80e6b1c0be21d6]] after [1 ms] at
>>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
>>> at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) at
>>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
>>> at 
>>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>>> at 
>>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
>>> at 
>>> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
>>> at 
>>> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
>>> at 
>>> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
>>> at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>
>>
>>
>


Re: Question about parallelism

2017-08-22 Thread Till Rohrmann
Hi Jerry,

you can set the global parallelism via
ExecutionEnvironment#setParallelism. If you call setParallelism on an
operator, then it only changes the parallelism of that operator. The
parallelism of an operator is the number of parallel instances of the
operator that will be executed. It thus also determines into how many
partitions your (potentially infinite) data stream will be partitioned.

If the parallelism changes between two operators, then there is a
re-partitioning of the data in a round-robin fashion across all parallel
subtasks of the succeeding operator.

Cheers,
Till
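
To make this concrete, here is a minimal runnable sketch (class name and
input path are placeholders) of a global default of 5 with a per-operator
override of 10:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class ParallelismExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(5); // global default: operators run with 5 parallel subtasks

        // the source reads with the global default parallelism of 5
        DataStream<String> text = env.readTextFile("file:///tmp/input"); // placeholder path

        // per-operator override: 10 flatMap subtasks consume the 5 source subtasks;
        // because the parallelism changes, data is re-partitioned round-robin
        DataStream<String> words = text
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String line, Collector<String> out) {
                        for (String word : line.split("\\s+")) {
                            out.collect(word);
                        }
                    }
                })
                .setParallelism(10);

        words.print();
        env.execute("parallelism example");
    }
}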

On Fri, Aug 18, 2017 at 11:12 PM, Jerry Peng 
wrote:

> I guess my previous question is also asking if the parallelism is set
> for the operator or "data stream".  Is there implied repartitioning
> when the parallelism changes?
>
> On Fri, Aug 18, 2017 at 2:08 PM, Jerry Peng 
> wrote:
> > Hello all,
> >
> > I have a question about parallelism and partitioning in the
> > DataStreams API.  In Flink, a user can set the parallelism of a data
> > source as well as of operators.  So when I set the parallelism of a data
> > source e.g.
> >
> > DataStream<String> text =
> > env.readTextFile(params.get("input")).setParallelism(5)
> >
> > does this mean that the resulting "text" DataStream is going to be
> > partitioned into 5 partitions, or does it mean that there are going to
> > be 5 parallel tasks that are going to run for this stage?
> >
> > If the next operator is:
> >
> > DataStream<Tuple2<String, Integer>> counts = text.flatMap(new
> > Tokenizer()).setParallelism(10)
> >
> > and the parallelism is set to 10.  Are there 10 parallel tasks
> > consuming from the 5 partitions? And how is the resulting "counts"
> > DataStream partitioned? Into 10 partitions?
> >
> > Thanks in advance!
> >
> > Best,
> >
> > Jerry
>


Re: Prioritize DataStream

2017-08-22 Thread Till Rohrmann
Hi Elias,

sorry for the slow answer. You were right that the answer is currently no.

However, people are currently working on changing the way the stream
operators work. This will allow an operator to decide from which input to
read next. Such functionality will enable us to implement proper side
inputs, which will also cover your use case.

At the moment, one thing you could try is to buffer events in operator
state until you have fully consumed the other stream. Upon seeing all
events from the other stream (this needs a proper termination event you can
detect), you can then replay all the buffered events. This could, however,
entail quite a large user state.

Cheers,
Till
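
As an illustration of the buffering idea, here is a minimal sketch (the
class name and the "DONE" termination marker are hypothetical; the buffer
is kept in memory for brevity, whereas a fault-tolerant version would keep
it in checkpointed operator state):

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

// Buffers data-stream events until the control stream signals completion.
public class BufferUntilControlDone implements CoFlatMapFunction<String, String, String> {

    private static final String CONTROL_DONE = "DONE"; // assumed termination marker

    private final List<String> buffer = new ArrayList<>();
    private boolean controlDone = false;

    @Override
    public void flatMap1(String controlEvent, Collector<String> out) {
        // apply the control event to the operator's configuration here
        if (CONTROL_DONE.equals(controlEvent)) {
            controlDone = true;
            // control stream fully consumed: replay everything buffered so far
            for (String buffered : buffer) {
                out.collect(buffered);
            }
            buffer.clear();
        }
    }

    @Override
    public void flatMap2(String dataEvent, Collector<String> out) {
        if (controlDone) {
            out.collect(dataEvent);
        } else {
            buffer.add(dataEvent); // held back until the control stream is done
        }
    }
}

Wiring it up would look like
controlStream.connect(dataStream).flatMap(new BufferUntilControlDone()),
so that flatMap1 receives the control events.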

On Mon, Aug 21, 2017 at 8:14 PM, Elias Levy 
wrote:

> Flink folks,
>
> A response to the question below?
>
> On Sat, Aug 19, 2017 at 11:02 AM, Elias Levy 
> wrote:
>
>> I believe the answer to this question is "no", but I figure I might as
>> well ask.  Is there a way to prioritize a stream?
>>
>> The use case is prioritization of a control stream.  This is mostly
>> needed on start-up, where a job might start consuming from the data stream
>> before consuming from the control stream.
>>
>>
>>
>


Re: Deleting files in continuous processing

2017-08-22 Thread Till Rohrmann
Hi Mohit,

Flink does not support this behaviour out of the box afaik. I think you
have to write your own source function or extend
ContinuousFileMonitoringFunction in order to do that.

Cheers,
Till
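
A minimal sketch of such a custom source (hypothetical class, not fault
tolerant; a production version would track processed files in checkpointed
state, the way ContinuousFileMonitoringFunction does):

import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Continuously scans a directory, emits each file's lines, then deletes the file.
public class DeleteAfterReadSource implements SourceFunction<String> {

    private final String directory;
    private volatile boolean running = true;

    public DeleteAfterReadSource(String directory) {
        this.directory = directory;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            try (DirectoryStream<Path> files = Files.newDirectoryStream(Paths.get(directory))) {
                for (Path file : files) {
                    if (!Files.isRegularFile(file)) {
                        continue;
                    }
                    synchronized (ctx.getCheckpointLock()) {
                        for (String line : Files.readAllLines(file)) {
                            ctx.collect(line);
                        }
                        Files.delete(file); // purge the file once its contents are emitted
                    }
                }
            }
            Thread.sleep(2000); // poll interval, mirroring the readFile() example below
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}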

On Mon, Aug 21, 2017 at 11:07 PM, Mohit Anchlia 
wrote:

> Just checking to see if there is a way to purge files after they have been processed.
>
> On Tue, Aug 15, 2017 at 5:11 PM, Mohit Anchlia 
> wrote:
>
>> Is there a way to delete a file once it has been processed?
>>
>> streamEnv.readFile(format, args[0],
>> FileProcessingMode.PROCESS_CONTINUOUSLY, 2000)
>>
>
>


Re: Global State and Scaling

2017-08-22 Thread Till Rohrmann
Hi Elias,

you're right, we currently don't support proper broadcast state. Hope to
add support for this in the near future.

The maximum parallelism only affects the keyed state because it defines how
many key groups there are. The key groups are the smallest unit of state
which can be re-partitioned (e.g. due to scaling up/down).

Cheers,
Till
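
For illustration, a minimal sketch of the union-redistribution workaround
Elias describes below (the class name and the "serial:value" update format
are hypothetical; assumes a Flink version with getUnionListState):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

// Each subtask snapshots its (serial, configValue) pair; on restore every
// subtask receives the union of all pairs and keeps the highest serial, so
// newly added subtasks after scale-out also get the latest config.
public class GlobalConfigMap implements MapFunction<String, String>, CheckpointedFunction {

    private transient ListState<Tuple2<Long, String>> unionState;
    private long serial = -1L;
    private String configValue;

    @Override
    public String map(String controlEvent) {
        // hypothetical wire format "serial:value" for config updates
        String[] parts = controlEvent.split(":", 2);
        long s = Long.parseLong(parts[0]);
        if (s > serial) {
            serial = s;
            configValue = parts[1];
        }
        return configValue;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        unionState.clear();
        if (configValue != null) {
            unionState.add(Tuple2.of(serial, configValue));
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        unionState = context.getOperatorStateStore().getUnionListState(
                new ListStateDescriptor<>("latest-config",
                        TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {})));
        // duplicates are expected after restore; keep the largest serial number
        for (Tuple2<Long, String> entry : unionState.get()) {
            if (entry.f0 > serial) {
                serial = entry.f0;
                configValue = entry.f1;
            }
        }
    }
}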

On Tue, Aug 22, 2017 at 3:02 AM, Elias Levy 
wrote:

> Looks like Gerard asked something along similar lines
> 
> just last month and that there is a JIRA
>  for official support
> for broadcast state.  Looks like the ugly hack is the way to go for now.
>
>
> On Mon, Aug 21, 2017 at 1:23 PM, Elias Levy 
> wrote:
>
>> I am implementing a control stream.  The stream communicates a global
>> configuration value for the whole job.  It uses DataStream.broadcast() to
>> communicate this to all parallel operator instances.  I would like to save
>> this value in state so that it can be recovered when the job
>> restarts/recovers.  The control stream is not keyed, so the only option is
>> Operator state.
>>
>> I could implement this using the ListCheckpointed interface, returning
>> Collections.singletonList(configValue) from snapshotState.  It is clear
>> what I'd need to do in restoreState in the case of scale in.  If I include
>> a serial number in the config, and it receives multiple values on restore,
>> it can keep the config value with the largest serial number, indicating the
>> latest config.
>>
>> Alas, it is not clear what should happen on scale out, as some operator
>> instances will receive empty lists.
>>
>> It seems the other alternative is to use CheckpointedFunction, along with
>> union redistribution via getUnionListState, and then have each operator
>> instance select from the union list the config with the latest serial
>> number, of which there should be multiple copies.  But this seems like an
>> ugly hack.
>>
>>
>> In addition, the documentation is unclear on the relationship and effect,
>> if any, of the maximum-parallelism job parameter on operator state,
>> whereas it is much clearer in this regard as it relates to keyed state via
>> key groups.
>>
>>
>> How are folks handling this use case, i.e. storing and restoring global
>> config values via Flink state?
>>
>>
>


Re: how is data partitioned and distributed for connected streams

2017-08-22 Thread Till Rohrmann
Hi,

if all operators have the same parallelism, then there will be a pointwise
connection. This means all elements arriving at s1_x and s2_x will be
forwarded to s3_x, with _x denoting the parallel subtask. Thus, to answer
your second question, the single s1 element will only be present at one
subtask of the CoMap operator, depending on which s1 parallel subtask it
comes from.

Cheers,
Till
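
A minimal runnable sketch of Wei's setup (data and class name are made up):

import java.util.Arrays;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

public class ConnectedStreamsExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStream<String> s1 = env.fromElements("the-one-event");
        DataStream<String> s2 = env.fromCollection(Arrays.asList("e1", "e2", "e3"));

        // With equal parallelism on both inputs and the CoMap, the connection is
        // pointwise: subtask x of each input feeds CoMap subtask x, so the single
        // s1 element reaches exactly one CoMap instance.
        DataStream<String> s3 = s1.connect(s2).map(new CoMapFunction<String, String, String>() {
            @Override
            public String map1(String fromS1) {
                return "s1: " + fromS1;
            }

            @Override
            public String map2(String fromS2) {
                return "s2: " + fromS2;
            }
        });

        s3.print();
        env.execute("connected streams example");
    }
}

Note that fromElements/fromCollection are non-parallel sources, so this toy
job re-partitions their output to reach the parallelism-2 CoMap; the
pointwise forwarding described above applies when the inputs themselves run
with the same parallelism as the CoMap.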

On Tue, Aug 22, 2017 at 8:31 AM, xie wei  wrote:

> Hello Flink,
>
> assume there are two finite streams: stream1 (s1) has only one event,
> stream2 (s2) has 100 events, and the parallelism is 2.
> Then doing stream1.connect(stream2).map().
> How is the data partitioned and distributed to the CoMap instances? Is the
> event from s1 only available in one of the CoMap instances?
> Thank you!
>
> Best regards
> Wei
>
>
>


how is data partitioned and distributed for connected streams

2017-08-22 Thread xie wei
Hello Flink,

assume there are two finite streams: stream1 (s1) has only one event,
stream2 (s2) has 100 events, and the parallelism is 2.
Then doing stream1.connect(stream2).map().
How is the data partitioned and distributed to the CoMap instances? Is the
event from s1 only available in one of the CoMap instances?
Thank you!

Best regards
Wei