Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3225#discussion_r20703826
  
    --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
    @@ -630,7 +634,10 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
        * necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
        * using the older MapReduce API (`org.apache.hadoop.mapred`).
        *
    -   * @param conf JobConf for setting up the dataset
    +   * @param conf JobConf for setting up the dataset. Note: This will be put into a Broadcast.
    --- End diff --
    
    People may call these methods directly and pass in their own Configuration, e.g.:
    ```Scala
       def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
           path: String,
           fClass: Class[F],
           kClass: Class[K],
           vClass: Class[V],
           conf: Configuration = hadoopConfiguration)
    ```
    For example, creating a Configuration for accessing HBase:
    ```Scala
    import java.io.{DataOutputStream, ByteArrayOutputStream}
    import org.apache.hadoop.hbase.client.Scan
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.util.Base64
    
    def convertScanToString(scan: Scan): String = {
      val out: ByteArrayOutputStream = new ByteArrayOutputStream
      val dos: DataOutputStream = new DataOutputStream(out)
      scan.write(dos)
      Base64.encodeBytes(out.toByteArray)
    }
    
    val conf = HBaseConfiguration.create()
    val scan = new Scan()
    scan.setCaching(500)
    scan.setCacheBlocks(false)
    conf.set(TableInputFormat.INPUT_TABLE, "table_name")
    conf.set(TableInputFormat.SCAN, convertScanToString(scan))
    val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])
    rdd.count()
    ```
    This is fine. However, some people may need to read two tables and union them, and they may reuse the same Configuration like this:
    ```Scala
    val conf = HBaseConfiguration.create()
    val scan = new Scan()
    scan.setCaching(500)
    scan.setCacheBlocks(false)
    conf.set(TableInputFormat.INPUT_TABLE, "table_name")
    conf.set(TableInputFormat.SCAN, convertScanToString(scan))
    val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])
    
    conf.set(TableInputFormat.INPUT_TABLE, "another_table_name")
    val rdd2 = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])
    
    rdd.union(rdd2).count()
    ```
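    The result will be surprising: RDDs are evaluated lazily and both RDDs hold the same mutable Configuration object, so by the time `count()` runs, the first RDD can also see `another_table_name`, and the union no longer reflects the two tables.
    I think the docs should warn people not to modify a Configuration after passing it to these methods.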
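    To make the pitfall concrete without pulling in Spark or HBase, here is a simplified, self-contained model. `FakeConf`, `LazyTableScan`, and the key `"input.table"` are hypothetical stand-ins (not Spark, Hadoop, or HBase APIs) for a mutable Configuration and a lazily evaluated RDD that keeps a reference to it. It is only a sketch of the aliasing hazard, not of Spark's actual internals.

    ```Scala
    import scala.collection.mutable

    // Hypothetical stand-in for Hadoop's mutable Configuration: a string-keyed map.
    final class FakeConf(init: Map[String, String] = Map.empty) {
      private val settings = mutable.Map[String, String]() ++= init
      def set(key: String, value: String): Unit = settings(key) = value
      def get(key: String): String = settings.getOrElse(key, "")
      // Defensive copy, similar in spirit to Hadoop's `new Configuration(other)`.
      def copy(): FakeConf = new FakeConf(settings.toMap)
    }

    // Hypothetical lazy "RDD": it keeps a reference to the conf and reads it
    // only when computed, i.e. after any later mutation has already happened.
    final class LazyTableScan(conf: FakeConf) {
      def compute(): String = conf.get("input.table")
    }

    object ConfReuseDemo {
      // Reusing one conf: both scans observe the last value set before compute().
      def reuseShared(): Seq[String] = {
        val conf = new FakeConf()
        conf.set("input.table", "table_name")
        val rdd1 = new LazyTableScan(conf)
        conf.set("input.table", "another_table_name")
        val rdd2 = new LazyTableScan(conf)
        Seq(rdd1.compute(), rdd2.compute())
      }

      // Copying per call: each scan keeps the settings it was created with.
      def copyPerRdd(): Seq[String] = {
        val conf = new FakeConf()
        conf.set("input.table", "table_name")
        val rdd1 = new LazyTableScan(conf.copy())
        conf.set("input.table", "another_table_name")
        val rdd2 = new LazyTableScan(conf.copy())
        Seq(rdd1.compute(), rdd2.compute())
      }

      def main(args: Array[String]): Unit = {
        println(reuseShared()) // List(another_table_name, another_table_name)
        println(copyPerRdd())  // List(table_name, another_table_name)
      }
    }
    ```

    In real code the analogous workaround would be to hand each `newAPIHadoopRDD` call its own copy, e.g. `new Configuration(conf)` (Hadoop's copy constructor) or `HBaseConfiguration.create(conf)`, rather than mutating a Configuration that an existing RDD still references.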
    
    My motivation is this mail thread: http://apache-spark-user-list.1001560.n3.nabble.com/How-did-the-RDD-union-work-td18686.html
    