[GitHub] spark pull request #18840: [SPARK-21565] Propagate metadata in attribute rep...

2017-08-03 Thread joseph-torres
GitHub user joseph-torres opened a pull request:

https://github.com/apache/spark/pull/18840

[SPARK-21565] Propagate metadata in attribute replacement.

## What changes were proposed in this pull request?

Propagate metadata in attribute replacement during streaming execution. 
This is necessary for EventTimeWatermark operators that consume replaced attributes.
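
For illustration, a minimal sketch of what "propagating metadata" means here (a hypothetical helper, not the actual patch): when a fresh attribute is substituted for an existing one, the substitute should carry the original attribute's metadata, since that is where the event-time watermark delay is recorded.

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference

// Hypothetical sketch: rebuild the replacement attribute with the original's
// metadata, keeping the replacement's own name, type, nullability, and exprId.
def replaceKeepingMetadata(
    original: AttributeReference,
    replacement: AttributeReference): AttributeReference =
  AttributeReference(replacement.name, replacement.dataType, replacement.nullable,
    original.metadata)(exprId = replacement.exprId)
```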

## How was this patch tested?
new unit test, which was verified to fail before the fix

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseph-torres/spark SPARK-21565

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18840.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18840


commit e54d81200569c2260f0995b2f91aa9829dc10ad7
Author: Jose Torres <joseph-tor...@databricks.com>
Date:   2017-08-04T03:52:57Z

Propagate metadata in attribute replacement.







[GitHub] spark issue #18925: [SPARK-21713][SC] Replace streaming bit with OutputMode

2017-08-11 Thread joseph-torres
Github user joseph-torres commented on the issue:

https://github.com/apache/spark/pull/18925
  
@marmbrus 





[GitHub] spark pull request #18925: [SPARK-21713][SC] Replace streaming bit with Outp...

2017-08-11 Thread joseph-torres
GitHub user joseph-torres opened a pull request:

https://github.com/apache/spark/pull/18925

[SPARK-21713][SC] Replace streaming bit with OutputMode

## What changes were proposed in this pull request?

* Replace LogicalPlan.isStreaming with output mode.
* Replace Deduplicate.streaming with output mode.

Note that this is an implementation-only change, so it deliberately does 
not change isStreaming in the Dataset API.

## How was this patch tested?

refactoring only - ran existing unit tests


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseph-torres/spark SC-8027

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18925.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18925


commit 4d8fe6e3e0300b847cbf11a8c29b9cda696bb238
Author: Jose Torres <joseph-tor...@databricks.com>
Date:   2017-08-10T16:59:40Z

Rename FlatMapGroupsWithState.outputMode to funcOutputMode.

commit aacf0592e7720e4784377714673cc4d2151be66d
Author: Jose Torres <joseph-tor...@databricks.com>
Date:   2017-08-10T16:24:56Z

partial

commit e23b1d0c7563eee61697b7a3a4a1a3f6fe1d
Author: Jose Torres <joseph-tor...@databricks.com>
Date:   2017-08-10T17:54:54Z

Replace LogicalPlan.isStreaming with outputMode Append() and Complete().

commit d2f7e604f58071f7f14c68f9760e1b5d0b705487
Author: Jose Torres <joseph-tor...@databricks.com>
Date:   2017-08-11T17:25:52Z

Replace Deduplicate.streaming with output mode.







[GitHub] spark issue #18925: [SPARK-21713][SC] Replace streaming bit with OutputMode

2017-08-11 Thread joseph-torres
Github user joseph-torres commented on the issue:

https://github.com/apache/spark/pull/18925
  
@tdas - please review





[GitHub] spark pull request #18925: [SPARK-21713][SC] Replace streaming bit with Outp...

2017-08-11 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/18925#discussion_r132795536
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -779,10 +780,16 @@ case object OneRowRelation extends LeafNode {
 }
 
 /** A logical plan for `dropDuplicates`. */
+case object Deduplicate {
+  def apply(keys: Seq[Attribute], child: LogicalPlan): Deduplicate = {
+Deduplicate(keys, child, child.outputMode)
+  }
+}
+
 case class Deduplicate(
 keys: Seq[Attribute],
 child: LogicalPlan,
-streaming: Boolean) extends UnaryNode {
+originalOutputMode: OutputMode) extends UnaryNode {
--- End diff --

The intent here is that callers who need a Deduplicate will use the 
two-argument form in the companion object, which calls the constructor with 
the child's output mode so that it is preserved.

A val defined inside the case-class body isn't carried through copy(), which 
caused test failures when I tried it.
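
A toy sketch of the copy() pitfall mentioned above (stand-in classes, not the actual Deduplicate node): a value derived in the case-class body is re-derived from the new child whenever copy() rebuilds the node, while a constructor parameter filled in once by a companion apply is preserved.

```scala
// Stand-in classes only, to illustrate why the mode is a constructor parameter.
case class NodeWithBodyVal(keys: Seq[String], child: String) {
  // Re-derived from the (possibly new) child every time copy() rebuilds the node.
  val mode: String = if (child.startsWith("stream")) "Append" else "Batch"
}

case class NodeWithParam(keys: Seq[String], child: String, mode: String)
object NodeWithParam {
  // Two-argument form for callers: derive the mode from the child once, at creation.
  def apply(keys: Seq[String], child: String): NodeWithParam =
    NodeWithParam(keys, child, if (child.startsWith("stream")) "Append" else "Batch")
}

NodeWithParam(Seq("k"), "streamSource").copy(child = "batchSource").mode   // "Append"
NodeWithBodyVal(Seq("k"), "streamSource").copy(child = "batchSource").mode // "Batch"
```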





[GitHub] spark pull request #18790: [SPARK-21587][SS] Added pushdown through watermar...

2017-08-08 Thread joseph-torres
GitHub user joseph-torres reopened a pull request:

https://github.com/apache/spark/pull/18790

[SPARK-21587][SS] Added pushdown through watermarks.

## What changes were proposed in this pull request?

* Filter predicates can be pushed through EventTimeWatermark if they're 
deterministic and do not reference the watermarked attribute.
* Projects can be pushed through EventTimeWatermark if they include the 
watermarked attribute.
* Limits can be pushed through EventTimeWatermark.

## How was this patch tested?
unit tests

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseph-torres/spark SPARK-21587

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18790.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18790


commit 9cc8da5dbc5be9b7b663e002097214e3c0720801
Author: Jose Torres <joseph-tor...@databricks.com>
Date:   2017-07-31T22:58:03Z

Added pushdown below watermarks.

commit 154d34820be73f7d20bf1119fb385940d0ce6455
Author: Jose Torres <joseph-tor...@databricks.com>
Date:   2017-08-01T16:47:32Z

Push Project, Limit, and Filter through watermark when appropriate.

commit 84575b60609a3efc9824eba96541011a99313a63
Author: Jose Torres <joseph-tor...@databricks.com>
Date:   2017-08-07T20:50:29Z

Remove pushdown limit through watermark.

commit 4cae8973f52078afae2a9d92d59c91edaab0ba88
Author: Jose Torres <joseph-tor...@databricks.com>
Date:   2017-08-09T01:56:46Z

remove leaked import







[GitHub] spark pull request #18790: [SPARK-21587][SS] Added pushdown through watermar...

2017-08-08 Thread joseph-torres
Github user joseph-torres closed the pull request at:

https://github.com/apache/spark/pull/18790





[GitHub] spark issue #18790: [SPARK-21587][SS] Added pushdown through watermarks.

2017-08-08 Thread joseph-torres
Github user joseph-torres commented on the issue:

https://github.com/apache/spark/pull/18790
  
I'm told I can reopen this instead of making a new PR for the same branch. 
Reopening with fixed commit history.





[GitHub] spark pull request #18889: [SPARK-21587][SS] Added pushdown through watermar...

2017-08-08 Thread joseph-torres
Github user joseph-torres closed the pull request at:

https://github.com/apache/spark/pull/18889





[GitHub] spark pull request #18889: [SPARK-21587][SS] Added pushdown through watermar...

2017-08-08 Thread joseph-torres
GitHub user joseph-torres opened a pull request:

https://github.com/apache/spark/pull/18889

[SPARK-21587][SS] Added pushdown through watermarks.

## What changes were proposed in this pull request?

Filter predicates can be pushed through EventTimeWatermark if they're 
deterministic and do not reference the watermarked attribute.
Projects can be pushed through EventTimeWatermark if they include the 
watermarked attribute.

This is a copy of PR 18790, which I had to make so I could build the right 
commit history.

## How was this patch tested?

new unit tests



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseph-torres/spark SPARK-21587

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18889.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18889


commit 9cc8da5dbc5be9b7b663e002097214e3c0720801
Author: Jose Torres <joseph-tor...@databricks.com>
Date:   2017-07-31T22:58:03Z

Added pushdown below watermarks.

commit 154d34820be73f7d20bf1119fb385940d0ce6455
Author: Jose Torres <joseph-tor...@databricks.com>
Date:   2017-08-01T16:47:32Z

Push Project, Limit, and Filter through watermark when appropriate.

commit 84575b60609a3efc9824eba96541011a99313a63
Author: Jose Torres <joseph-tor...@databricks.com>
Date:   2017-08-07T20:50:29Z

Remove pushdown limit through watermark.







[GitHub] spark issue #18790: [SPARK-21587][SS] Added pushdown through watermarks.

2017-08-08 Thread joseph-torres
Github user joseph-torres commented on the issue:

https://github.com/apache/spark/pull/18790
  
Created https://github.com/apache/spark/pull/18889 with everything 
cherry-picked into the right place.





[GitHub] spark pull request #18790: Added pushdown through watermarks.

2017-07-31 Thread joseph-torres
GitHub user joseph-torres opened a pull request:

https://github.com/apache/spark/pull/18790

Added pushdown through watermarks.

## What changes were proposed in this pull request?

Deterministic filter predicates can now be pushed through an 
EventTimeWatermark.

## How was this patch tested?
unit tests

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseph-torres/spark SPARK-21587

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18790.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18790


commit 0a837ae6f0a79409c2ab8642a0537253f309754b
Author: Jose Torres <joseph-tor...@databricks.com>
Date:   2017-07-31T22:58:03Z

Added pushdown below watermarks.







[GitHub] spark issue #18790: [SPARK-21587][SS] Added filter pushdown through watermar...

2017-08-09 Thread joseph-torres
Github user joseph-torres commented on the issue:

https://github.com/apache/spark/pull/18790
  
done





[GitHub] spark issue #18790: [SPARK-21587][SS] Added pushdown through watermarks.

2017-08-09 Thread joseph-torres
Github user joseph-torres commented on the issue:

https://github.com/apache/spark/pull/18790
  
Agreed. I've restricted this PR to just filter, since the original story 
was about enabling partition pruning for filters above the watermark.





[GitHub] spark pull request #18790: [SPARK-21587][SS] Added pushdown through watermar...

2017-08-09 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/18790#discussion_r132238309
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -872,6 +886,25 @@ object PushDownPredicate extends Rule[LogicalPlan] 
with PredicateHelper {
   pushDownPredicate(filter, u.child) { predicate =>
 u.withNewChildren(Seq(Filter(predicate, u.child)))
   }
+
+case filter @ Filter(condition, watermark: EventTimeWatermark) =>
--- End diff --

Done.





[GitHub] spark pull request #18790: [SPARK-21587][SS] Added pushdown through watermar...

2017-08-07 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/18790#discussion_r131761258
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -872,6 +886,25 @@ object PushDownPredicate extends Rule[LogicalPlan] 
with PredicateHelper {
   pushDownPredicate(filter, u.child) { predicate =>
 u.withNewChildren(Seq(Filter(predicate, u.child)))
   }
+
+case filter @ Filter(condition, watermark: EventTimeWatermark) =>
--- End diff --

For filter, the logic has a subtle additional condition. We don't want to 
push down filters on the watermark attribute, because:
* they'll be at least as expensive to evaluate as the watermark operator
* partition pruning shouldn't apply since there won't be useful partitions 
on an event time

For project, I don't see a rule for UnaryNode anywhere. I might have missed 
it.

For limit, I actually removed the rule for EventTimeWatermark that I 
originally added, since it does drop rows in some situations.

So I don't think that making EventTimeWatermark subclass UnaryNode would 
avoid any of the code in this PR. I agree it seems appropriate, but it also 
seems orthogonal.
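
For reference, a rough sketch of the filter case being discussed (a simplified, hypothetical helper; the real rule lives in PushDownPredicate and may differ in detail): only deterministic conjuncts that do not reference the watermarked attribute move below the EventTimeWatermark.

```scala
import org.apache.spark.sql.catalyst.expressions.{And, PredicateHelper}
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, Filter, LogicalPlan}

object WatermarkPushdownSketch extends PredicateHelper {
  def pushThroughWatermark(filter: Filter, watermark: EventTimeWatermark): LogicalPlan = {
    val (pushDown, stayUp) = splitConjunctivePredicates(filter.condition).partition { p =>
      // Never push a predicate on the watermarked attribute itself, and never
      // move nondeterministic predicates.
      p.deterministic && !p.references.contains(watermark.eventTime)
    }
    if (pushDown.isEmpty) {
      filter
    } else {
      val pushed = watermark.copy(child = Filter(pushDown.reduceLeft(And), watermark.child))
      if (stayUp.isEmpty) pushed else Filter(stayUp.reduceLeft(And), pushed)
    }
  }
}
```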





[GitHub] spark pull request #18840: [SPARK-21565] Propagate metadata in attribute rep...

2017-08-07 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/18840#discussion_r131707863
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
 ---
@@ -391,6 +391,30 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
 checkDataset[Long](df, 1L to 100L: _*)
   }
 
+  test("SPARK-21565: watermark operator accepts attributes from 
replacement") {
+withTempDir { dir =>
+  dir.delete()
+
+  val df = Seq(("a", 100.0, new java.sql.Timestamp(100L)))
+.toDF("symbol", "price", "eventTime")
+  df.write.json(dir.getCanonicalPath)
+
+  val input = spark.readStream.schema(df.schema)
+.json(dir.getCanonicalPath)
+
+  val groupEvents = input
+.withWatermark("eventTime", "2 seconds")
+.groupBy("symbol", "eventTime")
+.agg(count("price") as 'count)
+.select("symbol", "eventTime", "count")
+  val q = groupEvents.writeStream
+.outputMode("append")
+.format("console")
+.start()
+  q.processAllAvailable()
--- End diff --

Done.





[GitHub] spark pull request #18973: [SPARK-21765] Set isStreaming on leaf nodes for s...

2017-08-17 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/18973#discussion_r133844671
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala 
---
@@ -564,10 +564,14 @@ class SparkSession private(
*/
   private[sql] def internalCreateDataFrame(
   catalystRows: RDD[InternalRow],
-  schema: StructType): DataFrame = {
+  schema: StructType,
+  isStreaming: Boolean = false): DataFrame = {
 // TODO: use MutableProjection when rowRDD is another DataFrame and 
the applied
 // schema differs from the existing schema on any field data type.
-val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
+val logicalPlan = LogicalRDD(
+  schema.toAttributes,
+  catalystRows,
+  isStreaming = isStreaming)(self)
--- End diff --

It's necessary here because there are two other default arguments in the 
constructor.
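
A tiny illustration of the point (hypothetical signature, not LogicalRDD's exact one): when a constructor has several defaulted parameters, a later parameter can only be overridden by name without also spelling out the earlier defaults.

```scala
// Hypothetical constructor-like function with two defaulted parameters before isStreaming.
def makeNode(
    output: Seq[String],
    partitioning: String = "unknown",
    ordering: Seq[String] = Nil,
    isStreaming: Boolean = false): String =
  s"$output / $partitioning / $ordering / streaming=$isStreaming"

makeNode(Seq("a"), isStreaming = true)  // the named argument skips the two defaults
```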





[GitHub] spark pull request #18973: [SPARK-21765] Set isStreaming on leaf nodes for s...

2017-08-17 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/18973#discussion_r133844681
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
 ---
@@ -75,7 +75,7 @@ class LogicalPlanSuite extends SparkFunSuite {
 val relation = LocalRelation(AttributeReference("a", IntegerType, 
nullable = true)())
 val incrementalRelation = new LocalRelation(
   Seq(AttributeReference("a", IntegerType, nullable = true)())) {
-  override def isStreaming(): Boolean = true
+  override val isStreaming: Boolean = true
--- End diff --

Done.





[GitHub] spark pull request #18973: [SPARK-21765] Set isStreaming on leaf nodes for s...

2017-08-17 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/18973#discussion_r133844686
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
 ---
@@ -43,7 +43,9 @@ object LocalRelation {
   }
 }
 
-case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = 
Nil)
+case class LocalRelation(output: Seq[Attribute],
--- End diff --

Done. (I think this is a correct summary?)





[GitHub] spark pull request #18925: [SPARK-21713][SC] Replace streaming bit with Outp...

2017-08-22 Thread joseph-torres
Github user joseph-torres closed the pull request at:

https://github.com/apache/spark/pull/18925





[GitHub] spark issue #18925: [SPARK-21713][SC] Replace streaming bit with OutputMode

2017-08-22 Thread joseph-torres
Github user joseph-torres commented on the issue:

https://github.com/apache/spark/pull/18925
  
The underlying JIRA ticket was closed as Won't Fix because this model doesn't 
seem better.





[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-15 Thread joseph-torres
GitHub user joseph-torres reopened a pull request:

https://github.com/apache/spark/pull/19239

[SPARK-22017] Take minimum of all watermark execs in StreamExecution.

## What changes were proposed in this pull request?

Take the minimum of all watermark exec nodes as the "real" watermark in 
StreamExecution, rather than picking one arbitrarily.
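
A hypothetical sketch of the bookkeeping this implies (invented class and field names, not the actual StreamExecution code): keep one watermark value per watermark operator, advance each monotonically, and report the minimum across operators as the query-level watermark used for state eviction.

```scala
import scala.collection.mutable

// Hypothetical tracker: one entry per EventTimeWatermarkExec in the physical plan.
class WatermarkTrackerSketch {
  private val operatorToWatermarkMs = mutable.Map.empty[Int, Long]

  // newValues: operator index -> candidate watermark (max event time seen minus delay).
  def update(newValues: Map[Int, Long]): Unit = newValues.foreach { case (op, wm) =>
    // Each operator's watermark only ever moves forward.
    operatorToWatermarkMs(op) = math.max(operatorToWatermarkMs.getOrElse(op, 0L), wm)
  }

  // The "real" query watermark is the minimum across operators, so no operator's
  // not-yet-late data is aged out prematurely.
  def currentWatermarkMs: Long =
    if (operatorToWatermarkMs.isEmpty) 0L else operatorToWatermarkMs.values.min
}
```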

## How was this patch tested?

new unit test


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseph-torres/spark SPARK-22017

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19239.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19239


commit 4a7f53fdab1e5e640e156a4a3d2ba27837195195
Author: Jose Torres <j...@databricks.com>
Date:   2017-09-13T21:49:23Z

Implement multiple watermark StreamExecution support.

commit 9b9cd19106fae9a2de268eb2b559ca1bf159e9c2
Author: Jose Torres <j...@databricks.com>
Date:   2017-09-14T18:30:40Z

partially fix test

commit 6a4c80b696f42a445c7f846fada3f823e04bd3ab
Author: Jose Torres <j...@databricks.com>
Date:   2017-09-14T21:52:16Z

Finish rewriting test

commit 484940e5eb4d1eac1c5ec81f475681c9241bbab2
Author: Jose Torres <j...@databricks.com>
Date:   2017-09-14T22:24:36Z

make IncrementalExecution.offsetSeqMetadata non-private

commit 032f55503c8d424390da1ff85054e3a01e7489eb
Author: Jose Torres <j...@databricks.com>
Date:   2017-09-14T23:22:22Z

properly name test dataframes

commit d7f5f60c6be5bf228c960c3549eb81ed869f0227
Author: Jose Torres <j...@databricks.com>
Date:   2017-09-14T23:39:22Z

Combine test helper functions.

commit 2f07f90423d87985322975f8ad5aef8f70f28066
Author: Jose Torres <j...@databricks.com>
Date:   2017-09-15T01:18:12Z

Key watermarks by relative position rather than attribute.

commit 8b605384d77fdeb63b28feabee74284a5ab1409a
Author: Jose Torres <j...@databricks.com>
Date:   2017-09-15T02:05:14Z

Address test comments.







[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-15 Thread joseph-torres
Github user joseph-torres closed the pull request at:

https://github.com/apache/spark/pull/19239





[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread joseph-torres
GitHub user joseph-torres opened a pull request:

https://github.com/apache/spark/pull/19327

[WIP] Implement stream-stream outer joins.

## What changes were proposed in this pull request?

Allow one-sided outer joins between two streams when a watermark is defined.
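
For context, the kind of query this would enable, sketched with the public Dataset API (illustrative only; the rate source, column names, delays, and time-range condition are made up):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

val spark = SparkSession.builder().master("local[*]").appName("outerJoinSketch").getOrCreate()

val impressions = spark.readStream.format("rate").load()
  .selectExpr("value AS adId", "timestamp AS impressionTime")
  .withWatermark("impressionTime", "10 seconds")

val clicks = spark.readStream.format("rate").load()
  .selectExpr("value AS clickAdId", "timestamp AS clickTime")
  .withWatermark("clickTime", "10 seconds")

// One-sided (left) outer join: the watermarks plus the time-range condition bound
// how long an unmatched left row must be kept in state before being emitted with nulls.
val joined = impressions.join(
  clicks,
  expr("adId = clickAdId AND " +
    "clickTime BETWEEN impressionTime AND impressionTime + interval 20 seconds"),
  "leftOuter")
```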

## How was this patch tested?

new unit tests


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseph-torres/spark outerjoin

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19327.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19327


commit 1dfb95d5b7dee86b45aa831278d3fa7a7dc1917f
Author: Jose Torres <j...@databricks.com>
Date:   2017-09-22T20:36:50Z

Implement stream-stream outer joins.







[GitHub] spark issue #19212: [SPARK-21988] Add default stats to StreamingExecutionRel...

2017-09-13 Thread joseph-torres
Github user joseph-torres commented on the issue:

https://github.com/apache/spark/pull/19212
  
@zsxwing for review





[GitHub] spark issue #19239: [SPARK-22017] Take minimum of all watermark execs in Str...

2017-09-14 Thread joseph-torres
Github user joseph-torres commented on the issue:

https://github.com/apache/spark/pull/19239
  
addressed comments





[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-14 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19239#discussion_r139028613
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
 ---
@@ -300,6 +300,67 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
 )
   }
 
+  test("watermark with 2 streams") {
+val first = MemoryStream[Int]
+
+val firstAggregation = first.toDF()
+  .withColumn("eventTime", $"value".cast("timestamp"))
+  .withWatermark("eventTime", "10 seconds")
+  .select('value)
+
+val second = MemoryStream[Int]
+
+val secondAggregation = second.toDF()
+  .withColumn("eventTime", $"value".cast("timestamp"))
+  .withWatermark("eventTime", "10 seconds")
+  .select('value)
+
+val union = firstAggregation.union(secondAggregation)
+  .writeStream
+  .format("memory")
+  .queryName("test")
+  .start()
+
+def populateNewWatermarkFromData(stream: MemoryStream[Int], data: 
Int*): Unit = {
+  stream.addData(data)
+  union.processAllAvailable()
+  // add a dummy batch so lastExecution has the new watermark
+  stream.addData(0)
+  union.processAllAvailable()
+}
+
+def assertQueryWatermark(watermark: Int): Unit = {
+  assert(union.asInstanceOf[StreamingQueryWrapper].streamingQuery
+.lastExecution.offsetSeqMetadata.batchWatermarkMs
+== watermark)
+}
+
+populateNewWatermarkFromData(first, 11)
--- End diff --

The problem is that watermark recalculation happens at the beginning of 
each batch, and to sequence executions I have to call CheckData or 
CheckLastBatch. So that method ends up producing a test multiple times longer, 
since a single entry is:

AddData(realData)
CheckLastBatch
AddData(0)
CheckLastBatch
AssertOnQuery





[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-14 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19239#discussion_r139029401
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -560,13 +567,24 @@ class StreamExecution(
 }
 if (hasNewData) {
   var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
-  // Update the eventTime watermark if we find one in the plan.
+  // Update the eventTime watermarks if we find any in the plan.
--- End diff --

Well, we're updating multiple watermarks in the map. We later update 
`offsetSeqMetadata` with the new minimum one, but that's not in this block.





[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-14 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19239#discussion_r139029187
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -130,6 +130,13 @@ class StreamExecution(
   protected var offsetSeqMetadata = OffsetSeqMetadata(
 batchWatermarkMs = 0, batchTimestampMs = 0, sparkSession.conf)
 
+  /**
+   * A map from watermarked attributes to their current watermark. The 
minimum watermark
+   * timestamp present here will be used as the overall query watermark in 
offsetSeqMetadata;
+   * the query watermark is what's logged and used to age out old state.
+   */
+  protected var attributeWatermarkMsMap: AttributeMap[Long] = 
AttributeMap(Seq())
--- End diff --

This map has to persist and get updated across batches, and I'm not sure 
how to do that with a local variable or a val.





[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-14 Thread joseph-torres
GitHub user joseph-torres opened a pull request:

https://github.com/apache/spark/pull/19239

[SPARK-22017] Take minimum of all watermark execs in StreamExecution.

## What changes were proposed in this pull request?

Take the minimum of all watermark exec nodes as the "real" watermark in 
StreamExecution, rather than picking one arbitrarily.

## How was this patch tested?

new unit test


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseph-torres/spark SPARK-22017

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19239.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19239


commit 4a7f53fdab1e5e640e156a4a3d2ba27837195195
Author: Jose Torres <j...@databricks.com>
Date:   2017-09-13T21:49:23Z

Implement multiple watermark StreamExecution support.

commit 9b9cd19106fae9a2de268eb2b559ca1bf159e9c2
Author: Jose Torres <j...@databricks.com>
Date:   2017-09-14T18:30:40Z

partially fix test

commit 6a4c80b696f42a445c7f846fada3f823e04bd3ab
Author: Jose Torres <j...@databricks.com>
Date:   2017-09-14T21:52:16Z

Finish rewriting test







[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

2017-10-06 Thread joseph-torres
GitHub user joseph-torres opened a pull request:

https://github.com/apache/spark/pull/19452

[SPARK-22136][SS] Evaluate one-sided conditions early in stream-stream 
joins.

## What changes were proposed in this pull request?

Evaluate one-sided conditions early in stream-stream joins.

This is in addition to normal filter pushdown, because integrating it with 
the join logic allows it to take place in outer join scenarios. This means that 
rows which can never satisfy the join condition won't clog up the state. 

## How was this patch tested?
new unit tests


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseph-torres/spark SPARK-22136

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19452.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19452


commit c90fb3cb1e112edeacb4be2604a7b628f55697f4
Author: Jose Torres <j...@databricks.com>
Date:   2017-10-06T20:44:20Z

Evaluate one-sided conditions early in stream-stream joins.







[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

2017-10-12 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19452#discussion_r144396781
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala
 ---
@@ -66,6 +67,60 @@ object StreamingSymmetricHashJoinHelper extends Logging {
 }
   }
 
+  /**
+   * Wrapper around various useful splits of the join condition.
+   * left AND right AND joined is equivalent to full.
+   *
+   * Note that left and right do not necessarily contain *all* conjuncts 
which satisfy
+   * their condition. Any conjuncts after the first nondeterministic one 
are treated as
+   * nondeterministic for purposes of the split.
+   *
+   * @param left Deterministic conjuncts which reference only the left 
side of the join.
+   * @param right Deterministic conjuncts which reference only the right 
side of the join.
+   * @param joined Conjuncts which are in neither left nor right.
+   * @param full The full join condition.
+   */
+  case class JoinConditionSplitPredicates(
+left: Option[Expression],
+right: Option[Expression],
+joined: Option[Expression],
+full: Option[Expression]) {}
+
+  object JoinConditionSplitPredicates extends PredicateHelper {
+def apply(condition: Option[Expression], left: SparkPlan, right: 
SparkPlan):
+JoinConditionSplitPredicates = {
+  // Split the condition into 3 parts:
+  // * Conjuncts that can be applied to the left before storing.
+  // * Conjuncts that can be applied to the right before storing.
+  // * Conjuncts that must be applied to the full row at join time.
+  //
+  // Note that the third category includes both conjuncts that 
reference both sides
+  // and all nondeterministic conjuncts. Nondeterministic conjuncts 
can't be shortcutted
+  // to preserve any stateful semantics they may have.
+  val (leftCondition, rightCondition, joinedCondition) = {
+if (condition.isEmpty) {
+  (None, None, None)
+} else {
+  val (candidates, containingNonDeterministic) =
+splitConjunctivePredicates(condition.get).span(_.deterministic)
--- End diff --

It's in a bunch of places in PushDownPredicate, but the reason for it isn't 
documented in any of those places, so I'm not sure where the right place to 
point is. I'm adding some documentation here describing why.
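
A self-contained toy contrasting span with partition (stand-in conjuncts, not Catalyst expressions), which is the behavior being documented: span stops at the first nondeterministic conjunct, so later deterministic conjuncts are not pulled ahead of it.

```scala
// Toy stand-in for Catalyst conjuncts, just to contrast span with partition.
case class Conjunct(sql: String, deterministic: Boolean)

val conjuncts = Seq(
  Conjunct("a > 1", deterministic = true),
  Conjunct("rand() < 0.5", deterministic = false),
  Conjunct("b > 2", deterministic = true))

val (pushable, kept) = conjuncts.span(_.deterministic)
// pushable == Seq(a > 1); "b > 2" stays put because evaluating it earlier would
// reorder it past the nondeterministic conjunct.

val (wouldBePushed, _) = conjuncts.partition(_.deterministic)
// partition would also grab "b > 2", moving it ahead of rand() < 0.5.
```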





[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

2017-10-12 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19452#discussion_r10608
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -206,10 +213,19 @@ case class StreamingSymmetricHashJoinExec(
 val updateStartTimeNs = System.nanoTime
 val joinedRow = new JoinedRow
 
+// Filter the joined rows based on the given condition.
+val leftPreJoinFilter =
+  newPredicate(condition.leftSideOnly.getOrElse(Literal(true)), 
output).eval _
--- End diff --

Moving it in would require either also passing in the left ++ right input 
attributes, or passing preJoin and postJoin filters differently. I'm not sure 
which option is cleaner, so I can make the change you think is best.





[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

2017-10-12 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19452#discussion_r10844
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -161,6 +164,10 @@ case class StreamingSymmetricHashJoinExec(
 new SerializableConfiguration(SessionState.newHadoopConf(
   sparkContext.hadoopConfiguration, sqlContext.conf)))
 
+
+  val nullLeft = new 
GenericInternalRow(left.output.map(_.withNullability(true)).length)
+  val nullRight = new 
GenericInternalRow(right.output.map(_.withNullability(true)).length)
--- End diff --

The problem is that the left joiner has left input attributes, but needs 
the right null row.





[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

2017-10-13 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19452#discussion_r144620005
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSymmetricHashJoinHelperSuite.scala
 ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.execution.{LeafExecNode, LocalTableScanExec, 
SparkPlan}
+import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
+import 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.JoinConditionSplitPredicates
+import org.apache.spark.sql.types.DataTypes
+
+class StreamingSymmetricHashJoinHelperSuite extends StreamTest {
+  import org.apache.spark.sql.functions._
+
+  val attributeA = AttributeReference("a", DataTypes.IntegerType)()
+  val attributeB = AttributeReference("b", DataTypes.IntegerType)()
+  val attributeC = AttributeReference("c", DataTypes.IntegerType)()
+  val attributeD = AttributeReference("d", DataTypes.IntegerType)()
+  val colA = new Column(attributeA)
+  val colB = new Column(attributeB)
+  val colC = new Column(attributeC)
+  val colD = new Column(attributeD)
+
+  val left = new LocalTableScanExec(Seq(attributeA, attributeB), Seq())
+  val right = new LocalTableScanExec(Seq(attributeC, attributeD), Seq())
+
+  test("empty") {
+val split = JoinConditionSplitPredicates(None, left, right)
+assert(split.leftSideOnly.isEmpty)
+assert(split.rightSideOnly.isEmpty)
+assert(split.bothSides.isEmpty)
+assert(split.full.isEmpty)
+  }
+
+  test("only literals") {
+// Literal-only conjuncts end up on the left side because that's the 
first bucket they fit in.
+// There's no semantic reason they couldn't be in any bucket.
+val predicate = (lit(1) < lit(5) && lit(6) < lit(7) && lit(0) === 
lit(-1)).expr
+val split = JoinConditionSplitPredicates(Some(predicate), left, right)
+
+assert(split.leftSideOnly.contains(predicate))
+assert(split.rightSideOnly.isEmpty)
--- End diff --

I don't want to get into duplicating predicates here for the sake of 
symmetry. I could move literals to the post-join part maybe?





[GitHub] spark pull request #19056: [SPARK-21765] Check that optimization doesn't aff...

2017-08-29 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19056#discussion_r135851433
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala 
---
@@ -126,16 +128,17 @@ class TextSocketSource(host: String, port: Int, 
includeTimestamp: Boolean, sqlCo
   batches.slice(sliceStart, sliceEnd)
 }
 
-import sqlContext.implicits._
-val rawBatch = sqlContext.createDataset(rawList)
+val rdd = sqlContext.sparkContext.parallelize(rawList).map(
+v => InternalRow(UTF8String.fromString(v._1), v._2.getTime()))
+val rawBatch = sqlContext.internalCreateDataFrame(rdd, schema, 
isStreaming = true)
 
 // Underlying MemoryStream has schema (String, Timestamp); strip out 
the timestamp
 // if requested.
 if (includeTimestamp) {
   rawBatch.toDF("value", "timestamp")
 } else {
   // Strip out timestamp
-  rawBatch.select("_1").toDF("value")
+  rawBatch.select("value").toDF()
--- End diff --

Done.





[GitHub] spark pull request #19056: [SPARK-21765] Check that optimization doesn't aff...

2017-08-29 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19056#discussion_r135851647
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala 
---
@@ -126,16 +128,17 @@ class TextSocketSource(host: String, port: Int, 
includeTimestamp: Boolean, sqlCo
   batches.slice(sliceStart, sliceEnd)
 }
 
-import sqlContext.implicits._
-val rawBatch = sqlContext.createDataset(rawList)
+val rdd = sqlContext.sparkContext.parallelize(rawList).map(
+v => InternalRow(UTF8String.fromString(v._1), v._2.getTime()))
+val rawBatch = sqlContext.internalCreateDataFrame(rdd, schema, 
isStreaming = true)
 
 // Underlying MemoryStream has schema (String, Timestamp); strip out 
the timestamp
 // if requested.
 if (includeTimestamp) {
   rawBatch.toDF("value", "timestamp")
--- End diff --

Done.





[GitHub] spark pull request #19056: [SPARK-21765] Check that optimization doesn't aff...

2017-08-29 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19056#discussion_r135851225
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -39,6 +39,16 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
 
   protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations)
 
+  override protected def checkInvariants(
+  result: LogicalPlan,
+  original: LogicalPlan,
+  rule: Rule[LogicalPlan]): Unit = {
+assert(
+  result.isStreaming == original.isStreaming,
+  s"Rule ${rule.ruleName} changed isStreaming from original 
${original.isStreaming}:" +
--- End diff --

Done.





[GitHub] spark pull request #18973: [SPARK-21765] Set isStreaming on leaf nodes for s...

2017-09-11 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/18973#discussion_r138114144
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -443,7 +444,8 @@ case class Range(
 end: Long,
 step: Long,
 numSlices: Option[Int],
-output: Seq[Attribute])
+output: Seq[Attribute],
+override val isStreaming: Boolean)
--- End diff --

I don't think there's necessarily a reason it shouldn't be able to; 
streaming sources are free to define getBatch() however they'd like.

Right now the only source actually doing that is a fake source in 
StreamSuite.





[GitHub] spark pull request #19056: [SPARK-21765] Check that optimization doesn't aff...

2017-09-05 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19056#discussion_r137143373
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala 
---
@@ -128,8 +128,9 @@ class TextSocketSource(host: String, port: Int, 
includeTimestamp: Boolean, sqlCo
   batches.slice(sliceStart, sliceEnd)
 }
 
-val rdd = sqlContext.sparkContext.parallelize(rawList).map(
-v => InternalRow(UTF8String.fromString(v._1), v._2.getTime()))
+val rdd = sqlContext.sparkContext.
--- End diff --

done





[GitHub] spark pull request #19056: [SPARK-21765] Check that optimization doesn't aff...

2017-09-05 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19056#discussion_r137048000
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala 
---
@@ -130,16 +130,7 @@ class TextSocketSource(host: String, port: Int, 
includeTimestamp: Boolean, sqlCo
 
 val rdd = sqlContext.sparkContext.parallelize(rawList).map(
 v => InternalRow(UTF8String.fromString(v._1), v._2.getTime()))
--- End diff --

Done





[GitHub] spark pull request #19212: [SPARK-21988] Add default stats to StreamingExecu...

2017-09-12 Thread joseph-torres
GitHub user joseph-torres opened a pull request:

https://github.com/apache/spark/pull/19212

[SPARK-21988] Add default stats to StreamingExecutionRelation.

## What changes were proposed in this pull request?

Add default stats to StreamingExecutionRelation.

## How was this patch tested?

existing unit tests and an explain() test to be sure

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseph-torres/spark SPARK-21988

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19212.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19212


commit 80d02681ae5290fefe991a7faef7273d79f5f1dd
Author: Jose Torres <j...@databricks.com>
Date:   2017-09-12T21:44:51Z

Add default stats to StreamingExecutionRelation.







[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-25 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140933095
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -157,11 +164,20 @@ case class StreamingSymmetricHashJoinExec(
   override def requiredChildDistribution: Seq[Distribution] =
 ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: 
Nil
 
-  override def output: Seq[Attribute] = left.output ++ right.output
+  override def output: Seq[Attribute] = joinType match {
+case _: InnerLike => left.output ++ right.output
+case LeftOuter => left.output ++ 
right.output.map(_.withNullability(true))
+case RightOuter => left.output.map(_.withNullability(true)) ++ 
right.output
+case _ =>
+  throwBadJoinTypeException()
+  Seq()
+  }
 
   override def outputPartitioning: Partitioning = joinType match {
 case _: InnerLike =>
   PartitioningCollection(Seq(left.outputPartitioning, 
right.outputPartitioning))
+case LeftOuter => PartitioningCollection(Seq(right.outputPartitioning))
--- End diff --

That's what I thought at first, but the non-streaming HashJoin seems to do 
the partitioning this way. (Or am I misunderstanding what buildSide means in 
that trait?)





[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-25 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140935586
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -216,22 +232,70 @@ case class StreamingSymmetricHashJoinExec(
 }
 
 // Filter the joined rows based on the given condition.
-val outputFilterFunction =
-  newPredicate(condition.getOrElse(Literal(true)), left.output ++ 
right.output).eval _
-val filteredOutputIter =
-  (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction).map 
{ row =>
-numOutputRows += 1
-row
-  }
+val outputFilterFunction = 
newPredicate(condition.getOrElse(Literal(true)), output).eval _
+
+val filteredInnerOutputIter = (leftOutputIter ++ 
rightOutputIter).filter(outputFilterFunction)
--- End diff --

I don't think it's correct to filter the output from remove. The query

Seq(1, 2, 3).toDF("val1").join(Seq[Int]().toDF("val2"), 'val1 === 'val2 && 
'val1 === 0, "left_outer")

produces ((1, null), (2, null), (3, null)).


Outer joins with watermark range conditions also wouldn't work if we 
filtered remove output, since the range condition would exclude null values.
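
The batch query quoted above, written out as a runnable sketch (assumes a local SparkSession; row order may vary), to show why null-padded rows must not be filtered by the join condition:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("leftOuterSketch").getOrCreate()
import spark.implicits._

// The condition can never be satisfied, yet a left outer join still emits every
// left row, null-padded on the right.
Seq(1, 2, 3).toDF("val1")
  .join(Seq.empty[Int].toDF("val2"), $"val1" === $"val2" && $"val1" === 0, "left_outer")
  .show()
// +----+----+
// |val1|val2|
// +----+----+
// |   1|null|
// |   2|null|
// |   3|null|
// +----+----+
```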





[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-25 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140936872
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 ---
@@ -87,70 +87,157 @@ class SymmetricHashJoinStateManager(
   }
 
   /**
-   * Remove using a predicate on keys. See class docs for more context and 
implement details.
+   * Remove using a predicate on keys.
+   *
+   * This produces an iterator over the (key, value) pairs satisfying 
condition(key), where the
+   * underlying store is updated as a side-effect of producing next.
+   *
+   * This implies the iterator must be consumed fully without any other 
operations on this manager
+   * or the underlying store being interleaved.
*/
-  def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
-
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  if (condition(keyToNumValue.key)) {
-keyToNumValues.remove(keyToNumValue.key)
-keyWithIndexToValue.removeAllValues(keyToNumValue.key, 
keyToNumValue.numValue)
+  def removeByKeyCondition(condition: UnsafeRow => Boolean): 
Iterator[UnsafeRowPair] = {
+new NextIterator[UnsafeRowPair] {
+
+  private val allKeyToNumValues = keyToNumValues.iterator
+
+  private var currentKeyToNumValue: KeyAndNumValues = null
+  private var currentValues: Iterator[KeyWithIndexAndValue] = null
+
+  private def currentKey = currentKeyToNumValue.key
+
+  private val reusedPair = new UnsafeRowPair()
+
+  private def getAndRemoveValue() = {
+val keyWithIndexAndValue = currentValues.next()
+keyWithIndexToValue.remove(currentKey, 
keyWithIndexAndValue.valueIndex)
+reusedPair.withRows(currentKey, keyWithIndexAndValue.value)
+  }
+
+  override def getNext(): UnsafeRowPair = {
+if (currentValues != null && currentValues.hasNext) {
+  return getAndRemoveValue()
+} else {
+  while (allKeyToNumValues.hasNext) {
+currentKeyToNumValue = allKeyToNumValues.next()
+if (condition(currentKey)) {
+  currentValues = keyWithIndexToValue.getAll(
+currentKey, currentKeyToNumValue.numValue)
+  keyToNumValues.remove(currentKey)
+
+  if (currentValues.hasNext) {
+return getAndRemoveValue()
+  }
+}
+  }
+}
+
+finished = true
+null
   }
+
+  override def close: Unit = {}
 }
   }
 
   /**
-   * Remove using a predicate on values. See class docs for more context 
and implementation details.
+   * Remove using a predicate on values.
+   *
+   * At a high level, this produces an iterator over the (key, value) 
pairs such that value
+   * satisfies the predicate, where producing an element removes the value 
from the state store
+   * and producing all elements with a given key updates it accordingly.
+   *
+   * This implies the iterator must be consumed fully without any other 
operations on this manager
+   * or the underlying store being interleaved.
*/
-  def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
+  def removeByValueCondition(condition: UnsafeRow => Boolean): 
Iterator[UnsafeRowPair] = {
+new NextIterator[UnsafeRowPair] {
 
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  val key = keyToNumValue.key
+  // Reuse this object to avoid creation+GC overhead.
+  private val reusedPair = new UnsafeRowPair()
 
-  var numValues: Long = keyToNumValue.numValue
-  var index: Long = 0L
-  var valueRemoved: Boolean = false
-  var valueForIndex: UnsafeRow = null
+  private val allKeyToNumValues = keyToNumValues.iterator
 
-  while (index < numValues) {
-if (valueForIndex == null) {
-  valueForIndex = keyWithIndexToValue.get(key, index)
+  private var currentKey: UnsafeRow = null
+  private var numValues: Long = 0L
+  private var index: Long = 0L
+  private var valueRemoved: Boolean = false
+
+  // Push the data for the current key to the numValues store, and 
reset the tracking variables
+  // to their empty state.
+  private def storeCurrentKey(): Unit = {
--- End diff --

Yeah sorry, I meant to go back and fully address this comment the 

[GitHub] spark pull request #19327: [SPARK-22136][SS] Implement stream-stream outer j...

2017-09-29 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r141984167
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
 ---
@@ -413,36 +414,103 @@ class UnsupportedOperationsSuite extends 
SparkFunSuite {
 batchStreamSupported = false,
 streamBatchSupported = false)
 
-  // Left outer joins: *-stream not allowed
+  // Left outer joins: *-stream not allowed with default condition
--- End diff --

It refers to the condition that testBinaryOperationInStreamingPlan uses when 
none is specified. I've removed the reference since it wasn't there originally, 
but let me know if you think an additional comment belongs here.





[GitHub] spark pull request #19327: [SPARK-22136][SS] Implement stream-stream outer j...

2017-10-03 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r142442657
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala 
---
@@ -425,6 +426,10 @@ class StreamingJoinSuite extends StreamTest with 
StateStoreMetricsTest with Befo
 
 // Test static comparisons
 assert(watermarkFrom("cast(leftTime AS LONG) > 10") === Some(1))
+
+// Test non-positive results
+assert(watermarkFrom("CAST(leftTime AS LONG) > CAST(rightTime AS LONG) 
- 10") === Some(0))
+assert(watermarkFrom("CAST(leftTime AS LONG) > CAST(rightTime AS LONG) 
- 100") === Some(-9))
--- End diff --

As mentioned earlier, I'm not sure how to move them; this test method as 
written relies on building a physical plan.





[GitHub] spark issue #19327: [WIP] Implement stream-stream outer joins.

2017-09-25 Thread joseph-torres
Github user joseph-torres commented on the issue:

https://github.com/apache/spark/pull/19327
  
I believe I've addressed all comments. Some refactorings made some comments 
obsolete, though.

I've also fixed 1 bug and 1 test issue causing the 2 unit test failures.

There's still the outstanding question of whether allRemovalsTimeMs 
can/should include time spent outer joining, in addition to test review.





[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-26 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140968754
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 ---
@@ -87,70 +87,157 @@ class SymmetricHashJoinStateManager(
   }
 
   /**
-   * Remove using a predicate on keys. See class docs for more context and 
implement details.
+   * Remove using a predicate on keys.
+   *
+   * This produces an iterator over the (key, value) pairs satisfying 
condition(key), where the
+   * underlying store is updated as a side-effect of producing next.
+   *
+   * This implies the iterator must be consumed fully without any other 
operations on this manager
+   * or the underlying store being interleaved.
*/
-  def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
-
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  if (condition(keyToNumValue.key)) {
-keyToNumValues.remove(keyToNumValue.key)
-keyWithIndexToValue.removeAllValues(keyToNumValue.key, 
keyToNumValue.numValue)
+  def removeByKeyCondition(condition: UnsafeRow => Boolean): 
Iterator[UnsafeRowPair] = {
+new NextIterator[UnsafeRowPair] {
+
+  private val allKeyToNumValues = keyToNumValues.iterator
+
+  private var currentKeyToNumValue: KeyAndNumValues = null
+  private var currentValues: Iterator[KeyWithIndexAndValue] = null
+
+  private def currentKey = currentKeyToNumValue.key
+
+  private val reusedPair = new UnsafeRowPair()
+
+  private def getAndRemoveValue() = {
+val keyWithIndexAndValue = currentValues.next()
+keyWithIndexToValue.remove(currentKey, 
keyWithIndexAndValue.valueIndex)
+reusedPair.withRows(currentKey, keyWithIndexAndValue.value)
+  }
+
+  override def getNext(): UnsafeRowPair = {
+if (currentValues != null && currentValues.hasNext) {
+  return getAndRemoveValue()
+} else {
+  while (allKeyToNumValues.hasNext) {
+currentKeyToNumValue = allKeyToNumValues.next()
+if (condition(currentKey)) {
+  currentValues = keyWithIndexToValue.getAll(
+currentKey, currentKeyToNumValue.numValue)
+  keyToNumValues.remove(currentKey)
+
+  if (currentValues.hasNext) {
+return getAndRemoveValue()
+  }
+}
+  }
+}
+
+finished = true
+null
   }
+
+  override def close: Unit = {}
 }
   }
 
   /**
-   * Remove using a predicate on values. See class docs for more context 
and implementation details.
+   * Remove using a predicate on values.
+   *
+   * At a high level, this produces an iterator over the (key, value) 
pairs such that value
+   * satisfies the predicate, where producing an element removes the value 
from the state store
+   * and producing all elements with a given key updates it accordingly.
+   *
+   * This implies the iterator must be consumed fully without any other 
operations on this manager
+   * or the underlying store being interleaved.
*/
-  def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
+  def removeByValueCondition(condition: UnsafeRow => Boolean): 
Iterator[UnsafeRowPair] = {
+new NextIterator[UnsafeRowPair] {
 
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  val key = keyToNumValue.key
+  // Reuse this object to avoid creation+GC overhead.
+  private val reusedPair = new UnsafeRowPair()
 
-  var numValues: Long = keyToNumValue.numValue
-  var index: Long = 0L
-  var valueRemoved: Boolean = false
-  var valueForIndex: UnsafeRow = null
+  private val allKeyToNumValues = keyToNumValues.iterator
 
-  while (index < numValues) {
-if (valueForIndex == null) {
-  valueForIndex = keyWithIndexToValue.get(key, index)
+  private var currentKey: UnsafeRow = null
+  private var numValues: Long = 0L
+  private var index: Long = 0L
+  private var valueRemoved: Boolean = false
+
+  // Push the data for the current key to the numValues store, and 
reset the tracking variables
+  // to their empty state.
+  private def storeCurrentKey(): Unit = {
+if (valueRemoved) {
+  if (numValues >= 1) {
+key

[GitHub] spark issue #18973: [SPARK-21765] Set isStreaming on leaf nodes for streamin...

2017-08-21 Thread joseph-torres
Github user joseph-torres commented on the issue:

https://github.com/apache/spark/pull/18973
  
Addressed comments from @tdas 





[GitHub] spark pull request #18973: [SPARK-21765] Set isStreaming on leaf nodes for s...

2017-08-21 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/18973#discussion_r134367543
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -420,8 +420,10 @@ class SQLContext private[sql](val sparkSession: 
SparkSession)
* converted to Catalyst rows.
*/
   private[sql]
-  def internalCreateDataFrame(catalystRows: RDD[InternalRow], schema: 
StructType) = {
-sparkSession.internalCreateDataFrame(catalystRows, schema)
+  def internalCreateDataFrame(catalystRows: RDD[InternalRow],
--- End diff --

Done.





[GitHub] spark pull request #18973: [SPARK-21765] Set isStreaming on leaf nodes for s...

2017-08-21 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/18973#discussion_r134367548
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -728,7 +729,16 @@ class FakeDefaultSource extends FakeSource {
 
   override def getBatch(start: Option[Offset], end: Offset): DataFrame 
= {
 val startOffset = 
start.map(_.asInstanceOf[LongOffset].offset).getOrElse(-1L) + 1
-spark.range(startOffset, end.asInstanceOf[LongOffset].offset + 
1).toDF("a")
+val ds = new Dataset[java.lang.Long](
--- End diff --

I've tried addressing this a few different ways, and I can't come up with 
anything cleaner than the current solution. Directly creating a DF doesn't set 
the isStreaming bit, and a bunch of copying and casting is required to get it 
set; using LocalRelation requires explicitly handling the encoding of the rows, 
since LocalRelation requires InternalRow input.
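
Roughly, the shape of what the current solution does, as a sketch only: it 
assumes the isStreaming-aware internalCreateDataFrame overload added earlier in 
this PR, and glosses over the fact that the method is private[sql]:

    import org.apache.spark.sql.{DataFrame, SparkSession}

    def streamingRangeDF(spark: SparkSession, start: Long, end: Long): DataFrame = {
      val batchDF = spark.range(start, end).toDF("a")
      // Rebuild the frame from its internal rows with the streaming bit set,
      // since a directly created DataFrame reports isStreaming = false.
      spark.internalCreateDataFrame(
        batchDF.queryExecution.toRdd, batchDF.schema, isStreaming = true)
    }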





[GitHub] spark pull request #18973: [SPARK-21765] Set isStreaming on leaf nodes for s...

2017-08-21 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/18973#discussion_r134367553
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala 
---
@@ -118,8 +122,15 @@ case class MemoryStream[A : Encoder](id: Int, 
sqlContext: SQLContext)
   batches.slice(sliceStart, sliceEnd)
 }
 
-logDebug(
-  s"MemoryBatch [$startOrdinal, $endOrdinal]: 
${newBlocks.flatMap(_.collect()).mkString(", ")}")
+logDebug({
--- End diff --

Done.





[GitHub] spark issue #19056: [SPARK-21765] Check that optimization doesn't affect isS...

2017-08-25 Thread joseph-torres
Github user joseph-torres commented on the issue:

https://github.com/apache/spark/pull/19056
  
Addressed all comments.





[GitHub] spark pull request #19056: [SPARK-21765] Check that optimization doesn't aff...

2017-08-25 Thread joseph-torres
GitHub user joseph-torres opened a pull request:

https://github.com/apache/spark/pull/19056

[SPARK-21765] Check that optimization doesn't affect isStreaming bit.

## What changes were proposed in this pull request?

Add an assert in logical plan optimization that the isStreaming bit stays 
the same, and fix empty relation rules where that wasn't happening.
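
Conceptually, the invariant being asserted is just the following (a sketch, not 
the literal assertion in the patch):

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // The optimizer may rewrite the plan, but it must not flip the streaming bit.
    def assertStreamingBitPreserved(analyzed: LogicalPlan, optimized: LogicalPlan): Unit = {
      assert(analyzed.isStreaming == optimized.isStreaming,
        "Optimization should not change whether a plan is streaming.")
    }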

## How was this patch tested?

new and existing unit tests

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseph-torres/spark SPARK-21765-followup

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19056.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19056


commit b83349567760dd0d33388d3fc68d8db1b648e1f1
Author: Jose Torres <joseph-tor...@databricks.com>
Date:   2017-08-25T20:48:49Z

Check that optimization doesn't affect isStreaming bit.







[GitHub] spark pull request #18973: [SPARK-21765] Set isStreaming on leaf nodes for s...

2017-08-17 Thread joseph-torres
GitHub user joseph-torres opened a pull request:

https://github.com/apache/spark/pull/18973

[SPARK-21765] Set isStreaming on leaf nodes for streaming plans.

## What changes were proposed in this pull request?
All logically streaming plans will now have isStreaming set. This involved 
adding isStreaming as a case class arg in a few cases, since a node might be 
logically streaming depending on where it came from.
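
An illustrative shape of such a node (a hypothetical relation, not one from the 
patch):

    import org.apache.spark.sql.catalyst.expressions.Attribute
    import org.apache.spark.sql.catalyst.plans.logical.LeafNode

    // Hypothetical leaf that can back either a batch or a streaming scan, so
    // the bit is a constructor argument rather than a constant.
    case class ExampleRelation(
        output: Seq[Attribute],
        override val isStreaming: Boolean) extends LeafNode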

## How was this patch tested?

Existing unit tests - no functional change is intended in this PR.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseph-torres/spark SPARK-21765

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18973.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18973


commit 5115180f7269dab114b19891ae3eacecaed153bf
Author: Jose Torres <joseph-tor...@databricks.com>
Date:   2017-08-15T03:49:38Z

partial - Add isStreaming bit to all LogicalPlan leaves.

commit bf61247d2baaa8d32c66df9060c1b88e79eaa824
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date:   2017-08-17T00:35:56Z

Fixed streaming tests

commit 86a3de9a4d8b8414606d13bb34fd758428b83838
Author: Jose Torres <joseph-tor...@databricks.com>
Date:   2017-08-17T17:52:52Z

remove spurious commenting out







[GitHub] spark pull request #19461: [SPARK-22230] Swap per-row order in state store r...

2017-10-09 Thread joseph-torres
GitHub user joseph-torres opened a pull request:

https://github.com/apache/spark/pull/19461

[SPARK-22230] Swap per-row order in state store restore.

## What changes were proposed in this pull request?
In state store restore, for each row, put the saved state before the row in 
the iterator instead of after.

This fixes an issue where agg(last('attr)) will forever return the last 
value of 'attr from the first microbatch.
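
A sketch of the kind of query that hit this, using MemoryStream from the test 
utilities (assumes a SparkSession named spark; names are illustrative):

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.execution.streaming.MemoryStream
    import org.apache.spark.sql.functions.last
    import spark.implicits._

    implicit val sqlContext: SQLContext = spark.sqlContext

    val input = MemoryStream[(Int, Int)]
    // Before this fix, the restored state was placed after each new row in the
    // iterator, so last("attr") kept returning the value from the first
    // micro-batch instead of the latest one.
    val aggregated = input.toDS().toDF("key", "attr")
      .groupBy("key")
      .agg(last("attr"))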

## How was this patch tested?

new unit test

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseph-torres/spark SPARK-22230

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19461.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19461


commit 17ef8a843e7dec8da0625caeda213cb1f5c64a4a
Author: Jose Torres <j...@databricks.com>
Date:   2017-10-09T20:55:19Z

Swap per-row order in state store restore.







[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

2017-10-11 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19452#discussion_r144148695
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala
 ---
@@ -66,6 +67,60 @@ object StreamingSymmetricHashJoinHelper extends Logging {
 }
   }
 
+  /**
+   * Wrapper around various useful splits of the join condition.
+   * left AND right AND joined is equivalent to full.
+   *
+   * Note that left and right do not necessarily contain *all* conjuncts 
which satisfy
+   * their condition. Any conjuncts after the first nondeterministic one 
are treated as
+   * nondeterministic for purposes of the split.
+   *
+   * @param left Deterministic conjuncts which reference only the left 
side of the join.
+   * @param right Deterministic conjuncts which reference only the right 
side of the join.
+   * @param joined Conjuncts which are in neither left nor right.
+   * @param full The full join condition.
+   */
+  case class JoinConditionSplitPredicates(
+left: Option[Expression],
+right: Option[Expression],
+joined: Option[Expression],
+full: Option[Expression]) {}
+
+  object JoinConditionSplitPredicates extends PredicateHelper {
+def apply(condition: Option[Expression], left: SparkPlan, right: 
SparkPlan):
+JoinConditionSplitPredicates = {
+  // Split the condition into 3 parts:
+  // * Conjuncts that can be applied to the left before storing.
+  // * Conjuncts that can be applied to the right before storing.
+  // * Conjuncts that must be applied to the full row at join time.
+  //
+  // Note that the third category includes both conjuncts that 
reference both sides
+  // and all nondeterministic conjuncts. Nondeterministic conjuncts 
can't be shortcutted
+  // to preserve any stateful semantics they may have.
+  val (leftCondition, rightCondition, joinedCondition) = {
+if (condition.isEmpty) {
+  (None, None, None)
+} else {
+  val (candidates, containingNonDeterministic) =
+splitConjunctivePredicates(condition.get).span(_.deterministic)
--- End diff --

Nondeterministic conjuncts don't commute across && because Spark does 
shortcut evaluation. (That is, "udf('val) == 0 && false" will cause udf to be 
evaluated, while "false && udf('val) == 0" will not.) This behavior is copied 
from how predicate pushdown handles nondeterminism.
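
The split itself is just a span over the conjuncts, roughly as below 
(SplitSketch is only an illustrative wrapper around the same call used in the 
diff):

    import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper}

    object SplitSketch extends PredicateHelper {
      // For a condition a && b && c where b is nondeterministic, the span stops
      // at b: b and c both stay on the "joined" side even if c references only
      // one input, preserving left-to-right shortcut evaluation.
      def splitDeterministicPrefix(condition: Expression): (Seq[Expression], Seq[Expression]) =
        splitConjunctivePredicates(condition).span(_.deterministic)
    }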





[GitHub] spark issue #19465: [SPARK-21988][SS]Implement StreamingRelation.computeStat...

2017-10-11 Thread joseph-torres
Github user joseph-torres commented on the issue:

https://github.com/apache/spark/pull/19465
  
LGTM





[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...

2017-12-12 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19926#discussion_r156471355
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -783,29 +430,29 @@ class StreamExecution(
 }
 
 while (notDone) {
-  awaitBatchLock.lock()
+  awaitProgressLock.lock()
   try {
-awaitBatchLockCondition.await(100, TimeUnit.MILLISECONDS)
+awaitProgressLockCondition.await(100, TimeUnit.MILLISECONDS)
 if (streamDeathCause != null) {
   throw streamDeathCause
 }
   } finally {
-awaitBatchLock.unlock()
+awaitProgressLock.unlock()
   }
 }
 logDebug(s"Unblocked at $newOffset for $source")
   }
 
   /** A flag to indicate that a batch has completed with no new data 
available. */
-  @volatile private var noNewData = false
+  @volatile protected var noNewData = false
--- End diff --

Yes. The flag is really just a test harness; it's only used in 
processAllAvailable, so tests can block until there's a batch (or, now, an 
epoch) that doesn't contain any data.
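
In other words, the pattern tests rely on looks like this (input and query are 
an illustrative MemoryStream and the StreamingQuery started from it):

    input.addData(1, 2, 3)
    // Blocks until a trigger fires with no new data, i.e. until everything
    // added so far has been processed, which is exactly what noNewData signals.
    query.processAllAvailable()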





[GitHub] spark issue #19926: [SPARK-22733] Split StreamExecution into MicroBatchExecu...

2017-12-12 Thread joseph-torres
Github user joseph-torres commented on the issue:

https://github.com/apache/spark/pull/19926
  
retest this please





[GitHub] spark issue #19925: [SPARK-22732] Add Structured Streaming APIs to DataSourc...

2017-12-12 Thread joseph-torres
Github user joseph-torres commented on the issue:

https://github.com/apache/spark/pull/19925
  
retest this please





[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...

2017-12-12 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19926#discussion_r156532669
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
 ---
@@ -237,7 +237,7 @@ class StreamingQueryManager private[sql] (sparkSession: 
SparkSession) extends Lo
   "is not supported in streaming DataFrames/Datasets and will be 
disabled.")
 }
 
-new StreamingQueryWrapper(new StreamExecution(
+new StreamingQueryWrapper(new MicroBatchExecution(
--- End diff --

My current thinking is to have it be a new trigger type. It can't really be 
a config, because continuous processing (at least in the initial 
implementation) won't support all operators.
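
One possible shape, purely illustrative (the trigger name and API here are 
hypothetical, not a settled design; inputDF is an assumed streaming DataFrame):

    import scala.concurrent.duration._
    import org.apache.spark.sql.streaming.Trigger

    val query = inputDF.writeStream
      .format("console")
      .trigger(Trigger.Continuous(1.second))  // hypothetical continuous trigger
      .start()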





[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...

2017-12-12 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19926#discussion_r156532817
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -447,296 +384,6 @@ class StreamExecution(
 }
   }
 
-  /**
-   * Populate the start offsets to start the execution at the current 
offsets stored in the sink
-   * (i.e. avoid reprocessing data that we have already processed). This 
function must be called
-   * before any processing occurs and will populate the following fields:
-   *  - currentBatchId
-   *  - committedOffsets
-   *  - availableOffsets
-   *  The basic structure of this method is as follows:
-   *
-   *  Identify (from the offset log) the offsets used to run the last batch
-   *  IF last batch exists THEN
-   *Set the next batch to be executed as the last recovered batch
-   *Check the commit log to see which batch was committed last
-   *IF the last batch was committed THEN
-   *  Call getBatch using the last batch start and end offsets
-   *  //  above line is needed since some sources assume last 
batch always re-executes
-   *  Setup for a new batch i.e., start = last batch end, and identify 
new end
-   *DONE
-   *  ELSE
-   *Identify a brand new batch
-   *  DONE
-   */
-  private def populateStartOffsets(sparkSessionToRunBatches: 
SparkSession): Unit = {
-offsetLog.getLatest() match {
--- End diff --

Sure, we could do that.





[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-12 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19925#discussion_r156539696
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import scala.collection.JavaConverters._
+
+import org.json4s.DefaultFormats
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.types.{LongType, StructField, StructType, 
TimestampType}
+
+object ContinuousRateStreamSource {
+  val NUM_PARTITIONS = "numPartitions"
+  val ROWS_PER_SECOND = "rowsPerSecond"
+}
+
+case class ContinuousRateStreamOffset(partitionToStartValue: Map[Int, 
Long]) extends Offset {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+  override val json = Serialization.write(partitionToStartValue)
+}
+
+case class ContinuousRateStreamPartitionOffset(partition: Int, start: 
Long) extends PartitionOffset
+
+class ContinuousRateStreamReader(options: DataSourceV2Options)
+  extends ContinuousReader {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+
+  val numPartitions = 
options.get(ContinuousRateStreamSource.NUM_PARTITIONS).orElse("5").toInt
+  val rowsPerSecond = 
options.get(ContinuousRateStreamSource.ROWS_PER_SECOND).orElse("6").toLong
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+assert(offsets.length == numPartitions)
+val tuples = offsets.map {
+  case ContinuousRateStreamPartitionOffset(p, s) => p -> s
+}
+ContinuousRateStreamOffset(Map(tuples: _*))
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+ContinuousRateStreamOffset(Serialization.read[Map[Int, Long]](json))
+  }
+
+  override def readSchema(): StructType = {
+StructType(
+StructField("timestamp", TimestampType, false) ::
+StructField("value", LongType, false) :: Nil)
+  }
+
+  private var offset: java.util.Optional[Offset] = _
+
+  override def setOffset(offset: java.util.Optional[Offset]): Unit = {
+this.offset = offset
+  }
+
+  override def getStartOffset(): Offset = offset.get()
+
+  override def createReadTasks(): java.util.List[ReadTask[Row]] = {
+val partitionStartMap = Option(offset.orElse(null)).map {
+  case o: ContinuousRateStreamOffset => o.partitionToStartValue
+  case s: SerializedOffset => Serialization.read[Map[Int, 
Long]](s.json)
+  case _ => throw new IllegalArgumentException("invalid offset type 
for ContinuousRateSource")
+}
+if (partitionStartMap.exists(_.keySet.size > numPartitions)) {
+  throw new IllegalArgumentException("Start offset contained too many 
partitions.")
+}
+val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble
+
+Range(0, numPartitions).map { n =>
+  // If the offset doesn't have a value for this partition, start from 
the beginning.
+  val start = partitionStartMap.flatMap(_.get(n)).getOrElse(0L + n)
+  // Have each partition advance by numPartitions each row, with 
starting points staggered
+  // by their partition index.
+  RateStreamReadTask(start, n, numPartitions, perPartitionRate)
+.asInstanceOf[ReadTask[Row]]
+}.asJava
+  }
+
  

[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-12 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19925#discussion_r156540689
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import scala.collection.JavaConverters._
+
+import org.json4s.DefaultFormats
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.types.{LongType, StructField, StructType, 
TimestampType}
+
+object ContinuousRateStreamSource {
+  val NUM_PARTITIONS = "numPartitions"
+  val ROWS_PER_SECOND = "rowsPerSecond"
+}
+
+case class ContinuousRateStreamOffset(partitionToStartValue: Map[Int, 
Long]) extends Offset {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+  override val json = Serialization.write(partitionToStartValue)
+}
+
+case class ContinuousRateStreamPartitionOffset(partition: Int, start: 
Long) extends PartitionOffset
+
+class ContinuousRateStreamReader(options: DataSourceV2Options)
+  extends ContinuousReader {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+
+  val numPartitions = 
options.get(ContinuousRateStreamSource.NUM_PARTITIONS).orElse("5").toInt
+  val rowsPerSecond = 
options.get(ContinuousRateStreamSource.ROWS_PER_SECOND).orElse("6").toLong
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+assert(offsets.length == numPartitions)
+val tuples = offsets.map {
+  case ContinuousRateStreamPartitionOffset(p, s) => p -> s
+}
+ContinuousRateStreamOffset(Map(tuples: _*))
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+ContinuousRateStreamOffset(Serialization.read[Map[Int, Long]](json))
+  }
+
+  override def readSchema(): StructType = {
+StructType(
+StructField("timestamp", TimestampType, false) ::
+StructField("value", LongType, false) :: Nil)
+  }
+
+  private var offset: java.util.Optional[Offset] = _
+
+  override def setOffset(offset: java.util.Optional[Offset]): Unit = {
+this.offset = offset
+  }
+
+  override def getStartOffset(): Offset = offset.get()
+
+  override def createReadTasks(): java.util.List[ReadTask[Row]] = {
+val partitionStartMap = Option(offset.orElse(null)).map {
+  case o: ContinuousRateStreamOffset => o.partitionToStartValue
+  case s: SerializedOffset => Serialization.read[Map[Int, 
Long]](s.json)
+  case _ => throw new IllegalArgumentException("invalid offset type 
for ContinuousRateSource")
+}
+if (partitionStartMap.exists(_.keySet.size > numPartitions)) {
+  throw new IllegalArgumentException("Start offset contained too many 
partitions.")
+}
+val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble
+
+Range(0, numPartitions).map { n =>
+  // If the offset doesn't have a value for this partition, start from 
the beginning.
+  val start = partitionStartMap.flatMap(_.get(n)).getOrElse(0L + n)
+  // Have each partition advance by numPartitions each row, with 
starting points staggered
+  // by their partition index.
+  RateStreamReadTask(start, n, numPartitions, perPartitionRate)
+.asInstanceOf[ReadTask[Row]]
+}.asJava
+  }
+
  

[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-12 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19925#discussion_r156542366
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import scala.collection.JavaConverters._
+
+import org.json4s.DefaultFormats
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.types.{LongType, StructField, StructType, 
TimestampType}
+
+object ContinuousRateStreamSource {
+  val NUM_PARTITIONS = "numPartitions"
+  val ROWS_PER_SECOND = "rowsPerSecond"
+}
+
+case class ContinuousRateStreamOffset(partitionToStartValue: Map[Int, 
Long]) extends Offset {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+  override val json = Serialization.write(partitionToStartValue)
+}
+
+case class ContinuousRateStreamPartitionOffset(partition: Int, start: 
Long) extends PartitionOffset
+
+class ContinuousRateStreamReader(options: DataSourceV2Options)
+  extends ContinuousReader {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+
+  val numPartitions = 
options.get(ContinuousRateStreamSource.NUM_PARTITIONS).orElse("5").toInt
+  val rowsPerSecond = 
options.get(ContinuousRateStreamSource.ROWS_PER_SECOND).orElse("6").toLong
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+assert(offsets.length == numPartitions)
+val tuples = offsets.map {
+  case ContinuousRateStreamPartitionOffset(p, s) => p -> s
+}
+ContinuousRateStreamOffset(Map(tuples: _*))
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+ContinuousRateStreamOffset(Serialization.read[Map[Int, Long]](json))
+  }
+
+  override def readSchema(): StructType = {
+StructType(
+StructField("timestamp", TimestampType, false) ::
+StructField("value", LongType, false) :: Nil)
+  }
+
+  private var offset: java.util.Optional[Offset] = _
+
+  override def setOffset(offset: java.util.Optional[Offset]): Unit = {
+this.offset = offset
+  }
+
+  override def getStartOffset(): Offset = offset.get()
+
+  override def createReadTasks(): java.util.List[ReadTask[Row]] = {
+val partitionStartMap = Option(offset.orElse(null)).map {
+  case o: ContinuousRateStreamOffset => o.partitionToStartValue
+  case s: SerializedOffset => Serialization.read[Map[Int, 
Long]](s.json)
+  case _ => throw new IllegalArgumentException("invalid offset type 
for ContinuousRateSource")
+}
+if (partitionStartMap.exists(_.keySet.size > numPartitions)) {
+  throw new IllegalArgumentException("Start offset contained too many 
partitions.")
+}
+val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble
+
+Range(0, numPartitions).map { n =>
+  // If the offset doesn't have a value for this partition, start from 
the beginning.
+  val start = partitionStartMap.flatMap(_.get(n)).getOrElse(0L + n)
+  // Have each partition advance by numPartitions each row, with 
starting points staggered
+  // by their partition index.
+  RateStreamReadTask(start, n, numPartitions, perPartitionRate)
+.asInstanceOf[ReadTask[Row]]
+}.asJava
+  }
+
  

[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...

2017-12-14 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19926#discussion_r157047178
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 ---
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, 
LogicalPlan}
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.sources.v2.reader.Offset
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.util.{Clock, Utils}
+
+class MicroBatchExecution(
+sparkSession: SparkSession,
+name: String,
+checkpointRoot: String,
+analyzedPlan: LogicalPlan,
+sink: Sink,
+trigger: Trigger,
+triggerClock: Clock,
+outputMode: OutputMode,
+deleteCheckpointOnStop: Boolean)
+  extends StreamExecution(
+sparkSession, name, checkpointRoot, analyzedPlan, sink,
+trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+  private val triggerExecutor = trigger match {
+case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
+case OneTimeTrigger => OneTimeExecutor()
+case _ => throw new IllegalStateException(s"Unknown type of trigger: 
$trigger")
+  }
+
+  override lazy val logicalPlan: LogicalPlan = {
+assert(queryExecutionThread eq Thread.currentThread,
+  "logicalPlan must be initialized in QueryExecutionThread " +
+s"but the current thread was ${Thread.currentThread}")
+var nextSourceId = 0L
+val toExecutionRelationMap = MutableMap[StreamingRelation, 
StreamingExecutionRelation]()
+val _logicalPlan = analyzedPlan.transform {
+  case streamingRelation@StreamingRelation(dataSource, _, output) =>
+toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
+  // Materialize source to avoid creating it in every batch
+  val metadataPath = 
s"$resolvedCheckpointRoot/sources/$nextSourceId"
+  val source = dataSource.createSource(metadataPath)
+  nextSourceId += 1
+  // We still need to use the previous `output` instead of 
`source.schema` as attributes in
+  // "df.logicalPlan" has already used attributes of the previous 
`output`.
+  StreamingExecutionRelation(source, output)(sparkSession)
+})
+}
+sources = _logicalPlan.collect { case s: StreamingExecutionRelation => 
s.source }
+uniqueSources = sources.distinct
+_logicalPlan
+  }
+
+  /**
+   * Repeatedly attempts to run batches as data arrives.
+   */
+  protected def runActivatedStream(sparkSessionForStream: SparkSession): 
Unit = {
+triggerExecutor.execute(() => {
+  startTrigger()
+
+  if (isActive) {
+reportTimeTaken("triggerExecution") {
+  if (currentBatchId < 0) {
+// We'll do this initialization only once
+populateStartOffsets(sparkSessionForStream)
+
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
+logDebug(s"Stream running from $committedOffsets to 
$availableOffsets")
+  } else {
+constructNextBatch()
+  }
+  if (dataAvailable) {
+currentStatus = currentStatus.copy(isDataAvailable = true)
+updateStatusMessage("Processing new

[GitHub] spark pull request #20012: [SPARK-22824] Restore old offset for binary compa...

2017-12-18 Thread joseph-torres
GitHub user joseph-torres opened a pull request:

https://github.com/apache/spark/pull/20012

[SPARK-22824] Restore old offset for binary compatibility

## What changes were proposed in this pull request?

Some users depend on source compatibility with the 
org.apache.spark.sql.execution.streaming.Offset class. Although this is not a 
stable interface, we can keep it in place for now to simplify upgrades to 2.3.
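
Conceptually, the shim keeps the old class name in place as a thin layer over 
the v2 offset (a sketch, not the literal patch):

    package org.apache.spark.sql.execution.streaming

    // The old name survives as an abstract class so existing sources that
    // extend it keep compiling against 2.3.
    abstract class Offset extends org.apache.spark.sql.sources.v2.reader.Offset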

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseph-torres/spark binary-compat

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20012.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20012


commit 7d1df9855dee8b6bce5ebed136b8da4275f178c7
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-18T21:34:50Z

Restore old offset for binary compatibility







[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-14 Thread joseph-torres
GitHub user joseph-torres opened a pull request:

https://github.com/apache/spark/pull/19984

[SPARK-22789] Map-only continuous processing execution

## What changes were proposed in this pull request?

Basic continuous execution, supporting map/flatMap/filter, with commits and 
advancement through RPC.

## How was this patch tested?

new unit-ish tests (exercising execution end to end)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseph-torres/spark continuous-impl

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19984.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19984


commit d6bea84447d910e79d5926972d87a80bc5dc2e2e
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-07T22:08:28Z

Refactor StreamExecution into a parent class so continuous processing can 
extend it

commit df6b8861173d1e7853952c8f3ffe504975efe204
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-12T19:31:28Z

address fmt

commit 6f0ce6b1cf1abf602c2b02ce6d31f46f8fa71b7c
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-13T00:09:48Z

slight changes

commit 2b360ab49bcab3c73ea85ce62202e40e950931ef
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-13T00:10:34Z

rm spurious space

commit 1b19f1cef17e7324997649ad8c5f97887912
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-13T00:35:30Z

fix compile

commit 96eba13be9764e63f3d1375d7b51dbfd0675aa98
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-11T20:48:20Z

harness

commit 2d5efadb9e7662363e3e4a3c66e0f5f73e4935ef
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-11T21:18:25Z

awaitEpoch impl

commit 578bbb7eb0725b795ac65d1beda436515f4f4eba
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-11T21:46:09Z

move local[10] to only continuous suite

commit 9051eff6c88838ac61ab45763ed84d593e2d4837
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-11T21:49:55Z

repeatedly restart

commit 60fa4477591cc264b9ea253f64065d762ce3f96f
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-11T22:02:52Z

fix some simple TODOs

commit ea8e76ec75752d134433730ee1a007cce1fdcfe8
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-11T22:11:18Z

use runId instead of queryId for endpoint name

commit d0f3cc7701d9eb3e7df571561e751f03c0537f3a
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-11T22:19:03Z

more simple todos

commit ba9dbaa1be2f54827a42f3177669082e7d1f99e2
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-11T22:27:12Z

remove old state

commit 2cd005f4685e492ae78d6b9c579c80c2370d2f14
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-11T22:35:51Z

remove clean shutdown workaround in StreamTest

commit a7fa31fb5375074d888bd0a94e317ad3f1692e5a
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-11T22:50:09Z

update ContinuousExecution docs

commit f687432a58acf7337885edfc01adc94188d174d8
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-11T22:59:14Z

add comments to EpochCoordinator

commit 987b011ee78292c3379559910ebe101daf4f9450
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-12T00:02:54Z

change offset semantic to end of previous epoch

commit 5494fc50ef99b3e584c287b03eaa32b30657a5ce
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-12T00:18:40Z

document EpochCoordinator

commit d6ef404b85fa6977b5f38a853dca11de5189b3f9
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-12T02:06:44Z

simplify epoch handling

commit 647bd2745c1c0842002d4f71b61aa34beb0f8b29
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-12T19:17:58Z

stress tests

commit 053a9f349a4829433a495aa5989f1ca1c8a3256e
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-12T20:17:22Z

add minBatchesToRetain

commit 7072d21444388fe167fa7e3475b3e95ec9923d5e
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-12T20:43:33Z

add confs

commit 4083a8f5c6b6ef298726234d54f23a90e971e77e
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-12T21:10:33Z

latency suite not meaningful here

commit 41d391f2027a4e8b3730d15cea7b7fbcdcec27de
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-13T00:04:07Z

more stress::q

commit 402cfa3b10dfb0f37ce8d94336be3b3c01fe9f90
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-13T18:55:23Z

use temp dir

commit e4a1bc19db9ea0233879d270e725ed58d95a34ad
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-14T19:37:36Z

fix against rebase

commit 8887b3c92afe8bb1659f600785af5d97f085f2bb
Author: Jose Torres <j...@

[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-15 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r157278184
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1035,6 +1035,22 @@ object SQLConf {
 .booleanConf
 .createWithDefault(true)
 
+  val CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE =
+buildConf("spark.sql.streaming.continuous.executorQueueSize")
+.internal()
+.doc("The size (measured in number of rows) of the queue used in 
continuous execution to" +
+  " buffer the results of a ContinuousDataReader.")
+.intConf
--- End diff --

Should it be? I can't imagine anything close to MAX_INT being a reasonable 
value here. Will it be hard to migrate to a long if we later discover it's 
needed?
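
If the concern is bounding the value, a checkValue on the int conf seems 
simpler than widening the type. A sketch, assuming it sits inside SQLConf and 
using an illustrative default:

    val CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE =
      buildConf("spark.sql.streaming.continuous.executorQueueSize")
        .internal()
        .doc("The size (measured in number of rows) of the queue used in continuous" +
          " execution to buffer the results of a ContinuousDataReader.")
        .intConf
        .checkValue(_ > 0, "The executor queue size must be positive.")
        .createWithDefault(1024)  // illustrative default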





[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-13 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19925#discussion_r156741131
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import scala.collection.JavaConverters._
+
+import org.json4s.DefaultFormats
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.types.{LongType, StructField, StructType, 
TimestampType}
+
+object ContinuousRateStreamSource {
+  val NUM_PARTITIONS = "numPartitions"
+  val ROWS_PER_SECOND = "rowsPerSecond"
+}
+
+case class ContinuousRateStreamOffset(partitionToStartValue: Map[Int, 
Long]) extends Offset {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+  override val json = Serialization.write(partitionToStartValue)
+}
+
+case class ContinuousRateStreamPartitionOffset(partition: Int, start: 
Long) extends PartitionOffset
+
+class ContinuousRateStreamReader(options: DataSourceV2Options)
+  extends ContinuousReader {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+
+  val numPartitions = 
options.get(ContinuousRateStreamSource.NUM_PARTITIONS).orElse("5").toInt
+  val rowsPerSecond = 
options.get(ContinuousRateStreamSource.ROWS_PER_SECOND).orElse("6").toLong
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+assert(offsets.length == numPartitions)
+val tuples = offsets.map {
+  case ContinuousRateStreamPartitionOffset(p, s) => p -> s
+}
+ContinuousRateStreamOffset(Map(tuples: _*))
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+ContinuousRateStreamOffset(Serialization.read[Map[Int, Long]](json))
+  }
+
+  override def readSchema(): StructType = {
+StructType(
+StructField("timestamp", TimestampType, false) ::
+StructField("value", LongType, false) :: Nil)
+  }
+
+  private var offset: java.util.Optional[Offset] = _
+
+  override def setOffset(offset: java.util.Optional[Offset]): Unit = {
+this.offset = offset
+  }
+
+  override def getStartOffset(): Offset = offset.get()
+
+  override def createReadTasks(): java.util.List[ReadTask[Row]] = {
+val partitionStartMap = Option(offset.orElse(null)).map {
+  case o: ContinuousRateStreamOffset => o.partitionToStartValue
+  case s: SerializedOffset => Serialization.read[Map[Int, 
Long]](s.json)
+  case _ => throw new IllegalArgumentException("invalid offset type 
for ContinuousRateSource")
+}
+if (partitionStartMap.exists(_.keySet.size > numPartitions)) {
+  throw new IllegalArgumentException("Start offset contained too many 
partitions.")
+}
+val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble
+
+Range(0, numPartitions).map { n =>
+  // If the offset doesn't have a value for this partition, start from 
the beginning.
+  val start = partitionStartMap.flatMap(_.get(n)).getOrElse(0L + n)
+  // Have each partition advance by numPartitions each row, with 
starting points staggered
+  // by their partition index.
+  RateStreamReadTask(start, n, numPartitions, perPartitionRate)
+.asInstanceOf[ReadTask[Row]]
+}.asJava
+  }
+
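To make the staggering above concrete, here is a small standalone illustration (not part of the patch) of the value scheme: partition n starts at its offset entry (or n if the offset has no entry for it) and then advances by numPartitions per row, so the partitions interleave to cover every value exactly once, which is what lets mergeOffsets reassemble a consistent global offset from per-partition starts.

    // Standalone illustration of the staggered value scheme described above.
    object StaggeredValuesDemo {
      def firstValues(
          numPartitions: Int,
          perPartitionCount: Int,
          startMap: Map[Int, Long] = Map.empty): Map[Int, Seq[Long]] = {
        (0 until numPartitions).map { n =>
          // If the offset doesn't have a value for this partition, start from the beginning.
          val start = startMap.getOrElse(n, n.toLong)
          // Each partition advances by numPartitions per row, staggered by its index.
          n -> (0 until perPartitionCount).map(i => start + i.toLong * numPartitions)
        }.toMap
      }

      def main(args: Array[String]): Unit = {
        // With 3 partitions: 0 -> 0,3,6   1 -> 1,4,7   2 -> 2,5,8
        firstValues(numPartitions = 3, perPartitionCount = 3).toSeq.sortBy(_._1).foreach(println)
      }
    }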
  

[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-13 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19925#discussion_r156740666
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memoryV2.scala 
---
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import 
org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, 
Update}
+import org.apache.spark.sql.sources.v2.{ContinuousWriteSupport, 
DataSourceV2, DataSourceV2Options, MicroBatchWriteSupport}
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A sink that stores the results in memory. This [[Sink]] is primarily 
intended for use in unit
+ * tests and does not provide durability.
+ */
+class MemorySinkV2 extends DataSourceV2
+  with MicroBatchWriteSupport with ContinuousWriteSupport with Logging {
+
+  override def createMicroBatchWriter(
+  queryId: String,
+  batchId: Long,
+  schema: StructType,
+  mode: OutputMode,
+  options: DataSourceV2Options): 
java.util.Optional[DataSourceV2Writer] = {
+java.util.Optional.of(new MemoryWriter(this, batchId, mode))
+  }
+
+  override def createContinuousWriter(
+  queryId: String,
+  schema: StructType,
+  mode: OutputMode,
+  options: DataSourceV2Options): java.util.Optional[ContinuousWriter] 
= {
+java.util.Optional.of(new ContinuousMemoryWriter(this, mode))
+  }
+
+  private case class AddedData(batchId: Long, data: Array[Row])
+
+  /** An ordered list of batches that have been written to this [[Sink]]. */
+  @GuardedBy("this")
+  private val batches = new ArrayBuffer[AddedData]()
+
+  /** Returns all rows that are stored in this [[Sink]]. */
+  def allData: Seq[Row] = synchronized {
+batches.flatMap(_.data)
+  }
+
+  def latestBatchId: Option[Long] = synchronized {
+batches.lastOption.map(_.batchId)
+  }
+
+  def latestBatchData: Seq[Row] = synchronized {
+batches.lastOption.toSeq.flatten(_.data)
+  }
+
+  def toDebugString: String = synchronized {
+batches.map { case AddedData(batchId, data) =>
+  val dataStr = try data.mkString(" ") catch {
+case NonFatal(e) => "[Error converting to string]"
+  }
+  s"$batchId: $dataStr"
+}.mkString("\n")
+  }
+
+  def write(batchId: Long, outputMode: OutputMode, newRows: Array[Row]): 
Unit = {
+val notCommitted = synchronized {
+  latestBatchId.isEmpty || batchId > latestBatchId.get
+}
+if (notCommitted) {
+  logDebug(s"Committing batch $batchId to $this")
+  outputMode match {
+case Append | Update =>
+  val rows = AddedData(batchId, newRows)
+  synchronized { batches += rows }
+
+case Complete =>
+  val rows = AddedData(batchId, newRows)
+  synchronized {
+batches.clear()
+batches += rows
+  }
+
+case _ =>
+  throw new IllegalArgumentException(
+s"Output mode $outputMode is not supported by MemorySink")
+  }
+} else {
+  logDebug(s"Skipping already committed batch: $batchId")
+}
+  }
+
+  def clear(): Unit = synchronized {
+batches.clear()
+  }
+
+  override def toString(): String = "MemorySink"
+}
+
+case cl
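To illustrate the write() semantics above -- Append and Update accumulate batches, Complete replaces them, and a replayed batchId is skipped -- here is a standalone toy model (not code from the patch; the row type and mode strings are stand-ins):

    // Simplified, standalone model of the commit logic in MemorySinkV2.write.
    import scala.collection.mutable.ArrayBuffer

    class ToyMemorySink {
      private case class AddedData(batchId: Long, data: Array[String])
      private val batches = new ArrayBuffer[AddedData]()

      def write(batchId: Long, outputMode: String, newRows: Array[String]): Unit = synchronized {
        val alreadyCommitted = batches.lastOption.exists(_.batchId >= batchId)
        if (alreadyCommitted) {
          // A replayed batch after recovery carries the same rows, so it is a no-op.
        } else {
          outputMode match {
            case "append" | "update" => batches += AddedData(batchId, newRows)
            case "complete" =>
              // Complete mode always contains the full result, so drop earlier batches.
              batches.clear()
              batches += AddedData(batchId, newRows)
            case other =>
              throw new IllegalArgumentException(s"Unsupported output mode: $other")
          }
        }
      }

      def allData: Seq[String] = synchronized { batches.flatMap(_.data) }
    }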

[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...

2017-12-12 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19926#discussion_r156471697
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
 ---
@@ -237,7 +237,7 @@ class StreamingQueryManager private[sql] (sparkSession: 
SparkSession) extends Lo
   "is not supported in streaming DataFrames/Datasets and will be 
disabled.")
 }
 
-new StreamingQueryWrapper(new StreamExecution(
+new StreamingQueryWrapper(new MicroBatchExecution(
--- End diff --

Sorry, I'm not sure what you have in mind here.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...

2017-12-12 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19926#discussion_r156470973
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -447,296 +384,6 @@ class StreamExecution(
 }
   }
 
-  /**
-   * Populate the start offsets to start the execution at the current 
offsets stored in the sink
-   * (i.e. avoid reprocessing data that we have already processed). This 
function must be called
-   * before any processing occurs and will populate the following fields:
-   *  - currentBatchId
-   *  - committedOffsets
-   *  - availableOffsets
-   *  The basic structure of this method is as follows:
-   *
-   *  Identify (from the offset log) the offsets used to run the last batch
-   *  IF last batch exists THEN
-   *Set the next batch to be executed as the last recovered batch
-   *Check the commit log to see which batch was committed last
-   *IF the last batch was committed THEN
-   *  Call getBatch using the last batch start and end offsets
-   *  //  above line is needed since some sources assume last 
batch always re-executes
-   *  Setup for a new batch i.e., start = last batch end, and identify 
new end
-   *DONE
-   *  ELSE
-   *Identify a brand new batch
-   *  DONE
-   */
-  private def populateStartOffsets(sparkSessionToRunBatches: 
SparkSession): Unit = {
-offsetLog.getLatest() match {
--- End diff --

The offset log right now has a strict schema that commit information 
wouldn't fit in. I was planning to keep both logs in the continuous 
implementation.
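As a sketch of why both logs are useful, mirroring the populateStartOffsets pseudocode quoted above (REPL-style, with simple maps standing in for the real log classes): the offset log records what a batch intended to process, the commit log records which batches finished, and together they distinguish a half-finished batch from a completed one.

    // Schematic recovery decision, not the real implementation.
    def decideStartingBatch(
        offsetLog: Map[Long, String],   // batchId -> serialized end offsets
        commitLog: Set[Long]): Long = { // batchIds known to be fully committed
      offsetLog.keys.reduceOption(_ max _) match {
        case Some(last) if commitLog.contains(last) => last + 1 // finished: set up the next batch
        case Some(last)                             => last     // planned but not committed: re-run it
        case None                                   => 0L       // nothing logged yet: brand new query
      }
    }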


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...

2017-12-12 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19926#discussion_r156468624
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -71,27 +68,29 @@ class StreamExecution(
 
   import org.apache.spark.sql.streaming.StreamingQueryListener._
 
-  private val pollingDelayMs = 
sparkSession.sessionState.conf.streamingPollingDelay
+  protected val pollingDelayMs: Long = 
sparkSession.sessionState.conf.streamingPollingDelay
 
-  private val minBatchesToRetain = 
sparkSession.sessionState.conf.minBatchesToRetain
+  protected val minBatchesToRetain: Int = 
sparkSession.sessionState.conf.minBatchesToRetain
--- End diff --

We may want to tweak the variable name, but continuous processing will 
still need to know how long it should retain commit and offset log entries. 
Unfortunately we're stuck with the config name, and I don't think it makes 
sense to introduce a second parallel one doing the same thing.
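For context, a minimal REPL-style sketch of how such a retention setting is typically applied to a batch-id-keyed log (illustrative only; the helper and its arguments are hypothetical, not the real metadata log API):

    // Keep only the most recent `minBatchesToRetain` entries of a log keyed by batch/epoch id.
    def retainRecent(entries: Map[Long, String], minBatchesToRetain: Int): Map[Long, String] = {
      val keep = entries.keys.toSeq.sorted.takeRight(minBatchesToRetain).toSet
      entries.filter { case (id, _) => keep.contains(id) }
    }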


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-07 Thread joseph-torres
GitHub user joseph-torres opened a pull request:

https://github.com/apache/spark/pull/19925

[SPARK-22732] Add Structured Streaming APIs to DataSourceV2

## What changes were proposed in this pull request?

This PR provides DataSourceV2 API support for structured streaming, 
including new pieces needed to support continuous processing [SPARK-20928]. 
High level summary:

- DataSourceV2 includes new mixins to support micro-batch and continuous 
reads and writes. For reads, we accept an optional user specified schema rather 
than using the ReadSupportWithSchema model, because doing so would severely 
complicate the interface.

- DataSourceV2Reader includes new interfaces to read a specific microbatch 
or read continuously from a given offset. These follow the same setter pattern 
as the existing Supports* mixins so that they can work with 
SupportsScanUnsafeRow.

- DataReader (the per-partition reader) has a new subinterface 
ContinuousDataReader only for continuous processing. This reader has a special 
method to check progress, and next() blocks for new input rather than returning 
false.

- Offset, an abstract representation of position in a streaming query, is 
ported to the public API. (Each type of reader will define its own Offset 
implementation.)

- DataSourceV2Writer has a new subinterface ContinuousWriter only for 
continuous processing. Commits to this interface come tagged with an epoch 
number, as the execution engine will continue to produce new epoch commits as 
the task continues indefinitely.

Note that this PR does not propose to change the existing DataSourceV2 
batch API, or deprecate the existing streaming source/sink internal APIs in 
spark.sql.execution.streaming.
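To make the reader-side shape concrete, here is a rough sketch of the idea described above; the trait and class names below are simplified Scala stand-ins for the proposed Java interfaces, not the actual signatures. The key point is that next() blocks until input is available instead of returning false, and the reader can always report the offset of the record it last returned.

    // Sketch only: simplified stand-ins for the proposed interfaces.
    trait SketchPartitionOffset extends Serializable

    trait SketchContinuousDataReader[T] {
      def next(): Boolean                   // blocks for new input rather than returning false
      def get(): T                          // the current record
      def getOffset: SketchPartitionOffset  // position of the last record, used for epoch commits
      def close(): Unit
    }

    case class CounterOffset(value: Long) extends SketchPartitionOffset

    // A toy per-partition reader that emits an ever-increasing counter.
    class CounterReader(start: Long, pauseMs: Long) extends SketchContinuousDataReader[Long] {
      private var current = start - 1
      override def next(): Boolean = { Thread.sleep(pauseMs); current += 1; true }
      override def get(): Long = current
      override def getOffset: SketchPartitionOffset = CounterOffset(current)
      override def close(): Unit = ()
    }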

## How was this patch tested?

Toy implementations of the new interfaces with unit tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseph-torres/spark continuous-api

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19925.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19925


commit daa3a78ad4dd7ecfc73f5b1dd050388c07b42771
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-05T18:48:20Z

add tests

commit edae89508ec2bf02fba00a264cb774b0d60fb068
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-05T19:35:36Z

writer impl

commit 9b28c524b343018d20d2d8d3c9ed4d3c530c413f
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-05T19:37:24Z

rm useless writer

commit 7ceda9d63b9914cfd275fc4240fa9c696afa05d1
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-05T21:02:32Z

rm weird docs

commit ff7be6914560968af7f2179c3704446c771fad52
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-05T21:59:50Z

shuffle around public interfaces

commit 4ae516a61af903c37b748a3941c2472d20776ce4
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-05T22:02:01Z

fix imports

commit a8ff2ee9eeb992f6c0806cb2b4f33b976ef51cf5
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-05T22:40:15Z

put deserialize in reader so we don't have to port SerializedOffset

commit 5096d3d551aa4479bfb112b286683e28ec578f3c
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-05T23:51:08Z

off by one errors grr

commit da00f6b5ddac8bd6025076a67fd4716d9d070bf7
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-05T23:55:58Z

document right semantics

commit 1526f433837de78f59009b6632b6920de38bb1b0
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-06T00:08:54Z

document checkpoint location

commit 33b619ca4f9aa1a82e3830c6e485b8298ca9ff50
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-06T00:43:36Z

add getStart to continuous and clarify semantics

commit 083b04004f58358b3f6e4c82b4690ca5cf2da764
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-06T17:23:34Z

cleanup offset set/get docs

commit 4d6244d2ae431f6043de97f322ce1c33090c
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-06T17:32:45Z

cleanup reader docs

commit 5f9df4f1b54cbd0570d0df5567c42ac2575009a5
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-06T18:06:44Z

explain getOffset

commit a2323e95ff2d407877ded07b7537bac5b63dda8f
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-06T21:17:43Z

fix fmt

commit b80c75cd698cbe4840445efb78a662f02f355a99
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-06T21:24:35Z

fix doc

commit 03bd69da4b0450e5fec88f4196998e3075e98edc
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-06T21:39:20Z

note interfaces are temporary

commit c7bc6a37914312666259bb9724aa710392

[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...

2017-12-07 Thread joseph-torres
GitHub user joseph-torres opened a pull request:

https://github.com/apache/spark/pull/19926

[SPARK-22733] Split StreamExecution into MicroBatchExecution and 
StreamExecution.

## What changes were proposed in this pull request?

StreamExecution is now an abstract base class, which MicroBatchExecution 
(the current StreamExecution) inherits. When continuous processing is 
implemented, we'll have a new ContinuousExecution implementation of 
StreamExecution.

A few fields are also renamed to make them less microbatch-specific.
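Schematically, the refactor looks like the sketch below; names ending in "Sketch" are stand-ins, and the real runActivatedStream takes the session to run with, but the shape is the same: the shared machinery stays in the base class and each engine supplies its own execution loop.

    // Heavily simplified shape of the refactor.
    abstract class StreamExecutionSketch {
      // Shared machinery lives here: query thread, offset/commit logs,
      // state transitions, stop handling, etc.
      protected def runActivatedStream(): Unit  // each engine supplies its own loop
    }

    class MicroBatchExecutionSketch extends StreamExecutionSketch {
      override protected def runActivatedStream(): Unit = {
        // today's behavior: plan and run one batch per trigger
      }
    }

    class ContinuousExecutionSketch extends StreamExecutionSketch {
      override protected def runActivatedStream(): Unit = {
        // future behavior: launch long-running tasks and commit epochs as they complete
      }
    }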

## How was this patch tested?

refactoring only


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseph-torres/spark continuous-refactor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19926.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19926


commit 22d93b7d6133bffb271e6db300b936ae4dda74ab
Author: Jose Torres <j...@databricks.com>
Date:   2017-12-07T22:08:28Z

Refactor StreamExecution into a parent class so continuous processing can 
extend it




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19925: [SPARK-22732] Add Structured Streaming APIs to DataSourc...

2017-12-11 Thread joseph-torres
Github user joseph-torres commented on the issue:

https://github.com/apache/spark/pull/19925
  
/cc @marmbrus @cloud-fan @rxin @brkyvz @zsxwing 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19926: [SPARK-22733] Split StreamExecution into MicroBatchExecu...

2017-12-11 Thread joseph-torres
Github user joseph-torres commented on the issue:

https://github.com/apache/spark/pull/19926
  
/cc @brkyvz @zsxwing 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19925: [SPARK-22732] Add Structured Streaming APIs to DataSourc...

2017-12-11 Thread joseph-torres
Github user joseph-torres commented on the issue:

https://github.com/apache/spark/pull/19925
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-08 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19925#discussion_r155867163
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchWriteSupport.java
 ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import java.util.Optional;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
+import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
+ * provide data writing ability and save the data from a microbatch to the 
data source.
+ */
+@InterfaceStability.Evolving
+public interface MicroBatchWriteSupport extends BaseStreamingSink {
+
+  /**
+   * Creates an optional {@link DataSourceV2Writer} to save the data to 
this data source. Data
+   * sources can return None if there is no writing needed to be done.
+   *
+   * @param queryId A unique string for the writing query. It's possible 
that there are many writing
+   *queries running at the same time, and the returned 
{@link DataSourceV2Writer}
+   *can use this id to distinguish itself from others.
+   * @param epochId The unique numeric ID of the batch within this writing 
query. This is an
+   *incrementing counter representing a consistent set of 
data; the same batch may
+   *be started multiple times in failure recovery 
scenarios, but it will always
+   *contain the same records.
+   * @param schema the schema of the data to be written.
+   * @param mode the output mode which determines what successive batch 
output means to this
+   * source, please refer to {@link OutputMode} for more 
details.
--- End diff --

Good point. Fixed here and in ContinuousWriteSupport.
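The epochId contract above (a batch may be replayed after a failure but always carries the same records) is what lets sinks make their commits idempotent. A rough sketch of that pattern follows; the class, its fields, and the persist helper are hypothetical, not part of this PR.

    // Sketch of idempotent commit handling keyed on the batch/epoch id.
    class IdempotentSinkSketch {
      @volatile private var lastCommittedBatch: Long = -1L

      def commit(batchId: Long, rows: Seq[String]): Unit = synchronized {
        if (batchId <= lastCommittedBatch) {
          // Replayed batch after recovery: same records, already durable, so skip it.
        } else {
          persist(rows)                 // write the rows somewhere durable
          lastCommittedBatch = batchId  // advance only after the write succeeds
        }
      }

      private def persist(rows: Seq[String]): Unit = rows.foreach(_ => ())
    }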


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19611: [SPARK-22305] Write HDFSBackedStateStoreProvider....

2017-10-30 Thread joseph-torres
GitHub user joseph-torres opened a pull request:

https://github.com/apache/spark/pull/19611

[SPARK-22305] Write HDFSBackedStateStoreProvider.loadMap non-recursively

## What changes were proposed in this pull request?
Write HDFSBackedStateStoreProvider.loadMap non-recursively. This prevents 
stack overflow if too many deltas stack up in a low memory environment.
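Schematically, the change replaces a recursive "load the previous version, then apply this delta" call chain with a loop. The sketch below shows the pattern only; readSnapshot and readDelta are hypothetical helpers with string keys and values, not the real provider internals.

    // The old shape was roughly loadVersion(v) = applyDelta(loadVersion(v - 1), delta(v)),
    // which grows the call stack once per delta file; a loop keeps the stack flat.
    object LoadMapSketch {
      import scala.collection.mutable

      type StateMap = mutable.HashMap[String, String]

      def loadVersionIteratively(
          version: Long,
          readSnapshot: Long => Option[StateMap],    // full snapshot for a version, if one exists
          readDelta: Long => Seq[(String, String)]   // delta records for a version
        ): StateMap = {
        // Walk backwards to the nearest materialized snapshot (or the very beginning).
        var base = version
        var snapshot: Option[StateMap] = None
        while (base > 0 && snapshot.isEmpty) {
          snapshot = readSnapshot(base)
          if (snapshot.isEmpty) base -= 1
        }
        val map = snapshot.getOrElse(new StateMap())
        // Replay the remaining deltas forward, one loop iteration per version.
        for (v <- (base + 1) to version; (key, value) <- readDelta(v)) {
          map.put(key, value)
        }
        map
      }
    }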

## How was this patch tested?

existing unit tests for functional equivalence, new unit test to check for 
stack overflow

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseph-torres/spark SPARK-22305

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19611.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19611


commit 6366347516853abd7afd7e89452e656b1011cf6e
Author: Jose Torres <j...@databricks.com>
Date:   2017-10-30T15:48:13Z

rewrite loadMap iteratively

commit 33ea2fb59f5ad47ed4713ca73945a9630486677c
Author: Jose Torres <j...@databricks.com>
Date:   2017-10-30T16:28:28Z

add test exercising stack overflow




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19611: [SPARK-22305] Write HDFSBackedStateStoreProvider.loadMap...

2017-10-30 Thread joseph-torres
Github user joseph-torres commented on the issue:

https://github.com/apache/spark/pull/19611
  
One issue I want to explicitly bring up: this new unit test takes a very long 
time, almost 2 minutes on my computer. Creating 10k files isn't going to be 
super fast no matter what we do, but is there something that can mitigate the 
problem? Maybe it'd be better in a different suite or something; I don't know 
what the typical practice is.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19581: [SPARK-22366] Support ignoring missing files

2017-10-26 Thread joseph-torres
GitHub user joseph-torres opened a pull request:

https://github.com/apache/spark/pull/19581

[SPARK-22366] Support ignoring missing files

## What changes were proposed in this pull request?

Add a flag "spark.sql.files.ignoreMissingFiles" to parallel the existing 
flag "spark.sql.files.ignoreCorruptFiles".

## How was this patch tested?

new unit test


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseph-torres/spark SPARK-22366

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19581.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19581


commit ca059b09a561ee8295b282b7b577514fef101ef3
Author: Jose Torres <j...@databricks.com>
Date:   2017-10-26T20:45:57Z

Support ignoreMissingFiles




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-20 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r158156114
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
 ---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SQLExecution
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
WriteToDataSourceV2}
+import 
org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, 
StreamingRelationV2, _}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
ContinuousWriteSupport, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, 
PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{Clock, Utils}
+
+class ContinuousExecution(
+sparkSession: SparkSession,
+name: String,
+checkpointRoot: String,
+analyzedPlan: LogicalPlan,
+sink: ContinuousWriteSupport,
+trigger: Trigger,
+triggerClock: Clock,
+outputMode: OutputMode,
+extraOptions: Map[String, String],
+deleteCheckpointOnStop: Boolean)
+  extends StreamExecution(
+sparkSession, name, checkpointRoot, analyzedPlan, sink,
+trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+  @volatile protected var continuousSources: Seq[ContinuousReader] = 
Seq.empty
+  override protected def sources: Seq[BaseStreamingSource] = 
continuousSources
+
+  override lazy val logicalPlan: LogicalPlan = {
+assert(queryExecutionThread eq Thread.currentThread,
+  "logicalPlan must be initialized in StreamExecutionThread " +
+s"but the current thread was ${Thread.currentThread}")
+var nextSourceId = 0L
+val toExecutionRelationMap = MutableMap[StreamingRelationV2, 
ContinuousExecutionRelation]()
+analyzedPlan.transform {
+  case r @ StreamingRelationV2(
+  source: ContinuousReadSupport, _, extraReaderOptions, output, _) 
=>
+toExecutionRelationMap.getOrElseUpdate(r, {
+  ContinuousExecutionRelation(source, extraReaderOptions, 
output)(sparkSession)
+})
+  case StreamingRelationV2(_, sourceName, _, _, _) =>
+throw new AnalysisException(
+  s"Data source $sourceName does not support continuous 
processing.")
+}
+  }
+
+  private val triggerExecutor = trigger match {
+case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), 
triggerClock)
+case _ => throw new IllegalStateException(s"Unsupported type of 
trigger: $trigger")
+  }
+
+  override protected def runActivatedStream(sparkSessionForStream: 
SparkSession): Unit = {
+do {
+  try {
+runContinuous(sparkSessionForStream)
+  } catch {
+case _: Throwable if state.get().equals(RECONFIGURING) =>
--- End diff --

The sequencing is:

- The pre-existing method stopSources() marks the ContinuousReader objects 
as stopped and cleans up any resources they may be holding. This doesn't affect 
query execution, and stopSources already swallows any non-fatal exception 
thrown by a stop() implementation.
-

[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-20 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r158159270
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
 ---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SQLExecution
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
WriteToDataSourceV2}
+import 
org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, 
StreamingRelationV2, _}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
ContinuousWriteSupport, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, 
PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{Clock, Utils}
+
+class ContinuousExecution(
+sparkSession: SparkSession,
+name: String,
+checkpointRoot: String,
+analyzedPlan: LogicalPlan,
+sink: ContinuousWriteSupport,
+trigger: Trigger,
+triggerClock: Clock,
+outputMode: OutputMode,
+extraOptions: Map[String, String],
+deleteCheckpointOnStop: Boolean)
+  extends StreamExecution(
+sparkSession, name, checkpointRoot, analyzedPlan, sink,
+trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+  @volatile protected var continuousSources: Seq[ContinuousReader] = 
Seq.empty
+  override protected def sources: Seq[BaseStreamingSource] = 
continuousSources
+
+  override lazy val logicalPlan: LogicalPlan = {
+assert(queryExecutionThread eq Thread.currentThread,
+  "logicalPlan must be initialized in StreamExecutionThread " +
+s"but the current thread was ${Thread.currentThread}")
+var nextSourceId = 0L
+val toExecutionRelationMap = MutableMap[StreamingRelationV2, 
ContinuousExecutionRelation]()
+analyzedPlan.transform {
+  case r @ StreamingRelationV2(
+  source: ContinuousReadSupport, _, extraReaderOptions, output, _) 
=>
+toExecutionRelationMap.getOrElseUpdate(r, {
+  ContinuousExecutionRelation(source, extraReaderOptions, 
output)(sparkSession)
+})
+  case StreamingRelationV2(_, sourceName, _, _, _) =>
+throw new AnalysisException(
+  s"Data source $sourceName does not support continuous 
processing.")
+}
+  }
+
+  private val triggerExecutor = trigger match {
+case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), 
triggerClock)
--- End diff --

I think the coupling is correct here. ProcessingTime represents the rate of 
progress through the query's fenceposts, which applies here as well as it does 
in the microbatch case.
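For illustration, the trigger executor drives a recurring callback at the configured interval in both engines; only what the callback does differs. A REPL-style toy version of that loop (not the real ProcessingTimeExecutor):

    // Call `body` once per interval until it returns false. In microbatch mode the body
    // would run one batch; in continuous mode it would mark the next epoch.
    def runAtFixedRate(intervalMs: Long)(body: () => Boolean): Unit = {
      var continue = true
      while (continue) {
        val start = System.currentTimeMillis()
        continue = body()
        val elapsed = System.currentTimeMillis() - start
        if (continue && elapsed < intervalMs) {
          Thread.sleep(intervalMs - elapsed)
        }
      }
    }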


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

2017-12-20 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19984#discussion_r158158855
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, 
RowToUnsafeDataReader}
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.continuous._
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.streaming.ProcessingTime
+import org.apache.spark.util.{SystemClock, ThreadUtils}
+
+class ContinuousDataSourceRDD(
+sc: SparkContext,
+sqlContext: SQLContext,
+@transient private val readTasks: java.util.List[ReadTask[UnsafeRow]])
+  extends RDD[UnsafeRow](sc, Nil) {
+
+  private val dataQueueSize = 
sqlContext.conf.continuousStreamingExecutorQueueSize
+  private val epochPollIntervalMs = 
sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+  override protected def getPartitions: Array[Partition] = {
+readTasks.asScala.zipWithIndex.map {
+  case (readTask, index) => new DataSourceRDDPartition(index, readTask)
+}.toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[UnsafeRow] = {
+val reader = 
split.asInstanceOf[DataSourceRDDPartition].readTask.createDataReader()
+
+val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY)
+
+// This queue contains two types of messages:
+// * (null, null) representing an epoch boundary.
+// * (row, off) containing a data row and its corresponding 
PartitionOffset.
+val queue = new ArrayBlockingQueue[(UnsafeRow, 
PartitionOffset)](dataQueueSize)
+
+val epochPollFailed = new AtomicBoolean(false)
+val epochPollExecutor = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+  s"epoch-poll--${runId}--${context.partitionId()}")
+val epochPollRunnable = new EpochPollRunnable(queue, context, 
epochPollFailed)
+epochPollExecutor.scheduleWithFixedDelay(
+  epochPollRunnable, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS)
+
+// Important sequencing - we must get start offset before the data 
reader thread begins
+val startOffset = 
ContinuousDataSourceRDD.getBaseReader(reader).getOffset
+
+val dataReaderFailed = new AtomicBoolean(false)
+val dataReaderThread = new DataReaderThread(reader, queue, context, 
dataReaderFailed)
+dataReaderThread.setDaemon(true)
+dataReaderThread.start()
+
+context.addTaskCompletionListener(_ => {
+  reader.close()
+  dataReaderThread.interrupt()
+  epochPollExecutor.shutdown()
+})
+
+val epochEndpoint = EpochCoordinatorRef.get(runId, SparkEnv.get)
+new Iterator[UnsafeRow] {
+  private var currentRow: UnsafeRow = _
+  private var currentOffset: PartitionOffset = startOffset
+  private var currentEpoch =
+
context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+  override def hasNext(): Boolean = {
+if (dataReaderFailed.get()) {
+  throw new SparkException("data read failed", 
dataReaderThread.failureReason)
+}
+if 
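As a rough sketch of the queue protocol described in the comments above -- (null, null) marks an epoch boundary, anything else is a data row plus its offset -- a simplified consumer loop might look like this. The marker type, the String/Long stand-ins for UnsafeRow and PartitionOffset, and the reportEpoch callback are all illustrative, not the real classes or the coordinator RPC.

    // Simplified consumer of the per-partition queue; an explicit marker type replaces
    // the (null, null) sentinel to keep the sketch type-safe.
    object QueueProtocolSketch {
      import java.util.concurrent.ArrayBlockingQueue

      sealed trait Msg
      case object EpochMarker extends Msg                        // corresponds to (null, null)
      case class Record(row: String, offset: Long) extends Msg   // corresponds to (row, off)

      def consume(
          queue: ArrayBlockingQueue[Msg],
          startEpoch: Long,
          reportEpoch: (Long, Long) => Unit,  // (epoch, last offset) -> coordinator
          emit: String => Unit): Unit = {
        var currentEpoch = startEpoch
        var lastOffset = -1L
        while (true) {
          queue.take() match {
            case EpochMarker =>
              reportEpoch(currentEpoch, lastOffset)  // report where this epoch ended
              currentEpoch += 1
            case Record(row, offset) =>
              lastOffset = offset
              emit(row)
          }
        }
      }
    }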

[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

2017-12-20 Thread joseph-torres
Github user joseph-torres commented on the issue:

https://github.com/apache/spark/pull/19984
  
The result says this fails Spark unit tests, but clicking through shows a 
failure count of 0.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

2017-12-20 Thread joseph-torres
Github user joseph-torres commented on the issue:

https://github.com/apache/spark/pull/19984
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org