Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/3225#discussion_r20703826
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -630,7 +634,10 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
    * necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
    * using the older MapReduce API (`org.apache.hadoop.mapred`).
    *
-   * @param conf JobConf for setting up the dataset
+   * @param conf JobConf for setting up the dataset. Note: This will be put into a Broadcast.
--- End diff --
People may call this method directly and pass their own Configuration.
```Scala
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
    path: String,
    fClass: Class[F],
    kClass: Class[K],
    vClass: Class[V],
    conf: Configuration = hadoopConfiguration)
```
E.g., creating a configuration for accessing HBase:
```Scala
import java.io.{ByteArrayOutputStream, DataOutputStream}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Base64

// Serialize the Scan to a Base64 string so it can be stored in the Configuration.
def convertScanToString(scan: Scan): String = {
  val out = new ByteArrayOutputStream
  val dos = new DataOutputStream(out)
  scan.write(dos)
  Base64.encodeBytes(out.toByteArray)
}

val conf = HBaseConfiguration.create()
val scan = new Scan()
scan.setCaching(500)
scan.setCacheBlocks(false)
conf.set(TableInputFormat.INPUT_TABLE, "table_name")
conf.set(TableInputFormat.SCAN, convertScanToString(scan))
// sc is the SparkContext (e.g. the one provided by spark-shell).
val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])
rdd.count()
```
This is fine. However, some people may need to access two tables and union
them. They may reuse the Configuration like this:
```Scala
val conf = HBaseConfiguration.create()
val scan = new Scan()
scan.setCaching(500)
scan.setCacheBlocks(false)
conf.set(TableInputFormat.INPUT_TABLE, "table_name")
conf.set(TableInputFormat.SCAN, convertScanToString(scan))
val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])
// conf is the same object that was already passed to rdd above.
conf.set(TableInputFormat.INPUT_TABLE, "another_table_name")
val rdd2 = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])
rdd.union(rdd2).count()
```
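The result will be weird: both RDDs share the same mutable Configuration
object, so the second `conf.set` can also change the table that `rdd` reads,
and the union may not contain the rows of both tables.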
I think the docs should tell people not to reuse it like this.
My motivation is this mail thread:
http://apache-spark-user-list.1001560.n3.nabble.com/How-did-the-RDD-union-work-td18686.html
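
One possible way to avoid the sharing, as a minimal sketch rather than
anything this PR does: give every RDD its own copy of the Configuration via
Hadoop's `Configuration(Configuration other)` copy constructor. The helper
names `confForTable` and `tableRDD` are hypothetical, and the sketch assumes
the base Configuration already carries the `TableInputFormat.SCAN` setting.
```Scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical helper: returns an independent copy of `base` that points at
// `table`. `base` itself is not modified.
def confForTable(base: Configuration, table: String): Configuration = {
  val copy = new Configuration(base)
  copy.set(TableInputFormat.INPUT_TABLE, table)
  copy
}

// Hypothetical helper: builds an RDD over `table` from its own Configuration
// copy, so later changes to `base` cannot affect this RDD.
def tableRDD(sc: SparkContext, base: Configuration,
    table: String): RDD[(ImmutableBytesWritable, Result)] =
  sc.newAPIHadoopRDD(confForTable(base, table), classOf[TableInputFormat],
    classOf[ImmutableBytesWritable], classOf[Result])
```
With these helpers the union could be written as
`tableRDD(sc, conf, "table_name").union(tableRDD(sc, conf, "another_table_name")).count()`.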