[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-10 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/9256#issuecomment-155440091
  
**[Test build #45527 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45527/consoleFull)**
 for PR 9256 at commit 
[`fb5a296`](https://github.com/apache/spark/commit/fb5a296ac2c82eec16f2449267f94e98a46c54be).





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-10 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/9256#issuecomment-155440812
  
**[Test build #45527 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45527/consoleFull)**
 for PR 9256 at commit 
[`fb5a296`](https://github.com/apache/spark/commit/fb5a296ac2c82eec16f2449267f94e98a46c54be).
 * This patch **fails Scala style tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `sealed abstract class State[S]`
  * `sealed abstract class StateSpec[KeyType, ValueType, StateType, EmittedType] extends Serializable`
  * `case class StateSpecImpl[K, V, S, T](`
  * `sealed abstract class TrackStateDStream[KeyType, StateType, EmittedType: ClassTag](`
  * `class InternalTrackStateDStream[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](`
  * `  case class StateInfo[S](`
  * `  class LimitMarker(val num: Int) extends Serializable`





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9256#issuecomment-155440822
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45527/
Test FAILed.





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9256#issuecomment-155440818
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-10 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44470724
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.ClosureCleaner
+import org.apache.spark.{HashPartitioner, Partitioner}
+
+
+/**
+ * :: Experimental ::
+ * Abstract class representing all the specifications of the DStream 
transformation
+ * `trackStateByKey` operation of a
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ * Use the [[org.apache.spark.streaming.StateSpec StateSpec.apply()]] or
+ * [[org.apache.spark.streaming.StateSpec StateSpec.create()]] to create 
instances of
+ * this class.
+ *
+ * Example in Scala:
+ * {{{
+ *def trackingFunction(data: Option[ValueType], wrappedState: 
State[StateType]): EmittedType = {
+ *  ...
+ *}
+ *
+ *val spec = StateSpec.function(trackingFunction).numPartitions(10)
+ *
+ *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+ * }}}
+ *
+ * Example in Java:
+ * {{{
+ *StateSpec[KeyType, ValueType, StateType, EmittedDataType] spec =
+ *  StateSpec.function[KeyType, ValueType, StateType, EmittedDataType](trackingFunction)
+ *.numPartitions(10);
+ *
+ *JavaDStream[EmittedDataType] emittedRecordDStream =
+ *  javaPairDStream.trackStateByKey[StateType, EmittedDataType](spec);
+ * }}}
+ */
+@Experimental
+sealed abstract class StateSpec[KeyType, ValueType, StateType, 
EmittedType] extends Serializable {
+
+  /** Set the RDD containing the initial states that will be used by 
`trackStateByKey` */
+  def initialState(rdd: RDD[(KeyType, StateType)]): this.type
+
+  /** Set the RDD containing the initial states that will be used by 
`trackStateByKey` */
+  def initialState(javaPairRDD: JavaPairRDD[KeyType, StateType]): this.type
+
+  /**
+   * Set the number of partitions by which the state RDDs generated by 
`trackStateByKey`
+   * will be partitioned. Hash partitioning will be used.
+   */
+  def numPartitions(numPartitions: Int): this.type
+
+  /**
+   * Set the partitioner by which the state RDDs generated by 
`trackStateByKey` will be
+   * partitioned.
+   */
+  def partitioner(partitioner: Partitioner): this.type
+
+  /**
+   * Set the duration after which the state of an idle key will be 
removed. A key and its state is
+   * considered idle if it has not received any data for at least the 
given duration. The state
+   * tracking function will be called one final time on the idle states 
that are going to be
+   * removed; [[org.apache.spark.streaming.State State.isTimingOut()]] will be set
+   * to `true` in that call.
+   */
+  def timeout(idleDuration: Duration): this.type
+}
+
+
+/**
+ * :: Experimental ::
+ * Builder object for creating instances of 
[[org.apache.spark.streaming.StateSpec StateSpec]]
+ * that is used for specifying the parameters of the DStream transformation
+ * `trackStateByKey` operation of a
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ *
+ * Example in Scala:
+ * {{{
+ *def trackingFunction(data: Option[ValueType], wrappedState: 
State[StateType]): EmittedType = {
+ *  ...
+ *}
+ *
+ *val spec = 
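The quoted scaladoc is cut off here, but it already shows the intended builder flow. Below is a minimal end-to-end sketch of that flow, assuming the draft API exactly as this diff documents it (`StateSpec.function`, `numPartitions`, `timeout`, `trackStateByKey`); these names were still under review at this point, and note that the `TrackStateRDD` diff elsewhere in this thread types the function as `(Time, K, Option[V], State[S]) => Option[T]`, so the exact arity was still settling. The word-count logic, host/port, and checkpoint directory are invented for illustration.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

object TrackStateSketch {

  // Tracking function with the shape the scaladoc above describes:
  // (Option[ValueType], State[StateType]) => EmittedType.
  def trackingFunction(data: Option[Int], wrappedState: State[Int]): Int = {
    val runningSum = data.getOrElse(0) + wrappedState.getOption().getOrElse(0)
    // A state that is timing out is about to be removed by the system
    // and must not be updated (see the timeout() doc above).
    if (!wrappedState.isTimingOut()) wrappedState.update(runningSum)
    runningSum
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("TrackStateSketch")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint("/tmp/track-state-sketch") // stateful DStream ops need checkpointing

    // Hypothetical input: words arriving on a socket, keyed with count 1.
    val keyValueDStream = ssc.socketTextStream("localhost", 9999).map(word => (word, 1))

    val spec = StateSpec.function(trackingFunction _)
      .numPartitions(10)      // hash-partition the state RDDs
      .timeout(Seconds(3600)) // evict keys that receive no data for an hour

    val emittedRecordDStream = keyValueDStream.trackStateByKey[Int, Int](spec)
    emittedRecordDStream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```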

[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9256#issuecomment-155572332
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9256#issuecomment-155572357
  
Merged build started.





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-10 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/9256#issuecomment-13713
  
**[Test build #45545 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45545/consoleFull)**
 for PR 9256 at commit 
[`f1a6696`](https://github.com/apache/spark/commit/f1a669653811d6558b692fc521a0a7f29972ba33).
 * This patch **fails to build**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `sealed abstract class State[S]`
  * `sealed abstract class StateSpec[KeyType, ValueType, StateType, EmittedType] extends Serializable`
  * `case class StateSpecImpl[K, V, S, T](`
  * `sealed abstract class TrackStateDStream[KeyType, StateType, EmittedType: ClassTag](`
  * `class InternalTrackStateDStream[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](`
  * `  case class StateInfo[S](`
  * `  class LimitMarker(val num: Int) extends Serializable`





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9256#issuecomment-13722
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45545/
Test FAILed.





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-10 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/9256#issuecomment-155575106
  
**[Test build #45550 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45550/consoleFull)**
 for PR 9256 at commit 
[`77c9a66`](https://github.com/apache/spark/commit/77c9a66e911adf74014bd9b16fb26153a445d372).





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-10 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44484586
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala ---

[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-10 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44484988
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala ---

[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9256#issuecomment-155613148
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9256#issuecomment-155613167
  
Merged build started.





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-10 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/9256#issuecomment-155615006
  
**[Test build #45572 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45572/consoleFull)**
 for PR 9256 at commit 
[`ae64786`](https://github.com/apache/spark/commit/ae64786fd937002a2cc1f80518d54e970a6bbb21).





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9256#issuecomment-155629092
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-10 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/9256#issuecomment-155628691
  
**[Test build #45572 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45572/consoleFull)**
 for PR 9256 at commit 
[`ae64786`](https://github.com/apache/spark/commit/ae64786fd937002a2cc1f80518d54e970a6bbb21).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `sealed abstract class State[S]`
  * `sealed abstract class StateSpec[KeyType, ValueType, StateType, EmittedType] extends Serializable`
  * `case class StateSpecImpl[K, V, S, T](`
  * `sealed abstract class TrackStateDStream[KeyType, ValueType, StateType, EmittedType: ClassTag](`
  * `class InternalTrackStateDStream[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](`
  * `  case class StateInfo[S](`
  * `  class LimitMarker(val num: Int) extends Serializable`





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9256#issuecomment-155629094
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45572/
Test PASSed.





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-10 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/9256#issuecomment-155688949
  
All right, I am merging this for now. I have a few more changes, 
refactorings, and unit tests that I am working on, but those can be done 
later as well. 





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-10 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44504184
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala ---
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util
+
+import java.io.{ObjectInputStream, ObjectOutputStream}
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.util.OpenHashMapBasedStateMap._
+import org.apache.spark.util.collection.OpenHashMap
+
+/** Internal interface for defining the map that keeps track of sessions. 
*/
+private[streaming] abstract class StateMap[K: ClassTag, S: ClassTag] 
extends Serializable {
+
+  /** Get the state for a key if it exists */
+  def get(key: K): Option[S]
+
+  /** Get all the keys and states whose updated time is older than the 
given threshold time */
+  def getByTime(threshUpdatedTime: Long): Iterator[(K, S, Long)]
+
+  /** Get all the keys and states in this map. */
+  def getAll(): Iterator[(K, S, Long)]
+
+  /** Add or update state */
+  def put(key: K, state: S, updatedTime: Long): Unit
+
+  /** Remove a key */
+  def remove(key: K): Unit
+
+  /**
+   * Shallow copy `this` map to create a new state map.
+   * Updates to the new map should not mutate `this` map.
+   */
+  def copy(): StateMap[K, S]
+
+  def toDebugString(): String = toString()
+}
+
+/** Companion object for [[StateMap]], with utility methods */
+private[streaming] object StateMap {
+  def empty[K: ClassTag, S: ClassTag]: StateMap[K, S] = new 
EmptyStateMap[K, S]
+
+  def create[K: ClassTag, S: ClassTag](conf: SparkConf): StateMap[K, S] = {
+val deltaChainThreshold = 
conf.getInt("spark.streaming.sessionByKey.deltaChainThreshold",
+  DELTA_CHAIN_LENGTH_THRESHOLD)
+new OpenHashMapBasedStateMap[K, S](64, deltaChainThreshold)
+  }
+}
+
+/** Implementation of StateMap interface representing an empty map */
+private[streaming] class EmptyStateMap[K: ClassTag, S: ClassTag] extends 
StateMap[K, S] {
+  override def put(key: K, session: S, updateTime: Long): Unit = {
+throw new NotImplementedError("put() should not be called on an 
EmptyStateMap")
+  }
+  override def get(key: K): Option[S] = None
+  override def getByTime(threshUpdatedTime: Long): Iterator[(K, S, Long)] 
= Iterator.empty
+  override def getAll(): Iterator[(K, S, Long)] = Iterator.empty
+  override def copy(): StateMap[K, S] = this
+  override def remove(key: K): Unit = { }
+  override def toDebugString(): String = ""
+}
+
+/** Implementation of StateMap based on Spark's 
[[org.apache.spark.util.collection.OpenHashMap]] */
+private[streaming] class OpenHashMapBasedStateMap[K: ClassTag, S: 
ClassTag](
+@transient @volatile var parentStateMap: StateMap[K, S],
+initialCapacity: Int = 64,
+deltaChainThreshold: Int = DELTA_CHAIN_LENGTH_THRESHOLD
+  ) extends StateMap[K, S] { self =>
+
+  def this(initialCapacity: Int, deltaChainThreshold: Int) = this(
+new EmptyStateMap[K, S],
+initialCapacity = initialCapacity,
+deltaChainThreshold = deltaChainThreshold)
+
+  def this(deltaChainThreshold: Int) = this(
+initialCapacity = 64, deltaChainThreshold = deltaChainThreshold)
+
+  def this() = this(DELTA_CHAIN_LENGTH_THRESHOLD)
+
+  @transient @volatile private var deltaMap =
+new OpenHashMap[K, StateInfo[S]](initialCapacity)
+
+  /** Get the session data if it exists */
+  override def get(key: K): Option[S] = {
+val stateInfo = deltaMap(key)
+if (stateInfo != null) {
+  if (!stateInfo.deleted) {
+Some(stateInfo.data)
+  } else {
+None
+  }
+} else {
+  parentStateMap.get(key)
+}
+  }
+
+  /** Get all the keys and states whose 
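The quoted excerpt cuts off here, but its key design is already visible: `copy()` on a `StateMap` is meant to be cheap, layering a new delta on top of the parent map so that each batch can snapshot the previous batch's state without a deep copy (with `deltaChainThreshold` capping how long the parent chain may grow before compaction). Here is a simplified, self-contained Scala sketch of that delta-chain idea, using a plain `mutable.HashMap` in place of Spark's `OpenHashMap` and omitting the per-key update-time bookkeeping and the compaction step:

```scala
import scala.collection.mutable

// Each copy() layers a fresh, empty delta over its parent, so snapshots
// share structure instead of being deep-copied.
class DeltaStateMap[K, S](parent: Option[DeltaStateMap[K, S]] = None) {
  // None marks a key deleted in this layer (a tombstone), mirroring the
  // `deleted` flag on StateInfo in the quoted implementation.
  private val delta = mutable.HashMap.empty[K, Option[S]]

  def get(key: K): Option[S] = delta.get(key) match {
    case Some(valueOrTombstone) => valueOrTombstone // set or removed in this layer
    case None => parent.flatMap(_.get(key))         // fall through to the parent chain
  }

  def put(key: K, state: S): Unit = delta(key) = Some(state)

  def remove(key: K): Unit = delta(key) = None

  /** O(1) snapshot: later writes to the child never mutate `this`. */
  def copy(): DeltaStateMap[K, S] = new DeltaStateMap[K, S](Some(this))
}

object DeltaStateMapDemo extends App {
  val batch1 = new DeltaStateMap[String, Int]()
  batch1.put("a", 1)

  val batch2 = batch1.copy()
  batch2.put("a", 2)
  batch2.remove("b")

  assert(batch1.get("a") == Some(1)) // the parent snapshot is untouched
  assert(batch2.get("a") == Some(2)) // the child sees its own delta first
}
```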

[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-10 Thread zsxwing
Github user zsxwing commented on the pull request:

https://github.com/apache/spark/pull/9256#issuecomment-155681290
  
@tdas, looks good. My comments are minor. There are some unnecessary Java 
APIs in this PR, but I can fix them while I'm adding the Java APIs.





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9256#issuecomment-155437951
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9256#issuecomment-155437981
  
Merged build started.





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-10 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44503176
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/rdd/TrackStateRDD.scala ---
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.rdd
+
+import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.apache.spark.rdd.{MapPartitionsRDD, RDD}
+import org.apache.spark.streaming.{Time, StateImpl, State}
+import org.apache.spark.streaming.util.{EmptyStateMap, StateMap}
+import org.apache.spark.util.Utils
+import org.apache.spark._
+
+/**
+ * Record storing the keyed-state of a [[TrackStateRDD]]. Each record contains 
a [[StateMap]] and a
+ * sequence of records returned by the tracking function of 
`trackStateByKey`.
+ */
+private[streaming] case class TrackStateRDDRecord[K, S, T](
+var stateMap: StateMap[K, S], var emittedRecords: Seq[T])
+
+/**
+ * Partition of the [[TrackStateRDD]], which depends on corresponding 
partitions of prev state
+ * RDD, and a partitioned keyed-data RDD
+ */
+private[streaming] class TrackStateRDDPartition(
+idx: Int,
+@transient private var prevStateRDD: RDD[_],
+@transient private var partitionedDataRDD: RDD[_]) extends Partition {
+
+  private[rdd] var previousSessionRDDPartition: Partition = null
+  private[rdd] var partitionedDataRDDPartition: Partition = null
+
+  override def index: Int = idx
+  override def hashCode(): Int = idx
+
+  @throws(classOf[IOException])
+  private def writeObject(oos: ObjectOutputStream): Unit = 
Utils.tryOrIOException {
+// Update the reference to parent split at the time of task 
serialization
+previousSessionRDDPartition = prevStateRDD.partitions(index)
+partitionedDataRDDPartition = partitionedDataRDD.partitions(index)
+oos.defaultWriteObject()
+  }
+}
+
+
+/**
+ * RDD storing the keyed-state of `trackStateByKey` and corresponding 
emitted records.
+ * Each partition of this RDD has a single record of type 
[[TrackStateRDDRecord]]. This contains a
+ * [[StateMap]] (containing the keyed-states) and the sequence of records 
returned by the tracking
+ * function of  `trackStateByKey`.
+ * @param prevStateRDD The previous TrackStateRDD on whose StateMap data 
`this` RDD will be created
+ * @param partitionedDataRDD The partitioned data RDD which is used to update 
the previous StateMaps
+ *   in the `prevStateRDD` to create `this` RDD
+ * @param trackingFunction The function that will be used to update state 
and return new data
+ * @param batchTime The time of the batch to which this RDD 
belongs. Use to update
+ */
+private[streaming] class TrackStateRDD[K: ClassTag, V: ClassTag, S: 
ClassTag, T: ClassTag](
+private var prevStateRDD: RDD[TrackStateRDDRecord[K, S, T]],
+private var partitionedDataRDD: RDD[(K, V)],
+trackingFunction: (Time, K, Option[V], State[S]) => Option[T],
+batchTime: Time, timeoutThresholdTime: Option[Long]
--- End diff --

nit: `timeoutThresholdTime` should be in a new line.
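
Stepping back from the nit, the dataflow this class implements is that each partition of the new `TrackStateRDD` takes the single `TrackStateRDDRecord` of the corresponding previous partition, makes a cheap copy of its `StateMap`, and folds the co-partitioned new data through `trackingFunction`. A rough sketch of that per-partition loop, detached from the RDD/Partition plumbing: it substitutes an immutable `Map` for the copy-on-write `StateMap`, a hand-rolled `SketchState` for the PR's internal `StateImpl` (whose helper methods are not shown in the quoted excerpt), and drops the batch-time and timeout handling.

```scala
import scala.collection.mutable.ArrayBuffer

// Stand-in for the PR's internal StateImpl wrapper around State[S].
class SketchState[S](initial: Option[S]) {
  private var current: Option[S] = initial
  private var removed = false
  def exists: Boolean = current.isDefined
  def get: S = current.get
  def update(s: S): Unit = { current = Some(s); removed = false }
  def remove(): Unit = { current = None; removed = true }
  def isRemoved: Boolean = removed
  def value: Option[S] = current
}

// One batch's work for one partition: fold the new keyed data through the
// tracking function, collecting updated states and emitted records.
def updatePartition[K, V, S, T](
    prevStates: Map[K, S], // previous batch's states, simplified
    newData: Iterator[(K, V)],
    trackingFunction: (K, Option[V], SketchState[S]) => Option[T]
): (Map[K, S], Seq[T]) = {
  var states = prevStates
  val emitted = new ArrayBuffer[T]
  newData.foreach { case (key, value) =>
    val state = new SketchState[S](states.get(key))
    trackingFunction(key, Some(value), state).foreach(emitted += _)
    if (state.isRemoved) states = states - key
    else state.value.foreach(s => states = states + (key -> s))
  }
  (states, emitted.toSeq)
}
```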





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-10 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44503186
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/rdd/TrackStateRDD.scala ---
+private[streaming] class TrackStateRDD[K: ClassTag, V: ClassTag, S: 
ClassTag, T: ClassTag](
+private var prevStateRDD: RDD[TrackStateRDDRecord[K, S, T]],
+private var partitionedDataRDD: RDD[(K, V)],
+trackingFunction: (Time, K, Option[V], State[S]) => Option[T],
+batchTime: Time, timeoutThresholdTime: Option[Long]
--- End diff --

no doc for `timeoutThresholdTime `





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44338358
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala ---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{HashPartitioner, Partitioner}
+
+
+/**
+ * :: Experimental ::
+ * Abstract class representing all the specifications of the DStream 
transformation
+ * `trackStateByKey` operation of a
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ * Use the [[org.apache.spark.streaming.StateSpec StateSpec.apply()]] or
+ * [[org.apache.spark.streaming.StateSpec StateSpec.create()]] to create 
instances of
+ * this class.
+ *
+ * Example in Scala:
+ * {{{
+ *val spec = StateSpec(trackingFunction).numPartitions(10)
+ *
+ *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+ * }}}
+ *
+ * Example in Java:
+ * {{{
+ *StateSpec[StateType, EmittedDataType] spec =
+ *  StateSpec.create[StateType, EmittedDataType](trackingFunction).numPartitions(10);
+ *
+ *JavaDStream[EmittedDataType] emittedRecordDStream =
+ *  javaPairDStream.trackStateByKey[StateType, EmittedDataType](spec);
+ * }}}
+ */
+@Experimental
+sealed abstract class StateSpec[K, V, S, T] extends Serializable {
+
+  /** Set the RDD containing the initial states that will be used by 
`trackStateByKey`*/
+  def initialState(rdd: RDD[(K, S)]): this.type
+
+  /** Set the RDD containing the initial states that will be used by 
`trackStateByKey`*/
+  def initialState(javaPairRDD: JavaPairRDD[K, S]): this.type
+
+  /**
+   * Set the number of partitions by which the state RDDs generated by 
`trackStateByKey`
+   * will be partitioned. Hash partitioning will be used.
+   */
+  def numPartitions(numPartitions: Int): this.type
+
+  /**
+   * Set the partitioner by which the state RDDs generated by 
`trackStateByKey` will be
+   * partitioned.
+   */
+  def partitioner(partitioner: Partitioner): this.type
+
+  /**
+   * Set the duration after which the state of an idle key will be 
removed. A key and its state is
+   * considered idle if it has not received any data for at least the 
given duration. The state
+   * tracking function will be called one final time on the idle states 
that are going to be
+   * removed; [[org.apache.spark.streaming.State State.isTimingOut()]] will be set
+   * to `true` in that call.
+   */
+  def timeout(idleDuration: Duration): this.type
+}
+
+
+/**
+ * :: Experimental ::
+ * Builder object for creating instances of 
[[org.apache.spark.streaming.StateSpec StateSpec]]
+ * that is used for specifying the parameters of the DStream transformation
+ * `trackStateByKey` operation of a
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ *
+ * Example in Scala:
+ * {{{
+ *val spec = StateSpec(trackingFunction).numPartitions(10)
+ *
+ *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+ * }}}
+ *
+ * Example in Java:
+ * {{{
+ *StateSpec[StateType, EmittedDataType] spec =
+ *  StateSpec.create[StateType, EmittedDataType](trackingFunction).numPartitions(10);
+ *
+ *JavaDStream[EmittedDataType] emittedRecordDStream =
+ *  javaPairDStream.trackStateByKey[StateType, 

[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44337618
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/State.scala 
---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.language.implicitConversions
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Abstract class for getting and updating the tracked state in the 
`trackStateByKey` operation of
+ * a [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ *
+ * Scala example of using `State`:
+ * {{{
+ *    def trackStateFunc(key: String, data: Option[Int], wrappedState: State[Int]): Option[Int] = {
+ *      // Check if the state exists
+ *      if (wrappedState.exists) {
+ *        val existingState = wrappedState.get  // Get the existing state
+ *        val shouldRemove = ...                // Decide whether to remove the state
+ *        if (shouldRemove) {
+ *          wrappedState.remove()               // Remove the state
+ *        } else {
+ *          val newState = ...
+ *          wrappedState.update(newState)       // Set the new state
+ *        }
+ *      } else {
+ *        val initialState = ...
+ *        wrappedState.update(initialState)     // Set the initial state
+ *      }
+ *    }
+ *
+ * }}}
+ *
+ * Java example:
+ * {{{
+ *  TODO(@zsxwing)
+ * }}}
+ */
+@Experimental
+sealed abstract class State[S] {
+
+  /** Whether the state already exists */
+  def exists(): Boolean
+
+  /**
+   * Get the state if it exists, otherwise it will throw 
`java.util.NoSuchElementException`.
+   * Check with `exists()` whether the state exists or not before calling 
`get()`.
+   *
+   * @throws java.util.NoSuchElementException If the state does not exist.
+   */
+  def get(): S
+
+  /**
+   * Update the state with a new value.
+   *
+   * State cannot be updated if it has been already removed (that is, 
`remove()` has already been
+   * called) or it is going to be removed due to timeout (that is, 
`isTimingOut()` is `true`).
+   *
+   * @throws java.lang.IllegalArgumentException If the state has already 
been removed, or is
+   *going to be removed
+   */
+  def update(newState: S): Unit
+
+  /**
+   * Remove the state if it exists.
+   *
+   * State cannot be updated if it has been already removed (that is, 
`remove()` has already been
+   * called) or it is going to be removed due to timeout (that is, 
`isTimingOut()` is `true`).
+   */
+  def remove(): Unit
+
+  /**
+   * Whether the state is timing out and going to be removed by the system 
after the current batch.
+   * This timeout can occur if a timeout duration has been specified in the
+   * [[org.apache.spark.streaming.StateSpec StateSpec]] and the key has not 
received any new data
+   * for that timeout duration.
+   */
+  def isTimingOut(): Boolean
+
+  /**
+   * Get the state as an [[scala.Option]]. It will be `Some(state)` if it 
exists, otherwise `None`.
+   */
+  @inline final def getOption(): Option[S] = if (exists) Some(get()) else 
None
+
+  @inline final override def toString(): String = {
+getOption.map { _.toString }.getOrElse("")
+  }
+}
+
+private[streaming]
+object State {
+  implicit def toOption[S](state: State[S]): Option[S] = state.getOption()
+}
--- End diff --

Cool. Will remove it.
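
For completeness, here is a concrete, compilable instance of the pattern the quoted scaladoc example sketches. It uses only the `State[S]` methods declared in the quoted diff (`exists`, `get`, `update`, `remove`, `isTimingOut`, `getOption`); the counting logic itself is invented for illustration:

```scala
import org.apache.spark.streaming.State

def trackWordCount(key: String, data: Option[Int], wrappedState: State[Int]): Option[Int] = {
  if (wrappedState.isTimingOut()) {
    // Final call for an idle key: the state is being removed by the
    // system and must not be updated or removed here.
    wrappedState.getOption()
  } else if (wrappedState.exists()) {
    val newCount = wrappedState.get() + data.getOrElse(0)
    if (newCount <= 0) {
      wrappedState.remove()          // drop keys whose count has died out
      None
    } else {
      wrappedState.update(newCount)  // set the new state
      Some(newCount)
    }
  } else {
    val initialCount = data.getOrElse(0)
    wrappedState.update(initialCount) // set the initial state
    Some(initialCount)
  }
}
```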



[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-09 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44303122
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala ---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{HashPartitioner, Partitioner}
+
+
+/**
+ * :: Experimental ::
+ * Abstract class representing all the specifications of the DStream 
transformation
+ * `trackStateByKey` operation of a
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ * Use the [[org.apache.spark.streaming.StateSpec StateSpec.apply()]] or
+ * [[org.apache.spark.streaming.StateSpec StateSpec.create()]] to create 
instances of
+ * this class.
+ *
+ * Example in Scala:
+ * {{{
+ *val spec = StateSpec(trackingFunction).numPartitions(10)
+ *
+ *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+ * }}}
+ *
+ * Example in Java:
+ * {{{
+ *StateStateSpec[StateType, EmittedDataType] spec =
+ *  StateStateSpec.create[StateType, 
EmittedDataType](trackingFunction).numPartition(10);
+ *
+ *JavaDStream[EmittedDataType] emittedRecordDStream =
+ *  javaPairDStream.trackStateByKey[StateType, EmittedDataType](spec);
+ * }}}
+ */
+@Experimental
+sealed abstract class StateSpec[K, V, S, T] extends Serializable {
+
+  /** Set the RDD containing the initial states that will be used by 
`trackStateByKey`*/
+  def initialState(rdd: RDD[(K, S)]): this.type
+
+  /** Set the RDD containing the initial states that will be used by 
`trackStateByKey`*/
+  def initialState(javaPairRDD: JavaPairRDD[K, S]): this.type
+
+  /**
+   * Set the number of partitions by which the state RDDs generated by 
`trackStateByKey`
+   * will be partitioned. Hash partitioning will be used on the
+   */
+  def numPartitions(numPartitions: Int): this.type
+
+  /**
+   * Set the partitioner by which the state RDDs generated by 
`trackStateByKey` will be
+   * be partitioned.
+   */
+  def partitioner(partitioner: Partitioner): this.type
+
+  /**
+   * Set the duration after which the state of an idle key will be 
removed. A key and its state is
+   * considered idle if it has not received any data for at least the 
given duration. The state
+   * tracking function will be called one final time on the idle states 
that are going to be
+   * removed, with [[org.apache.spark.streaming.State State.isTimingOut()]] set
+   * to `true` in that call.
+   */
+  def timeout(idleDuration: Duration): this.type
+}
+
+
+/**
+ * :: Experimental ::
+ * Builder object for creating instances of 
[[org.apache.spark.streaming.StateSpec StateSpec]]
+ * that is used for specifying the parameters of the `trackStateByKey`
+ * operation, a DStream transformation on a
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ *
+ * Example in Scala:
+ * {{{
+ *val spec = StateSpec(trackingFunction).numPartitions(10)
+ *
+ *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+ * }}}
+ *
+ * Example in Java:
+ * {{{
+ *StateSpec<StateType, EmittedDataType> spec =
+ *  StateSpec.<StateType, EmittedDataType>create(trackingFunction).numPartitions(10);
+ *
+ *JavaDStream<EmittedDataType> emittedRecordDStream =
+ *  javaPairDStream.<StateType, 

[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-09 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44299243
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/State.scala 
---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.language.implicitConversions
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Abstract class for getting and updating the tracked state in the 
`trackStateByKey` operation of
+ * a [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ *
+ * Scala example of using `State`:
+ * {{{
+ *def trackStateFunc(key: String, data: Option[Int], wrappedState: 
State[Int]): Option[Int] = {
+ *
+ *  // Check if state exists
+ *  if (wrappedState.exists) {
+ *
+ *val existingState = wrappedState.get  // Get the existing state
+ *
+ *val shouldRemove = ...  // Decide whether to remove the state
+ *
+ *if (shouldRemove) {
+ *
+ *  wrappedState.remove() // Remove the state
+ *
+ *} else {
+ *
+ *  val newState = ...
+ *  wrappedState.update(newState)// Set the new state
+ *
+ *}
+ *  } else {
+ *
+ *val initialState = ...
+ *wrappedState.update(initialState)  // Set the initial state
+ *
+ *  }
+ *}
+ *
+ * }}}
+ *
+ * Java example:
+ * {{{
+ *  TODO(@zsxwing)
+ * }}}
+ */
+@Experimental
+sealed abstract class State[S] {
+
+  /** Whether the state already exists */
+  def exists(): Boolean
+
+  /**
+   * Get the state if it exists, otherwise it will throw 
`java.util.NoSuchElementException`.
+   * Check with `exists()` whether the state exists or not before calling 
`get()`.
+   *
+   * @throws java.util.NoSuchElementException If the state does not exist.
+   */
+  def get(): S
+
+  /**
+   * Update the state with a new value.
+   *
+   * State cannot be updated if it has been already removed (that is, 
`remove()` has already been
+   * called) or it is going to be removed due to timeout (that is, 
`isTimingOut()` is `true`).
+   *
+   * @throws java.lang.IllegalArgumentException If the state has already 
been removed, or is
+   *going to be removed
+   */
+  def update(newState: S): Unit
+
+  /**
+   * Remove the state if it exists.
+   *
+   * State cannot be updated if it has been already removed (that is, 
`remove()` has already been
+   * called) or it is going to be removed due to timeout (that is, 
`isTimingOut()` is `true`).
+   */
+  def remove(): Unit
+
+  /**
+   * Whether the state is timing out and going to be removed by the system 
after the current batch.
+   * This timeout can occur if a timeout duration has been specified in the
+   * [[org.apache.spark.streaming.StateSpec StateSpec]] and the key has not 
received any new data
+   * for that timeout duration.
+   */
+  def isTimingOut(): Boolean
+
+  /**
+   * Get the state as an [[scala.Option]]. It will be `Some(state)` if it 
exists, otherwise `None`.
+   */
+  @inline final def getOption(): Option[S] = if (exists) Some(get()) else 
None
+
+  @inline final override def toString(): String = {
+getOption.map { _.toString }.getOrElse("")
+  }
+}
+
+private[streaming]
+object State {
+  implicit def toOption[S](state: State[S]): Option[S] = state.getOption()
+}
--- End diff --

We shouldn't have that as part of our API; we don't use implicit 
conversions in other places in Spark, and it would be pretty confusing. Instead 
they can just call `state.getOption`.
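
To make the contrast concrete, here is a minimal self-contained sketch of the 
two call styles (`ReviewState` is a hypothetical stand-in for the `State[S]` 
API quoted above, not code from this PR):

```scala
// Hypothetical stand-in for the State[S] API under review.
sealed abstract class ReviewState[S] {
  def exists(): Boolean
  def get(): S
  final def getOption(): Option[S] = if (exists()) Some(get()) else None
}

// Explicit style: the conversion to Option is visible at the call site.
def currentOrZero(state: ReviewState[Int]): Int =
  state.getOption().getOrElse(0)

// With an implicit State => Option conversion in scope, the same logic could
// be written as `state.getOrElse(0)`, silently converting the state to an
// Option -- the hidden step being objected to here.
```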


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.

[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-09 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44299415
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/EmittedRecordsDStream.scala
 ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import scala.reflect.ClassTag
+
+import org.apache.spark._
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.{EmptyRDD, RDD}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.rdd.{TrackStateRDD, TrackStateRDDRecord}
+
+/**
+ * :: Experimental ::
+ * DStream representing the stream of records emitted after the 
`trackStateByKey` operation
+ * on a [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] in Scala.
+ * Additionally, it also gives access to the stream of state snapshots, 
that is, the state data of
+ * all keys after a batch has updated them.
+ *
+ * @tparam K Class of the state key
+ * @tparam S Class of the state data
+ * @tparam T Class of the emitted records
+ */
+@Experimental
+sealed abstract class EmittedRecordsDStream[K, S, T: ClassTag](
--- End diff --

TrackStateDStream sounds good


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/9256#issuecomment-154859018
  
**[Test build #45310 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45310/consoleFull)**
 for PR 9256 at commit 
[`a78130d`](https://github.com/apache/spark/commit/a78130d502576d437e859261137bad67e15725ae).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9256#issuecomment-154858807
  
 Merged build triggered.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9256#issuecomment-154858814
  
Merged build started.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/9256#issuecomment-154859165
  
**[Test build #45310 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45310/consoleFull)**
 for PR 9256 at commit 
[`a78130d`](https://github.com/apache/spark/commit/a78130d502576d437e859261137bad67e15725ae).
 * This patch **fails Scala style tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
   * `sealed abstract class State[S] `
   * `sealed abstract class StateSpec[K, V, S, T] extends Serializable `
   * `case class StateSpecImpl[K, V, S, T](`
   * `sealed abstract class EmittedRecordsDStream[K, S, T: ClassTag](`
   * `  case class StateInfo[S](`
   * `  class LimitMarker(val num: Int) extends Serializable`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9256#issuecomment-154859167
  
Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-08 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44229996
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{HashPartitioner, Partitioner}
+
+
+/**
+ * :: Experimental ::
+ * Abstract class representing all the specifications of the `trackStateByKey`
+ * operation, a DStream transformation on a
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ * Use the [[org.apache.spark.streaming.StateSpec StateSpec.apply()]] or
+ * [[org.apache.spark.streaming.StateSpec StateSpec.create()]] to create 
instances of
+ * this class.
+ *
+ * Example in Scala:
+ * {{{
+ *val spec = StateSpec(trackingFunction).numPartitions(10)
+ *
+ *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+ * }}}
+ *
+ * Example in Java:
+ * {{{
+ *StateSpec<StateType, EmittedDataType> spec =
+ *  StateSpec.<StateType, EmittedDataType>create(trackingFunction).numPartitions(10);
+ *
+ *JavaDStream<EmittedDataType> emittedRecordDStream =
+ *  javaPairDStream.<StateType, EmittedDataType>trackStateByKey(spec);
+ * }}}
+ */
+@Experimental
+sealed abstract class StateSpec[K, V, S, T] extends Serializable {
+
+  /** Set the RDD containing the initial states that will be used by 
`trackStateByKey`*/
--- End diff --

Why not just 

StateSpec.function(..) ?

The extra "new" seems redundant.

It is also more consistent with the rest of the SQL code (e.g. the window 
function spec)
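
For reference, the analogous shape in Spark SQL looks like this (the Window 
factory is real API; the StateSpec line is the suggestion, not a final method 
name):

```scala
import org.apache.spark.sql.expressions.Window

// SQL builds a WindowSpec from a factory on the Window object, then chains
// optional refinements on the returned spec:
val windowSpec = Window.partitionBy("key").orderBy("time")

// The suggested analogue here would read:
//   val spec = StateSpec.function(trackingFunction).numPartitions(10)
```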


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-08 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44230031
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{HashPartitioner, Partitioner}
+
+
+/**
+ * :: Experimental ::
+ * Abstract class representing all the specifications of the `trackStateByKey`
+ * operation, a DStream transformation on a
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ * Use the [[org.apache.spark.streaming.StateSpec StateSpec.apply()]] or
+ * [[org.apache.spark.streaming.StateSpec StateSpec.create()]] to create 
instances of
+ * this class.
+ *
+ * Example in Scala:
+ * {{{
+ *val spec = StateSpec(trackingFunction).numPartitions(10)
+ *
+ *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+ * }}}
+ *
+ * Example in Java:
+ * {{{
+ *StateSpec<StateType, EmittedDataType> spec =
+ *  StateSpec.<StateType, EmittedDataType>create(trackingFunction).numPartitions(10);
+ *
+ *JavaDStream<EmittedDataType> emittedRecordDStream =
+ *  javaPairDStream.<StateType, EmittedDataType>trackStateByKey(spec);
+ * }}}
+ */
+@Experimental
+sealed abstract class StateSpec[K, V, S, T] extends Serializable {
+
+  /** Set the RDD containing the initial states that will be used by 
`trackStateByKey`*/
--- End diff --

Basically what I'm saying is: don't have `function` on the StateSpec class, and 
only have `function` on the StateSpec object, since you must define a function.

And don't have `apply` on the StateSpec object.
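
Concretely, the proposed shape would look something like this (a sketch only; 
names and signatures are assumptions, not the final API):

```scala
// Stand-in for org.apache.spark.streaming.State[S].
trait SketchState[S]

// The class keeps only the optional, builder-style setters...
sealed abstract class SketchStateSpec[K, V, S, T] extends Serializable {
  def numPartitions(numPartitions: Int): this.type
}

// ...while the mandatory tracking function is the single entry point on the
// companion object: no apply(), and no function() setter on the class.
object SketchStateSpec {
  def function[K, V, S, T](
      trackingFunction: (K, Option[V], SketchState[S]) => Option[T]
    ): SketchStateSpec[K, V, S, T] = ???
}

// Usage would then read:
//   val spec = SketchStateSpec.function(trackingFunction).numPartitions(10)
```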



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44233579
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/rdd/TrackStateRDD.scala ---
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.rdd
+
+import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.apache.spark.rdd.{MapPartitionsRDD, RDD}
+import org.apache.spark.streaming.{StateImpl, State}
+import org.apache.spark.streaming.util.{EmptyStateMap, StateMap}
+import org.apache.spark.util.Utils
+import org.apache.spark._
+
+private[streaming] case class TrackStateRDDRecord[K, S, T](
+var stateMap: StateMap[K, S], var emittedRecords: Seq[T]) {
+  /*
+  private def writeObject(outputStream: ObjectOutputStream): Unit = {
+outputStream.writeObject(stateMap)
+outputStream.writeInt(emittedRecords.size)
+val iterator = emittedRecords.iterator
+while(iterator.hasNext) {
+  outputStream.writeObject(iterator.next)
+}
+  }
+
+  private def readObject(inputStream: ObjectInputStream): Unit = {
+stateMap = inputStream.readObject().asInstanceOf[StateMap[K, S]]
+val numEmittedRecords = inputStream.readInt()
+val array = new Array[T](numEmittedRecords)
+var i = 0
+while(i < numEmittedRecords) {
+  array(i) = inputStream.readObject().asInstanceOf[T]
+  i += 1  // advance the index; without this the loop never terminates
+}
+emittedRecords = array.toSeq
+  }*/
--- End diff --

Still a bit of a WIP. I wanted to optimize the serialization a little bit, 
but I will move this work to a different PR. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44233566
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala ---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{HashPartitioner, Partitioner}
+
+
+/**
+ * :: Experimental ::
+ * Abstract class representing all the specifications of the `trackStateByKey`
+ * operation, a DStream transformation on a
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ * Use the [[org.apache.spark.streaming.StateSpec StateSpec.apply()]] or
+ * [[org.apache.spark.streaming.StateSpec StateSpec.create()]] to create 
instances of
+ * this class.
+ *
+ * Example in Scala:
+ * {{{
+ *val spec = StateSpec(trackingFunction).numPartitions(10)
+ *
+ *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+ * }}}
+ *
+ * Example in Java:
+ * {{{
+ *StateSpec<StateType, EmittedDataType> spec =
+ *  StateSpec.<StateType, EmittedDataType>create(trackingFunction).numPartitions(10);
+ *
+ *JavaDStream<EmittedDataType> emittedRecordDStream =
+ *  javaPairDStream.<StateType, EmittedDataType>trackStateByKey(spec);
+ * }}}
+ */
+@Experimental
+sealed abstract class StateSpec[K, V, S, T] extends Serializable {
+
+  /** Set the RDD containing the initial states that will be used by 
`trackStateByKey`*/
+  def initialState(rdd: RDD[(K, S)]): this.type
+
+  /** Set the RDD containing the initial states that will be used by 
`trackStateByKey`*/
+  def initialState(javaPairRDD: JavaPairRDD[K, S]): this.type
+
+  /**
+   * Set the number of partitions by which the state RDDs generated by 
`trackStateByKey`
+   * will be partitioned. Hash partitioning will be used on the keys.
+   */
+  def numPartitions(numPartitions: Int): this.type
+
+  /**
+   * Set the partitioner by which the state RDDs generated by 
`trackStateByKey` will be
+   * partitioned.
+   */
+  def partitioner(partitioner: Partitioner): this.type
+
+  /**
+   * Set the duration after which the state of an idle key will be 
removed. A key and its state are
+   * considered idle if it has not received any data for at least the 
given duration. The state
+   * tracking function will be called one final time on the idle states 
that are going to be
+   * removed, with [[org.apache.spark.streaming.State State.isTimingOut()]] set
+   * to `true` in that call.
+   */
+  def timeout(idleDuration: Duration): this.type
+}
+
+
+/**
+ * :: Experimental ::
+ * Builder object for creating instances of 
[[org.apache.spark.streaming.StateSpec StateSpec]]
+ * that is used for specifying the parameters of the `trackStateByKey`
+ * operation, a DStream transformation on a
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ *
+ * Example in Scala:
+ * {{{
+ *val spec = StateSpec(trackingFunction).numPartitions(10)
+ *
+ *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+ * }}}
+ *
+ * Example in Java:
+ * {{{
+ *StateSpec<StateType, EmittedDataType> spec =
+ *  StateSpec.<StateType, EmittedDataType>create(trackingFunction).numPartitions(10);
+ *
+ *JavaDStream<EmittedDataType> emittedRecordDStream =
+ *  javaPairDStream.<StateType, 

[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44233533
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/State.scala 
---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.language.implicitConversions
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Abstract class for getting and updating the tracked state in the 
`trackStateByKey` operation of
+ * a [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ *
+ * Scala example of using `State`:
+ * {{{
+ *def trackStateFunc(key: String, data: Option[Int], wrappedState: 
State[Int]): Option[Int] = {
+ *
+ *  // Check if state exists
+ *  if (wrappedState.exists) {
+ *
+ *val existingState = wrappedState.get  // Get the existing state
+ *
+ *val shouldRemove = ...  // Decide whether to remove the state
+ *
+ *if (shouldRemove) {
+ *
+ *  wrappedState.remove() // Remove the state
+ *
+ *} else {
+ *
+ *  val newState = ...
+ *  wrappedState.update(newState)// Set the new state
+ *
+ *}
+ *  } else {
+ *
+ *val initialState = ...
+ *wrappedState.update(initialState)  // Set the initial state
+ *
+ *  }
+ *}
+ *
+ * }}}
+ *
+ * Java example:
+ * {{{
+ *  TODO(@zsxwing)
+ * }}}
+ */
+@Experimental
+sealed abstract class State[S] {
+
+  /** Whether the state already exists */
+  def exists(): Boolean
+
+  /**
+   * Get the state if it exists, otherwise it will throw 
`java.util.NoSuchElementException`.
+   * Check with `exists()` whether the state exists or not before calling 
`get()`.
+   *
+   * @throws java.util.NoSuchElementException If the state does not exist.
+   */
+  def get(): S
+
+  /**
+   * Update the state with a new value.
+   *
+   * State cannot be updated if it has been already removed (that is, 
`remove()` has already been
+   * called) or it is going to be removed due to timeout (that is, 
`isTimingOut()` is `true`).
+   *
+   * @throws java.lang.IllegalArgumentException If the state has already 
been removed, or is
+   *going to be removed
+   */
+  def update(newState: S): Unit
+
+  /**
+   * Remove the state if it exists.
+   *
+   * State cannot be updated if it has been already removed (that is, 
`remove()` has already been
+   * called) or it is going to be removed due to timeout (that is, 
`isTimingOut()` is `true`).
+   */
+  def remove(): Unit
+
+  /**
+   * Whether the state is timing out and going to be removed by the system 
after the current batch.
+   * This timeout can occur if a timeout duration has been specified in the
+   * [[org.apache.spark.streaming.StateSpec StateSpec]] and the key has not 
received any new data
+   * for that timeout duration.
+   */
+  def isTimingOut(): Boolean
+
+  /**
+   * Get the state as an [[scala.Option]]. It will be `Some(state)` if it 
exists, otherwise `None`.
+   */
+  @inline final def getOption(): Option[S] = if (exists) Some(get()) else 
None
+
+  @inline final override def toString(): String = {
+getOption.map { _.toString }.getOrElse("")
+  }
+}
+
+private[streaming]
+object State {
+  implicit def toOption[S](state: State[S]): Option[S] = state.getOption()
+}
+
+/** Internal implementation of the [[State]] interface */
+private[streaming] class StateImpl[S] extends State[S] {
--- End diff --

I did it so that we can separate the publicly visible interface from the 
private[streaming] implementation.
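
The pattern, as a tiny self-contained sketch (hypothetical names; the package 
declaration is what makes the `private[streaming]` modifier legal):

```scala
package org.apache.spark.streaming

// Public surface: callers see only the documented, read-only methods, and
// `sealed` prevents subclassing outside this file.
sealed abstract class Counter {
  def get(): Int
}

// Implementation detail: the mutable internals stay invisible outside the
// org.apache.spark.streaming package.
private[streaming] class CounterImpl extends Counter {
  private var value: Int = 0
  override def get(): Int = value
  private[streaming] def increment(): Unit = { value += 1 }
}
```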

[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44233225
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/State.scala 
---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.language.implicitConversions
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Abstract class for getting and updating the tracked state in the 
`trackStateByKey` operation of
+ * a [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ *
+ * Scala example of using `State`:
+ * {{{
+ *def trackStateFunc(key: String, data: Option[Int], wrappedState: 
State[Int]): Option[Int] = {
+ *
+ *  // Check if state exists
+ *  if (wrappedState.exists) {
+ *
+ *val existingState = wrappedState.get  // Get the existing state
+ *
+ *val shouldRemove = ...  // Decide whether to remove the state
+ *
+ *if (shouldRemove) {
+ *
+ *  wrappedState.remove() // Remove the state
+ *
+ *} else {
+ *
+ *  val newState = ...
+ *  wrappedState.update(newState)// Set the new state
+ *
+ *}
+ *  } else {
+ *
+ *val initialState = ...
+ *wrappedState.update(initialState)  // Set the initial state
+ *
+ *  }
+ *}
+ *
+ * }}}
+ *
+ * Java example:
+ * {{{
+ *  TODO(@zsxwing)
+ * }}}
+ */
+@Experimental
+sealed abstract class State[S] {
+
+  /** Whether the state already exists */
+  def exists(): Boolean
+
+  /**
+   * Get the state if it exists, otherwise it will throw 
`java.util.NoSuchElementException`.
+   * Check with `exists()` whether the state exists or not before calling 
`get()`.
+   *
+   * @throws java.util.NoSuchElementException If the state does not exist.
+   */
+  def get(): S
+
+  /**
+   * Update the state with a new value.
+   *
+   * State cannot be updated if it has been already removed (that is, 
`remove()` has already been
+   * called) or it is going to be removed due to timeout (that is, 
`isTimingOut()` is `true`).
+   *
+   * @throws java.lang.IllegalArgumentException If the state has already 
been removed, or is
+   *going to be removed
+   */
+  def update(newState: S): Unit
+
+  /**
+   * Remove the state if it exists.
+   *
+   * State cannot be updated if it has been already removed (that is, 
`remove()` has already been
+   * called) or it is going to be removed due to timeout (that is, 
`isTimingOut()` is `true`).
+   */
+  def remove(): Unit
+
+  /**
+   * Whether the state is timing out and going to be removed by the system 
after the current batch.
+   * This timeout can occur if a timeout duration has been specified in the
+   * [[org.apache.spark.streaming.StateSpec StateSpec]] and the key has not 
received any new data
+   * for that timeout duration.
+   */
+  def isTimingOut(): Boolean
+
+  /**
+   * Get the state as an [[scala.Option]]. It will be `Some(state)` if it 
exists, otherwise `None`.
+   */
+  @inline final def getOption(): Option[S] = if (exists) Some(get()) else 
None
+
+  @inline final override def toString(): String = {
+getOption.map { _.toString }.getOrElse("")
+  }
+}
+
+private[streaming]
+object State {
+  implicit def toOption[S](state: State[S]): Option[S] = state.getOption()
+}
--- End diff --

We dont "need" it. But its just convenient for people to automatically 
convert to an option, so that they can write `state.getOrElse(...)` in Scala.
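
For example, with the implicit in scope a tracking function could be written 
like this (a sketch; `DemoState` and its companion stand in for the `State[S]` 
API quoted above):

```scala
import scala.language.implicitConversions

// Stand-in for the State[S] API from the diff above.
trait DemoState[S] {
  def exists(): Boolean
  def get(): S
  def getOption(): Option[S] = if (exists()) Some(get()) else None
}

object DemoState {
  // Mirrors the implicit conversion under discussion; being in the companion
  // object puts it in implicit scope automatically.
  implicit def toOption[S](state: DemoState[S]): Option[S] = state.getOption()
}

// The conversion lets callers use Option methods on the state directly:
def nextCount(state: DemoState[Long]): Long = state.getOrElse(0L) + 1
```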


---
If your project is set up for it, you can reply to this email and 

[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44233204
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
 ---
@@ -351,6 +351,50 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
   }
 
   /**
+   * :: Experimental ::
+   * Return a new DStream of data generated by combining the key-value 
data in `this` stream
+   * with a continuously updated per-key state. The user-provided state 
tracking function is
+   * applied on each keyed data item along with its corresponding state. 
The function can choose to
+   * update/remove the state and return a transformed data, which forms the
+   * [[org.apache.spark.streaming.dstream.EmittedRecordsDStream]].
+   *
+   * The specifications of this transformation are made through the
+   * [[org.apache.spark.streaming.StateSpec StateSpec]] class. Besides the 
tracking function, there
+   * are a number of optional parameters - initial state data, number of 
partitions, timeouts, etc.
+   * See the [[org.apache.spark.streaming.StateSpec StateSpec spec docs]] 
for more details.
+   *
+   * Scala example of using `trackStateByKey`:
+   * {{{
+   *def trackingFunction(key: String, data: Option[Int], wrappedState: 
State[Int]): Option[Int] = {
+   *  // Check state exists, accordingly update/remove state and 
return data to emit
+   *}
+   *
+   *val spec = StateSpec(trackingFunction).numPartitions(10)
+   *
+   *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+   *
+   * }}}
+   *
+   * Java example of using `trackStateByKey`:
+   * {{{
+   *  TODO(@zsxwing)
+   * }}}
--- End diff --

Aah, yes. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44238137
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/State.scala 
---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.language.implicitConversions
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Abstract class for getting and updating the tracked state in the 
`trackStateByKey` operation of
+ * a [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ *
+ * Scala example of using `State`:
+ * {{{
+ *def trackStateFunc(key: String, data: Option[Int], wrappedState: 
State[Int]): Option[Int] = {
+ *
+ *  // Check if state exists
+ *  if (wrappedState.exists) {
+ *
+ *val existingState = wrappedState.get  // Get the existing state
+ *
+ *val shouldRemove = ...  // Decide whether to remove the state
+ *
+ *if (shouldRemove) {
+ *
+ *  wrappedState.remove() // Remove the state
+ *
+ *} else {
+ *
+ *  val newState = ...
+ *  wrappedState.update(newState)// Set the new state
+ *
+ *}
+ *  } else {
+ *
+ *val initialState = ...
+ *wrappedState.update(initialState)  // Set the initial state
+ *
+ *  }
+ *}
+ *
+ * }}}
+ *
+ * Java example:
+ * {{{
+ *  TODO(@zsxwing)
+ * }}}
+ */
+@Experimental
+sealed abstract class State[S] {
+
+  /** Whether the state already exists */
+  def exists(): Boolean
+
+  /**
+   * Get the state if it exists, otherwise it will throw 
`java.util.NoSuchElementException`.
+   * Check with `exists()` whether the state exists or not before calling 
`get()`.
+   *
+   * @throws java.util.NoSuchElementException If the state does not exist.
+   */
+  def get(): S
+
+  /**
+   * Update the state with a new value.
+   *
+   * State cannot be updated if it has been already removed (that is, 
`remove()` has already been
+   * called) or it is going to be removed due to timeout (that is, 
`isTimingOut()` is `true`).
+   *
+   * @throws java.lang.IllegalArgumentException If the state has already 
been removed, or is
+   *going to be removed
+   */
+  def update(newState: S): Unit
+
+  /**
+   * Remove the state if it exists.
+   *
+   * State cannot be updated if it has been already removed (that is, 
`remove()` has already been
+   * called) or it is going to be removed due to timeout (that is, 
`isTimingOut()` is `true`).
+   */
+  def remove(): Unit
+
+  /**
+   * Whether the state is timing out and going to be removed by the system 
after the current batch.
+   * This timeout can occur if a timeout duration has been specified in the
+   * [[org.apache.spark.streaming.StateSpec StateSpec]] and the key has not 
received any new data
+   * for that timeout duration.
+   */
+  def isTimingOut(): Boolean
+
+  /**
+   * Get the state as an [[scala.Option]]. It will be `Some(state)` if it 
exists, otherwise `None`.
+   */
+  @inline final def getOption(): Option[S] = if (exists) Some(get()) else 
None
+
+  @inline final override def toString(): String = {
+getOption.map { _.toString }.getOrElse("")
+  }
+}
+
+private[streaming]
+object State {
+  implicit def toOption[S](state: State[S]): Option[S] = state.getOption()
+}
+
+/** Internal implementation of the [[State]] interface */
+private[streaming] class StateImpl[S] extends State[S] {
+
+  private var state: S = null.asInstanceOf[S]
+  private var defined: Boolean = true
+  private var 

[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44245716
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala ---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{HashPartitioner, Partitioner}
+
+
+/**
+ * :: Experimental ::
+ * Abstract class representing all the specifications of the `trackStateByKey`
+ * operation, a DStream transformation on a
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ * Use the [[org.apache.spark.streaming.StateSpec StateSpec.apply()]] or
+ * [[org.apache.spark.streaming.StateSpec StateSpec.create()]] to create 
instances of
+ * this class.
+ *
+ * Example in Scala:
+ * {{{
+ *val spec = StateSpec(trackingFunction).numPartitions(10)
+ *
+ *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+ * }}}
+ *
+ * Example in Java:
+ * {{{
+ *StateSpec<StateType, EmittedDataType> spec =
+ *  StateSpec.<StateType, EmittedDataType>create(trackingFunction).numPartitions(10);
+ *
+ *JavaDStream<EmittedDataType> emittedRecordDStream =
+ *  javaPairDStream.<StateType, EmittedDataType>trackStateByKey(spec);
+ * }}}
+ */
+@Experimental
+sealed abstract class StateSpec[K, V, S, T] extends Serializable {
+
+  /** Set the RDD containing the initial states that will be used by 
`trackStateByKey`*/
+  def initialState(rdd: RDD[(K, S)]): this.type
+
+  /** Set the RDD containing the initial states that will be used by 
`trackStateByKey`*/
+  def initialState(javaPairRDD: JavaPairRDD[K, S]): this.type
+
+  /**
+   * Set the number of partitions by which the state RDDs generated by 
`trackStateByKey`
+   * will be partitioned. Hash partitioning will be used on the keys.
+   */
+  def numPartitions(numPartitions: Int): this.type
+
+  /**
+   * Set the partitioner by which the state RDDs generated by 
`trackStateByKey` will be
+   * partitioned.
+   */
+  def partitioner(partitioner: Partitioner): this.type
+
+  /**
+   * Set the duration after which the state of an idle key will be 
removed. A key and its state are
+   * considered idle if it has not received any data for at least the 
given duration. The state
+   * tracking function will be called one final time on the idle states 
that are going to be
+   * removed, with [[org.apache.spark.streaming.State State.isTimingOut()]] set
+   * to `true` in that call.
+   */
+  def timeout(idleDuration: Duration): this.type
+}
+
+
+/**
+ * :: Experimental ::
+ * Builder object for creating instances of 
[[org.apache.spark.streaming.StateSpec StateSpec]]
+ * that is used for specifying the parameters of the `trackStateByKey`
+ * operation, a DStream transformation on a
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ *
+ * Example in Scala:
+ * {{{
+ *val spec = StateSpec(trackingFunction).numPartitions(10)
+ *
+ *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+ * }}}
+ *
+ * Example in Java:
+ * {{{
+ *StateSpec<StateType, EmittedDataType> spec =
+ *  StateSpec.<StateType, EmittedDataType>create(trackingFunction).numPartitions(10);
+ *
+ *JavaDStream<EmittedDataType> emittedRecordDStream =
+ *  javaPairDStream.<StateType, 

[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44246742
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/EmittedRecordsDStream.scala
 ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import scala.reflect.ClassTag
+
+import org.apache.spark._
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.{EmptyRDD, RDD}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.rdd.{TrackStateRDD, TrackStateRDDRecord}
+
+/**
+ * :: Experimental ::
+ * DStream representing the stream of records emitted after the 
`trackStateByKey` operation
+ * on a [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] in Scala.
+ * Additionally, it also gives access to the stream of state snapshots, 
that is, the state data of
+ * all keys after a batch has updated them.
+ *
+ * @tparam K Class of the state key
+ * @tparam S Class of the state data
+ * @tparam T Class of the emitted records
+ */
+@Experimental
+sealed abstract class EmittedRecordsDStream[K, S, T: ClassTag](
--- End diff --

Sounds too generic to me, and it's hard to associate `EmittedDStream` with 
`trackStateByKey`. Let me throw in another suggestion: `TrackStateDStream`, the 
DStream generated by `trackStateByKey`. Easy to associate.
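
At the call site the proposed name would read like this (usage sketch only; 
`wordCounts` and `trackingFunction` are assumed, and the API is the one in 
this PR's diffs):

```scala
// Assumes: wordCounts is a DStream[(String, Int)], and trackingFunction is a
// (String, Option[Int], State[Long]) => Option[String] tracking function.
val spec = StateSpec(trackingFunction).numPartitions(10)

// The name makes the stream's origin obvious:
val trackStateDStream = wordCounts.trackStateByKey[Long, String](spec)

// Emitted records are the elements of the DStream itself...
trackStateDStream.print()

// ...while per-key state snapshots after each batch come separately:
val stateSnapshots = trackStateDStream.stateSnapshots()  // DStream[(String, Long)]
```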


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44245704
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{HashPartitioner, Partitioner}
+
+
+/**
+ * :: Experimental ::
+ * Abstract class representing all the specifications of the `trackStateByKey`
+ * operation, a DStream transformation on a
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ * Use the [[org.apache.spark.streaming.StateSpec StateSpec.apply()]] or
+ * [[org.apache.spark.streaming.StateSpec StateSpec.create()]] to create 
instances of
+ * this class.
+ *
+ * Example in Scala:
+ * {{{
+ *val spec = StateSpec(trackingFunction).numPartitions(10)
+ *
+ *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+ * }}}
+ *
+ * Example in Java:
+ * {{{
+ *StateSpec<StateType, EmittedDataType> spec =
+ *  StateSpec.<StateType, EmittedDataType>create(trackingFunction).numPartitions(10);
+ *
+ *JavaDStream<EmittedDataType> emittedRecordDStream =
+ *  javaPairDStream.<StateType, EmittedDataType>trackStateByKey(spec);
+ * }}}
+ */
+@Experimental
+sealed abstract class StateSpec[K, V, S, T] extends Serializable {
+
+  /** Set the RDD containing the initial states that will be used by 
`trackStateByKey`*/
--- End diff --

I am cool with what Matei said - `function` on the object, not on the class. I 
just want to point out that the original concern raised by @pwendell -- not 
having `function` (i.e. setters for the function) on the StateSpec class -- was 
confusing for him. @pwendell, with the updated docs, is it still a concern?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44246950
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/EmittedRecordsDStream.scala
 ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import scala.reflect.ClassTag
+
+import org.apache.spark._
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.{EmptyRDD, RDD}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.rdd.{TrackStateRDD, TrackStateRDDRecord}
+
+/**
+ * :: Experimental ::
+ * DStream representing the stream of records emitted after the 
`trackStateByKey` operation
+ * on a [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] in Scala.
+ * Additionally, it also gives access to the stream of state snapshots, 
that is, the state data of
+ * all keys after a batch has updated them.
+ *
+ * @tparam K Class of the state key
+ * @tparam S Class of the state data
+ * @tparam T Class of the emitted records
+ */
+@Experimental
+sealed abstract class EmittedRecordsDStream[K, S, T: ClassTag](
+ssc: StreamingContext) extends DStream[T](ssc) {
+
+  /** Return a pair DStream where each RDD is the snapshot of the state of 
all the keys. */
+  def stateSnapshots(): DStream[(K, S)]
+}
+
+/** Internal implementation of the [[EmittedRecordsDStream]] */
+private[streaming] class EmittedRecordsDStreamImpl[
+K: ClassTag, V: ClassTag, S: ClassTag, T: ClassTag](
+trackStateDStream: TrackStateDStream[K, V, S, T])
+  extends EmittedRecordsDStream[K, S, T](trackStateDStream.context) {
+
+  override def slideDuration: Duration = trackStateDStream.slideDuration
+
+  override def dependencies: List[DStream[_]] = List(trackStateDStream)
+
+  override def compute(validTime: Time): Option[RDD[T]] = {
+trackStateDStream.getOrCompute(validTime).map { _.flatMap[T] { 
_.emittedRecords } }
+  }
+
+  def stateSnapshots(): DStream[(K, S)] = {
+trackStateDStream.flatMap[(K, S)] {
+  _.stateMap.getAll().map { case (k, s, _) => (k, s) }.toTraversable }
+  }
+}
+
+/**
+ * A DStream that allows per-key state to be maintained, and arbitrary 
records to be generated
+ * based on updates to the state.
+ *
+ * @param parent Parent (key, value) stream that is the source
+ * @param spec Specifications of the trackStateByKey operation
+ * @tparam K   Key type
+ * @tparam V   Value type
+ * @tparam S   Type of the state maintained
+ * @tparam T   Type of the emitted records
+ */
+private[streaming] class TrackStateDStream[K: ClassTag, V: ClassTag, S: 
ClassTag, T: ClassTag](
+parent: DStream[(K, V)], spec: StateSpecImpl[K, V, S, T])
+  extends DStream[TrackStateRDDRecord[K, S, T]](parent.context) {
+
+  persist(StorageLevel.MEMORY_ONLY)
+
+  private val partitioner = spec.getPartitioner().getOrElse(
+new HashPartitioner(ssc.sc.defaultParallelism))
+
+  private val trackingFunction = spec.getFunction()
+
+  override def slideDuration: Duration = parent.slideDuration
+
+  override def dependencies: List[DStream[_]] = List(parent)
+
+  override val mustCheckpoint = true
+
+  /** Method that generates an RDD for the given time */
+  override def compute(validTime: Time): Option[RDD[TrackStateRDDRecord[K, 
S, T]]] = {
+val prevStateRDD = getOrCompute(validTime - slideDuration).getOrElse {
+  TrackStateRDD.createFromPairRDD[K, V, S, T](
+spec.getInitialStateRDD().getOrElse(new EmptyRDD[(K, 
S)](ssc.sparkContext)),
+partitioner,
+validTime.milliseconds
+  )
+}
+val newDataRDD = parent.getOrCompute(validTime).get
+val partitionedDataRDD = newDataRDD.partitionBy(partitioner)
+
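
The compute() above implements the per-batch recurrence at the heart of
trackStateByKey: each batch's state RDD is derived from the previous batch's state
RDD (or the initial-state RDD on the first batch) plus the newly arrived,
identically partitioned data. Stripped of the RDD machinery, the recurrence looks
roughly like the following single-machine sketch; the function shape and names are
illustrative, not the PR's code.

    // state(t) = f(state(t-1), data(t)); emitted records fall out of each update.
    def step[K, V, S, T](
        prevState: Map[K, S],
        newData: Seq[(K, V)],
        f: (K, Option[V], Option[S]) => (Option[S], Option[T])): (Map[K, S], Seq[T]) = {
      newData.foldLeft((prevState, Seq.empty[T])) { case ((state, emitted), (k, v)) =>
        val (nextState, out) = f(k, Some(v), state.get(k))
        // None from f means the state for this key was removed.
        val updated = nextState.map(s => state + (k -> s)).getOrElse(state - k)
        (updated, emitted ++ out)
      }
    }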

[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-07 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44213675
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
 ---
@@ -351,6 +351,50 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
   }
 
   /**
+   * :: Experimental ::
+   * Return a new DStream of data generated by combining the key-value 
data in `this` stream
+   * with a continuously updated per-key state. The user-provided state 
tracking function is
+   * applied on each keyed data item along with its corresponding state. 
The function can choose to
+   * update/remove the state and return transformed data, which forms the
+   * [[org.apache.spark.streaming.dstream.EmittedRecordsDStream]].
+   *
+   * The specifications of this transformation are made through the
+   * [[org.apache.spark.streaming.StateSpec StateSpec]] class. Besides the 
tracking function, there
+   * are a number of optional parameters - initial state data, number of 
partitions, timeouts, etc.
+   * See the [[org.apache.spark.streaming.StateSpec StateSpec]] docs for more details.
+   *
+   * Scala example of using `trackStateByKey`:
+   * {{{
+   *def trackingFunction(key: String, data: Option[Int], wrappedState: 
State[Int]): Option[Int] = {
+   *  // Check state exists, accordingly update/remove state and 
return data to emit
+   *}
+   *
+   *val spec = StateSpec(trackingFunction).numPartitions(10)
+   *
+   *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+   *
+   * }}}
+   *
+   * Java example of using `trackStateByKey`:
+   * {{{
+   *  TODO(@zsxwing)
+   * }}}
--- End diff --

The Java example should only be in the Java API; it doesn't make sense to have
it here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
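
To make the scaladoc above concrete, here is a minimal Scala sketch of a complete
tracking function (a per-key running count), written against the API names quoted in
this diff (`State`, `StateSpec`, `trackStateByKey`). The PR is still in flux, so
treat the exact signatures as assumptions; `keyValueDStream` is a stand-in for any
DStream[(String, Int)].

    // Add each new value to the running count stored in the per-key state,
    // and emit the updated (key, count) pair.
    def trackingFunction(key: String, data: Option[Int], state: State[Long]): Option[(String, Long)] = {
      val newCount = state.getOption().getOrElse(0L) + data.getOrElse(0)
      state.update(newCount)        // persist the updated state for the next batch
      Some((key, newCount))         // record emitted for this batch
    }

    val spec = StateSpec(trackingFunction).numPartitions(10)
    val emitted = keyValueDStream.trackStateByKey[Long, (String, Long)](spec)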



[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-07 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44213622
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/EmittedRecordsDStream.scala
 ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import scala.reflect.ClassTag
+
+import org.apache.spark._
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.{EmptyRDD, RDD}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.rdd.{TrackStateRDD, TrackStateRDDRecord}
+
+/**
+ * :: Experimental ::
+ * DStream representing the stream of records emitted after the 
`trackStateByKey` operation
+ * on a [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] in Scala.
+ * It also gives access to the stream of state snapshots, that is, the state data
+ * of all keys after a batch has updated them.
+ *
+ * @tparam K Class of the state key
+ * @tparam S Class of the state data
+ * @tparam T Class of the emitted records
+ */
+@Experimental
+sealed abstract class EmittedRecordsDStream[K, S, T: ClassTag](
--- End diff --

I'd just call it EmittedDStream.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-07 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44214012
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala ---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{HashPartitioner, Partitioner}
+
+
+/**
+ * :: Experimental ::
+ * Abstract class representing all the specifications of the DStream transformation
+ * `trackStateByKey` on a
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ * Use the [[org.apache.spark.streaming.StateSpec StateSpec.apply()]] or
+ * [[org.apache.spark.streaming.StateSpec StateSpec.create()]] to create 
instances of
+ * this class.
+ *
+ * Example in Scala:
+ * {{{
+ *val spec = StateSpec(trackingFunction).numPartitions(10)
+ *
+ *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+ * }}}
+ *
+ * Example in Java:
+ * {{{
+ *StateSpec<StateType, EmittedDataType> spec =
+ *  StateSpec.<StateType, EmittedDataType>create(trackingFunction).numPartitions(10);
+ *
+ *JavaDStream<EmittedDataType> emittedRecordDStream =
+ *  javaPairDStream.trackStateByKey(spec);
+ * }}}
+ */
+@Experimental
+sealed abstract class StateSpec[K, V, S, T] extends Serializable {
+
+  /** Set the RDD containing the initial states that will be used by 
`trackStateByKey`*/
+  def initialState(rdd: RDD[(K, S)]): this.type
+
+  /** Set the RDD containing the initial states that will be used by 
`trackStateByKey`*/
+  def initialState(javaPairRDD: JavaPairRDD[K, S]): this.type
+
+  /**
+   * Set the number of partitions by which the state RDDs generated by 
`trackStateByKey`
+   * will be partitioned. Hash partitioning will be used on the keys.
+   */
+  def numPartitions(numPartitions: Int): this.type
+
+  /**
+   * Set the partitioner by which the state RDDs generated by 
`trackStateByKey` will be
+   * partitioned.
+   */
+  def partitioner(partitioner: Partitioner): this.type
+
+  /**
+   * Set the duration after which the state of an idle key will be 
removed. A key and its state are
+   * considered idle if the key has not received any data for at least the given
+   * duration. The state tracking function will be called one final time on the
+   * idle states that are going to be removed;
+   * [[org.apache.spark.streaming.State State.isTimingOut()]] will be set to `true`
+   * in that call.
+   */
+  def timeout(idleDuration: Duration): this.type
+}
+
+
+/**
+ * :: Experimental ::
+ * Builder object for creating instances of 
[[org.apache.spark.streaming.StateSpec StateSpec]]
+ * that is used for specifying the parameters of the DStream transformation
+ * `trackStateByKey` on a
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ *
+ * Example in Scala:
+ * {{{
+ *val spec = StateSpec(trackingFunction).numPartitions(10)
+ *
+ *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+ * }}}
+ *
+ * Example in Java:
+ * {{{
+ *StateSpec<StateType, EmittedDataType> spec =
+ *  StateSpec.<StateType, EmittedDataType>create(trackingFunction).numPartitions(10);
+ *
+ *JavaDStream[EmittedDataType] emittedRecordDStream =
+ *  javaPairDStream.trackStateByKey[StateType, 

[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-07 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44213899
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/State.scala 
---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.language.implicitConversions
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Abstract class for getting and updating the tracked state in the 
`trackStateByKey` operation of
+ * a [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ *
+ * Scala example of using `State`:
+ * {{{
+ *def trackStateFunc(key: String, data: Option[Int], wrappedState: State[Int]): Option[Int] = {
+ *  // Check if state exists
+ *  if (wrappedState.exists) {
+ *    val existingState = wrappedState.get     // Get the existing state
+ *    val shouldRemove = ...                   // Decide whether to remove the state
+ *    if (shouldRemove) {
+ *      wrappedState.remove()                  // Remove the state
+ *    } else {
+ *      val newState = ...
+ *      wrappedState.update(newState)          // Set the new state
+ *    }
+ *  } else {
+ *    val initialState = ...
+ *    wrappedState.update(initialState)        // Set the initial state
+ *  }
+ *}
+ * }}}
+ *
+ * Java example:
+ * {{{
+ *  TODO(@zsxwing)
+ * }}}
+ */
+@Experimental
+sealed abstract class State[S] {
+
+  /** Whether the state already exists */
+  def exists(): Boolean
+
+  /**
+   * Get the state if it exists, otherwise it will throw 
`java.util.NoSuchElementException`.
+   * Check with `exists()` whether the state exists or not before calling 
`get()`.
+   *
+   * @throws java.util.NoSuchElementException If the state does not exist.
+   */
+  def get(): S
+
+  /**
+   * Update the state with a new value.
+   *
+   * State cannot be updated if it has already been removed (that is, 
`remove()` has already been
+   * called) or it is going to be removed due to timeout (that is, 
`isTimingOut()` is `true`).
+   *
+   * @throws java.lang.IllegalArgumentException If the state has already 
been removed, or is
+   *going to be removed
+   */
+  def update(newState: S): Unit
+
+  /**
+   * Remove the state if it exists.
+   *
+   * State cannot be updated if it has already been removed (that is, 
`remove()` has already been
+   * called) or it is going to be removed due to timeout (that is, 
`isTimingOut()` is `true`).
+   */
+  def remove(): Unit
+
+  /**
+   * Whether the state is timing out and going to be removed by the system 
after the current batch.
+   * This timeou can occur if timeout duration has been specified in the
--- End diff --

Typo: "timeou" should be "timeout".


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
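
Since the timeout wording above is what the review comment flags, it may help to see
the intended semantics spelled out. Here is a hedged sketch of a tracking function
that handles the final call on a timed-out key, using only the `State` methods shown
in this diff (`isTimingOut()`, `getOption()`, `update()`); the overall API is
assumed, not final.

    def trackingFunction(key: String, data: Option[Int], state: State[Int]): Option[(String, Int)] = {
      if (state.isTimingOut()) {
        // Final invocation before the system drops the state: per the scaladoc,
        // update() is documented to throw here, so only read and flush the value.
        state.getOption().map { lastValue => (key, lastValue) }
      } else {
        val updated = state.getOption().getOrElse(0) + data.getOrElse(0)
        state.update(updated)
        Some((key, updated))
      }
    }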



[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-07 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44213890
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/State.scala 
---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.language.implicitConversions
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Abstract class for getting and updating the tracked state in the 
`trackStateByKey` operation of
+ * a [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ *
+ * Scala example of using `State`:
+ * {{{
+ *def trackStateFunc(key: String, data: Option[Int], wrappedState: State[Int]): Option[Int] = {
+ *  // Check if state exists
+ *  if (wrappedState.exists) {
+ *    val existingState = wrappedState.get     // Get the existing state
+ *    val shouldRemove = ...                   // Decide whether to remove the state
+ *    if (shouldRemove) {
+ *      wrappedState.remove()                  // Remove the state
+ *    } else {
+ *      val newState = ...
+ *      wrappedState.update(newState)          // Set the new state
+ *    }
+ *  } else {
+ *    val initialState = ...
+ *    wrappedState.update(initialState)        // Set the initial state
+ *  }
+ *}
+ * }}}
+ *
+ * Java example:
+ * {{{
+ *  TODO(@zsxwing)
+ * }}}
+ */
+@Experimental
+sealed abstract class State[S] {
+
+  /** Whether the state already exists */
+  def exists(): Boolean
+
+  /**
+   * Get the state if it exists, otherwise it will throw 
`java.util.NoSuchElementException`.
+   * Check with `exists()` whether the state exists or not before calling 
`get()`.
+   *
+   * @throws java.util.NoSuchElementException If the state does not exist.
+   */
+  def get(): S
+
+  /**
+   * Update the state with a new value.
+   *
+   * State cannot be updated if it has already been removed (that is, 
`remove()` has already been
+   * called) or it is going to be removed due to timeout (that is, 
`isTimingOut()` is `true`).
+   *
+   * @throws java.lang.IllegalArgumentException If the state has already 
been removed, or is
+   *going to be removed
+   */
+  def update(newState: S): Unit
+
+  /**
+   * Remove the state if it exists.
+   *
+   * State cannot be updated if it has already been removed (that is, 
`remove()` has already been
+   * called) or it is going to be removed due to timeout (that is, 
`isTimingOut()` is `true`).
+   */
+  def remove(): Unit
+
+  /**
+   * Whether the state is timing out and going to be removed by the system 
after the current batch.
+   * This timeout can occur if a timeout duration has been specified in the
+   * [[org.apache.spark.streaming.StateSpec StateSpec]] and the key has not 
received any new data
+   * for that timeout duration.
+   */
+  def isTimingOut(): Boolean
+
+  /**
+   * Get the state as an [[scala.Option]]. It will be `Some(state)` if it 
exists, otherwise `None`.
+   */
+  @inline final def getOption(): Option[S] = if (exists) Some(get()) else 
None
+
+  @inline final override def toString(): String = {
+getOption.map { _.toString }.getOrElse("")
+  }
+}
+
+private[streaming]
+object State {
+  implicit def toOption[S](state: State[S]): Option[S] = state.getOption()
+}
--- End diff --

Why do we need this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is 
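
For context on the question above: the implicit `toOption` conversion would let a
tracking function treat a `State[S]` directly as an `Option[S]`, for example to
pattern match on it. A hypothetical sketch of what the implicit enables (assuming it
stays in some form; the helper below is illustrative, not code from the PR):

    // With `implicit def toOption[S](state: State[S]): Option[S]` in scope,
    // a State[Int] can be matched like an Option[Int]:
    def describe(state: State[Int]): String = (state: Option[Int]) match {
      case Some(count) => s"seen $count times"
      case None        => "never seen"
    }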

[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-07 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44213932
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/State.scala 
---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.language.implicitConversions
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Abstract class for getting and updating the tracked state in the 
`trackStateByKey` operation of
+ * a [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ *
+ * Scala example of using `State`:
+ * {{{
+ *def trackStateFunc(key: String, data: Option[Int], wrappedState: State[Int]): Option[Int] = {
+ *  // Check if state exists
+ *  if (wrappedState.exists) {
+ *    val existingState = wrappedState.get     // Get the existing state
+ *    val shouldRemove = ...                   // Decide whether to remove the state
+ *    if (shouldRemove) {
+ *      wrappedState.remove()                  // Remove the state
+ *    } else {
+ *      val newState = ...
+ *      wrappedState.update(newState)          // Set the new state
+ *    }
+ *  } else {
+ *    val initialState = ...
+ *    wrappedState.update(initialState)        // Set the initial state
+ *  }
+ *}
+ * }}}
+ *
+ * Java example:
+ * {{{
+ *  TODO(@zsxwing)
+ * }}}
+ */
+@Experimental
+sealed abstract class State[S] {
+
+  /** Whether the state already exists */
+  def exists(): Boolean
+
+  /**
+   * Get the state if it exists, otherwise it will throw 
`java.util.NoSuchElementException`.
+   * Check with `exists()` whether the state exists or not before calling 
`get()`.
+   *
+   * @throws java.util.NoSuchElementException If the state does not exist.
+   */
+  def get(): S
+
+  /**
+   * Update the state with a new value.
+   *
+   * State cannot be updated if it has already been removed (that is, 
`remove()` has already been
+   * called) or it is going to be removed due to timeout (that is, 
`isTimingOut()` is `true`).
+   *
+   * @throws java.lang.IllegalArgumentException If the state has already 
been removed, or is
+   *going to be removed
+   */
+  def update(newState: S): Unit
+
+  /**
+   * Remove the state if it exists.
+   *
+   * State cannot be updated if it has already been removed (that is, 
`remove()` has already been
+   * called) or it is going to be removed due to timeout (that is, 
`isTimingOut()` is `true`).
+   */
+  def remove(): Unit
+
+  /**
+   * Whether the state is timing out and going to be removed by the system 
after the current batch.
+   * This timeout can occur if a timeout duration has been specified in the
+   * [[org.apache.spark.streaming.StateSpec StateSpec]] and the key has not 
received any new data
+   * for that timeout duration.
+   */
+  def isTimingOut(): Boolean
+
+  /**
+   * Get the state as an [[scala.Option]]. It will be `Some(state)` if it 
exists, otherwise `None`.
+   */
+  @inline final def getOption(): Option[S] = if (exists) Some(get()) else 
None
+
+  @inline final override def toString(): String = {
+getOption.map { _.toString }.getOrElse("")
+  }
+}
+
+private[streaming]
+object State {
+  implicit def toOption[S](state: State[S]): Option[S] = state.getOption()
+}
+
+/** Internal implementation of the [[State]] interface */
+private[streaming] class StateImpl[S] extends State[S] {
+
+  private var state: S = null.asInstanceOf[S]
+  private var defined: Boolean = true
+  private var 

[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-07 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44213921
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/State.scala 
---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.language.implicitConversions
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Abstract class for getting and updating the tracked state in the 
`trackStateByKey` operation of
+ * a [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ *
+ * Scala example of using `State`:
+ * {{{
+ *def trackStateFunc(key: String, data: Option[Int], wrappedState: State[Int]): Option[Int] = {
+ *  // Check if state exists
+ *  if (wrappedState.exists) {
+ *    val existingState = wrappedState.get     // Get the existing state
+ *    val shouldRemove = ...                   // Decide whether to remove the state
+ *    if (shouldRemove) {
+ *      wrappedState.remove()                  // Remove the state
+ *    } else {
+ *      val newState = ...
+ *      wrappedState.update(newState)          // Set the new state
+ *    }
+ *  } else {
+ *    val initialState = ...
+ *    wrappedState.update(initialState)        // Set the initial state
+ *  }
+ *}
+ * }}}
+ *
+ * Java example:
+ * {{{
+ *  TODO(@zsxwing)
+ * }}}
+ */
+@Experimental
+sealed abstract class State[S] {
+
+  /** Whether the state already exists */
+  def exists(): Boolean
+
+  /**
+   * Get the state if it exists, otherwise it will throw 
`java.util.NoSuchElementException`.
+   * Check with `exists()` whether the state exists or not before calling 
`get()`.
+   *
+   * @throws java.util.NoSuchElementException If the state does not exist.
+   */
+  def get(): S
+
+  /**
+   * Update the state with a new value.
+   *
+   * State cannot be updated if it has already been removed (that is, 
`remove()` has already been
+   * called) or it is going to be removed due to timeout (that is, 
`isTimingOut()` is `true`).
+   *
+   * @throws java.lang.IllegalArgumentException If the state has already 
been removed, or is
+   *going to be removed
+   */
+  def update(newState: S): Unit
+
+  /**
+   * Remove the state if it exists.
+   *
+   * State cannot be updated if it has already been removed (that is, 
`remove()` has already been
+   * called) or it is going to be removed due to timeout (that is, 
`isTimingOut()` is `true`).
+   */
+  def remove(): Unit
+
+  /**
+   * Whether the state is timing out and going to be removed by the system 
after the current batch.
+   * This timeout can occur if a timeout duration has been specified in the
+   * [[org.apache.spark.streaming.StateSpec StateSpec]] and the key has not 
received any new data
+   * for that timeout duration.
+   */
+  def isTimingOut(): Boolean
+
+  /**
+   * Get the state as an [[scala.Option]]. It will be `Some(state)` if it 
exists, otherwise `None`.
+   */
+  @inline final def getOption(): Option[S] = if (exists) Some(get()) else 
None
+
+  @inline final override def toString(): String = {
+getOption.map { _.toString }.getOrElse("")
+  }
+}
+
+private[streaming]
+object State {
+  implicit def toOption[S](state: State[S]): Option[S] = state.getOption()
+}
+
+/** Internal implementation of the [[State]] interface */
+private[streaming] class StateImpl[S] extends State[S] {
--- End diff --

Why did we separate this into another class? I don't see any other 
subclasses of State or 

[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-07 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44214006
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{HashPartitioner, Partitioner}
+
+
+/**
+ * :: Experimental ::
+ * Abstract class representing all the specifications of the DStream transformation
+ * `trackStateByKey` on a
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ * Use the [[org.apache.spark.streaming.StateSpec StateSpec.apply()]] or
+ * [[org.apache.spark.streaming.StateSpec StateSpec.create()]] to create 
instances of
+ * this class.
+ *
+ * Example in Scala:
+ * {{{
+ *val spec = StateSpec(trackingFunction).numPartitions(10)
+ *
+ *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+ * }}}
+ *
+ * Example in Java:
+ * {{{
+ *StateSpec<StateType, EmittedDataType> spec =
+ *  StateSpec.<StateType, EmittedDataType>create(trackingFunction).numPartitions(10);
+ *
+ *JavaDStream<EmittedDataType> emittedRecordDStream =
+ *  javaPairDStream.trackStateByKey(spec);
+ * }}}
+ */
+@Experimental
+sealed abstract class StateSpec[K, V, S, T] extends Serializable {
+
+  /** Set the RDD containing the initial states that will be used by 
`trackStateByKey`*/
--- End diff --

Why do we want StateSpec() to return anything at all? Why not `new 
StateSpec().function(...)`? It seems more similar to other builder code. 
Together with the StateSpec.create() methods for common cases, it should be 
okay, and there's no need for apply() anywhere.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
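
For readers skimming the thread, the two construction styles being weighed here
would look roughly as follows; both lines are hypothetical sketches of the proposals
under discussion, not code from the PR.

    // Style currently in the PR: the companion's apply() takes the tracking function.
    val spec1 = StateSpec(trackingFunction).numPartitions(10)

    // Style suggested above: create() for the common cases, no apply() anywhere.
    val spec2 = StateSpec.create(trackingFunction).numPartitions(10)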



[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-07 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44214008
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala ---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{HashPartitioner, Partitioner}
+
+
+/**
+ * :: Experimental ::
+ * Abstract class representing all the specifications of the DStream transformation
+ * `trackStateByKey` on a
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ * Use the [[org.apache.spark.streaming.StateSpec StateSpec.apply()]] or
+ * [[org.apache.spark.streaming.StateSpec StateSpec.create()]] to create 
instances of
+ * this class.
+ *
+ * Example in Scala:
+ * {{{
+ *val spec = StateSpec(trackingFunction).numPartitions(10)
+ *
+ *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+ * }}}
+ *
+ * Example in Java:
+ * {{{
+ *StateSpec<StateType, EmittedDataType> spec =
+ *  StateSpec.<StateType, EmittedDataType>create(trackingFunction).numPartitions(10);
+ *
+ *JavaDStream<EmittedDataType> emittedRecordDStream =
+ *  javaPairDStream.trackStateByKey(spec);
+ * }}}
+ */
+@Experimental
+sealed abstract class StateSpec[K, V, S, T] extends Serializable {
+
+  /** Set the RDD containing the initial states that will be used by 
`trackStateByKey`*/
+  def initialState(rdd: RDD[(K, S)]): this.type
+
+  /** Set the RDD containing the initial states that will be used by 
`trackStateByKey`*/
+  def initialState(javaPairRDD: JavaPairRDD[K, S]): this.type
+
+  /**
+   * Set the number of partitions by which the state RDDs generated by 
`trackStateByKey`
+   * will be partitioned. Hash partitioning will be used on the keys.
+   */
+  def numPartitions(numPartitions: Int): this.type
+
+  /**
+   * Set the partitioner by which the state RDDs generated by 
`trackStateByKey` will be
+   * partitioned.
+   */
+  def partitioner(partitioner: Partitioner): this.type
+
+  /**
+   * Set the duration after which the state of an idle key will be 
removed. A key and its state are
+   * considered idle if the key has not received any data for at least the given
+   * duration. The state tracking function will be called one final time on the
+   * idle states that are going to be removed;
+   * [[org.apache.spark.streaming.State State.isTimingOut()]] will be set to `true`
+   * in that call.
+   */
+  def timeout(idleDuration: Duration): this.type
+}
+
+
+/**
+ * :: Experimental ::
+ * Builder object for creating instances of 
[[org.apache.spark.streaming.StateSpec StateSpec]]
+ * that is used for specifying the parameters of the DStream transformation
+ * `trackStateByKey` on a
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ *
+ * Example in Scala:
+ * {{{
+ *val spec = StateSpec(trackingFunction).numPartitions(10)
+ *
+ *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+ * }}}
+ *
+ * Example in Java:
+ * {{{
+ *StateSpec<StateType, EmittedDataType> spec =
+ *  StateSpec.<StateType, EmittedDataType>create(trackingFunction).numPartitions(10);
+ *
+ *JavaDStream[EmittedDataType] emittedRecordDStream =
+ *  javaPairDStream.trackStateByKey[StateType, 

[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-07 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44214064
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/rdd/TrackStateRDD.scala ---
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.rdd
+
+import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.apache.spark.rdd.{MapPartitionsRDD, RDD}
+import org.apache.spark.streaming.{StateImpl, State}
+import org.apache.spark.streaming.util.{EmptyStateMap, StateMap}
+import org.apache.spark.util.Utils
+import org.apache.spark._
+
+private[streaming] case class TrackStateRDDRecord[K, S, T](
+var stateMap: StateMap[K, S], var emittedRecords: Seq[T]) {
+  /*
+  private def writeObject(outputStream: ObjectOutputStream): Unit = {
+outputStream.writeObject(stateMap)
+outputStream.writeInt(emittedRecords.size)
+val iterator = emittedRecords.iterator
+while(iterator.hasNext) {
+  outputStream.writeObject(iterator.next)
+}
+  }
+
+  private def readObject(inputStream: ObjectInputStream): Unit = {
+stateMap = inputStream.readObject().asInstanceOf[StateMap[K, S]]
+val numEmittedRecords = inputStream.readInt()
+val array = new Array[T](numEmittedRecords)
+var i = 0
+while (i < numEmittedRecords) {
+  array(i) = inputStream.readObject().asInstanceOf[T]
+  i += 1
+}
+emittedRecords = array.toSeq
+  }*/
--- End diff --

Do we need these?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
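
The question above is worth dwelling on: a case class whose fields are themselves
serializable already round-trips through default Java serialization, so a custom
writeObject/readObject pair only earns its keep if a custom wire format is needed. A
minimal REPL sketch of the default behavior, using a stand-in record type (the name
and shape below are illustrative, not the PR's class):

    import java.io._

    case class Record[K, S, T](stateMap: Map[K, S], emittedRecords: Seq[T])

    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(Record(Map("a" -> 1), Seq("x", "y")))  // fields serialized automatically
    oos.close()

    val restored = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
      .readObject().asInstanceOf[Record[String, Int, String]]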



[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-07 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44214022
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/EmittedRecordsDStream.scala
 ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import scala.reflect.ClassTag
+
+import org.apache.spark._
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.{EmptyRDD, RDD}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.rdd.{TrackStateRDD, TrackStateRDDRecord}
+
+/**
+ * :: Experimental ::
+ * DStream representing the stream of records emitted after the 
`trackStateByKey` operation
+ * on a [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] in Scala.
+ * It also gives access to the stream of state snapshots, that is, the state data
+ * of all keys after a batch has updated them.
+ *
+ * @tparam K Class of the state key
+ * @tparam S Class of the state data
+ * @tparam T Class of the emitted records
+ */
+@Experimental
+sealed abstract class EmittedRecordsDStream[K, S, T: ClassTag](
+ssc: StreamingContext) extends DStream[T](ssc) {
+
+  /** Return a pair DStream where each RDD is the snapshot of the state of 
all the keys. */
+  def stateSnapshots(): DStream[(K, S)]
+}
+
+/** Internal implementation of the [[EmittedRecordsDStream]] */
+private[streaming] class EmittedRecordsDStreamImpl[
+K: ClassTag, V: ClassTag, S: ClassTag, T: ClassTag](
+trackStateDStream: TrackStateDStream[K, V, S, T])
+  extends EmittedRecordsDStream[K, S, T](trackStateDStream.context) {
+
+  override def slideDuration: Duration = trackStateDStream.slideDuration
+
+  override def dependencies: List[DStream[_]] = List(trackStateDStream)
+
+  override def compute(validTime: Time): Option[RDD[T]] = {
+trackStateDStream.getOrCompute(validTime).map { _.flatMap[T] { 
_.emittedRecords } }
+  }
+
+  def stateSnapshots(): DStream[(K, S)] = {
+trackStateDStream.flatMap[(K, S)] {
+  _.stateMap.getAll().map { case (k, s, _) => (k, s) }.toTraversable }
+  }
+}
+
+/**
+ * A DStream that allows per-key state to be maintained, and arbitrary records to
+ * be generated based on updates to the state.
+ *
+ * @param parent Parent (key, value) stream that is the source
+ * @param spec Specifications of the trackStateByKey operation
+ * @tparam K   Key type
+ * @tparam V   Value type
+ * @tparam S   Type of the state maintained
+ * @tparam T   Type of the emitted records
+ */
+private[streaming] class TrackStateDStream[K: ClassTag, V: ClassTag, S: 
ClassTag, T: ClassTag](
+parent: DStream[(K, V)], spec: StateSpecImpl[K, V, S, T])
+  extends DStream[TrackStateRDDRecord[K, S, T]](parent.context) {
+
+  persist(StorageLevel.MEMORY_ONLY)
+
+  private val partitioner = spec.getPartitioner().getOrElse(
+new HashPartitioner(ssc.sc.defaultParallelism))
+
+  private val trackingFunction = spec.getFunction()
+
+  override def slideDuration: Duration = parent.slideDuration
+
+  override def dependencies: List[DStream[_]] = List(parent)
+
+  override val mustCheckpoint = true
+
+  /** Method that generates an RDD for the given time */
+  override def compute(validTime: Time): Option[RDD[TrackStateRDDRecord[K, 
S, T]]] = {
+val prevStateRDD = getOrCompute(validTime - slideDuration).getOrElse {
+  TrackStateRDD.createFromPairRDD[K, V, S, T](
+spec.getInitialStateRDD().getOrElse(new EmptyRDD[(K, 
S)](ssc.sparkContext)),
+partitioner,
+validTime.milliseconds
+  )
+}
+val newDataRDD = parent.getOrCompute(validTime).get
+val partitionedDataRDD = newDataRDD.partitionBy(partitioner)
+  

[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-07 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44214016
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/EmittedRecordsDStream.scala
 ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import scala.reflect.ClassTag
+
+import org.apache.spark._
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.{EmptyRDD, RDD}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.rdd.{TrackStateRDD, TrackStateRDDRecord}
+
+/**
+ * :: Experimental ::
+ * DStream representing the stream of records emitted after the 
`trackStateByKey` operation
+ * on a [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] in Scala.
+ * It also gives access to the stream of state snapshots, that is, the state data
+ * of all keys after a batch has updated them.
+ *
+ * @tparam K Class of the state key
+ * @tparam S Class of the state data
+ * @tparam T Class of the emitted records
+ */
+@Experimental
+sealed abstract class EmittedRecordsDStream[K, S, T: ClassTag](
+ssc: StreamingContext) extends DStream[T](ssc) {
+
+  /** Return a pair DStream where each RDD is the snapshot of the state of 
all the keys. */
+  def stateSnapshots(): DStream[(K, S)]
+}
+
+/** Internal implementation of the [[EmittedRecordsDStream]] */
+private[streaming] class EmittedRecordsDStreamImpl[
+K: ClassTag, V: ClassTag, S: ClassTag, T: ClassTag](
+trackStateDStream: TrackStateDStream[K, V, S, T])
+  extends EmittedRecordsDStream[K, S, T](trackStateDStream.context) {
+
+  override def slideDuration: Duration = trackStateDStream.slideDuration
+
+  override def dependencies: List[DStream[_]] = List(trackStateDStream)
+
+  override def compute(validTime: Time): Option[RDD[T]] = {
+trackStateDStream.getOrCompute(validTime).map { _.flatMap[T] { 
_.emittedRecords } }
+  }
+
+  def stateSnapshots(): DStream[(K, S)] = {
+trackStateDStream.flatMap[(K, S)] {
+  _.stateMap.getAll().map { case (k, s, _) => (k, s) }.toTraversable }
+  }
+}
+
+/**
+ * A DStream that allows per-key state to be maintained, and arbitrary records to
+ * be generated based on updates to the state.
+ *
+ * @param parent Parent (key, value) stream that is the source
+ * @param spec Specifications of the trackStateByKey operation
+ * @tparam K   Key type
+ * @tparam V   Value type
+ * @tparam S   Type of the state maintained
+ * @tparam T   Type of the emitted records
+ */
+private[streaming] class TrackStateDStream[K: ClassTag, V: ClassTag, S: 
ClassTag, T: ClassTag](
+parent: DStream[(K, V)], spec: StateSpecImpl[K, V, S, T])
+  extends DStream[TrackStateRDDRecord[K, S, T]](parent.context) {
+
+  persist(StorageLevel.MEMORY_ONLY)
+
+  private val partitioner = spec.getPartitioner().getOrElse(
+new HashPartitioner(ssc.sc.defaultParallelism))
+
+  private val trackingFunction = spec.getFunction()
+
+  override def slideDuration: Duration = parent.slideDuration
+
+  override def dependencies: List[DStream[_]] = List(parent)
+
+  override val mustCheckpoint = true
+
+  /** Method that generates an RDD for the given time */
+  override def compute(validTime: Time): Option[RDD[TrackStateRDDRecord[K, 
S, T]]] = {
+val prevStateRDD = getOrCompute(validTime - slideDuration).getOrElse {
+  TrackStateRDD.createFromPairRDD[K, V, S, T](
+spec.getInitialStateRDD().getOrElse(new EmptyRDD[(K, 
S)](ssc.sparkContext)),
+partitioner,
+validTime.milliseconds
+  )
+}
+val newDataRDD = parent.getOrCompute(validTime).get
+val partitionedDataRDD = newDataRDD.partitionBy(partitioner)
+  

[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-06 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44197538
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{HashPartitioner, Partitioner}
+
+
+/**
+ * :: Experimental ::
+ * Abstract class representing all the specifications of the DStream transformation
+ * `trackStateByKey` on a
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ * Use the [[org.apache.spark.streaming.StateSpec StateSpec.apply()]] or
+ * [[org.apache.spark.streaming.StateSpec StateSpec.create()]] to create 
instances of
+ * this class.
+ *
+ * Example in Scala:
+ * {{{
+ *val spec = StateSpec(trackingFunction).numPartitions(10)
+ *
+ *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+ * }}}
+ *
+ * Example in Java:
+ * {{{
+ *StateSpec<StateType, EmittedDataType> spec =
+ *  StateSpec.<StateType, EmittedDataType>create(trackingFunction).numPartitions(10);
+ *
+ *JavaDStream<EmittedDataType> emittedRecordDStream =
+ *  javaPairDStream.trackStateByKey(spec);
+ * }}}
+ */
+@Experimental
+sealed abstract class StateSpec[K, V, S, T] extends Serializable {
--- End diff --

note to self: Update this to name KeyType, ValueType, etc.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
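
Concretely, the note to self above presumably means renaming the single-letter type
parameters to descriptive ones, along these lines (a sketch of the stated intent,
not a committed signature):

    sealed abstract class StateSpec[KeyType, ValueType, StateType, EmittedType] extends Serializable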



[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-06 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44205619
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala ---
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util
+
+import java.io.{ObjectInputStream, ObjectOutputStream}
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.util.OpenHashMapBasedStateMap._
+import org.apache.spark.util.collection.OpenHashMap
+
+/** Internal interface for defining the map that keeps track of sessions. 
*/
+private[streaming] abstract class StateMap[K: ClassTag, S: ClassTag] 
extends Serializable {
+
+  /** Get the state for a key if it exists */
+  def get(key: K): Option[S]
+
+  /** Get all the keys and states whose updated time is older than the 
given threshold time */
+  def getByTime(threshUpdatedTime: Long): Iterator[(K, S, Long)]
+
+  /** Get all the keys and states in this map. */
+  def getAll(): Iterator[(K, S, Long)]
+
+  /** Add or update state */
+  def put(key: K, state: S, updatedTime: Long): Unit
+
+  /** Remove a key */
+  def remove(key: K): Unit
+
+  /**
+   * Shallow copy `this` map to create a new state map.
+   * Updates to the new map should not mutate `this` map.
+   */
+  def copy(): StateMap[K, S]
+
+  def toDebugString(): String = toString()
+}
+
+/** Companion object for [[StateMap]], with utility methods */
+private[streaming] object StateMap {
+  def empty[K: ClassTag, S: ClassTag]: StateMap[K, S] = new 
EmptyStateMap[K, S]
+
+  def create[K: ClassTag, S: ClassTag](conf: SparkConf): StateMap[K, S] = {
+val deltaChainThreshold = 
conf.getInt("spark.streaming.sessionByKey.deltaChainThreshold",
+  DELTA_CHAIN_LENGTH_THRESHOLD)
+new OpenHashMapBasedStateMap[K, S](64, deltaChainThreshold)
+  }
+}
+
+/** Specific implementation of the StateMap interface representing an empty map */
+private[streaming] class EmptyStateMap[K: ClassTag, S: ClassTag] extends 
StateMap[K, S] {
+  override def put(key: K, session: S, updateTime: Long): Unit = {
+throw new NotImplementedError("put() should not be called on an 
EmptyStateMap")
+  }
+  override def get(key: K): Option[S] = None
+  override def getByTime(threshUpdatedTime: Long): Iterator[(K, S, Long)] 
= Iterator.empty
+  override def getAll(): Iterator[(K, S, Long)] = Iterator.empty
+  override def copy(): StateMap[K, S] = new EmptyStateMap[K, S]
+  override def remove(key: K): Unit = { }
+  override def toDebugString(): String = ""
+}
+
+
+
+/** Implementation of StateMap based on Spark's OpenHashMap */
+private[streaming] class OpenHashMapBasedStateMap[K: ClassTag, S: 
ClassTag](
+@transient @volatile var parentStateMap: StateMap[K, S],
+initialCapacity: Int = 64,
+deltaChainThreshold: Int = DELTA_CHAIN_LENGTH_THRESHOLD
+  ) extends StateMap[K, S] { self =>
+
+  def this(initialCapacity: Int, deltaChainThreshold: Int) = this(
+new EmptyStateMap[K, S],
+initialCapacity = initialCapacity,
+deltaChainThreshold = deltaChainThreshold)
+
+  def this(deltaChainThreshold: Int) = this(
+initialCapacity = 64, deltaChainThreshold = deltaChainThreshold)
+
+  def this() = this(DELTA_CHAIN_LENGTH_THRESHOLD)
+
+  @transient @volatile private var deltaMap =
+new OpenHashMap[K, StateInfo[S]](initialCapacity)
+
+  /** Get the session data if it exists */
+  override def get(key: K): Option[S] = {
+val stateInfo = deltaMap(key)
+if (stateInfo != null) {
+  if (!stateInfo.deleted) {
+Some(stateInfo.data)
+  } else {
+None
+  }
+} else {
+  parentStateMap.get(key)
+}
+  }
+
+  /** Get all the keys and states 
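
The structure quoted above is essentially a delta chain: each generation of the map
records only its own puts and removes in deltaMap and defers every other lookup to
parentStateMap, which is why get() falls through on a miss. A toy standalone sketch
of the same lookup discipline (class and method names below are illustrative, not
the PR's API):

    // Each layer holds only local updates; a None value marks a deletion (tombstone).
    class DeltaChainMap[K, V](parent: Option[DeltaChainMap[K, V]] = None) {
      private val deltas = scala.collection.mutable.Map.empty[K, Option[V]]

      def put(key: K, value: V): Unit = deltas(key) = Some(value)
      def remove(key: K): Unit = deltas(key) = None   // shadow the parent's entry

      def get(key: K): Option[V] = deltas.get(key) match {
        case Some(valueOrTombstone) => valueOrTombstone     // local hit (or deletion)
        case None => parent.flatMap(_.get(key))             // fall through to parent
      }

      // A "copy" is O(1): the new layer starts empty and chains to this one.
      def copy(): DeltaChainMap[K, V] = new DeltaChainMap[K, V](Some(this))
    }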

[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-06 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44205625
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala ---
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util
+
+import java.io.{ObjectInputStream, ObjectOutputStream}
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.util.OpenHashMapBasedStateMap._
+import org.apache.spark.util.collection.OpenHashMap
+
+/** Internal interface for defining the map that keeps track of sessions. 
*/
+private[streaming] abstract class StateMap[K: ClassTag, S: ClassTag] 
extends Serializable {
+
+  /** Get the state for a key if it exists */
+  def get(key: K): Option[S]
+
+  /** Get all the keys and states whose updated time is older than the 
given threshold time */
+  def getByTime(threshUpdatedTime: Long): Iterator[(K, S, Long)]
+
+  /** Get all the keys and states in this map. */
+  def getAll(): Iterator[(K, S, Long)]
+
+  /** Add or update state */
+  def put(key: K, state: S, updatedTime: Long): Unit
+
+  /** Remove a key */
+  def remove(key: K): Unit
+
+  /**
+   * Shallow copy `this` map to create a new state map.
+   * Updates to the new map should not mutate `this` map.
+   */
+  def copy(): StateMap[K, S]
+
+  def toDebugString(): String = toString()
+}
+
+/** Companion object for [[StateMap]], with utility methods */
+private[streaming] object StateMap {
+  def empty[K: ClassTag, S: ClassTag]: StateMap[K, S] = new 
EmptyStateMap[K, S]
+
+  def create[K: ClassTag, S: ClassTag](conf: SparkConf): StateMap[K, S] = {
+val deltaChainThreshold = 
conf.getInt("spark.streaming.sessionByKey.deltaChainThreshold",
+  DELTA_CHAIN_LENGTH_THRESHOLD)
+new OpenHashMapBasedStateMap[K, S](64, deltaChainThreshold)
+  }
+}
+
+/** Implementation of the [[StateMap]] interface representing an 
empty map */
+private[streaming] class EmptyStateMap[K: ClassTag, S: ClassTag] extends 
StateMap[K, S] {
+  override def put(key: K, session: S, updateTime: Long): Unit = {
+throw new NotImplementedError("put() should not be called on an 
EmptyStateMap")
+  }
+  override def get(key: K): Option[S] = None
+  override def getByTime(threshUpdatedTime: Long): Iterator[(K, S, Long)] 
= Iterator.empty
+  override def getAll(): Iterator[(K, S, Long)] = Iterator.empty
+  override def copy(): StateMap[K, S] = new EmptyStateMap[K, S]
+  override def remove(key: K): Unit = { }
+  override def toDebugString(): String = ""
+}
+
+
+
+/** Implementation of StateMap based on Spark's OpenHashMap */
+private[streaming] class OpenHashMapBasedStateMap[K: ClassTag, S: 
ClassTag](
+@transient @volatile var parentStateMap: StateMap[K, S],
+initialCapacity: Int = 64,
+deltaChainThreshold: Int = DELTA_CHAIN_LENGTH_THRESHOLD
+  ) extends StateMap[K, S] { self =>
+
+  def this(initialCapacity: Int, deltaChainThreshold: Int) = this(
+new EmptyStateMap[K, S],
+initialCapacity = initialCapacity,
+deltaChainThreshold = deltaChainThreshold)
+
+  def this(deltaChainThreshold: Int) = this(
+initialCapacity = 64, deltaChainThreshold = deltaChainThreshold)
+
+  def this() = this(DELTA_CHAIN_LENGTH_THRESHOLD)
+
+  @transient @volatile private var deltaMap =
+new OpenHashMap[K, StateInfo[S]](initialCapacity)
+
+  /** Get the session data if it exists */
+  override def get(key: K): Option[S] = {
+val stateInfo = deltaMap(key)
+if (stateInfo != null) {
+  if (!stateInfo.deleted) {
+Some(stateInfo.data)
+  } else {
+None
+  }
+} else {
+  parentStateMap.get(key)
+}
+  }
+
+  /** Get all the keys and states 

[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-06 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44198001
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{HashPartitioner, Partitioner}
+
+
+/**
+ * :: Experimental ::
+ * Abstract class representing all the specifications of the DStream 
transformation
+ * `trackStateByKey` operation of a
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ * Use the [[org.apache.spark.streaming.StateSpec StateSpec.apply()]] or
+ * [[org.apache.spark.streaming.StateSpec StateSpec.create()]] to create 
instances of
+ * this class.
+ *
+ * Example in Scala:
+ * {{{
+ *val spec = StateSpec(trackingFunction).numPartitions(10)
+ *
+ *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+ * }}}
+ *
+ * Example in Java:
+ * {{{
+ *StateSpec<StateType, EmittedDataType> spec =
+ *  StateSpec.<StateType, 
EmittedDataType>create(trackingFunction).numPartitions(10);
+ *
+ *JavaDStream<EmittedDataType> emittedRecordDStream =
+ *  javaPairDStream.<StateType, EmittedDataType>trackStateByKey(spec);
+ * }}}
+ */
+@Experimental
+sealed abstract class StateSpec[K, V, S, T] extends Serializable {
+
+  /** Set the RDD containing the initial states that will be used by 
`trackStateByKey`*/
+  def initialState(rdd: RDD[(K, S)]): this.type
+
+  /** Set the RDD containing the initial states that will be used by 
`trackStateByKey`*/
+  def initialState(javaPairRDD: JavaPairRDD[K, S]): this.type
+
+  /**
+   * Set the number of partitions by which the state RDDs generated by 
`trackStateByKey`
+   * will be partitioned. Hash partitioning will be used on the keys.
+   */
+  def numPartitions(numPartitions: Int): this.type
+
+  /**
+   * Set the partitioner by which the state RDDs generated by 
`trackStateByKey` will
+   * be partitioned.
+   */
+  def partitioner(partitioner: Partitioner): this.type
+
+  /**
+   * Set the duration after which the state of an idle key will be 
removed. A key and its state are
+   * considered idle if the key has not received any data for at least the 
given duration. The state
+   * tracking function will be called one final time on the idle states 
that are going to be
+   * removed; [[org.apache.spark.streaming.State State.isTimingOut()]] will be
+   * set to `true` in that call.
+   */
+  def timeout(idleDuration: Duration): this.type
+}
+
+
+/**
+ * :: Experimental ::
+ * Builder object for creating instances of 
[[org.apache.spark.streaming.StateSpec StateSpec]]
+ * that is used for specifying the parameters of the DStream transformation
+ * `trackStateByKey` operation of a
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ *
+ * Example in Scala:
+ * {{{
+ *val spec = StateSpec(trackingFunction).numPartitions(10)
+ *
+ *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+ * }}}
+ *
+ * Example in Java:
+ * {{{
+ *StateSpec<StateType, EmittedDataType> spec =
+ *  StateSpec.<StateType, 
EmittedDataType>create(trackingFunction).numPartitions(10);
+ *
+ *JavaDStream<EmittedDataType> emittedRecordDStream =
+ *  javaPairDStream.<StateType, 

[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-06 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44200358
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala ---
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util
+
+import java.io.{ObjectInputStream, ObjectOutputStream}
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.util.OpenHashMapBasedStateMap._
+import org.apache.spark.util.collection.OpenHashMap
+
+/** Internal interface for defining the map that keeps track of sessions. 
*/
+private[streaming] abstract class StateMap[K: ClassTag, S: ClassTag] 
extends Serializable {
+
+  /** Get the state for a key if it exists */
+  def get(key: K): Option[S]
+
+  /** Get all the keys and states whose updated time is older than the 
given threshold time */
+  def getByTime(threshUpdatedTime: Long): Iterator[(K, S, Long)]
+
+  /** Get all the keys and states in this map. */
+  def getAll(): Iterator[(K, S, Long)]
+
+  /** Add or update state */
+  def put(key: K, state: S, updatedTime: Long): Unit
+
+  /** Remove a key */
+  def remove(key: K): Unit
+
+  /**
+   * Shallow copy `this` map to create a new state map.
+   * Updates to the new map should not mutate `this` map.
+   */
+  def copy(): StateMap[K, S]
+
+  def toDebugString(): String = toString()
+}
+
+/** Companion object for [[StateMap]], with utility methods */
+private[streaming] object StateMap {
+  def empty[K: ClassTag, S: ClassTag]: StateMap[K, S] = new 
EmptyStateMap[K, S]
+
+  def create[K: ClassTag, S: ClassTag](conf: SparkConf): StateMap[K, S] = {
+val deltaChainThreshold = 
conf.getInt("spark.streaming.sessionByKey.deltaChainThreshold",
+  DELTA_CHAIN_LENGTH_THRESHOLD)
+new OpenHashMapBasedStateMap[K, S](64, deltaChainThreshold)
+  }
+}
+
+/** Implementation of the [[StateMap]] interface representing an 
empty map */
+private[streaming] class EmptyStateMap[K: ClassTag, S: ClassTag] extends 
StateMap[K, S] {
+  override def put(key: K, session: S, updateTime: Long): Unit = {
+throw new NotImplementedError("put() should not be called on an 
EmptyStateMap")
+  }
+  override def get(key: K): Option[S] = None
+  override def getByTime(threshUpdatedTime: Long): Iterator[(K, S, Long)] 
= Iterator.empty
+  override def getAll(): Iterator[(K, S, Long)] = Iterator.empty
+  override def copy(): StateMap[K, S] = new EmptyStateMap[K, S]
+  override def remove(key: K): Unit = { }
+  override def toDebugString(): String = ""
+}
+
+
+
+/** Implementation of StateMap based on Spark's OpenHashMap */
+private[streaming] class OpenHashMapBasedStateMap[K: ClassTag, S: 
ClassTag](
+@transient @volatile var parentStateMap: StateMap[K, S],
+initialCapacity: Int = 64,
+deltaChainThreshold: Int = DELTA_CHAIN_LENGTH_THRESHOLD
+  ) extends StateMap[K, S] { self =>
+
+  def this(initialCapacity: Int, deltaChainThreshold: Int) = this(
+new EmptyStateMap[K, S],
+initialCapacity = initialCapacity,
+deltaChainThreshold = deltaChainThreshold)
+
+  def this(deltaChainThreshold: Int) = this(
+initialCapacity = 64, deltaChainThreshold = deltaChainThreshold)
+
+  def this() = this(DELTA_CHAIN_LENGTH_THRESHOLD)
+
+  @transient @volatile private var deltaMap =
+new OpenHashMap[K, StateInfo[S]](initialCapacity)
+
+  /** Get the session data if it exists */
+  override def get(key: K): Option[S] = {
+val stateInfo = deltaMap(key)
+if (stateInfo != null) {
+  if (!stateInfo.deleted) {
+Some(stateInfo.data)
+  } else {
+None
+  }
+} else {
+  parentStateMap.get(key)
+}
+  }
+
+  /** Get all the keys and states 

[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-06 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44197569
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{HashPartitioner, Partitioner}
+
+
+/**
+ * :: Experimental ::
+ * Abstract class representing all the specifications of the DStream 
transformation
+ * `trackStateByKey` operation of a
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ * Use the [[org.apache.spark.streaming.StateSpec StateSpec.apply()]] or
+ * [[org.apache.spark.streaming.StateSpec StateSpec.create()]] to create 
instances of
+ * this class.
+ *
+ * Example in Scala:
+ * {{{
+ *val spec = StateSpec(trackingFunction).numPartitions(10)
+ *
+ *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+ * }}}
+ *
+ * Example in Java:
+ * {{{
+ *StateSpec<StateType, EmittedDataType> spec =
+ *  StateSpec.<StateType, 
EmittedDataType>create(trackingFunction).numPartitions(10);
+ *
+ *JavaDStream<EmittedDataType> emittedRecordDStream =
+ *  javaPairDStream.<StateType, EmittedDataType>trackStateByKey(spec);
+ * }}}
+ */
+@Experimental
+sealed abstract class StateSpec[K, V, S, T] extends Serializable {
+
+  /** Set the RDD containing the initial states that will be used by 
`trackStateByKey`*/
+  def initialState(rdd: RDD[(K, S)]): this.type
+
+  /** Set the RDD containing the initial states that will be used by 
`trackStateByKey`*/
+  def initialState(javaPairRDD: JavaPairRDD[K, S]): this.type
+
+  /**
+   * Set the number of partitions by which the state RDDs generated by 
`trackStateByKey`
+   * will be partitioned. Hash partitioning will be used on the keys.
+   */
+  def numPartitions(numPartitions: Int): this.type
+
+  /**
+   * Set the partitioner by which the state RDDs generated by 
`trackStateByKey` will
+   * be partitioned.
+   */
+  def partitioner(partitioner: Partitioner): this.type
+
+  /**
+   * Set the duration after which the state of an idle key will be 
removed. A key and its state are
+   * considered idle if the key has not received any data for at least the 
given duration. The state
+   * tracking function will be called one final time on the idle states 
that are going to be
+   * removed; [[org.apache.spark.streaming.State State.isTimingOut()]] will be
+   * set to `true` in that call.
+   */
+  def timeout(idleDuration: Duration): this.type
+}
+
+
+/**
+ * :: Experimental ::
+ * Builder object for creating instances of 
[[org.apache.spark.streaming.StateSpec StateSpec]]
+ * that is used for specifying the parameters of the DStream transformation
+ * `trackStateByKey` operation of a
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ *
+ * Example in Scala:
+ * {{{
+ *val spec = StateSpec(trackingFunction).numPartitions(10)
+ *
+ *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+ * }}}
+ *
+ * Example in Java:
+ * {{{
+ *StateSpec<StateType, EmittedDataType> spec =
+ *  StateSpec.<StateType, 
EmittedDataType>create(trackingFunction).numPartitions(10);
+ *
+ *JavaDStream<EmittedDataType> emittedRecordDStream =
+ *  javaPairDStream.<StateType, 

[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-06 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44197742
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{HashPartitioner, Partitioner}
+
+
+/**
+ * :: Experimental ::
+ * Abstract class representing all the specifications of the DStream 
transformation
+ * `trackStateByKey` operation of a
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ * Use the [[org.apache.spark.streaming.StateSpec StateSpec.apply()]] or
+ * [[org.apache.spark.streaming.StateSpec StateSpec.create()]] to create 
instances of
+ * this class.
+ *
+ * Example in Scala:
+ * {{{
+ *val spec = StateSpec(trackingFunction).numPartitions(10)
+ *
+ *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+ * }}}
+ *
+ * Example in Java:
+ * {{{
+ *StateSpec<StateType, EmittedDataType> spec =
+ *  StateSpec.<StateType, 
EmittedDataType>create(trackingFunction).numPartitions(10);
+ *
+ *JavaDStream<EmittedDataType> emittedRecordDStream =
+ *  javaPairDStream.<StateType, EmittedDataType>trackStateByKey(spec);
+ * }}}
+ */
+@Experimental
+sealed abstract class StateSpec[K, V, S, T] extends Serializable {
+
+  /** Set the RDD containing the initial states that will be used by 
`trackStateByKey`*/
+  def initialState(rdd: RDD[(K, S)]): this.type
+
+  /** Set the RDD containing the initial states that will be used by 
`trackStateByKey`*/
+  def initialState(javaPairRDD: JavaPairRDD[K, S]): this.type
+
+  /**
+   * Set the number of partitions by which the state RDDs generated by 
`trackStateByKey`
+   * will be partitioned. Hash partitioning will be used on the keys.
+   */
+  def numPartitions(numPartitions: Int): this.type
+
+  /**
+   * Set the partitioner by which the state RDDs generated by 
`trackStateByKey` will
+   * be partitioned.
+   */
+  def partitioner(partitioner: Partitioner): this.type
+
+  /**
+   * Set the duration after which the state of an idle key will be 
removed. A key and its state are
+   * considered idle if the key has not received any data for at least the 
given duration. The state
+   * tracking function will be called one final time on the idle states 
that are going to be
+   * removed; [[org.apache.spark.streaming.State State.isTimingOut()]] will be
+   * set to `true` in that call.
+   */
+  def timeout(idleDuration: Duration): this.type
+}
+
+
+/**
+ * :: Experimental ::
+ * Builder object for creating instances of 
[[org.apache.spark.streaming.StateSpec StateSpec]]
+ * that is used for specifying the parameters of the DStream transformation
+ * `trackStateByKey` operation of a
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ *
+ * Example in Scala:
+ * {{{
+ *val spec = StateSpec(trackingFunction).numPartitions(10)
+ *
+ *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+ * }}}
+ *
+ * Example in Java:
+ * {{{
+ *StateSpec<StateType, EmittedDataType> spec =
+ *  StateSpec.<StateType, 
EmittedDataType>create(trackingFunction).numPartitions(10);
+ *
+ *JavaDStream<EmittedDataType> emittedRecordDStream =
+ *  javaPairDStream.<StateType, 

[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-06 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44205053
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{HashPartitioner, Partitioner}
+
+
+/**
+ * :: Experimental ::
+ * Abstract class representing all the specifications of the DStream 
transformation
+ * `trackStateByKey` operation of a
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ * Use the [[org.apache.spark.streaming.StateSpec StateSpec.apply()]] or
+ * [[org.apache.spark.streaming.StateSpec StateSpec.create()]] to create 
instances of
+ * this class.
+ *
+ * Example in Scala:
+ * {{{
+ *val spec = StateSpec(trackingFunction).numPartitions(10)
+ *
+ *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+ * }}}
+ *
+ * Example in Java:
+ * {{{
+ *StateSpec<StateType, EmittedDataType> spec =
+ *  StateSpec.<StateType, 
EmittedDataType>create(trackingFunction).numPartitions(10);
+ *
+ *JavaDStream<EmittedDataType> emittedRecordDStream =
+ *  javaPairDStream.<StateType, EmittedDataType>trackStateByKey(spec);
+ * }}}
+ */
+@Experimental
+sealed abstract class StateSpec[K, V, S, T] extends Serializable {
+
+  /** Set the RDD containing the initial states that will be used by 
`trackStateByKey`*/
--- End diff --

@rxin Can you comment on the builder pattern used by StateSpec? @pwendell 
Correct me if I am wrong, but one point of confusion for him was that there 
was no reference to the tracking function in the API of the StateSpec class 
(e.g. no setter method for the function). This is because the function is 
specified through the StateSpec object's apply/create method. I did it this 
way to force the user to specify the function in order to create a spec, as 
that is the only mandatory parameter of `trackStateByKey`.

Two options to avoid the confusion are:
1. Add a setter for the function instead of taking it through 
StateSpec.apply(). So instead of `StateSpec(func)` the user would write 
`StateSpec.function(func)`. I don't like the latter; it is more verbose.
2. Leave it as is and provide better docs (Patrick didn't have the docs to 
look at; now they exist).
@rxin, what's your call?
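A minimal, self-contained sketch of the two construction styles under
discussion (MockStateSpec and TrackFunc are hypothetical stand-ins, not the
PR's API):

```
object BuilderStyles {
  type TrackFunc = (String, Option[Int]) => Option[Int]

  class MockStateSpec private (val func: TrackFunc) {
    private var partitions = 1
    def numPartitions(n: Int): this.type = { partitions = n; this }
  }

  object MockStateSpec {
    // Option 2 (current API): apply() takes the mandatory tracking function.
    def apply(func: TrackFunc): MockStateSpec = new MockStateSpec(func)
    // Option 1: an explicitly named factory method.
    def function(func: TrackFunc): MockStateSpec = new MockStateSpec(func)
  }

  val f: TrackFunc = (key, value) => value
  val viaApply    = MockStateSpec(f).numPartitions(10)          // StateSpec(func)
  val viaFunction = MockStateSpec.function(f).numPartitions(10) // StateSpec.function(func)
}
```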


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-06 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44206807
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{HashPartitioner, Partitioner}
+
+
+/**
+ * :: Experimental ::
+ * Abstract class representing all the specifications of the DStream 
transformation
+ * `trackStateByKey` operation of a
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ * Use the [[org.apache.spark.streaming.StateSpec StateSpec.apply()]] or
+ * [[org.apache.spark.streaming.StateSpec StateSpec.create()]] to create 
instances of
+ * this class.
+ *
+ * Example in Scala:
+ * {{{
+ *val spec = StateSpec(trackingFunction).numPartitions(10)
+ *
+ *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+ * }}}
+ *
+ * Example in Java:
+ * {{{
+ *StateSpec<StateType, EmittedDataType> spec =
+ *  StateSpec.<StateType, 
EmittedDataType>create(trackingFunction).numPartitions(10);
+ *
+ *JavaDStream<EmittedDataType> emittedRecordDStream =
+ *  javaPairDStream.<StateType, EmittedDataType>trackStateByKey(spec);
+ * }}}
+ */
+@Experimental
+sealed abstract class StateSpec[K, V, S, T] extends Serializable {
+
+  /** Set the RDD containing the initial states that will be used by 
`trackStateByKey`*/
+  def initialState(rdd: RDD[(K, S)]): this.type
+
+  /** Set the RDD containing the initial states that will be used by 
`trackStateByKey`*/
+  def initialState(javaPairRDD: JavaPairRDD[K, S]): this.type
+
+  /**
+   * Set the number of partitions by which the state RDDs generated by 
`trackStateByKey`
+   * will be partitioned. Hash partitioning will be used on the keys.
+   */
+  def numPartitions(numPartitions: Int): this.type
+
+  /**
+   * Set the partitioner by which the state RDDs generated by 
`trackStateByKey` will
+   * be partitioned.
+   */
+  def partitioner(partitioner: Partitioner): this.type
+
+  /**
+   * Set the duration after which the state of an idle key will be 
removed. A key and its state are
+   * considered idle if the key has not received any data for at least the 
given duration. The state
+   * tracking function will be called one final time on the idle states 
that are going to be
+   * removed; [[org.apache.spark.streaming.State State.isTimingOut()]] will be
+   * set to `true` in that call.
+   */
+  def timeout(idleDuration: Duration): this.type
+}
+
+
+/**
+ * :: Experimental ::
+ * Builder object for creating instances of 
[[org.apache.spark.streaming.StateSpec StateSpec]]
+ * that is used for specifying the parameters of the DStream transformation
+ * `trackStateByKey` operation of a
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ *
+ * Example in Scala:
+ * {{{
+ *val spec = StateSpec(trackingFunction).numPartitions(10)
+ *
+ *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+ * }}}
+ *
+ * Example in Java:
+ * {{{
+ *StateSpec<StateType, EmittedDataType> spec =
+ *  StateSpec.<StateType, 
EmittedDataType>create(trackingFunction).numPartitions(10);
+ *
+ *JavaDStream<EmittedDataType> emittedRecordDStream =
+ *  javaPairDStream.<StateType, 

[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-06 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44206972
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{HashPartitioner, Partitioner}
+
+
+/**
+ * :: Experimental ::
+ * Abstract class representing all the specifications of the DStream 
transformation
+ * `trackStateByKey` operation of a
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ * Use the [[org.apache.spark.streaming.StateSpec StateSpec.apply()]] or
+ * [[org.apache.spark.streaming.StateSpec StateSpec.create()]] to create 
instances of
+ * this class.
+ *
+ * Example in Scala:
+ * {{{
+ *val spec = StateSpec(trackingFunction).numPartitions(10)
+ *
+ *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+ * }}}
+ *
+ * Example in Java:
+ * {{{
+ *StateSpec<StateType, EmittedDataType> spec =
+ *  StateSpec.<StateType, 
EmittedDataType>create(trackingFunction).numPartitions(10);
+ *
+ *JavaDStream<EmittedDataType> emittedRecordDStream =
+ *  javaPairDStream.<StateType, EmittedDataType>trackStateByKey(spec);
+ * }}}
+ */
+@Experimental
+sealed abstract class StateSpec[K, V, S, T] extends Serializable {
+
+  /** Set the RDD containing the initial states that will be used by 
`trackStateByKey`*/
--- End diff --

Actually, I correct myself: having `object StateSpec { def apply() ... }` and 
`class StateSpec { def function(...) }` makes the syntax cumbersome, as that 
can only be called as `StateSpec().function(func)` (not 
`StateSpec.function(func)`, as I had thought earlier). To achieve 
`StateSpec.function(func)`, the method `function` must be a method of the 
object, like `StateSpec.apply()`. That does not solve @pwendell's problem of 
there being no setter method `function` in the class.

We could have both - the method `function` on both the class and the object. 
That would solve both problems: `StateSpec.function(func)` on the object and 
a `function` setter in the `StateSpec` class. But that would also allow the 
user to write `StateSpec.function(func1).function(func2)`. What should the 
semantics be in that case?
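A small sketch of the ambiguity (Spec is a hypothetical stand-in, not the
PR's code):

```
object Ambiguity {
  class Spec(val f: Int => Int) {
    // A setter on the class makes Spec.function(f1).function(f2) type-check;
    // here the later call simply replaces the earlier function.
    def function(f2: Int => Int): Spec = new Spec(f2)
  }
  object Spec {
    // Only a method on the companion object enables Spec.function(f).
    def function(f: Int => Int): Spec = new Spec(f)
  }

  val ambiguous = Spec.function(_ + 1).function(_ * 2) // which function wins?
}
```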


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-06 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44197477
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{HashPartitioner, Partitioner}
+
+
+/**
+ * :: Experimental ::
+ * Abstract class representing all the specifications of the DStream 
transformation
+ * `trackStateByKey` operation of a
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ * Use the [[org.apache.spark.streaming.StateSpec StateSpec.apply()]] or
+ * [[org.apache.spark.streaming.StateSpec StateSpec.create()]] to create 
instances of
+ * this class.
+ *
+ * Example in Scala:
+ * {{{
+ *val spec = StateSpec(trackingFunction).numPartitions(10)
+ *
+ *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+ * }}}
+ *
+ * Example in Java:
+ * {{{
+ *StateSpec<StateType, EmittedDataType> spec =
+ *  StateSpec.<StateType, 
EmittedDataType>create(trackingFunction).numPartitions(10);
+ *
+ *JavaDStream<EmittedDataType> emittedRecordDStream =
+ *  javaPairDStream.<StateType, EmittedDataType>trackStateByKey(spec);
+ * }}}
+ */
+@Experimental
+sealed abstract class StateSpec[K, V, S, T] extends Serializable {
+
+  /** Set the RDD containing the initial states that will be used by 
`trackStateByKey`*/
+  def initialState(rdd: RDD[(K, S)]): this.type
+
+  /** Set the RDD containing the initial states that will be used by 
`trackStateByKey`*/
+  def initialState(javaPairRDD: JavaPairRDD[K, S]): this.type
+
+  /**
+   * Set the number of partitions by which the state RDDs generated by 
`trackStateByKey`
+   * will be partitioned. Hash partitioning will be used on the keys.
+   */
+  def numPartitions(numPartitions: Int): this.type
+
+  /**
+   * Set the partitioner by which the state RDDs generated by 
`trackStateByKey` will
+   * be partitioned.
+   */
+  def partitioner(partitioner: Partitioner): this.type
+
+  /**
+   * Set the duration after which the state of an idle key will be 
removed. A key and its state are
+   * considered idle if the key has not received any data for at least the 
given duration. The state
+   * tracking function will be called one final time on the idle states 
that are going to be
+   * removed; [[org.apache.spark.streaming.State State.isTimingOut()]] will be
+   * set to `true` in that call.
+   */
+  def timeout(idleDuration: Duration): this.type
+}
+
+
+/**
+ * :: Experimental ::
+ * Builder object for creating instances of 
[[org.apache.spark.streaming.StateSpec StateSpec]]
+ * that is used for specifying the parameters of the DStream transformation
+ * `trackStateByKey` operation of a
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ *
+ * Example in Scala:
+ * {{{
+ *val spec = StateSpec(trackingFunction).numPartitions(10)
+ *
+ *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+ * }}}
+ *
+ * Example in Java:
+ * {{{
+ *StateSpec<StateType, EmittedDataType> spec =
+ *  StateSpec.<StateType, 
EmittedDataType>create(trackingFunction).numPartitions(10);
+ *
+ *JavaDStream<EmittedDataType> emittedRecordDStream =
+ *  javaPairDStream.<StateType, 

[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-06 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44197458
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
 ---
@@ -351,6 +351,50 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
   }
 
   /**
+   * :: Experimental ::
+   * Return a new DStream of data generated by combining the key-value 
data in `this` stream
+   * with a continuously updated per-key state. The user-provided state 
tracking function is
+   * applied on each keyed data item along with its corresponding state. 
The function can choose to
+   * update/remove the state and return a transformed data, which forms the
+   * [[org.apache.spark.streaming.dstream.EmittedRecordsDStream]].
+   *
+   * The specifications of this transformation is made through the
+   * [[org.apache.spark.streaming.StateSpec StateSpec]] class. Besides the 
tracking function, there
+   * are a number of optional parameters - initial state data, number of 
partitions, timeouts, etc.
+   * See the [[org.apache.spark.streaming.StateSpec StateSpec spec docs]] 
for more details.
+   *
+   * Scala example of using `trackStateByKey`:
+   * {{{
+   *def trackingFunction(key: String, data: Option[Int], wrappedState: 
State[Int]): Option[Int] = {
+   *  // Check whether the state exists; update/remove it accordingly 
and return the data to emit
+   *}
+   *
+   *val spec = StateSpec(trackingFunction).numPartitions(10)
+   *
+   *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+   *
+   * }}}
+   *
+   * Java example of using `trackStateByKey`:
+   * {{{
+   *  TODO(@zsxwing)
+   * }}}
+   *
+   * @tparam StateType
+   */
+  @Experimental
+  def trackStateByKey[StateType: ClassTag, EmittedType: ClassTag](
+spec: StateSpec[K, V, StateType, StateType]): EmittedRecordsDStream[K, 
StateType, EmittedType] = {
--- End diff --

nit: spec: StateSpec[K, V, StateType, `StateType`] -> spec: StateSpec[K, V, 
StateType, `EmittedType`]
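
With the nit applied, the quoted signature would read:

```
def trackStateByKey[StateType: ClassTag, EmittedType: ClassTag](
    spec: StateSpec[K, V, StateType, EmittedType]
  ): EmittedRecordsDStream[K, StateType, EmittedType]
```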


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-06 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44202542
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala ---
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util
+
+import java.io.{ObjectInputStream, ObjectOutputStream}
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.util.OpenHashMapBasedStateMap._
+import org.apache.spark.util.collection.OpenHashMap
+
+/** Internal interface for defining the map that keeps track of sessions. 
*/
+private[streaming] abstract class StateMap[K: ClassTag, S: ClassTag] 
extends Serializable {
+
+  /** Get the state for a key if it exists */
+  def get(key: K): Option[S]
+
+  /** Get all the keys and states whose updated time is older than the 
given threshold time */
+  def getByTime(threshUpdatedTime: Long): Iterator[(K, S, Long)]
+
+  /** Get all the keys and states in this map. */
+  def getAll(): Iterator[(K, S, Long)]
+
+  /** Add or update state */
+  def put(key: K, state: S, updatedTime: Long): Unit
+
+  /** Remove a key */
+  def remove(key: K): Unit
+
+  /**
+   * Shallow copy `this` map to create a new state map.
+   * Updates to the new map should not mutate `this` map.
+   */
+  def copy(): StateMap[K, S]
+
+  def toDebugString(): String = toString()
+}
+
+/** Companion object for [[StateMap]], with utility methods */
+private[streaming] object StateMap {
+  def empty[K: ClassTag, S: ClassTag]: StateMap[K, S] = new 
EmptyStateMap[K, S]
+
+  def create[K: ClassTag, S: ClassTag](conf: SparkConf): StateMap[K, S] = {
+val deltaChainThreshold = 
conf.getInt("spark.streaming.sessionByKey.deltaChainThreshold",
+  DELTA_CHAIN_LENGTH_THRESHOLD)
+new OpenHashMapBasedStateMap[K, S](64, deltaChainThreshold)
+  }
+}
+
+/** Implementation of the [[StateMap]] interface representing an 
empty map */
+private[streaming] class EmptyStateMap[K: ClassTag, S: ClassTag] extends 
StateMap[K, S] {
+  override def put(key: K, session: S, updateTime: Long): Unit = {
+throw new NotImplementedError("put() should not be called on an 
EmptyStateMap")
+  }
+  override def get(key: K): Option[S] = None
+  override def getByTime(threshUpdatedTime: Long): Iterator[(K, S, Long)] 
= Iterator.empty
+  override def getAll(): Iterator[(K, S, Long)] = Iterator.empty
+  override def copy(): StateMap[K, S] = new EmptyStateMap[K, S]
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-06 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44205513
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala ---
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util
+
+import java.io.{ObjectInputStream, ObjectOutputStream}
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.util.OpenHashMapBasedStateMap._
+import org.apache.spark.util.collection.OpenHashMap
+
+/** Internal interface for defining the map that keeps track of sessions. 
*/
+private[streaming] abstract class StateMap[K: ClassTag, S: ClassTag] 
extends Serializable {
+
+  /** Get the state for a key if it exists */
+  def get(key: K): Option[S]
+
+  /** Get all the keys and states whose updated time is older than the 
given threshold time */
+  def getByTime(threshUpdatedTime: Long): Iterator[(K, S, Long)]
+
+  /** Get all the keys and states in this map. */
+  def getAll(): Iterator[(K, S, Long)]
+
+  /** Add or update state */
+  def put(key: K, state: S, updatedTime: Long): Unit
+
+  /** Remove a key */
+  def remove(key: K): Unit
+
+  /**
+   * Shallow copy `this` map to create a new state map.
+   * Updates to the new map should not mutate `this` map.
+   */
+  def copy(): StateMap[K, S]
+
+  def toDebugString(): String = toString()
+}
+
+/** Companion object for [[StateMap]], with utility methods */
+private[streaming] object StateMap {
+  def empty[K: ClassTag, S: ClassTag]: StateMap[K, S] = new 
EmptyStateMap[K, S]
+
+  def create[K: ClassTag, S: ClassTag](conf: SparkConf): StateMap[K, S] = {
+val deltaChainThreshold = 
conf.getInt("spark.streaming.sessionByKey.deltaChainThreshold",
+  DELTA_CHAIN_LENGTH_THRESHOLD)
+new OpenHashMapBasedStateMap[K, S](64, deltaChainThreshold)
+  }
+}
+
+/** Implementation of the [[StateMap]] interface representing an 
empty map */
+private[streaming] class EmptyStateMap[K: ClassTag, S: ClassTag] extends 
StateMap[K, S] {
+  override def put(key: K, session: S, updateTime: Long): Unit = {
+throw new NotImplementedError("put() should not be called on an 
EmptyStateMap")
+  }
+  override def get(key: K): Option[S] = None
+  override def getByTime(threshUpdatedTime: Long): Iterator[(K, S, Long)] 
= Iterator.empty
+  override def getAll(): Iterator[(K, S, Long)] = Iterator.empty
+  override def copy(): StateMap[K, S] = new EmptyStateMap[K, S]
+  override def remove(key: K): Unit = { }
+  override def toDebugString(): String = ""
+}
+
+
+
+/** Implementation of StateMap based on Spark's OpenHashMap */
+private[streaming] class OpenHashMapBasedStateMap[K: ClassTag, S: 
ClassTag](
+@transient @volatile var parentStateMap: StateMap[K, S],
+initialCapacity: Int = 64,
+deltaChainThreshold: Int = DELTA_CHAIN_LENGTH_THRESHOLD
+  ) extends StateMap[K, S] { self =>
+
+  def this(initialCapacity: Int, deltaChainThreshold: Int) = this(
+new EmptyStateMap[K, S],
+initialCapacity = initialCapacity,
+deltaChainThreshold = deltaChainThreshold)
+
+  def this(deltaChainThreshold: Int) = this(
+initialCapacity = 64, deltaChainThreshold = deltaChainThreshold)
+
+  def this() = this(DELTA_CHAIN_LENGTH_THRESHOLD)
+
+  @transient @volatile private var deltaMap =
+new OpenHashMap[K, StateInfo[S]](initialCapacity)
+
+  /** Get the session data if it exists */
+  override def get(key: K): Option[S] = {
+val stateInfo = deltaMap(key)
+if (stateInfo != null) {
+  if (!stateInfo.deleted) {
+Some(stateInfo.data)
+  } else {
+None
+  }
+} else {
+  parentStateMap.get(key)
+}
+  }
+
+  /** Get all the keys and states 

[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-06 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44205466
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{HashPartitioner, Partitioner}
+
+
+/**
+ * :: Experimental ::
+ * Abstract class representing all the specifications of the DStream 
transformation
+ * `trackStateByKey` operation of a
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ * Use the [[org.apache.spark.streaming.StateSpec StateSpec.apply()]] or
+ * [[org.apache.spark.streaming.StateSpec StateSpec.create()]] to create 
instances of
+ * this class.
+ *
+ * Example in Scala:
+ * {{{
+ *val spec = StateSpec(trackingFunction).numPartitions(10)
+ *
+ *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+ * }}}
+ *
+ * Example in Java:
+ * {{{
+ *StateSpec<StateType, EmittedDataType> spec =
+ *  StateSpec.<StateType, 
EmittedDataType>create(trackingFunction).numPartitions(10);
+ *
+ *JavaDStream<EmittedDataType> emittedRecordDStream =
+ *  javaPairDStream.<StateType, EmittedDataType>trackStateByKey(spec);
+ * }}}
+ */
+@Experimental
+sealed abstract class StateSpec[K, V, S, T] extends Serializable {
+
+  /** Set the RDD containing the initial states that will be used by 
`trackStateByKey`*/
--- End diff --

I really don't like apply on an object to create everything. I like 
```
StateSpec.function(...)
```
as the only constructor, which also makes it obvious it is setting a 
function.
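
A self-contained sketch of that shape (Spec is a hypothetical stand-in, not
the PR's API):

```
object SoleConstructor {
  class Spec private (val f: Int => Int) {
    def numPartitions(n: Int): Spec = this // optional settings chain off the result
  }
  object Spec {
    def function(f: Int => Int): Spec = new Spec(f) // the only way to construct a Spec
  }

  val spec = Spec.function(_ + 1).numPartitions(10)
}
```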



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-06 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44207353
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala ---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{HashPartitioner, Partitioner}
+
+
+/**
+ * :: Experimental ::
+ * Abstract class representing all the specifications of the DStream 
transformation
+ * `trackStateByKey` operation of a
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ * Use the [[org.apache.spark.streaming.StateSpec StateSpec.apply()]] or
+ * [[org.apache.spark.streaming.StateSpec StateSpec.create()]] to create 
instances of
+ * this class.
+ *
+ * Example in Scala:
+ * {{{
+ *val spec = StateSpec(trackingFunction).numPartitions(10)
+ *
+ *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+ * }}}
+ *
+ * Example in Java:
+ * {{{
+ *StateSpec<StateType, EmittedDataType> spec =
+ *  StateSpec.<StateType, 
EmittedDataType>create(trackingFunction).numPartitions(10);
+ *
+ *JavaDStream<EmittedDataType> emittedRecordDStream =
+ *  javaPairDStream.<StateType, EmittedDataType>trackStateByKey(spec);
+ * }}}
+ */
+@Experimental
+sealed abstract class StateSpec[K, V, S, T] extends Serializable {
+
+  /** Set the RDD containing the initial states that will be used by 
`trackStateByKey`*/
+  def initialState(rdd: RDD[(K, S)]): this.type
+
+  /** Set the RDD containing the initial states that will be used by 
`trackStateByKey`*/
+  def initialState(javaPairRDD: JavaPairRDD[K, S]): this.type
+
+  /**
+   * Set the number of partitions by which the state RDDs generated by 
`trackStateByKey`
+   * will be partitioned. Hash partitioning will be used on the keys.
+   */
+  def numPartitions(numPartitions: Int): this.type
+
+  /**
+   * Set the partitioner by which the state RDDs generated by 
`trackStateByKey` will
+   * be partitioned.
+   */
+  def partitioner(partitioner: Partitioner): this.type
+
+  /**
+   * Set the duration after which the state of an idle key will be 
removed. A key and its state are
+   * considered idle if the key has not received any data for at least the 
given duration. The state
+   * tracking function will be called one final time on the idle states 
that are going to be
+   * removed; [[org.apache.spark.streaming.State State.isTimingOut()]] will be
+   * set to `true` in that call.
+   */
+  def timeout(idleDuration: Duration): this.type
+}
+
+
+/**
+ * :: Experimental ::
+ * Builder object for creating instances of 
[[org.apache.spark.streaming.StateSpec StateSpec]]
+ * that is used for specifying the parameters of the DStream transformation
+ * `trackStateByKey` operation of a
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] (Scala) or a
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] 
(Java).
+ *
+ * Example in Scala:
+ * {{{
+ *val spec = StateSpec(trackingFunction).numPartitions(10)
+ *
+ *val emittedRecordDStream = 
keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)
+ * }}}
+ *
+ * Example in Java:
+ * {{{
+ *StateSpec<StateType, EmittedDataType> spec =
+ *  StateSpec.<StateType, 
EmittedDataType>create(trackingFunction).numPartitions(10);
+ *
+ *JavaDStream<EmittedDataType> emittedRecordDStream =
+ *  javaPairDStream.<StateType, 

[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-06 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9256#issuecomment-154429147
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-06 Thread huitseeker
Github user huitseeker commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44189416
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/EmittedRecordsDStream.scala
 ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import scala.reflect.ClassTag
+
+import org.apache.spark._
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.{EmptyRDD, RDD}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.rdd.{TrackStateRDD, TrackStateRDDRecord}
+
+/**
+ * :: Experimental ::
+ * DStream representing the stream of records emitted after the 
`trackStateByKey` operation
+ * on a [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] in Scala.
+ * It also gives access to the stream of state snapshots, 
that is, the state data of
+ * all keys after a batch has updated them.
+ *
+ * @tparam K Class of the state key
+ * @tparam S Class of the state data
+ * @tparam T Class of the emitted records
+ */
+@Experimental
+sealed abstract class EmittedRecordsDStream[K, S, T: ClassTag](
+ssc: StreamingContext) extends DStream[T](ssc) {
+
+  /** Return a pair DStream where each RDD is the snapshot of the state of 
all the keys. */
+  def stateSnapshots(): DStream[(K, S)]
+}
+
+/** Internal implementation of the [[EmittedRecordsDStream]] */
+private[streaming] class EmittedRecordsDStreamImpl[
+K: ClassTag, V: ClassTag, S: ClassTag, T: ClassTag](
+trackStateDStream: TrackStateDStream[K, V, S, T])
+  extends EmittedRecordsDStream[K, S, T](trackStateDStream.context) {
+
+  override def slideDuration: Duration = trackStateDStream.slideDuration
+
+  override def dependencies: List[DStream[_]] = List(trackStateDStream)
+
+  override def compute(validTime: Time): Option[RDD[T]] = {
+trackStateDStream.getOrCompute(validTime).map { _.flatMap[T] { 
_.emittedRecords } }
+  }
+
+  def stateSnapshots(): DStream[(K, S)] = {
+trackStateDStream.flatMap[(K, S)] {
+  _.stateMap.getAll().map { case (k, s, _) => (k, s) }.toTraversable }
+  }
+}
+
+/**
+ * A DStream that allows per-key state to be maintained, and arbitrary records to be generated
+ * based on updates to the state.
+ *
+ * @param parent Parent (key, value) stream that is the source
+ * @param spec Specifications of the trackStateByKey operation
+ * @tparam K   Key type
+ * @tparam V   Value type
+ * @tparam S   Type of the state maintained
+ * @tparam T   Type of the emitted records
+ */
+private[streaming] class TrackStateDStream[K: ClassTag, V: ClassTag, S: 
ClassTag, T: ClassTag](
+parent: DStream[(K, V)], spec: StateSpecImpl[K, V, S, T])
+  extends DStream[TrackStateRDDRecord[K, S, T]](parent.context) {
+
+  persist(StorageLevel.MEMORY_ONLY)
+
+  private val partitioner = spec.getPartitioner().getOrElse(
+new HashPartitioner(ssc.sc.defaultParallelism))
+
+  private val trackingFunction = spec.getFunction()
+
+  override def slideDuration: Duration = parent.slideDuration
+
+  override def dependencies: List[DStream[_]] = List(parent)
+
+  override val mustCheckpoint = true
+
+  /** Method that generates a RDD for the given time */
+  override def compute(validTime: Time): Option[RDD[TrackStateRDDRecord[K, 
S, T]]] = {
+val prevStateRDD = getOrCompute(validTime - slideDuration).getOrElse {
+  TrackStateRDD.createFromPairRDD[K, V, S, T](
+spec.getInitialStateRDD().getOrElse(new EmptyRDD[(K, 
S)](ssc.sparkContext)),
+partitioner,
+validTime.milliseconds
+  )
+}
+val newDataRDD = parent.getOrCompute(validTime).get
+val partitionedDataRDD = newDataRDD.partitionBy(partitioner)
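
The quote is cut off here, but the classes above are enough to sketch how the pieces are
meant to fit together. A hedged usage example (the `pairs` stream and all types are assumed
for illustration):

    import org.apache.spark.streaming.{State, StateSpec}
    import org.apache.spark.streaming.dstream.DStream

    // Assumed input: a DStream of (word, count) pairs from somewhere upstream.
    val pairs: DStream[(String, Int)] = ???

    val spec = StateSpec((key: String, value: Option[Int], state: State[Int]) => {
      val sum = value.getOrElse(0) + state.getOrElse(0)
      state.update(sum)
      Some((key, sum))           // this is what the emitted-records stream carries
    })

    // emitted: one record per updated key per batch (the EmittedRecordsDStream)
    val emitted = pairs.trackStateByKey(spec)
    // snapshots: the complete (key, state) data after each batch has been applied
    val snapshots: DStream[(String, Int)] = emitted.stateSnapshots()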
   

[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-06 Thread huitseeker
Github user huitseeker commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44190145
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala ---
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util
+
+import java.io.{ObjectInputStream, ObjectOutputStream}
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.util.OpenHashMapBasedStateMap._
+import org.apache.spark.util.collection.OpenHashMap
+
+/** Internal interface for defining the map that keeps track of the per-key states. */
+private[streaming] abstract class StateMap[K: ClassTag, S: ClassTag] 
extends Serializable {
+
+  /** Get the state for a key if it exists */
+  def get(key: K): Option[S]
+
+  /** Get all the keys and states whose updated time is older than the 
given threshold time */
+  def getByTime(threshUpdatedTime: Long): Iterator[(K, S, Long)]
+
+  /** Get all the keys and states in this map. */
+  def getAll(): Iterator[(K, S, Long)]
+
+  /** Add or update state */
+  def put(key: K, state: S, updatedTime: Long): Unit
+
+  /** Remove a key */
+  def remove(key: K): Unit
+
+  /**
+   * Shallow copy `this` map to create a new state map.
+   * Updates to the new map should not mutate `this` map.
+   */
+  def copy(): StateMap[K, S]
+
+  def toDebugString(): String = toString()
+}
+
+/** Companion object for [[StateMap]], with utility methods */
+private[streaming] object StateMap {
+  def empty[K: ClassTag, S: ClassTag]: StateMap[K, S] = new 
EmptyStateMap[K, S]
+
+  def create[K: ClassTag, S: ClassTag](conf: SparkConf): StateMap[K, S] = {
+val deltaChainThreshold = 
conf.getInt("spark.streaming.sessionByKey.deltaChainThreshold",
+  DELTA_CHAIN_LENGTH_THRESHOLD)
+new OpenHashMapBasedStateMap[K, S](64, deltaChainThreshold)
+  }
+}
+
+/** Specific implementation of the StateMap interface representing an empty map */
+private[streaming] class EmptyStateMap[K: ClassTag, S: ClassTag] extends 
StateMap[K, S] {
+  override def put(key: K, session: S, updateTime: Long): Unit = {
+throw new NotImplementedError("put() should not be called on an 
EmptyStateMap")
+  }
+  override def get(key: K): Option[S] = None
+  override def getByTime(threshUpdatedTime: Long): Iterator[(K, S, Long)] 
= Iterator.empty
+  override def getAll(): Iterator[(K, S, Long)] = Iterator.empty
+  override def copy(): StateMap[K, S] = new EmptyStateMap[K, S]
+  override def remove(key: K): Unit = { }
+  override def toDebugString(): String = ""
+}
+
+
+
+/** Implementation of StateMap based on Spark's OpenHashMap */
+private[streaming] class OpenHashMapBasedStateMap[K: ClassTag, S: 
ClassTag](
+@transient @volatile var parentStateMap: StateMap[K, S],
+initialCapacity: Int = 64,
+deltaChainThreshold: Int = DELTA_CHAIN_LENGTH_THRESHOLD
+  ) extends StateMap[K, S] { self =>
+
+  def this(initialCapacity: Int, deltaChainThreshold: Int) = this(
+new EmptyStateMap[K, S],
+initialCapacity = initialCapacity,
+deltaChainThreshold = deltaChainThreshold)
+
+  def this(deltaChainThreshold: Int) = this(
+initialCapacity = 64, deltaChainThreshold = deltaChainThreshold)
+
+  def this() = this(DELTA_CHAIN_LENGTH_THRESHOLD)
+
+  @transient @volatile private var deltaMap =
+new OpenHashMap[K, StateInfo[S]](initialCapacity)
+
+  /** Get the session data if it exists */
+  override def get(key: K): Option[S] = {
+val stateInfo = deltaMap(key)
+if (stateInfo != null) {
+  if (!stateInfo.deleted) {
+Some(stateInfo.data)
+  } else {
+None
+  }
+} else {
+  parentStateMap.get(key)
+}
+  }
+
+  /** Get all the keys and 
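
The quote truncates here, but the `parentStateMap` field above already shows the central
trick: each generation of the map stores only a delta over its parent, and `get()` falls
through the chain (bounded by `spark.streaming.sessionByKey.deltaChainThreshold`,
presumably via periodic compaction). A simplified, self-contained model of that lookup,
for illustration only and not the PR's code:

    import scala.collection.mutable

    // Toy model of a delta-chained map: local updates and deletions shadow
    // an immutable parent generation.
    class ChainedMap[K, V](parent: Map[K, V]) {
      private val delta = mutable.Map.empty[K, Option[V]]   // None = tombstone

      def put(k: K, v: V): Unit = delta(k) = Some(v)
      def remove(k: K): Unit = delta(k) = None              // hide the parent's value

      def get(k: K): Option[V] = delta.get(k) match {
        case Some(valueOrTombstone) => valueOrTombstone     // delta wins
        case None                   => parent.get(k)        // walk up the chain
      }
    }

    val parent = Map("a" -> 1, "b" -> 2)
    val child = new ChainedMap(parent)
    child.put("a", 10)
    child.remove("b")
    assert(child.get("a") == Some(10))
    assert(child.get("b") == None)
    assert(child.get("c") == None)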

[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-06 Thread huitseeker
Github user huitseeker commented on the pull request:

https://github.com/apache/spark/pull/9256#issuecomment-154550377
  
Great job so far, I like this a lot!





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-06 Thread huitseeker
Github user huitseeker commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44187957
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/rdd/TrackStateRDD.scala ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.rdd
+
+import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.apache.spark.rdd.{MapPartitionsRDD, RDD}
+import org.apache.spark.streaming.{StateImpl, State}
+import org.apache.spark.streaming.util.{EmptyStateMap, StateMap}
+import org.apache.spark.util.Utils
+import org.apache.spark._
+
+private[streaming] case class TrackStateRDDRecord[K, S, T](
+var stateMap: StateMap[K, S], var emittedRecords: Seq[T]) {
+  /*
+  private def writeObject(outputStream: ObjectOutputStream): Unit = {
+outputStream.writeObject(stateMap)
+outputStream.writeInt(emittedRecords.size)
+val iterator = emittedRecords.iterator
+while(iterator.hasNext) {
+  outputStream.writeObject(iterator.next)
+}
+  }
+
+  private def readObject(inputStream: ObjectInputStream): Unit = {
+stateMap = inputStream.readObject().asInstanceOf[StateMap[K, S]]
+val numEmittedRecords = inputStream.readInt()
+val array = new Array[T](numEmittedRecords)
+var i = 0
+while(i < numEmittedRecords) {
+  array(i) = inputStream.readObject().asInstanceOf[T]
+  i += 1
+}
+emittedRecords = array.toSeq
+  }*/
+}
+
+
+private[streaming] class TrackStateRDDPartition(
+idx: Int,
+@transient private var prevStateRDD: RDD[_],
+@transient private var partitionedDataRDD: RDD[_]) extends Partition {
+
+  private[rdd] var previousSessionRDDPartition: Partition = null
+  private[rdd] var partitionedDataRDDPartition: Partition = null
+
+  override def index: Int = idx
+  override def hashCode(): Int = idx
+
+  @throws(classOf[IOException])
+  private def writeObject(oos: ObjectOutputStream): Unit = 
Utils.tryOrIOException {
+// Update the reference to parent split at the time of task 
serialization
+previousSessionRDDPartition = prevStateRDD.partitions(index)
+partitionedDataRDDPartition = partitionedDataRDD.partitions(index)
+oos.defaultWriteObject()
+  }
+}
+
+
+/**
+ * RDD storing the keyed state of trackStateByKey and the corresponding emitted records.
+ * Each partition of this RDD has a single record that contains a StateMap storing the
+ * state of all the keys in that partition.
+ */
+private[streaming] class TrackStateRDD[K: ClassTag, V: ClassTag, S: 
ClassTag, T: ClassTag](
+private var prevStateRDD: RDD[TrackStateRDDRecord[K, S, T]],
+private var partitionedDataRDD: RDD[(K, V)],
+trackingFunction: (K, Option[V], State[S]) => Option[T],
+currentTime: Long, timeoutThresholdTime: Option[Long]
+  ) extends RDD[TrackStateRDDRecord[K, S, T]](
+partitionedDataRDD.sparkContext,
+List(
+  new OneToOneDependency[TrackStateRDDRecord[K, S, T]](prevStateRDD),
+  new OneToOneDependency(partitionedDataRDD))
+  ) {
+
+  @volatile private var doFullScan = false
+
+  require(prevStateRDD.partitioner.nonEmpty)
+  require(partitionedDataRDD.partitioner == prevStateRDD.partitioner)
+
+  override val partitioner = prevStateRDD.partitioner
+
+  override def checkpoint(): Unit = {
+super.checkpoint()
+doFullScan = true
+  }
+
+  override def compute(
+  partition: Partition, context: TaskContext): 
Iterator[TrackStateRDDRecord[K, S, T]] = {
+
+val stateRDDPartition = partition.asInstanceOf[TrackStateRDDPartition]
+val prevStateRDDIterator = prevStateRDD.iterator(
+  stateRDDPartition.previousSessionRDDPartition, context)
+val dataIterator = partitionedDataRDD.iterator(
+  
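
The quote stops just before the merge itself; conceptually, each partition copies the
previous batch's StateMap and folds the new data through the tracking function, collecting
whatever it emits. A self-contained approximation of that inner loop (the mutable `State`
handle is modeled here as plain Options, so this is a sketch of the idea, not the PR's
implementation):

    import scala.collection.mutable.ArrayBuffer

    // Toy per-partition merge: previous state + new batch -> new state + emitted.
    def mergePartition[K, V, S, T](
        prevState: Map[K, S],
        newData: Seq[(K, V)],
        track: (K, Option[V], Option[S]) => (Option[S], Option[T])): (Map[K, S], Seq[T]) = {
      var state = prevState                  // stands in for stateMap.copy()
      val emitted = new ArrayBuffer[T]
      newData.foreach { case (k, v) =>
        val (updated, out) = track(k, Some(v), state.get(k))
        updated match {
          case Some(s) => state = state.updated(k, s)   // like State.update()
          case None    => state = state - k             // like State.remove()
        }
        out.foreach(emitted += _)
      }
      (state, emitted.toSeq)
    }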

[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-05 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/9256#issuecomment-154141796
  
@tdas can you also build an example file so we can look at the example 
itself?






[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44045397
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/EmittedRecordsDStream.scala
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import scala.reflect.ClassTag
+
+import org.apache.spark._
+import org.apache.spark.rdd.{EmptyRDD, RDD}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.rdd.{TrackStateRDD, TrackStateRDDRecord}
+
+
+abstract class EmittedRecordsDStream[K: ClassTag, V: ClassTag, S: 
ClassTag, T: ClassTag](
--- End diff --

What exactly does this do? EmittedDStream seems unclear.






[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44045076
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/TrackStateSpec.scala ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.{HashPartitioner, Partitioner}
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.rdd.RDD
+
+
+/**
+ * Abstract class having all the specifications of DStream.trackStateByKey().
+ * Use `TrackStateSpec.apply()` or `TrackStateSpec.create()` to create instances of this class.
+ *
+ * {{{
+ *TrackStateSpec(trackingFunction)// in Scala
+ *TrackStateSpec.create(trackingFunction) // in Java
+ * }}}
+ */
+sealed abstract class TrackStateSpec[K: ClassTag, V: ClassTag, S: 
ClassTag, T: ClassTag]
+  extends Serializable {
+
+  def initialState(rdd: RDD[(K, S)]): this.type
+  def initialState(javaPairRDD: JavaPairRDD[K, S]): this.type
+
+  def numPartitions(numPartitions: Int): this.type
+  def partitioner(partitioner: Partitioner): this.type
+
+  def timeout(interval: Duration): this.type
--- End diff --

Is this scala Duration? Why not just take milliseconds?
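
(For context: the file sits in the `org.apache.spark.streaming` package, so the unqualified
`Duration` here is Spark Streaming's own batch-duration type rather than the Scala standard
library's. A caller would presumably write something like the line below, with `spec` an
instance of the class above:)

    import org.apache.spark.streaming.Seconds

    spec.timeout(Seconds(30))   // Seconds(30) builds a streaming Duration of 30000 ms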





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-05 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44055168
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/EmittedRecordsDStream.scala
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import scala.reflect.ClassTag
+
+import org.apache.spark._
+import org.apache.spark.rdd.{EmptyRDD, RDD}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.rdd.{TrackStateRDD, TrackStateRDDRecord}
+
+
+abstract class EmittedRecordsDStream[K: ClassTag, V: ClassTag, S: 
ClassTag, T: ClassTag](
+ssc: StreamingContext) extends DStream[T](ssc) {
+
+  def stateSnapshots(): DStream[(K, S)]
--- End diff --

snapshotStream?





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-05 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44087502
  
--- Diff: 
examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
 ---
@@ -44,18 +44,6 @@ object StatefulNetworkWordCount {
 
 StreamingExamples.setStreamingLogLevels()
 
-val updateFunc = (values: Seq[Int], state: Option[Int]) => {
--- End diff --

Since `updateStateByKey` isn't removed, maybe it's better to keep this 
example and create a new file for `trackStateByKey`.
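
For reference, a hypothetical sketch of what such a new `trackStateByKey` example file
could look like (illustrative only; the class and variable names are invented, and the
API is the one under review in this PR):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

    // Hypothetical TrackStateNetworkWordCount, mirroring StatefulNetworkWordCount.
    object TrackStateNetworkWordCount {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("TrackStateNetworkWordCount")
        val ssc = new StreamingContext(conf, Seconds(1))
        ssc.checkpoint(".")   // the state DStream must be checkpointed

        val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
        val wordDstream = words.map(x => (x, 1))

        val trackStateFunc = (word: String, one: Option[Int], state: State[Int]) => {
          val sum = one.getOrElse(0) + state.getOrElse(0)
          state.update(sum)
          Some((word, sum))
        }

        val stateDstream = wordDstream.trackStateByKey(StateSpec(trackStateFunc))
        stateDstream.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }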





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-05 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44067879
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
 ---
@@ -350,6 +349,18 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
 )
   }
 
+  /** TODO: Add scala docs */
+  def trackStateByKey[S: ClassTag, T: ClassTag](
--- End diff --

For the method signature, since `spec` contains `ClassTag`s for `S` and
`T`, we can get them from `spec` and don't need to add `ClassTag` to the method
signature.





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-05 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44070212
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
 ---
@@ -350,6 +349,18 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
 )
   }
 
+  /** TODO: Add scala docs */
+  def trackStateByKey[S: ClassTag, T: ClassTag](
--- End diff --

Actually, `ClassTags` in `TrackStateSpec` and `TrackStateSpecImpl` are 
unnecessary. We can remove them and keep `S: ClassTag, T: ClassTag` in 
`trackStateByKey`.
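
Both options type-check; the difference is only where the `ClassTag` evidence lives. A
minimal illustration of the two shapes (hypothetical, stripped of everything else):

    import scala.reflect.ClassTag

    // Option A: the method demands the evidence itself.
    def trackStateByKeyA[S: ClassTag, T: ClassTag](spec: AnyRef): Unit = ()

    // Option B: the spec captures the evidence when it is built, and the
    // method recovers it from there instead of re-requiring it.
    class SpecB[S, T](val stateTag: ClassTag[S], val emitTag: ClassTag[T])
    object SpecB {
      def apply[S: ClassTag, T: ClassTag](): SpecB[S, T] =
        new SpecB(implicitly[ClassTag[S]], implicitly[ClassTag[T]])
    }

    def trackStateByKeyB[S, T](spec: SpecB[S, T]): Unit = {
      implicit val sTag: ClassTag[S] = spec.stateTag
      implicit val tTag: ClassTag[T] = spec.emitTag
      // the body can now build an RDD[S] or RDD[T] wherever tags are needed
    }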





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-05 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44091460
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/State.scala 
---
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+/**
+ * Abstract class for getting and updating the tracked state in the `trackStateByKey` operation
+ * of a [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair DStream]] and
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream]].
+ */
+sealed abstract class State[S] {
+
+  /** Whether the state already exists */
+  def exists(): Boolean
+
+  /**
+   * Get the state if it exists, otherwise it will throw an exception.
+   * Check with `exists()` whether the state exists before calling `get()`.
+   */
+  def get(): S
+
+  /**
+   * Update the state with a new value. Note that you cannot update the state if it is
+   * timing out (that is, `isTimingOut()` returns `true`), or if the state has already
+   * been removed by `remove()`.
+   */
+  def update(newState: S): Unit
+
+  /** Remove the state if it exists. */
+  def remove(): Unit
+
+  /** Whether the state is going to be timed out by the system after this batch interval */
+  def isTimingOut(): Boolean
+
+  @inline final def getOption(): Option[S] = Option(get())
+
+  /** Get the state if it exists, otherwise return the default value */
+  @inline final def getOrElse[S1 >: S](default: => S1): S1 = {
+if (exists) this.get else default
+  }
+
+  @inline final override def toString(): String = {
+getOption.map { _.toString }.getOrElse("")
+  }
+}
+
+/** Internal implementation of the [[State]] interface */
+private[streaming] class StateImpl[S] extends State[S] {
+
+  private var state: S = null.asInstanceOf[S]
+  private var defined: Boolean = true
+  private var timingOut: Boolean = false
+  private var updated: Boolean = false
+  private var removed: Boolean = false
+
+  // = Public API =
+  def exists(): Boolean = {
+defined
+  }
+
+  def get(): S = {
+state
--- End diff --

`State.get` says `Get the state if it exists, otherwise wise it will throw 
an exception.` but here it won't throw an exception.
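
The fix being pointed at is mechanical; a hedged sketch of a `get()` that honors the
documented contract (illustrative, not necessarily what the PR ended up with):

    // Inside StateImpl[S], assuming the `defined` and `state` fields quoted above:
    def get(): S = {
      if (defined) {
        state
      } else {
        throw new NoSuchElementException("State is not set")
      }
    }

Note that `getOption()` in the abstract class is defined as `Option(get())`, so it would
need the same guard (for example `if (exists) Some(get()) else None`) once `get()` starts
throwing.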





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44095840
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/TrackStateSpec.scala ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.{HashPartitioner, Partitioner}
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.rdd.RDD
+
+
+/**
+ * Abstract class having all the specifications of DStream.trackStateByKey().
+ * Use `TrackStateSpec.apply()` or `TrackStateSpec.create()` to create instances of this class.
+ *
+ * {{{
+ *TrackStateSpec(trackingFunction)// in Scala
+ *TrackStateSpec.create(trackingFunction) // in Java
+ * }}}
+ */
+sealed abstract class TrackStateSpec[K: ClassTag, V: ClassTag, S: 
ClassTag, T: ClassTag]
+  extends Serializable {
+
+  def initialState(rdd: RDD[(K, S)]): this.type
+  def initialState(javaPairRDD: JavaPairRDD[K, S]): this.type
+
+  def numPartitions(numPartitions: Int): this.type
+  def partitioner(partitioner: Partitioner): this.type
+
+  def timeout(interval: Duration): this.type
+}
+
+
+/** Builder object for creating instances of TrackStateSpec */
+object TrackStateSpec {
+
+  def apply[K: ClassTag, V: ClassTag, S: ClassTag, T: ClassTag](
+  trackingFunction: (K, Option[V], State[S]) => Option[T]): 
TrackStateSpec[K, V, S, T] = {
+new TrackStateSpecImpl[K, V, S, T](trackingFunction)
+  }
+
+  def create[K: ClassTag, V: ClassTag, S: ClassTag, T: ClassTag](
+  trackingFunction: (K, Option[V], State[S]) => Option[T]): 
TrackStateSpec[K, V, S, T] = {
+apply(trackingFunction)
+  }
--- End diff --

An alternative suggestion from the community: just make an implicit conversion
to Option. That would make Scala users happy.
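
A sketch of that suggestion, to show the ergonomics (hypothetical; a blanket
`T => Option[T]` conversion is famously double-edged, which may be why it stayed a
suggestion):

    import scala.language.implicitConversions

    object StateSpecImplicits {
      // Lifts any value to an Option where an Option is expected.
      implicit def toOption[T](value: T): Option[T] = Option(value)
    }

    // With the conversion in scope, a Scala tracking function could end with
    //   (key, sum)            // instead of Some((key, sum))
    // and the result would be lifted to Option[(String, Int)] automatically.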





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-05 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44096879
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/EmittedRecordsDStream.scala
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import scala.reflect.ClassTag
+
+import org.apache.spark._
+import org.apache.spark.rdd.{EmptyRDD, RDD}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.rdd.{TrackStateRDD, TrackStateRDDRecord}
+
+
+abstract class EmittedRecordsDStream[K: ClassTag, V: ClassTag, S: ClassTag, T: ClassTag](
+    ssc: StreamingContext) extends DStream[T](ssc) {
+
+  def stateSnapshots(): DStream[(K, S)]
+}
+
+
+private[streaming] class EmittedRecordsDStreamImpl[
+    K: ClassTag, V: ClassTag, S: ClassTag, T: ClassTag](
+    trackStateDStream: TrackStateDStream[K, V, S, T])
+  extends EmittedRecordsDStream[K, V, S, T](trackStateDStream.context) {
+
+  override def slideDuration: Duration = trackStateDStream.slideDuration
+
+  override def dependencies: List[DStream[_]] = List(trackStateDStream)
+
+  override def compute(validTime: Time): Option[RDD[T]] = {
+    trackStateDStream.getOrCompute(validTime).map { _.flatMap[T] { _.emittedRecords } }
+  }
+
+  def stateSnapshots(): DStream[(K, S)] = {
+    trackStateDStream.flatMap[(K, S)] {
+      _.stateMap.getAll().map { case (k, s, _) => (k, s) }.toTraversable }
+  }
+}
+
+/**
+ * A DStream that allows per-key state to be maintained, and arbitrary records to be generated
+ * based on updates to the state.
+ *
+ * @param parent Parent (key, value) stream that is the source
+ * @param spec Specifications of the trackStateByKey operation
+ * @tparam K   Key type
+ * @tparam V   Value type
+ * @tparam S   Type of the state maintained
+ * @tparam T   Type of the emitted records
+ */
+private[streaming] class TrackStateDStream[K: ClassTag, V: ClassTag, S: ClassTag, T: ClassTag](
+    parent: DStream[(K, V)], spec: TrackStateSpecImpl[K, V, S, T])
+  extends DStream[TrackStateRDDRecord[K, S, T]](parent.context) {
+
+  persist(StorageLevel.MEMORY_ONLY)
+
+  private val partitioner = spec.getPartitioner().getOrElse(
+    new HashPartitioner(ssc.sc.defaultParallelism))
+
+  private val trackingFunction = spec.getFunction()
+
+  override def slideDuration: Duration = parent.slideDuration
+
+  override def dependencies: List[DStream[_]] = List(parent)
+
+  override val mustCheckpoint = true
+
+  /** Method that generates an RDD for the given time */
+  override def compute(validTime: Time): Option[RDD[TrackStateRDDRecord[K, S, T]]] = {
+    val prevStateRDD = getOrCompute(validTime - slideDuration).getOrElse {
+      TrackStateRDD.createFromPairRDD[K, V, S, T](
+        spec.getInitialStateRDD().getOrElse(new EmptyRDD[(K, S)](ssc.sparkContext)),
+        partitioner,
+        validTime.milliseconds
+      )
+    }
+    val newDataRDD = parent.getOrCompute(validTime).get
--- End diff --

> In practice it really cannot, but yeah it's better to account for None's

A user's custom DStream may return `None`.
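
A sketch of what accounting for `None` could look like here (substituting an empty RDD for the missing batch):

```scala
// Sketch: avoid calling .get on the Option; fall back to an empty RDD when a
// user-defined DStream's compute() returns None for this batch.
val newDataRDD = parent.getOrCompute(validTime).getOrElse {
  ssc.sparkContext.emptyRDD[(K, V)]
}
```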





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44098629
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala ---
@@ -350,6 +349,18 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
 )
   }
 
+  /** TODO: Add scala docs */
+  def trackStateByKey[S: ClassTag, T: ClassTag](
+      spec: TrackStateSpec[K, V, S, T]): EmittedRecordsDStream[K, V, S, T] = {
+    new EmittedRecordsDStreamImpl[K, V, S, T](
+      new TrackStateDStream[K, V, S, T](
+        self,
+        spec.asInstanceOf[TrackStateSpecImpl[K, V, S, T]]
--- End diff --

The DataFrameReader does not seem to have any getters, so I am not sure how 
to derive inspiration from it.

From offline discussion, the confusion was that the function is not part of 
the API of the `TrackStateSpec` class. That is because the function is 
specified through `TrackStateSpec.apply(func)`. I had designed it that way 
because the function is the only spec that must be provided; the rest is 
optional. So having `TrackStateSpec.apply()` take the function ensures that 
it's concise (`TrackStateSpec(func)`) and foolproof (the user has to specify 
it).

But if this is confusing, then there are two options.
1. Make `function()` a method with which the user can specify the function. 
The user could then write either `TrackStateSpec.function(func)` or 
`TrackStateSpec(func)`, and `function` would be visible in the API + docs of 
the TrackStateSpec class (a rough sketch of this follows below).
2. Just leave it as is and make the docs of the `TrackStateSpec` class (and 
also other docs) clearly show the usage via the `TrackStateSpec` object.
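
A rough sketch of what option 1 could look like at the call site (the `function(...)` method is hypothetical, and `State.getOption`/`State.update` are assumed accessors):

```scala
import org.apache.spark.streaming.Minutes

// Hypothetical usage under option 1: the tracking function is supplied via an
// explicit function(...) method, so it appears in TrackStateSpec's API docs.
def trackingFunc(key: String, value: Option[Int], state: State[Int]): Option[String] = {
  val sum = value.getOrElse(0) + state.getOption.getOrElse(0)
  state.update(sum)        // keep the running sum as the per-key state
  Some(s"$key: $sum")      // the emitted record
}

val spec = TrackStateSpec.function(trackingFunc _)   // required: the function
  .numPartitions(10)                                 // optional settings chain on
  .timeout(Minutes(30))
```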














[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-05 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44095967
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala ---
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util
+
+import java.io.{ObjectInputStream, ObjectOutputStream}
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.util.OpenHashMapBasedStateMap._
+import org.apache.spark.util.collection.OpenHashMap
+
+/** Internal interface for defining the map that keeps track of sessions. */
+private[streaming] abstract class StateMap[K: ClassTag, S: ClassTag] extends Serializable {
+
+  /** Get the state for a key if it exists */
+  def get(key: K): Option[S]
+
+  /** Get all the keys and states whose updated time is older than the given threshold time */
+  def getByTime(threshUpdatedTime: Long): Iterator[(K, S, Long)]
+
+  /** Get all the keys and states in this map. */
+  def getAll(): Iterator[(K, S, Long)]
+
+  /** Add or update state */
+  def put(key: K, state: S, updatedTime: Long): Unit
+
+  /** Remove a key */
+  def remove(key: K): Unit
+
+  /**
+   * Shallow copy `this` map to create a new state map.
+   * Updates to the new map should not mutate `this` map.
+   */
+  def copy(): StateMap[K, S]
+
+  def toDebugString(): String = toString()
+}
+
+/** Companion object for [[StateMap]], with utility methods */
+private[streaming] object StateMap {
+  def empty[K: ClassTag, S: ClassTag]: StateMap[K, S] = new EmptyStateMap[K, S]
+
+  def create[K: ClassTag, S: ClassTag](conf: SparkConf): StateMap[K, S] = {
+    val deltaChainThreshold = conf.getInt("spark.streaming.sessionByKey.deltaChainThreshold",
+      DELTA_CHAIN_LENGTH_THRESHOLD)
+    new OpenHashMapBasedStateMap[K, S](64, deltaChainThreshold)
+  }
+}
+
+/** Specific implementation of the StateMap interface representing an empty map */
+private[streaming] class EmptyStateMap[K: ClassTag, S: ClassTag] extends StateMap[K, S] {
+  override def put(key: K, session: S, updateTime: Long): Unit = {
+    throw new NotImplementedError("put() should not be called on an EmptyStateMap")
+  }
+  override def get(key: K): Option[S] = None
+  override def getByTime(threshUpdatedTime: Long): Iterator[(K, S, Long)] = Iterator.empty
+  override def getAll(): Iterator[(K, S, Long)] = Iterator.empty
+  override def copy(): StateMap[K, S] = new EmptyStateMap[K, S]
+  override def remove(key: K): Unit = { }
+  override def toDebugString(): String = ""
+}
+
+
+
+/** Implementation of StateMap based on Spark's OpenHashMap */
+private[streaming] class OpenHashMapBasedStateMap[K: ClassTag, S: ClassTag](
+    @transient @volatile var parentStateMap: StateMap[K, S],
+    initialCapacity: Int = 64,
+    deltaChainThreshold: Int = DELTA_CHAIN_LENGTH_THRESHOLD
+  ) extends StateMap[K, S] { self =>
+
+  def this(initialCapacity: Int, deltaChainThreshold: Int) = this(
+    new EmptyStateMap[K, S],
+    initialCapacity = initialCapacity,
+    deltaChainThreshold = deltaChainThreshold)
+
+  def this(deltaChainThreshold: Int) = this(
+    initialCapacity = 64, deltaChainThreshold = deltaChainThreshold)
+
+  def this() = this(DELTA_CHAIN_LENGTH_THRESHOLD)
+
+  @transient @volatile private var deltaMap =
+    new OpenHashMap[K, StateInfo[S]](initialCapacity)
+
+  /** Get the session data if it exists */
+  override def get(key: K): Option[S] = {
+    val stateInfo = deltaMap(key)
+    if (stateInfo != null) {
+      if (!stateInfo.deleted) {
+        Some(stateInfo.data)
+      } else {
+        None
+      }
+    } else {
+      parentStateMap.get(key)
+    }
+  }
+
+  /** Get all the keys and 

[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44095929
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/EmittedRecordsDStream.scala ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import scala.reflect.ClassTag
+
+import org.apache.spark._
+import org.apache.spark.rdd.{EmptyRDD, RDD}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.rdd.{TrackStateRDD, TrackStateRDDRecord}
+
+
+abstract class EmittedRecordsDStream[K: ClassTag, V: ClassTag, S: ClassTag, T: ClassTag](
--- End diff --

This is the stream of records T generated after applying the function 
`(K, Option[V], State[S]) => Option[T]`.
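
Concretely, for a running word count the pieces line up like this (a sketch against this PR's API; `wordPairs` is an assumed `DStream[(String, Int)]`, and `State.getOption`/`State.update` are assumed accessors):

```scala
// Sketch: per-word running counts; each update emits a (word, total) record.
val trackingFunction = (word: String, one: Option[Int], state: State[Int]) => {
  val newCount = one.getOrElse(0) + state.getOption.getOrElse(0)
  state.update(newCount)
  Some((word, newCount))   // the emitted record of type T
}

val emitted = wordPairs.trackStateByKey(TrackStateSpec(trackingFunction))
val snapshots = emitted.stateSnapshots()   // DStream[(String, Int)] of current states
```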





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44095867
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/EmittedRecordsDStream.scala ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import scala.reflect.ClassTag
+
+import org.apache.spark._
+import org.apache.spark.rdd.{EmptyRDD, RDD}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.rdd.{TrackStateRDD, TrackStateRDDRecord}
+
+
+abstract class EmittedRecordsDStream[K: ClassTag, V: ClassTag, S: ClassTag, T: ClassTag](
+    ssc: StreamingContext) extends DStream[T](ssc) {
+
+  def stateSnapshots(): DStream[(K, S)]
--- End diff --

snapshots of what?





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-11-05 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r44094830
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala ---
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util
+
+import java.io.{ObjectInputStream, ObjectOutputStream}
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.util.OpenHashMapBasedStateMap._
+import org.apache.spark.util.collection.OpenHashMap
+
+/** Internal interface for defining the map that keeps track of sessions. */
+private[streaming] abstract class StateMap[K: ClassTag, S: ClassTag] extends Serializable {
+
+  /** Get the state for a key if it exists */
+  def get(key: K): Option[S]
+
+  /** Get all the keys and states whose updated time is older than the given threshold time */
+  def getByTime(threshUpdatedTime: Long): Iterator[(K, S, Long)]
+
+  /** Get all the keys and states in this map. */
+  def getAll(): Iterator[(K, S, Long)]
+
+  /** Add or update state */
+  def put(key: K, state: S, updatedTime: Long): Unit
+
+  /** Remove a key */
+  def remove(key: K): Unit
+
+  /**
+   * Shallow copy `this` map to create a new state map.
+   * Updates to the new map should not mutate `this` map.
+   */
+  def copy(): StateMap[K, S]
+
+  def toDebugString(): String = toString()
+}
+
+/** Companion object for [[StateMap]], with utility methods */
+private[streaming] object StateMap {
+  def empty[K: ClassTag, S: ClassTag]: StateMap[K, S] = new EmptyStateMap[K, S]
+
+  def create[K: ClassTag, S: ClassTag](conf: SparkConf): StateMap[K, S] = {
+    val deltaChainThreshold = conf.getInt("spark.streaming.sessionByKey.deltaChainThreshold",
+      DELTA_CHAIN_LENGTH_THRESHOLD)
+    new OpenHashMapBasedStateMap[K, S](64, deltaChainThreshold)
+  }
+}
+
+/** Specific implementation of the StateMap interface representing an empty map */
+private[streaming] class EmptyStateMap[K: ClassTag, S: ClassTag] extends StateMap[K, S] {
+  override def put(key: K, session: S, updateTime: Long): Unit = {
+    throw new NotImplementedError("put() should not be called on an EmptyStateMap")
+  }
+  override def get(key: K): Option[S] = None
+  override def getByTime(threshUpdatedTime: Long): Iterator[(K, S, Long)] = Iterator.empty
+  override def getAll(): Iterator[(K, S, Long)] = Iterator.empty
+  override def copy(): StateMap[K, S] = new EmptyStateMap[K, S]
--- End diff --

nit: this method can return `this` since it's immutable.
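
i.e. something like:

```scala
// EmptyStateMap holds no mutable data, so copy() can safely return the
// same instance instead of allocating a new one.
override def copy(): StateMap[K, S] = this
```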




