Hi Tony,

Is com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord one of your own packages?
Sounds like it is the one throwing the error.

HTH

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com


On 17 March 2016 at 15:21, Tony Liu <[email protected]> wrote:

> Hi,
>
> My HDFS file is stored with custom data structures. I want to read it
> with a SparkContext object, so I define a formatting object:
>
> *1. code of RawDataInputFormat.scala*
>
> import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
> import org.apache.hadoop.io.LongWritable
> import org.apache.hadoop.mapred._
>
> /**
>  * Created by Tony on 3/16/16.
>  */
> class RawDataInputFormat[LW <: LongWritable, RD <: RDRawDataRecord]
>     extends FileInputFormat {
>
>   override def getRecordReader(split: InputSplit, job: JobConf,
>       reporter: Reporter): RecordReader[LW, RD] = {
>     new RawReader(split, job, reporter)
>   }
>
> }
>
> *2. code of RawReader.scala*
>
> import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
> import org.apache.hadoop.io.{LongWritable, SequenceFile}
> import org.apache.hadoop.mapred._
>
> /**
>  * Created by Tony on 3/17/16.
>  */
> class RawReader[LW <: LongWritable, RD <: RDRawDataRecord]
>     extends RecordReader[LW, RD] {
>
>   var reader: SequenceFile.Reader = null
>   var currentPos: Long = 0L
>   var length: Long = 0L
>
>   def this(split: InputSplit, job: JobConf, reporter: Reporter) {
>     this()
>     val p = (split.asInstanceOf[FileSplit]).getPath
>     reader = new SequenceFile.Reader(job, SequenceFile.Reader.file(p))
>   }
>
>   override def next(key: LW, value: RD): Boolean = {
>     val flag = reader.next(key, value)
>     currentPos = reader.getPosition()
>     flag
>   }
>
>   override def getProgress: Float = Math.min(1.0f, currentPos / length.toFloat)
>
>   override def getPos: Long = currentPos
>
>   override def createKey(): LongWritable = {
>     new LongWritable()
>   }
>
>   override def close(): Unit = {
>     reader.close()
>   }
>
>   override def createValue(): RDRawDataRecord = {
>     new RDRawDataRecord()
>   }
> }
>
> *3. code of RDRawDataRecord.java*
>
> import com.kiisoo.aegis.common.rawdata.RawDataRecord;
> import java.io.DataInput;
> import java.io.DataOutput;
> import java.io.IOException;
> import org.apache.commons.lang.StringUtils;
> import org.apache.hadoop.io.Writable;
>
> public class RDRawDataRecord implements Writable {
>     private String smac;
>     private String dmac;
>     private int hrssi;
>     private int lrssi;
>     private long fstamp;
>     private long lstamp;
>     private long maxstamp;
>     private long minstamp;
>     private long stamp;
>
>     public void readFields(DataInput in) throws IOException {
>         this.smac = in.readUTF();
>         this.dmac = in.readUTF();
>         this.hrssi = in.readInt();
>         this.lrssi = in.readInt();
>         this.fstamp = in.readLong();
>         this.lstamp = in.readLong();
>         this.maxstamp = in.readLong();
>         this.minstamp = in.readLong();
>         this.stamp = in.readLong();
>     }
>
>     public void write(DataOutput out) throws IOException {
>         out.writeUTF(StringUtils.isNotBlank(this.smac) ? this.smac : "");
>         out.writeUTF(StringUtils.isNotBlank(this.dmac) ? this.dmac : "");
>         out.writeInt(this.hrssi);
>         out.writeInt(this.lrssi);
>         out.writeLong(this.fstamp);
>         out.writeLong(this.lstamp);
>         out.writeLong(this.maxstamp);
>         out.writeLong(this.minstamp);
>         out.writeLong(this.stamp);
>     }
>
>     /**
>      * ignore getter setter
>      **/
> }
>
> *At last, I use this code to run:*
>
> val filePath =
>   "hdfs://tony.Liu:9000/wifi-raw-data/wifi-raw-data.1455206402044"
> val conf = new SparkConf()
> conf.setMaster("local")
> conf.setAppName("demo")
> val sc = new SparkContext(conf)
> val file = sc.hadoopFile[LongWritable, RDRawDataRecord,
>   RawDataInputFormat[LongWritable, RDRawDataRecord]](filePath)
> file.foreach(v => {
>   println(v._2.getDmac) // Attribute of custom objects
> })
>
> *I get an error, it says:*
>
> Error:(41, 19) type arguments
> [org.apache.hadoop.io.LongWritable,com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord,com.kiisoo.spark.RawDataInputFormat[org.apache.hadoop.io.LongWritable,com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord]]
> conform to the bounds of none of the overloaded alternatives of
> value hadoopFile: [K, V, F <: org.apache.hadoop.mapred.InputFormat[K,V]]
> (path: String)(implicit km: scala.reflect.ClassTag[K], implicit vm:
> scala.reflect.ClassTag[V], implicit fm: scala.reflect.ClassTag[F])
> org.apache.spark.rdd.RDD[(K, V)] <and>
> [K, V, F <: org.apache.hadoop.mapred.InputFormat[K,V]](path: String,
> minPartitions: Int)(implicit km: scala.reflect.ClassTag[K], implicit vm:
> scala.reflect.ClassTag[V], implicit fm:
> scala.reflect.ClassTag[F])org.apache.spark.rdd.RDD[(K, V)]
>     val file = sc.hadoopFile[LongWritable, RDRawDataRecord,
>       RawDataInputFormat[LongWritable, RDRawDataRecord]](filePath)
>                   ^
>
> *I also tried reading a text file with the SparkContext API
> 'sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs://xxx......")',
> and it works.*
>
> *What does this error mean, and how can I fix it?*
>
> Thank you for helping me.
>
> --
> Tony
> :)
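
A note on the compiler error quoted above: both overloads of hadoopFile
require F <: org.apache.hadoop.mapred.InputFormat[K, V]. As declared,
RawDataInputFormat takes its own type parameters but extends FileInputFormat
without passing them on, so RawDataInputFormat[LongWritable, RDRawDataRecord]
is not known to the compiler to be an InputFormat[LongWritable, RDRawDataRecord],
and the bound check fails. A minimal sketch of one way to satisfy it (untested,
and assuming the files really are SequenceFiles of LongWritable /
RDRawDataRecord pairs) is to drop the type parameters and extend the Hadoop
classes with the concrete types:

  import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
  import org.apache.hadoop.io.{LongWritable, SequenceFile}
  import org.apache.hadoop.mapred._

  // The format is now an InputFormat[LongWritable, RDRawDataRecord],
  // which is exactly what the F <: InputFormat[K, V] bound asks for.
  class RawDataInputFormat extends FileInputFormat[LongWritable, RDRawDataRecord] {
    override def getRecordReader(split: InputSplit, job: JobConf,
        reporter: Reporter): RecordReader[LongWritable, RDRawDataRecord] =
      new RawReader(split, job, reporter)
  }

  // The reader is typed with the same concrete classes it returns
  // from createKey/createValue.
  class RawReader(split: InputSplit, job: JobConf, reporter: Reporter)
      extends RecordReader[LongWritable, RDRawDataRecord] {

    private val fileSplit = split.asInstanceOf[FileSplit]
    private val reader =
      new SequenceFile.Reader(job, SequenceFile.Reader.file(fileSplit.getPath))

    override def next(key: LongWritable, value: RDRawDataRecord): Boolean =
      reader.next(key, value)

    override def createKey(): LongWritable = new LongWritable()
    override def createValue(): RDRawDataRecord = new RDRawDataRecord()
    override def getPos: Long = reader.getPosition
    override def getProgress: Float =
      Math.min(1.0f,
        (reader.getPosition - fileSplit.getStart).toFloat / fileSplit.getLength)
    override def close(): Unit = reader.close()
  }

With those declarations the call site no longer needs type arguments on the
format and should type-check:

  val file = sc.hadoopFile[LongWritable, RDRawDataRecord, RawDataInputFormat](filePath)
  file.foreach { case (_, record) => println(record.getDmac) }

Since RawReader only wraps SequenceFile.Reader, it may also be worth trying
Spark's built-in SequenceFile support, which needs no custom InputFormat at
all (again assuming ordinary SequenceFiles keyed by LongWritable):

  val file = sc.sequenceFile(filePath, classOf[LongWritable], classOf[RDRawDataRecord])

Either way, note that Hadoop RecordReaders reuse the same Writable instance
for every record, so copy the values out before caching or collecting the RDD.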
