Leonidas Fegaras created SPARK-5297:
---------------------------------------
             Summary: File Streams do not work with custom key/values
                 Key: SPARK-5297
                 URL: https://issues.apache.org/jira/browse/SPARK-5297
             Project: Spark
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 1.2.0
            Reporter: Leonidas Fegaras
            Priority: Minor
             Fix For: 1.2.0


The following code:
{code}
stream_context.<K,V,SequenceFileInputFormat<K,V>>fileStream(directory)
    .foreachRDD(new Function<JavaPairRDD<K,V>,Void>() {
        public Void call ( JavaPairRDD<K,V> rdd ) throws Exception {
            for ( Tuple2<K,V> x: rdd.collect() )
                System.out.println("# "+x._1+" "+x._2);
            return null;
        }
    });
stream_context.start();
stream_context.awaitTermination();
{code}
compiles fine for custom (serializable) classes K and V, but it throws an error when I drop a new Hadoop sequence file into the directory:
{quote}
15/01/17 09:13:59 ERROR scheduler.JobScheduler: Error generating jobs for time 1421507639000 ms
java.lang.ClassCastException: java.lang.Object cannot be cast to org.apache.hadoop.mapreduce.InputFormat
	at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:91)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
	at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$3.apply(FileInputDStream.scala:236)
	at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$3.apply(FileInputDStream.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
	at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD(FileInputDStream.scala:234)
	at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:128)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:296)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:288)
	at scala.Option.orElse(Option.scala:257)
{quote}
The same classes K and V work fine with non-streaming Spark:
{code}
spark_context.newAPIHadoopFile(path, F.class, K.class, SequenceFileInputFormat.class, conf)
{code}
Streaming also works fine with TextFileInputFormat.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
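A plausible explanation for the ClassCastException (my reading of the trace, not confirmed in this report): the generic type arguments K, V, and the input-format type supplied to {{fileStream}} are erased at compile time, so at runtime Spark sees only {{Object}} where it expects the {{InputFormat}} class, and the cast in {{NewHadoopRDD.getPartitions}} fails. The sketch below uses only the JDK (no Spark) to show the erasure; {{ErasureDemo}} and {{fileStreamLike}} are hypothetical names for illustration.

```java
import java.lang.reflect.Method;

public class ErasureDemo {
    // Stand-in for a method like fileStream: K, V, and the input-format
    // type F are unbounded type parameters, so the compiler erases each
    // of them to Object in the method's runtime signature.
    static <K, V, F> void fileStreamLike(F inputFormat) { }

    public static void main(String[] args) throws Exception {
        // The erased signature takes Object, so we look it up that way.
        Method m = ErasureDemo.class.getDeclaredMethod("fileStreamLike", Object.class);
        // At runtime the only type the JVM records for F is its erasure:
        System.out.println(m.getParameterTypes()[0]); // class java.lang.Object
    }
}
```

This is consistent with the observation that the non-streaming {{newAPIHadoopFile}} call works: it takes the key, value, and input-format classes as explicit {{Class}} arguments, so no runtime type information is lost to erasure.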