[GitHub] spark issue #20549: SPARK-18844[MLLIB] Add more binary classification metric...

2018-02-08 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20549
  
**[Test build #4089 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4089/testReport)**
 for PR 20549 at commit 
[`d7144f6`](https://github.com/apache/spark/commit/d7144f63a99e575d5c996fd7919bdbe44266620f).
 * This patch **fails Scala style tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #20549: SPARK-18844[MLLIB] Add more binary classification metric...

2018-02-08 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20549
  
**[Test build #4089 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4089/testReport)**
 for PR 20549 at commit 
[`d7144f6`](https://github.com/apache/spark/commit/d7144f63a99e575d5c996fd7919bdbe44266620f).


---




[GitHub] spark issue #20549: SPARK-18844[MLLIB] Add more binary classification metric...

2018-02-08 Thread srowen
Github user srowen commented on the issue:

https://github.com/apache/spark/pull/20549
  
I can run the tests for you, but I'm not sure this would be merged even if it 
passes. (You won't have permissions to let the tests run or whitelist yourself.)


---




[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

2018-02-08 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20490
  
**[Test build #87244 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87244/testReport)**
 for PR 20490 at commit 
[`e9964ca`](https://github.com/apache/spark/commit/e9964ca2fc831819662056210db594f613bce5d0).


---




[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

2018-02-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20490
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/737/
Test PASSed.


---




[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

2018-02-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20490
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

2018-02-08 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/20490
  
just type "retest this please"


---




[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

2018-02-08 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/20490
  
retest this please


---




[GitHub] spark issue #20541: [SPARK-23356][SQL]Pushes Project to both sides of Union ...

2018-02-08 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/20541
  
I'm confused about why we need `PushProjectionThroughUnion`. Generally we 
only need to push down the required columns, not the entire project list, as there 
is no benefit in doing that. I think we just need to handle `Union` in the 
`ColumnPruning` rule, but I may be missing something. cc @gatorsmile 
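
As a quick illustration of the behaviour being discussed, here is a minimal 
spark-shell sketch (it assumes a local `SparkSession` named `spark`; the column 
names are made up): selecting a subset of columns from a union should end up 
pruning the unused columns on both children of the `Union` in the optimized plan.

```scala
// Hypothetical example, not code from this PR.
val df1 = spark.range(10).selectExpr("id AS a", "id * 2 AS b")
val df2 = spark.range(10).selectExpr("id AS a", "id * 3 AS b")

// Only `a` is required downstream, so the optimizer should push a projection of `a`
// to both sides of the Union and prune `b` before the rows are combined.
df1.union(df2).select("a").explain(true)
```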


---




[GitHub] spark issue #20555: [SPARK-23366] Improve hot reading path in ReadAheadInput...

2018-02-08 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20555
  
**[Test build #87243 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87243/testReport)**
 for PR 20555 at commit 
[`b26ffce`](https://github.com/apache/spark/commit/b26ffce6780078dbc38bff658e1ef7e9c56c3dd8).


---




[GitHub] spark issue #20555: [SPARK-23366] Improve hot reading path in ReadAheadInput...

2018-02-08 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/20555
  
cc @kiszk @sitalkedia @zsxwing 


---




[GitHub] spark issue #20555: [SPARK-23366] Improve hot reading path in ReadAheadInput...

2018-02-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20555
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20555: [SPARK-23366] Improve hot reading path in ReadAheadInput...

2018-02-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20555
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/736/
Test PASSed.


---




[GitHub] spark pull request #20555: [SPARK-23366] Improve hot reading path in ReadAhe...

2018-02-08 Thread juliuszsompolski
GitHub user juliuszsompolski opened a pull request:

https://github.com/apache/spark/pull/20555

[SPARK-23366] Improve hot reading path in ReadAheadInputStream

## What changes were proposed in this pull request?

`ReadAheadInputStream` was introduced in 
https://github.com/apache/spark/pull/18317/ to optimize reading spill files 
from disk.
However, the profiles suggest that the hot path of reading small amounts of 
data (like readInt) is inefficient: it involves taking locks and multiple checks.

Optimize locking: the lock is not needed when simply accessing the active 
buffer. Only lock when swapping buffers, triggering an async read, or getting 
information about the async state.

Optimize the short path for single-byte reads, which is used e.g. by the Java 
library's DataInputStream.readInt.

The asyncReader used to call "read" only once on the underlying stream, 
which never filled the underlying buffer when it was wrapping an 
LZ4BlockInputStream. If the buffer was returned unfilled, the async reader would 
be triggered on every call to fill the read-ahead buffer, because the reader 
would always see the active buffer below the refill threshold.

However, always filling the full buffer could increase latency, so also add an 
`AtomicBoolean` flag that lets the async reader return early when a reader is 
waiting for data.
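
A minimal sketch of that fill loop (hypothetical names, no Spark dependency; 
this is not the actual ReadAheadInputStream code): keep reading until the buffer 
is full or EOF, but bail out early once some data is available and a reader is 
waiting for it.

```scala
import java.io.InputStream
import java.util.concurrent.atomic.AtomicBoolean

object ReadAheadFill {
  /** Fills `buf` from `in`; returns the number of bytes filled. */
  def fill(in: InputStream, buf: Array[Byte], readerWaiting: AtomicBoolean): Int = {
    var filled = 0
    var eof = false
    // Keep reading until the buffer is full or EOF, but stop early once some data
    // is available and a reader is blocked on it, trading buffer fullness for latency.
    while (filled < buf.length && !eof && !(filled > 0 && readerWaiting.get())) {
      val n = in.read(buf, filled, buf.length - filled) // may return fewer bytes than asked
      if (n < 0) eof = true else filled += n
    }
    filled
  }
}
```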

Remove `readAheadThresholdInBytes` and instead trigger the async read 
immediately when swapping the buffers. This simplifies the code paths, especially 
the hot one, which then only has to check whether there is data available in the 
active buffer, without worrying about retriggering the async read. It seems to 
have a positive effect on performance.
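
A simplified, self-contained sketch of the resulting hot path (hypothetical 
`DoubleBufferedStream` class, not the actual ReadAheadInputStream; the refill is 
done synchronously here to keep the sketch runnable, whereas the real change 
triggers an asynchronous read when the buffers are swapped):

```scala
import java.io.InputStream
import java.nio.ByteBuffer
import java.util.concurrent.locks.ReentrantLock

class DoubleBufferedStream(underlying: InputStream, bufSize: Int) {
  private var active: ByteBuffer = ByteBuffer.allocate(bufSize)
  private var readAhead: ByteBuffer = ByteBuffer.allocate(bufSize)
  active.limit(0)     // start empty so the first read falls into the slow path
  readAhead.limit(0)
  private val lock = new ReentrantLock()

  /** Single-byte read, as used e.g. by DataInputStream.readInt. */
  def read(): Int =
    if (active.hasRemaining) active.get() & 0xFF // hot path: one check, no lock
    else readSlow()

  /** Slow path: refill and swap the buffers under the lock, then serve the byte. */
  private def readSlow(): Int = {
    lock.lock()
    try {
      readAhead.clear()
      val n = underlying.read(readAhead.array(), 0, readAhead.capacity())
      if (n < 0) -1 // EOF
      else {
        readAhead.limit(n)
        val tmp = active; active = readAhead; readAhead = tmp // swap buffers
        active.get() & 0xFF
      }
    } finally {
      lock.unlock()
    }
  }
}
```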

## How was this patch tested?

It was noticed as a regression in some workloads after upgrading to Spark 
2.3. 

It was particularly visible on TPCDS Q95 running on instances with fast 
disks (AWS i3 instances).
Running with profiling:
* Spark 2.2: 5.2-5.3 minutes, 9.5% in LZ4BlockInputStream.read
* Spark 2.3: 6.4-6.6 minutes, 31.1% in ReadAheadInputStream.read
* Spark 2.3 + fix: 5.3-5.4 minutes, 13.3% in ReadAheadInputStream.read - 
very slightly slower, practically within noise.

We didn't see other regressions, and many workloads in general seem to be 
faster with Spark 2.3 (we have not investigated whether that is thanks to the 
async reader or unrelated).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/juliuszsompolski/apache-spark SPARK-23366

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20555.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20555


commit 987f15ccb01b6c0351fbfdd49d6930b929c50a74
Author: Juliusz Sompolski 
Date:   2018-01-30T20:54:47Z

locking tweak

commit b26ffce6780078dbc38bff658e1ef7e9c56c3dd8
Author: Juliusz Sompolski 
Date:   2018-02-01T14:27:09Z

fill the read ahead buffer




---




[GitHub] spark pull request #20516: [SPARK-23343][CORE][TEST] Increase the exception ...

2018-02-08 Thread heary-cao
Github user heary-cao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20516#discussion_r167134346
  
--- Diff: core/src/test/scala/org/apache/spark/SparkFunSuite.scala ---
@@ -59,6 +59,7 @@ abstract class SparkFunSuite
   protected val enableAutoThreadAudit = true
 
   protected override def beforeAll(): Unit = {
+System.setProperty("spark.testing", "true")
--- End diff --

If this property is not set, `sys.props.contains("spark.testing")` is false.


---




[GitHub] spark issue #20477: [SPARK-23303][SQL] improve the explain result for data s...

2018-02-08 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20477
  
**[Test build #87242 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87242/testReport)**
 for PR 20477 at commit 
[`0cc0600`](https://github.com/apache/spark/commit/0cc0600b8f6f3a46189ae38850835f34b57bd945).


---




[GitHub] spark pull request #20537: [SPARK-23314][PYTHON] Add ambiguous=False when lo...

2018-02-08 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20537#discussion_r167133597
  
--- Diff: python/pyspark/sql/types.py ---
@@ -1744,8 +1744,27 @@ def _check_series_convert_timestamps_internal(s, 
timezone):
 from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
 # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
 if is_datetime64_dtype(s.dtype):
+# tz_localize with ambiguous=False has the same behavior of 
pytz.localize
+# >>> import datetime
+# >>> import pandas as pd
+# >>> import pytz
+# >>>
+# >>> t = datetime.datetime(2015, 11, 1, 1, 23, 24)
+# >>> ts = pd.Series([t])
+# >>> tz = pytz.timezone('America/New_York')
+# >>>
+# >>> ts.dt.tz_localize(tz, ambiguous=False)
+# 0   2015-11-01 01:23:24-05:00
+# dtype: datetime64[ns, America/New_York]
+# >>>
+# >>> ts.dt.tz_localize(tz, ambiguous=True)
+# 0   2015-11-01 01:23:24-04:00
+# dtype: datetime64[ns, America/New_York]
+# >>>
+# >>> str(tz.localize(t))
+# '2015-11-01 01:23:24-05:00'
--- End diff --

@icexelloss, I get that it's good to know, but shall we describe it in prose? 
This comment looks like a doctest, but it is actually just a comment.

It would be nicer to just have an explanation in the comments rather than in a 
doctest format.


---




[GitHub] spark issue #20477: [SPARK-23303][SQL] improve the explain result for data s...

2018-02-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20477
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/735/
Test PASSed.


---




[GitHub] spark issue #20477: [SPARK-23303][SQL] improve the explain result for data s...

2018-02-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20477
  
Merged build finished. Test PASSed.


---




[GitHub] spark pull request #20516: [SPARK-23343][CORE][TEST] Increase the exception ...

2018-02-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20516#discussion_r167133408
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -,7 +,7 @@ private[spark] object Utils extends Logging {
*/
   def portMaxRetries(conf: SparkConf): Int = {
 val maxRetries = conf.getOption("spark.port.maxRetries").map(_.toInt)
-if (conf.contains("spark.testing")) {
+if (isTesting || conf.contains("spark.testing")) {
--- End diff --

shall we just call `isTesting` here?


---




[GitHub] spark pull request #20516: [SPARK-23343][CORE][TEST] Increase the exception ...

2018-02-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20516#discussion_r167133433
  
--- Diff: core/src/test/scala/org/apache/spark/SparkFunSuite.scala ---
@@ -59,6 +59,7 @@ abstract class SparkFunSuite
   protected val enableAutoThreadAudit = true
 
   protected override def beforeAll(): Unit = {
+System.setProperty("spark.testing", "true")
--- End diff --

why do we need this?


---




[GitHub] spark issue #20516: [SPARK-23343][CORE][TEST] Increase the exception test fo...

2018-02-08 Thread heary-cao
Github user heary-cao commented on the issue:

https://github.com/apache/spark/pull/20516
  
@cloud-fan thank you for the suggestion.
`./project/SparkBuild.scala:795: javaOptions in Test += "-Dspark.testing=1"` 
seems to take effect only for Spark's own SBT build and has no effect on the 
SparkFunSuite unit tests. I updated this PR to provide a fix for it.
Could you help me review it?
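
For reference, a small stand-alone sketch (plain Scala, no Spark dependency; 
`TestingFlagCheck` is a hypothetical name) of the distinction being discussed. It 
loosely mirrors the idea behind `Utils.isTesting`: the flag is read from JVM 
system properties or the environment, so setting it only in SBT's forked-test 
options does not make it visible to tests launched some other way.

```scala
object TestingFlagCheck {
  // Loosely mirrors the idea behind Utils.isTesting: check JVM-wide settings,
  // not a SparkConf instance.
  def isTesting: Boolean =
    sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")

  def main(args: Array[String]): Unit = {
    println(isTesting) // false unless -Dspark.testing (or SPARK_TESTING) was passed to this JVM
    System.setProperty("spark.testing", "true") // what the proposed beforeAll() change does
    println(isTesting) // now true for everything running in this JVM
  }
}
```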


---




[GitHub] spark issue #20552: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2

2018-02-08 Thread jose-torres
Github user jose-torres commented on the issue:

https://github.com/apache/spark/pull/20552
  
It's my intent to say that other data sources built by general developers 
aren't supposed to use batch IDs in the executors for any purpose. In addition 
to the issue you mentioned, I don't think there's a compelling reason to do so 
in the DataSourceV2 model, and I worry that it's easy to write implementations 
that seem correct but aren't.

Since this interface is still evolving, I think it makes sense to revisit 
the question if we notice a scenario where it's infeasible to rewrite a piece 
of transactional logic to not use the batch ID in the executor.


---




[GitHub] spark issue #20552: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2

2018-02-08 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20552
  
**[Test build #87241 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87241/testReport)**
 for PR 20552 at commit 
[`a33a35c`](https://github.com/apache/spark/commit/a33a35ccbae7350519a3faf8d5d3d6f35692feb3).


---




[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

2018-02-08 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20552#discussion_r167126862
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
 ---
@@ -255,6 +255,32 @@ class ForeachSinkSuite extends StreamTest with 
SharedSQLContext with BeforeAndAf
   query.stop()
 }
   }
+
--- End diff --

Good instinct, it didn't quite work. Added the test.


---




[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

2018-02-08 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20552#discussion_r167126838
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
 ---
@@ -17,52 +17,119 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, 
ObjectInputStream, ObjectOutputStream}
+
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
-import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{encoderFor, 
ExpressionEncoder}
+import 
org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, 
StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
 
-/**
- * A [[Sink]] that forwards all data into [[ForeachWriter]] according to 
the contract defined by
- * [[ForeachWriter]].
- *
- * @param writer The [[ForeachWriter]] to process all data.
- * @tparam T The expected type of the sink.
- */
-class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with 
Serializable {
-
-  override def addBatch(batchId: Long, data: DataFrame): Unit = {
-// This logic should've been as simple as:
-// ```
-//   data.as[T].foreachPartition { iter => ... }
-// ```
-//
-// Unfortunately, doing that would just break the incremental planing. 
The reason is,
-// `Dataset.foreachPartition()` would further call `Dataset.rdd()`, 
but `Dataset.rdd()` will
-// create a new plan. Because StreamExecution uses the existing plan 
to collect metrics and
-// update watermark, we should never create a new plan. Otherwise, 
metrics and watermark are
-// updated in the new plan, and StreamExecution cannot retrieval them.
-//
-// Hence, we need to manually convert internal rows to objects using 
encoder.
+
+case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) 
extends StreamWriteSupport {
+  override def createStreamWriter(
+  queryId: String,
+  schema: StructType,
+  mode: OutputMode,
+  options: DataSourceOptions): StreamWriter = {
 val encoder = encoderFor[T].resolveAndBind(
-  data.logicalPlan.output,
-  data.sparkSession.sessionState.analyzer)
-data.queryExecution.toRdd.foreachPartition { iter =>
-  if (writer.open(TaskContext.getPartitionId(), batchId)) {
-try {
-  while (iter.hasNext) {
-writer.process(encoder.fromRow(iter.next()))
-  }
-} catch {
-  case e: Throwable =>
-writer.close(e)
-throw e
-}
-writer.close(null)
-  } else {
-writer.close(null)
+  schema.toAttributes,
+  SparkSession.getActiveSession.get.sessionState.analyzer)
+ForeachInternalWriter(writer, encoder)
+  }
+}
+
+case class ForeachInternalWriter[T: Encoder](
+writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
+extends StreamWriter with SupportsWriteInternalRow {
+  override def commit(epochId: Long, messages: 
Array[WriterCommitMessage]): Unit = {}
+  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): 
Unit = {}
+
+  override def createInternalRowWriterFactory(): 
DataWriterFactory[InternalRow] = {
+ForeachWriterFactory(writer, encoder)
+  }
+}
+
+case class ForeachWriterFactory[T: Encoder](writer: ForeachWriter[T], 
encoder: ExpressionEncoder[T])
+extends DataWriterFactory[InternalRow] {
+  override def createDataWriter(partitionId: Int, attemptNumber: Int): 
ForeachDataWriter[T] = {
+new ForeachDataWriter(writer, encoder, partitionId)
+  }
+}
+
+class ForeachDataWriter[T : Encoder](
+private var writer: ForeachWriter[T], encoder: ExpressionEncoder[T], 
partitionId: Int)
+extends DataWriter[InternalRow] {
+  private val initialEpochId: Long = {
+// Start with the microbatch ID. If it's not there, we're in 
continuous execution,
+// so get the start epoch.
+// This ID will be incremented as commits happen.
+TaskContext.get().getLocalProperty(MicroBatchExecution.BATCH_ID_KEY) 
match {
+  case null => 
TaskContext.get().getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+  case batch => batch.toLong
+}
+  

[GitHub] spark issue #17982: [SPARK-20395][BUILD] Update Scala to 2.11.11 and zinc to...

2018-02-08 Thread tovbinm
Github user tovbinm commented on the issue:

https://github.com/apache/spark/pull/17982
  
2.12 would be even better. I haven't seen a ticket related to the 2.12 upgrade. 
I can give it a try.


---




[GitHub] spark issue #20549: SPARK-18844[MLLIB] Add more binary classification metric...

2018-02-08 Thread sandecho
Github user sandecho commented on the issue:

https://github.com/apache/spark/pull/20549
  
@srowen: Will the result of the test not be posted?


---




[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

2018-02-08 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/20554
  
@jose-torres  @zsxwing please take a look.


---




[GitHub] spark issue #20545: [SPARK-23359][SQL] Adds an alias 'names' of 'fieldNames'...

2018-02-08 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20545
  
**[Test build #87240 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87240/testReport)**
 for PR 20545 at commit 
[`664a62c`](https://github.com/apache/spark/commit/664a62c7da9ba5da2007d40ef9c157f7e82938c5).


---




[GitHub] spark issue #20545: [SPARK-23359][SQL] Adds an alias 'names' of 'fieldNames'...

2018-02-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20545
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/734/
Test PASSed.


---




[GitHub] spark issue #20545: [SPARK-23359][SQL] Adds an alias 'names' of 'fieldNames'...

2018-02-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20545
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20545: [SPARK-23359][SQL] Adds an alias 'names' of 'fieldNames'...

2018-02-08 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/20545
  
retest this please


---




[GitHub] spark issue #20545: [SPARK-23359][SQL] Adds an alias 'names' of 'fieldNames'...

2018-02-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20545
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #20545: [SPARK-23359][SQL] Adds an alias 'names' of 'fieldNames'...

2018-02-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20545
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87235/
Test FAILed.


---




[GitHub] spark issue #17982: [SPARK-20395][BUILD] Update Scala to 2.11.11 and zinc to...

2018-02-08 Thread srowen
Github user srowen commented on the issue:

https://github.com/apache/spark/pull/17982
  
Unless it changed the internals of the interpreter to afford a different 
place to hack in the init, I don't think it is any different. However, the build 
and most tests already work with 2.12.


---




[GitHub] spark issue #20545: [SPARK-23359][SQL] Adds an alias 'names' of 'fieldNames'...

2018-02-08 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20545
  
**[Test build #87235 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87235/testReport)**
 for PR 20545 at commit 
[`664a62c`](https://github.com/apache/spark/commit/664a62c7da9ba5da2007d40ef9c157f7e82938c5).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r167124768
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
 ---
@@ -303,94 +302,75 @@ class KafkaMicroBatchSourceSuite extends 
KafkaSourceSuiteBase {
 )
   }
 
-  testWithUninterruptibleThread(
-"deserialization of initial offset with Spark 2.1.0") {
-withTempDir { metadataPath =>
-  val topic = newTopic
-  testUtils.createTopic(topic, partitions = 3)
-
-  val provider = new KafkaSourceProvider
-  val parameters = Map(
-"kafka.bootstrap.servers" -> testUtils.brokerAddress,
-"subscribe" -> topic
-  )
-  val source = provider.createSource(spark.sqlContext, 
metadataPath.getAbsolutePath, None,
-"", parameters)
-  source.getOffset.get // Write initial offset
-
-  // Make sure Spark 2.1.0 will throw an exception when reading the 
new log
-  intercept[java.lang.IllegalArgumentException] {
-// Simulate how Spark 2.1.0 reads the log
-Utils.tryWithResource(new 
FileInputStream(metadataPath.getAbsolutePath + "/0")) { in =>
-  val length = in.read()
-  val bytes = new Array[Byte](length)
-  in.read(bytes)
-  KafkaSourceOffset(SerializedOffset(new String(bytes, UTF_8)))
-}
-  }
-}
-  }
-
-  testWithUninterruptibleThread("deserialization of initial offset written 
by Spark 2.1.0") {
+  test("deserialization of initial offset written by Spark 2.1.0") {
 withTempDir { metadataPath =>
--- End diff --

Changed the two tests below to not use the source/reader directly (that is too 
low-level and implementation-dependent) and instead actually run a streaming 
query using sample initial offset files in `test/resources`.


---




[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r167124564
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
 ---
@@ -303,94 +302,75 @@ class KafkaMicroBatchSourceSuite extends 
KafkaSourceSuiteBase {
 )
   }
 
-  testWithUninterruptibleThread(
--- End diff --

I think this test is superfluous and does not test anything useful. As with 
the other modified tests, "simulating" an implementation is a BAD test, and in 
this particular case it is attempting to simulate the 2.1.0 log, which is not 
necessary any more.


---




[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r167124346
  
--- Diff: 
external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-future-version.bin
 ---
@@ -0,0 +1,2 @@
+0v9
+{"kafka-initial-offset-future-version":{"2":2,"1":1,"0":0}}
--- End diff --

note: should remove the newline to keep it consistent


---




[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r167124308
  
--- Diff: 
external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-version-2.1.0.bin
 ---
@@ -1 +1 @@
-2{"kafka-initial-offset-2-1-0":{"2":0,"1":0,"0":0}}
\ No newline at end of file
+2{"kafka-initial-offset-2-1-0":{"2":2,"1":1,"0":0}}
--- End diff --

note: should remove the newline to keep it consistent.


---




[GitHub] spark issue #17982: [SPARK-20395][BUILD] Update Scala to 2.11.11 and zinc to...

2018-02-08 Thread tovbinm
Github user tovbinm commented on the issue:

https://github.com/apache/spark/pull/17982
  
@som-snytt @srowen what about `2.11.12`?


---




[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r167123917
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
 ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, 
Offset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A [[MicroBatchReader]] that reads data from Kafka.
+ *
+ * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this 
source that contains
+ * a map of TopicPartition -> offset. Note that this offset is 1 + 
(available offset). For
+ * example if the last record in a Kafka topic "t", partition 2 is offset 
5, then
+ * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is 
done keep it consistent
+ * with the semantics of `KafkaConsumer.position()`.
+ *
+ * Zero data lost is not guaranteed when topics are deleted. If zero data 
lost is critical, the user
+ * must make sure all messages in a topic have been processed when 
deleting a topic.
+ *
+ * There is a known issue caused by KAFKA-1894: the query using Kafka 
maybe cannot be stopped.
+ * To avoid this issue, you should make sure stopping the query before 
stopping the Kafka brokers
+ * and not use wrong broker addresses.
+ */
+private[kafka010] class KafkaMicroBatchReader(
+kafkaOffsetReader: KafkaOffsetReader,
+executorKafkaParams: ju.Map[String, Object],
+options: DataSourceOptions,
+metadataPath: String,
+startingOffsets: KafkaOffsetRangeLimit,
+failOnDataLoss: Boolean)
+  extends MicroBatchReader with Logging {
+
+  type PartitionOffsetMap = Map[TopicPartition, Long]
+
+  private var startPartitionOffsets: PartitionOffsetMap = _
+  private var endPartitionOffsets: PartitionOffsetMap = _
+
+  private val pollTimeoutMs = options.getLong(
+"kafkaConsumer.pollTimeoutMs",
+SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
+
+  private val maxOffsetsPerTrigger =
+Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread 
while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = 
getOrCreateInitialPartitionOffsets()
+
+  override def setOffsetRange(start: ju.Optional[Offset], end: 
ju.Optional[Offset]): Unit = {
+// Make sure initialPartitionOffsets is initialized
+initialPartitionOffsets
+
+startPartitionOffsets = Option(start.orElse(null))
+.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+.getOrElse(initialPartitionOffsets)
+
+endPartitionOffsets = Option(end.orElse(null))
+.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+

[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r167123837
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
 ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, 
Offset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A [[MicroBatchReader]] that reads data from Kafka.
+ *
+ * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this 
source that contains
+ * a map of TopicPartition -> offset. Note that this offset is 1 + 
(available offset). For
+ * example if the last record in a Kafka topic "t", partition 2 is offset 
5, then
+ * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is 
done keep it consistent
+ * with the semantics of `KafkaConsumer.position()`.
+ *
+ * Zero data lost is not guaranteed when topics are deleted. If zero data 
lost is critical, the user
+ * must make sure all messages in a topic have been processed when 
deleting a topic.
+ *
+ * There is a known issue caused by KAFKA-1894: the query using Kafka 
maybe cannot be stopped.
+ * To avoid this issue, you should make sure stopping the query before 
stopping the Kafka brokers
+ * and not use wrong broker addresses.
+ */
+private[kafka010] class KafkaMicroBatchReader(
+kafkaOffsetReader: KafkaOffsetReader,
+executorKafkaParams: ju.Map[String, Object],
+options: DataSourceOptions,
+metadataPath: String,
+startingOffsets: KafkaOffsetRangeLimit,
+failOnDataLoss: Boolean)
+  extends MicroBatchReader with Logging {
+
+  type PartitionOffsetMap = Map[TopicPartition, Long]
+
+  private var startPartitionOffsets: PartitionOffsetMap = _
+  private var endPartitionOffsets: PartitionOffsetMap = _
+
+  private val pollTimeoutMs = options.getLong(
+"kafkaConsumer.pollTimeoutMs",
+SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
+
+  private val maxOffsetsPerTrigger =
+Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread 
while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = 
getOrCreateInitialPartitionOffsets()
+
+  override def setOffsetRange(start: ju.Optional[Offset], end: 
ju.Optional[Offset]): Unit = {
+// Make sure initialPartitionOffsets is initialized
+initialPartitionOffsets
+
+startPartitionOffsets = Option(start.orElse(null))
+.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+.getOrElse(initialPartitionOffsets)
+
+endPartitionOffsets = Option(end.orElse(null))
+.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+

[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r167123713
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
 ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, 
Offset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A [[MicroBatchReader]] that reads data from Kafka.
+ *
+ * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this 
source that contains
+ * a map of TopicPartition -> offset. Note that this offset is 1 + 
(available offset). For
+ * example if the last record in a Kafka topic "t", partition 2 is offset 
5, then
+ * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is 
done keep it consistent
+ * with the semantics of `KafkaConsumer.position()`.
+ *
+ * Zero data lost is not guaranteed when topics are deleted. If zero data 
lost is critical, the user
+ * must make sure all messages in a topic have been processed when 
deleting a topic.
+ *
+ * There is a known issue caused by KAFKA-1894: the query using Kafka 
maybe cannot be stopped.
+ * To avoid this issue, you should make sure stopping the query before 
stopping the Kafka brokers
+ * and not use wrong broker addresses.
+ */
+private[kafka010] class KafkaMicroBatchReader(
+kafkaOffsetReader: KafkaOffsetReader,
+executorKafkaParams: ju.Map[String, Object],
+options: DataSourceOptions,
+metadataPath: String,
+startingOffsets: KafkaOffsetRangeLimit,
+failOnDataLoss: Boolean)
+  extends MicroBatchReader with Logging {
+
+  type PartitionOffsetMap = Map[TopicPartition, Long]
+
+  private var startPartitionOffsets: PartitionOffsetMap = _
+  private var endPartitionOffsets: PartitionOffsetMap = _
+
+  private val pollTimeoutMs = options.getLong(
+"kafkaConsumer.pollTimeoutMs",
+SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
+
+  private val maxOffsetsPerTrigger =
+Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread 
while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = 
getOrCreateInitialPartitionOffsets()
+
+  override def setOffsetRange(start: ju.Optional[Offset], end: 
ju.Optional[Offset]): Unit = {
+// Make sure initialPartitionOffsets is initialized
+initialPartitionOffsets
+
+startPartitionOffsets = Option(start.orElse(null))
+.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+.getOrElse(initialPartitionOffsets)
+
+endPartitionOffsets = Option(end.orElse(null))
+.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+

[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r167123614
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
 ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, 
Offset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A [[MicroBatchReader]] that reads data from Kafka.
+ *
+ * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this 
source that contains
+ * a map of TopicPartition -> offset. Note that this offset is 1 + 
(available offset). For
+ * example if the last record in a Kafka topic "t", partition 2 is offset 
5, then
+ * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is 
done keep it consistent
+ * with the semantics of `KafkaConsumer.position()`.
+ *
+ * Zero data lost is not guaranteed when topics are deleted. If zero data 
lost is critical, the user
+ * must make sure all messages in a topic have been processed when 
deleting a topic.
+ *
+ * There is a known issue caused by KAFKA-1894: the query using Kafka 
maybe cannot be stopped.
+ * To avoid this issue, you should make sure stopping the query before 
stopping the Kafka brokers
+ * and not use wrong broker addresses.
+ */
+private[kafka010] class KafkaMicroBatchReader(
+kafkaOffsetReader: KafkaOffsetReader,
+executorKafkaParams: ju.Map[String, Object],
+options: DataSourceOptions,
+metadataPath: String,
+startingOffsets: KafkaOffsetRangeLimit,
+failOnDataLoss: Boolean)
+  extends MicroBatchReader with Logging {
+
+  type PartitionOffsetMap = Map[TopicPartition, Long]
+
+  private var startPartitionOffsets: PartitionOffsetMap = _
+  private var endPartitionOffsets: PartitionOffsetMap = _
+
+  private val pollTimeoutMs = options.getLong(
+"kafkaConsumer.pollTimeoutMs",
+SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
+
+  private val maxOffsetsPerTrigger =
+Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread 
while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = 
getOrCreateInitialPartitionOffsets()
+
+  override def setOffsetRange(start: ju.Optional[Offset], end: 
ju.Optional[Offset]): Unit = {
+// Make sure initialPartitionOffsets is initialized
+initialPartitionOffsets
+
+startPartitionOffsets = Option(start.orElse(null))
+.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+.getOrElse(initialPartitionOffsets)
+
+endPartitionOffsets = Option(end.orElse(null))
+.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+

[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r167123580
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
 ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, 
Offset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A [[MicroBatchReader]] that reads data from Kafka.
+ *
+ * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this 
source that contains
+ * a map of TopicPartition -> offset. Note that this offset is 1 + 
(available offset). For
+ * example if the last record in a Kafka topic "t", partition 2 is offset 
5, then
+ * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is 
done keep it consistent
+ * with the semantics of `KafkaConsumer.position()`.
+ *
+ * Zero data lost is not guaranteed when topics are deleted. If zero data 
lost is critical, the user
+ * must make sure all messages in a topic have been processed when 
deleting a topic.
+ *
+ * There is a known issue caused by KAFKA-1894: the query using Kafka 
maybe cannot be stopped.
+ * To avoid this issue, you should make sure stopping the query before 
stopping the Kafka brokers
+ * and not use wrong broker addresses.
+ */
+private[kafka010] class KafkaMicroBatchReader(
+kafkaOffsetReader: KafkaOffsetReader,
+executorKafkaParams: ju.Map[String, Object],
+options: DataSourceOptions,
+metadataPath: String,
+startingOffsets: KafkaOffsetRangeLimit,
+failOnDataLoss: Boolean)
+  extends MicroBatchReader with Logging {
+
+  type PartitionOffsetMap = Map[TopicPartition, Long]
+
+  private var startPartitionOffsets: PartitionOffsetMap = _
+  private var endPartitionOffsets: PartitionOffsetMap = _
+
+  private val pollTimeoutMs = options.getLong(
+"kafkaConsumer.pollTimeoutMs",
+SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
+
+  private val maxOffsetsPerTrigger =
+Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread 
while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = 
getOrCreateInitialPartitionOffsets()
+
+  override def setOffsetRange(start: ju.Optional[Offset], end: 
ju.Optional[Offset]): Unit = {
+// Make sure initialPartitionOffsets is initialized
+initialPartitionOffsets
+
+startPartitionOffsets = Option(start.orElse(null))
+.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+.getOrElse(initialPartitionOffsets)
+
+endPartitionOffsets = Option(end.orElse(null))
+.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+

[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r167123513
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
 ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, 
Offset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A [[MicroBatchReader]] that reads data from Kafka.
+ *
+ * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this 
source that contains
+ * a map of TopicPartition -> offset. Note that this offset is 1 + 
(available offset). For
+ * example if the last record in a Kafka topic "t", partition 2 is offset 
5, then
+ * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is 
done to keep it consistent
+ * with the semantics of `KafkaConsumer.position()`.
+ *
+ * Zero data loss is not guaranteed when topics are deleted. If zero data loss
+ * is critical, the user must make sure all messages in a topic have been
+ * processed when deleting a topic.
+ *
+ * There is a known issue caused by KAFKA-1894: a query reading from Kafka may
+ * not be stoppable. To avoid this issue, make sure to stop the query before
+ * stopping the Kafka brokers, and do not use incorrect broker addresses.
+ */
+private[kafka010] class KafkaMicroBatchReader(
+kafkaOffsetReader: KafkaOffsetReader,
+executorKafkaParams: ju.Map[String, Object],
+options: DataSourceOptions,
+metadataPath: String,
+startingOffsets: KafkaOffsetRangeLimit,
+failOnDataLoss: Boolean)
+  extends MicroBatchReader with Logging {
+
+  type PartitionOffsetMap = Map[TopicPartition, Long]
+
+  private var startPartitionOffsets: PartitionOffsetMap = _
+  private var endPartitionOffsets: PartitionOffsetMap = _
+
+  private val pollTimeoutMs = options.getLong(
+"kafkaConsumer.pollTimeoutMs",
+SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
+
+  private val maxOffsetsPerTrigger =
+Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread 
while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = 
getOrCreateInitialPartitionOffsets()
+
+  override def setOffsetRange(start: ju.Optional[Offset], end: 
ju.Optional[Offset]): Unit = {
+// Make sure initialPartitionOffsets is initialized
+initialPartitionOffsets
+
+startPartitionOffsets = Option(start.orElse(null))
+.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+.getOrElse(initialPartitionOffsets)
+
+endPartitionOffsets = Option(end.orElse(null))
+.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+
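
As a small illustration of the exclusive-end-offset convention described in the
class comment above (hypothetical topic and values, not taken from the PR):

    import org.apache.kafka.common.TopicPartition

    // Suppose the last record in topic "t", partition 2 sits at offset 5.
    val lastRecordOffset = 5L
    val tp = new TopicPartition("t", 2)

    // KafkaSourceOffset stores 1 + the last available offset, i.e. the offset
    // of the *next* record to fetch, matching KafkaConsumer.position().
    val partitionToOffsets: Map[TopicPartition, Long] =
      Map(tp -> (lastRecordOffset + 1))

    assert(partitionToOffsets(tp) == 6L)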

[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

2018-02-08 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20554
  
**[Test build #87239 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87239/testReport)**
 for PR 20554 at commit 
[`05c9d20`](https://github.com/apache/spark/commit/05c9d20da4361d631d8839bd4a45e4966964afa0).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r167123199
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
 ---
@@ -408,8 +401,27 @@ private[kafka010] object KafkaSourceProvider extends 
Logging {
   private[kafka010] val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
   private[kafka010] val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
   private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
+
   val TOPIC_OPTION_KEY = "topic"
 
+  val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE =
--- End diff --

Moved this from KafkaSource to this class because this is used by multiple 
reader classes and therefore should be present in the higher level class (e.g. 
the provider class).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

2018-02-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20554
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/733/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

2018-02-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20554
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

2018-02-08 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20554
  
**[Test build #87238 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87238/testReport)**
 for PR 20554 at commit 
[`3ed2a50`](https://github.com/apache/spark/commit/3ed2a509276194214875f39e1e18d8093155c54c).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

2018-02-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20554
  
Build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

2018-02-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20554
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/732/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-08 Thread tdas
GitHub user tdas opened a pull request:

https://github.com/apache/spark/pull/20554

[SPARK-23362][SS] Migrate Kafka Microbatch source to v2

## What changes were proposed in this pull request?
Migrating KafkaSource (with data source v1) to KafkaMicroBatchReader (with 
data source v2).

## How was this patch tested?
Existing tests; a few were modified to be better than the originals.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tdas/spark SPARK-23362

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20554.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20554


commit 3ed2a509276194214875f39e1e18d8093155c54c
Author: Tathagata Das 
Date:   2018-02-09T01:46:56Z

Migrated




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19775: [SPARK-22343][core] Add support for publishing Spark met...

2018-02-08 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19775
  
My original intention was to expose a MetricsSystem-related interface in
#11994, so that users can leverage such an interface to build their own metrics
sink/source outside of Spark. Unfortunately I'm stuck on #11994, but I still
think it is better to keep this as a package outside of Spark; pulling in too
many dependencies for non-core functionality doesn't seem reasonable (just my
thoughts).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20541: [SPARK-23356][SQL]Pushes Project to both sides of Union ...

2018-02-08 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20541
  
**[Test build #87237 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87237/testReport)**
 for PR 20541 at commit 
[`4f5d46b`](https://github.com/apache/spark/commit/4f5d46baca612caaa882cbabb3b35665e9c7ed8b).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20552#discussion_r167120763
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
 ---
@@ -17,52 +17,119 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, 
ObjectInputStream, ObjectOutputStream}
+
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
-import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{encoderFor, 
ExpressionEncoder}
+import 
org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, 
StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
 
-/**
- * A [[Sink]] that forwards all data into [[ForeachWriter]] according to 
the contract defined by
- * [[ForeachWriter]].
- *
- * @param writer The [[ForeachWriter]] to process all data.
- * @tparam T The expected type of the sink.
- */
-class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with 
Serializable {
-
-  override def addBatch(batchId: Long, data: DataFrame): Unit = {
-// This logic should've been as simple as:
-// ```
-//   data.as[T].foreachPartition { iter => ... }
-// ```
-//
-// Unfortunately, doing that would just break the incremental planing. 
The reason is,
-// `Dataset.foreachPartition()` would further call `Dataset.rdd()`, 
but `Dataset.rdd()` will
-// create a new plan. Because StreamExecution uses the existing plan 
to collect metrics and
-// update watermark, we should never create a new plan. Otherwise, 
metrics and watermark are
-// updated in the new plan, and StreamExecution cannot retrieval them.
-//
-// Hence, we need to manually convert internal rows to objects using 
encoder.
+
+case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) 
extends StreamWriteSupport {
+  override def createStreamWriter(
+  queryId: String,
+  schema: StructType,
+  mode: OutputMode,
+  options: DataSourceOptions): StreamWriter = {
 val encoder = encoderFor[T].resolveAndBind(
-  data.logicalPlan.output,
-  data.sparkSession.sessionState.analyzer)
-data.queryExecution.toRdd.foreachPartition { iter =>
-  if (writer.open(TaskContext.getPartitionId(), batchId)) {
-try {
-  while (iter.hasNext) {
-writer.process(encoder.fromRow(iter.next()))
-  }
-} catch {
-  case e: Throwable =>
-writer.close(e)
-throw e
-}
-writer.close(null)
-  } else {
-writer.close(null)
+  schema.toAttributes,
+  SparkSession.getActiveSession.get.sessionState.analyzer)
+ForeachInternalWriter(writer, encoder)
+  }
+}
+
+case class ForeachInternalWriter[T: Encoder](
+writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
+extends StreamWriter with SupportsWriteInternalRow {
+  override def commit(epochId: Long, messages: 
Array[WriterCommitMessage]): Unit = {}
+  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): 
Unit = {}
+
+  override def createInternalRowWriterFactory(): 
DataWriterFactory[InternalRow] = {
+ForeachWriterFactory(writer, encoder)
+  }
+}
+
+case class ForeachWriterFactory[T: Encoder](writer: ForeachWriter[T], 
encoder: ExpressionEncoder[T])
--- End diff --

Actually, we probably should not inline this; its outer closure may not be
serializable in that case.
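
A minimal sketch of the concern, assuming a hypothetical non-serializable
enclosing class (names are illustrative, not code from this PR): an inner
anonymous factory drags in its `$outer` reference, while a standalone case
class serializes on its own.

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    class NonSerializableOwner {                      // note: not Serializable
      val tag = "owner"
      // The anonymous class below references `tag`, so it keeps a reference to
      // the enclosing NonSerializableOwner instance.
      val inlinedFactory: Serializable = new Serializable {
        def label: String = tag + "-factory"
      }
    }

    // A top-level case class captures only its own fields and is Serializable.
    case class StandaloneFactory(tag: String)

    def roundTrip(obj: AnyRef): Unit = {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
    }

    roundTrip(StandaloneFactory("ok"))                    // fine
    roundTrip(new NonSerializableOwner().inlinedFactory)  // NotSerializableException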


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20552#discussion_r167120724
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
 ---
@@ -17,52 +17,119 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, 
ObjectInputStream, ObjectOutputStream}
+
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
-import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{encoderFor, 
ExpressionEncoder}
+import 
org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, 
StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
 
-/**
- * A [[Sink]] that forwards all data into [[ForeachWriter]] according to 
the contract defined by
- * [[ForeachWriter]].
- *
- * @param writer The [[ForeachWriter]] to process all data.
- * @tparam T The expected type of the sink.
- */
-class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with 
Serializable {
-
-  override def addBatch(batchId: Long, data: DataFrame): Unit = {
-// This logic should've been as simple as:
-// ```
-//   data.as[T].foreachPartition { iter => ... }
-// ```
-//
-// Unfortunately, doing that would just break the incremental planing. 
The reason is,
-// `Dataset.foreachPartition()` would further call `Dataset.rdd()`, 
but `Dataset.rdd()` will
-// create a new plan. Because StreamExecution uses the existing plan 
to collect metrics and
-// update watermark, we should never create a new plan. Otherwise, 
metrics and watermark are
-// updated in the new plan, and StreamExecution cannot retrieval them.
-//
-// Hence, we need to manually convert internal rows to objects using 
encoder.
+
+case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) 
extends StreamWriteSupport {
+  override def createStreamWriter(
+  queryId: String,
+  schema: StructType,
+  mode: OutputMode,
+  options: DataSourceOptions): StreamWriter = {
 val encoder = encoderFor[T].resolveAndBind(
-  data.logicalPlan.output,
-  data.sparkSession.sessionState.analyzer)
-data.queryExecution.toRdd.foreachPartition { iter =>
-  if (writer.open(TaskContext.getPartitionId(), batchId)) {
-try {
-  while (iter.hasNext) {
-writer.process(encoder.fromRow(iter.next()))
-  }
-} catch {
-  case e: Throwable =>
-writer.close(e)
-throw e
-}
-writer.close(null)
-  } else {
-writer.close(null)
+  schema.toAttributes,
+  SparkSession.getActiveSession.get.sessionState.analyzer)
+ForeachInternalWriter(writer, encoder)
+  }
+}
+
+case class ForeachInternalWriter[T: Encoder](
--- End diff --

Actually, we probably should not inline this; its outer closure may not be
serializable in that case.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20303: [SPARK-23128][SQL] A new approach to do adaptive executi...

2018-02-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20303
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/731/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20303: [SPARK-23128][SQL] A new approach to do adaptive executi...

2018-02-08 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20303
  
**[Test build #87236 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87236/testReport)**
 for PR 20303 at commit 
[`603c6d5`](https://github.com/apache/spark/commit/603c6d58ae9a72f8202236682c78cd48a9bb320e).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20303: [SPARK-23128][SQL] A new approach to do adaptive executi...

2018-02-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20303
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20545: [SPARK-23359][SQL] Adds an alias 'names' of 'fieldNames'...

2018-02-08 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/20545
  
Yup, looks like both tests are flaky :-/


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20546: [SPARK-20659][Core] Removing sc.getExecutorStorageStatus...

2018-02-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20546
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87232/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20546: [SPARK-20659][Core] Removing sc.getExecutorStorageStatus...

2018-02-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20546
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20546: [SPARK-20659][Core] Removing sc.getExecutorStorageStatus...

2018-02-08 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20546
  
**[Test build #87232 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87232/testReport)**
 for PR 20546 at commit 
[`8544380`](https://github.com/apache/spark/commit/8544380e91f5bfa7c95cf613d49bc9144fedee9f).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20541: [SPARK-23356][SQL]Pushes Project to both sides of...

2018-02-08 Thread heary-cao
Github user heary-cao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20541#discussion_r167113352
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -400,13 +400,24 @@ object PushProjectionThroughUnion extends 
Rule[LogicalPlan] with PredicateHelper
 // Push down deterministic projection through UNION ALL
 case p @ Project(projectList, Union(children)) =>
   assert(children.nonEmpty)
-  if (projectList.forall(_.deterministic)) {
-val newFirstChild = Project(projectList, children.head)
+  val (deterministicList, nonDeterministic) = 
projectList.partition(_.deterministic)
+
+  if (deterministicList.nonEmpty) {
+val newFirstChild = Project(deterministicList, children.head)
 val newOtherChildren = children.tail.map { child =>
   val rewrites = buildRewrites(children.head, child)
-  Project(projectList.map(pushToRight(_, rewrites)), child)
+  Project(deterministicList.map(pushToRight(_, rewrites)), child)
--- End diff --

If we push `a + 1` through the Union, it is pushed as just `a + 1`. This seems
to lack test cases; let's add them. Thanks.
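
A rough sketch of the kind of test case this could use (hypothetical DataFrame
query, not a test from the PR): the deterministic `a + 1` may be pushed below
the Union, while a non-deterministic expression such as `rand()` stays above it
so the rewrite does not change how often it is evaluated.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, rand}

    val spark = SparkSession.builder()
      .master("local[*]").appName("union-pushdown").getOrCreate()
    import spark.implicits._

    val left  = Seq(1, 2, 3).toDF("a")
    val right = Seq(4, 5, 6).toDF("a")

    // Deterministic projection: eligible to be rewritten as
    //   Union(Project(a + 1, left), Project(a + 1, right))
    val deterministic = left.union(right).select(col("a") + 1)

    // Mixed projection: only the deterministic part should move below the
    // Union; rand() must remain in a Project on top of it.
    val mixed = left.union(right).select(col("a") + 1, rand())

    deterministic.explain(true)
    mixed.explain(true)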


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20545: [SPARK-23359][SQL] Adds an alias 'names' of 'fieldNames'...

2018-02-08 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/20545
  
Seems I saw the same test failure on other PRs too.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

2018-02-08 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/20382
  
Hi @tdas, would you please help review this again? Thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20545: [SPARK-23359][SQL] Adds an alias 'names' of 'fieldNames'...

2018-02-08 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20545
  
**[Test build #87235 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87235/testReport)**
 for PR 20545 at commit 
[`664a62c`](https://github.com/apache/spark/commit/664a62c7da9ba5da2007d40ef9c157f7e82938c5).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20545: [SPARK-23359][SQL] Adds an alias 'names' of 'fieldNames'...

2018-02-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20545
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/730/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20545: [SPARK-23359][SQL] Adds an alias 'names' of 'fieldNames'...

2018-02-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20545
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20545: [SPARK-23359][SQL] Adds an alias 'names' of 'fieldNames'...

2018-02-08 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/20545
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20537: [SPARK-23314][PYTHON] Add ambiguous=False when localizin...

2018-02-08 Thread felixcheung
Github user felixcheung commented on the issue:

https://github.com/apache/spark/pull/20537
  
Great, thanks.
The fix is actually just two lines. LGTM

@hyukjinkwon could you help merge this ASAP to 2.3?





---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20545: [SPARK-23359][SQL] Adds an alias 'names' of 'fieldNames'...

2018-02-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20545
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20545: [SPARK-23359][SQL] Adds an alias 'names' of 'fieldNames'...

2018-02-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20545
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87233/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20545: [SPARK-23359][SQL] Adds an alias 'names' of 'fieldNames'...

2018-02-08 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20545
  
**[Test build #87233 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87233/testReport)**
 for PR 20545 at commit 
[`664a62c`](https://github.com/apache/spark/commit/664a62c7da9ba5da2007d40ef9c157f7e82938c5).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18982: [SPARK-21685][PYTHON][ML] PySpark Params isSet state sho...

2018-02-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18982
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18982: [SPARK-21685][PYTHON][ML] PySpark Params isSet state sho...

2018-02-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18982
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87228/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20552: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2

2018-02-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20552
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87231/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18982: [SPARK-21685][PYTHON][ML] PySpark Params isSet state sho...

2018-02-08 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18982
  
**[Test build #87228 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87228/testReport)**
 for PR 18982 at commit 
[`339c793`](https://github.com/apache/spark/commit/339c793451adafed64e57924f670254d669d82a8).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20552: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2

2018-02-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20552
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20552: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2

2018-02-08 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20552
  
**[Test build #87231 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87231/testReport)**
 for PR 20552 at commit 
[`87d0bc8`](https://github.com/apache/spark/commit/87d0bc8ce23ab5a95ba0b5432d6b58042b32bdac).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20552#discussion_r167081693
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
 ---
@@ -255,6 +255,32 @@ class ForeachSinkSuite extends StreamTest with 
SharedSQLContext with BeforeAndAf
   query.stop()
 }
   }
+
--- End diff --

I think there should be a test with continuous processing + foreach.
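
A rough sketch of such a test, assuming the rate source and
`Trigger.Continuous` (illustrative only, not the test added in this PR):

    import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
    import org.apache.spark.sql.streaming.Trigger

    val spark = SparkSession.builder()
      .master("local[*]").appName("foreach-continuous").getOrCreate()

    val query = spark.readStream
      .format("rate")                      // a continuous-capable source
      .option("rowsPerSecond", "5")
      .load()
      .writeStream
      .foreach(new ForeachWriter[Row] {
        def open(partitionId: Long, epochId: Long): Boolean = true
        def process(value: Row): Unit = println(value)
        def close(errorOrNull: Throwable): Unit = ()
      })
      .trigger(Trigger.Continuous("1 second"))
      .start()

    query.awaitTermination(10000)          // let it run briefly, then stop
    query.stop()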


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20552#discussion_r167077542
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
 ---
@@ -17,52 +17,119 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, 
ObjectInputStream, ObjectOutputStream}
+
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
-import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{encoderFor, 
ExpressionEncoder}
+import 
org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, 
StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
 
-/**
- * A [[Sink]] that forwards all data into [[ForeachWriter]] according to 
the contract defined by
- * [[ForeachWriter]].
- *
- * @param writer The [[ForeachWriter]] to process all data.
- * @tparam T The expected type of the sink.
- */
-class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with 
Serializable {
-
-  override def addBatch(batchId: Long, data: DataFrame): Unit = {
-// This logic should've been as simple as:
-// ```
-//   data.as[T].foreachPartition { iter => ... }
-// ```
-//
-// Unfortunately, doing that would just break the incremental planing. 
The reason is,
-// `Dataset.foreachPartition()` would further call `Dataset.rdd()`, 
but `Dataset.rdd()` will
-// create a new plan. Because StreamExecution uses the existing plan 
to collect metrics and
-// update watermark, we should never create a new plan. Otherwise, 
metrics and watermark are
-// updated in the new plan, and StreamExecution cannot retrieval them.
-//
-// Hence, we need to manually convert internal rows to objects using 
encoder.
+
+case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) 
extends StreamWriteSupport {
+  override def createStreamWriter(
+  queryId: String,
+  schema: StructType,
+  mode: OutputMode,
+  options: DataSourceOptions): StreamWriter = {
 val encoder = encoderFor[T].resolveAndBind(
-  data.logicalPlan.output,
-  data.sparkSession.sessionState.analyzer)
-data.queryExecution.toRdd.foreachPartition { iter =>
-  if (writer.open(TaskContext.getPartitionId(), batchId)) {
-try {
-  while (iter.hasNext) {
-writer.process(encoder.fromRow(iter.next()))
-  }
-} catch {
-  case e: Throwable =>
-writer.close(e)
-throw e
-}
-writer.close(null)
-  } else {
-writer.close(null)
+  schema.toAttributes,
+  SparkSession.getActiveSession.get.sessionState.analyzer)
+ForeachInternalWriter(writer, encoder)
+  }
+}
+
+case class ForeachInternalWriter[T: Encoder](
--- End diff --

nit: This is really a small class. Maybe inline it rather than define the
confusing name `...InternalWriter`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20552#discussion_r167076621
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
 ---
@@ -17,52 +17,119 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, 
ObjectInputStream, ObjectOutputStream}
+
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
-import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{encoderFor, 
ExpressionEncoder}
+import 
org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, 
StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
 
-/**
- * A [[Sink]] that forwards all data into [[ForeachWriter]] according to 
the contract defined by
- * [[ForeachWriter]].
- *
- * @param writer The [[ForeachWriter]] to process all data.
- * @tparam T The expected type of the sink.
- */
-class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with 
Serializable {
-
-  override def addBatch(batchId: Long, data: DataFrame): Unit = {
-// This logic should've been as simple as:
-// ```
-//   data.as[T].foreachPartition { iter => ... }
-// ```
-//
-// Unfortunately, doing that would just break the incremental planing. 
The reason is,
-// `Dataset.foreachPartition()` would further call `Dataset.rdd()`, 
but `Dataset.rdd()` will
-// create a new plan. Because StreamExecution uses the existing plan 
to collect metrics and
-// update watermark, we should never create a new plan. Otherwise, 
metrics and watermark are
-// updated in the new plan, and StreamExecution cannot retrieval them.
-//
-// Hence, we need to manually convert internal rows to objects using 
encoder.
+
+case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) 
extends StreamWriteSupport {
+  override def createStreamWriter(
+  queryId: String,
+  schema: StructType,
+  mode: OutputMode,
+  options: DataSourceOptions): StreamWriter = {
 val encoder = encoderFor[T].resolveAndBind(
-  data.logicalPlan.output,
-  data.sparkSession.sessionState.analyzer)
-data.queryExecution.toRdd.foreachPartition { iter =>
-  if (writer.open(TaskContext.getPartitionId(), batchId)) {
-try {
-  while (iter.hasNext) {
-writer.process(encoder.fromRow(iter.next()))
-  }
-} catch {
-  case e: Throwable =>
-writer.close(e)
-throw e
-}
-writer.close(null)
-  } else {
-writer.close(null)
+  schema.toAttributes,
+  SparkSession.getActiveSession.get.sessionState.analyzer)
+ForeachInternalWriter(writer, encoder)
+  }
+}
+
+case class ForeachInternalWriter[T: Encoder](
+writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
--- End diff --

nit: params on different lines


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20552#discussion_r167078037
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
 ---
@@ -17,52 +17,119 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, 
ObjectInputStream, ObjectOutputStream}
+
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
-import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{encoderFor, 
ExpressionEncoder}
+import 
org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, 
StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
 
-/**
- * A [[Sink]] that forwards all data into [[ForeachWriter]] according to 
the contract defined by
- * [[ForeachWriter]].
- *
- * @param writer The [[ForeachWriter]] to process all data.
- * @tparam T The expected type of the sink.
- */
-class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with 
Serializable {
-
-  override def addBatch(batchId: Long, data: DataFrame): Unit = {
-// This logic should've been as simple as:
-// ```
-//   data.as[T].foreachPartition { iter => ... }
-// ```
-//
-// Unfortunately, doing that would just break the incremental planing. 
The reason is,
-// `Dataset.foreachPartition()` would further call `Dataset.rdd()`, 
but `Dataset.rdd()` will
-// create a new plan. Because StreamExecution uses the existing plan 
to collect metrics and
-// update watermark, we should never create a new plan. Otherwise, 
metrics and watermark are
-// updated in the new plan, and StreamExecution cannot retrieval them.
-//
-// Hence, we need to manually convert internal rows to objects using 
encoder.
+
+case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) 
extends StreamWriteSupport {
+  override def createStreamWriter(
+  queryId: String,
+  schema: StructType,
+  mode: OutputMode,
+  options: DataSourceOptions): StreamWriter = {
 val encoder = encoderFor[T].resolveAndBind(
-  data.logicalPlan.output,
-  data.sparkSession.sessionState.analyzer)
-data.queryExecution.toRdd.foreachPartition { iter =>
-  if (writer.open(TaskContext.getPartitionId(), batchId)) {
-try {
-  while (iter.hasNext) {
-writer.process(encoder.fromRow(iter.next()))
-  }
-} catch {
-  case e: Throwable =>
-writer.close(e)
-throw e
-}
-writer.close(null)
-  } else {
-writer.close(null)
+  schema.toAttributes,
+  SparkSession.getActiveSession.get.sessionState.analyzer)
+ForeachInternalWriter(writer, encoder)
+  }
+}
+
+case class ForeachInternalWriter[T: Encoder](
+writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
+extends StreamWriter with SupportsWriteInternalRow {
+  override def commit(epochId: Long, messages: 
Array[WriterCommitMessage]): Unit = {}
+  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): 
Unit = {}
+
+  override def createInternalRowWriterFactory(): 
DataWriterFactory[InternalRow] = {
+ForeachWriterFactory(writer, encoder)
+  }
+}
+
+case class ForeachWriterFactory[T: Encoder](writer: ForeachWriter[T], 
encoder: ExpressionEncoder[T])
--- End diff --

Similarly, maybe inline this class as well; it's very small.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20552#discussion_r167080181
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
 ---
@@ -17,52 +17,119 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, 
ObjectInputStream, ObjectOutputStream}
+
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
-import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{encoderFor, 
ExpressionEncoder}
+import 
org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, 
StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
 
-/**
- * A [[Sink]] that forwards all data into [[ForeachWriter]] according to 
the contract defined by
- * [[ForeachWriter]].
- *
- * @param writer The [[ForeachWriter]] to process all data.
- * @tparam T The expected type of the sink.
- */
-class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with 
Serializable {
-
-  override def addBatch(batchId: Long, data: DataFrame): Unit = {
-// This logic should've been as simple as:
-// ```
-//   data.as[T].foreachPartition { iter => ... }
-// ```
-//
-// Unfortunately, doing that would just break the incremental planing. 
The reason is,
-// `Dataset.foreachPartition()` would further call `Dataset.rdd()`, 
but `Dataset.rdd()` will
-// create a new plan. Because StreamExecution uses the existing plan 
to collect metrics and
-// update watermark, we should never create a new plan. Otherwise, 
metrics and watermark are
-// updated in the new plan, and StreamExecution cannot retrieval them.
-//
-// Hence, we need to manually convert internal rows to objects using 
encoder.
+
+case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) 
extends StreamWriteSupport {
+  override def createStreamWriter(
+  queryId: String,
+  schema: StructType,
+  mode: OutputMode,
+  options: DataSourceOptions): StreamWriter = {
 val encoder = encoderFor[T].resolveAndBind(
-  data.logicalPlan.output,
-  data.sparkSession.sessionState.analyzer)
-data.queryExecution.toRdd.foreachPartition { iter =>
-  if (writer.open(TaskContext.getPartitionId(), batchId)) {
-try {
-  while (iter.hasNext) {
-writer.process(encoder.fromRow(iter.next()))
-  }
-} catch {
-  case e: Throwable =>
-writer.close(e)
-throw e
-}
-writer.close(null)
-  } else {
-writer.close(null)
+  schema.toAttributes,
+  SparkSession.getActiveSession.get.sessionState.analyzer)
+ForeachInternalWriter(writer, encoder)
+  }
+}
+
+case class ForeachInternalWriter[T: Encoder](
+writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
+extends StreamWriter with SupportsWriteInternalRow {
+  override def commit(epochId: Long, messages: 
Array[WriterCommitMessage]): Unit = {}
+  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): 
Unit = {}
+
+  override def createInternalRowWriterFactory(): 
DataWriterFactory[InternalRow] = {
+ForeachWriterFactory(writer, encoder)
+  }
+}
+
+case class ForeachWriterFactory[T: Encoder](writer: ForeachWriter[T], 
encoder: ExpressionEncoder[T])
+extends DataWriterFactory[InternalRow] {
+  override def createDataWriter(partitionId: Int, attemptNumber: Int): 
ForeachDataWriter[T] = {
+new ForeachDataWriter(writer, encoder, partitionId)
+  }
+}
+
+class ForeachDataWriter[T : Encoder](
+private var writer: ForeachWriter[T], encoder: ExpressionEncoder[T], 
partitionId: Int)
+extends DataWriter[InternalRow] {
+  private val initialEpochId: Long = {
+// Start with the microbatch ID. If it's not there, we're in 
continuous execution,
+// so get the start epoch.
+// This ID will be incremented as commits happen.
+TaskContext.get().getLocalProperty(MicroBatchExecution.BATCH_ID_KEY) 
match {
+  case null => 
TaskContext.get().getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+  case batch => batch.toLong
+}
+  }

[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20552#discussion_r167080661
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
 ---
@@ -17,52 +17,119 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, 
ObjectInputStream, ObjectOutputStream}
+
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
-import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{encoderFor, 
ExpressionEncoder}
+import 
org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, 
StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
 
-/**
- * A [[Sink]] that forwards all data into [[ForeachWriter]] according to 
the contract defined by
- * [[ForeachWriter]].
- *
- * @param writer The [[ForeachWriter]] to process all data.
- * @tparam T The expected type of the sink.
- */
-class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with 
Serializable {
-
-  override def addBatch(batchId: Long, data: DataFrame): Unit = {
-// This logic should've been as simple as:
-// ```
-//   data.as[T].foreachPartition { iter => ... }
-// ```
-//
-// Unfortunately, doing that would just break the incremental planing. 
The reason is,
-// `Dataset.foreachPartition()` would further call `Dataset.rdd()`, 
but `Dataset.rdd()` will
-// create a new plan. Because StreamExecution uses the existing plan 
to collect metrics and
-// update watermark, we should never create a new plan. Otherwise, 
metrics and watermark are
-// updated in the new plan, and StreamExecution cannot retrieval them.
-//
-// Hence, we need to manually convert internal rows to objects using 
encoder.
+
+case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) 
extends StreamWriteSupport {
+  override def createStreamWriter(
+  queryId: String,
+  schema: StructType,
+  mode: OutputMode,
+  options: DataSourceOptions): StreamWriter = {
 val encoder = encoderFor[T].resolveAndBind(
-  data.logicalPlan.output,
-  data.sparkSession.sessionState.analyzer)
-data.queryExecution.toRdd.foreachPartition { iter =>
-  if (writer.open(TaskContext.getPartitionId(), batchId)) {
-try {
-  while (iter.hasNext) {
-writer.process(encoder.fromRow(iter.next()))
-  }
-} catch {
-  case e: Throwable =>
-writer.close(e)
-throw e
-}
-writer.close(null)
-  } else {
-writer.close(null)
+  schema.toAttributes,
+  SparkSession.getActiveSession.get.sessionState.analyzer)
+ForeachInternalWriter(writer, encoder)
+  }
+}
+
+case class ForeachInternalWriter[T: Encoder](
+writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
+extends StreamWriter with SupportsWriteInternalRow {
+  override def commit(epochId: Long, messages: 
Array[WriterCommitMessage]): Unit = {}
+  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): 
Unit = {}
+
+  override def createInternalRowWriterFactory(): 
DataWriterFactory[InternalRow] = {
+ForeachWriterFactory(writer, encoder)
+  }
+}
+
+case class ForeachWriterFactory[T: Encoder](writer: ForeachWriter[T], 
encoder: ExpressionEncoder[T])
+extends DataWriterFactory[InternalRow] {
+  override def createDataWriter(partitionId: Int, attemptNumber: Int): 
ForeachDataWriter[T] = {
+new ForeachDataWriter(writer, encoder, partitionId)
+  }
+}
+
+class ForeachDataWriter[T : Encoder](
+private var writer: ForeachWriter[T], encoder: ExpressionEncoder[T], 
partitionId: Int)
--- End diff --

params in separate lines.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20552#discussion_r167078671
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
 ---
@@ -17,52 +17,119 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, 
ObjectInputStream, ObjectOutputStream}
+
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
-import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{encoderFor, 
ExpressionEncoder}
+import 
org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, 
StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
 
-/**
- * A [[Sink]] that forwards all data into [[ForeachWriter]] according to 
the contract defined by
- * [[ForeachWriter]].
- *
- * @param writer The [[ForeachWriter]] to process all data.
- * @tparam T The expected type of the sink.
- */
-class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with 
Serializable {
-
-  override def addBatch(batchId: Long, data: DataFrame): Unit = {
-// This logic should've been as simple as:
-// ```
-//   data.as[T].foreachPartition { iter => ... }
-// ```
-//
-// Unfortunately, doing that would just break the incremental planing. 
The reason is,
-// `Dataset.foreachPartition()` would further call `Dataset.rdd()`, 
but `Dataset.rdd()` will
-// create a new plan. Because StreamExecution uses the existing plan 
to collect metrics and
-// update watermark, we should never create a new plan. Otherwise, 
metrics and watermark are
-// updated in the new plan, and StreamExecution cannot retrieval them.
-//
-// Hence, we need to manually convert internal rows to objects using 
encoder.
+
+case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) 
extends StreamWriteSupport {
+  override def createStreamWriter(
+  queryId: String,
+  schema: StructType,
+  mode: OutputMode,
+  options: DataSourceOptions): StreamWriter = {
 val encoder = encoderFor[T].resolveAndBind(
-  data.logicalPlan.output,
-  data.sparkSession.sessionState.analyzer)
-data.queryExecution.toRdd.foreachPartition { iter =>
-  if (writer.open(TaskContext.getPartitionId(), batchId)) {
-try {
-  while (iter.hasNext) {
-writer.process(encoder.fromRow(iter.next()))
-  }
-} catch {
-  case e: Throwable =>
-writer.close(e)
-throw e
-}
-writer.close(null)
-  } else {
-writer.close(null)
+  schema.toAttributes,
+  SparkSession.getActiveSession.get.sessionState.analyzer)
+ForeachInternalWriter(writer, encoder)
+  }
+}
+
+case class ForeachInternalWriter[T: Encoder](
+writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
+extends StreamWriter with SupportsWriteInternalRow {
+  override def commit(epochId: Long, messages: 
Array[WriterCommitMessage]): Unit = {}
+  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): 
Unit = {}
+
+  override def createInternalRowWriterFactory(): 
DataWriterFactory[InternalRow] = {
+ForeachWriterFactory(writer, encoder)
+  }
+}
+
+case class ForeachWriterFactory[T: Encoder](writer: ForeachWriter[T], 
encoder: ExpressionEncoder[T])
+extends DataWriterFactory[InternalRow] {
+  override def createDataWriter(partitionId: Int, attemptNumber: Int): 
ForeachDataWriter[T] = {
+new ForeachDataWriter(writer, encoder, partitionId)
+  }
+}
+
+class ForeachDataWriter[T : Encoder](
--- End diff --

Add docs describing the implementation of this DataWriter, especially the
lifecycle of ForeachWriter (this should go here rather than in inline comments).
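
A rough sketch of the docs being asked for, based on the ForeachWriter contract
visible in this diff (wording is illustrative, not the PR's final text):

    /**
     * A [[DataWriter]] that forwards the rows of one partition to a [[ForeachWriter]].
     *
     * Lifecycle, per partition and per epoch (micro-batch ID or continuous epoch):
     *  1. `writer.open(partitionId, epochId)` is called first; if it returns false,
     *     the rows of that epoch are skipped, but `close` is still invoked.
     *  2. `writer.process(row)` is called for every row of the partition.
     *  3. `writer.close(errorOrNull)` is called exactly once, with the exception
     *     that aborted the write, or null on success.
     */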


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20519: [Spark-23240][python] Don't let python site custo...

2018-02-08 Thread bersprockets
Github user bersprockets commented on a diff in the pull request:

https://github.com/apache/spark/pull/20519#discussion_r167095485
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -180,18 +181,52 @@ private[spark] class PythonWorkerFactory(pythonExec: 
String, envVars: Map[String
 return
   }
 
+  var serverSocket: ServerSocket = null
   try {
+// get a server socket so that the launched daemon can tell us its 
server port
+serverSocket = new ServerSocket(0, 0, 
InetAddress.getByAddress(Array(127, 0, 0, 1)))
--- End diff --

>but it's generally a good idea to call setReuseAddress(true).

Do I want to do this since I am asking for some available port rather than 
binding to a known port?
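
For reference, a minimal sketch of the ephemeral-port case being discussed
(editor's illustration, not code from the PR); `SO_REUSEADDR` mostly matters
when re-binding a fixed port that may still be in TIME_WAIT, so it is less
relevant when the OS picks the port:

    import java.net.{InetAddress, ServerSocket}

    // Port 0 asks the OS for any free ephemeral port on the loopback interface.
    val serverSocket = new ServerSocket(0, 1, InetAddress.getLoopbackAddress)
    val boundPort = serverSocket.getLocalPort  // hand this port to the launched daemon

    // With a fixed, well-known port, setReuseAddress(true) (set before binding)
    // avoids bind failures against a socket lingering in TIME_WAIT; with port 0
    // a fresh port is chosen each time, so it buys little.
    serverSocket.close()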


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20553: [SPARK-23285][K8S] Add a config property for specifying ...

2018-02-08 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20553
  
Kubernetes integration test status success
URL: 
https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/724/



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20553: [SPARK-23285][K8S] Add a config property for specifying ...

2018-02-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20553
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20553: [SPARK-23285][K8S] Add a config property for specifying ...

2018-02-08 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20553
  
**[Test build #87234 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87234/testReport)**
 for PR 20553 at commit 
[`50ebb50`](https://github.com/apache/spark/commit/50ebb5068a35a9a0f2becd27153bdc7cc7aae251).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20553: [SPARK-23285][K8S] Add a config property for specifying ...

2018-02-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20553
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87234/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

2018-02-08 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20490
  
The test failure doesn't look related to these changes to me. How can I get 
on the list to ask jenkins to retest a PR?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20553: [SPARK-23285][K8S] Add a config property for specifying ...

2018-02-08 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20553
  
Kubernetes integration test starting
URL: 
https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/724/



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20553: [SPARK-23285][K8S] Add a config property for specifying ...

2018-02-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20553
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/729/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20553: [SPARK-23285][K8S] Add a config property for specifying ...

2018-02-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20553
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


