Dear Gyula,

Thank you very much for your idea, which does indeed get the program past that error.
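For anyone hitting the same "file not found" error: the workaround was to pass a plain path instead of a file:// URI. A minimal sketch of doing that normalization inside the program could look like the following. The class PlainPath and the helper toPlainPath are hypothetical names used for illustration only; they are not part of Flink, and the sketch assumes the argument is a well-formed URI (no unescaped spaces).

```java
import java.net.URI;

public class PlainPath {

    // Hypothetical helper, not a Flink API: strips a leading "file:" scheme
    // and returns the path component; any other argument (plain paths,
    // hdfs:// URIs) is returned unchanged.
    public static String toPlainPath(String arg) {
        if (arg.startsWith("file:")) {
            // URI.create throws IllegalArgumentException on malformed input.
            return URI.create(arg).getPath();
        }
        return arg;
    }

    public static void main(String[] args) {
        System.out.println(toPlainPath("file:///Users/X/Y/Z/theFile.txt"));
        System.out.println(toPlainPath("/Users/X/Y/Z/theFile.txt"));
    }
}
```

The result of toPlainPath(args[0]) could then be handed to env.readTextFile(...) in place of the raw argument, so the job accepts either form on the command line.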
Now I run into a deserialization error, and I am unsure of its cause. Is it possible in Flink 0.7.0-incubating to parse an input JSON file that contains heterogeneous record types (e.g. events with different field structures)?

I have pasted the whole error trace below, as it may contain hints that help you suggest a workaround. This is the only output I receive after launching the program.

-------------------------------------
Error: The program execution failed: java.lang.Exception: Failed to deploy the task flatMap-2 (1/8) - execution #0 to slot SubSlot 1 (ee5b634754a028c12a321648f48e4886 (0) - ALLOCATED/ALIVE): java.lang.RuntimeException: Cannot deserialize invokable object
    at org.apache.flink.streaming.api.StreamConfig.getFunction(StreamConfig.java:193)
    at org.apache.flink.streaming.api.streamvertex.StreamVertex.initialize(StreamVertex.java:63)
    at org.apache.flink.streaming.api.streamvertex.StreamVertex.registerInputOutput(StreamVertex.java:53)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:175)
    at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:594)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:418)
    at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:947)
Caused by: org.apache.commons.lang3.SerializationException: java.lang.ClassNotFoundException: org.apache.flink.ReadJSONDirectly$SelectFieldsFlatMap
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:230)
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:268)
    at org.apache.flink.streaming.api.StreamConfig.getFunction(StreamConfig.java:191)
    ... 10 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.ReadJSONDirectly$SelectFieldsFlatMap
    at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:249)
    at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:604)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1591)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1496)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1329)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:349)
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:224)
    ... 12 more
    at org.apache.flink.runtime.executiongraph.Execution$2.run(Execution.java:284)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:695)
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: java.lang.Exception: Failed to deploy the task flatMap-2 (1/8) - execution #0 to slot SubSlot 1 (ee5b634754a028c12a321648f48e4886 (0) - ALLOCATED/ALIVE): java.lang.RuntimeException: Cannot deserialize invokable object
    at org.apache.flink.streaming.api.StreamConfig.getFunction(StreamConfig.java:193)
    at org.apache.flink.streaming.api.streamvertex.StreamVertex.initialize(StreamVertex.java:63)
    at org.apache.flink.streaming.api.streamvertex.StreamVertex.registerInputOutput(StreamVertex.java:53)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:175)
    at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:594)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:418)
    at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:947)
Caused by: org.apache.commons.lang3.SerializationException: java.lang.ClassNotFoundException: org.apache.flink.ReadJSONDirectly$SelectFieldsFlatMap
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:230)
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:268)
    at org.apache.flink.streaming.api.StreamConfig.getFunction(StreamConfig.java:191)
    ... 10 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.ReadJSONDirectly$SelectFieldsFlatMap
    at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:249)
    at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:604)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1591)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1496)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1329)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:349)
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:224)
    ... 12 more
    at org.apache.flink.runtime.executiongraph.Execution$2.run(Execution.java:284)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:695)
    at org.apache.flink.client.program.Client.run(Client.java:325)
    at org.apache.flink.streaming.util.ClusterUtil.runOnMiniCluster(ClusterUtil.java:62)
    at org.apache.flink.streaming.util.ClusterUtil.runOnMiniCluster(ClusterUtil.java:80)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:42)
    at org.apache.flink.ReadJSONDirectly.main(ReadJSONDirectly.java:78)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
    at org.apache.flink.client.program.Client.run(Client.java:244)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
-------------------------

Best regards,
Camelia

----- Original message -----
> From: "Gyula Fóra" <[email protected]>
> To: [email protected]
> Sent: Friday, 7 November 2014 14:06:47
> Subject: Re: JSON file not found - StreamExecutionEnvironment
>
> Hello,
>
> Please try running the same job, but for the file path drop the file:// so
> just "/Users/X/Y/Z/theFile.txt"
>
> I think this will fix your problem, however we need to fix this in the api.
>
> Regards,
> Gyula
>
> On Fri, Nov 7, 2014 at 1:54 PM, Camelia-Elena Ciolac <[email protected]> wrote:
>
> > Hello,
> >
> > I wrote a small program to test the JSON parsing capability with the new
> > streaming API of Flink 0.7.0-incubating, but I ran into a "file not found"
> > exception.
> >
> > As context for my question:
> >
> > StreamExecutionEnvironment env =
> >     StreamExecutionEnvironment.createLocalEnvironment();
> > // it does not work with StreamExecutionEnvironment.getExecutionEnvironment() either
> >
> > DataStream<Tuple4<String,Integer,Integer,Long>> ds1 =
> >     env.readTextFile(args[0]).flatMap(....);
> >
> > At runtime I pass the arguments as follows:
> >
> > flink run --jarfile ./quickstart/target/quickstart-0.1.jar --class org.apache.flink.ReadJSONDirectly --arguments file:///Users/X/Y/Z/theFile.txt file:///Users/X/Y/Z/outputFile.txt -v
> >
> > and even though the file exists at that path, I still get this error stack:
> >
> > Error: The main method caused an error.
> >
> > org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
> >     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
> >     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
> >     at org.apache.flink.client.program.Client.run(Client.java:244)
> >     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
> >     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
> >     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
> >     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
> > Caused by: java.lang.IllegalArgumentException: File not found: file:///Users/X/Y/Z/theFile.txt
> >     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.checkIfFileExists(StreamExecutionEnvironment.java:196)
> >     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.readTextFile(StreamExecutionEnvironment.java:164)
> >     at org.apache.flink.ReadJSONDirectly.main(ReadJSONDirectly.java:26)
> >     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> >     at java.lang.reflect.Method.invoke(Method.java:597)
> >     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
> >     ... 6 more
> >
> > The same thing happens if I put the file in HDFS and pass
> > hdfs:///pathToFile/theFile.txt as the argument.
> >
> > What could be the cause, in your opinion?
> >
> > Thank you in advance!
> >
> > Camelia
