Github user marmbrus commented on a diff in the pull request:
https://github.com/apache/spark/pull/448#discussion_r12021785
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala ---
@@ -45,4 +50,146 @@ class JavaSchemaRDD(
override def wrapRDD(rdd: RDD[Row]): JavaRDD[Row] = JavaRDD.fromRDD(rdd)
val rdd = baseSchemaRDD.map(new Row(_))
+
+ //
=======================================================================
+ // Base RDD functions that do NOT change schema
+ //
=======================================================================
+
+ // Common RDD functions
+
+ /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
+ def cache(): JavaSchemaRDD = {
+ baseSchemaRDD.cache()
+ this
+ }
+
+ /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
+ def persist(): JavaSchemaRDD = {
+ baseSchemaRDD.persist()
+ this
+ }
+
+ /**
+ * Set this RDD's storage level to persist its values across operations
after the first time
+ * it is computed. This can only be used to assign a new storage level
if the RDD does not
+ * have a storage level set yet..
+ */
+ def persist(newLevel: StorageLevel): JavaSchemaRDD = {
+ baseSchemaRDD.persist(newLevel)
+ this
+ }
+
+ /**
+ * Mark the RDD as non-persistent, and remove all blocks for it from
memory and disk.
+ *
+ * @param blocking Whether to block until all blocks are deleted.
+ * @return This RDD.
+ */
+ def unpersist(blocking: Boolean = true): JavaSchemaRDD = {
+ baseSchemaRDD.unpersist(blocking)
+ this
+ }
+
+ /** Assign a name to this RDD */
+ def setName(name: String): JavaSchemaRDD = {
+ baseSchemaRDD.setName(name)
+ this
+ }
+
+ // Transformations (return a new RDD)
+
+ /**
+ * Return a new RDD that is reduced into `numPartitions` partitions.
+ */
+ def coalesce(numPartitions: Int, shuffle: Boolean = false):
JavaSchemaRDD =
+ baseSchemaRDD.coalesce(numPartitions, shuffle)
+
+ /**
+ * Return a new RDD containing the distinct elements in this RDD.
+ */
+ def distinct(): JavaSchemaRDD = baseSchemaRDD.distinct()
+
+ /**
+ * Return a new RDD containing the distinct elements in this RDD.
+ */
+ def distinct(numPartitions: Int): JavaSchemaRDD =
+ baseSchemaRDD.distinct(numPartitions)
+
+ /**
+ * Return a new RDD containing only the elements that satisfy a
predicate.
+ */
+ def filter(f: JFunction[Row, java.lang.Boolean]): JavaSchemaRDD =
+ baseSchemaRDD.filter(x => f.call(new Row(x)).booleanValue())
+
+ /**
+ * Return the intersection of this RDD and another one. The output will
not contain any
+ * duplicate elements, even if the input RDDs did.
+ *
+ * Note that this method performs a shuffle internally.
+ */
+ def intersection(other: JavaSchemaRDD): JavaSchemaRDD =
+ baseSchemaRDD.intersection(other)
+
+ /**
+ * Return the intersection of this RDD and another one. The output will
not contain any
+ * duplicate elements, even if the input RDDs did.
+ *
+ * Note that this method performs a shuffle internally.
+ *
+ * @param partitioner Partitioner to use for the resulting RDD
+ */
+ def intersection(other: JavaSchemaRDD, partitioner: Partitioner):
JavaSchemaRDD =
+ baseSchemaRDD.intersection(other, partitioner)
+
+ /**
+ * Return the intersection of this RDD and another one. The output will
not contain any
+ * duplicate elements, even if the input RDDs did. Performs a hash
partition across the cluster
+ *
+ * Note that this method performs a shuffle internally.
+ *
+ * @param numPartitions How many partitions to use in the resulting RDD
+ */
+ def intersection(other: JavaSchemaRDD, numPartitions: Int):
JavaSchemaRDD =
+ baseSchemaRDD.intersection(other, numPartitions)
+
+ /**
+ * Return a new RDD that has exactly `numPartitions` partitions.
+ *
+ * Can increase or decrease the level of parallelism in this RDD.
Internally, this uses
+ * a shuffle to redistribute data.
+ *
+ * If you are decreasing the number of partitions in this RDD, consider
using `coalesce`,
+ * which can avoid performing a shuffle.
+ */
+ def repartition(numPartitions: Int): JavaSchemaRDD =
+ baseSchemaRDD.repartition(numPartitions)
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ *
+ * Uses `this` partitioner/partition size, because even if `other` is
huge, the resulting
+ * RDD will be <= us.
+ */
+ def subtract(other: JavaSchemaRDD): JavaSchemaRDD =
+ baseSchemaRDD.subtract(other)
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ */
+ def subtract(other: JavaSchemaRDD, numPartitions: Int): JavaSchemaRDD =
+ baseSchemaRDD.subtract(other, numPartitions)
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ */
+ def subtract(other: JavaSchemaRDD, p: Partitioner): JavaSchemaRDD =
+ baseSchemaRDD.subtract(other, p)
+}
+
+object JavaSchemaRDD {
+
+ implicit def fromSchemaRDD(rdd: SchemaRDD): JavaSchemaRDD =
--- End diff --
While I'm generally a fan of scala trickery. I have found massaging type
with implicits often comes back to bite you. You may be safe here, but I'd
still prefer we just call a wrap method explicitly.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---