[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-06 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r239563967
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java ---
@@ -25,7 +25,10 @@
  * The base interface for v2 data sources which don't have a real catalog. Implementations must
  * have a public, 0-arg constructor.
  * 
- * The major responsibility of this interface is to return a {@link Table} for read/write.
+ * The major responsibility of this interface is to return a {@link Table} for read/write. If you
+ * want to allow end-users to write data to non-existing tables via write APIs in `DataFrameWriter`
+ * with `SaveMode`, you must return a {@link Table} instance even if the table doesn't exist. The
+ * table schema can be empty in this case.
--- End diff --

"Exist" is a relative concept, I suppose. I think we need to somehow allow 
for create-on-write functionality, even if many table providers won't want to 
support it.
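To make the create-on-write idea concrete, here is a minimal sketch. It assumes a toy in-memory provider; `SketchTable` and `SketchProvider` are hypothetical names and are not Spark's `TableProvider` or `Table` interfaces. The provider always hands back a table handle, with an empty schema when nothing has been created yet, and the first write defines the table.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a table handle; NOT Spark's Table interface.
final class SketchTable {
    final String name;
    final List<String> schema;   // empty when the table does not exist yet
    final boolean exists;

    SketchTable(String name, List<String> schema, boolean exists) {
        this.name = name;
        this.schema = schema;
        this.exists = exists;
    }
}

// Hypothetical stand-in for a provider without a real catalog.
final class SketchProvider {
    private final Map<String, List<String>> stored = new HashMap<>();

    // Always returns a handle, even for a table that has not been created.
    SketchTable getTable(String name) {
        List<String> schema = stored.get(name);
        if (schema == null) {
            return new SketchTable(name, new ArrayList<>(), false);
        }
        return new SketchTable(name, schema, true);
    }

    // Create-on-write: the first write for a name defines its schema.
    void write(SketchTable table, List<String> incomingSchema) {
        stored.putIfAbsent(table.name, new ArrayList<>(incomingSchema));
    }
}

final class CreateOnWriteDemo {
    public static void main(String[] args) {
        SketchProvider provider = new SketchProvider();
        SketchTable placeholder = provider.getTable("events");
        provider.write(placeholder, Arrays.asList("id", "ts"));
        System.out.println(provider.getTable("events").schema);
    }
}
```

A provider that does not want create-on-write could instead throw from `getTable` when the name is unknown; the sketch only shows the permissive variant.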


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20906: [SPARK-23561][SS] Pull continuous processing out ...

2018-11-29 Thread jose-torres
Github user jose-torres closed the pull request at:

https://github.com/apache/spark/pull/20906





[GitHub] spark pull request #20752: [SPARK-23559][SS] Create StreamingDataWriterFacto...

2018-11-29 Thread jose-torres
Github user jose-torres closed the pull request at:

https://github.com/apache/spark/pull/20752





[GitHub] spark pull request #20859: [SPARK-23702][SS] Forbid watermarks on both sides...

2018-11-29 Thread jose-torres
Github user jose-torres closed the pull request at:

https://github.com/apache/spark/pull/20859





[GitHub] spark pull request #23095: [SPARK-23886][SS] Update query status for Continu...

2018-11-21 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/23095#discussion_r235473858
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
@@ -117,6 +117,7 @@ class ContinuousExecution(
 // For at least once, we can just ignore those reports and risk duplicates.
 commitLog.getLatest() match {
   case Some((latestEpochId, _)) =>
+updateStatusMessage(s"Getting offsets from latest epoch $latestEpochId")
--- End diff --

nit: I'd mention that we're restarting the streaming query from an existing 
epoch in this message, to contrast with "starting new query" in the other branch
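As a rough illustration of the contrast I have in mind, the sketch below picks a message based on whether the commit log has a latest epoch. `StartupStatusSketch` and `statusFor` are hypothetical names, and the wording of both messages is only a suggestion, not what the PR uses.

```java
import java.util.Optional;

final class StartupStatusSketch {
    // Hypothetical helper: one message per branch of the commit-log lookup.
    static String statusFor(Optional<Long> latestCommittedEpoch) {
        return latestCommittedEpoch
            .map(epoch -> "Restarting streaming query from existing epoch " + epoch)
            .orElse("Starting new streaming query");
    }

    public static void main(String[] args) {
        System.out.println(statusFor(Optional.of(7L)));
        System.out.println(statusFor(Optional.empty()));
    }
}
```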





[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-21 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r235470285
  
--- Diff: project/MimaExcludes.scala ---
@@ -149,7 +149,8 @@ object MimaExcludes {
 
 ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader"),
 ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.writer.DataSourceWriter"),
 ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.v2.writer.DataWriterFactory.createWriter"),
-ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter")
+ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter"),
--- End diff --

This list of exclusions is getting kinda silly. Is there some way to just 
completely exclude this package from compatibility checks until we've 
stabilized it?





[GitHub] spark issue #23023: [SPARK-26042][SS][TESTS]Fix a potential hang in KafkaCon...

2018-11-13 Thread jose-torres
Github user jose-torres commented on the issue:

https://github.com/apache/spark/pull/23023
  
LGTM





[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

2018-10-19 Thread jose-torres
Github user jose-torres commented on the issue:

https://github.com/apache/spark/pull/22547
  
I agree that we need a shared understanding of the relationship between 
this work and the new catalog API. I was not under the impression that the 
primary purpose of v2 is to integrate catalog tables.





[GitHub] spark pull request #22671: [SPARK-25615][SQL][TEST] Improve the test runtime...

2018-10-09 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22671#discussion_r223775935
  
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala ---
@@ -332,7 +332,9 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext with KafkaTest {
 var ex: Exception = null
 try {
   ex = intercept[StreamingQueryException] {
-writer = createKafkaWriter(input.toDF(), withTopic = Some(topic))()
+writer = createKafkaWriter(input.toDF(),
--- End diff --

In retrospect 1000 may have been too low - that could cause flakiness if 
the test runner is slow or something and it legitimately takes a second or two 
for Kafka to respond. 10k definitely should work, and I'd be fine with 1k if 
we're really worried about test time here.





[GitHub] spark issue #22478: [SPARK-25472][SS] Don't have legitimate stops of streams...

2018-09-19 Thread jose-torres
Github user jose-torres commented on the issue:

https://github.com/apache/spark/pull/22478
  
LGTM pending tests

On Wed, Sep 19, 2018 at 5:16 PM Shixiong Zhu wrote:

> LGTM pending tests. Could you add [SS] to your title?






[GitHub] spark issue #22388: Revert [SPARK-24882][SQL] improve data source v2 API fro...

2018-09-12 Thread jose-torres
Github user jose-torres commented on the issue:

https://github.com/apache/spark/pull/22388
  
MicroBatchExecution.scala and ContinuousExecution.scala look right after 
the revert, although it would be helpful to understand what the diff is between 
this and a straight `git revert`.





[GitHub] spark pull request #22386: [SPARK-25399] Continuous processing state should ...

2018-09-11 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22386#discussion_r216728848
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
@@ -391,6 +393,7 @@ class ContinuousExecution(
 }
 
 object ContinuousExecution {
+  val IS_CONTINUOUS_PROCESSING = "__is_continuous_processing"
--- End diff --

nit: I think this belongs in StreamExecution, since both 
ContinuousExecution and MicroBatchExecution set it.





[GitHub] spark pull request #22245: [SPARK-24882][FOLLOWUP] Fix flaky synchronization...

2018-08-27 Thread jose-torres
GitHub user jose-torres opened a pull request:

https://github.com/apache/spark/pull/22245

[SPARK-24882][FOLLOWUP] Fix flaky synchronization in Kafka tests.

## What changes were proposed in this pull request?

Fix flaky synchronization in Kafka tests - we need to use the scan config 
that was persisted rather than reconstructing it to identify the stream's 
current configuration.

We caught most instances of this in the original PR, but this one slipped 
through.

## How was this patch tested?

n/a

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jose-torres/spark fixflake

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22245.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22245


commit 93c7bd93f5dbec41a0fd4d6b5ef0bfe0bfdc235c
Author: Jose Torres 
Date:   2018-08-27T17:03:17Z

fix flake







[GitHub] spark pull request #22191: [SPARK-25204][SS] Fix race in rate source test.

2018-08-22 Thread jose-torres
GitHub user jose-torres opened a pull request:

https://github.com/apache/spark/pull/22191

[SPARK-25204][SS] Fix race in rate source test.

## What changes were proposed in this pull request?

Fix a race in the rate source tests. We need a better way of testing 
restart behavior.

## How was this patch tested?

unit test

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jose-torres/spark racetest

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22191.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22191


commit eec0ad08e390831717203f6d002e3b1218de6d36
Author: Jose Torres 
Date:   2018-08-22T22:10:32Z

fix test







[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-21 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r211739221
  
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala ---
@@ -47,7 +47,9 @@ trait KafkaContinuousTest extends KafkaSourceTest {
 eventually(timeout(streamingTimeout)) {
   assert(
 query.lastExecution.logical.collectFirst {
-  case StreamingDataSourceV2Relation(_, _, _, r: KafkaContinuousReader) => r
+  case r: StreamingDataSourceV2Relation
+  if r.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
+r.scanConfigBuilder.build().asInstanceOf[KafkaContinuousScanConfig]
--- End diff --

I think this logic is subtly incorrect (and what's causing the flakiness in 
the continuous test). It needs to get the actual scan config being used from 
DataSourceV2ScanExec in the physical plan; r.scanConfigBuilder.build() will 
always produce the most up-to-date `knownPartitions` value.
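A toy model of the problem, assuming a source whose partition set grows while the query runs. `PartitionSourceSketch` and `buildScanConfig` are hypothetical stand-ins, not the Kafka source or `ScanConfigBuilder`. A config built after planning can already include partitions the running scan has never seen, so a test that compares against the rebuilt config can pass or fail depending on timing.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a source whose partition set can grow over time.
final class PartitionSourceSketch {
    private final List<String> knownPartitions = new ArrayList<>();

    void discover(String partition) {
        knownPartitions.add(partition);
    }

    // Like building a fresh config: always snapshots the latest partitions.
    List<String> buildScanConfig() {
        return Collections.unmodifiableList(new ArrayList<>(knownPartitions));
    }
}

final class ScanConfigRaceSketch {
    public static void main(String[] args) {
        PartitionSourceSketch source = new PartitionSourceSketch();
        source.discover("topic-0");
        List<String> inPlan = source.buildScanConfig();  // what the running scan uses
        source.discover("topic-1");                      // arrives after planning
        List<String> rebuilt = source.buildScanConfig(); // what a rebuild reports
        System.out.println("in plan: " + inPlan + ", rebuilt: " + rebuilt);
    }
}
```

The test should read the equivalent of `inPlan`, the config captured in the physical plan, rather than `rebuilt`.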





[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-21 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r211639298
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java ---
@@ -24,16 +24,17 @@
 import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
 
 /**
- * An interface that defines how to scan the data from data source for continuous streaming
+ * An interface that defines how to load the data from data source for continuous streaming
  * processing.
  *
- * The execution engine will create an instance of this interface at the start of a streaming query,
- * then call {@link #newScanConfigBuilder(Offset)} and create an instance of {@link ScanConfig} for
- * the duration of the streaming query or until {@link #needsReconfiguration(ScanConfig)} is true.
- * The {@link ScanConfig} will be used to create input partitions and reader factory to process data
- * for its duration. At the end {@link #stop()} will be called when the streaming execution is
- * completed. Note that a single query may have multiple executions due to restart or failure
- * recovery.
+ * The execution engine will get an instance of this interface from a data source provider
+ * (e.g. {@link org.apache.spark.sql.sources.v2.ContinuousReadSupportProvider}) at the start of a
+ * streaming query, then call {@link #newScanConfigBuilder(Offset)} to create an instance of
+ * {@link ScanConfig} for the duration of the streaming query or until
+ * {@link #needsReconfiguration(ScanConfig)} is true. The {@link ScanConfig} will be used to create
+ * input partitions and reader factory to scan data for its duration. At the end {@link #stop()}
+ * will be called when the streaming execution is completed. Note that a single query may have
+ * multiple executions due to restart or failure recovery.
--- End diff --

I would also add this documentation on the relevant methods. So 
getContinuousReadSupport and getMicroBatchReadSupport would say something like 
"Spark will call this method at the beginning of each streaming query to get a 
ReadSupport", newScanConfigBuilder would say something like "Spark will get a 
ScanConfig once for each data scanning job".
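The call pattern described in that Javadoc can be sketched roughly as follows. `ReadSupportSketch` is a hypothetical, heavily simplified interface (plain strings and longs instead of `ScanConfig` and `Offset`), and the loop is my reading of the lifecycle rather than the engine's actual code: one read support per execution, one scan config until `needsReconfiguration` reports true, then `stop()` at the end.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified interface; not Spark's ContinuousReadSupport.
interface ReadSupportSketch {
    String newScanConfig(long startOffset);
    boolean needsReconfiguration(String config);
    void stop();
}

final class LifecycleSketch {
    // Runs `steps` polling steps and returns every scan config that was created.
    static List<String> run(ReadSupportSketch support, int steps) {
        List<String> configs = new ArrayList<>();
        String config = support.newScanConfig(0L);   // first config for the execution
        configs.add(config);
        for (int step = 1; step <= steps; step++) {
            if (support.needsReconfiguration(config)) {
                config = support.newScanConfig(step); // replaced only on request
                configs.add(config);
            }
        }
        support.stop();                               // called once, at the end
        return configs;
    }
}
```

Putting one sentence about this on each method would let an implementer see how often a method is called without reading the interface-level comment.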





[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress

2018-08-14 Thread jose-torres
Github user jose-torres commented on the issue:

https://github.com/apache/spark/pull/21919
  
Sure, but I'm not a committer so I can't make that happen. @cloud-fan 





[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-13 Thread jose-torres
Github user jose-torres commented on the issue:

https://github.com/apache/spark/pull/22009
  
There's a reasonable chance that the

Error adding data: Could not find index of the source to which data was added

flakiness in the Kafka suite was caused by this PR. Let me know if you need 
help debugging.





[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-13 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r209708483
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java ---
@@ -21,33 +21,39 @@
 
 import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport;
 import org.apache.spark.sql.types.StructType;
 
 /**
  * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
- * provide data writing ability and save the data to the data source.
+ * provide data writing ability for batch processing.
+ *
+ * This interface is used when end users want to use a data source implementation directly, e.g.
+ * {@code Dataset.write.format(...).option(...).save()}.
  */
 @InterfaceStability.Evolving
-public interface WriteSupport extends DataSourceV2 {
+public interface BatchWriteSupportProvider extends DataSourceV2 {
 
   /**
-   * Creates an optional {@link DataSourceWriter} to save the data to this data source. Data
+   * Creates an optional {@link BatchWriteSupport} to save the data to this data source. Data
    * sources can return None if there is no writing needed to be done according to the save mode.
    *
    * If this method fails (by throwing an exception), the action will fail and no Spark job will be
    * submitted.
    *
-   * @param writeUUID A unique string for the writing job. It's possible that there are many writing
-   *  jobs running at the same time, and the returned {@link DataSourceWriter} can
-   *  use this job id to distinguish itself from other jobs.
+   * @param queryId A unique string for the writing query. It's possible that there are many
+   * writing queries running at the same time, and the returned
+   * {@link BatchWriteSupport} can use this id to distinguish itself from others.
    * @param schema the schema of the data to be written.
    * @param mode the save mode which determines what to do when the data are already in this data
    * source, please refer to {@link SaveMode} for more details.
    * @param options the options for the returned data source writer, which is an immutable
    * case-insensitive string-to-string map.
-   * @return a writer to append data to this data source
+   * @return a write support to write data to this data source.
    */
-  Optional<DataSourceWriter> createWriter(
-  String writeUUID, StructType schema, SaveMode mode, DataSourceOptions options);
+  Optional<BatchWriteSupport> createBatchWriteSupport(
+  String queryId,
+  StructType schema,
+  SaveMode mode,
--- End diff --

To clarify, your proposal is that we should block the completion of 
DataSourceV2 until the new logical plans are in place?





[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-13 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r209702134
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.sql.sources.v2.reader.ReadSupport;
+
+/**
+ * A base interface for streaming read support. This is package private and is invisible to data
+ * sources. Data sources should implement concrete streaming read support interfaces:
+ * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
+ */
+interface StreamingReadSupport extends ReadSupport {
+
+  /**
+   * Returns the initial offset for a streaming query to start reading from. Note that the
+   * streaming data source should not assume that it will start reading from its
+   * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from
+   * the check-pointed offset rather than the initial one.
+   */
+  Offset initialOffset();
+
+  /**
+   * Deserialize a JSON string into an Offset of the implementation-defined offset type.
+   *
+   * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
+   */
+  Offset deserializeOffset(String json);
+
+  /**
+   * Informs the source that Spark has completed processing all data for offsets less than or
+   * equal to `end` and will only request offsets greater than `end` in the future.
+   */
+  void commit(Offset end);
--- End diff --

There are two consumer groups in streaming:

1. The one at the driver, which determines what offsets are available to 
scan.
2. The one distributed across the executors which actually performs the 
scan.

This method is used to commit certain offsets in group 1, based on the 
offsets which have been logged as processed by group 2. In microbatch mode, 
this happens to work with ScanConfig, because there is one ScanConfig for each 
offset log entry. In continuous mode there is one ScanConfig corresponding to 
an indefinite number of offset log entries, so ScanConfig does not provide the 
information required to commit any particular entry.
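A small sketch of why the explicit offset argument matters. All types here are hypothetical simplifications (offsets are plain `long` values, and the classes are not Spark's `ScanConfig` or `Offset`). In micro-batch mode each config carries its own end offset, so the config would be enough to know what to commit. In continuous mode one config covers many epochs, so the end offsets have to come from the offset log and be passed in directly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical types for illustration only; not Spark's ScanConfig or Offset.
final class MicroBatchConfigSketch {
    final long start;
    final long end;   // one config per offset log entry, so `end` is known

    MicroBatchConfigSketch(long start, long end) {
        this.start = start;
        this.end = end;
    }
}

final class ContinuousConfigSketch {
    final long start; // one config spans many epochs; no per-epoch end offset

    ContinuousConfigSketch(long start) {
        this.start = start;
    }
}

// Stand-in for the driver-side group that tracks which offsets are available.
final class DriverSideSourceSketch {
    final List<Long> committed = new ArrayList<>();

    // Mirrors the shape of commit(Offset end): the offset is passed explicitly.
    void commit(long end) {
        committed.add(end);
    }
}

final class CommitSketch {
    public static void main(String[] args) {
        DriverSideSourceSketch source = new DriverSideSourceSketch();

        // Micro-batch: the config alone is enough to find the offset to commit.
        MicroBatchConfigSketch batch = new MicroBatchConfigSketch(0L, 100L);
        source.commit(batch.end);

        // Continuous: per-epoch end offsets come from the offset log instead.
        ContinuousConfigSketch continuous = new ContinuousConfigSketch(100L);
        long[] epochEndOffsets = {150L, 220L, 305L};
        for (long end : epochEndOffsets) {
            source.commit(end);
        }
        System.out.println("start=" + continuous.start + " committed=" + source.committed);
    }
}
```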





[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-10 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r209301920
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.sql.sources.v2.reader.ReadSupport;
+
+/**
+ * A base interface for streaming read support. This is package private and is invisible to data
+ * sources. Data sources should implement concrete streaming read support interfaces:
+ * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
+ */
+interface StreamingReadSupport extends ReadSupport {
+
+  /**
+   * Returns the initial offset for a streaming query to start reading from. Note that the
+   * streaming data source should not assume that it will start reading from its
+   * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from
+   * the check-pointed offset rather than the initial one.
+   */
+  Offset initialOffset();
+
+  /**
+   * Deserialize a JSON string into an Offset of the implementation-defined offset type.
+   *
+   * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
+   */
+  Offset deserializeOffset(String json);
--- End diff --

Sorry, to be clear, this method was in the original streaming design doc 
https://docs.google.com/document/d/1VzxEuvpLfuHKL6vJO9qJ6ug0x9J_gLoLSH_vJL3-Cho 
which was sent out 2 months ago. If the reworking in this PR has made you 
realize there's a better way to do things, I think we should absolutely 
consider the alternative. But we can't just remove random methods without a 
complete proposal for what should replace them.





[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-09 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r209094853
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.sql.sources.v2.reader.ReadSupport;
+
+/**
+ * A base interface for streaming read support. This is package private and is invisible to data
+ * sources. Data sources should implement concrete streaming read support interfaces:
+ * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
+ */
+interface StreamingReadSupport extends ReadSupport {
+
+  /**
+   * Returns the initial offset for a streaming query to start reading from. Note that the
+   * streaming data source should not assume that it will start reading from its
+   * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from
+   * the check-pointed offset rather than the initial one.
+   */
+  Offset initialOffset();
+
+  /**
+   * Deserialize a JSON string into an Offset of the implementation-defined offset type.
+   *
+   * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
+   */
+  Offset deserializeOffset(String json);
+
+  /**
+   * Informs the source that Spark has completed processing all data for offsets less than or
+   * equal to `end` and will only request offsets greater than `end` in the future.
+   */
+  void commit(Offset end);
--- End diff --

I'm not sure what you mean by "scan state" here. The thing that is scanned 
needs to know what offsets are available for scanning, which requires holding a 
Kafka consumer to read that information.





[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-09 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r209059908
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.sql.sources.v2.reader.ReadSupport;
+
+/**
+ * A base interface for streaming read support. This is package private and is invisible to data
+ * sources. Data sources should implement concrete streaming read support interfaces:
+ * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
+ */
+interface StreamingReadSupport extends ReadSupport {
+
+  /**
+   * Returns the initial offset for a streaming query to start reading from. Note that the
+   * streaming data source should not assume that it will start reading from its
+   * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from
+   * the check-pointed offset rather than the initial one.
+   */
+  Offset initialOffset();
+
+  /**
+   * Deserialize a JSON string into an Offset of the implementation-defined offset type.
+   *
+   * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
+   */
+  Offset deserializeOffset(String json);
+
+  /**
+   * Informs the source that Spark has completed processing all data for offsets less than or
+   * equal to `end` and will only request offsets greater than `end` in the future.
+   */
+  void commit(Offset end);
--- End diff --

Not in the current implementation.





[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-09 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r209041048
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReadSupport.java ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+import org.apache.spark.sql.sources.v2.reader.ScanConfig;
+import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
+
+/**
+ * An interface which defines how to scan the data from data source for streaming processing with
+ * micro-batch mode.
+ */
+@InterfaceStability.Evolving
+public interface MicroBatchReadSupport extends StreamingReadSupport, BaseStreamingSource {
+
+  /**
+   * Returns a builder of {@link ScanConfig}. The builder can take some query specific information
+   * like which operators to pushdown, streaming offsets, etc., and keep these information in the
+   * created {@link ScanConfig}.
+   *
+   * This is the first step of the data scan. All other methods in {@link MicroBatchReadSupport}
+   * needs to take {@link ScanConfig} as an input.
+   *
+   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
+   * submitted.
+   */
+  ScanConfigBuilder newScanConfigBuilder(Offset start, Offset end);
+
+  /**
+   * Returns the most recent offset available.
+   */
+  Offset latestOffset(Offset start);
--- End diff --

(Note that this is not just the Kafka reader - file streams have a similar 
option.)


---




[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-09 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r209040246
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReadSupport.java
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+import org.apache.spark.sql.sources.v2.reader.ScanConfig;
+import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
+
+/**
+ * An interface which defines how to scan the data from data source for 
streaming processing with
+ * micro-batch mode.
+ */
+@InterfaceStability.Evolving
+public interface MicroBatchReadSupport extends StreamingReadSupport, 
BaseStreamingSource {
+
+  /**
+   * Returns a builder of {@link ScanConfig}. The builder can take some 
query specific information
+   * like which operators to pushdown, streaming offsets, etc., and keep 
these information in the
+   * created {@link ScanConfig}.
+   *
+   * This is the first step of the data scan. All other methods in {@link 
MicroBatchReadSupport}
+   * needs to take {@link ScanConfig} as an input.
+   *
+   * If this method fails (by throwing an exception), the action will fail 
and no Spark job will be
+   * submitted.
+   */
+  ScanConfigBuilder newScanConfigBuilder(Offset start, Offset end);
+
+  /**
+   * Returns the most recent offset available.
+   */
+  Offset latestOffset(Offset start);
--- End diff --

I agree in principle, but I don't know of any way to special case it 
without adding it to the API.


---




[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-09 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208983568
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupportProvider.java
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils;
+import 
org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
+ * provide data reading ability for continuous stream processing.
+ *
+ * This interface is used when end users want to use a data source 
implementation directly, e.g.
+ * {@code SparkSession.readStream.format(...).option(...).load()}.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousReadSupportProvider extends DataSourceV2 {
+
+  /**
+   * Creates a {@link ContinuousReadSupport} to scan the data from this 
streaming data source.
+   *
+   * If this method fails (by throwing an exception), the action will fail 
and no Spark job will be
+   * submitted.
--- End diff --

nit: This comment has been carried over to a lot of different places as we 
evolve the API. It's still true but I don't think it's really applicable here.


---




[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-09 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208984614
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java ---
@@ -23,8 +23,9 @@
  * The base interface for data source v2. Implementations must have a 
public, 0-arg constructor.
  *
  * Note that this is an empty interface. Data source implementations 
should mix-in at least one of
- * the plug-in interfaces like {@link ReadSupport} and {@link 
WriteSupport}. Otherwise it's just
- * a dummy data source which is un-readable/writable.
+ * the provider interfaces like {@link BatchReadSupportProvider} or
--- End diff --

nit: I would say something more like "Data source implementations must mix 
in interfaces such as {@link BatchReadSupportProvider} or {@link 
BatchWriteSupportProvider} to provide read or write functionality."


---




[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-09 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208763503
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java
 ---
@@ -27,10 +27,10 @@
 @InterfaceStability.Evolving
 public interface SessionConfigSupport extends DataSourceV2 {
 
-/**
- * Key prefix of the session configs to propagate. Spark will extract 
all session configs that
- * starts with `spark.datasource.$keyPrefix`, turn 
`spark.datasource.$keyPrefix.xxx -> yyy`
- * into `xxx -> yyy`, and propagate them to all data source 
operations in this session.
- */
-String keyPrefix();
+  /**
+   * Key prefix of the session configs to propagate. Spark will extract 
all session configs that
+   * starts with `spark.datasource.$keyPrefix`, turn 
`spark.datasource.$keyPrefix.xxx -> yyy`
--- End diff --

Is `datasource` a placeholder here, or would Kafka configs now be e.g. 
`spark.datasource.kafka.[...]`?
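
Read literally, the Javadoc under review would have a source whose keyPrefix() returns "kafka" pick up session configs named spark.datasource.kafka.xxx. The following is a minimal sketch of that extraction, assuming the rule is exactly "strip the spark.datasource.<keyPrefix>. prefix and keep the rest". SessionConfigSketch and extract are hypothetical names used for illustration, not Spark's implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; it only illustrates the prefix rule described in the Javadoc.
final class SessionConfigSketch {
  private SessionConfigSketch() {}

  /**
   * Keeps the entries whose key starts with "spark.datasource.<keyPrefix>."
   * and strips that prefix from the key.
   */
  static Map<String, String> extract(Map<String, String> sessionConf, String keyPrefix) {
    String fullPrefix = "spark.datasource." + keyPrefix + ".";
    Map<String, String> propagated = new LinkedHashMap<>();
    for (Map.Entry<String, String> entry : sessionConf.entrySet()) {
      if (entry.getKey().startsWith(fullPrefix)) {
        propagated.put(entry.getKey().substring(fullPrefix.length()), entry.getValue());
      }
    }
    return propagated;
  }
}
```

Under this reading, `datasource` is a literal segment of the key rather than a placeholder, which is what the question above is trying to confirm.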


---




[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-09 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208984707
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupportProvider.java
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils;
+import 
org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
+ * provide data reading ability for micro-batch stream processing.
+ *
+ * This interface is used when end users want to use a data source 
implementation directly, e.g.
+ * {@code SparkSession.readStream.format(...).option(...).load()}.
+ */
+@InterfaceStability.Evolving
+public interface MicroBatchReadSupportProvider extends DataSourceV2 {
+
+  /**
+   * Creates a {@link MicroBatchReadSupport} to scan the data from this 
streaming data source.
+   *
+   * If this method fails (by throwing an exception), the action will fail 
and no Spark job will be
+   * submitted.
--- End diff --

nit: same comment as in continuous class


---




[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-09 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r209019530
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.sql.sources.v2.reader.ReadSupport;
+
+/**
+ * A base interface for streaming read support. This is package private 
and is invisible to data
+ * sources. Data sources should implement concrete streaming read support 
interfaces:
+ * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
+ */
+interface StreamingReadSupport extends ReadSupport {
+
+  /**
+   * Returns the initial offset for a streaming query to start reading 
from. Note that the
+   * streaming data source should not assume that it will start reading 
from its
+   * {@link #initialOffset()} value: if Spark is restarting an existing 
query, it will restart from
+   * the check-pointed offset rather than the initial one.
+   */
+  Offset initialOffset();
+
+  /**
+   * Deserialize a JSON string into an Offset of the 
implementation-defined offset type.
+   *
+   * @throws IllegalArgumentException if the JSON does not encode a valid 
offset for this reader
+   */
+  Offset deserializeOffset(String json);
+
+  /**
+   * Informs the source that Spark has completed processing all data for 
offsets less than or
+   * equal to `end` and will only request offsets greater than `end` in 
the future.
+   */
+  void commit(Offset end);
--- End diff --

I'd +1 passing a ScanConfig. I agree that all the existing sources are just 
going to pull out the offset, but "Spark is finished with this scan" is a 
cleaner semantic than "Spark is finished with the scan such that it goes up to 
this offset".
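
To make the difference concrete, here is a rough sketch of what committing a whole scan could look like. Every type in it is a simplified, hypothetical stand-in (RangeScanConfig, Source); none of them is the actual Spark interface, and offsets are modelled as plain longs.

```java
// Hypothetical, simplified types; these are not the Spark interfaces.
final class CommitSketch {
  private CommitSketch() {}

  /** Stand-in for a ScanConfig that remembers the offset range it was built for. */
  static final class RangeScanConfig {
    final long start;
    final long end;

    RangeScanConfig(long start, long end) {
      this.start = start;
      this.end = end;
    }
  }

  /** Stand-in for a source that tracks how far Spark has committed. */
  static final class Source {
    private long committedUpTo = -1L;

    // "Spark is finished with this scan": the source pulls the end offset out itself.
    void commit(RangeScanConfig finishedScan) {
      committedUpTo = Math.max(committedUpTo, finishedScan.end);
    }

    long committedUpTo() {
      return committedUpTo;
    }
  }
}
```

As noted above, a source like this still only reads the end offset, but the method contract no longer has to mention offsets at all.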


---




[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-09 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208986233
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/BatchReadSupport.java
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * An interface that defines how to scan the data from data source for 
batch processing.
--- End diff --

I think the class docs here should include a high-level overview of the 
path from here to a Spark job.


---




[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-09 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208987767
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java
 ---
@@ -22,18 +22,16 @@
 import org.apache.spark.annotation.InterfaceStability;
 
 /**
- * An input partition returned by {@link 
DataSourceReader#planInputPartitions()} and is
- * responsible for creating the actual data reader of one RDD partition.
- * The relationship between {@link InputPartition} and {@link 
InputPartitionReader}
- * is similar to the relationship between {@link Iterable} and {@link 
java.util.Iterator}.
+ * An input partition returned by {@link 
ReadSupport#planInputPartitions(ScanConfig)}, which
+ * represents a data split that should be processed by one Spark task.
--- End diff --

I'm not sure we need to talk about "data split" - I don't think people will 
try to implement data sources without knowing what a partition is in Spark.

I'd suggest saying "A serializable representation of an input 
partition...", to make it clear that this should just contain metadata required 
to identify what the partition is and not the actual data.
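
As an illustration of "metadata only", a partition for a file-based source might look roughly like the class below. FileRangePartition and its fields are hypothetical, and the class implements java.io.Serializable directly instead of the real InputPartition interface so that the sketch stays self-contained.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a data source's InputPartition implementation.
// It describes which bytes to read; it never holds the bytes themselves.
final class FileRangePartition implements Serializable {
  private static final long serialVersionUID = 1L;

  final String path;
  final long start;
  final long length;

  FileRangePartition(String path, long start, long length) {
    this.path = path;
    this.start = start;
    this.length = length;
  }

  // Simulates shipping the partition description from the driver to an executor.
  static FileRangePartition roundTrip(FileRangePartition partition) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(partition);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (FileRangePartition) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("partition is not serializable", e);
    }
  }
}
```

The reader created on the executor would then open the file and read the range; the partition object stays small no matter how much data it points at.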


---




[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-09 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r209013149
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionReaderFactory.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.io.Serializable;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+/**
+ * A factory used to create {@link PartitionReader} instances.
+ */
+@InterfaceStability.Evolving
+public interface PartitionReaderFactory extends Serializable {
+
+  /**
+   * Returns a row-based partition reader to read data from the given 
{@link InputPartition}.
+   *
+   * Implementations probably need to cast the input partition to the 
concrete
+   * {@link InputPartition} class defined for the data source.
+   *
+   * If this method fails (by throwing an exception), the corresponding 
Spark task would fail and
+   * get retried until hitting the maximum retry times.
+   */
+  PartitionReader<InternalRow> createReader(InputPartition partition);
+
+  /**
+   * Returns a columnar partition reader to read data from the given 
{@link InputPartition}.
+   *
+   * Implementations probably need to cast the input partition to the 
concrete
+   * {@link InputPartition} class defined for the data source.
+   *
+   * If this method fails (by throwing an exception), the corresponding 
Spark task would fail and
+   * get retried until hitting the maximum retry times.
+   */
+  default PartitionReader<ColumnarBatch> 
createColumnarReader(InputPartition partition) {
+throw new UnsupportedOperationException("Cannot create columnar 
reader.");
+  }
+
+  /**
+   * Returns true if the given {@link InputPartition} should be read by 
Spark in a columnar way.
+   * This means, implementations must also implement {@link 
#createColumnarReader(InputPartition)}
+   * for the input partitions that this method returns true.
+   *
+   * As of Spark 2.4, Spark can only read all input partition in a 
columnar way, or none of them.
+   * Data source can't mix columnar and row-based partitions. This will be 
relaxed in future
--- End diff --

nit: may be relaxed, we shouldn't guarantee it


---




[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-09 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r209018297
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.ScanConfig;
+import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
+
+/**
+ * An interface that defines how to scan the data from data source for 
continuous streaming
+ * processing.
+ *
+ * The execution engine will create an instance of this interface at the 
start of a streaming query,
+ * then call {@link #newScanConfigBuilder(Offset)} and create an instance 
of {@link ScanConfig} for
+ * the duration of the streaming query or until {@link 
#needsReconfiguration(ScanConfig)} is true.
+ * The {@link ScanConfig} will be used to create input partitions and 
reader factory to process data
+ * for its duration. At the end {@link #stop()} will be called when the 
streaming execution is
+ * completed. Note that a single query may have multiple executions due to 
restart or failure
+ * recovery.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousReadSupport extends StreamingReadSupport, 
BaseStreamingSource {
+
+  /**
+   * Returns a builder of {@link ScanConfig}. The builder can take some 
query specific information
+   * to do operators pushdown, streaming offsets, etc., and keep these 
information in the
+   * created {@link ScanConfig}.
+   *
+   * This is the first step of the data scan. All other methods in {@link 
ContinuousReadSupport}
+   * needs to take {@link ScanConfig} as an input.
+   *
+   * If this method fails (by throwing an exception), the action will fail 
and no Spark job will be
+   * submitted.
+   */
+  ScanConfigBuilder newScanConfigBuilder(Offset start);
+
+  /**
+   * Returns a factory to produce {@link ContinuousPartitionReader}s for 
{@link InputPartition}s.
+   *
+   * If this method fails (by throwing an exception), the action will fail 
and no Spark job will be
+   * submitted.
+   */
+  ContinuousPartitionReaderFactory 
createContinuousReaderFactory(ScanConfig config);
+
+  /**
+   * Merge partitioned offsets coming from {@link 
ContinuousPartitionReader} instances
+   * for each partition to a single global offset.
+   */
+  Offset mergeOffsets(PartitionOffset[] offsets);
+
+  /**
+   * The execution engine will call this method in every epoch to 
determine if new input
+   * partitions need to be generated, which may be required if for example 
the underlying
+   * source system has had partitions added or removed.
+   *
+   * If true, the query will be shut down and restarted with a new {@link 
ContinuousReadSupport}
--- End diff --

No, I think we just need a new `ScanConfig`. (But this PR is already very 
large and that will require execution layer changes, so I'd suggest filing a 
followup for that.)
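
For reference, the per-epoch check described in the Javadoc above can be as simple as comparing the partitions the current scan was planned for against what the source system reports now. This is a sketch under that assumption; ReconfigurationSketch and its integer partition ids are hypothetical and not part of the API.

```java
import java.util.Set;

// Hypothetical helper; partitions are modelled as plain integer ids.
final class ReconfigurationSketch {
  private ReconfigurationSketch() {}

  /**
   * Returns true when the source system's partitions differ from the ones
   * the current scan was planned for (partitions added or removed).
   */
  static boolean needsReconfiguration(Set<Integer> plannedPartitions,
                                      Set<Integer> currentPartitions) {
    return !plannedPartitions.equals(currentPartitions);
  }
}
```

Whether a true result leads to a new read support instance or only a new ScanConfig is exactly the execution-layer question deferred to the follow-up.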


---




[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-09 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208982830
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchReadSupportProvider.java
 ---
@@ -18,19 +18,22 @@
 package org.apache.spark.sql.sources.v2;
 
 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.DataSourceRegister;
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils;
+import org.apache.spark.sql.sources.v2.reader.BatchReadSupport;
 import org.apache.spark.sql.types.StructType;
 
 /**
  * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
- * provide data reading ability and scan the data from the data source.
+ * provide data reading ability for batch processing.
+ *
+ * This interface is used when end users want to use a data source 
implementation directly, e.g.
+ * {@code SparkSession.read.format(...).option(...).load()}.
  */
 @InterfaceStability.Evolving
-public interface ReadSupport extends DataSourceV2 {
+public interface BatchReadSupportProvider extends DataSourceV2 {
 
   /**
-   * Creates a {@link DataSourceReader} to scan the data from this data 
source.
+   * Creates a {@link BatchReadSupport} to scan the data from this data 
source.
--- End diff --

nit: ... from this data source with a user specified schema.


---




[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-09 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r209021329
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java
 ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer.streaming;
+
+import java.io.Serializable;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.writer.DataWriter;
+
+/**
+ * A factory of {@link DataWriter} returned by
+ * {@link StreamingWriteSupport#createStreamingWriterFactory()}, which is 
responsible for creating
+ * and initializing the actual data writer at executor side.
+ *
+ * Note that, the writer factory will be serialized and sent to executors, 
then the data writer
+ * will be created on executors and do the actual writing. So this 
interface must be
+ * serializable and {@link DataWriter} doesn't need to be.
+ */
+@InterfaceStability.Evolving
+public interface StreamingDataWriterFactory extends Serializable {
+
+  /**
+   * Returns a data writer to do the actual writing work. Note that, Spark 
will reuse the same data
+   * object instance when sending data to the data writer, for better 
performance. Data writers
+   * are responsible for defensive copies if necessary, e.g. copy the data 
before buffer it in a
+   * list.
+   *
+   * If this method fails (by throwing an exception), the action will fail 
and no Spark job will be
+   * submitted.
+   *
+   * @param partitionId A unique id of the RDD partition that the returned 
writer will process.
+   *                    Usually Spark processes many RDD partitions at the 
same time,
+   *                    implementations should use the partition id to 
distinguish writers for
+   *                    different partitions.
+   * @param taskId A unique identifier for a task that is performing the 
write of the partition
+   *               data. Spark may run multiple tasks for the same 
partition (due to speculation
+   *               or task failures, for example).
--- End diff --

Is it the ID of the task or the ID of one particular attempt of the task? 
(The target audience here is people who know a reasonable amount about Spark - 
I think we should just say TaskContext.taskAttemptId() if that's what this is.)
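
The distinction matters to implementers because speculative or retried attempts write the same partition concurrently. A rough sketch of why, assuming the id passed in really is a per-attempt id; WriterNamingSketch and the file name pattern are made up for illustration.

```java
import java.util.Locale;

// Hypothetical helper showing why a writer wants a per-attempt id.
final class WriterNamingSketch {
  private WriterNamingSketch() {}

  /**
   * Builds a staging file name that is unique per attempt, so two attempts
   * writing the same partition do not overwrite each other's output.
   */
  static String stagingFileName(int partitionId, long taskAttemptId) {
    return String.format(Locale.ROOT, "part-%05d-attempt-%d.tmp", partitionId, taskAttemptId);
  }
}
```

If the parameter were only a task id shared by all attempts, both attempts for a partition would produce the same name here and could clobber each other.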


---




[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress

2018-08-08 Thread jose-torres
Github user jose-torres commented on the issue:

https://github.com/apache/spark/pull/21919
  
No more suggestions, the PR looks fine to me.


---




[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-08 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208642760
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReadSupport.java
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+import org.apache.spark.sql.sources.v2.reader.ScanConfig;
+import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
+
+/**
+ * An interface which defines how to scan the data from data source for 
streaming processing with
+ * micro-batch mode.
+ */
+@InterfaceStability.Evolving
+public interface MicroBatchReadSupport extends StreamingReadSupport, 
BaseStreamingSource {
+
+  /**
+   * Returns a builder of {@link ScanConfig}. The builder can take some 
query specific information
+   * like which operators to pushdown, streaming offsets, etc., and keep 
these information in the
+   * created {@link ScanConfig}.
+   *
+   * This is the first step of the data scan. All other methods in {@link 
MicroBatchReadSupport}
+   * needs to take {@link ScanConfig} as an input.
+   *
+   * If this method fails (by throwing an exception), the action will fail 
and no Spark job will be
+   * submitted.
+   */
+  ScanConfigBuilder newScanConfigBuilder(Offset start, Offset end);
+
+  /**
+   * Returns the most recent offset available.
+   */
+  Offset latestOffset(Offset start);
--- End diff --

There's a weak form of rate control implemented by simply having sources 
lie about what the latest offset is. For example you might set 
maxOffsetsPerTrigger = 100, and then the Kafka source will pretend that only 
100 more offsets exist even if there are really more available.

Unfortunately, we're going to need to continue to support such options at 
least until the next major version after we have better rate limiting, so I 
don't think this can be removed from the source API right now.
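
A minimal sketch of that kind of rate control, assuming plain `long` offsets. `CappedOffsetSource` and its fields are hypothetical names for illustration, not the actual Kafka source code:

```java
// Hypothetical sketch; not the actual Kafka source implementation.
final class CappedOffsetSource {
    private final long maxOffsetsPerTrigger; // e.g. 100
    private final long trueLatestOffset;     // what the external system really has

    CappedOffsetSource(long maxOffsetsPerTrigger, long trueLatestOffset) {
        this.maxOffsetsPerTrigger = maxOffsetsPerTrigger;
        this.trueLatestOffset = trueLatestOffset;
    }

    // Reports at most maxOffsetsPerTrigger offsets past `start`,
    // even when more data is really available.
    long latestOffset(long start) {
        long cap = start + maxOffsetsPerTrigger;
        return Math.min(cap, trueLatestOffset);
    }
}
```

This is why `latestOffset` takes the start offset as a parameter: the cap is relative to where the batch begins.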





[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-08 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208641014
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.sql.sources.v2.reader.ReadSupport;
+
+/**
+ * A base interface for streaming read support. This is package private 
and is invisible to data
+ * sources. Data sources should implement concrete streaming read support 
interfaces:
+ * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
+ */
+interface StreamingReadSupport extends ReadSupport {
+
+  /**
+   * Returns the initial offset for a streaming query to start reading 
from. Note that the
+   * streaming data source should not assume that it will start reading 
from its
+   * {@link #initialOffset()} value: if Spark is restarting an existing 
query, it will restart from
+   * the check-pointed offset rather than the initial one.
+   */
+  Offset initialOffset();
+
+  /**
+   * Deserialize a JSON string into an Offset of the 
implementation-defined offset type.
+   *
+   * @throws IllegalArgumentException if the JSON does not encode a valid 
offset for this reader
+   */
+  Offset deserializeOffset(String json);
--- End diff --

As I said, I'm fine with defining arbitrary JSON strings as the single 
non-customizable offset type, if you think that would be better. (I think they 
would have to be strings, because making a JSON object the type would mean 
packaging some JSON library into the API.) I don't think it would ever be 
correct to have an Offset class which doesn't trivially reduce to a key-value 
map.
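
To illustrate the "trivially reduces to a key-value map" point, here is a rough sketch of turning such a map into a JSON string without pulling in a JSON library. `MapOffsetJson` is a hypothetical helper, and it assumes keys that need no JSON escaping:

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical helper, not part of any Spark API.
final class MapOffsetJson {
    // Assumes keys contain no quotes, backslashes, or control characters.
    // Keys are sorted so the same offset always serializes to the same string.
    static String toJson(Map<String, Long> offsets) {
        return new TreeMap<>(offsets).entrySet().stream()
            .map(e -> "\"" + e.getKey() + "\":" + e.getValue())
            .collect(Collectors.joining(",", "{", "}"));
    }
}
```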





[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208425737
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.sql.sources.v2.reader.ReadSupport;
+
+/**
+ * A base interface for streaming read support. This is package private 
and is invisible to data
+ * sources. Data sources should implement concrete streaming read support 
interfaces:
+ * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
+ */
+interface StreamingReadSupport extends ReadSupport {
+
+  /**
+   * Returns the initial offset for a streaming query to start reading 
from. Note that the
+   * streaming data source should not assume that it will start reading 
from its
+   * {@link #initialOffset()} value: if Spark is restarting an existing 
query, it will restart from
+   * the check-pointed offset rather than the initial one.
+   */
+  Offset initialOffset();
+
+  /**
+   * Deserialize a JSON string into an Offset of the 
implementation-defined offset type.
+   *
+   * @throws IllegalArgumentException if the JSON does not encode a valid 
offset for this reader
+   */
+  Offset deserializeOffset(String json);
--- End diff --

I guess my core point is, we should stick with the existing serialization 
mechanism unless there's some kind of serialization we need to do which only a 
byte array can express. The serialization mechanism reaches deep into the 
execution layer, so coupling it with a connector API revamp is awkward.





[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208425199
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.sql.sources.v2.reader.ReadSupport;
+
+/**
+ * A base interface for streaming read support. This is package private 
and is invisible to data
+ * sources. Data sources should implement concrete streaming read support 
interfaces:
+ * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
+ */
+interface StreamingReadSupport extends ReadSupport {
+
+  /**
+   * Returns the initial offset for a streaming query to start reading 
from. Note that the
+   * streaming data source should not assume that it will start reading 
from its
+   * {@link #initialOffset()} value: if Spark is restarting an existing 
query, it will restart from
+   * the check-pointed offset rather than the initial one.
+   */
+  Offset initialOffset();
+
+  /**
+   * Deserialize a JSON string into an Offset of the 
implementation-defined offset type.
+   *
+   * @throws IllegalArgumentException if the JSON does not encode a valid 
offset for this reader
+   */
+  Offset deserializeOffset(String json);
--- End diff --

I think I understand what you're saying. I could get behind a proposal to 
simply define "arbitrary JSON string" as the one and only offset type, with 
each connector responsible for writing and parsing JSON however it'd like. All 
the existing offsets are trivial case classes anyway; it'd be a bit of a 
migration, but nothing architecturally difficult to handle.

I don't see how a `toBytes` method would help the problem. Neither 
arbitrary byte arrays nor arbitrary JSON strings let Spark know what type it's 
supposed to instantiate.





[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208392865
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.sql.sources.v2.reader.ReadSupport;
+
+/**
+ * A base interface for streaming read support. This is package private 
and is invisible to data
+ * sources. Data sources should implement concrete streaming read support 
interfaces:
+ * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
+ */
+interface StreamingReadSupport extends ReadSupport {
+
+  /**
+   * Returns the initial offset for a streaming query to start reading 
from. Note that the
+   * streaming data source should not assume that it will start reading 
from its
+   * {@link #initialOffset()} value: if Spark is restarting an existing 
query, it will restart from
+   * the check-pointed offset rather than the initial one.
+   */
+  Offset initialOffset();
+
+  /**
+   * Deserialize a JSON string into an Offset of the 
implementation-defined offset type.
+   *
+   * @throws IllegalArgumentException if the JSON does not encode a valid 
offset for this reader
+   */
+  Offset deserializeOffset(String json);
--- End diff --

Currently, there are two representations of any given offset: a 
connector-defined JVM object and a serialized JSON string.

Spark can't build the JVM object itself because it doesn't know what the 
right type is. If you know of some clean way for a connector to declare "here 
is the type of my offsets", we should do that instead, but I only know how to 
do it through reflection magic more confusing than the status quo.

I'd hesitate to introduce a third representation unless there's some 
concrete use case where JSON serialization won't work well.
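
A sketch of the two representations and the round trip between them. All types here (`SketchOffset`, `LongSketchOffset`, `LongSketchReader`) are simplified, hypothetical stand-ins rather than the real interfaces:

```java
// Hypothetical stand-in for the connector-facing Offset type.
interface SketchOffset {
    String json();
}

// Connector-defined JVM object.
final class LongSketchOffset implements SketchOffset {
    final long value;

    LongSketchOffset(long value) { this.value = value; }

    // Serialized JSON string representation.
    @Override
    public String json() { return Long.toString(value); }
}

final class LongSketchReader {
    // Only the connector knows its offsets are LongSketchOffset, so it does the parsing.
    // A malformed string raises NumberFormatException, a subclass of IllegalArgumentException.
    SketchOffset deserializeOffset(String json) {
        return new LongSketchOffset(Long.parseLong(json.trim()));
    }
}
```

The engine only ever stores and hands back the string; the connector is the one place that maps it to a concrete type.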





[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208391449
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.ScanConfig;
+import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
+
+/**
+ * An interface which defines how to scan the data from data source for 
streaming processing with
+ * continuous mode.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousReadSupport extends StreamingReadSupport, 
BaseStreamingSource {
+
+  /**
+   * Returns a builder of {@link ScanConfig}. The builder can take some 
query specific information
+   * like which operators to pushdown, streaming offsets, etc., and keep 
these information in the
+   * created {@link ScanConfig}.
+   *
+   * This is the first step of the data scan. All other methods in {@link 
ContinuousReadSupport}
+   * needs to take {@link ScanConfig} as an input.
+   *
+   * If this method fails (by throwing an exception), the action will fail 
and no Spark job will be
+   * submitted.
+   */
+  ScanConfigBuilder newScanConfigBuilder(Offset start);
+
+  /**
+   * Returns a factory to produce {@link ContinuousPartitionReader}s for 
{@link InputPartition}s.
+   *
+   * If this method fails (by throwing an exception), the action will fail 
and no Spark job will be
+   * submitted.
+   */
+  @Override
+  ContinuousPartitionReaderFactory createReaderFactory(ScanConfig config);
+
+  /**
+   * Merge partitioned offsets coming from {@link 
ContinuousPartitionReader} instances
+   * for each partition to a single global offset.
+   */
+  Offset mergeOffsets(PartitionOffset[] offsets);
+
+  /**
+   * The execution engine will call this method in every epoch to 
determine if new input
+   * partitions need to be generated, which may be required if for example 
the underlying
+   * source system has had partitions added or removed.
+   *
+   * If true, the query will be shut down and restarted with a new {@link 
ContinuousReadSupport}
+   * instance.
+   */
+  default boolean needsReconfiguration() {
--- End diff --

Makes sense to me.





[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208373424
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.sql.sources.v2.reader.ReadSupport;
+
+/**
+ * A base interface for streaming read support. This is package private 
and is invisible to data
+ * sources. Data sources should implement concrete streaming read support 
interfaces:
+ * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
+ */
+interface StreamingReadSupport extends ReadSupport {
+
+  /**
+   * Returns the initial offset for a streaming query to start reading 
from. Note that the
--- End diff --

Streaming-centric sources won't always have the initial offset be the 
oldest offset. In the Kafka source, for instance, the default is actually to 
start from the newest offset.
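
Roughly, the initial offset is a configuration choice rather than "the oldest data". A simplified sketch, where `InitialOffsetSketch` is hypothetical and the option handling is reduced to two values (the real Kafka `startingOffsets` option also accepts explicit per-partition offsets, which this ignores):

```java
import java.util.Map;

// Hypothetical sketch; not the actual Kafka source implementation.
final class InitialOffsetSketch {
    // `earliest` and `latest` stand in for whatever the external system reports.
    static long initialOffset(Map<String, String> options, long earliest, long latest) {
        // Streaming-centric default: start from the newest data.
        String mode = options.getOrDefault("startingOffsets", "latest");
        switch (mode) {
            case "earliest": return earliest;
            case "latest":   return latest;
            default:
                throw new IllegalArgumentException("unsupported startingOffsets: " + mode);
        }
    }
}
```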





[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208372469
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.sql.sources.v2.reader.ReadSupport;
+
+/**
+ * A base interface for streaming read support. This is package private 
and is invisible to data
+ * sources. Data sources should implement concrete streaming read support 
interfaces:
+ * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
+ */
+interface StreamingReadSupport extends ReadSupport {
+
+  /**
+   * Returns the initial offset for a streaming query to start reading 
from. Note that the
+   * streaming data source should not assume that it will start reading 
from its
+   * {@link #initialOffset()} value: if Spark is restarting an existing 
query, it will restart from
+   * the check-pointed offset rather than the initial one.
+   */
+  Offset initialOffset();
+
+  /**
+   * Deserialize a JSON string into an Offset of the 
implementation-defined offset type.
+   *
+   * @throws IllegalArgumentException if the JSON does not encode a valid 
offset for this reader
+   */
+  Offset deserializeOffset(String json);
--- End diff --

The offsets are ultimately exposed as JSON inside the JSON representation 
of StreamingQueryProgress. It's important for visibility and debuggability that 
progress events contain human-readable representations of offsets.





[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208370493
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.ScanConfig;
+import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
+
+/**
+ * An interface which defines how to scan the data from data source for 
streaming processing with
+ * continuous mode.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousReadSupport extends StreamingReadSupport, 
BaseStreamingSource {
+
+  /**
+   * Returns a builder of {@link ScanConfig}. The builder can take some 
query specific information
+   * like which operators to pushdown, streaming offsets, etc., and keep 
these information in the
+   * created {@link ScanConfig}.
+   *
+   * This is the first step of the data scan. All other methods in {@link 
ContinuousReadSupport}
+   * needs to take {@link ScanConfig} as an input.
+   *
+   * If this method fails (by throwing an exception), the action will fail 
and no Spark job will be
+   * submitted.
+   */
+  ScanConfigBuilder newScanConfigBuilder(Offset start);
+
+  /**
+   * Returns a factory to produce {@link ContinuousPartitionReader}s for 
{@link InputPartition}s.
+   *
+   * If this method fails (by throwing an exception), the action will fail 
and no Spark job will be
+   * submitted.
+   */
+  @Override
+  ContinuousPartitionReaderFactory createReaderFactory(ScanConfig config);
+
+  /**
+   * Merge partitioned offsets coming from {@link 
ContinuousPartitionReader} instances
+   * for each partition to a single global offset.
+   */
+  Offset mergeOffsets(PartitionOffset[] offsets);
+
+  /**
+   * The execution engine will call this method in every epoch to 
determine if new input
+   * partitions need to be generated, which may be required if for example 
the underlying
+   * source system has had partitions added or removed.
+   *
+   * If true, the query will be shut down and restarted with a new {@link 
ContinuousReadSupport}
+   * instance.
+   */
+  default boolean needsReconfiguration() {
--- End diff --

The motivation for this method is things like Kafka source repartitioning. 
If a topic gets partitions added to it (or a subscription pattern gets topics 
added to it), Spark needs to schedule a new job which will scan the new 
partitions/topics, even though the Spark-side scan hasn't changed.
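
As a sketch of what an implementation might check, assuming integer partition ids and a hypothetical lookup against the source system (`ReconfigSketch` and its fields are illustrative names only):

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical sketch; not the actual Kafka source implementation.
final class ReconfigSketch {
    // Partitions the currently running scan was planned against.
    private final Set<Integer> knownPartitions;
    // Hypothetical lookup of the partitions the source system has right now.
    private final Supplier<Set<Integer>> currentPartitions;

    ReconfigSketch(Set<Integer> plannedPartitions, Supplier<Set<Integer>> currentPartitions) {
        this.knownPartitions = new HashSet<>(plannedPartitions);
        this.currentPartitions = currentPartitions;
    }

    // True when partitions were added or removed since the scan was planned.
    boolean needsReconfiguration() {
        return !knownPartitions.equals(currentPartitions.get());
    }
}
```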





[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-06 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r207978414
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala 
---
@@ -122,24 +119,22 @@ case class MemoryStream[A : Encoder](id: Int, 
sqlContext: SQLContext)
 
   override def toString: String = 
s"MemoryStream[${Utils.truncatedString(output, ",")}]"
 
-  override def setOffsetRange(start: Optional[OffsetV2], end: 
Optional[OffsetV2]): Unit = {
-synchronized {
-  startOffset = start.orElse(LongOffset(-1)).asInstanceOf[LongOffset]
-  endOffset = end.orElse(currentOffset).asInstanceOf[LongOffset]
-}
-  }
-
   override def deserializeOffset(json: String): OffsetV2 = 
LongOffset(json.toLong)
 
-  override def getStartOffset: OffsetV2 = synchronized {
-if (startOffset.offset == -1) null else startOffset
+  override def initialOffset: OffsetV2 = LongOffset(-1)
+
+  override def latestOffset(start: OffsetV2): OffsetV2 = {
+if (currentOffset.offset == -1) null else currentOffset
--- End diff --

Yes, I agree. The V1 API didn't require sources to implement a "this is the 
beginning of the stream, read everything" offset, but that was a mistake we 
should make sure to remedy here.

A followup PR makes sense, because there's some stream execution logic that 
can be greatly simplified when all sources have a real initial offset.





[GitHub] spark pull request #21919: [SPARK-24933][SS] Report numOutputRows in SinkPro...

2018-08-06 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21919#discussion_r207961663
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
 ---
@@ -179,3 +192,24 @@ class InternalRowDataWriter(rowWriter: 
DataWriter[Row], encoder: ExpressionEncod
 
   override def abort(): Unit = rowWriter.abort()
 }
+
+/**
+ * Collects commit progress on writers.
+*/
+trait StreamWriterProgressCollector {
--- End diff --

This is a kinda weird interface. Can't we just create a 
StreamWriterCommitProgress(numOutputRows) instead of round-tripping through 
here?





[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress v...

2018-08-02 Thread jose-torres
Github user jose-torres commented on the issue:

https://github.com/apache/spark/pull/21919
  
If the individual connectors aren't doing the counting, I don't see a good 
reason to put the data inside WriterCommitMessage instead of just leaving 
StreamWriterCommitProgress as its own separate interface.





[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress v...

2018-08-02 Thread jose-torres
Github user jose-torres commented on the issue:

https://github.com/apache/spark/pull/21919
  
I don't think so. The offsets for the file source need to be consumer 
owned, because they need to work with files that were generated outside of 
Spark.





[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress v...

2018-08-02 Thread jose-torres
Github user jose-torres commented on the issue:

https://github.com/apache/spark/pull/21919
  
For file streams, the offsets are just indices into a log the source keeps 
of which files it's seen. So a file sink doesn't have any access to those 
offsets.
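
A simplified sketch of that idea, where the offset is nothing more than the index of the last entry in the source's own log. `SeenFilesLogSketch` is a hypothetical in-memory stand-in, not the real file source:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory stand-in for the source's log of seen files.
final class SeenFilesLogSketch {
    private final List<List<String>> batches = new ArrayList<>();
    private final Set<String> seen = new HashSet<>();

    // Records files not seen before as one new log entry and returns the latest
    // offset, i.e. the index of the last log entry (-1 while the log is empty).
    long addNewFiles(List<String> listing) {
        List<String> fresh = new ArrayList<>();
        for (String path : listing) {
            if (seen.add(path)) fresh.add(path);
        }
        if (!fresh.isEmpty()) batches.add(fresh);
        return batches.size() - 1;
    }

    // Files covered by the offset range (start, end].
    List<String> filesBetween(long start, long end) {
        List<String> out = new ArrayList<>();
        for (long i = start + 1; i <= end; i++) out.addAll(batches.get((int) i));
        return out;
    }
}
```

The offsets only mean something relative to this log, which lives on the source side.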





[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress v...

2018-08-02 Thread jose-torres
Github user jose-torres commented on the issue:

https://github.com/apache/spark/pull/21919
  
Minimum and maximum offset in the sink wouldn't make sense for most 
sources. There aren't any meaningful values to report for e.g. writing out 
Parquet files. It'd make sense to put them inside just the Kafka 
WriterCommitMessage, but then I don't think that requires API support.





[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress v...

2018-08-01 Thread jose-torres
Github user jose-torres commented on the issue:

https://github.com/apache/spark/pull/21919
  
Sure.





[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress v...

2018-08-01 Thread jose-torres
Github user jose-torres commented on the issue:

https://github.com/apache/spark/pull/21919
  
I like the idea of doing this, but I don't think it really belongs as part 
of the WriterCommitMessage interface. Every connector shouldn't have to 
independently count its rows; the execution framework should do the counting 
automatically, and send an independent StreamWriterCommitProgress to the driver 
along with each WriterCommitMessage.
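
Something along these lines, as a sketch only. `SketchDataWriter`, `SketchCommitProgress`, and `CountingWriter` are hypothetical, simplified stand-ins for the real writer and progress types:

```java
// Hypothetical, simplified stand-in for the connector's DataWriter.
interface SketchDataWriter<T> {
    void write(T record);
    String commit(); // stands in for WriterCommitMessage
}

// Hypothetical stand-in for StreamWriterCommitProgress.
final class SketchCommitProgress {
    final long numOutputRows;

    SketchCommitProgress(long numOutputRows) { this.numOutputRows = numOutputRows; }
}

// Framework-side wrapper: counts rows so the connector's writer does not have to.
final class CountingWriter<T> {
    private final SketchDataWriter<T> delegate;
    private long rows = 0;

    CountingWriter(SketchDataWriter<T> delegate) { this.delegate = delegate; }

    void write(T record) {
        delegate.write(record);
        rows++; // counted only after the connector accepted the record
    }

    // The connector's commit message is passed through untouched.
    String commit() { return delegate.commit(); }

    // Reported separately from the connector's commit message.
    SketchCommitProgress progress() { return new SketchCommitProgress(rows); }
}
```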

Note that we'll probably want to extend StreamWriterCommitProgress soon to 
carry metrics for continuous processing.





[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

2018-08-01 Thread jose-torres
Github user jose-torres commented on the issue:

https://github.com/apache/spark/pull/21199
  
The change looks broadly good (and important) to me. I'll defer to 
@HeartSaVioR wrt the in-depth review; let me know if there are any specific 
parts I should take a look at.





[GitHub] spark issue #21948: [SPARK-24991][SQL] use InternalRow in DataSourceWriter

2018-08-01 Thread jose-torres
Github user jose-torres commented on the issue:

https://github.com/apache/spark/pull/21948
  
lgtm





[GitHub] spark issue #21946: [SPARK-24990][SQL] merge ReadSupport and ReadSupportWith...

2018-08-01 Thread jose-torres
Github user jose-torres commented on the issue:

https://github.com/apache/spark/pull/21946
  
Wouldn't the redo of the API that we're discussing obsolete this?





[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-30 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r206325719
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.CustomMetrics;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+
+/**
+ * A mix in interface for {@link DataSourceReader}. Data source readers 
can implement this
+ * interface to report custom metrics that gets reported under the
+ * {@link org.apache.spark.sql.streaming.SourceProgress}
+ *
+ */
+@InterfaceStability.Evolving
+public interface SupportsCustomReaderMetrics extends DataSourceReader {
+/**
+ * Returns custom metrics specific to this data source.
+ */
+CustomMetrics getCustomMetrics();
+
+/**
+ * Invoked if the custom metrics returned by {@link 
#getCustomMetrics()} is invalid
--- End diff --

Oh wait, this is the same thing we talked about in the initial round of 
review. I think "throw an error when developing the connector so you can make 
sure your metrics work right" would still be a good example.
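
For the javadoc, something along these lines might work as the example. This is only a sketch: `InvalidMetricsHandler` and its method signature are assumptions standing in for the callback under review, and `MetricsHandlers` is a made-up helper:

```java
// Hypothetical mirror of the callback under review; the name and signature are assumptions.
interface InvalidMetricsHandler {
    void onInvalidMetrics(Exception cause);
}

final class MetricsHandlers {
    private MetricsHandlers() {}

    // Production choice: ignore malformed metrics so a bad metric never fails a query.
    static InvalidMetricsHandler ignoring() {
        return cause -> { };
    }

    // Development choice: fail fast so the connector author notices right away.
    static InvalidMetricsHandler failing() {
        return cause -> {
            throw new IllegalStateException("custom metrics are invalid", cause);
        };
    }
}
```

A connector could use the failing handler in its own test suite and the ignoring one everywhere else.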


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-30 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r206322694
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.CustomMetrics;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+
+/**
+ * A mix in interface for {@link DataSourceReader}. Data source readers 
can implement this
+ * interface to report custom metrics that gets reported under the
+ * {@link org.apache.spark.sql.streaming.SourceProgress}
+ *
+ */
+@InterfaceStability.Evolving
+public interface SupportsCustomReaderMetrics extends DataSourceReader {
+/**
+ * Returns custom metrics specific to this data source.
+ */
+CustomMetrics getCustomMetrics();
+
+/**
+ * Invoked if the custom metrics returned by {@link 
#getCustomMetrics()} is invalid
--- End diff --

I thought this was a bit convoluted at first, but on reflection I can 
understand why this additional flexibility is valuable. I think it'd be worth 
writing an example here of what a source might want to do other than ignore the 
invalid metrics.


---




[GitHub] spark issue #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanRow

2018-07-30 Thread jose-torres
Github user jose-torres commented on the issue:

https://github.com/apache/spark/pull/21921
  
lgtm


---




[GitHub] spark pull request #21118: SPARK-23325: Use InternalRow when reading with Da...

2018-07-20 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21118#discussion_r204108549
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 ---
@@ -125,16 +125,13 @@ object DataSourceV2Strategy extends Strategy {
   val filterCondition = postScanFilters.reduceLeftOption(And)
   val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
 
-  val withProjection = if (withFilter.output != project) {
-ProjectExec(project, withFilter)
-  } else {
-withFilter
-  }
-
-  withProjection :: Nil
+  // always add the projection, which will produce unsafe rows required by some operators
+  ProjectExec(project, withFilter) :: Nil
 
 case r: StreamingDataSourceV2Relation =>
-  DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
+  // ensure there is a projection, which will produce unsafe rows required by some operators
+  ProjectExec(r.output,
--- End diff --

Continuous processing will still be experimental in the 2.4 release, so I'm 
not tremendously concerned about this. We should eventually change the scan to 
produce rows in whatever way is most efficient in the final API.


---




[GitHub] spark issue #21817: [SPARK-24861][SS][test] create corrected temp directorie...

2018-07-19 Thread jose-torres
Github user jose-torres commented on the issue:

https://github.com/apache/spark/pull/21817
  
lgtm


---




[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...

2018-07-11 Thread jose-torres
Github user jose-torres commented on the issue:

https://github.com/apache/spark/pull/21733
  
We could still save the value of the option to offsetSeqMetadata and error 
if it's changed. The value of using an option would just be that there's no 
global default; a power user can set the option for the queries they think would 
benefit without affecting all the other queries which get run.

I agree it would be nice to just have some safe path allowing us to always 
use the new strategy. Absent that, there's an unfortunate tradeoff of reduced 
memory footprint vs added complexity. I think we ultimately need a committer to 
decide whether that's worth it.
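
To illustrate the "save the option and error if it's changed" idea, here is a minimal sketch. `PinnedOptions` is a hypothetical helper, and the plain map only stands in for whatever metadata is checkpointed with the query; it is not how offsetSeqMetadata is actually represented:

```java
import java.util.Map;

// Hypothetical helper: pins a per-query option on first start and rejects later changes.
final class PinnedOptions {
    private PinnedOptions() {}

    // `checkpointed` stands in for metadata stored with the query's checkpoint.
    static String resolve(Map<String, String> checkpointed, String key, String requested) {
        // Records the value on the first run; returns the earlier value on restarts.
        String pinned = checkpointed.putIfAbsent(key, requested);
        if (pinned != null && !pinned.equals(requested)) {
            throw new IllegalArgumentException(
                "Option '" + key + "' was " + pinned + " when the query first started; "
                    + "it cannot be changed to " + requested);
        }
        return requested;
    }
}
```

With this shape a restart under a different value fails loudly instead of silently keeping the old state format.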


---




[GitHub] spark pull request #21700: [SPARK-24717][SS] Split out max retain version of...

2018-07-11 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21700#discussion_r201792030
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
 ---
@@ -239,8 +241,9 @@ private[state] class HDFSBackedStateStoreProvider 
extends StateStoreProvider wit
   @volatile private var valueSchema: StructType = _
   @volatile private var storeConf: StateStoreConf = _
   @volatile private var hadoopConf: Configuration = _
+  @volatile private var numberOfVersionsToRetainInMemory: Int = _
 
-  private lazy val loadedMaps = new mutable.HashMap[Long, MapType]
+  private lazy val loadedMaps = new util.TreeMap[Long, MapType](Ordering[Long].reverse)
--- End diff --

Yeah, I was wondering about that. Makes sense.


---




[GitHub] spark pull request #21700: [SPARK-24717][SS] Split out max retain version of...

2018-07-11 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21700#discussion_r201793040
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
 ---
@@ -64,6 +64,63 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
 require(!StateStore.isMaintenanceRunning)
   }
 
+  test("retaining only latest configured size of versions in memory") {
--- End diff --

Sorry I didn't catch this earlier. We should ideally have tests that 
directly validate the specific behaviors we're documenting in the conf:

* '2' will read from cache in the direct failure case
* '1' will read from cache in the happy path but not if there's a failure
* '0' will never populate or read from the cache
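
As a simplified model of the three cases (not the real provider): assume the happy path reads the latest committed version, and a retry after a failure reads the version before it. `VersionCacheModel` and its methods are hypothetical names used only for this sketch:

```java
import java.util.Comparator;
import java.util.TreeMap;

// Hypothetical model of the in-memory version cache; not the real state store provider.
final class VersionCacheModel {
    private final int versionsToRetain;
    // Newest version first, so eviction removes entries from the tail.
    private final TreeMap<Long, String> loaded =
        new TreeMap<>(Comparator.<Long>reverseOrder());

    VersionCacheModel(int versionsToRetain) {
        this.versionsToRetain = versionsToRetain;
    }

    void commit(long version) {
        if (versionsToRetain <= 0) {
            return; // '0': never populate the cache
        }
        loaded.put(version, "state@" + version);
        while (loaded.size() > versionsToRetain) {
            loaded.pollLastEntry(); // drop the oldest retained version
        }
    }

    boolean isCached(long version) {
        return loaded.containsKey(version);
    }
}
```

After committing versions 1 through 3, a model built with 2 still holds versions 3 and 2 (so the retry case hits the cache), one built with 1 holds only version 3, and one built with 0 holds nothing. Tests for the real provider could assert the same three outcomes.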


---




[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-07-11 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r201791805
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -825,6 +825,16 @@ object SQLConf {
 .intConf
 .createWithDefault(100)
 
+  val ADVANCED_REMOVE_REDUNDANT_IN_STATEFUL_AGGREGATION =
--- End diff --

Can this be a query option instead of a SparkConf, then? I worry it will be 
very hard to reason about the current scenario, where the conf defines how all 
states are stored - except that some streams started with a different value 
will silently override it.


---




[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-07-10 Thread jose-torres
Github user jose-torres commented on the issue:

https://github.com/apache/spark/pull/21469
  
Sure, I don't mind if we remove that metric.


---




[GitHub] spark issue #21622: [SPARK-24637][SS] Add metrics regarding state and waterm...

2018-07-10 Thread jose-torres
Github user jose-torres commented on the issue:

https://github.com/apache/spark/pull/21622
  
lgtm


---




[GitHub] spark pull request #21700: [SPARK-24717][SS] Split out max retain version of...

2018-07-10 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21700#discussion_r201407516
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java
 ---
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.streaming.state;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * This class implements bounded {@link java.util.SortedMap} based on 
{@link java.util.TreeMap}.
+ *
+ * As TreeMap does, this implementation sorts elements in natural order, 
and cuts off
+ * smaller elements to retain at most bigger N elements.
+ *
+ * You can provide reversed order of comparator to retain smaller elements 
instead.
+ *
+ * This class is not thread-safe, so synchronization would be needed to 
use this concurrently.
+ *
+ * @param  key type
+ * @param  value type
+ */
+public final class BoundedSortedMap extends TreeMap {
--- End diff --

This is a very general (and complicated) interface to use in only one 
place. Can we just have the state store handle its cleanup logic directly?


---




[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...

2018-07-10 Thread jose-torres
Github user jose-torres commented on the issue:

https://github.com/apache/spark/pull/21721
  
Looks fine to me with a MemorySink example. I don't think a formal 
discussion is super necessary - the major advantage of the mixin model is to 
let us add things like this without impacting the broader API.
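
For anyone unfamiliar with the mixin model, a stripped-down sketch of the pattern follows. The interfaces and classes are hypothetical and much simpler than the real v2 API; the point is only that the framework probes for the optional capability:

```java
// Hypothetical, simplified interfaces; the names are illustrative and not Spark's API.
interface Reader {
    String description();
}

// Optional capability: only sources that care about custom metrics implement it.
interface ReportsCustomMetrics {
    String customMetricsJson();
}

// A reader that does not opt in.
final class PlainReader implements Reader {
    public String description() {
        return "plain";
    }
}

// A reader that opts in by mixing in the extra interface.
final class MemoryReader implements Reader, ReportsCustomMetrics {
    public String description() {
        return "memory";
    }

    public String customMetricsJson() {
        return "{\"rows\":3}";
    }
}

final class ProgressReporter {
    private ProgressReporter() {}

    // The framework probes for the mix-in; readers without it are unaffected.
    static String describe(Reader reader) {
        if (reader instanceof ReportsCustomMetrics) {
            return reader.description() + " "
                + ((ReportsCustomMetrics) reader).customMetricsJson();
        }
        return reader.description();
    }
}
```

Existing readers such as `PlainReader` compile and behave exactly as before, which is why a new capability like this does not affect the broader API.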


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-10 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r201402523
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
 ---
@@ -95,4 +95,25 @@ private object JsonUtils {
 }
 Serialization.write(result)
   }
+
+  /**
+   * Write per-topic partition lag as json string
--- End diff --

I'd suggest handling the custom metrics for Kafka outside the scope of this 
PR. Maybe we should have a default maxOffsets, but given that we don't, I'm 
worried about adding a metric that's misleading in the default case.


---




[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-07-10 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r201397799
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -825,6 +825,16 @@ object SQLConf {
 .intConf
 .createWithDefault(100)
 
+  val ADVANCED_REMOVE_REDUNDANT_IN_STATEFUL_AGGREGATION =
--- End diff --

I get worried when I see things described as "advanced features". What will 
go wrong if a user who's insufficiently advanced tries to use it?


---




[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-07-09 Thread jose-torres
Github user jose-torres commented on the issue:

https://github.com/apache/spark/pull/21305
  
(The last test failure is a known flaky test. I've been working to find a 
solution for it, albeit unsuccessfully so far.)


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-05 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r200538008
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
 ---
@@ -95,4 +95,25 @@ private object JsonUtils {
 }
 Serialization.write(result)
   }
+
+  /**
+   * Write per-topic partition lag as json string
--- End diff --

Is "lag" here just the difference (at the time a batch ends) between the 
last offset Spark knows about and the last offset Spark has processed? I'm not 
sure this is super useful to know. If maxOffsets isn't set it's always going to 
be 0, no matter how far Spark gets behind the Kafka cluster.
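
To make the concern concrete, here is a toy model of a single partition. It assumes, as in my question above, that lag is the latest offset Spark knows about minus the end offset of the batch; `LagModel` and the nullable `maxOffsets` parameter are hypothetical:

```java
// Hypothetical single-partition model of the lag metric as I understand it.
final class LagModel {
    private LagModel() {}

    // End offset the batch reads up to: everything known, unless a cap applies.
    static long batchEnd(long start, long latestKnown, Long maxOffsets) {
        if (maxOffsets == null) {
            return latestKnown;
        }
        return Math.min(latestKnown, start + maxOffsets);
    }

    // Lag at the end of the batch: known offsets that the batch did not cover.
    static long lag(long start, long latestKnown, Long maxOffsets) {
        return latestKnown - batchEnd(start, latestKnown, maxOffsets);
    }
}
```

With no cap, `lag(100, 1000, null)` is 0 because the batch always reads up to the latest known offset; with a cap of 200 it is 700. Neither number says how far the known offset trails the Kafka cluster.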


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-05 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r200537533
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
 ---
@@ -379,3 +384,16 @@ private[kafka010] case class 
KafkaMicroBatchInputPartitionReader(
 }
   }
 }
+
+// Currently reports per topic-partition lag.
--- End diff --

nit: javadoc style for top level comments


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-05 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r200537276
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsCustomMetrics.java
 ---
@@ -0,0 +1,30 @@
+/*
--- End diff --

This should probably be in v2/reader/streaming.


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-05 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r200537454
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -178,12 +180,18 @@ class SourceProgress protected[sql](
   if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
 }
 
-("description" -> JString(description)) ~
+val jsonVal = ("description" -> JString(description)) ~
   ("startOffset" -> tryParse(startOffset)) ~
   ("endOffset" -> tryParse(endOffset)) ~
   ("numInputRows" -> JInt(numInputRows)) ~
   ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
   ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond))
+
+if (customMetrics != null) {
+  jsonVal ~ ("customMetrics" -> tryParse(customMetrics.json()))
--- End diff --

Is there any way to get an error to the user if their custom metrics fail 
to parse? I'm not entirely sure that's the right thing to do, but I worry that 
it'll be hard to develop against this API if we just silently drop malformed 
metrics.


---




[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...

2018-07-05 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21662#discussion_r200454033
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -805,6 +806,75 @@ class StreamSuite extends StreamTest {
 }
   }
 
+  test("streaming limit without state") {
--- End diff --

resolved


---




[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...

2018-07-05 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21662#discussion_r200454145
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -354,6 +355,24 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 }
   }
 
+  /**
+   * Used to plan the streaming global limit operator.
--- End diff --

As discussed offline, comment on why we need ReturnAnswer handling


---




[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...

2018-06-29 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21662#discussion_r199288285
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingLimitExec.scala
 ---
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.TimeUnit.NANOSECONDS
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
Distribution, Partitioning}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.streaming.state.StateStoreOps
+import org.apache.spark.sql.types.{LongType, NullType, StructField, 
StructType}
+import org.apache.spark.util.CompletionIterator
+
+/**
+ * A physical operator for executing a streaming limit, which makes sure 
no more than streamLimit
+ * rows are returned.
+ */
+case class StreamingLimitExec(
+streamLimit: Long,
+child: SparkPlan,
+stateInfo: Option[StatefulOperatorStateInfo] = None)
+  extends UnaryExecNode with StateStoreWriter {
+
+  private val keySchema = StructType(Array(StructField("key", NullType)))
+  private val valueSchema = StructType(Array(StructField("value", 
LongType)))
+
+  override protected def doExecute(): RDD[InternalRow] = {
+metrics // force lazy init at driver
+
+child.execute().mapPartitionsWithStateStore(
+  getStateInfo,
+  keySchema,
+  valueSchema,
+  indexOrdinal = None,
+  sqlContext.sessionState,
+  Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
+  val key = UnsafeProjection.create(keySchema)(new 
GenericInternalRow(Array[Any](null)))
+  val numOutputRows = longMetric("numOutputRows")
+  val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+  val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
+  val commitTimeMs = longMetric("commitTimeMs")
+  val updatesStartTimeNs = System.nanoTime
+
+  val startCount: Long = 
Option(store.get(key)).map(_.getLong(0)).getOrElse(0L)
+  var rowCount = startCount
+
+  val result = iter.filter { r =>
+val x = rowCount < streamLimit
--- End diff --

Oh, I missed that distribution. Makes sense then.


---




[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...

2018-06-29 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21662#discussion_r199284336
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingLimitExec.scala
 ---
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.TimeUnit.NANOSECONDS
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
Distribution, Partitioning}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.streaming.state.StateStoreOps
+import org.apache.spark.sql.types.{LongType, NullType, StructField, 
StructType}
+import org.apache.spark.util.CompletionIterator
+
+/**
+ * A physical operator for executing a streaming limit, which makes sure 
no more than streamLimit
+ * rows are returned.
+ */
+case class StreamingLimitExec(
+streamLimit: Long,
+child: SparkPlan,
+stateInfo: Option[StatefulOperatorStateInfo] = None)
+  extends UnaryExecNode with StateStoreWriter {
+
+  private val keySchema = StructType(Array(StructField("key", NullType)))
+  private val valueSchema = StructType(Array(StructField("value", 
LongType)))
+
+  override protected def doExecute(): RDD[InternalRow] = {
+metrics // force lazy init at driver
+
+child.execute().mapPartitionsWithStateStore(
+  getStateInfo,
+  keySchema,
+  valueSchema,
+  indexOrdinal = None,
+  sqlContext.sessionState,
+  Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
+  val key = UnsafeProjection.create(keySchema)(new 
GenericInternalRow(Array[Any](null)))
+  val numOutputRows = longMetric("numOutputRows")
+  val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+  val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
+  val commitTimeMs = longMetric("commitTimeMs")
+  val updatesStartTimeNs = System.nanoTime
+
+  val startCount: Long = 
Option(store.get(key)).map(_.getLong(0)).getOrElse(0L)
+  var rowCount = startCount
+
+  val result = iter.filter { r =>
+val x = rowCount < streamLimit
--- End diff --

Isn't this going to result in `streamLimit` records in each partition? I 
would expect we'd need something like the Global/LocalLimit split.
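
A small model of what I mean, with plain lists standing in for partitions. `LimitModel` and both method names are hypothetical; this only illustrates the concern and is not the operator's code:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: each inner list plays the role of one partition.
final class LimitModel {
    private LimitModel() {}

    // Applying the limit inside each partition keeps up to `limit` rows per partition.
    static <T> List<List<T>> localLimit(List<List<T>> partitions, int limit) {
        List<List<T>> out = new ArrayList<>();
        for (List<T> partition : partitions) {
            int keep = Math.min(limit, partition.size());
            out.add(new ArrayList<>(partition.subList(0, keep)));
        }
        return out;
    }

    // Gathering all partitions into one and limiting again yields at most `limit` rows.
    static <T> List<T> globalLimit(List<List<T>> partitions, int limit) {
        List<T> all = new ArrayList<>();
        for (List<T> partition : partitions) {
            for (T row : partition) {
                if (all.size() == limit) {
                    return all;
                }
                all.add(row);
            }
        }
        return all;
    }
}
```

With two partitions of three rows each and a limit of 2, the local step alone leaves four rows in total; only the global step brings that down to two.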


---




[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...

2018-06-29 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21662#discussion_r199284559
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -805,6 +806,75 @@ class StreamSuite extends StreamTest {
 }
   }
 
+  test("streaming limit without state") {
--- End diff --

Related to my above comment, I think all of these tests end up only testing 
a single input partition.


---




[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...

2018-06-29 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21662#discussion_r199283820
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala
 ---
@@ -70,35 +68,9 @@ class MemorySinkSuite extends StreamTest with 
BeforeAndAfter {
 checkAnswer(sink.allData, 1 to 9)
   }
 
-  test("directly add data in Append output mode with row limit") {
--- End diff --

Nit: I'd kinda prefer reverting as a separate PR


---




[GitHub] spark pull request #20351: [SPARK-23014][SS] Fully remove V1 memory sink.

2018-06-27 Thread jose-torres
Github user jose-torres closed the pull request at:

https://github.com/apache/spark/pull/20351


---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-27 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r198571824
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+import org.apache.spark.util.ThreadUtils
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  // This flag will be flipped on the executors to indicate that the threads processing
+  // partitions of the write-side RDD have been started. These will run indefinitely
+  // asynchronously as epochs of the coalesce RDD complete on the read side.
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local
+ * continuous shuffle, and then reads them in the task thread using `reader`.
+ */
+class ContinuousCoalesceRDD(
+    context: SparkContext,
+    numPartitions: Int,
+    readerQueueSize: Int,
+    epochIntervalMs: Long,
+    readerEndpointName: String,
+    prev: RDD[InternalRow])
+  extends RDD[InternalRow](context, Nil) {
+
+  override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0))
+
+  val readerRDD = new ContinuousShuffleReadRDD(
+    sparkContext,
+    numPartitions,
+    readerQueueSize,
+    prev.getNumPartitions,
+    epochIntervalMs,
+    Seq(readerEndpointName))
+
+  private lazy val threadPool = ThreadUtils.newDaemonFixedThreadPool(
+    prev.getNumPartitions,
+    this.name)
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    assert(split.index == 0)
+    // lazy initialize endpoint so writer can send to it
+    readerRDD.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+
+    if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) {
+      val rpcEnv = SparkEnv.get.rpcEnv
+      val outputPartitioner = new HashPartitioner(1)
+      val endpointRefs = readerRDD.endpointNames.map { endpointName =>
+          rpcEnv.setupEndpointRef(rpcEnv.address, endpointName)
+      }
+
+      val runnables = prev.partitions.map { prevSplit =>
+        new Runnable() {
+          override def run(): Unit = {
+            TaskContext.setTaskContext(context)
+
+            val writer: ContinuousShuffleWriter = new RPCContinuousShuffleWriter(
+              prevSplit.index, outputPartitioner, endpointRefs.toArray)
+
+            EpochTracker.initializeCurrentEpoch(
+              context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong)
+            while (!context.isInterrupted() && !context.isCompleted()) {
+              writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]])
+              // Note that current epoch is a non-inheritable thread local, so each writer thread
+              // can properly increment its own epoch without affecting the main task thread.
+              EpochTracker.incrementCurrentEpoch()
+            }
+          }
+        }
+      }
+
+      context.addTaskCompletionListener { ctx =>
+        threadPool.shutdownNow()
+      }
+
+      split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized = true
+
+      runnables.foreach(threadPool.execute)
+    }
+
+    readerRDD.compute(readerRDD.partitions(split.index), context)
--

[GitHub] spark issue #21560: [SPARK-24386][SS] coalesce(1) aggregates in continuous p...

2018-06-27 Thread jose-torres
Github user jose-torres commented on the issue:

https://github.com/apache/spark/pull/21560
  
Sorry, that wasn't meant to be a complete push. Added the tests now.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-27 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r198571496
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 ---
@@ -349,6 +349,17 @@ object UnsupportedOperationChecker {
                   _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias |
                   _: TypedFilter) =>
             case node if node.nodeName == "StreamingRelationV2" =>
+            case Repartition(1, false, _) =>
+            case node: Aggregate =>
+              val aboveSinglePartitionCoalesce = node.find {
+                case Repartition(1, false, _) => true
+                case _ => false
+              }.isDefined
+
+              if (!aboveSinglePartitionCoalesce) {
--- End diff --

(same comment as above applies here - we don't have partitioning 
information in analysis)


---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-26 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r198337615
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
@@ -51,7 +51,7 @@ class ContinuousDataSourceRDD(
     sc: SparkContext,
     dataQueueSize: Int,
     epochPollIntervalMs: Long,
-    @transient private val readerFactories: Seq[InputPartition[UnsafeRow]])
+    private val readerFactories: Seq[InputPartition[UnsafeRow]])
--- End diff --

We list the partitions when computing the coalesce RDD. Should we instead 
be packing the partitions into the partitions of the coalesce RDD? I'd assumed 
it was valid to expect that rdd.partitions would work on executors, but maybe 
it's not.


---




[GitHub] spark issue #21617: [SPARK-24634][SS] Add a new metric regarding number of r...

2018-06-25 Thread jose-torres
Github user jose-torres commented on the issue:

https://github.com/apache/spark/pull/21617
  
Well, "clear" is relative. Since we're trying to provide functionality in 
the DataFrame API, it's perfectly alright for the RDD graph to end up looking a 
bit weird. It seems feasible to do something like:

* Have a stream reader RDD write side output to some special shuffle 
partition (set of partitions?) which the main query knows not to read.
* Have a stream writer RDD with two heterogeneous sets of partitions: one 
to write the main query to the sink, and another to apply the specified action 
to the side output.

I agree that watermarks should be applied immediately after the data reader 
- other streaming systems generally require this, and Spark does not seem to be 
getting any benefits from having a more general watermark concept. I haven't 
had time to push for this change, but I think it's known that the current Spark 
watermark model is flawed - I'd support fixing it for sure.


---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-25 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r197936350
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 ---
@@ -349,6 +349,17 @@ object UnsupportedOperationChecker {
                   _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias |
                   _: TypedFilter) =>
             case node if node.nodeName == "StreamingRelationV2" =>
+            case Repartition(1, false, _) =>
+            case node: Aggregate =>
+              val aboveSinglePartitionCoalesce = node.find {
+                case Repartition(1, false, _) => true
--- End diff --

Oh wait, I see what you mean. Repartition(5, ...) would never be matched by 
this rule, since it only applies to Aggregate.
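
A minimal sketch of what the `find`-based check does, using a toy plan tree. `Node`, `Source`, and the simplified `Repartition` and `Aggregate` below are hypothetical stand-ins, not Catalyst's real classes, and the real checker does more than this:

```scala
object PlanSketch {
  // Toy stand-ins for logical plan nodes; NOT Spark's real classes.
  sealed trait Node {
    def children: Seq[Node]
    // Pre-order search over this node and its descendants.
    def find(p: Node => Boolean): Option[Node] =
      if (p(this)) Some(this)
      else children.foldLeft(Option.empty[Node])((acc, c) => acc.orElse(c.find(p)))
  }
  case class Source(name: String) extends Node {
    val children: Seq[Node] = Nil
  }
  case class Repartition(numPartitions: Int, shuffle: Boolean, child: Node) extends Node {
    val children: Seq[Node] = Seq(child)
  }
  case class Aggregate(child: Node) extends Node {
    val children: Seq[Node] = Seq(child)
  }

  // Same shape as the check in the diff: true if a coalesce(1) appears anywhere
  // beneath the aggregate, regardless of other nodes in between.
  def aboveSinglePartitionCoalesce(agg: Aggregate): Boolean = agg.find {
    case Repartition(1, false, _) => true
    case _ => false
  }.isDefined
}
```

In this toy model, `Aggregate(Repartition(5, false, Repartition(1, false, Source("s"))))` still passes the check, because only the presence of a `Repartition(1, false, _)` somewhere below the aggregate is tested.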


---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-25 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r197935989
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 ---
@@ -349,6 +349,17 @@ object UnsupportedOperationChecker {
                   _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias |
                   _: TypedFilter) =>
             case node if node.nodeName == "StreamingRelationV2" =>
+            case Repartition(1, false, _) =>
+            case node: Aggregate =>
+              val aboveSinglePartitionCoalesce = node.find {
+                case Repartition(1, false, _) => true
--- End diff --

I don't think there's any particular reason we need to. There's no reason 
we couldn't execute multiple repartitions if the optimizer isn't smart enough 
to combine them.


---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-25 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r197933535
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala
 ---
@@ -61,12 +63,14 @@ class ContinuousShuffleReadRDD(
     numPartitions: Int,
     queueSize: Int = 1024,
     numShuffleWriters: Int = 1,
-    epochIntervalMs: Long = 1000)
+    epochIntervalMs: Long = 1000,
+    val endpointNames: Seq[String] = Seq(s"RPCContinuousShuffleReader-${UUID.randomUUID()}"))
--- End diff --

This is just a default argument to make tests less wordy. I can remove it 
if you think that's best, but it doesn't impose a restriction.


---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-25 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r197933175
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+import org.apache.spark.util.ThreadUtils
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  // This flag will be flipped on the executors to indicate that the threads processing
+  // partitions of the write-side RDD have been started. These will run indefinitely
+  // asynchronously as epochs of the coalesce RDD complete on the read side.
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local
+ * continuous shuffle, and then reads them in the task thread using `reader`.
+ */
+class ContinuousCoalesceRDD(
+    context: SparkContext,
+    numPartitions: Int,
+    readerQueueSize: Int,
+    epochIntervalMs: Long,
+    readerEndpointName: String,
+    prev: RDD[InternalRow])
+  extends RDD[InternalRow](context, Nil) {
+
+  override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0))
--- End diff --

I've made some changes to try to restrict the assumption that the number of 
partitions is 1 to two places:

* ContinuousCoalesceExec
* The output partitioner in ContinuousCoalesceRDD, since it's not obvious 
to me what the right strategy to get this would be in the general case. If you 
have ideas I'm open to removing this too.
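
To make the single-partition assumption concrete, here is a rough sketch of why a hash-style partitioner with one output partition routes every row to partition 0. It assumes the partitioner takes a non-negative modulo of the key's `hashCode`; `nonNegativeMod` and `partitionFor` are illustrative names written for this example, not calls into Spark:

```scala
object SinglePartitionSketch {
  // Non-negative modulo: the result is always in [0, mod).
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // Hypothetical stand-in for a hash partitioner; null keys go to partition 0.
  def partitionFor(key: Any, numPartitions: Int): Int = key match {
    case null => 0
    case k => nonNegativeMod(k.hashCode, numPartitions)
  }
}
```

With `numPartitions = 1` the choice of key is irrelevant, which is why the output partitioner is the one place where a general strategy would need more thought.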


---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-25 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r197930245
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+import org.apache.spark.util.ThreadUtils
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  // This flag will be flipped on the executors to indicate that the threads processing
+  // partitions of the write-side RDD have been started. These will run indefinitely
+  // asynchronously as epochs of the coalesce RDD complete on the read side.
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local
+ * continuous shuffle, and then reads them in the task thread using `reader`.
+ */
+class ContinuousCoalesceRDD(
+    context: SparkContext,
+    numPartitions: Int,
+    readerQueueSize: Int,
+    epochIntervalMs: Long,
+    readerEndpointName: String,
+    prev: RDD[InternalRow])
+  extends RDD[InternalRow](context, Nil) {
+
+  override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0))
+
+  val readerRDD = new ContinuousShuffleReadRDD(
+    sparkContext,
+    numPartitions,
+    readerQueueSize,
+    prev.getNumPartitions,
+    epochIntervalMs,
+    Seq(readerEndpointName))
+
+  private lazy val threadPool = ThreadUtils.newDaemonFixedThreadPool(
+    prev.getNumPartitions,
+    this.name)
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    assert(split.index == 0)
+    // lazy initialize endpoint so writer can send to it
+    readerRDD.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+
+    if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) {
+      val rpcEnv = SparkEnv.get.rpcEnv
+      val outputPartitioner = new HashPartitioner(1)
--- End diff --

Repartition would normally imply distributed execution, which isn't 
happening here.


---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-25 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r197929943
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+import org.apache.spark.util.ThreadUtils
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  // This flag will be flipped on the executors to indicate that the threads processing
+  // partitions of the write-side RDD have been started. These will run indefinitely
+  // asynchronously as epochs of the coalesce RDD complete on the read side.
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local
+ * continuous shuffle, and then reads them in the task thread using `reader`.
+ */
+class ContinuousCoalesceRDD(
+    context: SparkContext,
+    numPartitions: Int,
+    readerQueueSize: Int,
+    epochIntervalMs: Long,
+    readerEndpointName: String,
+    prev: RDD[InternalRow])
+  extends RDD[InternalRow](context, Nil) {
+
+  override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0))
+
+  val readerRDD = new ContinuousShuffleReadRDD(
+    sparkContext,
+    numPartitions,
+    readerQueueSize,
+    prev.getNumPartitions,
+    epochIntervalMs,
+    Seq(readerEndpointName))
+
+  private lazy val threadPool = ThreadUtils.newDaemonFixedThreadPool(
+    prev.getNumPartitions,
+    this.name)
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    assert(split.index == 0)
+    // lazy initialize endpoint so writer can send to it
+    readerRDD.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+
+    if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) {
+      val rpcEnv = SparkEnv.get.rpcEnv
+      val outputPartitioner = new HashPartitioner(1)
+      val endpointRefs = readerRDD.endpointNames.map { endpointName =>
+          rpcEnv.setupEndpointRef(rpcEnv.address, endpointName)
+      }
+
+      val runnables = prev.partitions.map { prevSplit =>
+        new Runnable() {
+          override def run(): Unit = {
+            TaskContext.setTaskContext(context)
+
+            val writer: ContinuousShuffleWriter = new RPCContinuousShuffleWriter(
+              prevSplit.index, outputPartitioner, endpointRefs.toArray)
+
+            EpochTracker.initializeCurrentEpoch(
+              context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong)
+            while (!context.isInterrupted() && !context.isCompleted()) {
+              writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]])
+              // Note that current epoch is a non-inheritable thread local, so each writer thread
+              // can properly increment its own epoch without affecting the main task thread.
+              EpochTracker.incrementCurrentEpoch()
+            }
+          }
+        }
+      }
+
+      context.addTaskCompletionListener { ctx =>
+        threadPool.shutdownNow()
+      }
+
+      split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized = true
+
+      runnables.foreach(threadPool.execute)
+    }
+
+    readerRDD.compute(readerRDD.partitions(split.index), context)
   

[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-25 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r197929262
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 ---
@@ -349,6 +349,17 @@ object UnsupportedOperationChecker {
                   _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias |
                   _: TypedFilter) =>
             case node if node.nodeName == "StreamingRelationV2" =>
+            case Repartition(1, false, _) =>
+            case node: Aggregate =>
+              val aboveSinglePartitionCoalesce = node.find {
+                case Repartition(1, false, _) => true
+                case _ => false
+              }.isDefined
+
+              if (!aboveSinglePartitionCoalesce) {
--- End diff --

I agree that it wouldn't be needed, but partitioning information is not 
always available during analysis. So I don't think we can write the more 
granular check suggested here.


---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-25 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r197928934
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
@@ -98,6 +98,10 @@ class ContinuousDataSourceRDD(
   override def getPreferredLocations(split: Partition): Seq[String] = {
     split.asInstanceOf[ContinuousDataSourceRDDPartition].inputPartition.preferredLocations()
   }
+
+  override def clearDependencies(): Unit = {
+    throw new IllegalStateException("Continuous RDDs cannot be checkpointed")
--- End diff --

I don't know, I'm unfamiliar with this method. @tdas 


---




[GitHub] spark issue #21617: [SPARK-24634][SS] Add a new metric regarding number of r...

2018-06-25 Thread jose-torres
Github user jose-torres commented on the issue:

https://github.com/apache/spark/pull/21617
  
LGTM, but note that the rows being counted here are the rows persisted into 
the state store, which aren't necessarily the input rows. So the side-channel 
described in the JIRA would be orthogonal to this.


---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-20 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r196930368
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
@@ -51,7 +51,7 @@ class ContinuousDataSourceRDD(
     sc: SparkContext,
     dataQueueSize: Int,
     epochPollIntervalMs: Long,
-    @transient private val readerFactories: Seq[InputPartition[UnsafeRow]])
+    private val readerFactories: Seq[InputPartition[UnsafeRow]])
--- End diff --

We need to be able to generate the full list of partitions from within a 
single task in order for coalesce to work.
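
A small illustration of why the `@transient` marker matters here, assuming plain Java serialization as a rough stand-in for how an RDD reaches a task. `WithTransient`, `WithoutTransient`, and `roundTrip` are hypothetical names for this sketch only:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.io.{ObjectInputStream, ObjectOutputStream}

object TransientSketch {
  // Hypothetical stand-ins for an RDD that holds its partition descriptions.
  class WithTransient(@transient val factories: Seq[String]) extends Serializable
  class WithoutTransient(val factories: Seq[String]) extends Serializable

  // Java-serialize and deserialize a value, as if shipping it to another JVM.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

After `roundTrip`, the `@transient` field comes back as `null`, while the unmarked field keeps its contents, so only the second form lets code on the receiving side enumerate the partitions.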


---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-20 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r196924994
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local
+ * continuous shuffle, and then reads them in the task thread using `reader`.
+ */
+class ContinuousCoalesceRDD(var reader: ContinuousShuffleReadRDD, var prev: RDD[InternalRow])
+  extends RDD[InternalRow](reader.context, Nil) {
+
+  override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0))
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    assert(split.index == 0)
+    // lazy initialize endpoint so writer can send to it
+    reader.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+
+    if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) {
+      val rpcEnv = SparkEnv.get.rpcEnv
+      val outputPartitioner = new HashPartitioner(1)
+      val endpointRefs = reader.endpointNames.map { endpointName =>
+          rpcEnv.setupEndpointRef(rpcEnv.address, endpointName)
+      }
+
+      val threads = prev.partitions.map { prevSplit =>
+        new Thread() {
+          override def run(): Unit = {
+            TaskContext.setTaskContext(context)
+
+            val writer: ContinuousShuffleWriter = new RPCContinuousShuffleWriter(
+              prevSplit.index, outputPartitioner, endpointRefs.toArray)
+
+            EpochTracker.initializeCurrentEpoch(
+              context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong)
+            while (!context.isInterrupted() && !context.isCompleted()) {
+              writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]])
+              // Note that current epoch is a non-inheritable thread local, so each writer thread
+              // can properly increment its own epoch without affecting the main task thread.
+              EpochTracker.incrementCurrentEpoch()
+            }
+          }
+        }
+      }
+
+      context.addTaskCompletionListener { ctx =>
+        threads.foreach(_.interrupt())
+      }
+
+      split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized = true
+      threads.foreach(_.start())
+    }
+
+    reader.compute(reader.partitions(split.index), context)
+  }
+
+  override def getDependencies: Seq[Dependency[_]] = {
+    Seq(new NarrowDependency(prev) {
+      def getParents(id: Int): Seq[Int] = Seq(0)
--- End diff --

Yeah, I confused myself when looking at the normal coalesce RDD. The 
default dependency handling is correct here.


---



