Hi Tony,

Is

com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord

One of your own packages?

Sounds like it is one throwing the error

HTH


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com



On 17 March 2016 at 15:21, Tony Liu <[email protected]> wrote:

> Hi,
>    My HDFS file is store with custom data structures. I want to read it
> with SparkContext object.So I define a formatting object:
>
> *1. code of RawDataInputFormat.scala*
>
> import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
> import org.apache.hadoop.io.LongWritable
> import org.apache.hadoop.mapred._
>
> /**
>   * Created by Tony on 3/16/16.
>   */
> class RawDataInputFormat[LW <: LongWritable, RD <: RDRawDataRecord] extends 
> FileInputFormat {
>
>   override def getRecordReader(split: InputSplit, job: JobConf, reporter: 
> Reporter): RecordReader[LW, RD] = {
>     new RawReader(split, job, reporter)
>   }
>
> }
>
> *2. code of RawReader.scala*
>
> import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
> import org.apache.hadoop.io.{LongWritable, SequenceFile}
> import org.apache.hadoop.mapred._
>
> /**
>   * Created by Tony on 3/17/16.
>   */
> class RawReader[LW <: LongWritable, RD <: RDRawDataRecord] extends 
> RecordReader[LW, RD] {
>
>   var reader: SequenceFile.Reader = null
>   var currentPos: Long = 0L
>   var length: Long = 0L
>
>   def this(split: InputSplit, job: JobConf, reporter: Reporter) {
>     this()
>     val p = (split.asInstanceOf[FileSplit]).getPath
>     reader = new SequenceFile.Reader(job, SequenceFile.Reader.file(p))
>   }
>
>   override def next(key: LW, value: RD): Boolean = {
>     val flag = reader.next(key, value)
>     currentPos = reader.getPosition()
>     flag
>   }
>
>   override def getProgress: Float = Math.min(1.0f, currentPos / 
> length.toFloat)
>
>   override def getPos: Long = currentPos
>
>   override def createKey(): LongWritable = {
>     new LongWritable()
>   }
>
>   override def close(): Unit = {
>     reader.close()
>   }
>
>   override def createValue(): RDRawDataRecord = {
>     new RDRawDataRecord()
>   }
> }
>
> *3. code of RDRawDataRecord.scala*
>
> import com.kiisoo.aegis.common.rawdata.RawDataRecord;
> import java.io.DataInput;
> import java.io.DataOutput;
> import java.io.IOException;
> import org.apache.commons.lang.StringUtils;
> import org.apache.hadoop.io.Writable;
>
> public class RDRawDataRecord implements Writable {
>     private String smac;
>     private String dmac;
>     private int hrssi;
>     private int lrssi;
>     private long fstamp;
>     private long lstamp;
>     private long maxstamp;
>     private long minstamp;
>     private long stamp;
>
>     public void readFields(DataInput in) throws IOException {
>         this.smac = in.readUTF();
>         this.dmac = in.readUTF();
>         this.hrssi = in.readInt();
>         this.lrssi = in.readInt();
>         this.fstamp = in.readLong();
>         this.lstamp = in.readLong();
>         this.maxstamp = in.readLong();
>         this.minstamp = in.readLong();
>         this.stamp = in.readLong();
>     }
>
>     public void write(DataOutput out) throws IOException {
>         out.writeUTF(StringUtils.isNotBlank(this.smac)?this.smac:"");
>         out.writeUTF(StringUtils.isNotBlank(this.dmac)?this.dmac:"");
>         out.writeInt(this.hrssi);
>         out.writeInt(this.lrssi);
>         out.writeLong(this.fstamp);
>         out.writeLong(this.lstamp);
>         out.writeLong(this.maxstamp);
>         out.writeLong(this.minstamp);
>         out.writeLong(this.stamp);
>     }
>
>     */** *
>
>         *ignore getter setter*
>
> *    **/*
>
> }
>
> *At last, I use this code to run*:
>
> val filePath = 
> "hdfs://tony.Liu:9000/wifi-raw-data/wifi-raw-data.1455206402044"
> val conf = new SparkConf()
> conf.setMaster("local")
> conf.setAppName("demo")
> val sc = new SparkContext(conf)
> val file = sc.hadoopFile[LongWritable, RDRawDataRecord, 
> RawDataInputFormat[LongWritable, RDRawDataRecord]](filePath)
> file.foreach(v => {
>   println(v._2.getDmac) // Attribute of custom objects
> })
>
> *I get an error, it says:*
>
> Error:(41, 19) type arguments 
> [org.apache.hadoop.io.LongWritable,com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord,com.kiisoo.spark.RawDataInputFormat[org.apache.hadoop.io.LongWritable,com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord]]
>  conform to the bounds of none of the overloaded alternatives of
>  value hadoopFile: [K, V, F <: 
> org.apache.hadoop.mapred.InputFormat[K,V]](path: String)(implicit km: 
> scala.reflect.ClassTag[K], implicit vm: scala.reflect.ClassTag[V], implicit 
> fm: scala.reflect.ClassTag[F])org.apache.spark.rdd.RDD[(K, V)] <and> [K, V, F 
> <: org.apache.hadoop.mapred.InputFormat[K,V]](path: String, minPartitions: 
> Int)(implicit km: scala.reflect.ClassTag[K], implicit vm: 
> scala.reflect.ClassTag[V], implicit fm: 
> scala.reflect.ClassTag[F])org.apache.spark.rdd.RDD[(K, V)]
>     val file = sc.hadoopFile[LongWritable, RDRawDataRecord, 
> RawDataInputFormat[LongWritable, RDRawDataRecord]](filePath)
>                   ^
>
>
>
> *I also try read the text file with SparkContext AIP
> 'sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs://xxx......")',
> It works.*
> *This error is what does this mean? How to fix this error?*
>
> Thank you for help me.
>
> --
> Tony
> :)
>

Reply via email to