[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98778114 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/State.scala --- @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.annotation.{Experimental, InterfaceStability} +import org.apache.spark.sql.catalyst.plans.logical.LogicalState + +/** + * :: Experimental :: + * + * Wrapper class for interacting with state data in `mapGroupsWithState` and + * `flatMapGroupsWithState` operations on + * [[org.apache.spark.sql.KeyValueGroupedDataset KeyValueGroupedDataset]]. + * + * @note Operations on state are not threadsafe. + * + * Scala example of using `State`: + * {{{ + *// A mapping function that maintains an integer state for string keys and returns a string. + *def mappingFunction(key: String, value: Iterable[Int], state: State[Int]): Option[String] = { + * // Check if state exists + * if (state.exists) { + *val existingState = state.get // Get the existing state + *val shouldRemove = ... // Decide whether to remove the state + *if (shouldRemove) { + * state.remove() // Remove the state + *} else { + * val newState = ... 
+ * state.update(newState) // Set the new state + *} + * } else { + *val initialState = ... + *state.update(initialState) // Set the initial state + * } + * ... // return something + *} + * + * }}} + * + * Java example of using `State`: + * {{{ + *// A mapping function that maintains an integer state for string keys and returns a string. + *Function3<String, Optional<Integer>, State<Integer>, String> mappingFunction = + * new Function3<String, Optional<Integer>, State<Integer>, String>() { + * + * @Override + * public String call(String key, Optional<Integer> value, State<Integer> state) { + * if (state.exists()) { + * int existingState = state.get(); // Get the existing state + * boolean shouldRemove = ...; // Decide whether to remove the state + * if (shouldRemove) { + * state.remove(); // Remove the state + * } else { + * int newState = ...; + * state.update(newState); // Set the new state + * } + * } else { + * int initialState = ...; // Set the initial state + * state.update(initialState); + * } + * ... // return something + * } + * }; + * }}} + * + * @tparam S Type of the state + * @since 2.1.1 + */ +@Experimental +@InterfaceStability.Evolving +trait State[S] extends LogicalState[S] { + + def exists: Boolean + + def get(): S + + def update(newState: S): Unit + + def remove(): Unit --- End diff -- Scaladoc for these, even though it's pretty obvious. In particular, I assume it's safe to call `update()`, `get` or `remove()` more than once in the function? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
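Since the question is whether repeated calls are safe, here is a minimal sketch of what Scaladoc for these four methods could say. It assumes (the thread does not confirm this) that each method may be called any number of times within one function invocation and that the last `update()` or `remove()` wins. `DocumentedState` and `InMemoryState` are hypothetical names, not Spark classes.

```scala
/** Hypothetical per-key state handle, documented method by method. */
trait DocumentedState[S] {

  /** Whether state is currently defined for this key. */
  def exists: Boolean

  /**
   * Returns the current state.
   * @throws NoSuchElementException if no state is defined (assumed behaviour).
   */
  def get: S

  /** Sets the state; a later call overwrites an earlier one (assumed). */
  def update(newState: S): Unit

  /** Clears the state; a later `update` may define it again (assumed). */
  def remove(): Unit
}

/** In-memory implementation used only to illustrate the semantics above. */
final class InMemoryState[S](initial: Option[S]) extends DocumentedState[S] {
  private var current: Option[S] = initial

  override def exists: Boolean = current.isDefined

  override def get: S =
    current.getOrElse(throw new NoSuchElementException("State is not defined"))

  override def update(newState: S): Unit = { current = Some(newState) }

  override def remove(): Unit = { current = None }
}

// Repeated calls within one invocation: only the final call determines the result.
val state = new InMemoryState[Int](Some(1))
state.update(2)
state.update(3)
val afterUpdates = state.get // 3
state.remove()
val existsAfterRemove = state.exists // false
state.update(4)
val finalValue = state.get // 4
```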
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98779663 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala --- @@ -235,3 +240,86 @@ case class StateStoreSaveExec( override def outputPartitioning: Partitioning = child.outputPartitioning } + + +/** + * Physical operator for executing streaming mapGroupsWithState. + */ +case class MapGroupsWithStateExec( +func: (Any, Iterator[Any], LogicalState[Any]) => Iterator[Any], +keyDeserializer: Expression, // probably not needed +valueDeserializer: Expression, +groupingAttributes: Seq[Attribute], +dataAttributes: Seq[Attribute], +outputObjAttr: Attribute, +stateId: Option[OperatorStateId], +stateDeserializer: Expression, +stateSerializer: Seq[NamedExpression], +child: SparkPlan) extends UnaryExecNode with ObjectProducerExec with StateStoreWriter { + + override def outputPartitioning: Partitioning = child.outputPartitioning + + override def requiredChildDistribution: Seq[Distribution] = +ClusteredDistribution(groupingAttributes) :: Nil + + override def requiredChildOrdering: Seq[Seq[SortOrder]] = +Seq(groupingAttributes.map(SortOrder(_, Ascending))) // is this ordering needed? + + override protected def doExecute(): RDD[InternalRow] = { + --- End diff -- nit: extra newline
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98775275 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala --- @@ -219,6 +219,160 @@ class KeyValueGroupedDataset[K, V] private[sql]( } /** + * ::Experimental:: + * (Scala-specific) + * Applies the given function to each group of data, while using an additional keyed state. + * For each unique group, the function will be passed the group key and an iterator that contains + * all of the elements in the group. The function can return an object of arbitrary type, and + * optionally update or remove the corresponding state. The returned object will form a new + * [[Dataset]]. + * + * This function can be applied on both batch and streaming Datasets. With a streaming dataset, + * this function will be once for each in every trigger. For each key, the updated state from the + * function call in a trigger will be the state available in the function call in the next --- End diff -- Any updates to the state will be stored and passed to the user-given function in subsequent batches when executed as a Streaming Query.
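To make the suggested wording concrete, the following pure-Scala simulation (not Spark code) shows per-key state being stored after one trigger and handed back to the function in the next. `KeyHandle`, `runTrigger` and the running-count function are hypothetical; the sketch assumes the function is called once per key present in a batch and that a removed state is simply dropped from the store.

```scala
import scala.collection.mutable

// Hypothetical per-key state handle (a stand-in, not Spark's State class).
final class KeyHandle[S](initial: Option[S]) {
  private var current: Option[S] = initial
  def exists: Boolean = current.isDefined
  def get: S = current.getOrElse(throw new NoSuchElementException("no state"))
  def update(newState: S): Unit = { current = Some(newState) }
  def remove(): Unit = { current = None }
  def snapshot: Option[S] = current
}

// Simulates one trigger: the function is invoked once per key present in the
// batch, and whatever state it leaves behind is saved for the next trigger.
def runTrigger[K, V, S, U](
    store: mutable.Map[K, S],
    batch: Seq[(K, V)],
    func: (K, Iterator[V], KeyHandle[S]) => U): Seq[U] =
  batch.groupBy(_._1).toSeq.map { case (key, rows) =>
    val handle = new KeyHandle[S](store.get(key))
    val out = func(key, rows.iterator.map(_._2), handle)
    handle.snapshot match {
      case Some(s) => store.update(key, s) // persist the updated state
      case None    => store -= key         // state was removed (or never set)
    }
    out
  }

val store = mutable.Map.empty[String, Int]

// Running count per key, carried across triggers via the state handle.
val runningCount = (key: String, values: Iterator[Int], state: KeyHandle[Int]) => {
  val total = (if (state.exists) state.get else 0) + values.sum
  state.update(total)
  s"$key=$total"
}

val trigger1 = runTrigger(store, Seq("a" -> 1, "b" -> 2, "a" -> 3), runningCount)
val trigger2 = runTrigger(store, Seq("a" -> 10), runningCount)
```

In the second trigger only key `a` has new data, so only `a` is passed to the function, and it sees the total saved by the first trigger.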
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98778548 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/StateImpl.scala --- @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution --- End diff -- Should this be in `streaming`?
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98779439 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala --- @@ -235,3 +240,86 @@ case class StateStoreSaveExec( override def outputPartitioning: Partitioning = child.outputPartitioning } + + +/** + * Physical operator for executing streaming mapGroupsWithState. + */ +case class MapGroupsWithStateExec( +func: (Any, Iterator[Any], LogicalState[Any]) => Iterator[Any], +keyDeserializer: Expression, // probably not needed +valueDeserializer: Expression, +groupingAttributes: Seq[Attribute], +dataAttributes: Seq[Attribute], +outputObjAttr: Attribute, +stateId: Option[OperatorStateId], +stateDeserializer: Expression, +stateSerializer: Seq[NamedExpression], +child: SparkPlan) extends UnaryExecNode with ObjectProducerExec with StateStoreWriter { + + override def outputPartitioning: Partitioning = child.outputPartitioning + + override def requiredChildDistribution: Seq[Distribution] = +ClusteredDistribution(groupingAttributes) :: Nil + + override def requiredChildOrdering: Seq[Seq[SortOrder]] = +Seq(groupingAttributes.map(SortOrder(_, Ascending))) // is this ordering needed? + + override protected def doExecute(): RDD[InternalRow] = { + +child.execute().mapPartitionsWithStateStore[InternalRow]( --- End diff -- I don't think you need the type here?
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98777503 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/State.scala --- @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.annotation.{Experimental, InterfaceStability} +import org.apache.spark.sql.catalyst.plans.logical.LogicalState + +/** + * :: Experimental :: + * + * Wrapper class for interacting with state data in `mapGroupsWithState` and --- End diff -- I think we want to say what state we are talking about. Something like "per-key state from previous invocations of the function in a StreamingQuery".
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98780383 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala --- @@ -144,6 +145,12 @@ object ObjectOperator { (i: InternalRow) => proj(i).get(0, deserializer.dataType) } + def deserializeRowToObject( +deserializer: Expression): InternalRow => Any = { --- End diff -- Indent; also, does this not fit on one line?
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98779599 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala --- @@ -235,3 +240,86 @@ case class StateStoreSaveExec( override def outputPartitioning: Partitioning = child.outputPartitioning } + + +/** + * Physical operator for executing streaming mapGroupsWithState. + */ +case class MapGroupsWithStateExec( +func: (Any, Iterator[Any], LogicalState[Any]) => Iterator[Any], +keyDeserializer: Expression, // probably not needed +valueDeserializer: Expression, +groupingAttributes: Seq[Attribute], +dataAttributes: Seq[Attribute], +outputObjAttr: Attribute, +stateId: Option[OperatorStateId], +stateDeserializer: Expression, +stateSerializer: Seq[NamedExpression], +child: SparkPlan) extends UnaryExecNode with ObjectProducerExec with StateStoreWriter { + + override def outputPartitioning: Partitioning = child.outputPartitioning + + override def requiredChildDistribution: Seq[Distribution] = +ClusteredDistribution(groupingAttributes) :: Nil + + override def requiredChildOrdering: Seq[Seq[SortOrder]] = +Seq(groupingAttributes.map(SortOrder(_, Ascending))) // is this ordering needed? --- End diff -- Yes, the GroupedIterator relies on sorting.
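As an illustration of why the ordering matters, here is a simplified grouping iterator (a sketch, not Spark's `GroupedIterator`, which the comment above suggests works along similar lines) that only compares each row with the row immediately before it. On sorted input every key forms one group; on unsorted input a key can be split into several groups, which for this operator would mean calling the user function more than once per key in a trigger.

```scala
// Groups *consecutive* rows that share a key, in a single streaming pass.
// Simplified illustration only; `groupConsecutive` is a hypothetical name.
def groupConsecutive[K, V](rows: Iterator[(K, V)]): Iterator[(K, Seq[V])] =
  new Iterator[(K, Seq[V])] {
    private val buffered = rows.buffered
    def hasNext: Boolean = buffered.hasNext
    def next(): (K, Seq[V]) = {
      val key = buffered.head._1
      val values = Seq.newBuilder[V]
      // Consume rows until the key changes; a key seen again later starts a new group.
      while (buffered.hasNext && buffered.head._1 == key) values += buffered.next()._2
      (key, values.result())
    }
  }

val sortedInput   = Seq("a" -> 1, "a" -> 2, "b" -> 3)
val unsortedInput = Seq("a" -> 1, "b" -> 3, "a" -> 2)

val fromSorted   = groupConsecutive(sortedInput.iterator).toList   // a -> [1, 2], b -> [3]
val fromUnsorted = groupConsecutive(unsortedInput.iterator).toList // a, b, a: key "a" split
```

Sorting within each partition (after the clustered shuffle) is what makes a single pass like this sufficient, without buffering a whole partition in a hash map.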
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98779015 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala --- @@ -90,6 +93,14 @@ class IncrementalExecution( keys, Some(stateId), child) :: Nil)) + case MapGroupsWithStateExec( +func, keyDeser, valueDeser, groupAttr, dataAttr, outputAttr, --- End diff -- Indentation is inconsistent with the code above.
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98775825 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala --- @@ -219,6 +219,160 @@ class KeyValueGroupedDataset[K, V] private[sql]( } /** + * ::Experimental:: + * (Scala-specific) + * Applies the given function to each group of data, while using an additional keyed state. --- End diff -- while maintaining some user-defined state for each key.
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98780164 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala --- @@ -184,7 +189,7 @@ case class StateStoreSaveExec( } // Assumption: Append mode can be done only when watermark has been specified -store.remove(watermarkPredicate.get.eval) +store.remove(watermarkPredicate.get.eval _) --- End diff -- Why this change?
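For context on the `eval _` change: in Scala, passing a method where a function type is expected normally triggers eta-expansion automatically, so `eval` and `eval _` usually mean the same thing; the trailing `_` only makes the conversion explicit. One situation where the explicit form can matter is an overloaded target method, since there is then no single expected function type. Whether that is the reason here is not stated in the thread. The classes below are hypothetical stand-ins, not Spark's `StateStore` or predicate classes.

```scala
// Hypothetical stand-ins (not Spark classes): a predicate with an `eval`
// method, and a store whose `remove` is overloaded on key vs. condition.
final class Predicate(threshold: Int) {
  def eval(row: Int): Boolean = row < threshold
}

final class Store(initial: Seq[Int]) {
  private var rows = initial
  def remove(key: Int): Unit = { rows = rows.filterNot(_ == key) }
  def remove(condition: Int => Boolean): Unit = { rows = rows.filterNot(condition) }
  def contents: Seq[Int] = rows
}

// Non-overloaded target: `eval` is eta-expanded automatically.
def removeWhere(rows: Seq[Int], condition: Int => Boolean): Seq[Int] =
  rows.filterNot(condition)

val predicate = new Predicate(threshold = 10)
val viaAutomatic = removeWhere(Seq(5, 15, 8, 20), predicate.eval)

// Overloaded target: `eval _` states explicitly that a function value is meant.
val store = new Store(Seq(5, 15, 8, 20))
store.remove(predicate.eval _) // removes rows < 10
store.remove(20)               // removes the row equal to 20
```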
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98778823 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/StateImpl.scala --- @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.State + +/** Internal implementation of the [[State]] interface */ +private[sql] class StateImpl[S](optionalValue: Option[S]) extends State[S] { + private var value: S = optionalValue.getOrElse(null.asInstanceOf[S]) + private var defined: Boolean = optionalValue.isDefined + private var updated: Boolean = false // whether value has been updated (but not removed) + private var removed: Boolean = false // whether value has been removed + + // = Public API = + override def exists: Boolean = { +defined + } + + override def get(): S = { --- End diff -- no `()`
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98779142 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala --- @@ -54,6 +55,18 @@ trait StatefulOperator extends SparkPlan { } } +trait StateStoreReader extends StatefulOperator { --- End diff -- and lowercase if it contains multiple classes
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98777860 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/State.scala --- @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.annotation.{Experimental, InterfaceStability} +import org.apache.spark.sql.catalyst.plans.logical.LogicalState + +/** + * :: Experimental :: + * + * Wrapper class for interacting with state data in `mapGroupsWithState` and + * `flatMapGroupsWithState` operations on + * [[org.apache.spark.sql.KeyValueGroupedDataset KeyValueGroupedDataset]]. + * + * @note Operations on state are not threadsafe. + * + * Scala example of using `State`: + * {{{ + *// A mapping function that maintains an integer state for string keys and returns a string. + *def mappingFunction(key: String, value: Iterable[Int], state: State[Int]): Option[String] = { + * // Check if state exists + * if (state.exists) { + *val existingState = state.get // Get the existing state + *val shouldRemove = ... // Decide whether to remove the state + *if (shouldRemove) { + * state.remove() // Remove the state + *} else { + * val newState = ... 
+ * state.update(newState) // Set the new state + *} + * } else { + *val initialState = ... + *state.update(initialState) // Set the initial state + * } + * ... // return something + *} + * + * }}} + * + * Java example of using `State`: + * {{{ + *// A mapping function that maintains an integer state for string keys and returns a string. + *Function3<String, Optional<Integer>, State<Integer>, String> mappingFunction = + * new Function3<String, Optional<Integer>, State<Integer>, String>() { + * + * @Override + * public String call(String key, Optional<Integer> value, State<Integer> state) { + * if (state.exists()) { + * int existingState = state.get(); // Get the existing state + * boolean shouldRemove = ...; // Decide whether to remove the state + * if (shouldRemove) { + * state.remove(); // Remove the state + * } else { + * int newState = ...; + * state.update(newState); // Set the new state + * } + * } else { + * int initialState = ...; // Set the initial state + * state.update(initialState); + * } + * ... // return something + * } + * }; + * }}} + * + * @tparam S Type of the state + * @since 2.1.1 + */ +@Experimental +@InterfaceStability.Evolving +trait State[S] extends LogicalState[S] { + + def exists: Boolean + + def get(): S --- End diff -- No `()`.
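The usual Scala convention behind this comment (the Scala Style Guide's guidance on parentheses) is that side-effect-free accessors are declared without `()`, while methods that mutate state keep them, so the call site signals which calls have effects. A small hypothetical `Counter` illustrates it:

```scala
// Hypothetical counter illustrating the convention:
// side-effect-free accessors omit `()`, side-effecting methods keep it.
final class Counter {
  private var value = 0
  def get: Int = value                    // pure accessor: no parentheses
  def increment(): Unit = { value += 1 } // mutates state: keep `()`
  def reset(): Unit = { value = 0 }
}

val counter = new Counter
counter.increment()
counter.increment()
val afterTwo = counter.get // 2; writing `counter.get()` would not compile
counter.reset()
val afterReset = counter.get // 0
```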
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98779264 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala --- @@ -54,6 +55,18 @@ trait StatefulOperator extends SparkPlan { } } +trait StateStoreReader extends StatefulOperator { + override lazy val metrics = Map( +"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) +} + +trait StateStoreWriter extends StatefulOperator { --- End diff -- Scaladoc for both of these.
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98777585 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/State.scala --- @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.annotation.{Experimental, InterfaceStability} +import org.apache.spark.sql.catalyst.plans.logical.LogicalState + +/** + * :: Experimental :: + * + * Wrapper class for interacting with state data in `mapGroupsWithState` and + * `flatMapGroupsWithState` operations on + * [[org.apache.spark.sql.KeyValueGroupedDataset KeyValueGroupedDataset]]. + * + * @note Operations on state are not threadsafe. + * + * Scala example of using `State`: + * {{{ + *// A mapping function that maintains an integer state for string keys and returns a string. + *def mappingFunction(key: String, value: Iterable[Int], state: State[Int]): Option[String] = { + * // Check if state exists + * if (state.exists) { + *val existingState = state.get // Get the existing state + *val shouldRemove = ... // Decide whether to remove the state + *if (shouldRemove) { + * state.remove() // Remove the state + *} else { + * val newState = ... 
+ * state.update(newState) // Set the new state + *} + * } else { + *val initialState = ... + *state.update(initialState) // Set the initial state + * } + * ... // return something + *} + * + * }}} + * + * Java example of using `State`: + * {{{ + *// A mapping function that maintains an integer state for string keys and returns a string. + *Function3<String, Optional<Integer>, State<Integer>, String> mappingFunction = + * new Function3<String, Optional<Integer>, State<Integer>, String>() { + * + * @Override + * public String call(String key, Optional<Integer> value, State<Integer> state) { + * if (state.exists()) { + * int existingState = state.get(); // Get the existing state + * boolean shouldRemove = ...; // Decide whether to remove the state + * if (shouldRemove) { + * state.remove(); // Remove the state + * } else { + * int newState = ...; + * state.update(newState); // Set the new state + * } + * } else { + * int initialState = ...; // Set the initial state + * state.update(initialState); + * } + * ... // return something + * } + * }; + * }}} + * + * @tparam S Type of the state + * @since 2.1.1 + */ +@Experimental +@InterfaceStability.Evolving +trait State[S] extends LogicalState[S] { --- End diff -- `KeyState`? `State` just feels very generic.
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98778773 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/StateImpl.scala --- @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.State + +/** Internal implementation of the [[State]] interface */ +private[sql] class StateImpl[S](optionalValue: Option[S]) extends State[S] { --- End diff -- I would consider using `null` here to avoid extra allocations in the critical path.
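A minimal sketch of the suggestion, assuming the caller passes the previous state as a possibly-`null` value instead of an `Option[S]`, so no `Some` wrapper is allocated per key per trigger. `NullableState` is a hypothetical name, not Spark's `StateImpl`; a separate `defined` flag keeps `exists` correct after `update()` and `remove()`.

```scala
// Hypothetical state holder that keeps the value in a nullable field instead
// of wrapping it in an Option (a sketch, not Spark's StateImpl).
final class NullableState[S](initialOrNull: S) {
  private var value: S = initialOrNull
  private var defined: Boolean = initialOrNull != null // null means "no prior state"

  def exists: Boolean = defined

  def get: S =
    if (defined) value else throw new NoSuchElementException("State is not defined")

  def update(newState: S): Unit = {
    value = newState
    defined = true
  }

  def remove(): Unit = {
    value = null.asInstanceOf[S] // drop the reference so it can be collected
    defined = false
  }
}

val fresh = new NullableState[String](null) // no prior state, no Some/None allocated
val before = fresh.exists // false
fresh.update("v1")
val afterUpdate = fresh.get // "v1"
fresh.remove()
val afterRemove = fresh.exists // false
```

One trade-off of this approach: a `null` initial value can no longer be distinguished from "no prior state", which seems acceptable as long as `null` is never a valid state.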
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98776628 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala --- @@ -219,6 +219,160 @@ class KeyValueGroupedDataset[K, V] private[sql]( } /** + * ::Experimental:: + * (Scala-specific) + * Applies the given function to each group of data, while using an additional keyed state. + * For each unique group, the function will be passed the group key and an iterator that contains + * all of the elements in the group. The function can return an object of arbitrary type, and + * optionally update or remove the corresponding state. The returned object will form a new + * [[Dataset]]. + * + * This function can be applied on both batch and streaming Datasets. With a streaming dataset, + * this function will be once for each in every trigger. For each key, the updated state from the + * function call in a trigger will be the state available in the function call in the next + * trigger. However, for batch, `mapGroupsWithState` behaves exactly as `mapGroups` and the + * function is called only once per key without any prior state. + * + * There is no guaranteed ordering of values in the iterator in the function. + * + * This function does not support partial aggregation, and as a result requires shuffling all + * the data in the [[Dataset]]. + * + * Internally, the implementation will spill to disk if any given group is too large to fit into + * memory. However, users must take care to avoid materializing the whole iterator for a group + * (for example, by calling `toList`) unless they are sure that this is possible given the memory + * constraints of their cluster. + * + * @see [[State]] for more details of how to update/remove state in the function. 
+ * @since 2.1.1 + */ + @Experimental + @InterfaceStability.Evolving + def mapGroupsWithState[STATE: Encoder, OUT: Encoder]( --- End diff -- I think it would be useful to come up with an example use case since this is pretty complicated.
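As one possible example use case, here is a Spark-free sketch of a running per-user event count. It simulates what the doc comment describes for streaming: in each trigger the function is called once per key present in that batch, and the state it leaves behind is what the next trigger sees. `KeyedState`, `RunningCountSketch`, `countEvents` and `runTrigger` are hypothetical names invented for illustration; the real operator runs inside Spark's state store, not a local map.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for the State[S] wrapper in the diff.
final class KeyedState[S](private var current: Option[S]) {
  def exists: Boolean = current.isDefined
  def get: S = current.getOrElse(throw new NoSuchElementException("no state"))
  def update(newState: S): Unit = current = Some(newState)
  def remove(): Unit = current = None
  def toOption: Option[S] = current
}

object RunningCountSketch {
  // User function: keep a running count of events per user, emit (user, total so far).
  // Iterator.size consumes the iterator without materializing the whole group.
  def countEvents(user: String, events: Iterator[String], state: KeyedState[Long]): (String, Long) = {
    val previous = if (state.exists) state.get else 0L
    val total = previous + events.size
    state.update(total)
    (user, total)
  }

  // Simulates one streaming trigger: group the new records by key, call the
  // function once per key seen in this batch, then persist (or drop) its state.
  def runTrigger(batch: Seq[(String, String)], store: mutable.Map[String, Long]): Seq[(String, Long)] =
    batch.groupBy(_._1).toSeq.sortBy(_._1).map { case (user, rows) =>
      val state = new KeyedState[Long](store.get(user))
      val out = countEvents(user, rows.iterator.map(_._2), state)
      state.toOption match {
        case Some(s) => store(user) = s
        case None    => store.remove(user)
      }
      out
    }
}
```

Calling `runTrigger` once with a fresh, empty store corresponds to the batch case in the doc comment: every key is seen exactly once, with no prior state.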
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98777095 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala --- @@ -219,6 +219,160 @@ class KeyValueGroupedDataset[K, V] private[sql]( } /** + * ::Experimental:: + * (Scala-specific) + * Applies the given function to each group of data, while using an additional keyed state. + * For each unique group, the function will be passed the group key and an iterator that contains + * all of the elements in the group. The function can return an object of arbitrary type, and + * optionally update or remove the corresponding state. The returned object will form a new + * [[Dataset]]. + * + * This function can be applied on both batch and streaming Datasets. With a streaming dataset, + * this function will be once for each in every trigger. For each key, the updated state from the + * function call in a trigger will be the state available in the function call in the next + * trigger. However, for batch, `mapGroupsWithState` behaves exactly as `mapGroups` and the + * function is called only once per key without any prior state. + * + * There is no guaranteed ordering of values in the iterator in the function. + * + * This function does not support partial aggregation, and as a result requires shuffling all + * the data in the [[Dataset]]. + * + * Internally, the implementation will spill to disk if any given group is too large to fit into + * memory. However, users must take care to avoid materializing the whole iterator for a group + * (for example, by calling `toList`) unless they are sure that this is possible given the memory + * constraints of their cluster. + * + * @see [[State]] for more details of how to update/remove state in the function. 
+ * @since 2.1.1 + */ + @Experimental + @InterfaceStability.Evolving + def mapGroupsWithState[STATE: Encoder, OUT: Encoder]( + func: (K, Iterator[V], State[STATE]) => OUT): Dataset[OUT] = { +val f = (key: K, it: Iterator[V], s: State[STATE]) => Iterator(func(key, it, s)) +flatMapGroupsWithState[STATE, OUT](f) + } + + /** + * ::Experimental:: + * (Java-specific) + * Applies the given function to each group of data, while using an additional keyed state. + * For each unique group, the function will be passed the group key and an iterator that contains + * all of the elements in the group. The function can return an object of arbitrary type, and + * optionally update or remove the corresponding state. The returned object will form a new + * [[Dataset]]. + * + * This function can be applied on both batch and streaming Datasets. With a streaming dataset, + * this function will be once for each in every trigger. For each key, the updated state from the + * function call in a trigger will be the state available in the function call in the next + * trigger. However, for batch, `mapGroupsWithState` behaves exactly as `mapGroups` and the + * function is called only once per key without any prior state. + * + * There is no guaranteed ordering of values in the iterator in the function. + * + * This function does not support partial aggregation, and as a result requires shuffling all + * the data in the [[Dataset]]. + * + * Internally, the implementation will spill to disk if any given group is too large to fit into + * memory. However, users must take care to avoid materializing the whole iterator for a group + * (for example, by calling `toList`) unless they are sure that this is possible given the memory + * constraints of their cluster. + * + * @see [[State]] for more details of how to update/remove state in the function. 
+ * @since 2.1.1 + */ + @Experimental + @InterfaceStability.Evolving + def mapGroupsWithState[STATE, OUT]( --- End diff -- Since this is pretty complicated and we might iterate over time, I wonder if we shouldn't just put the full explanation in `State` and link there with a very brief version for each function. Not sure...
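The Scala overload quoted in this diff implements `mapGroupsWithState` by wrapping the single result in a one-element `Iterator` and delegating to `flatMapGroupsWithState`. That lifting can be sketched on its own, with the state wrapper abstracted as a type parameter `St`; the helper name `liftToFlatMap` is a hypothetical illustration, not a Spark method.

```scala
object MapToFlatMapSketch {
  // Lift a function that returns exactly one result per group into one that
  // returns an iterator of results, mirroring `Iterator(func(key, it, s))` in the diff.
  def liftToFlatMap[K, V, St, Out](
      func: (K, Iterator[V], St) => Out): (K, Iterator[V], St) => Iterator[Out] =
    (key: K, it: Iterator[V], s: St) => Iterator(func(key, it, s))
}
```

Because the map variant is only a thin adapter, its documentation could reasonably defer to the flatMap variant (or to `State`) rather than repeat the full explanation.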
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98776538 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala --- @@ -219,6 +219,160 @@ class KeyValueGroupedDataset[K, V] private[sql]( } /** + * ::Experimental:: + * (Scala-specific) + * Applies the given function to each group of data, while using an additional keyed state. + * For each unique group, the function will be passed the group key and an iterator that contains + * all of the elements in the group. The function can return an object of arbitrary type, and + * optionally update or remove the corresponding state. The returned object will form a new + * [[Dataset]]. + * + * This function can be applied on both batch and streaming Datasets. With a streaming dataset, + * this function will be once for each in every trigger. For each key, the updated state from the + * function call in a trigger will be the state available in the function call in the next + * trigger. However, for batch, `mapGroupsWithState` behaves exactly as `mapGroups` and the + * function is called only once per key without any prior state. + * + * There is no guaranteed ordering of values in the iterator in the function. --- End diff -- I'd maybe put these into bullets as well.
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98778649 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/StateImpl.scala --- @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.State + +/** Internal implementation of the [[State]] interface */ +private[sql] class StateImpl[S](optionalValue: Option[S]) extends State[S] { + private var value: S = optionalValue.getOrElse(null.asInstanceOf[S]) + private var defined: Boolean = optionalValue.isDefined + private var updated: Boolean = false // whether value has been updated (but not removed) + private var removed: Boolean = false // whether value has eben removed --- End diff -- "been"
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98776799 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala --- @@ -219,6 +219,160 @@ class KeyValueGroupedDataset[K, V] private[sql]( } /** + * ::Experimental:: + * (Scala-specific) + * Applies the given function to each group of data, while using an additional keyed state. + * For each unique group, the function will be passed the group key and an iterator that contains + * all of the elements in the group. The function can return an object of arbitrary type, and + * optionally update or remove the corresponding state. The returned object will form a new + * [[Dataset]]. + * + * This function can be applied on both batch and streaming Datasets. With a streaming dataset, + * this function will be once for each in every trigger. For each key, the updated state from the + * function call in a trigger will be the state available in the function call in the next + * trigger. However, for batch, `mapGroupsWithState` behaves exactly as `mapGroups` and the + * function is called only once per key without any prior state. + * + * There is no guaranteed ordering of values in the iterator in the function. + * + * This function does not support partial aggregation, and as a result requires shuffling all + * the data in the [[Dataset]]. + * + * Internally, the implementation will spill to disk if any given group is too large to fit into + * memory. However, users must take care to avoid materializing the whole iterator for a group + * (for example, by calling `toList`) unless they are sure that this is possible given the memory + * constraints of their cluster. + * + * @see [[State]] for more details of how to update/remove state in the function. 
+ * @since 2.1.1 + */ + @Experimental + @InterfaceStability.Evolving + def mapGroupsWithState[STATE: Encoder, OUT: Encoder]( --- End diff -- I would also stick with `S` and `U` to match other functions in the class.
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98777806 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/State.scala --- @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.annotation.{Experimental, InterfaceStability} +import org.apache.spark.sql.catalyst.plans.logical.LogicalState + +/** + * :: Experimental :: + * + * Wrapper class for interacting with state data in `mapGroupsWithState` and + * `flatMapGroupsWithState` operations on + * [[org.apache.spark.sql.KeyValueGroupedDataset KeyValueGroupedDataset]]. + * + * @note Operations on state are not threadsafe. + * + * Scala example of using `State`: + * {{{ + *// A mapping function that maintains an integer state for string keys and returns a string. + *def mappingFunction(key: String, value: Iterable[Int], state: State[Int]): Option[String] = { + * // Check if state exists + * if (state.exists) { + *val existingState = state.get // Get the existing state + *val shouldRemove = ... // Decide whether to remove the state + *if (shouldRemove) { + * state.remove() // Remove the state + *} else { + * val newState = ... 
+ * state.update(newState)// Set the new state + *} + * } else { + *val initialState = ... + *state.update(initialState) // Set the initial state + * } + * ... // return something + *} + * + * }}} + * + * Java example of using `State`: + * {{{ + *// A mapping function that maintains an integer state for string keys and returns a string. + *Function3<String, Optional<Integer>, State<Integer>, String> mappingFunction = + * new Function3<String, Optional<Integer>, State<Integer>, String>() { + * + * @Override + * public String call(String key, Optional<Integer> value, State<Integer> state) { + * if (state.exists()) { + * int existingState = state.get(); // Get the existing state + * boolean shouldRemove = ...; // Decide whether to remove the state + * if (shouldRemove) { + * state.remove(); // Remove the state + * } else { + * int newState = ...; + * state.update(newState); // Set the new state + * } + * } else { + * int initialState = ...; // Set the initial state + * state.update(initialState); + * } + * ... // return something + * } + * }; + * }}} + * + * @tparam S Type of the state --- End diff -- A user-defined type that can be stored for each key. Must be encodable?
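The `mapGroupsWithState[STATE: Encoder, OUT: Encoder]` signature elsewhere in this PR already expresses "must be encodable" as a context bound. A Spark-free sketch of that mechanism follows; `StateCodec` is a hypothetical typeclass standing in for Spark's `Encoder`, and `roundTrip` is an illustrative helper only. The point is that a state type without an instance is rejected at compile time, which is what the `@tparam S` doc could promise.

```scala
// Hypothetical typeclass standing in for Spark's Encoder: it describes how a
// state value is turned into bytes (and back) so it can be stored per key.
trait StateCodec[S] {
  def encode(s: S): Array[Byte]
  def decode(bytes: Array[Byte]): S
}

object StateCodec {
  // Instances in the companion object are found by implicit search automatically.
  implicit val longCodec: StateCodec[Long] = new StateCodec[Long] {
    def encode(s: Long): Array[Byte] = java.nio.ByteBuffer.allocate(8).putLong(s).array()
    def decode(bytes: Array[Byte]): Long = java.nio.ByteBuffer.wrap(bytes).getLong()
  }
}

object EncodableStateSketch {
  // The context bound `S: StateCodec` means callers can only pass a state type
  // for which a codec exists; anything else fails to compile.
  def roundTrip[S: StateCodec](s: S): S = {
    val codec = implicitly[StateCodec[S]]
    codec.decode(codec.encode(s))
  }
}
```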
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98777964 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/State.scala --- @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.annotation.{Experimental, InterfaceStability} +import org.apache.spark.sql.catalyst.plans.logical.LogicalState + +/** + * :: Experimental :: + * + * Wrapper class for interacting with state data in `mapGroupsWithState` and + * `flatMapGroupsWithState` operations on + * [[org.apache.spark.sql.KeyValueGroupedDataset KeyValueGroupedDataset]]. + * + * @note Operations on state are not threadsafe. + * + * Scala example of using `State`: + * {{{ + *// A mapping function that maintains an integer state for string keys and returns a string. + *def mappingFunction(key: String, value: Iterable[Int], state: State[Int]): Option[String] = { + * // Check if state exists + * if (state.exists) { + *val existingState = state.get // Get the existing state + *val shouldRemove = ... // Decide whether to remove the state + *if (shouldRemove) { + * state.remove() // Remove the state + *} else { + * val newState = ... 
+ * state.update(newState)// Set the new state + *} + * } else { + *val initialState = ... + *state.update(initialState) // Set the initial state + * } + * ... // return something + *} + * + * }}} + * + * Java example of using `State`: + * {{{ + *// A mapping function that maintains an integer state for string keys and returns a string. + *Function3<String, Optional<Integer>, State<Integer>, String> mappingFunction = + * new Function3<String, Optional<Integer>, State<Integer>, String>() { + * + * @Override + * public String call(String key, Optional<Integer> value, State<Integer> state) { + * if (state.exists()) { + * int existingState = state.get(); // Get the existing state + * boolean shouldRemove = ...; // Decide whether to remove the state + * if (shouldRemove) { + * state.remove(); // Remove the state + * } else { + * int newState = ...; + * state.update(newState); // Set the new state + * } + * } else { + * int initialState = ...; // Set the initial state + * state.update(initialState); + * } + * ... // return something + * } + * }; + * }}} + * + * @tparam S Type of the state + * @since 2.1.1 + */ +@Experimental +@InterfaceStability.Evolving +trait State[S] extends LogicalState[S] { + + def exists: Boolean + + def get(): S + + def update(newState: S): Unit + + def remove(): Unit + + @inline final def getOption(): Option[S] = if (exists) Some(get()) else None --- End diff -- No `()`.
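The "No `()`" remark follows the usual Scala convention: methods with side effects keep the empty parameter list, while pure accessors are declared without it. A minimal sketch of the trait shaped that way; `ParenConventionState` and `BoxState` are hypothetical names used only to make the example runnable.

```scala
// Hypothetical trait following the convention: pure accessors without `()`,
// side-effecting methods with `()` (or with arguments).
trait ParenConventionState[S] {
  def exists: Boolean            // pure accessor
  def get: S                     // pure accessor (throws if there is no state)
  def update(newState: S): Unit  // mutates the state
  def remove(): Unit             // mutates: keeps `()` to signal the side effect
  @inline final def getOption: Option[S] = if (exists) Some(get) else None
}

// Simple implementation so the convention can be exercised.
final class BoxState[S](private var v: Option[S]) extends ParenConventionState[S] {
  def exists: Boolean = v.isDefined
  def get: S = v.get
  def update(newState: S): Unit = v = Some(newState)
  def remove(): Unit = v = None
}
```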
[GitHub] spark issue #16745: [SPARK-19406] [SQL] Fix function to_json to respect user...
Github user marmbrus commented on the issue: https://github.com/apache/spark/pull/16745 LGTM
[GitHub] spark issue #16329: [SPARK-16046][DOCS] Aggregations in the Spark SQL progra...
Github user marmbrus commented on the issue: https://github.com/apache/spark/pull/16329 Sorry for the delay. This LGTM, but I'm currently away from my Apache SSH keys. Other committers should feel free to merge if you get there before I do.
[GitHub] spark issue #16664: [SPARK-18120 ][SQL] Call QueryExecutionListener callback...
Github user marmbrus commented on the issue: https://github.com/apache/spark/pull/16664 /cc @liancheng
[GitHub] spark issue #16564: [SPARK-19065][SS]Rewrite Alias in StreamExecution if nec...
Github user marmbrus commented on the issue: https://github.com/apache/spark/pull/16564 Hmm, I'm not sure that I agree with the solution from #15427. I do not think that it should be valid to have two different expressions that have the same expression id. There are many cases where we break the `df("col")` syntax by adding new operations, and I don't think it is worth that hack to preserve this particular case with drop duplicates.
[GitHub] spark pull request #16553: [SPARK-9435][SQL] Reuse function in Java UDF to c...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16553#discussion_r95655634 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala --- @@ -488,219 +488,241 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends * @since 1.3.0 */ def register(name: String, f: UDF1[_, _], returnType: DataType): Unit = { +val func = f.asInstanceOf[UDF1[Any, Any]].call(_: Any) --- End diff -- There is commented-out code above that's used to generate these functions. We should update it or delete it.
[GitHub] spark pull request #16240: [SPARK-16792][SQL] Dataset containing a Case Clas...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16240#discussion_r94888245 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala --- @@ -99,33 +96,96 @@ abstract class SQLImplicits { // Seqs - /** @since 1.6.1 */ - implicit def newIntSeqEncoder: Encoder[Seq[Int]] = ExpressionEncoder() + /** + * @since 1.6.1 + * @deprecated use [[newIntSequenceEncoder]] + */ + def newIntSeqEncoder: Encoder[Seq[Int]] = ExpressionEncoder() --- End diff -- Wait, I'm not sure I agree... Do we want to break binary compatibility for libraries that might be using this function? That could have even been resolved implicitly, so it would be confusing when it breaks.
[GitHub] spark issue #16240: [SPARK-16792][SQL] Dataset containing a Case Class with ...
Github user marmbrus commented on the issue: https://github.com/apache/spark/pull/16240 ok to test
[GitHub] spark issue #16240: [SPARK-16792][SQL] Dataset containing a Case Class with ...
Github user marmbrus commented on the issue: https://github.com/apache/spark/pull/16240 For future reference: https://github.com/apache/spark/blob/master/dev/mima (script to run MiMa)
[GitHub] spark issue #16371: [SPARK-18932][SQL] Support partial aggregation for colle...
Github user marmbrus commented on the issue: https://github.com/apache/spark/pull/16371 +1 I think we can move forward.
[GitHub] spark issue #16322: [SPARK-18908][SS] Creating StreamingQueryException shoul...
Github user marmbrus commented on the issue: https://github.com/apache/spark/pull/16322 LGTM
[GitHub] spark pull request #16360: [SPARK-18234][SS] Made update mode public
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16360#discussion_r93508333 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala --- @@ -219,7 +221,13 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { if (extraOptions.get("queryName").isEmpty) { throw new AnalysisException("queryName must be specified for memory sink") } - + outputMode match { +case Append | Complete => // allowed +case Update => + throw new AnalysisException("Update ouptut mode is not supported for memory format") --- End diff -- "memory sink" for consistency with the above error
[GitHub] spark pull request #16360: [SPARK-18234][SS] Made update mode public
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16360#discussion_r93507397 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalOutputModes.scala --- @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.sql +package org.apache.spark.sql.catalyst --- End diff -- maybe in catalyst.streaming or something?
[GitHub] spark pull request #16360: [SPARK-18234][SS] Made update mode public
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16360#discussion_r93509028 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala --- @@ -219,7 +221,13 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { if (extraOptions.get("queryName").isEmpty) { throw new AnalysisException("queryName must be specified for memory sink") } - + outputMode match { +case Append | Complete => // allowed +case Update => + throw new AnalysisException("Update ouptut mode is not supported for memory format") --- End diff -- Also "output". Thinking a little more, I wonder if it would be better to say what is supported?
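The suggestion to say what *is* supported can be sketched by keeping the allowed modes in one list and building the error message from it, so the message stays correct if the list changes. The `Mode` hierarchy and `MemorySinkCheck` below are hypothetical simplifications, not Spark's `OutputMode` classes, and the sketch returns an `Either` instead of throwing `AnalysisException` so it is easy to exercise.

```scala
// Hypothetical, simplified output modes; Spark's real OutputMode classes differ.
sealed trait Mode
case object Append extends Mode
case object Complete extends Mode
case object Update extends Mode

object MemorySinkCheck {
  // Single source of truth for what the memory sink accepts.
  private val supported: Seq[Mode] = Seq(Append, Complete)

  // Rejects unsupported modes with a message that also lists the supported ones.
  def validate(mode: Mode): Either[String, Mode] =
    if (supported.contains(mode)) Right(mode)
    else Left(s"$mode output mode is not supported for memory sink; " +
      s"supported modes are: ${supported.mkString(", ")}")
}
```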
[GitHub] spark pull request #16304: [SPARK-18894][SS] Fix event time watermark delay ...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16304#discussion_r93355210 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala --- @@ -387,7 +387,7 @@ class StreamExecution( lastExecution.executedPlan.collect { case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 => logDebug(s"Observed event time stats: ${e.eventTimeStats.value}") -e.eventTimeStats.value.max - e.delay.milliseconds +math.max(0, e.eventTimeStats.value.max - e.delayMs) --- End diff -- I'm not totally sure, but it does seem likely that we don't handle negative dates (and that is probably okay).
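To make the effect of the `math.max(0, ...)` change concrete: without the clamp, a delay larger than the maximum observed event time yields a negative watermark, i.e. a pre-1970 timestamp. A minimal sketch of the clamped computation, using a hypothetical `watermarkMs` helper rather than Spark code:

```scala
object WatermarkClamp {
  // Max observed event time minus the delay, clamped so the watermark never
  // goes below 0 ms (the Unix epoch), mirroring the expression in the diff.
  def watermarkMs(maxEventTimeMs: Long, delayMs: Long): Long =
    math.max(0L, maxEventTimeMs - delayMs)
}
```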
[GitHub] spark pull request #16322: [SPARK-18908][SS] Creating StreamingQueryExceptio...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16322#discussion_r93121938 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala --- @@ -206,6 +201,36 @@ class StreamExecution( startLatch.await() // Wait until thread started and QueryStart event has been posted } + private def generateLogicalPlan: LogicalPlan = { +var nextSourceId = 0L +val internalLogicalPlan = analyzedPlan.transform { + case StreamingRelation(dataSource, _, output) => +// Materialize source to avoid creating it in every batch +val metadataPath = s"$checkpointRoot/sources/$nextSourceId" +val source = dataSource.createSource(metadataPath) +nextSourceId += 1 +// We still need to use the previous `output` instead of `source.schema` as attributes in +// "df.logicalPlan" has already used attributes of the previous `output`. +StreamingExecutionRelation(source, output) +} +sources = internalLogicalPlan.collect { case s: StreamingExecutionRelation => s.source } +uniqueSources = sources.distinct +internalLogicalPlan + } + + override def logicalPlan: LogicalPlan = { +if (_logicalPlan == null) { + localPlanLock.synchronized { +if (_logicalPlan == null) { + _logicalPlan = generateLogicalPlan +} + } +} +_logicalPlan + } --- End diff -- Or really, what I was suggesting was to have a separate try/catch early in the stream thread that forces initialization of anything that is lazy.
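One possible shape for that suggestion, as a sketch only: the lazily built value is forced inside its own try/catch at the start of the stream thread, so an initialization failure is captured in one place instead of surfacing wherever the lazy value is first touched. `LazyPlan` and `runStream` are hypothetical names and the plan is modeled as a `String`; the field is `@volatile` because double-checked locking on the JVM needs a safely published field.

```scala
import java.util.concurrent.atomic.AtomicReference

object EagerInitSketch {
  // Hypothetical lazily built "plan", guarded by double-checked locking.
  final class LazyPlan(build: () => String) {
    @volatile private var plan: String = null
    private val lock = new Object

    def get: String = {
      if (plan == null) {
        lock.synchronized {
          if (plan == null) plan = build()
        }
      }
      plan
    }
  }

  // Runs the stream body on its own thread. Lazy initialization is forced
  // first, in a separate try/catch, and any failure there is recorded.
  def runStream(plan: LazyPlan, body: String => Unit): Option[Throwable] = {
    val failure = new AtomicReference[Throwable](null)
    val thread = new Thread(new Runnable {
      override def run(): Unit = {
        val initialized =
          try Some(plan.get) // force initialization early
          catch { case e: Exception => failure.set(e); None }
        initialized.foreach(body)
      }
    })
    thread.start()
    thread.join()
    Option(failure.get)
  }
}
```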
[GitHub] spark issue #16329: [SPARK-16046][DOCS] Aggregations in the Spark SQL progra...
Github user marmbrus commented on the issue: https://github.com/apache/spark/pull/16329 This is great! Thanks for taking the time to write up such complete examples. I think this was a big gap in the existing docs. One other ask: the screenshot is great, but I'd like to see which parts actually make it into the code snippets in the doc. Ideally you could post a link to the compiled doc. If that's hard, I can also try to build it locally.
[GitHub] spark pull request #16329: [SPARK-16046][DOCS] Aggregations in the Spark SQL...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16329#discussion_r93119785 --- Diff: examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.examples.sql + +// $example on:typed_custom_aggregation$ +import org.apache.spark.sql.expressions.Aggregator +import org.apache.spark.sql.Encoder +import org.apache.spark.sql.Encoders +import org.apache.spark.sql.SparkSession +// $example off:typed_custom_aggregation$ + +object UserDefinedTypedAggregation { + + // $example on:typed_custom_aggregation$ + case class Salary(person: String, salary: Long) + case class Average(sum: Long, count: Long) + + object MyAverage extends Aggregator[Salary, Average, Double] { +// A zero value for this aggregation. Should satisfy the property that any b + zero = b +def zero: Average = Average(0L, 0L) +// Combine two values to produce a new value. For performance, the function may modify `b` and --- End diff -- Same comment here with object reuse.
[GitHub] spark pull request #16329: [SPARK-16046][DOCS] Aggregations in the Spark SQL...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16329#discussion_r93121147 --- Diff: examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.examples.sql + +// $example on:typed_custom_aggregation$ +import org.apache.spark.sql.expressions.Aggregator +import org.apache.spark.sql.Encoder +import org.apache.spark.sql.Encoders +import org.apache.spark.sql.SparkSession +// $example off:typed_custom_aggregation$ + +object UserDefinedTypedAggregation { + + // $example on:typed_custom_aggregation$ + case class Salary(person: String, salary: Long) + case class Average(sum: Long, count: Long) + + object MyAverage extends Aggregator[Salary, Average, Double] { +// A zero value for this aggregation. Should satisfy the property that any b + zero = b +def zero: Average = Average(0L, 0L) +// Combine two values to produce a new value. 
For performance, the function may modify `b` and +// return it instead of constructing a new object for b +def reduce(b: Average, a: Salary): Average = Average(b.sum + a.salary, b.count + 1) +// Merge two intermediate values +def merge(b1: Average, b2: Average): Average = Average(b1.sum + b2.sum, b1.count + b2.count) +// Transform the output of the reduction +def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count +// Specifies the Encoder for the intermediate value type +def bufferEncoder: Encoder[Average] = Encoders.product +// Specifies the Encoder for the final output value type +def outputEncoder: Encoder[Double] = Encoders.scalaDouble + } + // $example off:typed_custom_aggregation$ + + def main(args: Array[String]): Unit = { +val spark = SparkSession + .builder() + .appName("Spark SQL user-defined Datasets aggregation example") + .getOrCreate() + +import spark.implicits._ + +// $example on:typed_custom_aggregation$ +val ds = spark.read.json("examples/src/main/resources/salaries.json").as[Salary] +ds.show() +// +---+--+ +// | person|salary| +// +---+--+ +// |Michael| 3000| +// | Andy| 4500| +// | Justin| 3500| +// | Berta| 4000| +// +---+--+ + +val averageSalary = MyAverage.toColumn.name("average_salary") --- End diff -- Maybe comment on what `name` is doing here. I actually had to look it up.
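For reference, here is what `MyAverage` computes on the four sample rows shown in the diff, replayed in plain Scala without Spark (`toColumn.name("average_salary")` only gives the resulting typed column the alias `average_salary`). `AverageSketch` and `average` are illustrative names. Splitting the rows into two partitions is an assumption used to exercise `merge`, and `zero` satisfies `merge(b, zero) == b` as the example's comment requires.

```scala
object AverageSketch {
  // Same shapes as the Salary/Average case classes in the example.
  case class Salary(person: String, salary: Long)
  case class Average(sum: Long, count: Long)

  def zero: Average = Average(0L, 0L)
  def reduce(b: Average, a: Salary): Average = Average(b.sum + a.salary, b.count + 1)
  def merge(b1: Average, b2: Average): Average = Average(b1.sum + b2.sum, b1.count + b2.count)
  def finish(r: Average): Double = r.sum.toDouble / r.count

  // Each simulated partition is reduced from zero, then the buffers are merged.
  def average(partitions: Seq[Seq[Salary]]): Double =
    finish(partitions.map(_.foldLeft(zero)(reduce)).foldLeft(zero)(merge))
}
```

With the rows split as (Michael, Andy) and (Justin, Berta), the result is 15000 / 4 = 3750.0, independent of how the rows are partitioned.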
[GitHub] spark pull request #16329: [SPARK-16046][DOCS] Aggregations in the Spark SQL...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16329#discussion_r93118905 --- Diff: examples/src/main/java/org/apache/spark/examples/sql/JavaUserDefinedTypedAggregation.java --- @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.examples.sql; + +// $example on:typed_custom_aggregation$ +import java.io.Serializable; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.TypedColumn; +import org.apache.spark.sql.expressions.Aggregator; +// $example off:typed_custom_aggregation$ + +public class JavaUserDefinedTypedAggregation { + + // $example on:typed_custom_aggregation$ + public static class Salary implements Serializable { +private String person; +private long salary; + +// Constructors, getters, setters... 
+// $example off:typed_custom_aggregation$ +public String getPerson() { + return person; +} + +public void setPerson(String person) { + this.person = person; +} + +public long getSalary() { + return salary; +} + +public void setSalary(long salary) { + this.salary = salary; +} +// $example on:typed_custom_aggregation$ + } + + public static class Average implements Serializable { +private long sum; +private long count; + +// Constructors, getters, setters... +// $example off:typed_custom_aggregation$ +public Average() { +} + +public Average(long sum, long count) { + this.sum = sum; + this.count = count; +} + +public long getSum() { + return sum; +} + +public void setSum(long sum) { + this.sum = sum; +} + +public long getCount() { + return count; +} + +public void setCount(long count) { + this.count = count; +} +// $example on:typed_custom_aggregation$ + } + + public static class MyAverage extends Aggregator { +// A zero value for this aggregation. Should satisfy the property that any b + zero = b +public Average zero() { + return new Average(0L, 0L); +} +// Combine two values to produce a new value. For performance, the function may modify `b` and --- End diff -- It's a little confusing to have the comment here for this optimization but then not implement it.
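If the comment is kept, the optimization it describes could be demonstrated with a mutable buffer that `reduce` and `merge` update in place and return. This is a standalone Scala sketch, not the Java example: `AverageBuffer` is hypothetical, and a real `Aggregator` would also need an `Encoder` for such a buffer type, which the sketch ignores. Because the buffer is mutated, `zero` must return a fresh instance on every call.

```scala
object MutableAverageSketch {
  // Mutable buffer so reduce/merge can update it in place instead of allocating.
  final class AverageBuffer(var sum: Long, var count: Long)

  // Must be a fresh buffer each time, since callers mutate it.
  def zero: AverageBuffer = new AverageBuffer(0L, 0L)

  // Modifies `b` and returns it, as the example's comment permits.
  def reduce(b: AverageBuffer, salary: Long): AverageBuffer = {
    b.sum += salary
    b.count += 1
    b
  }

  // Folds b2 into b1 and returns b1.
  def merge(b1: AverageBuffer, b2: AverageBuffer): AverageBuffer = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }

  def finish(b: AverageBuffer): Double = b.sum.toDouble / b.count
}
```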
[GitHub] spark pull request #16329: [SPARK-16046][DOCS] Aggregations in the Spark SQL...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16329#discussion_r93118975 --- Diff: examples/src/main/java/org/apache/spark/examples/sql/JavaUserDefinedTypedAggregation.java --- @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.examples.sql; + +// $example on:typed_custom_aggregation$ +import java.io.Serializable; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.TypedColumn; +import org.apache.spark.sql.expressions.Aggregator; +// $example off:typed_custom_aggregation$ + +public class JavaUserDefinedTypedAggregation { + + // $example on:typed_custom_aggregation$ + public static class Salary implements Serializable { --- End diff -- It might be a little clearer if this were a `Person` with a `name` and `salary`.
[GitHub] spark issue #16240: [SPARK-16792][SQL] Dataset containing a Case Class with ...
Github user marmbrus commented on the issue: https://github.com/apache/spark/pull/16240 /cc @cloud-fan
[GitHub] spark pull request #16322: [SPARK-18908][SS] Creating StreamingQueryExceptio...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16322#discussion_r93113059 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala --- @@ -206,6 +201,36 @@ class StreamExecution( startLatch.await() // Wait until thread started and QueryStart event has been posted } + private def generateLogicalPlan: LogicalPlan = { +var nextSourceId = 0L +val internalLogicalPlan = analyzedPlan.transform { + case StreamingRelation(dataSource, _, output) => +// Materialize source to avoid creating it in every batch +val metadataPath = s"$checkpointRoot/sources/$nextSourceId" +val source = dataSource.createSource(metadataPath) +nextSourceId += 1 +// We still need to use the previous `output` instead of `source.schema` as attributes in +// "df.logicalPlan" has already used attributes of the previous `output`. +StreamingExecutionRelation(source, output) +} +sources = internalLogicalPlan.collect { case s: StreamingExecutionRelation => s.source } +uniqueSources = sources.distinct +internalLogicalPlan + } + + override def logicalPlan: LogicalPlan = { +if (_logicalPlan == null) { + localPlanLock.synchronized { +if (_logicalPlan == null) { + _logicalPlan = generateLogicalPlan +} + } +} +_logicalPlan + } --- End diff -- This is getting pretty complicated... Do we really need to include all of this information in `StreamingQueryException`?
[GitHub] spark issue #16304: [SPARK-18894][SS] Fix event time watermark delay thresho...
Github user marmbrus commented on the issue: https://github.com/apache/spark/pull/16304 LGTM
[GitHub] spark pull request #16304: [SPARK-18894][SS] Fix event time watermark delay ...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16304#discussion_r92904102 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala --- @@ -124,6 +137,29 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { ) } + test("delay in years handled correctly") { +val input = MemoryStream[Long] +val aggWithWatermark = input.toDF() + .withColumn("eventTime", $"value".cast("timestamp")) + .withWatermark("eventTime", "1 month") --- End diff -- Just wanted to make sure I wasn't missing something :)
[GitHub] spark pull request #16304: [SPARK-18894][SS] Fix event time watermark delay ...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16304#discussion_r92902755 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala --- @@ -124,6 +137,29 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { ) } + test("delay in years handled correctly") { +val input = MemoryStream[Long] +val aggWithWatermark = input.toDF() + .withColumn("eventTime", $"value".cast("timestamp")) + .withWatermark("eventTime", "1 month") --- End diff -- This seems to be testing months, but the title says years?
[GitHub] spark pull request #16304: [SPARK-18894][SS] Disallow event time watermark de...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16304#discussion_r92753969 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -572,6 +572,10 @@ class Dataset[T] private[sql]( val parsedDelay = Option(CalendarInterval.fromString("interval " + delayThreshold)) .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'")) +// Threshold specified in months/years is non-deterministic --- End diff -- What does "safe" mean in this context? Users must not rely on watermarks for correctness, as they can be arbitrarily delayed based on batch boundaries. I think this error actually confuses the point, as it enforces a precision that this API cannot provide.
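A small model of the batch-boundary point, under the assumption (suggested by the `lastExecution` stats used in the `StreamExecution` diff for SPARK-18894) that the watermark in effect while a batch runs is derived from event times seen in earlier batches and never moves backwards. `watermarksPerBatch` is a hypothetical helper, not Spark API.

```scala
object BatchWatermarkSketch {
  // Returns, for each batch, the watermark in effect while that batch runs.
  // Assumption: batch i sees a watermark computed from batches before i,
  // clamped at 0 and never decreasing.
  def watermarksPerBatch(maxEventTimePerBatch: Seq[Long], delayMs: Long): Seq[Long] = {
    val (inEffect, _) = maxEventTimePerBatch.foldLeft((Vector.empty[Long], 0L)) {
      case ((acc, current), batchMax) =>
        val next = math.max(current, math.max(0L, batchMax - delayMs))
        (acc :+ current, next)
    }
    inEffect
  }
}
```

With per-batch maxima of 10000, 8000 and 30000 ms and a 5000 ms delay, the watermarks in effect are 0, 5000 and 5000 ms: the advance to 25000 ms only applies from the following batch, so how far the watermark lags depends on where the batch boundaries fall.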
[GitHub] spark pull request #16304: [SPARK-18894][SS] Disallow event time watermark de...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16304#discussion_r92753590 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -572,6 +572,10 @@ class Dataset[T] private[sql]( val parsedDelay = Option(CalendarInterval.fromString("interval " + delayThreshold)) .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'")) +// Threshold specified in months/years is non-deterministic --- End diff -- I think waiting 1 month for late data is a reasonable use case. Based on the definition of the watermark, it's actually okay for us to overestimate this delay too. Why not take the max (31 days, leap year)?
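A sketch of the suggested over-estimate, counting every month as 31 days. `MonthDelaySketch` and its `Interval` type are hypothetical (the diff uses Spark's `CalendarInterval`). Under this rule a year, taken as 12 months, becomes 372 days, which also over-estimates a 366-day leap year.

```scala
object MonthDelaySketch {
  val MillisPerDay: Long = 24L * 60 * 60 * 1000
  // Every month is treated as 31 days; over-estimating the delay is safe
  // for a watermark, since it only waits longer for late data.
  val MillisPerMonth: Long = 31L * MillisPerDay

  // Hypothetical interval: a month count plus an exact millisecond part.
  case class Interval(months: Int, millis: Long)

  def delayMs(i: Interval): Long = i.months * MillisPerMonth + i.millis
}
```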
[GitHub] spark issue #16289: [SPARK-18870] Disallowed Distinct Aggregations on Stream...
Github user marmbrus commented on the issue: https://github.com/apache/spark/pull/16289 LGTM
[GitHub] spark issue #16258: [SPARK-18834][SS] Expose event time stats through Stream...
Github user marmbrus commented on the issue: https://github.com/apache/spark/pull/16258 LGTM
[GitHub] spark pull request #16258: [SPARK-18834][SS] Expose event time and processin...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16258#discussion_r92077840 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -33,27 +34,6 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils /** * :: Experimental :: - * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger. --- End diff -- That's actually the opposite of how most code in SQL is laid out, so I think it would be better to avoid this change. The logic here is that declarations that are used later should come first (references before declarations make the code harder to read), and stuff at the end of the file is kind of hidden.
[GitHub] spark pull request #16258: [SPARK-18834][SS] Expose event time and processin...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16258#discussion_r92073271 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala --- @@ -38,13 +38,18 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite { | "id" : "${testProgress1.id.toString}", | "runId" : "${testProgress1.runId.toString}", | "name" : "myName", -| "timestamp" : "2016-12-05T20:54:20.827Z", +| "triggerTimestamp" : "2016-12-05T20:54:20.827Z", | "numInputRows" : 678, | "inputRowsPerSecond" : 10.0, | "durationMs" : { |"total" : 0 | }, -| "currentWatermark" : 3, +| "queryTimestamps" : { +|"eventTime.avg" : "2016-12-05T20:54:20.827Z", --- End diff -- It is available; it is stored in a human-readable format in the offset log (BTW, is that a long or a timestamp?). I think in the long run we'll want to open up another API that gives you access to this log, but that can come later. For now, it's still pretty easy to find.
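As a rough illustration of how per-trigger event-time stats such as the `eventTime.avg` entry could be accumulated and combined across partitions, here is a hypothetical `Stats` accumulator (not Spark's implementation). It keeps a running mean instead of a `Long` sum, because summing epoch-millisecond values can overflow after a few million rows.

```scala
object EventTimeStatsSketch {
  // Hypothetical max/min/avg/count of event times in epoch milliseconds.
  case class Stats(max: Long, min: Long, avg: Double, count: Long) {
    // Adds one event time, updating the mean incrementally.
    def add(t: Long): Stats = {
      val n = count + 1
      Stats(math.max(max, t), math.min(min, t), avg + (t - avg) / n, n)
    }

    // Combines two partial results with a count-weighted mean.
    def merge(o: Stats): Stats =
      if (o.count == 0) this
      else if (count == 0) o
      else {
        val n = count + o.count
        Stats(math.max(max, o.max), math.min(min, o.min), avg + (o.avg - avg) * o.count / n, n)
      }
  }

  val empty: Stats = Stats(Long.MinValue, Long.MaxValue, 0.0, 0L)
}
```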
[GitHub] spark pull request #16258: [SPARK-18834][SS] Expose event time and processin...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16258#discussion_r92071196 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala --- @@ -360,6 +360,24 @@ class StreamExecution( if (hasNewData) { // Current batch timestamp in milliseconds offsetSeqMetadata.batchTimestampMs = triggerClock.getTimeMillis() + // Update the eventTime watermark if we find one in the plan. --- End diff -- +1, this makes sense.
[GitHub] spark pull request #16238: [SPARK-18811] StreamSource resolution should happ...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16238#discussion_r91803533 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala --- @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming.util + +import org.apache.spark.sql.{SQLContext, _} +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Sink, Source} +import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider} +import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryManagerSuite} +import org.apache.spark.sql.types.{IntegerType, StructField, StructType} + +/** Dummy provider: returns a SourceProvider with a blocking `createSource` call. */ +class DefaultSource extends StreamSourceProvider with StreamSinkProvider { --- End diff -- No, you can give fully qualified classnames as well.
[GitHub] spark issue #16238: [SPARK-18811] StreamSource resolution should happen in s...
Github user marmbrus commented on the issue: https://github.com/apache/spark/pull/16238 This LGTM. I was just talking with @tdas about how I think all of this initialization stuff should be lazy and happen on the stream execution thread. I think this can simplify what they are trying to fix in #16220 as well. /cc @zsxwing
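A minimal sketch of the idea that source creation should happen lazily on the stream execution thread: `start()` returns immediately even while a blocking `createSource` (playing the role of the blocking `DefaultSource` test provider discussed in this PR) is still waiting. `StreamThreadInitSketch`, `Query` and its members are hypothetical names, not Spark's `StreamExecution`.

```scala
object StreamThreadInitSketch {
  // Hypothetical query whose source is created on its own thread, not in start().
  final class Query(createSource: () => String) {
    @volatile var source: Option[String] = None

    private val thread = new Thread(new Runnable {
      override def run(): Unit = {
        // May block; only the stream thread waits, never the caller of start().
        source = Some(createSource())
      }
    })

    def start(): this.type = { thread.start(); this }

    // Waits up to timeoutMs for the stream thread; true if it has finished.
    def awaitTermination(timeoutMs: Long): Boolean = {
      thread.join(timeoutMs)
      !thread.isAlive
    }
  }
}
```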
[GitHub] spark pull request #16238: [SPARK-18811] StreamSource resolution should happ...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16238#discussion_r91803089 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala --- @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming.util + +import org.apache.spark.sql.{SQLContext, _} +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Sink, Source} +import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider} +import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryManagerSuite} +import org.apache.spark.sql.types.{IntegerType, StructField, StructType} + +/** Dummy provider: returns a SourceProvider with a blocking `createSource` call. */ +class DefaultSource extends StreamSourceProvider with StreamSinkProvider { --- End diff -- Nit, I'd consider calling this `BlockingSource` or something more descriptive of what makes it special.
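The "blocking `createSource`" behavior that the suggested name `BlockingSource` would describe can be sketched with a `CountDownLatch` that the test releases. This stand-in does not implement Spark's `StreamSourceProvider`; it only shows the latch pattern, and the object and thread names are hypothetical.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Hypothetical stand-in for a test provider whose `createSource` blocks until released.
object BlockingSource {
  @volatile var latch = new CountDownLatch(1)
  @volatile var createCalledOn: Option[String] = None

  def createSource(): String = {
    createCalledOn = Some(Thread.currentThread().getName) // which thread resolved the source
    latch.await(10, TimeUnit.SECONDS) // bounded wait so a broken test cannot hang forever
    "blocking-source"
  }
}
```

A test can start the query, check that the caller was not blocked, then call `latch.countDown()` to let resolution finish.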
[GitHub] spark issue #16182: [SPARK-18754][SS] Rename recentProgresses to recentProgr...
Github user marmbrus commented on the issue: https://github.com/apache/spark/pull/16182 /cc @tdas
[GitHub] spark pull request #16182: [SPARK-18754][SS] Rename recentProgresses to rece...
GitHub user marmbrus opened a pull request: https://github.com/apache/spark/pull/16182 [SPARK-18754][SS] Rename recentProgresses to recentProgress Based on an informal survey, users find this option easier to understand / remember. You can merge this pull request into a Git repository by running: $ git pull https://github.com/marmbrus/spark renameRecentProgress Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16182.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16182 commit b3813ecd989a00db52de2486da45b13bf479ff04 Author: Michael Armbrust Date: 2016-12-07T01:19:16Z [SPARK-18754][SQL] Rename recentProgresses to recentProgress commit 7c125680ca66785742daee29fbd2977048a2503a Author: Michael Armbrust Date: 2016-12-07T01:23:19Z update config option too
[GitHub] spark pull request #16178: [SPARK-18751][Core]Fix deadlock when SparkContext...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16178#discussion_r91190778 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -1760,25 +1760,24 @@ class SparkContext(config: SparkConf) extends Logging { def listJars(): Seq[String] = addedJars.keySet.toSeq /** - * Shut down the SparkContext. + * When stopping SparkContext inside Spark components, it's easy to cause dead-lock since Spark + * may wait for some internal threads to finish. It's better to use this method to stop + * SparkContext instead. */ - def stop(): Unit = { -if (env.rpcEnv.isInRPCThread) { - // `stop` will block until all RPC threads exit, so we cannot call stop inside a RPC thread. - // We should launch a new thread to call `stop` to avoid dead-lock. - new Thread("stop-spark-context") { -setDaemon(true) + private[spark] def stopInNewThread(): Unit = { +new Thread("stop-spark-context") { + setDaemon(true) -override def run(): Unit = { - _stop() -} - }.start() -} else { - _stop() -} + override def run(): Unit = { +SparkContext.this.stop() --- End diff -- Will this ever throw an exception? Should we register an `UncaughtExceptionHandler` or use a try/catch with logging?
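The try/catch-with-logging option raised in the review can be sketched as a daemon thread that routes non-fatal failures to a callback, for example a logger. `StopInNewThread`, `doStop`, and `onError` are hypothetical names. The parameter is deliberately not called `stop`: inside an anonymous `Thread` subclass that name would resolve to the inherited `Thread.stop`, which is presumably also why the diff writes `SparkContext.this.stop()`.

```scala
import scala.util.control.NonFatal

object StopInNewThread {
  // Hypothetical sketch: run `doStop` on a daemon thread and hand any non-fatal
  // failure to `onError` instead of letting it escape the thread unnoticed.
  def stopInNewThread(doStop: () => Unit, onError: Throwable => Unit): Thread = {
    val t = new Thread("stop-spark-context") {
      setDaemon(true)
      override def run(): Unit =
        try doStop() catch { case NonFatal(e) => onError(e) }
    }
    t.start()
    t
  }
}
```

The other option, `Thread.setUncaughtExceptionHandler`, also works. The explicit `try`/`catch` keeps the error handling next to the call that can fail.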
[GitHub] spark issue #16138: [WIP][SPARK-16609] Add to_date/to_timestamp with format ...
Github user marmbrus commented on the issue: https://github.com/apache/spark/pull/16138 This will be very convenient! Looking forward to the whole patch. For SQL, I think you should look at [`RuntimeReplaceable`](https://github.com/apache/spark/blob/fd90541c35af2bccf0155467bec8cea7c8865046/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala#L238). Also, we might consider having a default version that doesn't take a pattern (and parses something standard such as `2016-12-01T02:03:01Z`).
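The suggested "default version without a pattern" can be sketched outside Spark with `java.time`. With no pattern, an ISO-8601 instant such as `2016-12-01T02:03:01Z` is parsed. With a pattern, `DateTimeFormatter.ofPattern` is used. Unparseable input yields `None`, mimicking SQL null. This is an illustration only, independent of the `RuntimeReplaceable` mechanism mentioned above, and `toTimestampMillis` is a hypothetical name.

```scala
import java.time.{Instant, OffsetDateTime}
import java.time.format.DateTimeFormatter
import scala.util.Try

object ToTimestampSketch {
  /** Hypothetical helper: epoch milliseconds, or None if the input does not parse. */
  def toTimestampMillis(s: String, pattern: Option[String] = None): Option[Long] = pattern match {
    case None => // default: standard ISO-8601 instant, e.g. 2016-12-01T02:03:01Z
      Try(Instant.parse(s).toEpochMilli).toOption
    case Some(p) => // user-supplied pattern; must include an offset for OffsetDateTime
      Try(OffsetDateTime.parse(s, DateTimeFormatter.ofPattern(p)).toInstant.toEpochMilli).toOption
  }
}
```

For example, `toTimestampMillis("2016-12-01T02:03:01Z")` and `toTimestampMillis("2016-12-01T02:03:01Z", Some("yyyy-MM-dd'T'HH:mm:ssX"))` both give `1480557781000L`.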
[GitHub] spark pull request #16113: [SPARK-18657][SPARK-18668] Make StreamingQuery.id...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16113#discussion_r90741307 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala --- @@ -605,34 +629,64 @@ class StreamExecution( case object TERMINATED extends State } + /** - * Contains metadata associated with a stream execution. This information is - * persisted to the offset log via the OffsetSeq metadata field. Current - * information contained in this object includes: + * Contains metadata associated with a [[StreamingQuery]]. This information is written + * in the checkpoint location the first time a query is started and recovered every time the query + * is restarted. * - * @param batchWatermarkMs: The current eventTime watermark, used to - * bound the lateness of data that will processed. Time unit: milliseconds - * @param batchTimestampMs: The current batch processing timestamp. - * Time unit: milliseconds + * @param id unique id of the [[StreamingQuery]] that needs to be persisted across restarts */ -case class StreamExecutionMetadata( -var batchWatermarkMs: Long = 0, -var batchTimestampMs: Long = 0) { - private implicit val formats = StreamExecutionMetadata.formats - - /** - * JSON string representation of this object. - */ - def json: String = Serialization.write(this) +case class StreamMetadata(id: String) { --- End diff -- Can we move these to their own file?
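The `StreamMetadata` doc describes a write-once, recover-on-restart pattern. The sketch below shows that pattern under simplifying assumptions: the id is stored as plain text in a file named `metadata` inside the checkpoint directory. Spark writes its own format, which is not reproduced here. `StreamMetadataStore` and `getOrCreate` are hypothetical names.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import java.util.UUID

final case class StreamMetadata(id: String)

object StreamMetadataStore {
  // Hypothetical sketch: generate the id on first start, then reuse it on every restart.
  def getOrCreate(checkpointDir: Path): StreamMetadata = {
    val file = checkpointDir.resolve("metadata") // assumed file name, plain-text contents
    if (Files.exists(file)) {
      StreamMetadata(new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim)
    } else {
      val md = StreamMetadata(UUID.randomUUID().toString)
      Files.write(file, md.id.getBytes(StandardCharsets.UTF_8))
      md
    }
  }
}
```

Because the id comes from the checkpoint rather than the running JVM, a restarted query keeps the same id.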
[GitHub] spark pull request #16113: [SPARK-18657][SPARK-18668] Make StreamingQuery.id...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16113#discussion_r90741239 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala --- @@ -32,21 +32,33 @@ import org.apache.spark.sql.SparkSession trait StreamingQuery { /** - * Returns the name of the query. This name is unique across all active queries. This can be - * set in the `org.apache.spark.sql.streaming.DataStreamWriter` as - * `dataframe.writeStream.queryName("query").start()`. + * Returns the user-specified name of the query, or null if not specified. + * This name can be specified in the `org.apache.spark.sql.streaming.DataStreamWriter` + * as `dataframe.writeStream.queryName("query").start()`. + * This name, if set, must be unique across all active queries. * * @since 2.0.0 */ def name: String /** - * Returns the unique id of this query. + * Returns the unique id of this query that persists across restarts from checkpoint data. + * That is, this id is generated when a query is started for the first time, and + * will be the same every time it is restarted from checkpoint data. + * There can only be one query with the same id active in a Spark cluster. --- End diff -- This makes it sound like it's okay to have more than one running as long as they aren't on the same Spark cluster.
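The two documented invariants are: at most one active query per id, and a user-specified name, if set, unique among active queries. The registry below models them within a single process. It says nothing about queries on other clusters, which is exactly the ambiguity the comment points out. `ActiveQueries` is a hypothetical class, not Spark's `StreamingQueryManager`.

```scala
import java.util.UUID
import scala.collection.mutable

// Hypothetical in-process registry enforcing the documented invariants.
class ActiveQueries {
  private val active = mutable.Map.empty[UUID, Option[String]]

  def register(id: UUID, name: Option[String]): Either[String, UUID] = synchronized {
    if (active.contains(id)) Left(s"query with id $id is already active")
    else if (name.exists(n => active.values.exists(_.contains(n)))) Left(s"query named ${name.get} is already active")
    else { active(id) = name; Right(id) }
  }

  def deregister(id: UUID): Unit = synchronized { active.remove(id); () }
}
```

Once a query stops and is deregistered, the same id (for example, after a restart from the checkpoint) can be registered again.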
[GitHub] spark pull request #16113: [SPARK-18657][SPARK-18668] Make StreamingQuery.id...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16113#discussion_r90741067 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala --- @@ -54,6 +61,26 @@ object OffsetSeq { * `nulls` in the sequence are converted to `None`s. */ def fill(metadata: Option[String], offsets: Offset*): OffsetSeq = { -OffsetSeq(offsets.map(Option(_)), metadata) +OffsetSeq(offsets.map(Option(_)), metadata.map(OffsetSeqMetadata.apply)) } } + + +/** + * Contains metadata associated with a [[OffsetSeq]]. This information is + * persisted to the offset log in the checkpoint location via the [[OffsetSeq]] metadata field. + * + * @param batchWatermarkMs: The current eventTime watermark, used to + * bound the lateness of data that will processed. Time unit: milliseconds + * @param batchTimestampMs: The current batch processing timestamp. + * Time unit: milliseconds + */ +case class OffsetSeqMetadata(var batchWatermarkMs: Long = 0, var batchTimestampMs: Long = 0) { --- End diff -- Can we put this in its own file?
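The `fill` helper documented in the diff turns `null` offsets into `None` and carries optional metadata alongside them. The sketch uses simplified, hypothetical types: offsets are plain strings instead of Spark's `Offset`, and the metadata fields are `val`s rather than `var`s.

```scala
// Hypothetical, simplified types; not Spark's actual classes.
final case class OffsetSeqMetadata(batchWatermarkMs: Long = 0, batchTimestampMs: Long = 0)
final case class OffsetSeq(offsets: Seq[Option[String]], metadata: Option[OffsetSeqMetadata])

object OffsetSeqSketch {
  // Mirrors the documented behavior of `fill`: `null` entries become `None`.
  def fill(metadata: Option[OffsetSeqMetadata], offsets: String*): OffsetSeq =
    OffsetSeq(offsets.map(Option(_)), metadata)
}
```

`Option(x)` is what performs the null check: `Option(null)` is `None`, and any other value is wrapped in `Some`.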
[GitHub] spark issue #15918: [SPARK-18122][SQL][WIP]Fallback to Kryo for unsupported ...
Github user marmbrus commented on the issue: https://github.com/apache/spark/pull/15918 I don't think you can limit the implicit. What type would pick up case classes but not case classes that contain invalid things? I think you would need a macro for this kind of introspection. (I'd be happy to be proven wrong with a PR.) I'd recommend you only import the implicits you need rather than using the wildcard.
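The difference between importing only the implicits you need and using the wildcard can be seen with a toy type class. Everything below is hypothetical: `Enc` stands in for an encoder, and `fallbackEnc` plays the role of a catch-all fallback. With the selective import, a type that has no instance fails to compile. With the wildcard import, the fallback silently applies.

```scala
// Hypothetical type class standing in for an encoder; not Spark's `Encoder`.
trait Enc[T] { def name: String }

object MyImplicits {
  implicit val intEnc: Enc[Int] = new Enc[Int] { val name = "int" }
  implicit val stringEnc: Enc[String] = new Enc[String] { val name = "string" }
  // Catch-all instance, analogous in spirit to a generic Kryo fallback.
  implicit def fallbackEnc[T]: Enc[T] = new Enc[T] { val name = "fallback" }
}

object EncOps {
  def encName[T](implicit e: Enc[T]): String = e.name
}

object SelectiveImport {
  import MyImplicits.{intEnc, stringEnc} // fallbackEnc deliberately not imported
  val forInt: String = EncOps.encName[Int]
  // EncOps.encName[Double] would not compile here: no Enc[Double] is in scope.
}

object WildcardImport {
  import MyImplicits._
  val forDouble: String = EncOps.encName[Double] // resolved by fallbackEnc
}
```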
[GitHub] spark issue #16094: [SPARK-18541][Python]Add metadata parameter to pyspark.s...
Github user marmbrus commented on the issue: https://github.com/apache/spark/pull/16094 No worries, thanks for working on this! It's great to ensure our Python APIs aren't lagging behind the Scala ones.
[GitHub] spark pull request #13147: [SPARK-6320][SQL] Move planLater method into Gene...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/13147#discussion_r90560927 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala --- @@ -27,6 +27,14 @@ import org.apache.spark.sql.catalyst.trees.TreeNode * empty list should be returned. */ abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends Logging { + + /** + * Returns a placeholder for a physical plan that executes `plan`. This placeholder will be + * filled in automatically by the QueryPlanner using the other execution strategies that are + * available. + */ + protected def planLater(plan: LogicalPlan): PhysicalPlan + def apply(plan: LogicalPlan): Seq[PhysicalPlan] } --- End diff -- Fine to include it there! I guess I need to review that :)
[GitHub] spark issue #16094: [SPARK-18541][Python]Add metadata parameter to pyspark.s...
Github user marmbrus commented on the issue: https://github.com/apache/spark/pull/16094 ok to test
[GitHub] spark issue #15918: [SPARK-18122][SQL][WIP]Fallback to Kryo for unsupported ...
Github user marmbrus commented on the issue: https://github.com/apache/spark/pull/15918 We should probably add a flag (maybe even off by default). The error message can tell you to turn on the flag if you are okay with the fallback.
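A flag-gated fallback with an actionable error message can be sketched as follows. The configuration key is made up for illustration and is not a real Spark setting. The `supported` argument stands in for whatever check decides whether a native encoder exists.

```scala
// Hypothetical sketch of a flag-gated Kryo fallback; names are illustrative only.
object EncoderResolution {
  val FallbackFlag = "spark.sql.example.kryoFallback.enabled" // hypothetical config key

  sealed trait Resolved
  case object Native extends Resolved
  case object KryoFallback extends Resolved

  def resolve(className: String, supported: Boolean, conf: Map[String, String]): Resolved =
    if (supported) Native
    else if (conf.getOrElse(FallbackFlag, "false").toBoolean) KryoFallback // off by default
    else throw new UnsupportedOperationException(
      s"No encoder found for $className. Set $FallbackFlag=true to fall back to Kryo.")
}
```

Keeping the flag off by default preserves today's behavior. The error message tells users exactly which switch opts them in.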
[GitHub] spark issue #15918: [SPARK-18122][SQL][WIP]Fallback to Kryo for unsupported ...
Github user marmbrus commented on the issue: https://github.com/apache/spark/pull/15918 I agree that the only change in behavior is that things that used to throw an error will no longer throw an error. If done right (I haven't looked deeply at the PR itself yet), no case that is currently working should change. It is maybe slightly odd to mix serialization types, but that's kind of already happening today if you use the `kryo` serializer: you are taking Kryo-encoded data and putting it as a binary value into a Tungsten row. The change here makes it possible to do the same in cases where the incompatible object is nested within a compatible object. Currently you are forced into all or nothing (i.e. even if only a single field is incompatible, you must treat the whole object as an opaque binary blob). The one possible compatibility concern I can see is that if, in the future, we add support for a previously unsupported type, the schema will change from `BinaryType` to something else. However, given there are very few operations you can do on Binary, and this format is not persisted or guaranteed to be compatible across Spark versions, this actually seems okay. Thoughts?
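The "all or nothing" versus "per nested field" distinction can be shown with a small, hypothetical model of schema derivation. This is not Spark's reflection-based encoder code. Under the current behavior described above, one unsupported field turns the whole object into a binary blob. Under the proposed behavior, only that field becomes `BinaryType`.

```scala
// Hypothetical, simplified model of schema derivation (not Spark's types).
sealed trait FieldType
case object IntType extends FieldType
case object StringType extends FieldType
case object BinaryType extends FieldType
final case class StructType(fields: List[(String, FieldType)]) extends FieldType

// Description of a class's fields as seen by the encoder.
sealed trait Desc
final case class Prim(t: FieldType) extends Desc
final case class Nested(fields: List[(String, Desc)]) extends Desc
final case class Unsupported(className: String) extends Desc

object SchemaSketch {
  /** Current behavior as described: any unsupported field makes the whole object binary. */
  def allOrNothing(d: Desc): FieldType =
    if (hasUnsupported(d)) BinaryType else perField(d)

  /** Proposed behavior: only the unsupported field becomes binary (Kryo-encoded). */
  def perField(d: Desc): FieldType = d match {
    case Prim(t)        => t
    case Nested(fs)     => StructType(fs.map { case (n, f) => n -> perField(f) })
    case Unsupported(_) => BinaryType
  }

  private def hasUnsupported(d: Desc): Boolean = d match {
    case Prim(_)        => false
    case Nested(fs)     => fs.exists { case (_, f) => hasUnsupported(f) }
    case Unsupported(_) => true
  }
}
```

The compatibility concern in the comment corresponds to `perField` returning something other than `BinaryType` for a field once its type gains native support.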
[GitHub] spark pull request #13147: [SPARK-6320][SQL] Move planLater method into Gene...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/13147#discussion_r90317826 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala --- @@ -27,6 +27,14 @@ import org.apache.spark.sql.catalyst.trees.TreeNode * empty list should be returned. */ abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends Logging { + + /** + * Returns a placeholder for a physical plan that executes `plan`. This placeholder will be + * filled in automatically by the QueryPlanner using the other execution strategies that are + * available. + */ + protected def planLater(plan: LogicalPlan): PhysicalPlan + def apply(plan: LogicalPlan): Seq[PhysicalPlan] } --- End diff -- I just noticed, there is a TODO below in the comment that I think we can remove now :) > * TODO: RIGHT NOW ONLY ONE PLAN IS RETURNED EVER... PLAN SPACE EXPLORATION WILL BE IMPLEMENTED LATER.
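The `planLater` contract can be illustrated with a toy planner: a strategy plans one operator and returns a placeholder for its child, and the planner later fills that placeholder by planning the child with the available strategies. All types below are hypothetical. Unlike the diff, `planLater` is concrete here and simply wraps the logical plan, and the sketch takes the first candidate plan rather than exploring alternatives.

```scala
// Hypothetical mini planner illustrating placeholders filled in by the planner.
sealed trait Logical
final case class Scan(table: String) extends Logical
final case class Filter(cond: String, child: Logical) extends Logical

sealed trait Physical
final case class ScanExec(table: String) extends Physical
final case class FilterExec(cond: String, child: Physical) extends Physical
final case class PlanLater(plan: Logical) extends Physical // placeholder

abstract class Strategy {
  protected def planLater(plan: Logical): Physical = PlanLater(plan)
  def apply(plan: Logical): Seq[Physical]
}

object BasicStrategy extends Strategy {
  def apply(plan: Logical): Seq[Physical] = plan match {
    case Scan(t)          => Seq(ScanExec(t))
    case Filter(c, child) => Seq(FilterExec(c, planLater(child))) // child planned later
  }
}

object Planner {
  val strategies: Seq[Strategy] = Seq(BasicStrategy)

  def plan(p: Logical): Physical = fill(strategies.iterator.flatMap(s => s(p)).next())

  // Replace every placeholder by recursively planning the logical subtree it holds.
  private def fill(p: Physical): Physical = p match {
    case PlanLater(l)         => plan(l)
    case FilterExec(c, child) => FilterExec(c, fill(child))
    case other                => other
  }
}
```

For example, `Planner.plan(Filter("x > 1", Scan("t")))` produces `FilterExec("x > 1", ScanExec("t"))`.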
[1/2] spark git commit: [SPARK-18516][SQL] Split state and progress in streaming
Repository: spark Updated Branches: refs/heads/master 9a02f6821 -> c3d08e2f2 http://git-wip-us.apache.org/repos/asf/spark/blob/c3d08e2f/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala new file mode 100644 index 000..7129fa4 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import java.{util => ju} +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.util.control.NonFatal + +import org.apache.jute.compiler.JLong +import org.json4s._ +import org.json4s.JsonAST.JValue +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark.annotation.Experimental + +/** + * :: Experimental :: + * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger. 
+ */ +@Experimental +class StateOperatorProgress private[sql]( +val numRowsTotal: Long, +val numRowsUpdated: Long) { + private[sql] def jsonValue: JValue = { +("numRowsTotal" -> JInt(numRowsTotal)) ~ +("numRowsUpdated" -> JInt(numRowsUpdated)) + } +} + +/** + * :: Experimental :: + * Information about progress made in the execution of a [[StreamingQuery]] during + * a trigger. Each event relates to processing done for a single trigger of the streaming + * query. Events are emitted even when no new data is available to be processed. + * + * @param id A unique id of the query. + * @param name Name of the query. This name is unique across all active queries. + * @param timestamp Timestamp (ms) of the beginning of the trigger. + * @param batchId A unique id for the current batch of data being processed. Note that in the + *case of retries after a failure a given batchId my be executed more than once. + *Similarly, when there is no data to be processed, the batchId will not be + *incremented. + * @param durationMs The amount of time taken to perform various operations in milliseconds. + * @param currentWatermark The current event time watermark in milliseconds + * @param stateOperators Information about operators in the query that store state. + * @param sources detailed statistics on data being read from each of the streaming sources. + * @since 2.1.0 + */ +@Experimental +class StreamingQueryProgress private[sql]( + val id: UUID, + val name: String, + val timestamp: Long, + val batchId: Long, + val durationMs: ju.Map[String, java.lang.Long], + val currentWatermark: Long, + val stateOperators: Array[StateOperatorProgress], + val sources: Array[SourceProgress], + val sink: SinkProgress) { + + /** The aggregate (across all sources) number of records processed in a trigger. */ + def numInputRows: Long = sources.map(_.numInputRows).sum + + /** The aggregate (across all sources) rate of data arriving. 
*/ + def inputRowsPerSecond: Double = sources.map(_.inputRowsPerSecond).sum + + /** The aggregate (across all sources) rate at which Spark is processing data. */ + def processedRowsPerSecond: Double = sources.map(_.processedRowsPerSecond).sum + + /** The compact JSON representation of this status. */ + def json: String = compact(render(jsonValue)) + + /** The pretty (i.e. indented) JSON representation of this status. */ + def prettyJson: String = pretty(render(jsonValue)) + + override def toString: String = prettyJson + + private[sql] def jsonValue: JValue = { +def safeDoubleToJValue(value: Double): JValue = { + if (value.isNaN || value.isInfinity) JNothing else JDouble(value) +} + +("id" -> JString(id.toString)) ~ +("name" -> JString(name)) ~ +("timestamp" -> JInt(timestamp)) ~ +("numInputRows" -> JInt(numInputRows)) ~ +("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~ +("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~ +("durationMs" -> durationMs
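In `StreamingQueryProgress`, the aggregate metrics are sums over sources, and `safeDoubleToJValue` drops non-finite doubles from the JSON. The sketch reproduces both behaviors with hand-built JSON strings instead of json4s. `SourceProgress` here is a hypothetical, cut-down class, and `ProgressSketch.json` is not a Spark API.

```scala
// Hypothetical, simplified source progress; the real class has more fields.
final case class SourceProgress(description: String, numInputRows: Long, inputRowsPerSecond: Double)

object ProgressSketch {
  // Like the diff's `safeDoubleToJValue`: NaN and infinities are omitted entirely.
  private def field(name: String, value: Double): Option[String] =
    if (value.isNaN || value.isInfinity) None else Some(s""""$name" : $value""")

  def json(sources: Seq[SourceProgress]): String = {
    val numInputRows = sources.map(_.numInputRows).sum    // aggregate across all sources
    val rate = sources.map(_.inputRowsPerSecond).sum
    val fields = Seq(Some(s""""numInputRows" : $numInputRows"""), field("inputRowsPerSecond", rate)).flatten
    fields.mkString("{ ", ", ", " }")
  }
}
```

Omitting the field, rather than emitting `NaN`, keeps the output valid JSON, because JSON has no representation for non-finite numbers.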
[1/2] spark git commit: [SPARK-18516][SQL] Split state and progress in streaming
Repository: spark Updated Branches: refs/heads/branch-2.1 045ae299c -> 28b57c8a1 http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
[2/2] spark git commit: [SPARK-18516][SQL] Split state and progress in streaming
[SPARK-18516][SQL] Split state and progress in streaming

This PR separates the status of a `StreamingQuery` into two separate APIs:
- `status` - describes the status of a `StreamingQuery` at this moment, including what phase of processing is currently happening and whether data is available.
- `recentProgress` - an array of statistics about the most recent microbatches that have executed.

A recent progress contains the following information:
```
{
  "id" : "2be8670a-fce1-4859-a530-748f29553bb6",
  "name" : "query-29",
  "timestamp" : 1479705392724,
  "inputRowsPerSecond" : 230.76923076923077,
  "processedRowsPerSecond" : 10.869565217391303,
  "durationMs" : {
    "triggerExecution" : 276,
    "queryPlanning" : 3,
    "getBatch" : 5,
    "getOffset" : 3,
    "addBatch" : 234,
    "walCommit" : 30
  },
  "currentWatermark" : 0,
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-14]]",
    "startOffset" : {
      "topic-14" : {
        "2" : 0,
        "4" : 1,
        "1" : 0,
        "3" : 0,
        "0" : 0
      }
    },
    "endOffset" : {
      "topic-14" : {
        "2" : 1,
        "4" : 2,
        "1" : 0,
        "3" : 0,
        "0" : 1
      }
    },
    "numRecords" : 3,
    "inputRowsPerSecond" : 230.76923076923077,
    "processedRowsPerSecond" : 10.869565217391303
  } ]
}
```

Additionally, to make it possible to correlate progress updates across restarts, we change the `id` field from an integer that is unique within the JVM to a `UUID` that is globally unique.

Author: Tathagata Das
Author: Michael Armbrust

Closes #15954 from marmbrus/queryProgress.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c3d08e2f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c3d08e2f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c3d08e2f

Branch: refs/heads/master
Commit: c3d08e2f29baeebe09bf4c059ace4336af9116b5
Parents: 9a02f68
Author: Tathagata Das
Authored: Tue Nov 29 17:24:17 2016 -0800
Committer: Michael Armbrust
Committed: Tue Nov 29 17:24:17 2016 -0800

--
 .../spark/sql/kafka010/KafkaSourceSuite.scala | 7 +-
 project/MimaExcludes.scala | 11 +
 python/pyspark/sql/streaming.py | 326 ++-
 python/pyspark/sql/tests.py | 22 ++
 .../execution/streaming/MetricsReporter.scala | 53 +++
 .../execution/streaming/ProgressReporter.scala | 234 +
 .../execution/streaming/StreamExecution.scala | 282
 .../sql/execution/streaming/StreamMetrics.scala | 243 --
 .../org/apache/spark/sql/internal/SQLConf.scala | 8 +
 .../apache/spark/sql/streaming/SinkStatus.scala | 66
 .../spark/sql/streaming/SourceStatus.scala | 95 --
 .../spark/sql/streaming/StreamingQuery.scala | 33 +-
 .../sql/streaming/StreamingQueryException.scala | 2 +-
 .../sql/streaming/StreamingQueryListener.scala | 24 +-
 .../sql/streaming/StreamingQueryManager.scala | 27 +-
 .../sql/streaming/StreamingQueryStatus.scala | 151 +
 .../apache/spark/sql/streaming/progress.scala | 193 +++
 .../streaming/StreamMetricsSuite.scala | 213
 .../sql/streaming/FileStreamSourceSuite.scala | 10 +-
 .../apache/spark/sql/streaming/StreamTest.scala | 73 +
 .../streaming/StreamingQueryListenerSuite.scala | 267 ---
 .../streaming/StreamingQueryManagerSuite.scala | 2 +-
 .../streaming/StreamingQueryProgressSuite.scala | 98 ++
 .../streaming/StreamingQueryStatusSuite.scala | 123 ---
 .../sql/streaming/StreamingQuerySuite.scala | 260 ---
 .../spark/sql/streaming/WatermarkSuite.scala | 16 +-
 26 files changed, 1087 insertions(+), 1752 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/c3d08e2f/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala -- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index e1af14f..2d6ccb2 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
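Because each progress update is plain JSON, it can be checked with ordinary tooling. The following is a minimal Python sketch, not Spark code: it parses an abridged copy of the sample above and, assuming Kafka-style offsets shaped as topic -> partition -> offset (as in the sample), sums the per-partition offset deltas for each source. The helper name `offset_delta` is hypothetical.

```python
import json

# Abridged copy of the sample progress update from the commit message.
SAMPLE_PROGRESS = """
{
  "id" : "2be8670a-fce1-4859-a530-748f29553bb6",
  "name" : "query-29",
  "durationMs" : { "triggerExecution" : 276, "addBatch" : 234 },
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-14]]",
    "startOffset" : { "topic-14" : { "2" : 0, "4" : 1, "1" : 0, "3" : 0, "0" : 0 } },
    "endOffset" : { "topic-14" : { "2" : 1, "4" : 2, "1" : 0, "3" : 0, "0" : 1 } },
    "numRecords" : 3
  } ]
}
"""


def offset_delta(source: dict) -> int:
    """Sum (endOffset - startOffset) over every topic and partition of one source.

    Assumes both offset maps have the Kafka-style {topic: {partition: offset}}
    shape used in the sample; other sources may report offsets differently.
    """
    total = 0
    for topic, end_by_partition in source["endOffset"].items():
        start_by_partition = source["startOffset"][topic]
        for partition, end in end_by_partition.items():
            total += end - start_by_partition[partition]
    return total


progress = json.loads(SAMPLE_PROGRESS)
for source in progress["sources"]:
    # Prints the source description, the offset delta, and the reported numRecords.
    print(source["description"], offset_delta(source), source["numRecords"])
```

For this sample the delta (1 + 1 + 0 + 0 + 1 = 3) matches `numRecords`; the two are not guaranteed to agree in general, for example when offsets have gaps.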
[2/2] spark git commit: [SPARK-18516][SQL] Split state and progress in streaming
(cherry picked from commit c3d08e2f29baeebe09bf4c059ace4336af9116b5)
Signed-off-by: Michael Armbrust

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/28b57c8a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/28b57c8a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/28b57c8a

Branch: refs/heads/branch-2.1
Commit: 28b57c8a124fe55501c4ca4b91320851ace5d735
Parents: 045ae29
Author: Tathagata Das
Authored: Tue Nov 29 17:24:17 2016 -0800
Committer: Michael Armbrust
Committed: Tue Nov 29 17:24:37 2016 -0800

--
 .../spark/sql/kafka010/KafkaSourceSuite.scala | 7 +-
 project/MimaExcludes.scala | 11 +
 python/pyspark/sql/streaming.py | 326 ++-
 python/pyspark/sql/tests.py | 22 ++
 .../execution/streaming/MetricsReporter.scala | 53 +++
 .../execution/streaming/ProgressReporter.scala | 234 +
 .../execution/streaming/StreamExecution.scala | 282
 .../sql/execution/streaming/StreamMetrics.scala | 243 --
 .../org/apache/spark/sql/internal/SQLConf.scala | 8 +
 .../apache/spark/sql/streaming/SinkStatus.scala | 66
 .../spark/sql/streaming/SourceStatus.scala | 95 --
 .../spark/sql/streaming/StreamingQuery.scala | 33 +-
 .../sql/streaming/StreamingQueryException.scala | 2 +-
 .../sql/streaming/StreamingQueryListener.scala | 24 +-
 .../sql/streaming/StreamingQueryManager.scala | 27 +-
 .../sql/streaming/StreamingQueryStatus.scala | 151 +
 .../apache/spark/sql/streaming/progress.scala | 193 +++
 .../streaming/StreamMetricsSuite.scala | 213
 .../sql/streaming/FileStreamSourceSuite.scala | 10 +-
 .../apache/spark/sql/streaming/StreamTest.scala | 73 +
 .../streaming/StreamingQueryListenerSuite.scala | 267 ---
 .../streaming/StreamingQueryManagerSuite.scala | 2 +-
 .../streaming/StreamingQueryProgressSuite.scala | 98 ++
 .../streaming/StreamingQueryStatusSuite.scala | 123 ---
 .../sql/streaming/StreamingQuerySuite.scala | 260 ---
 .../spark/sql/streaming/WatermarkSuite.scala | 16 +-
 26 files changed, 1087 insertions(+), 1752 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala -- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index e1af14f..2d6ccb2 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/e
[GitHub] spark issue #15954: [SPARK-18516][SQL] Split state and progress in streaming
Github user marmbrus commented on the issue: https://github.com/apache/spark/pull/15954 LGTM, merging to master and 2.1 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15954: [SPARK-18516][SQL] Split state and progress in st...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/15954#discussion_r90133572 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala --- @@ -59,13 +62,20 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) { /** * Returns the query if there is an active query with the given id, or null. * - * @since 2.0.0 + * @since 2.1.0 */ - def get(id: Long): StreamingQuery = activeQueriesLock.synchronized { + def get(id: UUID): StreamingQuery = activeQueriesLock.synchronized { activeQueries.get(id).orNull } /** + * Returns the query if there is an active query with the given id, or null. + * + * @since 2.1.0 + */ + def get(id: String): StreamingQuery = get(UUID.fromString(id)) --- End diff -- I think that's okay. A globally unique ID is a better identifier.
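The two `get` overloads can be sketched in Python as a small registry keyed by `uuid.UUID`, with the string form parsed first, just as `get(id: String)` delegates to `get(UUID.fromString(id))`. The class and method names below are hypothetical and only illustrate the lookup; `None` plays the role of `null`.

```python
import threading
import uuid
from typing import Dict, Optional, Union


class ActiveQueryRegistry:
    """Hypothetical analogue of the active-query lookup in the diff above."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._active: Dict[uuid.UUID, object] = {}

    def register(self, query_id: uuid.UUID, query: object) -> None:
        with self._lock:
            self._active[query_id] = query

    def get(self, query_id: Union[uuid.UUID, str]) -> Optional[object]:
        # Accept the string form like the String overload; uuid.UUID raises
        # ValueError on malformed input, much as UUID.fromString throws.
        if isinstance(query_id, str):
            query_id = uuid.UUID(query_id)
        with self._lock:
            # None when no active query has this id (the Scala code returns null).
            return self._active.get(query_id)


registry = ActiveQueryRegistry()
query_id = uuid.UUID("2be8670a-fce1-4859-a530-748f29553bb6")
registry.register(query_id, "query-29")
print(registry.get(str(query_id)))  # prints query-29
```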
[GitHub] spark pull request #15954: [SPARK-18516][SQL] Split state and progress in st...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/15954#discussion_r90133503 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala --- @@ -64,23 +68,26 @@ trait StreamingQuery { /** * Returns the current status of the query. + * * @since 2.0.2 */ def status: StreamingQueryStatus /** - * Returns current status of all the sources. - * @since 2.0.0 + * Returns an array of the most recent [[StreamingQueryProgress]] updates for this query. + * The number of progress updates retained for each stream is configured by Spark session + * configuration `spark.sql.streaming.numRecentProgresses`. + * + * @since 2.1.0 */ - @deprecated("use status.sourceStatuses", "2.0.2") - def sourceStatuses: Array[SourceStatus] + def recentProgresses: Array[StreamingQueryProgress] --- End diff -- Last `n` triggers.
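Retaining only the last `n` updates is a bounded FIFO buffer. A minimal Python sketch, assuming `n` comes from a setting like `spark.sql.streaming.numRecentProgresses`; the class name is hypothetical, and unlike a real query it takes no lock, so it is not thread-safe.

```python
from collections import deque


class RecentProgressBuffer:
    """Keeps only the last `n` progress updates (hypothetical sketch)."""

    def __init__(self, n: int) -> None:
        # deque(maxlen=n) silently drops the oldest entry once n items are held.
        self._buf = deque(maxlen=n)

    def record(self, progress) -> None:
        self._buf.append(progress)

    def recent_progresses(self) -> list:
        # Oldest first, newest last; a copy, so callers cannot mutate the buffer.
        return list(self._buf)

    def last_progress(self):
        # None until the first update has been recorded.
        return self._buf[-1] if self._buf else None


buffer = RecentProgressBuffer(n=3)
for batch_id in range(5):
    buffer.record({"batchId": batch_id})
recent_ids = [p["batchId"] for p in buffer.recent_progresses()]
```

After five updates with `n=3`, only batches 2, 3, and 4 remain.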
spark git commit: [SPARK-18498][SQL] Revise HDFSMetadataLog API for better testing
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 d3aaed219 -> e8ca1aea5

[SPARK-18498][SQL] Revise HDFSMetadataLog API for better testing

Revise HDFSMetadataLog API such that metadata object serialization and final batch file write are separated. This will allow serialization checks without worrying about batch file name formats. marmbrus zsxwing

Existing tests already ensure this API faithfully supports core functionality, i.e., creation of batch files.

Author: Tyson Condie

Closes #15924 from tcondie/SPARK-18498.

Signed-off-by: Michael Armbrust
(cherry picked from commit f643fe47f4889faf68da3da8d7850ee48df7c22f)
Signed-off-by: Michael Armbrust

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e8ca1aea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e8ca1aea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e8ca1aea

Branch: refs/heads/branch-2.1
Commit: e8ca1aea56956755e6335c0b7d2cbaa43e1f1e18
Parents: d3aaed2
Author: Tyson Condie
Authored: Tue Nov 29 12:36:41 2016 -0800
Committer: Michael Armbrust
Committed: Tue Nov 29 12:38:04 2016 -0800

--
 .../execution/streaming/HDFSMetadataLog.scala | 100 ---
 1 file changed, 66 insertions(+), 34 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/e8ca1aea/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index d95ec7f..1b41352 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -138,14 +138,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: } } - /** - * Write a batch to a temp file then rename it to the batch file.
- * - * There may be multiple [[HDFSMetadataLog]] using the same metadata path. Although it is not a - * valid behavior, we still need to prevent it from destroying the files. - */ - private def writeBatch(batchId: Long, metadata: T, writer: (T, OutputStream) => Unit): Unit = { -// Use nextId to create a temp file + def writeTempBatch(metadata: T, writer: (T, OutputStream) => Unit = serialize): Option[Path] = { var nextId = 0 while (true) { val tempPath = new Path(metadataPath, s".${UUID.randomUUID.toString}.tmp") @@ -153,33 +146,10 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: val output = fileManager.create(tempPath) try { writer(metadata, output) + return Some(tempPath) } finally { IOUtils.closeQuietly(output) } -try { - // Try to commit the batch - // It will fail if there is an existing file (someone has committed the batch) - logDebug(s"Attempting to write log #${batchIdToPath(batchId)}") - fileManager.rename(tempPath, batchIdToPath(batchId)) - - // SPARK-17475: HDFSMetadataLog should not leak CRC files - // If the underlying filesystem didn't rename the CRC file, delete it. - val crcPath = new Path(tempPath.getParent(), s".${tempPath.getName()}.crc") - if (fileManager.exists(crcPath)) fileManager.delete(crcPath) - return -} catch { - case e: IOException if isFileAlreadyExistsException(e) => -// If "rename" fails, it means some other "HDFSMetadataLog" has committed the batch. -// So throw an exception to tell the user this is not a valid behavior. -throw new ConcurrentModificationException( - s"Multiple HDFSMetadataLog are using $path", e) - case e: FileNotFoundException => -// Sometimes, "create" will succeed when multiple writers are calling it at the same -// time. However, only one writer can call "rename" successfully, others will get -// FileNotFoundException because the first writer has removed it. 
-throw new ConcurrentModificationException( - s"Multiple HDFSMetadataLog are using $path", e) -} } catch { case e: IOException if isFileAlreadyExistsException(e) => // Failed to create "tempPath". There are two cases: @@ -195,10 +165,45 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: // metadata path. I
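The split between writing a temp file and committing it to the final batch file can be sketched on a local filesystem. This is an illustration under stated assumptions, not the HDFSMetadataLog code: the function and exception names are hypothetical, and `os.link` (which refuses to overwrite an existing destination) stands in for the non-overwriting rename the original code relies on, so it assumes a local filesystem that supports hard links.

```python
import os
import tempfile
import uuid


class ConcurrentModificationError(Exception):
    """Raised when another writer has already committed the same batch."""


def write_temp_batch(metadata_dir: str, payload: bytes) -> str:
    """Write already-serialized metadata to a hidden temp file; return its path."""
    temp_path = os.path.join(metadata_dir, f".{uuid.uuid4()}.tmp")
    # "xb" creates the file exclusively and fails if the name is somehow taken.
    with open(temp_path, "xb") as out:
        out.write(payload)
    return temp_path


def commit_batch(metadata_dir: str, temp_path: str, batch_id: int) -> str:
    """Publish the temp file as the batch file without overwriting an existing one."""
    final_path = os.path.join(metadata_dir, str(batch_id))
    try:
        # os.link raises FileExistsError if final_path exists, so at most one
        # writer can publish a given batch id.
        os.link(temp_path, final_path)
    except FileExistsError as e:
        raise ConcurrentModificationError(
            f"Multiple writers are using {metadata_dir}") from e
    finally:
        # The temp name is no longer needed whether or not the commit succeeded.
        os.remove(temp_path)
    return final_path


# Two writers race for batch 0 in the same metadata directory.
with tempfile.TemporaryDirectory() as metadata_dir:
    first = commit_batch(metadata_dir, write_temp_batch(metadata_dir, b"writer A"), 0)
    with open(first, "rb") as f:
        committed = f.read()
    try:
        commit_batch(metadata_dir, write_temp_batch(metadata_dir, b"writer B"), 0)
        second_writer_rejected = False
    except ConcurrentModificationError:
        second_writer_rejected = True
    leftover_files = sorted(os.listdir(metadata_dir))
```

Because `write_temp_batch` only takes bytes, a test can inspect the serialized output without caring about batch file names, which is the motivation the commit message gives for the split.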
spark git commit: [SPARK-18498][SQL] Revise HDFSMetadataLog API for better testing
Repository: spark
Updated Branches:
  refs/heads/master 95f798501 -> f643fe47f

[SPARK-18498][SQL] Revise HDFSMetadataLog API for better testing

Revise HDFSMetadataLog API such that metadata object serialization and final batch file write are separated. This will allow serialization checks without worrying about batch file name formats. marmbrus zsxwing

Existing tests already ensure this API faithfully supports core functionality, i.e., creation of batch files.

Author: Tyson Condie

Closes #15924 from tcondie/SPARK-18498.

Signed-off-by: Michael Armbrust

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f643fe47
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f643fe47
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f643fe47

Branch: refs/heads/master
Commit: f643fe47f4889faf68da3da8d7850ee48df7c22f
Parents: 95f7985
Author: Tyson Condie
Authored: Tue Nov 29 12:36:41 2016 -0800
Committer: Michael Armbrust
Committed: Tue Nov 29 12:37:36 2016 -0800

--
 .../execution/streaming/HDFSMetadataLog.scala | 100 ---
 1 file changed, 66 insertions(+), 34 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/f643fe47/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index d95ec7f..1b41352 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -138,14 +138,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: } } - /** - * Write a batch to a temp file then rename it to the batch file. - * - * There may be multiple [[HDFSMetadataLog]] using the same metadata path.
Although it is not a - * valid behavior, we still need to prevent it from destroying the files. - */ - private def writeBatch(batchId: Long, metadata: T, writer: (T, OutputStream) => Unit): Unit = { -// Use nextId to create a temp file + def writeTempBatch(metadata: T, writer: (T, OutputStream) => Unit = serialize): Option[Path] = { var nextId = 0 while (true) { val tempPath = new Path(metadataPath, s".${UUID.randomUUID.toString}.tmp") @@ -153,33 +146,10 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: val output = fileManager.create(tempPath) try { writer(metadata, output) + return Some(tempPath) } finally { IOUtils.closeQuietly(output) } -try { - // Try to commit the batch - // It will fail if there is an existing file (someone has committed the batch) - logDebug(s"Attempting to write log #${batchIdToPath(batchId)}") - fileManager.rename(tempPath, batchIdToPath(batchId)) - - // SPARK-17475: HDFSMetadataLog should not leak CRC files - // If the underlying filesystem didn't rename the CRC file, delete it. - val crcPath = new Path(tempPath.getParent(), s".${tempPath.getName()}.crc") - if (fileManager.exists(crcPath)) fileManager.delete(crcPath) - return -} catch { - case e: IOException if isFileAlreadyExistsException(e) => -// If "rename" fails, it means some other "HDFSMetadataLog" has committed the batch. -// So throw an exception to tell the user this is not a valid behavior. -throw new ConcurrentModificationException( - s"Multiple HDFSMetadataLog are using $path", e) - case e: FileNotFoundException => -// Sometimes, "create" will succeed when multiple writers are calling it at the same -// time. However, only one writer can call "rename" successfully, others will get -// FileNotFoundException because the first writer has removed it. -throw new ConcurrentModificationException( - s"Multiple HDFSMetadataLog are using $path", e) -} } catch { case e: IOException if isFileAlreadyExistsException(e) => // Failed to create "tempPath". 
There are two cases: @@ -195,10 +165,45 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: // metadata path. In addition, the old Streaming also have this issue, people can create // malicious c
[GitHub] spark pull request #15924: [SPARK-18498] [SQL] Revise HDFSMetadataLog API fo...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/15924#discussion_r90090753 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala --- @@ -129,48 +129,18 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: } } - /** - * Write a batch to a temp file then rename it to the batch file. - * - * There may be multiple [[HDFSMetadataLog]] using the same metadata path. Although it is not a - * valid behavior, we still need to prevent it from destroying the files. - */ - private def writeBatch(batchId: Long, metadata: T, writer: (T, OutputStream) => Unit): Unit = { -// Use nextId to create a temp file + def writeTempBatch(metadata: T, writer: (T, OutputStream) => Unit = serialize): Option[Path] = { var nextId = 0 -while (true) { +while(true) { --- End diff -- Nit: Space after `while`
[GitHub] spark pull request #15954: [WIP][SPARK-18516][SQL] Split state and progress ...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/15954#discussion_r90083413 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala --- @@ -38,11 +40,11 @@ trait StreamingQuery { def name: String /** - * Returns the unique id of this query. This id is automatically generated and is unique across - * all queries that have been started in the current process. - * @since 2.0.0 + * Returns the unique id of this query. An id is tied to the checkpoint location and will + * be the same across restarts of a given streaming query. --- End diff -- We should fix the TODO earlier, or remove this promise for now.
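Tying the id to the checkpoint location amounts to "read the id from the checkpoint if present, otherwise generate and persist one". A minimal Python sketch of that idea; the file name `metadata` and its JSON layout are assumptions made for illustration, and two queries starting concurrently on the same checkpoint are not handled.

```python
import json
import os
import tempfile
import uuid


def get_or_create_query_id(checkpoint_dir: str) -> uuid.UUID:
    """Return the id stored under the checkpoint, creating it on first start.

    Hypothetical layout: a JSON file named 'metadata' holding {"id": "<uuid>"}.
    """
    os.makedirs(checkpoint_dir, exist_ok=True)
    path = os.path.join(checkpoint_dir, "metadata")
    if os.path.exists(path):
        # Restart: reuse the id persisted by the first run.
        with open(path) as f:
            return uuid.UUID(json.load(f)["id"])
    # First start: generate a globally unique id and persist it.
    query_id = uuid.uuid4()
    with open(path, "w") as f:
        json.dump({"id": str(query_id)}, f)
    return query_id


with tempfile.TemporaryDirectory() as root:
    checkpoint = os.path.join(root, "ckpt")
    first_start = get_or_create_query_id(checkpoint)
    restart = get_or_create_query_id(checkpoint)
    other_query = get_or_create_query_id(os.path.join(root, "other"))
```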
[GitHub] spark pull request #15954: [WIP][SPARK-18516][SQL] Split state and progress ...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/15954#discussion_r90082045 --- Diff: python/pyspark/sql/streaming.py --- @@ -87,6 +88,24 @@ def awaitTermination(self, timeout=None): else: return self._jsq.awaitTermination() +@property +@since(2.1) +def recentProgresses(self): +"""Returns an array of the most recent [[StreamingQueryProgress]] updates for this query. +The number of progress updates retained for each stream is configured by Spark session +configuration `spark.sql.streaming.numRecentProgresses`. +""" +return [json.loads(p.json()) for p in self._jsq.recentProgresses()] + +@property +@since(2.1) +def lastProgress(self): +""" +Returns the most recent :class:`StreamingQueryProgress` update of this streaming query. +:return: a map +""" +return json.loads(self._jsq.lastProgress().toString()) --- End diff -- I'd use `json` as above instead of relying on the fact that the `toString` is `json`.
[GitHub] spark pull request #15954: [WIP][SPARK-18516][SQL] Split state and progress ...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/15954#discussion_r90084842 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala --- @@ -279,3 +287,8 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) { } } } + +object StreamingQueryManager { + private val _nextId = new AtomicLong(0) + def nextId: Long = _nextId.getAndIncrement() --- End diff -- `private`
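The counter under review hands out increasing ids via `AtomicLong.getAndIncrement()`. A Python analogue, assuming module-level state and marking it private by the leading-underscore convention (Python has no enforced `private`); all names are hypothetical.

```python
import itertools
import threading

# Leading underscores mark these as module-private by convention only.
_next_id_lock = threading.Lock()
_next_id_counter = itertools.count(0)


def _next_id() -> int:
    """Return 0, 1, 2, ... with each value handed out exactly once."""
    with _next_id_lock:
        return next(_next_id_counter)


# Four threads draw ids concurrently; the lock keeps them unique.
drawn = []
drawn_lock = threading.Lock()


def _worker() -> None:
    for _ in range(1000):
        value = _next_id()
        with drawn_lock:
            drawn.append(value)


threads = [threading.Thread(target=_worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```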
[GitHub] spark pull request #15954: [WIP][SPARK-18516][SQL] Split state and progress ...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/15954#discussion_r90085518 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryProgress.scala --- @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import java.{util => ju} +import java.util.UUID + +import scala.collection.JavaConverters._ + +import org.apache.jute.compiler.JLong +import org.json4s._ +import org.json4s.JsonAST.JValue +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark.annotation.Experimental + +/** + * :: Experimental :: + * Statistics about updates made to a stateful operators in a [[StreamingQuery]] in a trigger. + */ +@Experimental +class StateOperatorProgress private[sql]( +val numRowsTotal: Long, +val numRowsUpdated: Long) { + private[sql] def jsonValue: JValue = { +("numRowsTotal" -> JInt(numRowsTotal)) ~ +("numRowsUpdated" -> JInt(numRowsUpdated)) + } +} + +/** + * :: Experimental :: + * Used to report statistics about progress that has been made in the execution of a + * [[StreamingQuery]]. 
Each event relates to processing done for a single trigger of the streaming + * query. Events are emitted even when no new data is available to be processed. + * + * @param id A unique id of the query. + * @param name Name of the query. This name is unique across all active queries. + * @param timestamp Timestamp (ms) of the beginning of the trigger. + * @param batchId A unique id for the current batch of data being processed. Note that in the + *case of retries after a failure a given batchId my be executed more than once. + *Similarly, when there is no data to be processed, the batchId will not be + *incremented. + * @param durationMs The amount of time taken to perform various operations in milliseconds. + * @param currentWatermark The current event time watermark in milliseconds + * @param stateOperators Information about operators in the query that store state. + * @param sources detailed statistics on data being read from each of the streaming sources. + * @since 2.1.0 + */ +@Experimental +class StreamingQueryProgress private[sql]( + val id: UUID, + val name: String, + val timestamp: Long, + val batchId: Long, // TODO: epoch? --- End diff -- We probably will not do this `TODO`.
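Because a `batchId` can repeat (retries after a failure, and triggers that find no new data), counting progress events is not the same as counting batches. A Python sketch, assuming a trimmed-down, hypothetical event type that keeps only a few of the documented fields:

```python
import uuid
from collections import Counter
from dataclasses import dataclass
from typing import Dict, Iterable


@dataclass(frozen=True)
class ProgressEvent:
    """Hypothetical stand-in for a few StreamingQueryProgress fields."""
    id: uuid.UUID   # query id
    name: str
    timestamp: int  # ms at the beginning of the trigger
    batch_id: int


def triggers_per_batch(events: Iterable[ProgressEvent]) -> Dict[int, int]:
    """Count how many progress events (triggers) reported each batchId."""
    return dict(Counter(e.batch_id for e in events))


def distinct_batches(events: Iterable[ProgressEvent]) -> int:
    """Count distinct (query id, batchId) pairs, i.e. batches rather than triggers."""
    return len({(e.id, e.batch_id) for e in events})


query_id = uuid.uuid4()
events = [
    ProgressEvent(query_id, "query-29", 1000, 0),
    ProgressEvent(query_id, "query-29", 2000, 1),
    ProgressEvent(query_id, "query-29", 3000, 1),  # e.g. an empty trigger: batchId unchanged
    ProgressEvent(query_id, "query-29", 4000, 2),
]
```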
[GitHub] spark pull request #15954: [WIP][SPARK-18516][SQL] Split state and progress ...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/15954#discussion_r90085377 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryProgress.scala --- @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import java.{util => ju} +import java.util.UUID + +import scala.collection.JavaConverters._ + +import org.apache.jute.compiler.JLong +import org.json4s._ +import org.json4s.JsonAST.JValue +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark.annotation.Experimental + +/** + * :: Experimental :: + * Statistics about updates made to a stateful operators in a [[StreamingQuery]] in a trigger. + */ +@Experimental +class StateOperatorProgress private[sql]( --- End diff -- We should move `SourceProgress` here or put `StateOperatorProgress` in its own file. We might also consider putting them all in `org.apache.spark.sql.streaming.progress`, but there might not be time.
[GitHub] spark pull request #15954: [WIP][SPARK-18516][SQL] Split state and progress ...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/15954#discussion_r90084970 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryProgress.scala --- @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import java.{util => ju} +import java.util.UUID + +import scala.collection.JavaConverters._ + +import org.apache.jute.compiler.JLong +import org.json4s._ +import org.json4s.JsonAST.JValue +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark.annotation.Experimental + +/** + * :: Experimental :: + * Statistics about updates made to a stateful operators in a [[StreamingQuery]] in a trigger. --- End diff -- nit: during a trigger?
[GitHub] spark pull request #15954: [WIP][SPARK-18516][SQL] Split state and progress ...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/15954#discussion_r90083627 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala --- @@ -51,7 +53,7 @@ trait StreamingQuery { def sparkSession: SparkSession /** - * Whether the query is currently active or not + * Returns `true` if this query is actively running. * @since 2.0.0 --- End diff -- Nit: other places have a blank line before `@since`
[GitHub] spark pull request #15954: [WIP][SPARK-18516][SQL] Split state and progress ...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/15954#discussion_r90086100 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala --- @@ -669,55 +658,48 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { } } - - class QueryStatusCollector extends StreamingQueryListener { + /** Collects events from the StreamingQueryListener for testing */ + class EventCollector extends StreamingQueryListener { --- End diff -- I don't think this needs to be an inner class of `StreamTest`. This file is pretty long/complicated as is.
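Pulled out of `StreamTest`, an event collector is just a small standalone class that records each callback in order. A hedged Python sketch; the snake_case methods merely echo the three `StreamingQueryListener` callbacks (`onQueryStarted`, `onQueryProgress`, `onQueryTerminated`) and are not a binding to any Spark API.

```python
import threading
from typing import List


class EventCollector:
    """Hypothetical top-level test helper that records listener callbacks."""

    def __init__(self) -> None:
        # Callbacks may arrive from another thread, so guard the lists.
        self._lock = threading.Lock()
        self.started: List[object] = []
        self.progress: List[object] = []
        self.terminated: List[object] = []

    def on_query_started(self, event) -> None:
        with self._lock:
            self.started.append(event)

    def on_query_progress(self, event) -> None:
        with self._lock:
            self.progress.append(event)

    def on_query_terminated(self, event) -> None:
        with self._lock:
            self.terminated.append(event)

    def reset(self) -> None:
        with self._lock:
            self.started.clear()
            self.progress.clear()
            self.terminated.clear()


collector = EventCollector()
collector.on_query_started("started")
collector.on_query_progress("batch 0")
collector.on_query_progress("batch 1")
collector.on_query_terminated("terminated")
```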
[GitHub] spark pull request #15954: [WIP][SPARK-18516][SQL] Split state and progress ...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/15954#discussion_r90084420 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala --- @@ -81,30 +83,30 @@ object StreamingQueryListener { /** * :: Experimental :: * Event representing the start of a query - * @since 2.0.0 + * @since 2.1.0 */ @Experimental - class QueryStartedEvent private[sql](val queryStatus: StreamingQueryStatus) extends Event + class QueryStartedEvent private[sql](val id: UUID, val name: String) extends Event /** * :: Experimental :: - * Event representing any progress updates in a query - * @since 2.0.0 + * Event representing any progress updates in a query. + * @since 2.1.0 */ @Experimental - class QueryProgressEvent private[sql](val queryStatus: StreamingQueryStatus) extends Event + class QueryProgressEvent private[sql](val progress: StreamingQueryProgress) extends Event /** * :: Experimental :: - * Event representing that termination of a query + * Event representing that termination of a query. * - * @param queryStatus Information about the status of the query. - * @param exception The exception message of the [[StreamingQuery]] if the query was terminated + * @param lastProgress The last progress the query made before it was terminated. + * @param exception The exception message of the query if the query was terminated * with an exception. Otherwise, it will be `None`. - * @since 2.0.0 + * @since 2.1.0 */ @Experimental class QueryTerminatedEvent private[sql]( - val queryStatus: StreamingQueryStatus, - val exception: Option[String]) extends Event +val lastProgress: StreamingQueryProgress, --- End diff -- What is this if no progress is ever made? `null`? I would consider leaving this as just the `id`. Otherwise, if the query dies before any progress is made, you can't get the `id` at all.
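The concern is that a terminated event carrying only `lastProgress` has nothing to offer, or a `null`, when the query fails before its first trigger. Here is a minimal sketch of the alternative, assuming the event carries the query `id`. A listener can then look up whatever progress it saw, if any. `Progress`, `TerminatedEvent`, and `TerminationSketch` are hypothetical simplified models, not Spark's classes.

```scala
import java.util.UUID
import scala.collection.mutable

object TerminationSketch {
  // Hypothetical simplified models of the progress and termination events.
  final case class Progress(id: UUID, batchId: Long)
  final case class TerminatedEvent(id: UUID, exception: Option[String])

  // Listener-side bookkeeping: the latest progress seen for each query id.
  private val lastProgress = mutable.Map.empty[UUID, Progress]

  def onProgress(p: Progress): Unit = lastProgress(p.id) = p

  // Because the id is always present, this works even if no progress was ever made.
  def describe(e: TerminatedEvent): String = {
    val status = lastProgress.get(e.id) match {
      case Some(p) => s"last batch ${p.batchId}"
      case None    => "no progress made"
    }
    val reason = e.exception.getOrElse("stopped normally")
    s"query ${e.id}: $status, $reason"
  }
}
```

Using `Option` on the lookup side makes the "never made progress" case explicit instead of surfacing a `null` field on the event.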
[GitHub] spark pull request #15954: [WIP][SPARK-18516][SQL] Split state and progress ...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/15954#discussion_r90084872 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala --- @@ -279,3 +287,8 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) { } } } + +object StreamingQueryManager { --- End diff -- `private`
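The one-word suggestion is to restrict the new companion object's visibility. In the sketch below, marking the companion `private` hides it from callers, while the companion class can still use it. `AccessSketch`, `QueryManager`, and `InternalLimit` are hypothetical names. The diff does not show the object's real contents.

```scala
object AccessSketch {
  class QueryManager {
    // The class can read its companion's members freely...
    def internalLimit: Int = QueryManager.InternalLimit
  }

  // ...while `private` keeps the companion out of the public API surface.
  private object QueryManager {
    val InternalLimit = 10 // hypothetical internal constant
  }
}
```

In a Spark source file the same intent is often expressed with a package-qualified modifier such as `private[sql]`, which also exposes the object to other code in that package.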
[GitHub] spark pull request #15954: [WIP][SPARK-18516][SQL] Split state and progress ...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/15954#discussion_r89035938 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryProgress.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import java.{util => ju} +import java.util.UUID + +import scala.collection.JavaConverters._ + +import org.json4s._ +import org.json4s.JsonAST.JValue +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark.annotation.Experimental + +/** + * :: Experimental :: + * Holds statistics about state that is being stored for a given streaming query. + */ +@Experimental +class StateOperator private[sql]( +val numEntries: Long, --- End diff -- Those sound good.
[GitHub] spark pull request #15954: [WIP][SPARK-18516][SQL] Split state and progress ...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/15954#discussion_r89032237 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala --- @@ -64,23 +66,26 @@ trait StreamingQuery { /** * Returns the current status of the query. + * * @since 2.0.2 */ def status: StreamingQueryStatus /** - * Returns current status of all the sources. - * @since 2.0.0 + * Returns an array of the most recent [[StreamingQueryProgress]] updates for this query. + * The number of records retained for each stream is configured by + * `spark.sql.streaming.numProgressRecords`. + * + * @since 2.1.0 */ - @deprecated("use status.sourceStatuses", "2.0.2") - def sourceStatuses: Array[SourceStatus] + def recentProgress: Array[StreamingQueryProgress] --- End diff -- Hmmm, yeah maybe. It's not clear to me that `progress` is inherently singular, and `progresses` is kind of a mouthful. It is maybe nice for `Array`s to always be plural though.
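The doc comment describes `recentProgress` as a bounded history: only the N most recent updates are retained, with N set by `spark.sql.streaming.numProgressRecords`. Here is a minimal sketch of such a buffer, assuming oldest-first ordering and eviction of the oldest entry once the limit is exceeded. `ProgressHistory` is hypothetical, and its `capacity` parameter stands in for the configuration value.

```scala
import scala.collection.mutable
import scala.reflect.ClassTag

/** Hypothetical sketch: retains only the `capacity` most recent progress updates. */
final class ProgressHistory[P: ClassTag](capacity: Int) {
  require(capacity > 0, "capacity must be positive")
  private val queue = mutable.Queue.empty[P]

  /** Appends an update, evicting the oldest ones beyond `capacity`. */
  def record(update: P): Unit = synchronized {
    queue.enqueue(update)
    while (queue.size > capacity) queue.dequeue()
  }

  /** Oldest-first copy of the retained updates, returned as an Array like the diff's API. */
  def recentProgress: Array[P] = synchronized { queue.toArray }
}
```

Returning a fresh array copy means callers cannot mutate the internal buffer.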
[GitHub] spark pull request #15954: [WIP][SPARK-18516][SQL] Split state and progress ...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/15954#discussion_r89032104 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceProgress.scala --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import scala.util.control.NonFatal + +import org.json4s._ +import org.json4s.JsonAST.JValue +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark.annotation.Experimental + +/** + * :: Experimental :: + * Reports metrics on data being read from a given streaming source. + * + * @param description Description of the source. + * @param startOffset The starting offset for data being read. + * @param endOffset The ending offset for data being read. + * @param numRecords The number of records read from this source. --- End diff -- I think if we update the docs as you suggest above this will be clear.
[GitHub] spark pull request #15954: [WIP][SPARK-18516][SQL] Split state and progress ...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/15954#discussion_r89032039 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryProgress.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import java.{util => ju} +import java.util.UUID + +import scala.collection.JavaConverters._ + +import org.json4s._ +import org.json4s.JsonAST.JValue +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark.annotation.Experimental + +/** + * :: Experimental :: + * Holds statistics about state that is being stored for a given streaming query. + */ +@Experimental +class StateOperator private[sql]( +val numEntries: Long, --- End diff -- +1 to docs. I think `numTotal` is less clear. Total of what? It is a count of the number of entries that the state store is holding.
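The argument for keeping `numEntries` is that the name says what is counted, and a doc comment can make the meaning explicit. Below is a hedged sketch of that doc plus a toy derivation from an in-memory map. `StateOperatorSketch` and the `Map`-based store are hypothetical simplifications, not Spark's state store.

```scala
/**
 * Hypothetical sketch: statistics about state stored for a streaming query.
 *
 * @param numEntries The number of entries the state store is currently holding.
 */
final class StateOperatorSketch(val numEntries: Long)

object StateOperatorSketch {
  // Toy derivation: each key in the store counts as one entry.
  def fromStore(store: Map[String, Long]): StateOperatorSketch =
    new StateOperatorSketch(store.size.toLong)
}
```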
[GitHub] spark pull request #15954: [WIP][SPARK-18516][SQL] Split state and progress ...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/15954#discussion_r89031940 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceProgress.scala --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import scala.util.control.NonFatal + +import org.json4s._ +import org.json4s.JsonAST.JValue +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark.annotation.Experimental + +/** + * :: Experimental :: + * Reports metrics on data being read from a given streaming source. --- End diff -- Sure, we can copy the docs from the main class: `Each event relates to processing done for a single trigger of the streaming query.`
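Borrowing the main class's sentence makes every field per-trigger, so `numRecords` counts records read in one trigger, not a running total. Below is a hypothetical sketch of that documentation with a helper that sums across triggers. `SourceProgressSketch` is not Spark's class, and modeling offsets as `Long` is an assumption made for brevity.

```scala
/**
 * Hypothetical sketch of per-source metrics.
 * Each instance relates to processing done for a single trigger of the streaming query.
 *
 * @param description Description of the source.
 * @param startOffset The starting offset for data read in this trigger.
 * @param endOffset   The ending offset for data read in this trigger.
 * @param numRecords  The number of records read from this source in this trigger.
 */
final case class SourceProgressSketch(
    description: String,
    startOffset: Long,
    endOffset: Long,
    numRecords: Long)

object SourceProgressSketch {
  // Values are per trigger, so a running total must be computed by the caller.
  def totalRecords(triggers: Seq[SourceProgressSketch]): Long =
    triggers.map(_.numRecords).sum
}
```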
[GitHub] spark issue #15962: [SPARK-18526][SQL][KAFKA] Allow users to configure max.p...
Github user marmbrus commented on the issue: https://github.com/apache/spark/pull/15962 Do you have any performance comparisons to show that we need to do this? On the driver, I think we want it to be as small as possible, because we don't want to pull any data down (it would be even better if we could just issue a raw ListOffsets request). On the executors, we should know exactly how many offsets we need to pull down, so I'm not sure it makes sense to allow users to configure this.
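The executor-side argument is simple arithmetic. Each task is handed a half-open offset range, so it knows how many offsets it must fetch before it issues a single poll. Below is a minimal sketch under that assumption. `TaskOffsetRange` is hypothetical and uses no Kafka client API. The count is of offsets, and the actual number of records can be smaller, for example in compacted topics.

```scala
/**
 * Hypothetical model of the half-open range [fromOffset, untilOffset) that one
 * executor task reads from a single Kafka topic-partition.
 */
final case class TaskOffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
  require(untilOffset >= fromOffset, s"invalid range: $fromOffset until $untilOffset")

  /** Number of offsets the task must cover; known before any fetch is issued. */
  def size: Long = untilOffset - fromOffset
}
```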
[GitHub] spark issue #15954: [WIP][SPARK-18516][SQL] Split state and progress in stre...
Github user marmbrus commented on the issue: https://github.com/apache/spark/pull/15954 /cc @tdas