[GitHub] spark pull request #22473: [SPARK-25449][CORE] Heartbeat shouldn't include a...

2018-09-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22473#discussion_r219580690
  
--- Diff: core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala ---
@@ -252,18 +253,121 @@ class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSug
 }
   }
 
+  test("Heartbeat should drop zero metrics") {
+heartbeatZeroMetricTest(true)
+  }
+
+  test("Heartbeat should not drop zero metrics when the conf is set to 
false") {
+heartbeatZeroMetricTest(false)
+  }
+
+  private def withHeartbeatExecutor(confs: (String, String)*)
+  (f: (Executor, ArrayBuffer[Heartbeat]) => Unit): Unit = {
+val conf = new SparkConf
+confs.foreach { case (k, v) => conf.set(k, v) }
+val serializer = new JavaSerializer(conf)
+val env = createMockEnv(conf, serializer)
+val executor =
+  new Executor("id", "localhost", SparkEnv.get, userClassPath = Nil, 
isLocal = true)
+val executorClass = classOf[Executor]
+
+// Set ExecutorMetricType.values to be a minimal set to avoid get null 
exceptions
+val metricClass =
+  
Utils.classForName(classOf[org.apache.spark.metrics.ExecutorMetricType].getName()
 + "$")
+val metricTypeValues = metricClass.getDeclaredField("values")
+metricTypeValues.setAccessible(true)
+metricTypeValues.set(
+  org.apache.spark.metrics.ExecutorMetricType,
+  IndexedSeq(JVMHeapMemory, JVMOffHeapMemory))
+
+// Save all heartbeats sent into an ArrayBuffer for verification
+val heartbeats = ArrayBuffer[Heartbeat]()
+val mockReceiver = mock[RpcEndpointRef]
+when(mockReceiver.askSync(any[Heartbeat], any[RpcTimeout])(any))
+  .thenAnswer(new Answer[HeartbeatResponse] {
+override def answer(invocation: InvocationOnMock): 
HeartbeatResponse = {
+  val args = invocation.getArguments()
+  val mock = invocation.getMock
+  heartbeats += args(0).asInstanceOf[Heartbeat]
+  HeartbeatResponse(false)
+}
+  })
+val receiverRef = 
executorClass.getDeclaredField("heartbeatReceiverRef")
+receiverRef.setAccessible(true)
+receiverRef.set(executor, mockReceiver)
+
+f(executor, heartbeats)
+  }
+
+  private def invokeReportHeartbeat(executor: Executor): Unit = {
--- End diff --

You can mix in `org.scalatest.PrivateMethodTester` to replace this method, such as:
```
val reportHeartBeat = PrivateMethod[Long]('reportHeartBeat)
...
executor.invokePrivate(reportHeartBeat())
```
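
For reference, `PrivateMethodTester` in isolation (a minimal self-contained sketch with a hypothetical `Counter` class, not the actual suite):
```
import org.scalatest.PrivateMethodTester

class Counter {
  private def bump(n: Int): Int = n + 1
}

object PrivateMethodDemo extends App with PrivateMethodTester {
  // The symbol names the private method to look up reflectively.
  val bump = PrivateMethod[Int]('bump)
  println(new Counter().invokePrivate(bump(41))) // prints 42
}
```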


---




[GitHub] spark pull request #22473: [SPARK-25449][CORE] Heartbeat shouldn't include a...

2018-09-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22473#discussion_r219576946
  
--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -83,6 +83,17 @@ package object config {
   private[spark] val EXECUTOR_CLASS_PATH =
     ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional
 
+  private[spark] val EXECUTOR_HEARTBEAT_DROP_ZERO_METRICS =
+    ConfigBuilder("spark.executor.heartbeat.dropZeroMetrics").booleanConf.createWithDefault(true)
--- End diff --

Also please call `internal()` to indicate that this is not a public config.
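
That is, something like this (a sketch of the suggested chain, assuming Spark's internal `ConfigBuilder` API):
```
private[spark] val EXECUTOR_HEARTBEAT_DROP_ZERO_METRICS =
  ConfigBuilder("spark.executor.heartbeat.dropZeroMetrics")
    .internal() // not part of the public configuration surface
    .booleanConf
    .createWithDefault(true)
```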


---




[GitHub] spark pull request #22473: [SPARK-25449][CORE] Heartbeat shouldn't include a...

2018-09-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22473#discussion_r219574155
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -149,7 +149,7 @@ private[spark] class Executor(
 
   // Executor for the heartbeat task.
   private val heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat,
-    "executor-heartbeater", conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
+    "executor-heartbeater", conf.getTimeAsMs(EXECUTOR_HEARTBEAT_INTERVAL.key, "10s"))
--- End diff --

nit: `conf.get(EXECUTOR_HEARTBEAT_INTERVAL)`. Could you search the whole 
code base and update them as well?
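
That is, a before/after of the nit (illustrative lines inside Executor.scala, not a full patch):
```
// before: string key, with the default duplicated at the call site
conf.getTimeAsMs(EXECUTOR_HEARTBEAT_INTERVAL.key, "10s")
// after: the typed ConfigEntry already carries its default
conf.get(EXECUTOR_HEARTBEAT_INTERVAL)
```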


---




[GitHub] spark pull request #22473: [SPARK-25449][CORE] Heartbeat shouldn't include a...

2018-09-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22473#discussion_r219573967
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -120,7 +120,7 @@ private[spark] class Executor(
   }
 
   // Whether to load classes in user jars before those in Spark jars
-  private val userClassPathFirst = conf.getBoolean("spark.executor.userClassPathFirst", false)
+  private val userClassPathFirst = conf.getBoolean(EXECUTOR_USER_CLASS_PATH_FIRST.key, false)
--- End diff --

nit: `conf.get(EXECUTOR_USER_CLASS_PATH_FIRST)`


---




[GitHub] spark pull request #22507: [SPARK-25495][SS]FetchedData.reset should reset a...

2018-09-20 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/22507

[SPARK-25495][SS]FetchedData.reset should reset all fields

## What changes were proposed in this pull request?

`FetchedData.reset` should reset `_nextOffsetInFetchedData` and `_offsetAfterPoll`. Otherwise, it will cause inconsistent cached data and may make the Kafka connector return wrong results.
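
For illustration, a minimal sketch of the bug pattern (field names are from this description; the real `FetchedData` lives in KafkaDataConsumer.scala and differs in detail, e.g. `-2L` here stands in for an unknown-offset sentinel):
```
class FetchedData {
  private var _records: Iterator[Array[Byte]] = Iterator.empty
  private var _nextOffsetInFetchedData: Long = -2L
  private var _offsetAfterPoll: Long = -2L

  def reset(): Unit = {
    _records = Iterator.empty
    // The fix: without also resetting these two fields, a later fetch can
    // treat the stale offsets as matching the (now empty) cache and serve
    // inconsistent cached data.
    _nextOffsetInFetchedData = -2L
    _offsetAfterPoll = -2L
  }
}
```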

## How was this patch tested?

The new unit test.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark fix-kafka-reset

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22507.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22507


commit 1c8ef21a0d68154d9229afa2b117d3f19688a1a6
Author: Shixiong Zhu 
Date:   2018-09-20T22:49:19Z

fix reset




---




[GitHub] spark issue #22478: [SPARK-25472] Don't have legitimate stops of streams cau...

2018-09-19 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22478
  
LGTM pending tests. Could you add `[SS]` to your title?


---




[GitHub] spark pull request #22473: [SPARK-25449][CORE] Heartbeat shouldn't include a...

2018-09-19 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22473#discussion_r218941326
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -799,7 +799,8 @@ private[spark] class Executor(
   if (taskRunner.task != null) {
     taskRunner.task.metrics.mergeShuffleReadMetrics()
     taskRunner.task.metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)
-    accumUpdates += ((taskRunner.taskId, taskRunner.task.metrics.accumulators()))
+    accumUpdates +=
+      ((taskRunner.taskId, taskRunner.task.metrics.accumulators().filterNot(_.isZero)))
--- End diff --

Could you add a flag for this behavior change?
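
One possible shape of that flag, using the `spark.executor.heartbeat.dropZeroMetrics` config added elsewhere in this PR (a sketch, not the final code):
```
val accumulatorsToReport =
  if (conf.get(EXECUTOR_HEARTBEAT_DROP_ZERO_METRICS)) {
    // new behavior, on by default: skip accumulators that are still zero
    taskRunner.task.metrics.accumulators().filterNot(_.isZero)
  } else {
    // old behavior, for anyone who needs every accumulator reported
    taskRunner.task.metrics.accumulators()
  }
accumUpdates += ((taskRunner.taskId, accumulatorsToReport))
```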


---




[GitHub] spark issue #22473: [SPARK-25449][CORE] Heartbeat shouldn't include accumula...

2018-09-19 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22473
  
add to whitelist


---




[GitHub] spark issue #22402: [SPARK-25414][SS][TEST] make it clear that the numRows m...

2018-09-17 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22402
  
retest this please


---




[GitHub] spark issue #22402: [SPARK-25414][SS][TEST] make it clear that the numRows m...

2018-09-17 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22402
  
LGTM


---




[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...

2018-09-04 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/21721
  
FYI, I submitted #22334 to revert #21819 and #21721.


---




[GitHub] spark issue #22334: [SPARK-25336][SS]Revert SPARK-24863 and SPARK-24748

2018-09-04 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22334
  
retest this please


---




[GitHub] spark pull request #22334: [SPARK-25336][SS]Revert SPARK-24863 and SPARK 247...

2018-09-04 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/22334

[SPARK-25336][SS]Revert SPARK-24863 and SPARK-24748

## What changes were proposed in this pull request?

Revert SPARK-24863 and SPARK-24748 as per discussion in #21721. We will revisit them when the data source v2 APIs are out.

## How was this patch tested?

Jenkins

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark revert-SPARK-24863-SPARK-24748

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22334.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22334


commit 555e5d7f4271b534cce542faa5e3065c765c78b4
Author: Shixiong Zhu 
Date:   2018-09-04T20:35:25Z

Revert "[SPARK-24863][SS] Report Kafka offset lag as a custom metrics"

This reverts commit 14d7c1c3e99e7523c757628d411525aa9d8e0709.

commit 3d59df12fa407e998bbca9f83dd374e552849da6
Author: Shixiong Zhu 
Date:   2018-09-04T21:24:57Z

Revert "[SPARK-24748][SS] Support for reporting custom metrics via 
StreamingQuery Progress"

This reverts commit 18b6ec14716bfafc25ae281b190547ea58b59af1.




---




[GitHub] spark issue #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaData...

2018-09-04 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22138
  
Thanks for your PR. This is really a big change, and it will need very careful review: it touches a lot of critical code paths, and the current Kafka consumer logic is really complicated. Let's hold this until the 2.4 branch gets cut, as it's risky to put it into 2.4. The branch cut is basically blocked by a complicated correctness fix, which should be resolved soon.


---




[GitHub] spark issue #22293: [SPARK-25288][Tests]Fix flaky Kafka transaction tests

2018-08-30 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22293
  
Thanks! Merging to master.


---




[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...

2018-08-30 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/21721
  
@arunmahadevan Yeah, it's better to figure out the solution for continuous mode as well. As you mentioned, the current SQL metrics are not updated unless the task completes, so we may need to add new APIs to support reporting metrics for continuous mode. It would be great to have a consistent API for all modes. Let's step back and think about it.


---




[GitHub] spark pull request #22293: [SPARK-25288][Tests]Fix flaky Kafka transaction t...

2018-08-30 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22293#discussion_r214209091
  
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
@@ -652,62 +654,67 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
   }
 }
 
+    val topicPartition = new TopicPartition(topic, 0)
     // The message values are the same as their offsets to make the test easy to follow
     testUtils.withTranscationalProducer { producer =>
       testStream(mapped)(
         StartStream(ProcessingTime(100), clock),
         waitUntilBatchProcessed,
         CheckAnswer(),
-        WithOffsetSync(topic) { () =>
+        WithOffsetSync(topicPartition) { () =>
--- End diff --

Good suggestion!


---




[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...

2018-08-30 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/21721
  
It's better not to release such APIs without thinking about how to support continuous queries, since doing so may require API changes later, which should be avoided if possible. I propose reverting this PR. It would be great to have a design doc for the streaming source metrics APIs that discusses how to support all modes before committing anything.


---




[GitHub] spark pull request #22292: [SPARK-25286][CORE] Removing the dangerous parmap

2018-08-30 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22292#discussion_r214204549
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala ---
@@ -315,7 +315,9 @@ private[streaming] object FileBasedWriteAheadLog {
     implicit val ec = executionContext
--- End diff --

nit: this line is not needed


---




[GitHub] spark issue #22293: [SPARK-25288][Tests]Fix flaky Kafka transaction tests

2018-08-30 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22293
  
This has now passed 11 times on Jenkins.


---




[GitHub] spark pull request #22233: [SPARK-25240][SQL] Fix for a deadlock in RECOVER ...

2018-08-30 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22233#discussion_r214203842
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala ---
@@ -60,7 +60,8 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
   protected override def generateTable(
       catalog: SessionCatalog,
       name: TableIdentifier,
-      isDataSource: Boolean): CatalogTable = {
+      isDataSource: Boolean,
+      partitionCols: Seq[String] = Seq("a", "b")): CatalogTable = {
--- End diff --

Yeah, please don't override a method with a default parameter. It's very easy to end up with different default values in the two definitions, and then which value is picked up depends on the type you are using...
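
A self-contained sketch of the hazard (hypothetical `Base`/`Derived` classes; in Scala 2 the compiler-generated default getter participates in overriding, so the default that applies follows dispatch rules rather than what the call site visually suggests):
```
class Base {
  def partitionCols(cols: Seq[String] = Seq("a", "b")): Seq[String] = cols
}

class Derived extends Base {
  override def partitionCols(cols: Seq[String] = Seq("x")): Seq[String] = cols
}

object DefaultArgPitfall extends App {
  val viaBase: Base = new Derived
  // Both calls dispatch to Derived's default getter, so both print List(x),
  // even though the static type at the second call site advertises Seq("a", "b").
  println(new Derived().partitionCols())
  println(viaBase.partitionCols())
}
```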


---




[GitHub] spark pull request #22293: [SPARK-25288][Tests]Fix flaky Kafka transaction t...

2018-08-30 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/22293

[SPARK-25288][Tests]Fix flaky Kafka transaction tests

## What changes were proposed in this pull request?

Here are the failures:


http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaRelationSuite&test_name=read+Kafka+transactional+messages%3A+read_committed

http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaMicroBatchV1SourceSuite&test_name=read+Kafka+transactional+messages%3A+read_committed

http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaMicroBatchV2SourceSuite&test_name=read+Kafka+transactional+messages%3A+read_committed

I found that the Kafka consumer may not see committed messages for a short time. This PR adds a new method `waitUntilOffsetAppears` and uses it to make sure the consumer can see a specified offset before checking the result.
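
For reference, a hedged sketch of what such a helper can look like (the real one lives in the Kafka test utilities and may differ; the consumer parameter and timeout here are assumptions):
```
import java.util.Collections

import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

object OffsetWait {
  def waitUntilOffsetAppears(consumer: Consumer[_, _], tp: TopicPartition, offset: Long): Unit = {
    eventually(timeout(60.seconds)) {
      // endOffsets reflects what the broker currently exposes to this consumer
      val end = consumer.endOffsets(Collections.singleton(tp)).get(tp)
      assert(end != null && end >= offset, s"offset $offset not visible yet (end offset: $end)")
    }
  }
}
```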

## How was this patch tested?

Jenkins

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark SPARK-25288

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22293.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22293


commit 6439367e3dfd9612f30395ae445df67a87ede871
Author: Shixiong Zhu 
Date:   2018-08-30T21:44:42Z

Fix flaky Kafka transaction tests




---




[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

2018-08-28 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22042
  
Thanks! Merging to master.


---




[GitHub] spark issue #22210: [SPARK-25218][Core]Fix potential resource leaks in Trans...

2018-08-28 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22210
  
Thanks! Merging to master.


---




[GitHub] spark pull request #22233: [SPARK-25240][SQL] Fix for a deadlock in RECOVER ...

2018-08-27 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22233#discussion_r213137139
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -671,7 +674,7 @@ case class AlterTableRecoverPartitionsCommand(
     val value = ExternalCatalogUtils.unescapePathName(ps(1))
     if (resolver(columnName, partitionNames.head)) {
       scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value),
-        partitionNames.drop(1), threshold, resolver)
+        partitionNames.drop(1), threshold, resolver, listFilesInParallel = false)
--- End diff --

@MaxGekk could you revert to using Scala `par`?


---




[GitHub] spark issue #22210: [SPARK-25218][Core]Fix potential resource leaks in Trans...

2018-08-27 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22210
  
cc @brkyvz 


---




[GitHub] spark pull request #22233: [SPARK-25240][SQL] Fix for a deadlock in RECOVER ...

2018-08-27 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22233#discussion_r213063623
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -671,7 +674,7 @@ case class AlterTableRecoverPartitionsCommand(
     val value = ExternalCatalogUtils.unescapePathName(ps(1))
     if (resolver(columnName, partitionNames.head)) {
       scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value),
-        partitionNames.drop(1), threshold, resolver)
+        partitionNames.drop(1), threshold, resolver, listFilesInParallel = false)
--- End diff --

@MaxGekk can we remove this `parmap` overload? It can easily cause deadlocks. The `parmap` overload without the `ec` parameter is fine, since it doesn't take a user-specified thread pool.
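
The failure mode, as a minimal self-contained stand-in (the real `parmap` lives in `ThreadUtils` and differs; this just shows why a small user-specified pool plus nested parallelism starves):
```
import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ParmapDeadlockDemo extends App {
  // A caller-supplied pool with only two threads.
  val pool = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2))

  // Simplified stand-in for the overload that takes a user-specified ExecutionContext.
  def parmap[A, B](xs: Seq[A])(f: A => B)(ec: ExecutionContext): Seq[B] = {
    implicit val ctx: ExecutionContext = ec
    Await.result(Future.sequence(xs.map(x => Future(f(x)))), 10.seconds)
  }

  // Both outer tasks occupy the pool's only two threads and block in Await,
  // so the inner futures are queued forever: a pool-starvation deadlock.
  // This call never completes; Await gives up with a TimeoutException.
  parmap(Seq(1, 2))(i => parmap(Seq(i, i))(_ * 2)(pool).sum)(pool)
}
```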


---




[GitHub] spark issue #22245: [SPARK-24882][FOLLOWUP] Fix flaky synchronization in Kaf...

2018-08-27 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22245
  
Thanks! Merging to master.


---




[GitHub] spark issue #22245: [SPARK-24882][FOLLOWUP] Fix flaky synchronization in Kaf...

2018-08-27 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22245
  
LGTM pending tests


---




[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

2018-08-27 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22042
  
retest this please


---




[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

2018-08-25 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22042
  
> This patch fails Spark unit tests.

This is the flaky test I fixed in #22230.

retest this please


---




[GitHub] spark issue #22230: [SPARK-25214][SS][FOLLOWUP]Fix the issue that Kafka v2 s...

2018-08-25 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22230
  
Thanks! Merging to master.


---




[GitHub] spark pull request #22230: [SPARK-25214][SS][FOLLOWUP]Fix the issue that Kaf...

2018-08-24 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/22230

[SPARK-25214][SS][FOLLOWUP]Fix the issue that Kafka v2 source may return 
duplicated records when `failOnDataLoss=false`

## What changes were proposed in this pull request?

This is a follow-up PR for #22207 to fix a potentially flaky test. `processAllAvailable` doesn't work for continuous processing, so we should not use it for a continuous query.

## How was this patch tested?

Jenkins.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark SPARK-25214-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22230.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22230


commit a52425676ddcaa6a2737a95aedc80b4d8452023e
Author: Shixiong Zhu 
Date:   2018-08-24T21:42:11Z

don't use query.processAllAvailable for continuous processing




---




[GitHub] spark issue #22230: [SPARK-25214][SS][FOLLOWUP]Fix the issue that Kafka v2 s...

2018-08-24 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22230
  
cc @tdas 


---




[GitHub] spark issue #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 source may ...

2018-08-24 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22207
  
I just realized the Kafka source v2 is not in 2.3 :)


---




[GitHub] spark issue #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 source may ...

2018-08-24 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22207
  
Thanks! Merging to master and 2.3.


---




[GitHub] spark pull request #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 sour...

2018-08-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22207#discussion_r212709927
  
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala ---
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable
+import scala.util.Random
+
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter}
+import org.apache.spark.sql.streaming.{StreamTest, Trigger}
+import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+
+/**
+ * This is a basic test trait which will set up a Kafka cluster that keeps only several records in
+ * a topic and ages out records very quickly. This is a helper trait to test
+ * the "failOnDataLoss=false" case with missing offsets.
+ *
+ * Note: there is a hard-coded 30-second delay (kafka.log.LogManager.InitialTaskDelayMs) to clean up
+ * records. Hence each class extending this trait needs to wait at least 30 seconds (or even longer
+ * when running on a slow Jenkins machine) before records start to be removed. To make sure a test
+ * does see missing offsets, you can check the earliest offset in `eventually` and make sure it's
+ * not 0 rather than sleeping for a hard-coded duration.
+ */
+trait KafkaMissingOffsetsTest extends SharedSQLContext {
+
+  protected var testUtils: KafkaTestUtils = _
+
+  override def createSparkSession(): TestSparkSession = {
+    // Set maxRetries to 3 to handle NPE from `poll` when deleting a topic
+    new TestSparkSession(new SparkContext("local[2,3]", "test-sql-context", sparkConf))
+  }
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    testUtils = new KafkaTestUtils {
+      override def brokerConfiguration: Properties = {
+        val props = super.brokerConfiguration
+        // Try to make Kafka clean up messages as fast as possible. However, there is a hard-coded
+        // 30-second delay (kafka.log.LogManager.InitialTaskDelayMs) so this test should run at
+        // least 30 seconds.
+        props.put("log.cleaner.backoff.ms", "100")
+        // The size of RecordBatch V2 increases to support transactional write.
+        props.put("log.segment.bytes", "70")
+        props.put("log.retention.bytes", "40")
+        props.put("log.retention.check.interval.ms", "100")
+        props.put("delete.retention.ms", "10")
+        props.put("log.flush.scheduler.interval.ms", "10")
+        props
+      }
+    }
+    testUtils.setup()
+  }
+
+  override def afterAll(): Unit = {
+    if (testUtils != null) {
+      testUtils.teardown()
+      testUtils = null
+    }
+    super.afterAll()
+  }
+}
+
+class KafkaDontFailOnDataLossSuite extends KafkaMissingOffsetsTest {
+
+  import testImplicits._
+
+  private val topicId = new AtomicInteger(0)
+
+  private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}"
+
+  /**
+   * @param testStreamingQuery whether to test a streaming query or a batch query.
+   * @param writeToTable the function to write the specified [[DataFrame]] to the given table.
+   */
+  private def verifyMissingOffsetsDontCauseDuplicatedRecords(
+      testStreamingQuery: Boolean)(writeToTable: (DataFrame, String) => Unit): Unit = {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 1)
+    testUtils.sendMessages(topic, (0 until 50).map(_.toString).toArray)

[GitHub] spark pull request #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 sour...

2018-08-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22207#discussion_r212707515
  
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala ---

[GitHub] spark pull request #22210: [SPARK-25218][Core]Fix potential resource leaks i...

2018-08-23 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22210#discussion_r212443117
  
--- Diff: common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java ---
@@ -95,26 +95,24 @@ public ByteBuffer nioByteBuffer() throws IOException {
   @Override
   public InputStream createInputStream() throws IOException {
     FileInputStream is = null;
+    boolean shouldClose = true;
     try {
       is = new FileInputStream(file);
       ByteStreams.skipFully(is, offset);
-      return new LimitedInputStream(is, length);
+      InputStream r = new LimitedInputStream(is, length);
+      shouldClose = false;
+      return r;
     } catch (IOException e) {
-      try {
-        if (is != null) {
-          long size = file.length();
-          throw new IOException("Error in reading " + this + " (actual file length " + size + ")",
--- End diff --

ditto


---




[GitHub] spark pull request #22210: [SPARK-25218][Core]Fix potential resource leaks i...

2018-08-23 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22210#discussion_r212443039
  
--- Diff: common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java ---
@@ -77,16 +77,16 @@ public ByteBuffer nioByteBuffer() throws IOException {
       return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
     }
   } catch (IOException e) {
+    String errorMessage = "Error in reading " + this;
     try {
       if (channel != null) {
         long size = channel.size();
-        throw new IOException("Error in reading " + this + " (actual file length " + size + ")",
--- End diff --

This exception was just thrown and then ignored. I assigned the message to `errorMessage` so that we can still see it in the final error.
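
Both hunks apply the same ownership pattern; here it is in isolation (a plain-Scala stand-in, not the actual Spark code):
```
import java.io.{BufferedInputStream, FileInputStream, InputStream}

object StreamOwnership {
  def openSlice(path: String, offset: Long): InputStream = {
    val in = new FileInputStream(path)
    var shouldClose = true
    try {
      in.skip(offset) // may throw IOException
      val wrapped: InputStream = new BufferedInputStream(in)
      shouldClose = false // from here on, the caller owns (and must close) the stream
      wrapped
    } finally {
      if (shouldClose) in.close() // reached only when something above threw
    }
  }
}
```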


---




[GitHub] spark pull request #22210: [SPARK-25218][Core]Fix potential resource leaks i...

2018-08-23 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/22210

[SPARK-25218][Core]Fix potential resource leaks in TransportServer and 
SocketAuthHelper

## What changes were proposed in this pull request?

Make sure TransportServer and SocketAuthHelper close the resources for all 
types of errors.

## How was this patch tested?

Jenkins


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark SPARK-25218

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22210.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22210


commit 6f2248d45716332ac78e44b1314011806f59deb8
Author: Shixiong Zhu 
Date:   2018-08-23T20:10:03Z

Fix potential resource leaks




---




[GitHub] spark pull request #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 sour...

2018-08-23 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22207#discussion_r212410113
  
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala ---

[GitHub] spark pull request #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 sour...

2018-08-23 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22207#discussion_r212409454
  
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
@@ -1187,134 +1185,3 @@ class KafkaSourceStressSuite extends KafkaSourceTest {
       iterations = 50)
   }
 }
-
-class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with SharedSQLContext {
--- End diff --

Moved to KafkaDontFailOnDataLossSuite.scala


---




[GitHub] spark pull request #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 sour...

2018-08-23 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22207#discussion_r212409340
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala ---
@@ -77,44 +77,6 @@ private[kafka010] class KafkaSourceRDD(
     offsetRanges.zipWithIndex.map { case (o, i) => new KafkaSourceRDDPartition(i, o) }.toArray
   }
 
-  override def count(): Long = offsetRanges.map(_.size).sum
--- End diff --

These methods are never used, as Dataset always uses this RDD via https://github.com/apache/spark/blob/2a0a8f753bbdc8c251f8e699c0808f35b94cfd20/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala#L113 and `MapPartitionsRDD` just calls the default RDD implementation. In addition, they may return wrong answers when `failOnDataLoss=false`. Hence, I just removed them.



---




[GitHub] spark pull request #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 sour...

2018-08-23 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/22207

[SPARK-25214][SS]Fix the issue that Kafka v2 source may return duplicated 
records when `failOnDataLoss=false`

## What changes were proposed in this pull request?

When there are missing offsets, the Kafka v2 source may return duplicated records when `failOnDataLoss=false`.

This PR fixes the issue and also adds regression tests for all Kafka 
readers.

## How was this patch tested?

New tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark SPARK-25214

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22207.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22207


commit f2d4d67c765a298d23964b26ec07596839f008fa
Author: Shixiong Zhu 
Date:   2018-08-23T17:46:52Z

Fix the issue that Kafka v2 source may return duplicated records when failOnDataLoss is false




---




[GitHub] spark issue #22181: [SPARK-25163][SQL] Fix flaky test: o.a.s.util.collection...

2018-08-22 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22181
  
LGTM. Merging to master.


---




[GitHub] spark issue #22176: [SPARK-25181][CORE] Limit Thread Pool size in BlockManag...

2018-08-22 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22176
  
@markhamstra That's a good point. However, this just follows our current code (see the existing usages of `newDaemonCachedThreadPool`), and the changes here should be safe considering how we use these pools, so I don't want to block this PR for that reason. We can open a new ticket to add configurations if that's necessary.


---




[GitHub] spark issue #22176: [SPARK-25181][CORE] Limit Thread Pool size in BlockManag...

2018-08-22 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22176
  
LGTM. Merging to master.


---




[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-22 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r212033844
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala ---
@@ -337,6 +338,7 @@ private[kafka010] case class KafkaMicroBatchInputPartitionReader(
       val record = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss)
       if (record != null) {
         nextRow = converter.toUnsafeRow(record)
+        nextOffset = record.offset + 1
--- End diff --

We should update `nextOffset` to `record.offset + 1` rather than `nextOffset + 1`. Otherwise, it may return duplicated records when `failOnDataLoss` is `false`. I will submit another PR to push this fix to 2.3, as it's a correctness issue.

In addition, we should update `nextOffset` in the `next` method, as the `get` method is designed to be called multiple times.
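
A hypothetical trace of why the distinction matters (offsets 5..7 aged out, `failOnDataLoss=false`):
```
// get(nextOffset = 5, ...): Kafka skips the missing range and returns the record at offset 8.
//   advancing with nextOffset + 1    => 6 -> the next get(6, ...) fetches offset 8 again (duplicate)
//   advancing with record.offset + 1 => 9 -> the next get(9, ...) continues past the consumed record
```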


---




[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-22 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r212032759
  
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
@@ -597,6 +614,254 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
   }
 )
   }
+
+  test("read Kafka transactional messages: read_committed") {
+    // This test will cover the following cases:
+    // 1. the whole batch contains no data messages
+    // 2. the first offset in a batch is not a committed data message
+    // 3. the last offset in a batch is not a committed data message
+    // 4. there is a gap in the middle of a batch
+
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 1)
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("kafka.isolation.level", "read_committed")
+      .option("maxOffsetsPerTrigger", 3)
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+      // Set a short timeout to make the test fast. When a batch contains no committed data
+      // messages, "poll" will wait until timeout.
+      .option("kafkaConsumer.pollTimeoutMs", 5000)
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+    val clock = new StreamManualClock
+
+    val waitUntilBatchProcessed = AssertOnQuery { q =>
+      eventually(Timeout(streamingTimeout)) {
+        if (!q.exception.isDefined) {
+          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+        }
+      }
+      if (q.exception.isDefined) {
+        throw q.exception.get
+      }
+      true
+    }
+
+    val producer = testUtils.createProducer(usingTrascation = true)
+    try {
+      producer.initTransactions()
+
+      testStream(mapped)(
+        StartStream(ProcessingTime(100), clock),
+        waitUntilBatchProcessed,
+        // 1 from smallest, 1 from middle, 8 from biggest
+        CheckAnswer(),
+        WithKafkaProducer(topic, producer) { producer =>
+          // Send 5 messages. They should be visible only after being committed.
+          producer.beginTransaction()
+          (1 to 5).foreach { i =>
+            producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
+          }
+        },
+        AdvanceManualClock(100),
+        waitUntilBatchProcessed,
+        // Should not see any uncommitted messages
+        CheckAnswer(),
+        WithKafkaProducer(topic, producer) { producer =>
+          producer.commitTransaction()
+        },
+        AdvanceManualClock(100),
+        waitUntilBatchProcessed,
+        CheckAnswer(1 to 3: _*), // offset 0, 1, 2
+        AdvanceManualClock(100),
+        waitUntilBatchProcessed,
+        CheckAnswer(1 to 5: _*), // offset: 3, 4, 5* [* means it's not a committed data message]
+        WithKafkaProducer(topic, producer) { producer =>
+          // Send 5 messages and abort the transaction. They should not be read.
+          producer.beginTransaction()
+          (6 to 10).foreach { i =>
+            producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
+          }
+          producer.abortTransaction()
+        },
+        AdvanceManualClock(100),
+        waitUntilBatchProcessed,
+        CheckAnswer(1 to 5: _*), // offset: 6*, 7*, 8*
+        AdvanceManualClock(100),
+        waitUntilBatchProcessed,
+        CheckAnswer(1 to 5: _*), // offset: 9*, 10*, 11*
+        WithKafkaProducer(topic, producer) { producer =>
+          // Send 5 messages again. The consumer should skip the above aborted messages and read
+          // them.
+          producer.beginTransaction()
+          (11 to 15).foreach { i =>
+            producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
+          }
+          producer.commitTransaction()
+        },
+        AdvanceManualClock(100),
+        waitUntilBatchProcessed,
+        CheckAnswer((1 to 5) ++ (11 to 13): _*), // offset: 12, 13, 14
+        AdvanceManualClock(100),
+        waitUntilBatchProcessed,
+        CheckAnswer((1 t

[GitHub] spark issue #22182: [SPARK-25184][SS] Fixed race condition in StreamExecutio...

2018-08-22 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22182
  
retest this please


---




[GitHub] spark issue #22182: [SPARK-25184][SS] Fixed race condition in StreamExecutio...

2018-08-22 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22182
  
LGTM


---




[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r211786471
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -250,33 +294,42 @@ private[kafka010] case class InternalKafkaConsumer(
       offset: Long,
       untilOffset: Long,
       pollTimeoutMs: Long,
-      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
-    if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
-      // This is the first fetch, or the last pre-fetched data has been drained.
+      failOnDataLoss: Boolean): FetchedRecord = {
+    if (offset != nextOffsetInFetchedData) {
+      // This is the first fetch, or the fetched data has been reset.
       // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
-      seek(offset)
-      poll(pollTimeoutMs)
+      poll(offset, pollTimeoutMs)
+    } else if (!fetchedData.hasNext) {
+      // The last pre-fetched data has been drained.
+      if (offset < offsetAfterPoll) {
--- End diff --

This is the place that prevents me from making `offsetAfterPoll` a local var.


---




[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r211786183
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -31,22 +31,21 @@ import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.util.UninterruptibleThread
 
-/**
- * An exception to indicate there is a missing offset in the records returned by Kafka consumer.
- * This means it's either a transaction (commit or abort) marker, or an aborted message if
- * "isolation.level" is "read_committed". The offsets in the range [offset, nextOffsetToFetch) are
- * missing. In other words, `nextOffsetToFetch` indicates the next offset to fetch.
- */
-private[kafka010] class MissingOffsetException(
-    val offset: Long,
-    val nextOffsetToFetch: Long) extends Exception(
-  s"Offset $offset is missing. The next offset to fetch is: $nextOffsetToFetch")
-
 private[kafka010] sealed trait KafkaDataConsumer {
   /**
-   * Get the record for the given offset if available. Otherwise it will either throw error
-   * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
-   * or null.
+   * Get the record for the given offset if available.
+   *
+   * If the record is invisible (either a
+   * transaction message, or an aborted message when the consumer's `isolation.level` is
+   * `read_committed`), it will be skipped and this method will try to fetch next available record
+   * within [offset, untilOffset).
+   *
+   * This method also will try its best to detect data loss. If `failOnDataLoss` is `false`, it will
--- End diff --

Good catch


---




[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r211786163
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -31,22 +31,21 @@ import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.util.UninterruptibleThread
 
-/**
- * An exception to indicate there is a missing offset in the records returned by Kafka consumer.
- * This means it's either a transaction (commit or abort) marker, or an aborted message if
- * "isolation.level" is "read_committed". The offsets in the range [offset, nextOffsetToFetch) are
- * missing. In other words, `nextOffsetToFetch` indicates the next offset to fetch.
- */
-private[kafka010] class MissingOffsetException(
-    val offset: Long,
-    val nextOffsetToFetch: Long) extends Exception(
-  s"Offset $offset is missing. The next offset to fetch is: $nextOffsetToFetch")
-
 private[kafka010] sealed trait KafkaDataConsumer {
   /**
-   * Get the record for the given offset if available. Otherwise it will either throw error
-   * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
-   * or null.
+   * Get the record for the given offset if available.
+   *
+   * If the record is invisible (either a
+   * transaction message, or an aborted message when the consumer's `isolation.level` is
+   * `read_committed`), it will be skipped and this method will try to fetch next available record
+   * within [offset, untilOffset).
+   *
+   * This method also will try its best to detect data loss. If `failOnDataLoss` is `false`, it will
+   * throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `true`, this
--- End diff --

> Will we throw an exception even when its a control message and there is 
no real data loss?

No. `It will be skipped and this method will try to fetch next available 
record within [offset, untilOffset).`
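
A rough sketch of that skip loop (hedged illustration only; `fetchRecord` is a hypothetical stand-in for the consumer's internal fetch, not the actual method in `KafkaDataConsumer`):

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord

// Hypothetical helper: returns None when the offset holds a transaction
// marker or an aborted message, Some(record) for a committed data message.
def fetchRecord(offset: Long): Option[ConsumerRecord[Array[Byte], Array[Byte]]] = ???

// Skip invisible offsets until the next committed data message, or give up
// at the end of the range.
def getNextVisible(
    offset: Long,
    untilOffset: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
  var current = offset
  while (current < untilOffset) {
    fetchRecord(current) match {
      case Some(record) => return record // a committed data message
      case None => current += 1          // a marker or aborted message: skip
    }
  }
  null // no visible record in [offset, untilOffset)
}
```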



---




[GitHub] spark issue #22106: [SPARK-25116][TESTS]Fix the Kafka cluster leak and clean...

2018-08-17 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22106
  
Thanks. Merging to master.


---




[GitHub] spark pull request #22106: [SPARK-25116][TESTS]Fix the Kafka cluster leak an...

2018-08-17 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22106#discussion_r211035290
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
 ---
@@ -120,61 +120,56 @@ private[kafka010] class KafkaTestUtils extends 
Logging {
 
   /** setup the whole embedded servers, including Zookeeper and Kafka 
brokers */
   def setup(): Unit = {
+// Set up a KafkaTestUtils leak detector so that we can see where the 
leak KafkaTestUtils is
+// created.
+val exception = new SparkException("It was created at: ")
+leakDetector = ShutdownHookManager.addShutdownHook { () =>
+  logError("Found a leak KafkaTestUtils.", exception)
+}
+
 setupEmbeddedZookeeper()
 setupEmbeddedKafkaServer()
   }
 
   /** Teardown the whole servers, including Kafka broker and Zookeeper */
   def teardown(): Unit = {
-// There is a race condition that may kill JVM when terminating the 
Kafka cluster. We set
-// a custom Procedure here during the termination in order to keep JVM 
running and not fail the
-// tests.
-val logExitEvent = new Exit.Procedure {
-  override def execute(statusCode: Int, message: String): Unit = {
-logError(s"Prevent Kafka from killing JVM (statusCode: $statusCode 
message: $message)")
-  }
+if (leakDetector != null) {
+  ShutdownHookManager.removeShutdownHook(leakDetector)
 }
-Exit.setExitProcedure(logExitEvent)
-Exit.setHaltProcedure(logExitEvent)
-try {
-  brokerReady = false
-  zkReady = false
-
-  if (producer != null) {
-producer.close()
-producer = null
-  }
+brokerReady = false
--- End diff --

No. We set up in `beforeAll` and clean up in `afterAll`, which will be in 
the same thread.


---




[GitHub] spark pull request #22106: [SPARK-25116][TESTS]Fix the Kafka cluster leak an...

2018-08-17 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22106#discussion_r210977997
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
 ---
@@ -33,8 +33,12 @@ private[kafka010] object CachedKafkaProducer extends 
Logging {
 
   private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
 
+  private val defaultCacheExpireTimeout = TimeUnit.MINUTES.toMillis(10)
+
   private lazy val cacheExpireTimeout: Long =
-SparkEnv.get.conf.getTimeAsMs("spark.kafka.producer.cache.timeout", 
"10m")
+Option(SparkEnv.get).map(_.conf.getTimeAsMs(
--- End diff --

Change this to call `clear` in `afterAll` even if the SparkContext has been 
stopped.
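
For example, a minimal sketch (assuming the `clear()` helper on `CachedKafkaProducer`):

```scala
// Sketch: clear cached producers unconditionally in afterAll, so the cleanup
// runs even when SparkEnv/SparkContext has already been stopped.
override def afterAll(): Unit = {
  try {
    CachedKafkaProducer.clear()
  } finally {
    super.afterAll()
  }
}
```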


---




[GitHub] spark issue #22106: [SPARK-25116][TESTS]Fix the Kafka cluster leak and clean...

2018-08-17 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22106
  
cc @srowen 


---




[GitHub] spark issue #22106: [SPARK-25116][TESTS]Fix the Kafka cluster leak and clean...

2018-08-17 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22106
  
Test failures in 4264 and 4266 are unrelated. The latest changes passed on 
Jenkins 15 times.


---




[GitHub] spark pull request #22106: [SPARK-25116][TESTS]Fix the Kafka cluster leak an...

2018-08-15 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22106#discussion_r210387003
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
 ---
@@ -40,12 +40,7 @@ class KafkaContinuousSinkSuite extends 
KafkaContinuousTest {
 
   override val streamingTimeout = 30.seconds
 
-  override def beforeAll(): Unit = {
-super.beforeAll()
-testUtils = new KafkaTestUtils(
-  withBrokerProps = Map("auto.create.topics.enable" -> "false"))
-testUtils.setup()
-  }
+  override val brokerProps = Map("auto.create.topics.enable" -> "false")
--- End diff --

This is the fix for the Kafka cluster leak.
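
The base suite now owns the single Kafka cluster and subclasses only override the broker properties. Roughly (a sketch of the pattern; the base-class shape is assumed, not the actual suite hierarchy):

```scala
abstract class KafkaTestBase extends SparkFunSuite {
  // Subclasses override this instead of creating a second KafkaTestUtils.
  protected def brokerProps: Map[String, String] = Map.empty
  protected var testUtils: KafkaTestUtils = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    // Exactly one cluster per suite, configured via the overridable props.
    testUtils = new KafkaTestUtils(withBrokerProps = brokerProps)
    testUtils.setup()
  }

  override def afterAll(): Unit = {
    try {
      if (testUtils != null) testUtils.teardown()
    } finally {
      super.afterAll()
    }
  }
}
```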


---




[GitHub] spark pull request #22106: [SPARK-25116][TESTS]Fix the Kafka cluster leak an...

2018-08-15 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22106#discussion_r210383608
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
 ---
@@ -216,7 +216,7 @@ class KafkaContinuousInputPartitionReader(
   } catch {
 // We didn't read within the timeout. We're supposed to block 
indefinitely for new data, so
 // swallow and ignore this.
-case _: TimeoutException =>
+case _: TimeoutException | _: 
org.apache.kafka.common.errors.TimeoutException =>
--- End diff --

This is to fix 
https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4254/

`org.apache.kafka.common.errors.TimeoutException: Timeout of 3000ms expired 
before the position for partition failOnDataLoss-2-0 could be determined` 
triggered a task retry, but as continuous processing doesn't support task retries, it failed with 
`org.apache.spark.sql.execution.streaming.continuous.ContinuousTaskRetryException:
 Continuous execution does not support task retry`.


---




[GitHub] spark pull request #22105: [SPARK-25115] [Core] Eliminate extra memory copy ...

2018-08-14 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22105#discussion_r210134581
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
 ---
@@ -140,8 +140,24 @@ private int copyByteBuf(ByteBuf buf, 
WritableByteChannel target) throws IOExcept
 // SPARK-24578: cap the sub-region's size of returned nio buffer to 
improve the performance
 // for the case that the passed-in buffer has too many components.
 int length = Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT);
--- End diff --

@vanzin The ByteBuffer here may be just a large ByteBuffer. See my comment 
here: https://github.com/apache/spark/pull/12083#issuecomment-204499691


---




[GitHub] spark issue #22105: [SPARK-25115] [Core] Eliminate extra memory copy done wh...

2018-08-14 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22105
  
@normanmaurer LGTM. Thanks for the fix. I totally forgot this issue.


---




[GitHub] spark pull request #22105: [SPARK-25115] [Core] Eliminate extra memory copy ...

2018-08-14 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22105#discussion_r210133826
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
 ---
@@ -140,8 +140,24 @@ private int copyByteBuf(ByteBuf buf, 
WritableByteChannel target) throws IOExcept
 // SPARK-24578: cap the sub-region's size of returned nio buffer to 
improve the performance
 // for the case that the passed-in buffer has too many components.
 int length = Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT);
--- End diff --

@vanzin This is to avoid memory copy when writing a large ByteBuffer. You 
merged this actually: #12083


---




[GitHub] spark pull request #22106: [SPARK-25116][Tests]Fix the kafka cluster leak an...

2018-08-14 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/22106

[SPARK-25116][Tests]Fix the kafka cluster leak and clean up cached producers

## What changes were proposed in this pull request?

KafkaContinuousSinkSuite leaks a Kafka cluster because both KafkaSourceTest and KafkaContinuousSinkSuite create a Kafka cluster, but `afterAll` only shuts down one of them. The leaked cluster causes some Kafka threads to crash and kill the JVM when SBT is trying to clean up the tests.

This PR fixes the leak and also adds a shutdown hook to detect Kafka cluster leaks.

In addition, it also fixes an `AdminClient` leak and cleans up cached producers to eliminate the following annoying logs:
```
18/08/13 15:34:42.568 kafka-admin-client-thread | adminclient-4 WARN 
NetworkClient: [AdminClient clientId=adminclient-4] Connection to node 0 could 
not be established. Broker may not be available.
18/08/13 15:34:42.570 kafka-admin-client-thread | adminclient-6 WARN 
NetworkClient: [AdminClient clientId=adminclient-6] Connection to node 0 could 
not be established. Broker may not be available.
18/08/13 15:34:42.606 kafka-admin-client-thread | adminclient-8 WARN 
NetworkClient: [AdminClient clientId=adminclient-8] Connection to node -1 could 
not be established. Broker may not be available.
18/08/13 15:34:42.729 kafka-producer-network-thread | producer-797 WARN 
NetworkClient: [Producer clientId=producer-797] Connection to node -1 could not 
be established. Broker may not be available.
18/08/13 15:34:42.906 kafka-producer-network-thread | producer-1598 WARN 
NetworkClient: [Producer clientId=producer-1598] Connection to node 0 could not 
be established. Broker may not be available.
```

## How was this patch tested?

Jenkins


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark SPARK-25116

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22106.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22106


commit 6a0ec9c53e2949de724c3596171b59aae94523c1
Author: Shixiong Zhu 
Date:   2018-08-14T17:59:48Z

fix kafka cluster leak

commit 59d10e2dc82bdbec1b0da19aaf3eff060db82804
Author: Shixiong Zhu 
Date:   2018-08-14T18:00:35Z

Revert "don't kill JVM during termination"

This reverts commit b5eb54244ed573c8046f5abf7bf087f5f08dba58.

commit e13b21a3d16c22ba069ef67f9a263af18f238691
Author: Shixiong Zhu 
Date:   2018-08-14T18:00:42Z

Merge branch 'master' into SPARK-25116

commit b561528196a4a6d3d9e0bb951358fc5f288fdb3d
Author: Shixiong Zhu 
Date:   2018-08-14T18:03:07Z

update

commit 574f5fa3fc6e7313774b1408eceb4543d4422ba0
Author: Shixiong Zhu 
Date:   2018-08-14T18:11:34Z

update




---




[GitHub] spark issue #22097: [SPARK-18057][FOLLOW-UP]Use 127.0.0.1 to avoid zookeeper...

2018-08-14 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22097
  
I'm going to merge this now since it does fix some issues. I will continue to investigate the `exit code 1` issue.


---




[GitHub] spark issue #22097: [SPARK-18057][FOLLOW-UP]Use 127.0.0.1 to avoid zookeeper...

2018-08-13 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22097
  
I set a custom Exit.Procedure to prevent Kafka from killing the JVM. Hopefully this will make the test more stable.
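
Concretely, it looks roughly like this (a sketch; assumes the enclosing class mixes in `org.apache.spark.internal.Logging`):

```scala
import org.apache.kafka.common.utils.Exit

// Route Kafka's exit/halt calls to a log line instead of letting a failed
// broker shutdown kill the test JVM.
val logExitEvent = new Exit.Procedure {
  override def execute(statusCode: Int, message: String): Unit = {
    logError(s"Prevent Kafka from killing JVM (statusCode: $statusCode message: $message)")
  }
}
Exit.setExitProcedure(logExitEvent)
Exit.setHaltProcedure(logExitEvent)
```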


---




[GitHub] spark issue #22097: [SPARK-18057][FOLLOW-UP]Use 127.0.0.1 to avoid zookeeper...

2018-08-13 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22097
  
Looks like there is a race condition while terminating the Kafka cluster:
```
18/08/13 15:34:44.148 kafka-log-cleaner-thread-0 ERROR LogCleaner: Failed 
to access checkpoint file cleaner-offset-checkpoint in dir 
/home/jenkins/workspace/SparkPullRequestBuilder@3/target/tmp/spark-5ad98c9e-0d75-4f23-a948-9e29246651d2
org.apache.kafka.common.errors.KafkaStorageException: Error while reading 
checkpoint file 
/home/jenkins/workspace/SparkPullRequestBuilder@3/target/tmp/spark-5ad98c9e-0d75-4f23-a948-9e29246651d2/cleaner-offset-checkpoint
Caused by: java.io.FileNotFoundException: 
/home/jenkins/workspace/SparkPullRequestBuilder@3/target/tmp/spark-5ad98c9e-0d75-4f23-a948-9e29246651d2/cleaner-offset-checkpoint
 (No such file or directory)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at 
kafka.server.checkpoints.CheckpointFile.liftedTree2$1(CheckpointFile.scala:87)
at kafka.server.checkpoints.CheckpointFile.read(CheckpointFile.scala:86)
at 
kafka.server.checkpoints.OffsetCheckpointFile.read(OffsetCheckpointFile.scala:61)
at 
kafka.log.LogCleanerManager$$anonfun$allCleanerCheckpoints$1$$anonfun$apply$1.apply(LogCleanerManager.scala:89)
at 
kafka.log.LogCleanerManager$$anonfun$allCleanerCheckpoints$1$$anonfun$apply$1.apply(LogCleanerManager.scala:87)
at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at 
scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
at 
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at 
kafka.log.LogCleanerManager$$anonfun$allCleanerCheckpoints$1.apply(LogCleanerManager.scala:87)
at 
kafka.log.LogCleanerManager$$anonfun$allCleanerCheckpoints$1.apply(LogCleanerManager.scala:95)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
at 
kafka.log.LogCleanerManager.allCleanerCheckpoints(LogCleanerManager.scala:86)
at 
kafka.log.LogCleanerManager$$anonfun$grabFilthiestCompactedLog$1.apply(LogCleanerManager.scala:126)
at 
kafka.log.LogCleanerManager$$anonfun$grabFilthiestCompactedLog$1.apply(LogCleanerManager.scala:123)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
at 
kafka.log.LogCleanerManager.grabFilthiestCompactedLog(LogCleanerManager.scala:123)
at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:296)
at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:289)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
```


---




[GitHub] spark issue #21698: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-13 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/21698
  
> "you can at least sort the serialized bytes of T"

I think this should work.
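
A sketch of such an ordering (illustration only, not the committed fix): an unsigned lexicographic comparison over the serialized form gives a total order that is stable across recomputes.

```scala
// A total order over serialized records, so the rows in a partition can be
// sorted deterministically before the round-robin repartition.
val bytesOrdering: Ordering[Array[Byte]] = new Ordering[Array[Byte]] {
  override def compare(a: Array[Byte], b: Array[Byte]): Int = {
    val len = math.min(a.length, b.length)
    var i = 0
    while (i < len) {
      // Compare as unsigned bytes so the order does not depend on byte sign.
      val cmp = java.lang.Integer.compare(a(i) & 0xff, b(i) & 0xff)
      if (cmp != 0) return cmp
      i += 1
    }
    java.lang.Integer.compare(a.length, b.length)
  }
}
```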


---




[GitHub] spark issue #22097: [SPARK-18057][FOLLOW-UP]Use 127.0.0.1 to avoid zookeeper...

2018-08-13 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22097
  
cc @srowen 


---




[GitHub] spark pull request #22097: [SPARK-18057][FOLLOW-UP]Use 127.0.0.1 to avoid zo...

2018-08-13 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/22097

[SPARK-18057][FOLLOW-UP]Use 127.0.0.1 to avoid zookeeper picking up an ipv6 
address

## What changes were proposed in this pull request?

I'm still seeing the Kafka tests failing randomly due to `kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING`. I checked the test output and saw ZooKeeper picked up an IPv6 address.

This PR just uses `127.0.0.1` rather than `localhost` to make sure ZooKeeper never picks up an IPv6 address.

## How was this patch tested?

Jenkins

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark fix-zookeeper-connect

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22097.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22097


commit 90f55dc3a24e39a327754e6eb2b4a8ecfafcd096
Author: Shixiong Zhu 
Date:   2018-08-13T21:57:23Z

use 127.0.0.1 to avoid zookeeper picking up an ipv6 address




---




[GitHub] spark pull request #22072: [SPARK-25081][Core]Nested spill in ShuffleExterna...

2018-08-13 Thread zsxwing
Github user zsxwing closed the pull request at:

https://github.com/apache/spark/pull/22072


---




[GitHub] spark issue #22072: [SPARK-25081][Core]Nested spill in ShuffleExternalSorter...

2018-08-10 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22072
  
retest this please


---




[GitHub] spark issue #21746: [SPARK-24699] [SS]Make watermarks work with Trigger.Once...

2018-08-10 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/21746
  
@c-horn it's in 2.4.0. I just fixed the ticket.


---




[GitHub] spark issue #21980: [SPARK-25010][SQL] Rand/Randn should produce different v...

2018-08-10 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/21980
  
LGTM2


---




[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

2018-08-10 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/18143
  
@ScrapCodes sorry for the delay. I think @tdas has fixed the issue. Please 
close the PR.


---




[GitHub] spark pull request #21634: [SPARK-24648][SQL] SqlMetrics should be threadsaf...

2018-08-10 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/21634#discussion_r209371636
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 ---
@@ -504,4 +504,38 @@ class SQLMetricsSuite extends SparkFunSuite with 
SQLMetricsTestUtils with Shared
   test("writing data out metrics with dynamic partition: parquet") {
 testMetricsDynamicPartition("parquet", "parquet", "t1")
   }
+
+  test("writing metrics from single thread") {
+val nAdds = 10
+val acc = new SQLMetric("test", -10)
+assert(acc.isZero())
+acc.set(0)
+for (i <- 1 to nAdds) acc.add(1)
+assert(!acc.isZero())
+assert(nAdds === acc.value)
+acc.reset()
+assert(acc.isZero())
+  }
+
+  test("writing metrics from multiple threads") {
--- End diff --

> Do you mean it's a one-writer, multi-reader scene?

Yes.


---




[GitHub] spark pull request #22072: [SPARK-25081][Core]Nested spill in ShuffleExterna...

2018-08-10 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/22072

[SPARK-25081][Core]Nested spill in ShuffleExternalSorter should not access 
released memory page (branch-2.2)

## What changes were proposed in this pull request?

Backport https://github.com/apache/spark/pull/22062 to branch-2.2.

## How was this patch tested?

Jenkins


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark SPARK-25081-2.2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22072.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22072


commit 1a6452ef0939c09c09801cff78b0214d7979bf6d
Author: Shixiong Zhu 
Date:   2018-08-10T17:53:44Z

Nested spill in ShuffleExternalSorter should not access released memory page

This issue is pretty similar to 
[SPARK-21907](https://issues.apache.org/jira/browse/SPARK-21907).

"allocateArray" in 
[ShuffleInMemorySorter.reset](https://github.com/apache/spark/blob/9b8521e53e56a53b44c02366a99f8a8ee1307bbf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java#L99)
 may trigger a spill and cause ShuffleInMemorySorter access the released 
`array`. Another task may get the same memory page from the pool. This will 
cause two tasks access the same memory page. When a task reads memory written 
by another task, many types of failures may happen. Here are some examples I  
have seen:

- JVM crash. (This is easy to reproduce in a unit test as we fill newly allocated and deallocated memory with 0xa5 and 0x5a bytes, which usually point to an invalid memory address.)
- java.lang.IllegalArgumentException: Comparison method violates its 
general contract!
- java.lang.NullPointerException at 
org.apache.spark.memory.TaskMemoryManager.getPage(TaskMemoryManager.java:384)
- java.lang.UnsupportedOperationException: Cannot grow BufferHolder by size 
-536870912 because the size after growing exceeds size limitation 2147483632

This PR resets states in `ShuffleInMemorySorter.reset` before calling 
`allocateArray` to fix the issue.

The new unit test will make JVM crash without the fix.

Closes #22062 from zsxwing/SPARK-25081.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 




---




[GitHub] spark issue #22062: [SPARK-25081][Core]Nested spill in ShuffleExternalSorter...

2018-08-10 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22062
  
I also merged to branch-2.3.


---




[GitHub] spark issue #22062: [SPARK-25081][Core]Nested spill in ShuffleExternalSorter...

2018-08-10 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22062
  
Thanks. Merging to master. I will try to merge to old branches and report 
back.


---




[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...

2018-08-10 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22062#discussion_r209337943
  
--- Diff: 
core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import java.lang.{Long => JLong}
+
+import org.mockito.Mockito.when
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark._
+import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.memory._
+import org.apache.spark.unsafe.Platform
+
+class ShuffleExternalSorterSuite extends SparkFunSuite with 
LocalSparkContext with MockitoSugar {
+
+  test("nested spill should be no-op") {
+val conf = new SparkConf()
+  .setMaster("local[1]")
+  .setAppName("ShuffleExternalSorterSuite")
+  .set("spark.testing", "true")
+  .set("spark.testing.memory", "1600")
+  .set("spark.memory.fraction", "1")
+sc = new SparkContext(conf)
+
+val memoryManager = UnifiedMemoryManager(conf, 1)
+
+var shouldAllocate = false
+
+// Mock `TaskMemoryManager` to allocate free memory when 
`shouldAllocate` is true.
+// This will trigger a nested spill and expose issues if we don't 
handle this case properly.
+val taskMemoryManager = new TaskMemoryManager(memoryManager, 0) {
+  override def acquireExecutionMemory(required: Long, consumer: 
MemoryConsumer): Long = {
+// ExecutionMemoryPool.acquireMemory will wait until there are 400 
bytes for a task to use.
+// So we leave 400 bytes for the task.
+if (shouldAllocate &&
+  memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed 
> 400) {
+  val acquireExecutionMemoryMethod =
+memoryManager.getClass.getMethods.filter(_.getName == 
"acquireExecutionMemory").head
+  acquireExecutionMemoryMethod.invoke(
+memoryManager,
+JLong.valueOf(
+  memoryManager.maxHeapMemory - 
memoryManager.executionMemoryUsed - 400),
+JLong.valueOf(1L), // taskAttemptId
+MemoryMode.ON_HEAP
+  ).asInstanceOf[java.lang.Long]
+}
+super.acquireExecutionMemory(required, consumer)
+  }
+}
+val taskContext = mock[TaskContext]
--- End diff --

> We can also create a TaskContextImpl by hand right?

I can. Just to save several lines :)


---




[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...

2018-08-10 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22062#discussion_r209338026
  
--- Diff: 
core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import java.lang.{Long => JLong}
+
+import org.mockito.Mockito.when
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark._
+import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.memory._
+import org.apache.spark.unsafe.Platform
+
+class ShuffleExternalSorterSuite extends SparkFunSuite with 
LocalSparkContext with MockitoSugar {
+
+  test("nested spill should be no-op") {
+val conf = new SparkConf()
+  .setMaster("local[1]")
+  .setAppName("ShuffleExternalSorterSuite")
+  .set("spark.testing", "true")
+  .set("spark.testing.memory", "1600")
+  .set("spark.memory.fraction", "1")
+sc = new SparkContext(conf)
+
+val memoryManager = UnifiedMemoryManager(conf, 1)
+
+var shouldAllocate = false
+
+// Mock `TaskMemoryManager` to allocate free memory when 
`shouldAllocate` is true.
+// This will trigger a nested spill and expose issues if we don't 
handle this case properly.
+val taskMemoryManager = new TaskMemoryManager(memoryManager, 0) {
+  override def acquireExecutionMemory(required: Long, consumer: 
MemoryConsumer): Long = {
+// ExecutionMemoryPool.acquireMemory will wait until there are 400 
bytes for a task to use.
+// So we leave 400 bytes for the task.
+if (shouldAllocate &&
+  memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed 
> 400) {
+  val acquireExecutionMemoryMethod =
+memoryManager.getClass.getMethods.filter(_.getName == 
"acquireExecutionMemory").head
+  acquireExecutionMemoryMethod.invoke(
+memoryManager,
+JLong.valueOf(
+  memoryManager.maxHeapMemory - 
memoryManager.executionMemoryUsed - 400),
+JLong.valueOf(1L), // taskAttemptId
+MemoryMode.ON_HEAP
+  ).asInstanceOf[java.lang.Long]
+}
+super.acquireExecutionMemory(required, consumer)
+  }
+}
+val taskContext = mock[TaskContext]
+val taskMetrics = new TaskMetrics
+when(taskContext.taskMetrics()).thenReturn(taskMetrics)
+val sorter = new ShuffleExternalSorter(
+  taskMemoryManager,
+  sc.env.blockManager,
+  taskContext,
+  100, // initialSize - This will require ShuffleInMemorySorter to 
acquire at least 800 bytes
+  1, // numPartitions
+  conf,
+  new ShuffleWriteMetrics)
+val inMemSorter = {
+  val field = sorter.getClass.getDeclaredField("inMemSorter")
+  field.setAccessible(true)
+  field.get(sorter).asInstanceOf[ShuffleInMemorySorter]
+}
+// Allocate memory to make the next "insertRecord" call triggers a 
spill.
+val bytes = new Array[Byte](1)
+while (inMemSorter.hasSpaceForAnotherRecord) {
--- End diff --

> Access to the hasSpaceForAnotherRecord is the only reason why we need 
reflection right?

Yes.


---




[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...

2018-08-10 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22062#discussion_r209337484
  
--- Diff: 
core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java ---
@@ -94,12 +94,20 @@ public int numRecords() {
   }
 
   public void reset() {
+// Reset `pos` here so that `spill` triggered by the below 
`allocateArray` will be no-op.
+pos = 0;
--- End diff --

We also need to set `usableCapacity` to `0`. Otherwise, 
https://github.com/apache/spark/blob/9b8521e53e56a53b44c02366a99f8a8ee1307bbf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java#L343
 will not rethrow SparkOutOfMemoryError. ShuffleExternalSorter will keep 
running and finally touch `array`.

Setting `array` to `null` is just for safety so that any incorrect use will fail with an NPE.


---




[GitHub] spark issue #21698: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-10 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/21698
  
> IIUC streaming query always need to specify a checkpoint location?

You can use a batch query to read and write Kafka :) My point is that if the input and output data sources are not on a distributed file system, the user doesn't need to specify a file system location for checkpointing. In addition, if the user doesn't specify a checkpoint path, which path should we use?
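
For example, a batch Kafka-to-Kafka job needs no checkpoint location at all; a minimal sketch (assuming an existing SparkSession `spark` and a broker at `localhost:9092`):

```scala
// Batch read from Kafka and batch write back to Kafka; no streaming
// checkpoint directory is involved anywhere.
val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "input")
  .load()

df.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output")
  .save()
```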


---




[GitHub] spark issue #22062: [SPARK-25081][Core]Nested spill in ShuffleExternalSorter...

2018-08-09 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22062
  
cc @hvanhovell 


---




[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...

2018-08-09 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/22062

[SPARK-25081][Core]Nested spill in ShuffleExternalSorter should not access 
released memory page

## What changes were proposed in this pull request?

This issue is pretty similar to 
[SPARK-21907](https://issues.apache.org/jira/browse/SPARK-21907). 

"allocateArray" in 
[ShuffleInMemorySorter.reset](https://github.com/apache/spark/blob/9b8521e53e56a53b44c02366a99f8a8ee1307bbf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java#L99)
 may trigger a spill and cause ShuffleInMemorySorter access the released 
`array`. Another task may get the same memory page from the pool. This will 
cause two tasks access the same memory page. When a task reads memory written 
by another task, many types of failures may happen. Here are some examples I  
have seen:

- JVM crash. (This is easy to reproduce in a unit test as we fill newly allocated and deallocated memory with 0xa5 and 0x5a bytes, which usually point to an invalid memory address.)
- java.lang.IllegalArgumentException: Comparison method violates its 
general contract!
- java.lang.NullPointerException at 
org.apache.spark.memory.TaskMemoryManager.getPage(TaskMemoryManager.java:384)
- java.lang.UnsupportedOperationException: Cannot grow BufferHolder by size 
-536870912 because the size after growing exceeds size limitation 2147483632

This PR resets states in `ShuffleInMemorySorter.reset` before calling 
`allocateArray` to fix the issue.

## How was this patch tested?

The new unit test will make JVM crash without the fix.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark SPARK-25081

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22062.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22062


commit 54799cae8ef0727988bbb863d326ea61b4d9ae72
Author: Shixiong Zhu 
Date:   2018-08-10T00:02:33Z

Nested spill in ShuffleExternalSorter should not access released memory page




---




[GitHub] spark issue #21698: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-09 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/21698
  
> I also like ideas based on checkpointing

What if the user doesn't provide a distributed file system path? E.g., you can read records from Kafka and write them back to Kafka, and such workloads don't need a distributed file system in standalone mode.


---




[GitHub] spark pull request #21919: [SPARK-24933][SS] Report numOutputRows in SinkPro...

2018-08-08 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/21919#discussion_r208750925
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -198,11 +198,14 @@ class SourceProgress protected[sql](
  * during a trigger. See [[StreamingQueryProgress]] for more information.
  *
  * @param description Description of the source corresponding to this 
status.
+ * @param numOutputRows Number of rows written to the sink or -1 for 
Continuous Mode (temporarily)
+ * or Sink V1 (until decommissioned).
  * @since 2.1.0
  */
 @InterfaceStability.Evolving
 class SinkProgress protected[sql](
-val description: String) extends Serializable {
+  val description: String,
+  val numOutputRows: Long) extends Serializable {
--- End diff --

I feel `numOutputRows` is a bit confusing. It sounds like the number of rows output by the sink, but the real meaning is the number of rows written to the sink. How about `numReceivedRows`?


---




[GitHub] spark pull request #21919: [SPARK-24933][SS] Report numOutputRows in SinkPro...

2018-08-08 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/21919#discussion_r208749439
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
 ---
@@ -58,6 +61,7 @@ case class WriteToDataSourceV2Exec(writer: 
DataSourceWriter, query: SparkPlan) e
 val useCommitCoordinator = writer.useCommitCoordinator
 val rdd = query.execute()
 val messages = new Array[WriterCommitMessage](rdd.partitions.length)
+val totalNumRowsAccumulator = new LongAccumulator()
--- End diff --

You should call `SparkContext.longAccumulator` to create an accumulator. Why not use a SQLMetric? If so, it will show up in the SQL UI.
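
E.g. (a sketch; the names are illustrative and `sparkContext` is assumed to be in scope):

```scala
// Option 1: a properly registered long accumulator.
val totalNumRows = sparkContext.longAccumulator("totalNumRows")

// Option 2: a SQLMetric, which additionally shows up in the SQL UI.
import org.apache.spark.sql.execution.metric.SQLMetrics
val numOutputRows = SQLMetrics.createMetric(sparkContext, "number of output rows")
```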


---




[GitHub] spark pull request #21919: [SPARK-24933][SS] Report numOutputRows in SinkPro...

2018-08-08 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/21919#discussion_r208751032
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -213,6 +216,12 @@ class SinkProgress protected[sql](
   override def toString: String = prettyJson
 
   private[sql] def jsonValue: JValue = {
-("description" -> JString(description))
+("description" -> JString(description)) ~
+  ("numOutputRows" -> JInt(numOutputRows))
   }
 }
+
+object SinkProgress {
+  def apply(description: String, numOutputRows: Option[Long]): 
SinkProgress =
+  new SinkProgress(description, numOutputRows.getOrElse(-1L))
--- End diff --

nit: please use 2-spaces.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

2018-08-08 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22042
  
cc @tdas 


---




[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-08 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/22042

[SPARK-25005][SS]Support non-consecutive offsets for Kafka

## What changes were proposed in this pull request?

When the user uses Kafka transactions to write data, the offsets in Kafka will be non-consecutive: the data will contain transaction (commit or abort) markers. In addition, if the consumer's `isolation.level` is `read_committed`, `poll` will not return aborted messages either. Hence, we will see non-consecutive offsets in the data returned by `poll`. However, as `seekToEnd` may move the offset to one of these missing offsets, there are 4 possible corner cases we need to support:

- The whole batch contains no data messages
- The first offset in a batch is not a committed data message
- The last offset in a batch is not a committed data message
- There is a gap in the middle of a batch

They are all covered by the new unit tests.
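
For reference, this is how such gaps arise (a sketch, assuming a broker at `localhost:9092`; the commit marker occupies an offset that `poll` never returns):

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("transactional.id", "demo-txn")
props.put("key.serializer",
  "org.apache.kafka.common.serialization.ByteArraySerializer")
props.put("value.serializer",
  "org.apache.kafka.common.serialization.ByteArraySerializer")

val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
producer.initTransactions()
producer.beginTransaction()
producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("topic", "1".getBytes))
producer.commitTransaction() // the commit marker creates an offset gap
producer.close()
```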

## How was this patch tested?

The new unit tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark kafka-transaction-read

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22042.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22042


commit dc18a6ff59fe7c48ed188a4eb9a6abf04caee0bd
Author: Shixiong Zhu 
Date:   2018-08-08T17:40:37Z

Support non-consecutive offsets for Kafka




---




[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-08 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r208676022
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
 ---
@@ -77,44 +77,6 @@ private[kafka010] class KafkaSourceRDD(
 offsetRanges.zipWithIndex.map { case (o, i) => new 
KafkaSourceRDDPartition(i, o) }.toArray
   }
 
-  override def count(): Long = offsetRanges.map(_.size).sum
--- End diff --

The assumption in these methods is no longer right, so they are removed.


---




[GitHub] spark issue #21222: [SPARK-24161][SS] Enable debug package feature on struct...

2018-08-06 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/21222
  
Thanks! Merging to master.


---




[GitHub] spark issue #21222: [SPARK-24161][SS] Enable debug package feature on struct...

2018-08-06 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/21222
  
LGTM pending tests


---




[GitHub] spark issue #21222: [SPARK-24161][SS] Enable debug package feature on struct...

2018-08-06 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/21222
  
retest this please


---




[GitHub] spark issue #21995: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client version...

2018-08-03 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/21995
  
LGTM. Merging to master.


---




[GitHub] spark pull request #21919: [SPARK-24933][SS] Report numOutputRows in SinkPro...

2018-08-03 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/21919#discussion_r207662087
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriterCommitProgress.java
 ---
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.sources.v2.writer.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+
+import java.io.Serializable;
+
+/**
+ * Sink progress information collected from {@link WriterCommitMessage}.
+ */
+@InterfaceStability.Evolving
+public interface StreamWriterCommitProgress extends Serializable {
--- End diff --

Why is this a public API? It is just created and consumed inside Spark.


---




[GitHub] spark issue #21854: [SPARK-24896][SQL] Uuid should produce different values ...

2018-08-02 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/21854
  
Thanks! Merging to master.


---



