[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/9190#discussion_r43416866 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DatasetHolder.scala --- @@ -0,0 +1,30 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +*http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql + +/** + * A container for a [[DataFrame]], used for implicit conversions. + * + * @since 1.3.0 + */ +private[sql] case class DatasetHolder[T](df: Dataset[T]) { + + // This is declared with parentheses to prevent the Scala compiler from treating + // `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame. --- End diff -- Thanks, already fixed here: #9358 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user binarybana commented on a diff in the pull request: https://github.com/apache/spark/pull/9190#discussion_r43416475 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DatasetHolder.scala --- @@ -0,0 +1,30 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +*http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql + +/** + * A container for a [[DataFrame]], used for implicit conversions. + * + * @since 1.3.0 + */ +private[sql] case class DatasetHolder[T](df: Dataset[T]) { + + // This is declared with parentheses to prevent the Scala compiler from treating + // `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame. --- End diff -- There are a few references to DataFrame and 1.3.0 here that should probably be updated.
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/9190#discussion_r43120465 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GroupedIterator.scala --- @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, GenerateOrdering} +import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder, Ascending, Expression} + +object GroupedIterator { + def apply( + input: Iterator[InternalRow], + keyExpressions: Seq[Expression], + inputSchema: Seq[Attribute]): Iterator[(InternalRow, Iterator[InternalRow])] = { +if (input.hasNext) { + new GroupedIterator(input, keyExpressions, inputSchema) +} else { + Iterator.empty +} + } +} + +/** + * Iterates over a presorted set of rows, chunking it up by the grouping expression. Each call to + * next will return a pair containing the current group and an iterator that will return all the + * elements of that group. Iterators for each group are lazily constructed by extracting rows + * from the input iterator. 
As such, full groups are never materialized by this class. + * + * Example input: + * {{{ + * Input: [a, 1], [b, 2], [b, 3] + * Grouping: x#1 + * InputSchema: x#1, y#2 + * }}} + * + * Result: + * {{{ + * First call to next(): ([a], Iterator([a, 1]) + * Second call to next(): ([b], Iterator([b, 2], [b, 3]) + * }}} + * + * Note, the class does not handle the case of an empty input for simplicity of implementation. + * Use the factory to construct a new instance. + * + * @param input An iterator of rows. This iterator must be ordered by the groupingExpressions or + * it is possible for the same group to appear more than once. + * @param groupingExpressions The set of expressions used to do grouping. The result of evaluating + *these expressions will be returned as the first part of each call + *to `next()`. + * @param inputSchema The schema of the rows in the `input` iterator. + */ +class GroupedIterator private( +input: Iterator[InternalRow], +groupingExpressions: Seq[Expression], +inputSchema: Seq[Attribute]) + extends Iterator[(InternalRow, Iterator[InternalRow])] { + + /** Compares two input rows and returns 0 if they are in the same group. */ + val sortOrder = groupingExpressions.map(SortOrder(_, Ascending)) + val keyOrdering = GenerateOrdering.generate(sortOrder, inputSchema) + + /** Creates a row containing only the key for a given input row. */ + val keyProjection = GenerateUnsafeProjection.generate(groupingExpressions, inputSchema) + + /** + * Holds null or the row that will be returned on next call to `next()` in the inner iterator. + */ + var currentRow = input.next() + + /** Holds a copy of an input row that is in the current group. */ + var currentGroup = currentRow.copy() + var currentIterator: Iterator[InternalRow] = null + assert(keyOrdering.compare(currentGroup, currentRow) == 0) + + // Return true if we already have the next iterator or fetching a new iterator is successful. 
+ def hasNext: Boolean = currentIterator != null || fetchNextGroupIterator + + def next(): (InternalRow, Iterator[InternalRow]) = { +assert(hasNext) // Ensure we have fetched the next iterator. +val ret = (keyProjection(currentGroup), currentIterator) +currentIterator = null +ret + } + + def fetchNextGroupIterator(): Boolean = { +if (currentRow != null || input.hasNext) { + val inputIterator = new Iterator[InternalRow] { --- End diff -- yea, sure
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/9190#discussion_r43113163 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GroupedIterator.scala --- @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, GenerateOrdering} +import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder, Ascending, Expression} + +object GroupedIterator { + def apply( + input: Iterator[InternalRow], + keyExpressions: Seq[Expression], + inputSchema: Seq[Attribute]): Iterator[(InternalRow, Iterator[InternalRow])] = { +if (input.hasNext) { + new GroupedIterator(input, keyExpressions, inputSchema) +} else { + Iterator.empty +} + } +} + +/** + * Iterates over a presorted set of rows, chunking it up by the grouping expression. Each call to + * next will return a pair containing the current group and an iterator that will return all the + * elements of that group. Iterators for each group are lazily constructed by extracting rows + * from the input iterator. 
As such, full groups are never materialized by this class. + * + * Example input: + * {{{ + * Input: [a, 1], [b, 2], [b, 3] + * Grouping: x#1 + * InputSchema: x#1, y#2 + * }}} + * + * Result: + * {{{ + * First call to next(): ([a], Iterator([a, 1]) + * Second call to next(): ([b], Iterator([b, 2], [b, 3]) + * }}} + * + * Note, the class does not handle the case of an empty input for simplicity of implementation. + * Use the factory to construct a new instance. + * + * @param input An iterator of rows. This iterator must be ordered by the groupingExpressions or + * it is possible for the same group to appear more than once. + * @param groupingExpressions The set of expressions used to do grouping. The result of evaluating + *these expressions will be returned as the first part of each call + *to `next()`. + * @param inputSchema The schema of the rows in the `input` iterator. + */ +class GroupedIterator private( +input: Iterator[InternalRow], +groupingExpressions: Seq[Expression], +inputSchema: Seq[Attribute]) + extends Iterator[(InternalRow, Iterator[InternalRow])] { + + /** Compares two input rows and returns 0 if they are in the same group. */ + val sortOrder = groupingExpressions.map(SortOrder(_, Ascending)) + val keyOrdering = GenerateOrdering.generate(sortOrder, inputSchema) + + /** Creates a row containing only the key for a given input row. */ + val keyProjection = GenerateUnsafeProjection.generate(groupingExpressions, inputSchema) + + /** + * Holds null or the row that will be returned on next call to `next()` in the inner iterator. + */ + var currentRow = input.next() + + /** Holds a copy of an input row that is in the current group. */ + var currentGroup = currentRow.copy() + var currentIterator: Iterator[InternalRow] = null + assert(keyOrdering.compare(currentGroup, currentRow) == 0) + + // Return true if we already have the next iterator or fetching a new iterator is successful. 
+ def hasNext: Boolean = currentIterator != null || fetchNextGroupIterator + + def next(): (InternalRow, Iterator[InternalRow]) = { +assert(hasNext) // Ensure we have fetched the next iterator. +val ret = (keyProjection(currentGroup), currentIterator) +currentIterator = null +ret + } + + def fetchNextGroupIterator(): Boolean = { +if (currentRow != null || input.hasNext) { + val inputIterator = new Iterator[InternalRow] { --- End diff -- That's a good catch.
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/9190#discussion_r43108962 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GroupedIterator.scala --- @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, GenerateOrdering} +import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder, Ascending, Expression} + +object GroupedIterator { + def apply( + input: Iterator[InternalRow], + keyExpressions: Seq[Expression], + inputSchema: Seq[Attribute]): Iterator[(InternalRow, Iterator[InternalRow])] = { +if (input.hasNext) { + new GroupedIterator(input, keyExpressions, inputSchema) +} else { + Iterator.empty +} + } +} + +/** + * Iterates over a presorted set of rows, chunking it up by the grouping expression. Each call to + * next will return a pair containing the current group and an iterator that will return all the + * elements of that group. Iterators for each group are lazily constructed by extracting rows + * from the input iterator. 
As such, full groups are never materialized by this class. + * + * Example input: + * {{{ + * Input: [a, 1], [b, 2], [b, 3] + * Grouping: x#1 + * InputSchema: x#1, y#2 + * }}} + * + * Result: + * {{{ + * First call to next(): ([a], Iterator([a, 1]) + * Second call to next(): ([b], Iterator([b, 2], [b, 3]) + * }}} + * + * Note, the class does not handle the case of an empty input for simplicity of implementation. + * Use the factory to construct a new instance. + * + * @param input An iterator of rows. This iterator must be ordered by the groupingExpressions or + * it is possible for the same group to appear more than once. + * @param groupingExpressions The set of expressions used to do grouping. The result of evaluating + *these expressions will be returned as the first part of each call + *to `next()`. + * @param inputSchema The schema of the rows in the `input` iterator. + */ +class GroupedIterator private( +input: Iterator[InternalRow], +groupingExpressions: Seq[Expression], +inputSchema: Seq[Attribute]) + extends Iterator[(InternalRow, Iterator[InternalRow])] { + + /** Compares two input rows and returns 0 if they are in the same group. */ + val sortOrder = groupingExpressions.map(SortOrder(_, Ascending)) + val keyOrdering = GenerateOrdering.generate(sortOrder, inputSchema) + + /** Creates a row containing only the key for a given input row. */ + val keyProjection = GenerateUnsafeProjection.generate(groupingExpressions, inputSchema) + + /** + * Holds null or the row that will be returned on next call to `next()` in the inner iterator. + */ + var currentRow = input.next() + + /** Holds a copy of an input row that is in the current group. */ + var currentGroup = currentRow.copy() + var currentIterator: Iterator[InternalRow] = null + assert(keyOrdering.compare(currentGroup, currentRow) == 0) + + // Return true if we already have the next iterator or fetching a new iterator is successful. 
+ def hasNext: Boolean = currentIterator != null || fetchNextGroupIterator + + def next(): (InternalRow, Iterator[InternalRow]) = { +assert(hasNext) // Ensure we have fetched the next iterator. +val ret = (keyProjection(currentGroup), currentIterator) +currentIterator = null +ret + } + + def fetchNextGroupIterator(): Boolean = { +if (currentRow != null || input.hasNext) { + val inputIterator = new Iterator[InternalRow] { --- End diff -- This is dangerous,
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/9190#discussion_r42977432 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GroupedIterator.scala --- @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, GenerateOrdering} +import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder, Ascending, Expression} + +object GroupedIterator { + def apply( + input: Iterator[InternalRow], + keyExpressions: Seq[Expression], + inputSchema: Seq[Attribute]): Iterator[(InternalRow, Iterator[InternalRow])] = { +if (input.hasNext) { + new GroupedIterator(input, keyExpressions, inputSchema) +} else { + Iterator.empty +} + } +} + +/** + * Iterates over a presorted set of rows, chunking it up by the grouping expression. Each call to + * next will return a pair containing the current group and an iterator that will return all the + * elements of that group. Iterators for each group are lazily constructed by extracting rows + * from the input iterator. 
As such, full groups are never materialized by this class. + * + * Example input: + * {{{ + * Input: [a, 1], [b, 2], [b, 3] + * Grouping: x#1 + * InputSchema: x#1, y#2 + * }}} + * + * Result: + * {{{ + * First call to next(): ([a], Iterator([a, 1]) + * Second call to next(): ([b], Iterator([b, 2], [b, 3]) + * }}} + * + * Note, the class does not handle the case of an empty input for simplicity of implementation. + * Use the factory to construct a new instance. + * + * @param input An iterator of rows. This iterator must be ordered by the groupingExpressions or + * it is possible for the same group to appear more than once. + * @param groupingExpressions The set of expressions used to do grouping. The result of evaluating + *these expressions will be returned as the first part of each call + *to `next()`. + * @param inputSchema The schema of the rows in the `input` iterator. + */ +class GroupedIterator private( +input: Iterator[InternalRow], +groupingExpressions: Seq[Expression], +inputSchema: Seq[Attribute]) + extends Iterator[(InternalRow, Iterator[InternalRow])] { + + /** Compares two input rows and returns 0 if they are in the same group. */ + val sortOrder = groupingExpressions.map(SortOrder(_, Ascending)) + val keyOrdering = GenerateOrdering.generate(sortOrder, inputSchema) + + /** Creates a row containing only the key for a given input row. */ + val keyProjection = GenerateUnsafeProjection.generate(groupingExpressions, inputSchema) + + /** + * Holds null or the row that will be returned on next call to `next()` in the inner iterator. + */ + var currentRow = input.next() + + /** Holds a copy of an input row that is in the current group. */ + var currentGroup = currentRow.copy() + var currentIterator: Iterator[InternalRow] = null + assert(keyOrdering.compare(currentGroup, currentRow) == 0) + + // Return true if we already have the next iterator or fetching a new iterator is successful. 
+ def hasNext: Boolean = currentIterator != null || fetchNextGroupIterator + + def next(): (InternalRow, Iterator[InternalRow]) = { --- End diff -- What if the user calls `next` twice? We will return a different result and break the semantics of `Iterator`.
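To make the `hasNext`/`next` concern above concrete, here is a minimal sketch in plain Scala, not the code from the pull request. It assumes the input is already sorted by key and uses simple `(key, value)` pairs in place of `InternalRow`; the name `SortedGroupIterator` is hypothetical. It shows one way to keep `hasNext` safe to call repeatedly while still building groups lazily.

```scala
// Simplified, hypothetical stand-in for GroupedIterator: groups a key-sorted
// iterator of (key, value) pairs without materializing whole groups.
class SortedGroupIterator[K, V](input: Iterator[(K, V)])
    extends Iterator[(K, Iterator[V])] {

  // A buffered iterator lets us peek at the next row's key without consuming it.
  private val rows = input.buffered

  // Key of the group most recently handed out by next(), if any.
  private var current: Option[K] = None

  // Calling hasNext repeatedly never consumes more than the leftover rows of
  // the previous group, so it returns the same answer each time.
  def hasNext: Boolean = {
    // Skip rows the caller left unconsumed in the previous group.
    while (rows.hasNext && current.contains(rows.head._1)) rows.next()
    rows.hasNext
  }

  def next(): (K, Iterator[V]) = {
    if (!hasNext) throw new NoSuchElementException("no more groups")
    val key = rows.head._1
    current = Some(key)
    // The inner iterator pulls rows from the shared input while the key matches.
    val group = new Iterator[V] {
      def hasNext: Boolean = rows.hasNext && rows.head._1 == key
      def next(): V = {
        if (!hasNext) throw new NoSuchElementException("end of group")
        rows.next()._2
      }
    }
    (key, group)
  }
}

object SortedGroupIteratorDemo {
  def main(args: Array[String]): Unit = {
    // Same shape as the example in the class comment: [a, 1], [b, 2], [b, 3].
    val groups = new SortedGroupIterator(Iterator(("a", 1), ("b", 2), ("b", 3)))
    groups.foreach { case (key, values) => println(s"$key -> ${values.toList}") }
  }
}
```

One consequence of this design is that advancing the outer iterator (or even calling its `hasNext`) discards whatever is left of the current group, so each inner iterator should be consumed before moving on.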
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/9190#discussion_r42970431 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.annotation.Experimental +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.encoders._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.types.StructType + +/** + * A [[Dataset]] is a strongly typed collection of objects that can be transformed in parallel + * using functional or relational operations. + * + * A [[Dataset]] differs from an [[RDD]] in the following ways: + * - Internally, a [[Dataset]] is represented by a Catalyst logical plan and the data is stored + *in the encoded form. This representation allows for additional logical operations and + *enables many operations (sorting, shuffling, etc.) to be performed without deserializing to + *an object. 
+ * - The creation of a [[Dataset]] requires the presence of an explicit [[Encoder]] that can be + *used to serialize the object into a binary format. Encoders are also capable of mapping the + *schema of a given object to the Spark SQL type system. In contrast, RDDs rely on runtime + *reflection based serialization. Operations that change the type of object stored in the + *dataset also need an encoder for the new type. + * + * A [[Dataset]] can be thought of as a specialized DataFrame, where the elements map to a specific + * JVM object type, instead of to a generic [[Row]] container. A DataFrame can be transformed into + * specific Dataset by calling `df.as[ElementType]`. Similarly you can transform a strongly-typed + * [[Dataset]] to a generic DataFrame by calling `ds.toDF()`. + * + * COMPATIBILITY NOTE: Long term we plan to make [[DataFrame]] extend `Dataset[Row]`. However, + * making this change to the class hierarchy would break the function signatures for the existing + * functional operations (map, flatMap, etc). As such, this class should be considered a preview + * of the final API. Changes will be made to the interface after Spark 1.6. + * + * @since 1.6.0 + */ +@Experimental +class Dataset[T] private[sql]( +@transient val sqlContext: SQLContext, +@transient val queryExecution: QueryExecution)( --- End diff -- No, I was just trying to match the constructor of DataFrame, but we could probably simplify this.
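As a rough illustration of the simplification mentioned in this reply, the sketch below takes only the query execution and derives the context from it. All three classes are hypothetical stand-ins written for this example, not Spark's real `SQLContext`, `QueryExecution`, or `Dataset`, and this is not necessarily the change that was eventually made.

```scala
// Hypothetical stand-ins; the real Spark classes carry far more state.
class SQLContext(val name: String)
class QueryExecution(val sqlContext: SQLContext)

// Takes only the query execution and derives the context from it, so the two
// values cannot disagree.
class SimplifiedDataset[T](val queryExecution: QueryExecution) {
  def sqlContext: SQLContext = queryExecution.sqlContext
}

object SimplifiedDatasetDemo {
  def main(args: Array[String]): Unit = {
    val ctx = new SQLContext("demo")
    val ds = new SimplifiedDataset[Int](new QueryExecution(ctx))
    // Both paths reach the same context instance.
    println(ds.sqlContext eq ds.queryExecution.sqlContext)
  }
}
```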
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/9190#discussion_r42970368 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/tuples.scala --- @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.encoders + + +import scala.reflect.ClassTag + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.types.{StructField, StructType} + +// Most of this file is codegen. +// scalastyle:off + +/** + * A set of composite encoders that take sub encoders and map each of their objects to a + * Scala tuple. Note that currently the implementation is fairly limited and only supports going + * from an internal row to a tuple. + */ +object TupleEncoder { --- End diff -- That's true, but the thought was that we wanted some way to compose any encoder (e.g. one for Java POJOs) in a way that returns a tuple. However, I'm in the process of rewriting all of this to do it using expression composition instead of imperative code.
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/9190#discussion_r42960084 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/tuples.scala --- @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.encoders + + +import scala.reflect.ClassTag + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.types.{StructField, StructType} + +// Most of this file is codegen. +// scalastyle:off + +/** + * A set of composite encoders that take sub encoders and map each of their objects to a + * Scala tuple. Note that currently the implementation is fairly limited and only supports going + * from an internal row to a tuple. + */ +object TupleEncoder { --- End diff -- `Tuple` is an implementation of `Product`, isn't it?
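The relationship raised in this question can be checked with the Scala standard library alone. The short sketch below shows only that tuples and case classes both implement `Product`; it says nothing about how the encoders in the pull request treat them. The names `describe` and `Point` are made up for the example.

```scala
object TupleIsProduct {
  // A case class, which (like a tuple) implements scala.Product.
  case class Point(x: Int, y: Int)

  // Works for any Product: reports the arity and the elements in order.
  def describe(p: Product): String =
    s"arity=${p.productArity}, elements=${p.productIterator.mkString(",")}"

  def main(args: Array[String]): Unit = {
    // Tuple2 extends Product2, which extends Product, so this ascription compiles.
    val pair: Product = (1, "a")
    println(describe(pair))        // prints "arity=2, elements=1,a"
    println(describe(Point(3, 4))) // prints "arity=2, elements=3,4"
  }
}
```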
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/9190#discussion_r42960493 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.annotation.Experimental +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.encoders._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.types.StructType + +/** + * A [[Dataset]] is a strongly typed collection of objects that can be transformed in parallel + * using functional or relational operations. + * + * A [[Dataset]] differs from an [[RDD]] in the following ways: + * - Internally, a [[Dataset]] is represented by a Catalyst logical plan and the data is stored + *in the encoded form. This representation allows for additional logical operations and + *enables many operations (sorting, shuffling, etc.) to be performed without deserializing to + *an object. 
+ * - The creation of a [[Dataset]] requires the presence of an explicit [[Encoder]] that can be + *used to serialize the object into a binary format. Encoders are also capable of mapping the + *schema of a given object to the Spark SQL type system. In contrast, RDDs rely on runtime + *reflection based serialization. Operations that change the type of object stored in the + *dataset also need an encoder for the new type. + * + * A [[Dataset]] can be thought of as a specialized DataFrame, where the elements map to a specific + * JVM object type, instead of to a generic [[Row]] container. A DataFrame can be transformed into + * specific Dataset by calling `df.as[ElementType]`. Similarly you can transform a strongly-typed + * [[Dataset]] to a generic DataFrame by calling `ds.toDF()`. + * + * COMPATIBILITY NOTE: Long term we plan to make [[DataFrame]] extend `Dataset[Row]`. However, + * making this change to the class hierarchy would break the function signatures for the existing + * functional operations (map, flatMap, etc). As such, this class should be considered a preview + * of the final API. Changes will be made to the interface after Spark 1.6. + * + * @since 1.6.0 + */ +@Experimental +class Dataset[T] private[sql]( +@transient val sqlContext: SQLContext, +@transient val queryExecution: QueryExecution)( --- End diff -- Will `sqlContext` and `queryExecution.sqlContext` be different?
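The question above can be illustrated with a small sketch. It is only a toy model, assuming stand-in classes named `SQLContext`, `QueryExecution`, `DatasetA`, and `DatasetB` that are hypothetical and unrelated to the real Spark implementations: when both values are constructor parameters, nothing prevents them from disagreeing, whereas deriving the context from the query execution (as `GroupedDataset` does in this PR) rules that out.

```scala
// Toy stand-ins for illustration only; these are NOT the real Spark classes.
final class SQLContext(val name: String)
final class QueryExecution(val sqlContext: SQLContext, val plan: String)

// Variant A: both values are passed explicitly, mirroring the constructor
// shape in the diff. The two contexts are allowed to differ.
final class DatasetA[T](val sqlContext: SQLContext, val queryExecution: QueryExecution)

// Variant B: the context is derived from the query execution, so it cannot
// disagree with it.
final class DatasetB[T](val queryExecution: QueryExecution) {
  def sqlContext: SQLContext = queryExecution.sqlContext
}

object ContextSketch {
  // True when variant A ends up holding two different context instances.
  def mismatchPossible: Boolean = {
    val c1 = new SQLContext("c1")
    val c2 = new SQLContext("c2")
    val a = new DatasetA[Int](c1, new QueryExecution(c2, "plan"))
    a.sqlContext ne a.queryExecution.sqlContext
  }

  // True when variant B reports the same instance through both paths.
  def derivedAlwaysAgrees: Boolean = {
    val b = new DatasetB[Int](new QueryExecution(new SQLContext("c1"), "plan"))
    b.sqlContext eq b.queryExecution.sqlContext
  }
}
```

Whether the extra parameter is worth keeping is a design question for the PR; the sketch only shows what the redundancy permits.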
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9190#issuecomment-150304953 Merged build started.
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/9190#discussion_r42780810 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.annotation.Experimental +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.encoders._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.types.StructType + +/** + * A [[Dataset]] is a strongly typed collection of objects that can be transformed in parallel + * using functional or relational operations. + * + * A [[Dataset]] differs from an [[RDD]] in the following ways: + * - Internally, a [[Dataset]] is represented by a Catalyst logical plan and the data is stored + *in the encoded form. This representation allows for additional logical operations and + *enables many operations (sorting, shuffling, etc.) to be performed without deserializing to + *an object. 
+ * - The creation of a [[Dataset]] requires the presence of an explicit [[Encoder]] that can be + *used to serialize the object into a binary format. Encoders are also capable of mapping the + *schema of a given object to the Spark SQL type system. In contrast, RDDs rely on runtime + *reflection based serialization. Operations that change the type of object stored in the + *dataset also need an encoder for the new type. + * + * A [[Dataset]] can be thought of as a specialized DataFrame, where the elements map to a specific + * JVM object type, instead of to a generic [[Row]] container. A DataFrame can be transformed into + * specific Dataset by calling `df.as[ElementType]`. Similarly you can transform a strongly-typed + * [[Dataset]] to a generic DataFrame by calling `ds.toDF()`. + * + * COMPATIBILITY NOTE: Long term we plan to make [[DataFrame]] extend `Dataset[Row]`. However, + * making this change to che class hierarchy would break the function signatures for the existing + * functional operations (map, flatMap, etc). As such, this class should be considered a preview + * of the final API. Changes will be made to the interface after Spark 1.6. + * + * @since 1.6.0 + */ +@Experimental +class Dataset[T] private[sql]( +@transient val sqlContext: SQLContext, +@transient val queryExecution: QueryExecution)( +implicit val encoder: Encoder[T]) extends Serializable { + + private implicit def classTag = encoder.clsTag + + private[sql] def this(sqlContext: SQLContext, plan: LogicalPlan)(implicit encoder: Encoder[T]) = +this(sqlContext, new QueryExecution(sqlContext, plan)) + + /** Returns the schema of the encoded form of the objects in this [[Dataset]]. */ + def schema: StructType = encoder.schema + + /* * * + * Conversions * + * * */ + + /** + * Returns a new `Dataset` where each record has been mapped on to the specified type. + * TODO: should bind here... 
+ * TODO: document binding rules + * @since 1.6.0 + */ + def as[U : Encoder]: Dataset[U] = new Dataset(sqlContext, queryExecution)(implicitly[Encoder[U]]) + + /** + * Applies a logical alias to this [[Dataset]] that can be used to disambiguate columns that have + * the same name after two Datasets have been joined. + */ + def as(alias: String): Dataset[T] = withPlan(Subquery(alias, _)) + + /** + * Converts this strongly typed collection of data to generic Dataframe. In contrast to the + * strongly typed objects that Dataset operations work on, a Dataframe returns generic [[Row]] + * objects that allow fields to be accessed by ordinal or name. + */ + def toDF():
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9190#issuecomment-150305783 **[Test build #44162 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44162/consoleFull)** for PR 9190 at commit [`e251f87`](https://github.com/apache/spark/commit/e251f87a830df9f40d523f00bbe14c174c91b2ae).
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/9190#discussion_r42778416 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala --- @@ -46,13 +47,27 @@ trait Encoder[T] { /** * Returns an object of type `T`, extracting the required values from the provided row. Note that - * you must bind the encoder to a specific schema before you can call this function. + * you must `bind` an encoder to a specific schema before you can call this function. */ def fromRow(row: InternalRow): T /** * Returns a new copy of this encoder, where the expressions used by `fromRow` are bound to the - * given schema + * given schema. */ def bind(schema: Seq[Attribute]): Encoder[T] --- End diff -- I agree that this needs to be reworked. In particular, we should separate resolution from binding (as mentioned in the PR description). The way we are doing it today allows us to do very efficient codegen (no extra copies) and correctly handles things like joins that produce ambiguous column names (since internally we are binding to AttributeReferences). Given the limited time before the 1.6 code freeze, I'd rather mark the Encoder API as private and focus on fleshing out the user-facing API. I think that long term we'll do what you suggest and have a wrapper that reorders input for custom encoders, and stick with pure expressions for the built-in encoders for performance reasons.
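To make the distinction between resolution and binding concrete, here is a minimal sketch under stated assumptions. The names `Attribute`, `Row`, `Person`, `resolve`, `bind`, and `fromRow` below are simplified, hypothetical stand-ins, not the Catalyst API: resolution maps field names to uniquely identified attributes (and rejects ambiguous names), while binding turns those attributes into ordinals in a concrete row layout.

```scala
object BindingSketch {
  // Hypothetical, simplified stand-ins for Catalyst concepts.
  final case class Attribute(name: String, exprId: Long)
  type Row = Vector[Any]

  final case class Person(name: String, age: Int)

  // Step 1 (resolution): map field names to attributes carrying unique ids.
  // A name that matches zero or several attributes is rejected here.
  def resolve(fields: Seq[String], input: Seq[Attribute]): Seq[Attribute] =
    fields.map { f =>
      input.filter(_.name == f) match {
        case Seq(a) => a
        case Seq()  => sys.error(s"cannot resolve '$f'")
        case _      => sys.error(s"ambiguous reference '$f'")
      }
    }

  // Step 2 (binding): turn resolved attributes into ordinals of a physical
  // row layout, matching on the unique id rather than on the name.
  def bind(resolved: Seq[Attribute], layout: Seq[Attribute]): Seq[Int] =
    resolved.map(a => layout.indexWhere(_.exprId == a.exprId))

  // Reads an object out of a row using previously bound ordinals.
  def fromRow(ordinals: Seq[Int])(row: Row): Person =
    Person(row(ordinals(0)).asInstanceOf[String], row(ordinals(1)).asInstanceOf[Int])

  def demo(): Person = {
    val name = Attribute("name", 1L)
    val age = Attribute("age", 2L)
    val resolved = resolve(Seq("name", "age"), Seq(name, age))
    // The physical row happens to store `age` before `name`.
    val ordinals = bind(resolved, Seq(age, name))
    fromRow(ordinals)(Vector(30, "Ann"))
  }
}
```

Because binding matches on ids, two columns that share a name after a join would still bind to distinct ordinals once each has been resolved; that is the property the comment attributes to binding against AttributeReferences.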
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9190#issuecomment-150338653 **[Test build #44162 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44162/consoleFull)** for PR 9190 at commit [`e251f87`](https://github.com/apache/spark/commit/e251f87a830df9f40d523f00bbe14c174c91b2ae).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `case class LongEncoder(fieldName: String = "value", ordinal: Int = 0) extends Encoder[Long]`
  * `case class IntEncoder(fieldName: String = "value", ordinal: Int = 0) extends Encoder[Int]`
  * `case class StringEncoder(`
  * ` |class Tuple$`
  * `class Tuple2Encoder[T1, T2](e1: Encoder[T1], e2: Encoder[T2]) extends Encoder[(T1, T2)]`
  * `class Tuple3Encoder[T1, T2, T3](e1: Encoder[T1], e2: Encoder[T2], e3: Encoder[T3]) extends Encoder[(T1, T2, T3)]`
  * `class Tuple4Encoder[T1, T2, T3, T4](e1: Encoder[T1], e2: Encoder[T2], e3: Encoder[T3], e4: Encoder[T4]) extends Encoder[(T1, T2, T3, T4)]`
  * `class Tuple5Encoder[T1, T2, T3, T4, T5](e1: Encoder[T1], e2: Encoder[T2], e3: Encoder[T3], e4: Encoder[T4], e5: Encoder[T5]) extends Encoder[(T1, T2, T3, T4, T5)]`
  * `implicit class AttributeSeq(attrs: Seq[Attribute])`
  * `case class MapPartitions[T, U](`
  * `case class AppendColumn[T, U](`
  * `case class MapGroups[K, T, U](`
  * `class TypedColumn[T](expr: Expression)(implicit val encoder: Encoder[T]) extends Column(expr)`
  * `abstract class SQLImplicits`
  * `case class MapPartitions[T, U](`
  * `case class AppendColumns[T, U](`
  * `case class MapGroups[K, T, U](`
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9190#issuecomment-150338785 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44162/ Test PASSed.
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9190#issuecomment-150338781 Merged build finished. Test PASSed.
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/9190#discussion_r42778185 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import scala.language.postfixOps + +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.test.SharedSQLContext + +case class ClassData(a: String, b: Int) + +class DatasetSuite extends QueryTest with SharedSQLContext { --- End diff -- Totally agree. There will be tests for all of the functions called from pure Java before 1.6 (we learned our lesson after introducing DataFrames). As I mentioned in the description though, I'd like to merge this infrastructure and then do the Java API in a follow up.
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user marmbrus commented on the pull request: https://github.com/apache/spark/pull/9190#issuecomment-150305742 Hey Guys, thanks for looking this over! I think I addressed most of your comments, but let me know if you want to talk about anything further.
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/9190#discussion_r42777859 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.annotation.Experimental +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.encoders._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.types.StructType + +/** + * A [[Dataset]] is a strongly typed collection of objects that can be transformed in parallel + * using functional or relational operations. + * + * A [[Dataset]] differs from an [[RDD]] in the following ways: + * - Internally, a [[Dataset]] is represented by a Catalyst logical plan and the data is stored + *in the encoded form. This representation allows for additional logical operations and + *enables many operations (sorting, shuffling, etc.) to be performed without deserializing to + *an object. 
+ * - The creation of a [[Dataset]] requires the presence of an explicit [[Encoder]] that can be + *used to serialize the object into a binary format. Encoders are also capable of mapping the + *schema of a given object to the Spark SQL type system. In contrast, RDDs rely on runtime + *reflection based serialization. Operations that change the type of object stored in the + *dataset also need an encoder for the new type. + * + * A [[Dataset]] can be thought of as a specialized DataFrame, where the elements map to a specific + * JVM object type, instead of to a generic [[Row]] container. A DataFrame can be transformed into + * specific Dataset by calling `df.as[ElementType]`. Similarly you can transform a strongly-typed + * [[Dataset]] to a generic DataFrame by calling `ds.toDF()`. + * + * COMPATIBILITY NOTE: Long term we plan to make [[DataFrame]] extend `Dataset[Row]`. However, + * making this change to che class hierarchy would break the function signatures for the existing + * functional operations (map, flatMap, etc). As such, this class should be considered a preview + * of the final API. Changes will be made to the interface after Spark 1.6. + * + * @since 1.6.0 + */ +@Experimental +class Dataset[T] private[sql]( +@transient val sqlContext: SQLContext, +@transient val queryExecution: QueryExecution)( +implicit val encoder: Encoder[T]) extends Serializable { + + private implicit def classTag = encoder.clsTag + + private[sql] def this(sqlContext: SQLContext, plan: LogicalPlan)(implicit encoder: Encoder[T]) = +this(sqlContext, new QueryExecution(sqlContext, plan)) + + /** Returns the schema of the encoded form of the objects in this [[Dataset]]. */ + def schema: StructType = encoder.schema + + /* * * + * Conversions * + * * */ + + /** + * Returns a new `Dataset` where each record has been mapped on to the specified type. + * TODO: should bind here... 
+ * TODO: document binding rules + * @since 1.6.0 + */ + def as[U : Encoder]: Dataset[U] = new Dataset(sqlContext, queryExecution)(implicitly[Encoder[U]]) + + /** + * Applies a logical alias to this [[Dataset]] that can be used to disambiguate columns that have + * the same name after two Datasets have been joined. + */ + def as(alias: String): Dataset[T] = withPlan(Subquery(alias, _)) + + /** + * Converts this strongly typed collection of data to generic Dataframe. In contrast to the + * strongly typed objects that Dataset operations work on, a Dataframe returns generic [[Row]] + * objects that allow fields to be accessed by ordinal or name. + */ + def toDF():
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9190#issuecomment-150304918 Merged build triggered.
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/9190#issuecomment-150371978 I'm going to merge this. We should continue to do review post-hoc.
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/9190
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/9190#discussion_r42776824 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.encoders.Encoder +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.execution.QueryExecution + +/** + * A [[Dataset]] has been logically grouped by a user specified grouping key. Users should not + * construct a [[GroupedDataset]] directly, but should instead call `groupBy` on an existing + * [[Dataset]]. 
+ */ +class GroupedDataset[K, T] private[sql]( +private val kEncoder: Encoder[K], +private val tEncoder: Encoder[T], +queryExecution: QueryExecution, +private val dataAttributes: Seq[Attribute], +private val groupingAttributes: Seq[Attribute]) extends Serializable { + + private implicit def kEnc = kEncoder + private implicit def tEnc = tEncoder + private def logicalPlan = queryExecution.analyzed + private def sqlContext = queryExecution.sqlContext + + /** + * Returns a [[Dataset]] that contains each unique key. + */ + def keys: Dataset[K] = { --- End diff -- We don't *need* it, but it's virtually free to implement efficiently. Additionally: - [Scalding has it](https://github.com/twitter/scalding/wiki/Type-safe-api-reference#creating-groups-and-joining-cogrouping) - [PairRDD has it](https://spark.apache.org/docs/0.7.2/api/core/spark/PairRDDFunctions.html)
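As a rough illustration of what `keys` means semantically, the following sketch uses plain Scala collections rather than Spark. The helper `keys` and the object `KeysSketch` are hypothetical; `ClassData` only borrows the shape of the case class from the test suite quoted earlier in this thread. The set of unique keys is just a distinct projection of the grouping function over the data, which is one plausible reading of why it is cheap to provide.

```scala
object KeysSketch {
  // Same shape as the test case class in this thread; used here only as sample data.
  final case class ClassData(a: String, b: Int)

  // Hypothetical helper: apply the grouping function and keep each key once,
  // in order of first appearance.
  def keys[K, T](data: Seq[T])(f: T => K): Seq[K] = data.map(f).distinct

  def demo(): Seq[String] =
    keys(Seq(ClassData("a", 1), ClassData("b", 2), ClassData("a", 3)))(_.a)
}
```

In this toy version `KeysSketch.demo()` yields the two distinct keys `"a"` and `"b"`; how the real implementation plans the equivalent query is not shown in the quoted diff.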
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9190#issuecomment-150014839 **[Test build #44087 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44087/consoleFull)** for PR 9190 at commit [`b172d78`](https://github.com/apache/spark/commit/b172d789e1deee51c43fa35835d4a64736d4859e).
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9190#issuecomment-150013834 Merged build triggered.
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9190#issuecomment-150013863 Merged build started.
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9190#issuecomment-150044350 **[Test build #44087 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44087/consoleFull)** for PR 9190 at commit [`b172d78`](https://github.com/apache/spark/commit/b172d789e1deee51c43fa35835d4a64736d4859e).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `case class LongEncoder(fieldName: String = "value", ordinal: Int = 0) extends Encoder[Long]`
  * `case class IntEncoder(fieldName: String = "value", ordinal: Int = 0) extends Encoder[Int]`
  * `case class StringEncoder(`
  * ` |class Tuple$`
  * `class Tuple2Encoder[T1, T2](e1: Encoder[T1], e2: Encoder[T2]) extends Encoder[(T1, T2)]`
  * `class Tuple3Encoder[T1, T2, T3](e1: Encoder[T1], e2: Encoder[T2], e3: Encoder[T3]) extends Encoder[(T1, T2, T3)]`
  * `class Tuple4Encoder[T1, T2, T3, T4](e1: Encoder[T1], e2: Encoder[T2], e3: Encoder[T3], e4: Encoder[T4]) extends Encoder[(T1, T2, T3, T4)]`
  * `class Tuple5Encoder[T1, T2, T3, T4, T5](e1: Encoder[T1], e2: Encoder[T2], e3: Encoder[T3], e4: Encoder[T4], e5: Encoder[T5]) extends Encoder[(T1, T2, T3, T4, T5)]`
  * `implicit class AttributeSeq(attrs: Seq[Attribute])`
  * `case class MapPartitions[T, U](`
  * `case class AppendColumn[T, U](`
  * `case class MapGroups[K, T, U](`
  * `class TypedColumn[T](expr: Expression)(implicit val encoder: Encoder[T]) extends Column(expr)`
  * `case class MapPartitions[T, U](`
  * `case class AppendColumns[T, U](`
  * `case class MapGroups[K, T, U](`
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9190#issuecomment-150044716 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44087/ Test PASSed.
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9190#issuecomment-150044715 Merged build finished. Test PASSed.
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/9190#discussion_r42700923 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Column.scala --- @@ -36,6 +38,10 @@ private[sql] object Column { def unapply(col: Column): Option[Expression] = Some(col.expr) } +/** + * A [[Column]] where an [[Encoder]] has been given for the expected return type. + */ +class TypedColumn[T](expr: Expression)(implicit val encoder: Encoder[T]) extends Column(expr) --- End diff -- add since version
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/9190#discussion_r42701186 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala --- @@ -387,6 +389,10 @@ class SQLContext private[sql]( def $(args: Any*): ColumnName = { new ColumnName(sc.s(args: _*)) } + + def e[T : Encoder](args: Any*): TypedColumn[T] = { --- End diff -- what is this?
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/9190#discussion_r42701182 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala --- @@ -21,6 +21,8 @@ import java.beans.{BeanInfo, Introspector} import java.util.Properties import java.util.concurrent.atomic.AtomicReference +import org.apache.spark.sql.catalyst.encoders.Encoder --- End diff -- import
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/9190#discussion_r42701621 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/tuples.scala --- @@ -0,0 +1,175 @@ +package org.apache.spark.sql.catalyst.encoders + + +import scala.reflect.ClassTag + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.types.{StructField, StructType} + +// Most of this file is codegen. +// scalastyle:off + +/** + * A set of composite encoders that take sub encoders and map each of their objects to a + * Scala tuple. Note that currently the implementation is fairly limited and only supports going + * from an internal row to a tuple. + * + * The input is assumed to be --- End diff -- this seems incomplete?
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/9190#discussion_r42701532 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala --- @@ -17,6 +17,8 @@ package org.apache.spark.sql +import org.apache.spark.sql.catalyst.encoders.Encoder --- End diff -- remove?
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/9190#discussion_r42701821 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala --- @@ -33,6 +37,16 @@ import org.apache.spark.unsafe.types.UTF8String private[sql] abstract class SQLImplicits { --- End diff -- not this pr, but SQLImplicits should be public
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/9190#discussion_r42702153 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Column.scala --- @@ -17,6 +17,8 @@ package org.apache.spark.sql +import org.apache.spark.sql.catalyst.encoders.Encoder --- End diff -- import
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/9190#discussion_r42702730 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -0,0 +1,398 @@ +package org.apache.spark.sql + +import org.apache.spark.annotation.Experimental +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.encoders._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.types.StructType + +/** + * A [[Dataset]] is a strongly typed collection of objects that can be transformed in parallel + * using functional or relational operations. + * + * A [[Dataset]] differs from an [[RDD]] in the following ways: + * - Internally, a [[Dataset]] is represented by a Catalyst logical plan and the data is stored + *in the encoded form. This representation allows for additional logical operations and + *enables many operations (sorting, shuffling, etc.) to be performed without deserializing to + *an object.
+ * - The creation of a [[Dataset]] requires the presence of an explicit [[Encoder]] that can be + *used to serialize the object into a binary format. Encoders are also capable of mapping the + *schema of a given object to the Spark SQL type system. In contrast, RDDs rely on runtime + *reflection based serialization. Operations that change the type of object stored in the + *dataset also need an encoder for the new type. + * + * A [[Dataset]] can be thought of as a specialized DataFrame, where the elements map to a specific + * JVM object type, instead of to a generic [[Row]] container. A DataFrame can be transformed into + * specific Dataset by calling `df.as[ElementType]`. Similarly you can transform a strongly-typed + * [[Dataset]] to a generic DataFrame by calling `ds.toDF()`. + * + * COMPATIBILITY NOTE: Long term we plan to make [[DataFrame]] extend `Dataset[Row]`. However, + * making this change to the class hierarchy would break the function signatures for the existing + * functional operations (map, flatMap, etc). As such, this class should be considered a preview + * of the final API. Changes will be made to the interface after Spark 1.6. + * + * @since 1.6.0 + */ +@Experimental +class Dataset[T] private[sql]( +@transient val sqlContext: SQLContext, +@transient val queryExecution: QueryExecution)( +implicit val encoder: Encoder[T]) extends Serializable { + + private implicit def classTag = encoder.clsTag + + private[sql] def this(sqlContext: SQLContext, plan: LogicalPlan)(implicit encoder: Encoder[T]) = +this(sqlContext, new QueryExecution(sqlContext, plan)) + + /** Returns the schema of the encoded form of the objects in this [[Dataset]]. */ + def schema: StructType = encoder.schema + + /* * * + * Conversions * + * * */ + + /** + * Returns a new `Dataset` where each record has been mapped on to the specified type. + * TODO: should bind here...
+ * TODO: document binding rules + * @since 1.6.0 + */ + def as[U : Encoder]: Dataset[U] = new Dataset(sqlContext, queryExecution)(implicitly[Encoder[U]]) + + /** + * Applies a logical alias to this [[Dataset]] that can be used to disambiguate columns that have + * the same name after two Datasets have been joined. + */ + def as(alias: String): Dataset[T] = withPlan(Subquery(alias, _)) + + /** + * Converts this strongly typed collection of data to generic Dataframe. In contrast to the + * strongly typed objects that Dataset operations work on, a Dataframe returns generic [[Row]] + * objects that allow fields to be accessed by ordinal or name. + */ + def toDF():
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/9190#discussion_r42702909 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -0,0 +1,398 @@ [...]
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/9190#discussion_r42702840 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala --- @@ -0,0 +1,132 @@ +package org.apache.spark.sql + +import scala.language.postfixOps + +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.test.SharedSQLContext + +case class ClassData(a: String, b: Int) + +class DatasetSuite extends QueryTest with SharedSQLContext { --- End diff -- Can you also add a test suite in Java? It's useful to have Java code that calls each method in case we accidentally break some of them later. Even if it's doing the same tests, just make it a .java file and make sure we have a call to each method in Java.
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/9190#discussion_r42702889 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala --- @@ -31,6 +31,7 @@ import org.apache.spark.sql.types.StructType * and reuse internal buffers to improve performance. */ trait Encoder[T] { --- End diff -- Is this a DeveloperApi for now?
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/9190#discussion_r42702955 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -0,0 +1,398 @@ [...]
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/9190#discussion_r42702965 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -0,0 +1,398 @@ [...]
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/9190#discussion_r42702980 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala --- @@ -46,13 +47,27 @@ trait Encoder[T] { /** * Returns an object of type `T`, extracting the required values from the provided row. Note that - * you must bind the encoder to a specific schema before you can call this function. + * you must `bind` an encoder to a specific schema before you can call this function. */ def fromRow(row: InternalRow): T /** * Returns a new copy of this encoder, where the expressions used by `fromRow` are bound to the - * given schema + * given schema. */ def bind(schema: Seq[Attribute]): Encoder[T] --- End diff -- It would be nice if these were separate from the process of actually encoding stuff. Otherwise users that want to make custom encoders will have to do lots of work. It's not super clear at a glance what each of these APIs is for and when each will be called (i.e. bind vs bindOrdinals vs rebind).
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/9190#discussion_r42703300 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala --- @@ -46,13 +47,27 @@ trait Encoder[T] { /** * Returns an object of type `T`, extracting the required values from the provided row. Note that - * you must bind the encoder to a specific schema before you can call this function. + * you must `bind` an encoder to a specific schema before you can call this function. */ def fromRow(row: InternalRow): T /** * Returns a new copy of this encoder, where the expressions used by `fromRow` are bound to the - * given schema + * given schema. */ def bind(schema: Seq[Attribute]): Encoder[T] --- End diff -- To simplify it, maybe we can just use "schema" to figure out the order of field names this Encoder expects, and internally project the rows we pass to it so that they're in that order. It might be somewhat less efficient though, I guess, but it would be nice if this API was closer to being open-able because some people might like to play with it in 1.6.
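The suggestion above (use the schema to learn the field order an encoder expects, then project incoming rows into that order) can be sketched roughly as follows. This is an illustration in plain Scala under stated assumptions: `Field` and `projector` are hypothetical names, rows are modelled as `Seq[Any]`, and nothing here uses Spark's `Attribute` or `InternalRow`.

```scala
object ProjectionSketch {
  // Simplified stand-in for a schema attribute: just a field name.
  final case class Field(name: String)

  /**
   * Builds a function that reorders a row laid out as `input` into the
   * field order given by `expected`. Ordinals are resolved once, up front,
   * so the per-row work is a plain positional lookup.
   */
  def projector(input: Seq[Field], expected: Seq[Field]): Seq[Any] => Seq[Any] = {
    val ordinals: Seq[Int] = expected.map { f =>
      val i = input.indexWhere(_.name == f.name)
      require(i >= 0, s"field '${f.name}' not found in input schema")
      i
    }
    row => ordinals.map(i => row(i))
  }
}
```

The extra copy made for every row is one plausible source of the efficiency cost the comment mentions; binding ordinals inside the encoder would avoid that copy at the price of a larger encoder interface.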
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r42703289

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala ---
    @@ -0,0 +1,68 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.sql.catalyst.encoders.Encoder
    +import org.apache.spark.sql.catalyst.expressions.Attribute
    +import org.apache.spark.sql.catalyst.plans.logical._
    +import org.apache.spark.sql.execution.QueryExecution
    +
    +/**
    + * A [[Dataset]] has been logically grouped by a user specified grouping key. Users should not
    + * construct a [[GroupedDataset]] directly, but should instead call `groupBy` on an existing
    + * [[Dataset]].
    + */
    +class GroupedDataset[K, T] private[sql](
    +    private val kEncoder: Encoder[K],
    +    private val tEncoder: Encoder[T],
    +    queryExecution: QueryExecution,
    +    private val dataAttributes: Seq[Attribute],
    +    private val groupingAttributes: Seq[Attribute]) extends Serializable {
    +
    +  private implicit def kEnc = kEncoder
    +  private implicit def tEnc = tEncoder
    +  private def logicalPlan = queryExecution.analyzed
    +  private def sqlContext = queryExecution.sqlContext
    +
    +  /**
    +   * Returns a [[Dataset]] that contains each unique key.
    +   */
    +  def keys: Dataset[K] = {
    --- End diff --

why do we need this? since it is just map(_...).distinct
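The equivalence this comment points at can be checked on ordinary Scala collections. The sketch below is only an analogy: it uses `Seq` rather than `Dataset` or `GroupedDataset`, and `keysViaGroupBy` and `keysViaMapDistinct` are hypothetical helper names.

```scala
object KeysSketch {
  // Analogue of grouping first and then asking the grouped value for its keys.
  def keysViaGroupBy[T, K](data: Seq[T])(key: T => K): Set[K] =
    data.groupBy(key).keySet.toSet

  // The alternative from the review comment: map each element to its key,
  // then deduplicate.
  def keysViaMapDistinct[T, K](data: Seq[T])(key: T => K): Set[K] =
    data.map(key).distinct.toSet

  def main(args: Array[String]): Unit = {
    val words = Seq("a", "bb", "cc", "ddd")
    // Both routes yield the same set of keys.
    println(keysViaGroupBy(words)(_.length) == keysViaMapDistinct(words)(_.length)) // prints true
  }
}
```

On local collections the two are interchangeable. Whether a dedicated `keys` method is still worth having on the grouped type is the design question the comment raises.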
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9190#issuecomment-149828370 Merged build triggered.
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9190#issuecomment-149828393 Merged build started.
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9190#issuecomment-14982 **[Test build #44056 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44056/consoleFull)** for PR 9190 at commit [`d6ac1f8`](https://github.com/apache/spark/commit/d6ac1f81fdbab36fc64aaae47203dba07334df14).
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9190#issuecomment-149859719

**[Test build #44056 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44056/consoleFull)** for PR 9190 at commit [`d6ac1f8`](https://github.com/apache/spark/commit/d6ac1f81fdbab36fc64aaae47203dba07334df14).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
   * `case class LongEncoder(fieldName: String = "value", ordinal: Int = 0) extends Encoder[Long]`
   * `case class IntEncoder(fieldName: String = "value", ordinal: Int = 0) extends Encoder[Int]`
   * `case class StringEncoder(`
   * `|class Tuple$`
   * `class Tuple2Encoder[T1, T2](e1: Encoder[T1], e2: Encoder[T2]) extends Encoder[(T1, T2)]`
   * `class Tuple3Encoder[T1, T2, T3](e1: Encoder[T1], e2: Encoder[T2], e3: Encoder[T3]) extends Encoder[(T1, T2, T3)]`
   * `class Tuple4Encoder[T1, T2, T3, T4](e1: Encoder[T1], e2: Encoder[T2], e3: Encoder[T3], e4: Encoder[T4]) extends Encoder[(T1, T2, T3, T4)]`
   * `class Tuple5Encoder[T1, T2, T3, T4, T5](e1: Encoder[T1], e2: Encoder[T2], e3: Encoder[T3], e4: Encoder[T4], e5: Encoder[T5]) extends Encoder[(T1, T2, T3, T4, T5)]`
   * `implicit class AttributeSeq(attrs: Seq[Attribute])`
   * `case class MapPartitions[T, U](`
   * `case class AppendColumn[T, U](`
   * `case class MapGroups[K, T, U](`
   * `class TypedColumn[T](expr: Expression)(implicit val encoder: Encoder[T]) extends Column(expr)`
   * `case class MapPartitions[T, U](`
   * `case class AppendColumns[T, U](`
   * `case class MapGroups[K, T, U](`
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9190#issuecomment-149859824 Merged build finished. Test PASSed.
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9190#issuecomment-149859825 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44056/ Test PASSed.
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9190#issuecomment-149752240 **[Test build #44026 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44026/consoleFull)** for PR 9190 at commit [`e32885f`](https://github.com/apache/spark/commit/e32885f50462c3c66ae049c7cf0d69e90ecbedbe).
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9190#issuecomment-149750983 Merged build started.
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9190#issuecomment-149750967 Merged build triggered.
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r42575073

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Column.scala ---
    @@ -36,6 +36,10 @@ private[sql] object Column {
       def unapply(col: Column): Option[Expression] = Some(col.expr)
     }

    +/**
    + * A [[Column]] where a type hint has been given for the expected return type.
    + */
    +class TypedColumn[T](expr: Expression) extends Column(expr)
    --- End diff --

should we put the encoder here?
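One way to read this suggestion is that the typed column should carry the encoder for its result type, captured implicitly at the point where the type hint is given. A later build report in this thread lists `TypedColumn` with an `(implicit val encoder: Encoder[T])` parameter list, which matches that reading. The sketch below models the pattern in plain Scala without Spark; `ToyEncoder`, `ToyColumn`, `ToyTypedColumn`, and `plusOne` are hypothetical names, and the expression is reduced to a string.

```scala
object TypedColumnSketch {
  // Hypothetical minimal encoder: it only records which type it handles.
  trait ToyEncoder[T] { def typeName: String }

  implicit val intEncoder: ToyEncoder[Int] =
    new ToyEncoder[Int] { val typeName = "int" }
  implicit val stringEncoder: ToyEncoder[String] =
    new ToyEncoder[String] { val typeName = "string" }

  // Untyped column: just an expression, modelled here as a string.
  class ToyColumn(val expr: String) {
    // `as[T]` picks up whichever encoder is implicitly available for T
    // and hands it on to the typed column.
    def as[T](implicit encoder: ToyEncoder[T]): ToyTypedColumn[T] =
      new ToyTypedColumn[T](expr)
  }

  // Typed column: the same expression plus the encoder for its result type.
  class ToyTypedColumn[T](expr: String)(implicit val encoder: ToyEncoder[T])
    extends ToyColumn(expr)

  val plusOne: ToyTypedColumn[Int] = new ToyColumn("value + 1").as[Int]

  def main(args: Array[String]): Unit = {
    // The encoder travels with the column, so later code can read it back.
    println(plusOne.encoder.typeName) // prints int
  }
}
```

Keeping the encoder on the column means an operation that receives the column does not need its own implicit encoder parameter for the result type.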
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
GitHub user marmbrus opened a pull request:

    https://github.com/apache/spark/pull/9190

[SPARK-11116] [SQL] First Draft of Dataset API

*This PR adds a new experimental API to Spark, tentatively named Datasets.*

A `Dataset` is a strongly typed collection of objects that can be transformed in parallel using functional or relational operations. Example usage is as follows:

### Functional
```scala
> val ds: Dataset[Int] = Seq(1, 2, 3).toDS()
> ds.filter(_ % 1 == 0).collect()
res1: Array[Int] = Array(1, 2, 3)
```

### Relational
```scala
scala> ds.toDF().show()
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
+-----+

> ds.select(expr("value + 1").as[Int]).collect()
res11: Array[Int] = Array(2, 3, 4)
```

## Comparison to RDDs

A `Dataset` differs from an `RDD` in the following ways:

- The creation of a `Dataset` requires the presence of an explicit `Encoder` that can be used to serialize the object into a binary format. Encoders are also capable of mapping the schema of a given object to the Spark SQL type system. In contrast, RDDs rely on runtime reflection-based serialization.
- Internally, a `Dataset` is represented by a Catalyst logical plan and the data is stored in the encoded form. This representation allows for additional logical operations and enables many operations (sorting, shuffling, etc.) to be performed without deserializing to an object.

A `Dataset` can be converted to an `RDD` by calling the `.rdd` method.

## Comparison to DataFrames

A `Dataset` can be thought of as a specialized DataFrame, where the elements map to a specific JVM object type, instead of to a generic `Row` container. A DataFrame can be transformed into a specific Dataset by calling `df.as[ElementType]`. Similarly, you can transform a strongly typed `Dataset` to a generic DataFrame by calling `ds.toDF()`.

## Implementation Status and TODOs

This is a rough cut at the least controversial parts of the API. The primary purpose here is to get something committed so that we can better parallelize further work and get early feedback on the API. The following is being deferred to future PRs:

- Joins and Aggregations (prototype here https://github.com/apache/spark/commit/f11f91e6f08c8cf389b8388b626cd29eec32d937)
- Support for Java

Additionally, binding an encoder to a given schema is currently done in a fairly ad hoc fashion. This is an internal detail, and what we are doing today works for the cases we care about. However, as we add more APIs we'll probably need to do this in a more principled way (i.e. separate resolution from binding as we do in DataFrames).

## COMPATIBILITY NOTE

Long term we plan to make `DataFrame` extend `Dataset[Row]`. However, making this change to the class hierarchy would break the function signatures for the existing functional operations (map, flatMap, etc). As such, this class should be considered a preview of the final API. Changes will be made to the interface after Spark 1.6.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/marmbrus/spark dataset-infra

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/9190.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #9190

commit 0db295c3491b2344213647c91b7ecae2763c58ac
Author: Michael Armbrust
Date:   2015-10-20T21:18:53Z

    [SPARK-11116] [SQL] First Draft of Dataset API

commit f11f91e6f08c8cf389b8388b626cd29eec32d937
Author: Michael Armbrust
Date:   2015-10-20T21:38:58Z

    delete controversial bits

commit e32885f50462c3c66ae049c7cf0d69e90ecbedbe
Author: Michael Armbrust
Date:   2015-10-20T23:02:28Z

    style
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9190#issuecomment-149770955

**[Test build #44026 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44026/consoleFull)** for PR 9190 at commit [`e32885f`](https://github.com/apache/spark/commit/e32885f50462c3c66ae049c7cf0d69e90ecbedbe).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
   * `case class LongEncoder(fieldName: String = "value", ordinal: Int = 0) extends Encoder[Long]`
   * `case class IntEncoder(fieldName: String = "value", ordinal: Int = 0) extends Encoder[Int]`
   * `case class StringEncoder(`
   * `|class Tuple$`
   * `class Tuple2Encoder[T1, T2](e1: Encoder[T1], e2: Encoder[T2]) extends Encoder[(T1, T2)]`
   * `class Tuple3Encoder[T1, T2, T3](e1: Encoder[T1], e2: Encoder[T2], e3: Encoder[T3]) extends Encoder[(T1, T2, T3)]`
   * `class Tuple4Encoder[T1, T2, T3, T4](e1: Encoder[T1], e2: Encoder[T2], e3: Encoder[T3], e4: Encoder[T4]) extends Encoder[(T1, T2, T3, T4)]`
   * `class Tuple5Encoder[T1, T2, T3, T4, T5](e1: Encoder[T1], e2: Encoder[T2], e3: Encoder[T3], e4: Encoder[T4], e5: Encoder[T5]) extends Encoder[(T1, T2, T3, T4, T5)]`
   * `implicit class AttributeSeq(attrs: Seq[Attribute])`
   * `case class MapPartitions[T, U](`
   * `case class AppendColumn[T, U](`
   * `case class MapGroups[K, T, U](`
   * `class TypedColumn[T](expr: Expression) extends Column(expr)`
   * `case class MapPartitions[T, U](`
   * `case class AppendColumns[T, U](`
   * `case class MapGroups[K, T, U](`
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9190#issuecomment-149771016 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44026/ Test PASSed.
[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9190#issuecomment-149771015 Merged build finished. Test PASSed.