[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222897804
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
 ---
@@ -134,6 +166,42 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers with BeforeAndAfter
 size should be (0)
   }
 
+  test("custom resource type requested from yarn") {
+assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+TestYarnResourceRequestHelper.initializeResourceTypes(List("gpu"))
+
+// request a single container and receive it
+val handler = createAllocatorWithAdditionalConfigs(1, Map(
+  YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "gpu" -> "2G",
+  YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "memory" -> "1G"
+))
+handler.updateResourceRequests()
+handler.getNumExecutorsRunning should be (0)
+handler.getPendingAllocate.size should be (1)
+
+val resource = Resource.newInstance(3072, 6)
+val resourceTypes = Map("gpu" -> "2G")
+ResourceRequestHelper.setResourceRequests(resourceTypes, resource)
+
+val container = createContainerWithResource("host1", resource)
+handler.handleAllocatedContainers(Array(container))
+
+// verify custom resource type is part of rmClient.ask set
+val askField = rmClient.getClass.getDeclaredField("ask")
--- End diff --

Will look into this tomorrow morning


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20761: [SPARK-20327][CORE][YARN] Add CLI support for YARN custo...

2018-10-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20761
  
**[Test build #96973 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96973/testReport)**
 for PR 20761 at commit 
[`b0e68d2`](https://github.com/apache/spark/commit/b0e68d221fe84af54944415df35513c77e73dcc1).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22637: [SPARK-25408] Move to mode ideomatic Java8

2018-10-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22637
  
**[Test build #96972 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96972/testReport)**
 for PR 22637 at commit 
[`14d9d1f`](https://github.com/apache/spark/commit/14d9d1ff3d6d48a2ab89c0ed611b5896caa02e11).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22637: [SPARK-25408] Move to mode ideomatic Java8

2018-10-04 Thread Fokko
Github user Fokko commented on the issue:

https://github.com/apache/spark/pull/22637
  
Valid points. Personally I'm a fan of explicit final, instead of implicit. 
But that's a matter of taste :-)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r222894891
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2664,6 +2664,28 @@ def sequence(start, stop, step=None):
 _to_java_column(start), _to_java_column(stop), 
_to_java_column(step)))
 
 
+@ignore_unicode_prefix
+@since(3.0)
+def from_csv(col, schema, options={}):
+"""
+Parses a column containing a CSV string into a :class:`StructType`
--- End diff --

`StructType` -> `Row`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222895698
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
 ---
@@ -199,6 +200,92 @@ class ClientSuite extends SparkFunSuite with Matchers {
 appContext.getMaxAppAttempts should be (42)
   }
 
+  test("Resource type args propagate, resource type not defined") {
+assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+val sparkConf = new SparkConf()
+  .set(YARN_AM_RESOURCE_TYPES_PREFIX + "some_resource_with_units_1", 
"121m")
+val args = new ClientArguments(Array())
+
+val appContext = 
Records.newRecord(classOf[ApplicationSubmissionContext])
+val getNewApplicationResponse = 
Records.newRecord(classOf[GetNewApplicationResponse])
+val containerLaunchContext = 
Records.newRecord(classOf[ContainerLaunchContext])
+
+val client = new Client(args, sparkConf)
+
+try {
+  client.createApplicationSubmissionContext(
+new YarnClientApplication(getNewApplicationResponse, appContext),
+containerLaunchContext)
+} catch {
+  case NonFatal(e) =>
+val expectedExceptionClass = 
"org.apache.hadoop.yarn.exceptions.ResourceNotFoundException"
+if (e.getClass.getName != expectedExceptionClass) {
+  fail(s"Exception caught: $e is not an instance of 
$expectedExceptionClass!")
+}
+}
+  }
+
+  test("Resource type args propagate (client mode)") {
+assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+TestYarnResourceRequestHelper.initializeResourceTypes(List("gpu", 
"fpga"))
+
+val sparkConf = new SparkConf()
+  .set(YARN_AM_RESOURCE_TYPES_PREFIX + "some_resource_with_units_1", 
"121m")
+  .set(YARN_DRIVER_RESOURCE_TYPES_PREFIX + 
"some_resource_with_units_1", "122m")
+  .set(YARN_AM_RESOURCE_TYPES_PREFIX + "fpga", "222m")
+  .set(YARN_DRIVER_RESOURCE_TYPES_PREFIX + "fpga", "223m")
+  .set(YARN_AM_RESOURCE_TYPES_PREFIX + "memory", "1G")
+  .set(YARN_DRIVER_RESOURCE_TYPES_PREFIX + "memory", "2G")
+val args = new ClientArguments(Array())
+
+val appContext = 
Records.newRecord(classOf[ApplicationSubmissionContext])
+val getNewApplicationResponse = 
Records.newRecord(classOf[GetNewApplicationResponse])
+val containerLaunchContext = 
Records.newRecord(classOf[ContainerLaunchContext])
+
+val client = new Client(args, sparkConf)
+client.createApplicationSubmissionContext(
+  new YarnClientApplication(getNewApplicationResponse, appContext),
+  containerLaunchContext)
+
+appContext.getAMContainerSpec should be (containerLaunchContext)
+appContext.getApplicationType should be ("SPARK")
+
TestYarnResourceRequestHelper.getResourceTypeValue(appContext.getResource,
+  "some_resource_with_units_1") should be (121)
+TestYarnResourceRequestHelper
+.getResourceTypeValue(appContext.getResource, "fpga") should be 
(222)
+  }
+
+  test("configuration and resource type args propagate (cluster mode)") {
--- End diff --

Indeed, extracted a common method, passing the expected values as a 
sequence of tuples.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r222895375
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.csv._
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Converts a CSV input string to a [[StructType]] with the specified 
schema.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with 
the given `csvStr` and `schema`.",
+  examples = """
+Examples:
+  > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE');
+   {"a":1, "b":0.8}
+  > SELECT _FUNC_('26/08/2015', 'time Timestamp', 
map('timestampFormat', 'dd/MM/'))
+   {"time":2015-08-26 00:00:00.0}
+  """,
+  since = "3.0.0")
+// scalastyle:on line.size.limit
+case class CsvToStructs(
+schema: StructType,
+options: Map[String, String],
+child: Expression,
+timeZoneId: Option[String] = None)
+  extends UnaryExpression with TimeZoneAwareExpression with 
CodegenFallback with ExpectsInputTypes {
+
+  override def nullable: Boolean = true
+
+  // The CSV input data might be missing certain fields. We force the 
nullability
+  // of the user-provided schema to avoid data corruptions.
+  val nullableSchema = schema.asNullable
+
+  // Used in `FunctionRegistry`
+  def this(child: Expression, schema: Expression, options: Map[String, 
String]) =
+this(
+  schema = ExprUtils.evalSchemaExpr(schema),
+  options = options,
+  child = child,
+  timeZoneId = None)
+
+  def this(child: Expression, schema: Expression) = this(child, schema, 
Map.empty[String, String])
+
+  def this(child: Expression, schema: Expression, options: Expression) =
+this(
+  schema = ExprUtils.evalSchemaExpr(schema),
+  options = ExprUtils.convertToMapData(options),
+  child = child,
+  timeZoneId = None)
+
+  // This converts parsed rows to the desired output by the given schema.
+  @transient
+  lazy val converter = (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next() else null
--- End diff --

the input string is expected to be a single CSV record right? Shall we fail 
if it's not?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r222895573
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.csv._
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Converts a CSV input string to a [[StructType]] with the specified 
schema.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with 
the given `csvStr` and `schema`.",
+  examples = """
+Examples:
+  > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE');
+   {"a":1, "b":0.8}
+  > SELECT _FUNC_('26/08/2015', 'time Timestamp', 
map('timestampFormat', 'dd/MM/'))
+   {"time":2015-08-26 00:00:00.0}
+  """,
+  since = "3.0.0")
+// scalastyle:on line.size.limit
+case class CsvToStructs(
+schema: StructType,
+options: Map[String, String],
+child: Expression,
+timeZoneId: Option[String] = None)
+  extends UnaryExpression with TimeZoneAwareExpression with 
CodegenFallback with ExpectsInputTypes {
+
+  override def nullable: Boolean = true
--- End diff --

This expression only returns null if input is null. Shall we define the 
nullable as `child.nullable`? And this expression should extend `NullIntolerant`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22637: [SPARK-25408] Move to mode ideomatic Java8

2018-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22637
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22637: [SPARK-25408] Move to mode ideomatic Java8

2018-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22637
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96970/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22637: [SPARK-25408] Move to mode ideomatic Java8

2018-10-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22637
  
**[Test build #96970 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96970/testReport)**
 for PR 22637 at commit 
[`0155b7a`](https://github.com/apache/spark/commit/0155b7ad4905d75970e600338ed677b23650a901).
 * This patch **fails Java style tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `public abstract class RowBasedKeyValueBatch extends MemoryConsumer 
implements Closeable `


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22637: [SPARK-25408] Move to mode ideomatic Java8

2018-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22637
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22635: [SPARK-25591][PySpark][SQL] Avoid overwriting deserializ...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22635
  
Thanks for cc'ing me. Will take a look this week.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22637: [SPARK-25408] Move to mode ideomatic Java8

2018-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22637
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96969/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22637: [SPARK-25408] Move to mode ideomatic Java8

2018-10-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22637
  
**[Test build #96969 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96969/testReport)**
 for PR 22637 at commit 
[`5d3d0c7`](https://github.com/apache/spark/commit/5d3d0c7b59c46e8c84dbed340a5a75fcdf01c6d3).
 * This patch **fails Java style tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222894767
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
 ---
@@ -199,6 +200,92 @@ class ClientSuite extends SparkFunSuite with Matchers {
 appContext.getMaxAppAttempts should be (42)
   }
 
+  test("Resource type args propagate, resource type not defined") {
+assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+val sparkConf = new SparkConf()
+  .set(YARN_AM_RESOURCE_TYPES_PREFIX + "some_resource_with_units_1", 
"121m")
+val args = new ClientArguments(Array())
+
+val appContext = 
Records.newRecord(classOf[ApplicationSubmissionContext])
+val getNewApplicationResponse = 
Records.newRecord(classOf[GetNewApplicationResponse])
+val containerLaunchContext = 
Records.newRecord(classOf[ContainerLaunchContext])
+
+val client = new Client(args, sparkConf)
+
+try {
+  client.createApplicationSubmissionContext(
+new YarnClientApplication(getNewApplicationResponse, appContext),
+containerLaunchContext)
+} catch {
+  case NonFatal(e) =>
+val expectedExceptionClass = 
"org.apache.hadoop.yarn.exceptions.ResourceNotFoundException"
+if (e.getClass.getName != expectedExceptionClass) {
+  fail(s"Exception caught: $e is not an instance of 
$expectedExceptionClass!")
+}
+}
+  }
+
+  test("Resource type args propagate (client mode)") {
+assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+TestYarnResourceRequestHelper.initializeResourceTypes(List("gpu", 
"fpga"))
+
+val sparkConf = new SparkConf()
+  .set(YARN_AM_RESOURCE_TYPES_PREFIX + "some_resource_with_units_1", 
"121m")
+  .set(YARN_DRIVER_RESOURCE_TYPES_PREFIX + 
"some_resource_with_units_1", "122m")
+  .set(YARN_AM_RESOURCE_TYPES_PREFIX + "fpga", "222m")
+  .set(YARN_DRIVER_RESOURCE_TYPES_PREFIX + "fpga", "223m")
+  .set(YARN_AM_RESOURCE_TYPES_PREFIX + "memory", "1G")
--- End diff --

exactly, see my comment for the other testcase.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStreamWrit...

2018-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22633
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3699/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222894728
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
 ---
@@ -199,6 +200,92 @@ class ClientSuite extends SparkFunSuite with Matchers {
 appContext.getMaxAppAttempts should be (42)
   }
 
+  test("Resource type args propagate, resource type not defined") {
+assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+val sparkConf = new SparkConf()
+  .set(YARN_AM_RESOURCE_TYPES_PREFIX + "some_resource_with_units_1", 
"121m")
+val args = new ClientArguments(Array())
+
+val appContext = 
Records.newRecord(classOf[ApplicationSubmissionContext])
+val getNewApplicationResponse = 
Records.newRecord(classOf[GetNewApplicationResponse])
+val containerLaunchContext = 
Records.newRecord(classOf[ContainerLaunchContext])
+
+val client = new Client(args, sparkConf)
+
+try {
+  client.createApplicationSubmissionContext(
+new YarnClientApplication(getNewApplicationResponse, appContext),
+containerLaunchContext)
+} catch {
+  case NonFatal(e) =>
+val expectedExceptionClass = 
"org.apache.hadoop.yarn.exceptions.ResourceNotFoundException"
+if (e.getClass.getName != expectedExceptionClass) {
+  fail(s"Exception caught: $e is not an instance of 
$expectedExceptionClass!")
+}
+}
+  }
+
+  test("Resource type args propagate (client mode)") {
+assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+TestYarnResourceRequestHelper.initializeResourceTypes(List("gpu", 
"fpga"))
+
+val sparkConf = new SparkConf()
+  .set(YARN_AM_RESOURCE_TYPES_PREFIX + "some_resource_with_units_1", 
"121m")
--- End diff --

Actually, these two tests were indeed failing. Forgot to run them lately 
and jenkins does not run them because the Hadoop version is not > 3.1
This also pointed to another issue: in 
`ResourceRequestHelper.setResourceRequests`, the `setResourceInformationMethod` 
was invoked with value `"memory` which is not accepted by the YARN API so I had 
to translate it to `"memory-mb"` in order to work correctly.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStreamWrit...

2018-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22633
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStreamWrit...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22633
  
Looks fine to me


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStreamWrit...

2018-10-04 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22633
  
Looks like `lint-java` doesn't catch any style issues in my PR


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStreamWrit...

2018-10-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22633
  
**[Test build #96971 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96971/testReport)**
 for PR 22633 at commit 
[`999b6be`](https://github.com/apache/spark/commit/999b6beee920a8f755028a90e96955527283c205).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222894056
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2, long> {
+void call(Dataset dataset, long batchId) {
+  // Transform and write batchDF
+}
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
--- End diff --

`foreachBatchFunction` -> `foreach_batch_function`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222894110
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2, long> {
+void call(Dataset dataset, long batchId) {
+  // Transform and write batchDF
+}
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
--- End diff --

`epochId` -> `epoch_id`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222893790
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2, long> {
+void call(Dataset dataset, long batchId) {
+  // Transform and write batchDF
+}
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.uncache()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, corresponding batch data 
writer does not exist, or 
+continuous processing mode), then you can express you custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend the class `ForeachWriter` 

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222894028
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2, long> {
+void call(Dataset dataset, long batchId) {
+  // Transform and write batchDF
+}
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
--- End diff --

4 space indentation


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222893841
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2, long> {
+void call(Dataset dataset, long batchId) {
+  // Transform and write batchDF
+}
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.uncache()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, corresponding batch data 
writer does not exist, or 
+continuous processing mode), then you can express you custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend the class `ForeachWriter` 

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222893720
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2, long> {
+void call(Dataset dataset, long batchId) {
+  // Transform and write batchDF
+}
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.uncache()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, corresponding batch data 
writer does not exist, or 
+continuous processing mode), then you can express you custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend the class `ForeachWriter` 

[GitHub] spark issue #22637: [SPARK-25408] Move to mode ideomatic Java8

2018-10-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22637
  
**[Test build #96970 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96970/testReport)**
 for PR 22637 at commit 
[`0155b7a`](https://github.com/apache/spark/commit/0155b7ad4905d75970e600338ed677b23650a901).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222893572
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2, long> {
+void call(Dataset dataset, long batchId) {
+  // Transform and write batchDF
+}
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.uncache()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, corresponding batch data 
writer does not exist, or 
+continuous processing mode), then you can express you custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend the class `ForeachWriter` 

[GitHub] spark pull request #22637: [SPARK-25408] Move to mode ideomatic Java8

2018-10-04 Thread Fokko
Github user Fokko commented on a diff in the pull request:

https://github.com/apache/spark/pull/22637#discussion_r222893178
  
--- Diff: 
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
 ---
@@ -133,37 +133,38 @@ private FetchResult fetchBlocks(
 
 final Semaphore requestsRemaining = new Semaphore(0);
 
-ExternalShuffleClient client = new ExternalShuffleClient(clientConf, 
null, false, 5000);
-client.init(APP_ID);
-client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds,
-  new BlockFetchingListener() {
-@Override
-public void onBlockFetchSuccess(String blockId, ManagedBuffer 
data) {
-  synchronized (this) {
-if (!res.successBlocks.contains(blockId) && 
!res.failedBlocks.contains(blockId)) {
-  data.retain();
-  res.successBlocks.add(blockId);
-  res.buffers.add(data);
-  requestsRemaining.release();
+try (ExternalShuffleClient client = new 
ExternalShuffleClient(clientConf, null, false, 5000)) {
+  client.init(APP_ID);
+  client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds,
+new BlockFetchingListener() {
+  @Override
+  public void onBlockFetchSuccess(String blockId, ManagedBuffer 
data) {
+synchronized (this) {
+  if (!res.successBlocks.contains(blockId) && 
!res.failedBlocks.contains(blockId)) {
+data.retain();
+res.successBlocks.add(blockId);
+res.buffers.add(data);
+requestsRemaining.release();
+  }
 }
   }
-}
-
-@Override
-public void onBlockFetchFailure(String blockId, Throwable 
exception) {
-  synchronized (this) {
-if (!res.successBlocks.contains(blockId) && 
!res.failedBlocks.contains(blockId)) {
-  res.failedBlocks.add(blockId);
-  requestsRemaining.release();
+
+  @Override
+  public void onBlockFetchFailure(String blockId, Throwable 
exception) {
+synchronized (this) {
+  if (!res.successBlocks.contains(blockId) && 
!res.failedBlocks.contains(blockId)) {
+res.failedBlocks.add(blockId);
+requestsRemaining.release();
+  }
 }
   }
-}
-  }, null);
+}, null
+  );
--- End diff --

Done


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStr...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22633#discussion_r222893318
  
--- Diff: 
sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java
 ---
@@ -0,0 +1,89 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package test.org.apache.spark.sql.streaming;
+
+import java.io.File;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.api.java.function.VoidFunction2;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.ForeachWriter;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.test.TestSparkSession;
+import org.apache.spark.util.Utils;
+
+public class JavaDataStreamReaderWriterSuite {
+  private SparkSession spark;
+  private String input;
+
+  @Before
+  public void setUp() {
+spark = new TestSparkSession();
+input = Utils.createTempDir(System.getProperty("java.io.tmpdir"), 
"input").toString();
+  }
+
+  @After
+  public void tearDown() {
+Utils.deleteRecursively(new File(input));
+spark.stop();
+spark = null;
+  }
+
+  @Test
+  public void testForeachBatchAPI() {
+StreamingQuery query = spark
+.readStream()
+.textFile(input)
+.writeStream()
+.foreachBatch(new VoidFunction2, Long>() {
+  @Override
+  public void call(Dataset v1, Long v2) throws Exception {
+  }
+})
+.start();
+query.stop();
+  }
+
+  @Test
+  public void testForeachAPI() {
+StreamingQuery query = spark
+.readStream()
+.textFile(input)
+.writeStream()
+.foreach(new ForeachWriter() {
+  @Override
+  public boolean open(long partitionId, long epochId) {
+return true;
+  }
+
+  @Override
+  public void process(String value) {
+  }
--- End diff --

tiny nit: I would just `public void process(String value) { }`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22637: [SPARK-25408] Move to mode ideomatic Java8

2018-10-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22637
  
**[Test build #96969 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96969/testReport)**
 for PR 22637 at commit 
[`5d3d0c7`](https://github.com/apache/spark/commit/5d3d0c7b59c46e8c84dbed340a5a75fcdf01c6d3).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22619: [SPARK-25600][SQL][MINOR] Make use of TypeCoercion.findT...

2018-10-04 Thread dilipbiswal
Github user dilipbiswal commented on the issue:

https://github.com/apache/spark/pull/22619
  
@HyukjinKwon Sure :-)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStr...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22633#discussion_r222893114
  
--- Diff: 
sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java
 ---
@@ -0,0 +1,89 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package test.org.apache.spark.sql.streaming;
+
+import java.io.File;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.api.java.function.VoidFunction2;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.ForeachWriter;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.test.TestSparkSession;
+import org.apache.spark.util.Utils;
+
+public class JavaDataStreamReaderWriterSuite {
+  private SparkSession spark;
+  private String input;
+
+  @Before
+  public void setUp() {
+spark = new TestSparkSession();
+input = Utils.createTempDir(System.getProperty("java.io.tmpdir"), 
"input").toString();
+  }
+
+  @After
+  public void tearDown() {
+Utils.deleteRecursively(new File(input));
+spark.stop();
+spark = null;
+  }
+
+  @Test
+  public void testForeachBatchAPI() {
+StreamingQuery query = spark
+.readStream()
+.textFile(input)
+.writeStream()
+.foreachBatch(new VoidFunction2, Long>() {
+  @Override
+  public void call(Dataset v1, Long v2) throws Exception {
+  }
+})
+.start();
+query.stop();
+  }
+
+  @Test
+  public void testForeachAPI() {
+StreamingQuery query = spark
+.readStream()
--- End diff --

Nit: 2 space indentation


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22619: [SPARK-25600][SQL][MINOR] Make use of TypeCoercion.findT...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22619
  
Yup. Let me leave this open few more days in case.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22635: [SPARK-25591][PySpark][SQL] Avoid overwriting des...

2018-10-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22635#discussion_r222892890
  
--- Diff: python/pyspark/accumulators.py ---
@@ -109,10 +109,14 @@
 
 def _deserialize_accumulator(aid, zero_value, accum_param):
 from pyspark.accumulators import _accumulatorRegistry
-accum = Accumulator(aid, zero_value, accum_param)
-accum._deserialized = True
-_accumulatorRegistry[aid] = accum
-return accum
+# If this certain accumulator was deserialized, don't overwrite it.
+if aid in _accumulatorRegistry:
--- End diff --

We only save deserialized accumulators (`_deserialized` is `True`) into 
this dict.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22047: [SPARK-19851] Add support for EVERY and ANY (SOME) aggre...

2018-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22047
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22047: [SPARK-19851] Add support for EVERY and ANY (SOME) aggre...

2018-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22047
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3698/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22047: [SPARK-19851] Add support for EVERY and ANY (SOME) aggre...

2018-10-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22047
  
**[Test build #96968 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96968/testReport)**
 for PR 22047 at commit 
[`b378fff`](https://github.com/apache/spark/commit/b378fffd160a24d337fb3acbd63ce0d4db784afe).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22637: [SPARK-25408] Move to mode ideomatic Java8

2018-10-04 Thread srowen
Github user srowen commented on the issue:

https://github.com/apache/spark/pull/22637
  
Jenkins test this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22637: [SPARK-25408] Move to mode ideomatic Java8

2018-10-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22637
  
**[Test build #96967 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96967/testReport)**
 for PR 22637 at commit 
[`5d3d0c7`](https://github.com/apache/spark/commit/5d3d0c7b59c46e8c84dbed340a5a75fcdf01c6d3).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22637: [SPARK-25408] Move to mode ideomatic Java8

2018-10-04 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22637#discussion_r222892548
  
--- Diff: 
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
 ---
@@ -133,37 +133,38 @@ private FetchResult fetchBlocks(
 
 final Semaphore requestsRemaining = new Semaphore(0);
 
-ExternalShuffleClient client = new ExternalShuffleClient(clientConf, 
null, false, 5000);
-client.init(APP_ID);
-client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds,
-  new BlockFetchingListener() {
-@Override
-public void onBlockFetchSuccess(String blockId, ManagedBuffer 
data) {
-  synchronized (this) {
-if (!res.successBlocks.contains(blockId) && 
!res.failedBlocks.contains(blockId)) {
-  data.retain();
-  res.successBlocks.add(blockId);
-  res.buffers.add(data);
-  requestsRemaining.release();
+try (ExternalShuffleClient client = new 
ExternalShuffleClient(clientConf, null, false, 5000)) {
+  client.init(APP_ID);
+  client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds,
+new BlockFetchingListener() {
+  @Override
+  public void onBlockFetchSuccess(String blockId, ManagedBuffer 
data) {
+synchronized (this) {
+  if (!res.successBlocks.contains(blockId) && 
!res.failedBlocks.contains(blockId)) {
+data.retain();
+res.successBlocks.add(blockId);
+res.buffers.add(data);
+requestsRemaining.release();
+  }
 }
   }
-}
-
-@Override
-public void onBlockFetchFailure(String blockId, Throwable 
exception) {
-  synchronized (this) {
-if (!res.successBlocks.contains(blockId) && 
!res.failedBlocks.contains(blockId)) {
-  res.failedBlocks.add(blockId);
-  requestsRemaining.release();
+
+  @Override
+  public void onBlockFetchFailure(String blockId, Throwable 
exception) {
+synchronized (this) {
+  if (!res.successBlocks.contains(blockId) && 
!res.failedBlocks.contains(blockId)) {
+res.failedBlocks.add(blockId);
+requestsRemaining.release();
+  }
 }
   }
-}
-  }, null);
+}, null
+  );
--- End diff --

Nit: move this onto the previous line


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22637: [SPARK-25408] Move to mode ideomatic Java8

2018-10-04 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22637#discussion_r222892759
  
--- Diff: 
sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIService.java 
---
@@ -154,6 +154,7 @@ public synchronized void start() {
   throw new ServiceException("Unable to connect to MetaStore!", e);
 }
 finally {
+  // IMetaStoreClient is not AutoCloseable, closing it manually
--- End diff --

I wouldn't bother with this. This is a copy of some Hive code anyway


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22047: [SPARK-19851] Add support for EVERY and ANY (SOME) aggre...

2018-10-04 Thread dilipbiswal
Github user dilipbiswal commented on the issue:

https://github.com/apache/spark/pull/22047
  
@gatorsmile First of all, thank you very much . Actually the added 
aggregates weren't null filtering. I have fixed the issue and have added 
additional test cases. Thank you.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22637: [SPARK-25408] Move to mode ideomatic Java8

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22637
  
ok to test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22619: [SPARK-25600][SQL][MINOR] Make use of TypeCoercion.findT...

2018-10-04 Thread dilipbiswal
Github user dilipbiswal commented on the issue:

https://github.com/apache/spark/pull/22619
  
@HyukjinKwon Does this look okay now ?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22594: [MINOR][SQL] When batch reading, the number of bytes can...

2018-10-04 Thread 10110346
Github user 10110346 commented on the issue:

https://github.com/apache/spark/pull/22594
  
@srowen Yes,I will update,thanks


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22637: [SPARK-25408] Move to mode ideomatic Java8

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22637
  
mind filling PR description please?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22637: [SPARK-25408] Move to mode ideomatic Java8

2018-10-04 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22637
  
ok to test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22637: [SPARK-25408] Move to mode ideomatic Java8

2018-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22637
  
Can one of the admins verify this patch?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22637: [SPARK-25408] Move to mode ideomatic Java8

2018-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22637
  
Can one of the admins verify this patch?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22637: Spark 25408

2018-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22637
  
Can one of the admins verify this patch?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22399: [SPARK-25408] Move to mode ideomatic Java8

2018-10-04 Thread Fokko
Github user Fokko commented on the issue:

https://github.com/apache/spark/pull/22399
  
@HyukjinKwon I've opened a new PR under 
https://github.com/apache/spark/pull/22637. Would be nice if you can trigger 
Travis 👍 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22637: Spark 25408

2018-10-04 Thread Fokko
GitHub user Fokko opened a pull request:

https://github.com/apache/spark/pull/22637

Spark 25408

## What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration 
tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, 
remove this)

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Fokko/spark SPARK-25408

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22637.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22637


commit d8d7c890f3d6914e47fc613f19415121809f4115
Author: Fokko Driesprong 
Date:   2018-09-11T20:37:33Z

[SPARK-25408] Move to mode ideomatic Java8

Use features from Java8 such as:
- Try-with-resource blocks

commit 99fd3a5af2c4fddc752b5b9c8725c4b68b2c9dbc
Author: Fokko Driesprong 
Date:   2018-10-05T04:58:37Z

[SPARK-25408] Move to mode ideomatic Java8

Fix the un AutoCloseable operations




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22466: [SPARK-25464][SQL] Create Database to the locatio...

2018-10-04 Thread sandeep-katta
Github user sandeep-katta commented on a diff in the pull request:

https://github.com/apache/spark/pull/22466#discussion_r222891761
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -351,7 +351,7 @@ def tearDown(self):
 super(SQLTests, self).tearDown()
 
 # tear down test_bucketed_write state
-self.spark.sql("DROP TABLE IF EXISTS pyspark_bucket")
+self.spark.sql("DROP DATABASE IF EXISTS some_db CASCADE")
--- End diff --


test_current_database,test_list_databases,test_list_tables,test_list_functions 
and test_list_columns all these test case create the database and does not drop 
it,so the folder will be present which result in the test case failures


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22466: [SPARK-25464][SQL] Create Database to the locatio...

2018-10-04 Thread sandeep-katta
Github user sandeep-katta commented on a diff in the pull request:

https://github.com/apache/spark/pull/22466#discussion_r222891525
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala 
---
@@ -407,6 +407,7 @@ abstract class DDLSuite extends QueryTest with 
SQLTestUtils {
 
   test("create a managed table with the existing non-empty directory") {
 withTable("tab1") {
+  Utils.createDirectory(spark.sessionState.conf.warehousePath)
--- End diff --

As per line 414 it is trying to create empty directory under warehouse 
path,this statement will fail as parent folder(ware house) does not exists


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22500: [SPARK-25488][TEST] Refactor MiscBenchmark to use main m...

2018-10-04 Thread wangyum
Github user wangyum commented on the issue:

https://github.com/apache/spark/pull/22500
  
@dongjoon-hyun Is this ready to go?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222890990
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.{Integer => JInteger, Long => JLong}
+import java.lang.reflect.InvocationTargetException
+
+import scala.util.Try
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * This helper class uses some of Hadoop 3 methods from the YARN API,
+ * so we need to use reflection to avoid compile error when building 
against Hadoop 2.x
+ */
+private object ResourceRequestHelper extends Logging {
+  private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
+
+  def setResourceRequests(
--- End diff --

added doc


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStr...

2018-10-04 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/22633#discussion_r222890961
  
--- Diff: 
sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java
 ---
@@ -0,0 +1,89 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package test.org.apache.spark.sql.streaming;
+
+import java.io.File;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.api.java.function.VoidFunction2;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.ForeachWriter;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.test.TestSparkSession;
+import org.apache.spark.util.Utils;
+
+public class JavaDataStreamReaderWriterSuite {
+  private SparkSession spark;
+  private String input;
+
+  @Before
+  public void setUp() {
+spark = new TestSparkSession();
+input = Utils.createTempDir(System.getProperty("java.io.tmpdir"), 
"input").toString();
+  }
+
+  @After
+  public void tearDown() {
+Utils.deleteRecursively(new File(input));
+spark.stop();
--- End diff --

Since `deleteRecursively` can raise `IOException`, can we have `try ... 
finally ...` to ensure `spark.stop` invocation?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222890547
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/TestYarnResourceRequestHelper.scala
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+object TestYarnResourceRequestHelper extends Logging {
+  def initializeResourceTypes(resourceTypes: List[String]): Unit = {
+if (!ResourceRequestHelper.isYarnResourceTypesAvailable()) {
+  throw new IllegalStateException("initializeResourceTypes() should 
not be invoked " +
+"since YARN resource types is not available because of old Hadoop 
version!" )
+}
+
+val allResourceTypes = new ListBuffer[AnyRef]
+val defaultResourceTypes = List(
+  createResourceTypeInfo("memory-mb"),
+  createResourceTypeInfo("vcores"))
+val customResourceTypes = resourceTypes.map(rt => 
createResourceTypeInfo(rt))
+allResourceTypes ++= defaultResourceTypes
+allResourceTypes ++= customResourceTypes
+
+reinitializeResources(allResourceTypes)
+  }
+
+  private def createResourceTypeInfo(resourceName: String): AnyRef = {
+val resTypeInfoClass = 
Utils.classForName("org.apache.hadoop.yarn.api.records.ResourceTypeInfo")
+val resTypeInfoNewInstanceMethod = 
resTypeInfoClass.getMethod("newInstance", classOf[String])
+resTypeInfoNewInstanceMethod.invoke(null, resourceName)
+  }
+
+  private def reinitializeResources(resourceTypes: ListBuffer[AnyRef]): 
Unit = {
+val resourceUtilsClass =
+  
Utils.classForName("org.apache.hadoop.yarn.util.resource.ResourceUtils")
+val reinitializeResourcesMethod = 
resourceUtilsClass.getMethod("reinitializeResources",
+  classOf[java.util.List[AnyRef]])
+reinitializeResourcesMethod.invoke(null, resourceTypes.asJava)
+  }
+
+  def getResourceTypeValue(res: Resource, name: String): AnyRef = {
+val resourceInformation: AnyRef = getResourceInformation(res, name)
+invokeMethod(resourceInformation, "getValue")
+  }
+
+  def getResourceInformationByName(res: Resource, nameParam: String): 
ResourceInformation = {
+val resourceInformation: AnyRef = getResourceInformation(res, 
nameParam)
+val name = invokeMethod(resourceInformation, 
"getName").asInstanceOf[String]
+val value = invokeMethod(resourceInformation, 
"getValue").asInstanceOf[Long]
+val units = invokeMethod(resourceInformation, 
"getUnits").asInstanceOf[String]
+new ResourceInformation(name, value, units)
+  }
+
+  private def getResourceInformation(res: Resource, name: String) = {
+if (!ResourceRequestHelper.isYarnResourceTypesAvailable()) {
+  throw new IllegalStateException("assertResourceTypeValue() should 
not be invoked " +
+"since yarn resource types is not available because of old Hadoop 
version!")
+}
+
+val getResourceInformationMethod = 
res.getClass.getMethod("getResourceInformation",
+  classOf[String])
+val resourceInformation = getResourceInformationMethod.invoke(res, 
name)
+resourceInformation
+  }
+
+  private def invokeMethod(resourceInformation: AnyRef, methodName: 
String): AnyRef = {
+val getValueMethod = resourceInformation.getClass.getMethod(methodName)
+getValueMethod.invoke(resourceInformation)
+  }
+
+  class ResourceInformation(val name: String, val value: Long, val units: 
String) {
--- End diff --

Thanks for pointing this, this is indeed more readable and testable.


---

-
To unsubscribe, e-mail: 

[GitHub] spark pull request #22635: [SPARK-25591][PySpark][SQL] Avoid overwriting des...

2018-10-04 Thread AbdealiJK
Github user AbdealiJK commented on a diff in the pull request:

https://github.com/apache/spark/pull/22635#discussion_r222890103
  
--- Diff: python/pyspark/accumulators.py ---
@@ -109,10 +109,14 @@
 
 def _deserialize_accumulator(aid, zero_value, accum_param):
 from pyspark.accumulators import _accumulatorRegistry
-accum = Accumulator(aid, zero_value, accum_param)
-accum._deserialized = True
-_accumulatorRegistry[aid] = accum
-return accum
+# If this certain accumulator was deserialized, don't overwrite it.
+if aid in _accumulatorRegistry:
--- End diff --

Should it be `if aid in _accumulatorRegistry and 
_accumulatorRegistry[aid]._deserialized is True`
or:
```
if aid in _accumulatorRegistry:
_accumulatorRegistry[aid]._deserialize = True
return _accumulatorRegistry[aid]
```
To make double sure that this function always returns a deserialize version 
of the accum ?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22399: [SPARK-25408] Move to mode ideomatic Java8

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22399
  
Let me trigger it in the next PR.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222889602
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestValidatorSuite.scala
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import org.scalatest.{BeforeAndAfterAll, Matchers}
+
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+
+class ResourceRequestValidatorSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterAll {
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+  }
+
+  test("empty SparkConf should be valid") {
+val sparkConf = new SparkConf()
+ResourceRequestValidator.validateResources(sparkConf)
+  }
+
+  test("just normal resources are defined") {
+val sparkConf = new SparkConf()
+sparkConf.set("spark.driver.memory", "3G")
+sparkConf.set("spark.driver.cores", "4")
+sparkConf.set("spark.executor.memory", "4G")
+sparkConf.set("spark.executor.cores", "2")
+
+ResourceRequestValidator.validateResources(sparkConf)
+  }
+
+  test("Memory defined with new config for executor") {
+val sparkConf = new SparkConf()
+sparkConf.set("spark.yarn.executor.resource.memory", "30G")
+
+val thrown = intercept[SparkException] {
+  ResourceRequestValidator.validateResources(sparkConf)
+}
+thrown.getMessage should include 
("spark.yarn.executor.resource.memory")
+  }
+
+  test("Cores defined with new config for executor") {
+val sparkConf = new SparkConf()
+sparkConf.set("spark.yarn.executor.resource.cores", "5")
+
+val thrown = intercept[SparkException] {
+  ResourceRequestValidator.validateResources(sparkConf)
+}
+thrown.getMessage should include ("spark.yarn.executor.resource.cores")
+  }
+
+  test("Memory defined with new config, client mode") {
+val sparkConf = new SparkConf()
+sparkConf.set("spark.yarn.am.resource.memory", "1G")
+
+val thrown = intercept[SparkException] {
+  ResourceRequestValidator.validateResources(sparkConf)
+}
+thrown.getMessage should include ("spark.yarn.am.resource.memory")
+  }
+
+  test("Memory defined with new config for driver, cluster mode") {
+val sparkConf = new SparkConf()
+sparkConf.set("spark.yarn.driver.resource.memory", "1G")
+
+val thrown = intercept[SparkException] {
+  ResourceRequestValidator.validateResources(sparkConf)
+}
+thrown.getMessage should include ("spark.yarn.driver.resource.memory")
+  }
+
+  test("Cores defined with new config, client mode") {
+val sparkConf = new SparkConf()
+sparkConf.set("spark.driver.memory", "2G")
+sparkConf.set("spark.driver.cores", "4")
+sparkConf.set("spark.executor.memory", "4G")
+sparkConf.set("spark.yarn.am.cores", "2")
+
+sparkConf.set("spark.yarn.am.resource.cores", "3")
+
+val thrown = intercept[SparkException] {
+  ResourceRequestValidator.validateResources(sparkConf)
+}
+thrown.getMessage should include ("spark.yarn.am.resource.cores")
+  }
+
+  test("Cores defined with new config for driver, cluster mode") {
+val sparkConf = new SparkConf()
+sparkConf.set("spark.yarn.driver.resource.cores", "1G")
+
+val thrown = intercept[SparkException] {
+  ResourceRequestValidator.validateResources(sparkConf)
+}
+thrown.getMessage should include ("spark.yarn.driver.resource.cores")
+  }
+
+  test("various duplicated definitions") {
+val sparkConf = new SparkConf()
+sparkConf.set("spark.driver.memory", "2G")
--- End diff --

I searched for references in existing test code and found out that at least 
`"spark.driver.memory"` and `spark.executor.memory` are used in 

[GitHub] spark issue #22399: [SPARK-25408] Move to mode ideomatic Java8

2018-10-04 Thread Fokko
Github user Fokko commented on the issue:

https://github.com/apache/spark/pull/22399
  
The tests passed earlier, how would it be possible that it would fail on 
master?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222889144
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/TestYarnResourceRequestHelper.scala
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+object TestYarnResourceRequestHelper extends Logging {
+  def initializeResourceTypes(resourceTypes: List[String]): Unit = {
+if (!ResourceRequestHelper.isYarnResourceTypesAvailable()) {
+  throw new IllegalStateException("initializeResourceTypes() should 
not be invoked " +
+"since YARN resource types is not available because of old Hadoop 
version!" )
+}
+
+val allResourceTypes = new ListBuffer[AnyRef]
+val defaultResourceTypes = List(
+  createResourceTypeInfo("memory-mb"),
+  createResourceTypeInfo("vcores"))
+val customResourceTypes = resourceTypes.map(rt => 
createResourceTypeInfo(rt))
+allResourceTypes ++= defaultResourceTypes
+allResourceTypes ++= customResourceTypes
+
+reinitializeResources(allResourceTypes)
+  }
+
+  private def createResourceTypeInfo(resourceName: String): AnyRef = {
+val resTypeInfoClass = 
Utils.classForName("org.apache.hadoop.yarn.api.records.ResourceTypeInfo")
+val resTypeInfoNewInstanceMethod = 
resTypeInfoClass.getMethod("newInstance", classOf[String])
+resTypeInfoNewInstanceMethod.invoke(null, resourceName)
+  }
+
+  private def reinitializeResources(resourceTypes: ListBuffer[AnyRef]): 
Unit = {
+val resourceUtilsClass =
+  
Utils.classForName("org.apache.hadoop.yarn.util.resource.ResourceUtils")
+val reinitializeResourcesMethod = 
resourceUtilsClass.getMethod("reinitializeResources",
+  classOf[java.util.List[AnyRef]])
+reinitializeResourcesMethod.invoke(null, resourceTypes.asJava)
+  }
+
+  def getResourceTypeValue(res: Resource, name: String): AnyRef = {
+val resourceInformation: AnyRef = getResourceInformation(res, name)
+invokeMethod(resourceInformation, "getValue")
+  }
+
+  def getResourceInformationByName(res: Resource, nameParam: String): 
ResourceInformation = {
+val resourceInformation: AnyRef = getResourceInformation(res, 
nameParam)
+val name = invokeMethod(resourceInformation, 
"getName").asInstanceOf[String]
+val value = invokeMethod(resourceInformation, 
"getValue").asInstanceOf[Long]
+val units = invokeMethod(resourceInformation, 
"getUnits").asInstanceOf[String]
+new ResourceInformation(name, value, units)
+  }
+
+  private def getResourceInformation(res: Resource, name: String) = {
--- End diff --

In what way I could refer to the `ResourceInformation` class?
Remember, that class either present or not present on the classpath 
depending on what version of Hadoop Spark depends on.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22399: [SPARK-25408] Move to mode ideomatic Java8

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22399
  
@Fokko, Let's follow the PR format next time BTW, for instance, "How was 
this patch tested?"


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22399: [SPARK-25408] Move to mode ideomatic Java8

2018-10-04 Thread srowen
Github user srowen commented on the issue:

https://github.com/apache/spark/pull/22399
  
It wasn't a merge conflict. I failed to notice this had not actually been 
tested. To check locally you'd have to make sure you enable more profiles like 
Hive to build some of the code that changed here. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222888986
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/TestYarnResourceRequestHelper.scala
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+object TestYarnResourceRequestHelper extends Logging {
--- End diff --

Fixed the name and removed Logging.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStr...

2018-10-04 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/22633#discussion_r222888557
  
--- Diff: 
sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java
 ---
@@ -0,0 +1,89 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
--- End diff --

@zsxwing . Could you fix the indentation of Apache License?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222888518
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
 ---
@@ -199,6 +200,92 @@ class ClientSuite extends SparkFunSuite with Matchers {
 appContext.getMaxAppAttempts should be (42)
   }
 
+  test("Resource type args propagate, resource type not defined") {
+assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+val sparkConf = new SparkConf()
+  .set(YARN_AM_RESOURCE_TYPES_PREFIX + "some_resource_with_units_1", 
"121m")
+val args = new ClientArguments(Array())
+
+val appContext = 
Records.newRecord(classOf[ApplicationSubmissionContext])
+val getNewApplicationResponse = 
Records.newRecord(classOf[GetNewApplicationResponse])
+val containerLaunchContext = 
Records.newRecord(classOf[ContainerLaunchContext])
+
+val client = new Client(args, sparkConf)
+
+try {
+  client.createApplicationSubmissionContext(
+new YarnClientApplication(getNewApplicationResponse, appContext),
+containerLaunchContext)
+} catch {
+  case NonFatal(e) =>
+val expectedExceptionClass = 
"org.apache.hadoop.yarn.exceptions.ResourceNotFoundException"
+if (e.getClass.getName != expectedExceptionClass) {
--- End diff --

fixed


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222888391
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
 ---
@@ -199,6 +200,92 @@ class ClientSuite extends SparkFunSuite with Matchers {
 appContext.getMaxAppAttempts should be (42)
   }
 
+  test("Resource type args propagate, resource type not defined") {
--- End diff --

fixed the namings in this class


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22399: [SPARK-25408] Move to mode ideomatic Java8

2018-10-04 Thread Fokko
Github user Fokko commented on the issue:

https://github.com/apache/spark/pull/22399
  
Thanks @srowen for pointing out the errors. Weird that it did not come up 
as a merge conflict. Let me open a new PR.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222887537
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
 ---
@@ -35,13 +36,13 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.Records
 import org.mockito.Matchers.{eq => meq, _}
 import org.mockito.Mockito._
-import org.scalatest.Matchers
+import org.scalatest.{BeforeAndAfterAll, Matchers}
 
 import org.apache.spark.{SparkConf, SparkFunSuite, TestUtils}
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.util.{SparkConfWithEnv, Utils}
 
-class ClientSuite extends SparkFunSuite with Matchers {
+class ClientSuite extends SparkFunSuite with Matchers with 
BeforeAndAfterAll {
--- End diff --

I guess I wanted to use it before then I forgot to remote this later. Fixed


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222887404
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestValidator.scala
 ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.config._
+
+private object ResourceRequestValidator {
+  private val ERROR_PREFIX: String = "Error:"
+
+  /**
+   * Validates sparkConf and throws a SparkException if a standard 
resource (memory or cores)
+   * is defined with the property spark.yarn.x.resource.y
+   *
+   * Example of an invalid config:
+   * - spark.yarn.driver.resource.memory=2g
+   *
+   * Please note that if multiple resources are defined like described 
above,
+   * the error messages will be concatenated.
+   * Example of such a config:
+   * - spark.yarn.driver.resource.memory=2g
+   * - spark.yarn.executor.resource.cores=2
+   * Then the following two error messages will be printed:
+   * - "memory cannot be requested with config 
spark.yarn.driver.resource.memory,
+   * please use config spark.driver.memory instead!
+   * - "cores cannot be requested with config 
spark.yarn.executor.resource.cores,
+   * please use config spark.executor.cores instead!
+   *
+   * @param sparkConf
+   */
+  def validateResources(sparkConf: SparkConf): Unit = {
+val resourceDefinitions = Seq[(String, String)](
+  (AM_MEMORY.key, YARN_AM_RESOURCE_TYPES_PREFIX + "memory"),
+  (AM_CORES.key, YARN_AM_RESOURCE_TYPES_PREFIX + "cores"),
+  ("spark.driver.memory", YARN_DRIVER_RESOURCE_TYPES_PREFIX + 
"memory"),
--- End diff --

Fixed


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222887435
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestValidator.scala
 ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.config._
+
+private object ResourceRequestValidator {
+  private val ERROR_PREFIX: String = "Error:"
+
+  /**
+   * Validates sparkConf and throws a SparkException if a standard 
resource (memory or cores)
+   * is defined with the property spark.yarn.x.resource.y
+   *
+   * Example of an invalid config:
+   * - spark.yarn.driver.resource.memory=2g
+   *
+   * Please note that if multiple resources are defined like described 
above,
+   * the error messages will be concatenated.
+   * Example of such a config:
+   * - spark.yarn.driver.resource.memory=2g
+   * - spark.yarn.executor.resource.cores=2
+   * Then the following two error messages will be printed:
+   * - "memory cannot be requested with config 
spark.yarn.driver.resource.memory,
+   * please use config spark.driver.memory instead!
+   * - "cores cannot be requested with config 
spark.yarn.executor.resource.cores,
+   * please use config spark.executor.cores instead!
+   *
+   * @param sparkConf
+   */
+  def validateResources(sparkConf: SparkConf): Unit = {
+val resourceDefinitions = Seq[(String, String)](
+  (AM_MEMORY.key, YARN_AM_RESOURCE_TYPES_PREFIX + "memory"),
+  (AM_CORES.key, YARN_AM_RESOURCE_TYPES_PREFIX + "cores"),
+  ("spark.driver.memory", YARN_DRIVER_RESOURCE_TYPES_PREFIX + 
"memory"),
+  (DRIVER_CORES.key, YARN_DRIVER_RESOURCE_TYPES_PREFIX + "cores"),
+  ("spark.executor.memory", YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + 
"memory"),
+  (EXECUTOR_CORES.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "cores"))
+val errorMessage = new mutable.StringBuilder()
+
+resourceDefinitions.foreach { case (sparkName, resourceRequest) =>
+  if (sparkConf.contains(resourceRequest)) {
+errorMessage.append(s"$ERROR_PREFIX Do not use $resourceRequest, " 
+
+s"please use $sparkName instead!\n")
+  }
+}
+
+// throw exception after loop
--- End diff --

removed


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22634: [SPARK-25646][k8s] Fix docker-image-tool.sh on dev build...

2018-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22634
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22634: [SPARK-25646][k8s] Fix docker-image-tool.sh on dev build...

2018-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22634
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96959/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222886705
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestValidator.scala
 ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.config._
+
+private object ResourceRequestValidator {
+  private val ERROR_PREFIX: String = "Error:"
+
+  /**
+   * Validates sparkConf and throws a SparkException if a standard 
resource (memory or cores)
+   * is defined with the property spark.yarn.x.resource.y
+   *
+   * Example of an invalid config:
--- End diff --

fixed


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222886619
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestValidator.scala
 ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.config._
+
+private object ResourceRequestValidator {
+  private val ERROR_PREFIX: String = "Error:"
+
+  /**
+   * Validates sparkConf and throws a SparkException if a standard 
resource (memory or cores)
+   * is defined with the property spark.yarn.x.resource.y
--- End diff --

fixed


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22634: [SPARK-25646][k8s] Fix docker-image-tool.sh on dev build...

2018-10-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22634
  
**[Test build #96959 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96959/testReport)**
 for PR 22634 at commit 
[`0cdd4e5`](https://github.com/apache/spark/commit/0cdd4e577da55e93a029dbe4a34b5999802360d5).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222886571
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.{Integer => JInteger, Long => JLong}
+import java.lang.reflect.InvocationTargetException
+
+import scala.util.Try
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * This helper class uses some of Hadoop 3 methods from the YARN API,
+ * so we need to use reflection to avoid compile error when building 
against Hadoop 2.x
+ */
+private object ResourceRequestHelper extends Logging {
+  private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
+
+  def setResourceRequests(
+  resources: Map[String, String],
+  resource: Resource): Unit = {
+require(resource != null, "Resource parameter should not be null!")
+
+logDebug(s"Custom resource types requested: $resources")
+if (!isYarnResourceTypesAvailable()) {
+  if (resources.nonEmpty) {
+logWarning("Ignoring updating resource with resource types because 
" +
+"the version of YARN does not support it!")
+  }
+  return
+}
+
+val resInfoClass = Utils.classForName(
+  "org.apache.hadoop.yarn.api.records.ResourceInformation")
+val setResourceInformationMethod =
+  resource.getClass.getMethod("setResourceInformation", 
classOf[String],
+resInfoClass)
+resources.foreach { case (name, rawAmount) =>
+  try {
+val AMOUNT_AND_UNIT_REGEX(amountPart, unitPart) = rawAmount
+val amount = amountPart.toLong
+val unit = unitPart match {
+  case "g" => "G"
+  case "t" => "T"
+  case "p" => "P"
+  case _ => unitPart
+}
+logDebug(s"Registering resource with name: $name, amount: $amount, 
unit: $unit")
+val resourceInformation =
+  createResourceInformation(name, amount, unit, resInfoClass)
+setResourceInformationMethod.invoke(
+  resource, name, resourceInformation.asInstanceOf[AnyRef])
+  } catch {
+case _: MatchError => throw new IllegalArgumentException(
--- End diff --

fixed


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22610#discussion_r222885910
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2909,6 +2909,11 @@ def pandas_udf(f=None, returnType=None, 
functionType=None):
 can fail on special rows, the workaround is to incorporate the 
condition into the functions.
 
 .. note:: The user-defined functions do not take keyword arguments on 
the calling side.
+
+.. note:: The data type of returned `pandas.Series` from the 
user-defined functions should be
+matched with defined returnType. When there is mismatch between 
them, it is not guaranteed
+that the conversion by SparkSQL during serialization is correct at 
all and users might get
--- End diff --

maybe I am concerning too much .. but how about just say .. 

```
... defined returnType (see :meth:`types.to_arrow_type` and 
:meth:`types.from_arrow_type`). 
When there is mismatch between them, the conversion is not guaranteed.
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22610#discussion_r222885267
  
--- Diff: python/pyspark/worker.py ---
@@ -84,13 +84,36 @@ def wrap_scalar_pandas_udf(f, return_type):
 arrow_return_type = to_arrow_type(return_type)
 
 def verify_result_length(*a):
+import pyarrow as pa
 result = f(*a)
 if not hasattr(result, "__len__"):
 raise TypeError("Return type of the user-defined function 
should be "
 "Pandas.Series, but is 
{}".format(type(result)))
 if len(result) != len(a[0]):
 raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
"expected %d, got %d" % (len(a[0]), 
len(result)))
+
+# Ensure return type of Pandas.Series matches the arrow return 
type of the user-defined
+# function. Otherwise, we may produce incorrect serialized data.
+# Note: for timestamp type, we only need to ensure both types are 
timestamp because the
+# serializer will do conversion.
+try:
+arrow_type_of_result = pa.from_numpy_dtype(result.dtype)
+both_are_timestamp = 
pa.types.is_timestamp(arrow_type_of_result) and \
+pa.types.is_timestamp(arrow_return_type)
+if not both_are_timestamp and arrow_return_type != 
arrow_type_of_result:
+print("WARN: Arrow type %s of return Pandas.Series of the 
user-defined function's "
--- End diff --

Yes .. I support to just fix the doc first here only and make a PR 
separately later if needed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22227: [SPARK-25202] [SQL] Implements split with limit sql func...

2018-10-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/7
  
**[Test build #96966 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96966/testReport)**
 for PR 7 at commit 
[`34ba74f`](https://github.com/apache/spark/commit/34ba74f79aad2a0e2fe9e0d6f6110a10a51c8108).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22227: [SPARK-25202] [SQL] Implements split with limit sql func...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/7
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22636: [SPARK-25629][TEST] Reduce ParquetFilterSuite: filter pu...

2018-10-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22636
  
**[Test build #96965 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96965/testReport)**
 for PR 22636 at commit 
[`e40d79c`](https://github.com/apache/spark/commit/e40d79ce33941800408a0697e433ea4d0b20b3b5).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22636: [SPARK-25629][TEST] Reduce ParquetFilterSuite: filter pu...

2018-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22636
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3697/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22636: [SPARK-25629][TEST] Reduce ParquetFilterSuite: filter pu...

2018-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22636
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22594: [MINOR][SQL] When batch reading, the number of by...

2018-10-04 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22594#discussion_r222884751
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
 ---
@@ -104,12 +104,14 @@ class FileScanRDD(
 val nextElement = currentIterator.next()
 // TODO: we should have a better separation of row based and batch 
based scan, so that we
 // don't need to run this `if` for every record.
+val preNumRecordsRead = inputMetrics.recordsRead
 if (nextElement.isInstanceOf[ColumnarBatch]) {
   
inputMetrics.incRecordsRead(nextElement.asInstanceOf[ColumnarBatch].numRows())
 } else {
   inputMetrics.incRecordsRead(1)
 }
-if (inputMetrics.recordsRead % 
SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
--- End diff --

I think the issue is that in line 108, this value can be incremented by 
more than 1. It might skip over the count that is an exact multiple of 
`UPDATE_INPUT_METRICS_INTERVAL_RECORDS`. If that code path is common, it might 
rarely ever get updated. This now just checks whether the increment causes the 
value to exceed a higher multiple of `UPDATE_INPUT_METRICS_INTERVAL_RECORDS`, 
which sounds more correct. But yeah needs a description and ideally a little 
test.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22636: [SPARK-25629][TEST] Reduce ParquetFilterSuite: filter pu...

2018-10-04 Thread wangyum
Github user wangyum commented on the issue:

https://github.com/apache/spark/pull/22636
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22399: [SPARK-25408] Move to mode ideomatic Java8

2018-10-04 Thread srowen
Github user srowen commented on the issue:

https://github.com/apache/spark/pull/22399
  
Sorry @Fokko can you recreate this change in a new PR, after addressing...
```
[error] 
/home/jenkins/workspace/spark-master-compile-maven-hadoop-2.6/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIService.java:149:
 error: incompatible types: try-with-resources not applicable to variable type
[error] try (IMetaStoreClient metastoreClient = new 
HiveMetaStoreClient(hiveConf)) {
[error]   ^
[error] (IMetaStoreClient cannot be converted to AutoCloseable)
[error] 
/home/jenkins/workspace/spark-master-compile-maven-hadoop-2.6/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java:200:
 error: incompatible types: try-with-resources not applicable to variable type
[error] try (Operation operation = removeOperation(opHandle)) {
[error]^
[error] (Operation cannot be converted to AutoCloseable)
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22399: [SPARK-25408] Move to mode ideomatic Java8

2018-10-04 Thread srowen
Github user srowen commented on the issue:

https://github.com/apache/spark/pull/22399
  
Oh no, I got mixed up and didn't realize this one hadn't actually had a 
jenkins run. Sorry about this, that is my mistake. I will follow up.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22622: [SPARK-25635][SQL][BUILD] Support selective direct encod...

2018-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22622
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22622: [SPARK-25635][SQL][BUILD] Support selective direct encod...

2018-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22622
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96957/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22622: [SPARK-25635][SQL][BUILD] Support selective direct encod...

2018-10-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22622
  
**[Test build #96957 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96957/testReport)**
 for PR 22622 at commit 
[`65ac786`](https://github.com/apache/spark/commit/65ac7869188e93473a7fd7f43575b728207ff218).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22622: [SPARK-25635][SQL][BUILD] Support selective direct encod...

2018-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22622
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3696/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22622: [SPARK-25635][SQL][BUILD] Support selective direct encod...

2018-10-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22622
  
**[Test build #96964 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96964/testReport)**
 for PR 22622 at commit 
[`70016e4`](https://github.com/apache/spark/commit/70016e4896a42315e82141ac995bf12eded07f51).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22622: [SPARK-25635][SQL][BUILD] Support selective direct encod...

2018-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22622
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22622: [SPARK-25635][SQL][BUILD] Support selective direct encod...

2018-10-04 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/22622
  
Thank you, @HyukjinKwon !


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



  1   2   3   4   5   6   >