[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98778114
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/State.scala ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.annotation.{Experimental, InterfaceStability}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalState
+
+/**
+ * :: Experimental ::
+ *
+ * Wrapper class for interacting with state data in `mapGroupsWithState` and
+ * `flatMapGroupsWithState` operations on
+ * [[org.apache.spark.sql.KeyValueGroupedDataset KeyValueGroupedDataset]].
+ *
+ * @note Operations on state are not threadsafe.
+ *
+ * Scala example of using `State`:
+ * {{{
+ *    // A mapping function that maintains an integer state for string keys and returns a string.
+ *    def mappingFunction(key: String, value: Iterable[Int], state: State[Int]): Option[String] = {
+ *      // Check if state exists
+ *      if (state.exists) {
+ *        val existingState = state.get  // Get the existing state
+ *        val shouldRemove = ...         // Decide whether to remove the state
+ *        if (shouldRemove) {
+ *          state.remove()               // Remove the state
+ *        } else {
+ *          val newState = ...
+ *          state.update(newState)       // Set the new state
+ *        }
+ *      } else {
+ *        val initialState = ...
+ *        state.update(initialState)     // Set the initial state
+ *      }
+ *      ... // return something
+ *    }
+ *
+ * }}}
+ *
+ * Java example of using `State`:
+ * {{{
+ *    // A mapping function that maintains an integer state for string keys and returns a string.
+ *    Function3<String, Optional<Integer>, State<Integer>, String> mappingFunction =
+ *       new Function3<String, Optional<Integer>, State<Integer>, String>() {
+ *
+ *         @Override
+ *         public String call(String key, Optional<Integer> value, State<Integer> state) {
+ *           if (state.exists()) {
+ *             int existingState = state.get(); // Get the existing state
+ *             boolean shouldRemove = ...; // Decide whether to remove the state
+ *             if (shouldRemove) {
+ *               state.remove(); // Remove the state
+ *             } else {
+ *               int newState = ...;
+ *               state.update(newState); // Set the new state
+ *             }
+ *           } else {
+ *             int initialState = ...; // Set the initial state
+ *             state.update(initialState);
+ *           }
+ *           ... // return something
+ *         }
+ *       };
+ * }}}
+ *
+ * @tparam S Type of the state
+ * @since 2.1.1
+ */
+@Experimental
+@InterfaceStability.Evolving
+trait State[S] extends LogicalState[S] {
+
+  def exists: Boolean
+
+  def get(): S
+
+  def update(newState: S): Unit
+
+  def remove(): Unit
--- End diff --

Scala doc for these, even though it's pretty obvious.  In particular, I
assume it's safe to call update()/get()/remove() more than once in the function?
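
A minimal sketch of the calling pattern in question, assuming the semantics of
the StateImpl quoted later in this thread, where each call simply overwrites the
wrapper's in-memory fields, so repeated calls within one invocation are safe:

    def mappingFunction(key: String, values: Iterator[Int], state: State[Int]): String = {
      state.update(values.size)   // first update
      val current = state.get     // reads back the value just written
      state.update(current + 1)   // second update overwrites the first
      state.remove()              // remove discards both updates
      s"$key processed"
    }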





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98779663
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
 ---
@@ -235,3 +240,86 @@ case class StateStoreSaveExec(
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
 }
+
+
+/**
+ * Physical operator for executing streaming mapGroupsWithState.
+ */
+case class MapGroupsWithStateExec(
+func: (Any, Iterator[Any], LogicalState[Any]) => Iterator[Any],
+keyDeserializer: Expression,   // probably not needed
+valueDeserializer: Expression,
+groupingAttributes: Seq[Attribute],
+dataAttributes: Seq[Attribute],
+outputObjAttr: Attribute,
+stateId: Option[OperatorStateId],
+stateDeserializer: Expression,
+stateSerializer: Seq[NamedExpression],
+child: SparkPlan) extends UnaryExecNode with ObjectProducerExec with StateStoreWriter {
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def requiredChildDistribution: Seq[Distribution] =
+ClusteredDistribution(groupingAttributes) :: Nil
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(groupingAttributes.map(SortOrder(_, Ascending)))   // is this ordering needed?
+
+  override protected def doExecute(): RDD[InternalRow] = {
+
--- End diff --

nit: extra newline





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98775275
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala ---
@@ -219,6 +219,160 @@ class KeyValueGroupedDataset[K, V] private[sql](
   }
 
   /**
+   * ::Experimental::
+   * (Scala-specific)
+   * Applies the given function to each group of data, while using an 
additional keyed state.
+   * For each unique group, the function will be passed the group key and 
an iterator that contains
+   * all of the elements in the group. The function can return an object 
of arbitrary type, and
+   * optionally update or remove the corresponding state. The returned 
object will form a new
+   * [[Dataset]].
+   *
+   * This function can be applied on both batch and streaming Datasets. 
With a streaming dataset,
+   * this function will be once for each in every trigger. For each key, 
the updated state from the
+   * function call in a trigger will be the state available in the 
function call in the next
--- End diff --

Any updates to the state will be stored and passed to the user-given
function in subsequent batches when executed as a Streaming Query.
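
A hypothetical running count illustrating that contract, where state written in
one trigger is what the function sees for that key in the next:

    def runningCount(key: String, values: Iterator[Int], state: State[Long]): Long = {
      val previous = if (state.exists) state.get else 0L  // state from earlier triggers
      val total = previous + values.size
      state.update(total)                                 // stored for the next trigger
      total
    }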





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98778548
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/StateImpl.scala ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
--- End diff --

Should this be in `streaming`?





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98779439
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
 ---
@@ -235,3 +240,86 @@ case class StateStoreSaveExec(
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
 }
+
+
+/**
+ * Physical operator for executing streaming mapGroupsWithState.
+ */
+case class MapGroupsWithStateExec(
+func: (Any, Iterator[Any], LogicalState[Any]) => Iterator[Any],
+keyDeserializer: Expression,   // probably not needed
+valueDeserializer: Expression,
+groupingAttributes: Seq[Attribute],
+dataAttributes: Seq[Attribute],
+outputObjAttr: Attribute,
+stateId: Option[OperatorStateId],
+stateDeserializer: Expression,
+stateSerializer: Seq[NamedExpression],
+child: SparkPlan) extends UnaryExecNode with ObjectProducerExec with StateStoreWriter {
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def requiredChildDistribution: Seq[Distribution] =
+ClusteredDistribution(groupingAttributes) :: Nil
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(groupingAttributes.map(SortOrder(_, Ascending)))   // is this ordering needed?
+
+  override protected def doExecute(): RDD[InternalRow] = {
+
+child.execute().mapPartitionsWithStateStore[InternalRow](
--- End diff --

I don't think you need the type here?
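
Presumably because the compiler can infer it from the arguments; the general
point in isolation:

    def wrap[T](x: T): List[T] = List(x)
    val a = wrap[Int](1)  // explicit type argument, usually redundant
    val b = wrap(1)       // T inferred as Int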





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98777503
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/State.scala ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.annotation.{Experimental, InterfaceStability}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalState
+
+/**
+ * :: Experimental ::
+ *
+ * Wrapper class for interacting with state data in `mapGroupsWithState` and
--- End diff --

I think we want to say what state we are talking about.  Something like 
"per-key state from previous invocations of the function in a StreamingQuery"





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98780383
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala ---
@@ -144,6 +145,12 @@ object ObjectOperator {
 (i: InternalRow) => proj(i).get(0, deserializer.dataType)
   }
 
+  def deserializeRowToObject(
+deserializer: Expression): InternalRow => Any = {
--- End diff --

indent, also does this not fit?





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98779599
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
 ---
@@ -235,3 +240,86 @@ case class StateStoreSaveExec(
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
 }
+
+
+/**
+ * Physical operator for executing streaming mapGroupsWithState.
+ */
+case class MapGroupsWithStateExec(
+func: (Any, Iterator[Any], LogicalState[Any]) => Iterator[Any],
+keyDeserializer: Expression,   // probably not needed
+valueDeserializer: Expression,
+groupingAttributes: Seq[Attribute],
+dataAttributes: Seq[Attribute],
+outputObjAttr: Attribute,
+stateId: Option[OperatorStateId],
+stateDeserializer: Expression,
+stateSerializer: Seq[NamedExpression],
+child: SparkPlan) extends UnaryExecNode with ObjectProducerExec with StateStoreWriter {
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def requiredChildDistribution: Seq[Distribution] =
+ClusteredDistribution(groupingAttributes) :: Nil
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(groupingAttributes.map(SortOrder(_, Ascending)))   // is this ordering needed?
--- End diff --

Yes, the GroupedIterator relies on sorting.
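
Roughly the reason: when the input is sorted by the grouping key, a group ends
exactly where the key changes, so grouping only ever compares adjacent rows. A
sketch of the idea (not the actual GroupedIterator code):

    // Group a key-sorted sequence by comparing adjacent keys only; without the
    // sort guarantee, rows for one key could be scattered across the input.
    def groupSorted[K, V](rows: Seq[(K, V)]): Seq[(K, Seq[V])] =
      rows.foldLeft(List.empty[(K, List[V])]) {
        case ((k, vs) :: tail, (key, v)) if k == key => (k, v :: vs) :: tail
        case (acc, (key, v)) => (key, List(v)) :: acc
      }.reverse.map { case (k, vs) => (k, vs.reverse) }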





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98779015
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
 ---
@@ -90,6 +93,14 @@ class IncrementalExecution(
   keys,
   Some(stateId),
   child) :: Nil))
+  case MapGroupsWithStateExec(
+func, keyDeser, valueDeser, groupAttr, dataAttr, outputAttr,
--- End diff --

indent inconsistent with above.





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98775825
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala ---
@@ -219,6 +219,160 @@ class KeyValueGroupedDataset[K, V] private[sql](
   }
 
   /**
+   * ::Experimental::
+   * (Scala-specific)
+   * Applies the given function to each group of data, while using an 
additional keyed state.
--- End diff --

while maintaining some user-defined state for each key.





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98780164
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
 ---
@@ -184,7 +189,7 @@ case class StateStoreSaveExec(
 }
 
 // Assumption: Append mode can be done only when watermark has 
been specified
-store.remove(watermarkPredicate.get.eval)
+store.remove(watermarkPredicate.get.eval _)
--- End diff --

Why this change?
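
For context, the trailing `_` is explicit eta-expansion: it converts the `eval`
method into a function value in positions where the compiler will not (or
cannot unambiguously) do so automatically. In isolation:

    class Pred { def eval(x: Int): Boolean = x > 0 }
    val p = new Pred
    val f1: Int => Boolean = p.eval  // fine: expected type drives eta-expansion
    val f2 = p.eval _                // fine: explicit eta-expansion
    // val f3 = p.eval               // error in Scala 2: missing argument list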





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98778823
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/StateImpl.scala ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.State
+
+/** Internal implementation of the [[State]] interface */
+private[sql] class StateImpl[S](optionalValue: Option[S]) extends State[S] {
+  private var value: S = optionalValue.getOrElse(null.asInstanceOf[S])
+  private var defined: Boolean = optionalValue.isDefined
+  private var updated: Boolean = false  // whether value has been updated (but not removed)
+  private var removed: Boolean = false  // whether value has eben removed
+
+  // = Public API =
+  override def exists: Boolean = {
+defined
+  }
+
+  override def get(): S = {
--- End diff --

no `()`
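
Context: the usual Scala convention is that parameterless methods without side
effects are declared, and called, without parentheses, while side-effecting
ones keep them. Applied to this interface it would presumably read:

    trait State[S] {
      def exists: Boolean            // pure query: no parentheses
      def get: S                     // pure accessor: no parentheses
      def update(newState: S): Unit
      def remove(): Unit             // side effect: keeps ()
    }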





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98779142
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
 ---
@@ -54,6 +55,18 @@ trait StatefulOperator extends SparkPlan {
   }
 }
 
+trait StateStoreReader extends StatefulOperator {
--- End diff --

and lowercase if it contains multiple classes





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98777860
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/State.scala ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.annotation.{Experimental, InterfaceStability}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalState
+
+/**
+ * :: Experimental ::
+ *
+ * Wrapper class for interacting with state data in `mapGroupsWithState` and
+ * `flatMapGroupsWithState` operations on
+ * [[org.apache.spark.sql.KeyValueGroupedDataset KeyValueGroupedDataset]].
+ *
+ * @note Operations on state are not threadsafe.
+ *
+ * Scala example of using `State`:
+ * {{{
+ *    // A mapping function that maintains an integer state for string keys and returns a string.
+ *    def mappingFunction(key: String, value: Iterable[Int], state: State[Int]): Option[String] = {
+ *      // Check if state exists
+ *      if (state.exists) {
+ *        val existingState = state.get  // Get the existing state
+ *        val shouldRemove = ...         // Decide whether to remove the state
+ *        if (shouldRemove) {
+ *          state.remove()               // Remove the state
+ *        } else {
+ *          val newState = ...
+ *          state.update(newState)       // Set the new state
+ *        }
+ *      } else {
+ *        val initialState = ...
+ *        state.update(initialState)     // Set the initial state
+ *      }
+ *      ... // return something
+ *    }
+ *
+ * }}}
+ *
+ * Java example of using `State`:
+ * {{{
+ *    // A mapping function that maintains an integer state for string keys and returns a string.
+ *    Function3<String, Optional<Integer>, State<Integer>, String> mappingFunction =
+ *       new Function3<String, Optional<Integer>, State<Integer>, String>() {
+ *
+ *         @Override
+ *         public String call(String key, Optional<Integer> value, State<Integer> state) {
+ *           if (state.exists()) {
+ *             int existingState = state.get(); // Get the existing state
+ *             boolean shouldRemove = ...; // Decide whether to remove the state
+ *             if (shouldRemove) {
+ *               state.remove(); // Remove the state
+ *             } else {
+ *               int newState = ...;
+ *               state.update(newState); // Set the new state
+ *             }
+ *           } else {
+ *             int initialState = ...; // Set the initial state
+ *             state.update(initialState);
+ *           }
+ *           ... // return something
+ *         }
+ *       };
+ * }}}
+ *
+ * @tparam S Type of the state
+ * @since 2.1.1
+ */
+@Experimental
+@InterfaceStability.Evolving
+trait State[S] extends LogicalState[S] {
+
+  def exists: Boolean
+
+  def get(): S
--- End diff --

No `()`.





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98779264
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
 ---
@@ -54,6 +55,18 @@ trait StatefulOperator extends SparkPlan {
   }
 }
 
+trait StateStoreReader extends StatefulOperator {
+  override lazy val metrics = Map(
+"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
+}
+
+trait StateStoreWriter extends StatefulOperator {
--- End diff --

Scala doc for both of these.





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98777585
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/State.scala ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.annotation.{Experimental, InterfaceStability}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalState
+
+/**
+ * :: Experimental ::
+ *
+ * Wrapper class for interacting with state data in `mapGroupsWithState` and
+ * `flatMapGroupsWithState` operations on
+ * [[org.apache.spark.sql.KeyValueGroupedDataset KeyValueGroupedDataset]].
+ *
+ * @note Operations on state are not threadsafe.
+ *
+ * Scala example of using `State`:
+ * {{{
+ *    // A mapping function that maintains an integer state for string keys and returns a string.
+ *    def mappingFunction(key: String, value: Iterable[Int], state: State[Int]): Option[String] = {
+ *      // Check if state exists
+ *      if (state.exists) {
+ *        val existingState = state.get  // Get the existing state
+ *        val shouldRemove = ...         // Decide whether to remove the state
+ *        if (shouldRemove) {
+ *          state.remove()               // Remove the state
+ *        } else {
+ *          val newState = ...
+ *          state.update(newState)       // Set the new state
+ *        }
+ *      } else {
+ *        val initialState = ...
+ *        state.update(initialState)     // Set the initial state
+ *      }
+ *      ... // return something
+ *    }
+ *
+ * }}}
+ *
+ * Java example of using `State`:
+ * {{{
+ *    // A mapping function that maintains an integer state for string keys and returns a string.
+ *    Function3<String, Optional<Integer>, State<Integer>, String> mappingFunction =
+ *       new Function3<String, Optional<Integer>, State<Integer>, String>() {
+ *
+ *         @Override
+ *         public String call(String key, Optional<Integer> value, State<Integer> state) {
+ *           if (state.exists()) {
+ *             int existingState = state.get(); // Get the existing state
+ *             boolean shouldRemove = ...; // Decide whether to remove the state
+ *             if (shouldRemove) {
+ *               state.remove(); // Remove the state
+ *             } else {
+ *               int newState = ...;
+ *               state.update(newState); // Set the new state
+ *             }
+ *           } else {
+ *             int initialState = ...; // Set the initial state
+ *             state.update(initialState);
+ *           }
+ *           ... // return something
+ *         }
+ *       };
+ * }}}
+ *
+ * @tparam S Type of the state
+ * @since 2.1.1
+ */
+@Experimental
+@InterfaceStability.Evolving
+trait State[S] extends LogicalState[S] {
--- End diff --

`KeyState`?  State just feels very generic.





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98778773
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/StateImpl.scala ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.State
+
+/** Internal implementation of the [[State]] interface */
+private[sql] class StateImpl[S](optionalValue: Option[S]) extends State[S] {
--- End diff --

I would consider using `null` here to avoid extra allocations in the 
critical path.
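
A sketch of what that might look like (hypothetical variant, not this PR's
code): take the possibly-null value directly, so no Option is allocated per
processed key:

    private[sql] class StateImpl[S](initialValue: S) extends State[S] {
      private var value: S = initialValue           // null (for AnyRef state types) means absent
      private var defined: Boolean = initialValue != null

      override def exists: Boolean = defined
      override def get(): S = value
      override def update(newState: S): Unit = { value = newState; defined = true }
      override def remove(): Unit = { value = null.asInstanceOf[S]; defined = false }
    }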





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98776628
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala ---
@@ -219,6 +219,160 @@ class KeyValueGroupedDataset[K, V] private[sql](
   }
 
   /**
+   * ::Experimental::
+   * (Scala-specific)
+   * Applies the given function to each group of data, while using an 
additional keyed state.
+   * For each unique group, the function will be passed the group key and 
an iterator that contains
+   * all of the elements in the group. The function can return an object 
of arbitrary type, and
+   * optionally update or remove the corresponding state. The returned 
object will form a new
+   * [[Dataset]].
+   *
+   * This function can be applied on both batch and streaming Datasets. 
With a streaming dataset,
+   * this function will be once for each in every trigger. For each key, 
the updated state from the
+   * function call in a trigger will be the state available in the 
function call in the next
+   * trigger. However, for batch, `mapGroupsWithState` behaves exactly as 
`mapGroups` and the
+   * function is called only once per key without any prior state.
+   *
+   * There is no guaranteed ordering of values in the iterator in the 
function.
+   *
+   * This function does not support partial aggregation, and as a result 
requires shuffling all
+   * the data in the [[Dataset]].
+   *
+   * Internally, the implementation will spill to disk if any given group 
is too large to fit into
+   * memory.  However, users must take care to avoid materializing the 
whole iterator for a group
+   * (for example, by calling `toList`) unless they are sure that this is 
possible given the memory
+   * constraints of their cluster.
+   *
+   * @see [[State]] for more details of how to update/remove state in the 
function.
+   * @since 2.1.1
+   */
+  @Experimental
+  @InterfaceStability.Evolving
+  def mapGroupsWithState[STATE: Encoder, OUT: Encoder](
--- End diff --

I think it would be useful to come up with an example use case since this 
is pretty complicated.
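
One possible example (hypothetical usage of the API as proposed here): a
per-user running count of events over a stream:

    case class Event(user: String, action: String)

    // events: Dataset[Event], batch or streaming
    val counts = events
      .groupByKey(_.user)
      .mapGroupsWithState[Long, (String, Long)] { (user, group, state) =>
        val total = (if (state.exists) state.get else 0L) + group.size
        state.update(total)   // carried into the next trigger when streaming
        (user, total)
      }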





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98777095
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala ---
@@ -219,6 +219,160 @@ class KeyValueGroupedDataset[K, V] private[sql](
   }
 
   /**
+   * ::Experimental::
+   * (Scala-specific)
+   * Applies the given function to each group of data, while using an 
additional keyed state.
+   * For each unique group, the function will be passed the group key and 
an iterator that contains
+   * all of the elements in the group. The function can return an object 
of arbitrary type, and
+   * optionally update or remove the corresponding state. The returned 
object will form a new
+   * [[Dataset]].
+   *
+   * This function can be applied on both batch and streaming Datasets. 
With a streaming dataset,
+   * this function will be once for each in every trigger. For each key, 
the updated state from the
+   * function call in a trigger will be the state available in the 
function call in the next
+   * trigger. However, for batch, `mapGroupsWithState` behaves exactly as 
`mapGroups` and the
+   * function is called only once per key without any prior state.
+   *
+   * There is no guaranteed ordering of values in the iterator in the 
function.
+   *
+   * This function does not support partial aggregation, and as a result 
requires shuffling all
+   * the data in the [[Dataset]].
+   *
+   * Internally, the implementation will spill to disk if any given group 
is too large to fit into
+   * memory.  However, users must take care to avoid materializing the 
whole iterator for a group
+   * (for example, by calling `toList`) unless they are sure that this is 
possible given the memory
+   * constraints of their cluster.
+   *
+   * @see [[State]] for more details of how to update/remove state in the 
function.
+   * @since 2.1.1
+   */
+  @Experimental
+  @InterfaceStability.Evolving
+  def mapGroupsWithState[STATE: Encoder, OUT: Encoder](
+  func: (K, Iterator[V], State[STATE]) => OUT): Dataset[OUT] = {
+val f = (key: K, it: Iterator[V], s: State[STATE]) => Iterator(func(key, it, s))
+flatMapGroupsWithState[STATE, OUT](f)
+  }
+
+  /**
+   * ::Experimental::
+   * (Java-specific)
+   * Applies the given function to each group of data, while using an 
additional keyed state.
+   * For each unique group, the function will be passed the group key and 
an iterator that contains
+   * all of the elements in the group. The function can return an object 
of arbitrary type, and
+   * optionally update or remove the corresponding state. The returned 
object will form a new
+   * [[Dataset]].
+   *
+   * This function can be applied on both batch and streaming Datasets. 
With a streaming dataset,
+   * this function will be once for each in every trigger. For each key, 
the updated state from the
+   * function call in a trigger will be the state available in the 
function call in the next
+   * trigger. However, for batch, `mapGroupsWithState` behaves exactly as 
`mapGroups` and the
+   * function is called only once per key without any prior state.
+   *
+   * There is no guaranteed ordering of values in the iterator in the 
function.
+   *
+   * This function does not support partial aggregation, and as a result 
requires shuffling all
+   * the data in the [[Dataset]].
+   *
+   * Internally, the implementation will spill to disk if any given group 
is too large to fit into
+   * memory.  However, users must take care to avoid materializing the 
whole iterator for a group
+   * (for example, by calling `toList`) unless they are sure that this is 
possible given the memory
+   * constraints of their cluster.
+   *
+   * @see [[State]] for more details of how to update/remove state in the 
function.
+   * @since 2.1.1
+   */
+  @Experimental
+  @InterfaceStability.Evolving
+  def mapGroupsWithState[STATE, OUT](
--- End diff --

Since this is pretty complicated and we might iterate over time, I wonder 
if we shouldn't just put the full explanation in `State` and link there with a 
very brief version for each function.  Not sure...





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98776538
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala ---
@@ -219,6 +219,160 @@ class KeyValueGroupedDataset[K, V] private[sql](
   }
 
   /**
+   * ::Experimental::
+   * (Scala-specific)
+   * Applies the given function to each group of data, while using an 
additional keyed state.
+   * For each unique group, the function will be passed the group key and 
an iterator that contains
+   * all of the elements in the group. The function can return an object 
of arbitrary type, and
+   * optionally update or remove the corresponding state. The returned 
object will form a new
+   * [[Dataset]].
+   *
+   * This function can be applied on both batch and streaming Datasets. 
With a streaming dataset,
+   * this function will be once for each in every trigger. For each key, 
the updated state from the
+   * function call in a trigger will be the state available in the 
function call in the next
+   * trigger. However, for batch, `mapGroupsWithState` behaves exactly as 
`mapGroups` and the
+   * function is called only once per key without any prior state.
+   *
+   * There is no guaranteed ordering of values in the iterator in the 
function.
--- End diff --

I'd maybe put these into bullets as well.





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98778649
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/StateImpl.scala ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.State
+
+/** Internal implementation of the [[State]] interface */
+private[sql] class StateImpl[S](optionalValue: Option[S]) extends State[S] {
+  private var value: S = optionalValue.getOrElse(null.asInstanceOf[S])
+  private var defined: Boolean = optionalValue.isDefined
+  private var updated: Boolean = false  // whether value has been updated (but not removed)
+  private var removed: Boolean = false  // whether value has eben removed
--- End diff --

"been"





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98776799
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala ---
@@ -219,6 +219,160 @@ class KeyValueGroupedDataset[K, V] private[sql](
   }
 
   /**
+   * ::Experimental::
+   * (Scala-specific)
+   * Applies the given function to each group of data, while using an 
additional keyed state.
+   * For each unique group, the function will be passed the group key and 
an iterator that contains
+   * all of the elements in the group. The function can return an object 
of arbitrary type, and
+   * optionally update or remove the corresponding state. The returned 
object will form a new
+   * [[Dataset]].
+   *
+   * This function can be applied on both batch and streaming Datasets. 
With a streaming dataset,
+   * this function will be once for each in every trigger. For each key, 
the updated state from the
+   * function call in a trigger will be the state available in the 
function call in the next
+   * trigger. However, for batch, `mapGroupsWithState` behaves exactly as 
`mapGroups` and the
+   * function is called only once per key without any prior state.
+   *
+   * There is no guaranteed ordering of values in the iterator in the 
function.
+   *
+   * This function does not support partial aggregation, and as a result 
requires shuffling all
+   * the data in the [[Dataset]].
+   *
+   * Internally, the implementation will spill to disk if any given group 
is too large to fit into
+   * memory.  However, users must take care to avoid materializing the 
whole iterator for a group
+   * (for example, by calling `toList`) unless they are sure that this is 
possible given the memory
+   * constraints of their cluster.
+   *
+   * @see [[State]] for more details of how to update/remove state in the 
function.
+   * @since 2.1.1
+   */
+  @Experimental
+  @InterfaceStability.Evolving
+  def mapGroupsWithState[STATE: Encoder, OUT: Encoder](
--- End diff --

I would also stick with `S` and `U` to match other functions in the class.





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98777806
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/State.scala ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.annotation.{Experimental, InterfaceStability}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalState
+
+/**
+ * :: Experimental ::
+ *
+ * Wrapper class for interacting with state data in `mapGroupsWithState` and
+ * `flatMapGroupsWithState` operations on
+ * [[org.apache.spark.sql.KeyValueGroupedDataset KeyValueGroupedDataset]].
+ *
+ * @note Operations on state are not threadsafe.
+ *
+ * Scala example of using `State`:
+ * {{{
+ *    // A mapping function that maintains an integer state for string keys and returns a string.
+ *    def mappingFunction(key: String, value: Iterable[Int], state: State[Int]): Option[String] = {
+ *      // Check if state exists
+ *      if (state.exists) {
+ *        val existingState = state.get  // Get the existing state
+ *        val shouldRemove = ...         // Decide whether to remove the state
+ *        if (shouldRemove) {
+ *          state.remove()               // Remove the state
+ *        } else {
+ *          val newState = ...
+ *          state.update(newState)       // Set the new state
+ *        }
+ *      } else {
+ *        val initialState = ...
+ *        state.update(initialState)     // Set the initial state
+ *      }
+ *      ... // return something
+ *    }
+ *
+ * }}}
+ *
+ * Java example of using `State`:
+ * {{{
+ *    // A mapping function that maintains an integer state for string keys and returns a string.
+ *    Function3<String, Optional<Integer>, State<Integer>, String> mappingFunction =
+ *       new Function3<String, Optional<Integer>, State<Integer>, String>() {
+ *
+ *         @Override
+ *         public String call(String key, Optional<Integer> value, State<Integer> state) {
+ *           if (state.exists()) {
+ *             int existingState = state.get(); // Get the existing state
+ *             boolean shouldRemove = ...; // Decide whether to remove the state
+ *             if (shouldRemove) {
+ *               state.remove(); // Remove the state
+ *             } else {
+ *               int newState = ...;
+ *               state.update(newState); // Set the new state
+ *             }
+ *           } else {
+ *             int initialState = ...; // Set the initial state
+ *             state.update(initialState);
+ *           }
+ *           ... // return something
+ *         }
+ *       };
+ * }}}
+ *
+ * @tparam S Type of the state
--- End diff --

A user-defined type that can be stored for each key.  Must be encodable?





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98777964
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/State.scala ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.annotation.{Experimental, InterfaceStability}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalState
+
+/**
+ * :: Experimental ::
+ *
+ * Wrapper class for interacting with state data in `mapGroupsWithState` and
+ * `flatMapGroupsWithState` operations on
+ * [[org.apache.spark.sql.KeyValueGroupedDataset KeyValueGroupedDataset]].
+ *
+ * @note Operations on state are not threadsafe.
+ *
+ * Scala example of using `State`:
+ * {{{
+ *    // A mapping function that maintains an integer state for string keys and returns a string.
+ *    def mappingFunction(key: String, value: Iterable[Int], state: State[Int]): Option[String] = {
+ *      // Check if state exists
+ *      if (state.exists) {
+ *        val existingState = state.get  // Get the existing state
+ *        val shouldRemove = ...         // Decide whether to remove the state
+ *        if (shouldRemove) {
+ *          state.remove()               // Remove the state
+ *        } else {
+ *          val newState = ...
+ *          state.update(newState)       // Set the new state
+ *        }
+ *      } else {
+ *        val initialState = ...
+ *        state.update(initialState)     // Set the initial state
+ *      }
+ *      ... // return something
+ *    }
+ *
+ * }}}
+ *
+ * Java example of using `State`:
+ * {{{
+ *    // A mapping function that maintains an integer state for string keys and returns a string.
+ *    Function3<String, Optional<Integer>, State<Integer>, String> mappingFunction =
+ *       new Function3<String, Optional<Integer>, State<Integer>, String>() {
+ *
+ *         @Override
+ *         public String call(String key, Optional<Integer> value, State<Integer> state) {
+ *           if (state.exists()) {
+ *             int existingState = state.get(); // Get the existing state
+ *             boolean shouldRemove = ...; // Decide whether to remove the state
+ *             if (shouldRemove) {
+ *               state.remove(); // Remove the state
+ *             } else {
+ *               int newState = ...;
+ *               state.update(newState); // Set the new state
+ *             }
+ *           } else {
+ *             int initialState = ...; // Set the initial state
+ *             state.update(initialState);
+ *           }
+ *           ... // return something
+ *         }
+ *       };
+ * }}}
+ *
+ * @tparam S Type of the state
+ * @since 2.1.1
+ */
+@Experimental
+@InterfaceStability.Evolving
+trait State[S] extends LogicalState[S] {
+
+  def exists: Boolean
+
+  def get(): S
+
+  def update(newState: S): Unit
+
+  def remove(): Unit
+
+  @inline final def getOption(): Option[S] = if (exists) Some(get()) else None
--- End diff --

No `()`.





[GitHub] spark issue #16745: [SPARK-19406] [SQL] Fix function to_json to respect user...

2017-01-30 Thread marmbrus
Github user marmbrus commented on the issue:

https://github.com/apache/spark/pull/16745
  
LGTM





[GitHub] spark issue #16329: [SPARK-16046][DOCS] Aggregations in the Spark SQL progra...

2017-01-24 Thread marmbrus
Github user marmbrus commented on the issue:

https://github.com/apache/spark/pull/16329
  
Sorry for the delay.  This LGTM, but I'm currently away from my Apache SSH 
keys.  Other committers should feel free to merge if you get there before I do.





[GitHub] spark issue #16664: [SPARK-18120 ][SQL] Call QueryExecutionListener callback...

2017-01-23 Thread marmbrus
Github user marmbrus commented on the issue:

https://github.com/apache/spark/pull/16664
  
/cc @liancheng 





[GitHub] spark issue #16564: [SPARK-19065][SS]Rewrite Alias in StreamExecution if nec...

2017-01-12 Thread marmbrus
Github user marmbrus commented on the issue:

https://github.com/apache/spark/pull/16564
  
Hmm, I'm not sure that I agree with the solution from #15427.  I do not 
think that it should be valid to have two different expressions that have the 
same expression id.  There are many cases where we break the `df("col")` syntax 
by adding new operations, and I don't think it is worth that hack to preserve 
this particular case with drop duplicates.
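
To illustrate the kind of breakage I mean (a made-up example, not from this 
patch): a column handle captured from the old plan can fail to resolve once an 
operation re-aliases its output under a fresh expression id:

    import spark.implicits._
    val df = spark.range(10).withColumn("col", $"id" % 2)
    val deduped = df.dropDuplicates("col")
    // If dropDuplicates re-aliases "col" under a new expression id, the
    // attribute captured from the old plan no longer resolves here.
    deduped.select(df("col"))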





[GitHub] spark pull request #16553: [SPARK-9435][SQL] Reuse function in Java UDF to c...

2017-01-11 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16553#discussion_r95655634
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala ---
@@ -488,219 +488,241 @@ class UDFRegistration private[sql] 
(functionRegistry: FunctionRegistry) extends
* @since 1.3.0
*/
   def register(name: String, f: UDF1[_, _], returnType: DataType): Unit = {
+val func = f.asInstanceOf[UDF1[Any, Any]].call(_: Any)
--- End diff --

There is commented-out code above that's used to generate these functions.  
We should update it or delete it.
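
For reference, the pattern the diff introduces, as a self-contained sketch 
(the UDF body here is made up): the Java `UDF1` is adapted to a Scala function 
once, so the same function value can be reused instead of re-wrapping per call.

    import org.apache.spark.sql.api.java.UDF1

    // Hypothetical UDF, for illustration only.
    val strLen: UDF1[String, Integer] = new UDF1[String, Integer] {
      override def call(s: String): Integer = s.length
    }
    // Eta-expand the Java interface into a reusable Scala Function1.
    val func = strLen.asInstanceOf[UDF1[Any, Any]].call(_: Any)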





[GitHub] spark pull request #16240: [SPARK-16792][SQL] Dataset containing a Case Clas...

2017-01-05 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16240#discussion_r94888245
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala 
---
@@ -99,33 +96,96 @@ abstract class SQLImplicits {
 
   // Seqs
 
-  /** @since 1.6.1 */
-  implicit def newIntSeqEncoder: Encoder[Seq[Int]] = ExpressionEncoder()
+  /**
+   * @since 1.6.1
+   * @deprecated use [[newIntSequenceEncoder]]
+   */
+  def newIntSeqEncoder: Encoder[Seq[Int]] = ExpressionEncoder()
--- End diff --

Wait, I'm not sure I agree... Do we want to break binary compatibility for 
libraries that might be using this function?  That could have even been 
resolved implicitly, so it would be confusing when it breaks.
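
A hypothetical example of the breakage (names and versions illustrative): code 
compiled against the old release found this encoder implicitly, so both 
recompilation and previously compiled jars can break once `implicit` is removed:

    import spark.implicits._
    // Under the old release this resolved via the implicit newIntSeqEncoder;
    // with the implicit removed, this line stops compiling/linking.
    val ds = Seq(Seq(1, 2), Seq(3)).toDS()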





[GitHub] spark issue #16240: [SPARK-16792][SQL] Dataset containing a Case Class with ...

2017-01-05 Thread marmbrus
Github user marmbrus commented on the issue:

https://github.com/apache/spark/pull/16240
  
ok to test





[GitHub] spark issue #16240: [SPARK-16792][SQL] Dataset containing a Case Class with ...

2017-01-05 Thread marmbrus
Github user marmbrus commented on the issue:

https://github.com/apache/spark/pull/16240
  
For future reference: https://github.com/apache/spark/blob/master/dev/mima 
(script to run mima)





[GitHub] spark issue #16371: [SPARK-18932][SQL] Support partial aggregation for colle...

2016-12-28 Thread marmbrus
Github user marmbrus commented on the issue:

https://github.com/apache/spark/pull/16371
  
+1 I think we can move forward.





[GitHub] spark issue #16322: [SPARK-18908][SS] Creating StreamingQueryException shoul...

2016-12-21 Thread marmbrus
Github user marmbrus commented on the issue:

https://github.com/apache/spark/pull/16322
  
LGTM





[GitHub] spark pull request #16360: [SPARK-18234][SS] Made update mode public

2016-12-21 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16360#discussion_r93508333
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -219,7 +221,13 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
   if (extraOptions.get("queryName").isEmpty) {
 throw new AnalysisException("queryName must be specified for 
memory sink")
   }
-
+  outputMode match {
+case Append | Complete => // allowed
+case Update =>
+  throw new AnalysisException("Update ouptut mode is not supported 
for memory format")
--- End diff --

"memory sink" for consistency with the above error





[GitHub] spark pull request #16360: [SPARK-18234][SS] Made update mode public

2016-12-21 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16360#discussion_r93507397
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalOutputModes.scala
 ---
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql
+package org.apache.spark.sql.catalyst
--- End diff --

Maybe in `catalyst.streaming` or something?





[GitHub] spark pull request #16360: [SPARK-18234][SS] Made update mode public

2016-12-21 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16360#discussion_r93509028
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -219,7 +221,13 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
   if (extraOptions.get("queryName").isEmpty) {
 throw new AnalysisException("queryName must be specified for 
memory sink")
   }
-
+  outputMode match {
+case Append | Complete => // allowed
+case Update =>
+  throw new AnalysisException("Update ouptut mode is not supported 
for memory format")
--- End diff --

also "output".  Thinking a little more, I wonder if it would be better to 
say what is supported?
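
Something along these lines (exact wording is just a sketch):

    case Update =>
      throw new AnalysisException(
        "Update output mode is not supported for memory sink; " +
        "supported output modes are Append and Complete")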





[GitHub] spark pull request #16304: [SPARK-18894][SS] Fix event time watermark delay ...

2016-12-20 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16304#discussion_r93355210
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -387,7 +387,7 @@ class StreamExecution(
 lastExecution.executedPlan.collect {
   case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
 logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
-e.eventTimeStats.value.max - e.delay.milliseconds
+math.max(0, e.eventTimeStats.value.max - e.delayMs)
--- End diff --

I'm not totally sure, but it does seem likely that we don't handle negative 
dates (and that is probably okay).
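
For the record, a made-up worked example of why the clamp matters early in a 
query, when little data has been seen:

    val maxEventTimeMs = 5 * 1000L   // max observed event time: 5s
    val delayMs = 10 * 1000L         // configured watermark delay: 10s
    // Unclamped this would be -5000; the clamp keeps the watermark at 0.
    val watermarkMs = math.max(0, maxEventTimeMs - delayMs)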





[GitHub] spark pull request #16322: [SPARK-18908][SS] Creating StreamingQueryExceptio...

2016-12-19 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16322#discussion_r93121938
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -206,6 +201,36 @@ class StreamExecution(
 startLatch.await()  // Wait until thread started and QueryStart event 
has been posted
   }
 
+  private def generateLogicalPlan: LogicalPlan = {
+var nextSourceId = 0L
+val internalLogicalPlan = analyzedPlan.transform {
+  case StreamingRelation(dataSource, _, output) =>
+// Materialize source to avoid creating it in every batch
+val metadataPath = s"$checkpointRoot/sources/$nextSourceId"
+val source = dataSource.createSource(metadataPath)
+nextSourceId += 1
+// We still need to use the previous `output` instead of `source.schema` as attributes in
+// "df.logicalPlan" has already used attributes of the previous `output`.
+StreamingExecutionRelation(source, output)
+}
+sources = internalLogicalPlan.collect { case s: StreamingExecutionRelation => s.source }
+uniqueSources = sources.distinct
+internalLogicalPlan
+  }
+
+  override def logicalPlan: LogicalPlan = {
+if (_logicalPlan == null) {
+  localPlanLock.synchronized {
+if (_logicalPlan == null) {
+  _logicalPlan = generateLogicalPlan
+}
+  }
+}
+_logicalPlan
+  }
--- End diff --

Or really, what I was suggesting was to have a separate try/catch early in 
the stream thread that forces initialization of anything that is lazy.
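
Roughly this shape (a sketch of the suggestion, not the actual patch; 
`reportQueryStartFailure` is a hypothetical handler):

    import scala.util.control.NonFatal

    // Early on the stream execution thread: force anything lazy so that
    // initialization failures surface as query-start failures instead of
    // appearing mid-execution.
    try {
      logicalPlan  // forces the lazily constructed plan (and its sources)
    } catch {
      case NonFatal(e) =>
        reportQueryStartFailure(e)
        throw e
    }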





[GitHub] spark issue #16329: [SPARK-16046][DOCS] Aggregations in the Spark SQL progra...

2016-12-19 Thread marmbrus
Github user marmbrus commented on the issue:

https://github.com/apache/spark/pull/16329
  
This is great!  Thanks for taking the time to write up such complete 
examples.  I think this was a big gap in the existing docs.  One other ask: 
the screenshot is great, but I'd like to see which parts actually make it into 
the code snippets in the doc.  Ideally you could post a link to the compiled 
doc.  If that's hard I can also try to build locally though.





[GitHub] spark pull request #16329: [SPARK-16046][DOCS] Aggregations in the Spark SQL...

2016-12-19 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16329#discussion_r93119785
  
--- Diff: 
examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.sql
+
+// $example on:typed_custom_aggregation$
+import org.apache.spark.sql.expressions.Aggregator
+import org.apache.spark.sql.Encoder
+import org.apache.spark.sql.Encoders
+import org.apache.spark.sql.SparkSession
+// $example off:typed_custom_aggregation$
+
+object UserDefinedTypedAggregation {
+
+  // $example on:typed_custom_aggregation$
+  case class Salary(person: String, salary: Long)
+  case class Average(sum: Long, count: Long)
+
+  object MyAverage extends Aggregator[Salary, Average, Double] {
+// A zero value for this aggregation. Should satisfy the property that 
any b + zero = b
+def zero: Average = Average(0L, 0L)
+// Combine two values to produce a new value. For performance, the 
function may modify `b` and
--- End diff --

Same comment here with object reuse.
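
To spell out the reuse in question (a sketch using a mutable buffer class, 
unlike the immutable `Average` in this example):

    // A mutable buffer lets reduce() update `b` in place and return it,
    // avoiding one allocation per input row.
    case class MutableAverage(var sum: Long, var count: Long)

    def reduce(b: MutableAverage, a: Salary): MutableAverage = {
      b.sum += a.salary
      b.count += 1
      b
    }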





[GitHub] spark pull request #16329: [SPARK-16046][DOCS] Aggregations in the Spark SQL...

2016-12-19 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16329#discussion_r93121147
  
--- Diff: 
examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.sql
+
+// $example on:typed_custom_aggregation$
+import org.apache.spark.sql.expressions.Aggregator
+import org.apache.spark.sql.Encoder
+import org.apache.spark.sql.Encoders
+import org.apache.spark.sql.SparkSession
+// $example off:typed_custom_aggregation$
+
+object UserDefinedTypedAggregation {
+
+  // $example on:typed_custom_aggregation$
+  case class Salary(person: String, salary: Long)
+  case class Average(sum: Long, count: Long)
+
+  object MyAverage extends Aggregator[Salary, Average, Double] {
+// A zero value for this aggregation. Should satisfy the property that 
any b + zero = b
+def zero: Average = Average(0L, 0L)
+// Combine two values to produce a new value. For performance, the 
function may modify `b` and
+// return it instead of constructing a new object for b
+def reduce(b: Average, a: Salary): Average = Average(b.sum + a.salary, 
b.count + 1)
+// Merge two intermediate values
+def merge(b1: Average, b2: Average): Average = Average(b1.sum + 
b2.sum, b1.count + b2.count)
+// Transform the output of the reduction
+def finish(reduction: Average): Double = reduction.sum.toDouble / 
reduction.count
+// Specifies the Encoder for the intermediate value type
+def bufferEncoder: Encoder[Average] = Encoders.product
+// Specifies the Encoder for the final output value type
+def outputEncoder: Encoder[Double] = Encoders.scalaDouble
+  }
+  // $example off:typed_custom_aggregation$
+
+  def main(args: Array[String]): Unit = {
+val spark = SparkSession
+  .builder()
+  .appName("Spark SQL user-defined Datasets aggregation example")
+  .getOrCreate()
+
+import spark.implicits._
+
+// $example on:typed_custom_aggregation$
+val ds = spark.read.json("examples/src/main/resources/salaries.json").as[Salary]
+ds.show()
+// +-------+------+
+// | person|salary|
+// +-------+------+
+// |Michael|  3000|
+// |   Andy|  4500|
+// | Justin|  3500|
+// |  Berta|  4000|
+// +-------+------+
+
+val averageSalary = MyAverage.toColumn.name("average_salary")
--- End diff --

Maybe comment what `name` is doing here.  I actually had to look it up.
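
For anyone else reading: `name` just aliases the resulting column in the 
output schema. With the sample data above, something like:

    val averageSalary = MyAverage.toColumn.name("average_salary")
    ds.select(averageSalary).show()
    // +--------------+
    // |average_salary|
    // +--------------+
    // |        3750.0|
    // +--------------+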





[GitHub] spark pull request #16329: [SPARK-16046][DOCS] Aggregations in the Spark SQL...

2016-12-19 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16329#discussion_r93118905
  
--- Diff: 
examples/src/main/java/org/apache/spark/examples/sql/JavaUserDefinedTypedAggregation.java
 ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.sql;
+
+// $example on:typed_custom_aggregation$
+import java.io.Serializable;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.TypedColumn;
+import org.apache.spark.sql.expressions.Aggregator;
+// $example off:typed_custom_aggregation$
+
+public class JavaUserDefinedTypedAggregation {
+
+  // $example on:typed_custom_aggregation$
+  public static class Salary implements Serializable {
+private String person;
+private long salary;
+
+// Constructors, getters, setters...
+// $example off:typed_custom_aggregation$
+public String getPerson() {
+  return person;
+}
+
+public void setPerson(String person) {
+  this.person = person;
+}
+
+public long getSalary() {
+  return salary;
+}
+
+public void setSalary(long salary) {
+  this.salary = salary;
+}
+// $example on:typed_custom_aggregation$
+  }
+
+  public static class Average implements Serializable  {
+private long sum;
+private long count;
+
+// Constructors, getters, setters...
+// $example off:typed_custom_aggregation$
+public Average() {
+}
+
+public Average(long sum, long count) {
+  this.sum = sum;
+  this.count = count;
+}
+
+public long getSum() {
+  return sum;
+}
+
+public void setSum(long sum) {
+  this.sum = sum;
+}
+
+public long getCount() {
+  return count;
+}
+
+public void setCount(long count) {
+  this.count = count;
+}
+// $example on:typed_custom_aggregation$
+  }
+
+  public static class MyAverage extends Aggregator<Salary, Average, Double> {
+// A zero value for this aggregation. Should satisfy the property that 
any b + zero = b
+public Average zero() {
+  return new Average(0L, 0L);
+}
+// Combine two values to produce a new value. For performance, the 
function may modify `b` and
--- End diff --

Its a little confusing to have the comment here for this optimization, but 
then not implement it.





[GitHub] spark pull request #16329: [SPARK-16046][DOCS] Aggregations in the Spark SQL...

2016-12-19 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16329#discussion_r93118975
  
--- Diff: 
examples/src/main/java/org/apache/spark/examples/sql/JavaUserDefinedTypedAggregation.java
 ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.sql;
+
+// $example on:typed_custom_aggregation$
+import java.io.Serializable;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.TypedColumn;
+import org.apache.spark.sql.expressions.Aggregator;
+// $example off:typed_custom_aggregation$
+
+public class JavaUserDefinedTypedAggregation {
+
+  // $example on:typed_custom_aggregation$
+  public static class Salary implements Serializable {
--- End diff --

It might be a little clearer if this were a `Person` with a `name` and a 
`salary`.





[GitHub] spark issue #16240: [SPARK-16792][SQL] Dataset containing a Case Class with ...

2016-12-19 Thread marmbrus
Github user marmbrus commented on the issue:

https://github.com/apache/spark/pull/16240
  
/cc @cloud-fan 





[GitHub] spark pull request #16322: [SPARK-18908][SS] Creating StreamingQueryExceptio...

2016-12-19 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16322#discussion_r93113059
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -206,6 +201,36 @@ class StreamExecution(
 startLatch.await()  // Wait until thread started and QueryStart event 
has been posted
   }
 
+  private def generateLogicalPlan: LogicalPlan = {
+var nextSourceId = 0L
+val internalLogicalPlan = analyzedPlan.transform {
+  case StreamingRelation(dataSource, _, output) =>
+// Materialize source to avoid creating it in every batch
+val metadataPath = s"$checkpointRoot/sources/$nextSourceId"
+val source = dataSource.createSource(metadataPath)
+nextSourceId += 1
+// We still need to use the previous `output` instead of `source.schema` as attributes in
+// "df.logicalPlan" has already used attributes of the previous `output`.
+StreamingExecutionRelation(source, output)
+}
+sources = internalLogicalPlan.collect { case s: StreamingExecutionRelation => s.source }
+uniqueSources = sources.distinct
+internalLogicalPlan
+  }
+
+  override def logicalPlan: LogicalPlan = {
+if (_logicalPlan == null) {
+  localPlanLock.synchronized {
+if (_logicalPlan == null) {
+  _logicalPlan = generateLogicalPlan
+}
+  }
+}
+_logicalPlan
+  }
--- End diff --

This is getting pretty complicated...  Do we really need to include all of 
this information in `StreamingQueryException`?





[GitHub] spark issue #16304: [SPARK-18894][SS] Fix event time watermark delay thresho...

2016-12-16 Thread marmbrus
Github user marmbrus commented on the issue:

https://github.com/apache/spark/pull/16304
  
LGTM





[GitHub] spark pull request #16304: [SPARK-18894][SS] Fix event time watermark delay ...

2016-12-16 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16304#discussion_r92904102
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
 ---
@@ -124,6 +137,29 @@ class WatermarkSuite extends StreamTest with 
BeforeAndAfter with Logging {
 )
   }
 
+  test("delay in years handled correctly") {
+val input = MemoryStream[Long]
+val aggWithWatermark = input.toDF()
+  .withColumn("eventTime", $"value".cast("timestamp"))
+  .withWatermark("eventTime", "1 month")
--- End diff --

Just wanted to make sure I wasn't missing something :)





[GitHub] spark pull request #16304: [SPARK-18894][SS] Fix event time watermark delay ...

2016-12-16 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16304#discussion_r92902755
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
 ---
@@ -124,6 +137,29 @@ class WatermarkSuite extends StreamTest with 
BeforeAndAfter with Logging {
 )
   }
 
+  test("delay in years handled correctly") {
+val input = MemoryStream[Long]
+val aggWithWatermark = input.toDF()
+  .withColumn("eventTime", $"value".cast("timestamp"))
+  .withWatermark("eventTime", "1 month")
--- End diff --

This seems to be testing months, but the title says years?





[GitHub] spark pull request #16304: [SPARK-18894][SS] Disallow went time watermark de...

2016-12-15 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16304#discussion_r92753969
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -572,6 +572,10 @@ class Dataset[T] private[sql](
 val parsedDelay =
   Option(CalendarInterval.fromString("interval " + delayThreshold))
 .getOrElse(throw new AnalysisException(s"Unable to parse time 
delay '$delayThreshold'"))
+// Threshold specified in months/years is non-deterministic
--- End diff --

what does "safe" mean in this context?  users must not rely on watermarks 
for correctness as they can be arbitrarily delayed based on batch boundaries.  
I think this error actually confuses the point as its is enforcing precision 
when this API cannot provide that.





[GitHub] spark pull request #16304: [SPARK-18894][SS] Disallow went time watermark de...

2016-12-15 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16304#discussion_r92753590
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -572,6 +572,10 @@ class Dataset[T] private[sql](
 val parsedDelay =
   Option(CalendarInterval.fromString("interval " + delayThreshold))
 .getOrElse(throw new AnalysisException(s"Unable to parse time 
delay '$delayThreshold'"))
+// Threshold specified in months/years is non-deterministic
--- End diff --

I think waiting 1 month for late data is a reasonable use case. Based on 
the definition of the watermark, it's actually okay for us to overestimate this 
delay too.  Why not take the max (31 days, leap year)?
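
i.e. something like this sketch (field names on the parsed interval are 
assumed; the constant is the maximum month length, not calendar-aware):

    val MaxMsPerMonth = 31L * 24 * 60 * 60 * 1000  // longest possible month
    val delayMs = parsedDelay.months * MaxMsPerMonth + parsedDelay.milliseconds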





[GitHub] spark issue #16289: [SPARK-18870] Disallowed Distinct Aggregations on Stream...

2016-12-15 Thread marmbrus
Github user marmbrus commented on the issue:

https://github.com/apache/spark/pull/16289
  
LGTM





[GitHub] spark issue #16258: [SPARK-18834][SS] Expose event time stats through Stream...

2016-12-13 Thread marmbrus
Github user marmbrus commented on the issue:

https://github.com/apache/spark/pull/16258
  
LGTM





[GitHub] spark pull request #16258: [SPARK-18834][SS] Expose event time and processin...

2016-12-12 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16258#discussion_r92077840
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -33,27 +34,6 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 
 /**
  * :: Experimental ::
- * Information about updates made to stateful operators in a 
[[StreamingQuery]] during a trigger.
--- End diff --

That's actually the opposite of how most code in SQL is laid out, so I think 
it would be better to avoid this change.  The logic here is that declarations 
that are used later should come first (references before declarations make it 
harder to read), and stuff at the end of the file is kind of hidden.





[GitHub] spark pull request #16258: [SPARK-18834][SS] Expose event time and processin...

2016-12-12 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16258#discussion_r92073271
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
 ---
@@ -38,13 +38,18 @@ class StreamingQueryStatusAndProgressSuite extends 
SparkFunSuite {
 |  "id" : "${testProgress1.id.toString}",
 |  "runId" : "${testProgress1.runId.toString}",
 |  "name" : "myName",
-|  "timestamp" : "2016-12-05T20:54:20.827Z",
+|  "triggerTimestamp" : "2016-12-05T20:54:20.827Z",
 |  "numInputRows" : 678,
 |  "inputRowsPerSecond" : 10.0,
 |  "durationMs" : {
 |"total" : 0
 |  },
-|  "currentWatermark" : 3,
+|  "queryTimestamps" : {
+|"eventTime.avg" : "2016-12-05T20:54:20.827Z",
--- End diff --

It is available; it is stored in a human-readable format in the offset log 
(BTW, is that a long or a timestamp?).  I think in the long run we'll want to 
open up another API that gives you access to this log, but I think that can 
come later.  For now, it's still pretty easy to find.





[GitHub] spark pull request #16258: [SPARK-18834][SS] Expose event time and processin...

2016-12-12 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16258#discussion_r92071196
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -360,6 +360,24 @@ class StreamExecution(
 if (hasNewData) {
   // Current batch timestamp in milliseconds
   offsetSeqMetadata.batchTimestampMs = triggerClock.getTimeMillis()
+  // Update the eventTime watermark if we find one in the plan.
--- End diff --

+1, this makes sense.





[GitHub] spark pull request #16238: [SPARK-18811] StreamSource resolution should happ...

2016-12-09 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16238#discussion_r91803533
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala 
---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.util
+
+import org.apache.spark.sql.{SQLContext, _}
+import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Sink, Source}
+import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
+import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryManagerSuite}
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+
+/** Dummy provider: returns a SourceProvider with a blocking `createSource` call. */
+class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
--- End diff --

No, you can give fully qualified class names as well.
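
That is, a test can reference the provider directly (a sketch; add a 
`.schema(...)` if the source requires one):

    spark.readStream
      .format("org.apache.spark.sql.streaming.util.DefaultSource")
      .load()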





[GitHub] spark issue #16238: [SPARK-18811] StreamSource resolution should happen in s...

2016-12-09 Thread marmbrus
Github user marmbrus commented on the issue:

https://github.com/apache/spark/pull/16238
  
This LGTM.  I was just talking with @tdas about how I think that all of this 
initialization stuff should be lazy and happen on the stream execution thread.  
I think this can simplify what they are trying to fix in #16220 as well.  /cc 
@zsxwing 





[GitHub] spark pull request #16238: [SPARK-18811] StreamSource resolution should happ...

2016-12-09 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16238#discussion_r91803089
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala 
---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.util
+
+import org.apache.spark.sql.{SQLContext, _}
+import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Sink, Source}
+import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
+import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryManagerSuite}
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+
+/** Dummy provider: returns a SourceProvider with a blocking `createSource` call. */
+class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
--- End diff --

Nit, I'd consider calling this `BlockingSource` or something more 
descriptive of what makes it special.





[GitHub] spark issue #16182: [SPARK-18754][SS] Rename recentProgresses to recentProgr...

2016-12-06 Thread marmbrus
Github user marmbrus commented on the issue:

https://github.com/apache/spark/pull/16182
  
/cc @tdas





[GitHub] spark pull request #16182: [SPARK-18754][SS] Rename recentProgresses to rece...

2016-12-06 Thread marmbrus
GitHub user marmbrus opened a pull request:

https://github.com/apache/spark/pull/16182

[SPARK-18754][SS] Rename recentProgresses to recentProgress

Based on an informal survey, users find this option easier to understand / 
remember.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/marmbrus/spark renameRecentProgress

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16182.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16182


commit b3813ecd989a00db52de2486da45b13bf479ff04
Author: Michael Armbrust 
Date:   2016-12-07T01:19:16Z

[SPARK-18754][SQL] Rename recentProgresses to recentProgress

commit 7c125680ca66785742daee29fbd2977048a2503a
Author: Michael Armbrust 
Date:   2016-12-07T01:23:19Z

update config option too







[GitHub] spark pull request #16178: [SPARK-18751][Core]Fix deadlock when SparkContext...

2016-12-06 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16178#discussion_r91190778
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1760,25 +1760,24 @@ class SparkContext(config: SparkConf) extends 
Logging {
   def listJars(): Seq[String] = addedJars.keySet.toSeq
 
   /**
-   * Shut down the SparkContext.
+   * When stopping SparkContext inside Spark components, it's easy to 
cause dead-lock since Spark
+   * may wait for some internal threads to finish. It's better to use this 
method to stop
+   * SparkContext instead.
*/
-  def stop(): Unit = {
-if (env.rpcEnv.isInRPCThread) {
-  // `stop` will block until all RPC threads exit, so we cannot call 
stop inside a RPC thread.
-  // We should launch a new thread to call `stop` to avoid dead-lock.
-  new Thread("stop-spark-context") {
-setDaemon(true)
+  private[spark] def stopInNewThread(): Unit = {
+new Thread("stop-spark-context") {
+  setDaemon(true)
 
-override def run(): Unit = {
-  _stop()
-}
-  }.start()
-} else {
-  _stop()
-}
+  override def run(): Unit = {
+SparkContext.this.stop()
--- End diff --

Will this ever throw an exception?  Should we register an 
`UncaughtExceptionHandler` or try catch with logging?
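
e.g. a defensive sketch of the same thread body:

    override def run(): Unit = {
      try {
        SparkContext.this.stop()
      } catch {
        // NonFatal is scala.util.control.NonFatal
        case NonFatal(e) => logError("Error stopping SparkContext", e)
      }
    }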





[GitHub] spark issue #16138: [WIP][SPARK-16609] Add to_date/to_timestamp with format ...

2016-12-05 Thread marmbrus
Github user marmbrus commented on the issue:

https://github.com/apache/spark/pull/16138
  
This will be very convenient!  Looking forward to the whole patch.  For SQL 
I think you should look at 
[`RuntimeReplaceable`](https://github.com/apache/spark/blob/fd90541c35af2bccf0155467bec8cea7c8865046/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala#L238).
  Also, we might consider having a default version that doesn't take a pattern 
(and parses something standard such as `2016-12-01T02:03:01Z`).
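
Hypothetical usage once this lands (overload shapes assumed, since the patch 
is still WIP):

    import org.apache.spark.sql.functions.to_timestamp
    df.select(to_timestamp($"ts"))                         // default: ISO-8601 input
    df.select(to_timestamp($"ts", "yyyy/MM/dd HH:mm:ss"))  // explicit pattern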





[GitHub] spark pull request #16113: [SPARK-18657][SPARK-18668] Make StreamingQuery.id...

2016-12-02 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16113#discussion_r90741307
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -605,34 +629,64 @@ class StreamExecution(
   case object TERMINATED extends State
 }
 
+
 /**
- * Contains metadata associated with a stream execution. This information 
is
- * persisted to the offset log via the OffsetSeq metadata field. Current
- * information contained in this object includes:
+ * Contains metadata associated with a [[StreamingQuery]]. This 
information is written
+ * in the checkpoint location the first time a query is started and 
recovered every time the query
+ * is restarted.
  *
- * @param batchWatermarkMs: The current eventTime watermark, used to
- * bound the lateness of data that will processed. Time unit: milliseconds
- * @param batchTimestampMs: The current batch processing timestamp.
- * Time unit: milliseconds
+ * @param id  unique id of the [[StreamingQuery]] that needs to be 
persisted across restarts
  */
-case class StreamExecutionMetadata(
-var batchWatermarkMs: Long = 0,
-var batchTimestampMs: Long = 0) {
-  private implicit val formats = StreamExecutionMetadata.formats
-
-  /**
-   * JSON string representation of this object.
-   */
-  def json: String = Serialization.write(this)
+case class StreamMetadata(id: String) {
--- End diff --

Can we move these to their own file?





[GitHub] spark pull request #16113: [SPARK-18657][SPARK-18668] Make StreamingQuery.id...

2016-12-02 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16113#discussion_r90741239
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala ---
@@ -32,21 +32,33 @@ import org.apache.spark.sql.SparkSession
 trait StreamingQuery {
 
   /**
-   * Returns the name of the query. This name is unique across all active 
queries. This can be
-   * set in the `org.apache.spark.sql.streaming.DataStreamWriter` as
-   * `dataframe.writeStream.queryName("query").start()`.
+   * Returns the user-specified name of the query, or null if not 
specified.
+   * This name can be specified in the 
`org.apache.spark.sql.streaming.DataStreamWriter`
+   * as `dataframe.writeStream.queryName("query").start()`.
+   * This name, if set, must be unique across all active queries.
*
* @since 2.0.0
*/
   def name: String
 
   /**
-   * Returns the unique id of this query.
+   * Returns the unique id of this query that persists across restarts 
from checkpoint data.
+   * That is, this id is generated when a query is started for the first 
time, and
+   * will be the same every time it is restarted from checkpoint data.
+   * There can only be one query with the same id active in a Spark 
cluster.
--- End diff --

This makes it sound like it's okay to have more than one running as long as 
they aren't on the same Spark cluster.





[GitHub] spark pull request #16113: [SPARK-18657][SPARK-18668] Make StreamingQuery.id...

2016-12-02 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16113#discussion_r90741067
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
 ---
@@ -54,6 +61,26 @@ object OffsetSeq {
* `nulls` in the sequence are converted to `None`s.
*/
   def fill(metadata: Option[String], offsets: Offset*): OffsetSeq = {
-OffsetSeq(offsets.map(Option(_)), metadata)
+OffsetSeq(offsets.map(Option(_)), metadata.map(OffsetSeqMetadata.apply))
   }
 }
+
+
+/**
+ * Contains metadata associated with a [[OffsetSeq]]. This information is
+ * persisted to the offset log in the checkpoint location via the 
[[OffsetSeq]] metadata field.
+ *
+ * @param batchWatermarkMs: The current eventTime watermark, used to
+ * bound the lateness of data that will processed. Time unit: milliseconds
+ * @param batchTimestampMs: The current batch processing timestamp.
+ * Time unit: milliseconds
+ */
+case class OffsetSeqMetadata(var batchWatermarkMs: Long = 0, var batchTimestampMs: Long = 0) {
--- End diff --

Can we put this in its own file?





[GitHub] spark issue #15918: [SPARK-18122][SQL][WIP]Fallback to Kryo for unsupported ...

2016-12-01 Thread marmbrus
Github user marmbrus commented on the issue:

https://github.com/apache/spark/pull/15918
  
I don't think you can limit the implicit.  What type would pick up case 
classes, but not case classes that contain invalid things? I think you would 
need a macro for this kind of introspection.  (I'd be happy to be proven wrong 
with a PR.)

I'd recommend you only import the implicits you need rather than using the 
wildcard.
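
For example, a minimal sketch of the narrower import (the case class and names are illustrative, not from the PR):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative case class; defined at the top level so an implicit
// product encoder can be derived for it.
case class Record(key: String, value: Int)

val spark = SparkSession.builder().appName("example").master("local").getOrCreate()

// Instead of the wildcard `import spark.implicits._`, pull in only the
// implicits this snippet actually needs: the case-class encoder and the
// Seq-to-Dataset conversion.
import spark.implicits.newProductEncoder
import spark.implicits.localSeqToDatasetHolder

val ds = Seq(Record("a", 1), Record("b", 2)).toDS()
```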





[GitHub] spark issue #16094: [SPARK-18541][Python]Add metadata parameter to pyspark.s...

2016-12-01 Thread marmbrus
Github user marmbrus commented on the issue:

https://github.com/apache/spark/pull/16094
  
No worries, thanks for working on this!  It's great to ensure our Python 
APIs aren't lagging behind the Scala ones.





[GitHub] spark pull request #13147: [SPARK-6320][SQL] Move planLater method into Gene...

2016-12-01 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/13147#discussion_r90560927
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
 ---
@@ -27,6 +27,14 @@ import org.apache.spark.sql.catalyst.trees.TreeNode
  * empty list should be returned.
  */
 abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] 
extends Logging {
+
+  /**
+   * Returns a placeholder for a physical plan that executes `plan`. This 
placeholder will be
+   * filled in automatically by the QueryPlanner using the other execution 
strategies that are
+   * available.
+   */
+  protected def planLater(plan: LogicalPlan): PhysicalPlan
+
   def apply(plan: LogicalPlan): Seq[PhysicalPlan]
 }
 
--- End diff --

Fine to include it there!  I guess I need to review that :)





[GitHub] spark issue #16094: [SPARK-18541][Python]Add metadata parameter to pyspark.s...

2016-12-01 Thread marmbrus
Github user marmbrus commented on the issue:

https://github.com/apache/spark/pull/16094
  
ok to test





[GitHub] spark issue #15918: [SPARK-18122][SQL][WIP]Fallback to Kryo for unsupported ...

2016-11-30 Thread marmbrus
Github user marmbrus commented on the issue:

https://github.com/apache/spark/pull/15918
  
We should probably add a flag (maybe even off by default).  The error 
message can tell you to turn on the flag if you are okay with the fallback.
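
A minimal sketch of that shape (the flag name here is hypothetical, not a real Spark configuration; `spark` is an assumed `SparkSession`):

```scala
// Hypothetical flag name, used only to illustrate the suggested behavior:
// fail by default, and point at the flag in the error message.
val fallbackToKryo = spark.conf
  .getOption("spark.sql.encoders.fallbackToKryo")
  .exists(_.toBoolean)

if (!fallbackToKryo) {
  throw new UnsupportedOperationException(
    "No Encoder found for a nested field; set " +
      "spark.sql.encoders.fallbackToKryo=true to fall back to Kryo for it.")
}
```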





[GitHub] spark issue #15918: [SPARK-18122][SQL][WIP]Fallback to Kryo for unsupported ...

2016-11-30 Thread marmbrus
Github user marmbrus commented on the issue:

https://github.com/apache/spark/pull/15918
  
I agree that the only change in behavior is that things that used to throw 
an error will now not throw an error.  If done right (I haven't looked deeply 
at the PR itself yet), no case that is currently working should change.

It is maybe slightly odd to mix serialization types, but that's kind of 
already happening today if you use the `kryo` serializer: you are taking 
Kryo-encoded data and putting it as a binary value into a Tungsten row.  The 
change here makes it possible to do the same in cases where the incompatible 
object is nested within a compatible object.  Currently you are forced into 
all or nothing (i.e., even if only a single field is incompatible, you must 
treat the whole object as an opaque binary blob).
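
A minimal sketch of that all-or-nothing behavior (the class here is illustrative, not from the PR):

```scala
import org.apache.spark.sql.Encoders

// An illustrative class with no built-in encoder support.
class Opaque(val bits: java.util.BitSet)

// Encoding the entire object with Kryo yields a schema with a single
// binary column, i.e. the whole value becomes an opaque blob in the row.
val enc = Encoders.kryo[Opaque]
enc.schema.printTreeString()
// root
//  |-- value: binary (nullable = true)
```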

The one possible compatibility concern I can see is: if in the future we add 
support for a previously unsupported type, the schema will change from 
`BinaryType` to something else.  However, given that there are very few 
operations you can do on binary, and that this format is not persisted or 
guaranteed to be compatible across Spark versions, this actually seems okay.

Thoughts?





[GitHub] spark pull request #13147: [SPARK-6320][SQL] Move planLater method into Gene...

2016-11-30 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/13147#discussion_r90317826
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
 ---
@@ -27,6 +27,14 @@ import org.apache.spark.sql.catalyst.trees.TreeNode
  * empty list should be returned.
  */
 abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] 
extends Logging {
+
+  /**
+   * Returns a placeholder for a physical plan that executes `plan`. This 
placeholder will be
+   * filled in automatically by the QueryPlanner using the other execution 
strategies that are
+   * available.
+   */
+  protected def planLater(plan: LogicalPlan): PhysicalPlan
+
   def apply(plan: LogicalPlan): Seq[PhysicalPlan]
 }
 
--- End diff --

I just noticed there is a TODO below in the comment that I think we can 
remove now :)

>   * TODO: RIGHT NOW ONLY ONE PLAN IS RETURNED EVER...  PLAN SPACE 
EXPLORATION WILL BE IMPLEMENTED LATER.





[1/2] spark git commit: [SPARK-18516][SQL] Split state and progress in streaming

2016-11-29 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 9a02f6821 -> c3d08e2f2


http://git-wip-us.apache.org/repos/asf/spark/blob/c3d08e2f/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
new file mode 100644
index 000..7129fa4
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.{util => ju}
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import org.apache.jute.compiler.JLong
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Information about updates made to stateful operators in a 
[[StreamingQuery]] during a trigger.
+ */
+@Experimental
+class StateOperatorProgress private[sql](
+val numRowsTotal: Long,
+val numRowsUpdated: Long) {
+  private[sql] def jsonValue: JValue = {
+("numRowsTotal" -> JInt(numRowsTotal)) ~
+("numRowsUpdated" -> JInt(numRowsUpdated))
+  }
+}
+
+/**
+ * :: Experimental ::
+ * Information about progress made in the execution of a [[StreamingQuery]] 
during
+ * a trigger. Each event relates to processing done for a single trigger of 
the streaming
+ * query. Events are emitted even when no new data is available to be 
processed.
+ *
+ * @param id A unique id of the query.
+ * @param name Name of the query. This name is unique across all active 
queries.
+ * @param timestamp Timestamp (ms) of the beginning of the trigger.
+ * @param batchId A unique id for the current batch of data being processed.  
Note that in the
+ *case of retries after a failure a given batchId may be 
executed more than once.
+ *Similarly, when there is no data to be processed, the 
batchId will not be
+ *incremented.
+ * @param durationMs The amount of time taken to perform various operations in 
milliseconds.
+ * @param currentWatermark The current event time watermark in milliseconds
+ * @param stateOperators Information about operators in the query that store 
state.
+ * @param sources detailed statistics on data being read from each of the 
streaming sources.
+ * @since 2.1.0
+ */
+@Experimental
+class StreamingQueryProgress private[sql](
+  val id: UUID,
+  val name: String,
+  val timestamp: Long,
+  val batchId: Long,
+  val durationMs: ju.Map[String, java.lang.Long],
+  val currentWatermark: Long,
+  val stateOperators: Array[StateOperatorProgress],
+  val sources: Array[SourceProgress],
+  val sink: SinkProgress) {
+
+  /** The aggregate (across all sources) number of records processed in a 
trigger. */
+  def numInputRows: Long = sources.map(_.numInputRows).sum
+
+  /** The aggregate (across all sources) rate of data arriving. */
+  def inputRowsPerSecond: Double = sources.map(_.inputRowsPerSecond).sum
+
+  /** The aggregate (across all sources) rate at which Spark is processing 
data. */
+  def processedRowsPerSecond: Double = 
sources.map(_.processedRowsPerSecond).sum
+
+  /** The compact JSON representation of this status. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this status. */
+  def prettyJson: String = pretty(render(jsonValue))
+
+  override def toString: String = prettyJson
+
+  private[sql] def jsonValue: JValue = {
+def safeDoubleToJValue(value: Double): JValue = {
+  if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
+}
+
+("id" -> JString(id.toString)) ~
+("name" -> JString(name)) ~
+("timestamp" -> JInt(timestamp)) ~
+("numInputRows" -> JInt(numInputRows)) ~
+("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
+("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
+("durationMs" -> durationMs

[1/2] spark git commit: [SPARK-18516][SQL] Split state and progress in streaming

2016-11-29 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 045ae299c -> 28b57c8a1


http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
new file mode 100644
index 000..7129fa4
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.{util => ju}
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import org.apache.jute.compiler.JLong
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Information about updates made to stateful operators in a 
[[StreamingQuery]] during a trigger.
+ */
+@Experimental
+class StateOperatorProgress private[sql](
+val numRowsTotal: Long,
+val numRowsUpdated: Long) {
+  private[sql] def jsonValue: JValue = {
+("numRowsTotal" -> JInt(numRowsTotal)) ~
+("numRowsUpdated" -> JInt(numRowsUpdated))
+  }
+}
+
+/**
+ * :: Experimental ::
+ * Information about progress made in the execution of a [[StreamingQuery]] 
during
+ * a trigger. Each event relates to processing done for a single trigger of 
the streaming
+ * query. Events are emitted even when no new data is available to be 
processed.
+ *
+ * @param id A unique id of the query.
+ * @param name Name of the query. This name is unique across all active 
queries.
+ * @param timestamp Timestamp (ms) of the beginning of the trigger.
+ * @param batchId A unique id for the current batch of data being processed.  
Note that in the
+ *case of retries after a failure a given batchId may be 
executed more than once.
+ *Similarly, when there is no data to be processed, the 
batchId will not be
+ *incremented.
+ * @param durationMs The amount of time taken to perform various operations in 
milliseconds.
+ * @param currentWatermark The current event time watermark in milliseconds
+ * @param stateOperators Information about operators in the query that store 
state.
+ * @param sources detailed statistics on data being read from each of the 
streaming sources.
+ * @since 2.1.0
+ */
+@Experimental
+class StreamingQueryProgress private[sql](
+  val id: UUID,
+  val name: String,
+  val timestamp: Long,
+  val batchId: Long,
+  val durationMs: ju.Map[String, java.lang.Long],
+  val currentWatermark: Long,
+  val stateOperators: Array[StateOperatorProgress],
+  val sources: Array[SourceProgress],
+  val sink: SinkProgress) {
+
+  /** The aggregate (across all sources) number of records processed in a 
trigger. */
+  def numInputRows: Long = sources.map(_.numInputRows).sum
+
+  /** The aggregate (across all sources) rate of data arriving. */
+  def inputRowsPerSecond: Double = sources.map(_.inputRowsPerSecond).sum
+
+  /** The aggregate (across all sources) rate at which Spark is processing 
data. */
+  def processedRowsPerSecond: Double = 
sources.map(_.processedRowsPerSecond).sum
+
+  /** The compact JSON representation of this status. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this status. */
+  def prettyJson: String = pretty(render(jsonValue))
+
+  override def toString: String = prettyJson
+
+  private[sql] def jsonValue: JValue = {
+def safeDoubleToJValue(value: Double): JValue = {
+  if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
+}
+
+("id" -> JString(id.toString)) ~
+("name" -> JString(name)) ~
+("timestamp" -> JInt(timestamp)) ~
+("numInputRows" -> JInt(numInputRows)) ~
+("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
+("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
+("durationMs" -> duratio

[2/2] spark git commit: [SPARK-18516][SQL] Split state and progress in streaming

2016-11-29 Thread marmbrus
[SPARK-18516][SQL] Split state and progress in streaming

This PR separates the status of a `StreamingQuery` into two separate APIs:
 - `status` - describes the status of a `StreamingQuery` at this moment, 
including what phase of processing is currently happening and if data is 
available.
 - `recentProgress` - an array of statistics about the most recent microbatches 
that have executed.

A recent progress contains the following information:
```
{
  "id" : "2be8670a-fce1-4859-a530-748f29553bb6",
  "name" : "query-29",
  "timestamp" : 1479705392724,
  "inputRowsPerSecond" : 230.76923076923077,
  "processedRowsPerSecond" : 10.869565217391303,
  "durationMs" : {
"triggerExecution" : 276,
"queryPlanning" : 3,
"getBatch" : 5,
"getOffset" : 3,
"addBatch" : 234,
"walCommit" : 30
  },
  "currentWatermark" : 0,
  "stateOperators" : [ ],
  "sources" : [ {
"description" : "KafkaSource[Subscribe[topic-14]]",
"startOffset" : {
  "topic-14" : {
"2" : 0,
"4" : 1,
"1" : 0,
"3" : 0,
"0" : 0
  }
},
"endOffset" : {
  "topic-14" : {
"2" : 1,
"4" : 2,
"1" : 0,
"3" : 0,
"0" : 1
  }
},
"numRecords" : 3,
"inputRowsPerSecond" : 230.76923076923077,
"processedRowsPerSecond" : 10.869565217391303
  } ]
}
```
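
A usage sketch of the two APIs (the socket source is just a stand-in stream; `spark` is an assumed active `SparkSession`):

```scala
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .load()

val query = lines.writeStream.queryName("query-29").format("console").start()

println(query.status)                  // the query's state at this moment
query.recentProgresses.foreach { p =>  // stats for recent microbatches
  println(p.json)
}
```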

Additionally, in order to make it possible to correlate progress updates across 
restarts, we change the `id` field from an integer that is unique within the 
JVM to a `UUID` that is globally unique.

Author: Tathagata Das 
Author: Michael Armbrust 

Closes #15954 from marmbrus/queryProgress.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c3d08e2f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c3d08e2f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c3d08e2f

Branch: refs/heads/master
Commit: c3d08e2f29baeebe09bf4c059ace4336af9116b5
Parents: 9a02f68
Author: Tathagata Das 
Authored: Tue Nov 29 17:24:17 2016 -0800
Committer: Michael Armbrust 
Committed: Tue Nov 29 17:24:17 2016 -0800

--
 .../spark/sql/kafka010/KafkaSourceSuite.scala   |   7 +-
 project/MimaExcludes.scala  |  11 +
 python/pyspark/sql/streaming.py | 326 ++-
 python/pyspark/sql/tests.py |  22 ++
 .../execution/streaming/MetricsReporter.scala   |  53 +++
 .../execution/streaming/ProgressReporter.scala  | 234 +
 .../execution/streaming/StreamExecution.scala   | 282 
 .../sql/execution/streaming/StreamMetrics.scala | 243 --
 .../org/apache/spark/sql/internal/SQLConf.scala |   8 +
 .../apache/spark/sql/streaming/SinkStatus.scala |  66 
 .../spark/sql/streaming/SourceStatus.scala  |  95 --
 .../spark/sql/streaming/StreamingQuery.scala|  33 +-
 .../sql/streaming/StreamingQueryException.scala |   2 +-
 .../sql/streaming/StreamingQueryListener.scala  |  24 +-
 .../sql/streaming/StreamingQueryManager.scala   |  27 +-
 .../sql/streaming/StreamingQueryStatus.scala| 151 +
 .../apache/spark/sql/streaming/progress.scala   | 193 +++
 .../streaming/StreamMetricsSuite.scala  | 213 
 .../sql/streaming/FileStreamSourceSuite.scala   |  10 +-
 .../apache/spark/sql/streaming/StreamTest.scala |  73 +
 .../streaming/StreamingQueryListenerSuite.scala | 267 ---
 .../streaming/StreamingQueryManagerSuite.scala  |   2 +-
 .../streaming/StreamingQueryProgressSuite.scala |  98 ++
 .../streaming/StreamingQueryStatusSuite.scala   | 123 ---
 .../sql/streaming/StreamingQuerySuite.scala | 260 ---
 .../spark/sql/streaming/WatermarkSuite.scala|  16 +-
 26 files changed, 1087 insertions(+), 1752 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c3d08e2f/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
--
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index e1af14f..2d6ccb2 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala

[2/2] spark git commit: [SPARK-18516][SQL] Split state and progress in streaming

2016-11-29 Thread marmbrus
[SPARK-18516][SQL] Split state and progress in streaming

This PR separates the status of a `StreamingQuery` into two separate APIs:
 - `status` - describes the status of a `StreamingQuery` at this moment, 
including what phase of processing is currently happening and if data is 
available.
 - `recentProgress` - an array of statistics about the most recent microbatches 
that have executed.

A recent progress contains the following information:
```
{
  "id" : "2be8670a-fce1-4859-a530-748f29553bb6",
  "name" : "query-29",
  "timestamp" : 1479705392724,
  "inputRowsPerSecond" : 230.76923076923077,
  "processedRowsPerSecond" : 10.869565217391303,
  "durationMs" : {
"triggerExecution" : 276,
"queryPlanning" : 3,
"getBatch" : 5,
"getOffset" : 3,
"addBatch" : 234,
"walCommit" : 30
  },
  "currentWatermark" : 0,
  "stateOperators" : [ ],
  "sources" : [ {
"description" : "KafkaSource[Subscribe[topic-14]]",
"startOffset" : {
  "topic-14" : {
"2" : 0,
"4" : 1,
"1" : 0,
"3" : 0,
"0" : 0
  }
},
"endOffset" : {
  "topic-14" : {
"2" : 1,
"4" : 2,
"1" : 0,
"3" : 0,
"0" : 1
  }
},
"numRecords" : 3,
"inputRowsPerSecond" : 230.76923076923077,
"processedRowsPerSecond" : 10.869565217391303
  } ]
}
```

Additionally, in order to make it possible to correlate progress updates across 
restarts, we change the `id` field from an integer that is unique within the 
JVM to a `UUID` that is globally unique.

Author: Tathagata Das 
Author: Michael Armbrust 

Closes #15954 from marmbrus/queryProgress.

(cherry picked from commit c3d08e2f29baeebe09bf4c059ace4336af9116b5)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/28b57c8a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/28b57c8a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/28b57c8a

Branch: refs/heads/branch-2.1
Commit: 28b57c8a124fe55501c4ca4b91320851ace5d735
Parents: 045ae29
Author: Tathagata Das 
Authored: Tue Nov 29 17:24:17 2016 -0800
Committer: Michael Armbrust 
Committed: Tue Nov 29 17:24:37 2016 -0800

--
 .../spark/sql/kafka010/KafkaSourceSuite.scala   |   7 +-
 project/MimaExcludes.scala  |  11 +
 python/pyspark/sql/streaming.py | 326 ++-
 python/pyspark/sql/tests.py |  22 ++
 .../execution/streaming/MetricsReporter.scala   |  53 +++
 .../execution/streaming/ProgressReporter.scala  | 234 +
 .../execution/streaming/StreamExecution.scala   | 282 
 .../sql/execution/streaming/StreamMetrics.scala | 243 --
 .../org/apache/spark/sql/internal/SQLConf.scala |   8 +
 .../apache/spark/sql/streaming/SinkStatus.scala |  66 
 .../spark/sql/streaming/SourceStatus.scala  |  95 --
 .../spark/sql/streaming/StreamingQuery.scala|  33 +-
 .../sql/streaming/StreamingQueryException.scala |   2 +-
 .../sql/streaming/StreamingQueryListener.scala  |  24 +-
 .../sql/streaming/StreamingQueryManager.scala   |  27 +-
 .../sql/streaming/StreamingQueryStatus.scala| 151 +
 .../apache/spark/sql/streaming/progress.scala   | 193 +++
 .../streaming/StreamMetricsSuite.scala  | 213 
 .../sql/streaming/FileStreamSourceSuite.scala   |  10 +-
 .../apache/spark/sql/streaming/StreamTest.scala |  73 +
 .../streaming/StreamingQueryListenerSuite.scala | 267 ---
 .../streaming/StreamingQueryManagerSuite.scala  |   2 +-
 .../streaming/StreamingQueryProgressSuite.scala |  98 ++
 .../streaming/StreamingQueryStatusSuite.scala   | 123 ---
 .../sql/streaming/StreamingQuerySuite.scala | 260 ---
 .../spark/sql/streaming/WatermarkSuite.scala|  16 +-
 26 files changed, 1087 insertions(+), 1752 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
--
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index e1af14f..2d6ccb2 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ 
b/e

[GitHub] spark issue #15954: [SPARK-18516][SQL] Split state and progress in streaming

2016-11-29 Thread marmbrus
Github user marmbrus commented on the issue:

https://github.com/apache/spark/pull/15954
  
LGTM, merging to master and 2.1





[GitHub] spark pull request #15954: [SPARK-18516][SQL] Split state and progress in st...

2016-11-29 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15954#discussion_r90133572
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
 ---
@@ -59,13 +62,20 @@ class StreamingQueryManager private[sql] (sparkSession: 
SparkSession) {
   /**
* Returns the query if there is an active query with the given id, or 
null.
*
-   * @since 2.0.0
+   * @since 2.1.0
*/
-  def get(id: Long): StreamingQuery = activeQueriesLock.synchronized {
+  def get(id: UUID): StreamingQuery = activeQueriesLock.synchronized {
 activeQueries.get(id).orNull
   }
 
   /**
+   * Returns the query if there is an active query with the given id, or 
null.
+   *
+   * @since 2.1.0
+   */
+  def get(id: String): StreamingQuery = get(UUID.fromString(id))
--- End diff --

I think that's okay.  A globally unique ID is a better identifier.
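
For instance (a small sketch assuming at least one active query and a `SparkSession` named `spark`):

```scala
// Both overloads resolve to the same active query.
val query    = spark.streams.active.head
val byUuid   = spark.streams.get(query.id)           // UUID overload
val byString = spark.streams.get(query.id.toString)  // String overload
assert(byUuid == byString)
```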





[GitHub] spark pull request #15954: [SPARK-18516][SQL] Split state and progress in st...

2016-11-29 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15954#discussion_r90133503
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala ---
@@ -64,23 +68,26 @@ trait StreamingQuery {
 
   /**
* Returns the current status of the query.
+   *
* @since 2.0.2
*/
   def status: StreamingQueryStatus
 
   /**
-   * Returns current status of all the sources.
-   * @since 2.0.0
+   * Returns an array of the most recent [[StreamingQueryProgress]] 
updates for this query.
+   * The number of progress updates retained for each stream is configured 
by Spark session
+   * configuration `spark.sql.streaming.numRecentProgresses`.
+   *
+   * @since 2.1.0
*/
-  @deprecated("use status.sourceStatuses", "2.0.2")
-  def sourceStatuses: Array[SourceStatus]
+  def recentProgresses: Array[StreamingQueryProgress]
--- End diff --

Last `n` triggers.





spark git commit: [SPARK-18498][SQL] Revise HDFSMetadataLog API for better testing

2016-11-29 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 d3aaed219 -> e8ca1aea5


[SPARK-18498][SQL] Revise HDFSMetadataLog API for better testing

Revise the HDFSMetadataLog API so that metadata-object serialization and the 
final batch-file write are separated. This will allow serialization checks 
without worrying about batch file name formats. marmbrus zsxwing

Existing tests already ensure this API faithfully supports core functionality, 
i.e., the creation of batch files.
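
For example, a hedged sketch of the kind of serialization-only check this enables (names follow the diff below; `sparkSession` and `metadataPath` are assumed to exist):

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.streaming.HDFSMetadataLog

// Serialization can now be exercised in isolation: writeTempBatch writes
// the metadata to a temp file and returns its path, without committing it
// under a final batch file name.
val log = new HDFSMetadataLog[String](sparkSession, metadataPath)
val tempPath: Option[Path] = log.writeTempBatch("metadata-for-batch-0")
assert(tempPath.isDefined)
```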

Author: Tyson Condie 

Closes #15924 from tcondie/SPARK-18498.

Signed-off-by: Michael Armbrust 
(cherry picked from commit f643fe47f4889faf68da3da8d7850ee48df7c22f)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e8ca1aea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e8ca1aea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e8ca1aea

Branch: refs/heads/branch-2.1
Commit: e8ca1aea56956755e6335c0b7d2cbaa43e1f1e18
Parents: d3aaed2
Author: Tyson Condie 
Authored: Tue Nov 29 12:36:41 2016 -0800
Committer: Michael Armbrust 
Committed: Tue Nov 29 12:38:04 2016 -0800

--
 .../execution/streaming/HDFSMetadataLog.scala   | 100 ---
 1 file changed, 66 insertions(+), 34 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e8ca1aea/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index d95ec7f..1b41352 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -138,14 +138,7 @@ class HDFSMetadataLog[T <: AnyRef : 
ClassTag](sparkSession: SparkSession, path:
 }
   }
 
-  /**
-   * Write a batch to a temp file then rename it to the batch file.
-   *
-   * There may be multiple [[HDFSMetadataLog]] using the same metadata path. 
Although it is not a
-   * valid behavior, we still need to prevent it from destroying the files.
-   */
-  private def writeBatch(batchId: Long, metadata: T, writer: (T, OutputStream) 
=> Unit): Unit = {
-// Use nextId to create a temp file
+  def writeTempBatch(metadata: T, writer: (T, OutputStream) => Unit = 
serialize): Option[Path] = {
 var nextId = 0
 while (true) {
   val tempPath = new Path(metadataPath, 
s".${UUID.randomUUID.toString}.tmp")
@@ -153,33 +146,10 @@ class HDFSMetadataLog[T <: AnyRef : 
ClassTag](sparkSession: SparkSession, path:
 val output = fileManager.create(tempPath)
 try {
   writer(metadata, output)
+  return Some(tempPath)
 } finally {
   IOUtils.closeQuietly(output)
 }
-try {
-  // Try to commit the batch
-  // It will fail if there is an existing file (someone has committed 
the batch)
-  logDebug(s"Attempting to write log #${batchIdToPath(batchId)}")
-  fileManager.rename(tempPath, batchIdToPath(batchId))
-
-  // SPARK-17475: HDFSMetadataLog should not leak CRC files
-  // If the underlying filesystem didn't rename the CRC file, delete 
it.
-  val crcPath = new Path(tempPath.getParent(), 
s".${tempPath.getName()}.crc")
-  if (fileManager.exists(crcPath)) fileManager.delete(crcPath)
-  return
-} catch {
-  case e: IOException if isFileAlreadyExistsException(e) =>
-// If "rename" fails, it means some other "HDFSMetadataLog" has 
committed the batch.
-// So throw an exception to tell the user this is not a valid 
behavior.
-throw new ConcurrentModificationException(
-  s"Multiple HDFSMetadataLog are using $path", e)
-  case e: FileNotFoundException =>
-// Sometimes, "create" will succeed when multiple writers are 
calling it at the same
-// time. However, only one writer can call "rename" successfully, 
others will get
-// FileNotFoundException because the first writer has removed it.
-throw new ConcurrentModificationException(
-  s"Multiple HDFSMetadataLog are using $path", e)
-}
   } catch {
 case e: IOException if isFileAlreadyExistsException(e) =>
   // Failed to create "tempPath". There are two cases:
@@ -195,10 +165,45 @@ class HDFSMetadataLog[T <: AnyRef : 
ClassTag](sparkSession: SparkSession, path:
   // metadata path. I

spark git commit: [SPARK-18498][SQL] Revise HDFSMetadataLog API for better testing

2016-11-29 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 95f798501 -> f643fe47f


[SPARK-18498][SQL] Revise HDFSMetadataLog API for better testing

Revise the HDFSMetadataLog API so that metadata-object serialization and the 
final batch-file write are separated. This will allow serialization checks 
without worrying about batch file name formats. marmbrus zsxwing

Existing tests already ensure this API faithfully supports core functionality, 
i.e., the creation of batch files.

Author: Tyson Condie 

Closes #15924 from tcondie/SPARK-18498.

Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f643fe47
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f643fe47
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f643fe47

Branch: refs/heads/master
Commit: f643fe47f4889faf68da3da8d7850ee48df7c22f
Parents: 95f7985
Author: Tyson Condie 
Authored: Tue Nov 29 12:36:41 2016 -0800
Committer: Michael Armbrust 
Committed: Tue Nov 29 12:37:36 2016 -0800

--
 .../execution/streaming/HDFSMetadataLog.scala   | 100 ---
 1 file changed, 66 insertions(+), 34 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f643fe47/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index d95ec7f..1b41352 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -138,14 +138,7 @@ class HDFSMetadataLog[T <: AnyRef : 
ClassTag](sparkSession: SparkSession, path:
 }
   }
 
-  /**
-   * Write a batch to a temp file then rename it to the batch file.
-   *
-   * There may be multiple [[HDFSMetadataLog]] using the same metadata path. 
Although it is not a
-   * valid behavior, we still need to prevent it from destroying the files.
-   */
-  private def writeBatch(batchId: Long, metadata: T, writer: (T, OutputStream) 
=> Unit): Unit = {
-// Use nextId to create a temp file
+  def writeTempBatch(metadata: T, writer: (T, OutputStream) => Unit = 
serialize): Option[Path] = {
 var nextId = 0
 while (true) {
   val tempPath = new Path(metadataPath, 
s".${UUID.randomUUID.toString}.tmp")
@@ -153,33 +146,10 @@ class HDFSMetadataLog[T <: AnyRef : 
ClassTag](sparkSession: SparkSession, path:
 val output = fileManager.create(tempPath)
 try {
   writer(metadata, output)
+  return Some(tempPath)
 } finally {
   IOUtils.closeQuietly(output)
 }
-try {
-  // Try to commit the batch
-  // It will fail if there is an existing file (someone has committed 
the batch)
-  logDebug(s"Attempting to write log #${batchIdToPath(batchId)}")
-  fileManager.rename(tempPath, batchIdToPath(batchId))
-
-  // SPARK-17475: HDFSMetadataLog should not leak CRC files
-  // If the underlying filesystem didn't rename the CRC file, delete 
it.
-  val crcPath = new Path(tempPath.getParent(), 
s".${tempPath.getName()}.crc")
-  if (fileManager.exists(crcPath)) fileManager.delete(crcPath)
-  return
-} catch {
-  case e: IOException if isFileAlreadyExistsException(e) =>
-// If "rename" fails, it means some other "HDFSMetadataLog" has 
committed the batch.
-// So throw an exception to tell the user this is not a valid 
behavior.
-throw new ConcurrentModificationException(
-  s"Multiple HDFSMetadataLog are using $path", e)
-  case e: FileNotFoundException =>
-// Sometimes, "create" will succeed when multiple writers are 
calling it at the same
-// time. However, only one writer can call "rename" successfully, 
others will get
-// FileNotFoundException because the first writer has removed it.
-throw new ConcurrentModificationException(
-  s"Multiple HDFSMetadataLog are using $path", e)
-}
   } catch {
 case e: IOException if isFileAlreadyExistsException(e) =>
   // Failed to create "tempPath". There are two cases:
@@ -195,10 +165,45 @@ class HDFSMetadataLog[T <: AnyRef : 
ClassTag](sparkSession: SparkSession, path:
   // metadata path. In addition, the old Streaming also have this 
issue, people can create
   // malicious c

[GitHub] spark pull request #15924: [SPARK-18498] [SQL] Revise HDFSMetadataLog API fo...

2016-11-29 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15924#discussion_r90090753
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
 ---
@@ -129,48 +129,18 @@ class HDFSMetadataLog[T <: AnyRef : 
ClassTag](sparkSession: SparkSession, path:
 }
   }
 
-  /**
-   * Write a batch to a temp file then rename it to the batch file.
-   *
-   * There may be multiple [[HDFSMetadataLog]] using the same metadata 
path. Although it is not a
-   * valid behavior, we still need to prevent it from destroying the files.
-   */
-  private def writeBatch(batchId: Long, metadata: T, writer: (T, 
OutputStream) => Unit): Unit = {
-// Use nextId to create a temp file
+  def writeTempBatch(metadata: T, writer: (T, OutputStream) => Unit = 
serialize): Option[Path] = {
 var nextId = 0
-while (true) {
+while(true) {
--- End diff --

Nit: Space after `while`





[GitHub] spark pull request #15954: [WIP][SPARK-18516][SQL] Split state and progress ...

2016-11-29 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15954#discussion_r90083413
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala ---
@@ -38,11 +40,11 @@ trait StreamingQuery {
   def name: String
 
   /**
-   * Returns the unique id of this query. This id is automatically 
generated and is unique across
-   * all queries that have been started in the current process.
-   * @since 2.0.0
+   * Returns the unique id of this query.  An id is tied to the checkpoint 
location and will
+   * be the same across restarts of a given streaming query.
--- End diff --

We should fix the TODO earlier, or remove this promise for now.





[GitHub] spark pull request #15954: [WIP][SPARK-18516][SQL] Split state and progress ...

2016-11-29 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15954#discussion_r90082045
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -87,6 +88,24 @@ def awaitTermination(self, timeout=None):
 else:
 return self._jsq.awaitTermination()
 
+@property
+@since(2.1)
+def recentProgresses(self):
+"""Returns an array of the most recent [[StreamingQueryProgress]] 
updates for this query.
+The number of progress updates retained for each stream is 
configured by Spark session
+configuration `spark.sql.streaming.numRecentProgresses`.
+"""
+return [json.loads(p.json()) for p in self._jsq.recentProgresses()]
+
+@property
+@since(2.1)
+def lastProgress(self):
+"""
+Returns the most recent :class:`StreamingQueryProgress` update of 
this streaming query.
+:return: a map
+"""
+return json.loads(self._jsq.lastProgress().toString())
--- End diff --

I'd use `json` as above instead of relying on the fact that the `toString` 
is `json`.





[GitHub] spark pull request #15954: [WIP][SPARK-18516][SQL] Split state and progress ...

2016-11-29 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15954#discussion_r90084842
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
 ---
@@ -279,3 +287,8 @@ class StreamingQueryManager private[sql] (sparkSession: 
SparkSession) {
 }
   }
 }
+
+object StreamingQueryManager {
+  private val _nextId = new AtomicLong(0)
+  def nextId: Long = _nextId.getAndIncrement()
--- End diff --

`private`





[GitHub] spark pull request #15954: [WIP][SPARK-18516][SQL] Split state and progress ...

2016-11-29 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15954#discussion_r90085518
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryProgress.scala
 ---
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.{util => ju}
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import org.apache.jute.compiler.JLong
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Statistics about updates made to a stateful operators in a 
[[StreamingQuery]] in a trigger.
+ */
+@Experimental
+class StateOperatorProgress private[sql](
+val numRowsTotal: Long,
+val numRowsUpdated: Long) {
+  private[sql] def jsonValue: JValue = {
+("numRowsTotal" -> JInt(numRowsTotal)) ~
+("numRowsUpdated" -> JInt(numRowsUpdated))
+  }
+}
+
+/**
+ * :: Experimental ::
+ * Used to report statistics about progress that has been made in the 
execution of a
+ * [[StreamingQuery]]. Each event relates to processing done for a single 
trigger of the streaming
+ * query. Events are emitted even when no new data is available to be 
processed.
+ *
+ * @param id A unique id of the query.
+ * @param name Name of the query. This name is unique across all active 
queries.
+ * @param timestamp Timestamp (ms) of the beginning of the trigger.
+ * @param batchId A unique id for the current batch of data being 
processed.  Note that in the
+ *case of retries after a failure a given batchId may be 
executed more than once.
+ *Similarly, when there is no data to be processed, the 
batchId will not be
+ *incremented.
+ * @param durationMs The amount of time taken to perform various 
operations in milliseconds.
+ * @param currentWatermark The current event time watermark in milliseconds
+ * @param stateOperators Information about operators in the query that 
store state.
+ * @param sources detailed statistics on data being read from each of the 
streaming sources.
+ * @since 2.1.0
+ */
+@Experimental
+class StreamingQueryProgress private[sql](
+  val id: UUID,
+  val name: String,
+  val timestamp: Long,
+  val batchId: Long, // TODO: epoch?
--- End diff --

We probably will not do this `TODO`.





[GitHub] spark pull request #15954: [WIP][SPARK-18516][SQL] Split state and progress ...

2016-11-29 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15954#discussion_r90085377
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryProgress.scala
 ---
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.{util => ju}
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import org.apache.jute.compiler.JLong
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Statistics about updates made to a stateful operators in a 
[[StreamingQuery]] in a trigger.
+ */
+@Experimental
+class StateOperatorProgress private[sql](
--- End diff --

We should move `SourceProgress` here or put `StateOperatorProgress` in its 
own file.  We might also consider putting them all in 
`org.apache.spark.sql.streaming.progress`, but there might not be time.





[GitHub] spark pull request #15954: [WIP][SPARK-18516][SQL] Split state and progress ...

2016-11-29 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15954#discussion_r90084970
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryProgress.scala
 ---
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.{util => ju}
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import org.apache.jute.compiler.JLong
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Statistics about updates made to a stateful operators in a 
[[StreamingQuery]] in a trigger.
--- End diff --

nit: during a trigger?





[GitHub] spark pull request #15954: [WIP][SPARK-18516][SQL] Split state and progress ...

2016-11-29 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15954#discussion_r90083627
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala ---
@@ -51,7 +53,7 @@ trait StreamingQuery {
   def sparkSession: SparkSession
 
   /**
-   * Whether the query is currently active or not
+   * Returns `true` if this query is actively running.
* @since 2.0.0
--- End diff --

Nit: other places have a blank line before `@since`





[GitHub] spark pull request #15954: [WIP][SPARK-18516][SQL] Split state and progress ...

2016-11-29 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15954#discussion_r90086100
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala ---
@@ -669,55 +658,48 @@ trait StreamTest extends QueryTest with 
SharedSQLContext with Timeouts {
 }
   }
 
-
-  class QueryStatusCollector extends StreamingQueryListener {
+  /** Collects events from the StreamingQueryListener for testing */
+  class EventCollector extends StreamingQueryListener {
--- End diff --

I don't think this needs to be an inner class of `StreamTest`.  This file 
is pretty long/complicated as is.





[GitHub] spark pull request #15954: [WIP][SPARK-18516][SQL] Split state and progress ...

2016-11-29 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15954#discussion_r90084420
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
 ---
@@ -81,30 +83,30 @@ object StreamingQueryListener {
   /**
* :: Experimental ::
* Event representing the start of a query
-   * @since 2.0.0
+   * @since 2.1.0
*/
   @Experimental
-  class QueryStartedEvent private[sql](val queryStatus: 
StreamingQueryStatus) extends Event
+  class QueryStartedEvent private[sql](val id: UUID, val name: String) 
extends Event
 
   /**
* :: Experimental ::
-   * Event representing any progress updates in a query
-   * @since 2.0.0
+   * Event representing any progress updates in a query.
+   * @since 2.1.0
*/
   @Experimental
-  class QueryProgressEvent private[sql](val queryStatus: 
StreamingQueryStatus) extends Event
+  class QueryProgressEvent private[sql](val progress: 
StreamingQueryProgress) extends Event
 
   /**
* :: Experimental ::
-   * Event representing that termination of a query
+   * Event representing that termination of a query.
*
-   * @param queryStatus Information about the status of the query.
-   * @param exception The exception message of the [[StreamingQuery]] if 
the query was terminated
+   * @param lastProgress The last progress the query made before it was 
terminated.
+   * @param exception The exception message of the query if the query was 
terminated
*  with an exception. Otherwise, it will be `None`.
-   * @since 2.0.0
+   * @since 2.1.0
*/
   @Experimental
   class QueryTerminatedEvent private[sql](
-  val queryStatus: StreamingQueryStatus,
-  val exception: Option[String]) extends Event
+val lastProgress: StreamingQueryProgress,
--- End diff --

What is this if no progress is ever made? `null`?  I would consider leaving 
this as just the `id`, because otherwise, if the query dies before any 
progress is made, you can't get the `id` at all.
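
A sketch of a listener written against the event API proposed in this diff (names as proposed here, not necessarily as merged; `spark` is an assumed `SparkSession`, and the null guard reflects the concern above):

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val listener = new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"started: ${event.id} (${event.name})")

  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(event.progress.json)

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    // A query that dies before making progress may have no lastProgress.
    Option(event.lastProgress).foreach(p => println(s"last: ${p.json}"))
}

spark.streams.addListener(listener)
```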





[GitHub] spark pull request #15954: [WIP][SPARK-18516][SQL] Split state and progress ...

2016-11-29 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15954#discussion_r90084872
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
 ---
@@ -279,3 +287,8 @@ class StreamingQueryManager private[sql] (sparkSession: 
SparkSession) {
 }
   }
 }
+
+object StreamingQueryManager {
--- End diff --

`private`
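
For what it's worth, a top-level object can be marked plain `private` (which 
makes it package-private) or given a qualified scope; a visibility-only sketch, 
where the member shown is a placeholder rather than the PR's code:

```scala
// Visibility sketch only; the counter is a placeholder member.
private object StreamingQueryManager {
  private val nextQueryId = new java.util.concurrent.atomic.AtomicLong(0)
}
```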



[GitHub] spark pull request #15954: [WIP][SPARK-18516][SQL] Split state and progress ...

2016-11-21 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15954#discussion_r89035938
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryProgress.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.{util => ju}
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Holds statistics about state that is being stored for a given streaming 
query.
+ */
+@Experimental
+class StateOperator private[sql](
+val numEntries: Long,
--- End diff --

Those sound good.



[GitHub] spark pull request #15954: [WIP][SPARK-18516][SQL] Split state and progress ...

2016-11-21 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15954#discussion_r89032237
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala ---
@@ -64,23 +66,26 @@ trait StreamingQuery {
 
   /**
* Returns the current status of the query.
+   *
* @since 2.0.2
*/
   def status: StreamingQueryStatus
 
   /**
-   * Returns current status of all the sources.
-   * @since 2.0.0
+   * Returns an array of the most recent [[StreamingQueryProgress]] 
updates for this query.
+   * The number of records retained for each stream is configured by
+   * `spark.sql.streaming.numProgressRecords`.
+   *
+   *  @since 2.1.0
*/
-  @deprecated("use status.sourceStatuses", "2.0.2")
-  def sourceStatuses: Array[SourceStatus]
+  def recentProgress: Array[StreamingQueryProgress]
--- End diff --

Hmmm, yeah, maybe.  It's not clear to me that `progress` is inherently 
singular, and `progresses` is kind of a mouthful.  It is maybe nice for `Array`s 
to always have plural names, though.
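
Either way, the plural reads naturally at the call site; a usage sketch, where 
`query` is assumed to be a started `StreamingQuery` and the progress field name 
is assumed from the proposed class:

```scala
val recent: Array[StreamingQueryProgress] = query.recentProgress
recent.lastOption.foreach { p =>
  println(s"most recent trigger processed ${p.numInputRows} input rows")
}
```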



[GitHub] spark pull request #15954: [WIP][SPARK-18516][SQL] Split state and progress ...

2016-11-21 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15954#discussion_r89032104
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceProgress.scala ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.util.control.NonFatal
+
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Reports metrics on data being read from a given streaming source.
+ *
+ * @param description Description of the source.
+ * @param startOffset The starting offset for data being read.
+ * @param endOffset The ending offset for data being read.
+ * @param numRecords The number of records read from this source.
--- End diff --

I think if we update the docs as you suggest above this will be clear.



[GitHub] spark pull request #15954: [WIP][SPARK-18516][SQL] Split state and progress ...

2016-11-21 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15954#discussion_r89032039
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryProgress.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.{util => ju}
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Holds statistics about state that is being stored for a given streaming 
query.
+ */
+@Experimental
+class StateOperator private[sql](
+val numEntries: Long,
--- End diff --

+1 to docs.  I think `numTotal` is less clear.  Total of what?  It is a 
count of the entries that the state store is holding.
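
A sketch of how the documented field might read; the Scaladoc wording is a 
suggestion only:

```scala
/**
 * :: Experimental ::
 * Holds statistics about state that is being stored for a given streaming query.
 *
 * @param numEntries Count of the entries the state store is currently holding.
 */
@Experimental
class StateOperator private[sql](val numEntries: Long)
```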



[GitHub] spark pull request #15954: [WIP][SPARK-18516][SQL] Split state and progress ...

2016-11-21 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15954#discussion_r89031940
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceProgress.scala ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.util.control.NonFatal
+
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Reports metrics on data being read from a given streaming source.
--- End diff --

Sure, we can copy the docs from the main class: `Each event relates to 
processing done for a single trigger of the streaming query.`
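
So the class doc would read something like the following; the parameter types 
are assumptions, taken from the `@param` list in the diff:

```scala
/**
 * :: Experimental ::
 * Reports metrics on data being read from a given streaming source. Each event
 * relates to processing done for a single trigger of the streaming query.
 */
@Experimental
class SourceProgress private[sql](
    val description: String,
    val startOffset: String,
    val endOffset: String,
    val numRecords: Long)
```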



[GitHub] spark issue #15962: [SPARK-18526][SQL][KAFKA] Allow users to configure max.p...

2016-11-21 Thread marmbrus
Github user marmbrus commented on the issue:

https://github.com/apache/spark/pull/15962
  
Do you have any performance comparisons to show that we need to do this?

On the driver, I think we want it to be as small as possible, because we 
don't want to pull any data down (it would be even better if we could just 
issue a raw ListOffsets request).  On the executors, we should know exactly how 
many offsets we need to pull down, so I'm not sure it makes sense to allow users 
to configure this.
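
To make the driver-side point concrete, here is a sketch of a consumer that only 
discovers offsets and never fetches records; the broker address, topic, and 
deserializers are placeholders:

```scala
import java.{util => ju}

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

val props = new ju.Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.ByteArrayDeserializer")
// The driver never needs actual records, so keep any fetch minimal.
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1")

val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
val tp = new TopicPartition("topic", 0)
consumer.assign(ju.Arrays.asList(tp))
consumer.seekToEnd(ju.Arrays.asList(tp)) // no records are fetched here ...
val latestOffset = consumer.position(tp) // ... just the latest offset
consumer.close()
```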



[GitHub] spark issue #15954: [WIP][SPARK-18516][SQL] Split state and progress in stre...

2016-11-20 Thread marmbrus
Github user marmbrus commented on the issue:

https://github.com/apache/spark/pull/15954
  
/cc @tdas

