[GitHub] spark pull request #22674: [SPARK-25680][SQL] SQL execution listener shouldn...

2018-10-12 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/22674#discussion_r224755348
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala ---
@@ -39,7 +39,22 @@ case class SparkListenerSQLExecutionStart(
 
 @DeveloperApi
 case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
-  extends SparkListenerEvent
+  extends SparkListenerEvent {
+
+  // The name of the execution, e.g. `df.collect` will trigger a SQL 
execution with name "collect".
+  @JsonIgnore private[sql] var executionName: Option[String] = None
+
+  // The following 3 fields are only accessed when `executionName` is 
defined.
+
+  // The duration of the SQL execution, in nanoseconds.
+  @JsonIgnore private[sql] var duration: Long = 0L
--- End diff --

Did you verify that the `JsonIgnore` annotation actually works? For some
reason, I needed to annotate the class itself, as in:
```scala
@JsonIgnoreProperties(Array("a", "b", "c"))
class SomeClass {
  @JsonProperty("a") val a: ...
  @JsonProperty("b") val b: ...
}
```

The reason is that Json4s understands that API better, and I believe we use
Json4s for all of these events.
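
A minimal sketch of that class-level annotation applied to the event in this
diff (field names taken from the hunk above; whether Json4s actually needs it
is exactly what should be verified):
```scala
import com.fasterxml.jackson.annotation.{JsonIgnore, JsonIgnoreProperties}

import org.apache.spark.scheduler.SparkListenerEvent

// Sketch only: mirror the field-level @JsonIgnore with a class-level
// @JsonIgnoreProperties listing the mutable fields added in this diff,
// so that Json4s-based serialization also skips them.
@JsonIgnoreProperties(Array("executionName", "duration"))
case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
  extends SparkListenerEvent {

  @JsonIgnore private[sql] var executionName: Option[String] = None
  @JsonIgnore private[sql] var duration: Long = 0L
}
```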


---




[GitHub] spark issue #22674: [SPARK-25680][SQL] SQL execution listener shouldn't happ...

2018-10-11 Thread brkyvz
Github user brkyvz commented on the issue:

https://github.com/apache/spark/pull/22674
  
I would just increase the timeout in that suite. Now that we're pushing a lot
more events to the LiveListenerBus, it may not be draining quickly enough. On
slow Jenkins machines that could easily cause flakiness.
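
As a rough sketch, assuming a ScalaTest-style suite (the assertion body and
`callbackInvocations` are placeholders, not the actual suite code):
```scala
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

// Hypothetical check: give the shared listener bus more headroom to drain on a
// slow Jenkins machine before asserting that the QueryExecutionListener fired.
eventually(timeout(60.seconds)) {
  assert(callbackInvocations.nonEmpty)
}
```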


---




[GitHub] spark pull request #22674: [SPARK-25680][SQL] SQL execution listener shouldn...

2018-10-10 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/22674#discussion_r224013755
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -3356,21 +3356,11 @@ class Dataset[T] private[sql](
* user-registered callback functions.
*/
   private def withAction[U](name: String, qe: QueryExecution)(action: 
SparkPlan => U) = {
-try {
-  qe.executedPlan.foreach { plan =>
-plan.resetMetrics()
-  }
-  val start = System.nanoTime()
-  val result = SQLExecution.withNewExecutionId(sparkSession, qe) {
-action(qe.executedPlan)
-  }
-  val end = System.nanoTime()
-  sparkSession.listenerManager.onSuccess(name, qe, end - start)
-  result
-} catch {
-  case e: Exception =>
-sparkSession.listenerManager.onFailure(name, qe, e)
-throw e
+qe.executedPlan.foreach { plan =>
--- End diff --

Can't `executedPlan` throw an exception? I thought it could if planning of the
original Spark plan failed.


---




[GitHub] spark pull request #22674: [SPARK-25680][SQL] SQL execution listener shouldn...

2018-10-10 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/22674#discussion_r224006828
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala 
---
@@ -75,95 +76,74 @@ trait QueryExecutionListener {
  */
 @Experimental
 @InterfaceStability.Evolving
-class ExecutionListenerManager private extends Logging {
-
-  private[sql] def this(conf: SparkConf) = {
-this()
+// The `session` is used to indicate which session carries this listener 
manager, and we only
+// catch SQL executions which are launched by the same session.
+// The `loadExtensions` flag is used to indicate whether we should load 
the pre-defined,
+// user-specified listeners during construction. We should not do it when 
cloning this listener
+// manager, as we will copy all listeners to the cloned listener manager.
+class ExecutionListenerManager private[sql](session: SparkSession, 
loadExtensions: Boolean)
+  extends SparkListener with Logging {
+
+  private[this] val listeners = new 
CopyOnWriteArrayList[QueryExecutionListener]
+
+  if (loadExtensions) {
+val conf = session.sparkContext.conf
 conf.get(QUERY_EXECUTION_LISTENERS).foreach { classNames =>
   Utils.loadExtensions(classOf[QueryExecutionListener], classNames, 
conf).foreach(register)
 }
   }
 
+  session.sparkContext.listenerBus.addToSharedQueue(this)
+
   /**
* Registers the specified [[QueryExecutionListener]].
*/
   @DeveloperApi
-  def register(listener: QueryExecutionListener): Unit = writeLock {
-listeners += listener
+  def register(listener: QueryExecutionListener): Unit = {
+listeners.add(listener)
   }
 
   /**
* Unregisters the specified [[QueryExecutionListener]].
*/
   @DeveloperApi
-  def unregister(listener: QueryExecutionListener): Unit = writeLock {
-listeners -= listener
+  def unregister(listener: QueryExecutionListener): Unit = {
+listeners.remove(listener)
   }
 
   /**
* Removes all the registered [[QueryExecutionListener]].
*/
   @DeveloperApi
-  def clear(): Unit = writeLock {
+  def clear(): Unit = {
 listeners.clear()
   }
 
   /**
* Get an identical copy of this listener manager.
*/
-  @DeveloperApi
-  override def clone(): ExecutionListenerManager = writeLock {
-val newListenerManager = new ExecutionListenerManager
-listeners.foreach(newListenerManager.register)
+  private[sql] def clone(session: SparkSession): ExecutionListenerManager 
= {
+val newListenerManager = new ExecutionListenerManager(session, 
loadExtensions = false)
+listeners.iterator().asScala.foreach(newListenerManager.register)
 newListenerManager
   }
 
-  private[sql] def onSuccess(funcName: String, qe: QueryExecution, 
duration: Long): Unit = {
-readLock {
-  withErrorHandling { listener =>
-listener.onSuccess(funcName, qe, duration)
+  override def onOtherEvent(event: SparkListenerEvent): Unit = event match 
{
+case e: SparkListenerSQLExecutionEnd if shouldCatchEvent(e) =>
+  val funcName = e.executionName.get
+  e.executionFailure match {
+case Some(ex) =>
+  listeners.iterator().asScala.foreach(_.onFailure(funcName, e.qe, 
ex))
+case _ =>
+  listeners.iterator().asScala.foreach(_.onSuccess(funcName, e.qe, 
e.duration))
   }
-}
-  }
 
-  private[sql] def onFailure(funcName: String, qe: QueryExecution, 
exception: Exception): Unit = {
-readLock {
-  withErrorHandling { listener =>
-listener.onFailure(funcName, qe, exception)
-  }
-}
+case _ => // Ignore
   }
 
-  private[this] val listeners = ListBuffer.empty[QueryExecutionListener]
-
-  /** A lock to prevent updating the list of listeners while we are 
traversing through them. */
-  private[this] val lock = new ReentrantReadWriteLock()
-
-  private def withErrorHandling(f: QueryExecutionListener => Unit): Unit = 
{
-for (listener <- listeners) {
-  try {
-f(listener)
-  } catch {
-case NonFatal(e) => logWarning("Error executing query execution 
listener", e)
-  }
-}
-  }
-
-  /** Acquires a read lock on the cache for the duration of `f`. */
-  private def readLock[A](f: => A): A = {
-val rl = lock.readLock()
-rl.lock()
-try f finally {
-  rl.unlock()
-}
-  }
-
-  /** Acquires a write lock on the cache for the duration of `f`. */
-  private def writeLock[A](f: => A): A = {
-val wl = lock

[GitHub] spark pull request #22674: [SPARK-25680][SQL] SQL execution listener shouldn...

2018-10-10 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/22674#discussion_r224000145
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -3356,21 +3356,11 @@ class Dataset[T] private[sql](
* user-registered callback functions.
*/
   private def withAction[U](name: String, qe: QueryExecution)(action: 
SparkPlan => U) = {
-try {
-  qe.executedPlan.foreach { plan =>
-plan.resetMetrics()
-  }
-  val start = System.nanoTime()
-  val result = SQLExecution.withNewExecutionId(sparkSession, qe) {
-action(qe.executedPlan)
-  }
-  val end = System.nanoTime()
-  sparkSession.listenerManager.onSuccess(name, qe, end - start)
-  result
-} catch {
-  case e: Exception =>
-sparkSession.listenerManager.onFailure(name, qe, e)
-throw e
+qe.executedPlan.foreach { plan =>
--- End diff --

Can this throw an exception? Imagine `df.count()` threw an exception and then
you ran it again. Wouldn't this be a behavior change in that case?
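
To make the concern concrete, a hypothetical illustration (not code from this
PR; `df` stands for any Dataset):
```scala
// `executedPlan` is a lazy val that triggers query planning, so its first access
// can throw. In the old withAction this access sat inside the try block, and the
// failure was forwarded to sparkSession.listenerManager.onFailure; hoisted out of
// SQLExecution.withNewExecutionId, a planning failure escapes before any
// SparkListenerSQLExecutionEnd event (and thus any listener callback) is produced.
val qe = df.queryExecution
try {
  qe.executedPlan.foreach(_.resetMetrics())
} catch {
  case e: Exception =>
    // old behavior: query execution listeners were notified of the failure here
    throw e
}
```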


---




[GitHub] spark pull request #22674: [SPARK-25680][SQL] SQL execution listener shouldn...

2018-10-10 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/22674#discussion_r224000809
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala ---
@@ -71,14 +72,35 @@ object SQLExecution {
   val callSite = sc.getCallSite()
 
   withSQLConfPropagated(sparkSession) {
-sc.listenerBus.post(SparkListenerSQLExecutionStart(
-  executionId, callSite.shortForm, callSite.longForm, 
queryExecution.toString,
-  SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), 
System.currentTimeMillis()))
+var ex: Option[Exception] = None
+val startTime = System.currentTimeMillis()
 try {
+  sc.listenerBus.post(SparkListenerSQLExecutionStart(
+executionId = executionId,
+description = callSite.shortForm,
+details = callSite.longForm,
+physicalPlanDescription = queryExecution.toString,
+// `queryExecution.executedPlan` triggers query planning. If 
it fails, the exception
+// will be caught and reported in the 
`SparkListenerSQLExecutionEnd`
+sparkPlanInfo = 
SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan),
+time = startTime))
   body
+} catch {
+  case e: Exception =>
+ex = Some(e)
+throw e
 } finally {
-  sc.listenerBus.post(SparkListenerSQLExecutionEnd(
-executionId, System.currentTimeMillis()))
+  val endTime = System.currentTimeMillis()
+  val event = SparkListenerSQLExecutionEnd(executionId, endTime)
+  // Currently only `Dataset.withAction` and 
`DataFrameWriter.runCommand` specify the `name`
+  // parameter. The `ExecutionListenerManager` only watches SQL 
executions with name. We
+  // can specify the execution name in more places in the future, 
so that
+  // `QueryExecutionListener` can track more cases.
+  event.executionName = name
+  event.duration = endTime - startTime
--- End diff --

The duration used to be reported in nanoseconds; now it's milliseconds. I would
still report it in nanoseconds if possible.
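
A minimal sketch of what keeping nanosecond durations could look like, reusing
the names from the diff above (the exact plumbing is up to the PR):
```scala
val startTime = System.currentTimeMillis() // wall-clock time, for event timestamps
val startNanos = System.nanoTime()         // monotonic clock, only for the duration
try {
  body
} finally {
  val event = SparkListenerSQLExecutionEnd(executionId, System.currentTimeMillis())
  event.executionName = name
  event.duration = System.nanoTime() - startNanos // still reported in nanoseconds
  sc.listenerBus.post(event)
}
```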


---




[GitHub] spark issue #22478: [SPARK-25472][SS] Don't have legitimate stops of streams...

2018-09-20 Thread brkyvz
Github user brkyvz commented on the issue:

https://github.com/apache/spark/pull/22478
  
thanks! merging to master


---




[GitHub] spark pull request #22478: [SPARK-25472] Don't have legitimate stops of stre...

2018-09-19 Thread brkyvz
GitHub user brkyvz opened a pull request:

https://github.com/apache/spark/pull/22478

[SPARK-25472] Don't have legitimate stops of streams cause stream exceptions

## What changes were proposed in this pull request?

Legitimate stops of streams may actually cause an exception to be captured 
by stream execution, because the job throws a SparkException regarding job 
cancellation during a stop. This PR makes the stop more graceful by swallowing 
this cancellation error.

## How was this patch tested?

This is pretty hard to test. The existing tests should make sure that we're 
not swallowing other specific SparkExceptions. I've also run the 
`KafkaSourceStressForDontFailOnDataLossSuite` 100 times, and it didn't fail, 
whereas it used to be flaky.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/brkyvz/spark SPARK-25472

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22478.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22478


commit dcfc20b291affa6dcc396cdca678ce1f52184dce
Author: Burak Yavuz 
Date:   2018-09-19T23:43:57Z

Don't have legitimate stops of streams cause stream exceptions

commit 3b8addb9cf02489978594505470fdd527a35c2a7
Author: Burak Yavuz 
Date:   2018-09-19T23:46:47Z

fix scalastyle




---




[GitHub] spark issue #22478: [SPARK-25472] Don't have legitimate stops of streams cau...

2018-09-19 Thread brkyvz
Github user brkyvz commented on the issue:

https://github.com/apache/spark/pull/22478
  
cc @zsxwing @jose-torres 


---




[GitHub] spark issue #20673: [SPARK-23515] Use input/output streams for large events ...

2018-09-10 Thread brkyvz
Github user brkyvz commented on the issue:

https://github.com/apache/spark/pull/20673
  
After benchmarking, I observed that this didn't provide much benefit :(
Closing the PR.


---




[GitHub] spark pull request #20673: [SPARK-23515] Use input/output streams for large ...

2018-09-10 Thread brkyvz
Github user brkyvz closed the pull request at:

https://github.com/apache/spark/pull/20673


---




[GitHub] spark issue #22210: [SPARK-25218][Core]Fix potential resource leaks in Trans...

2018-08-27 Thread brkyvz
Github user brkyvz commented on the issue:

https://github.com/apache/spark/pull/22210
  
LGTM! Good catches


---




[GitHub] spark pull request #22106: [SPARK-25116][TESTS]Fix the Kafka cluster leak an...

2018-08-17 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/22106#discussion_r211031056
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
 ---
@@ -120,61 +120,56 @@ private[kafka010] class KafkaTestUtils extends 
Logging {
 
   /** setup the whole embedded servers, including Zookeeper and Kafka 
brokers */
   def setup(): Unit = {
+// Set up a KafkaTestUtils leak detector so that we can see where the 
leak KafkaTestUtils is
+// created.
+val exception = new SparkException("It was created at: ")
+leakDetector = ShutdownHookManager.addShutdownHook { () =>
+  logError("Found a leak KafkaTestUtils.", exception)
+}
+
 setupEmbeddedZookeeper()
 setupEmbeddedKafkaServer()
   }
 
   /** Teardown the whole servers, including Kafka broker and Zookeeper */
   def teardown(): Unit = {
-// There is a race condition that may kill JVM when terminating the 
Kafka cluster. We set
-// a custom Procedure here during the termination in order to keep JVM 
running and not fail the
-// tests.
-val logExitEvent = new Exit.Procedure {
-  override def execute(statusCode: Int, message: String): Unit = {
-logError(s"Prevent Kafka from killing JVM (statusCode: $statusCode 
message: $message)")
-  }
+if (leakDetector != null) {
+  ShutdownHookManager.removeShutdownHook(leakDetector)
 }
-Exit.setExitProcedure(logExitEvent)
-Exit.setHaltProcedure(logExitEvent)
-try {
-  brokerReady = false
-  zkReady = false
-
-  if (producer != null) {
-producer.close()
-producer = null
-  }
+brokerReady = false
--- End diff --

Do these need to be thread-safe? Are boot-up and cleanup serial?


---




[GitHub] spark pull request #22106: [SPARK-25116][TESTS]Fix the Kafka cluster leak an...

2018-08-17 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/22106#discussion_r211030720
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
 ---
@@ -130,6 +130,13 @@ class KafkaTestUtils(withBrokerProps: Map[String, 
Object] = Map.empty) extends L
 
   /** setup the whole embedded servers, including Zookeeper and Kafka 
brokers */
   def setup(): Unit = {
+// Set up a KafkaTestUtils leak detector so that we can see where the 
leak KafkaTestUtils is
+// created.
+val exception = new SparkException("It was created at: ")
--- End diff --

nice trick


---




[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

2018-06-15 Thread brkyvz
Github user brkyvz commented on the issue:

https://github.com/apache/spark/pull/21559
  
Thanks! Merging to master!


---




[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...

2018-06-15 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21559#discussion_r195798990
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala 
---
@@ -221,26 +222,72 @@ class MemoryStreamInputPartition(records: 
Array[UnsafeRow])
 }
 
 /** A common trait for MemorySinks with methods used for testing */
-trait MemorySinkBase extends BaseStreamingSink {
+trait MemorySinkBase extends BaseStreamingSink with Logging {
   def allData: Seq[Row]
   def latestBatchData: Seq[Row]
   def dataSinceBatch(sinceBatchId: Long): Seq[Row]
   def latestBatchId: Option[Long]
+
+  /**
+   * Truncates the given rows to return at most maxRows rows.
+   * @param rows The data that may need to be truncated.
+   * @param batchLimit Number of rows to keep in this batch; the rest will 
be truncated
+   * @param sinkLimit Total number of rows kept in this sink, for logging 
purposes.
+   * @param batchId The ID of the batch that sent these rows, for logging 
purposes.
+   * @return Truncated rows.
+   */
+  protected def truncateRowsIfNeeded(
+  rows: Array[Row],
+  batchLimit: Int,
+  sinkLimit: Int,
+  batchId: Long): Array[Row] = {
+if (rows.length > batchLimit && batchLimit >= 0) {
+  logWarning(s"Truncating batch $batchId to $batchLimit rows because 
of sink limit $sinkLimit")
--- End diff --

nit: not sure if these sinks get used by Continuous Processing too. If so, I
would rename `batch` to `trigger version`.


---




[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...

2018-06-15 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21559#discussion_r195797571
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala 
---
@@ -221,26 +222,72 @@ class MemoryStreamInputPartition(records: 
Array[UnsafeRow])
 }
 
 /** A common trait for MemorySinks with methods used for testing */
-trait MemorySinkBase extends BaseStreamingSink {
+trait MemorySinkBase extends BaseStreamingSink with Logging {
   def allData: Seq[Row]
   def latestBatchData: Seq[Row]
   def dataSinceBatch(sinceBatchId: Long): Seq[Row]
   def latestBatchId: Option[Long]
+
+  /**
+   * Truncates the given rows to return at most maxRows rows.
+   * @param rows The data that may need to be truncated.
+   * @param batchLimit Number of rows to keep in this batch; the rest will 
be truncated
+   * @param sinkLimit Total number of rows kept in this sink, for logging 
purposes.
+   * @param batchId The ID of the batch that sent these rows, for logging 
purposes.
+   * @return Truncated rows.
+   */
+  protected def truncateRowsIfNeeded(
+  rows: Array[Row],
+  batchLimit: Int,
+  sinkLimit: Int,
+  batchId: Long): Array[Row] = {
+if (rows.length > batchLimit && batchLimit >= 0) {
+  logWarning(s"Truncating batch $batchId to $batchLimit rows because 
of sink limit $sinkLimit")
+  rows.take(batchLimit)
+} else {
+  rows
+}
+  }
+}
+
+/**
+ * Companion object to MemorySinkBase.
+ */
+object MemorySinkBase {
+  val MAX_MEMORY_SINK_ROWS = "maxRows"
+  val MAX_MEMORY_SINK_ROWS_DEFAULT = -1
+
+  /**
+   * Gets the max number of rows a MemorySink should store. This number is 
based on the memory
+   * sink row limit if it is set. If not, there is no limit.
+   * @param options Options for writing from which we get the max rows 
option
+   * @return The maximum number of rows a memorySink should store, or None 
for no limit.
--- End diff --

need to update docs


---




[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...

2018-06-14 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21559#discussion_r195268299
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala 
---
@@ -228,19 +229,45 @@ trait MemorySinkBase extends BaseStreamingSink {
   def latestBatchId: Option[Long]
 }
 
+/**
+ * Companion object to MemorySinkBase.
+ */
+object MemorySinkBase {
+  val MAX_MEMORY_SINK_ROWS = "maxMemorySinkRows"
+  val MAX_MEMORY_SINK_ROWS_DEFAULT = -1
+
+  /**
+   * Gets the max number of rows a MemorySink should store. This number is 
based on the memory
+   * sink row limit if it is set. If not, there is no limit.
+   * @param options Options for writing from which we get the max rows 
option
+   * @return The maximum number of rows a memorySink should store, or None 
for no limit.
+   */
+  def getMemorySinkCapacity(options: DataSourceOptions): Option[Int] = {
+val maxRows = options.getInt(MAX_MEMORY_SINK_ROWS, 
MAX_MEMORY_SINK_ROWS_DEFAULT)
+if (maxRows >= 0) Some(maxRows) else None
+  }
+}
+
+
--- End diff --

nit: remove extra line


---




[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...

2018-06-14 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21559#discussion_r195268861
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala 
---
@@ -228,19 +229,45 @@ trait MemorySinkBase extends BaseStreamingSink {
   def latestBatchId: Option[Long]
 }
 
+/**
+ * Companion object to MemorySinkBase.
+ */
+object MemorySinkBase {
+  val MAX_MEMORY_SINK_ROWS = "maxMemorySinkRows"
+  val MAX_MEMORY_SINK_ROWS_DEFAULT = -1
+
+  /**
+   * Gets the max number of rows a MemorySink should store. This number is 
based on the memory
+   * sink row limit if it is set. If not, there is no limit.
+   * @param options Options for writing from which we get the max rows 
option
+   * @return The maximum number of rows a memorySink should store, or None 
for no limit.
+   */
+  def getMemorySinkCapacity(options: DataSourceOptions): Option[Int] = {
+val maxRows = options.getInt(MAX_MEMORY_SINK_ROWS, 
MAX_MEMORY_SINK_ROWS_DEFAULT)
+if (maxRows >= 0) Some(maxRows) else None
--- End diff --

Do you want to do `if (maxRows >= 0) maxRows else Int.MaxValue - 10` instead?
We can't exceed the JVM's runtime array max size anyway.
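
Roughly, the suggestion as a sketch (assuming the `DataSourceOptions.getInt`
accessor used in the diff above):
```scala
def getMemorySinkCapacity(options: DataSourceOptions): Int = {
  val maxRows = options.getInt(MAX_MEMORY_SINK_ROWS, MAX_MEMORY_SINK_ROWS_DEFAULT)
  // A negative value means "no explicit limit"; cap near the JVM array size limit
  // instead of threading an Option[Int] through every sink.
  if (maxRows >= 0) maxRows else Int.MaxValue - 10
}
```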


---




[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...

2018-06-14 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21559#discussion_r195269434
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
 ---
@@ -110,40 +126,61 @@ class MemorySinkV2 extends DataSourceV2 with 
StreamWriteSupport with MemorySinkB
 
   def clear(): Unit = synchronized {
 batches.clear()
+numRows = 0
+  }
+
+  private def truncateRowsIfNeeded(rows: Array[Row], maxRows: Int, 
batchId: Long): Array[Row] = {
+if (rows.length > maxRows) {
+  logWarning(s"Truncating batch $batchId to $maxRows rows")
--- End diff --

How does `take` behave with a negative count? Printing a warning message with
negative values may be weird. I would also include the sink limit in the
warning.
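
For reference, Scala's `take` with a negative count simply keeps nothing, which
is part of why the warning could read strangely:
```scala
val rows = Array(1, 2, 3)
rows.take(-1) // Array(): a negative count keeps nothing, it does not throw
```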


---




[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...

2018-06-14 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21559#discussion_r195268999
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
 ---
@@ -81,22 +84,35 @@ class MemorySinkV2 extends DataSourceV2 with 
StreamWriteSupport with MemorySinkB
 }.mkString("\n")
   }
 
-  def write(batchId: Long, outputMode: OutputMode, newRows: Array[Row]): 
Unit = {
+  def write(batchId: Long, outputMode: OutputMode, newRows: Array[Row], 
sinkCapacity: Option[Int])
--- End diff --

nit: our style is more like
```scala
  def write(
batchId: Long,
outputMode: OutputMode,
newRows: Array[Row],
sinkCapacity: Option[Int]): Unit = {
```


---




[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...

2018-06-14 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21559#discussion_r195268218
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala 
---
@@ -228,19 +229,45 @@ trait MemorySinkBase extends BaseStreamingSink {
   def latestBatchId: Option[Long]
 }
 
+/**
+ * Companion object to MemorySinkBase.
+ */
+object MemorySinkBase {
+  val MAX_MEMORY_SINK_ROWS = "maxMemorySinkRows"
--- End diff --

`maxRows` is sufficient


---




[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

2018-06-13 Thread brkyvz
Github user brkyvz commented on the issue:

https://github.com/apache/spark/pull/21559
  
Jenkins add to whitelist


---




[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

2018-06-13 Thread brkyvz
Github user brkyvz commented on the issue:

https://github.com/apache/spark/pull/21559
  
ok to test


---




[GitHub] spark issue #21275: [SPARK-24214][SS]Fix toJSON for StreamingRelationV2/Stre...

2018-05-08 Thread brkyvz
Github user brkyvz commented on the issue:

https://github.com/apache/spark/pull/21275
  
LGTM! Pending tests


---




[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

2018-05-03 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21220#discussion_r185886437
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 ---
@@ -128,40 +130,49 @@ class MicroBatchExecution(
* Repeatedly attempts to run batches as data arrives.
*/
   protected def runActivatedStream(sparkSessionForStream: SparkSession): 
Unit = {
-triggerExecutor.execute(() => {
-  startTrigger()
 
+triggerExecutor.execute(() => {
   if (isActive) {
+var currentBatchIsRunnable = false // Whether the current batch is 
runnable / has been run
+var currentBatchHadNewData = false // Whether the current batch 
had new data
+
 reportTimeTaken("triggerExecution") {
+  startTrigger()
+
+  // We'll do this initialization only once every start / restart
   if (currentBatchId < 0) {
-// We'll do this initialization only once
 populateStartOffsets(sparkSessionForStream)
-
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
-logDebug(s"Stream running from $committedOffsets to 
$availableOffsets")
-  } else {
-constructNextBatch()
+logInfo(s"Stream started from $committedOffsets")
   }
-  if (dataAvailable) {
-currentStatus = currentStatus.copy(isDataAvailable = true)
+
+  
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
+
+  // Try to construct the next batch. This will return true only 
if the next batch is
+  // ready and runnable. Note that the current batch may be 
runnable even without
+  // new data to process as `constructNextBatch` may decide to run 
a batch for
+  // state cleanup, etc. `isNewDataAvailable` will be updated to 
reflect whether new data
+  // is available or not.
+  currentBatchIsRunnable = constructNextBatch()
+
+  currentStatus = currentStatus.copy(isDataAvailable = 
isNewDataAvailable)
--- End diff --

then you can do something like:
```scala
if (currentBatchIsRunnable && currentBatchHasNewData) {
  updateStatusMessage("Processing new data")
  runBatch(sparkSessionForStream)
} else if (currentBatchIsRunnable) {
  updateStatusMessage("Processing empty trigger to timeout state") // or 
whatever
  runBatch(sparkSessionForStream)
} else {
  updateStatusMessage("Waiting for data to arrive")
}
```


---




[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

2018-05-03 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21220#discussion_r185887294
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 ---
@@ -266,93 +276,62 @@ class MicroBatchExecution(
   }
 
   /**
-   * Queries all of the sources to see if any new data is available. When 
there is new data the
-   * batchId counter is incremented and a new log entry is written with 
the newest offsets.
+   * Attempts to construct the next batch based on whether new data is 
available and/or updated
+   * metadata is such that another batch needs to be run for state clean 
up / additional output
+   * generation even without new data. Returns true only if the next batch 
should be executed.
+   *
+   * Here is the high-level logic on how this constructs the next batch.
+   * - Check each source whether new data is available
+   * - Updated the query's metadata and check using the last execution 
whether there is any need
+   *   to run another batch (for state clean up, etc.)
+   * - If either of the above is true, then construct the next batch by 
committing to the offset
+   *   log that range of offsets that the next batch will process.
*/
-  private def constructNextBatch(): Unit = {
-// Check to see what new data is available.
-val hasNewData = {
-  awaitProgressLock.lock()
-  try {
-// Generate a map from each unique source to the next available 
offset.
-val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = 
uniqueSources.map {
-  case s: Source =>
-updateStatusMessage(s"Getting offsets from $s")
-reportTimeTaken("getOffset") {
-  (s, s.getOffset)
-}
-  case s: MicroBatchReader =>
-updateStatusMessage(s"Getting offsets from $s")
-reportTimeTaken("setOffsetRange") {
-  // Once v1 streaming source execution is gone, we can 
refactor this away.
-  // For now, we set the range here to get the source to infer 
the available end offset,
-  // get that offset, and then set the range again when we 
later execute.
-  s.setOffsetRange(
-toJava(availableOffsets.get(s).map(off => 
s.deserializeOffset(off.json))),
-Optional.empty())
-}
-
-val currentOffset = reportTimeTaken("getEndOffset") { 
s.getEndOffset() }
-(s, Option(currentOffset))
-}.toMap
-availableOffsets ++= latestOffsets.filter { case (_, o) => 
o.nonEmpty }.mapValues(_.get)
-
-if (dataAvailable) {
-  true
-} else {
-  noNewData = true
-  false
+  private def constructNextBatch(): Boolean = withProgressLocked {
+// If new data is already available that means this method has already 
been called before
+// and it must have already committed the offset range of next batch 
to the offset log.
+// Hence do nothing, just return true.
+if (isNewDataAvailable) return true
--- End diff --

I don't see anyone else calling this method


---




[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

2018-05-03 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21220#discussion_r185893229
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.collection.mutable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.SparkPlan
+
+class WatermarkTracker extends Logging {
+  private val operatorToWatermarkMap = mutable.HashMap[Int, Long]()
+  private var watermarkMs: Long = 0
+  private var updated = false
+
+  def setWatermark(newWatermarkMs: Long): Unit = synchronized {
+watermarkMs = newWatermarkMs
+  }
+
+  def updateWatermark(executedPlan: SparkPlan): Unit = synchronized {
+val watermarkOperators = executedPlan.collect {
--- End diff --

I would comment on the contracts: we expect a certain ordering of stateful
operators across triggers, therefore we turn off CBO, etc.


---




[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

2018-05-03 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21220#discussion_r185892675
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 ---
@@ -384,22 +363,21 @@ class MicroBatchExecution(
   commitLog.purge(currentBatchId - minLogEntriesToMaintain)
 }
   }
+  noNewData = false
 } else {
-  awaitProgressLock.lock()
-  try {
-// Wake up any threads that are waiting for the stream to progress.
-awaitProgressLockCondition.signalAll()
-  } finally {
-awaitProgressLock.unlock()
-  }
+  noNewData = true
+  awaitProgressLockCondition.signalAll()
 }
+shouldConstructNextBatch
   }
 
   /**
* Processes any data available between `availableOffsets` and 
`committedOffsets`.
* @param sparkSessionToRunBatch Isolated [[SparkSession]] to run this 
batch with.
*/
   private def runBatch(sparkSessionToRunBatch: SparkSession): Unit = {
+logDebug(s"Running batch $currentBatchId")
+
--- End diff --

I guess we're going to see whether all sources follow the contract of returning
an empty DataFrame when the start and end offsets are the same.


---




[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

2018-05-03 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21220#discussion_r185896682
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
 ---
@@ -279,13 +279,10 @@ class FileStreamSinkSuite extends StreamTest {
   check() // nothing emitted yet
 
   addTimestamp(104, 123) // watermark = 90 before this, watermark = 
123 - 10 = 113 after this
-  check() // nothing emitted yet
+  check((100L, 105L) -> 2L)  // no-data-batch emits results on 100-105,
--- End diff --

I would explicitly test with the flag on for this. In case we want to turn it
off later, this test shouldn't fail. Something along these lines, where the
conf key is an assumption about whatever name the no-data-batch flag ends up
with and `withSQLConf` comes from the shared SQL test utilities:
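
```scala
// Hypothetical: pin the no-data-batch flag on so the expected emission keeps
// holding even if the default is flipped later. The conf key is an assumption.
withSQLConf("spark.sql.streaming.noDataMicroBatches.enabled" -> "true") {
  addTimestamp(104, 123)    // watermark 90 before, 123 - 10 = 113 after
  check((100L, 105L) -> 2L) // the no-data batch emits the 100-105 window
}
```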


---




[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

2018-05-03 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21220#discussion_r185678739
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 ---
@@ -266,93 +276,62 @@ class MicroBatchExecution(
   }
 
   /**
-   * Queries all of the sources to see if any new data is available. When 
there is new data the
-   * batchId counter is incremented and a new log entry is written with 
the newest offsets.
+   * Attempts to construct the next batch based on whether new data is 
available and/or updated
--- End diff --

This paragraph is highly confusing. Could you please reword it? Maybe
something like:
```
Attempts to construct a batch according to:
 - Availability of new data
 - Existence of timeouts in stateful operators
```


---




[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

2018-05-03 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21220#discussion_r185890809
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 ---
@@ -266,93 +276,62 @@ class MicroBatchExecution(
   }
 
   /**
-   * Queries all of the sources to see if any new data is available. When 
there is new data the
-   * batchId counter is incremented and a new log entry is written with 
the newest offsets.
+   * Attempts to construct the next batch based on whether new data is 
available and/or updated
+   * metadata is such that another batch needs to be run for state clean 
up / additional output
+   * generation even without new data. Returns true only if the next batch 
should be executed.
+   *
+   * Here is the high-level logic on how this constructs the next batch.
+   * - Check each source whether new data is available
+   * - Updated the query's metadata and check using the last execution 
whether there is any need
+   *   to run another batch (for state clean up, etc.)
+   * - If either of the above is true, then construct the next batch by 
committing to the offset
+   *   log that range of offsets that the next batch will process.
*/
-  private def constructNextBatch(): Unit = {
-// Check to see what new data is available.
-val hasNewData = {
-  awaitProgressLock.lock()
-  try {
-// Generate a map from each unique source to the next available 
offset.
-val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = 
uniqueSources.map {
-  case s: Source =>
-updateStatusMessage(s"Getting offsets from $s")
-reportTimeTaken("getOffset") {
-  (s, s.getOffset)
-}
-  case s: MicroBatchReader =>
-updateStatusMessage(s"Getting offsets from $s")
-reportTimeTaken("setOffsetRange") {
-  // Once v1 streaming source execution is gone, we can 
refactor this away.
-  // For now, we set the range here to get the source to infer 
the available end offset,
-  // get that offset, and then set the range again when we 
later execute.
-  s.setOffsetRange(
-toJava(availableOffsets.get(s).map(off => 
s.deserializeOffset(off.json))),
-Optional.empty())
-}
-
-val currentOffset = reportTimeTaken("getEndOffset") { 
s.getEndOffset() }
-(s, Option(currentOffset))
-}.toMap
-availableOffsets ++= latestOffsets.filter { case (_, o) => 
o.nonEmpty }.mapValues(_.get)
-
-if (dataAvailable) {
-  true
-} else {
-  noNewData = true
-  false
+  private def constructNextBatch(): Boolean = withProgressLocked {
+// If new data is already available that means this method has already 
been called before
+// and it must have already committed the offset range of next batch 
to the offset log.
+// Hence do nothing, just return true.
+if (isNewDataAvailable) return true
+
+// Generate a map from each unique source to the next available offset.
+val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = 
uniqueSources.map {
+  case s: Source =>
+updateStatusMessage(s"Getting offsets from $s")
+reportTimeTaken("getOffset") {
+  (s, s.getOffset)
 }
-  } finally {
-awaitProgressLock.unlock()
-  }
-}
-if (hasNewData) {
-  var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
-  // Update the eventTime watermarks if we find any in the plan.
-  if (lastExecution != null) {
-lastExecution.executedPlan.collect {
-  case e: EventTimeWatermarkExec => e
-}.zipWithIndex.foreach {
-  case (e, index) if e.eventTimeStats.value.count > 0 =>
-logDebug(s"Observed event time stats $index: 
${e.eventTimeStats.value}")
-val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
-val prevWatermarkMs = watermarkMsMap.get(index)
-if (prevWatermarkMs.isEmpty || newWatermarkMs > 
prevWatermarkMs.get) {
-  watermarkMsMap.put(index, newWatermarkMs)
-}
-
-  // Populate 0 if we haven't seen any data yet for this watermark 
node.
-  case (_, index) =>
-if (!watermarkMsMap.isDefinedAt(index)) {
-  watermarkMsMap.put(index, 0)
-}
+  case s: MicroBatchReader =>
+updateSta

[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

2018-05-03 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21220#discussion_r185893837
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.collection.mutable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.SparkPlan
+
+class WatermarkTracker extends Logging {
+  private val operatorToWatermarkMap = mutable.HashMap[Int, Long]()
+  private var watermarkMs: Long = 0
+  private var updated = false
+
+  def setWatermark(newWatermarkMs: Long): Unit = synchronized {
+watermarkMs = newWatermarkMs
+  }
+
+  def updateWatermark(executedPlan: SparkPlan): Unit = synchronized {
+val watermarkOperators = executedPlan.collect {
+  case e: EventTimeWatermarkExec => e
+}
+if (watermarkOperators.isEmpty) return
+
+
+watermarkOperators.zipWithIndex.foreach {
+  case (e, index) if e.eventTimeStats.value.count > 0 =>
+logDebug(s"Observed event time stats $index: 
${e.eventTimeStats.value}")
+val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
+val prevWatermarkMs = operatorToWatermarkMap.get(index)
+if (prevWatermarkMs.isEmpty || newWatermarkMs > 
prevWatermarkMs.get) {
+  operatorToWatermarkMap.put(index, newWatermarkMs)
+}
+
+  // Populate 0 if we haven't seen any data yet for this watermark 
node.
+  case (_, index) =>
+if (!operatorToWatermarkMap.isDefinedAt(index)) {
+  operatorToWatermarkMap.put(index, 0)
+}
+}
+
+// Update the global watermark to the minimum of all watermark nodes.
+// This is the safest option, because only the global watermark is 
fault-tolerant. Making
+// it the minimum of all individual watermarks guarantees it will 
never advance past where
+// any individual watermark operator would be if it were in a plan by 
itself.
+val newWatermarkMs = operatorToWatermarkMap.minBy(_._2)._2
+if (newWatermarkMs > watermarkMs) {
+  logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
+  watermarkMs = newWatermarkMs
+  updated = true
+} else {
+  logDebug(s"Event time didn't move: $newWatermarkMs < $watermarkMs")
+  updated = false
+}
+  }
+
+  def watermarkUpdated: Boolean = synchronized { updated }
--- End diff --

is this used anywhere?


---




[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

2018-05-03 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21220#discussion_r185887201
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 ---
@@ -266,93 +276,62 @@ class MicroBatchExecution(
   }
 
   /**
-   * Queries all of the sources to see if any new data is available. When 
there is new data the
-   * batchId counter is incremented and a new log entry is written with 
the newest offsets.
+   * Attempts to construct the next batch based on whether new data is 
available and/or updated
+   * metadata is such that another batch needs to be run for state clean 
up / additional output
+   * generation even without new data. Returns true only if the next batch 
should be executed.
+   *
+   * Here is the high-level logic on how this constructs the next batch.
+   * - Check each source whether new data is available
+   * - Updated the query's metadata and check using the last execution 
whether there is any need
+   *   to run another batch (for state clean up, etc.)
+   * - If either of the above is true, then construct the next batch by 
committing to the offset
+   *   log that range of offsets that the next batch will process.
*/
-  private def constructNextBatch(): Unit = {
-// Check to see what new data is available.
-val hasNewData = {
-  awaitProgressLock.lock()
-  try {
-// Generate a map from each unique source to the next available 
offset.
-val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = 
uniqueSources.map {
-  case s: Source =>
-updateStatusMessage(s"Getting offsets from $s")
-reportTimeTaken("getOffset") {
-  (s, s.getOffset)
-}
-  case s: MicroBatchReader =>
-updateStatusMessage(s"Getting offsets from $s")
-reportTimeTaken("setOffsetRange") {
-  // Once v1 streaming source execution is gone, we can 
refactor this away.
-  // For now, we set the range here to get the source to infer 
the available end offset,
-  // get that offset, and then set the range again when we 
later execute.
-  s.setOffsetRange(
-toJava(availableOffsets.get(s).map(off => 
s.deserializeOffset(off.json))),
-Optional.empty())
-}
-
-val currentOffset = reportTimeTaken("getEndOffset") { 
s.getEndOffset() }
-(s, Option(currentOffset))
-}.toMap
-availableOffsets ++= latestOffsets.filter { case (_, o) => 
o.nonEmpty }.mapValues(_.get)
-
-if (dataAvailable) {
-  true
-} else {
-  noNewData = true
-  false
+  private def constructNextBatch(): Boolean = withProgressLocked {
+// If new data is already available that means this method has already 
been called before
+// and it must have already committed the offset range of next batch 
to the offset log.
+// Hence do nothing, just return true.
+if (isNewDataAvailable) return true
--- End diff --

how is this possible?


---




[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

2018-05-03 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21220#discussion_r185891436
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 ---
@@ -373,7 +352,7 @@ class MicroBatchExecution(
 reader.commit(reader.deserializeOffset(off.json))
 }
   } else {
-throw new IllegalStateException(s"batch $currentBatchId 
doesn't exist")
+throw new IllegalStateException(s"batch ${currentBatchId - 1} 
doesn't exist")
--- End diff --

good catch


---




[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

2018-05-03 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21220#discussion_r185885198
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 ---
@@ -128,40 +130,49 @@ class MicroBatchExecution(
* Repeatedly attempts to run batches as data arrives.
*/
   protected def runActivatedStream(sparkSessionForStream: SparkSession): 
Unit = {
-triggerExecutor.execute(() => {
-  startTrigger()
 
+triggerExecutor.execute(() => {
   if (isActive) {
+var currentBatchIsRunnable = false // Whether the current batch is 
runnable / has been run
+var currentBatchHadNewData = false // Whether the current batch 
had new data
+
 reportTimeTaken("triggerExecution") {
+  startTrigger()
+
+  // We'll do this initialization only once every start / restart
   if (currentBatchId < 0) {
-// We'll do this initialization only once
 populateStartOffsets(sparkSessionForStream)
-
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
-logDebug(s"Stream running from $committedOffsets to 
$availableOffsets")
-  } else {
-constructNextBatch()
+logInfo(s"Stream started from $committedOffsets")
   }
-  if (dataAvailable) {
-currentStatus = currentStatus.copy(isDataAvailable = true)
+
+  
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
+
+  // Try to construct the next batch. This will return true only 
if the next batch is
+  // ready and runnable. Note that the current batch may be 
runnable even without
+  // new data to process as `constructNextBatch` may decide to run 
a batch for
+  // state cleanup, etc. `isNewDataAvailable` will be updated to 
reflect whether new data
+  // is available or not.
+  currentBatchIsRunnable = constructNextBatch()
+
+  currentStatus = currentStatus.copy(isDataAvailable = 
isNewDataAvailable)
--- End diff --

why not set `currentBatchHadNewData` here? It's not immediately clear to me 
if `isNewDataAvailable` can update between here and 3 lines below. 


---




[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

2018-05-03 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21220#discussion_r185677898
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 ---
@@ -128,40 +130,49 @@ class MicroBatchExecution(
* Repeatedly attempts to run batches as data arrives.
*/
   protected def runActivatedStream(sparkSessionForStream: SparkSession): 
Unit = {
-triggerExecutor.execute(() => {
-  startTrigger()
 
+triggerExecutor.execute(() => {
   if (isActive) {
+var currentBatchIsRunnable = false // Whether the current batch is 
runnable / has been run
+var currentBatchHadNewData = false // Whether the current batch 
had new data
+
 reportTimeTaken("triggerExecution") {
+  startTrigger()
--- End diff --

this used to be out of the timing block


---




[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

2018-05-03 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21220#discussion_r185883611
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 ---
@@ -128,40 +130,49 @@ class MicroBatchExecution(
* Repeatedly attempts to run batches as data arrives.
*/
   protected def runActivatedStream(sparkSessionForStream: SparkSession): 
Unit = {
-triggerExecutor.execute(() => {
-  startTrigger()
 
+triggerExecutor.execute(() => {
   if (isActive) {
+var currentBatchIsRunnable = false // Whether the current batch is 
runnable / has been run
+var currentBatchHadNewData = false // Whether the current batch 
had new data
+
 reportTimeTaken("triggerExecution") {
+  startTrigger()
+
+  // We'll do this initialization only once every start / restart
   if (currentBatchId < 0) {
-// We'll do this initialization only once
 populateStartOffsets(sparkSessionForStream)
-
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
-logDebug(s"Stream running from $committedOffsets to 
$availableOffsets")
-  } else {
-constructNextBatch()
+logInfo(s"Stream started from $committedOffsets")
   }
-  if (dataAvailable) {
-currentStatus = currentStatus.copy(isDataAvailable = true)
+
+  
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
+
+  // Try to construct the next batch. This will return true only 
if the next batch is
+  // ready and runnable. Note that the current batch may be 
runnable even without
+  // new data to process as `constructNextBatch` may decide to run 
a batch for
+  // state cleanup, etc. `isNewDataAvailable` will be updated to 
reflect whether new data
+  // is available or not.
+  currentBatchIsRunnable = constructNextBatch()
+
+  currentStatus = currentStatus.copy(isDataAvailable = 
isNewDataAvailable)
+  if (currentBatchIsRunnable) {
 updateStatusMessage("Processing new data")
+// Remember whether the current batch has data or not. This 
will be required later
--- End diff --

the status message above isn't completely truthful if we are running a 
zero-data batch


---




[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

2018-04-24 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21126#discussion_r183912999
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 ---
@@ -207,62 +209,126 @@ trait ProgressReporter extends Logging {
   return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)
 }
 
-// We want to associate execution plan leaves to sources that generate 
them, so that we match
-// the their metrics (e.g. numOutputRows) to the sources. To do this 
we do the following.
-// Consider the translation from the streaming logical plan to the 
final executed plan.
-//
-//  streaming logical plan (with sources) <==> trigger's logical plan 
<==> executed plan
-//
-// 1. We keep track of streaming sources associated with each leaf in 
the trigger's logical plan
-//- Each logical plan leaf will be associated with a single 
streaming source.
-//- There can be multiple logical plan leaves associated with a 
streaming source.
-//- There can be leaves not associated with any streaming source, 
because they were
-//  generated from a batch source (e.g. stream-batch joins)
-//
-// 2. Assuming that the executed plan has same number of leaves in the 
same order as that of
-//the trigger logical plan, we associate executed plan leaves with 
corresponding
-//streaming sources.
-//
-// 3. For each source, we sum the metrics of the associated execution 
plan leaves.
-//
-val logicalPlanLeafToSource = newData.flatMap { case (source, 
logicalPlan) =>
-  logicalPlan.collectLeaves().map { leaf => leaf -> source }
+val numInputRows = extractSourceToNumInputRows()
+
+val eventTimeStats = lastExecution.executedPlan.collect {
+  case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
+val stats = e.eventTimeStats.value
+Map(
+  "max" -> stats.max,
+  "min" -> stats.min,
+  "avg" -> stats.avg.toLong).mapValues(formatTimestamp)
+}.headOption.getOrElse(Map.empty) ++ watermarkTimestamp
+
+ExecutionStats(numInputRows, stateOperators, eventTimeStats)
+  }
+
+  /** Extract number of input sources for each streaming source in plan */
+  private def extractSourceToNumInputRows(): Map[BaseStreamingSource, 
Long] = {
+
+import java.util.IdentityHashMap
+import scala.collection.JavaConverters._
+
+def sumRows(tuples: Seq[(BaseStreamingSource, Long)]): 
Map[BaseStreamingSource, Long] = {
+  tuples.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for 
each source
 }
-val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // 
includes non-streaming
-val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves()
-val numInputRows: Map[BaseStreamingSource, Long] =
+
+val onlyDataSourceV2Sources = {
+  // Check whether the streaming query's logical plan has only V2 data 
sources
+  val allStreamingLeaves =
+logicalPlan.collect { case s: StreamingExecutionRelation => s }
+  allStreamingLeaves.forall { _.source.isInstanceOf[MicroBatchReader] }
+}
+
+if (onlyDataSourceV2Sources) {
+  // DataSourceV2ScanExec is the execution plan leaf that is 
responsible for reading data
+  // from a V2 source and has a direct reference to the V2 source that 
generated it. Each
+  // DataSourceV2ScanExec records the number of rows it has read using 
SQLMetrics. However,
+  // just collecting all DataSourceV2ScanExec nodes and getting the 
metric is not correct as
+  // a DataSourceV2ScanExec instance may be referred to in the 
execution plan from two (or
+  // even multiple times) points and considering it twice will leads 
to double counting. We
+  // can't dedup them using their hashcode either because two 
different instances of
+  // DataSourceV2ScanExec can have the same hashcode but account for 
separate sets of
+  // records read, and deduping them to consider only one of them 
would be undercounting the
+  // records read. Therefore the right way to do this is to consider 
the unique instances of
+  // DataSourceV2ScanExec (using their identity hash codes) and get 
metrics from them.
+  // Hence we calculate in the following way.
+  //
+  // 1. Collect all the unique DataSourceV2ScanExec instances using 
IdentityHashMap.
+  //
+  // 2. Extract the source and the number of rows read from the 
DataSourceV2ScanExec instanes.
+  //
+  // 3. Multiple Da
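
The deduplication strategy described above (the entry is truncated here)
can be sketched generically: collect the unique node *instances* with an
IdentityHashMap, then sum their metrics per source. A self-contained
illustration with a made-up `ScanNode` type (not Spark's code):

```scala
import java.util.IdentityHashMap
import scala.collection.JavaConverters._

object IdentityDedupSketch {
  // Stand-in for DataSourceV2ScanExec: a plan leaf with a source and a row count.
  final class ScanNode(val source: String, val numRows: Long)

  def rowsPerSource(planLeaves: Seq[ScanNode]): Map[String, Long] = {
    // Dedup by object identity, not by equals/hashCode.
    val unique = new IdentityHashMap[ScanNode, Unit]()
    planLeaves.foreach(leaf => unique.put(leaf, ()))
    unique.keySet.asScala.toSeq
      .map(node => node.source -> node.numRows)
      .groupBy(_._1)
      .mapValues(_.map(_._2).sum)
      .toMap
  }

  def main(args: Array[String]): Unit = {
    val shared = new ScanNode("kafka", 10)   // referenced twice in the "plan"
    val other = new ScanNode("kafka", 5)
    println(rowsPerSource(Seq(shared, shared, other)))   // Map(kafka -> 15)
  }
}
```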

[GitHub] spark pull request #21134: [SPARK-24056] [SS] Make consumer creation lazy in...

2018-04-24 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21134#discussion_r183860834
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 ---
@@ -53,7 +53,7 @@ private[kafka010] class KafkaOffsetReader(
*/
   val kafkaReaderThread = Executors.newSingleThreadExecutor(new 
ThreadFactory {
 override def newThread(r: Runnable): Thread = {
-  val t = new UninterruptibleThread("Kafka Offset Reader") {
+  val t = new UninterruptibleThread(s"Kafka Offset Reader") {
--- End diff --

uber nit: the `s` interpolator isn't needed here, since the string has nothing to interpolate


---




[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

2018-04-23 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21126#discussion_r183542367
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 ---
@@ -492,6 +492,77 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging wi
 assert(progress.sources(0).numInputRows === 10)
   }
 
+
+  test("input row calculation with trigger having data for one of two V2 
sources") {
+val streamInput1 = MemoryStream[Int]
+val streamInput2 = MemoryStream[Int]
+
+testStream(streamInput1.toDF().union(streamInput2.toDF()), useV2Sink = 
true)(
+  AddData(streamInput1, 1, 2, 3),
+  CheckAnswer(1, 2, 3),
+  AssertOnQuery { q =>
+val lastProgress = getLastProgressWithData(q)
+assert(lastProgress.nonEmpty)
+assert(lastProgress.get.numInputRows == 3)
+assert(lastProgress.get.sources.length == 2)
+assert(lastProgress.get.sources(0).numInputRows == 3)
+assert(lastProgress.get.sources(1).numInputRows == 0)
+true
+  }
+)
+  }
+
+  test("input row calculation with mixed batch and streaming V2 sources") {
+
+val streamInput = MemoryStream[Int]
+val staticInputDF = spark.createDataFrame(Seq(1 -> "1", 2 -> 
"2")).toDF("value", "anotherValue")
+
+testStream(streamInput.toDF().join(staticInputDF, "value"), useV2Sink 
= true)(
+  AddData(streamInput, 1, 2, 3),
+  AssertOnQuery { q =>
+q.processAllAvailable()
+
+// The number of leaves in the trigger's logical plan should be 
same as the executed plan.
+require(
+  q.lastExecution.logical.collectLeaves().length ==
+q.lastExecution.executedPlan.collectLeaves().length)
+
+val lastProgress = getLastProgressWithData(q)
+assert(lastProgress.nonEmpty)
+assert(lastProgress.get.numInputRows == 3)
+assert(lastProgress.get.sources.length == 1)
+assert(lastProgress.get.sources(0).numInputRows == 3)
+true
+  }
+)
+
+val streamInput2 = MemoryStream[Int]
+val staticInputDF2 = staticInputDF.union(staticInputDF).cache()
+
+testStream(streamInput2.toDF().join(staticInputDF2, "value"), 
useV2Sink = true)(
--- End diff --

e.g. self-join?


---




[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

2018-04-23 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21126#discussion_r183533297
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 ---
@@ -207,62 +209,92 @@ trait ProgressReporter extends Logging {
   return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)
 }
 
-// We want to associate execution plan leaves to sources that generate 
them, so that we match
-// the their metrics (e.g. numOutputRows) to the sources. To do this 
we do the following.
-// Consider the translation from the streaming logical plan to the 
final executed plan.
-//
-//  streaming logical plan (with sources) <==> trigger's logical plan 
<==> executed plan
-//
-// 1. We keep track of streaming sources associated with each leaf in 
the trigger's logical plan
-//- Each logical plan leaf will be associated with a single 
streaming source.
-//- There can be multiple logical plan leaves associated with a 
streaming source.
-//- There can be leaves not associated with any streaming source, 
because they were
-//  generated from a batch source (e.g. stream-batch joins)
-//
-// 2. Assuming that the executed plan has same number of leaves in the 
same order as that of
-//the trigger logical plan, we associate executed plan leaves with 
corresponding
-//streaming sources.
-//
-// 3. For each source, we sum the metrics of the associated execution 
plan leaves.
-//
-val logicalPlanLeafToSource = newData.flatMap { case (source, 
logicalPlan) =>
-  logicalPlan.collectLeaves().map { leaf => leaf -> source }
+val numInputRows = extractSourceToNumInputRows()
+
+val eventTimeStats = lastExecution.executedPlan.collect {
+  case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
+val stats = e.eventTimeStats.value
+Map(
+  "max" -> stats.max,
+  "min" -> stats.min,
+  "avg" -> stats.avg.toLong).mapValues(formatTimestamp)
+}.headOption.getOrElse(Map.empty) ++ watermarkTimestamp
+
+ExecutionStats(numInputRows, stateOperators, eventTimeStats)
+  }
+
+  /** Extract number of input sources for each streaming source in plan */
+  private def extractSourceToNumInputRows(): Map[BaseStreamingSource, 
Long] = {
+
+def sumRows(tuples: Seq[(BaseStreamingSource, Long)]): 
Map[BaseStreamingSource, Long] = {
+  tuples.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for 
each source
 }
-val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // 
includes non-streaming
-val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves()
-val numInputRows: Map[BaseStreamingSource, Long] =
+
+val onlyDataSourceV2Sources = {
+  // Check whether the streaming query's logical plan has only V2 data 
sources
+  val allStreamingLeaves =
+logicalPlan.collect { case s: StreamingExecutionRelation => s }
+  allStreamingLeaves.forall { _.source.isInstanceOf[MicroBatchReader] }
--- End diff --

we don't have a way to track these for ContinuousProcessing at the moment?


---




[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

2018-04-23 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21126#discussion_r183542307
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 ---
@@ -492,6 +492,77 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging wi
 assert(progress.sources(0).numInputRows === 10)
   }
 
+
+  test("input row calculation with trigger having data for one of two V2 
sources") {
+val streamInput1 = MemoryStream[Int]
+val streamInput2 = MemoryStream[Int]
+
+testStream(streamInput1.toDF().union(streamInput2.toDF()), useV2Sink = 
true)(
+  AddData(streamInput1, 1, 2, 3),
+  CheckAnswer(1, 2, 3),
+  AssertOnQuery { q =>
+val lastProgress = getLastProgressWithData(q)
+assert(lastProgress.nonEmpty)
+assert(lastProgress.get.numInputRows == 3)
+assert(lastProgress.get.sources.length == 2)
+assert(lastProgress.get.sources(0).numInputRows == 3)
+assert(lastProgress.get.sources(1).numInputRows == 0)
+true
+  }
+)
+  }
+
+  test("input row calculation with mixed batch and streaming V2 sources") {
+
+val streamInput = MemoryStream[Int]
+val staticInputDF = spark.createDataFrame(Seq(1 -> "1", 2 -> 
"2")).toDF("value", "anotherValue")
+
+testStream(streamInput.toDF().join(staticInputDF, "value"), useV2Sink 
= true)(
+  AddData(streamInput, 1, 2, 3),
+  AssertOnQuery { q =>
+q.processAllAvailable()
+
+// The number of leaves in the trigger's logical plan should be 
same as the executed plan.
+require(
+  q.lastExecution.logical.collectLeaves().length ==
+q.lastExecution.executedPlan.collectLeaves().length)
+
+val lastProgress = getLastProgressWithData(q)
+assert(lastProgress.nonEmpty)
+assert(lastProgress.get.numInputRows == 3)
+assert(lastProgress.get.sources.length == 1)
+assert(lastProgress.get.sources(0).numInputRows == 3)
+true
+  }
+)
+
+val streamInput2 = MemoryStream[Int]
+val staticInputDF2 = staticInputDF.union(staticInputDF).cache()
+
+testStream(streamInput2.toDF().join(staticInputDF2, "value"), 
useV2Sink = true)(
--- End diff --

what if you do a stream-stream join?


---




[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

2018-04-23 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21126#discussion_r183542025
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 ---
@@ -733,6 +804,11 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging wi
 }
   }
 
+  def getLastProgressWithData(q: StreamingQuery): 
Option[StreamingQueryProgress] = {
+q.recentProgress.filter(_.numInputRows > 0).lastOption
+  }
+
+
--- End diff --

nit: extra blank line


---




[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

2018-04-23 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21126#discussion_r183541978
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 ---
@@ -492,6 +492,77 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging wi
 assert(progress.sources(0).numInputRows === 10)
   }
 
+
+  test("input row calculation with trigger having data for one of two V2 
sources") {
+val streamInput1 = MemoryStream[Int]
+val streamInput2 = MemoryStream[Int]
+
+testStream(streamInput1.toDF().union(streamInput2.toDF()), useV2Sink = 
true)(
+  AddData(streamInput1, 1, 2, 3),
+  CheckAnswer(1, 2, 3),
+  AssertOnQuery { q =>
+val lastProgress = getLastProgressWithData(q)
+assert(lastProgress.nonEmpty)
+assert(lastProgress.get.numInputRows == 3)
+assert(lastProgress.get.sources.length == 2)
+assert(lastProgress.get.sources(0).numInputRows == 3)
+assert(lastProgress.get.sources(1).numInputRows == 0)
+true
+  }
+)
+  }
+
+  test("input row calculation with mixed batch and streaming V2 sources") {
+
+val streamInput = MemoryStream[Int]
+val staticInputDF = spark.createDataFrame(Seq(1 -> "1", 2 -> 
"2")).toDF("value", "anotherValue")
+
+testStream(streamInput.toDF().join(staticInputDF, "value"), useV2Sink 
= true)(
+  AddData(streamInput, 1, 2, 3),
+  AssertOnQuery { q =>
+q.processAllAvailable()
+
+// The number of leaves in the trigger's logical plan should be 
same as the executed plan.
+require(
+  q.lastExecution.logical.collectLeaves().length ==
+q.lastExecution.executedPlan.collectLeaves().length)
+
+val lastProgress = getLastProgressWithData(q)
+assert(lastProgress.nonEmpty)
+assert(lastProgress.get.numInputRows == 3)
+assert(lastProgress.get.sources.length == 1)
+assert(lastProgress.get.sources(0).numInputRows == 3)
+true
+  }
+)
+
+val streamInput2 = MemoryStream[Int]
+val staticInputDF2 = staticInputDF.union(staticInputDF).cache()
--- End diff --

nit: unpersist later?
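
A rough standalone sketch of the cleanup being suggested: wrap the use of
the cached DataFrame so it is unpersisted even if an assertion fails
(hypothetical example, not the actual suite):

```scala
import org.apache.spark.sql.SparkSession

object UnpersistSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("unpersist-sketch").getOrCreate()
    import spark.implicits._

    val staticInputDF = Seq(1 -> "1", 2 -> "2").toDF("value", "anotherValue")
    val staticInputDF2 = staticInputDF.union(staticInputDF).cache()
    try {
      // ... exercise the cached DataFrame, e.g. as the static side of a join ...
      println(staticInputDF2.count())
    } finally {
      staticInputDF2.unpersist()   // release the cached blocks when done
      spark.stop()
    }
  }
}
```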


---




[GitHub] spark issue #21124: [SPARK-23004][SS] Ensure StateStore.commit is called onl...

2018-04-23 Thread brkyvz
Github user brkyvz commented on the issue:

https://github.com/apache/spark/pull/21124
  
LGTM!


---




[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-11 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r180936744
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
 ---
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{FileSystem => _, _}
+import java.util.{EnumSet, UUID}
+
+import scala.util.control.NonFatal
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
+
+/**
+ * An interface to abstract out all operation related to streaming 
checkpoints. Most importantly,
+ * the key operation this interface provides is `createAtomic(path, 
overwrite)` which returns a
+ * `CancellableFSDataOutputStream`. This method is used by 
[[HDFSMetadataLog]] and
+ * [[org.apache.spark.sql.execution.streaming.state.StateStore 
StateStore]] implementations
+ * to write a complete checkpoint file atomically (i.e. no partial file 
will be visible), with or
+ * without overwrite.
+ *
+ * This higher-level interface above the Hadoop FileSystem is necessary 
because
+ * different implementation of FileSystem/FileContext may have different 
combination of operations
+ * to provide the desired atomic guarantees (e.g. 
write-to-temp-file-and-rename,
+ * direct-write-and-cancel-on-failure) and this abstraction allow 
different implementations while
+ * keeping the usage simple (`createAtomic` -> `close` or `cancel`).
+ */
+trait CheckpointFileManager {
+
+  import org.apache.spark.sql.execution.streaming.CheckpointFileManager._
+
+  /**
+   * Create a file and make its contents available atomically after the 
output stream is closed.
+   *
+   * @param path                Path to create
+   * @param overwriteIfPossible If true, then the implementations must do 
a best-effort attempt to
+   *overwrite the file if it already exists. 
It should not throw
+   *any exception if the file exists. However, 
if false, then the
+   *implementation must not overwrite if the 
file already exists and
+   *must throw `FileAlreadyExistsException` in 
that case.
+   */
+  def createAtomic(path: Path, overwriteIfPossible: Boolean): 
CancellableFSDataOutputStream
+
+  /** Open a file for reading, or throw exception if it does not exist. */
+  def open(path: Path): FSDataInputStream
+
+  /** List the files in a path that match a filter. */
+  def list(path: Path, filter: PathFilter): Array[FileStatus]
+
+  /** List all the files in a path. */
+  def list(path: Path): Array[FileStatus] = {
+list(path, new PathFilter { override def accept(path: Path): Boolean = 
true })
+  }
+
+  /** Make directory at the give path and all its parent directories as 
needed. */
+  def mkdirs(path: Path): Unit
+
+  /** Whether path exists */
+  def exists(path: Path): Boolean
+
+  /** Recursively delete a path if it exists. Should not throw exception 
if file doesn't exist. */
+  def delete(path: Path): Unit
+
+  /** Is the default file system this implementation is operating on the 
local file system. */
+  def isLocal: Boolean
+}
+
+object CheckpointFileManager extends Logging {
+
+  /**
+   * Additional methods in CheckpointFileManager implementations that 
allows
+   * [[RenameBasedFSDataOutputStream]] get atomicity by 
write-to-temp-file-and-rename
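
The (truncated) scaladoc above describes helpers for the
write-to-temp-file-and-rename strategy. A rough standalone illustration of
that strategy over `java.nio` rather than the Hadoop FileSystem API (so
this is not Spark's RenameBasedFSDataOutputStream):

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

object AtomicWriteSketch {
  def writeAtomically(target: Path, contents: String): Unit = {
    // Write everything to a hidden temp file next to the target.
    val temp = target.resolveSibling(s".${target.getFileName}.tmp")
    Files.write(temp, contents.getBytes(StandardCharsets.UTF_8))
    // Readers never see a partial target file: it only appears on the rename.
    Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE)
  }

  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("atomic-write-sketch")
    val target = dir.resolve("1.delta")
    writeAtomically(target, "checkpoint contents")
    println(new String(Files.readAllBytes(target), StandardCharsets.UTF_8))
  }
}
```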

[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-11 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r180938118
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
 ---
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{FileSystem => _, _}
+import java.util.{EnumSet, UUID}
+
+import scala.util.control.NonFatal
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
+
+/**
+ * An interface to abstract out all operation related to streaming 
checkpoints. Most importantly,
+ * the key operation this interface provides is `createAtomic(path, 
overwrite)` which returns a
+ * `CancellableFSDataOutputStream`. This method is used by 
[[HDFSMetadataLog]] and
+ * [[org.apache.spark.sql.execution.streaming.state.StateStore 
StateStore]] implementations
+ * to write a complete checkpoint file atomically (i.e. no partial file 
will be visible), with or
+ * without overwrite.
+ *
+ * This higher-level interface above the Hadoop FileSystem is necessary 
because
+ * different implementation of FileSystem/FileContext may have different 
combination of operations
+ * to provide the desired atomic guarantees (e.g. 
write-to-temp-file-and-rename,
+ * direct-write-and-cancel-on-failure) and this abstraction allow 
different implementations while
+ * keeping the usage simple (`createAtomic` -> `close` or `cancel`).
+ */
+trait CheckpointFileManager {
+
+  import org.apache.spark.sql.execution.streaming.CheckpointFileManager._
+
+  /**
+   * Create a file and make its contents available atomically after the 
output stream is closed.
+   *
+   * @param path                Path to create
+   * @param overwriteIfPossible If true, then the implementations must do 
a best-effort attempt to
+   *overwrite the file if it already exists. 
It should not throw
+   *any exception if the file exists. However, 
if false, then the
+   *implementation must not overwrite if the 
file already exists and
+   *must throw `FileAlreadyExistsException` in 
that case.
+   */
+  def createAtomic(path: Path, overwriteIfPossible: Boolean): 
CancellableFSDataOutputStream
+
+  /** Open a file for reading, or throw exception if it does not exist. */
+  def open(path: Path): FSDataInputStream
+
+  /** List the files in a path that match a filter. */
+  def list(path: Path, filter: PathFilter): Array[FileStatus]
+
+  /** List all the files in a path. */
+  def list(path: Path): Array[FileStatus] = {
+list(path, new PathFilter { override def accept(path: Path): Boolean = 
true })
+  }
+
+  /** Make directory at the give path and all its parent directories as 
needed. */
+  def mkdirs(path: Path): Unit
+
+  /** Whether path exists */
+  def exists(path: Path): Boolean
+
+  /** Recursively delete a path if it exists. Should not throw exception 
if file doesn't exist. */
+  def delete(path: Path): Unit
+
+  /** Is the default file system this implementation is operating on the 
local file system. */
+  def isLocal: Boolean
+}
+
+object CheckpointFileManager extends Logging {
+
+  /**
+   * Additional methods in CheckpointFileManager implementations that 
allows
+   * [[RenameBasedFSDataOutputStream]] get atomicity by 
write-to-temp-file-and-rename

[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-11 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r180935752
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
 ---
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{FileSystem => _, _}
--- End diff --

whoa what does `FileSystem => _` do?
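
In a Scala import, `Name => _` hides that member from the wildcard that
follows, so the line above imports everything from `java.io` except
`FileSystem` (avoiding a clash with Hadoop's `FileSystem`). A tiny
standalone example of the same mechanism, using `scala.collection.mutable`
purely for illustration:

```scala
// Everything from scala.collection.mutable is imported except Map,
// so `Map` below still refers to the default immutable Map.
import scala.collection.mutable.{Map => _, _}

object ImportHidingSketch {
  def main(args: Array[String]): Unit = {
    val buf = ArrayBuffer(1, 2, 3)   // mutable.ArrayBuffer, via the wildcard
    val m = Map("a" -> 1)            // immutable Map; mutable.Map is hidden
    println(buf.sum + m("a"))        // 7
  }
}
```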


---




[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-11 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r180938649
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
 ---
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io._
+import java.net.URI
+
+import scala.util.Random
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.util.Utils
+
+abstract class CheckpointFileManagerTests extends SparkFunSuite {
+
+  def createManager(path: Path): CheckpointFileManager
+
+  test("mkdirs, list, createAtomic, open, delete") {
+withTempPath { p =>
+  val basePath = new Path(p.getAbsolutePath)
+  val fm = createManager(basePath)
+  // Mkdirs
+  val dir = new Path(s"$basePath/dir/subdir/subsubdir")
+  assert(!fm.exists(dir))
+  fm.mkdirs(dir)
+  assert(fm.exists(dir))
+  fm.mkdirs(dir)
+
+  // List
+  val acceptAllFilter = new PathFilter {
+override def accept(path: Path): Boolean = true
+  }
+  val rejectAllFilter = new PathFilter {
+override def accept(path: Path): Boolean = false
+  }
+  assert(fm.list(basePath, acceptAllFilter).exists(_.getPath.getName 
== "dir"))
+  assert(fm.list(basePath, rejectAllFilter).length === 0)
+
+  // Create atomic without overwrite
+  var path = new Path(s"$dir/file")
+  assert(!fm.exists(path))
+  fm.createAtomic(path, overwriteIfPossible = false).cancel()
+  assert(!fm.exists(path))
+  fm.createAtomic(path, overwriteIfPossible = false).close()
+  assert(fm.exists(path))
+  intercept[IOException] {
+// should throw exception since file exists and overwrite is false
+fm.createAtomic(path, overwriteIfPossible = false).close()
+  }
+
+  // Create atomic with overwrite if possible
+  path = new Path(s"$dir/file2")
+  assert(!fm.exists(path))
+  fm.createAtomic(path, overwriteIfPossible = true).cancel()
+  assert(!fm.exists(path))
+  fm.createAtomic(path, overwriteIfPossible = true).close()
+  assert(fm.exists(path))
+  fm.createAtomic(path, overwriteIfPossible = true).close()  // should 
not throw exception
+
+  // Open and delete
+  fm.open(path).close()
+  fm.delete(path)
+  assert(!fm.exists(path))
+  intercept[IOException] {
+fm.open(path)
+  }
+  fm.delete(path) // should not throw exception
+}
+  }
+
+  protected def withTempPath(f: File => Unit): Unit = {
+val path = Utils.createTempDir()
+path.delete()
+try f(path) finally Utils.deleteRecursively(path)
+  }
+}
+
+class CheckpointFileManagerSuite extends SparkFunSuite with 
SharedSparkSession {
+
+  test("CheckpointFileManager.create() should pick up user-specified class 
from conf") {
+withSQLConf(
+  SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key ->
+classOf[TestCheckpointFileManager].getName) {
+  val fileManager =
+CheckpointFileManager.create(new Path("/"), 
spark.sessionState.newHadoopConf)
+  assert(fileManager.isInstanceOf[TestCheckpointFileManager])
+}
+  }
+
+  test("CheckpointFileManager.create() should fallback from FileContext to 
FileSystem") {
+import FakeFileSystem.scheme
+spark.conf.set(
+  s"fs.$scheme.impl",
+  classOf[FakeFileSystem].getName)
+withTempDir { temp =>

[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-11 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r180938989
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
 ---
@@ -471,6 +470,41 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
 }
   }
 
+  test("error writing [version].delta cancels the output stream") {
+
+val hadoopConf = new Configuration()
+hadoopConf.set(
+  SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key,
+  classOf[TestCheckpointFileManager].getName)
+val remoteDir = Utils.createTempDir().getAbsolutePath
+
+val provider = newStoreProvider(
+  opId = Random.nextInt, partition = 0, dir = remoteDir, hadoopConf = 
hadoopConf)
+
+// Disable failure of output stream and generate versions
+TestCheckpointFileManager.shouldFailInCreateAtomic = false
+for (version <- 1 to 10) {
+  val store = provider.getStore(version - 1)
+  put(store, version.toString, version) // update "1" -> 1, "2" -> 2, 
...
+  store.commit()
+}
+val version10Data = (1L to 10).map(_.toString).map(x => x -> x).toSet
+
+val store = provider.getStore(10)
+// Fail commit for next version and verify that reloading resets the 
files
+TestCheckpointFileManager.shouldFailInCreateAtomic = true
+put(store, "11", 11)
+val e = intercept[IllegalStateException] { quietly { store.commit() } }
+assert(e.getCause.isInstanceOf[IOException], "Was waiting the 
IOException to be thrown")
+TestCheckpointFileManager.shouldFailInCreateAtomic = false
+
+// Abort commit for next version and verify that reloading resets the 
files
+val store2 = provider.getStore(10)
+put(store2, "11", 11)
+store2.abort()
+assert(TestCheckpointFileManager.cancelCalledInCreateAtomic)
--- End diff --

can you verify that it was false before the `abort`?


---




[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-11 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r180936292
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
 ---
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{FileSystem => _, _}
+import java.util.{EnumSet, UUID}
+
+import scala.util.control.NonFatal
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
+
+/**
+ * An interface to abstract out all operation related to streaming 
checkpoints. Most importantly,
+ * the key operation this interface provides is `createAtomic(path, 
overwrite)` which returns a
+ * `CancellableFSDataOutputStream`. This method is used by 
[[HDFSMetadataLog]] and
+ * [[org.apache.spark.sql.execution.streaming.state.StateStore 
StateStore]] implementations
+ * to write a complete checkpoint file atomically (i.e. no partial file 
will be visible), with or
+ * without overwrite.
+ *
+ * This higher-level interface above the Hadoop FileSystem is necessary 
because
+ * different implementation of FileSystem/FileContext may have different 
combination of operations
+ * to provide the desired atomic guarantees (e.g. 
write-to-temp-file-and-rename,
+ * direct-write-and-cancel-on-failure) and this abstraction allow 
different implementations while
+ * keeping the usage simple (`createAtomic` -> `close` or `cancel`).
+ */
+trait CheckpointFileManager {
+
+  import org.apache.spark.sql.execution.streaming.CheckpointFileManager._
+
+  /**
+   * Create a file and make its contents available atomically after the 
output stream is closed.
+   *
+   * @param path                Path to create
+   * @param overwriteIfPossible If true, then the implementations must do 
a best-effort attempt to
+   *overwrite the file if it already exists. 
It should not throw
+   *any exception if the file exists. However, 
if false, then the
+   *implementation must not overwrite if the 
file already exists and
+   *must throw `FileAlreadyExistsException` in 
that case.
+   */
+  def createAtomic(path: Path, overwriteIfPossible: Boolean): 
CancellableFSDataOutputStream
+
+  /** Open a file for reading, or throw exception if it does not exist. */
+  def open(path: Path): FSDataInputStream
+
+  /** List the files in a path that match a filter. */
+  def list(path: Path, filter: PathFilter): Array[FileStatus]
+
+  /** List all the files in a path. */
+  def list(path: Path): Array[FileStatus] = {
+list(path, new PathFilter { override def accept(path: Path): Boolean = 
true })
+  }
+
+  /** Make directory at the give path and all its parent directories as 
needed. */
+  def mkdirs(path: Path): Unit
+
+  /** Whether path exists */
+  def exists(path: Path): Boolean
+
+  /** Recursively delete a path if it exists. Should not throw exception 
if file doesn't exist. */
+  def delete(path: Path): Unit
+
+  /** Is the default file system this implementation is operating on the 
local file system. */
+  def isLocal: Boolean
+}
+
+object CheckpointFileManager extends Logging {
+
+  /**
+   * Additional methods in CheckpointFileManager implementations that 
allows
+   * [[RenameBasedFSDataOutputStream]] get atomicity by 
write-to-temp-file-and-rename

[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-11 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r180937241
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
 ---
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{FileSystem => _, _}
+import java.util.{EnumSet, UUID}
+
+import scala.util.control.NonFatal
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
+
+/**
+ * An interface to abstract out all operation related to streaming 
checkpoints. Most importantly,
+ * the key operation this interface provides is `createAtomic(path, 
overwrite)` which returns a
+ * `CancellableFSDataOutputStream`. This method is used by 
[[HDFSMetadataLog]] and
+ * [[org.apache.spark.sql.execution.streaming.state.StateStore 
StateStore]] implementations
+ * to write a complete checkpoint file atomically (i.e. no partial file 
will be visible), with or
+ * without overwrite.
+ *
+ * This higher-level interface above the Hadoop FileSystem is necessary 
because
+ * different implementation of FileSystem/FileContext may have different 
combination of operations
+ * to provide the desired atomic guarantees (e.g. 
write-to-temp-file-and-rename,
+ * direct-write-and-cancel-on-failure) and this abstraction allow 
different implementations while
+ * keeping the usage simple (`createAtomic` -> `close` or `cancel`).
+ */
+trait CheckpointFileManager {
+
+  import org.apache.spark.sql.execution.streaming.CheckpointFileManager._
+
+  /**
+   * Create a file and make its contents available atomically after the 
output stream is closed.
+   *
+   * @param path                Path to create
+   * @param overwriteIfPossible If true, then the implementations must do 
a best-effort attempt to
+   *overwrite the file if it already exists. 
It should not throw
+   *any exception if the file exists. However, 
if false, then the
+   *implementation must not overwrite if the 
file already exists and
+   *must throw `FileAlreadyExistsException` in 
that case.
+   */
+  def createAtomic(path: Path, overwriteIfPossible: Boolean): 
CancellableFSDataOutputStream
+
+  /** Open a file for reading, or throw exception if it does not exist. */
+  def open(path: Path): FSDataInputStream
+
+  /** List the files in a path that match a filter. */
+  def list(path: Path, filter: PathFilter): Array[FileStatus]
+
+  /** List all the files in a path. */
+  def list(path: Path): Array[FileStatus] = {
+list(path, new PathFilter { override def accept(path: Path): Boolean = 
true })
+  }
+
+  /** Make directory at the give path and all its parent directories as 
needed. */
+  def mkdirs(path: Path): Unit
+
+  /** Whether path exists */
+  def exists(path: Path): Boolean
+
+  /** Recursively delete a path if it exists. Should not throw exception 
if file doesn't exist. */
+  def delete(path: Path): Unit
+
+  /** Is the default file system this implementation is operating on the 
local file system. */
+  def isLocal: Boolean
+}
+
+object CheckpointFileManager extends Logging {
+
+  /**
+   * Additional methods in CheckpointFileManager implementations that 
allows
+   * [[RenameBasedFSDataOutputStream]] get atomicity by 
write-to-temp-file-and-rename

[GitHub] spark issue #20937: [SPARK-23723][SPARK-23724][SQL] Support custom encoding ...

2018-04-03 Thread brkyvz
Github user brkyvz commented on the issue:

https://github.com/apache/spark/pull/20937
  
yes, please do


---




[GitHub] spark pull request #20941: [SPARK-23827] [SS] StreamingJoinExec should ensur...

2018-03-29 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/20941#discussion_r178218438
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala ---
@@ -450,8 +450,8 @@ trait StreamTest extends QueryTest with 
SharedSQLContext with TimeLimits with Be
 // This can often catch hard to debug errors when developing 
stateful operators
 val executedPlan = currentStream.lastExecution.executedPlan
 executedPlan.collect { case s: StatefulOperator => s }.foreach { s 
=>
-  assert(s.stateInfo.isDefined)
-  assert(s.stateInfo.get.numPartitions >= 1)
+  assert(
+
s.stateInfo.map(_.numPartitions).contains(currentStream.lastExecution.numStateStores))
--- End diff --

why this change?


---




[GitHub] spark pull request #20941: [SPARK-23827] [SS] StreamingJoinExec should ensur...

2018-03-29 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/20941#discussion_r178208005
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala ---
@@ -444,6 +445,26 @@ trait StreamTest extends QueryTest with 
SharedSQLContext with TimeLimits with Be
 }
   }
 
+  if (currentStream.isInstanceOf[MicroBatchExecution]) {
+// Verify if stateful operators have correct metadata and 
distribution
+// This can often catch hard to debug errors when developing 
stateful operators
+val executedPlan = currentStream.lastExecution.executedPlan
+executedPlan.collect { case s: StatefulOperator => s }.foreach { s 
=>
+  assert(s.stateInfo.isDefined)
+  assert(s.stateInfo.get.numPartitions >= 1)
+
+  s.requiredChildDistribution.foreach { d =>
+withClue(s"$s specifies incorrect # partitions in 
requiredChildDistribution $d") {
+  assert(d.requiredNumPartitions.isDefined)
+  assert(d.requiredNumPartitions.get >= 1)
+  if (d != AllTuples) {
+assert(d.requiredNumPartitions.get == 
s.stateInfo.get.numPartitions)
--- End diff --

can you also verify that this is equal to the number of partitions in the 
metadata?


---




[GitHub] spark pull request #20767: [SPARK-23623] [SS] Avoid concurrent use of cached...

2018-03-09 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/20767#discussion_r173543942
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -342,80 +415,99 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
 }
   }
 
-  def releaseKafkaConsumer(
-  topic: String,
-  partition: Int,
-  kafkaParams: ju.Map[String, Object]): Unit = {
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-val topicPartition = new TopicPartition(topic, partition)
-val key = CacheKey(groupId, topicPartition)
-
+  private def releaseConsumer(intConsumer: InternalKafkaConsumer): Unit = {
 synchronized {
-  val consumer = cache.get(key)
-  if (consumer != null) {
-consumer.inuse = false
-  } else {
-logWarning(s"Attempting to release consumer that does not exist")
-  }
-}
-  }
 
-  /**
-   * Removes (and closes) the Kafka Consumer for the given topic, 
partition and group id.
-   */
-  def removeKafkaConsumer(
-  topic: String,
-  partition: Int,
-  kafkaParams: ju.Map[String, Object]): Unit = {
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-val topicPartition = new TopicPartition(topic, partition)
-val key = CacheKey(groupId, topicPartition)
-
-synchronized {
-  val removedConsumer = cache.remove(key)
-  if (removedConsumer != null) {
-removedConsumer.close()
+  // If it has been marked for close, then do it any way
+  if (intConsumer.inuse && intConsumer.markedForClose) 
intConsumer.close()
+  intConsumer.inuse = false
+
+  // Clear the consumer from the cache if this is indeed the consumer 
present in the cache
+  val key = new CacheKey(intConsumer.topicPartition, 
intConsumer.kafkaParams)
+  val cachedIntConsumer = cache.get(key)
+  if (cachedIntConsumer != null) {
+if (cachedIntConsumer.eq(intConsumer)) {
+  // The released consumer is indeed the cached one.
+  cache.remove(key)
+} else {
+  // The released consumer is not the cached one. Don't do 
anything.
+  // This should not happen as long as we maintain the invariant 
mentioned above.
+  logWarning(
+s"Cached consumer not the same one as the one being release" +
+  s"\ncached = $cachedIntConsumer 
[${System.identityHashCode(cachedIntConsumer)}]" +
+  s"\nreleased = $intConsumer 
[${System.identityHashCode(intConsumer)}]")
+}
+  } else {
+// The released consumer is not in the cache. Don't do anything.
+// This should not happen as long as we maintain the invariant 
mentioned above.
+logWarning(s"Attempting to release consumer that is not in the 
cache")
   }
 }
   }
 
   /**
* Get a cached consumer for groupId, assigned to topic and partition.
* If matching consumer doesn't already exist, will be created using 
kafkaParams.
+   * This will make a best effort attempt to
--- End diff --

I would love to see the rest of this sentence. Such a cliffhanger!
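
Separately, the `cachedIntConsumer.eq(intConsumer)` check in the diff is a
reference-equality test: the entry is removed from the cache only when it
is the very same consumer object being released. A standalone sketch of
that pattern (not the Kafka consumer code):

```scala
import scala.collection.mutable

object IdentityReleaseSketch {
  final case class Consumer(topicPartition: String)   // case class: == is structural

  private val cache = mutable.Map.empty[String, Consumer]

  def release(key: String, released: Consumer): Unit = synchronized {
    cache.get(key) match {
      case Some(cached) if cached eq released => cache.remove(key)   // same instance: evict
      case Some(_) => println(s"cached consumer for $key is a different instance; leaving it")
      case None    => println(s"no cached consumer for $key")
    }
  }

  def main(args: Array[String]): Unit = {
    val a = Consumer("topic-0")
    val b = Consumer("topic-0")   // a == b, but !(a eq b)
    cache.put("topic-0", a)
    release("topic-0", b)         // different instance -> cache untouched
    release("topic-0", a)         // same instance -> removed
    println(cache.keys.toList)    // List()
  }
}
```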


---




[GitHub] spark issue #20698: [SPARK-23541][SS] Allow Kafka source to read data with g...

2018-03-02 Thread brkyvz
Github user brkyvz commented on the issue:

https://github.com/apache/spark/pull/20698
  
LGTM pending tests.


---




[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-02 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171988510
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
+
+  def testWithMinPartitions(name: String, minPartition: Int)
+  (f: KafkaOffsetRangeCalculator => Unit): Unit = {
+val options = new DataSourceOptions(Map("minPartitions" -> 
minPartition.toString).asJava)
+test(s"with minPartition = $minPartition: $name") {
+  f(KafkaOffsetRangeCalculator(options))
+}
+  }
+
+
+  test("with no minPartition: N TopicPartitions to N offset ranges") {
+val calc = KafkaOffsetRangeCalculator(DataSourceOptions.empty())
+assert(
+  calc.getRanges(
+fromOffsets = Map(tp1 -> 1),
+untilOffsets = Map(tp1 -> 2)) ==
+  Seq(KafkaOffsetRange(tp1, 1, 2, None)))
+
+assert(
+  calc.getRanges(
+fromOffsets = Map(tp1 -> 1),
+untilOffsets = Map(tp1 -> 2, tp2 -> 1), Seq.empty) ==
+  Seq(KafkaOffsetRange(tp1, 1, 2, None)))
+
+assert(
+  calc.getRanges(
+fromOffsets = Map(tp1 -> 1, tp2 -> 1),
+untilOffsets = Map(tp1 -> 2)) ==
+  Seq(KafkaOffsetRange(tp1, 1, 2, None)))
+
+assert(
+  calc.getRanges(
+fromOffsets = Map(tp1 -> 1, tp2 -> 1),
+untilOffsets = Map(tp1 -> 2),
+executorLocations = Seq("location")) ==
+  Seq(KafkaOffsetRange(tp1, 1, 2, Some("location"
+  }
+
+  test("with no minPartition: empty ranges ignored") {
+val calc = KafkaOffsetRangeCalculator(DataSourceOptions.empty())
+assert(
+  calc.getRanges(
+fromOffsets = Map(tp1 -> 1, tp2 -> 1),
+untilOffsets = Map(tp1 -> 2, tp2 -> 1)) ==
+  Seq(KafkaOffsetRange(tp1, 1, 2, None)))
+  }
+
+  testWithMinPartitions("N TopicPartitions to N offset ranges", 3) { calc 
=>
+assert(
+  calc.getRanges(
+fromOffsets = Map(tp1 -> 1, tp2 -> 1, tp3 -> 1),
+untilOffsets = Map(tp1 -> 2, tp2 -> 2, tp3 -> 2)) ==
+  Seq(
+KafkaOffsetRange(tp1, 1, 2, None),
+KafkaOffsetRange(tp2, 1, 2, None),
+KafkaOffsetRange(tp3, 1, 2, None)))
+  }
+
+  testWithMinPartitions("1 TopicPartition to N offset ranges", 4) { calc =>
+assert(
+  calc.getRanges(
+fromOffsets = Map(tp1 -> 1),
+untilOffsets = Map(tp1 -> 5)) ==
+  Seq(
+KafkaOffsetRange(tp1, 1, 2, None),
+KafkaOffsetRange(tp1, 2, 3, None),
+KafkaOffsetRange(tp1, 3, 4, None),
+KafkaOffsetRange(tp1, 4, 5, None)))
+
+assert(
+  calc.getRanges(
+fromOffsets = Map(tp1 -> 1),
+untilOffsets = Map(tp1 -> 5),
+executorLocations = Seq("location")) ==
+Seq(
+  KafkaOffsetRange(tp1, 1, 2, None),
+  KafkaOffsetRange(tp1, 2, 3, None),
+  KafkaOffsetRange(tp1, 3, 4, None),
+  KafkaOffsetRange(tp1, 4, 5, None))) // location pref not set 
when minPartition is set
+  }
+
+  testWithMinPartitions("N skewed TopicPartitions to M offset ranges", 3) 
{ calc =>
--- End diff --

can you also add a test:
```
fromOffsets = Map(tp1 -> 1),
untilOffsets = M

[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-02 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r17198
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+
+/**
+ * Class to calculate offset ranges to process based on the the from and 
until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: 
Option[Int]) {
+  require(minPartitions.isEmpty || minPartitions.get > 0)
+
+  import KafkaOffsetRangeCalculator._
+  /**
+   * Calculate the offset ranges that we are going to process this batch. 
If `minPartitions`
+   * is not set or is set less than or equal the number of 
`topicPartitions` that we're going to
+   * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka 
partitions. If
+   * `numPartitions` is set higher than the number of our 
`topicPartitions`, then we will split up
+   * the read tasks of the skewed partitions to multiple Spark tasks.
+   * The number of Spark tasks will be *approximately* `numPartitions`. It 
can be less or more
+   * depending on rounding errors or Kafka partitions that didn't receive 
any new data.
+   */
+  def getRanges(
+  fromOffsets: PartitionOffsetMap,
+  untilOffsets: PartitionOffsetMap,
+  executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = 
{
+val partitionsToRead = 
untilOffsets.keySet.intersect(fromOffsets.keySet)
+
+val offsetRanges = partitionsToRead.toSeq.map { tp =>
+  KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc 
= None)
+}.filter(_.size > 0)
+
+// If minPartitions not set or there are enough partitions to satisfy 
minPartitions
+if (minPartitions.isEmpty || offsetRanges.size > minPartitions.get) {
+  // Assign preferred executor locations to each range such that the 
same topic-partition is
+  // preferentially read from the same executor and the KafkaConsumer 
can be reused.
+  offsetRanges.map { range =>
+range.copy(preferredLoc = getLocation(range.topicPartition, 
executorLocations))
+  }
+} else {
+
+  // Splits offset ranges with relatively large amount of data to 
smaller ones.
+  val totalSize = offsetRanges.map(o => o.untilOffset - 
o.fromOffset).sum
--- End diff --

nit: `map(_.size).sum`


---




[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-02 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171987888
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+
+/**
+ * Class to calculate offset ranges to process based on the the from and 
until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: 
Option[Int]) {
+  require(minPartitions.isEmpty || minPartitions.get > 0)
+
+  import KafkaOffsetRangeCalculator._
+  /**
+   * Calculate the offset ranges that we are going to process this batch. 
If `minPartitions`
+   * is not set or is set less than or equal the number of 
`topicPartitions` that we're going to
+   * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka 
partitions. If
+   * `numPartitions` is set higher than the number of our 
`topicPartitions`, then we will split up
+   * the read tasks of the skewed partitions to multiple Spark tasks.
+   * The number of Spark tasks will be *approximately* `numPartitions`. It 
can be less or more
+   * depending on rounding errors or Kafka partitions that didn't receive 
any new data.
+   */
+  def getRanges(
+  fromOffsets: PartitionOffsetMap,
+  untilOffsets: PartitionOffsetMap,
+  executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = 
{
+val partitionsToRead = 
untilOffsets.keySet.intersect(fromOffsets.keySet)
+
+val offsetRanges = partitionsToRead.toSeq.map { tp =>
+  KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc 
= None)
+}.filter(_.size > 0)
+
+// If minPartitions not set or there are enough partitions to satisfy 
minPartitions
+if (minPartitions.isEmpty || offsetRanges.size > minPartitions.get) {
+  // Assign preferred executor locations to each range such that the 
same topic-partition is
+  // preferentially read from the same executor and the KafkaConsumer 
can be reused.
+  offsetRanges.map { range =>
+range.copy(preferredLoc = getLocation(range.topicPartition, 
executorLocations))
+  }
+} else {
+
+  // Splits offset ranges with relatively large amount of data to 
smaller ones.
+  val totalSize = offsetRanges.map(o => o.untilOffset - 
o.fromOffset).sum
+  val idealRangeSize = totalSize.toDouble / minPartitions.get
+
+  offsetRanges.flatMap { range =>
+// Split the current range into subranges as close to the ideal 
range size
+val rangeSize = range.untilOffset - range.fromOffset
+val numSplitsInRange = math.round(rangeSize.toDouble / 
idealRangeSize).toInt
--- End diff --

nit: use `range.size` here; the `rangeSize` val above can then be removed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-02 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171988062
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+
+/**
+ * Class to calculate offset ranges to process based on the the from and 
until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: 
Option[Int]) {
+  require(minPartitions.isEmpty || minPartitions.get > 0)
+
+  import KafkaOffsetRangeCalculator._
+  /**
+   * Calculate the offset ranges that we are going to process this batch. 
If `minPartitions`
+   * is not set or is set less than or equal the number of 
`topicPartitions` that we're going to
+   * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka 
partitions. If
+   * `numPartitions` is set higher than the number of our 
`topicPartitions`, then we will split up
+   * the read tasks of the skewed partitions to multiple Spark tasks.
+   * The number of Spark tasks will be *approximately* `numPartitions`. It 
can be less or more
+   * depending on rounding errors or Kafka partitions that didn't receive 
any new data.
+   */
+  def getRanges(
+  fromOffsets: PartitionOffsetMap,
+  untilOffsets: PartitionOffsetMap,
+  executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = 
{
+val partitionsToRead = 
untilOffsets.keySet.intersect(fromOffsets.keySet)
+
+val offsetRanges = partitionsToRead.toSeq.map { tp =>
+  KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc 
= None)
+}.filter(_.size > 0)
+
+// If minPartitions not set or there are enough partitions to satisfy 
minPartitions
+if (minPartitions.isEmpty || offsetRanges.size > minPartitions.get) {
+  // Assign preferred executor locations to each range such that the 
same topic-partition is
+  // preferentially read from the same executor and the KafkaConsumer 
can be reused.
+  offsetRanges.map { range =>
+range.copy(preferredLoc = getLocation(range.topicPartition, 
executorLocations))
+  }
+} else {
+
+  // Splits offset ranges with relatively large amount of data to 
smaller ones.
+  val totalSize = offsetRanges.map(o => o.untilOffset - 
o.fromOffset).sum
+  val idealRangeSize = totalSize.toDouble / minPartitions.get
+
+  offsetRanges.flatMap { range =>
+// Split the current range into subranges as close to the ideal 
range size
+val rangeSize = range.untilOffset - range.fromOffset
+val numSplitsInRange = math.round(rangeSize.toDouble / 
idealRangeSize).toInt
+
+(0 until numSplitsInRange).map { i =>
+  val splitStart = range.fromOffset + rangeSize * (i.toDouble / 
numSplitsInRange)
+  val splitEnd = range.fromOffset + rangeSize * ((i.toDouble + 1) 
/ numSplitsInRange)
+  KafkaOffsetRange(
+range.topicPartition, splitStart.toLong, splitEnd.toLong, 
preferredLoc = None)
+}
+  }
+}
+  }
+
+  private def getLocation(tp: TopicPartition, executorLocations: 
Seq[String]): Option[String] = {
+def floorMod(a: Long, b: Int): Int = ((a % b).toInt + b) % b
+
+val numExecutors = executorLocations.length
+if (numExecutors > 0) {
+  // This allows cached KafkaConsumers in the executors to be re-used 
to read the same
+  // partition in every batch.
+  Some(executorLocations(floorMod(tp.hashCode, numExecutors)))
+} else None
+  }
+}
+
+private[kafka010] object KafkaOffsetR

[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-02 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171988112
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+
+/**
+ * Class to calculate offset ranges to process based on the the from and 
until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: 
Option[Int]) {
+  require(minPartitions.isEmpty || minPartitions.get > 0)
+
+  import KafkaOffsetRangeCalculator._
+  /**
+   * Calculate the offset ranges that we are going to process this batch. 
If `minPartitions`
+   * is not set or is set less than or equal the number of 
`topicPartitions` that we're going to
+   * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka 
partitions. If
+   * `numPartitions` is set higher than the number of our 
`topicPartitions`, then we will split up
+   * the read tasks of the skewed partitions to multiple Spark tasks.
+   * The number of Spark tasks will be *approximately* `numPartitions`. It 
can be less or more
+   * depending on rounding errors or Kafka partitions that didn't receive 
any new data.
+   */
+  def getRanges(
+  fromOffsets: PartitionOffsetMap,
+  untilOffsets: PartitionOffsetMap,
+  executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = 
{
+val partitionsToRead = 
untilOffsets.keySet.intersect(fromOffsets.keySet)
+
+val offsetRanges = partitionsToRead.toSeq.map { tp =>
+  KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc 
= None)
+}.filter(_.size > 0)
+
+// If minPartitions not set or there are enough partitions to satisfy 
minPartitions
+if (minPartitions.isEmpty || offsetRanges.size > minPartitions.get) {
+  // Assign preferred executor locations to each range such that the 
same topic-partition is
+  // preferentially read from the same executor and the KafkaConsumer 
can be reused.
+  offsetRanges.map { range =>
+range.copy(preferredLoc = getLocation(range.topicPartition, 
executorLocations))
+  }
+} else {
+
+  // Splits offset ranges with relatively large amount of data to 
smaller ones.
+  val totalSize = offsetRanges.map(o => o.untilOffset - 
o.fromOffset).sum
+  val idealRangeSize = totalSize.toDouble / minPartitions.get
+
+  offsetRanges.flatMap { range =>
+// Split the current range into subranges as close to the ideal 
range size
+val rangeSize = range.untilOffset - range.fromOffset
+val numSplitsInRange = math.round(rangeSize.toDouble / 
idealRangeSize).toInt
+
+(0 until numSplitsInRange).map { i =>
+  val splitStart = range.fromOffset + rangeSize * (i.toDouble / 
numSplitsInRange)
+  val splitEnd = range.fromOffset + rangeSize * ((i.toDouble + 1) 
/ numSplitsInRange)
+  KafkaOffsetRange(
+range.topicPartition, splitStart.toLong, splitEnd.toLong, 
preferredLoc = None)
+}
+  }
+}
+  }
+
+  private def getLocation(tp: TopicPartition, executorLocations: 
Seq[String]): Option[String] = {
+def floorMod(a: Long, b: Int): Int = ((a % b).toInt + b) % b
+
+val numExecutors = executorLocations.length
+if (numExecutors > 0) {
+  // This allows cached KafkaConsumers in the executors to be re-used 
to read the same
+  // partition in every batch.
+  Some(executorLocations(floorMod(tp.hashCode, numExecutors)))
+} else None
+  }
+}
+
+private[kafka010] object KafkaOffsetR

[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-02 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171983185
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+
+/**
+ * Class to calculate offset ranges to process based on the the from and 
until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: 
Option[Int]) {
+  require(minPartitions.isEmpty || minPartitions.get > 0)
+
+  import KafkaOffsetRangeCalculator._
+  /**
+   * Calculate the offset ranges that we are going to process this batch. 
If `minPartitions`
+   * is not set or is set less than or equal the number of 
`topicPartitions` that we're going to
+   * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka 
partitions. If
+   * `numPartitions` is set higher than the number of our 
`topicPartitions`, then we will split up
+   * the read tasks of the skewed partitions to multiple Spark tasks.
+   * The number of Spark tasks will be *approximately* `numPartitions`. It 
can be less or more
+   * depending on rounding errors or Kafka partitions that didn't receive 
any new data.
+   */
+  def getRanges(
+  fromOffsets: PartitionOffsetMap,
+  untilOffsets: PartitionOffsetMap,
+  executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = 
{
+val partitionsToRead = 
untilOffsets.keySet.intersect(fromOffsets.keySet)
+
+val offsetRanges = partitionsToRead.toSeq.map { tp =>
+  KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc 
= None)
+}
+
+// If minPartitions not set or there are enough partitions to satisfy 
minPartitions
+if (minPartitions.isEmpty || offsetRanges.size > minPartitions.get) {
+  // Assign preferred executor locations to each range such that the 
same topic-partition is
+  // preferentially read from the same executor and the KafkaConsumer 
can be reused.
+  offsetRanges.map { range =>
+range.copy(preferredLoc = getLocation(range.topicPartition, 
executorLocations))
+  }
+} else {
+
+  // Splits offset ranges with relatively large amount of data to 
smaller ones.
+  val totalSize = offsetRanges.map(o => o.untilOffset - 
o.fromOffset).sum
+  val idealRangeSize = totalSize.toDouble / minPartitions.get
+
+  offsetRanges.flatMap { range =>
+// Split the current range into subranges as close to the ideal 
range size
+val rangeSize = range.untilOffset - range.fromOffset
+val numSplitsInRange = math.round(rangeSize.toDouble / 
idealRangeSize).toInt
+
+(0 until numSplitsInRange).map { i =>
+  val splitStart = range.fromOffset + rangeSize * (i.toDouble / 
numSplitsInRange)
+  val splitEnd = range.fromOffset + rangeSize * ((i.toDouble + 1) 
/ numSplitsInRange)
+  KafkaOffsetRange(
+range.topicPartition, splitStart.toLong, splitEnd.toLong, 
preferredLoc = None)
+}
+
--- End diff --

nit: extra line


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-01 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171732779
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
 ---
@@ -370,8 +361,14 @@ private[kafka010] class KafkaMicroBatchDataReader(
 
   override def close(): Unit = {
 // Indicate that we're no longer using this consumer
--- End diff --

maybe remove this?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-01 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171733516
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+
+/**
+ * Class to calculate offset ranges to process based on the the from and 
until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) 
{
+  require(minPartitions >= 0)
+
+  import KafkaOffsetRangeCalculator._
+  /**
+   * Calculate the offset ranges that we are going to process this batch. 
If `numPartitions`
+   * is not set or is set less than or equal the number of 
`topicPartitions` that we're going to
+   * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka 
partitions. If
+   * `numPartitions` is set higher than the number of our 
`topicPartitions`, then we will split up
+   * the read tasks of the skewed partitions to multiple Spark tasks.
+   * The number of Spark tasks will be *approximately* `numPartitions`. It 
can be less or more
+   * depending on rounding errors or Kafka partitions that didn't receive 
any new data.
+   */
+  def getRanges(
+  fromOffsets: PartitionOffsetMap,
+  untilOffsets: PartitionOffsetMap,
+  executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = 
{
+val partitionsToRead = 
untilOffsets.keySet.intersect(fromOffsets.keySet)
+
+val offsetRanges = partitionsToRead.toSeq.map { tp =>
+  KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc 
= None)
+}
+
+// If minPartitions not set or there are enough partitions to satisfy 
minPartitions
+if (minPartitions == DEFAULT_MIN_PARTITIONS || offsetRanges.size > 
minPartitions) {
+  // Assign preferred executor locations to each range such that the 
same topic-partition is
+  // always read from the same executor and the KafkaConsumer can be 
reused
+  offsetRanges.map { range =>
+range.copy(preferredLoc = getLocation(range.topicPartition, 
executorLocations))
+  }
+} else {
+
+  // Splits offset ranges with relatively large amount of data to 
smaller ones.
+  val totalSize = offsetRanges.map(o => o.untilOffset - 
o.fromOffset).sum
+  offsetRanges.flatMap { offsetRange =>
+val tp = offsetRange.topicPartition
+val size = offsetRange.untilOffset - offsetRange.fromOffset
+// number of partitions to divvy up this topic partition to
+val parts = math.max(math.round(size * 1.0 / totalSize * 
minPartitions), 1).toInt
--- End diff --

yeah, a comment about how this is calculating the `weight` of partitions to 
assign to this topic would help. In addition, the sum of `parts` after this 
calculation will be `>= minPartitions`
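
For example, with hypothetical sizes (an untested sketch, numbers made up):

```scala
// Three topic-partitions with 10, 10, and 1 new records, and minPartitions = 4.
val sizes = Seq(10L, 10L, 1L)
val minPartitions = 4
val totalSize = sizes.sum  // 21
val parts = sizes.map { size =>
  // weight of this partition = its share of the new data, scaled to minPartitions,
  // floored at 1 so even a tiny partition still gets one Spark task
  math.max(math.round(size * 1.0 / totalSize * minPartitions), 1).toInt
}
// parts == Seq(2, 2, 1); parts.sum == 5, which is >= minPartitions here
```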


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-01 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171733038
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+
+/**
+ * Class to calculate offset ranges to process based on the the from and 
until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) 
{
+  require(minPartitions >= 0)
+
+  import KafkaOffsetRangeCalculator._
+  /**
+   * Calculate the offset ranges that we are going to process this batch. 
If `numPartitions`
+   * is not set or is set less than or equal the number of 
`topicPartitions` that we're going to
+   * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka 
partitions. If
+   * `numPartitions` is set higher than the number of our 
`topicPartitions`, then we will split up
+   * the read tasks of the skewed partitions to multiple Spark tasks.
+   * The number of Spark tasks will be *approximately* `numPartitions`. It 
can be less or more
+   * depending on rounding errors or Kafka partitions that didn't receive 
any new data.
+   */
+  def getRanges(
+  fromOffsets: PartitionOffsetMap,
+  untilOffsets: PartitionOffsetMap,
+  executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = 
{
+val partitionsToRead = 
untilOffsets.keySet.intersect(fromOffsets.keySet)
+
+val offsetRanges = partitionsToRead.toSeq.map { tp =>
+  KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc 
= None)
+}
+
+// If minPartitions not set or there are enough partitions to satisfy 
minPartitions
+if (minPartitions == DEFAULT_MIN_PARTITIONS || offsetRanges.size > 
minPartitions) {
--- End diff --

I don't think we need the first check. `offsetRanges.size` should be 
greater than 0 right? Otherwise we wouldn't have called into this.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-01 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171732729
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
 ---
@@ -320,28 +300,39 @@ private[kafka010] class KafkaMicroBatchReader(
 }
 
 /** A [[DataReaderFactory]] for reading Kafka data in a micro-batch 
streaming query. */
-private[kafka010] class KafkaMicroBatchDataReaderFactory(
-range: KafkaOffsetRange,
-preferredLoc: Option[String],
+private[kafka010] case class KafkaMicroBatchDataReaderFactory(
+offsetRange: KafkaOffsetRange,
 executorKafkaParams: ju.Map[String, Object],
 pollTimeoutMs: Long,
-failOnDataLoss: Boolean) extends DataReaderFactory[UnsafeRow] {
+failOnDataLoss: Boolean,
+reuseKafkaConsumer: Boolean) extends DataReaderFactory[UnsafeRow] {
 
-  override def preferredLocations(): Array[String] = preferredLoc.toArray
+  override def preferredLocations(): Array[String] = 
offsetRange.preferredLoc.toArray
 
   override def createDataReader(): DataReader[UnsafeRow] = new 
KafkaMicroBatchDataReader(
-range, executorKafkaParams, pollTimeoutMs, failOnDataLoss)
+offsetRange, executorKafkaParams, pollTimeoutMs, failOnDataLoss, 
reuseKafkaConsumer)
 }
 
 /** A [[DataReader]] for reading Kafka data in a micro-batch streaming 
query. */
-private[kafka010] class KafkaMicroBatchDataReader(
+private[kafka010] case class KafkaMicroBatchDataReader(
 offsetRange: KafkaOffsetRange,
 executorKafkaParams: ju.Map[String, Object],
 pollTimeoutMs: Long,
-failOnDataLoss: Boolean) extends DataReader[UnsafeRow] with Logging {
+failOnDataLoss: Boolean,
+reuseKafkaConsumer: Boolean) extends DataReader[UnsafeRow] with 
Logging {
+
+  private val consumer = {
+if (!reuseKafkaConsumer) {
+  // If we can't reuse CachedKafkaConsumers, creating a new 
CachedKafkaConsumer. As here we
--- End diff --

`nit: We use 'assign' here, hence don't need to ...`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-01 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171733181
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+
+/**
+ * Class to calculate offset ranges to process based on the the from and 
until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) 
{
+  require(minPartitions >= 0)
+
+  import KafkaOffsetRangeCalculator._
+  /**
+   * Calculate the offset ranges that we are going to process this batch. 
If `numPartitions`
+   * is not set or is set less than or equal the number of 
`topicPartitions` that we're going to
+   * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka 
partitions. If
+   * `numPartitions` is set higher than the number of our 
`topicPartitions`, then we will split up
+   * the read tasks of the skewed partitions to multiple Spark tasks.
+   * The number of Spark tasks will be *approximately* `numPartitions`. It 
can be less or more
+   * depending on rounding errors or Kafka partitions that didn't receive 
any new data.
+   */
+  def getRanges(
+  fromOffsets: PartitionOffsetMap,
+  untilOffsets: PartitionOffsetMap,
+  executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = 
{
+val partitionsToRead = 
untilOffsets.keySet.intersect(fromOffsets.keySet)
--- End diff --

was this check here before? What if there are new topic partitions? Are we 
missing those, because they may not exist in fromOffsets?
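
For illustration, with hypothetical offsets (untested sketch): a partition added since the last batch shows up only in `untilOffsets`, so the intersect silently drops it.

```scala
import org.apache.kafka.common.TopicPartition

val tp0 = new TopicPartition("topic", 0)
val tp1 = new TopicPartition("topic", 1)  // partition added after the last batch
val fromOffsets  = Map(tp0 -> 0L)
val untilOffsets = Map(tp0 -> 10L, tp1 -> 10L)

untilOffsets.keySet.intersect(fromOffsets.keySet)  // Set(tp0) -- tp1 is never read
```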


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-01 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171732183
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
 ---
@@ -199,10 +179,10 @@ private[kafka010] class KafkaMicroBatchReader(
 // Make sure that `KafkaConsumer.poll` is only called in 
StreamExecutionThread.
 // Otherwise, interrupting a thread while running `KafkaConsumer.poll` 
may hang forever
 // (KAFKA-1894).
-assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+require(Thread.currentThread().isInstanceOf[UninterruptibleThread])
--- End diff --

Assertions can be turned off at compile time, so `require` is the safer check here.
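
A small sketch of the difference (standard Scala semantics, independent of the Kafka code itself):

```scala
// assert is @elidable and can be compiled away (e.g. scalac -Xdisable-assertions),
// whereas require always evaluates and throws IllegalArgumentException on failure,
// so the thread check cannot silently disappear from production builds.
def mustBePositive(n: Int): Int = {
  assert(n > 0)   // may be elided from the bytecode entirely
  require(n > 0)  // always checked at runtime
  n
}
```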


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20673: [SPARK-23515] Use input/output streams for large events ...

2018-02-26 Thread brkyvz
Github user brkyvz commented on the issue:

https://github.com/apache/spark/pull/20673
  
@HyukjinKwon Added tests


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20673: [SPARK-23515] Use input/output streams for large ...

2018-02-26 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/20673#discussion_r170646787
  
--- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala ---
@@ -100,7 +102,18 @@ private[spark] object JsonProtocol {
 executorMetricsUpdateToJson(metricsUpdate)
   case blockUpdate: SparkListenerBlockUpdated =>
 blockUpdateToJson(blockUpdate)
-  case _ => parse(mapper.writeValueAsString(event))
+  case _ =>
+// Use piped streams to avoid extra memory consumption
+val outputStream = new PipedOutputStream()
+val inputStream = new PipedInputStream(outputStream)
+try {
+  mapper.writeValue(outputStream, event)
--- End diff --

I was actually hoping for a test to fail, but none did (the test suite has a bunch of very specific cases). This code will likely block forever if the serialized event is larger than the pipe buffer. Going to add a test.
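
A minimal sketch of the hazard and one workaround (assuming the default 1024-byte pipe buffer; sizes made up):

```scala
import java.io.{PipedInputStream, PipedOutputStream}

val out = new PipedOutputStream()
val in = new PipedInputStream(out)  // default pipe buffer is only 1024 bytes

// Hazard: writing on the same thread that is supposed to read `in` blocks as soon
// as the serialized event exceeds the pipe buffer, because nothing drains the pipe:
// out.write(new Array[Byte](64 * 1024))  // would hang here forever

// Workaround sketch: serialize on a helper thread, read/parse on the current one.
val writer = new Thread(new Runnable {
  override def run(): Unit = {
    try out.write(new Array[Byte](64 * 1024)) finally out.close()
  }
})
writer.start()
val firstByte = in.read()  // the reader side now makes progress while the pipe fills
```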


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20673: [SPARK-23515] Use input/output streams for large ...

2018-02-25 Thread brkyvz
GitHub user brkyvz opened a pull request:

https://github.com/apache/spark/pull/20673

[SPARK-23515] Use input/output streams for large events in 
JsonProtocol.sparkEventToJson

## What changes were proposed in this pull request?

`def sparkEventToJson(event: SparkListenerEvent)`

has a fallback path that serializes an unrecognized event to a JSON string and then parses that string back again. Materializing the whole string just to re-parse it is unnecessary and can cause OOMs, as seen in the stack trace below:

```
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOfRange(Arrays.java:3664)
at java.lang.String.<init>(String.java:207)
at java.lang.StringBuilder.toString(StringBuilder.java:407)
at 
com.fasterxml.jackson.core.util.TextBuffer.contentsAsString(TextBuffer.java:356)
at 
com.fasterxml.jackson.core.json.ReaderBasedJsonParser.getText(ReaderBasedJsonParser.java:235)
at 
org.json4s.jackson.JValueDeserializer.deserialize(JValueDeserializer.scala:20)
at 
org.json4s.jackson.JValueDeserializer.deserialize(JValueDeserializer.scala:42)
at 
org.json4s.jackson.JValueDeserializer.deserialize(JValueDeserializer.scala:35)
at 
com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3736)
at 
com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2726)
at org.json4s.jackson.JsonMethods$class.parse(JsonMethods.scala:20)
at org.json4s.jackson.JsonMethods$.parse(JsonMethods.scala:50)
at 
org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:103)
```

We should just use the stream parsing to avoid such OOMs.

## How was this patch tested?




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/brkyvz/spark eventLoggingJson

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20673.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20673


commit 774188003c5b1c1a000d69f5996dce580c7a1432
Author: Burak Yavuz <brkyvz@...>
Date:   2018-02-25T20:07:22Z

use streams for large events




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20614: Revert [SPARK-23094] Fix invalid character handling in J...

2018-02-14 Thread brkyvz
Github user brkyvz commented on the issue:

https://github.com/apache/spark/pull/20614
  
LGTM, my initial assumption that files had to be UTF-8 encoded was wrong :(


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20279: [SPARK-23092][SQL] Migrate MemoryStream to DataSourceV2 ...

2018-01-30 Thread brkyvz
Github user brkyvz commented on the issue:

https://github.com/apache/spark/pull/20279
  
Closed in favor of #20445


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20279: [SPARK-23092][SQL] Migrate MemoryStream to DataSo...

2018-01-30 Thread brkyvz
Github user brkyvz closed the pull request at:

https://github.com/apache/spark/pull/20279


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20302: [SPARK-23094] Fix invalid character handling in JsonData...

2018-01-17 Thread brkyvz
Github user brkyvz commented on the issue:

https://github.com/apache/spark/pull/20302
  
cc @hvanhovell @cloud-fan 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20302: [SPARK-23094] Fix invalid character handling in J...

2018-01-17 Thread brkyvz
GitHub user brkyvz opened a pull request:

https://github.com/apache/spark/pull/20302

[SPARK-23094] Fix invalid character handling in JsonDataSource

## What changes were proposed in this pull request?

There were two related fixes regarding `from_json`, `get_json_object` and 
`json_tuple` ([Fix 
#1](https://github.com/apache/spark/commit/c8803c06854683c8761fdb3c0e4c55d5a9e22a95),
 [Fix 
#2](https://github.com/apache/spark/commit/86174ea89b39a300caaba6baffac70f3dc702788)),
 but they weren't comprehensive it seems. I wanted to extend those fixes to all 
the parsers, and add tests for each case.

## How was this patch tested?

Regression tests

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/brkyvz/spark json-invfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20302.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20302


commit 231152ab0d615166bdea354916a9f6ca0aaf7e6a
Author: Burak Yavuz <brkyvz@...>
Date:   2018-01-18T01:01:00Z

[ES-4104][WARMFIX] Fix invalid character handling in JsonDataSource

Cherry-pick of https://github.com/databricks/spark/pull/1135

## What changes were proposed in this pull request?

I shall also merge this upstream, but wanted to merge here first since it 
was an ES ticket related to Qubole workloads.

There were two related fixes regarding `from_json`, `get_json_object` and 
`json_tuple` in OSS ([Fix 
#1](https://github.com/apache/spark/commit/c8803c06854683c8761fdb3c0e4c55d5a9e22a95),
 [Fix 
#2](https://github.com/apache/spark/commit/86174ea89b39a300caaba6baffac70f3dc702788)),
 but they weren't comprehensive it seems. I wanted to extend those fixes to all 
the parsers, and add tests for each case.

## How was this patch tested?

Regression tests

Author: Burak Yavuz 

Closes #1135 from brkyvz/json-32-dbx.

## What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration 
tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, 
remove this)

Please review http://spark.apache.org/contributing.html before opening a 
pull request.

## **IMPORTANT** Warmfix instructions

If this PR needs to be warmfixed (i.e. merged into the release branch after 
the code freeze), please follow steps below.

What type of warmfix is this? Please select **exactly one choice**, or 
write description in Other.

- [ ] Regression (e.g. fixing the behavior of a feature that regressed in 
the current release cycle)
- [ ] ES ticket fix (e.g. Customer or internally requested update/fix)
- [ ] Other (please describe):

Make the following updates to this PR:

- [ ] Add `[WARMFIX]` in the title of this PR.
- [ ] Label the PR using label(s) corresponding to the WARMFIX branch(es). 
The label name should be in the format of `dbr-branch-a.b` (e.g. 
`dbr-branch-3.2`), which matches the release branch name for Runtime release 
`a.b`.
- [ ] Ask your team lead to sign off this warmfix and add the 
`warmfix-approved` label.
- [ ] When merging the PR using the merge script, make sure to get this PR 
merged into the following branches:

  - The branch against which your PR is opened, and
  - Any extra release branch(es) corresponding to the `dbr-branch-a.b` 
label(s) applied to your PR.

Author: Burak Yavuz <brk...@gmail.com>

Closes #1616 from brkyvz/inv-json4x.




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20279: [SPARK-23092][SQL] Migrate MemoryStream to DataSo...

2018-01-16 Thread brkyvz
GitHub user brkyvz opened a pull request:

https://github.com/apache/spark/pull/20279

[SPARK-23092][SQL] Migrate MemoryStream to DataSourceV2 APIs

## What changes were proposed in this pull request?

This PR migrates the MemoryStream to DataSourceV2 APIs.

## How was this patch tested?

All existing unit tests in streaming.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/brkyvz/spark memv2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20279.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20279


commit 7c09b376eef6a4e6c118c78ad9459cb55e59e67f
Author: Burak Yavuz <brkyvz@...>
Date:   2018-01-11T16:44:19Z

save for so far

commit 78c50f860aa13f569669f4ad77f4325d80085c8b
Author: Burak Yavuz <brkyvz@...>
Date:   2018-01-12T18:27:49Z

Save so far

commit 2777b5b38596a1fb68bcf8ee928aec1a58dc372c
Author: Burak Yavuz <brkyvz@...>
Date:   2018-01-13T01:43:03Z

save so far

commit 50a541b5890f328a655a7ef1fca4f8480b9a35f0
Author: Burak Yavuz <brkyvz@...>
Date:   2018-01-16T19:14:08Z

Compiles and I think also runs correctly




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18029: [SPARK-20168] [DStream] Add changes to use kinesis fetch...

2017-12-25 Thread brkyvz
Github user brkyvz commented on the issue:

https://github.com/apache/spark/pull/18029
  
Merged to master. Thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-12-24 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r158627994
  
--- Diff: 
external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisInputDStreamBuilderSuite.java
 ---
@@ -45,18 +44,90 @@ public void testJavaKinesisDStreamBuilder() {
   .streamName(streamName)
   .endpointUrl(endpointUrl)
   .regionName(region)
-  .initialPositionInStream(initialPosition)
+  .initialPosition(initialPosition)
   .checkpointAppName(appName)
   .checkpointInterval(checkpointInterval)
   .storageLevel(storageLevel)
   .build();
 assert(kinesisDStream.streamName() == streamName);
 assert(kinesisDStream.endpointUrl() == endpointUrl);
 assert(kinesisDStream.regionName() == region);
-assert(kinesisDStream.initialPositionInStream() == initialPosition);
+assert(kinesisDStream.initialPosition().getPosition() == 
initialPosition.getPosition());
+assert(kinesisDStream.checkpointAppName() == appName);
+assert(kinesisDStream.checkpointInterval() == checkpointInterval);
+assert(kinesisDStream._storageLevel() == storageLevel);
+ssc.stop();
+  }
+
+  /**
+   * Test to ensure that the old API for InitialPositionInStream
+   * is supported in KinesisDStream.Builder.
+   * This test would be removed when we deprecate the KinesisUtils.
+   */
+  @Test
+  public void testJavaKinesisDStreamBuilderOldApi() {
+String streamName = "a-very-nice-stream-name";
+String endpointUrl = "https://kinesis.us-west-2.amazonaws.com";
+String region = "us-west-2";
+String appName = "a-very-nice-kinesis-app";
+Duration checkpointInterval = Seconds.apply(30);
+StorageLevel storageLevel = StorageLevel.MEMORY_ONLY();
+
+KinesisInputDStream<byte[]> kinesisDStream = 
KinesisInputDStream.builder()
+.streamingContext(ssc)
+.streamName(streamName)
+.endpointUrl(endpointUrl)
+.regionName(region)
+.initialPositionInStream(InitialPositionInStream.LATEST)
+.checkpointAppName(appName)
+.checkpointInterval(checkpointInterval)
+.storageLevel(storageLevel)
+.build();
+assert(kinesisDStream.streamName() == streamName);
+assert(kinesisDStream.endpointUrl() == endpointUrl);
+assert(kinesisDStream.regionName() == region);
+assert(kinesisDStream.initialPosition().getPosition() == 
InitialPositionInStream.LATEST);
 assert(kinesisDStream.checkpointAppName() == appName);
 assert(kinesisDStream.checkpointInterval() == checkpointInterval);
 assert(kinesisDStream._storageLevel() == storageLevel);
 ssc.stop();
   }
+
+  /**
+   * Test to ensure that the old API for InitialPositionInStream
+   * is supported in KinesisDStream.Builder.
+   * Test old API doesn't support the InitialPositionInStream.AT_TIMESTAMP.
+   * This test would be removed when we deprecate the KinesisUtils.
+   */
+  @Test
+  public void testJavaKinesisDStreamBuilderOldApiAtTimestamp() {
--- End diff --

This test could be moved to become a Scala test instead, using 
```scala
intercept[UnsupportedOperationException] {
  ...
}
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-12-24 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r158627640
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
 ---
@@ -56,12 +57,13 @@ import org.apache.spark.util.Utils
  * @param endpointUrl  Url of Kinesis service (e.g., 
https://kinesis.us-east-1.amazonaws.com)
  * @param regionName  Region name used by the Kinesis Client Library for
  *DynamoDB (lease coordination and checkpointing) and 
CloudWatch (metrics)
- * @param initialPositionInStream  In the absence of Kinesis checkpoint 
info, this is the
+ * @param initialPosition  Instance of [[KinesisInitialPosition]]
+ * In the absence of Kinesis checkpoint 
info, this is the
--- End diff --

nit: indentation


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-12-24 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r158627744
  
--- Diff: 
external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPositions.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis;
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+
+import java.io.Serializable;
+import java.util.Date;
+
+/**
+ * A java wrapper for exposing [[InitialPositionInStream]]
+ * to the corresponding Kinesis readers.
+ */
+interface KinesisInitialPosition {
+InitialPositionInStream getPosition();
+}
+
+public class KinesisInitialPositions {
+public static class Latest implements KinesisInitialPosition, 
Serializable {
+public Latest() {}
+
+@Override
+public InitialPositionInStream getPosition() {
+return InitialPositionInStream.LATEST;
+}
+}
+
+public static class TrimHorizon implements KinesisInitialPosition, 
Serializable {
+public TrimHorizon() {}
+
+@Override
+public InitialPositionInStream getPosition() {
+return InitialPositionInStream.TRIM_HORIZON;
+}
+}
+
+public static class AtTimestamp implements KinesisInitialPosition, 
Serializable {
+private Date timestamp;
+
+public AtTimestamp(Date timestamp) {
+this.timestamp = timestamp;
+}
+
+@Override
+public InitialPositionInStream getPosition() {
+return InitialPositionInStream.AT_TIMESTAMP;
+}
+
+public Date getTimestamp() {
+return timestamp;
+}
+}
+
+
+/**
+ * Returns instance of [[KinesisInitialPosition]] based on the passed 
[[InitialPositionInStream]].
+ * This method is used in KinesisUtils for translating the 
InitialPositionInStream
+ * to InitialPosition. This function would be removed when we 
deprecate the KinesisUtils.
+ *
+ * @return [[InitialPosition]]
+ */
+public static KinesisInitialPosition fromKinesisInitialPosition(
+InitialPositionInStream initialPositionInStream) throws 
UnsupportedOperationException {
+if (initialPositionInStream == InitialPositionInStream.LATEST) {
+return new Latest();
+} else if (initialPositionInStream == 
InitialPositionInStream.TRIM_HORIZON) {
+return new TrimHorizon();
+} else {
+// InitialPositionInStream.AT_TIMESTAMP is not supported.
+// Use InitialPosition.atTimestamp(timestamp) instead.
+throw new UnsupportedOperationException(
+"Only InitialPositionInStream.LATEST and 
InitialPositionInStream.TRIM_HORIZON " +
+"supported in initialPositionInStream(). 
Please use the initialPosition() from " +
+"builder API in KinesisInputDStream for using 
InitialPositionInStream.AT_TIMESTAMP");
+}
+}
+}
--- End diff --

nit: new line


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-12-24 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r158627941
  
--- Diff: 
external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisInputDStreamBuilderSuite.java
 ---
@@ -45,18 +44,90 @@ public void testJavaKinesisDStreamBuilder() {
   .streamName(streamName)
   .endpointUrl(endpointUrl)
   .regionName(region)
-  .initialPositionInStream(initialPosition)
+  .initialPosition(initialPosition)
   .checkpointAppName(appName)
   .checkpointInterval(checkpointInterval)
   .storageLevel(storageLevel)
   .build();
 assert(kinesisDStream.streamName() == streamName);
 assert(kinesisDStream.endpointUrl() == endpointUrl);
 assert(kinesisDStream.regionName() == region);
-assert(kinesisDStream.initialPositionInStream() == initialPosition);
+assert(kinesisDStream.initialPosition().getPosition() == 
initialPosition.getPosition());
+assert(kinesisDStream.checkpointAppName() == appName);
+assert(kinesisDStream.checkpointInterval() == checkpointInterval);
+assert(kinesisDStream._storageLevel() == storageLevel);
+ssc.stop();
+  }
+
+  /**
+   * Test to ensure that the old API for InitialPositionInStream
+   * is supported in KinesisDStream.Builder.
+   * This test would be removed when we deprecate the KinesisUtils.
+   */
+  @Test
+  public void testJavaKinesisDStreamBuilderOldApi() {
+String streamName = "a-very-nice-stream-name";
+String endpointUrl = "https://kinesis.us-west-2.amazonaws.com";
+String region = "us-west-2";
+String appName = "a-very-nice-kinesis-app";
+Duration checkpointInterval = Seconds.apply(30);
+StorageLevel storageLevel = StorageLevel.MEMORY_ONLY();
+
+KinesisInputDStream<byte[]> kinesisDStream = 
KinesisInputDStream.builder()
+.streamingContext(ssc)
--- End diff --

nit: indentation should be 2 spaces.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19975: [SPARK-22781][SS] Support creating streaming dataset wit...

2017-12-18 Thread brkyvz
Github user brkyvz commented on the issue:

https://github.com/apache/spark/pull/19975
  
This LGTM. @zsxwing Any other comments?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18029: [SPARK-20168] [DStream] Add changes to use kinesis fetch...

2017-12-18 Thread brkyvz
Github user brkyvz commented on the issue:

https://github.com/apache/spark/pull/18029
  
Making them singletons is unnecessary. How about this:

```java
public interface InitialPosition {
public InitialPositionInStream toKinesis();
}

public class InitialPositions {
public static class Latest implements InitialPosition {
public Latest() {}

@Override
public InitialPositionInStream toKinesis() {
return InitialPositionInStream.LATEST;
}
}

public static class TrimHorizon implements InitialPosition {
public TrimHorizon() {}

@Override
public InitialPositionInStream toKinesis() {
return InitialPositionInStream.TRIM_HORIZON;
}
}

public static class AtTimestamp implements InitialPosition {
private Date timestamp;

public AtTimestamp(Date timestamp) {
this.timestamp = timestamp;
}

@Override
public InitialPositionInStream toKinesis() {
return InitialPositionInStream.AT_TIMESTAMP;
}

public Date getTimestamp() {
return timestamp;
}
}
}
```
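
Usage sketch of the proposed classes (hypothetical call sites; the point is that `AtTimestamp` carries its own `Date`, so plain instances work and no singletons are needed):

```scala
val latest: InitialPosition = new InitialPositions.Latest()
val atTime: InitialPosition = new InitialPositions.AtTimestamp(new java.util.Date())

println(latest.toKinesis)  // InitialPositionInStream.LATEST
atTime match {
  case ts: InitialPositions.AtTimestamp => println(ts.getTimestamp)  // the wrapped Date
  case other => println(other.toKinesis)
}
```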


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18029: [SPARK-20168] [DStream] Add changes to use kinesis fetch...

2017-12-14 Thread brkyvz
Github user brkyvz commented on the issue:

https://github.com/apache/spark/pull/18029
  
Actually yeah, I like your way.

On Dec 14, 2017 3:08 PM, "yashs360" <notificati...@github.com> wrote:

> *@yashs360* commented on this pull request.
> --
>
> In external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/
> InitialPosition.scala
> <https://github.com/apache/spark/pull/18029#discussion_r157086878>:
>
> > +import java.util.Date
> +
> +import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
> +
> +/**
> + * Trait for Kinesis's InitialPositionInStream.
> + * This will be overridden by more specific types.
> + */
> +sealed trait InitialPosition {
> +  val initialPositionInStream: InitialPositionInStream
> +}
> +
> +/**
> + * Case object for Kinesis's InitialPositionInStream.LATEST.
> + */
> +case object Latest extends InitialPosition {
>
> Hi @brkyvz <https://github.com/brkyvz> , Thanks for the review.
> Are you suggesting to put everything into a new object. And refer the case
> objects from the java class methods?
> In that case is it better to create the objects in Java and expose them
> directly, since we will have cases where we will need direct access to the
> case objects/classes (instead of the java methods) like one of the test
> cases:
> initialPosition.asInstanceOf[AtTimestamp].timestamp
>
> I would create a new branch with the changes and share with you if its
> fine ?
>


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...

2017-12-14 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19926#discussion_r157044691
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 ---
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, 
LogicalPlan}
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.sources.v2.reader.Offset
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.util.{Clock, Utils}
+
+class MicroBatchExecution(
+sparkSession: SparkSession,
+name: String,
+checkpointRoot: String,
+analyzedPlan: LogicalPlan,
+sink: Sink,
+trigger: Trigger,
+triggerClock: Clock,
+outputMode: OutputMode,
+deleteCheckpointOnStop: Boolean)
+  extends StreamExecution(
+sparkSession, name, checkpointRoot, analyzedPlan, sink,
+trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+  private val triggerExecutor = trigger match {
+case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
+case OneTimeTrigger => OneTimeExecutor()
+case _ => throw new IllegalStateException(s"Unknown type of trigger: 
$trigger")
+  }
+
+  override lazy val logicalPlan: LogicalPlan = {
+assert(queryExecutionThread eq Thread.currentThread,
+  "logicalPlan must be initialized in QueryExecutionThread " +
+s"but the current thread was ${Thread.currentThread}")
+var nextSourceId = 0L
+val toExecutionRelationMap = MutableMap[StreamingRelation, 
StreamingExecutionRelation]()
+val _logicalPlan = analyzedPlan.transform {
+  case streamingRelation@StreamingRelation(dataSource, _, output) =>
+toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
+  // Materialize source to avoid creating it in every batch
+  val metadataPath = 
s"$resolvedCheckpointRoot/sources/$nextSourceId"
+  val source = dataSource.createSource(metadataPath)
+  nextSourceId += 1
+  // We still need to use the previous `output` instead of 
`source.schema` as attributes in
+  // "df.logicalPlan" has already used attributes of the previous 
`output`.
+  StreamingExecutionRelation(source, output)(sparkSession)
+})
+}
+sources = _logicalPlan.collect { case s: StreamingExecutionRelation => 
s.source }
+uniqueSources = sources.distinct
+_logicalPlan
+  }
+
+  /**
+   * Repeatedly attempts to run batches as data arrives.
+   */
+  protected def runActivatedStream(sparkSessionForStream: SparkSession): 
Unit = {
+triggerExecutor.execute(() => {
+  startTrigger()
+
+  if (isActive) {
+reportTimeTaken("triggerExecution") {
+  if (currentBatchId < 0) {
+// We'll do this initialization only once
+populateStartOffsets(sparkSessionForStream)
+
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
+logDebug(s"Stream running from $committedOffsets to 
$availableOffsets")
+  } else {
+constructNextBatch()
+  }
+  if (dataAvailable) {
+currentStatus = currentStatus.copy(isDataAvailable = true)
+updateStatusMessage("Processing new data")

[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-12-11 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r156266325
  
--- Diff: 
external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPosition.java
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis;
+
+import java.util.Date;
+
+/**
+ * A java wrapper for org.apache.spark.streaming.kinesis.InitialPosition
+ * to expose the corresponding scala objects for InitialPositionInStream.
+ * The functions are intentionally Upper cased to appear like classes for
+ * usage in Java classes.
+ */
+public class KinesisInitialPosition {
--- End diff --

please add `@InterfaceStability.Evolving` above


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-12-11 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r156265997
  
--- Diff: 
external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala
 ---
@@ -101,12 +102,60 @@ class KinesisInputDStreamBuilderSuite extends 
TestSuiteBase with BeforeAndAfterE
   .build()
 assert(dstream.endpointUrl == customEndpointUrl)
 assert(dstream.regionName == customRegion)
-assert(dstream.initialPositionInStream == customInitialPosition)
+assert(dstream.initialPosition == customInitialPosition)
 assert(dstream.checkpointAppName == customAppName)
 assert(dstream.checkpointInterval == customCheckpointInterval)
 assert(dstream._storageLevel == customStorageLevel)
 assert(dstream.kinesisCreds == customKinesisCreds)
 assert(dstream.dynamoDBCreds == Option(customDynamoDBCreds))
 assert(dstream.cloudWatchCreds == Option(customCloudWatchCreds))
+
+// Testing with AtTimestamp
+val cal = Calendar.getInstance()
+cal.add(Calendar.DATE, -1)
+val timestamp = cal.getTime()
+val initialPositionAtTimestamp = AtTimestamp(timestamp)
+
+val dstreamAtTimestamp = builder
+  .endpointUrl(customEndpointUrl)
+  .regionName(customRegion)
+  .initialPosition(initialPositionAtTimestamp)
+  .checkpointAppName(customAppName)
+  .checkpointInterval(customCheckpointInterval)
+  .storageLevel(customStorageLevel)
+  .kinesisCredentials(customKinesisCreds)
+  .dynamoDBCredentials(customDynamoDBCreds)
+  .cloudWatchCredentials(customCloudWatchCreds)
+  .build()
+assert(dstreamAtTimestamp.endpointUrl == customEndpointUrl)
+assert(dstreamAtTimestamp.regionName == customRegion)
+assert(dstreamAtTimestamp.initialPosition.initialPositionInStream
+  == initialPositionAtTimestamp.initialPositionInStream)
+assert(
+  
dstreamAtTimestamp.initialPosition.asInstanceOf[AtTimestamp].timestamp.equals(timestamp))
+assert(dstreamAtTimestamp.checkpointAppName == customAppName)
+assert(dstreamAtTimestamp.checkpointInterval == 
customCheckpointInterval)
+assert(dstreamAtTimestamp._storageLevel == customStorageLevel)
+assert(dstreamAtTimestamp.kinesisCreds == customKinesisCreds)
+assert(dstreamAtTimestamp.dynamoDBCreds == Option(customDynamoDBCreds))
+assert(dstreamAtTimestamp.cloudWatchCreds == 
Option(customCloudWatchCreds))
+
+// Testing with AtTimestamp
+val initialPositionAtTimestamp2 = AtTimestamp(timestamp)
--- End diff --

how are the following lines a different test?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-12-11 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r156266783
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.Date
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+/**
+ * Trait for Kinesis's InitialPositionInStream.
+ * This will be overridden by more specific types.
+ */
+sealed trait InitialPosition {
+  val initialPositionInStream: InitialPositionInStream
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.LATEST.
+ */
+case object Latest extends InitialPosition {
--- End diff --

can you wrap all of these in an object?
```scala
sealed trait InitialPosition {
  ...
}

object internal {
  case object Latest extends InitialPosition {
  }
  ...
  case class AtTimestamp(timestamp: Date) extends InitialPosition {
  }
}
```
Note how InitialPosition is outside, and `internal` is lowercase.

so that people go only through the Java interface 
(`org.apache.spark.streaming.kinesis.Latest()`), etc.

Your documentation and test cases go through the Scala interface, which 
makes it confusing to have two public entry points for the same thing.
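
For illustration, a slightly fuller sketch of that layout. Everything 
beyond the names visible in the diff (the `TrimHorizon` member and the 
bodies of the case objects) is an assumption, not the actual patch, and 
it needs the Kinesis client library on the classpath:
```scala
import java.util.Date

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

sealed trait InitialPosition {
  val initialPositionInStream: InitialPositionInStream
}

// Lowercase `internal` keeps the Scala case objects/classes out of the
// public surface, so callers go through the Java-facing wrappers instead.
object internal {

  case object Latest extends InitialPosition {
    override val initialPositionInStream: InitialPositionInStream =
      InitialPositionInStream.LATEST
  }

  case object TrimHorizon extends InitialPosition {
    override val initialPositionInStream: InitialPositionInStream =
      InitialPositionInStream.TRIM_HORIZON
  }

  case class AtTimestamp(timestamp: Date) extends InitialPosition {
    override val initialPositionInStream: InitialPositionInStream =
      InitialPositionInStream.AT_TIMESTAMP
  }
}
```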


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...

2017-12-08 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19925#discussion_r155834182
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchWriteSupport.java
 ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import java.util.Optional;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
+import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
+ * provide data writing ability and save the data from a microbatch to the 
data source.
+ */
+@InterfaceStability.Evolving
+public interface MicroBatchWriteSupport extends BaseStreamingSink {
+
+  /**
+   * Creates an optional {@link DataSourceV2Writer} to save the data to 
this data source. Data
+   * sources can return None if there is no writing needed to be done.
+   *
+   * @param queryId A unique string for the writing query. It's possible 
that there are many writing
+   *queries running at the same time, and the returned 
{@link DataSourceV2Writer}
+   *can use this id to distinguish itself from others.
+   * @param epochId The unique numeric ID of the batch within this writing 
query. This is an
+   *incrementing counter representing a consistent set of 
data; the same batch may
+   *be started multiple times in failure recovery 
scenarios, but it will always
+   *contain the same records.
+   * @param schema the schema of the data to be written.
+   * @param mode the output mode which determines what successive batch 
output means to this
+   * source, please refer to {@link OutputMode} for more 
details.
--- End diff --

to this **sink**? not source


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19838: [SPARK-22638][SS]Use a separate query for StreamingQuery...

2017-11-29 Thread brkyvz
Github user brkyvz commented on the issue:

https://github.com/apache/spark/pull/19838
  
LGTM but I have very limited context on this codepath. Maybe @tdas can also 
take a very quick look?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19838: [SPARK-22638][SS]Use a separate query for Streami...

2017-11-29 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19838#discussion_r153910748
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
 ---
@@ -40,7 +40,7 @@ class StreamingQueryListenerBus(sparkListenerBus: 
LiveListenerBus)
 
   import StreamingQueryListener._
 
-  sparkListenerBus.addToSharedQueue(this)
+  sparkListenerBus.addToQueue(this, "streams")
--- End diff --

nit: wanna make this a constant?
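
For illustration, a minimal sketch of the suggestion; the object and 
constant names here are hypothetical, not the actual change:
```scala
// Companion-style holder for the queue name, so the string literal
// "streams" lives in one place instead of at the call site.
object StreamingQueryListenerBus {
  val STREAM_EVENT_QUEUE: String = "streams"
}

// The call site would then read (shown as a comment, since it needs the
// surrounding class):
// sparkListenerBus.addToQueue(this, StreamingQueryListenerBus.STREAM_EVENT_QUEUE)
```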


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...

2017-10-17 Thread brkyvz
Github user brkyvz commented on the issue:

https://github.com/apache/spark/pull/19495
  
LGTM!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

2017-10-17 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19495#discussion_r145283034
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
 ---
@@ -1086,4 +1181,24 @@ object FlatMapGroupsWithStateSuite {
 override def metrics: StateStoreMetrics = new 
StateStoreMetrics(map.size, 0, Map.empty)
 override def hasCommitted: Boolean = true
   }
+
+  def assertCanGetProcessingTime(predicate: => Boolean): Unit = {
+if (!predicate) throw new TestFailedException("Could not get 
processing time", 20)
--- End diff --

what is the `20`?
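
If the second argument is ScalaTest's `failedCodeStackDepth`, a named 
value would answer that at the call site. A minimal sketch, assuming that 
is what the `20` controls; the constant and object names are made up:
```scala
import org.scalatest.exceptions.TestFailedException

object ProcessingTimeAssertions {
  // Hypothetical name for the stack-depth argument, so the magic number
  // is explained where it is used.
  private val FailedCodeStackDepth = 20

  def assertCanGetProcessingTime(predicate: => Boolean): Unit = {
    if (!predicate) {
      throw new TestFailedException("Could not get processing time", FailedCodeStackDepth)
    }
  }
}
```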


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

2017-10-17 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19495#discussion_r145282388
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
 ---
@@ -61,6 +61,10 @@ case class FlatMapGroupsWithStateExec(
 
   private val isTimeoutEnabled = timeoutConf != NoTimeout
   val stateManager = new FlatMapGroupsWithState_StateManager(stateEncoder, 
isTimeoutEnabled)
+  val watermarkPresent = child.output.exists {
--- End diff --

this is cleaner, but doesn't `eventTimeWatermark.isDefined` imply that the 
watermark is present?
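
If so, a one-line derivation would presumably do. A sketch only, assuming 
the operator carries an `Option[Long]` watermark field as the diff 
suggests; the helper here is hypothetical:
```scala
object WatermarkPresence {
  // Hypothetical standalone version of the check: given the operator's
  // optional event-time watermark, presence is simply `isDefined`.
  def watermarkPresent(eventTimeWatermark: Option[Long]): Boolean =
    eventTimeWatermark.isDefined
}
```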


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

2017-10-17 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19495#discussion_r145282515
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
 ---
@@ -119,32 +116,34 @@ private[sql] class GroupStateImpl[S] private(
 timeoutTimestamp = timestampMs
   }
 
-  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
-  @throws[IllegalStateException]("when state is either not initialized, or 
already removed")
-  @throws[UnsupportedOperationException](
-"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in 
a streaming query")
   override def setTimeoutTimestamp(timestampMs: Long, additionalDuration: 
String): Unit = {
 checkTimeoutTimestampAllowed()
 setTimeoutTimestamp(parseDuration(additionalDuration) + timestampMs)
   }
 
-  @throws[IllegalStateException]("when state is either not initialized, or 
already removed")
-  @throws[UnsupportedOperationException](
-"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in 
a streaming query")
   override def setTimeoutTimestamp(timestamp: Date): Unit = {
 checkTimeoutTimestampAllowed()
 setTimeoutTimestamp(timestamp.getTime)
   }
 
-  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
-  @throws[IllegalStateException]("when state is either not initialized, or 
already removed")
-  @throws[UnsupportedOperationException](
-"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in 
a streaming query")
   override def setTimeoutTimestamp(timestamp: Date, additionalDuration: 
String): Unit = {
 checkTimeoutTimestampAllowed()
 setTimeoutTimestamp(timestamp.getTime + 
parseDuration(additionalDuration))
   }
 
+  override def getCurrentWatermarkMs(): Long = {
+if (!watermarkPresent) {
+  throw new UnsupportedOperationException(
+"Cannot get event time watermark timestamp without enabling 
setting watermark before " +
--- End diff --

`without enabling setting watermark` sounds too convoluted. You probably 
meant `without setting watermark`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

2017-10-17 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19495#discussion_r145282877
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala ---
@@ -205,92 +205,122 @@ trait GroupState[S] extends LogicalGroupState[S] {
   /** Get the state value as a scala Option. */
   def getOption: Option[S]
 
-  /**
-   * Update the value of the state. Note that `null` is not a valid value, 
and it throws
-   * IllegalArgumentException.
-   */
-  @throws[IllegalArgumentException]("when updating with null")
+  /** Update the value of the state. */
   def update(newState: S): Unit
 
   /** Remove this state. */
   def remove(): Unit
 
   /**
* Whether the function has been called because the key has timed out.
-   * @note This can return true only when timeouts are enabled in 
`[map/flatmap]GroupsWithStates`.
+   * @note This can return true only when timeouts are enabled in 
`[map/flatmap]GroupsWithState`.
*/
   def hasTimedOut: Boolean
 
+
   /**
* Set the timeout duration in ms for this key.
*
-   * @note ProcessingTimeTimeout must be enabled in 
`[map/flatmap]GroupsWithStates`.
+   * @note [[GroupStateTimeout Processing time timeout]] must be enabled in
+   *   `[map/flatmap]GroupsWithState` for calling this method.
+   * @note This method has no effect when used in a batch query.
*/
   @throws[IllegalArgumentException]("if 'durationMs' is not positive")
-  @throws[IllegalStateException]("when state is either not initialized, or 
already removed")
   @throws[UnsupportedOperationException](
-"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in 
a streaming query")
+"if processing time timeout has not been enabled in 
[map|flatMap]GroupsWithState")
   def setTimeoutDuration(durationMs: Long): Unit
 
+
   /**
* Set the timeout duration for this key as a string. For example, "1 
hour", "2 days", etc.
*
-   * @note ProcessingTimeTimeout must be enabled in 
`[map/flatmap]GroupsWithStates`.
+   * @note [[GroupStateTimeout Processing time timeout]] must be enabled in
+   *   `[map/flatmap]GroupsWithState` for calling this method.
+   * @note This method has no effect when used in a batch query.
*/
   @throws[IllegalArgumentException]("if 'duration' is not a valid 
duration")
-  @throws[IllegalStateException]("when state is either not initialized, or 
already removed")
   @throws[UnsupportedOperationException](
-"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in 
a streaming query")
+"if processing time timeout has not been enabled in 
[map|flatMap]GroupsWithState")
   def setTimeoutDuration(duration: String): Unit
 
-  @throws[IllegalArgumentException]("if 'timestampMs' is not positive")
-  @throws[IllegalStateException]("when state is either not initialized, or 
already removed")
-  @throws[UnsupportedOperationException](
-"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in 
a streaming query")
+
   /**
* Set the timeout timestamp for this key as milliseconds in epoch time.
* This timestamp cannot be older than the current watermark.
*
-   * @note EventTimeTimeout must be enabled in 
`[map/flatmap]GroupsWithStates`.
+   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
+   *   `[map/flatmap]GroupsWithState` for calling this method.
+   * @note This method has no effect when used in a batch query.
*/
+  @throws[IllegalArgumentException](
+"if 'timestampMs' is not positive or less than the current watermark 
in a streaming query")
+  @throws[UnsupportedOperationException](
+"if processing time timeout has not been enabled in 
[map|flatMap]GroupsWithState")
   def setTimeoutTimestamp(timestampMs: Long): Unit
 
-  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
-  @throws[IllegalStateException]("when state is either not initialized, or 
already removed")
-  @throws[UnsupportedOperationException](
-"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in 
a streaming query")
+
   /**
* Set the timeout timestamp for this key as milliseconds in epoch time 
and an additional
* duration as a string (e.g. "1 hour", "2 days", etc.).
* The final timestamp (including the additional duration) cannot be 
older than the
* current watermark.
*
-   * @note EventTimeTimeout must be enabled in 
`[map/flat

[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...

2017-10-16 Thread brkyvz
Github user brkyvz commented on the issue:

https://github.com/apache/spark/pull/19495
  
LGTM. Just a bunch of cosmetic nits, but fine to address them separately


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


