Nice suggestion. So you want to serialize and deserialize the InputFormats
on the client to check whether they can be transferred correctly? Merely
serializing is not enough, because the exception above occurs during
deserialization.
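A minimal client-side round-trip check could look like the sketch below (plain Java serialization; the helper name `roundTrip` is hypothetical, not Flink API):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class RoundTripCheck {

    // Serialize and immediately deserialize an object so that both
    // serialization AND deserialization problems surface on the client,
    // before the job is submitted to the JobManager.
    static Object roundTrip(Serializable obj)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        try (ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Object copy = roundTrip("hello");
        System.out.println(copy); // prints "hello"
    }
}
```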

On Wed, Sep 2, 2015 at 2:29 PM, Stephan Ewen <se...@apache.org> wrote:

> We should try to improve the exception here. More people will run into
> this issue, and the exception message should help them understand what
> went wrong.
>
> How about we do eager serialization into a set of byte arrays? Then the
> serializability issue comes immediately when the program is constructed,
> rather than later, when it is shipped.
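A sketch of the eager-serialization idea (the helper name `serializeEagerly` and the error message are illustrative, not Flink's actual implementation):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class EagerSerializer {

    // Serialize the user object into a byte array at program construction
    // time, so that serialization problems fail fast on the client with a
    // clear message instead of a late "unread block data" error on the
    // JobManager.
    static byte[] serializeEagerly(Serializable obj) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(obj);
            }
            return bos.toByteArray();
        } catch (IOException e) {
            throw new IllegalArgumentException(
                    "Object of class " + obj.getClass().getName()
                            + " could not be serialized.", e);
        }
    }

    public static void main(String[] args) {
        byte[] bytes = serializeEagerly("payload");
        System.out.println(bytes.length > 0); // prints "true"
    }
}
```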
>
> On Wed, Sep 2, 2015 at 12:56 PM, Maximilian Michels <m...@apache.org>
> wrote:
>
>> Here's the JIRA issue: https://issues.apache.org/jira/browse/FLINK-2608
>>
>> On Wed, Sep 2, 2015 at 12:49 PM, Maximilian Michels <m...@apache.org>
>> wrote:
>>
>>> Hi Andreas,
>>>
>>> Thank you for reporting the problem and including code to reproduce it.
>>> The issue appears to lie in class serialization or deserialization:
>>> Arrays.asList returns a private inner class
>>> (java.util.Arrays$ArrayList), which is not the java.util.ArrayList you
>>> would normally use.
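The class difference is easy to verify with a small standalone check (not Flink code):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ListClassCheck {
    public static void main(String[] args) {
        // Arrays.asList returns a fixed-size list view backed by the array,
        // implemented by the private inner class java.util.Arrays$ArrayList.
        List<Integer> viaAsList = Arrays.asList(0, 0, 0);

        // Copying into a regular ArrayList yields java.util.ArrayList.
        List<Integer> viaConstructor = new ArrayList<>(viaAsList);

        System.out.println(viaAsList.getClass().getName());      // java.util.Arrays$ArrayList
        System.out.println(viaConstructor.getClass().getName()); // java.util.ArrayList
    }
}
```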
>>>
>>> I'll create a JIRA issue to keep track of the problem and to investigate
>>> further.
>>>
>>> Best regards,
>>> Max
>>>
>>> Here's the stack trace:
>>>
>>> Exception in thread "main"
>>> org.apache.flink.runtime.client.JobExecutionException: Cannot initialize
>>> task 'DataSource (at main(Test.java:32)
>>> (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the
>>> InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block
>>> data
>>>     at
>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
>>>     at
>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>     at org.apache.flink.runtime.jobmanager.JobManager.org
>>> $apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
>>>     at
>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
>>>     at
>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>>     at
>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>>     at
>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>>     at
>>> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
>>>     at
>>> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>>     at
>>> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>     at
>>> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>>     at
>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>     at
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>     at
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>     at
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> Caused by: java.lang.Exception: Deserializing the InputFormat
>>> ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
>>>     at
>>> org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:60)
>>>     at
>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:520)
>>>     ... 25 more
>>> Caused by: java.lang.IllegalStateException: unread block data
>>>     at
>>> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2424)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383)
>>>     at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>>>     at
>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>>>     at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>>>     at
>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
>>>     at
>>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264)
>>>     at
>>> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282)
>>>     at
>>> org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:57)
>>>     ... 26 more
>>>
>>> On Wed, Sep 2, 2015 at 11:17 AM, Andres R. Masegosa <and...@cs.aau.dk>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I hit a bug when trying to broadcast a list of integers created with
>>>> the method "Arrays.asList(...)".
>>>>
>>>> For example, if you try to run this "wordcount" example, you can
>>>> reproduce the bug.
>>>>
>>>>
>>>> import java.io.Serializable;
>>>> import java.util.Arrays;
>>>> import java.util.List;
>>>>
>>>> import org.apache.flink.api.common.functions.FlatMapFunction;
>>>> import org.apache.flink.api.java.DataSet;
>>>> import org.apache.flink.api.java.ExecutionEnvironment;
>>>> import org.apache.flink.api.java.tuple.Tuple2;
>>>> import org.apache.flink.util.Collector;
>>>>
>>>> public class WordCountExample {
>>>>
>>>>     public static void main(String[] args) throws Exception {
>>>>         final ExecutionEnvironment env =
>>>>                 ExecutionEnvironment.getExecutionEnvironment();
>>>>
>>>>         DataSet<String> text = env.fromElements(
>>>>                 "Who's there?",
>>>>                 "I think I hear them. Stand, ho! Who's there?");
>>>>
>>>>         // Arrays.asList returns java.util.Arrays$ArrayList, which
>>>>         // triggers the deserialization failure reported above.
>>>>         List<Integer> elements = Arrays.asList(0, 0, 0);
>>>>
>>>>         DataSet<TestClass> set = env.fromElements(new TestClass(elements));
>>>>
>>>>         DataSet<Tuple2<String, Integer>> wordCounts = text
>>>>                 .flatMap(new LineSplitter())
>>>>                 .withBroadcastSet(set, "set")
>>>>                 .groupBy(0)
>>>>                 .sum(1);
>>>>
>>>>         wordCounts.print();
>>>>     }
>>>>
>>>>     public static class LineSplitter
>>>>             implements FlatMapFunction<String, Tuple2<String, Integer>> {
>>>>         @Override
>>>>         public void flatMap(String line,
>>>>                             Collector<Tuple2<String, Integer>> out) {
>>>>             for (String word : line.split(" ")) {
>>>>                 out.collect(new Tuple2<String, Integer>(word, 1));
>>>>             }
>>>>         }
>>>>     }
>>>>
>>>>     public static class TestClass implements Serializable {
>>>>         private static final long serialVersionUID = -2932037991574118651L;
>>>>
>>>>         List<Integer> integerList;
>>>>
>>>>         public TestClass(List<Integer> integerList) {
>>>>             this.integerList = integerList;
>>>>         }
>>>>     }
>>>> }
>>>>
>>>>
>>>> However, if we use the ArrayList<> constructor instead of
>>>> "Arrays.asList(...)", there is no problem at all!
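The working variant, for reference: copying the elements into a plain `java.util.ArrayList` avoids the private inner class entirely.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class Workaround {
    public static void main(String[] args) {
        // Copy into a regular java.util.ArrayList instead of using the
        // fixed-size list view returned by Arrays.asList directly.
        List<Integer> elements = new ArrayList<>(Arrays.asList(0, 0, 0));

        System.out.println(elements); // prints [0, 0, 0]
        System.out.println(elements.getClass().getName()); // java.util.ArrayList
    }
}
```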
>>>>
>>>>
>>>> Regards,
>>>> Andres
>>>>
>>>
>>>
>>
>
