Re: flink-shell remote
Hi Anton,

I was involved in writing the scala-shell. My initial idea was to just write a very small job, try to execute it remotely, and see if something comes back. In the end I did not have time to implement it. Just in case you need a pointer on how this could be done.

best,
Nikolaas

2017-02-08 14:07 GMT+01:00 Till Rohrmann:

> Hi Anton,
>
> if you like, you can take a stab at it. Best if you first create a JIRA
> for that.
>
> Cheers,
> Till
>
> On Wed, Feb 8, 2017 at 12:27 PM, Anton Solovev wrote:
>
> > Hi,
> >
> > Can we check the connection to the remote host before executing a
> > program when starting flink-shell?
> >
> > For example, right after `bin/start-scala-shell.sh remote <host> 35007`
> > it would check, and not start if it cannot connect.
> >
> > Best,
> > Anton
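[Editor's note] The pre-flight check Anton asks about could be sketched with a plain TCP connect in Scala. This is only an illustration, not Flink API: the function name and its use at shell startup are hypothetical, and a successful connect only proves something is listening on the JobManager port, not that a healthy Flink cluster is behind it.

```scala
import java.net.{InetSocketAddress, Socket}

import scala.util.Try

// Returns true if a TCP connection to host:port succeeds within timeoutMs.
// Hypothetical helper; in the real shell this would run before starting
// the REPL, and the shell would exit early when it returns false.
def canConnect(host: String, port: Int, timeoutMs: Int = 2000): Boolean =
  Try {
    val socket = new Socket()
    try socket.connect(new InetSocketAddress(host, port), timeoutMs)
    finally socket.close()
  }.isSuccess

// Hypothetical usage at shell startup:
// if (!canConnect(host, port)) { Console.err.println(s"Cannot reach $host:$port"); sys.exit(1) }
```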
Re: Bug in Scala Shell
Hi Trevor,

this is a bug and was introduced before the streaming shell:
https://issues.apache.org/jira/browse/FLINK-3701

I suspect something changed somewhere else in the Flink code that breaks the scala shell. The scala shell tests never execute the same program twice, so it went unnoticed. The streaming shell does seem to work. If you need the batch shell, I checked version 1.0 and it seemed to work.

best,
Nikolaas

2016-04-18 23:35 GMT+02:00 Trevor Grant:

> I was trying out the new scala-shell with streaming support...
>
> The following code executes correctly the first time I run it:
>
> val survival = benv.readCsvFile[(String, String, String,
>   String)]("file:///home/trevor/gits/datasets/haberman/haberman.data")
> survival.count()
>
> However, if I call survival.count() again, or run the entire code again,
> I get the following error:
>
> java.lang.NullPointerException
>   at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1074)
>   at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1004)
>   at org.apache.flink.api.java.ScalaShellRemoteEnvironment.execute(ScalaShellRemoteEnvironment.java:70)
>   at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:898)
>   at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
>   at org.apache.flink.api.scala.DataSet.count(DataSet.scala:528)
>   at .<init>(<console>:62)
>   at .<clinit>(<console>)
>   at .<init>(<console>:7)
>   at .<clinit>(<console>)
>   at $print(<console>)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
>   at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
>   at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
>   at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
>   at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
>   at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
>   at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
>   at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
>   at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
>   at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
>   at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
>   at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
>   at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>   at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>   at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>   at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
>   at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:199)
>   at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:127)
>   at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala)
>
> Is this me or is this a bug? Or both?
>
> Thanks,
> tg
>
> Trevor Grant
> Data Scientist
> https://github.com/rawkintrevo
> http://stackexchange.com/users/3002022/rawkintrevo
> http://trevorgrant.org
>
> *"Fortunate is he, who is able to know the causes of things." -Virgil*
Re: Apache Big Data
Hi guys,

I've integrated streaming in zeppelin for flink. It works using the scala shell, which I extended to support streaming applications. Unfortunately the scala-shell for streaming is not yet included in the Flink master, and it has changed a bit upon public request since I implemented the zeppelin-flink connection. It should, however, work with these two versions of Flink and Zeppelin:

https://github.com/nikste/flink/tree/Flink-2522_Scala_shell_streaming_download_linux
https://github.com/nikste/incubator-zeppelin/tree/visualizationDemo

Note that the resubmission of streaming jobs by zeppelin is not yet working. The running streaming job has to be cancelled by zeppelin, which is possible by finding out the job-id and sending a cancel message to the task manager. I'll continue integrating it with the current versions of Flink and Zeppelin as soon as I have the time.

cheers,
Nik

2016-03-02 10:27 GMT+01:00 Till Rohrmann:

> Great to hear that you two are giving a talk at ApacheCon.
>
> As far as I know, there's nobody working on a streaming interpreter for
> Zeppelin. People have thought about doing it, but it never got realized
> so far. But I think it should not be too difficult to implement. So if
> you want to take the lead there, go ahead. I'm convinced that the
> community would be very happy about it.
>
> Cheers,
> Till
>
> On Tue, Mar 1, 2016 at 9:15 PM, Suneel Marthi wrote:
>
> > Same here, I will be presenting Distributed ML using Mahout on Flink
> > at ApacheCon.
> >
> > Some big shout-outs to Flink coming in Vancouver.
> >
> > On Tue, Mar 1, 2016 at 2:35 PM, Trevor Grant wrote:
> >
> > > Anyone who is attending Apache: Big Data in Vancouver May 9-12, stop
> > > by and check out my talk: *Everyone Plays: Collaborative Data Science
> > > With Zeppelin*
> > >
> > > I'll obviously be giving some big shout-outs to Flink.
> > >
> > > Which leads me to my next question: is anyone working on a Flink
> > > Streaming interpreter for Zeppelin? I've been heads down writing
> > > those LSTM neural nets for Flink (also hoping to have those ready for
> > > showcasing in time), but a notebook running Flink Streaming would be
> > > quite dope.
> > >
> > > Thanks!
> > > tg
> > >
> > > Trevor Grant
> > > Data Scientist
> > > https://github.com/rawkintrevo
> > > http://stackexchange.com/users/3002022/rawkintrevo
> > > http://trevorgrant.org
> > >
> > > *"Fortunate is he, who is able to know the causes of things." -Virgil*
cancel running stream job
Hi guys,

I'm trying to connect flink streaming and zeppelin. However, if I try to resubmit a streaming program from zeppelin, I receive a "org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException".

Is there a way to cancel a running streaming job? And would cancelling the streaming job in this case free up resources for resubmission?

best,
Nikolaas
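[Editor's note] For the archives: the Flink CLI covers this case. `bin/flink list` prints scheduled and running jobs with their IDs, and `bin/flink cancel <jobID>` cancels one, which releases its slots for resubmission. A small Scala helper that pulls job IDs out of `list`-style output could look like the sketch below. It relies only on the fact that Flink job IDs print as 32 lowercase hex characters; the sample output line is made up for illustration and the real `bin/flink list` format may differ.

```scala
// Flink JobIDs are 16 bytes, printed as 32 lowercase hex characters.
val JobIdPattern = "\\b[0-9a-f]{32}\\b".r

// Extracts candidate job IDs from `bin/flink list`-style output.
def extractJobIds(listOutput: String): Seq[String] =
  JobIdPattern.findAllIn(listOutput).toSeq

// Hypothetical sample line; the real CLI output format may differ:
val sample =
  "01.03.2016 14:00:00 : 7684be6004e4e955c2a558a9bc463f65 : Streaming Job (RUNNING)"

// Each extracted id could then be cancelled by shelling out, e.g. via
// scala.sys.process: Seq("bin/flink", "cancel", id).!
println(extractJobIds(sample))
```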
Re: Flink interactive Scala shell
Thanks for the feedback guys!

Apparently the Scala shell compiles the shell input to some kind of virtual directory. It should be possible to create a jar from its contents and then hand it over to Flink for execution in some way. I will investigate further.

cheers,
Nikolaas

2015-04-15 11:20 GMT+02:00 Stephan Ewen se...@apache.org:

> To give a bit of context for the exception: To execute a program, the
> classes of the user functions need to be available to the executing
> TaskManagers.
>
> - If you execute locally from the IDE, all classes are in the classpath
>   anyway.
> - If you use the remote environment, you need to attach the jar file to
>   the environment.
> - In your case (repl), you need to make sure that the generated classes
>   are given to the TaskManager.
>
> In that sense, the approach is probably similar to the case of executing
> with a remote environment, only that you do not have a jar file up front,
> but need to generate it on the fly. As Robert mentioned,
> https://github.com/apache/flink/pull/35 may have a first solution to
> that. Other approaches are also possible, like simply always bundling all
> classes in the directory where the repl puts its generated classes.
>
> Greetings,
> Stephan
>
> On Tue, Apr 14, 2015 at 11:49 PM, Aljoscha Krettek aljos...@apache.org wrote:
>
> > I will look into it once I have some time (end of this week, or next
> > week probably).
> >
> > On Tue, Apr 14, 2015 at 8:51 PM, Robert Metzger rmetz...@apache.org wrote:
> >
> > > Hey Nikolaas,
> > > thank you for posting on the mailing list. I've met Nikolaas today in
> > > person and we were talking a bit about an interactive shell for
> > > Flink, potentially also an integration with Zeppelin. Great stuff I'm
> > > really looking forward to :)
> > > We were wondering if somebody on the list has some experience with
> > > the scala shell. I've pointed Nikolaas also to this PR:
> > > https://github.com/apache/flink/pull/35
> > >
> > > Best,
> > > Robert
> > >
> > > On Tue, Apr 14, 2015 at 5:26 PM, nse sik nikolaas.steenber...@gmail.com wrote:
> > >
> > > > Hi!
> > > >
> > > > I am trying to implement a scala shell for flink.
> > > > I've started with a simple scala object whose main function drops
> > > > the user into the interactive scala shell (repl) at one point:
> > > >
> > > > import scala.tools.nsc.interpreter.ILoop
> > > > import scala.tools.nsc.Settings
> > > >
> > > > object Job {
> > > >   def main(args: Array[String]) {
> > > >     val repl = new ILoop()
> > > >     repl.settings = new Settings()
> > > >     // enable this line to use scala in intellij
> > > >     repl.settings.usejavacp.value = true
> > > >     repl.createInterpreter()
> > > >     // start scala interpreter shell
> > > >     repl.process(repl.settings)
> > > >     repl.closeInterpreter()
> > > >   }
> > > > }
> > > >
> > > > Now I am trying to execute the word count example as in:
> > > >
> > > > scala> import org.apache.flink.api.scala._
> > > > scala> val env = ExecutionEnvironment.getExecutionEnvironment
> > > > scala> val text = env.fromElements(
> > > >   "To be, or not to be,--that is the question:--",
> > > >   "Whether 'tis nobler in the mind to suffer",
> > > >   "The slings and arrows of outrageous fortune",
> > > >   "Or to take arms against a sea of troubles,")
> > > > scala> val counts = text.flatMap { _.toLowerCase.split("\\W+") }
> > > >   .map { (_, 1) }.groupBy(0).sum(1)
> > > > scala> counts.print()
> > > > scala> env.execute("Flink Scala Api Skeleton")
> > > >
> > > > However I am running into the following error:
> > > >
> > > > env.execute("Flink Scala Api Skeleton")
> > > > org.apache.flink.runtime.client.JobExecutionException:
> > > > java.lang.RuntimeException: The initialization of the DataSource's
> > > > outputs caused an error: The type serializer factory could not load
> > > > its parameters from the configuration due to missing classes.
> > > >   at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:89)
> > > >   at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:187)
> > > >   at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:612)
> > > >   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > >   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> > > >   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > >   at java.lang.reflect.Method.invoke(Method.java:606)
> > > >   at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:420)
> > > >   at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:949)
> > > > Caused by: java.lang.RuntimeException: The type serializer factory
> > > > could not load its parameters from the configuration due to missing
> > > > classes.
> > > >   at org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1086)
> > > >   at org.apache.flink.runtime.operators.util.TaskConfig.getOutputSerializer(TaskConfig.java:542)
> > > >   at org.apache.flink.runtime.operators.RegularPactTask.getOutputCollector(RegularPactTask.java:1251)
> > > >   at
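[Editor's note] The "jar from the REPL's virtual directory" idea at the top of this thread can be sketched in plain Scala with the standard `java.util.jar` API. This is an illustration under stated assumptions: the function name is made up, the class directory path is a placeholder, and shipping the resulting jar (e.g. via a remote environment's jar-file argument) is left out.

```scala
import java.io.{File, FileInputStream, FileOutputStream}
import java.util.jar.{JarEntry, JarOutputStream}

// Packs all .class files under `classDir` into `jarFile`, preserving
// relative paths as jar entry names, so the result could be shipped to
// the TaskManagers alongside a remote-environment job submission.
def packClassesIntoJar(classDir: File, jarFile: File): Unit = {
  val jar = new JarOutputStream(new FileOutputStream(jarFile))
  def addDir(dir: File, prefix: String): Unit =
    for (f <- Option(dir.listFiles()).getOrElse(Array.empty[File])) {
      if (f.isDirectory) addDir(f, prefix + f.getName + "/")
      else if (f.getName.endsWith(".class")) {
        jar.putNextEntry(new JarEntry(prefix + f.getName))
        val in = new FileInputStream(f)
        try {
          val buf = new Array[Byte](4096)
          var n = in.read(buf)
          while (n != -1) { jar.write(buf, 0, n); n = in.read(buf) }
        } finally in.close()
        jar.closeEntry()
      }
    }
  addDir(classDir, "")
  jar.close()
}

// Hypothetical usage: pack whatever the REPL's (virtual) output directory
// holds, then hand the jar to the execution environment.
// packClassesIntoJar(new File("/tmp/repl-classes"), new File("/tmp/repl.jar"))
```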