svn commit: r26710 - in /dev/spark/2.4.0-SNAPSHOT-2018_05_04_20_01-47b5b68-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-05-04 Thread pwendell
Author: pwendell
Date: Sat May  5 03:15:34 2018
New Revision: 26710

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_05_04_20_01-47b5b68 docs


[This commit notification would consist of 1460 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [SPARK-24157][SS] Enabled no-data batches in MicroBatchExecution for streaming aggregation and deduplication.

2018-05-04 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master af4dc5028 -> 47b5b6852


[SPARK-24157][SS] Enabled no-data batches in MicroBatchExecution for streaming 
aggregation and deduplication.

## What changes were proposed in this pull request?

This PR enables MicroBatchExecution to run no-data batches if some SparkPlan 
requires running another batch to output results based on an updated 
watermark / processing time. In this PR, I have enabled streaming aggregations 
and streaming deduplication to automatically run an additional batch even when 
no new data is available. See https://issues.apache.org/jira/browse/SPARK-24156 
for more context.

Major changes/refactoring done in this PR.
- Refactoring MicroBatchExecution - A major point of confusion (at least to me) 
in the MicroBatchExecution control flow was that `populateStartOffsets` 
internally called `constructNextBatch`, which was not obvious from the name 
"populateStartOffsets" and made the control flow from the main trigger 
execution loop very confusing (the main loop in `runActivatedStream` called 
`constructNextBatch`, but only if `populateStartOffsets` hadn't already called 
it). The refactoring makes this cleaner.
  - `populateStartOffsets` only updates `availableOffsets` and 
    `committedOffsets`; it does not call `constructNextBatch`.
  - The main loop in `runActivatedStream` calls `constructNextBatch`, which 
    returns true or false depending on whether the next batch is ready to 
    execute. This method is now idempotent; once a batch has been constructed, 
    it keeps returning true until that batch has been executed.
  - If the next batch is ready, we call `runBatch`; otherwise we sleep.
  - That's it. (A rough sketch of this loop shape follows the change list 
    below.)

- Refactoring watermark management logic - This has been refactored out of 
`MicroBatchExecution` into a separate class (`WatermarkTracker`) to simplify 
`MicroBatchExecution`.

- New method `shouldRunAnotherBatch` in `IncrementalExecution` - This returns 
true if there is any stateful operation in the last execution plan that 
requires another batch for state cleanup, etc. This is used to decide whether 
to construct a batch or not in `constructNextBatch`.

- Changes to the stream testing framework - Many tests used `CheckLastBatch` to 
validate answers. This assumed that there would be no more batches after the 
last set of input had been processed, so the last batch was the one whose 
output corresponded to the last input. That is no longer true. To account for 
this, I made two changes.
  - `CheckNewAnswer` is a new test action that verifies the new rows generated 
    since the last time the answer was checked by `CheckAnswer`, 
    `CheckNewAnswer` or `CheckLastBatch`. It is agnostic to how many batches 
    occurred between the last check and now. To make this easier, I added a 
    common trait shared by MemorySink and MemorySinkV2 to abstract out some 
    common methods. (A standalone sketch of this idea appears after the 
    testing section below.)
  - `assertNumStateRows` has been updated in the same way to be agnostic to 
    batch boundaries when checking the total number of rows and how many state 
    rows were updated (it sums up updates since the last check).
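
To make the new control flow concrete, here is a rough, standalone sketch of the 
loop shape described above (hypothetical names and a fixed trigger count for 
illustration; this is not the actual MicroBatchExecution code):

```scala
object TriggerLoopSketch {
  // `constructNextBatch` and `runBatch` stand in for the real MicroBatchExecution
  // methods; `pollMs` stands in for the trigger's polling interval.
  def runActivatedStream(
      constructNextBatch: () => Boolean,
      runBatch: () => Unit,
      pollMs: Long,
      triggers: Int): Unit = {
    var remaining = triggers
    while (remaining > 0) {
      // Idempotent: keeps returning true until the constructed batch is executed.
      val batchReady = constructNextBatch()
      if (batchReady) runBatch() else Thread.sleep(pollMs)
      remaining -= 1
    }
  }

  def main(args: Array[String]): Unit = {
    val readiness = Iterator(true, false, true)
    runActivatedStream(
      constructNextBatch = () => readiness.next(),
      runBatch = () => println("ran batch"),
      pollMs = 1,
      triggers = 3)
  }
}
```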

## How was this patch tested?
- Changes made to existing tests - Tests have been changed in one of the 
following patterns.
  - Tests where the last input was given again just to force another batch to 
    be executed (so that state was cleaned up / output was generated) were 
    simplified by removing the extra input.
  - In tests using aggregation + watermark, `CheckLastBatch` was replaced with 
    `CheckNewAnswer` to make them batch agnostic.
- New tests were added to check whether the flag works for streaming 
aggregation and deduplication.
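
For illustration, here is a minimal standalone sketch of the "check only the rows 
added since the last check" idea behind `CheckNewAnswer` (this is not Spark's 
StreamTest DSL; `SinkLog` is a made-up stand-in for the memory sink):

```scala
object CheckNewAnswerSketch {
  // A made-up stand-in for the memory sink's row log.
  final class SinkLog[T] {
    private val rows = scala.collection.mutable.ArrayBuffer.empty[T]
    private var lastChecked = 0
    def addBatch(batch: Seq[T]): Unit = rows ++= batch
    // Verifies only the rows added since the last check, regardless of how many
    // batches (including no-data batches) ran in between.
    def checkNewAnswer(expected: T*): Unit = {
      val newRows = rows.drop(lastChecked).toSeq
      assert(newRows == expected.toSeq, s"expected $expected, got $newRows")
      lastChecked = rows.size
    }
  }

  def main(args: Array[String]): Unit = {
    val sink = new SinkLog[Int]
    sink.addBatch(Seq(1, 2))
    sink.addBatch(Seq.empty)      // a no-data batch adds nothing to the sink
    sink.addBatch(Seq(3))
    sink.checkNewAnswer(1, 2, 3)  // batch-count agnostic
    println("check passed")
  }
}
```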

Author: Tathagata Das 

Closes #21220 from tdas/SPARK-24157.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47b5b685
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47b5b685
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47b5b685

Branch: refs/heads/master
Commit: 47b5b68528c154d32b3f40f388918836d29462b8
Parents: af4dc50
Author: Tathagata Das 
Authored: Fri May 4 16:35:24 2018 -0700
Committer: Tathagata Das 
Committed: Fri May 4 16:35:24 2018 -0700

--
 .../org/apache/spark/sql/internal/SQLConf.scala |  11 +
 .../streaming/IncrementalExecution.scala|  10 +
 .../streaming/MicroBatchExecution.scala | 231 +++
 .../execution/streaming/WatermarkTracker.scala  |  73 +
 .../spark/sql/execution/streaming/memory.scala  |  17 +-
 .../execution/streaming/sources/memoryV2.scala  |   8 +-
 .../execution/streaming/statefulOperators.scala |  16 +
 .../streaming/sources/ForeachWriterSuite.scala  |   8 +-
 .../spark/sql/streaming/DeduplicateSuite.scala  | 285 --
 .../sql/streaming/EventTimeWatermarkSuite.scala | 112 +++
 .../sql/streaming/FileStreamSinkSuite.scala |   7 +-
 

spark git commit: [SPARK-24039][SS] Do continuous processing writes with multiple compute() calls

2018-05-04 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master d04806a23 -> af4dc5028


[SPARK-24039][SS] Do continuous processing writes with multiple compute() calls

## What changes were proposed in this pull request?

Do continuous processing writes with multiple compute() calls.

The current strategy (before this PR) is hacky; we just call next() on an 
iterator which has already returned hasNext = false, knowing that all the nodes 
we whitelist handle this properly. This will have to be changed before we can 
support more complex query plans. (In particular, I have a WIP 
https://github.com/jose-torres/spark/pull/13 which should be able to support 
aggregates in a single partition with minimal additional work.)

Most of the changes here are just refactoring to accommodate the new model. The 
behavioral changes are:

* The writer now calls prev.compute(split, context) once per epoch within the 
epoch loop.
* ContinuousDataSourceRDD now spawns a ContinuousQueuedDataReader which is 
shared across multiple calls to compute() for the same partition.
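
For intuition, here is a small self-contained sketch of that per-epoch pattern 
(hypothetical names: `prevCompute` stands in for `prev.compute(split, context)` 
and `commitEpoch` for the epoch commit; this is not the actual ContinuousWriteRDD 
code):

```scala
object EpochLoopSketch {
  def runPartition[T](
      prevCompute: Long => Iterator[T],       // a fresh iterator per epoch
      commitEpoch: (Long, Seq[T]) => Unit,
      startEpoch: Long,
      numEpochs: Int): Unit = {
    var epoch = startEpoch
    while (epoch < startEpoch + numEpochs) {
      // One compute() call per epoch, instead of re-pulling an iterator
      // that has already returned hasNext = false.
      val rows = prevCompute(epoch).toSeq
      commitEpoch(epoch, rows)
      epoch += 1
    }
  }

  def main(args: Array[String]): Unit = {
    runPartition[Int](
      prevCompute = epoch => Iterator(epoch.toInt, epoch.toInt + 1),
      commitEpoch = (epoch, rows) => println(s"epoch $epoch committed ${rows.size} rows"),
      startEpoch = 0L,
      numEpochs = 3)
  }
}
```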

## How was this patch tested?

existing unit tests

Author: Jose Torres 

Closes #21200 from jose-torres/noAggr.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/af4dc502
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/af4dc502
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/af4dc502

Branch: refs/heads/master
Commit: af4dc50280ffcdeda208ef2dc5f8b843389732e5
Parents: d04806a
Author: Jose Torres 
Authored: Fri May 4 14:14:40 2018 -0700
Committer: Tathagata Das 
Committed: Fri May 4 14:14:40 2018 -0700

--
 .../datasources/v2/DataSourceV2ScanExec.scala   |   6 +-
 .../continuous/ContinuousDataSourceRDD.scala| 114 ++
 .../ContinuousDataSourceRDDIter.scala   | 222 ---
 .../continuous/ContinuousQueuedDataReader.scala | 211 ++
 .../continuous/ContinuousWriteRDD.scala |  90 
 .../WriteToContinuousDataSourceExec.scala   |  57 +
 .../ContinuousQueuedDataReaderSuite.scala   | 167 ++
 7 files changed, 592 insertions(+), 275 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/af4dc502/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
index 41bdda4..77cb707 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
@@ -96,7 +96,11 @@ case class DataSourceV2ScanExec(
           sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
           sparkContext.env)
         .askSync[Unit](SetReaderPartitions(readerFactories.size))
-      new ContinuousDataSourceRDD(sparkContext, sqlContext, readerFactories)
+      new ContinuousDataSourceRDD(
+        sparkContext,
+        sqlContext.conf.continuousStreamingExecutorQueueSize,
+        sqlContext.conf.continuousStreamingExecutorPollIntervalMs,
+        readerFactories)
         .asInstanceOf[RDD[InternalRow]]
 
     case r: SupportsScanColumnarBatch if r.enableBatchRead() =>

http://git-wip-us.apache.org/repos/asf/spark/blob/af4dc502/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
new file mode 100644
index 000..0a3b9dc
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed 

spark git commit: [SPARK-24124] Spark history server should create spark.history.store.…

2018-05-04 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master 4d5de4d30 -> d04806a23


[SPARK-24124] Spark history server should create spark.history.store.…

…path and set permissions properly

## What changes were proposed in this pull request?

Spark history server should create spark.history.store.path and set permissions 
properly. Note that createDirectories doesn't do anything if the directories are 
already created, so this does not stomp on the permissions if the user manually 
created the directory before the history server could.
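
As a small illustration of that behavior, here is a standalone sketch using 
java.nio directly (`/tmp/spark-history-sketch` is a made-up path, and this assumes 
a POSIX file system; it is not the history server code):

```scala
import java.nio.file.{Files, Paths}
import java.nio.file.attribute.PosixFilePermissions

object StoreDirSketch {
  def main(args: Array[String]): Unit = {
    // Owner-only permissions, as the history server wants for its local store.
    val perms = PosixFilePermissions.asFileAttribute(
      PosixFilePermissions.fromString("rwx------"))
    val dbPath = Files.createDirectories(
      Paths.get("/tmp/spark-history-sketch", "listing.ldb"), perms)
    // Calling createDirectories again is a no-op: it neither fails nor resets
    // permissions on a directory that already exists.
    Files.createDirectories(dbPath, perms)
    println(s"$dbPath -> ${Files.getPosixFilePermissions(dbPath)}")
  }
}
```

Running it twice leaves the directory and its owner-only permissions unchanged.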

## How was this patch tested?

Manually tested on a 100-node cluster. Ensured directories were created with 
proper permissions. Ensured that restarts worked and that the apps/temp 
directories worked as apps were read.

Author: Thomas Graves 

Closes #21234 from tgravescs/SPARK-24124.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d04806a2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d04806a2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d04806a2

Branch: refs/heads/master
Commit: d04806a23c1843a7f0dcc4fa236ed1b40ae113a5
Parents: 4d5de4d
Author: Thomas Graves 
Authored: Fri May 4 13:29:47 2018 -0700
Committer: Marcelo Vanzin 
Committed: Fri May 4 13:29:47 2018 -0700

--
 .../org/apache/spark/deploy/history/FsHistoryProvider.scala  | 8 ++--
 1 file changed, 6 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d04806a2/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 56db935..bf1eeb0 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.deploy.history
 
 import java.io.{File, FileNotFoundException, IOException}
+import java.nio.file.Files
+import java.nio.file.attribute.PosixFilePermissions
 import java.util.{Date, ServiceLoader}
 import java.util.concurrent.{ExecutorService, TimeUnit}
 import java.util.zip.{ZipEntry, ZipOutputStream}
@@ -130,8 +132,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
   // Visible for testing.
   private[history] val listing: KVStore = storePath.map { path =>
-    require(path.isDirectory(), s"Configured store directory ($path) does not exist.")
-    val dbPath = new File(path, "listing.ldb")
+    val perms = PosixFilePermissions.fromString("rwx------")
+    val dbPath = Files.createDirectories(new File(path, "listing.ldb").toPath(),
+      PosixFilePermissions.asFileAttribute(perms)).toFile()
+
     val metadata = new FsHistoryProviderMetadata(CURRENT_LISTING_VERSION,
       AppStatusStore.CURRENT_VERSION, logDir.toString())
 





svn commit: r26705 - in /dev/spark/2.4.0-SNAPSHOT-2018_05_04_08_01-4d5de4d-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-05-04 Thread pwendell
Author: pwendell
Date: Fri May  4 15:16:17 2018
New Revision: 26705

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_05_04_08_01-4d5de4d docs


[This commit notification would consist of 1460 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




svn commit: r26702 - in /dev/spark/2.3.1-SNAPSHOT-2018_05_04_06_01-3f78f60-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-05-04 Thread pwendell
Author: pwendell
Date: Fri May  4 13:16:16 2018
New Revision: 26702

Log:
Apache Spark 2.3.1-SNAPSHOT-2018_05_04_06_01-3f78f60 docs


[This commit notification would consist of 1443 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [SPARK-23697][CORE] LegacyAccumulatorWrapper should define isZero correctly

2018-05-04 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 d51c6aaeb -> a42dd008c


[SPARK-23697][CORE] LegacyAccumulatorWrapper should define isZero correctly

## What changes were proposed in this pull request?

It's possible that Accumulators from Spark 1.x no longer work with Spark 2.x. 
This is because `LegacyAccumulatorWrapper.isZero` may return the wrong answer if 
`AccumulableParam` doesn't define equals/hashCode.

This PR fixes that by using a reference equality check in 
`LegacyAccumulatorWrapper.isZero`.
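
For illustration, a tiny standalone sketch of the failure mode and of the 
reference-equality fix (made-up classes, not the actual Spark code):

```scala
object IsZeroSketch {
  class MyData(val i: Int)                        // no equals/hashCode defined

  def zero(init: MyData): MyData = new MyData(0)

  def main(args: Array[String]): Unit = {
    val value = zero(new MyData(0))               // a freshly reset "accumulator value"

    // Old check: `==` falls back to reference equality for a class without
    // equals, so comparing against a brand-new zero instance says "not zero".
    val oldIsZero = value == zero(new MyData(0))  // false

    // New check: cache a single zero instance, reset to it, compare by reference.
    val cachedZero = zero(new MyData(0))
    val resetValue: MyData = cachedZero           // what reset() now assigns
    val newIsZero = resetValue.asInstanceOf[AnyRef].eq(cachedZero.asInstanceOf[AnyRef])  // true

    println(s"old isZero: $oldIsZero, new isZero: $newIsZero")
  }
}
```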

## How was this patch tested?

a new test

Author: Wenchen Fan 

Closes #21229 from cloud-fan/accumulator.

(cherry picked from commit 4d5de4d303a773b1c18c350072344bd7efca9fc4)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a42dd008
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a42dd008
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a42dd008

Branch: refs/heads/branch-2.0
Commit: a42dd008c7770c48e62e6c1ad12f191c65c0cbd7
Parents: d51c6aa
Author: Wenchen Fan 
Authored: Fri May 4 19:20:15 2018 +0800
Committer: Wenchen Fan 
Committed: Fri May 4 19:22:20 2018 +0800

--
 .../org/apache/spark/util/AccumulatorV2.scala|  6 --
 .../apache/spark/util/AccumulatorV2Suite.scala   | 19 +++
 2 files changed, 23 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a42dd008/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala 
b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index d06ab3d..155ea6c 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -476,7 +476,9 @@ class LegacyAccumulatorWrapper[R, T](
     param: org.apache.spark.AccumulableParam[R, T]) extends AccumulatorV2[T, R] {
   private[spark] var _value = initialValue  // Current value on driver
 
-  override def isZero: Boolean = _value == param.zero(initialValue)
+  @transient private lazy val _zero = param.zero(initialValue)
+
+  override def isZero: Boolean = _value.asInstanceOf[AnyRef].eq(_zero.asInstanceOf[AnyRef])
 
   override def copy(): LegacyAccumulatorWrapper[R, T] = {
     val acc = new LegacyAccumulatorWrapper(initialValue, param)
@@ -485,7 +487,7 @@ class LegacyAccumulatorWrapper[R, T](
   }
 
   override def reset(): Unit = {
-    _value = param.zero(initialValue)
+    _value = _zero
   }
 
   override def add(v: T): Unit = _value = param.addAccumulator(_value, v)

http://git-wip-us.apache.org/repos/asf/spark/blob/a42dd008/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala 
b/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala
index a04644d..fe0a9a4 100644
--- a/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala
+++ b/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.util
 
 import org.apache.spark._
+import org.apache.spark.serializer.JavaSerializer
 
 class AccumulatorV2Suite extends SparkFunSuite {
 
@@ -162,4 +163,22 @@ class AccumulatorV2Suite extends SparkFunSuite {
     assert(acc3.isZero)
     assert(acc3.value === "")
   }
+
+  test("LegacyAccumulatorWrapper with AccumulatorParam that has no equals/hashCode") {
+    class MyData(val i: Int) extends Serializable
+    val param = new AccumulatorParam[MyData] {
+      override def zero(initialValue: MyData): MyData = new MyData(0)
+      override def addInPlace(r1: MyData, r2: MyData): MyData = new MyData(r1.i + r2.i)
+    }
+
+    val acc = new LegacyAccumulatorWrapper(new MyData(0), param)
+    acc.metadata = AccumulatorMetadata(
+      AccumulatorContext.newId(),
+      Some("test"),
+      countFailedValues = false)
+    AccumulatorContext.register(acc)
+
+    val ser = new JavaSerializer(new SparkConf).newInstance()
+    ser.serialize(acc)
+  }
 }





spark git commit: [SPARK-23697][CORE] LegacyAccumulatorWrapper should define isZero correctly

2018-05-04 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 768d0b7ce -> 866270ea5


[SPARK-23697][CORE] LegacyAccumulatorWrapper should define isZero correctly

## What changes were proposed in this pull request?

It's possible that Accumulators from Spark 1.x no longer work with Spark 2.x. 
This is because `LegacyAccumulatorWrapper.isZero` may return the wrong answer if 
`AccumulableParam` doesn't define equals/hashCode.

This PR fixes that by using a reference equality check in 
`LegacyAccumulatorWrapper.isZero`.

## How was this patch tested?

a new test

Author: Wenchen Fan 

Closes #21229 from cloud-fan/accumulator.

(cherry picked from commit 4d5de4d303a773b1c18c350072344bd7efca9fc4)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/866270ea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/866270ea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/866270ea

Branch: refs/heads/branch-2.2
Commit: 866270ea5bbf72328c1f28c6bf49e39fb3e69d12
Parents: 768d0b7
Author: Wenchen Fan 
Authored: Fri May 4 19:20:15 2018 +0800
Committer: Wenchen Fan 
Committed: Fri May 4 19:21:16 2018 +0800

--
 .../org/apache/spark/util/AccumulatorV2.scala|  6 --
 .../apache/spark/util/AccumulatorV2Suite.scala   | 19 +++
 2 files changed, 23 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/866270ea/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala 
b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index 603c23a..5df17cc 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -482,7 +482,9 @@ class LegacyAccumulatorWrapper[R, T](
     param: org.apache.spark.AccumulableParam[R, T]) extends AccumulatorV2[T, R] {
   private[spark] var _value = initialValue  // Current value on driver
 
-  override def isZero: Boolean = _value == param.zero(initialValue)
+  @transient private lazy val _zero = param.zero(initialValue)
+
+  override def isZero: Boolean = _value.asInstanceOf[AnyRef].eq(_zero.asInstanceOf[AnyRef])
 
   override def copy(): LegacyAccumulatorWrapper[R, T] = {
     val acc = new LegacyAccumulatorWrapper(initialValue, param)
@@ -491,7 +493,7 @@ class LegacyAccumulatorWrapper[R, T](
   }
 
   override def reset(): Unit = {
-    _value = param.zero(initialValue)
+    _value = _zero
   }
 
   override def add(v: T): Unit = _value = param.addAccumulator(_value, v)

http://git-wip-us.apache.org/repos/asf/spark/blob/866270ea/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala 
b/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala
index a04644d..fe0a9a4 100644
--- a/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala
+++ b/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.util
 
 import org.apache.spark._
+import org.apache.spark.serializer.JavaSerializer
 
 class AccumulatorV2Suite extends SparkFunSuite {
 
@@ -162,4 +163,22 @@ class AccumulatorV2Suite extends SparkFunSuite {
     assert(acc3.isZero)
     assert(acc3.value === "")
   }
+
+  test("LegacyAccumulatorWrapper with AccumulatorParam that has no equals/hashCode") {
+    class MyData(val i: Int) extends Serializable
+    val param = new AccumulatorParam[MyData] {
+      override def zero(initialValue: MyData): MyData = new MyData(0)
+      override def addInPlace(r1: MyData, r2: MyData): MyData = new MyData(r1.i + r2.i)
+    }
+
+    val acc = new LegacyAccumulatorWrapper(new MyData(0), param)
+    acc.metadata = AccumulatorMetadata(
+      AccumulatorContext.newId(),
+      Some("test"),
+      countFailedValues = false)
+    AccumulatorContext.register(acc)
+
+    val ser = new JavaSerializer(new SparkConf).newInstance()
+    ser.serialize(acc)
+  }
 }





spark git commit: [SPARK-23697][CORE] LegacyAccumulatorWrapper should define isZero correctly

2018-05-04 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 d35eb2f9b -> 3f78f60cc


[SPARK-23697][CORE] LegacyAccumulatorWrapper should define isZero correctly

## What changes were proposed in this pull request?

It's possible that Accumulators from Spark 1.x no longer work with Spark 2.x. 
This is because `LegacyAccumulatorWrapper.isZero` may return the wrong answer if 
`AccumulableParam` doesn't define equals/hashCode.

This PR fixes that by using a reference equality check in 
`LegacyAccumulatorWrapper.isZero`.

## How was this patch tested?

a new test

Author: Wenchen Fan 

Closes #21229 from cloud-fan/accumulator.

(cherry picked from commit 4d5de4d303a773b1c18c350072344bd7efca9fc4)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3f78f60c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3f78f60c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3f78f60c

Branch: refs/heads/branch-2.3
Commit: 3f78f60cc80d04ce762bd332dd2fa5f7d17c2b3c
Parents: d35eb2f
Author: Wenchen Fan 
Authored: Fri May 4 19:20:15 2018 +0800
Committer: Wenchen Fan 
Committed: Fri May 4 19:20:39 2018 +0800

--
 .../org/apache/spark/util/AccumulatorV2.scala|  6 --
 .../apache/spark/util/AccumulatorV2Suite.scala   | 19 +++
 2 files changed, 23 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3f78f60c/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala 
b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index 0f84ea9..2bc8495 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -486,7 +486,9 @@ class LegacyAccumulatorWrapper[R, T](
     param: org.apache.spark.AccumulableParam[R, T]) extends AccumulatorV2[T, R] {
   private[spark] var _value = initialValue  // Current value on driver
 
-  override def isZero: Boolean = _value == param.zero(initialValue)
+  @transient private lazy val _zero = param.zero(initialValue)
+
+  override def isZero: Boolean = _value.asInstanceOf[AnyRef].eq(_zero.asInstanceOf[AnyRef])
 
   override def copy(): LegacyAccumulatorWrapper[R, T] = {
     val acc = new LegacyAccumulatorWrapper(initialValue, param)
@@ -495,7 +497,7 @@ class LegacyAccumulatorWrapper[R, T](
   }
 
   override def reset(): Unit = {
-    _value = param.zero(initialValue)
+    _value = _zero
  }
 
   override def add(v: T): Unit = _value = param.addAccumulator(_value, v)

http://git-wip-us.apache.org/repos/asf/spark/blob/3f78f60c/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala 
b/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala
index a04644d..fe0a9a4 100644
--- a/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala
+++ b/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.util
 
 import org.apache.spark._
+import org.apache.spark.serializer.JavaSerializer
 
 class AccumulatorV2Suite extends SparkFunSuite {
 
@@ -162,4 +163,22 @@ class AccumulatorV2Suite extends SparkFunSuite {
     assert(acc3.isZero)
     assert(acc3.value === "")
   }
+
+  test("LegacyAccumulatorWrapper with AccumulatorParam that has no equals/hashCode") {
+    class MyData(val i: Int) extends Serializable
+    val param = new AccumulatorParam[MyData] {
+      override def zero(initialValue: MyData): MyData = new MyData(0)
+      override def addInPlace(r1: MyData, r2: MyData): MyData = new MyData(r1.i + r2.i)
+    }
+
+    val acc = new LegacyAccumulatorWrapper(new MyData(0), param)
+    acc.metadata = AccumulatorMetadata(
+      AccumulatorContext.newId(),
+      Some("test"),
+      countFailedValues = false)
+    AccumulatorContext.register(acc)
+
+    val ser = new JavaSerializer(new SparkConf).newInstance()
+    ser.serialize(acc)
+  }
 }





spark git commit: [SPARK-23697][CORE] LegacyAccumulatorWrapper should define isZero correctly

2018-05-04 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 7f1b6b182 -> 4d5de4d30


[SPARK-23697][CORE] LegacyAccumulatorWrapper should define isZero correctly

## What changes were proposed in this pull request?

It's possible that Accumulators from Spark 1.x no longer work with Spark 2.x. 
This is because `LegacyAccumulatorWrapper.isZero` may return the wrong answer if 
`AccumulableParam` doesn't define equals/hashCode.

This PR fixes that by using a reference equality check in 
`LegacyAccumulatorWrapper.isZero`.

## How was this patch tested?

a new test

Author: Wenchen Fan 

Closes #21229 from cloud-fan/accumulator.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4d5de4d3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4d5de4d3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4d5de4d3

Branch: refs/heads/master
Commit: 4d5de4d303a773b1c18c350072344bd7efca9fc4
Parents: 7f1b6b1
Author: Wenchen Fan 
Authored: Fri May 4 19:20:15 2018 +0800
Committer: Wenchen Fan 
Committed: Fri May 4 19:20:15 2018 +0800

--
 .../org/apache/spark/util/AccumulatorV2.scala|  6 --
 .../apache/spark/util/AccumulatorV2Suite.scala   | 19 +++
 2 files changed, 23 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4d5de4d3/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala 
b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index 0f84ea9..2bc8495 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -486,7 +486,9 @@ class LegacyAccumulatorWrapper[R, T](
     param: org.apache.spark.AccumulableParam[R, T]) extends AccumulatorV2[T, R] {
   private[spark] var _value = initialValue  // Current value on driver
 
-  override def isZero: Boolean = _value == param.zero(initialValue)
+  @transient private lazy val _zero = param.zero(initialValue)
+
+  override def isZero: Boolean = _value.asInstanceOf[AnyRef].eq(_zero.asInstanceOf[AnyRef])
 
   override def copy(): LegacyAccumulatorWrapper[R, T] = {
     val acc = new LegacyAccumulatorWrapper(initialValue, param)
@@ -495,7 +497,7 @@ class LegacyAccumulatorWrapper[R, T](
   }
 
   override def reset(): Unit = {
-    _value = param.zero(initialValue)
+    _value = _zero
   }
 
   override def add(v: T): Unit = _value = param.addAccumulator(_value, v)

http://git-wip-us.apache.org/repos/asf/spark/blob/4d5de4d3/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala 
b/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala
index a04644d..fe0a9a4 100644
--- a/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala
+++ b/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.util
 
 import org.apache.spark._
+import org.apache.spark.serializer.JavaSerializer
 
 class AccumulatorV2Suite extends SparkFunSuite {
 
@@ -162,4 +163,22 @@ class AccumulatorV2Suite extends SparkFunSuite {
     assert(acc3.isZero)
     assert(acc3.value === "")
   }
+
+  test("LegacyAccumulatorWrapper with AccumulatorParam that has no equals/hashCode") {
+    class MyData(val i: Int) extends Serializable
+    val param = new AccumulatorParam[MyData] {
+      override def zero(initialValue: MyData): MyData = new MyData(0)
+      override def addInPlace(r1: MyData, r2: MyData): MyData = new MyData(r1.i + r2.i)
+    }
+
+    val acc = new LegacyAccumulatorWrapper(new MyData(0), param)
+    acc.metadata = AccumulatorMetadata(
+      AccumulatorContext.newId(),
+      Some("test"),
+      countFailedValues = false)
+    AccumulatorContext.register(acc)
+
+    val ser = new JavaSerializer(new SparkConf).newInstance()
+    ser.serialize(acc)
+  }
 }





svn commit: r26700 - in /dev/spark/2.4.0-SNAPSHOT-2018_05_04_04_01-7f1b6b1-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-05-04 Thread pwendell
Author: pwendell
Date: Fri May  4 11:18:58 2018
New Revision: 26700

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_05_04_04_01-7f1b6b1 docs


[This commit notification would consist of 1460 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [SPARK-24136][SS] Fix MemoryStreamDataReader.next to skip sleeping if record is available

2018-05-04 Thread jshao
Repository: spark
Updated Branches:
  refs/heads/master 0c23e254c -> 7f1b6b182


[SPARK-24136][SS] Fix MemoryStreamDataReader.next to skip sleeping if record is 
available

## What changes were proposed in this pull request?

Avoid unnecessary sleep (10 ms) in each invocation of 
MemoryStreamDataReader.next.
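
A minimal standalone sketch of the change (the hypothetical `fetch` stands in for 
the endpoint.askSync[Option[Row]](GetRecord(...)) call; this is not the actual 
reader code):

```scala
object PollSketch {
  // Try to fetch first; only sleep between retries when nothing is available yet.
  def nextRecord[T](fetch: () => Option[T], sleepMs: Long = 10): T = {
    // Fetch immediately; the old code set current to None and always slept first.
    var current = fetch()
    while (current.isEmpty) {
      Thread.sleep(sleepMs)
      current = fetch()
    }
    current.get
  }

  def main(args: Array[String]): Unit = {
    val responses = Iterator[Option[String]](None, None, Some("row-1"))
    println(nextRecord(() => responses.next()))   // sleeps twice, then prints row-1
  }
}
```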

## How was this patch tested?

Ran ContinuousSuite from the IDE.


Author: Arun Mahadevan 

Closes #21207 from arunmahadevan/memorystream.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f1b6b18
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f1b6b18
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f1b6b18

Branch: refs/heads/master
Commit: 7f1b6b182e3cf3cbf29399e7bfbe03fa869e0bc8
Parents: 0c23e25
Author: Arun Mahadevan 
Authored: Fri May 4 16:02:21 2018 +0800
Committer: jerryshao 
Committed: Fri May 4 16:02:21 2018 +0800

--
 .../streaming/sources/ContinuousMemoryStream.scala  | 9 ++---
 1 file changed, 6 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7f1b6b18/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
index c28919b..a8fca3c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
@@ -183,11 +183,10 @@ class ContinuousMemoryStreamDataReader(
   private var current: Option[Row] = None
 
   override def next(): Boolean = {
-    current = None
+    current = getRecord
     while (current.isEmpty) {
       Thread.sleep(10)
-      current = endpoint.askSync[Option[Row]](
-          GetRecord(ContinuousMemoryStreamPartitionOffset(partition, currentOffset)))
+      current = getRecord
     }
     currentOffset += 1
     true
@@ -199,6 +198,10 @@ class ContinuousMemoryStreamDataReader(
 
   override def getOffset: ContinuousMemoryStreamPartitionOffset =
     ContinuousMemoryStreamPartitionOffset(partition, currentOffset)
+
+  private def getRecord: Option[Row] =
+    endpoint.askSync[Option[Row]](
+      GetRecord(ContinuousMemoryStreamPartitionOffset(partition, currentOffset)))
 }
 
 case class ContinuousMemoryStreamOffset(partitionNums: Map[Int, Int])

