Type problem in Java when using flatMapValues
Hi all,

I successfully implemented my algorithm in Scala but my team wants it in Java. I have a problem with generics, can anyone help me?

I have a first JavaPairRDD with a structure like ((ean, key), [from, to, value])
 * ean and key are String
 * from and to are DateTime
 * value is a Double

JavaPairRDD<Tuple2<String, String>, List<Serializable>> eanKeyTsParameters =
    javaRDD.mapToPair( ... );

Then I try to do flatMapValues to apply the GenerateTimeSeries function: it takes the from, to and value to generate a List<Tuple2<Long, Double>>.

Here is the error I get when compiling:

error: incompatible types: no instance(s) of type variable(s) U exist so that
JavaPairRDD<Tuple2<String,String>,U> conforms to JavaPairRDD<String,Tuple2<Long,Double>>

Here is what IntelliJ tells me:

flatMapValues( Function<List<Serializable>, Iterable<U>> ) in JavaPairRDD
cannot be applied to Transformations.GenerateTimeSeries

Here is the problematic transformation:

JavaPairRDD<String, Tuple2<Long, Double>> keyLongDouble =
    eanKeyTsParameters.flatMapValues(new Transformations.GenerateTimeSeries());

And here is the Function:

import org.apache.spark.api.java.function.Function;
[...]

public class Transformations {

    public static class GenerateTimeSeries
            implements Function<List<Serializable>, Iterable<Tuple2<Long, Double>>> {

        @Override
        public Iterable<Tuple2<Long, Double>> call(List<Serializable> args) {
            DateTime start = (DateTime) args.get(0);
            DateTime end = (DateTime) args.get(1);
            Double value = (Double) args.get(2);
            int granularity = 24*60*60*1000; // 1 day
            return AggregationUtils.createTimeSeries(start, end, value, granularity);
        }
    }
}

Any idea? Thanks

--
Robin Keunen
Software Engineer
robin.keu...@lampiris.be
www.lampiris.be
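For readers hitting the same error: the generic shape of flatMapValues on a JavaPairRDD<K, V> in the Spark 1.x Java API is

    <U> JavaPairRDD<K, U> flatMapValues(Function<V, Iterable<U>> f)

so the key type K passes through unchanged and only the value type is rebound. A minimal sketch with toy types, assuming an existing local JavaSparkContext named sc:

    import java.util.Arrays;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.function.Function;
    import scala.Tuple2;

    // Keys are Tuple2<String, String>, values are Integer.
    JavaPairRDD<Tuple2<String, String>, Integer> pairs = sc.parallelizePairs(
        Arrays.asList(new Tuple2<>(new Tuple2<>("ean", "key"), 3)));

    // flatMapValues rebinds only the value type: Integer -> String.
    // The key type stays Tuple2<String, String>.
    JavaPairRDD<Tuple2<String, String>, String> out = pairs.flatMapValues(
        new Function<Integer, Iterable<String>>() {
            @Override
            public Iterable<String> call(Integer v) {
                return Arrays.asList(String.valueOf(v), String.valueOf(v + 1));
            }
        });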
Re: Type problem in Java when using flatMapValues
Damn, you're right, I wasn't looking at it properly. I was confused by IntelliJ, I guess. Many thanks!

On 2014-10-02 19:02, Sean Owen wrote:

Eh, is it not that you are mapping the values of an RDD whose keys are Tuple2<String, String>, but expecting the keys to be plain Strings? That's about what the compiler is saying, too.

On Thu, Oct 2, 2014 at 4:15 PM, Robin Keunen robin.keu...@lampiris.be wrote: [...]

--
Robin Keunen
Software Engineer
robin.keu...@lampiris.be
www.lampiris.be
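For the archive, the resulting fix is a one-liner: declare the result with the same composite key type as the input RDD. A sketch, assuming the types from the original post:

    // flatMapValues preserves the key type Tuple2<String, String>;
    // only the value type changes to Tuple2<Long, Double>.
    JavaPairRDD<Tuple2<String, String>, Tuple2<Long, Double>> keyLongDouble =
        eanKeyTsParameters.flatMapValues(new Transformations.GenerateTimeSeries());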
Serializing with Kryo NullPointerException - Java
Hi all,

I am having trouble using Kryo and, being new to this kind of serialization, I am not sure where to look. Can someone please help me? :-)

Here is my custom class:

public class DummyClass implements KryoSerializable {

    private static final Logger LOGGER = LoggerFactory.getLogger(DummyClass.class);

    int value;

    public DummyClass() {
    }

    public DummyClass(int value) {
        LOGGER.info("hey I'm dum {}!", value);
        this.value = value;
    }

    public int getValue() {
        return value;
    }

    public void setValue(int value) {
        this.value = value;
    }

    @Override
    public void write(Kryo kryo, Output output) {
        output.writeInt(value);
    }

    @Override
    public void read(Kryo kryo, Input input) {
        this.value = input.readInt();
    }
}

Here is my registrator:

public class MyKryoRegistrator implements KryoRegistrator {
    @Override
    public void registerClasses(Kryo kryo) {
        kryo.register(DummyClass.class);
    }
}

And the Spark code:

SparkConf sparkConf = new SparkConf()
    .setAppName(appName)
    .setMaster(master)
    .setJars(jars)
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", "org.roke.main.MyKryoRegistrator");

JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

List<DummyClass> dummyClasses = Arrays.asList(
    new DummyClass(1),
    new DummyClass(2),
    new DummyClass(3),
    new DummyClass(4)
);

JavaRDD<DummyClass> rdd = sparkContext.parallelize(dummyClasses);

for (DummyClass dummyClass : rdd.collect())
    LOGGER.info("driver collected {}", dummyClass);

The program fails with the following NullPointerException:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, 10.21.6.68): java.lang.NullPointerException:
    com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:36)
    com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:21)
    com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
    org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:80)
    org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:80)
    org.apache.spark.util.Utils$.deserializeViaNestedStream(Utils.scala:123)
    org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:80)
    sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    java.lang.reflect.Method.invoke(Method.java:606)
    java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
    org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    java.lang.Thread.run(Thread.java:745)

--
Robin Keunen
Software Engineer
robin.keu...@lampiris.be
www.lampiris.be
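One way to tell whether a failure like this lives in the custom serializer itself or in the Spark/Kryo wiring is a plain Kryo round-trip outside Spark. A minimal sketch against the Kryo API, reusing the DummyClass above (buffer sizes are arbitrary):

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;

    Kryo kryo = new Kryo();
    kryo.register(DummyClass.class);

    Output output = new Output(1024, -1);   // growable in-memory buffer
    kryo.writeClassAndObject(output, new DummyClass(42));
    output.close();

    Input input = new Input(output.toBytes());
    DummyClass copy = (DummyClass) kryo.readClassAndObject(input);
    System.out.println(copy.getValue());    // expect 42

If the round-trip succeeds, the problem is more likely in the classpath or the Spark configuration than in write()/read(), which matches the dependency fix in the reply below.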
Re: Serializing with Kryo NullPointerException - Java
Using

<dependency>
    <groupId>com.esotericsoftware</groupId>
    <artifactId>kryo-shaded</artifactId>
    <version>3.0.0</version>
</dependency>

instead of

<dependency>
    <groupId>com.esotericsoftware.kryo</groupId>
    <artifactId>kryo</artifactId>
    <version>2.24.0</version>
</dependency>

fixed this.

On 2014-12-03 18:15, Robin Keunen wrote: [...]

--
Robin Keunen
Software Engineer
robin.keu...@lampiris.be
www.lampiris.be
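As a side note, on Spark 1.2 and later the registrator boilerplate can often be dropped entirely: SparkConf.registerKryoClasses registers the classes and selects the Kryo serializer in one call. A sketch, assuming the same DummyClass:

    SparkConf sparkConf = new SparkConf()
        .setAppName(appName)
        .setMaster(master)
        .setJars(jars)
        // Sets spark.serializer to KryoSerializer and registers the classes.
        .registerKryoClasses(new Class<?>[]{ DummyClass.class });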