Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1658#discussion_r22288031
  
    --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
    @@ -510,6 +510,52 @@ class SparkContext(config: SparkConf) extends Logging {
           minPartitions).setName(path)
       }
     
    +
    +  /**
    +   * Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file
    +   * (useful for binary data)
    +   *
    +   *
    +   * @param minPartitions A suggestion value of the minimal splitting number for input data.
    +   *
    +   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
    +   */
    +  @DeveloperApi
    +  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
    +  RDD[(String, PortableDataStream)] = {
    +    val job = new NewHadoopJob(hadoopConfiguration)
    +    NewFileInputFormat.addInputPath(job, new Path(path))
    +    val updateConf = job.getConfiguration
    +    new BinaryFileRDD(
    +      this,
    +      classOf[StreamInputFormat],
    +      classOf[String],
    +      classOf[PortableDataStream],
    +      updateConf,
    +      minPartitions).setName(path)
    +  }
    +
    +  /**
    +   * Load data from a flat binary file, assuming each record is a set of numbers
    +   * with the specified numerical format (see ByteBuffer), and the number of
    +   * bytes per record is constant (see FixedLengthBinaryInputFormat)
    +   *
    +   * @param path Directory to the input data files
    +   * @param recordLength The length at which to split the records
    +   * @return An RDD of data with values, RDD[(Array[Byte])]
    +   */
    +  def binaryRecords(path: String, recordLength: Int,
    +                    conf: Configuration = hadoopConfiguration): RDD[Array[Byte]] = {
    +    conf.setInt("recordLength",recordLength)
    +    val br = newAPIHadoopFile[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](path,
    +      classOf[FixedLengthBinaryInputFormat],
    +      classOf[LongWritable],
    +      classOf[BytesWritable],
    +      conf=conf)
    +    val data = br.map{ case (k, v) => v.getBytes}
    --- End diff --
    
    It turns out that `getBytes` returns a padded byte array, so I think you 
may need to copy / slice out the subarray with the data using `v.getLength`; 
see [HADOOP-6298: "BytesWritable#getBytes is a bad name that leads to 
programming mistakes"](https://issues.apache.org/jira/browse/HADOOP-6298) for 
more details.
    
    Using `getBytes` without `getLength` has caused bugs in Spark in the past: 
#2712.
    
    Is the use of `getBytes` in this patch a bug?  Or is it somehow safe due to 
our use of FixedLengthBinaryInputFormat?  If it is somehow safe, we should have 
a comment which explains this so that readers who know about the `getBytes` 
issue aren't confused (or better yet, an `assert` that `getBytes` returns an 
array of the expected length).
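
For illustration, here is a minimal sketch of the copy described above. It assumes only that the valid data occupies the first `getLength` bytes of the array returned by `getBytes`. The `ValidBytes` object and its `validBytes` helper are hypothetical names, not part of this patch or of Spark, and the padded buffer in `main` is simulated rather than read from Hadoop.

```scala
import java.util.Arrays

object ValidBytes {
  // Hypothetical helper: copy only the first `length` bytes of a buffer
  // that may be padded beyond the valid data.
  def validBytes(buffer: Array[Byte], length: Int): Array[Byte] = {
    require(length >= 0 && length <= buffer.length, s"invalid length: $length")
    Arrays.copyOfRange(buffer, 0, length)
  }

  def main(args: Array[String]): Unit = {
    // Simulated backing array: capacity 8, but only 5 bytes of record data.
    val padded = Array[Byte](1, 2, 3, 4, 5, 0, 0, 0)
    val record = validBytes(padded, 5)
    assert(record.length == 5)
    println(record.mkString(","))
  }
}
```

In the patch, the equivalent change might look like `br.map { case (_, v) => validBytes(v.getBytes, v.getLength) }`. If the fixed-length format does guarantee unpadded arrays, the `require` could instead become the suggested assertion that the length equals `recordLength`.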

