[GitHub] spark pull request #20369: [SPARK-23196] Unify continuous and microbatch V2 ...

2018-01-28 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20369


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20369: [SPARK-23196] Unify continuous and microbatch V2 ...

2018-01-25 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20369#discussion_r164036851
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java
 ---
@@ -37,8 +41,28 @@
*/
   void commit(long epochId, WriterCommitMessage[] messages);
--- End diff --

Good idea. PTAL at the wording


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20369: [SPARK-23196] Unify continuous and microbatch V2 ...

2018-01-25 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20369#discussion_r164036304
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1110,6 +1110,13 @@ object SQLConf {
   .timeConf(TimeUnit.MILLISECONDS)
   .createWithDefault(100)
 
+  val DISABLED_V2_STREAMING_WRITERS = 
buildConf("spark.sql.streaming.disabledV2Writers")
+.internal()
+.doc("A comma-separated list of fully qualified data source register 
class names for which" +
+  " StreamWriteSupport is disabled. Writes to these sources will fail 
back to the V1 Sink.")
--- End diff --

DataStreamWriter will call DataSource.createSink(), which will notice the 
providing class doesn't have a (V1) sink implementation and throw "Data source 
$className does not support streamed writing".


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20369: [SPARK-23196] Unify continuous and microbatch V2 ...

2018-01-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20369#discussion_r164025211
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java
 ---
@@ -37,8 +41,28 @@
*/
   void commit(long epochId, WriterCommitMessage[] messages);
--- End diff --

shall we also document the requirement about idempotent?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20369: [SPARK-23196] Unify continuous and microbatch V2 ...

2018-01-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20369#discussion_r164024953
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1110,6 +1110,13 @@ object SQLConf {
   .timeConf(TimeUnit.MILLISECONDS)
   .createWithDefault(100)
 
+  val DISABLED_V2_STREAMING_WRITERS = 
buildConf("spark.sql.streaming.disabledV2Writers")
+.internal()
+.doc("A comma-separated list of fully qualified data source register 
class names for which" +
+  " StreamWriteSupport is disabled. Writes to these sources will fail 
back to the V1 Sink.")
--- End diff --

what will happen if the specified class only implements v2 sink?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20369: [SPARK-23196] Unify continuous and microbatch V2 ...

2018-01-25 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/20369#discussion_r164011714
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
 ---
@@ -184,6 +184,24 @@ class StreamingDataSourceV2Suite extends StreamTest {
 }
   }
 
+  test("disabled v2 write") {
+// Ensure the V2 path works normally and generates a V2 sink..
+val v2Query = testPositiveCase(
+  "fake-read-microbatch-continuous", "fake-write-v1-fallback", 
Trigger.Once())
+assert(v2Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink
+  .isInstanceOf[FakeWriteV1Fallback])
+
+// Ensure we create a V1 sink with the config. Note the config is a 
comma separated
+// list, including other fake entries.
+val fullSinkName = 
"org.apache.spark.sql.streaming.sources.FakeWriteV1Fallback"
+withSQLConf("spark.sql.streaming.disabledV2Writers" -> 
s"a,b,c,test,$fullSinkName,d,e") {
--- End diff --

"spark.sql.streaming.disabledV2Writers" -> 
SQLConf.DISABLED_V2_STREAMING_WRITERS.key



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20369: [SPARK-23196] Unify continuous and microbatch V2 ...

2018-01-25 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/20369#discussion_r164008791
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java
 ---
@@ -37,8 +41,28 @@
*/
   void commit(long epochId, WriterCommitMessage[] messages);
 
+  /**
+   * Aborts this writing job because some data writers are failed and keep 
failing when retry, or
+   * the Spark job fails with some unknown reasons, or {@link 
#commit(WriterCommitMessage[])} fails.
+   *
+   * If this method fails (by throwing an exception), the underlying data 
source may require manual
+   * cleanup.
+   *
+   * Unless the abort is triggered by the failure of commit, the given 
messages should have some
+   * null slots as there maybe only a few data writers that are committed 
before the abort
+   * happens, or some data writers were committed but their commit 
messages haven't reached the
+   * driver when the abort is triggered. So this is just a "best effort" 
for data sources to
+   * clean up the data left by data writers.
+   */
+  void abort(long epochId, WriterCommitMessage[] messages);
+
   default void commit(WriterCommitMessage[] messages) {
 throw new UnsupportedOperationException(
-   "Commit without epoch should not be called with ContinuousWriter");
+   "Commit without epoch should not be called with StreamWriter");
--- End diff --

nit. One more space at the start?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20369: [SPARK-23196] Unify continuous and microbatch V2 ...

2018-01-25 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/20369#discussion_r164007657
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1110,6 +1110,13 @@ object SQLConf {
   .timeConf(TimeUnit.MILLISECONDS)
   .createWithDefault(100)
 
+  val DISABLED_V2_STREAMING_WRITERS = 
buildConf("spark.sql.streaming.disabledV2Writers")
+.internal()
+.doc("A comma-separated list of fully qualified data source register 
class names for which" +
+  " StreamWriteSupport is disabled. Writes to these sources will faill 
back to the V1 Sink.")
--- End diff --

nit. `faill` -> `fail`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20369: [SPARK-23196] Unify continuous and microbatch V2 ...

2018-01-25 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20369#discussion_r163930702
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/StreamWriteSupport.java
 ---
@@ -48,7 +48,7 @@
  * @param options the options for the returned data source writer, 
which is an immutable
  *case-insensitive string-to-string map.
  */
-Optional createContinuousWriter(
+Optional createStreamWriter(
--- End diff --

You're right, we don't need to. Changed to just StreamWriter.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20369: [SPARK-23196] Unify continuous and microbatch V2 ...

2018-01-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20369#discussion_r163751348
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/StreamWriteSupport.java
 ---
@@ -48,7 +48,7 @@
  * @param options the options for the returned data source writer, 
which is an immutable
  *case-insensitive string-to-string map.
  */
-Optional createContinuousWriter(
+Optional createStreamWriter(
--- End diff --

do we still need to return `Optional`? In which case an implementation 
should return `None`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20369: [SPARK-23196] Unify continuous and microbatch V2 ...

2018-01-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20369#discussion_r163751286
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -281,11 +281,9 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
 trigger = trigger)
 } else {
   val ds = DataSource.lookupDataSource(source, 
df.sparkSession.sessionState.conf)
-  val sink = (ds.newInstance(), trigger) match {
-case (w: ContinuousWriteSupport, _: ContinuousTrigger) => w
-case (_, _: ContinuousTrigger) => throw new 
UnsupportedOperationException(
-s"Data source $source does not support continuous writing")
-case (w: MicroBatchWriteSupport, _) => w
+  val disabledSources = 
df.sparkSession.sqlContext.conf.disabledV2StreamingWriters.split(",")
--- End diff --

ok so this is only useful for built-in stream sources, as the v1 source API 
is not public,


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20369: [SPARK-23196] Unify continuous and microbatch V2 ...

2018-01-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20369#discussion_r163751198
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchWriteSupport.java
 ---
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.streaming;
-
-import java.util.Optional;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
-import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.DataSourceV2Options;
-import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
-import org.apache.spark.sql.streaming.OutputMode;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
- * provide data writing ability and save the data from a microbatch to the 
data source.
- */
-@InterfaceStability.Evolving
-public interface MicroBatchWriteSupport extends BaseStreamingSink {
-
-  /**
-   * Creates an optional {@link DataSourceV2Writer} to save the data to 
this data source. Data
-   * sources can return None if there is no writing needed to be done.
-   *
-   * @param queryId A unique string for the writing query. It's possible 
that there are many writing
-   *queries running at the same time, and the returned 
{@link DataSourceV2Writer}
-   *can use this id to distinguish itself from others.
-   * @param epochId The unique numeric ID of the batch within this writing 
query. This is an
-   *incrementing counter representing a consistent set of 
data; the same batch may
-   *be started multiple times in failure recovery 
scenarios, but it will always
-   *contain the same records.
-   * @param schema the schema of the data to be written.
-   * @param mode the output mode which determines what successive batch 
output means to this
-   * sink, please refer to {@link OutputMode} for more details.
-   * @param options the options for the returned data source writer, which 
is an immutable
-   *case-insensitive string-to-string map.
-   */
-  Optional createMicroBatchWriter(
--- End diff --

agreed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20369: [SPARK-23196] Unify continuous and microbatch V2 ...

2018-01-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20369#discussion_r163751107
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
 ---
@@ -62,7 +62,7 @@ case class WriteToDataSourceV2Exec(writer: 
DataSourceV2Writer, query: SparkPlan)
 
 try {
   val runTask = writer match {
-case w: ContinuousWriter =>
+case w: StreamWriter =>
--- End diff --

I don't have a better idea, but at least we should add some comments here 
to explain this.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20369: [SPARK-23196] Unify continuous and microbatch V2 ...

2018-01-24 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20369#discussion_r163628099
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchWriteSupport.java
 ---
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.streaming;
-
-import java.util.Optional;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
-import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.DataSourceV2Options;
-import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
-import org.apache.spark.sql.streaming.OutputMode;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
- * provide data writing ability and save the data from a microbatch to the 
data source.
- */
-@InterfaceStability.Evolving
-public interface MicroBatchWriteSupport extends BaseStreamingSink {
-
-  /**
-   * Creates an optional {@link DataSourceV2Writer} to save the data to 
this data source. Data
-   * sources can return None if there is no writing needed to be done.
-   *
-   * @param queryId A unique string for the writing query. It's possible 
that there are many writing
-   *queries running at the same time, and the returned 
{@link DataSourceV2Writer}
-   *can use this id to distinguish itself from others.
-   * @param epochId The unique numeric ID of the batch within this writing 
query. This is an
-   *incrementing counter representing a consistent set of 
data; the same batch may
-   *be started multiple times in failure recovery 
scenarios, but it will always
-   *contain the same records.
-   * @param schema the schema of the data to be written.
-   * @param mode the output mode which determines what successive batch 
output means to this
-   * sink, please refer to {@link OutputMode} for more details.
-   * @param options the options for the returned data source writer, which 
is an immutable
-   *case-insensitive string-to-string map.
-   */
-  Optional createMicroBatchWriter(
--- End diff --

The writer can drop the duplicate epoch on commit. This is of course less 
efficient, but the scenario only happens in the rare case where the driver 
fails between running the writer and dumping to the commit log, so I think 
that's fine. (The writer will have to implement the drop-on-commit path either 
way.)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20369: [SPARK-23196] Unify continuous and microbatch V2 ...

2018-01-24 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20369#discussion_r163626800
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -281,11 +281,9 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
 trigger = trigger)
 } else {
   val ds = DataSource.lookupDataSource(source, 
df.sparkSession.sessionState.conf)
-  val sink = (ds.newInstance(), trigger) match {
-case (w: ContinuousWriteSupport, _: ContinuousTrigger) => w
-case (_, _: ContinuousTrigger) => throw new 
UnsupportedOperationException(
-s"Data source $source does not support continuous writing")
-case (w: MicroBatchWriteSupport, _) => w
+  val disabledSources = 
df.sparkSession.sqlContext.conf.disabledV2StreamingWriters.split(",")
--- End diff --

Yes. The intent is to support two things:

* Turning off the V2 implementation for a source if someone has problems 
with it.
* Running some subset of Spark unit tests with V2 disabled, ensuring that 
the V1 execution pipeline still works for as long as we support it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20369: [SPARK-23196] Unify continuous and microbatch V2 ...

2018-01-24 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20369#discussion_r163620226
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
 ---
@@ -62,7 +62,7 @@ case class WriteToDataSourceV2Exec(writer: 
DataSourceV2Writer, query: SparkPlan)
 
 try {
   val runTask = writer match {
-case w: ContinuousWriter =>
+case w: StreamWriter =>
--- End diff --

No - the writer will be wrapped in a MicroBatchWriter which doesn't match 
StreamWriter. I agree this is a bit unclear and would welcome suggestions on 
how to refactor.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20369: [SPARK-23196] Unify continuous and microbatch V2 ...

2018-01-24 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20369#discussion_r163619964
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchWriteSupport.java
 ---
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.streaming;
-
-import java.util.Optional;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
-import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.DataSourceV2Options;
-import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
-import org.apache.spark.sql.streaming.OutputMode;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
- * provide data writing ability and save the data from a microbatch to the 
data source.
- */
-@InterfaceStability.Evolving
-public interface MicroBatchWriteSupport extends BaseStreamingSink {
-
-  /**
-   * Creates an optional {@link DataSourceV2Writer} to save the data to 
this data source. Data
-   * sources can return None if there is no writing needed to be done.
-   *
-   * @param queryId A unique string for the writing query. It's possible 
that there are many writing
-   *queries running at the same time, and the returned 
{@link DataSourceV2Writer}
-   *can use this id to distinguish itself from others.
-   * @param epochId The unique numeric ID of the batch within this writing 
query. This is an
-   *incrementing counter representing a consistent set of 
data; the same batch may
-   *be started multiple times in failure recovery 
scenarios, but it will always
-   *contain the same records.
-   * @param schema the schema of the data to be written.
-   * @param mode the output mode which determines what successive batch 
output means to this
-   * sink, please refer to {@link OutputMode} for more details.
-   * @param options the options for the returned data source writer, which 
is an immutable
-   *case-insensitive string-to-string map.
-   */
-  Optional createMicroBatchWriter(
--- End diff --

I think the implementation shouldn't do this. It didn't have the option to 
do so in the V1 API, and it seems hard to reason about data sources 
second-guessing the engine's decisions about what batches must be run.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20369: [SPARK-23196] Unify continuous and microbatch V2 ...

2018-01-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20369#discussion_r163533890
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -281,11 +281,9 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
 trigger = trigger)
 } else {
   val ds = DataSource.lookupDataSource(source, 
df.sparkSession.sessionState.conf)
-  val sink = (ds.newInstance(), trigger) match {
-case (w: ContinuousWriteSupport, _: ContinuousTrigger) => w
-case (_, _: ContinuousTrigger) => throw new 
UnsupportedOperationException(
-s"Data source $source does not support continuous writing")
-case (w: MicroBatchWriteSupport, _) => w
+  val disabledSources = 
df.sparkSession.sqlContext.conf.disabledV2StreamingWriters.split(",")
--- End diff --

is this option created for data sources that implement both v1 and v2 APIs?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20369: [SPARK-23196] Unify continuous and microbatch V2 ...

2018-01-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20369#discussion_r163532503
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchWriteSupport.java
 ---
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.streaming;
-
-import java.util.Optional;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
-import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.DataSourceV2Options;
-import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
-import org.apache.spark.sql.streaming.OutputMode;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
- * provide data writing ability and save the data from a microbatch to the 
data source.
- */
-@InterfaceStability.Evolving
-public interface MicroBatchWriteSupport extends BaseStreamingSink {
-
-  /**
-   * Creates an optional {@link DataSourceV2Writer} to save the data to 
this data source. Data
-   * sources can return None if there is no writing needed to be done.
-   *
-   * @param queryId A unique string for the writing query. It's possible 
that there are many writing
-   *queries running at the same time, and the returned 
{@link DataSourceV2Writer}
-   *can use this id to distinguish itself from others.
-   * @param epochId The unique numeric ID of the batch within this writing 
query. This is an
-   *incrementing counter representing a consistent set of 
data; the same batch may
-   *be started multiple times in failure recovery 
scenarios, but it will always
-   *contain the same records.
-   * @param schema the schema of the data to be written.
-   * @param mode the output mode which determines what successive batch 
output means to this
-   * sink, please refer to {@link OutputMode} for more details.
-   * @param options the options for the returned data source writer, which 
is an immutable
-   *case-insensitive string-to-string map.
-   */
-  Optional createMicroBatchWriter(
--- End diff --

With this API, the implementation can return None if `epochId` is 
duplicated. How can we achieve this with the new `StreamWriteSupport`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20369: [SPARK-23196] Unify continuous and microbatch V2 ...

2018-01-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20369#discussion_r163531998
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
 ---
@@ -62,7 +62,7 @@ case class WriteToDataSourceV2Exec(writer: 
DataSourceV2Writer, query: SparkPlan)
 
 try {
   val runTask = writer match {
-case w: ContinuousWriter =>
+case w: StreamWriter =>
--- End diff --

Will we hit this branch under micro batch mode?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20369: [SPARK-23196] Unify continuous and microbatch V2 ...

2018-01-23 Thread jose-torres
GitHub user jose-torres opened a pull request:

https://github.com/apache/spark/pull/20369

[SPARK-23196] Unify continuous and microbatch V2 sinks

## What changes were proposed in this pull request?

Replace streaming V2 sinks with a unified StreamWriteSupport interface, 
with a shim to use it with microbatch execution.

Add a new SQL config to use for disabling V2 sinks, falling back to the V1 
sink implementation.

## How was this patch tested?

Existing tests, which in the case of Kafka (the only existing continuous V2 
sink) now use V2 for microbatch.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jose-torres/spark streaming-sink

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20369.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20369


commit 94c06a5f9a9d88810a43ac66722f58ffa45709f0
Author: Jose Torres 
Date:   2018-01-23T20:47:44Z

change sink

commit ee773f4cc7d6cfbb14b40c2e7961386ea2742612
Author: Jose Torres 
Date:   2018-01-23T21:12:20Z

add config

commit d722bbf2f253dff0b7da0111b4e75529dc591813
Author: Jose Torres 
Date:   2018-01-23T22:07:11Z

fix internal row




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org