[GitHub] spark pull request #20689: [SPARK-23533][SS] Add support for changing Contin...

2018-03-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20689


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20689: [SPARK-23533][SS] Add support for changing Contin...

2018-03-14 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/20689#discussion_r174667490
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
 ---
@@ -164,7 +164,15 @@ case class KafkaContinuousDataReaderFactory(
     startOffset: Long,
     kafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
-    failOnDataLoss: Boolean) extends DataReaderFactory[UnsafeRow] {
+    failOnDataLoss: Boolean) extends ContinuousDataReaderFactory[UnsafeRow] {
+
+  override def createDataReaderWithOffset(offset: PartitionOffset): DataReader[UnsafeRow] = {
+    val kafkaOffset = offset.asInstanceOf[KafkaSourcePartitionOffset]
+    assert(kafkaOffset.topicPartition == topicPartition)
--- End diff --

Got it.





[GitHub] spark pull request #20689: [SPARK-23533][SS] Add support for changing Contin...

2018-03-14 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/20689#discussion_r174585423
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
 ---
@@ -164,7 +164,15 @@ case class KafkaContinuousDataReaderFactory(
     startOffset: Long,
     kafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
-    failOnDataLoss: Boolean) extends DataReaderFactory[UnsafeRow] {
+    failOnDataLoss: Boolean) extends ContinuousDataReaderFactory[UnsafeRow] {
+
+  override def createDataReaderWithOffset(offset: PartitionOffset): DataReader[UnsafeRow] = {
+    val kafkaOffset = offset.asInstanceOf[KafkaSourcePartitionOffset]
+    assert(kafkaOffset.topicPartition == topicPartition)
--- End diff --

This may happen. I prefer to use `require` like this:
```
require(kafkaOffset.topicPartition == topicPartition, s"expected: $topicPartition actual: ${kafkaOffset.topicPartition}")
```
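A Java sketch of the distinction behind this suggestion, assuming a mismatched partition is a condition that can occur at runtime rather than a pure programming error. In Scala, `assert` throws `AssertionError` and is elidable (for example with scalac's `-Xdisable-assertions`), whereas `require` always runs and throws `IllegalArgumentException` (its message is prefixed with `requirement failed: `). The closest Java equivalent of `require` is an explicit check, since Java's own `assert` is disabled unless the JVM runs with `-ea`. `TopicPartitionKey` and `OffsetPreconditions` are hypothetical names, not Spark or Kafka classes.

```java
import java.util.Objects;

// Hypothetical stand-in for Kafka's TopicPartition, used only for this sketch.
final class TopicPartitionKey {
  final String topic;
  final int partition;

  TopicPartitionKey(String topic, int partition) {
    this.topic = topic;
    this.partition = partition;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof TopicPartitionKey)) {
      return false;
    }
    TopicPartitionKey other = (TopicPartitionKey) o;
    return topic.equals(other.topic) && partition == other.partition;
  }

  @Override
  public int hashCode() {
    return Objects.hash(topic, partition);
  }

  @Override
  public String toString() {
    return topic + "-" + partition;
  }
}

final class OffsetPreconditions {
  // Always-on check, analogous to Scala's require: unlike a Java `assert` (off unless the
  // JVM runs with -ea), it runs in every build and reports both values on a mismatch.
  static void requireSamePartition(TopicPartitionKey expected, TopicPartitionKey actual) {
    if (!expected.equals(actual)) {
      throw new IllegalArgumentException("expected: " + expected + " actual: " + actual);
    }
  }
}
```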





[GitHub] spark pull request #20689: [SPARK-23533][SS] Add support for changing Contin...

2018-03-14 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/20689#discussion_r174585821
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
 ---
@@ -106,7 +106,19 @@ case class RateStreamContinuousDataReaderFactory(
 partitionIndex: Int,
 increment: Long,
 rowsPerSecond: Double)
-  extends DataReaderFactory[Row] {
+  extends ContinuousDataReaderFactory[Row] {
+
+  override def createDataReaderWithOffset(offset: PartitionOffset): 
DataReader[Row] = {
+val rateStreamOffset = offset.asInstanceOf[RateStreamPartitionOffset]
+assert(rateStreamOffset.partition == partitionIndex)
--- End diff --

ditto





[GitHub] spark pull request #20689: [SPARK-23533][SS] Add support for changing Contin...

2018-03-14 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/20689#discussion_r174585843
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReaderFactory.java
 ---
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
+
+/**
+ * A mix-in interface for {@link DataReaderFactory}. Continuous data reader factories can
+ * implement this interface to provide creating {@link DataReader} with particular offset.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousDataReaderFactory<T> extends DataReaderFactory<T> {
+  /**
+   * Create a DataReader with particular offset as its startOffset.
+   *
+   * @param offset offset want to set as the DataReader's startOffset.
+   */
+  default DataReader<T> createDataReaderWithOffset(PartitionOffset offset) {
+    throw new IllegalStateException(

+1 to make this one just abstract.





[GitHub] spark pull request #20689: [SPARK-23533][SS] Add support for changing Contin...

2018-02-28 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20689#discussion_r171332700
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReaderFactory.java
 ---
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
+
+/**
+ * A mix-in interface for {@link DataReaderFactory}. Continuous data reader factories can
+ * implement this interface to provide creating {@link DataReader} with particular offset.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousDataReaderFactory<T> extends DataReaderFactory<T> {
+  /**
+   * Create a DataReader with particular offset as its startOffset.
+   *
+   * @param offset offset want to set as the DataReader's startOffset.
+   */
+  default DataReader<T> createDataReaderWithOffset(PartitionOffset offset) {
+    throw new IllegalStateException(

I don't know if we want a default here - it seems like subclasses should 
always be able to provide an implementation, and thus that we should always 
require them to.
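A minimal Java sketch of what these review comments argue for, using hypothetical simplified interfaces (`SketchReaderFactory`, `SketchContinuousReaderFactory`, and so on) rather than Spark's actual `DataReaderFactory`/`DataReader` types. Declaring the method without a `default` body means a factory that forgets to implement it does not compile, instead of failing later with the `IllegalStateException` that a throwing default body would raise.

```java
// Hypothetical, simplified stand-ins for this sketch; not Spark's DataReader/PartitionOffset.
interface SketchOffset {
  long value();
}

interface SketchReader {
  long startOffset();
}

interface SketchReaderFactory {
  SketchReader createReader();
}

// No default body: any class implementing this mix-in must supply createReaderWithOffset,
// so a missing implementation is a compile error rather than a runtime IllegalStateException.
interface SketchContinuousReaderFactory extends SketchReaderFactory {
  SketchReader createReaderWithOffset(SketchOffset offset);
}

final class FixedStartReaderFactory implements SketchContinuousReaderFactory {
  private final long configuredStart;

  FixedStartReaderFactory(long configuredStart) {
    this.configuredStart = configuredStart;
  }

  // Reader that starts at the offset fixed when the factory was built.
  @Override
  public SketchReader createReader() {
    return () -> configuredStart;
  }

  // Reader that starts at a caller-chosen offset instead.
  @Override
  public SketchReader createReaderWithOffset(SketchOffset offset) {
    long start = offset.value();
    return () -> start;
  }
}
```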





[GitHub] spark pull request #20689: [SPARK-23533][SS] Add support for changing Contin...

2018-02-27 Thread xuanyuanking
GitHub user xuanyuanking opened a pull request:

https://github.com/apache/spark/pull/20689

[SPARK-23533][SS] Add support for changing ContinuousDataReader's startOffset

## What changes were proposed in this pull request?

As discussed in #20675, we need to add a new interface, `ContinuousDataReaderFactory`, so that the start offset of a data reader can be set in Continuous Processing.
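One way a caller might use such a mix-in, sketched in Java with hypothetical names (`SimpleReaderFactory`, `SimpleContinuousReaderFactory`, `ReaderLauncher`) rather than Spark's API: the caller checks whether a factory implements the mix-in before asking it for a reader at a specific offset. Refusing a factory that lacks the mix-in, instead of silently starting from its configured offset, is a design assumption of this sketch.

```java
import java.util.Optional;

// Hypothetical, simplified stand-ins for this sketch; not Spark's reader interfaces.
interface ResumeOffset {
  long value();
}

interface SimpleReader {
  long startOffset();
}

interface SimpleReaderFactory {
  SimpleReader createDataReader();
}

// Mix-in: factories that can start from an arbitrary offset also implement this.
interface SimpleContinuousReaderFactory extends SimpleReaderFactory {
  SimpleReader createDataReaderWithOffset(ResumeOffset offset);
}

final class ReaderLauncher {
  // With no resume offset, use the factory's own start; with one, require the mix-in.
  static SimpleReader open(SimpleReaderFactory factory, Optional<ResumeOffset> resumeFrom) {
    if (!resumeFrom.isPresent()) {
      return factory.createDataReader();
    }
    if (factory instanceof SimpleContinuousReaderFactory) {
      return ((SimpleContinuousReaderFactory) factory).createDataReaderWithOffset(resumeFrom.get());
    }
    // Assumption: refuse rather than silently restarting from the configured offset.
    throw new UnsupportedOperationException(
        factory.getClass().getName() + " cannot start from a given offset");
  }
}

final class RangeReaderFactory implements SimpleContinuousReaderFactory {
  private final long configuredStart;

  RangeReaderFactory(long configuredStart) {
    this.configuredStart = configuredStart;
  }

  @Override
  public SimpleReader createDataReader() {
    return () -> configuredStart;
  }

  @Override
  public SimpleReader createDataReaderWithOffset(ResumeOffset offset) {
    long start = offset.value();
    return () -> start;
  }
}
```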

## How was this patch tested?

Existing unit tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-23533

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20689.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20689


commit 59cef98868586a4f039b04e74c32c94eaff965c0
Author: Yuanjian Li 
Date:   2018-02-28T07:29:57Z

[SPARK-23533][SS] Add support for changing ContinuousDataReader's startOffset



