[GitHub] spark issue #20549: SPARK-18844[MLLIB] Add more binary classification metric...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20549 **[Test build #4089 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4089/testReport)** for PR 20549 at commit [`d7144f6`](https://github.com/apache/spark/commit/d7144f63a99e575d5c996fd7919bdbe44266620f). * This patch **fails Scala style tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20549: SPARK-18844[MLLIB] Add more binary classification metric...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20549 **[Test build #4089 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4089/testReport)** for PR 20549 at commit [`d7144f6`](https://github.com/apache/spark/commit/d7144f63a99e575d5c996fd7919bdbe44266620f). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20549: SPARK-18844[MLLIB] Add more binary classification metric...
Github user srowen commented on the issue: https://github.com/apache/spark/pull/20549 I can run the tests for you, but I'm not sure this would be merged even if it passes. (You won't have permissions to let the tests run or whitelist yourself.) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20490 **[Test build #87244 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87244/testReport)** for PR 20490 at commit [`e9964ca`](https://github.com/apache/spark/commit/e9964ca2fc831819662056210db594f613bce5d0). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20490 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/737/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20490 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/20490 just type "retest this please" --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/20490 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20541: [SPARK-23356][SQL]Pushes Project to both sides of Union ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/20541 I'm confused about why we need `PushProjectionThroughUnion`. Generally we only need to push down the required columns, not the entire project list, as there is no benefit in doing this. I think we just need to handle `Union` in the `ColumnPruning` rule, but I may be missing something. cc @gatorsmile --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
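To make the discussion concrete, here is a small, hypothetical snippet (the data and names are made up, and it is not part of the PR) showing the case in question: only the columns required above a `Union` need to be pruned into its children, which is what handling `Union` in `ColumnPruning` would achieve without pushing an entire project list down.

```scala
import org.apache.spark.sql.SparkSession

object UnionPruningExample extends App {
  val spark = SparkSession.builder().master("local[1]").appName("union-pruning").getOrCreate()
  import spark.implicits._

  val left = Seq((1, "a", 1.0), (2, "b", 2.0)).toDF("id", "name", "score")
  val right = Seq((3, "c", 3.0)).toDF("id", "name", "score")

  // Only `id` is required above the Union, so the optimizer should prune
  // `name` and `score` beneath the Union rather than pushing a full project list down.
  val pruned = left.union(right).select("id")

  // The Optimized Logical Plan in this output shows whether the unused
  // columns were pruned from the Union's children.
  pruned.explain(true)
}
```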
[GitHub] spark issue #20555: [SPARK-23366] Improve hot reading path in ReadAheadInput...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20555 **[Test build #87243 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87243/testReport)** for PR 20555 at commit [`b26ffce`](https://github.com/apache/spark/commit/b26ffce6780078dbc38bff658e1ef7e9c56c3dd8). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20555: [SPARK-23366] Improve hot reading path in ReadAheadInput...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/20555 cc @kiszk @sitalkedia @zsxwing --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20555: [SPARK-23366] Improve hot reading path in ReadAheadInput...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20555 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20555: [SPARK-23366] Improve hot reading path in ReadAheadInput...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20555 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/736/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20555: [SPARK-23366] Improve hot reading path in ReadAhe...
GitHub user juliuszsompolski opened a pull request: https://github.com/apache/spark/pull/20555 [SPARK-23366] Improve hot reading path in ReadAheadInputStream ## What changes were proposed in this pull request? `ReadAheadInputStream` was introduced in https://github.com/apache/spark/pull/18317/ to optimize reading spill files from disk. However, from the profiles it seems that the hot path of reading small amounts of data (like readInt) is inefficient: it involves taking locks and multiple checks. Optimize locking: a lock is not needed when simply accessing the active buffer. Only lock when needing to swap buffers, trigger async reading, or get information about the async state. Optimize the short path for single-byte reads, which is used e.g. by the Java library's DataInputStream.readInt. The asyncReader used to call "read" only once on the underlying stream, which never filled the underlying buffer when it was wrapping an LZ4BlockInputStream. If the buffer came back unfilled, the async reader would be triggered to fill the read-ahead buffer on every call, because the reader would always see the active buffer below the refill threshold. However, filling the full buffer all the time could introduce increased latency, so also add an `AtomicBoolean` flag for the async reader to return early if there is a reader waiting for data. Remove `readAheadThresholdInBytes` and instead immediately trigger an async read when switching buffers. This simplifies the code paths, especially the hot one, which then only has to check whether there is data available in the active buffer, without worrying about retriggering the async read. It seems to have a positive effect on perf. ## How was this patch tested? The issue was noticed as a regression in some workloads after upgrading to Spark 2.3. It was particularly visible on TPCDS Q95 running on instances with fast disks (i3 AWS instances). Running with profiling: * Spark 2.2 - 5.2-5.3 minutes, 9.5% in LZ4BlockInputStream.read * Spark 2.3 - 6.4-6.6 minutes, 31.1% in ReadAheadInputStream.read * Spark 2.3 + fix - 5.3-5.4 minutes, 13.3% in ReadAheadInputStream.read - very slightly slower, practically within noise. We didn't see other regressions, and many workloads in general seem to be faster with Spark 2.3 (not investigated whether thanks to the async reader or unrelated). You can merge this pull request into a Git repository by running: $ git pull https://github.com/juliuszsompolski/apache-spark SPARK-23366 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20555.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20555 commit 987f15ccb01b6c0351fbfdd49d6930b929c50a74 Author: Juliusz Sompolski Date: 2018-01-30T20:54:47Z locking tweak commit b26ffce6780078dbc38bff658e1ef7e9c56c3dd8 Author: Juliusz Sompolski Date: 2018-02-01T14:27:09Z fill the read ahead buffer --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
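As an aside, here is a deliberately simplified, hypothetical sketch of the locking strategy described above (this is not the actual `ReadAheadInputStream` code): the hot path consumes bytes already sitting in the active buffer without taking the lock, and only falls into a locked slow path when the buffer is exhausted. In the real implementation the slow path swaps buffers and triggers the asynchronous read-ahead; here it simply refills synchronously, and it keeps reading until the buffer is full to mirror the "fill the full buffer" fix for wrapped streams that return short reads.

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.nio.ByteBuffer
import java.util.concurrent.locks.ReentrantLock

class SimplifiedReadAheadStream(underlying: InputStream, bufferSize: Int) extends InputStream {
  private val lock = new ReentrantLock()
  private val activeBuffer = ByteBuffer.allocate(bufferSize)
  activeBuffer.flip() // start with an empty buffer

  override def read(): Int = {
    // Hot path: no lock is needed to consume bytes already in the active buffer.
    if (activeBuffer.hasRemaining) activeBuffer.get() & 0xFF else readSlowPath()
  }

  private def readSlowPath(): Int = {
    lock.lock()
    try {
      // The real implementation swaps in the buffer filled by the async reader
      // and retriggers the read-ahead here; this sketch refills synchronously.
      activeBuffer.clear()
      var total = 0
      var n = 0
      while (total < bufferSize && n != -1) { // fill the whole buffer, tolerating short reads
        n = underlying.read(activeBuffer.array(), total, bufferSize - total)
        if (n > 0) total += n
      }
      activeBuffer.limit(total)
      activeBuffer.position(0)
      if (total == 0) -1 else activeBuffer.get() & 0xFF
    } finally {
      lock.unlock()
    }
  }
}

object SimplifiedReadAheadStreamExample extends App {
  val in = new SimplifiedReadAheadStream(new ByteArrayInputStream(Array[Byte](1, 2, 3, 4)), 2)
  println(Iterator.continually(in.read()).takeWhile(_ != -1).toList) // List(1, 2, 3, 4)
}
```

Keeping the hot path down to a single `hasRemaining` check is what makes tiny reads, such as those issued by `DataInputStream.readInt`, cheap.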
[GitHub] spark pull request #20516: [SPARK-23343][CORE][TEST] Increase the exception ...
Github user heary-cao commented on a diff in the pull request: https://github.com/apache/spark/pull/20516#discussion_r167134346 --- Diff: core/src/test/scala/org/apache/spark/SparkFunSuite.scala --- @@ -59,6 +59,7 @@ abstract class SparkFunSuite protected val enableAutoThreadAudit = true protected override def beforeAll(): Unit = { +System.setProperty("spark.testing", "true") --- End diff -- If this property is not set, `sys.props.contains("spark.testing")` is false. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20477: [SPARK-23303][SQL] improve the explain result for data s...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20477 **[Test build #87242 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87242/testReport)** for PR 20477 at commit [`0cc0600`](https://github.com/apache/spark/commit/0cc0600b8f6f3a46189ae38850835f34b57bd945). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20537: [SPARK-23314][PYTHON] Add ambiguous=False when lo...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20537#discussion_r167133597 --- Diff: python/pyspark/sql/types.py --- @@ -1744,8 +1744,27 @@ def _check_series_convert_timestamps_internal(s, timezone): from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype # TODO: handle nested timestamps, such as ArrayType(TimestampType())? if is_datetime64_dtype(s.dtype): +# tz_localize with ambiguous=False has the same behavior of pytz.localize +# >>> import datetime +# >>> import pandas as pd +# >>> import pytz +# >>> +# >>> t = datetime.datetime(2015, 11, 1, 1, 23, 24) +# >>> ts = pd.Series([t]) +# >>> tz = pytz.timezone('America/New_York') +# >>> +# >>> ts.dt.tz_localize(tz, ambiguous=False) +# 0 2015-11-01 01:23:24-05:00 +# dtype: datetime64[ns, America/New_York] +# >>> +# >>> ts.dt.tz_localize(tz, ambiguous=True) +# 0 2015-11-01 01:23:24-04:00 +# dtype: datetime64[ns, America/New_York] +# >>> +# >>> str(tz.localize(t)) +# '2015-11-01 01:23:24-05:00' --- End diff -- @icexelloss, I get that it's good to know, but shall we describe it as prose? This comment looks like a doctest, but it is actually just a comment. It would be nicer to have a plain explanation in the comments rather than a doctest format. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
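Incidentally, the same fall-back DST ambiguity can be reproduced on the JVM. A small sketch, independent of this PR: `java.time` resolves the overlapping local time 2015-11-01 01:23:24 in America/New_York to the earlier offset (-04:00) by default, whereas the pandas `ambiguous=False` and `pytz.localize` behaviour quoted above picks the later one (-05:00).

```scala
import java.time.{LocalDateTime, ZoneId, ZonedDateTime}

object AmbiguousLocalTimeExample extends App {
  val local = LocalDateTime.of(2015, 11, 1, 1, 23, 24)
  val zone = ZoneId.of("America/New_York")

  // At the fall-back transition this local time occurs twice; java.time picks
  // the earlier offset by default and lets you ask for the later one explicitly.
  val earlier = ZonedDateTime.of(local, zone)      // 2015-11-01T01:23:24-04:00[America/New_York]
  val later = earlier.withLaterOffsetAtOverlap()   // 2015-11-01T01:23:24-05:00[America/New_York]

  println(s"default: $earlier, later offset: $later")
}
```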
[GitHub] spark issue #20477: [SPARK-23303][SQL] improve the explain result for data s...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20477 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/735/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20477: [SPARK-23303][SQL] improve the explain result for data s...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20477 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20516: [SPARK-23343][CORE][TEST] Increase the exception ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20516#discussion_r167133408 --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala --- @@ -,7 +,7 @@ private[spark] object Utils extends Logging { */ def portMaxRetries(conf: SparkConf): Int = { val maxRetries = conf.getOption("spark.port.maxRetries").map(_.toInt) -if (conf.contains("spark.testing")) { +if (isTesting || conf.contains("spark.testing")) { --- End diff -- shall we just call `isTesting` here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20516: [SPARK-23343][CORE][TEST] Increase the exception ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20516#discussion_r167133433 --- Diff: core/src/test/scala/org/apache/spark/SparkFunSuite.scala --- @@ -59,6 +59,7 @@ abstract class SparkFunSuite protected val enableAutoThreadAudit = true protected override def beforeAll(): Unit = { +System.setProperty("spark.testing", "true") --- End diff -- why do we need this? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20516: [SPARK-23343][CORE][TEST] Increase the exception test fo...
Github user heary-cao commented on the issue: https://github.com/apache/spark/pull/20516 @cloud-fan thank you for the suggestion. `./project/SparkBuild.scala:795: javaOptions in Test += "-Dspark.testing=1"` seems to take effect only for Spark's own build, and has no effect on the SparkFunSuite unit tests. I updated this PR to provide a fix for it. Can you help me review it? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
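A minimal sketch of the kind of check under discussion (this is not the exact Spark `Utils.isTesting` code, just an illustration): treat the JVM as running tests if either the `spark.testing` system property or the `SPARK_TESTING` environment variable is present, so a suite that sets the property in `beforeAll()` is detected even when the build's `javaOptions` don't reach it.

```scala
// Hypothetical helper mirroring the check being discussed; the real
// implementation lives in org.apache.spark.util.Utils.
object TestingFlag {
  def isTesting: Boolean =
    sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
}

object TestingFlagExample extends App {
  // Same idea as the beforeAll() change in the diff above.
  System.setProperty("spark.testing", "true")
  assert(TestingFlag.isTesting, "spark.testing should now be visible via sys.props")
  println(s"isTesting = ${TestingFlag.isTesting}")
}
```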
[GitHub] spark issue #20552: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2
Github user jose-torres commented on the issue: https://github.com/apache/spark/pull/20552 It's my intent to say that other data sources built by general developers aren't supposed to use batch ids in the executors for any purpose. In addition to the issue you mentioned, I don't think there's a compelling reason to do so in the DataSourceV2 model, and I worry it's easy to write implementations that seem correct but aren't that way. Since this interface is still evolving, I think it makes sense to revisit the question if we notice a scenario where it's infeasible to rewrite a piece of transactional logic to not use the batch ID in the executor. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20552: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20552 **[Test build #87241 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87241/testReport)** for PR 20552 at commit [`a33a35c`](https://github.com/apache/spark/commit/a33a35ccbae7350519a3faf8d5d3d6f35692feb3). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20552#discussion_r167126862 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala --- @@ -255,6 +255,32 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf query.stop() } } + --- End diff -- Good instinct, it didn't quite work. Added the test. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20552#discussion_r167126838 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala --- @@ -17,52 +17,119 @@ package org.apache.spark.sql.execution.streaming +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream} + import org.apache.spark.TaskContext -import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter} -import org.apache.spark.sql.catalyst.encoders.encoderFor +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder} +import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution +import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport} +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter +import org.apache.spark.sql.streaming.OutputMode +import org.apache.spark.sql.types.StructType -/** - * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by - * [[ForeachWriter]]. - * - * @param writer The [[ForeachWriter]] to process all data. - * @tparam T The expected type of the sink. - */ -class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable { - - override def addBatch(batchId: Long, data: DataFrame): Unit = { -// This logic should've been as simple as: -// ``` -// data.as[T].foreachPartition { iter => ... } -// ``` -// -// Unfortunately, doing that would just break the incremental planing. The reason is, -// `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` will -// create a new plan. Because StreamExecution uses the existing plan to collect metrics and -// update watermark, we should never create a new plan. Otherwise, metrics and watermark are -// updated in the new plan, and StreamExecution cannot retrieval them. -// -// Hence, we need to manually convert internal rows to objects using encoder. 
+ +case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport { + override def createStreamWriter( + queryId: String, + schema: StructType, + mode: OutputMode, + options: DataSourceOptions): StreamWriter = { val encoder = encoderFor[T].resolveAndBind( - data.logicalPlan.output, - data.sparkSession.sessionState.analyzer) -data.queryExecution.toRdd.foreachPartition { iter => - if (writer.open(TaskContext.getPartitionId(), batchId)) { -try { - while (iter.hasNext) { -writer.process(encoder.fromRow(iter.next())) - } -} catch { - case e: Throwable => -writer.close(e) -throw e -} -writer.close(null) - } else { -writer.close(null) + schema.toAttributes, + SparkSession.getActiveSession.get.sessionState.analyzer) +ForeachInternalWriter(writer, encoder) + } +} + +case class ForeachInternalWriter[T: Encoder]( +writer: ForeachWriter[T], encoder: ExpressionEncoder[T]) +extends StreamWriter with SupportsWriteInternalRow { + override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} + override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} + + override def createInternalRowWriterFactory(): DataWriterFactory[InternalRow] = { +ForeachWriterFactory(writer, encoder) + } +} + +case class ForeachWriterFactory[T: Encoder](writer: ForeachWriter[T], encoder: ExpressionEncoder[T]) +extends DataWriterFactory[InternalRow] { + override def createDataWriter(partitionId: Int, attemptNumber: Int): ForeachDataWriter[T] = { +new ForeachDataWriter(writer, encoder, partitionId) + } +} + +class ForeachDataWriter[T : Encoder]( +private var writer: ForeachWriter[T], encoder: ExpressionEncoder[T], partitionId: Int) +extends DataWriter[InternalRow] { + private val initialEpochId: Long = { +// Start with the microbatch ID. If it's not there, we're in continuous execution, +// so get the start epoch. +// This ID will be incremented as commits happen. +TaskContext.get().getLocalProperty(MicroBatchExecution.BATCH_ID_KEY) match { + case null => TaskContext.get().getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong + case batch => batch.toLong +} +
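For reference, here is a small self-contained usage sketch of the public `ForeachWriter` API that this sink drives; the `rate` source and the println-based writer are placeholders for illustration, not part of this PR.

```scala
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

object ForeachSinkUsageExample extends App {
  val spark = SparkSession.builder().master("local[2]").appName("foreach-sink-example").getOrCreate()

  val counts = spark.readStream
    .format("rate")   // built-in source that generates timestamped rows, handy for examples
    .load()
    .groupBy("value")
    .count()

  // The sink forwards each partition of each epoch (micro-batch or continuous
  // epoch) through open/process/close on the user's ForeachWriter.
  val query = counts.writeStream
    .outputMode("complete")
    .foreach(new ForeachWriter[Row] {
      override def open(partitionId: Long, epochId: Long): Boolean = true
      override def process(row: Row): Unit = println(row)
      override def close(errorOrNull: Throwable): Unit = ()
    })
    .start()

  query.awaitTermination(10000) // run briefly for the example
  query.stop()
}
```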
[GitHub] spark issue #17982: [SPARK-20395][BUILD] Update Scala to 2.11.11 and zinc to...
Github user tovbinm commented on the issue: https://github.com/apache/spark/pull/17982 2.12 would be even better. I haven't seen a ticket related to 2.12 upgrade. I can give it a try. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20549: SPARK-18844[MLLIB] Add more binary classification metric...
Github user sandecho commented on the issue: https://github.com/apache/spark/pull/20549 @srowen: Will the results of the test not be posted? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Github user tdas commented on the issue: https://github.com/apache/spark/pull/20554 @jose-torres @zsxwing please take a look. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20545: [SPARK-23359][SQL] Adds an alias 'names' of 'fieldNames'...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20545 **[Test build #87240 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87240/testReport)** for PR 20545 at commit [`664a62c`](https://github.com/apache/spark/commit/664a62c7da9ba5da2007d40ef9c157f7e82938c5). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20545: [SPARK-23359][SQL] Adds an alias 'names' of 'fieldNames'...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20545 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/734/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20545: [SPARK-23359][SQL] Adds an alias 'names' of 'fieldNames'...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20545 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20545: [SPARK-23359][SQL] Adds an alias 'names' of 'fieldNames'...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20545 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20545: [SPARK-23359][SQL] Adds an alias 'names' of 'fieldNames'...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20545 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20545: [SPARK-23359][SQL] Adds an alias 'names' of 'fieldNames'...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20545 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87235/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17982: [SPARK-20395][BUILD] Update Scala to 2.11.11 and zinc to...
Github user srowen commented on the issue: https://github.com/apache/spark/pull/17982 Unless it changed the internals of the interpreter to afford a different place to hack in the init, I don't think it is any different. However, the build and most tests already work with 2.12. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20545: [SPARK-23359][SQL] Adds an alias 'names' of 'fieldNames'...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20545 **[Test build #87235 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87235/testReport)** for PR 20545 at commit [`664a62c`](https://github.com/apache/spark/commit/664a62c7da9ba5da2007d40ef9c157f7e82938c5). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20554#discussion_r167124768 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala --- @@ -303,94 +302,75 @@ class KafkaMicroBatchSourceSuite extends KafkaSourceSuiteBase { ) } - testWithUninterruptibleThread( -"deserialization of initial offset with Spark 2.1.0") { -withTempDir { metadataPath => - val topic = newTopic - testUtils.createTopic(topic, partitions = 3) - - val provider = new KafkaSourceProvider - val parameters = Map( -"kafka.bootstrap.servers" -> testUtils.brokerAddress, -"subscribe" -> topic - ) - val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None, -"", parameters) - source.getOffset.get // Write initial offset - - // Make sure Spark 2.1.0 will throw an exception when reading the new log - intercept[java.lang.IllegalArgumentException] { -// Simulate how Spark 2.1.0 reads the log -Utils.tryWithResource(new FileInputStream(metadataPath.getAbsolutePath + "/0")) { in => - val length = in.read() - val bytes = new Array[Byte](length) - in.read(bytes) - KafkaSourceOffset(SerializedOffset(new String(bytes, UTF_8))) -} - } -} - } - - testWithUninterruptibleThread("deserialization of initial offset written by Spark 2.1.0") { + test("deserialization of initial offset written by Spark 2.1.0") { withTempDir { metadataPath => --- End diff -- Changed the two tests below to not use the source/reader directly (too low-level implementation dependent test) to actually run a streaming query using sample initial offset files in the `test/resources`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20554#discussion_r167124564 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala --- @@ -303,94 +302,75 @@ class KafkaMicroBatchSourceSuite extends KafkaSourceSuiteBase { ) } - testWithUninterruptibleThread( --- End diff -- I think this test is superfluous and does not test anything useful. As with the other modified tests, "simulating" an implementation is a BAD test, and in this particular case it is attempting to simulate the 2.1.0 log, which is not necessary any more. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20554#discussion_r167124346 --- Diff: external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-future-version.bin --- @@ -0,0 +1,2 @@ +0v9 +{"kafka-initial-offset-future-version":{"2":2,"1":1,"0":0}} --- End diff -- note: should remove the newline to keep it consistent --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20554#discussion_r167124308 --- Diff: external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-version-2.1.0.bin --- @@ -1 +1 @@ -2{"kafka-initial-offset-2-1-0":{"2":0,"1":0,"0":0}} \ No newline at end of file +2{"kafka-initial-offset-2-1-0":{"2":2,"1":1,"0":0}} --- End diff -- note: should remove the newline to keep it consistent. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17982: [SPARK-20395][BUILD] Update Scala to 2.11.11 and zinc to...
Github user tovbinm commented on the issue: https://github.com/apache/spark/pull/17982 @som-snytt @srowen what about `2.11.12`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20554#discussion_r167123917 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.{util => ju} +import java.io._ +import java.nio.charset.StandardCharsets + +import scala.collection.JavaConverters._ + +import org.apache.commons.io.IOUtils +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.ExecutorCacheTaskLocation +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset} +import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE} +import org.apache.spark.sql.sources.v2.DataSourceOptions +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory} +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.UninterruptibleThread + +/** + * A [[MicroBatchReader]] that reads data from Kafka. + * + * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains + * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For + * example if the last record in a Kafka topic "t", partition 2 is offset 5, then + * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done keep it consistent + * with the semantics of `KafkaConsumer.position()`. + * + * Zero data lost is not guaranteed when topics are deleted. If zero data lost is critical, the user + * must make sure all messages in a topic have been processed when deleting a topic. + * + * There is a known issue caused by KAFKA-1894: the query using Kafka maybe cannot be stopped. + * To avoid this issue, you should make sure stopping the query before stopping the Kafka brokers + * and not use wrong broker addresses. 
+ */ +private[kafka010] class KafkaMicroBatchReader( +kafkaOffsetReader: KafkaOffsetReader, +executorKafkaParams: ju.Map[String, Object], +options: DataSourceOptions, +metadataPath: String, +startingOffsets: KafkaOffsetRangeLimit, +failOnDataLoss: Boolean) + extends MicroBatchReader with Logging { + + type PartitionOffsetMap = Map[TopicPartition, Long] + + private var startPartitionOffsets: PartitionOffsetMap = _ + private var endPartitionOffsets: PartitionOffsetMap = _ + + private val pollTimeoutMs = options.getLong( +"kafkaConsumer.pollTimeoutMs", +SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s")) + + private val maxOffsetsPerTrigger = +Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong) + + /** + * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only + * called in StreamExecutionThread. Otherwise, interrupting a thread while running + * `KafkaConsumer.poll` may hang forever (KAFKA-1894). + */ + private lazy val initialPartitionOffsets = getOrCreateInitialPartitionOffsets() + + override def setOffsetRange(start: ju.Optional[Offset], end: ju.Optional[Offset]): Unit = { +// Make sure initialPartitionOffsets is initialized +initialPartitionOffsets + +startPartitionOffsets = Option(start.orElse(null)) +.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets) +.getOrElse(initialPartitionOffsets) + +endPartitionOffsets = Option(end.orElse(null)) +.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets) +
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20554#discussion_r167123837 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala --- (quotes the same KafkaMicroBatchReader.scala hunk shown above) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20554#discussion_r167123713 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala --- (quotes the same KafkaMicroBatchReader.scala hunk shown above) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20554#discussion_r167123614 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala --- (quotes the same KafkaMicroBatchReader.scala hunk shown above) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20554#discussion_r167123580 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala --- (quotes the same KafkaMicroBatchReader.scala hunk shown above) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20554#discussion_r167123513 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala --- (quotes the same KafkaMicroBatchReader.scala hunk shown above) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20554 **[Test build #87239 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87239/testReport)** for PR 20554 at commit [`05c9d20`](https://github.com/apache/spark/commit/05c9d20da4361d631d8839bd4a45e4966964afa0). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20554#discussion_r167123199 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala --- @@ -408,8 +401,27 @@ private[kafka010] object KafkaSourceProvider extends Logging { private[kafka010] val STARTING_OFFSETS_OPTION_KEY = "startingoffsets" private[kafka010] val ENDING_OFFSETS_OPTION_KEY = "endingoffsets" private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss" + val TOPIC_OPTION_KEY = "topic" + val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE = --- End diff -- Moved this from KafkaSource to this class because this is used by multiple reader classes and therefore should be present in the higher level class (e.g. the provider class). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
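A rough sketch of the refactoring rationale above, with purely illustrative names; the real constants live in `KafkaSourceProvider`, and the message text below is an assumption rather than the actual string:

```scala
// Constants shared by several reader implementations are hoisted into the
// provider-level object instead of living inside any single reader class.
object ExampleSourceProvider {
  val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE: String =
    "Some data may have been lost; the query continues because failOnDataLoss is false."
}

class ExampleMicroBatchReader {
  def reportDataLoss(message: String): Unit =
    Console.err.println(s"$message ${ExampleSourceProvider.INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE}")
}

class ExampleContinuousReader {
  // A second reader references the same constant without duplicating it.
  def reportDataLoss(message: String): Unit =
    Console.err.println(s"$message ${ExampleSourceProvider.INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE}")
}
```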
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20554 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/733/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20554 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20554 **[Test build #87238 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87238/testReport)** for PR 20554 at commit [`3ed2a50`](https://github.com/apache/spark/commit/3ed2a509276194214875f39e1e18d8093155c54c). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20554 Build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20554 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/732/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
GitHub user tdas opened a pull request: https://github.com/apache/spark/pull/20554 [SPARK-23362][SS] Migrate Kafka Microbatch source to v2 ## What changes were proposed in this pull request? Migrating KafkaSource (with data source v1) to KafkaMicroBatchReader (with data source v2). ## How was this patch tested? Existing tests; a few were modified to be better tests than the existing ones. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tdas/spark SPARK-23362 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20554.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20554 commit 3ed2a509276194214875f39e1e18d8093155c54c Author: Tathagata Das Date: 2018-02-09T01:46:56Z Migrated --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
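From the user's point of view the migration is intended to be transparent: the same `readStream` code drives the Kafka source whether it is backed by the v1 `KafkaSource` or the v2 `KafkaMicroBatchReader`. A minimal usage sketch; the broker address and topic name are placeholders:

```scala
import org.apache.spark.sql.SparkSession

object KafkaReadExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-read-example").getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
      .option("subscribe", "t")                            // placeholder topic
      .load()

    // Schema exposed by the Kafka source: key, value, topic, partition,
    // offset, timestamp, timestampType.
    df.printSchema()
  }
}
```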
[GitHub] spark issue #19775: [SPARK-22343][core] Add support for publishing Spark met...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19775 My original intention was to expose a MetricsSystem-related interface in #11994, so that users could leverage such an interface to build their own metrics sinks/sources outside of Spark. Unfortunately I'm stuck on #11994, but I still think it is better to leave this as a package outside of Spark; pulling in too many dependencies for non-core functionality does not seem reasonable (just my thoughts). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
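For context, a sketch of what a metrics sink maintained outside Spark core could look like. Spark's own `Sink` trait is package-private today (which is exactly the point of #11994), so the `MetricsSink` trait and the constructor shape below are assumptions standing in for a public interface, not the real API; only the Dropwizard metrics classes are real:

```scala
import java.util.Properties
import java.util.concurrent.TimeUnit

import com.codahale.metrics.{ConsoleReporter, MetricRegistry}

// Hypothetical public sink interface of the kind #11994 proposes.
trait MetricsSink {
  def start(): Unit
  def stop(): Unit
  def report(): Unit
}

// A sink kept outside Spark core, depending only on the Dropwizard metrics
// library. `properties` would carry sink configuration; unused in this sketch.
class ConsoleMetricsSink(properties: Properties, registry: MetricRegistry) extends MetricsSink {
  private val reporter = ConsoleReporter.forRegistry(registry).build()
  override def start(): Unit = reporter.start(10, TimeUnit.SECONDS)
  override def stop(): Unit = reporter.stop()
  override def report(): Unit = reporter.report()
}
```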
[GitHub] spark issue #20541: [SPARK-23356][SQL]Pushes Project to both sides of Union ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20541 **[Test build #87237 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87237/testReport)** for PR 20541 at commit [`4f5d46b`](https://github.com/apache/spark/commit/4f5d46baca612caaa882cbabb3b35665e9c7ed8b). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20552#discussion_r167120763 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala --- @@ -17,52 +17,119 @@ package org.apache.spark.sql.execution.streaming +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream} + import org.apache.spark.TaskContext -import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter} -import org.apache.spark.sql.catalyst.encoders.encoderFor +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder} +import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution +import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport} +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter +import org.apache.spark.sql.streaming.OutputMode +import org.apache.spark.sql.types.StructType -/** - * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by - * [[ForeachWriter]]. - * - * @param writer The [[ForeachWriter]] to process all data. - * @tparam T The expected type of the sink. - */ -class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable { - - override def addBatch(batchId: Long, data: DataFrame): Unit = { -// This logic should've been as simple as: -// ``` -// data.as[T].foreachPartition { iter => ... } -// ``` -// -// Unfortunately, doing that would just break the incremental planing. The reason is, -// `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` will -// create a new plan. Because StreamExecution uses the existing plan to collect metrics and -// update watermark, we should never create a new plan. Otherwise, metrics and watermark are -// updated in the new plan, and StreamExecution cannot retrieval them. -// -// Hence, we need to manually convert internal rows to objects using encoder. + +case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport { + override def createStreamWriter( + queryId: String, + schema: StructType, + mode: OutputMode, + options: DataSourceOptions): StreamWriter = { val encoder = encoderFor[T].resolveAndBind( - data.logicalPlan.output, - data.sparkSession.sessionState.analyzer) -data.queryExecution.toRdd.foreachPartition { iter => - if (writer.open(TaskContext.getPartitionId(), batchId)) { -try { - while (iter.hasNext) { -writer.process(encoder.fromRow(iter.next())) - } -} catch { - case e: Throwable => -writer.close(e) -throw e -} -writer.close(null) - } else { -writer.close(null) + schema.toAttributes, + SparkSession.getActiveSession.get.sessionState.analyzer) +ForeachInternalWriter(writer, encoder) + } +} + +case class ForeachInternalWriter[T: Encoder]( +writer: ForeachWriter[T], encoder: ExpressionEncoder[T]) +extends StreamWriter with SupportsWriteInternalRow { + override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} + override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} + + override def createInternalRowWriterFactory(): DataWriterFactory[InternalRow] = { +ForeachWriterFactory(writer, encoder) + } +} + +case class ForeachWriterFactory[T: Encoder](writer: ForeachWriter[T], encoder: ExpressionEncoder[T]) --- End diff -- actually.. probably should not inline this. 
Its outer closure may not be serializable in that case. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
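The serializability concern above is independent of Spark: an anonymous or inner class defined inside another class keeps a reference to its enclosing instance, so serializing it drags that instance along. A minimal illustration with made-up names:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// The enclosing class is NOT serializable.
class Outer {
  private val tag = "outer-state"

  // Anonymous class defined inline: because it uses `tag`, it holds a reference
  // to the enclosing Outer instance.
  val inlined: Serializable = new Serializable {
    override def toString: String = s"inlined($tag)"
  }
}

// A top-level case class has no hidden reference to an enclosing instance.
case class StandaloneFactory(name: String)

object ClosureCaptureDemo {
  private def roundTrip(o: AnyRef): Unit =
    new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(o)

  def main(args: Array[String]): Unit = {
    roundTrip(StandaloneFactory("ok"))   // fine: no outer reference to serialize
    try roundTrip(new Outer().inlined)   // fails: drags the non-serializable Outer along
    catch { case e: NotSerializableException => println(s"as expected: $e") }
  }
}
```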
[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20552#discussion_r167120724 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala --- @@ -17,52 +17,119 @@ package org.apache.spark.sql.execution.streaming +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream} + import org.apache.spark.TaskContext -import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter} -import org.apache.spark.sql.catalyst.encoders.encoderFor +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder} +import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution +import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport} +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter +import org.apache.spark.sql.streaming.OutputMode +import org.apache.spark.sql.types.StructType -/** - * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by - * [[ForeachWriter]]. - * - * @param writer The [[ForeachWriter]] to process all data. - * @tparam T The expected type of the sink. - */ -class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable { - - override def addBatch(batchId: Long, data: DataFrame): Unit = { -// This logic should've been as simple as: -// ``` -// data.as[T].foreachPartition { iter => ... } -// ``` -// -// Unfortunately, doing that would just break the incremental planing. The reason is, -// `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` will -// create a new plan. Because StreamExecution uses the existing plan to collect metrics and -// update watermark, we should never create a new plan. Otherwise, metrics and watermark are -// updated in the new plan, and StreamExecution cannot retrieval them. -// -// Hence, we need to manually convert internal rows to objects using encoder. + +case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport { + override def createStreamWriter( + queryId: String, + schema: StructType, + mode: OutputMode, + options: DataSourceOptions): StreamWriter = { val encoder = encoderFor[T].resolveAndBind( - data.logicalPlan.output, - data.sparkSession.sessionState.analyzer) -data.queryExecution.toRdd.foreachPartition { iter => - if (writer.open(TaskContext.getPartitionId(), batchId)) { -try { - while (iter.hasNext) { -writer.process(encoder.fromRow(iter.next())) - } -} catch { - case e: Throwable => -writer.close(e) -throw e -} -writer.close(null) - } else { -writer.close(null) + schema.toAttributes, + SparkSession.getActiveSession.get.sessionState.analyzer) +ForeachInternalWriter(writer, encoder) + } +} + +case class ForeachInternalWriter[T: Encoder]( --- End diff -- actually.. probably should not inline this. its outer closure may not be serializable in that case. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20303: [SPARK-23128][SQL] A new approach to do adaptive executi...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20303 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/731/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20303: [SPARK-23128][SQL] A new approach to do adaptive executi...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20303 **[Test build #87236 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87236/testReport)** for PR 20303 at commit [`603c6d5`](https://github.com/apache/spark/commit/603c6d58ae9a72f8202236682c78cd48a9bb320e). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20303: [SPARK-23128][SQL] A new approach to do adaptive executi...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20303 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20545: [SPARK-23359][SQL] Adds an alias 'names' of 'fieldNames'...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20545 Yup, looks like both tests are flaky :-/ --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20546: [SPARK-20659][Core] Removing sc.getExecutorStorageStatus...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20546 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87232/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20546: [SPARK-20659][Core] Removing sc.getExecutorStorageStatus...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20546 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20546: [SPARK-20659][Core] Removing sc.getExecutorStorageStatus...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20546 **[Test build #87232 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87232/testReport)** for PR 20546 at commit [`8544380`](https://github.com/apache/spark/commit/8544380e91f5bfa7c95cf613d49bc9144fedee9f). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20541: [SPARK-23356][SQL]Pushes Project to both sides of...
Github user heary-cao commented on a diff in the pull request: https://github.com/apache/spark/pull/20541#discussion_r167113352 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -400,13 +400,24 @@ object PushProjectionThroughUnion extends Rule[LogicalPlan] with PredicateHelper // Push down deterministic projection through UNION ALL case p @ Project(projectList, Union(children)) => assert(children.nonEmpty) - if (projectList.forall(_.deterministic)) { -val newFirstChild = Project(projectList, children.head) + val (deterministicList, nonDeterministic) = projectList.partition(_.deterministic) + + if (deterministicList.nonEmpty) { +val newFirstChild = Project(deterministicList, children.head) val newOtherChildren = children.tail.map { child => val rewrites = buildRewrites(children.head, child) - Project(projectList.map(pushToRight(_, rewrites)), child) + Project(deterministicList.map(pushToRight(_, rewrites)), child) --- End diff -- If we push `a + 1` down through the union, it is still just `a + 1` on each side. This case seems to lack test coverage; let's add test cases for it. Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
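A sketch of the user-visible case such a test would pin down; how much of the projection ends up below the Union in the optimized plan depends on whether this patch is applied, and the column and app names are illustrative:

```scala
import org.apache.spark.sql.SparkSession

object PushProjectThroughUnionExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("push-project-union").getOrCreate()
    import spark.implicits._

    val left = Seq(1, 2).toDF("a")
    val right = Seq(3, 4).toDF("a")

    // Fully deterministic projection: eligible to be pushed below the Union.
    val deterministicOnly = left.union(right).selectExpr("a + 1")
    deterministicOnly.explain(true) // Project(a + 1) should appear under each Union child

    // Mixed projection: rand() is non-deterministic and must stay above the Union;
    // the change under review lets the deterministic part (a + 1) still be pushed down.
    val mixed = left.union(right).selectExpr("a + 1", "rand()")
    mixed.explain(true)
  }
}
```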
[GitHub] spark issue #20545: [SPARK-23359][SQL] Adds an alias 'names' of 'fieldNames'...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/20545 Seems I saw the same test failure in other PRs too. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/20382 Hi @tdas, would you please help review this again? Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20545: [SPARK-23359][SQL] Adds an alias 'names' of 'fieldNames'...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20545 **[Test build #87235 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87235/testReport)** for PR 20545 at commit [`664a62c`](https://github.com/apache/spark/commit/664a62c7da9ba5da2007d40ef9c157f7e82938c5). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20545: [SPARK-23359][SQL] Adds an alias 'names' of 'fieldNames'...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20545 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/730/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20545: [SPARK-23359][SQL] Adds an alias 'names' of 'fieldNames'...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20545 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20545: [SPARK-23359][SQL] Adds an alias 'names' of 'fieldNames'...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20545 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20537: [SPARK-23314][PYTHON] Add ambiguous=False when localizin...
Github user felixcheung commented on the issue: https://github.com/apache/spark/pull/20537 Great, thanks. The fix is actually just two lines. LGTM. @hyukjinkwon could you help merge this to 2.3 ASAP? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20545: [SPARK-23359][SQL] Adds an alias 'names' of 'fieldNames'...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20545 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20545: [SPARK-23359][SQL] Adds an alias 'names' of 'fieldNames'...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20545 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87233/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20545: [SPARK-23359][SQL] Adds an alias 'names' of 'fieldNames'...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20545 **[Test build #87233 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87233/testReport)** for PR 20545 at commit [`664a62c`](https://github.com/apache/spark/commit/664a62c7da9ba5da2007d40ef9c157f7e82938c5). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18982: [SPARK-21685][PYTHON][ML] PySpark Params isSet state sho...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18982 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18982: [SPARK-21685][PYTHON][ML] PySpark Params isSet state sho...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18982 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87228/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20552: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20552 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87231/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18982: [SPARK-21685][PYTHON][ML] PySpark Params isSet state sho...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18982 **[Test build #87228 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87228/testReport)** for PR 18982 at commit [`339c793`](https://github.com/apache/spark/commit/339c793451adafed64e57924f670254d669d82a8). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20552: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20552 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20552: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20552 **[Test build #87231 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87231/testReport)** for PR 20552 at commit [`87d0bc8`](https://github.com/apache/spark/commit/87d0bc8ce23ab5a95ba0b5432d6b58042b32bdac). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20552#discussion_r167081693 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala --- @@ -255,6 +255,32 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf query.stop() } } + --- End diff -- I think there should be a test with continuous processing + foreach. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
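A rough, user-level sketch of the combination being asked for, a continuous trigger together with the foreach sink; this is not the actual StreamTest-based suite, and whether it runs end-to-end depends on the sink support this very PR adds:

```scala
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.streaming.Trigger

object ContinuousForeachExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("continuous-foreach").getOrCreate()

    val writer = new ForeachWriter[Row] {
      def open(partitionId: Long, epochId: Long): Boolean = true
      def process(value: Row): Unit = println(value)
      def close(errorOrNull: Throwable): Unit = {}
    }

    val query = spark.readStream
      .format("rate")                          // built-in streaming test source
      .load()
      .writeStream
      .foreach(writer)
      .trigger(Trigger.Continuous("1 second")) // continuous processing mode
      .start()

    query.awaitTermination(30000)
    query.stop()
  }
}
```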
[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20552#discussion_r167077542 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala --- @@ -17,52 +17,119 @@ package org.apache.spark.sql.execution.streaming +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream} + import org.apache.spark.TaskContext -import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter} -import org.apache.spark.sql.catalyst.encoders.encoderFor +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder} +import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution +import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport} +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter +import org.apache.spark.sql.streaming.OutputMode +import org.apache.spark.sql.types.StructType -/** - * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by - * [[ForeachWriter]]. - * - * @param writer The [[ForeachWriter]] to process all data. - * @tparam T The expected type of the sink. - */ -class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable { - - override def addBatch(batchId: Long, data: DataFrame): Unit = { -// This logic should've been as simple as: -// ``` -// data.as[T].foreachPartition { iter => ... } -// ``` -// -// Unfortunately, doing that would just break the incremental planing. The reason is, -// `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` will -// create a new plan. Because StreamExecution uses the existing plan to collect metrics and -// update watermark, we should never create a new plan. Otherwise, metrics and watermark are -// updated in the new plan, and StreamExecution cannot retrieval them. -// -// Hence, we need to manually convert internal rows to objects using encoder. + +case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport { + override def createStreamWriter( + queryId: String, + schema: StructType, + mode: OutputMode, + options: DataSourceOptions): StreamWriter = { val encoder = encoderFor[T].resolveAndBind( - data.logicalPlan.output, - data.sparkSession.sessionState.analyzer) -data.queryExecution.toRdd.foreachPartition { iter => - if (writer.open(TaskContext.getPartitionId(), batchId)) { -try { - while (iter.hasNext) { -writer.process(encoder.fromRow(iter.next())) - } -} catch { - case e: Throwable => -writer.close(e) -throw e -} -writer.close(null) - } else { -writer.close(null) + schema.toAttributes, + SparkSession.getActiveSession.get.sessionState.analyzer) +ForeachInternalWriter(writer, encoder) + } +} + +case class ForeachInternalWriter[T: Encoder]( --- End diff -- nit: This is really a small class. Maybe inline this rather than define a confusing name`...InternalWriter` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20552#discussion_r167076621 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala --- @@ -17,52 +17,119 @@ package org.apache.spark.sql.execution.streaming +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream} + import org.apache.spark.TaskContext -import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter} -import org.apache.spark.sql.catalyst.encoders.encoderFor +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder} +import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution +import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport} +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter +import org.apache.spark.sql.streaming.OutputMode +import org.apache.spark.sql.types.StructType -/** - * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by - * [[ForeachWriter]]. - * - * @param writer The [[ForeachWriter]] to process all data. - * @tparam T The expected type of the sink. - */ -class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable { - - override def addBatch(batchId: Long, data: DataFrame): Unit = { -// This logic should've been as simple as: -// ``` -// data.as[T].foreachPartition { iter => ... } -// ``` -// -// Unfortunately, doing that would just break the incremental planing. The reason is, -// `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` will -// create a new plan. Because StreamExecution uses the existing plan to collect metrics and -// update watermark, we should never create a new plan. Otherwise, metrics and watermark are -// updated in the new plan, and StreamExecution cannot retrieval them. -// -// Hence, we need to manually convert internal rows to objects using encoder. + +case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport { + override def createStreamWriter( + queryId: String, + schema: StructType, + mode: OutputMode, + options: DataSourceOptions): StreamWriter = { val encoder = encoderFor[T].resolveAndBind( - data.logicalPlan.output, - data.sparkSession.sessionState.analyzer) -data.queryExecution.toRdd.foreachPartition { iter => - if (writer.open(TaskContext.getPartitionId(), batchId)) { -try { - while (iter.hasNext) { -writer.process(encoder.fromRow(iter.next())) - } -} catch { - case e: Throwable => -writer.close(e) -throw e -} -writer.close(null) - } else { -writer.close(null) + schema.toAttributes, + SparkSession.getActiveSession.get.sessionState.analyzer) +ForeachInternalWriter(writer, encoder) + } +} + +case class ForeachInternalWriter[T: Encoder]( +writer: ForeachWriter[T], encoder: ExpressionEncoder[T]) --- End diff -- nit: params on different lines --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20552#discussion_r167078037 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala --- @@ -17,52 +17,119 @@ package org.apache.spark.sql.execution.streaming +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream} + import org.apache.spark.TaskContext -import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter} -import org.apache.spark.sql.catalyst.encoders.encoderFor +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder} +import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution +import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport} +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter +import org.apache.spark.sql.streaming.OutputMode +import org.apache.spark.sql.types.StructType -/** - * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by - * [[ForeachWriter]]. - * - * @param writer The [[ForeachWriter]] to process all data. - * @tparam T The expected type of the sink. - */ -class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable { - - override def addBatch(batchId: Long, data: DataFrame): Unit = { -// This logic should've been as simple as: -// ``` -// data.as[T].foreachPartition { iter => ... } -// ``` -// -// Unfortunately, doing that would just break the incremental planing. The reason is, -// `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` will -// create a new plan. Because StreamExecution uses the existing plan to collect metrics and -// update watermark, we should never create a new plan. Otherwise, metrics and watermark are -// updated in the new plan, and StreamExecution cannot retrieval them. -// -// Hence, we need to manually convert internal rows to objects using encoder. + +case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport { + override def createStreamWriter( + queryId: String, + schema: StructType, + mode: OutputMode, + options: DataSourceOptions): StreamWriter = { val encoder = encoderFor[T].resolveAndBind( - data.logicalPlan.output, - data.sparkSession.sessionState.analyzer) -data.queryExecution.toRdd.foreachPartition { iter => - if (writer.open(TaskContext.getPartitionId(), batchId)) { -try { - while (iter.hasNext) { -writer.process(encoder.fromRow(iter.next())) - } -} catch { - case e: Throwable => -writer.close(e) -throw e -} -writer.close(null) - } else { -writer.close(null) + schema.toAttributes, + SparkSession.getActiveSession.get.sessionState.analyzer) +ForeachInternalWriter(writer, encoder) + } +} + +case class ForeachInternalWriter[T: Encoder]( +writer: ForeachWriter[T], encoder: ExpressionEncoder[T]) +extends StreamWriter with SupportsWriteInternalRow { + override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} + override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} + + override def createInternalRowWriterFactory(): DataWriterFactory[InternalRow] = { +ForeachWriterFactory(writer, encoder) + } +} + +case class ForeachWriterFactory[T: Encoder](writer: ForeachWriter[T], encoder: ExpressionEncoder[T]) --- End diff -- similarly ... maybe inline this class as well. 
It's very small. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20552#discussion_r167080181 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala --- @@ -17,52 +17,119 @@ package org.apache.spark.sql.execution.streaming +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream} + import org.apache.spark.TaskContext -import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter} -import org.apache.spark.sql.catalyst.encoders.encoderFor +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder} +import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution +import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport} +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter +import org.apache.spark.sql.streaming.OutputMode +import org.apache.spark.sql.types.StructType -/** - * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by - * [[ForeachWriter]]. - * - * @param writer The [[ForeachWriter]] to process all data. - * @tparam T The expected type of the sink. - */ -class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable { - - override def addBatch(batchId: Long, data: DataFrame): Unit = { -// This logic should've been as simple as: -// ``` -// data.as[T].foreachPartition { iter => ... } -// ``` -// -// Unfortunately, doing that would just break the incremental planing. The reason is, -// `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` will -// create a new plan. Because StreamExecution uses the existing plan to collect metrics and -// update watermark, we should never create a new plan. Otherwise, metrics and watermark are -// updated in the new plan, and StreamExecution cannot retrieval them. -// -// Hence, we need to manually convert internal rows to objects using encoder. 
+ +case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport { + override def createStreamWriter( + queryId: String, + schema: StructType, + mode: OutputMode, + options: DataSourceOptions): StreamWriter = { val encoder = encoderFor[T].resolveAndBind( - data.logicalPlan.output, - data.sparkSession.sessionState.analyzer) -data.queryExecution.toRdd.foreachPartition { iter => - if (writer.open(TaskContext.getPartitionId(), batchId)) { -try { - while (iter.hasNext) { -writer.process(encoder.fromRow(iter.next())) - } -} catch { - case e: Throwable => -writer.close(e) -throw e -} -writer.close(null) - } else { -writer.close(null) + schema.toAttributes, + SparkSession.getActiveSession.get.sessionState.analyzer) +ForeachInternalWriter(writer, encoder) + } +} + +case class ForeachInternalWriter[T: Encoder]( +writer: ForeachWriter[T], encoder: ExpressionEncoder[T]) +extends StreamWriter with SupportsWriteInternalRow { + override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} + override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} + + override def createInternalRowWriterFactory(): DataWriterFactory[InternalRow] = { +ForeachWriterFactory(writer, encoder) + } +} + +case class ForeachWriterFactory[T: Encoder](writer: ForeachWriter[T], encoder: ExpressionEncoder[T]) +extends DataWriterFactory[InternalRow] { + override def createDataWriter(partitionId: Int, attemptNumber: Int): ForeachDataWriter[T] = { +new ForeachDataWriter(writer, encoder, partitionId) + } +} + +class ForeachDataWriter[T : Encoder]( +private var writer: ForeachWriter[T], encoder: ExpressionEncoder[T], partitionId: Int) +extends DataWriter[InternalRow] { + private val initialEpochId: Long = { +// Start with the microbatch ID. If it's not there, we're in continuous execution, +// so get the start epoch. +// This ID will be incremented as commits happen. +TaskContext.get().getLocalProperty(MicroBatchExecution.BATCH_ID_KEY) match { + case null => TaskContext.get().getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong + case batch => batch.toLong +} + }
[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20552#discussion_r167080661 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala --- @@ -17,52 +17,119 @@ package org.apache.spark.sql.execution.streaming +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream} + import org.apache.spark.TaskContext -import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter} -import org.apache.spark.sql.catalyst.encoders.encoderFor +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder} +import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution +import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport} +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter +import org.apache.spark.sql.streaming.OutputMode +import org.apache.spark.sql.types.StructType -/** - * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by - * [[ForeachWriter]]. - * - * @param writer The [[ForeachWriter]] to process all data. - * @tparam T The expected type of the sink. - */ -class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable { - - override def addBatch(batchId: Long, data: DataFrame): Unit = { -// This logic should've been as simple as: -// ``` -// data.as[T].foreachPartition { iter => ... } -// ``` -// -// Unfortunately, doing that would just break the incremental planing. The reason is, -// `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` will -// create a new plan. Because StreamExecution uses the existing plan to collect metrics and -// update watermark, we should never create a new plan. Otherwise, metrics and watermark are -// updated in the new plan, and StreamExecution cannot retrieval them. -// -// Hence, we need to manually convert internal rows to objects using encoder. 
+ +case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport { + override def createStreamWriter( + queryId: String, + schema: StructType, + mode: OutputMode, + options: DataSourceOptions): StreamWriter = { val encoder = encoderFor[T].resolveAndBind( - data.logicalPlan.output, - data.sparkSession.sessionState.analyzer) -data.queryExecution.toRdd.foreachPartition { iter => - if (writer.open(TaskContext.getPartitionId(), batchId)) { -try { - while (iter.hasNext) { -writer.process(encoder.fromRow(iter.next())) - } -} catch { - case e: Throwable => -writer.close(e) -throw e -} -writer.close(null) - } else { -writer.close(null) + schema.toAttributes, + SparkSession.getActiveSession.get.sessionState.analyzer) +ForeachInternalWriter(writer, encoder) + } +} + +case class ForeachInternalWriter[T: Encoder]( +writer: ForeachWriter[T], encoder: ExpressionEncoder[T]) +extends StreamWriter with SupportsWriteInternalRow { + override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} + override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} + + override def createInternalRowWriterFactory(): DataWriterFactory[InternalRow] = { +ForeachWriterFactory(writer, encoder) + } +} + +case class ForeachWriterFactory[T: Encoder](writer: ForeachWriter[T], encoder: ExpressionEncoder[T]) +extends DataWriterFactory[InternalRow] { + override def createDataWriter(partitionId: Int, attemptNumber: Int): ForeachDataWriter[T] = { +new ForeachDataWriter(writer, encoder, partitionId) + } +} + +class ForeachDataWriter[T : Encoder]( +private var writer: ForeachWriter[T], encoder: ExpressionEncoder[T], partitionId: Int) --- End diff -- params in separate lines. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20552#discussion_r167078671 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala --- @@ -17,52 +17,119 @@ package org.apache.spark.sql.execution.streaming +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream} + import org.apache.spark.TaskContext -import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter} -import org.apache.spark.sql.catalyst.encoders.encoderFor +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder} +import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution +import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport} +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter +import org.apache.spark.sql.streaming.OutputMode +import org.apache.spark.sql.types.StructType -/** - * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by - * [[ForeachWriter]]. - * - * @param writer The [[ForeachWriter]] to process all data. - * @tparam T The expected type of the sink. - */ -class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable { - - override def addBatch(batchId: Long, data: DataFrame): Unit = { -// This logic should've been as simple as: -// ``` -// data.as[T].foreachPartition { iter => ... } -// ``` -// -// Unfortunately, doing that would just break the incremental planing. The reason is, -// `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` will -// create a new plan. Because StreamExecution uses the existing plan to collect metrics and -// update watermark, we should never create a new plan. Otherwise, metrics and watermark are -// updated in the new plan, and StreamExecution cannot retrieval them. -// -// Hence, we need to manually convert internal rows to objects using encoder. 
+ +case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport { + override def createStreamWriter( + queryId: String, + schema: StructType, + mode: OutputMode, + options: DataSourceOptions): StreamWriter = { val encoder = encoderFor[T].resolveAndBind( - data.logicalPlan.output, - data.sparkSession.sessionState.analyzer) -data.queryExecution.toRdd.foreachPartition { iter => - if (writer.open(TaskContext.getPartitionId(), batchId)) { -try { - while (iter.hasNext) { -writer.process(encoder.fromRow(iter.next())) - } -} catch { - case e: Throwable => -writer.close(e) -throw e -} -writer.close(null) - } else { -writer.close(null) + schema.toAttributes, + SparkSession.getActiveSession.get.sessionState.analyzer) +ForeachInternalWriter(writer, encoder) + } +} + +case class ForeachInternalWriter[T: Encoder]( +writer: ForeachWriter[T], encoder: ExpressionEncoder[T]) +extends StreamWriter with SupportsWriteInternalRow { + override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} + override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} + + override def createInternalRowWriterFactory(): DataWriterFactory[InternalRow] = { +ForeachWriterFactory(writer, encoder) + } +} + +case class ForeachWriterFactory[T: Encoder](writer: ForeachWriter[T], encoder: ExpressionEncoder[T]) +extends DataWriterFactory[InternalRow] { + override def createDataWriter(partitionId: Int, attemptNumber: Int): ForeachDataWriter[T] = { +new ForeachDataWriter(writer, encoder, partitionId) + } +} + +class ForeachDataWriter[T : Encoder]( --- End diff -- add docs describing the implementation of this DataWriter, especially the lifecycle of ForeachWriter (should go here than inline comments). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
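For reference, the lifecycle such docs would spell out, based on the existing `ForeachWriter` contract; the epoch/batch-id plumbing of the new DataWriter is deliberately simplified away here:

```scala
import org.apache.spark.sql.ForeachWriter

// Per-partition, per-epoch (or per-batch) lifecycle:
//   open(partitionId, epochId)  -- called once; returning false skips the partition
//   process(record)             -- called once per record, only if open() returned true
//   close(errorOrNull)          -- always called, with the exception if one was thrown
class LoggingWriter extends ForeachWriter[String] {
  def open(partitionId: Long, epochId: Long): Boolean = {
    println(s"open partition=$partitionId epoch=$epochId"); true
  }
  def process(value: String): Unit = println(s"process $value")
  def close(errorOrNull: Throwable): Unit = println(s"close error=$errorOrNull")
}
```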
[GitHub] spark pull request #20519: [Spark-23240][python] Don't let python site custo...
Github user bersprockets commented on a diff in the pull request: https://github.com/apache/spark/pull/20519#discussion_r167095485 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala --- @@ -180,18 +181,52 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String return } + var serverSocket: ServerSocket = null try { +// get a server socket so that the launched daemon can tell us its server port +serverSocket = new ServerSocket(0, 0, InetAddress.getByAddress(Array(127, 0, 0, 1))) --- End diff -- >but it's generally a good idea to call setReuseAddress(true). Do I want to do this since I am asking for some available port rather than binding to a known port? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
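A stand-alone illustration of the pattern under discussion: bind to port 0 so the OS picks a free ephemeral port, then hand that port to the launched daemon. With an ephemeral port there is normally no previous binding (e.g. one stuck in TIME_WAIT) to reuse, which is why the value of `setReuseAddress(true)` is being questioned here. The sketch below is not the PythonWorkerFactory code itself:

```scala
import java.net.{InetAddress, ServerSocket}

object EphemeralPortExample {
  def main(args: Array[String]): Unit = {
    // Port 0 asks the OS for any free port on the loopback interface.
    val serverSocket =
      new ServerSocket(0, 1, InetAddress.getByAddress(Array[Byte](127, 0, 0, 1)))
    try {
      println(s"listening on 127.0.0.1:${serverSocket.getLocalPort}")
      // ... pass serverSocket.getLocalPort to the child process, then accept() ...
    } finally {
      serverSocket.close()
    }
  }
}
```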
[GitHub] spark issue #20553: [SPARK-23285][K8S] Add a config property for specifying ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20553 Kubernetes integration test status success URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/724/ --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20553: [SPARK-23285][K8S] Add a config property for specifying ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20553 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20553: [SPARK-23285][K8S] Add a config property for specifying ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20553 **[Test build #87234 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87234/testReport)** for PR 20553 at commit [`50ebb50`](https://github.com/apache/spark/commit/50ebb5068a35a9a0f2becd27153bdc7cc7aae251). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20553: [SPARK-23285][K8S] Add a config property for specifying ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20553 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87234/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20490 The test failure doesn't look related to these changes to me. How can I get on the list to ask jenkins to retest a PR? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20553: [SPARK-23285][K8S] Add a config property for specifying ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20553 Kubernetes integration test starting URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/724/ --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20553: [SPARK-23285][K8S] Add a config property for specifying ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20553 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/729/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20553: [SPARK-23285][K8S] Add a config property for specifying ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20553 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org