[GitHub] spark pull request #22473: [SPARK-25449][CORE] Heartbeat shouldn't include a...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22473#discussion_r219580690 --- Diff: core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala --- @@ -252,18 +253,121 @@ class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSug } } + test("Heartbeat should drop zero metrics") { +heartbeatZeroMetricTest(true) + } + + test("Heartbeat should not drop zero metrics when the conf is set to false") { +heartbeatZeroMetricTest(false) + } + + private def withHeartbeatExecutor(confs: (String, String)*) + (f: (Executor, ArrayBuffer[Heartbeat]) => Unit): Unit = { +val conf = new SparkConf +confs.foreach { case (k, v) => conf.set(k, v) } +val serializer = new JavaSerializer(conf) +val env = createMockEnv(conf, serializer) +val executor = + new Executor("id", "localhost", SparkEnv.get, userClassPath = Nil, isLocal = true) +val executorClass = classOf[Executor] + +// Set ExecutorMetricType.values to be a minimal set to avoid get null exceptions +val metricClass = + Utils.classForName(classOf[org.apache.spark.metrics.ExecutorMetricType].getName() + "$") +val metricTypeValues = metricClass.getDeclaredField("values") +metricTypeValues.setAccessible(true) +metricTypeValues.set( + org.apache.spark.metrics.ExecutorMetricType, + IndexedSeq(JVMHeapMemory, JVMOffHeapMemory)) + +// Save all heartbeats sent into an ArrayBuffer for verification +val heartbeats = ArrayBuffer[Heartbeat]() +val mockReceiver = mock[RpcEndpointRef] +when(mockReceiver.askSync(any[Heartbeat], any[RpcTimeout])(any)) + .thenAnswer(new Answer[HeartbeatResponse] { +override def answer(invocation: InvocationOnMock): HeartbeatResponse = { + val args = invocation.getArguments() + val mock = invocation.getMock + heartbeats += args(0).asInstanceOf[Heartbeat] + HeartbeatResponse(false) +} + }) +val receiverRef = executorClass.getDeclaredField("heartbeatReceiverRef") +receiverRef.setAccessible(true) +receiverRef.set(executor, mockReceiver) + +f(executor, heartbeats) + } + + private def invokeReportHeartbeat(executor: Executor): Unit = { --- End diff -- You can mixin `org.scalatest.PrivateMethodTester` to replace this method, such as ``` val reportHeartBeat = PrivateMethod[Long]('reportHeartBeat) ... executor.invokePrivate(reportHeartBeat()) ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
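For reference, a slightly fuller sketch of the `PrivateMethodTester` suggestion above (the result type of `reportHeartBeat` is an assumption here; adjust it to whatever the private method actually returns):

```scala
import org.scalatest.PrivateMethodTester

// Mixing PrivateMethodTester into the suite removes the need for a hand-rolled
// reflection helper such as invokeReportHeartbeat.
class ExecutorSuite extends SparkFunSuite with PrivateMethodTester {

  // 'reportHeartBeat names the private method on Executor under test
  private val reportHeartBeat = PrivateMethod[Unit]('reportHeartBeat)

  private def invokeReportHeartbeat(executor: Executor): Unit = {
    executor.invokePrivate(reportHeartBeat())
  }
}
```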
[GitHub] spark pull request #22473: [SPARK-25449][CORE] Heartbeat shouldn't include a...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22473#discussion_r219576946 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -83,6 +83,17 @@ package object config { private[spark] val EXECUTOR_CLASS_PATH = ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional + private[spark] val EXECUTOR_HEARTBEAT_DROP_ZERO_METRICS = + ConfigBuilder("spark.executor.heartbeat.dropZeroMetrics").booleanConf.createWithDefault(true) --- End diff -- Also please call `internal()` to indicate that this is not a public config.
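Applied to the config quoted in the diff, the suggestion would look roughly like this (a sketch of Spark's usual `ConfigBuilder` pattern, not the final PR code):

```scala
private[spark] val EXECUTOR_HEARTBEAT_DROP_ZERO_METRICS =
  ConfigBuilder("spark.executor.heartbeat.dropZeroMetrics")
    .internal()                 // mark as internal: not a documented, user-facing config
    .booleanConf
    .createWithDefault(true)
```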
[GitHub] spark pull request #22473: [SPARK-25449][CORE] Heartbeat shouldn't include a...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22473#discussion_r219574155 --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala --- @@ -149,7 +149,7 @@ private[spark] class Executor( // Executor for the heartbeat task. private val heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat, -"executor-heartbeater", conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")) +"executor-heartbeater", conf.getTimeAsMs(EXECUTOR_HEARTBEAT_INTERVAL.key, "10s")) --- End diff -- nit: `conf.get(EXECUTOR_HEARTBEAT_INTERVAL)`. Could you search the whole code base and update them as well?
[GitHub] spark pull request #22473: [SPARK-25449][CORE] Heartbeat shouldn't include a...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22473#discussion_r219573967 --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala --- @@ -120,7 +120,7 @@ private[spark] class Executor( } // Whether to load classes in user jars before those in Spark jars - private val userClassPathFirst = conf.getBoolean("spark.executor.userClassPathFirst", false) + private val userClassPathFirst = conf.getBoolean(EXECUTOR_USER_CLASS_PATH_FIRST.key, false) --- End diff -- nit: `conf.get(EXECUTOR_USER_CLASS_PATH_FIRST)`
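Both nits point at the same pattern: once a `ConfigEntry` exists, call sites should read it through `conf.get(...)` so the key, type, and default live in one place. A sketch of the two call sites above (assuming `EXECUTOR_HEARTBEAT_INTERVAL` is declared as a time config with a `10s` default):

```scala
// Instead of repeating the raw key and default at each call site:
private val userClassPathFirst = conf.get(EXECUTOR_USER_CLASS_PATH_FIRST)

private val heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat,
  "executor-heartbeater", conf.get(EXECUTOR_HEARTBEAT_INTERVAL))
```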
[GitHub] spark pull request #22507: [SPARK-25495][SS]FetchedData.reset should reset a...
GitHub user zsxwing opened a pull request: https://github.com/apache/spark/pull/22507

[SPARK-25495][SS]FetchedData.reset should reset all fields

## What changes were proposed in this pull request?

`FetchedData.reset` should reset `_nextOffsetInFetchedData` and `_offsetAfterPoll`. Otherwise it will cause inconsistent cached data and may make Kafka connector return wrong results.

## How was this patch tested?

The new unit test.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zsxwing/spark fix-kafka-reset

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22507.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22507

commit 1c8ef21a0d68154d9229afa2b117d3f19688a1a6
Author: Shixiong Zhu
Date: 2018-09-20T22:49:19Z

    fix reset
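A minimal sketch of the fix described above. The field names come from the PR description; the class shape, the record-iterator field, and the `UNKNOWN_OFFSET` sentinel are assumptions for illustration:

```scala
import java.{util => ju}
import org.apache.kafka.clients.consumer.ConsumerRecord

private case class FetchedData(
    private var _records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
    private var _nextOffsetInFetchedData: Long,
    private var _offsetAfterPoll: Long) {

  def reset(): Unit = {
    _records = ju.Collections.emptyListIterator()
    // Previously only the record iterator was cleared; the two stale offsets below could
    // then disagree with the (now empty) cache and yield inconsistent results.
    // UNKNOWN_OFFSET is an assumed sentinel meaning "no cached position".
    _nextOffsetInFetchedData = UNKNOWN_OFFSET
    _offsetAfterPoll = UNKNOWN_OFFSET
  }
}
```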
[GitHub] spark issue #22478: [SPARK-25472] Don't have legitimate stops of streams cau...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22478 LGTM pending tests. Could you add `[SS]` to your title?
[GitHub] spark pull request #22473: [SPARK-25449][CORE] Heartbeat shouldn't include a...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22473#discussion_r218941326 --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala --- @@ -799,7 +799,8 @@ private[spark] class Executor( if (taskRunner.task != null) { taskRunner.task.metrics.mergeShuffleReadMetrics() taskRunner.task.metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime) -accumUpdates += ((taskRunner.taskId, taskRunner.task.metrics.accumulators())) +accumUpdates += + ((taskRunner.taskId, taskRunner.task.metrics.accumulators().filterNot(_.isZero))) --- End diff -- Could you add a flag for this behavior change?
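One way the requested flag could look, wired to the `spark.executor.heartbeat.dropZeroMetrics` config discussed elsewhere in this review (a sketch only, not the final PR code):

```scala
val accumulatorsToReport =
  if (conf.get(EXECUTOR_HEARTBEAT_DROP_ZERO_METRICS)) {
    // New behavior, on by default: skip accumulators that are still at their zero value.
    taskRunner.task.metrics.accumulators().filterNot(_.isZero)
  } else {
    // Old behavior, for users who rely on every accumulator appearing in each heartbeat.
    taskRunner.task.metrics.accumulators()
  }
accumUpdates += ((taskRunner.taskId, accumulatorsToReport))
```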
[GitHub] spark issue #22473: [SPARK-25449][CORE] Heartbeat shouldn't include accumula...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22473 add to whitelist
[GitHub] spark issue #22402: [SPARK-25414][SS][TEST] make it clear that the numRows m...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22402 retest this please
[GitHub] spark issue #22402: [SPARK-25414][SS][TEST] make it clear that the numRows m...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22402 LGTM
[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/21721 FYI, I submitted #22334 to revert #21819 and #21721.
[GitHub] spark issue #22334: [SPARK-25336][SS]Revert SPARK-24863 and SPARK-24748
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22334 retest this please
[GitHub] spark pull request #22334: [SPARK-25336][SS]Revert SPARK-24863 and SPARK 247...
GitHub user zsxwing opened a pull request: https://github.com/apache/spark/pull/22334

[SPARK-25336][SS]Revert SPARK-24863 and SPARK-24748

## What changes were proposed in this pull request?

Revert SPARK-24863 and SPARK-24748 as per discussion in #21721. We will revisit them when the data source v2 APIs are out.

## How was this patch tested?

Jenkins

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zsxwing/spark revert-SPARK-24863-SPARK-24748

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22334.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22334

commit 555e5d7f4271b534cce542faa5e3065c765c78b4
Author: Shixiong Zhu
Date: 2018-09-04T20:35:25Z

    Revert "[SPARK-24863][SS] Report Kafka offset lag as a custom metrics"
    This reverts commit 14d7c1c3e99e7523c757628d411525aa9d8e0709.

commit 3d59df12fa407e998bbca9f83dd374e552849da6
Author: Shixiong Zhu
Date: 2018-09-04T21:24:57Z

    Revert "[SPARK-24748][SS] Support for reporting custom metrics via StreamingQuery Progress"
    This reverts commit 18b6ec14716bfafc25ae281b190547ea58b59af1.
[GitHub] spark issue #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaData...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22138 Thanks for your PR. This is really a big change. It will need very careful review as it changes a lot of critical code paths and the current Kafka consumer logic is really complicated. Let's hold this until the 2.4 branch gets cut, as it's risky to put this into 2.4. The branch cut is basically blocked by a complicated correctness fix and should happen soon.
[GitHub] spark issue #22293: [SPARK-25288][Tests]Fix flaky Kafka transaction tests
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22293 Thanks! Merging to master.
[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/21721 @arunmahadevan yeah, it's better to figure out the solution for continuous mode as well. As you mentioned, the current SQL metrics are not updated unless the task completes, so we may need to add new APIs to support reporting metrics for continuous mode. It would be great to have a consistent API for all modes. Let's step back and think about it.
[GitHub] spark pull request #22293: [SPARK-25288][Tests]Fix flaky Kafka transaction t...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22293#discussion_r214209091 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala --- @@ -652,62 +654,67 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { } } +val topicPartition = new TopicPartition(topic, 0) // The message values are the same as their offsets to make the test easy to follow testUtils.withTranscationalProducer { producer => testStream(mapped)( StartStream(ProcessingTime(100), clock), waitUntilBatchProcessed, CheckAnswer(), -WithOffsetSync(topic) { () => +WithOffsetSync(topicPartition) { () => --- End diff -- Good suggestion! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/21721 It's better not to release such APIs without thinking about how to support continuous queries, since supporting them may require API changes, which should be avoided if possible. I propose to revert this PR. It would be great to have a design doc for streaming source metrics APIs to discuss how to support all modes before committing.
[GitHub] spark pull request #22292: [SPARK-25286][CORE] Removing the dangerous parmap
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22292#discussion_r214204549 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala --- @@ -315,7 +315,9 @@ private[streaming] object FileBasedWriteAheadLog { implicit val ec = executionContext --- End diff -- nit: this line is not needed
[GitHub] spark issue #22293: [SPARK-25288][Tests]Fix flaky Kafka transaction tests
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22293 now this passed 11 times on Jenkins
[GitHub] spark pull request #22233: [SPARK-25240][SQL] Fix for a deadlock in RECOVER ...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22233#discussion_r214203842 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala --- @@ -60,7 +60,8 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA protected override def generateTable( catalog: SessionCatalog, name: TableIdentifier, - isDataSource: Boolean): CatalogTable = { + isDataSource: Boolean, + partitionCols: Seq[String] = Seq("a", "b")): CatalogTable = { --- End diff -- Yeah, please don't override a method with a default parameter. It's very easy to end up with different default values, and then the value that gets picked up will depend on the type you are using...
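A contrived illustration (not code from this PR) of why two different defaults on an overridden method are dangerous: whether the parent's or the child's default fills in at a given call site is easy to misjudge once static and runtime types differ, so callers and reviewers can silently disagree about which value is used.

```scala
abstract class TableGen {
  // The parent declares one default...
  def generateTable(partitionCols: Seq[String] = Seq("a", "b")): Seq[String]
}

class HiveTableGen extends TableGen {
  // ...and the override quietly declares another.
  override def generateTable(partitionCols: Seq[String] = Seq("a")): Seq[String] = partitionCols
}

object DefaultArgPitfall extends App {
  val viaParentType: TableGen = new HiveTableGen
  // Which default applies on each line below is exactly the kind of question
  // the review comment wants to avoid; keeping a single default sidesteps it.
  println(viaParentType.generateTable())
  println((new HiveTableGen).generateTable())
}
```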
[GitHub] spark pull request #22293: [SPARK-25288][Tests]Fix flaky Kafka transaction t...
GitHub user zsxwing opened a pull request: https://github.com/apache/spark/pull/22293

[SPARK-25288][Tests]Fix flaky Kafka transaction tests

## What changes were proposed in this pull request?

Here are the failures:

- http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaRelationSuite&test_name=read+Kafka+transactional+messages%3A+read_committed
- http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaMicroBatchV1SourceSuite&test_name=read+Kafka+transactional+messages%3A+read_committed
- http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaMicroBatchV2SourceSuite&test_name=read+Kafka+transactional+messages%3A+read_committed

I found the Kafka consumer may not see the committed messages for a short time. This PR just adds a new method `waitUntilOffsetAppears` and uses it to make sure the consumer can see a specified offset before checking the result.

## How was this patch tested?

Jenkins

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zsxwing/spark SPARK-25288

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22293.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22293

commit 6439367e3dfd9612f30395ae445df67a87ede871
Author: Shixiong Zhu
Date: 2018-08-30T21:44:42Z

    Fix flaky Kafka transaction tests
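A rough sketch of what such a helper could look like in the Kafka test utils. This assumes a `getLatestOffsets`-style accessor is available on the test utilities; the exact PR implementation may differ:

```scala
import org.apache.kafka.common.TopicPartition
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

/** Block until the broker reports an end offset >= `offset` for the given partition. */
def waitUntilOffsetAppears(topicPartition: TopicPartition, offset: Long): Unit = {
  eventually(timeout(60.seconds)) {
    val latest = getLatestOffsets(Set(topicPartition.topic)).get(topicPartition)
    assert(latest.nonEmpty && latest.get >= offset,
      s"partition $topicPartition has not reached offset $offset yet (latest: $latest)")
  }
}
```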
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22042 Thanks! Merging to master.
[GitHub] spark issue #22210: [SPARK-25218][Core]Fix potential resource leaks in Trans...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22210 Thanks! Merging to master.
[GitHub] spark pull request #22233: [SPARK-25240][SQL] Fix for a deadlock in RECOVER ...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22233#discussion_r213137139 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -671,7 +674,7 @@ case class AlterTableRecoverPartitionsCommand( val value = ExternalCatalogUtils.unescapePathName(ps(1)) if (resolver(columnName, partitionNames.head)) { scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value), -partitionNames.drop(1), threshold, resolver) +partitionNames.drop(1), threshold, resolver, listFilesInParallel = false) --- End diff -- @MaxGekk could you revert to use Scala `par`?
[GitHub] spark issue #22210: [SPARK-25218][Core]Fix potential resource leaks in Trans...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22210 cc @brkyvz
[GitHub] spark pull request #22233: [SPARK-25240][SQL] Fix for a deadlock in RECOVER ...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22233#discussion_r213063623 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -671,7 +674,7 @@ case class AlterTableRecoverPartitionsCommand( val value = ExternalCatalogUtils.unescapePathName(ps(1)) if (resolver(columnName, partitionNames.head)) { scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value), -partitionNames.drop(1), threshold, resolver) +partitionNames.drop(1), threshold, resolver, listFilesInParallel = false) --- End diff -- @MaxGekk can we remove this `parmap` overload? It's pretty easy to cause deadlock. The `parmap` overload without the `ec` parameter is fine since it doesn't need a user specified thread pool.
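A contrived, self-contained illustration of the deadlock the comment is worried about (this is not Spark's actual `parmap`): when a parallel map runs on a caller-supplied fixed-size pool and the mapped function itself submits and awaits work on the same pool, as the recursive `scanPartitions` call does, every pool thread can end up blocked waiting for child tasks that can never start.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object ParMapDeadlockDemo extends App {
  // A tiny fixed-size pool, standing in for the user-specified ExecutionContext.
  implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))

  def parMap[A, B](xs: Seq[A])(f: A => B): Seq[B] =
    Await.result(Future.sequence(xs.map(x => Future(f(x)))), Duration.Inf)

  // The outer parMap occupies both pool threads; each inner parMap then blocks
  // waiting for futures that need a free pool thread, so the program hangs here.
  parMap(Seq(1, 2)) { _ =>
    parMap(Seq(3, 4))(identity).sum
  }

  println("never reached")
}
```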
[GitHub] spark issue #22245: [SPARK-24882][FOLLOWUP] Fix flaky synchronization in Kaf...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22245 Thanks! Merging to master.
[GitHub] spark issue #22245: [SPARK-24882][FOLLOWUP] Fix flaky synchronization in Kaf...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22245 LGTM pending tests
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22042 retest this please
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22042

> This patch fails Spark unit tests.

This is the flaky test I fixed in #22230.

retest this please
[GitHub] spark issue #22230: [SPARK-25214][SS][FOLLOWUP]Fix the issue that Kafka v2 s...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22230 Thanks! Merging to master.
[GitHub] spark pull request #22230: [SPARK-25214][SS][FOLLOWUP]Fix the issue that Kaf...
GitHub user zsxwing opened a pull request: https://github.com/apache/spark/pull/22230

[SPARK-25214][SS][FOLLOWUP]Fix the issue that Kafka v2 source may return duplicated records when `failOnDataLoss=false`

## What changes were proposed in this pull request?

This is a follow up PR for #22207 to fix a potential flaky test. `processAllAvailable` doesn't work for continuous processing so we should not use it for a continuous query.

## How was this patch tested?

Jenkins.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zsxwing/spark SPARK-25214-2

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22230.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22230

commit a52425676ddcaa6a2737a95aedc80b4d8452023e
Author: Shixiong Zhu
Date: 2018-08-24T21:42:11Z

    don't use query.processAllAvailable for continuous processing
[GitHub] spark issue #22230: [SPARK-25214][SS][FOLLOWUP]Fix the issue that Kafka v2 s...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22230 cc @tdas
[GitHub] spark issue #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 source may ...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22207 I just realized the Kafka source v2 is not in 2.3 :)
[GitHub] spark issue #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 source may ...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22207 Thanks! Merging to master and 2.3.
[GitHub] spark pull request #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 sour...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22207#discussion_r212709927 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala --- @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.util.Properties +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable +import scala.util.Random + +import org.scalatest.time.SpanSugar._ + +import org.apache.spark.SparkContext +import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter} +import org.apache.spark.sql.streaming.{StreamTest, Trigger} +import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession} + +/** + * This is a basic test trait which will set up a Kafka cluster that keeps only several records in + * a topic and ages out records very quickly. This is a helper trait to test + * "failonDataLoss=false" case with missing offsets. + * + * Note: there is a hard-code 30 seconds delay (kafka.log.LogManager.InitialTaskDelayMs) to clean up + * records. Hence each class extending this trait needs to wait at least 30 seconds (or even longer + * when running on a slow Jenkins machine) before records start to be removed. To make sure a test + * does see missing offsets, you can check the earliest offset in `eventually` and make sure it's + * not 0 rather than sleeping a hard-code duration. + */ +trait KafkaMissingOffsetsTest extends SharedSQLContext { + + protected var testUtils: KafkaTestUtils = _ + + override def createSparkSession(): TestSparkSession = { +// Set maxRetries to 3 to handle NPE from `poll` when deleting a topic +new TestSparkSession(new SparkContext("local[2,3]", "test-sql-context", sparkConf)) + } + + override def beforeAll(): Unit = { +super.beforeAll() +testUtils = new KafkaTestUtils { + override def brokerConfiguration: Properties = { +val props = super.brokerConfiguration +// Try to make Kafka clean up messages as fast as possible. However, there is a hard-code +// 30 seconds delay (kafka.log.LogManager.InitialTaskDelayMs) so this test should run at +// least 30 seconds. +props.put("log.cleaner.backoff.ms", "100") +// The size of RecordBatch V2 increases to support transactional write. 
+props.put("log.segment.bytes", "70") +props.put("log.retention.bytes", "40") +props.put("log.retention.check.interval.ms", "100") +props.put("delete.retention.ms", "10") +props.put("log.flush.scheduler.interval.ms", "10") +props + } +} +testUtils.setup() + } + + override def afterAll(): Unit = { +if (testUtils != null) { + testUtils.teardown() + testUtils = null +} +super.afterAll() + } +} + +class KafkaDontFailOnDataLossSuite extends KafkaMissingOffsetsTest { + + import testImplicits._ + + private val topicId = new AtomicInteger(0) + + private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}" + + /** + * @param testStreamingQuery whether to test a streaming query or a batch query. + * @param writeToTable the function to write the specified [[DataFrame]] to the given table. + */ + private def verifyMissingOffsetsDontCauseDuplicatedRecords( + testStreamingQuery: Boolean)(writeToTable: (DataFrame, String) => Unit): Unit = { +val topic = newTopic() +testUtils.createTopic(topic, partitions = 1) +testUtils.sendMessages(topic, (0 until 50).map(_.toString).toArray)
[GitHub] spark pull request #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 sour...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22207#discussion_r212707515 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala --- @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.util.Properties +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable +import scala.util.Random + +import org.scalatest.time.SpanSugar._ + +import org.apache.spark.SparkContext +import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter} +import org.apache.spark.sql.streaming.{StreamTest, Trigger} +import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession} + +/** + * This is a basic test trait which will set up a Kafka cluster that keeps only several records in + * a topic and ages out records very quickly. This is a helper trait to test + * "failonDataLoss=false" case with missing offsets. + * + * Note: there is a hard-code 30 seconds delay (kafka.log.LogManager.InitialTaskDelayMs) to clean up + * records. Hence each class extending this trait needs to wait at least 30 seconds (or even longer + * when running on a slow Jenkins machine) before records start to be removed. To make sure a test + * does see missing offsets, you can check the earliest offset in `eventually` and make sure it's + * not 0 rather than sleeping a hard-code duration. + */ +trait KafkaMissingOffsetsTest extends SharedSQLContext { + + protected var testUtils: KafkaTestUtils = _ + + override def createSparkSession(): TestSparkSession = { +// Set maxRetries to 3 to handle NPE from `poll` when deleting a topic +new TestSparkSession(new SparkContext("local[2,3]", "test-sql-context", sparkConf)) + } + + override def beforeAll(): Unit = { +super.beforeAll() +testUtils = new KafkaTestUtils { + override def brokerConfiguration: Properties = { +val props = super.brokerConfiguration +// Try to make Kafka clean up messages as fast as possible. However, there is a hard-code +// 30 seconds delay (kafka.log.LogManager.InitialTaskDelayMs) so this test should run at +// least 30 seconds. +props.put("log.cleaner.backoff.ms", "100") +// The size of RecordBatch V2 increases to support transactional write. 
+props.put("log.segment.bytes", "70") +props.put("log.retention.bytes", "40") +props.put("log.retention.check.interval.ms", "100") +props.put("delete.retention.ms", "10") +props.put("log.flush.scheduler.interval.ms", "10") +props + } +} +testUtils.setup() + } + + override def afterAll(): Unit = { +if (testUtils != null) { + testUtils.teardown() + testUtils = null +} +super.afterAll() + } +} + +class KafkaDontFailOnDataLossSuite extends KafkaMissingOffsetsTest { + + import testImplicits._ + + private val topicId = new AtomicInteger(0) + + private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}" + + /** + * @param testStreamingQuery whether to test a streaming query or a batch query. + * @param writeToTable the function to write the specified [[DataFrame]] to the given table. + */ + private def verifyMissingOffsetsDontCauseDuplicatedRecords( + testStreamingQuery: Boolean)(writeToTable: (DataFrame, String) => Unit): Unit = { +val topic = newTopic() +testUtils.createTopic(topic, partitions = 1) +testUtils.sendMessages(topic, (0 until 50).map(_.toString).toArray)
[GitHub] spark pull request #22210: [SPARK-25218][Core]Fix potential resource leaks i...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22210#discussion_r212443117 --- Diff: common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java --- @@ -95,26 +95,24 @@ public ByteBuffer nioByteBuffer() throws IOException { @Override public InputStream createInputStream() throws IOException { FileInputStream is = null; +boolean shouldClose = true; try { is = new FileInputStream(file); ByteStreams.skipFully(is, offset); - return new LimitedInputStream(is, length); + InputStream r = new LimitedInputStream(is, length); + shouldClose = false; + return r; } catch (IOException e) { - try { -if (is != null) { - long size = file.length(); - throw new IOException("Error in reading " + this + " (actual file length " + size + ")", --- End diff -- ditto --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22210: [SPARK-25218][Core]Fix potential resource leaks i...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22210#discussion_r212443039 --- Diff: common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java --- @@ -77,16 +77,16 @@ public ByteBuffer nioByteBuffer() throws IOException { return channel.map(FileChannel.MapMode.READ_ONLY, offset, length); } } catch (IOException e) { + String errorMessage = "Error in reading " + this; try { if (channel != null) { long size = channel.size(); - throw new IOException("Error in reading " + this + " (actual file length " + size + ")", --- End diff -- This is just thrown and then ignored. I assigned it to `errorMessage` so that we can see it in the error. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22210: [SPARK-25218][Core]Fix potential resource leaks i...
GitHub user zsxwing opened a pull request: https://github.com/apache/spark/pull/22210

[SPARK-25218][Core]Fix potential resource leaks in TransportServer and SocketAuthHelper

## What changes were proposed in this pull request?

Make sure TransportServer and SocketAuthHelper close the resources for all types of errors.

## How was this patch tested?

Jenkins

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zsxwing/spark SPARK-25218

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22210.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22210

commit 6f2248d45716332ac78e44b1314011806f59deb8
Author: Shixiong Zhu
Date: 2018-08-23T20:10:03Z

    Fix potential resource leaks
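The diffs quoted in the review comments above all follow the same shape. A simplified sketch of the pattern (not the exact PR code): only hand off ownership of the underlying resource once the wrapping succeeds, and close it in a `finally` block otherwise, so that any `Throwable`, not just `IOException`, triggers cleanup.

```scala
import java.io.{File, FileInputStream, IOException, InputStream}
import com.google.common.io.ByteStreams
import org.apache.spark.network.util.LimitedInputStream

def createInputStream(file: File, offset: Long, length: Long): InputStream = {
  var is: FileInputStream = null
  var shouldClose = true
  try {
    is = new FileInputStream(file)
    ByteStreams.skipFully(is, offset)
    val limited = new LimitedInputStream(is, length)
    shouldClose = false // the returned stream now owns `is` and will close it
    limited
  } finally {
    if (shouldClose && is != null) {
      try is.close() catch { case _: IOException => /* best-effort cleanup */ }
    }
  }
}
```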
[GitHub] spark pull request #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 sour...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22207#discussion_r212410113 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala --- @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.util.Properties +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable +import scala.util.Random + +import org.scalatest.time.SpanSugar._ + +import org.apache.spark.SparkContext +import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter} +import org.apache.spark.sql.streaming.{StreamTest, Trigger} +import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession} + +/** + * This is a basic test trait which will set up a Kafka cluster that keeps only several records in + * a topic and ages out records very quickly. This is a helper trait to test + * "failonDataLoss=false" case with missing offsets. + * + * Note: there is a hard-code 30 seconds delay (kafka.log.LogManager.InitialTaskDelayMs) to clean up + * records. Hence each class extending this trait needs to wait at least 30 seconds (or even longer + * when running on a slow Jenkins machine) before records start to be removed. To make sure a test + * does see missing offsets, you can check the earliest offset in `eventually` and make sure it's + * not 0 rather than sleeping a hard-code duration. + */ +trait KafkaMissingOffsetsTest extends SharedSQLContext { + + protected var testUtils: KafkaTestUtils = _ + + override def createSparkSession(): TestSparkSession = { +// Set maxRetries to 3 to handle NPE from `poll` when deleting a topic +new TestSparkSession(new SparkContext("local[2,3]", "test-sql-context", sparkConf)) + } + + override def beforeAll(): Unit = { +super.beforeAll() +testUtils = new KafkaTestUtils { + override def brokerConfiguration: Properties = { +val props = super.brokerConfiguration +// Try to make Kafka clean up messages as fast as possible. However, there is a hard-code +// 30 seconds delay (kafka.log.LogManager.InitialTaskDelayMs) so this test should run at +// least 30 seconds. +props.put("log.cleaner.backoff.ms", "100") +// The size of RecordBatch V2 increases to support transactional write. 
+props.put("log.segment.bytes", "70") +props.put("log.retention.bytes", "40") +props.put("log.retention.check.interval.ms", "100") +props.put("delete.retention.ms", "10") +props.put("log.flush.scheduler.interval.ms", "10") +props + } +} +testUtils.setup() + } + + override def afterAll(): Unit = { +if (testUtils != null) { + testUtils.teardown() + testUtils = null +} +super.afterAll() + } +} + +class KafkaDontFailOnDataLossSuite extends KafkaMissingOffsetsTest { + + import testImplicits._ + + private val topicId = new AtomicInteger(0) + + private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}" + + /** + * @param testStreamingQuery whether to test a streaming query or a batch query. + * @param writeToTable the function to write the specified [[DataFrame]] to the given table. + */ + private def verifyMissingOffsetsDontCauseDuplicatedRecords( +testStreamingQuery: Boolean)(writeToTable: (DataFrame, String) => Unit): Unit = { +val topic = newTopic() +testUtils.createTopic(topic, partitions = 1) +testUtils.sendMessages(topic, (0 until 50).map(_.toString).toArray)
[GitHub] spark pull request #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 sour...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22207#discussion_r212409454 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala --- @@ -1187,134 +1185,3 @@ class KafkaSourceStressSuite extends KafkaSourceTest { iterations = 50) } } - -class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with SharedSQLContext { --- End diff -- Moved to KafkaDontFailOnDataLossSuite.scala
[GitHub] spark pull request #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 sour...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22207#discussion_r212409340 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala --- @@ -77,44 +77,6 @@ private[kafka010] class KafkaSourceRDD( offsetRanges.zipWithIndex.map { case (o, i) => new KafkaSourceRDDPartition(i, o) }.toArray } - override def count(): Long = offsetRanges.map(_.size).sum --- End diff -- These methods are never used as Dataset always uses this RDD: https://github.com/apache/spark/blob/2a0a8f753bbdc8c251f8e699c0808f35b94cfd20/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala#L113 and `MapPartitionsRDD` just calls the default RDD implementation. In addition, they may return wrong answers when `failOnDataLoss=false`. Hence, I just removed them.
[GitHub] spark pull request #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 sour...
GitHub user zsxwing opened a pull request: https://github.com/apache/spark/pull/22207

[SPARK-25214][SS]Fix the issue that Kafka v2 source may return duplicated records when `failOnDataLoss=false`

## What changes were proposed in this pull request?

When there are missing offsets, Kafka v2 source may return duplicated records when `failOnDataLoss=false`. This PR fixes the issue and also adds regression tests for all Kafka readers.

## How was this patch tested?

New tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zsxwing/spark SPARK-25214

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22207.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22207

commit f2d4d67c765a298d23964b26ec07596839f008fa
Author: Shixiong Zhu
Date: 2018-08-23T17:46:52Z

    Fix the issue that Kafka v2 source may return duplicated records when `failOnDataLoss` is `false`
[GitHub] spark issue #22181: [SPARK-25163][SQL] Fix flaky test: o.a.s.util.collection...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22181 LGTM. Merging to master.
[GitHub] spark issue #22176: [SPARK-25181][CORE] Limit Thread Pool size in BlockManag...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22176 @markhamstra That's a good point. However, since this just follows our current code (see the existing usages of `newDaemonCachedThreadPool`), and the changes here should be safe considering how we use them, I don't want to block this PR for this reason. We can open a new ticket to add configurations if that's necessary.
[GitHub] spark issue #22176: [SPARK-25181][CORE] Limit Thread Pool size in BlockManag...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22176 LGTM. Merging to master.
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r212033844 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala --- @@ -337,6 +338,7 @@ private[kafka010] case class KafkaMicroBatchInputPartitionReader( val record = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss) if (record != null) { nextRow = converter.toUnsafeRow(record) +nextOffset = record.offset + 1 --- End diff -- We should update `nextOffset` to `record.offset + 1` rather than `nextOffset + 1`. Otherwise, it may return duplicated records when `failOnDataLoss` is `false`. I will submit another PR to push this fix to 2.3 as it's a correctness issue. In addition, we should change `nextOffset` in the `next` method as the `get` method is designed to be called multiple times.
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r212032759 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala --- @@ -597,6 +614,254 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { } ) } + + test("read Kafka transactional messages: read_committed") { +// This test will cover the following cases: +// 1. the whole batch contains no data messages +// 2. the first offset in a batch is not a committed data message +// 3. the last offset in a batch is not a committed data message +// 4. there is a gap in the middle of a batch + +val topic = newTopic() +testUtils.createTopic(topic, partitions = 1) + +val reader = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("kafka.isolation.level", "read_committed") + .option("maxOffsetsPerTrigger", 3) + .option("subscribe", topic) + .option("startingOffsets", "earliest") + // Set a short timeout to make the test fast. When a batch contains no committed date + // messages, "poll" will wait until timeout. + .option("kafkaConsumer.pollTimeoutMs", 5000) +val kafka = reader.load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] +val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt) + +val clock = new StreamManualClock + +val waitUntilBatchProcessed = AssertOnQuery { q => + eventually(Timeout(streamingTimeout)) { +if (!q.exception.isDefined) { + assert(clock.isStreamWaitingAt(clock.getTimeMillis())) +} + } + if (q.exception.isDefined) { +throw q.exception.get + } + true +} + +val producer = testUtils.createProducer(usingTrascation = true) +try { + producer.initTransactions() + + testStream(mapped)( +StartStream(ProcessingTime(100), clock), +waitUntilBatchProcessed, +// 1 from smallest, 1 from middle, 8 from biggest +CheckAnswer(), +WithKafkaProducer(topic, producer) { producer => + // Send 5 messages. They should be visible only after being committed. + producer.beginTransaction() + (1 to 5).foreach { i => +producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } +}, +AdvanceManualClock(100), +waitUntilBatchProcessed, +// Should not see any uncommitted messages +CheckAnswer(), +WithKafkaProducer(topic, producer) { producer => + producer.commitTransaction() +}, +AdvanceManualClock(100), +waitUntilBatchProcessed, +CheckAnswer(1 to 3: _*), // offset 0, 1, 2 +AdvanceManualClock(100), +waitUntilBatchProcessed, +CheckAnswer(1 to 5: _*), // offset: 3, 4, 5* [* means it's not a committed data message] +WithKafkaProducer(topic, producer) { producer => + // Send 5 messages and abort the transaction. They should not be read. + producer.beginTransaction() + (6 to 10).foreach { i => +producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.abortTransaction() +}, +AdvanceManualClock(100), +waitUntilBatchProcessed, +CheckAnswer(1 to 5: _*), // offset: 6*, 7*, 8* +AdvanceManualClock(100), +waitUntilBatchProcessed, +CheckAnswer(1 to 5: _*), // offset: 9*, 10*, 11* +WithKafkaProducer(topic, producer) { producer => + // Send 5 messages again. The consumer should skip the above aborted messages and read + // them. 
+ producer.beginTransaction() + (11 to 15).foreach { i => +producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.commitTransaction() +}, +AdvanceManualClock(100), +waitUntilBatchProcessed, +CheckAnswer((1 to 5) ++ (11 to 13): _*), // offset: 12, 13, 14 +AdvanceManualClock(100), +waitUntilBatchProcessed, +CheckAnswer((1 t
[GitHub] spark issue #22182: [SPARK-25184][SS] Fixed race condition in StreamExecutio...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22182 retest this please
[GitHub] spark issue #22182: [SPARK-25184][SS] Fixed race condition in StreamExecutio...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22182 LGTM
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r211786471 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -250,33 +294,42 @@ private[kafka010] case class InternalKafkaConsumer( offset: Long, untilOffset: Long, pollTimeoutMs: Long, - failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = { -if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) { - // This is the first fetch, or the last pre-fetched data has been drained. + failOnDataLoss: Boolean): FetchedRecord = { +if (offset != nextOffsetInFetchedData) { + // This is the first fetch, or the fetched data has been reset. // Seek to the offset because we may call seekToBeginning or seekToEnd before this. - seek(offset) - poll(pollTimeoutMs) + poll(offset, pollTimeoutMs) +} else if (!fetchedData.hasNext) { + // The last pre-fetched data has been drained. + if (offset < offsetAfterPoll) { --- End diff -- this is the place preventing me from making `offsetAfterPoll` be a local var. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r211786183 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -31,22 +31,21 @@ import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange import org.apache.spark.sql.kafka010.KafkaSourceProvider._ import org.apache.spark.util.UninterruptibleThread -/** - * An exception to indicate there is a missing offset in the records returned by Kafka consumer. - * This means it's either a transaction (commit or abort) marker, or an aborted message if - * "isolation.level" is "read_committed". The offsets in the range [offset, nextOffsetToFetch) are - * missing. In order words, `nextOffsetToFetch` indicates the next offset to fetch. - */ -private[kafka010] class MissingOffsetException( -val offset: Long, -val nextOffsetToFetch: Long) extends Exception( - s"Offset $offset is missing. The next offset to fetch is: $nextOffsetToFetch") - private[kafka010] sealed trait KafkaDataConsumer { /** - * Get the record for the given offset if available. Otherwise it will either throw error - * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset), - * or null. + * Get the record for the given offset if available. + * + * If the record is invisible (either a + * transaction message, or an aborted message when the consumer's `isolation.level` is + * `read_committed`), it will be skipped and this method will try to fetch next available record + * within [offset, untilOffset). + * + * This method also will try the best to detect data loss. If `failOnDataLoss` is `false`, it will --- End diff -- Good catch --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r211786163 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -31,22 +31,21 @@ import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange import org.apache.spark.sql.kafka010.KafkaSourceProvider._ import org.apache.spark.util.UninterruptibleThread -/** - * An exception to indicate there is a missing offset in the records returned by Kafka consumer. - * This means it's either a transaction (commit or abort) marker, or an aborted message if - * "isolation.level" is "read_committed". The offsets in the range [offset, nextOffsetToFetch) are - * missing. In order words, `nextOffsetToFetch` indicates the next offset to fetch. - */ -private[kafka010] class MissingOffsetException( -val offset: Long, -val nextOffsetToFetch: Long) extends Exception( - s"Offset $offset is missing. The next offset to fetch is: $nextOffsetToFetch") - private[kafka010] sealed trait KafkaDataConsumer { /** - * Get the record for the given offset if available. Otherwise it will either throw error - * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset), - * or null. + * Get the record for the given offset if available. + * + * If the record is invisible (either a + * transaction message, or an aborted message when the consumer's `isolation.level` is + * `read_committed`), it will be skipped and this method will try to fetch next available record + * within [offset, untilOffset). + * + * This method also will try the best to detect data loss. If `failOnDataLoss` is `false`, it will + * throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `true`, this --- End diff -- > Will we throw an exception even when its a control message and there is no real data loss? No. `It will be skipped and this method will try to fetch next available record within [offset, untilOffset).` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22106: [SPARK-25116][TESTS]Fix the Kafka cluster leak and clean...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22106 Thanks. Merging to master.
[GitHub] spark pull request #22106: [SPARK-25116][TESTS]Fix the Kafka cluster leak an...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22106#discussion_r211035290 --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala --- @@ -120,61 +120,56 @@ private[kafka010] class KafkaTestUtils extends Logging { /** setup the whole embedded servers, including Zookeeper and Kafka brokers */ def setup(): Unit = { +// Set up a KafkaTestUtils leak detector so that we can see where the leak KafkaTestUtils is +// created. +val exception = new SparkException("It was created at: ") +leakDetector = ShutdownHookManager.addShutdownHook { () => + logError("Found a leak KafkaTestUtils.", exception) +} + setupEmbeddedZookeeper() setupEmbeddedKafkaServer() } /** Teardown the whole servers, including Kafka broker and Zookeeper */ def teardown(): Unit = { -// There is a race condition that may kill JVM when terminating the Kafka cluster. We set -// a custom Procedure here during the termination in order to keep JVM running and not fail the -// tests. -val logExitEvent = new Exit.Procedure { - override def execute(statusCode: Int, message: String): Unit = { -logError(s"Prevent Kafka from killing JVM (statusCode: $statusCode message: $message)") - } +if (leakDetector != null) { + ShutdownHookManager.removeShutdownHook(leakDetector) } -Exit.setExitProcedure(logExitEvent) -Exit.setHaltProcedure(logExitEvent) -try { - brokerReady = false - zkReady = false - - if (producer != null) { -producer.close() -producer = null - } +brokerReady = false --- End diff -- No. We set up in `beforeAll` and clean up in `afterAll`, which will be in the same thread. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22106: [SPARK-25116][TESTS]Fix the Kafka cluster leak an...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22106#discussion_r210977997 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala --- @@ -33,8 +33,12 @@ private[kafka010] object CachedKafkaProducer extends Logging { private type Producer = KafkaProducer[Array[Byte], Array[Byte]] + private val defaultCacheExpireTimeout = TimeUnit.MINUTES.toMillis(10) + private lazy val cacheExpireTimeout: Long = -SparkEnv.get.conf.getTimeAsMs("spark.kafka.producer.cache.timeout", "10m") +Option(SparkEnv.get).map(_.conf.getTimeAsMs( --- End diff -- Change this to call `clear` in `afterAll` even if the SparkContext has been stopped. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
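To make the suggestion concrete: rather than having the cache fall back to a default timeout when `SparkEnv` is already gone, the suite can unconditionally clear the producer cache in `afterAll`. A sketch of that teardown shape, where `ProducerCache.clear()` is a hypothetical stand-in for whatever cleanup hook `CachedKafkaProducer` exposes:
```
import org.scalatest.{BeforeAndAfterAll, Suite}

// Hypothetical cache object standing in for CachedKafkaProducer; clear() is assumed to close
// and evict every cached producer.
object ProducerCache {
  def clear(): Unit = { /* close and evict all cached producers */ }
}

trait KafkaProducerCacheCleanup extends BeforeAndAfterAll { this: Suite =>
  override def afterAll(): Unit = {
    try {
      // Runs even if the SparkContext/SparkEnv was already stopped by an earlier suite.
      ProducerCache.clear()
    } finally {
      super.afterAll()
    }
  }
}
```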
[GitHub] spark issue #22106: [SPARK-25116][TESTS]Fix the Kafka cluster leak and clean...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22106 cc @srowen --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22106: [SPARK-25116][TESTS]Fix the Kafka cluster leak and clean...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22106 Test failures in 4264 and 4266 are unrelated. The latest changes passed on Jenkins 15 times. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22106: [SPARK-25116][TESTS]Fix the Kafka cluster leak an...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22106#discussion_r210387003 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala --- @@ -40,12 +40,7 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest { override val streamingTimeout = 30.seconds - override def beforeAll(): Unit = { -super.beforeAll() -testUtils = new KafkaTestUtils( - withBrokerProps = Map("auto.create.topics.enable" -> "false")) -testUtils.setup() - } + override val brokerProps = Map("auto.create.topics.enable" -> "false") --- End diff -- This is the fix for Kafka cluster leak --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22106: [SPARK-25116][TESTS]Fix the Kafka cluster leak an...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22106#discussion_r210383608 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala --- @@ -216,7 +216,7 @@ class KafkaContinuousInputPartitionReader( } catch { // We didn't read within the timeout. We're supposed to block indefinitely for new data, so // swallow and ignore this. -case _: TimeoutException => +case _: TimeoutException | _: org.apache.kafka.common.errors.TimeoutException => --- End diff -- This is to fix https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4254/ : the exception `org.apache.kafka.common.errors.TimeoutException: Timeout of 3000ms expired before the position for partition failOnDataLoss-2-0 could be determined` triggered a task retry, but as continuous processing doesn't support task retries, it failed with `org.apache.spark.sql.execution.streaming.continuous.ContinuousTaskRetryException: Continuous execution does not support task retry`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22105: [SPARK-25115] [Core] Eliminate extra memory copy ...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22105#discussion_r210134581 --- Diff: common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java --- @@ -140,8 +140,24 @@ private int copyByteBuf(ByteBuf buf, WritableByteChannel target) throws IOExcept // SPARK-24578: cap the sub-region's size of returned nio buffer to improve the performance // for the case that the passed-in buffer has too many components. int length = Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT); --- End diff -- @vanzin The ByteBuffer here may be just a large ByteBuffer. See my comment here: https://github.com/apache/spark/pull/12083#issuecomment-204499691 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22105: [SPARK-25115] [Core] Eliminate extra memory copy done wh...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22105 @normanmaurer LGTM. Thanks for the fix. I totally forgot this issue. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22105: [SPARK-25115] [Core] Eliminate extra memory copy ...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22105#discussion_r210133826 --- Diff: common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java --- @@ -140,8 +140,24 @@ private int copyByteBuf(ByteBuf buf, WritableByteChannel target) throws IOExcept // SPARK-24578: cap the sub-region's size of returned nio buffer to improve the performance // for the case that the passed-in buffer has too many components. int length = Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT); --- End diff -- @vanzin This is to avoid memory copy when writing a large ByteBuffer. You merged this actually: #12083 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22106: [SPARK-25116][Tests]Fix the kafka cluster leak an...
GitHub user zsxwing opened a pull request: https://github.com/apache/spark/pull/22106 [SPARK-25116][Tests]Fix the kafka cluster leak and clean up cached producers ## What changes were proposed in this pull request? KafkaContinuousSinkSuite leaks a Kafka cluster because both KafkaSourceTest and KafkaContinuousSinkSuite create a Kafka cluster but `afterAll` only shuts down one cluster. This leaks a Kafka cluster and causes some Kafka threads to crash and kill the JVM when SBT is trying to clean up tests. This PR fixes the leak and also adds a shutdown hook to detect Kafka cluster leaks. In addition, it also fixes an `AdminClient` leak and cleans up cached producers to eliminate the following annoying logs: ``` 8/13 15:34:42.568 kafka-admin-client-thread | adminclient-4 WARN NetworkClient: [AdminClient clientId=adminclient-4] Connection to node 0 could not be established. Broker may not be available. 18/08/13 15:34:42.570 kafka-admin-client-thread | adminclient-6 WARN NetworkClient: [AdminClient clientId=adminclient-6] Connection to node 0 could not be established. Broker may not be available. 18/08/13 15:34:42.606 kafka-admin-client-thread | adminclient-8 WARN NetworkClient: [AdminClient clientId=adminclient-8] Connection to node -1 could not be established. Broker may not be available. 18/08/13 15:34:42.729 kafka-producer-network-thread | producer-797 WARN NetworkClient: [Producer clientId=producer-797] Connection to node -1 could not be established. Broker may not be available. 18/08/13 15:34:42.906 kafka-producer-network-thread | producer-1598 WARN NetworkClient: [Producer clientId=producer-1598] Connection to node 0 could not be established. Broker may not be available. ``` ## How was this patch tested? Jenkins You can merge this pull request into a Git repository by running: $ git pull https://github.com/zsxwing/spark SPARK-25116 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22106.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22106 commit 6a0ec9c53e2949de724c3596171b59aae94523c1 Author: Shixiong Zhu Date: 2018-08-14T17:59:48Z fix kafka cluster leak commit 59d10e2dc82bdbec1b0da19aaf3eff060db82804 Author: Shixiong Zhu Date: 2018-08-14T18:00:35Z Revert "don't kill JVM during termination" This reverts commit b5eb54244ed573c8046f5abf7bf087f5f08dba58. commit e13b21a3d16c22ba069ef67f9a263af18f238691 Author: Shixiong Zhu Date: 2018-08-14T18:00:42Z Merge branch 'master' into SPARK-25116 commit b561528196a4a6d3d9e0bb951358fc5f288fdb3d Author: Shixiong Zhu Date: 2018-08-14T18:03:07Z update commit 574f5fa3fc6e7313774b1408eceb4543d4422ba0 Author: Shixiong Zhu Date: 2018-08-14T18:11:34Z update --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
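To visualize the leak without the Kafka specifics: the base suite already owns one embedded cluster, and the subclass used to build a second one into the same field, so only the last-assigned cluster ever got torn down. The sketch below uses made-up names (`EmbeddedKafka` and friends) purely to show the shape of the bug and of the `brokerProps` fix; it is not the real suite code.
```
// Stand-in for KafkaTestUtils; only the lifecycle matters for this illustration.
class EmbeddedKafka(props: Map[String, String]) {
  def setup(): Unit = ()
  def teardown(): Unit = ()
}

abstract class FakeKafkaSuiteBase {
  @volatile protected var testUtils: EmbeddedKafka = null
  protected def brokerProps: Map[String, String] = Map.empty

  def beforeAll(): Unit = {
    testUtils = new EmbeddedKafka(brokerProps)  // exactly one cluster, owned by the base suite
    testUtils.setup()
  }

  def afterAll(): Unit = {
    if (testUtils != null) {
      testUtils.teardown()                      // tears down whatever testUtils points at
      testUtils = null
    }
  }
}

// Buggy shape: the subclass's beforeAll built a second cluster and overwrote testUtils, so the
// first one leaked. Fixed shape: only override the broker configuration and let the base suite
// own the single cluster.
class FakeContinuousSinkSuite extends FakeKafkaSuiteBase {
  override val brokerProps = Map("auto.create.topics.enable" -> "false")
}
```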
[GitHub] spark issue #22097: [SPARK-18057][FOLLOW-UP]Use 127.0.0.1 to avoid zookeeper...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22097 I'm going to merge this now since it does fix some issues. I will continue to investigate the `exit code 1` issue. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22097: [SPARK-18057][FOLLOW-UP]Use 127.0.0.1 to avoid zookeeper...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22097 I set a custom Exit.Procedure to prevent Kafka from killing the JVM. Hope this will make the test more stable. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
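For readers who have not seen it, the mechanism referred to here is Kafka's `org.apache.kafka.common.utils.Exit` utility, whose exit/halt procedures can be replaced in tests (the same `Exit.Procedure` shape that shows up in the KafkaTestUtils diff quoted earlier in this thread). A rough sketch of installing a log-only procedure, with `println` standing in for the suite's `logError`:
```
import org.apache.kafka.common.utils.Exit

object QuietKafkaExit {
  // Replace Kafka's exit/halt procedures with one that only logs, so a stray Exit.exit(...)
  // from a broker thread during teardown cannot take down the whole test JVM.
  def install(): Unit = {
    val logOnly = new Exit.Procedure {
      override def execute(statusCode: Int, message: String): Unit = {
        println(s"Prevented Kafka from killing the JVM (statusCode: $statusCode, message: $message)")
      }
    }
    Exit.setExitProcedure(logOnly)
    Exit.setHaltProcedure(logOnly)
  }
}
```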
[GitHub] spark issue #22097: [SPARK-18057][FOLLOW-UP]Use 127.0.0.1 to avoid zookeeper...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22097 Looks like there is a race condition during terminating Kafka cluster: {code} 18/08/13 15:34:44.148 kafka-log-cleaner-thread-0 ERROR LogCleaner: Failed to access checkpoint file cleaner-offset-checkpoint in dir /home/jenkins/workspace/SparkPullRequestBuilder@3/target/tmp/spark-5ad98c9e-0d75-4f23-a948-9e29246651d2 org.apache.kafka.common.errors.KafkaStorageException: Error while reading checkpoint file /home/jenkins/workspace/SparkPullRequestBuilder@3/target/tmp/spark-5ad98c9e-0d75-4f23-a948-9e29246651d2/cleaner-offset-checkpoint Caused by: java.io.FileNotFoundException: /home/jenkins/workspace/SparkPullRequestBuilder@3/target/tmp/spark-5ad98c9e-0d75-4f23-a948-9e29246651d2/cleaner-offset-checkpoint (No such file or directory) at java.io.FileInputStream.open0(Native Method) at java.io.FileInputStream.open(FileInputStream.java:195) at java.io.FileInputStream.(FileInputStream.java:138) at kafka.server.checkpoints.CheckpointFile.liftedTree2$1(CheckpointFile.scala:87) at kafka.server.checkpoints.CheckpointFile.read(CheckpointFile.scala:86) at kafka.server.checkpoints.OffsetCheckpointFile.read(OffsetCheckpointFile.scala:61) at kafka.log.LogCleanerManager$$anonfun$allCleanerCheckpoints$1$$anonfun$apply$1.apply(LogCleanerManager.scala:89) at kafka.log.LogCleanerManager$$anonfun$allCleanerCheckpoints$1$$anonfun$apply$1.apply(LogCleanerManager.scala:87) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) at kafka.log.LogCleanerManager$$anonfun$allCleanerCheckpoints$1.apply(LogCleanerManager.scala:87) at kafka.log.LogCleanerManager$$anonfun$allCleanerCheckpoints$1.apply(LogCleanerManager.scala:95) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) at kafka.log.LogCleanerManager.allCleanerCheckpoints(LogCleanerManager.scala:86) at kafka.log.LogCleanerManager$$anonfun$grabFilthiestCompactedLog$1.apply(LogCleanerManager.scala:126) at kafka.log.LogCleanerManager$$anonfun$grabFilthiestCompactedLog$1.apply(LogCleanerManager.scala:123) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) at kafka.log.LogCleanerManager.grabFilthiestCompactedLog(LogCleanerManager.scala:123) at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:296) at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:289) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) {code} --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21698: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/21698 > "you can at least sort the serialized bytes of T" I think this should work. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22097: [SPARK-18057][FOLLOW-UP]Use 127.0.0.1 to avoid zookeeper...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22097 cc @srowen --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22097: [SPARK-18057][FOLLOW-UP]Use 127.0.0.1 to avoid zo...
GitHub user zsxwing opened a pull request: https://github.com/apache/spark/pull/22097 [SPARK-18057][FOLLOW-UP]Use 127.0.0.1 to avoid zookeeper picking up an ipv6 address ## What changes were proposed in this pull request? I'm still seeing the Kafka tests fail randomly due to `kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING`. I checked the test output and saw that zookeeper picked up an ipv6 address. This PR just uses `127.0.0.1` rather than `localhost` to make sure zookeeper will never use an ipv6 address. ## How was this patch tested? Jenkins You can merge this pull request into a Git repository by running: $ git pull https://github.com/zsxwing/spark fix-zookeeper-connect Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22097.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22097 commit 90f55dc3a24e39a327754e6eb2b4a8ecfafcd096 Author: Shixiong Zhu Date: 2018-08-13T21:57:23Z use 127.0.0.1 to avoid zookeeper picking up an ipv6 address --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
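A quick way to see why `localhost` is fragile here: on hosts with IPv6 enabled the name can resolve to `::1` as well as (or ahead of) `127.0.0.1`, and ZooKeeper may then pick the IPv6 address. The snippet below only prints what the local resolver returns; the exact output is OS and configuration dependent, which is precisely why the fix pins the literal `127.0.0.1`.
```
import java.net.InetAddress

object LocalhostResolution {
  def main(args: Array[String]): Unit = {
    // On a dual-stack machine this typically prints both 127.0.0.1 (Inet4Address) and
    // 0:0:0:0:0:0:0:1 (Inet6Address); which one a server ends up using depends on how it
    // resolves the name, so tests that must stay on IPv4 are safer with the literal address.
    InetAddress.getAllByName("localhost").foreach { addr =>
      println(s"${addr.getClass.getSimpleName} -> ${addr.getHostAddress}")
    }
  }
}
```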
[GitHub] spark pull request #22072: [SPARK-25081][Core]Nested spill in ShuffleExterna...
Github user zsxwing closed the pull request at: https://github.com/apache/spark/pull/22072 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22072: [SPARK-25081][Core]Nested spill in ShuffleExternalSorter...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22072 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21746: [SPARK-24699] [SS]Make watermarks work with Trigger.Once...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/21746 @c-horn it's in 2.4.0. I just fixed the ticket. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21980: [SPARK-25010][SQL] Rand/Randn should produce different v...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/21980 LGTM2 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/18143 @ScrapCodes sorry for the delay. I think @tdas has fixed the issue. Please close the PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21634: [SPARK-24648][SQL] SqlMetrics should be threadsaf...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/21634#discussion_r209371636 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala --- @@ -504,4 +504,38 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared test("writing data out metrics with dynamic partition: parquet") { testMetricsDynamicPartition("parquet", "parquet", "t1") } + + test("writing metrics from single thread") { +val nAdds = 10 +val acc = new SQLMetric("test", -10) +assert(acc.isZero()) +acc.set(0) +for (i <- 1 to nAdds) acc.add(1) +assert(!acc.isZero()) +assert(nAdds === acc.value) +acc.reset() +assert(acc.isZero()) + } + + test("writing metrics from multiple threads") { --- End diff -- > Do you mean it's a one-writer, multi-reader scene? Yes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
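The body of the multi-threaded test is cut off in the hunk above, so here is only the generic shape such a check usually takes: several writer threads call `add`, and the main thread reads `value` once at the end. Treat it as an illustration of the test pattern, not the code that was merged; whether the final expectation holds for `SQLMetric` is exactly the thread-safety question this PR is about.
```
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object ConcurrentAddCheck {
  // Hammer an add-only counter from several threads, then read it once. `add` and `value`
  // mirror the SQLMetric calls in the test above; any AccumulatorV2-like counter fits.
  def run(add: Long => Unit, value: () => Long,
          threads: Int = 4, addsPerThread: Int = 1000): Long = {
    val pool = Executors.newFixedThreadPool(threads)
    val done = new CountDownLatch(threads)
    (1 to threads).foreach { _ =>
      pool.execute(new Runnable {
        override def run(): Unit = {
          try {
            (1 to addsPerThread).foreach(_ => add(1L))
          } finally {
            done.countDown()
          }
        }
      })
    }
    done.await(30, TimeUnit.SECONDS)
    pool.shutdown()
    value() // expected: threads * addsPerThread, if adds are thread-safe
  }
}
```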
[GitHub] spark pull request #22072: [SPARK-25081][Core]Nested spill in ShuffleExterna...
GitHub user zsxwing opened a pull request: https://github.com/apache/spark/pull/22072 [SPARK-25081][Core]Nested spill in ShuffleExternalSorter should not access released memory page (branch-2.2) ## What changes were proposed in this pull request? Backport https://github.com/apache/spark/pull/22062 to branch-2.2. ## How was this patch tested? Jenkins You can merge this pull request into a Git repository by running: $ git pull https://github.com/zsxwing/spark SPARK-25081-2.2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22072.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22072 commit 1a6452ef0939c09c09801cff78b0214d7979bf6d Author: Shixiong Zhu Date: 2018-08-10T17:53:44Z Nested spill in ShuffleExternalSorter should not access released memory page This issue is pretty similar to [SPARK-21907](https://issues.apache.org/jira/browse/SPARK-21907). "allocateArray" in [ShuffleInMemorySorter.reset](https://github.com/apache/spark/blob/9b8521e53e56a53b44c02366a99f8a8ee1307bbf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java#L99) may trigger a spill and cause ShuffleInMemorySorter access the released `array`. Another task may get the same memory page from the pool. This will cause two tasks access the same memory page. When a task reads memory written by another task, many types of failures may happen. Here are some examples I have seen: - JVM crash. (This is easy to reproduce in a unit test as we fill newly allocated and deallocated memory with 0xa5 and 0x5a bytes which usually points to an invalid memory address) - java.lang.IllegalArgumentException: Comparison method violates its general contract! - java.lang.NullPointerException at org.apache.spark.memory.TaskMemoryManager.getPage(TaskMemoryManager.java:384) - java.lang.UnsupportedOperationException: Cannot grow BufferHolder by size -536870912 because the size after growing exceeds size limitation 2147483632 This PR resets states in `ShuffleInMemorySorter.reset` before calling `allocateArray` to fix the issue. The new unit test will make JVM crash without the fix. Closes #22062 from zsxwing/SPARK-25081. Authored-by: Shixiong Zhu Signed-off-by: Shixiong Zhu --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22062: [SPARK-25081][Core]Nested spill in ShuffleExternalSorter...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22062 I also merged to branch-2.3. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22062: [SPARK-25081][Core]Nested spill in ShuffleExternalSorter...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22062 Thanks. Merging to master. I will try to merge to old branches and report back. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22062#discussion_r209337943 --- Diff: core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle.sort + +import java.lang.{Long => JLong} + +import org.mockito.Mockito.when +import org.scalatest.mockito.MockitoSugar + +import org.apache.spark._ +import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics} +import org.apache.spark.memory._ +import org.apache.spark.unsafe.Platform + +class ShuffleExternalSorterSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar { + + test("nested spill should be no-op") { +val conf = new SparkConf() + .setMaster("local[1]") + .setAppName("ShuffleExternalSorterSuite") + .set("spark.testing", "true") + .set("spark.testing.memory", "1600") + .set("spark.memory.fraction", "1") +sc = new SparkContext(conf) + +val memoryManager = UnifiedMemoryManager(conf, 1) + +var shouldAllocate = false + +// Mock `TaskMemoryManager` to allocate free memory when `shouldAllocate` is true. +// This will trigger a nested spill and expose issues if we don't handle this case properly. +val taskMemoryManager = new TaskMemoryManager(memoryManager, 0) { + override def acquireExecutionMemory(required: Long, consumer: MemoryConsumer): Long = { +// ExecutionMemoryPool.acquireMemory will wait until there are 400 bytes for a task to use. +// So we leave 400 bytes for the task. +if (shouldAllocate && + memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed > 400) { + val acquireExecutionMemoryMethod = +memoryManager.getClass.getMethods.filter(_.getName == "acquireExecutionMemory").head + acquireExecutionMemoryMethod.invoke( +memoryManager, +JLong.valueOf( + memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed - 400), +JLong.valueOf(1L), // taskAttemptId +MemoryMode.ON_HEAP + ).asInstanceOf[java.lang.Long] +} +super.acquireExecutionMemory(required, consumer) + } +} +val taskContext = mock[TaskContext] --- End diff -- > We can also create a TaskContextImpl by hand right? I can. Just to save several lines :) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22062#discussion_r209338026 --- Diff: core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle.sort + +import java.lang.{Long => JLong} + +import org.mockito.Mockito.when +import org.scalatest.mockito.MockitoSugar + +import org.apache.spark._ +import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics} +import org.apache.spark.memory._ +import org.apache.spark.unsafe.Platform + +class ShuffleExternalSorterSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar { + + test("nested spill should be no-op") { +val conf = new SparkConf() + .setMaster("local[1]") + .setAppName("ShuffleExternalSorterSuite") + .set("spark.testing", "true") + .set("spark.testing.memory", "1600") + .set("spark.memory.fraction", "1") +sc = new SparkContext(conf) + +val memoryManager = UnifiedMemoryManager(conf, 1) + +var shouldAllocate = false + +// Mock `TaskMemoryManager` to allocate free memory when `shouldAllocate` is true. +// This will trigger a nested spill and expose issues if we don't handle this case properly. +val taskMemoryManager = new TaskMemoryManager(memoryManager, 0) { + override def acquireExecutionMemory(required: Long, consumer: MemoryConsumer): Long = { +// ExecutionMemoryPool.acquireMemory will wait until there are 400 bytes for a task to use. +// So we leave 400 bytes for the task. +if (shouldAllocate && + memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed > 400) { + val acquireExecutionMemoryMethod = +memoryManager.getClass.getMethods.filter(_.getName == "acquireExecutionMemory").head + acquireExecutionMemoryMethod.invoke( +memoryManager, +JLong.valueOf( + memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed - 400), +JLong.valueOf(1L), // taskAttemptId +MemoryMode.ON_HEAP + ).asInstanceOf[java.lang.Long] +} +super.acquireExecutionMemory(required, consumer) + } +} +val taskContext = mock[TaskContext] +val taskMetrics = new TaskMetrics +when(taskContext.taskMetrics()).thenReturn(taskMetrics) +val sorter = new ShuffleExternalSorter( + taskMemoryManager, + sc.env.blockManager, + taskContext, + 100, // initialSize - This will require ShuffleInMemorySorter to acquire at least 800 bytes + 1, // numPartitions + conf, + new ShuffleWriteMetrics) +val inMemSorter = { + val field = sorter.getClass.getDeclaredField("inMemSorter") + field.setAccessible(true) + field.get(sorter).asInstanceOf[ShuffleInMemorySorter] +} +// Allocate memory to make the next "insertRecord" call triggers a spill. 
+val bytes = new Array[Byte](1) +while (inMemSorter.hasSpaceForAnotherRecord) { --- End diff -- > Access to the hasSpaceForAnotherRecord is the only reason why we need reflection right? Yes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22062#discussion_r209337484 --- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java --- @@ -94,12 +94,20 @@ public int numRecords() { } public void reset() { +// Reset `pos` here so that `spill` triggered by the below `allocateArray` will be no-op. +pos = 0; --- End diff -- We also need to set `usableCapacity` to `0`. Otherwise, https://github.com/apache/spark/blob/9b8521e53e56a53b44c02366a99f8a8ee1307bbf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java#L343 will not rethrow SparkOutOfMemoryError. ShuffleExternalSorter will keep running and eventually touch `array`. Setting `array` to `null` is just for safety so that any incorrect use will fail with an NPE. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
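The ordering bug under discussion is general enough to show without the Java class: if an allocation can re-enter `spill()` on the same object, every field `spill()` reads must already be in a "nothing to spill" state before the allocation happens, and the stale buffer should be unreachable. A simplified Scala model with invented names (the real ShuffleInMemorySorter is Java and goes through a TaskMemoryManager/MemoryConsumer pair):
```
trait Allocator {
  /** May call owner.spill() before returning if memory is tight. */
  def allocateArray(owner: Spillable, size: Int): Array[Long]
  def freeArray(a: Array[Long]): Unit
}

trait Spillable { def spill(): Unit }

class ToySorter(alloc: Allocator, initialSize: Int) extends Spillable {
  private var array: Array[Long] = alloc.allocateArray(this, initialSize)
  private var usableCapacity: Int = initialSize
  private var pos: Int = 0

  override def spill(): Unit = {
    // Would write out array(0 until pos); with pos == 0 there is nothing to do.
    pos = 0
  }

  def reset(): Unit = {
    // Put the sorter into a "nothing to spill, nothing usable" state *before* allocating,
    // because allocateArray below may call spill() on this very object.
    pos = 0
    usableCapacity = 0
    alloc.freeArray(array)
    array = null            // fail fast with an NPE if the stale buffer is touched anyway
    array = alloc.allocateArray(this, initialSize)
    usableCapacity = initialSize
  }

  def hasSpaceForAnotherRecord: Boolean = pos < usableCapacity
}
```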
[GitHub] spark issue #21698: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/21698 > IIUC streaming query always need to specify a checkpoint location? You can use a batch query to read and write Kafka :) My point is that if the input and output data sources are not distributed file systems, the user doesn't need to specify a file system location to checkpoint. In addition, if the user doesn't specify a checkpoint path, which path should we use? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22062: [SPARK-25081][Core]Nested spill in ShuffleExternalSorter...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22062 cc @hvanhovell --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...
GitHub user zsxwing opened a pull request: https://github.com/apache/spark/pull/22062 [SPARK-25081][Core]Nested spill in ShuffleExternalSorter should not access released memory page ## What changes were proposed in this pull request? This issue is pretty similar to [SPARK-21907](https://issues.apache.org/jira/browse/SPARK-21907). "allocateArray" in [ShuffleInMemorySorter.reset](https://github.com/apache/spark/blob/9b8521e53e56a53b44c02366a99f8a8ee1307bbf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java#L99) may trigger a spill and cause ShuffleInMemorySorter to access the released `array`. Another task may get the same memory page from the pool. This will cause two tasks to access the same memory page. When a task reads memory written by another task, many types of failures may happen. Here are some examples I have seen: - JVM crash. (This is easy to reproduce in a unit test as we fill newly allocated and deallocated memory with 0xa5 and 0x5a bytes which usually points to an invalid memory address) - java.lang.IllegalArgumentException: Comparison method violates its general contract! - java.lang.NullPointerException at org.apache.spark.memory.TaskMemoryManager.getPage(TaskMemoryManager.java:384) - java.lang.UnsupportedOperationException: Cannot grow BufferHolder by size -536870912 because the size after growing exceeds size limitation 2147483632 This PR resets states in `ShuffleInMemorySorter.reset` before calling `allocateArray` to fix the issue. ## How was this patch tested? The new unit test will make the JVM crash without the fix. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zsxwing/spark SPARK-25081 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22062.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22062 commit 54799cae8ef0727988bbb863d326ea61b4d9ae72 Author: Shixiong Zhu Date: 2018-08-10T00:02:33Z Nested spill in ShuffleExternalSorter should not access released memory page --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21698: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/21698 > I also like ideas based on checkpointing What if the user doesn't provide a distributed file system path? E.g., you can read data from Kafka and write it back to Kafka, and such workloads don't need a distributed file system in standalone mode. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21919: [SPARK-24933][SS] Report numOutputRows in SinkPro...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/21919#discussion_r208750925 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -198,11 +198,14 @@ class SourceProgress protected[sql]( * during a trigger. See [[StreamingQueryProgress]] for more information. * * @param description Description of the source corresponding to this status. + * @param numOutputRows Number of rows written to the sink or -1 for Continuous Mode (temporarily) + * or Sink V1 (until decommissioned). * @since 2.1.0 */ @InterfaceStability.Evolving class SinkProgress protected[sql]( -val description: String) extends Serializable { + val description: String, + val numOutputRows: Long) extends Serializable { --- End diff -- I feel `numOutputRows` is a bit confusing. It sounds like how many rows outputted by the Sink. But the real meaning is how many rows written to the sink. How about `numReceivedRows`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21919: [SPARK-24933][SS] Report numOutputRows in SinkPro...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/21919#discussion_r208749439 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala --- @@ -58,6 +61,7 @@ case class WriteToDataSourceV2Exec(writer: DataSourceWriter, query: SparkPlan) e val useCommitCoordinator = writer.useCommitCoordinator val rdd = query.execute() val messages = new Array[WriterCommitMessage](rdd.partitions.length) +val totalNumRowsAccumulator = new LongAccumulator() --- End diff -- You should call `SparkContext.longAccumulator` to create an accumulator. Why not use a SQLMetric? If so, it will show in the SQL UI. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
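To contrast the two suggestions concretely: a driver-registered long accumulator is readable via `.value`, while a `SQLMetric` attached to a plan node additionally shows up in the SQL tab of the UI. The sketch below assumes the Spark 2.x APIs; `SparkContext.longAccumulator` is public, whereas `SQLMetrics.createMetric` is an internal helper, shown here only because it is what plan nodes such as WriteToDataSourceV2Exec use.
```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.metric.SQLMetrics

object OutputRowCounting {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("row-count-sketch").getOrCreate()
    val sc = spark.sparkContext

    // Option 1: a registered long accumulator (what the review suggests instead of
    // `new LongAccumulator()`); only visible programmatically, via .value on the driver.
    val rowsWritten = sc.longAccumulator("numOutputRows")

    // Option 2: a SQLMetric; when wired into a physical plan node it is also rendered in the
    // SQL tab. Internally it is just another AccumulatorV2 registered on the SparkContext.
    val numOutputRows = SQLMetrics.createMetric(sc, "number of output rows")

    sc.parallelize(1 to 1000, 4).foreach { _ =>
      rowsWritten.add(1L)     // executors only ever add; the driver reads the merged value
      numOutputRows.add(1L)
    }

    println(s"accumulator = ${rowsWritten.value}, sql metric = ${numOutputRows.value}")
    spark.stop()
  }
}
```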
[GitHub] spark pull request #21919: [SPARK-24933][SS] Report numOutputRows in SinkPro...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/21919#discussion_r208751032 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -213,6 +216,12 @@ class SinkProgress protected[sql]( override def toString: String = prettyJson private[sql] def jsonValue: JValue = { -("description" -> JString(description)) +("description" -> JString(description)) ~ + ("numOutputRows" -> JInt(numOutputRows)) } } + +object SinkProgress { + def apply(description: String, numOutputRows: Option[Long]): SinkProgress = + new SinkProgress(description, numOutputRows.getOrElse(-1L)) --- End diff -- nit: please use 2-spaces. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22042 cc @tdas --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
GitHub user zsxwing opened a pull request: https://github.com/apache/spark/pull/22042 [SPARK-25005][SS]Support non-consecutive offsets for Kafka ## What changes were proposed in this pull request? When the user uses Kafka transactions to write data, the offsets in Kafka will be non-consecutive. They will contain some transaction (commit or abort) markers. In addition, if the consumer's `isolation.level` is `read_committed`, `poll` will not return aborted messages either. Hence, we will see non-consecutive offsets in the data returned by `poll`. However, as `seekToEnd` may move the offset to one of these missing offsets, there are 4 possible corner cases we need to support: - The whole batch contains no data messages - The first offset in a batch is not a committed data message - The last offset in a batch is not a committed data message - There is a gap in the middle of a batch They are all covered by the new unit tests. ## How was this patch tested? The new unit tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zsxwing/spark kafka-transaction-read Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22042.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22042 commit dc18a6ff59fe7c48ed188a4eb9a6abf04caee0bd Author: Shixiong Zhu Date: 2018-08-08T17:40:37Z Support non-consecutive offsets for Kafka --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
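One way to read the four corner cases: once offsets can have holes, the consumer can no longer assume that asking for offset `o` yields a record whose offset is exactly `o`, so every path has to compare record offsets against the requested range `[offset, untilOffset)`. The toy function below shows that core comparison; it is purely illustrative and not the actual KafkaDataConsumer logic.
```
final case class FetchedRecord(offset: Long, value: Array[Byte])

object OffsetGapHandling {
  // Given the records one poll() returned (sorted by offset, possibly with gaps where
  // transaction markers or aborted messages sit), pick the first visible record inside
  // [offset, untilOffset), if any. Callers resume from (found.offset + 1), or give up on
  // the range when nothing visible remains in it.
  def firstVisibleIn(records: Seq[FetchedRecord],
                     offset: Long, untilOffset: Long): Option[FetchedRecord] =
    records.find(r => r.offset >= offset && r.offset < untilOffset)
}
```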
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r208676022 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala --- @@ -77,44 +77,6 @@ private[kafka010] class KafkaSourceRDD( offsetRanges.zipWithIndex.map { case (o, i) => new KafkaSourceRDDPartition(i, o) }.toArray } - override def count(): Long = offsetRanges.map(_.size).sum --- End diff -- The assumption in these methods is no longer right, so remove them. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21222: [SPARK-24161][SS] Enable debug package feature on struct...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/21222 Thanks! Merging to master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21222: [SPARK-24161][SS] Enable debug package feature on struct...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/21222 LGTM pending tests --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21222: [SPARK-24161][SS] Enable debug package feature on struct...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/21222 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21995: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client version...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/21995 LGTM Merging to master --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21919: [SPARK-24933][SS] Report numOutputRows in SinkPro...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/21919#discussion_r207662087 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriterCommitProgress.java --- @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.sources.v2.writer.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage; + +import java.io.Serializable; + +/** + * Sink progress information collected from {@link WriterCommitMessage}. + */ +@InterfaceStability.Evolving +public interface StreamWriterCommitProgress extends Serializable { --- End diff -- Why this is a public API? This is just created and consumed inside Spark. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21854: [SPARK-24896][SQL] Uuid should produce different values ...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/21854 Thanks! Merging to master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org