[GitHub] spark issue #22637: [SPARK-25408] Move to more idiomatic Java 8
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22637 **[Test build #96975 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96975/testReport)** for PR 22637 at commit [`58de6a3`](https://github.com/apache/spark/commit/58de6a30c19cd5fe6672938bf969f48aaabd0b31).
[GitHub] spark issue #22615: [SPARK-25016][BUILD][CORE] Remove support for Hadoop 2.6
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22615 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96961/
[GitHub] spark issue #22615: [SPARK-25016][BUILD][CORE] Remove support for Hadoop 2.6
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22615 Merged build finished. Test PASSed.
[GitHub] spark issue #22615: [SPARK-25016][BUILD][CORE] Remove support for Hadoop 2.6
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22615 **[Test build #96961 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96961/testReport)** for PR 22615 at commit [`1f16631`](https://github.com/apache/spark/commit/1f16631e6718a3becf1fbcc68163ac14e696e9cb).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #22635: [SPARK-25591][PySpark][SQL] Avoid overwriting des...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22635#discussion_r222905059 --- Diff: python/pyspark/accumulators.py --- @@ -109,10 +109,14 @@ def _deserialize_accumulator(aid, zero_value, accum_param): from pyspark.accumulators import _accumulatorRegistry -accum = Accumulator(aid, zero_value, accum_param) -accum._deserialized = True -_accumulatorRegistry[aid] = accum -return accum +# If this certain accumulator was deserialized, don't overwrite it. +if aid in _accumulatorRegistry: --- End diff -- Yeah, but `_deserialize_accumulator` is only called when doing deserialization at executors. The constructor saves accumulators in `_accumulatorRegistry` at the driver.
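To make the guard under discussion concrete outside the PySpark source, here is a small, self-contained Scala rendering of the same idea; the `Accumulator` class and registry below are stand-ins for the Python ones, not Spark APIs.

```scala
import scala.collection.mutable

// Sketch of the guard being proposed for pyspark/accumulators.py: when an accumulator
// id is already registered (it was deserialized earlier on this worker), reuse the
// existing instance instead of constructing a new one and clobbering its state.
object RegistryGuardSketch {
  final class Accumulator(val id: Long, var value: Long) {
    var deserialized: Boolean = false
  }

  private val registry = mutable.Map.empty[Long, Accumulator]

  def deserializeAccumulator(id: Long, zeroValue: Long): Accumulator =
    registry.getOrElseUpdate(id, {
      val acc = new Accumulator(id, zeroValue)
      acc.deserialized = true
      acc
    })

  def main(args: Array[String]): Unit = {
    val first = deserializeAccumulator(7L, 0L)
    first.value = 42L
    val second = deserializeAccumulator(7L, 0L) // same id: existing instance is reused
    assert(second eq first)
    assert(second.value == 42L)                 // accumulated state survives re-deserialization
    println("registry guard sketch ok")
  }
}
```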
[GitHub] spark issue #22637: [SPARK-25408] Move to more idiomatic Java 8
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22637 Merged build finished. Test FAILed.
[GitHub] spark issue #22637: [SPARK-25408] Move to more idiomatic Java 8
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22637 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96974/
[GitHub] spark issue #22637: [SPARK-25408] Move to more idiomatic Java 8
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22637 **[Test build #96974 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96974/testReport)** for PR 22637 at commit [`004ee44`](https://github.com/apache/spark/commit/004ee44bf3d10a96477a4a7640d087f0a9d77954).
* This patch **fails Java style tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `public abstract class RowBasedKeyValueBatch extends MemoryConsumer implements Closeable`
[GitHub] spark issue #22339: [SPARK-17159][STREAM] Significant speed up for running s...
Github user ScrapCodes commented on the issue: https://github.com/apache/spark/pull/22339 Thank you @srowen and @steveloughran.
[GitHub] spark issue #20761: [SPARK-20327][CORE][YARN] Add CLI support for YARN custo...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20761 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96973/
[GitHub] spark issue #20761: [SPARK-20327][CORE][YARN] Add CLI support for YARN custo...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20761 Merged build finished. Test PASSed.
[GitHub] spark issue #20761: [SPARK-20327][CORE][YARN] Add CLI support for YARN custo...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20761 **[Test build #96973 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96973/testReport)** for PR 20761 at commit [`b0e68d2`](https://github.com/apache/spark/commit/b0e68d221fe84af54944415df35513c77e73dcc1).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #22637: [SPARK-25408] Move to more idiomatic Java 8
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22637 **[Test build #96974 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96974/testReport)** for PR 22637 at commit [`004ee44`](https://github.com/apache/spark/commit/004ee44bf3d10a96477a4a7640d087f0a9d77954).
[GitHub] spark issue #21816: [SPARK-24794][CORE] Driver launched through rest should ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21816 **[Test build #4357 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4357/testReport)** for PR 21816 at commit [`5006de8`](https://github.com/apache/spark/commit/5006de814be8fff3d33b95188ec10b332a584ab0).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #22635: [SPARK-25591][PySpark][SQL] Avoid overwriting des...
Github user AbdealiJK commented on a diff in the pull request: https://github.com/apache/spark/pull/22635#discussion_r222899988 --- Diff: python/pyspark/accumulators.py --- @@ -109,10 +109,14 @@ def _deserialize_accumulator(aid, zero_value, accum_param): from pyspark.accumulators import _accumulatorRegistry -accum = Accumulator(aid, zero_value, accum_param) -accum._deserialized = True -_accumulatorRegistry[aid] = accum -return accum +# If this certain accumulator was deserialized, don't overwrite it. +if aid in _accumulatorRegistry: --- End diff -- That doesn't seem right, because the constructor for `Accumulator` has:
```
...
self._deserialized = False
_accumulatorRegistry[aid] = self
```
[GitHub] spark issue #22637: [SPARK-25408] Move to more idiomatic Java 8
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22637 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96972/
[GitHub] spark issue #22637: [SPARK-25408] Move to more idiomatic Java 8
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22637 Merged build finished. Test FAILed.
[GitHub] spark issue #22637: [SPARK-25408] Move to more idiomatic Java 8
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22637 **[Test build #96972 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96972/testReport)** for PR 22637 at commit [`14d9d1f`](https://github.com/apache/spark/commit/14d9d1ff3d6d48a2ab89c0ed611b5896caa02e11).
* This patch **fails Java style tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `public abstract class RowBasedKeyValueBatch extends MemoryConsumer implements Closeable`
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user szyszy commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r222897804 --- Diff: resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala --- @@ -134,6 +166,42 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter size should be (0) } + test("custom resource type requested from yarn") { +assume(ResourceRequestHelper.isYarnResourceTypesAvailable()) +TestYarnResourceRequestHelper.initializeResourceTypes(List("gpu")) + +// request a single container and receive it +val handler = createAllocatorWithAdditionalConfigs(1, Map( + YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "gpu" -> "2G", + YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "memory" -> "1G" +)) +handler.updateResourceRequests() +handler.getNumExecutorsRunning should be (0) +handler.getPendingAllocate.size should be (1) + +val resource = Resource.newInstance(3072, 6) +val resourceTypes = Map("gpu" -> "2G") +ResourceRequestHelper.setResourceRequests(resourceTypes, resource) + +val container = createContainerWithResource("host1", resource) +handler.handleAllocatedContainers(Array(container)) + +// verify custom resource type is part of rmClient.ask set +val askField = rmClient.getClass.getDeclaredField("ask") --- End diff -- Will look into this tomorrow morning --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
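For context on the line being discussed, the test reads the allocator's private `ask` collection via reflection. A minimal, self-contained sketch of that pattern is below; `Allocation` and its `ask` field are hypothetical stand-ins for the real `rmClient` and its private "ask" set.

```scala
import java.util.{HashSet => JHashSet}

// Sketch of the reflection pattern used in the test: look up a private backing field
// by name, make it accessible, and assert on its contents.
object ReflectionSketch {
  final case class Allocation(ask: java.util.Set[String])

  def main(args: Array[String]): Unit = {
    val asks = new JHashSet[String]()
    asks.add("gpu=2G")
    val allocation = Allocation(asks)

    val field = allocation.getClass.getDeclaredField("ask") // same call shape as in the test
    field.setAccessible(true)                               // the backing field is private
    val observed = field.get(allocation).asInstanceOf[java.util.Set[String]]

    assert(observed.contains("gpu=2G"))                     // the expected request is present
    println(s"ask set observed via reflection: $observed")
  }
}
```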
[GitHub] spark issue #20761: [SPARK-20327][CORE][YARN] Add CLI support for YARN custo...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20761 **[Test build #96973 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96973/testReport)** for PR 20761 at commit [`b0e68d2`](https://github.com/apache/spark/commit/b0e68d221fe84af54944415df35513c77e73dcc1).
[GitHub] spark issue #22637: [SPARK-25408] Move to more idiomatic Java 8
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22637 **[Test build #96972 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96972/testReport)** for PR 22637 at commit [`14d9d1f`](https://github.com/apache/spark/commit/14d9d1ff3d6d48a2ab89c0ed611b5896caa02e11).
[GitHub] spark issue #22637: [SPARK-25408] Move to more idiomatic Java 8
Github user Fokko commented on the issue: https://github.com/apache/spark/pull/22637 Valid points. Personally I'm a fan of explicit `final` rather than implicit, but that's a matter of taste :-)
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r222894891 --- Diff: python/pyspark/sql/functions.py --- @@ -2664,6 +2664,28 @@ def sequence(start, stop, step=None): _to_java_column(start), _to_java_column(stop), _to_java_column(step))) +@ignore_unicode_prefix +@since(3.0) +def from_csv(col, schema, options={}): +""" +Parses a column containing a CSV string into a :class:`StructType` --- End diff -- `StructType` -> `Row`?
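Assuming the function lands with the SQL signature shown in the diff's docstring and in the expression's `ExpressionDescription`, a minimal end-to-end check in Scala would look roughly like this (the master and app name are placeholders):

```scala
import org.apache.spark.sql.SparkSession

// Sketch of exercising the SQL form of from_csv once this PR is merged; the literal
// and schema string mirror the example in the ExpressionDescription.
object FromCsvExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("from_csv-sketch")
      .getOrCreate()

    // Parses the CSV string into a struct column; prints {"a":1,"b":0.8}.
    spark.sql("SELECT from_csv('1, 0.8', 'a INT, b DOUBLE') AS parsed")
      .show(truncate = false)

    spark.stop()
  }
}
```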
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user szyszy commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r222895698 --- Diff: resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala --- @@ -199,6 +200,92 @@ class ClientSuite extends SparkFunSuite with Matchers { appContext.getMaxAppAttempts should be (42) } + test("Resource type args propagate, resource type not defined") { +assume(ResourceRequestHelper.isYarnResourceTypesAvailable()) +val sparkConf = new SparkConf() + .set(YARN_AM_RESOURCE_TYPES_PREFIX + "some_resource_with_units_1", "121m") +val args = new ClientArguments(Array()) + +val appContext = Records.newRecord(classOf[ApplicationSubmissionContext]) +val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse]) +val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext]) + +val client = new Client(args, sparkConf) + +try { + client.createApplicationSubmissionContext( +new YarnClientApplication(getNewApplicationResponse, appContext), +containerLaunchContext) +} catch { + case NonFatal(e) => +val expectedExceptionClass = "org.apache.hadoop.yarn.exceptions.ResourceNotFoundException" +if (e.getClass.getName != expectedExceptionClass) { + fail(s"Exception caught: $e is not an instance of $expectedExceptionClass!") +} +} + } + + test("Resource type args propagate (client mode)") { +assume(ResourceRequestHelper.isYarnResourceTypesAvailable()) +TestYarnResourceRequestHelper.initializeResourceTypes(List("gpu", "fpga")) + +val sparkConf = new SparkConf() + .set(YARN_AM_RESOURCE_TYPES_PREFIX + "some_resource_with_units_1", "121m") + .set(YARN_DRIVER_RESOURCE_TYPES_PREFIX + "some_resource_with_units_1", "122m") + .set(YARN_AM_RESOURCE_TYPES_PREFIX + "fpga", "222m") + .set(YARN_DRIVER_RESOURCE_TYPES_PREFIX + "fpga", "223m") + .set(YARN_AM_RESOURCE_TYPES_PREFIX + "memory", "1G") + .set(YARN_DRIVER_RESOURCE_TYPES_PREFIX + "memory", "2G") +val args = new ClientArguments(Array()) + +val appContext = Records.newRecord(classOf[ApplicationSubmissionContext]) +val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse]) +val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext]) + +val client = new Client(args, sparkConf) +client.createApplicationSubmissionContext( + new YarnClientApplication(getNewApplicationResponse, appContext), + containerLaunchContext) + +appContext.getAMContainerSpec should be (containerLaunchContext) +appContext.getApplicationType should be ("SPARK") + TestYarnResourceRequestHelper.getResourceTypeValue(appContext.getResource, + "some_resource_with_units_1") should be (121) +TestYarnResourceRequestHelper +.getResourceTypeValue(appContext.getResource, "fpga") should be (222) + } + + test("configuration and resource type args propagate (cluster mode)") { --- End diff -- Indeed, extracted a common method, passing the expected values as a sequence of tuples. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
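The refactoring described here (one shared helper, expected values passed as a sequence of tuples) can be sketched independently of the YARN types; the `Map` below stands in for the `Resource` object the real test inspects, and all names are hypothetical.

```scala
// Self-contained sketch of the "expected values as a sequence of tuples" pattern.
object TupleAssertionSketch {
  def verifyResourceTypes(actual: Map[String, Long], expected: Seq[(String, Long)]): Unit = {
    expected.foreach { case (resourceType, expectedValue) =>
      val found = actual.getOrElse(resourceType,
        sys.error(s"missing resource type: $resourceType"))
      assert(found == expectedValue, s"$resourceType: expected $expectedValue, got $found")
    }
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for the values read back from the submission context's Resource.
    val allocated = Map("some_resource_with_units_1" -> 121L, "fpga" -> 222L)
    verifyResourceTypes(allocated,
      Seq("some_resource_with_units_1" -> 121L, "fpga" -> 222L))
    println("all resource types match")
  }
}
```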
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r222895375 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.csv._ +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * Converts a CSV input string to a [[StructType]] with the specified schema. + */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with the given `csvStr` and `schema`.", + examples = """ +Examples: + > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE'); + {"a":1, "b":0.8} + > SELECT _FUNC_('26/08/2015', 'time Timestamp', map('timestampFormat', 'dd/MM/')) + {"time":2015-08-26 00:00:00.0} + """, + since = "3.0.0") +// scalastyle:on line.size.limit +case class CsvToStructs( +schema: StructType, +options: Map[String, String], +child: Expression, +timeZoneId: Option[String] = None) + extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes { + + override def nullable: Boolean = true + + // The CSV input data might be missing certain fields. We force the nullability + // of the user-provided schema to avoid data corruptions. + val nullableSchema = schema.asNullable + + // Used in `FunctionRegistry` + def this(child: Expression, schema: Expression, options: Map[String, String]) = +this( + schema = ExprUtils.evalSchemaExpr(schema), + options = options, + child = child, + timeZoneId = None) + + def this(child: Expression, schema: Expression) = this(child, schema, Map.empty[String, String]) + + def this(child: Expression, schema: Expression, options: Expression) = +this( + schema = ExprUtils.evalSchemaExpr(schema), + options = ExprUtils.convertToMapData(options), + child = child, + timeZoneId = None) + + // This converts parsed rows to the desired output by the given schema. + @transient + lazy val converter = (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null --- End diff -- the input string is expected to be a single CSV record right? Shall we fail if it's not? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
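A sketch of the stricter behavior being suggested, meant to slot in where `converter` is defined in the class above; the `require` failure is only a placeholder for whatever error handling the PR finally adopts.

```scala
import org.apache.spark.sql.catalyst.InternalRow

// Return the single parsed row, and fail loudly if the parser produced more than one
// record for a single input string (instead of silently dropping the extras).
lazy val strictConverter: Iterator[InternalRow] => InternalRow = { rows =>
  if (!rows.hasNext) {
    null
  } else {
    val first = rows.next()
    require(!rows.hasNext, "from_csv expects a single CSV record per input string")
    first
  }
}
```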
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r222895573 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.csv._ +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * Converts a CSV input string to a [[StructType]] with the specified schema. + */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with the given `csvStr` and `schema`.", + examples = """ +Examples: + > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE'); + {"a":1, "b":0.8} + > SELECT _FUNC_('26/08/2015', 'time Timestamp', map('timestampFormat', 'dd/MM/')) + {"time":2015-08-26 00:00:00.0} + """, + since = "3.0.0") +// scalastyle:on line.size.limit +case class CsvToStructs( +schema: StructType, +options: Map[String, String], +child: Expression, +timeZoneId: Option[String] = None) + extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes { + + override def nullable: Boolean = true --- End diff -- This expression only returns null if input is null. Shall we define the nullable as `child.nullable`? And this expression should extend `NullIntolerant` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
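Concretely, the suggestion amounts to something like the following fragment of the class quoted above (only the declaration and `nullable` are shown; the rest of the expression is unchanged, and this is the reviewer's proposal rather than the merged code).

```scala
// CsvToStructs yields null only when its input is null, so nullability can track the
// child, and marking the expression NullIntolerant lets null handling be pushed down.
case class CsvToStructs(
    schema: StructType,
    options: Map[String, String],
    child: Expression,
    timeZoneId: Option[String] = None)
  extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback
  with ExpectsInputTypes with NullIntolerant {

  override def nullable: Boolean = child.nullable
  // ... remaining members as in the diff above ...
}
```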
[GitHub] spark issue #22637: [SPARK-25408] Move to more idiomatic Java 8
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22637 Merged build finished. Test FAILed.
[GitHub] spark issue #22637: [SPARK-25408] Move to more idiomatic Java 8
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22637 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96970/
[GitHub] spark issue #22637: [SPARK-25408] Move to more idiomatic Java 8
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22637 **[Test build #96970 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96970/testReport)** for PR 22637 at commit [`0155b7a`](https://github.com/apache/spark/commit/0155b7ad4905d75970e600338ed677b23650a901).
* This patch **fails Java style tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `public abstract class RowBasedKeyValueBatch extends MemoryConsumer implements Closeable`
[GitHub] spark issue #22637: [SPARK-25408] Move to more idiomatic Java 8
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22637 Merged build finished. Test FAILed.
[GitHub] spark issue #22635: [SPARK-25591][PySpark][SQL] Avoid overwriting deserializ...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22635 Thanks for cc'ing me. Will take a look this week.
[GitHub] spark issue #22637: [SPARK-25408] Move to more idiomatic Java 8
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22637 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96969/
[GitHub] spark issue #22637: [SPARK-25408] Move to more idiomatic Java 8
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22637 **[Test build #96969 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96969/testReport)** for PR 22637 at commit [`5d3d0c7`](https://github.com/apache/spark/commit/5d3d0c7b59c46e8c84dbed340a5a75fcdf01c6d3).
* This patch **fails Java style tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user szyszy commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r222894767 --- Diff: resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala --- @@ -199,6 +200,92 @@ class ClientSuite extends SparkFunSuite with Matchers { appContext.getMaxAppAttempts should be (42) } + test("Resource type args propagate, resource type not defined") { +assume(ResourceRequestHelper.isYarnResourceTypesAvailable()) +val sparkConf = new SparkConf() + .set(YARN_AM_RESOURCE_TYPES_PREFIX + "some_resource_with_units_1", "121m") +val args = new ClientArguments(Array()) + +val appContext = Records.newRecord(classOf[ApplicationSubmissionContext]) +val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse]) +val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext]) + +val client = new Client(args, sparkConf) + +try { + client.createApplicationSubmissionContext( +new YarnClientApplication(getNewApplicationResponse, appContext), +containerLaunchContext) +} catch { + case NonFatal(e) => +val expectedExceptionClass = "org.apache.hadoop.yarn.exceptions.ResourceNotFoundException" +if (e.getClass.getName != expectedExceptionClass) { + fail(s"Exception caught: $e is not an instance of $expectedExceptionClass!") +} +} + } + + test("Resource type args propagate (client mode)") { +assume(ResourceRequestHelper.isYarnResourceTypesAvailable()) +TestYarnResourceRequestHelper.initializeResourceTypes(List("gpu", "fpga")) + +val sparkConf = new SparkConf() + .set(YARN_AM_RESOURCE_TYPES_PREFIX + "some_resource_with_units_1", "121m") + .set(YARN_DRIVER_RESOURCE_TYPES_PREFIX + "some_resource_with_units_1", "122m") + .set(YARN_AM_RESOURCE_TYPES_PREFIX + "fpga", "222m") + .set(YARN_DRIVER_RESOURCE_TYPES_PREFIX + "fpga", "223m") + .set(YARN_AM_RESOURCE_TYPES_PREFIX + "memory", "1G") --- End diff -- exactly, see my comment for the other testcase. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStreamWrit...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22633 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3699/
[GitHub] spark issue #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStreamWrit...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22633 Merged build finished. Test PASSed.
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user szyszy commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r222894728 --- Diff: resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala --- @@ -199,6 +200,92 @@ class ClientSuite extends SparkFunSuite with Matchers { appContext.getMaxAppAttempts should be (42) } + test("Resource type args propagate, resource type not defined") { +assume(ResourceRequestHelper.isYarnResourceTypesAvailable()) +val sparkConf = new SparkConf() + .set(YARN_AM_RESOURCE_TYPES_PREFIX + "some_resource_with_units_1", "121m") +val args = new ClientArguments(Array()) + +val appContext = Records.newRecord(classOf[ApplicationSubmissionContext]) +val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse]) +val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext]) + +val client = new Client(args, sparkConf) + +try { + client.createApplicationSubmissionContext( +new YarnClientApplication(getNewApplicationResponse, appContext), +containerLaunchContext) +} catch { + case NonFatal(e) => +val expectedExceptionClass = "org.apache.hadoop.yarn.exceptions.ResourceNotFoundException" +if (e.getClass.getName != expectedExceptionClass) { + fail(s"Exception caught: $e is not an instance of $expectedExceptionClass!") +} +} + } + + test("Resource type args propagate (client mode)") { +assume(ResourceRequestHelper.isYarnResourceTypesAvailable()) +TestYarnResourceRequestHelper.initializeResourceTypes(List("gpu", "fpga")) + +val sparkConf = new SparkConf() + .set(YARN_AM_RESOURCE_TYPES_PREFIX + "some_resource_with_units_1", "121m") --- End diff -- Actually, these two tests were indeed failing. Forgot to run them lately and jenkins does not run them because the Hadoop version is not > 3.1 This also pointed to another issue: in `ResourceRequestHelper.setResourceRequests`, the `setResourceInformationMethod` was invoked with value `"memory` which is not accepted by the YARN API so I had to translate it to `"memory-mb"` in order to work correctly. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
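The translation described in the comment above can be isolated into a tiny helper. The object and method names below are hypothetical; only the "memory" to "memory-mb" mapping is taken from the comment, since YARN's `ResourceInformation` API expects the canonical unit name rather than plain "memory".

```scala
// Sketch of translating Spark-facing resource names to YARN's canonical names
// before calling setResourceInformation; custom types pass through unchanged.
object ResourceNameSketch {
  private val translations = Map("memory" -> "memory-mb")

  def toYarnResourceName(name: String): String = translations.getOrElse(name, name)

  def main(args: Array[String]): Unit = {
    assert(toYarnResourceName("memory") == "memory-mb") // translated before hitting the YARN API
    assert(toYarnResourceName("gpu") == "gpu")          // custom resource types are untouched
    println("resource name translation sketch ok")
  }
}
```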
[GitHub] spark issue #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStreamWrit...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22633 Looks fine to me.
[GitHub] spark issue #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStreamWrit...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22633 Looks like `lint-java` doesn't catch any style issues in my PR.
[GitHub] spark issue #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStreamWrit...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22633 **[Test build #96971 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96971/testReport)** for PR 22633 at commit [`999b6be`](https://github.com/apache/spark/commit/999b6beee920a8f755028a90e96955527283c205).
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r222894056 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates")) -# Using Foreach -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter` -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs), -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points. +# Using Foreach and ForeachBatch +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing +logic on the output of a streaming query. They have slightly different use cases - while `foreach` +allows custom write logic on every row, `foreachBatch` allows arbitrary operations +and custom logic on the output of each micro-batch. Let's understand their usages in more detail. + +## ForeachBatch +`foreachBatch(...)` allows you to specify a function that is executed on +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch. + + + + +{% highlight scala %} +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + // Transform and write batchDF +}.start() +{% endhighlight %} + + + + +{% highlight java %} +streamingDatasetOfString.writeStream.foreachBatch( + new VoidFunction2, long> { +void call(Dataset dataset, long batchId) { + // Transform and write batchDF +} + } +).start(); +{% endhighlight %} + + + + +{% highlight python %} +def foreachBatchFunction(df, epochId): --- End diff -- `foreachBatchFunction` -> `foreach_batch_function` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r222894110 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates")) -# Using Foreach -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter` -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs), -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points. +# Using Foreach and ForeachBatch +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing +logic on the output of a streaming query. They have slightly different use cases - while `foreach` +allows custom write logic on every row, `foreachBatch` allows arbitrary operations +and custom logic on the output of each micro-batch. Let's understand their usages in more detail. + +## ForeachBatch +`foreachBatch(...)` allows you to specify a function that is executed on +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch. + + + + +{% highlight scala %} +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + // Transform and write batchDF +}.start() +{% endhighlight %} + + + + +{% highlight java %} +streamingDatasetOfString.writeStream.foreachBatch( + new VoidFunction2, long> { +void call(Dataset dataset, long batchId) { + // Transform and write batchDF +} + } +).start(); +{% endhighlight %} + + + + +{% highlight python %} +def foreachBatchFunction(df, epochId): --- End diff -- `epochId` -> `epoch_id` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r222893790 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates")) -# Using Foreach -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter` -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs), -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points. +# Using Foreach and ForeachBatch +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing +logic on the output of a streaming query. They have slightly different use cases - while `foreach` +allows custom write logic on every row, `foreachBatch` allows arbitrary operations +and custom logic on the output of each micro-batch. Let's understand their usages in more detail. + +## ForeachBatch +`foreachBatch(...)` allows you to specify a function that is executed on +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch. + + + + +{% highlight scala %} +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + // Transform and write batchDF +}.start() +{% endhighlight %} + + + + +{% highlight java %} +streamingDatasetOfString.writeStream.foreachBatch( + new VoidFunction2, long> { +void call(Dataset dataset, long batchId) { + // Transform and write batchDF +} + } +).start(); +{% endhighlight %} + + + + +{% highlight python %} +def foreachBatchFunction(df, epochId): + # Transform and write batchDF + pass + +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start() +{% endhighlight %} + + + +R is not yet supported. + + + +With foreachBatch, you can do the following. + +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, + but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch + data writers on the output of each micro-batch. +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, + then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can + cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations, + you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline. + +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + batchDF.cache() + batchDF.write.format(...).save(...) // location 1 + batchDF.write.format(...).save(...) // location 2 + batchDF.uncache() +} + +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported + in streaming DataFrames because Spark does not support generating incremental plans in those cases. + Using foreachBatch() you can apply some of these operations on each micro-batch output. 
However, you will have to reason about the end-to-end semantics of doing that operation yourself. + +**Note:** +- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the + batchId provided to the function as way to deduplicate the output and get an exactly-once guarantee. +- `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the + micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead. + + +## Foreach +If `foreachBatch` is not an option (for example, corresponding batch data writer does not exist, or +continuous processing mode), then you can express you custom writer logic using `foreach`. +Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`. +Since Spark 2.4, `foreach` is available in Scala, Java and Python. + + + + +In Scala, you have to extend the class `ForeachWriter` ([do
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r222894028 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates")) -# Using Foreach -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter` -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs), -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points. +# Using Foreach and ForeachBatch +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing +logic on the output of a streaming query. They have slightly different use cases - while `foreach` +allows custom write logic on every row, `foreachBatch` allows arbitrary operations +and custom logic on the output of each micro-batch. Let's understand their usages in more detail. + +## ForeachBatch +`foreachBatch(...)` allows you to specify a function that is executed on +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch. + + + + +{% highlight scala %} +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + // Transform and write batchDF +}.start() +{% endhighlight %} + + + + +{% highlight java %} +streamingDatasetOfString.writeStream.foreachBatch( + new VoidFunction2, long> { +void call(Dataset dataset, long batchId) { + // Transform and write batchDF +} + } +).start(); +{% endhighlight %} + + + + +{% highlight python %} +def foreachBatchFunction(df, epochId): + # Transform and write batchDF --- End diff -- 4 space indentation --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r222893841 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates")) -# Using Foreach -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter` -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs), -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points. +# Using Foreach and ForeachBatch +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing +logic on the output of a streaming query. They have slightly different use cases - while `foreach` +allows custom write logic on every row, `foreachBatch` allows arbitrary operations +and custom logic on the output of each micro-batch. Let's understand their usages in more detail. + +## ForeachBatch +`foreachBatch(...)` allows you to specify a function that is executed on +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch. + + + + +{% highlight scala %} +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + // Transform and write batchDF +}.start() +{% endhighlight %} + + + + +{% highlight java %} +streamingDatasetOfString.writeStream.foreachBatch( + new VoidFunction2, long> { +void call(Dataset dataset, long batchId) { + // Transform and write batchDF +} + } +).start(); +{% endhighlight %} + + + + +{% highlight python %} +def foreachBatchFunction(df, epochId): + # Transform and write batchDF + pass + +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start() +{% endhighlight %} + + + +R is not yet supported. + + + +With foreachBatch, you can do the following. + +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, + but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch + data writers on the output of each micro-batch. +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, + then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can + cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations, + you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline. + +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + batchDF.cache() + batchDF.write.format(...).save(...) // location 1 + batchDF.write.format(...).save(...) // location 2 + batchDF.uncache() +} + +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported + in streaming DataFrames because Spark does not support generating incremental plans in those cases. + Using foreachBatch() you can apply some of these operations on each micro-batch output. 
However, you will have to reason about the end-to-end semantics of doing that operation yourself. + +**Note:** +- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the + batchId provided to the function as way to deduplicate the output and get an exactly-once guarantee. +- `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the + micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead. + + +## Foreach +If `foreachBatch` is not an option (for example, corresponding batch data writer does not exist, or +continuous processing mode), then you can express you custom writer logic using `foreach`. +Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`. +Since Spark 2.4, `foreach` is available in Scala, Java and Python. + + + + +In Scala, you have to extend the class `ForeachWriter` ([do
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r222893720 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates")) -# Using Foreach -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter` -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs), -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points. +# Using Foreach and ForeachBatch +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing +logic on the output of a streaming query. They have slightly different use cases - while `foreach` +allows custom write logic on every row, `foreachBatch` allows arbitrary operations +and custom logic on the output of each micro-batch. Let's understand their usages in more detail. + +## ForeachBatch +`foreachBatch(...)` allows you to specify a function that is executed on +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch. + + + + +{% highlight scala %} +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + // Transform and write batchDF +}.start() +{% endhighlight %} + + + + +{% highlight java %} +streamingDatasetOfString.writeStream.foreachBatch( + new VoidFunction2, long> { +void call(Dataset dataset, long batchId) { + // Transform and write batchDF +} + } +).start(); +{% endhighlight %} + + + + +{% highlight python %} +def foreachBatchFunction(df, epochId): + # Transform and write batchDF + pass + +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start() +{% endhighlight %} + + + +R is not yet supported. + + + +With foreachBatch, you can do the following. + +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, + but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch + data writers on the output of each micro-batch. +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, + then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can + cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations, + you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline. + +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + batchDF.cache() + batchDF.write.format(...).save(...) // location 1 + batchDF.write.format(...).save(...) // location 2 + batchDF.uncache() +} + +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported + in streaming DataFrames because Spark does not support generating incremental plans in those cases. + Using foreachBatch() you can apply some of these operations on each micro-batch output. 
However, you will have to reason about the end-to-end semantics of doing that operation yourself. + +**Note:** +- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the + batchId provided to the function as way to deduplicate the output and get an exactly-once guarantee. +- `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the + micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead. + + +## Foreach +If `foreachBatch` is not an option (for example, corresponding batch data writer does not exist, or +continuous processing mode), then you can express you custom writer logic using `foreach`. +Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`. +Since Spark 2.4, `foreach` is available in Scala, Java and Python. + + + + +In Scala, you have to extend the class `ForeachWriter` ([do
[GitHub] spark issue #22637: [SPARK-25408] Move to more idiomatic Java 8
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22637 **[Test build #96970 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96970/testReport)** for PR 22637 at commit [`0155b7a`](https://github.com/apache/spark/commit/0155b7ad4905d75970e600338ed677b23650a901).
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r222893572 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates")) -# Using Foreach -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter` -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs), -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points. +# Using Foreach and ForeachBatch +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing +logic on the output of a streaming query. They have slightly different use cases - while `foreach` +allows custom write logic on every row, `foreachBatch` allows arbitrary operations +and custom logic on the output of each micro-batch. Let's understand their usages in more detail. + +## ForeachBatch +`foreachBatch(...)` allows you to specify a function that is executed on +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch. + + + + +{% highlight scala %} +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + // Transform and write batchDF +}.start() +{% endhighlight %} + + + + +{% highlight java %} +streamingDatasetOfString.writeStream.foreachBatch( + new VoidFunction2, long> { +void call(Dataset dataset, long batchId) { + // Transform and write batchDF +} + } +).start(); +{% endhighlight %} + + + + +{% highlight python %} +def foreachBatchFunction(df, epochId): + # Transform and write batchDF + pass + +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start() +{% endhighlight %} + + + +R is not yet supported. + + + +With foreachBatch, you can do the following. + +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, + but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch + data writers on the output of each micro-batch. +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, + then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can + cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations, + you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline. + +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + batchDF.cache() + batchDF.write.format(...).save(...) // location 1 + batchDF.write.format(...).save(...) // location 2 + batchDF.uncache() +} + +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported + in streaming DataFrames because Spark does not support generating incremental plans in those cases. + Using foreachBatch() you can apply some of these operations on each micro-batch output. 
However, you will have to reason about the end-to-end semantics of doing that operation yourself. + +**Note:** +- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the + batchId provided to the function as a way to deduplicate the output and get an exactly-once guarantee. +- `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the + micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead. + + +## Foreach +If `foreachBatch` is not an option (for example, a corresponding batch data writer does not exist, or +continuous processing mode), then you can express your custom writer logic using `foreach`. +Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`. +Since Spark 2.4, `foreach` is available in Scala, Java and Python. + + + + +In Scala, you have to extend the class `ForeachWriter` ([do
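For reference, a minimal Scala sketch of the three-method `ForeachWriter` contract described above (the method signatures follow the Spark 2.4 Scala API; `streamingDatasetOfString` is assumed to be a streaming `Dataset[String]` as in the Java example earlier, and the connection handling is purely illustrative):

```scala
import org.apache.spark.sql.ForeachWriter

// Sketch only: open() decides whether this partition/epoch should be written,
// process() handles one row, and close() releases resources (also called on error).
val writer = new ForeachWriter[String] {
  def open(partitionId: Long, epochId: Long): Boolean = {
    // e.g. open a connection; return false to skip this partition for this epoch
    true
  }

  def process(value: String): Unit = {
    // write the row using the opened connection
  }

  def close(errorOrNull: Throwable): Unit = {
    // clean up the connection
  }
}

streamingDatasetOfString.writeStream.foreach(writer).start()
```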
[GitHub] spark pull request #22637: [SPARK-25408] Move to mode ideomatic Java8
Github user Fokko commented on a diff in the pull request: https://github.com/apache/spark/pull/22637#discussion_r222893178 --- Diff: common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java --- @@ -133,37 +133,38 @@ private FetchResult fetchBlocks( final Semaphore requestsRemaining = new Semaphore(0); -ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false, 5000); -client.init(APP_ID); -client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds, - new BlockFetchingListener() { -@Override -public void onBlockFetchSuccess(String blockId, ManagedBuffer data) { - synchronized (this) { -if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) { - data.retain(); - res.successBlocks.add(blockId); - res.buffers.add(data); - requestsRemaining.release(); +try (ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false, 5000)) { + client.init(APP_ID); + client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds, +new BlockFetchingListener() { + @Override + public void onBlockFetchSuccess(String blockId, ManagedBuffer data) { +synchronized (this) { + if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) { +data.retain(); +res.successBlocks.add(blockId); +res.buffers.add(data); +requestsRemaining.release(); + } } } -} - -@Override -public void onBlockFetchFailure(String blockId, Throwable exception) { - synchronized (this) { -if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) { - res.failedBlocks.add(blockId); - requestsRemaining.release(); + + @Override + public void onBlockFetchFailure(String blockId, Throwable exception) { +synchronized (this) { + if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) { +res.failedBlocks.add(blockId); +requestsRemaining.release(); + } } } -} - }, null); +}, null + ); --- End diff -- Done --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStr...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22633#discussion_r222893318 --- Diff: sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java --- @@ -0,0 +1,89 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +*http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package test.org.apache.spark.sql.streaming; + +import java.io.File; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.spark.api.java.function.VoidFunction2; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.ForeachWriter; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.streaming.StreamingQuery; +import org.apache.spark.sql.test.TestSparkSession; +import org.apache.spark.util.Utils; + +public class JavaDataStreamReaderWriterSuite { + private SparkSession spark; + private String input; + + @Before + public void setUp() { +spark = new TestSparkSession(); +input = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "input").toString(); + } + + @After + public void tearDown() { +Utils.deleteRecursively(new File(input)); +spark.stop(); +spark = null; + } + + @Test + public void testForeachBatchAPI() { +StreamingQuery query = spark +.readStream() +.textFile(input) +.writeStream() +.foreachBatch(new VoidFunction2, Long>() { + @Override + public void call(Dataset v1, Long v2) throws Exception { + } +}) +.start(); +query.stop(); + } + + @Test + public void testForeachAPI() { +StreamingQuery query = spark +.readStream() +.textFile(input) +.writeStream() +.foreach(new ForeachWriter() { + @Override + public boolean open(long partitionId, long epochId) { +return true; + } + + @Override + public void process(String value) { + } --- End diff -- tiny nit: I would just `public void process(String value) { }` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22637: [SPARK-25408] Move to mode ideomatic Java8
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22637 **[Test build #96969 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96969/testReport)** for PR 22637 at commit [`5d3d0c7`](https://github.com/apache/spark/commit/5d3d0c7b59c46e8c84dbed340a5a75fcdf01c6d3). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22619: [SPARK-25600][SQL][MINOR] Make use of TypeCoercion.findT...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/22619 @HyukjinKwon Sure :-) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStr...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22633#discussion_r222893114 --- Diff: sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java --- @@ -0,0 +1,89 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +*http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package test.org.apache.spark.sql.streaming; + +import java.io.File; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.spark.api.java.function.VoidFunction2; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.ForeachWriter; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.streaming.StreamingQuery; +import org.apache.spark.sql.test.TestSparkSession; +import org.apache.spark.util.Utils; + +public class JavaDataStreamReaderWriterSuite { + private SparkSession spark; + private String input; + + @Before + public void setUp() { +spark = new TestSparkSession(); +input = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "input").toString(); + } + + @After + public void tearDown() { +Utils.deleteRecursively(new File(input)); +spark.stop(); +spark = null; + } + + @Test + public void testForeachBatchAPI() { +StreamingQuery query = spark +.readStream() +.textFile(input) +.writeStream() +.foreachBatch(new VoidFunction2, Long>() { + @Override + public void call(Dataset v1, Long v2) throws Exception { + } +}) +.start(); +query.stop(); + } + + @Test + public void testForeachAPI() { +StreamingQuery query = spark +.readStream() --- End diff -- Nit: 2 space indentation --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22619: [SPARK-25600][SQL][MINOR] Make use of TypeCoercion.findT...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22619 Yup. Let me leave this open a few more days, just in case. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22635: [SPARK-25591][PySpark][SQL] Avoid overwriting des...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22635#discussion_r222892890 --- Diff: python/pyspark/accumulators.py --- @@ -109,10 +109,14 @@ def _deserialize_accumulator(aid, zero_value, accum_param): from pyspark.accumulators import _accumulatorRegistry -accum = Accumulator(aid, zero_value, accum_param) -accum._deserialized = True -_accumulatorRegistry[aid] = accum -return accum +# If this certain accumulator was deserialized, don't overwrite it. +if aid in _accumulatorRegistry: --- End diff -- We only save deserialized accumulators (`_deserialized` is `True`) into this dict. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22047: [SPARK-19851] Add support for EVERY and ANY (SOME) aggre...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22047 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22047: [SPARK-19851] Add support for EVERY and ANY (SOME) aggre...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22047 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3698/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22047: [SPARK-19851] Add support for EVERY and ANY (SOME) aggre...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22047 **[Test build #96968 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96968/testReport)** for PR 22047 at commit [`b378fff`](https://github.com/apache/spark/commit/b378fffd160a24d337fb3acbd63ce0d4db784afe). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22637: [SPARK-25408] Move to mode ideomatic Java8
Github user srowen commented on the issue: https://github.com/apache/spark/pull/22637 Jenkins test this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22637: [SPARK-25408] Move to mode ideomatic Java8
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22637 **[Test build #96967 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96967/testReport)** for PR 22637 at commit [`5d3d0c7`](https://github.com/apache/spark/commit/5d3d0c7b59c46e8c84dbed340a5a75fcdf01c6d3). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22637: [SPARK-25408] Move to mode ideomatic Java8
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/22637#discussion_r222892548 --- Diff: common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java --- @@ -133,37 +133,38 @@ private FetchResult fetchBlocks( final Semaphore requestsRemaining = new Semaphore(0); -ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false, 5000); -client.init(APP_ID); -client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds, - new BlockFetchingListener() { -@Override -public void onBlockFetchSuccess(String blockId, ManagedBuffer data) { - synchronized (this) { -if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) { - data.retain(); - res.successBlocks.add(blockId); - res.buffers.add(data); - requestsRemaining.release(); +try (ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false, 5000)) { + client.init(APP_ID); + client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds, +new BlockFetchingListener() { + @Override + public void onBlockFetchSuccess(String blockId, ManagedBuffer data) { +synchronized (this) { + if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) { +data.retain(); +res.successBlocks.add(blockId); +res.buffers.add(data); +requestsRemaining.release(); + } } } -} - -@Override -public void onBlockFetchFailure(String blockId, Throwable exception) { - synchronized (this) { -if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) { - res.failedBlocks.add(blockId); - requestsRemaining.release(); + + @Override + public void onBlockFetchFailure(String blockId, Throwable exception) { +synchronized (this) { + if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) { +res.failedBlocks.add(blockId); +requestsRemaining.release(); + } } } -} - }, null); +}, null + ); --- End diff -- Nit: move this onto the previous line --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22637: [SPARK-25408] Move to mode ideomatic Java8
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/22637#discussion_r222892759 --- Diff: sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIService.java --- @@ -154,6 +154,7 @@ public synchronized void start() { throw new ServiceException("Unable to connect to MetaStore!", e); } finally { + // IMetaStoreClient is not AutoCloseable, closing it manually --- End diff -- I wouldn't bother with this. This is a copy of some Hive code anyway --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22047: [SPARK-19851] Add support for EVERY and ANY (SOME) aggre...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/22047 @gatorsmile First of all, thank you very much. Actually, the added aggregates weren't null-filtering. I have fixed the issue and added additional test cases. Thank you. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
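As a rough illustration of what null filtering means for these aggregates (a hypothetical check, assuming the standard SQL convention that aggregates skip NULL inputs; `spark` is a `SparkSession`, and this is not one of the actual test cases added in the PR):

```scala
// NULL rows should be skipped by the aggregate rather than forcing the result to NULL.
spark.sql(
  "SELECT every(b) AS every_b, any(b) AS any_b FROM VALUES (true), (null), (true) AS t(b)"
).show()
// Expected with null-filtering aggregates: every_b = true, any_b = true
```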
[GitHub] spark issue #22637: [SPARK-25408] Move to mode ideomatic Java8
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22637 ok to test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22619: [SPARK-25600][SQL][MINOR] Make use of TypeCoercion.findT...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/22619 @HyukjinKwon Does this look okay now? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22594: [MINOR][SQL] When batch reading, the number of bytes can...
Github user 10110346 commented on the issue: https://github.com/apache/spark/pull/22594 @srowen Yes, I will update, thanks --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22637: [SPARK-25408] Move to mode ideomatic Java8
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22637 Mind filling in the PR description, please? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22637: [SPARK-25408] Move to mode ideomatic Java8
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22637 ok to test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22637: [SPARK-25408] Move to mode ideomatic Java8
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22637 Can one of the admins verify this patch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22637: [SPARK-25408] Move to mode ideomatic Java8
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22637 Can one of the admins verify this patch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22637: Spark 25408
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22637 Can one of the admins verify this patch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user Fokko commented on the issue: https://github.com/apache/spark/pull/22399 @HyukjinKwon I've opened a new PR under https://github.com/apache/spark/pull/22637. Would be nice if you could trigger Travis. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22637: Spark 25408
GitHub user Fokko opened a pull request: https://github.com/apache/spark/pull/22637 Spark 25408 ## What changes were proposed in this pull request? (Please fill in changes proposed in this fix) ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/Fokko/spark SPARK-25408 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22637.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22637 commit d8d7c890f3d6914e47fc613f19415121809f4115 Author: Fokko Driesprong Date: 2018-09-11T20:37:33Z [SPARK-25408] Move to mode ideomatic Java8 Use features from Java8 such as: - Try-with-resource blocks commit 99fd3a5af2c4fddc752b5b9c8725c4b68b2c9dbc Author: Fokko Driesprong Date: 2018-10-05T04:58:37Z [SPARK-25408] Move to mode ideomatic Java8 Fix the un AutoCloseable operations --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22466: [SPARK-25464][SQL] Create Database to the locatio...
Github user sandeep-katta commented on a diff in the pull request: https://github.com/apache/spark/pull/22466#discussion_r222891761 --- Diff: python/pyspark/sql/tests.py --- @@ -351,7 +351,7 @@ def tearDown(self): super(SQLTests, self).tearDown() # tear down test_bucketed_write state -self.spark.sql("DROP TABLE IF EXISTS pyspark_bucket") +self.spark.sql("DROP DATABASE IF EXISTS some_db CASCADE") --- End diff -- test_current_database, test_list_databases, test_list_tables, test_list_functions and test_list_columns all create the database and do not drop it, so the folder will still be present, which results in the test case failures --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22466: [SPARK-25464][SQL] Create Database to the locatio...
Github user sandeep-katta commented on a diff in the pull request: https://github.com/apache/spark/pull/22466#discussion_r222891525 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala --- @@ -407,6 +407,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { test("create a managed table with the existing non-empty directory") { withTable("tab1") { + Utils.createDirectory(spark.sessionState.conf.warehousePath) --- End diff -- As per line 414, it is trying to create an empty directory under the warehouse path; this statement will fail because the parent folder (the warehouse) does not exist. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22500: [SPARK-25488][TEST] Refactor MiscBenchmark to use main m...
Github user wangyum commented on the issue: https://github.com/apache/spark/pull/22500 @dongjoon-hyun Is this ready to go? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user szyszy commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r222890990 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala --- @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import java.lang.{Integer => JInteger, Long => JLong} +import java.lang.reflect.InvocationTargetException + +import scala.util.Try + +import org.apache.hadoop.yarn.api.records.Resource + +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +/** + * This helper class uses some of Hadoop 3 methods from the YARN API, + * so we need to use reflection to avoid compile error when building against Hadoop 2.x + */ +private object ResourceRequestHelper extends Logging { + private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r + + def setResourceRequests( --- End diff -- added doc --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStr...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/22633#discussion_r222890961 --- Diff: sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java --- @@ -0,0 +1,89 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +*http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package test.org.apache.spark.sql.streaming; + +import java.io.File; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.spark.api.java.function.VoidFunction2; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.ForeachWriter; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.streaming.StreamingQuery; +import org.apache.spark.sql.test.TestSparkSession; +import org.apache.spark.util.Utils; + +public class JavaDataStreamReaderWriterSuite { + private SparkSession spark; + private String input; + + @Before + public void setUp() { +spark = new TestSparkSession(); +input = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "input").toString(); + } + + @After + public void tearDown() { +Utils.deleteRecursively(new File(input)); +spark.stop(); --- End diff -- Since `deleteRecursively` can raise `IOException`, can we have `try ... finally ...` to ensure `spark.stop` invocation? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user szyszy commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r222890547 --- Diff: resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/TestYarnResourceRequestHelper.scala --- @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer + +import org.apache.hadoop.yarn.api.records.Resource + +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +object TestYarnResourceRequestHelper extends Logging { + def initializeResourceTypes(resourceTypes: List[String]): Unit = { +if (!ResourceRequestHelper.isYarnResourceTypesAvailable()) { + throw new IllegalStateException("initializeResourceTypes() should not be invoked " + +"since YARN resource types is not available because of old Hadoop version!" ) +} + +val allResourceTypes = new ListBuffer[AnyRef] +val defaultResourceTypes = List( + createResourceTypeInfo("memory-mb"), + createResourceTypeInfo("vcores")) +val customResourceTypes = resourceTypes.map(rt => createResourceTypeInfo(rt)) +allResourceTypes ++= defaultResourceTypes +allResourceTypes ++= customResourceTypes + +reinitializeResources(allResourceTypes) + } + + private def createResourceTypeInfo(resourceName: String): AnyRef = { +val resTypeInfoClass = Utils.classForName("org.apache.hadoop.yarn.api.records.ResourceTypeInfo") +val resTypeInfoNewInstanceMethod = resTypeInfoClass.getMethod("newInstance", classOf[String]) +resTypeInfoNewInstanceMethod.invoke(null, resourceName) + } + + private def reinitializeResources(resourceTypes: ListBuffer[AnyRef]): Unit = { +val resourceUtilsClass = + Utils.classForName("org.apache.hadoop.yarn.util.resource.ResourceUtils") +val reinitializeResourcesMethod = resourceUtilsClass.getMethod("reinitializeResources", + classOf[java.util.List[AnyRef]]) +reinitializeResourcesMethod.invoke(null, resourceTypes.asJava) + } + + def getResourceTypeValue(res: Resource, name: String): AnyRef = { +val resourceInformation: AnyRef = getResourceInformation(res, name) +invokeMethod(resourceInformation, "getValue") + } + + def getResourceInformationByName(res: Resource, nameParam: String): ResourceInformation = { +val resourceInformation: AnyRef = getResourceInformation(res, nameParam) +val name = invokeMethod(resourceInformation, "getName").asInstanceOf[String] +val value = invokeMethod(resourceInformation, "getValue").asInstanceOf[Long] +val units = invokeMethod(resourceInformation, "getUnits").asInstanceOf[String] +new ResourceInformation(name, value, units) + } + + private def getResourceInformation(res: Resource, name: String) = { +if (!ResourceRequestHelper.isYarnResourceTypesAvailable()) { + throw new 
IllegalStateException("assertResourceTypeValue() should not be invoked " + +"since yarn resource types is not available because of old Hadoop version!") +} + +val getResourceInformationMethod = res.getClass.getMethod("getResourceInformation", + classOf[String]) +val resourceInformation = getResourceInformationMethod.invoke(res, name) +resourceInformation + } + + private def invokeMethod(resourceInformation: AnyRef, methodName: String): AnyRef = { +val getValueMethod = resourceInformation.getClass.getMethod(methodName) +getValueMethod.invoke(resourceInformation) + } + + class ResourceInformation(val name: String, val value: Long, val units: String) { --- End diff -- Thanks for pointing this out; this is indeed more readable and testable. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22635: [SPARK-25591][PySpark][SQL] Avoid overwriting des...
Github user AbdealiJK commented on a diff in the pull request: https://github.com/apache/spark/pull/22635#discussion_r222890103 --- Diff: python/pyspark/accumulators.py --- @@ -109,10 +109,14 @@ def _deserialize_accumulator(aid, zero_value, accum_param): from pyspark.accumulators import _accumulatorRegistry -accum = Accumulator(aid, zero_value, accum_param) -accum._deserialized = True -_accumulatorRegistry[aid] = accum -return accum +# If this certain accumulator was deserialized, don't overwrite it. +if aid in _accumulatorRegistry: --- End diff -- Should it be `if aid in _accumulatorRegistry and _accumulatorRegistry[aid]._deserialized is True` or: ``` if aid in _accumulatorRegistry: _accumulatorRegistry[aid]._deserialize = True return _accumulatorRegistry[aid] ``` To make double sure that this function always returns a deserialize version of the accum ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22399 Let me trigger it in the next PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user szyszy commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r222889602 --- Diff: resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestValidatorSuite.scala --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import org.scalatest.{BeforeAndAfterAll, Matchers} + +import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} + +class ResourceRequestValidatorSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll { + + override def beforeAll(): Unit = { +super.beforeAll() + } + + test("empty SparkConf should be valid") { +val sparkConf = new SparkConf() +ResourceRequestValidator.validateResources(sparkConf) + } + + test("just normal resources are defined") { +val sparkConf = new SparkConf() +sparkConf.set("spark.driver.memory", "3G") +sparkConf.set("spark.driver.cores", "4") +sparkConf.set("spark.executor.memory", "4G") +sparkConf.set("spark.executor.cores", "2") + +ResourceRequestValidator.validateResources(sparkConf) + } + + test("Memory defined with new config for executor") { +val sparkConf = new SparkConf() +sparkConf.set("spark.yarn.executor.resource.memory", "30G") + +val thrown = intercept[SparkException] { + ResourceRequestValidator.validateResources(sparkConf) +} +thrown.getMessage should include ("spark.yarn.executor.resource.memory") + } + + test("Cores defined with new config for executor") { +val sparkConf = new SparkConf() +sparkConf.set("spark.yarn.executor.resource.cores", "5") + +val thrown = intercept[SparkException] { + ResourceRequestValidator.validateResources(sparkConf) +} +thrown.getMessage should include ("spark.yarn.executor.resource.cores") + } + + test("Memory defined with new config, client mode") { +val sparkConf = new SparkConf() +sparkConf.set("spark.yarn.am.resource.memory", "1G") + +val thrown = intercept[SparkException] { + ResourceRequestValidator.validateResources(sparkConf) +} +thrown.getMessage should include ("spark.yarn.am.resource.memory") + } + + test("Memory defined with new config for driver, cluster mode") { +val sparkConf = new SparkConf() +sparkConf.set("spark.yarn.driver.resource.memory", "1G") + +val thrown = intercept[SparkException] { + ResourceRequestValidator.validateResources(sparkConf) +} +thrown.getMessage should include ("spark.yarn.driver.resource.memory") + } + + test("Cores defined with new config, client mode") { +val sparkConf = new SparkConf() +sparkConf.set("spark.driver.memory", "2G") +sparkConf.set("spark.driver.cores", "4") +sparkConf.set("spark.executor.memory", "4G") +sparkConf.set("spark.yarn.am.cores", "2") + +sparkConf.set("spark.yarn.am.resource.cores", "3") + +val thrown = intercept[SparkException] { + 
ResourceRequestValidator.validateResources(sparkConf) +} +thrown.getMessage should include ("spark.yarn.am.resource.cores") + } + + test("Cores defined with new config for driver, cluster mode") { +val sparkConf = new SparkConf() +sparkConf.set("spark.yarn.driver.resource.cores", "1G") + +val thrown = intercept[SparkException] { + ResourceRequestValidator.validateResources(sparkConf) +} +thrown.getMessage should include ("spark.yarn.driver.resource.cores") + } + + test("various duplicated definitions") { +val sparkConf = new SparkConf() +sparkConf.set("spark.driver.memory", "2G") --- End diff -- I searched for references in existing test code and found out that at least `"spark.driver.memory"` and `spark.executor.memory` are used in
[GitHub] spark issue #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user Fokko commented on the issue: https://github.com/apache/spark/pull/22399 The tests passed earlier; how would it be possible for it to fail on master? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user szyszy commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r222889144 --- Diff: resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/TestYarnResourceRequestHelper.scala --- @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer + +import org.apache.hadoop.yarn.api.records.Resource + +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +object TestYarnResourceRequestHelper extends Logging { + def initializeResourceTypes(resourceTypes: List[String]): Unit = { +if (!ResourceRequestHelper.isYarnResourceTypesAvailable()) { + throw new IllegalStateException("initializeResourceTypes() should not be invoked " + +"since YARN resource types is not available because of old Hadoop version!" ) +} + +val allResourceTypes = new ListBuffer[AnyRef] +val defaultResourceTypes = List( + createResourceTypeInfo("memory-mb"), + createResourceTypeInfo("vcores")) +val customResourceTypes = resourceTypes.map(rt => createResourceTypeInfo(rt)) +allResourceTypes ++= defaultResourceTypes +allResourceTypes ++= customResourceTypes + +reinitializeResources(allResourceTypes) + } + + private def createResourceTypeInfo(resourceName: String): AnyRef = { +val resTypeInfoClass = Utils.classForName("org.apache.hadoop.yarn.api.records.ResourceTypeInfo") +val resTypeInfoNewInstanceMethod = resTypeInfoClass.getMethod("newInstance", classOf[String]) +resTypeInfoNewInstanceMethod.invoke(null, resourceName) + } + + private def reinitializeResources(resourceTypes: ListBuffer[AnyRef]): Unit = { +val resourceUtilsClass = + Utils.classForName("org.apache.hadoop.yarn.util.resource.ResourceUtils") +val reinitializeResourcesMethod = resourceUtilsClass.getMethod("reinitializeResources", + classOf[java.util.List[AnyRef]]) +reinitializeResourcesMethod.invoke(null, resourceTypes.asJava) + } + + def getResourceTypeValue(res: Resource, name: String): AnyRef = { +val resourceInformation: AnyRef = getResourceInformation(res, name) +invokeMethod(resourceInformation, "getValue") + } + + def getResourceInformationByName(res: Resource, nameParam: String): ResourceInformation = { +val resourceInformation: AnyRef = getResourceInformation(res, nameParam) +val name = invokeMethod(resourceInformation, "getName").asInstanceOf[String] +val value = invokeMethod(resourceInformation, "getValue").asInstanceOf[Long] +val units = invokeMethod(resourceInformation, "getUnits").asInstanceOf[String] +new ResourceInformation(name, value, units) + } + + private def getResourceInformation(res: Resource, name: String) = { --- End diff -- In what way I could refer to the `ResourceInformation` 
class? Remember, that class is either present or not present on the classpath, depending on which version of Hadoop Spark depends on. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
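One way to touch the Hadoop 3-only class without any compile-time reference is to go through reflection everywhere; a sketch under the assumption that `ResourceInformation.newInstance(String, long)` exists in the Hadoop 3 YARN API (the object name here is hypothetical):

```scala
import org.apache.spark.util.Utils

// Sketch: load org.apache.hadoop.yarn.api.records.ResourceInformation reflectively,
// so the code still compiles and class-loads against Hadoop 2.x.
object ResourceInformationReflection {
  private val className = "org.apache.hadoop.yarn.api.records.ResourceInformation"

  def isAvailable: Boolean = scala.util.Try(Utils.classForName(className)).isSuccess

  def newInstance(name: String, value: Long): AnyRef = {
    val clazz = Utils.classForName(className)
    val newInstanceMethod = clazz.getMethod("newInstance", classOf[String], java.lang.Long.TYPE)
    newInstanceMethod.invoke(null, name, java.lang.Long.valueOf(value))
  }
}
```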
[GitHub] spark issue #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22399 @Fokko, let's follow the PR format next time, BTW; for instance, "How was this patch tested?" --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user srowen commented on the issue: https://github.com/apache/spark/pull/22399 It wasn't a merge conflict. I failed to notice this had not actually been tested. To check locally you'd have to make sure you enable more profiles like Hive to build some of the code that changed here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user szyszy commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r222888986 --- Diff: resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/TestYarnResourceRequestHelper.scala --- @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer + +import org.apache.hadoop.yarn.api.records.Resource + +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +object TestYarnResourceRequestHelper extends Logging { --- End diff -- Fixed the name and removed Logging. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStr...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/22633#discussion_r222888557 --- Diff: sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java --- @@ -0,0 +1,89 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more --- End diff -- @zsxwing . Could you fix the indentation of Apache License? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user szyszy commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r222888518 --- Diff: resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala --- @@ -199,6 +200,92 @@ class ClientSuite extends SparkFunSuite with Matchers { appContext.getMaxAppAttempts should be (42) } + test("Resource type args propagate, resource type not defined") { +assume(ResourceRequestHelper.isYarnResourceTypesAvailable()) +val sparkConf = new SparkConf() + .set(YARN_AM_RESOURCE_TYPES_PREFIX + "some_resource_with_units_1", "121m") +val args = new ClientArguments(Array()) + +val appContext = Records.newRecord(classOf[ApplicationSubmissionContext]) +val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse]) +val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext]) + +val client = new Client(args, sparkConf) + +try { + client.createApplicationSubmissionContext( +new YarnClientApplication(getNewApplicationResponse, appContext), +containerLaunchContext) +} catch { + case NonFatal(e) => +val expectedExceptionClass = "org.apache.hadoop.yarn.exceptions.ResourceNotFoundException" +if (e.getClass.getName != expectedExceptionClass) { --- End diff -- fixed --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user szyszy commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r222888391 --- Diff: resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala --- @@ -199,6 +200,92 @@ class ClientSuite extends SparkFunSuite with Matchers { appContext.getMaxAppAttempts should be (42) } + test("Resource type args propagate, resource type not defined") { --- End diff -- fixed the namings in this class --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user Fokko commented on the issue: https://github.com/apache/spark/pull/22399 Thanks @srowen for pointing out the errors. Weird that it did not come up as a merge conflict. Let me open a new PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user szyszy commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r222887537 --- Diff: resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala --- @@ -35,13 +36,13 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.util.Records import org.mockito.Matchers.{eq => meq, _} import org.mockito.Mockito._ -import org.scalatest.Matchers +import org.scalatest.{BeforeAndAfterAll, Matchers} import org.apache.spark.{SparkConf, SparkFunSuite, TestUtils} import org.apache.spark.deploy.yarn.config._ import org.apache.spark.util.{SparkConfWithEnv, Utils} -class ClientSuite extends SparkFunSuite with Matchers { +class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll { --- End diff -- I guess I wanted to use it before, then forgot to remove this later. Fixed --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user szyszy commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r222887404 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestValidator.scala --- @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import scala.collection.mutable + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.yarn.config._ + +private object ResourceRequestValidator { + private val ERROR_PREFIX: String = "Error:" + + /** + * Validates sparkConf and throws a SparkException if a standard resource (memory or cores) + * is defined with the property spark.yarn.x.resource.y + * + * Example of an invalid config: + * - spark.yarn.driver.resource.memory=2g + * + * Please note that if multiple resources are defined like described above, + * the error messages will be concatenated. + * Example of such a config: + * - spark.yarn.driver.resource.memory=2g + * - spark.yarn.executor.resource.cores=2 + * Then the following two error messages will be printed: + * - "memory cannot be requested with config spark.yarn.driver.resource.memory, + * please use config spark.driver.memory instead! + * - "cores cannot be requested with config spark.yarn.executor.resource.cores, + * please use config spark.executor.cores instead! + * + * @param sparkConf + */ + def validateResources(sparkConf: SparkConf): Unit = { +val resourceDefinitions = Seq[(String, String)]( + (AM_MEMORY.key, YARN_AM_RESOURCE_TYPES_PREFIX + "memory"), + (AM_CORES.key, YARN_AM_RESOURCE_TYPES_PREFIX + "cores"), + ("spark.driver.memory", YARN_DRIVER_RESOURCE_TYPES_PREFIX + "memory"), --- End diff -- Fixed --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user szyszy commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r222887435 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestValidator.scala --- @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import scala.collection.mutable + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.yarn.config._ + +private object ResourceRequestValidator { + private val ERROR_PREFIX: String = "Error:" + + /** + * Validates sparkConf and throws a SparkException if a standard resource (memory or cores) + * is defined with the property spark.yarn.x.resource.y + * + * Example of an invalid config: + * - spark.yarn.driver.resource.memory=2g + * + * Please note that if multiple resources are defined like described above, + * the error messages will be concatenated. + * Example of such a config: + * - spark.yarn.driver.resource.memory=2g + * - spark.yarn.executor.resource.cores=2 + * Then the following two error messages will be printed: + * - "memory cannot be requested with config spark.yarn.driver.resource.memory, + * please use config spark.driver.memory instead! + * - "cores cannot be requested with config spark.yarn.executor.resource.cores, + * please use config spark.executor.cores instead! + * + * @param sparkConf + */ + def validateResources(sparkConf: SparkConf): Unit = { +val resourceDefinitions = Seq[(String, String)]( + (AM_MEMORY.key, YARN_AM_RESOURCE_TYPES_PREFIX + "memory"), + (AM_CORES.key, YARN_AM_RESOURCE_TYPES_PREFIX + "cores"), + ("spark.driver.memory", YARN_DRIVER_RESOURCE_TYPES_PREFIX + "memory"), + (DRIVER_CORES.key, YARN_DRIVER_RESOURCE_TYPES_PREFIX + "cores"), + ("spark.executor.memory", YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "memory"), + (EXECUTOR_CORES.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "cores")) +val errorMessage = new mutable.StringBuilder() + +resourceDefinitions.foreach { case (sparkName, resourceRequest) => + if (sparkConf.contains(resourceRequest)) { +errorMessage.append(s"$ERROR_PREFIX Do not use $resourceRequest, " + +s"please use $sparkName instead!\n") + } +} + +// throw exception after loop --- End diff -- removed --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22634: [SPARK-25646][k8s] Fix docker-image-tool.sh on dev build...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22634 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22634: [SPARK-25646][k8s] Fix docker-image-tool.sh on dev build...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22634 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96959/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user szyszy commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r222886705 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestValidator.scala --- @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import scala.collection.mutable + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.yarn.config._ + +private object ResourceRequestValidator { + private val ERROR_PREFIX: String = "Error:" + + /** + * Validates sparkConf and throws a SparkException if a standard resource (memory or cores) + * is defined with the property spark.yarn.x.resource.y + * + * Example of an invalid config: --- End diff -- fixed --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user szyszy commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r222886619 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestValidator.scala --- @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import scala.collection.mutable + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.yarn.config._ + +private object ResourceRequestValidator { + private val ERROR_PREFIX: String = "Error:" + + /** + * Validates sparkConf and throws a SparkException if a standard resource (memory or cores) + * is defined with the property spark.yarn.x.resource.y --- End diff -- fixed --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22634: [SPARK-25646][k8s] Fix docker-image-tool.sh on dev build...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22634 **[Test build #96959 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96959/testReport)** for PR 22634 at commit [`0cdd4e5`](https://github.com/apache/spark/commit/0cdd4e577da55e93a029dbe4a34b5999802360d5). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user szyszy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20761#discussion_r222886571

    --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.yarn
    +
    +import java.lang.{Integer => JInteger, Long => JLong}
    +import java.lang.reflect.InvocationTargetException
    +
    +import scala.util.Try
    +
    +import org.apache.hadoop.yarn.api.records.Resource
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.util.Utils
    +
    +/**
    + * This helper class uses some of Hadoop 3 methods from the YARN API,
    + * so we need to use reflection to avoid compile error when building against Hadoop 2.x
    + */
    +private object ResourceRequestHelper extends Logging {
    +  private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
    +
    +  def setResourceRequests(
    +      resources: Map[String, String],
    +      resource: Resource): Unit = {
    +    require(resource != null, "Resource parameter should not be null!")
    +
    +    logDebug(s"Custom resource types requested: $resources")
    +    if (!isYarnResourceTypesAvailable()) {
    +      if (resources.nonEmpty) {
    +        logWarning("Ignoring updating resource with resource types because " +
    +          "the version of YARN does not support it!")
    +      }
    +      return
    +    }
    +
    +    val resInfoClass = Utils.classForName(
    +      "org.apache.hadoop.yarn.api.records.ResourceInformation")
    +    val setResourceInformationMethod =
    +      resource.getClass.getMethod("setResourceInformation", classOf[String],
    +        resInfoClass)
    +    resources.foreach { case (name, rawAmount) =>
    +      try {
    +        val AMOUNT_AND_UNIT_REGEX(amountPart, unitPart) = rawAmount
    +        val amount = amountPart.toLong
    +        val unit = unitPart match {
    +          case "g" => "G"
    +          case "t" => "T"
    +          case "p" => "P"
    +          case _ => unitPart
    +        }
    +        logDebug(s"Registering resource with name: $name, amount: $amount, unit: $unit")
    +        val resourceInformation =
    +          createResourceInformation(name, amount, unit, resInfoClass)
    +        setResourceInformationMethod.invoke(
    +          resource, name, resourceInformation.asInstanceOf[AnyRef])
    +      } catch {
    +        case _: MatchError => throw new IllegalArgumentException(
    --- End diff --

    fixed

---

- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
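The helper quoted in this last comment works by looking up Hadoop 3's ResourceInformation API reflectively, so the YARN module still compiles and links against Hadoop 2.x jars. Below is a minimal, self-contained sketch of that pattern only; the object name, the resource name "yarn.io/gpu", and the standalone use of Class.forName are illustrative assumptions and not the PR's code, and both ResourceInformation.newInstance and Resource.setResourceInformation exist only on a Hadoop 3 classpath.

    import org.apache.hadoop.yarn.api.records.Resource

    // Sketch only: resolve the Hadoop 3 ResourceInformation class at runtime so
    // nothing in this file references it at compile time.
    object ReflectiveResourceSketch {
      def requestGpus(resource: Resource, amount: Long): Unit = {
        val resInfoClass = Class.forName(
          "org.apache.hadoop.yarn.api.records.ResourceInformation")
        // ResourceInformation.newInstance(name: String, value: long) is a Hadoop 3 factory method.
        val newInstance = resInfoClass.getMethod(
          "newInstance", classOf[String], classOf[Long])
        val resInfo = newInstance.invoke(null, "yarn.io/gpu", java.lang.Long.valueOf(amount))
        // Resource.setResourceInformation(name: String, info: ResourceInformation) is also Hadoop 3-only.
        val setter = resource.getClass.getMethod(
          "setResourceInformation", classOf[String], resInfoClass)
        setter.invoke(resource, "yarn.io/gpu", resInfo)
      }
    }

On a Hadoop 2.x classpath the Class.forName call throws ClassNotFoundException, which is why the PR's helper checks isYarnResourceTypesAvailable() and bails out with a warning before touching these classes.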