Flink Accumulators vs Metrics
Hi All

Based on my code reading, I have the following understanding of Metrics and Accumulators:

1. Accumulators for a Flink job work like global counters. They are designed so that accumulator values from different ExecutionVertex instances can be combined; they are essentially distributed counters.
2. Flink Metrics are local to the TaskManager that reports them, and they need external aggregation for a job-centric view.

I see that one can define user metrics as part of writing Flink programs, but these metrics would not be consolidated when the job runs the same task on different task managers. Having said that, is it fair to say that Metrics are for surfacing operational details only, and will not be replacing Accumulators anytime soon?

For my use case, I want to maintain some global counters/histograms (like the ones available in Storm, e.g. total messages processed in the last 1 minute, last 10 minutes, etc.). Metrics would have been a perfect fit for these, but one would need to employ external aggregation to come up with a holistic, job-level view.

Please correct my understanding if I am missing something here.

Regards
Sumit Chawla
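To make the "distributed counters" point concrete, here is a minimal, self-contained Java sketch of the accumulator contract described above. It is modeled loosely on Flink's org.apache.flink.api.common.accumulators.IntCounter but is NOT actual Flink code: each parallel task instance counts locally, and a merge step (which the JobManager performs for real accumulators) produces the job-global value.

```java
import java.util.Arrays;

// Hypothetical stand-in for Flink's IntCounter accumulator.
class IntCounter {
    private long localValue = 0;

    void add(long value) {
        localValue += value;
    }

    long getLocalValue() {
        return localValue;
    }

    // merge() is what makes accumulators job-global: combining the
    // values from different parallel instances is just addition here.
    void merge(IntCounter other) {
        localValue += other.getLocalValue();
    }
}

public class AccumulatorMergeSketch {
    public static void main(String[] args) {
        // Three parallel instances of the same operator, counting locally.
        IntCounter instance1 = new IntCounter();
        IntCounter instance2 = new IntCounter();
        IntCounter instance3 = new IntCounter();
        instance1.add(10);
        instance2.add(25);
        instance3.add(7);

        // The job-centric view after merging, as the JobManager would build it.
        IntCounter jobTotal = new IntCounter();
        for (IntCounter local : Arrays.asList(instance1, instance2, instance3)) {
            jobTotal.merge(local);
        }
        System.out.println(jobTotal.getLocalValue()); // prints 42
    }
}
```

A user metric counter, by contrast, stops at the per-instance values: nothing in the runtime performs this merge for you, which is exactly the gap described above.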
Re: Get Flink ExecutionGraph Programmatically
Hi Aljoscha

I was able to get the ClusterClient and accumulators using the following:

    DefaultCLI defaultCLI = new DefaultCLI();
    CommandLine line = new DefaultParser().parse(new Options(), new String[]{}, true);
    ClusterClient clusterClient = defaultCLI.retrieveCluster(line, configuration);

Regards
Sumit Chawla

On Thu, Sep 22, 2016 at 4:55 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi,
> there is ClusterClient.getAccumulators(JobID jobID) which should be able to
> get the accumulators for a running job. If you can construct a
> ClusterClient that should be a good solution.
>
> Cheers,
> Aljoscha
Re: Get Flink ExecutionGraph Programmatically
Hi Stephan

My goal here is to get user accumulators. I know the REST calls exist, but since I am running my code in the same JVM, I wanted to avoid going over HTTP. I saw this code in JobAccumulatorsHandler and tried to use it. Would you suggest an alternative approach that avoids the over-the-network serialization for Akka?

Regards
Sumit Chawla

On Wed, Sep 21, 2016 at 11:37 AM, Stephan Ewen <se...@apache.org> wrote:
> Between two different actor systems in the same JVM, messages are still
> serialized (they go through a local socket, I think).
>
> Getting the execution graph is not easily possible, and not intended, as it
> actually contains RPC resources, etc.
>
> What do you need from the execution graph? Maybe there is another way to
> achieve that...
Re: Get Flink ExecutionGraph Programmatically
Hi Chesnay

I am actually running this code in the same JVM as the WebInterface and JobManager. I am programmatically starting the JobManager, and then running this code in the same JVM to query metrics. The only difference could be that I am creating a new Akka ActorSystem and ActorGateway. I am not sure whether that forces the code to execute as if the request were coming over the wire. I am not very well versed in Akka internals, so maybe somebody can shed some light on it.

Regards
Sumit Chawla

On Wed, Sep 21, 2016 at 1:06 AM, Chesnay Schepler <ches...@apache.org> wrote:
> Hello,
>
> this is a rather subtle issue you stumbled upon here.
>
> The ExecutionGraph is not serializable. The only reason why the
> WebInterface can access it is because it runs in the same JVM as the
> JobManager.
>
> I'm not sure if there is a way for what you are trying to do.
>
> Regards,
> Chesnay
Get Flink ExecutionGraph Programmatically
Hi All

I am trying to get job accumulators. (I am aware that I can get the accumulators through the REST APIs as well, but I wanted to avoid JSON parsing.)

Looking at JobAccumulatorsHandler, I am trying to get the execution graph for a currently running job. Following is my code:

    InetSocketAddress initialJobManagerAddress = new InetSocketAddress(hostName, port);
    InetAddress ownHostname =
        ConnectionUtils.findConnectingAddress(initialJobManagerAddress, 2000, 400);

    ActorSystem actorSystem = AkkaUtils.createActorSystem(configuration,
        new Some(new Tuple2<String, Object>(ownHostname.getCanonicalHostName(), 0)));

    FiniteDuration timeout = FiniteDuration.apply(10, TimeUnit.SECONDS);

    ActorGateway akkaActorGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
        LeaderRetrievalUtils.createLeaderRetrievalService(configuration),
        actorSystem, timeout);

    Future future = akkaActorGateway.ask(new RequestJobDetails(true, false), timeout);

    MultipleJobsDetails result = (MultipleJobsDetails) Await.result(future, timeout);
    ExecutionGraphHolder executionGraphHolder = new ExecutionGraphHolder(timeout);
    LOG.info(result.toString());
    for (JobDetails detail : result.getRunningJobs()) {
        LOG.info(detail.getJobName() + " ID " + detail.getJobId());

        ExecutionGraph executionGraph =
            executionGraphHolder.getExecutionGraph(detail.getJobId(), akkaActorGateway);

        LOG.info("Accumulators " + executionGraph.aggregateUserAccumulators());
    }

However, I am receiving the following error in Flink:

    2016-09-20T14:19:53,201 [flink-akka.actor.default-dispatcher-3] nobody ERROR akka.remote.EndpointWriter - Transient association error (association remains live)
    java.io.NotSerializableException: org.apache.flink.runtime.checkpoint.CheckpointCoordinator
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) ~[?:1.8.0_92]
        at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
        at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
        at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[scala-library-2.10.5.jar:?]
        at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
        at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36) ~[akka-remote_2.10-2.3.7.jar:?]
        at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845) ~[akka-remote_2.10-2.3.7.jar:?]
        at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845) ~[akka-remote_2.10-2.3.7.jar:?]
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[scala-library-2.10.5.jar:?]
        at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:844) ~[akka-remote_2.10-2.3.7.jar:?]

Any reason why it is failing? This code works when invoked through WebRuntimeMonitor.

Regards
Sumit Chawla
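The failure mode in the stack trace above can be reproduced with plain JDK serialization, no Flink needed. In this sketch, Coordinator and Graph are hypothetical stand-ins for CheckpointCoordinator and ExecutionGraph: the outer object implements Serializable, but Java serialization walks the entire object graph and fails on the first non-serializable field it reaches.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Stand-in for CheckpointCoordinator: a plain class that is NOT Serializable.
class Coordinator {
}

// Stand-in for ExecutionGraph: Serializable itself, but it holds a
// non-serializable field, which poisons the whole object graph.
class Graph implements Serializable {
    final Coordinator coordinator = new Coordinator();
}

public class SerializationSketch {
    public static void main(String[] args) throws IOException {
        ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
        try {
            out.writeObject(new Graph());
            System.out.println("serialized fine");
        } catch (NotSerializableException e) {
            // Same failure mode as above: the message names the first
            // non-serializable class encountered while walking the graph.
            System.out.println("NotSerializableException: " + e.getMessage());
        }
    }
}
```

This also explains why WebRuntimeMonitor works: it reads the ExecutionGraph directly in the JobManager JVM, so nothing ever has to be serialized.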
Re: Performance and Latency Chart for Flink
Has anyone else run these kinds of benchmarks? I would love to hear more people's experience and details about those benchmarks.

Regards
Sumit Chawla
Re: Performance and Latency Chart for Flink
Hi Amir

Would it be possible for you to share the numbers? Also, share your configuration details if possible.

Regards
Sumit Chawla

On Fri, Sep 16, 2016 at 12:18 PM, amir bahmanyari <amirto...@yahoo.com.invalid> wrote:
> Hi Fabian, FYI. This is a report on other engines where we did the same type
> of benchmarking. It also explains what Linear Road benchmarking is. Thanks
> for your help.
> http://www.slideshare.net/RedisLabs/walmart-ibm-revisit-the-linear-road-benchmark
> https://github.com/IBMStreams/benchmarks
> https://www.datatorrent.com/blog/blog-implementing-linear-road-benchmark-in-apex/
>
>
> From: Fabian Hueske <fhue...@gmail.com>
> To: "dev@flink.apache.org" <dev@flink.apache.org>
> Sent: Friday, September 16, 2016 12:31 AM
> Subject: Re: Performance and Latency Chart for Flink
>
> Hi,
>
> I am not aware of periodic performance runs for the Flink releases.
> I know a few benchmarks which have been published at different points in
> time, like [1], [2], and [3] (you'll probably find more).
>
> In general, fair benchmarks that compare different systems (if there is
> such a thing) are very difficult, and the results often depend on the use
> case.
> IMO the best option is to run your own benchmarks, if you have a concrete
> use case.
>
> Best, Fabian
>
> [1] 08/2015: http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/
> [2] 12/2015: https://yahooeng.tumblr.com/post/135321837876/benchmarking-streaming-computation-engines-at
> [3] 02/2016: http://data-artisans.com/extending-the-yahoo-streaming-benchmark/
Performance and Latency Chart for Flink
Hi

Is there any performance run that is done for each Flink release? Or are you aware of any third-party evaluation of performance metrics for Flink? I am interested in seeing how performance has improved from release to release, and how Flink compares against its competitors.

Regards
Sumit Chawla