[spark] branch master updated: [SPARK-26651][SQL][DOC] Collapse notes related to java.time API
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new b85974d [SPARK-26651][SQL][DOC] Collapse notes related to java.time API b85974d is described below commit b85974db85a881a2e8aebc31cb4f578008648ab9 Author: Maxim Gekk AuthorDate: Sat Feb 2 11:17:33 2019 +0800 [SPARK-26651][SQL][DOC] Collapse notes related to java.time API ## What changes were proposed in this pull request? Collapsed notes about using Java 8 API for date/timestamp manipulations and Proleptic Gregorian calendar in the SQL migration guide. Closes #23722 from MaxGekk/collapse-notes. Authored-by: Maxim Gekk Signed-off-by: Hyukjin Kwon --- docs/sql-migration-guide-upgrade.md | 14 +++--- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md index 41f27a3..dbf9df0 100644 --- a/docs/sql-migration-guide-upgrade.md +++ b/docs/sql-migration-guide-upgrade.md @@ -31,14 +31,10 @@ displayTitle: Spark SQL Upgrading Guide - In Spark version 2.4 and earlier, the `SET` command works without any warnings even if the specified key is for `SparkConf` entries and it has no effect because the command does not update `SparkConf`, but the behavior might confuse users. Since 3.0, the command fails if a `SparkConf` key is used. You can disable such a check by setting `spark.sql.legacy.setCommandRejectsSparkCoreConfs` to `false`. - - Since Spark 3.0, CSV/JSON datasources use java.time API for parsing and generating CSV/JSON content. In Spark version 2.4 and earlier, java.text.SimpleDateFormat is used for the same purpose with fallbacks to the parsing mechanisms of Spark 2.0 and 1.x. 
For example, `2018-12-08 10:39:21.123` with the pattern `yyyy-MM-dd'T'HH:mm:ss.SSS` cannot be parsed since Spark 3.0 because the timestamp does not match to the pattern but it can be parsed by earlier Spark versions due to a fallback [...] - - In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, the returned row can contain non-`null` fields if some of CSV column values were parsed and converted to desired types successfully. - In Spark version 2.4 and earlier, JSON datasource and JSON functions like `from_json` convert a bad JSON record to a row with all `null`s in the PERMISSIVE mode when specified schema is `StructType`. Since Spark 3.0, the returned row can contain non-`null` fields if some of JSON column values were parsed and converted to desired types successfully. - - Since Spark 3.0, the `unix_timestamp`, `date_format`, `to_unix_timestamp`, `from_unixtime`, `to_date`, `to_timestamp` functions use java.time API for parsing and formatting dates/timestamps from/to strings by using ISO chronology (https://docs.oracle.com/javase/8/docs/api/java/time/chrono/IsoChronology.html) based on Proleptic Gregorian calendar. In Spark version 2.4 and earlier, java.text.SimpleDateFormat and java.util.GregorianCalendar (hybrid calendar that supports both the Julian [...] - - Since Spark 3.0, JSON datasource and JSON function `schema_of_json` infer TimestampType from string values if they match to the pattern defined by the JSON option `timestampFormat`. Set JSON option `inferTimestamp` to `false` to disable such type inferring. - In PySpark, when Arrow optimization is enabled, if Arrow version is higher than 0.11.0, Arrow can perform safe type conversion when converting Pandas.Series to Arrow array during serialization. Arrow will raise errors when detecting unsafe type conversion like overflow. Setting `spark.sql.execution.pandas.arrowSafeTypeConversion` to true can enable it.
The default setting is false. PySpark's behavior for Arrow versions is illustrated in the table below: @@ -91,11 +87,15 @@ displayTitle: Spark SQL Upgrading Guide - In Spark version 2.4 and earlier, if `org.apache.spark.sql.functions.udf(Any, DataType)` gets a Scala closure with primitive-type argument, the returned UDF will return null if the input values is null. Since Spark 3.0, the UDF will return the default value of the Java type if the input value is null. For example, `val f = udf((x: Int) => x, IntegerType)`, `f($"x")` will return null in Spark 2.4 and earlier if column `x` is null, and return 0 in Spark 3.0. This behavior change is int [...] - - Since Spark 3.0, the `weekofyear`, `weekday` and `dayofweek` functions use java.time API for calculation week number of year and day number of week based on Proleptic Gregorian calendar. In Spark version 2.4 and earlier, the hybrid calendar (Julian + Gregorian) is used for the same purpose. Results of the functions returned by Spark 3.0 and previous versions can be
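The stricter parsing that java.time introduces can be illustrated with a plain-Python analog: `datetime.strptime` is similarly strict about literal characters in the pattern, so the migration guide's example string fails against a pattern expecting a literal `'T'`. This is a sketch of the behavior change, not Spark's actual parser.

```python
from datetime import datetime

ts = "2018-12-08 10:39:21.123"
strict_pattern = "%Y-%m-%dT%H:%M:%S.%f"  # demands a literal 'T' separator

# Strict parsing (as java.time does in Spark 3.0) rejects the mismatch:
# the input has a space where the pattern demands 'T'.
try:
    datetime.strptime(ts, strict_pattern)
    parsed = True
except ValueError:
    parsed = False
assert parsed is False

# Spark 2.4's SimpleDateFormat path fell back to older, more lenient
# parsers, which is why the same input used to succeed. With a pattern
# that actually matches, parsing works in both worlds:
dt = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S.%f")
assert dt.year == 2018 and dt.microsecond == 123000
```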
[spark] branch master updated: [SPARK-18161][PYTHON] Update cloudpickle to v0.6.1
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 75ea89a [SPARK-18161][PYTHON] Update cloudpickle to v0.6.1 75ea89a is described below commit 75ea89ad94ca76646e4697cf98c78d14c6e2695f Author: Boris Shminke AuthorDate: Sat Feb 2 10:49:45 2019 +0800 [SPARK-18161][PYTHON] Update cloudpickle to v0.6.1 ## What changes were proposed in this pull request? In this PR we've done two things: 1) updated the Spark's copy of cloudpickle to 0.6.1 (current stable) The main reason Spark stayed with cloudpickle 0.4.x was that the default pickle protocol was changed in later versions. 2) started using pickle.HIGHEST_PROTOCOL for both Python 2 and Python 3 for serializers and broadcast [Pyrolite](https://github.com/irmen/Pyrolite) has such Pickle protocol version support: reading: 0,1,2,3,4; writing: 2. ## How was this patch tested? Jenkins tests. Authors: Sloane Simmons, Boris Shminke This contribution is original work of Sloane Simmons and Boris Shminke and they licensed it to the project under the project's open source license. Closes #20691 from inpefess/pickle_protocol_4. 
Lead-authored-by: Boris Shminke Co-authored-by: singularperturbation Signed-off-by: Hyukjin Kwon --- python/pyspark/broadcast.py | 4 +- python/pyspark/cloudpickle.py| 259 --- python/pyspark/serializers.py| 7 +- python/pyspark/tests/test_rdd.py | 2 +- 4 files changed, 194 insertions(+), 78 deletions(-) diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py index 43a5ead..cca64b5 100644 --- a/python/pyspark/broadcast.py +++ b/python/pyspark/broadcast.py @@ -23,7 +23,7 @@ import threading from pyspark.cloudpickle import print_exec from pyspark.java_gateway import local_connect_and_auth -from pyspark.serializers import ChunkedStream +from pyspark.serializers import ChunkedStream, pickle_protocol from pyspark.util import _exception_message if sys.version < '3': @@ -109,7 +109,7 @@ class Broadcast(object): def dump(self, value, f): try: -pickle.dump(value, f, 2) +pickle.dump(value, f, pickle_protocol) except pickle.PickleError: raise except Exception as e: diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py index 88519d7..bf92569 100644 --- a/python/pyspark/cloudpickle.py +++ b/python/pyspark/cloudpickle.py @@ -42,20 +42,26 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
""" from __future__ import print_function -import dis -from functools import partial -import imp import io -import itertools -import logging +import dis +import sys +import types import opcode -import operator import pickle import struct -import sys -import traceback -import types +import logging import weakref +import operator +import importlib +import itertools +import traceback +from functools import partial + + +# cloudpickle is meant for inter process communication: we expect all +# communicating processes to run the same Python version hence we favor +# communication speed over compatibility: +DEFAULT_PROTOCOL = pickle.HIGHEST_PROTOCOL if sys.version < '3': @@ -72,6 +78,22 @@ else: PY3 = True +# Container for the global namespace to ensure consistent unpickling of +# functions defined in dynamic modules (modules not registed in sys.modules). +_dynamic_modules_globals = weakref.WeakValueDictionary() + + +class _DynamicModuleFuncGlobals(dict): +"""Global variables referenced by a function defined in a dynamic module + +To avoid leaking references we store such context in a WeakValueDictionary +instance. However instances of python builtin types such as dict cannot +be used directly as values in such a construct, hence the need for a +derived class. 
+""" +pass + + def _make_cell_set_template_code(): """Get the Python compiler to emit LOAD_FAST(arg); STORE_DEREF @@ -157,7 +179,7 @@ def cell_set(cell, value): )(value) -#relevant opcodes +# relevant opcodes STORE_GLOBAL = opcode.opmap['STORE_GLOBAL'] DELETE_GLOBAL = opcode.opmap['DELETE_GLOBAL'] LOAD_GLOBAL = opcode.opmap['LOAD_GLOBAL'] @@ -167,7 +189,7 @@ EXTENDED_ARG = dis.EXTENDED_ARG def islambda(func): -return getattr(func,'__name__') == '' +return getattr(func, '__name__') == '' _BUILTIN_TYPE_NAMES = {} @@ -248,7 +270,9 @@ class CloudPickler(Pickler): dispatch = Pickler.dispatch.copy() def __init__(self, file, protocol=None): -Pickler.__init__(self, file, protocol) +if protocol is None: +protocol = DEFAULT_PROTOCOL +Pickler.__init__(self, file, protocol=protocol) # set of modules to unpi
[spark] branch master updated: [SPARK-26714][CORE][WEBUI] Show 0 partition job in WebUI
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new a0faabf [SPARK-26714][CORE][WEBUI] Show 0 partition job in WebUI a0faabf is described below commit a0faabf7b5901866945a5b4c9ae973de266ed887 Author: xiaodeshan AuthorDate: Fri Feb 1 18:38:27 2019 -0600 [SPARK-26714][CORE][WEBUI] Show 0 partition job in WebUI ## What changes were proposed in this pull request? When the job's partition count is zero, it will still get a jobId but is not shown in the UI. It's strange. This PR is to show this job in the UI. Example: In bash: mkdir -p /home/test/testdir sc.textFile("/home/test/testdir") Some logs: ``` 19/01/24 17:26:19 INFO FileInputFormat: Total input paths to process : 0 19/01/24 17:26:19 INFO SparkContext: Starting job: collect at WordCount.scala:9 19/01/24 17:26:19 INFO DAGScheduler: Job 0 finished: collect at WordCount.scala:9, took 0.003735 s ``` ## How was this patch tested? UT Closes #23637 from deshanxiao/spark-26714.
Authored-by: xiaodeshan Signed-off-by: Sean Owen --- .../src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 5 + .../main/scala/org/apache/spark/status/AppStatusListener.scala | 4 ++-- core/src/main/scala/org/apache/spark/ui/UIUtils.scala| 3 ++- core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 9 +++-- 4 files changed, 16 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 75eb37c..dd1b259 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -693,6 +693,11 @@ private[spark] class DAGScheduler( val jobId = nextJobId.getAndIncrement() if (partitions.size == 0) { + val time = clock.getTimeMillis() + listenerBus.post( +SparkListenerJobStart(jobId, time, Seq[StageInfo](), properties)) + listenerBus.post( +SparkListenerJobEnd(jobId, time, JobSucceeded)) // Return immediately if the job is running 0 tasks return new JobWaiter[U](this, jobId, 0, resultHandler) } diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 3089f05..a8b2153 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -321,7 +321,7 @@ private[spark] class AppStatusListener( } val lastStageInfo = event.stageInfos.sortBy(_.stageId).lastOption -val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)") +val jobName = lastStageInfo.map(_.name).getOrElse("") val jobGroup = Option(event.properties) .flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) } val sqlExecutionId = Option(event.properties) @@ -329,7 +329,7 @@ private[spark] class AppStatusListener( val job = new LiveJob( event.jobId, - lastStageName, + jobName, 
if (event.time > 0) Some(new Date(event.time)) else None, event.stageIds, jobGroup, diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 9674350..54f2750 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -363,7 +363,8 @@ private[spark] object UIUtils extends Logging { skipped: Int, reasonToNumKilled: Map[String, Int], total: Int): Seq[Node] = { -val completeWidth = "width: %s%%".format((completed.toDouble/total)*100) +val ratio = if (total == 0) 100.0 else (completed.toDouble/total)*100 +val completeWidth = "width: %s%%".format(ratio) // started + completed can be > total when there are speculative tasks val boundedStarted = math.min(started, total - completed) val startWidth = "width: %s%%".format((boundedStarted.toDouble/total)*100) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 2c94853..e399f7e 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -857,8 +857,13 @@ private[spark] object ApiHelper { } def lastStageNameAndDescription(store: AppStatusStore, job: JobData): (String, String) = { -val stage = store.asOption(store.stageAttempt(job.stageIds.max, 0)._1) -(stage.map(_.name).getOrElse(""), stage.flatMap(_.description).getOrElse(job.name)
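The `UIUtils` guard in the diff above avoids a division by zero when a job has no tasks. A minimal Python sketch of the same fix (`complete_width` is a hypothetical stand-in for the progress-bar width computation, not Spark's actual method):

```python
def complete_width(completed, total):
    # A 0-partition job should render as 100% complete rather than
    # dividing by zero (which previously left the bar broken).
    ratio = 100.0 if total == 0 else (completed / total) * 100
    return "width: %s%%" % ratio

assert complete_width(0, 0) == "width: 100.0%"   # zero-task job: full bar
assert complete_width(2, 8) == "width: 25.0%"    # normal case unchanged
```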
[spark] branch master updated: [MINOR][DOC] Writing to partitioned Hive metastore Parquet tables is not supported for Spark SQL
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 421ff6f [MINOR][DOC] Writing to partitioned Hive metastore Parquet tables is not supported for Spark SQL 421ff6f is described below commit 421ff6f60e2f3da123fd941d9fa91d7228b21ebc Author: liuxian AuthorDate: Fri Feb 1 18:34:13 2019 -0600 [MINOR][DOC] Writing to partitioned Hive metastore Parquet tables is not supported for Spark SQL ## What changes were proposed in this pull request? Even if `spark.sql.hive.convertMetastoreParquet` is true, when writing to partitioned Hive metastore Parquet tables, Spark SQL still can not use its own Parquet support instead of Hive SerDe. Related code: https://github.com/apache/spark/blob/d53e11ffce3f721886918c1cb4525478971f02bc/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala#L198 ## How was this patch tested? N/A Closes #23671 from 10110346/parquetdoc. Authored-by: liuxian Signed-off-by: Sean Owen --- docs/sql-data-sources-parquet.md | 7 --- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/docs/sql-data-sources-parquet.md b/docs/sql-data-sources-parquet.md index 5532bf9..f6e03fba 100644 --- a/docs/sql-data-sources-parquet.md +++ b/docs/sql-data-sources-parquet.md @@ -157,9 +157,10 @@ turned it off by default starting from 1.5.0. You may enable it by ### Hive metastore Parquet table conversion -When reading from and writing to Hive metastore Parquet tables, Spark SQL will try to use its own -Parquet support instead of Hive SerDe for better performance. This behavior is controlled by the -`spark.sql.hive.convertMetastoreParquet` configuration, and is turned on by default. 
+When reading from Hive metastore Parquet tables and writing to non-partitioned Hive metastore +Parquet tables, Spark SQL will try to use its own Parquet support instead of Hive SerDe for +better performance. This behavior is controlled by the `spark.sql.hive.convertMetastoreParquet` +configuration, and is turned on by default. Hive/Parquet Schema Reconciliation - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-26771][CORE][GRAPHX] Make .unpersist(), .destroy() consistently non-blocking by default
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 8171b156 [SPARK-26771][CORE][GRAPHX] Make .unpersist(), .destroy() consistently non-blocking by default 8171b156 is described below commit 8171b156ebf25a582318c71222716b5c52c8bbb3 Author: Sean Owen AuthorDate: Fri Feb 1 18:29:55 2019 -0600 [SPARK-26771][CORE][GRAPHX] Make .unpersist(), .destroy() consistently non-blocking by default ## What changes were proposed in this pull request? Make .unpersist(), .destroy() non-blocking by default and adjust callers to request blocking only where important. This also adds an optional blocking argument to Pyspark's RDD.unpersist(), which never had one. ## How was this patch tested? Existing tests. Closes #23685 from srowen/SPARK-26771. Authored-by: Sean Owen Signed-off-by: Sean Owen --- .../main/scala/org/apache/spark/MapOutputTracker.scala | 2 +- .../src/main/scala/org/apache/spark/SparkContext.scala | 2 +- .../scala/org/apache/spark/broadcast/Broadcast.scala | 3 +-- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 6 +++--- .../spark/rdd/util/PeriodicRDDCheckpointer.scala | 2 +- .../test/scala/org/apache/spark/DistributedSuite.scala | 2 +- .../test/scala/org/apache/spark/UnpersistSuite.scala | 2 +- .../org/apache/spark/broadcast/BroadcastSuite.scala| 4 ++-- .../scala/org/apache/spark/ui/UISeleniumSuite.scala| 4 ++-- docs/graphx-programming-guide.md | 2 +- docs/rdd-programming-guide.md | 9 - .../spark/examples/mllib/BinaryClassification.scala| 2 +- .../spark/examples/mllib/DecisionTreeRunner.scala | 2 +- .../apache/spark/examples/mllib/LinearRegression.scala | 2 +- .../org/apache/spark/examples/mllib/MovieLensALS.scala | 2 +- .../src/main/scala/org/apache/spark/graphx/Graph.scala | 8 ++-- .../main/scala/org/apache/spark/graphx/Pregel.scala| 6 +++--- 
.../org/apache/spark/graphx/impl/EdgeRDDImpl.scala | 2 +- .../scala/org/apache/spark/graphx/impl/GraphImpl.scala | 4 ++-- .../org/apache/spark/graphx/impl/VertexRDDImpl.scala | 2 +- .../scala/org/apache/spark/graphx/lib/PageRank.scala | 10 +- .../spark/graphx/lib/StronglyConnectedComponents.scala | 2 +- .../spark/graphx/util/PeriodicGraphCheckpointer.scala | 2 +- .../scala/org/apache/spark/graphx/GraphSuite.scala | 10 +- .../org/apache/spark/ml/classification/LinearSVC.scala | 2 +- .../spark/ml/classification/LogisticRegression.scala | 2 +- .../apache/spark/ml/clustering/GaussianMixture.scala | 6 +++--- .../apache/spark/ml/optim/loss/RDDLossFunction.scala | 2 +- .../spark/ml/regression/AFTSurvivalRegression.scala| 4 ++-- .../apache/spark/ml/regression/LinearRegression.scala | 4 ++-- .../spark/ml/tree/impl/GradientBoostedTrees.scala | 2 +- .../org/apache/spark/ml/tree/impl/NodeIdCache.scala| 6 +++--- .../apache/spark/mllib/api/python/PythonMLLibAPI.scala | 16 .../spark/mllib/clustering/BisectingKMeans.scala | 8 .../spark/mllib/clustering/GaussianMixture.scala | 2 +- .../org/apache/spark/mllib/clustering/KMeans.scala | 12 ++-- .../apache/spark/mllib/clustering/KMeansModel.scala| 2 +- .../org/apache/spark/mllib/clustering/LDAModel.scala | 2 +- .../apache/spark/mllib/clustering/LDAOptimizer.scala | 2 +- .../org/apache/spark/mllib/feature/Word2Vec.scala | 10 +- .../scala/org/apache/spark/mllib/fpm/PrefixSpan.scala | 2 +- .../spark/mllib/optimization/GradientDescent.scala | 2 +- .../org/apache/spark/mllib/optimization/LBFGS.scala| 2 +- .../mllib/regression/GeneralizedLinearAlgorithm.scala | 2 +- .../spark/mllib/tree/model/treeEnsembleModels.scala| 2 +- .../spark/ml/classification/LinearSVCSuite.scala | 4 ++-- .../ml/classification/LogisticRegressionSuite.scala| 6 +++--- python/pyspark/broadcast.py| 11 +++ python/pyspark/rdd.py | 8 ++-- python/pyspark/tests/test_rdd.py | 2 +- .../org/apache/spark/sql/DataFrameFunctionsSuite.scala | 2 +- 
.../scala/org/apache/spark/sql/DatasetCacheSuite.scala | 18 +- .../columnar/InMemoryColumnarQuerySuite.scala | 6 +++--- .../org/apache/spark/sql/hive/CachedTableSuite.scala | 2 +- .../org/apache/spark/streaming/dstream/DStream.scala | 2 +- 55 files changed, 131 insertions(+), 114 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index a8b8e96..803703f 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTrack
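The blocking-versus-non-blocking distinction this commit makes consistent can be sketched with a toy Python class: cleanup runs asynchronously either way, and the `blocking` flag only controls whether the caller waits for it. This is an illustrative analog of the API's semantics, not Spark's implementation.

```python
import threading
import time

class CachedValue:
    """Toy stand-in for an unpersist()/destroy() call with a blocking flag."""

    def __init__(self):
        self.persisted = True

    def unpersist(self, blocking=False):
        # Cleanup always happens on a background worker; `blocking`
        # decides whether the caller waits for it to complete.
        worker = threading.Thread(target=self._release)
        worker.start()
        if blocking:
            worker.join()  # guarantee the data is freed before returning
        return self

    def _release(self):
        time.sleep(0.01)  # pretend freeing storage takes a moment
        self.persisted = False

# Blocking callers are guaranteed to observe the freed state on return;
# non-blocking (the new default) callers trade that guarantee for latency.
v = CachedValue().unpersist(blocking=True)
assert v.persisted is False
```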
[spark] branch master updated: [SPARK-26754][PYTHON] Add hasTrainingSummary to replace duplicate code in PySpark
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 5bb9647 [SPARK-26754][PYTHON] Add hasTrainingSummary to replace duplicate code in PySpark 5bb9647 is described below commit 5bb9647e1019ea7eb17af7d2057fdacb7f4c560b Author: Huaxin Gao AuthorDate: Fri Feb 1 17:29:58 2019 -0600 [SPARK-26754][PYTHON] Add hasTrainingSummary to replace duplicate code in PySpark ## What changes were proposed in this pull request? Python version of https://github.com/apache/spark/pull/17654 ## How was this patch tested? Existing Python unit test Closes #23676 from huaxingao/spark26754. Authored-by: Huaxin Gao Signed-off-by: Sean Owen --- python/pyspark/ml/classification.py | 19 ++- python/pyspark/ml/clustering.py | 37 ++--- python/pyspark/ml/regression.py | 30 ++ python/pyspark/ml/util.py | 26 ++ 4 files changed, 44 insertions(+), 68 deletions(-) diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 89b9278..134b9e0 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -483,7 +483,8 @@ class LogisticRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredicti return self.getOrDefault(self.upperBoundsOnIntercepts) -class LogisticRegressionModel(JavaModel, JavaClassificationModel, JavaMLWritable, JavaMLReadable): +class LogisticRegressionModel(JavaModel, JavaClassificationModel, JavaMLWritable, JavaMLReadable, + HasTrainingSummary): """ Model fitted by LogisticRegression. @@ -532,24 +533,16 @@ class LogisticRegressionModel(JavaModel, JavaClassificationModel, JavaMLWritable trained on the training set. An exception is thrown if `trainingSummary is None`. 
""" if self.hasSummary: -java_lrt_summary = self._call_java("summary") if self.numClasses <= 2: -return BinaryLogisticRegressionTrainingSummary(java_lrt_summary) +return BinaryLogisticRegressionTrainingSummary(super(LogisticRegressionModel, + self).summary) else: -return LogisticRegressionTrainingSummary(java_lrt_summary) +return LogisticRegressionTrainingSummary(super(LogisticRegressionModel, + self).summary) else: raise RuntimeError("No training summary available for this %s" % self.__class__.__name__) -@property -@since("2.0.0") -def hasSummary(self): -""" -Indicates whether a training summary exists for this model -instance. -""" -return self._call_java("hasSummary") - @since("2.0.0") def evaluate(self, dataset): """ diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py index b9c6bdf..864e2a3 100644 --- a/python/pyspark/ml/clustering.py +++ b/python/pyspark/ml/clustering.py @@ -97,7 +97,7 @@ class ClusteringSummary(JavaWrapper): return self._call_java("numIter") -class GaussianMixtureModel(JavaModel, JavaMLWritable, JavaMLReadable): +class GaussianMixtureModel(JavaModel, JavaMLWritable, JavaMLReadable, HasTrainingSummary): """ Model fitted by GaussianMixture. @@ -126,22 +126,13 @@ class GaussianMixtureModel(JavaModel, JavaMLWritable, JavaMLReadable): @property @since("2.1.0") -def hasSummary(self): -""" -Indicates whether a training summary exists for this model -instance. -""" -return self._call_java("hasSummary") - -@property -@since("2.1.0") def summary(self): """ Gets summary (e.g. cluster assignments, cluster sizes) of the model trained on the training set. An exception is thrown if no summary exists. 
""" if self.hasSummary: -return GaussianMixtureSummary(self._call_java("summary")) +return GaussianMixtureSummary(super(GaussianMixtureModel, self).summary) else: raise RuntimeError("No training summary available for this %s" % self.__class__.__name__) @@ -323,7 +314,7 @@ class KMeansSummary(ClusteringSummary): return self._call_java("trainingCost") -class KMeansModel(JavaModel, GeneralJavaMLWritable, JavaMLReadable): +class KMeansModel(JavaModel, GeneralJavaMLWritable, JavaMLReadable, HasTrainingSummary): """ Model fitted by KMeans. @@ -337,21 +328,13 @@ class KMeansModel(JavaModel, GeneralJavaMLWri
[spark] branch test-branch deleted (was 0f8b07e)
This is an automated email from the ASF dual-hosted git repository. rxin pushed a change to branch test-branch in repository https://gitbox.apache.org/repos/asf/spark.git. was 0f8b07e test This change permanently discards the following revisions: discard 0f8b07e test
[spark] branch test-branch created (now 0f8b07e)
This is an automated email from the ASF dual-hosted git repository. rxin pushed a change to branch test-branch in repository https://gitbox.apache.org/repos/asf/spark.git. at 0f8b07e test This branch includes the following new commits: new 0f8b07e test The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
[spark] 01/01: test
This is an automated email from the ASF dual-hosted git repository. rxin pushed a commit to branch test-branch in repository https://gitbox.apache.org/repos/asf/spark.git commit 0f8b07e5034af2819b75b53aadffda82ae0c31b8 Author: Reynold Xin AuthorDate: Fri Feb 1 13:28:18 2019 -0800 test --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 271f2f5..2c1e02a 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,7 @@ For general development tips, including info on developing Spark using an IDE, s The easiest way to start using Spark is through the Scala shell: -./bin/spark-shell +./bin/spark-shella Try the following command, which should return 1000:
[spark] branch branch-2.2 updated: [SPARK-26806][SS] EventTimeStats.merge should handle zeros correctly
This is an automated email from the ASF dual-hosted git repository. zsxwing pushed a commit to branch branch-2.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.2 by this push: new 7c7d7f6 [SPARK-26806][SS] EventTimeStats.merge should handle zeros correctly 7c7d7f6 is described below commit 7c7d7f6a878b02ece881266ee538f3e1443aa8c1 Author: Shixiong Zhu AuthorDate: Fri Feb 1 11:15:05 2019 -0800 [SPARK-26806][SS] EventTimeStats.merge should handle zeros correctly ## What changes were proposed in this pull request? Right now, EventTimeStats.merge doesn't handle `zero.merge(zero)` correctly. This will make `avg` become `NaN`. And whatever gets merged with the result of `zero.merge(zero)`, `avg` will still be `NaN`. Then finally, we call `NaN.toLong` and get `0`, and the user will see the following incorrect report: ``` "eventTime" : { "avg" : "1970-01-01T00:00:00.000Z", "max" : "2019-01-31T12:57:00.000Z", "min" : "2019-01-30T18:44:04.000Z", "watermark" : "1970-01-01T00:00:00.000Z" } ``` This issue was reported by liancheng . This PR fixes the above issue. ## How was this patch tested? The new unit tests. Closes #23718 from zsxwing/merge-zero. 
Authored-by: Shixiong Zhu Signed-off-by: Shixiong Zhu (cherry picked from commit 03a928cbecaf38bbbab3e6b957fcbb542771cfbd) Signed-off-by: Shixiong Zhu --- .../streaming/EventTimeWatermarkExec.scala | 17 +--- .../sql/streaming/EventTimeWatermarkSuite.scala| 32 -- 2 files changed, 42 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala index 55e7508..4069633 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala @@ -36,10 +36,19 @@ case class EventTimeStats(var max: Long, var min: Long, var avg: Double, var cou } def merge(that: EventTimeStats): Unit = { -this.max = math.max(this.max, that.max) -this.min = math.min(this.min, that.min) -this.count += that.count -this.avg += (that.avg - this.avg) * that.count / this.count +if (that.count == 0) { + // no-op +} else if (this.count == 0) { + this.max = that.max + this.min = that.min + this.count = that.count + this.avg = that.avg +} else { + this.max = math.max(this.max, that.max) + this.min = math.min(this.min, that.min) + this.count += that.count + this.avg += (that.avg - this.avg) * that.count / this.count +} } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index 4f19fa0..14a193f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -38,9 +38,9 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche sqlContext.streams.active.foreach(_.stop()) } - test("EventTimeStats") { -val epsilon = 10E-6 + 
private val epsilon = 10E-6 + test("EventTimeStats") { val stats = EventTimeStats(max = 100, min = 10, avg = 20.0, count = 5) stats.add(80L) stats.max should be (100) @@ -57,7 +57,6 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche } test("EventTimeStats: avg on large values") { -val epsilon = 10E-6 val largeValue = 100L // 10B // Make sure `largeValue` will cause overflow if we use a Long sum to calc avg. assert(largeValue * largeValue != BigInt(largeValue) * BigInt(largeValue)) @@ -75,6 +74,33 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche stats.avg should be ((largeValue + 0.5) +- epsilon) } + test("EventTimeStats: zero merge zero") { +val stats = EventTimeStats.zero +val stats2 = EventTimeStats.zero +stats.merge(stats2) +stats should be (EventTimeStats.zero) + } + + test("EventTimeStats: non-zero merge zero") { +val stats = EventTimeStats(max = 10, min = 1, avg = 5.0, count = 3) +val stats2 = EventTimeStats.zero +stats.merge(stats2) +stats.max should be (10L) +stats.min should be (1L) +stats.avg should be (5.0 +- epsilon) +stats.count should be (3L) + } + + test("EventTimeStats: zero merge non-zero") { +val stats = EventTimeStats.zero +val stats2 = EventTimeStats(max = 10, min = 1, avg = 5.0, count = 3) +stats.merge(stats2) +
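The fixed `merge` logic above translates directly into a short Python sketch: when either side has `count == 0`, averaging is skipped so `zero.merge(zero)` no longer produces `NaN` (in this Python analog, the broken path would raise a `ZeroDivisionError` instead). Names are illustrative; the real class lives in `EventTimeWatermarkExec.scala`.

```python
class EventTimeStats:
    """Python sketch of the corrected EventTimeStats.merge."""

    def __init__(self, max_=float("-inf"), min_=float("inf"), avg=0.0, count=0):
        self.max, self.min, self.avg, self.count = max_, min_, avg, count

    def merge(self, that):
        if that.count == 0:
            pass  # merging in empty stats is a no-op
        elif self.count == 0:
            # Adopt the other side wholesale instead of averaging
            # against a zero count (the old code produced NaN here).
            self.max, self.min = that.max, that.min
            self.avg, self.count = that.avg, that.count
        else:
            self.max = max(self.max, that.max)
            self.min = min(self.min, that.min)
            self.count += that.count
            self.avg += (that.avg - self.avg) * that.count / self.count

zero = EventTimeStats()
zero.merge(EventTimeStats())        # zero merge zero: still zero, no NaN
assert zero.count == 0 and zero.avg == 0.0

stats = EventTimeStats(max_=10, min_=1, avg=5.0, count=3)
stats.merge(EventTimeStats())       # non-zero merge zero: unchanged
assert (stats.max, stats.min, stats.avg, stats.count) == (10, 1, 5.0, 3)
```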
[spark] branch branch-2.3 updated: [SPARK-26806][SS] EventTimeStats.merge should handle zeros correctly
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new a5d22da  [SPARK-26806][SS] EventTimeStats.merge should handle zeros correctly
a5d22da is described below

commit a5d22da1888b8110b490d52d2c36b3fc907254f6
Author: Shixiong Zhu
AuthorDate: Fri Feb 1 11:15:05 2019 -0800

    [SPARK-26806][SS] EventTimeStats.merge should handle zeros correctly

    ## What changes were proposed in this pull request?

    Right now, EventTimeStats.merge doesn't handle `zero.merge(zero)` correctly. This will make `avg` become `NaN`. And whatever gets merged with the result of `zero.merge(zero)`, `avg` will still be `NaN`. Then finally, we call `NaN.toLong` and get `0`, and the user will see the following incorrect report:

    ```
    "eventTime" : {
      "avg" : "1970-01-01T00:00:00.000Z",
      "max" : "2019-01-31T12:57:00.000Z",
      "min" : "2019-01-30T18:44:04.000Z",
      "watermark" : "1970-01-01T00:00:00.000Z"
    }
    ```

    This issue was reported by liancheng.

    This PR fixes the above issue.

    ## How was this patch tested?

    The new unit tests.

    Closes #23718 from zsxwing/merge-zero.
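The `NaN` described in the message comes from the unguarded incremental-average update when both sides are empty. A minimal standalone reproduction of the pre-patch arithmetic (not Spark code; the variable names are illustrative):

```scala
object MergeZeroRepro {
  def main(args: Array[String]): Unit = {
    // Pre-patch merge arithmetic, with both sides empty (count == 0):
    var avg = 0.0
    var count = 0L
    val thatAvg = 0.0
    val thatCount = 0L

    count += thatCount                          // count stays 0
    avg += (thatAvg - avg) * thatCount / count  // 0.0 / 0 => NaN

    println(avg)         // NaN
    println(avg.toLong)  // 0 -- formatted as 1970-01-01T00:00:00.000Z
  }
}
```

Once `avg` is `NaN`, every later merge keeps it `NaN`, which is why the guard has to short-circuit the empty cases rather than patch up the result afterwards.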
Authored-by: Shixiong Zhu
Signed-off-by: Shixiong Zhu
(cherry picked from commit 03a928cbecaf38bbbab3e6b957fcbb542771cfbd)
Signed-off-by: Shixiong Zhu
---
 .../streaming/EventTimeWatermarkExec.scala         | 17 +++++++++++++----
 .../sql/streaming/EventTimeWatermarkSuite.scala    | 32 +++++++++++++++++++++++++++++---
 2 files changed, 42 insertions(+), 7 deletions(-)
[spark] branch branch-2.4 updated: [SPARK-26806][SS] EventTimeStats.merge should handle zeros correctly
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new bd4ce51  [SPARK-26806][SS] EventTimeStats.merge should handle zeros correctly
bd4ce51 is described below

commit bd4ce51e699da306bc36db0c7b0303b6e6c3d4df
Author: Shixiong Zhu
AuthorDate: Fri Feb 1 11:15:05 2019 -0800

    [SPARK-26806][SS] EventTimeStats.merge should handle zeros correctly

    Closes #23718 from zsxwing/merge-zero.
Authored-by: Shixiong Zhu
Signed-off-by: Shixiong Zhu
(cherry picked from commit 03a928cbecaf38bbbab3e6b957fcbb542771cfbd)
Signed-off-by: Shixiong Zhu
---
 .../streaming/EventTimeWatermarkExec.scala         | 17 +++++++++++++----
 .../sql/streaming/EventTimeWatermarkSuite.scala    | 32 +++++++++++++++++++++++++++++---
 2 files changed, 42 insertions(+), 7 deletions(-)
[spark] branch master updated: [SPARK-26806][SS] EventTimeStats.merge should handle zeros correctly
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 03a928c  [SPARK-26806][SS] EventTimeStats.merge should handle zeros correctly
03a928c is described below

commit 03a928cbecaf38bbbab3e6b957fcbb542771cfbd
Author: Shixiong Zhu
AuthorDate: Fri Feb 1 11:15:05 2019 -0800

    [SPARK-26806][SS] EventTimeStats.merge should handle zeros correctly

    Closes #23718 from zsxwing/merge-zero.
Authored-by: Shixiong Zhu
Signed-off-by: Shixiong Zhu
---
 .../streaming/EventTimeWatermarkExec.scala         | 17 +++++++++++++----
 .../sql/streaming/EventTimeWatermarkSuite.scala    | 32 +++++++++++++++++++++++++++++---
 2 files changed, 42 insertions(+), 7 deletions(-)
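The patched merge logic can be exercised outside the diff context. A minimal standalone sketch that mirrors the shape of the patch (the real class in EventTimeWatermarkExec.scala has additional members such as `add()`; the `zero` sentinels here are an assumption matching the usual identity element):

```scala
case class EventTimeStats(var max: Long, var min: Long, var avg: Double, var count: Long) {
  def merge(that: EventTimeStats): Unit = {
    if (that.count == 0) {
      // no-op: merging an empty side must leave this side untouched
    } else if (this.count == 0) {
      // adopt the non-empty side wholesale, avoiding the 0/0 => NaN average update
      this.max = that.max
      this.min = that.min
      this.count = that.count
      this.avg = that.avg
    } else {
      this.max = math.max(this.max, that.max)
      this.min = math.min(this.min, that.min)
      this.count += that.count
      // incremental weighted mean; this.count is already the merged total here
      this.avg += (that.avg - this.avg) * that.count / this.count
    }
  }
}

object EventTimeStats {
  // Identity element: sentinels chosen so any real value replaces them
  def zero: EventTimeStats =
    EventTimeStats(max = Long.MinValue, min = Long.MaxValue, avg = 0.0, count = 0L)
}

object MergeDemo {
  def main(args: Array[String]): Unit = {
    val s = EventTimeStats.zero
    s.merge(EventTimeStats.zero)  // previously poisoned avg with NaN
    s.merge(EventTimeStats(max = 10, min = 1, avg = 5.0, count = 3))
    println(s.avg)   // 5.0
    println(s.count) // 3
  }
}
```

With the guard in place, `zero.merge(zero)` is a no-op, so the subsequent merge with real data lands on the correct average instead of staying `NaN`.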