Re: Spark Code to read RCFiles
I used the following code as an example to deserialize BytesRefArrayWritable:
http://www.massapi.com/source/hive-0.5.0-dev/src/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java.html

Best Regards,
Cem.

On Wed, Sep 24, 2014 at 1:34 PM, Pramod Biligiri wrote:

> I'm afraid Spark SQL isn't an option for my use case, so I need to use the
> Spark API itself.
> I turned off Kryo, and I'm getting a NullPointerException now:
>
> scala> val ref = file.take(1)(0)._2
> ref: org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable =
> org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable@9753e
>
> scala> ref.size
> res7: Int = 79  // This matches the number of columns that I know exist in
> that RC record
>
> scala> ref.get(0)
> java.lang.NullPointerException
>   at org.apache.hadoop.hive.serde2.columnar.BytesRefWritable.toString(BytesRefWritable.java:194)
>   at scala.runtime.ScalaRunTime$.scala$runtime$ScalaRunTime$$inner$1(ScalaRunTime.scala:324)
>   at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:329)
>   at scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:337)
>   at .(:10)
>   at .()
>   at $print()
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:601)
>   at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788)
>
> Pramod
>
> On Wed, Sep 24, 2014 at 7:38 AM, cem wrote:
>
>> I was able to read RC files with the following line:
>>
>> val file: RDD[(LongWritable, BytesRefArrayWritable)] =
>>   sc.hadoopFile("hdfs://day=2014-08-10/hour=00/",
>>     classOf[RCFileInputFormat[LongWritable, BytesRefArrayWritable]],
>>     classOf[LongWritable], classOf[BytesRefArrayWritable], 500)
>>
>> Try disabling the Kryo serializer.
>>
>> Best Regards,
>> Cem Cayiroglu
>>
>> On Tue, Sep 23, 2014 at 7:23 PM, Matei Zaharia wrote:
>>
>>> Is your file managed by Hive (and thus present in a Hive metastore)? In
>>> that case, Spark SQL
>>> (https://spark.apache.org/docs/latest/sql-programming-guide.html) is the
>>> easiest way.
>>>
>>> Matei
>>>
>>> On September 23, 2014 at 2:26:10 PM, Pramod Biligiri
>>> (pramodbilig...@gmail.com) wrote:
>>>
>>> Hi,
>>> I'm trying to read some data in RCFiles using Spark, but can't seem to
>>> find a suitable example anywhere. Currently I've written the following
>>> bit of code that lets me count() the number of records, but when I try to
>>> do a collect() or a map(), it fails with a
>>> ConcurrentModificationException. I'm running Spark 1.0.1 on a Hadoop
>>> YARN cluster:
>>>
>>> import org.apache.hadoop.hive.ql.io.RCFileInputFormat
>>> val file = sc.hadoopFile("/hdfs/path/to/file",
>>>   classOf[org.apache.hadoop.hive.ql.io.RCFileInputFormat[org.apache.hadoop.io.LongWritable, org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable]],
>>>   classOf[org.apache.hadoop.io.LongWritable],
>>>   classOf[org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable])
>>> file.collect()
>>>
>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>> Task 10.0:6 failed 4 times, most recent failure: Exception failure in TID
>>> 395 on host (redacted): com.esotericsoftware.kryo.KryoException:
>>> java.util.ConcurrentModificationException
>>> Serialization trace:
>>> classes (sun.misc.Launcher$AppClassLoader)
>>> parent (org.apache.spark.repl.ExecutorClassLoader)
>>> classLoader (org.apache.hadoop.mapred.JobConf)
>>> conf (org.apache.hadoop.io.compress.GzipCodec)
>>> codec (org.apache.hadoop.hive.ql.io.RCFile$ValueBuffer)
>>> this$0 (org.apache.hadoop.hive.ql.io.RCFile$ValueBuffer$LazyDecompressionCallbackImpl)
>>> lazyDecompressObj (org.apache.hadoop.hive.serde2.columnar.BytesRefWritable)
>>> bytesRefWritables
>>> (org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable)
>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> com.esotericsof
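A note on the NullPointerException above: it is thrown by BytesRefWritable.toString(), which the REPL calls implicitly when printing `ref.get(0)` — the column's lazy-decompression state is no longer valid once the record has left the task that read it. A safer pattern is to copy each column's bytes into plain Scala values inside the map(), while the record is still materialized on the executor. The sketch below assumes this; the HDFS path is a placeholder and the columns are assumed to be UTF-8 text, neither of which is stated in the thread:

```scala
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.hive.ql.io.RCFileInputFormat
import org.apache.hadoop.hive.serde2.columnar.{BytesRefArrayWritable, BytesRefWritable}

val file = sc.hadoopFile("/hdfs/path/to/file",  // placeholder path
  classOf[RCFileInputFormat[LongWritable, BytesRefArrayWritable]],
  classOf[LongWritable],
  classOf[BytesRefArrayWritable])

// Decode every column to a String while still inside the map(): the
// Writable's backing buffer is reused between records, so the bytes must
// be copied out here, not after take()/collect().
val rows = file.map { case (_, record) =>
  (0 until record.size).map { i =>
    val ref: BytesRefWritable = record.get(i)
    new String(ref.getData, ref.getStart, ref.getLength, "UTF-8")
  }.toArray
}

rows.take(5).foreach(r => println(r.mkString("\t")))
```

Once the rows are plain `Array[String]`s, printing or collecting them no longer touches any Hive Writable internals.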
Re: Spark Code to read RCFiles
I was able to read RC files with the following line:

val file: RDD[(LongWritable, BytesRefArrayWritable)] =
  sc.hadoopFile("hdfs://day=2014-08-10/hour=00/",
    classOf[RCFileInputFormat[LongWritable, BytesRefArrayWritable]],
    classOf[LongWritable], classOf[BytesRefArrayWritable], 500)

Try disabling the Kryo serializer.

Best Regards,
Cem Cayiroglu

On Tue, Sep 23, 2014 at 7:23 PM, Matei Zaharia wrote:

> Is your file managed by Hive (and thus present in a Hive metastore)? In
> that case, Spark SQL
> (https://spark.apache.org/docs/latest/sql-programming-guide.html) is the
> easiest way.
>
> Matei
>
> On September 23, 2014 at 2:26:10 PM, Pramod Biligiri
> (pramodbilig...@gmail.com) wrote:
>
> Hi,
> I'm trying to read some data in RCFiles using Spark, but can't seem to
> find a suitable example anywhere. Currently I've written the following bit
> of code that lets me count() the number of records, but when I try to do a
> collect() or a map(), it fails with a ConcurrentModificationException. I'm
> running Spark 1.0.1 on a Hadoop YARN cluster:
>
> import org.apache.hadoop.hive.ql.io.RCFileInputFormat
> val file = sc.hadoopFile("/hdfs/path/to/file",
>   classOf[org.apache.hadoop.hive.ql.io.RCFileInputFormat[org.apache.hadoop.io.LongWritable, org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable]],
>   classOf[org.apache.hadoop.io.LongWritable],
>   classOf[org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable])
> file.collect()
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 10.0:6 failed 4 times, most recent failure: Exception failure in TID 395 on
> host (redacted): com.esotericsoftware.kryo.KryoException:
> java.util.ConcurrentModificationException
> Serialization trace:
> classes (sun.misc.Launcher$AppClassLoader)
> parent (org.apache.spark.repl.ExecutorClassLoader)
> classLoader (org.apache.hadoop.mapred.JobConf)
> conf (org.apache.hadoop.io.compress.GzipCodec)
> codec (org.apache.hadoop.hive.ql.io.RCFile$ValueBuffer)
> this$0
> (org.apache.hadoop.hive.ql.io.RCFile$ValueBuffer$LazyDecompressionCallbackImpl)
> lazyDecompressObj (org.apache.hadoop.hive.serde2.columnar.BytesRefWritable)
> bytesRefWritables (org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable)
>
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
> com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
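On "Try disabling the Kryo serializer": the serialization trace shows Kryo walking from the BytesRefArrayWritable record into its lazy-decompression callback and from there into the JobConf and classloaders, which is what hits the ConcurrentModificationException. A minimal sketch of the two workarounds, assuming Spark 1.x (the app name is a placeholder; the serializer class names are Spark's standard ones):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Option 1: turn Kryo off by falling back to Java serialization.
val conf = new SparkConf()
  .setAppName("rcfile-read")  // placeholder
  .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
val sc = new SparkContext(conf)

// Option 2 (works under either serializer): never ship the Hadoop Writables
// themselves. Convert each record to plain, serializable Scala values inside
// map(), so nothing Hive-internal crosses the wire on collect().
// `file` is the RDD[(LongWritable, BytesRefArrayWritable)] from the thread.
// val safeRows = file.map { case (offset, record) =>
//   (offset.get, (0 until record.size).map(i => record.get(i).getBytesCopy).toArray)
// }
// safeRows.collect()
```

Of the two, copying values out of the Writables is the more robust fix, since it also avoids the buffer-reuse pitfalls of Hadoop record readers regardless of serializer.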
Spark processing small files.
Hi all,

Spark is taking too much time to start the first stage when there are many
small files in HDFS. I am reading a folder that contains RC files:

val file = sc.hadoopFile("hdfs://hostname:8020/test_data2gb/",
  classOf[RCFileInputFormat[LongWritable, BytesRefArrayWritable]],
  classOf[LongWritable], classOf[BytesRefArrayWritable])

And parsing:

val parsedData = file.map((tuple: (LongWritable, BytesRefArrayWritable)) =>
  RCFileUtil.getData(tuple._2))

620 files of 3 MB each (about 2 GB total) take considerably more time to
start the first stage than 200 files of 40 MB each (8 GB total). Do you have
any idea about the reason?

Thanks!

Best Regards,
Cem Cayiroglu
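One likely explanation (not confirmed in the thread): each HDFS file produces at least one input split, so 620 files mean 620+ splits to compute (one namenode lookup each) and 620+ tasks to schedule before the stage can start, versus 200 for the larger files. A common mitigation is to coalesce to fewer partitions immediately after reading, so downstream stages run far fewer tasks; the partition count 64 below is an arbitrary placeholder, and RCFileUtil.getData is the helper from the original message:

```scala
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.hive.ql.io.RCFileInputFormat
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable

val file = sc.hadoopFile("hdfs://hostname:8020/test_data2gb/",
  classOf[RCFileInputFormat[LongWritable, BytesRefArrayWritable]],
  classOf[LongWritable], classOf[BytesRefArrayWritable])

// Merge many tiny splits into fewer, larger partitions without a shuffle:
// each of the 64 tasks then reads several small files.
val compacted = file.coalesce(64, shuffle = false)

val parsedData = compacted.map { case (_, record) => RCFileUtil.getData(record) }
```

Note that coalesce only reduces task count; the driver still has to list and split every file up front, so compacting the small files in HDFS (or using a combine-style input format) addresses the root cause more directly.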