Hi,

I am testing Spark Streaming (local mode, with Kafka). The code is as
follows:


import java.io.Serializable;

import com.google.common.collect.ImmutableMap;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class LocalStreamTest2 {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[4]", "Local Stream Test");
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2500));
        JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(ssc, "localhost:2181", "local_stream_test",
                ImmutableMap.of("mcdr", 4));
        JavaDStream<String> lines = messages.map(x -> x._2);
        JavaDStream<Record> records = lines.map(x -> new Record(x));
        records.foreachRDD(rdd -> {
            rdd.foreachPartition(partition -> {
                partition.forEachRemaining(record -> {
                    System.out.println(record);
                });
            });
            return null; // foreachRDD takes Function<JavaRDD<Record>, Void> in Spark 1.1
        });
        ssc.start();
        ssc.awaitTermination();
    }
}

@SuppressWarnings("serial")
class Record implements Serializable {
    String x;

    public Record(String x) {
        this.x = x;
    }
}



When this program tries to consume messages from Kafka, it fails with the
following error:

14/10/16 09:53:12 ERROR executor.Executor: Exception in task 1.0 in stage
1.0 (TID 2)
java.io.IOException: unexpected exception type
at java.io.ObjectStreamClass.throwMiscException(Unknown Source)
at java.io.ObjectStreamClass.invokeReadResolve(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.readObject(Unknown Source)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at java.lang.invoke.SerializedLambda.readResolve(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
... 28 more
Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
at cdr.LocalStreamTest2.$deserializeLambda$(LocalStreamTest2.java:1)
... 37 more
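
The trace bottoms out in SerializedLambda.readResolve, which calls the
compiler-generated $deserializeLambda$ method on the capturing class. For
context, here is a minimal plain-Java sketch of that round-trip (no Spark
involved); the class name and helper methods are mine, invented for
illustration:

```java
import java.io.*;
import java.util.function.Function;

public class LambdaSerDemo {
    // The functional interface must extend Serializable for the lambda
    // to be written out as a SerializedLambda at all.
    interface SerFunction<T, R> extends Function<T, R>, Serializable {}

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    @SuppressWarnings("unchecked")
    static <T> T deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (T) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SerFunction<String, Integer> f = s -> s.length();
        // Deserialization goes through SerializedLambda.readResolve, which
        // invokes $deserializeLambda$ on the capturing class. If that method
        // cannot re-create the lambda on the reading side, readResolve throws
        // "Invalid lambda deserialization" -- the error in the trace above.
        SerFunction<String, Integer> g = deserialize(serialize(f));
        System.out.println(g.apply("kafka")); // prints 5
    }
}
```

So the error seems to mean that on the executor side, $deserializeLambda$
in my class could not match one of the nested lambdas back to an
implementation method.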



Interestingly, when I replace

rdd.foreachPartition(partition -> {
    partition.forEachRemaining(record -> {

with

rdd.toLocalIterator().forEachRemaining(record -> {

it works fine.


I'm using JDK 1.8.0_20, Spark 1.1.0, and Kafka 0.8.1.

Any ideas?
Thanks.
