spark git commit: [SPARK-13692][CORE][SQL] Fix trivial Coverity/Checkstyle defects
Repository: spark Updated Branches: refs/heads/master 035d3acdf -> f3201aeeb [SPARK-13692][CORE][SQL] Fix trivial Coverity/Checkstyle defects ## What changes were proposed in this pull request? This issue fixes the following potential bugs and Java coding style detected by Coverity and Checkstyle. - Implement both null and type checking in equals functions. - Fix wrong type casting logic in SimpleJavaBean2.equals. - Add `implement Cloneable` to `UTF8String` and `SortedIterator`. - Remove dereferencing before null check in `AbstractBytesToBytesMapSuite`. - Fix coding style: Add '{}' to single `for` statement in mllib examples. - Remove unused imports in `ColumnarBatch` and `JavaKinesisStreamSuite`. - Remove unused fields in `ChunkFetchIntegrationSuite`. - Add `stop()` to prevent resource leak. Please note that the last two checkstyle errors exist on newly added commits after [SPARK-13583](https://issues.apache.org/jira/browse/SPARK-13583). ## How was this patch tested? manual via `./dev/lint-java` and Coverity site. Author: Dongjoon Hyun Closes #11530 from dongjoon-hyun/SPARK-13692. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f3201aee Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f3201aee Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f3201aee Branch: refs/heads/master Commit: f3201aeeb06aae3b11e8cf6ee9693182dd896b32 Parents: 035d3ac Author: Dongjoon Hyun Authored: Wed Mar 9 10:12:23 2016 + Committer: Sean Owen Committed: Wed Mar 9 10:12:23 2016 + -- .../org/apache/spark/network/ChunkFetchIntegrationSuite.java | 2 -- .../apache/spark/network/RequestTimeoutIntegrationSuite.java | 2 +- .../apache/spark/network/TransportClientFactorySuite.java| 2 ++ .../main/java/org/apache/spark/unsafe/types/UTF8String.java | 3 ++- .../util/collection/unsafe/sort/UnsafeInMemorySorter.java| 2 +- .../spark/unsafe/map/AbstractBytesToBytesMapSuite.java | 4 ++-- .../apache/spark/examples/ml/JavaCountVectorizerExample.java | 2 ++ .../examples/ml/JavaDecisionTreeClassificationExample.java | 2 ++ .../spark/examples/ml/JavaDecisionTreeRegressionExample.java | 2 ++ .../apache/spark/examples/ml/JavaSQLTransformerExample.java | 2 ++ .../org/apache/spark/examples/ml/JavaWord2VecExample.java| 2 ++ .../spark/examples/mllib/JavaAssociationRulesExample.java| 2 ++ .../spark/examples/mllib/JavaGaussianMixtureExample.java | 3 ++- .../mllib/JavaGradientBoostingClassificationExample.java | 2 ++ .../mllib/JavaGradientBoostingRegressionExample.java | 2 ++ .../spark/examples/mllib/JavaIsotonicRegressionExample.java | 2 ++ .../org/apache/spark/examples/mllib/JavaKMeansExample.java | 3 ++- .../examples/mllib/JavaLatentDirichletAllocationExample.java | 3 ++- .../mllib/JavaMultiLabelClassificationMetricsExample.java| 2 ++ .../apache/spark/examples/mllib/JavaNaiveBayesExample.java | 2 ++ .../apache/spark/examples/mllib/JavaPrefixSpanExample.java | 2 ++ .../mllib/JavaRandomForestClassificationExample.java | 2 ++ .../examples/mllib/JavaRandomForestRegressionExample.java| 2 ++ .../spark/examples/mllib/JavaRankingMetricsExample.java | 2 ++ .../spark/examples/mllib/JavaRecommendationExample.java | 2 ++ .../spark/examples/mllib/JavaRegressionMetricsExample.java | 2 ++ .../java/org/apache/spark/examples/mllib/JavaSVDExample.java | 5 - .../org/apache/spark/examples/mllib/JavaSimpleFPGrowth.java | 2 ++ .../spark/streaming/kinesis/JavaKinesisStreamSuite.java | 2 -- .../apache/spark/sql/execution/vectorized/ColumnarBatch.java | 1 - 
.../java/test/org/apache/spark/sql/JavaDatasetSuite.java | 8 +++- 31 files changed, 61 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f3201aee/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java -- diff --git a/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java b/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java index d17e986..6d62eaf 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java @@ -64,8 +64,6 @@ public class ChunkFetchIntegrationSuite { static ManagedBuffer bufferChunk; static ManagedBuffer fileChunk; - private TransportConf transportConf; - @BeforeClass public static void setUp() throws Exception { int bufSize = 10; http://git-wip-us.apache.org/repos/asf/spark/blob/f3201aee/common/network-common/src/test/java/org/apache/spark/network/RequestTi
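The first bullet above — implementing both null and type checking in `equals` — comes down to a small, standard Java pattern. A minimal sketch with a made-up `SimpleBean` class (not the actual `SimpleJavaBean2` touched by the patch):

```
// Hypothetical bean illustrating the equals() pattern the commit describes:
// check for null and for the runtime type before casting.
public class SimpleBean {
  private final int id;
  private final String name;

  public SimpleBean(int id, String name) {
    this.id = id;
    this.name = name;
  }

  @Override
  public boolean equals(Object other) {
    // Null check and type check before any cast.
    if (other == null || getClass() != other.getClass()) {
      return false;
    }
    SimpleBean that = (SimpleBean) other;
    return id == that.id
        && (name == null ? that.name == null : name.equals(that.name));
  }

  @Override
  public int hashCode() {
    return 31 * id + (name == null ? 0 : name.hashCode());
  }
}
```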
spark git commit: [SPARK-13640][SQL] Synchronize ScalaReflection.mirror method.
Repository: spark Updated Branches: refs/heads/master f3201aeeb -> 2c5af7d4d [SPARK-13640][SQL] Synchronize ScalaReflection.mirror method. ## What changes were proposed in this pull request? `ScalaReflection.mirror` method should be synchronized when scala version is `2.10` because `universe.runtimeMirror` is not thread safe. ## How was this patch tested? I added a test to check thread safety of `ScalaRefection.mirror` method in `ScalaReflectionSuite`, which will throw the following Exception in Scala `2.10` without this patch: ``` [info] - thread safety of mirror *** FAILED *** (49 milliseconds) [info] java.lang.UnsupportedOperationException: tail of empty list [info] at scala.collection.immutable.Nil$.tail(List.scala:339) [info] at scala.collection.immutable.Nil$.tail(List.scala:334) [info] at scala.reflect.internal.SymbolTable.popPhase(SymbolTable.scala:172) [info] at scala.reflect.internal.Symbols$Symbol.unsafeTypeParams(Symbols.scala:1477) [info] at scala.reflect.internal.Symbols$TypeSymbol.tpe(Symbols.scala:2777) [info] at scala.reflect.internal.Mirrors$RootsBase.init(Mirrors.scala:235) [info] at scala.reflect.runtime.JavaMirrors$class.createMirror(JavaMirrors.scala:34) [info] at scala.reflect.runtime.JavaMirrors$class.runtimeMirror(JavaMirrors.scala:61) [info] at scala.reflect.runtime.JavaUniverse.runtimeMirror(JavaUniverse.scala:12) [info] at scala.reflect.runtime.JavaUniverse.runtimeMirror(JavaUniverse.scala:12) [info] at org.apache.spark.sql.catalyst.ScalaReflection$.mirror(ScalaReflection.scala:36) [info] at org.apache.spark.sql.catalyst.ScalaReflectionSuite$$anonfun$12$$anonfun$apply$mcV$sp$1$$anonfun$apply$1$$anonfun$apply$2.apply(ScalaReflectionSuite.scala:256) [info] at org.apache.spark.sql.catalyst.ScalaReflectionSuite$$anonfun$12$$anonfun$apply$mcV$sp$1$$anonfun$apply$1$$anonfun$apply$2.apply(ScalaReflectionSuite.scala:252) [info] at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) [info] at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) [info] at scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107) [info] at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [info] at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [info] at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [info] at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ``` Notice that the test will pass when Scala version is `2.11`. Author: Takuya UESHIN Closes #11487 from ueshin/issues/SPARK-13640. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c5af7d4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c5af7d4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c5af7d4 Branch: refs/heads/master Commit: 2c5af7d4d939e18a749d33b5de2e5113aa3eff08 Parents: f3201ae Author: Takuya UESHIN Authored: Wed Mar 9 10:23:27 2016 + Committer: Sean Owen Committed: Wed Mar 9 10:23:27 2016 + -- .../spark/sql/catalyst/ScalaReflection.scala| 7 +++- .../sql/catalyst/ScalaReflectionSuite.scala | 41 2 files changed, 46 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2c5af7d4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala index 02cb2d9..c12b5c2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala @@ -32,8 +32,10 @@ object ScalaReflection extends ScalaReflection { // Since we are creating a runtime mirror usign the class loader of current thread, // we need to use def at here. So, every time we call mirror, it is using the // class loader of the current thread. - override def mirror: universe.Mirror = + // SPARK-13640: Synchronize this because universe.runtimeMirror is not thread-safe in Scala 2.10. + override def mirror: universe.Mirror = ScalaReflectionLock.synchronized { universe.runtimeMirror(Thread.currentThread().getContextClassLoader) + } import universe._ @@ -665,7 +667,8 @@ trait ScalaReflection { * * @see SPARK-5281 */ - def localTypeOf[T: TypeTag]: `Type` = { + // SPARK-13640: Synchronize this because TypeTag.tpe is not thread-safe in Scala 2.10. + def localTypeOf[T: TypeTag]: `Type` = ScalaReflectionLock.synchronized {
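The patch wraps every call into Scala's runtime-reflection entry points in a shared lock (`ScalaReflectionLock`). The same guard-everything-with-one-lock idea, sketched as a Java analogy with `SimpleDateFormat` (a well-known non-thread-safe class) standing in for `universe.runtimeMirror` — an illustration only, not Spark code:

```
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class FormatterHolder {
  // SimpleDateFormat is not thread-safe, so every access goes through one shared lock,
  // just as the patch routes every mirror/TypeTag access through ScalaReflectionLock.
  private static final Object LOCK = new Object();
  private static final SimpleDateFormat FORMAT = new SimpleDateFormat("yyyy-MM-dd");

  public static String format(Date date) {
    synchronized (LOCK) {
      return FORMAT.format(date);
    }
  }

  public static Date parse(String text) throws ParseException {
    synchronized (LOCK) {
      return FORMAT.parse(text);
    }
  }
}
```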
spark git commit: [SPARK-13631][CORE] Thread-safe getLocationsWithLargestOutputs
Repository: spark Updated Branches: refs/heads/branch-1.6 8ec4f159a -> 95105b0e6 [SPARK-13631][CORE] Thread-safe getLocationsWithLargestOutputs ## What changes were proposed in this pull request? If a job is being scheduled in one thread which has a dependency on an RDD currently executing a shuffle in another thread, Spark would throw a NullPointerException. This patch synchronizes access to `mapStatuses` and skips null status entries (which are in-progress shuffle tasks). ## How was this patch tested? Our client code unit test suite, which was reliably reproducing the race condition with 10 threads, shows that this fixes it. I have not found a minimal test case to add to Spark, but I will attempt to do so if desired. The same test case was tripping up on SPARK-4454, which was fixed by making other DAGScheduler code thread-safe. shivaram srowen Author: Andy Sloane Closes #11505 from a1k0n/SPARK-13631. (cherry picked from commit cbff2803ef117d7cffe6f05fc1bbd395a1e9c587) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/95105b0e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/95105b0e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/95105b0e Branch: refs/heads/branch-1.6 Commit: 95105b0e6e38f5f13f41b06695c0b059ff911a44 Parents: 8ec4f15 Author: Andy Sloane Authored: Wed Mar 9 10:25:47 2016 + Committer: Sean Owen Committed: Wed Mar 9 10:26:00 2016 + -- .../org/apache/spark/MapOutputTracker.scala | 52 +++- 1 file changed, 29 insertions(+), 23 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/95105b0e/core/src/main/scala/org/apache/spark/MapOutputTracker.scala -- diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 72355cd..998b4d5 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -384,8 +384,6 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf) * @param numReducers total number of reducers in the shuffle * @param fractionThreshold fraction of total map output size that a location must have * for it to be considered large. - * - * This method is not thread-safe. 
*/ def getLocationsWithLargestOutputs( shuffleId: Int, @@ -394,28 +392,36 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf) fractionThreshold: Double) : Option[Array[BlockManagerId]] = { -if (mapStatuses.contains(shuffleId)) { - val statuses = mapStatuses(shuffleId) - if (statuses.nonEmpty) { -// HashMap to add up sizes of all blocks at the same location -val locs = new HashMap[BlockManagerId, Long] -var totalOutputSize = 0L -var mapIdx = 0 -while (mapIdx < statuses.length) { - val status = statuses(mapIdx) - val blockSize = status.getSizeForBlock(reducerId) - if (blockSize > 0) { -locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize -totalOutputSize += blockSize +val statuses = mapStatuses.get(shuffleId).orNull +if (statuses != null) { + statuses.synchronized { +if (statuses.nonEmpty) { + // HashMap to add up sizes of all blocks at the same location + val locs = new HashMap[BlockManagerId, Long] + var totalOutputSize = 0L + var mapIdx = 0 + while (mapIdx < statuses.length) { +val status = statuses(mapIdx) +// status may be null here if we are called between registerShuffle, which creates an +// array with null entries for each output, and registerMapOutputs, which populates it +// with valid status entries. This is possible if one thread schedules a job which +// depends on an RDD which is currently being computed by another thread. +if (status != null) { + val blockSize = status.getSizeForBlock(reducerId) + if (blockSize > 0) { +locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize +totalOutputSize += blockSize + } +} +mapIdx = mapIdx + 1 + } + val topLocs = locs.filter { case (loc, size) => +size.toDouble / totalOutputSize >= fractionThreshold + } + // Return if we have any locations which satisfy the required threshold + if (topLocs.nonEmpty) { +return Some(topLocs.keys.toArray) } - mapIdx = mapIdx + 1 -} -val topLocs = locs
spark git commit: [SPARK-13631][CORE] Thread-safe getLocationsWithLargestOutputs
Repository: spark Updated Branches: refs/heads/master 2c5af7d4d -> cbff2803e [SPARK-13631][CORE] Thread-safe getLocationsWithLargestOutputs ## What changes were proposed in this pull request? If a job is being scheduled in one thread which has a dependency on an RDD currently executing a shuffle in another thread, Spark would throw a NullPointerException. This patch synchronizes access to `mapStatuses` and skips null status entries (which are in-progress shuffle tasks). ## How was this patch tested? Our client code unit test suite, which was reliably reproducing the race condition with 10 threads, shows that this fixes it. I have not found a minimal test case to add to Spark, but I will attempt to do so if desired. The same test case was tripping up on SPARK-4454, which was fixed by making other DAGScheduler code thread-safe. shivaram srowen Author: Andy Sloane Closes #11505 from a1k0n/SPARK-13631. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cbff2803 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cbff2803 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cbff2803 Branch: refs/heads/master Commit: cbff2803ef117d7cffe6f05fc1bbd395a1e9c587 Parents: 2c5af7d Author: Andy Sloane Authored: Wed Mar 9 10:25:47 2016 + Committer: Sean Owen Committed: Wed Mar 9 10:25:47 2016 + -- .../org/apache/spark/MapOutputTracker.scala | 52 +++- 1 file changed, 29 insertions(+), 23 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cbff2803/core/src/main/scala/org/apache/spark/MapOutputTracker.scala -- diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index eb2fdec..9cb6159 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -376,8 +376,6 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf) * @param numReducers total number of reducers in the shuffle * @param fractionThreshold fraction of total map output size that a location must have * for it to be considered large. - * - * This method is not thread-safe. */ def getLocationsWithLargestOutputs( shuffleId: Int, @@ -386,28 +384,36 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf) fractionThreshold: Double) : Option[Array[BlockManagerId]] = { -if (mapStatuses.contains(shuffleId)) { - val statuses = mapStatuses(shuffleId) - if (statuses.nonEmpty) { -// HashMap to add up sizes of all blocks at the same location -val locs = new HashMap[BlockManagerId, Long] -var totalOutputSize = 0L -var mapIdx = 0 -while (mapIdx < statuses.length) { - val status = statuses(mapIdx) - val blockSize = status.getSizeForBlock(reducerId) - if (blockSize > 0) { -locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize -totalOutputSize += blockSize +val statuses = mapStatuses.get(shuffleId).orNull +if (statuses != null) { + statuses.synchronized { +if (statuses.nonEmpty) { + // HashMap to add up sizes of all blocks at the same location + val locs = new HashMap[BlockManagerId, Long] + var totalOutputSize = 0L + var mapIdx = 0 + while (mapIdx < statuses.length) { +val status = statuses(mapIdx) +// status may be null here if we are called between registerShuffle, which creates an +// array with null entries for each output, and registerMapOutputs, which populates it +// with valid status entries. 
This is possible if one thread schedules a job which +// depends on an RDD which is currently being computed by another thread. +if (status != null) { + val blockSize = status.getSizeForBlock(reducerId) + if (blockSize > 0) { +locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize +totalOutputSize += blockSize + } +} +mapIdx = mapIdx + 1 + } + val topLocs = locs.filter { case (loc, size) => +size.toDouble / totalOutputSize >= fractionThreshold + } + // Return if we have any locations which satisfy the required threshold + if (topLocs.nonEmpty) { +return Some(topLocs.keys.toArray) } - mapIdx = mapIdx + 1 -} -val topLocs = locs.filter { case (loc, size) => - size.toDouble / totalOutputSize >= fractionThreshold -}
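The shape of the fix is: read `mapStatuses` once, synchronize on the returned array, and skip entries that are still null because another thread has not finished registering them. A self-contained Java analogy with hypothetical names (not the `MapOutputTracker` code):

```
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SizeAggregator {
  // shuffleId -> per-map output sizes; slots stay null until the map task finishes.
  private final Map<Integer, Long[]> statuses = new ConcurrentHashMap<>();

  public long totalSize(int shuffleId) {
    Long[] sizes = statuses.get(shuffleId);   // read the map once; no contains()+get() race
    if (sizes == null) {
      return 0L;
    }
    synchronized (sizes) {                    // writers must synchronize on the same array
      long total = 0L;
      for (Long size : sizes) {
        if (size != null) {                   // skip in-progress (still-null) entries
          total += size;
        }
      }
      return total;
    }
  }
}
```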
[1/2] spark git commit: [SPARK-13702][CORE][SQL][MLLIB] Use diamond operator for generic instance creation in Java code.
Repository: spark Updated Branches: refs/heads/master cbff2803e -> c3689bc24 http://git-wip-us.apache.org/repos/asf/spark/blob/c3689bc2/sql/hive/src/test/java/org/apache/spark/sql/hive/test/Complex.java -- diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/test/Complex.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/test/Complex.java index 4ef1f27..fc24600 100644 --- a/sql/hive/src/test/java/org/apache/spark/sql/hive/test/Complex.java +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/test/Complex.java @@ -50,7 +50,7 @@ public class Complex implements org.apache.thrift.TBase, SchemeFactory> schemes = new HashMap, SchemeFactory>(); + private static final Map, SchemeFactory> schemes = new HashMap<>(); static { schemes.put(StandardScheme.class, new ComplexStandardSchemeFactory()); schemes.put(TupleScheme.class, new ComplexTupleSchemeFactory()); @@ -72,7 +72,7 @@ public class Complex implements org.apache.thrift.TBase byName = new HashMap(); +private static final Map byName = new HashMap<>(); static { for (_Fields field : EnumSet.allOf(_Fields.class)) { @@ -141,7 +141,7 @@ public class Complex implements org.apache.thrift.TBase metaDataMap; static { -Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); +Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<>(_Fields.class); tmpMap.put(_Fields.AINT, new org.apache.thrift.meta_data.FieldMetaData("aint", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); tmpMap.put(_Fields.A_STRING, new org.apache.thrift.meta_data.FieldMetaData("aString", org.apache.thrift.TFieldRequirementType.DEFAULT, @@ -194,28 +194,28 @@ public class Complex implements org.apache.thrift.TBase __this__lint = new ArrayList(); + List __this__lint = new ArrayList<>(); for (Integer other_element : other.lint) { __this__lint.add(other_element); } this.lint = __this__lint; } if (other.isSetLString()) { - List __this__lString = new ArrayList(); + List __this__lString = new ArrayList<>(); for (String other_element : other.lString) { __this__lString.add(other_element); } this.lString = __this__lString; } if (other.isSetLintString()) { - List __this__lintString = new ArrayList(); + List __this__lintString = new ArrayList<>(); for (IntString other_element : other.lintString) { __this__lintString.add(new IntString(other_element)); } this.lintString = __this__lintString; } if (other.isSetMStringString()) { - Map __this__mStringString = new HashMap(); + Map __this__mStringString = new HashMap<>(); for (Map.Entry other_element : other.mStringString.entrySet()) { String other_element_key = other_element.getKey(); @@ -339,7 +339,7 @@ public class Complex implements org.apache.thrift.TBase(); + this.lString = new ArrayList<>(); } this.lString.add(elem); } @@ -411,7 +411,7 @@ public class Complex implements org.apache.thrift.TBase(); + this.mStringString = new HashMap<>(); } this.mStringString.put(key, val); } @@ -876,7 +876,7 @@ public class Complex implements org.apache.thrift.TBase(_list0.size); +struct.lint = new ArrayList<>(_list0.size); for (int _i1 = 0; _i1 < _list0.size; ++_i1) { int _elem2; // required @@ -894,7 +894,7 @@ public class Complex implements org.apache.thrift.TBase(_list3.size); +struct.lString = new ArrayList<>(_list3.size); for (int _i4 = 0; _i4 < _list3.size; ++_i4) { String _elem5; // required @@ -912,7 +912,7 @@ public class Complex implements 
org.apache.thrift.TBase(_list6.size); +struct.lintString = new ArrayList<>(_list6.size); for (int _i7 = 0; _i7 < _list6.size; ++_i7) { IntString _elem8; // required @@ -1114,7 +1114,7 @@ public class Complex implements org.apache.thrift.TBase(_list21.size); + struct.lint = new ArrayList<>(_list21.size); for (int _i22 = 0; _i22 < _list21.size; ++_i22) { int _elem23; // required @@ -1127,7 +1127,7 @@ public class Complex implements org.apache.thrift.TBase(_list24.size); + struct.lString = new ArrayList<>(_list24.size); for (int _i25 = 0; _i25 < _list24.size; ++_i25) { String _elem26; // required @@ -1140,7 +1140,7 @@ public class Complex implements org.apache.thrift.TBase(_list27.size); + struct.lintString = new ArrayList<>(_list27.size); for (int _i28 = 0;
[2/2] spark git commit: [SPARK-13702][CORE][SQL][MLLIB] Use diamond operator for generic instance creation in Java code.
[SPARK-13702][CORE][SQL][MLLIB] Use diamond operator for generic instance creation in Java code. ## What changes were proposed in this pull request? In order to make `docs/examples` (and other related code) more simple/readable/user-friendly, this PR replaces existing codes like the followings by using `diamond` operator. ``` -final ArrayList> dataToWrite = - new ArrayList>(); +final ArrayList> dataToWrite = new ArrayList<>(); ``` Java 7 or higher supports **diamond** operator which replaces the type arguments required to invoke the constructor of a generic class with an empty set of type parameters (<>). Currently, Spark Java code use mixed usage of this. ## How was this patch tested? Manual. Pass the existing tests. Author: Dongjoon Hyun Closes #11541 from dongjoon-hyun/SPARK-13702. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c3689bc2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c3689bc2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c3689bc2 Branch: refs/heads/master Commit: c3689bc24e03a9471cd6e8169da61963c4528252 Parents: cbff280 Author: Dongjoon Hyun Authored: Wed Mar 9 10:31:26 2016 + Committer: Sean Owen Committed: Wed Mar 9 10:31:26 2016 + -- .../network/client/TransportClientFactory.java | 4 +-- .../shuffle/sort/ShuffleExternalSorter.java | 4 +-- .../apache/spark/status/api/v1/TaskSorting.java | 2 +- .../spark/launcher/SparkLauncherSuite.java | 2 +- .../shuffle/sort/UnsafeShuffleWriterSuite.java | 19 +--- .../map/AbstractBytesToBytesMapSuite.java | 6 ++-- .../spark/util/collection/TestTimSort.java | 2 +- .../unsafe/sort/UnsafeExternalSorterSuite.java | 2 +- docs/sql-programming-guide.md | 4 +-- docs/streaming-programming-guide.md | 4 +-- .../org/apache/spark/examples/JavaLogQuery.java | 2 +- .../org/apache/spark/examples/JavaPageRank.java | 2 +- .../org/apache/spark/examples/JavaSparkPi.java | 2 +- .../java/org/apache/spark/examples/JavaTC.java | 10 +++--- .../apache/spark/examples/JavaWordCount.java| 2 +- .../ml/JavaElementwiseProductExample.java | 2 +- .../JavaDecisionTreeClassificationExample.java | 4 +-- .../JavaDecisionTreeRegressionExample.java | 4 +-- ...vaGradientBoostingClassificationExample.java | 4 +-- .../JavaGradientBoostingRegressionExample.java | 4 +-- .../mllib/JavaIsotonicRegressionExample.java| 2 +- .../JavaLinearRegressionWithSGDExample.java | 2 +- .../examples/mllib/JavaNaiveBayesExample.java | 2 +- .../spark/examples/mllib/JavaPCAExample.java| 2 +- .../JavaRandomForestClassificationExample.java | 4 +-- .../JavaRandomForestRegressionExample.java | 4 +-- .../mllib/JavaRecommendationExample.java| 6 ++-- .../spark/examples/mllib/JavaSVDExample.java| 2 +- .../examples/streaming/JavaActorWordCount.java | 2 +- .../examples/streaming/JavaCustomReceiver.java | 2 +- .../streaming/JavaDirectKafkaWordCount.java | 6 ++-- .../examples/streaming/JavaKafkaWordCount.java | 4 +-- .../streaming/JavaNetworkWordCount.java | 2 +- .../examples/streaming/JavaQueueStream.java | 4 +-- .../JavaRecoverableNetworkWordCount.java| 2 +- .../streaming/JavaKinesisWordCountASL.java | 2 +- .../spark/launcher/AbstractCommandBuilder.java | 16 +- .../spark/launcher/CommandBuilderUtils.java | 2 +- .../apache/spark/launcher/LauncherServer.java | 2 +- .../java/org/apache/spark/launcher/Main.java| 8 ++--- .../launcher/SparkClassCommandBuilder.java | 2 +- .../apache/spark/launcher/SparkLauncher.java| 6 ++-- .../launcher/SparkSubmitCommandBuilder.java | 12 .../SparkSubmitCommandBuilderSuite.java | 10 +++--- 
.../JavaDecisionTreeClassifierSuite.java| 2 +- .../classification/JavaGBTClassifierSuite.java | 2 +- .../JavaRandomForestClassifierSuite.java| 2 +- .../JavaDecisionTreeRegressorSuite.java | 2 +- .../ml/regression/JavaGBTRegressorSuite.java| 2 +- .../JavaRandomForestRegressorSuite.java | 2 +- .../spark/mllib/clustering/JavaLDASuite.java| 8 ++--- .../spark/mllib/tree/JavaDecisionTreeSuite.java | 4 +-- .../org/apache/spark/sql/types/DataTypes.java | 2 +- .../SpecificParquetRecordReaderBase.java| 2 +- .../apache/spark/sql/JavaApplySchemaSuite.java | 2 +- .../org/apache/spark/sql/JavaDatasetSuite.java | 2 +- .../spark/sql/hive/aggregate/MyDoubleAvg.java | 4 +-- .../spark/sql/hive/aggregate/MyDoubleSum.java | 4 +-- .../org/apache/spark/sql/hive/test/Complex.java | 32 ++-- 59 files changed, 129 insertions(+), 134 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c3689
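The before/after snippet in the commit message lost its angle-bracket contents to the mailing-list HTML, so here is an untruncated illustration of the same change using made-up types rather than a line from the patch:

```
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DiamondExample {
  public static void main(String[] args) {
    // Before (Java 5/6 style): type arguments repeated on the right-hand side.
    Map<String, List<Integer>> verbose = new HashMap<String, List<Integer>>();

    // After (Java 7+ diamond operator): the compiler infers the type arguments.
    Map<String, List<Integer>> concise = new HashMap<>();
    List<Integer> values = new ArrayList<>();

    values.add(42);
    concise.put("answer", values);
    verbose.put("answer", values);
    System.out.println(concise);
  }
}
```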
spark git commit: [SPARK-13769][CORE] Update Java Doc in Spark Submit
Repository: spark Updated Branches: refs/heads/master c3689bc24 -> 8e8633e0b [SPARK-13769][CORE] Update Java Doc in Spark Submit JIRA : https://issues.apache.org/jira/browse/SPARK-13769 The java doc here (https://github.com/apache/spark/blob/e97fc7f176f8bf501c9b3afd8410014e3b0e1602/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L51) needs to be updated from "The latter two operations are currently supported only for standalone cluster mode." to "The latter two operations are currently supported only for standalone and mesos cluster modes." Author: Ahmed Kamal Closes #11600 from AhmedKamal/SPARK-13769. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8e8633e0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8e8633e0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8e8633e0 Branch: refs/heads/master Commit: 8e8633e0b23a08cdcddcf3c5e8fd0ba3b337e389 Parents: c3689bc Author: Ahmed Kamal Authored: Wed Mar 9 12:28:58 2016 + Committer: Sean Owen Committed: Wed Mar 9 12:28:58 2016 + -- core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8e8633e0/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 7d7ddcc..e8d0c3f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -48,7 +48,7 @@ import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, U /** * Whether to submit, kill, or request the status of an application. - * The latter two operations are currently supported only for standalone cluster mode. + * The latter two operations are currently supported only for standalone and Mesos cluster modes. */ private[deploy] object SparkSubmitAction extends Enumeration { type SparkSubmitAction = Value - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-13698][SQL] Fix Analysis Exceptions when Using Backticks in Generate
Repository: spark Updated Branches: refs/heads/master 8e8633e0b -> 53ba6d6e5 [SPARK-13698][SQL] Fix Analysis Exceptions when Using Backticks in Generate ## What changes were proposed in this pull request? Analysis exception occurs while running the following query. ``` SELECT ints FROM nestedArray LATERAL VIEW explode(a.b) `a` AS `ints` ``` ``` Failed to analyze query: org.apache.spark.sql.AnalysisException: cannot resolve '`ints`' given input columns: [a, `ints`]; line 1 pos 7 'Project ['ints] +- Generate explode(a#0.b), true, false, Some(a), [`ints`#8] +- SubqueryAlias nestedarray +- LocalRelation [a#0], 1,2,3 ``` ## How was this patch tested? Added new unit tests in SQLQuerySuite and HiveQlSuite Author: Dilip Biswal Closes #11538 from dilipbiswal/SPARK-13698. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/53ba6d6e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/53ba6d6e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/53ba6d6e Branch: refs/heads/master Commit: 53ba6d6e59d1172035c2d5e2906bb03fd1998e14 Parents: 8e8633e Author: Dilip Biswal Authored: Wed Mar 9 21:49:37 2016 +0800 Committer: Wenchen Fan Committed: Wed Mar 9 21:49:37 2016 +0800 -- .../spark/sql/catalyst/parser/CatalystQl.scala | 10 -- .../org/apache/spark/sql/hive/HiveQlSuite.scala | 20 .../sql/hive/execution/SQLQuerySuite.scala | 17 + 3 files changed, 45 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/53ba6d6e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/CatalystQl.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/CatalystQl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/CatalystQl.scala index 5d96d8e..b1b449a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/CatalystQl.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/CatalystQl.scala @@ -899,10 +899,16 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C } val attributes = clauses.collect { - case Token(a, Nil) => UnresolvedAttribute(a.toLowerCase) + case Token(a, Nil) => UnresolvedAttribute(cleanIdentifier(a.toLowerCase)) } -Generate(generator, join = true, outer = outer, Some(alias.toLowerCase), attributes, child) +Generate( + generator, + join = true, + outer = outer, + Some(cleanIdentifier(alias.toLowerCase)), + attributes, + child) } protected def nodeToGenerator(node: ASTNode): Generator = noParseRule("Generator", node) http://git-wip-us.apache.org/repos/asf/spark/blob/53ba6d6e/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala index 8e8d3f3..626550f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala @@ -208,4 +208,24 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll { |USING 'cat' AS (`thing1` int, `thing2` string) FROM `default`.`parquet_t1`) AS t """.stripMargin) } + + test("use backticks in output of Generator") { +val plan = parser.parsePlan( + """ +|SELECT `gentab2`.`gencol2` +|FROM `default`.`src` +|LATERAL VIEW explode(array(array(1, 2, 3))) `gentab1` AS `gencol1` +|LATERAL VIEW explode(`gentab1`.`gencol1`) `gentab2` AS `gencol2` + """.stripMargin) + } + + test("use escaped backticks in 
output of Generator") { +val plan = parser.parsePlan( + """ +|SELECT `gen``tab2`.`gen``col2` +|FROM `default`.`src` +|LATERAL VIEW explode(array(array(1, 2, 3))) `gen``tab1` AS `gen``col1` +|LATERAL VIEW explode(`gen``tab1`.`gen``col1`) `gen``tab2` AS `gen``col2` + """.stripMargin) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/53ba6d6e/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 0c9bac1..b42f00e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite
svn commit: r1734288 - in /spark: documentation.md site/documentation.html
Author: srowen Date: Wed Mar 9 17:06:53 2016 New Revision: 1734288 URL: http://svn.apache.org/viewvc?rev=1734288&view=rev Log: Add Big Data Analytics book to list of Spark resources Modified: spark/documentation.md spark/site/documentation.html Modified: spark/documentation.md URL: http://svn.apache.org/viewvc/spark/documentation.md?rev=1734288&r1=1734287&r2=1734288&view=diff == --- spark/documentation.md (original) +++ spark/documentation.md Wed Mar 9 17:06:53 2016 @@ -153,6 +153,7 @@ Slides, videos and EC2-based exercises f https://www.packtpub.com/big-data-and-business-intelligence/spark-cookbook";>Spark Cookbook, by Rishi Yadav (Packt Publishing) https://www.packtpub.com/big-data-and-business-intelligence/apache-spark-graph-processing";>Apache Spark Graph Processing, by Rindra Ramamonjison (Packt Publishing) https://www.packtpub.com/big-data-and-business-intelligence/mastering-apache-spark";>Mastering Apache Spark, by Mike Frampton (Packt Publishing) + http://www.apress.com/9781484209653";>Big Data Analytics with Spark: A Practitioner's Guide to Using Spark for Large Scale Data Analysis, by Mohammed Guller (Apress) Examples Modified: spark/site/documentation.html URL: http://svn.apache.org/viewvc/spark/site/documentation.html?rev=1734288&r1=1734287&r2=1734288&view=diff == --- spark/site/documentation.html (original) +++ spark/site/documentation.html Wed Mar 9 17:06:53 2016 @@ -327,6 +327,7 @@ Slides, videos and EC2-based exercises f https://www.packtpub.com/big-data-and-business-intelligence/spark-cookbook";>Spark Cookbook, by Rishi Yadav (Packt Publishing) https://www.packtpub.com/big-data-and-business-intelligence/apache-spark-graph-processing";>Apache Spark Graph Processing, by Rindra Ramamonjison (Packt Publishing) https://www.packtpub.com/big-data-and-business-intelligence/mastering-apache-spark";>Mastering Apache Spark, by Mike Frampton (Packt Publishing) + http://www.apress.com/9781484209653";>Big Data Analytics with Spark: A Practitioner's Guide to Using Spark for Large Scale Data Analysis, by Mohammed Guller (Apress) Examples - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-13242] [SQL] codegen fallback in case-when if there are many branches
Repository: spark Updated Branches: refs/heads/master 53ba6d6e5 -> 9634e17d0 [SPARK-13242] [SQL] codegen fallback in case-when if there many branches ## What changes were proposed in this pull request? If there are many branches in a CaseWhen expression, the generated code could go above the 64K limit for single java method, will fail to compile. This PR change it to fallback to interpret mode if there are more than 20 branches. This PR is based on #11243 and #11221, thanks to joehalliwell Closes #11243 Closes #11221 ## How was this patch tested? Add a test with 50 branches. Author: Davies Liu Closes #11592 from davies/fix_when. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9634e17d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9634e17d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9634e17d Branch: refs/heads/master Commit: 9634e17d0183d43606a96fbba630e4c6ad720f7c Parents: 53ba6d6 Author: Davies Liu Authored: Wed Mar 9 09:27:28 2016 -0800 Committer: Davies Liu Committed: Wed Mar 9 09:27:28 2016 -0800 -- .../expressions/conditionalExpressions.scala| 14 - .../sql/catalyst/expressions/literals.scala | 6 +++--- .../expressions/CodeGenerationSuite.scala | 21 .../spark/sql/execution/WholeStageCodegen.scala | 1 + 4 files changed, 38 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9634e17d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala index c3e9fa3..5ceb365 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala @@ -86,7 +86,7 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expressi * @param elseValue optional value for the else branch */ case class CaseWhen(branches: Seq[(Expression, Expression)], elseValue: Option[Expression] = None) - extends Expression { + extends Expression with CodegenFallback { override def children: Seq[Expression] = branches.flatMap(b => b._1 :: b._2 :: Nil) ++ elseValue @@ -136,7 +136,16 @@ case class CaseWhen(branches: Seq[(Expression, Expression)], elseValue: Option[E } } + def shouldCodegen: Boolean = { +branches.length < CaseWhen.MAX_NUM_CASES_FOR_CODEGEN + } + override def genCode(ctx: CodegenContext, ev: ExprCode): String = { +if (!shouldCodegen) { + // Fallback to interpreted mode if there are too many branches, as it may reach the + // 64K limit (limit on bytecode size for a single function). + return super[CodegenFallback].genCode(ctx, ev) +} // Generate code that looks like: // // condA = ... @@ -205,6 +214,9 @@ case class CaseWhen(branches: Seq[(Expression, Expression)], elseValue: Option[E /** Factory methods for CaseWhen. */ object CaseWhen { + // The maxium number of switches supported with codegen. 
+ val MAX_NUM_CASES_FOR_CODEGEN = 20 + def apply(branches: Seq[(Expression, Expression)], elseValue: Expression): CaseWhen = { CaseWhen(branches, Option(elseValue)) } http://git-wip-us.apache.org/repos/asf/spark/blob/9634e17d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala index 37bfe98..a76517a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala @@ -203,7 +203,7 @@ case class Literal protected (value: Any, dataType: DataType) case FloatType => val v = value.asInstanceOf[Float] if (v.isNaN || v.isInfinite) { -super.genCode(ctx, ev) +super[CodegenFallback].genCode(ctx, ev) } else { ev.isNull = "false" ev.value = s"${value}f" @@ -212,7 +212,7 @@ case class Literal protected (value: Any, dataType: DataType) case DoubleType => val v = value.asInstanceOf[Double] if (v.isNaN || v.isInfinite) { -super.genCode(ctx, ev) +super[CodegenFallback].genCode(ctx, ev)
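Every extra `WHEN` arm adds generated Java to a single method, so a wide `CASE WHEN` can exceed the 64K bytecode limit; with this patch, expressions above `MAX_NUM_CASES_FOR_CODEGEN` (20 branches) fall back to interpreted evaluation. A hedged sketch of how such an expression might be built from the public DataFrame API — the helper, column name, and branch count are hypothetical, and the actual regression test lives in `CodeGenerationSuite`:

```
import org.apache.spark.sql.Column;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.when;

public class ManyBranchCaseWhen {
  // Builds CASE WHEN id = 0 THEN 0 WHEN id = 1 THEN 1 ... ELSE -1 END with `branches` arms.
  // With enough arms, generating all of this into one Java method would exceed the 64K
  // bytecode limit, which is why the patch falls back to interpreted evaluation.
  public static Column manyBranches(int branches) {
    Column expr = when(col("id").equalTo(0), 0);
    for (int i = 1; i < branches; i++) {
      expr = expr.when(col("id").equalTo(i), i);
    }
    return expr.otherwise(-1);
  }

  public static void main(String[] args) {
    // e.g. df.select(manyBranches(50)) on some DataFrame `df` that has an `id` column.
    System.out.println(manyBranches(50));
  }
}
```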
spark git commit: Revert "[SPARK-13668][SQL] Reorder filter/join predicates to short-circuit isNotNull checks"
Repository: spark Updated Branches: refs/heads/master 9634e17d0 -> 7791d0c3a Revert "[SPARK-13668][SQL] Reorder filter/join predicates to short-circuit isNotNull checks" This reverts commit e430614eae53c8864b31a1dc64db83e27100d1d9. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7791d0c3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7791d0c3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7791d0c3 Branch: refs/heads/master Commit: 7791d0c3a9bdfe73e071266846f9ab1491fce50c Parents: 9634e17 Author: Davies Liu Authored: Wed Mar 9 10:05:57 2016 -0800 Committer: Davies Liu Committed: Wed Mar 9 10:05:57 2016 -0800 -- .../sql/catalyst/planning/QueryPlanner.scala| 24 + .../spark/sql/execution/SparkStrategies.scala | 37 +++ .../sql/execution/ReorderedPredicateSuite.scala | 103 --- 3 files changed, 14 insertions(+), 150 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7791d0c3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala index 1e4523e..56a3dd0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala @@ -18,8 +18,6 @@ package org.apache.spark.sql.catalyst.planning import org.apache.spark.Logging -import org.apache.spark.sql.catalyst.expressions.{And, Expression, IsNotNull, PredicateHelper} -import org.apache.spark.sql.catalyst.plans import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.trees.TreeNode @@ -28,28 +26,8 @@ import org.apache.spark.sql.catalyst.trees.TreeNode * be used for execution. If this strategy does not apply to the give logical operation then an * empty list should be returned. */ -abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] - extends PredicateHelper with Logging { - +abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends Logging { def apply(plan: LogicalPlan): Seq[PhysicalPlan] - - // Attempts to re-order the individual conjunctive predicates in an expression to short circuit - // the evaluation of relatively cheaper checks (e.g., checking for nullability) before others. 
- protected def reorderPredicates(expr: Expression): Expression = { -splitConjunctivePredicates(expr) - .sortWith((x, _) => x.isInstanceOf[IsNotNull]) - .reduce(And) - } - - // Wrapper around reorderPredicates(expr: Expression) to reorder optional conditions in joins - protected def reorderPredicates(exprOpt: Option[Expression]): Option[Expression] = { -exprOpt match { - case Some(expr) => -Option(reorderPredicates(expr)) - case None => -exprOpt -} - } } /** http://git-wip-us.apache.org/repos/asf/spark/blob/7791d0c3/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 36fea4d..debd04a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -66,13 +66,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case ExtractEquiJoinKeys( LeftSemi, leftKeys, rightKeys, condition, left, CanBroadcast(right)) => joins.BroadcastLeftSemiJoinHash( - leftKeys, rightKeys, planLater(left), planLater(right), - reorderPredicates(condition)) :: Nil + leftKeys, rightKeys, planLater(left), planLater(right), condition) :: Nil // Find left semi joins where at least some predicates can be evaluated by matching join keys case ExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, right) => joins.LeftSemiJoinHash( - leftKeys, rightKeys, planLater(left), planLater(right), - reorderPredicates(condition)) :: Nil + leftKeys, rightKeys, planLater(left), planLater(right), condition) :: Nil case _ => Nil } } @@ -113,39 +111,33 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, CanBroadcast(right)) => S
[8/8] spark git commit: [SPARK-13595][BUILD] Move docker, extras modules into external
[SPARK-13595][BUILD] Move docker, extras modules into external ## What changes were proposed in this pull request? Move `docker` dirs out of top level into `external/`; move `extras/*` into `external/` ## How was this patch tested? This is tested with Jenkins tests. Author: Sean Owen Closes #11523 from srowen/SPARK-13595. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/256704c7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/256704c7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/256704c7 Branch: refs/heads/master Commit: 256704c771d301700af9ebf0d180c1ba7c4116c0 Parents: 7791d0c Author: Sean Owen Authored: Wed Mar 9 18:27:44 2016 + Committer: Sean Owen Committed: Wed Mar 9 18:27:44 2016 + -- dev/sparktestsupport/modules.py | 4 +- docker-integration-tests/pom.xml| 184 .../sql/jdbc/DockerJDBCIntegrationSuite.scala | 160 .../spark/sql/jdbc/MySQLIntegrationSuite.scala | 153 .../spark/sql/jdbc/OracleIntegrationSuite.scala | 78 -- .../sql/jdbc/PostgresIntegrationSuite.scala | 105 --- .../org/apache/spark/util/DockerUtils.scala | 68 -- docker/README.md| 7 - docker/build| 22 - docker/spark-mesos/Dockerfile | 30 - docker/spark-test/README.md | 11 - docker/spark-test/base/Dockerfile | 37 - docker/spark-test/build | 22 - docker/spark-test/master/Dockerfile | 21 - docker/spark-test/master/default_cmd| 28 - docker/spark-test/worker/Dockerfile | 22 - docker/spark-test/worker/default_cmd| 28 - docs/streaming-kinesis-integration.md | 10 +- external/docker-integration-tests/pom.xml | 184 .../sql/jdbc/DockerJDBCIntegrationSuite.scala | 160 .../spark/sql/jdbc/MySQLIntegrationSuite.scala | 153 .../spark/sql/jdbc/OracleIntegrationSuite.scala | 78 ++ .../sql/jdbc/PostgresIntegrationSuite.scala | 105 +++ .../org/apache/spark/util/DockerUtils.scala | 68 ++ external/docker/README.md | 7 + external/docker/build | 22 + external/docker/spark-mesos/Dockerfile | 30 + external/docker/spark-test/README.md| 11 + external/docker/spark-test/base/Dockerfile | 37 + external/docker/spark-test/build| 22 + external/docker/spark-test/master/Dockerfile| 21 + external/docker/spark-test/master/default_cmd | 28 + external/docker/spark-test/worker/Dockerfile| 22 + external/docker/spark-test/worker/default_cmd | 28 + external/java8-tests/README.md | 24 + external/java8-tests/pom.xml| 161 .../java/org/apache/spark/Java8APISuite.java| 393 .../apache/spark/streaming/Java8APISuite.java | 905 +++ .../src/test/resources/log4j.properties | 28 + .../scala/org/apache/spark/JDK8ScalaSuite.scala | 27 + external/kinesis-asl-assembly/pom.xml | 181 external/kinesis-asl/pom.xml| 87 ++ .../streaming/JavaKinesisWordCountASL.java | 189 .../examples/streaming/kinesis_wordcount_asl.py | 83 ++ .../src/main/resources/log4j.properties | 37 + .../streaming/KinesisWordCountASL.scala | 276 ++ .../kinesis/KinesisBackedBlockRDD.scala | 288 ++ .../streaming/kinesis/KinesisCheckpointer.scala | 133 +++ .../streaming/kinesis/KinesisInputDStream.scala | 76 ++ .../streaming/kinesis/KinesisReceiver.scala | 361 .../kinesis/KinesisRecordProcessor.scala| 177 .../streaming/kinesis/KinesisTestUtils.scala| 260 ++ .../spark/streaming/kinesis/KinesisUtils.scala | 560 .../kinesis/JavaKinesisStreamSuite.java | 62 ++ .../src/test/resources/log4j.properties | 27 + .../kinesis/KPLBasedKinesisTestUtils.scala | 72 ++ .../kinesis/KinesisBackedBlockRDDSuite.scala| 259 ++ .../kinesis/KinesisCheckpointerSuite.scala | 152 .../streaming/kinesis/KinesisFunSuite.scala | 46 + 
.../kinesis/KinesisReceiverSuite.scala | 210 + .../streaming/kinesis/KinesisStreamSuite.scala | 297 ++ external/spark-ganglia-lgpl/pom.xml | 49 + .../apache/spark/metrics/sink/GangliaSink.scala | 90 ++ extras/README.md| 1 - extras/java8-tests/README.md| 24 - extras/java8-tests/pom.xml | 161 .../java/org/apache/spark/Java8APISuite.java| 393 .../apache/spark/streaming/Java8APISuite.java | 905 --- .../src/test/resources/log4j.properties | 28 - .../scala/org/apache/spark/JDK8Scal
[2/8] spark git commit: [SPARK-13595][BUILD] Move docker, extras modules into external
http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala -- diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala deleted file mode 100644 index 15ac588..000 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala +++ /dev/null @@ -1,560 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.streaming.kinesis - -import scala.reflect.ClassTag - -import com.amazonaws.regions.RegionUtils -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream -import com.amazonaws.services.kinesis.model.Record - -import org.apache.spark.api.java.function.{Function => JFunction} -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Duration, StreamingContext} -import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext} -import org.apache.spark.streaming.dstream.ReceiverInputDStream - -object KinesisUtils { - /** - * Create an input stream that pulls messages from a Kinesis stream. - * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. - * - * Note: The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain - * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain - * gets the AWS credentials. - * - * @param ssc StreamingContext object - * @param kinesisAppName Kinesis application name used by the Kinesis Client Library - *(KCL) to update DynamoDB - * @param streamName Kinesis stream name - * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) - * @param regionName Name of region used by the Kinesis Client Library (KCL) to update - * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) - * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the - * worker's initial starting position in the stream. - * The values are either the beginning of the stream - * per Kinesis' limit of 24 hours - * (InitialPositionInStream.TRIM_HORIZON) or - * the tip of the stream (InitialPositionInStream.LATEST). - * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. - *See the Kinesis Spark Streaming documentation for more - *details on the different types of checkpoints. - * @param storageLevel Storage level to use for storing the received objects. - * StorageLevel.MEMORY_AND_DISK_2 is recommended. 
- * @param messageHandler A custom message handler that can generate a generic output from a - * Kinesis `Record`, which contains both message data, and metadata. - */ - def createStream[T: ClassTag]( - ssc: StreamingContext, - kinesisAppName: String, - streamName: String, - endpointUrl: String, - regionName: String, - initialPositionInStream: InitialPositionInStream, - checkpointInterval: Duration, - storageLevel: StorageLevel, - messageHandler: Record => T): ReceiverInputDStream[T] = { -val cleanedHandler = ssc.sc.clean(messageHandler) -// Setting scope to override receiver stream's scope of "receiver stream" -ssc.withNamedScope("kinesis stream") { - new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName), -initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel, -cleanedHandler, None) -} - } - - /** - * Create an input stream that pulls messages from a Kinesis stream. - * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. -
[4/8] spark git commit: [SPARK-13595][BUILD] Move docker, extras modules into external
http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/README.md -- diff --git a/extras/README.md b/extras/README.md deleted file mode 100644 index 1b4174b..000 --- a/extras/README.md +++ /dev/null @@ -1 +0,0 @@ -This directory contains build components not included by default in Spark's build. http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/java8-tests/README.md -- diff --git a/extras/java8-tests/README.md b/extras/java8-tests/README.md deleted file mode 100644 index dc9e87f..000 --- a/extras/java8-tests/README.md +++ /dev/null @@ -1,24 +0,0 @@ -# Java 8 Test Suites - -These tests require having Java 8 installed and are isolated from the main Spark build. -If Java 8 is not your system's default Java version, you will need to point Spark's build -to your Java location. The set-up depends a bit on the build system: - -* Sbt users can either set JAVA_HOME to the location of a Java 8 JDK or explicitly pass - `-java-home` to the sbt launch script. If a Java 8 JDK is detected sbt will automatically - include the Java 8 test project. - - `$ JAVA_HOME=/opt/jdk1.8.0/ build/sbt clean "test-only org.apache.spark.Java8APISuite"` - -* For Maven users, - - Maven users can also refer to their Java 8 directory using JAVA_HOME. However, Maven will not - automatically detect the presence of a Java 8 JDK, so a special build profile `-Pjava8-tests` - must be used. - - `$ JAVA_HOME=/opt/jdk1.8.0/ mvn clean install -DskipTests` - `$ JAVA_HOME=/opt/jdk1.8.0/ mvn test -Pjava8-tests -DwildcardSuites=org.apache.spark.Java8APISuite` - - Note that the above command can only be run from project root directory since this module - depends on core and the test-jars of core and streaming. This means an install step is - required to make the test dependencies visible to the Java 8 sub-project. 
http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/java8-tests/pom.xml -- diff --git a/extras/java8-tests/pom.xml b/extras/java8-tests/pom.xml deleted file mode 100644 index 0ad9c53..000 --- a/extras/java8-tests/pom.xml +++ /dev/null @@ -1,161 +0,0 @@ - - -http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> - 4.0.0 - -org.apache.spark -spark-parent_2.11 -2.0.0-SNAPSHOT -../../pom.xml - - - org.apache.spark - java8-tests_2.11 - pom - Spark Project Java8 Tests POM - - -java8-tests - - - - - org.apache.spark - spark-core_${scala.binary.version} - ${project.version} - - - org.apache.spark - spark-core_${scala.binary.version} - ${project.version} - test-jar - test - - - org.apache.spark - spark-streaming_${scala.binary.version} - ${project.version} - - - org.apache.spark - spark-streaming_${scala.binary.version} - ${project.version} - test-jar - test - - - org.apache.spark - spark-test-tags_${scala.binary.version} - - - - - - java8-tests - - - - - -org.apache.maven.plugins -maven-deploy-plugin - - true - - - -org.apache.maven.plugins -maven-install-plugin - - true - - - -org.apache.maven.plugins -maven-surefire-plugin - - -test - - test - - - - - - - - file:src/test/resources/log4j.properties - - - false - -**/Suite*.java -**/*Suite.java - - - - -org.apache.maven.plugins -maven-compiler-plugin - - -test-compile-first -process-test-resources - - testCompile - - - - - true - true - true - 1.8 - 1.8 - 1.8 - UTF-8 - 1024m - - - - -net.alchim31.maven -scala-maven-plugin - - -none - - -scala-compile-first -none - - -scala-test-compile-first -none - - -attach-scaladocs -none - - - - - - http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java ---
[7/8] spark git commit: [SPARK-13595][BUILD] Move docker, extras modules into external
http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/docker-integration-tests/src/test/scala/org/apache/spark/util/DockerUtils.scala -- diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/util/DockerUtils.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/util/DockerUtils.scala new file mode 100644 index 000..fda377e --- /dev/null +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/util/DockerUtils.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import java.net.{Inet4Address, InetAddress, NetworkInterface} + +import scala.collection.JavaConverters._ +import scala.sys.process._ +import scala.util.Try + +private[spark] object DockerUtils { + + def getDockerIp(): String = { +/** If docker-machine is setup on this box, attempts to find the ip from it. */ +def findFromDockerMachine(): Option[String] = { + sys.env.get("DOCKER_MACHINE_NAME").flatMap { name => +Try(Seq("/bin/bash", "-c", s"docker-machine ip $name 2>/dev/null").!!.trim).toOption + } +} +sys.env.get("DOCKER_IP") + .orElse(findFromDockerMachine()) + .orElse(Try(Seq("/bin/bash", "-c", "boot2docker ip 2>/dev/null").!!.trim).toOption) + .getOrElse { +// This block of code is based on Utils.findLocalInetAddress(), but is modified to blacklist +// certain interfaces. +val address = InetAddress.getLocalHost +// Address resolves to something like 127.0.1.1, which happens on Debian; try to find +// a better address using the local network interfaces +// getNetworkInterfaces returns ifs in reverse order compared to ifconfig output order +// on unix-like system. On windows, it returns in index order. +// It's more proper to pick ip address following system output order. 
+val blackListedIFs = Seq( + "vboxnet0", // Mac + "docker0"// Linux +) +val activeNetworkIFs = NetworkInterface.getNetworkInterfaces.asScala.toSeq.filter { i => + !blackListedIFs.contains(i.getName) +} +val reOrderedNetworkIFs = activeNetworkIFs.reverse +for (ni <- reOrderedNetworkIFs) { + val addresses = ni.getInetAddresses.asScala +.filterNot(addr => addr.isLinkLocalAddress || addr.isLoopbackAddress).toSeq + if (addresses.nonEmpty) { +val addr = addresses.find(_.isInstanceOf[Inet4Address]).getOrElse(addresses.head) +// because of Inet6Address.toHostName may add interface at the end if it knows about it +val strippedAddress = InetAddress.getByAddress(addr.getAddress) +return strippedAddress.getHostAddress + } +} +address.getHostAddress + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/docker/README.md -- diff --git a/external/docker/README.md b/external/docker/README.md new file mode 100644 index 000..40ba9c3 --- /dev/null +++ b/external/docker/README.md @@ -0,0 +1,7 @@ +Spark docker files +=== + +Drawn from Matt Massie's docker files (https://github.com/massie/dockerfiles), +as well as some updates from Andre Schumacher (https://github.com/AndreSchumacher/docker). + +Tested with Docker version 0.8.1. http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/docker/build -- diff --git a/external/docker/build b/external/docker/build new file mode 100755 index 000..253a2fc --- /dev/null +++ b/external/docker/build @@ -0,0 +1,22 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may o
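As a reference point, here is a minimal sketch of how a docker-integration test might consume the `DockerUtils.getDockerIp()` helper added above. The JDBC URL, port, and database are illustrative assumptions, not part of this commit; note the object is `private[spark]`, so callers must live under the `org.apache.spark` package tree (as the docker-integration-tests do).

```scala
package org.apache.spark.sql.jdbc

import org.apache.spark.util.DockerUtils

object DockerIpSketch {
  def main(args: Array[String]): Unit = {
    // Resolves DOCKER_IP, then docker-machine, then boot2docker,
    // and finally falls back to a non-blacklisted local interface.
    val dockerIp: String = DockerUtils.getDockerIp()

    // Hypothetical usage: point an integration test at a containerized database.
    val jdbcUrl = s"jdbc:postgresql://$dockerIp:5432/postgres?user=postgres"
    println(s"Would connect to $jdbcUrl")
  }
}
```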
[6/8] spark git commit: [SPARK-13595][BUILD] Move docker, extras modules into external
http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java -- diff --git a/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java b/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java new file mode 100644 index 000..5dc825d --- /dev/null +++ b/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.examples.streaming; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.regex.Pattern; + +import com.amazonaws.regions.RegionUtils; +import org.apache.log4j.Logger; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.kinesis.KinesisUtils; + +import scala.Tuple2; + +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; + +/** + * Consumes messages from a Amazon Kinesis streams and does wordcount. + * + * This example spins up 1 Kinesis Receiver per shard for the given stream. + * It then starts pulling from the last checkpointed sequence number of the given stream. + * + * Usage: JavaKinesisWordCountASL [app-name] [stream-name] [endpoint-url] [region-name] + * [app-name] is the name of the consumer app, used to track the read data in DynamoDB + * [stream-name] name of the Kinesis stream (ie. mySparkStream) + * [endpoint-url] endpoint of the Kinesis service + * (e.g. https://kinesis.us-east-1.amazonaws.com) + * + * + * Example: + * # export AWS keys if necessary + * $ export AWS_ACCESS_KEY_ID=[your-access-key] + * $ export AWS_SECRET_KEY= + * + * # run the example + * $ SPARK_HOME/bin/run-example streaming.JavaKinesisWordCountASL myAppName mySparkStream \ + * https://kinesis.us-east-1.amazonaws.com + * + * There is a companion helper class called KinesisWordProducerASL which puts dummy data + * onto the Kinesis stream. 
+ * + * This code uses the DefaultAWSCredentialsProviderChain to find credentials + * in the following order: + *Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY + *Java System Properties - aws.accessKeyId and aws.secretKey + *Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs + *Instance profile credentials - delivered through the Amazon EC2 metadata service + * For more information, see + * http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html + * + * See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on + * the Kinesis Spark Streaming integration. + */ +public final class JavaKinesisWordCountASL { // needs to be public for access from run-example + private static final Pattern WORD_SEPARATOR = Pattern.compile(" "); + private static final Logger logger = Logger.getLogger(JavaKinesisWordCountASL.class); + + public static void main(String[] args) { +// Check that all required args were passed in. +if (args.length != 3) { + System.err.println( + "Usage: JavaKinesisWordCountASL \n\n" + + " is the name of the app, used to track the read data in DynamoDB\n" + + " is the name of the Kinesis stream\n" + + " is the endpoint of the Kinesis service\n" + + "
[1/8] spark git commit: [SPARK-13595][BUILD] Move docker, extras modules into external
Repository: spark Updated Branches: refs/heads/master 7791d0c3a -> 256704c77 http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/pom.xml -- diff --git a/pom.xml b/pom.xml index 90f4672..a3af20c 100644 --- a/pom.xml +++ b/pom.xml @@ -99,7 +99,7 @@ sql/catalyst sql/core sql/hive -docker-integration-tests +external/docker-integration-tests assembly external/twitter external/flume @@ -2354,7 +2354,7 @@ spark-ganglia-lgpl -extras/spark-ganglia-lgpl +external/spark-ganglia-lgpl @@ -2362,8 +2362,8 @@ kinesis-asl -extras/kinesis-asl -extras/kinesis-asl-assembly +external/kinesis-asl +external/kinesis-asl-assembly @@ -2387,7 +2387,7 @@ -extras/java8-tests +external/java8-tests http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/python/pyspark/streaming/tests.py -- diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index b33e825..469c068 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -1619,7 +1619,7 @@ def search_mqtt_test_jar(): def search_kinesis_asl_assembly_jar(): SPARK_HOME = os.environ["SPARK_HOME"] -kinesis_asl_assembly_dir = os.path.join(SPARK_HOME, "extras/kinesis-asl-assembly") +kinesis_asl_assembly_dir = os.path.join(SPARK_HOME, "external/kinesis-asl-assembly") jars = search_jar(kinesis_asl_assembly_dir, "spark-streaming-kinesis-asl-assembly") if not jars: return None - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[3/8] spark git commit: [SPARK-13595][BUILD] Move docker, extras modules into external
http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala -- diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala deleted file mode 100644 index 6a73bc0..000 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala +++ /dev/null @@ -1,276 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// scalastyle:off println -package org.apache.spark.examples.streaming - -import java.nio.ByteBuffer - -import scala.util.Random - -import com.amazonaws.auth.{BasicAWSCredentials, DefaultAWSCredentialsProviderChain} -import com.amazonaws.regions.RegionUtils -import com.amazonaws.services.kinesis.AmazonKinesisClient -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream -import com.amazonaws.services.kinesis.model.PutRecordRequest -import org.apache.log4j.{Level, Logger} - -import org.apache.spark.{Logging, SparkConf} -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Milliseconds, StreamingContext} -import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions -import org.apache.spark.streaming.kinesis.KinesisUtils - - -/** - * Consumes messages from a Amazon Kinesis streams and does wordcount. - * - * This example spins up 1 Kinesis Receiver per shard for the given stream. - * It then starts pulling from the last checkpointed sequence number of the given stream. - * - * Usage: KinesisWordCountASL - *is the name of the consumer app, used to track the read data in DynamoDB - *name of the Kinesis stream (ie. mySparkStream) - *endpoint of the Kinesis service - * (e.g. https://kinesis.us-east-1.amazonaws.com) - * - * - * Example: - * # export AWS keys if necessary - * $ export AWS_ACCESS_KEY_ID= - * $ export AWS_SECRET_KEY= - * - * # run the example - * $ SPARK_HOME/bin/run-example streaming.KinesisWordCountASL myAppName mySparkStream \ - * https://kinesis.us-east-1.amazonaws.com - * - * There is a companion helper class called KinesisWordProducerASL which puts dummy data - * onto the Kinesis stream. 
- * - * This code uses the DefaultAWSCredentialsProviderChain to find credentials - * in the following order: - *Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY - *Java System Properties - aws.accessKeyId and aws.secretKey - *Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs - *Instance profile credentials - delivered through the Amazon EC2 metadata service - * For more information, see - * http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html - * - * See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on - * the Kinesis Spark Streaming integration. - */ -object KinesisWordCountASL extends Logging { - def main(args: Array[String]) { -// Check that all required args were passed in. -if (args.length != 3) { - System.err.println( -""" - |Usage: KinesisWordCountASL - | - | is the name of the consumer app, used to track the read data in DynamoDB - | is the name of the Kinesis stream - | is the endpoint of the Kinesis service - | (e.g. https://kinesis.us-east-1.amazonaws.com) - | - |Generate input data for Kinesis stream using the example KinesisWordProducerASL. - |See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more - |details. -""".stripMargin) - System.exit(1) -} - -StreamingExamples.setStreamingLogLevels() - -// Populate the appropriate variables from the given args -val Array(appName, streamName, endpointUrl) = args - - -// Determine the number of shards from the stream using the low-level Kinesis Client -// from t
[5/8] spark git commit: [SPARK-13595][BUILD] Move docker, extras modules into external
http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala -- diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala new file mode 100644 index 000..15ac588 --- /dev/null +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala @@ -0,0 +1,560 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import scala.reflect.ClassTag + +import com.amazonaws.regions.RegionUtils +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import com.amazonaws.services.kinesis.model.Record + +import org.apache.spark.api.java.function.{Function => JFunction} +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Duration, StreamingContext} +import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext} +import org.apache.spark.streaming.dstream.ReceiverInputDStream + +object KinesisUtils { + /** + * Create an input stream that pulls messages from a Kinesis stream. + * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. + * + * Note: The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain + * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain + * gets the AWS credentials. + * + * @param ssc StreamingContext object + * @param kinesisAppName Kinesis application name used by the Kinesis Client Library + *(KCL) to update DynamoDB + * @param streamName Kinesis stream name + * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) + * @param regionName Name of region used by the Kinesis Client Library (KCL) to update + * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) + * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the + * worker's initial starting position in the stream. + * The values are either the beginning of the stream + * per Kinesis' limit of 24 hours + * (InitialPositionInStream.TRIM_HORIZON) or + * the tip of the stream (InitialPositionInStream.LATEST). + * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. + *See the Kinesis Spark Streaming documentation for more + *details on the different types of checkpoints. + * @param storageLevel Storage level to use for storing the received objects. + * StorageLevel.MEMORY_AND_DISK_2 is recommended. 
+ * @param messageHandler A custom message handler that can generate a generic output from a + * Kinesis `Record`, which contains both message data, and metadata. + */ + def createStream[T: ClassTag]( + ssc: StreamingContext, + kinesisAppName: String, + streamName: String, + endpointUrl: String, + regionName: String, + initialPositionInStream: InitialPositionInStream, + checkpointInterval: Duration, + storageLevel: StorageLevel, + messageHandler: Record => T): ReceiverInputDStream[T] = { +val cleanedHandler = ssc.sc.clean(messageHandler) +// Setting scope to override receiver stream's scope of "receiver stream" +ssc.withNamedScope("kinesis stream") { + new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName), +initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel, +cleanedHandler, None) +} + } + + /** + * Create an input stream that pulls messages from a Kinesis stream. + * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
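For reference, a minimal sketch of calling the `createStream` overload documented above from Scala. The app name, stream name, endpoint, region, and the trivial partition-key message handler are illustrative assumptions; only the parameter order follows the signature shown in the diff.

```scala
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.kinesis.model.Record

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils

object KinesisCreateStreamSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("KinesisSketch"), Seconds(2))

    // One receiver per call; the handler maps each Kinesis Record to its partition key.
    val partitionKeys = KinesisUtils.createStream[String](
      ssc,
      "myKinesisApp",                            // KCL app name, used for the DynamoDB lease table
      "mySparkStream",                           // Kinesis stream name
      "https://kinesis.us-east-1.amazonaws.com", // endpoint URL
      "us-east-1",                               // region
      InitialPositionInStream.LATEST,
      Seconds(2),                                // checkpoint interval
      StorageLevel.MEMORY_AND_DISK_2,
      (record: Record) => record.getPartitionKey)

    partitionKeys.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```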
spark git commit: [SPARK-13763][SQL] Remove Project when its Child's Output is Nil
Repository: spark Updated Branches: refs/heads/master 256704c77 -> 23369c3bd [SPARK-13763][SQL] Remove Project when its Child's Output is Nil What changes were proposed in this pull request? As shown in another PR: https://github.com/apache/spark/pull/11596, we are using `SELECT 1` as a dummy table, when the table is used for SQL statements in which a table reference is required, but the contents of the table are not important. For example, ```SQL SELECT value FROM (select 1) dummyTable Lateral View explode(array(1,2,3)) adTable as value ``` Before the PR, the optimized plan contains a useless `Project` after Optimizer executing the `ColumnPruning` rule, as shown below: ``` == Analyzed Logical Plan == value: int Project [value#22] +- Generate explode(array(1, 2, 3)), true, false, Some(adtable), [value#22] +- SubqueryAlias dummyTable +- Project [1 AS 1#21] +- OneRowRelation$ == Optimized Logical Plan == Generate explode([1,2,3]), false, false, Some(adtable), [value#22] +- Project +- OneRowRelation$ ``` After the fix, the optimized plan removed the useless `Project`, as shown below: ``` == Optimized Logical Plan == Generate explode([1,2,3]), false, false, Some(adtable), [value#22] +- OneRowRelation$ ``` This PR is to remove `Project` when its Child's output is Nil How was this patch tested? Added a new unit test case into the suite `ColumnPruningSuite.scala` Author: gatorsmile Closes #11599 from gatorsmile/projectOneRowRelation. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/23369c3b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/23369c3b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/23369c3b Branch: refs/heads/master Commit: 23369c3bd2c6a6d7a2b9d1396d6962022676cee7 Parents: 256704c Author: gatorsmile Authored: Wed Mar 9 10:29:27 2016 -0800 Committer: Michael Armbrust Committed: Wed Mar 9 10:29:27 2016 -0800 -- .../spark/sql/catalyst/optimizer/Optimizer.scala| 6 +++--- .../sql/catalyst/optimizer/ColumnPruningSuite.scala | 16 2 files changed, 19 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/23369c3b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 7455e68..586bf3d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -381,12 +381,12 @@ object ColumnPruning extends Rule[LogicalPlan] { p } -// Can't prune the columns on LeafNode -case p @ Project(_, l: LeafNode) => p - // Eliminate no-op Projects case p @ Project(projectList, child) if sameOutput(child.output, p.output) => child +// Can't prune the columns on LeafNode +case p @ Project(_, l: LeafNode) => p + // for all other logical plans that inherits the output from it's children case p @ Project(_, child) => val required = child.references ++ p.references http://git-wip-us.apache.org/repos/asf/spark/blob/23369c3b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala index d09601e..409e922 100644 --- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala @@ -157,6 +157,22 @@ class ColumnPruningSuite extends PlanTest { comparePlans(Optimize.execute(query), expected) } + test("Eliminate the Project with an empty projectList") { +val input = OneRowRelation +val expected = Project(Literal(1).as("1") :: Nil, input).analyze + +val query1 = + Project(Literal(1).as("1") :: Nil, Project(Literal(1).as("1") :: Nil, input)).analyze +comparePlans(Optimize.execute(query1), expected) + +val query2 = + Project(Literal(1).as("1") :: Nil, Project(Nil, input)).analyze +comparePlans(Optimize.execute(query2), expected) + +// to make sure the top Project will not be removed. +comparePlans(Optimize.execute(expected), expected) + } + test("column pruning for group") { val testRelation = LocalRelation('a.int, 'b.int, 'c.int) val original
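To observe the behavior described in the commit message, the optimized plan of the dummy-table query can be inspected directly from a Spark shell. A minimal sketch, assuming a `sqlContext` of this era (not part of the patch itself):

```scala
val df = sqlContext.sql(
  """SELECT value
    |FROM (SELECT 1) dummyTable
    |LATERAL VIEW explode(array(1, 2, 3)) adTable AS value""".stripMargin)

// Before this patch the optimized plan kept an empty Project between Generate and
// OneRowRelation; after it, Generate sits directly on top of OneRowRelation.
println(df.queryExecution.optimizedPlan.numberedTreeString)
```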
spark git commit: [SPARK-13728][SQL] Fix ORC PPD test so that pushed filters can be checked.
Repository: spark Updated Branches: refs/heads/master 23369c3bd -> cad29a40b [SPARK-13728][SQL] Fix ORC PPD test so that pushed filters can be checked. ## What changes were proposed in this pull request? https://issues.apache.org/jira/browse/SPARK-13728 https://github.com/apache/spark/pull/11509 makes the output only single ORC file. It was 10 files but this PR writes only single file. So, this could not skip stripes in ORC by the pushed down filters. So, this PR simply repartitions data into 10 so that the test could pass. ## How was this patch tested? unittest and `./dev/run_tests` for code style test. Author: hyukjinkwon Closes #11593 from HyukjinKwon/SPARK-13728. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cad29a40 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cad29a40 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cad29a40 Branch: refs/heads/master Commit: cad29a40b24a8e89f2d906e263866546f8ab6071 Parents: 23369c3 Author: hyukjinkwon Authored: Wed Mar 9 10:48:53 2016 -0800 Committer: Michael Armbrust Committed: Wed Mar 9 10:48:53 2016 -0800 -- .../scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala| 6 -- 1 file changed, 4 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cad29a40/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index 3c05266..9ca07e9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -348,7 +348,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { } } - ignore("SPARK-10623 Enable ORC PPD") { + test("SPARK-10623 Enable ORC PPD") { withTempPath { dir => withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") { import testImplicits._ @@ -363,7 +363,9 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { val nullValue: Option[String] = None (maybeInt, nullValue) } -createDataFrame(data).toDF("a", "b").write.orc(path) +// It needs to repartition data so that we can have several ORC files +// in order to skip stripes in ORC. +createDataFrame(data).toDF("a", "b").repartition(10).write.orc(path) val df = sqlContext.read.orc(path) def checkPredicate(pred: Column, answer: Seq[Row]): Unit = { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
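The repartitioning matters because ORC predicate pushdown can only skip whole stripes/files; with the data spread over several files, a selective filter leaves some files with nothing to return, which is what the test asserts. A rough sketch of the same idea outside the test suite (the path, column name, and threshold are illustrative assumptions):

```scala
import org.apache.spark.sql.functions.col

sqlContext.setConf("spark.sql.orc.filterPushdown", "true")

// Write enough files that some of them contain no matching rows at all.
val data = sqlContext.range(0, 1000).toDF("a")
data.repartition(10).write.mode("overwrite").orc("/tmp/orc-ppd-demo")

// With pushdown enabled, files/stripes whose min/max statistics exclude a > 995 are skipped.
sqlContext.read.orc("/tmp/orc-ppd-demo").filter(col("a") > 995).show()
```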
spark git commit: [SPARK-13615][ML] GeneralizedLinearRegression supports save/load
Repository: spark Updated Branches: refs/heads/master cad29a40b -> 0dd06485c [SPARK-13615][ML] GeneralizedLinearRegression supports save/load ## What changes were proposed in this pull request? ```GeneralizedLinearRegression``` supports ```save/load```. cc mengxr ## How was this patch tested? unit test. Author: Yanbo Liang Closes #11465 from yanboliang/spark-13615. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0dd06485 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0dd06485 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0dd06485 Branch: refs/heads/master Commit: 0dd06485c4222a896c0d1ee6a04d30043de3626c Parents: cad29a4 Author: Yanbo Liang Authored: Wed Mar 9 11:59:22 2016 -0800 Committer: Joseph K. Bradley Committed: Wed Mar 9 11:59:22 2016 -0800 -- .../GeneralizedLinearRegression.scala | 74 +--- .../GeneralizedLinearRegressionSuite.scala | 32 - 2 files changed, 96 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0dd06485/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index a850dfe..de1dff9 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -18,6 +18,7 @@ package org.apache.spark.ml.regression import breeze.stats.distributions.{Gaussian => GD} +import org.apache.hadoop.fs.Path import org.apache.spark.{Logging, SparkException} import org.apache.spark.annotation.{Experimental, Since} @@ -26,7 +27,7 @@ import org.apache.spark.ml.feature.Instance import org.apache.spark.ml.optim._ import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ -import org.apache.spark.ml.util.Identifiable +import org.apache.spark.ml.util._ import org.apache.spark.mllib.linalg.{BLAS, Vector} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row} @@ -106,7 +107,7 @@ private[regression] trait GeneralizedLinearRegressionBase extends PredictorParam @Since("2.0.0") class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val uid: String) extends Regressor[Vector, GeneralizedLinearRegression, GeneralizedLinearRegressionModel] - with GeneralizedLinearRegressionBase with Logging { + with GeneralizedLinearRegressionBase with DefaultParamsWritable with Logging { import GeneralizedLinearRegression._ @@ -236,10 +237,13 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val } @Since("2.0.0") -private[ml] object GeneralizedLinearRegression { +object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLinearRegression] { + + @Since("2.0.0") + override def load(path: String): GeneralizedLinearRegression = super.load(path) /** Set of family and link pairs that GeneralizedLinearRegression supports. */ - lazy val supportedFamilyAndLinkPairs = Set( + private[ml] lazy val supportedFamilyAndLinkPairs = Set( Gaussian -> Identity, Gaussian -> Log, Gaussian -> Inverse, Binomial -> Logit, Binomial -> Probit, Binomial -> CLogLog, Poisson -> Log, Poisson -> Identity, Poisson -> Sqrt, @@ -247,12 +251,12 @@ private[ml] object GeneralizedLinearRegression { ) /** Set of family names that GeneralizedLinearRegression supports. 
*/ - lazy val supportedFamilyNames = supportedFamilyAndLinkPairs.map(_._1.name) + private[ml] lazy val supportedFamilyNames = supportedFamilyAndLinkPairs.map(_._1.name) /** Set of link names that GeneralizedLinearRegression supports. */ - lazy val supportedLinkNames = supportedFamilyAndLinkPairs.map(_._2.name) + private[ml] lazy val supportedLinkNames = supportedFamilyAndLinkPairs.map(_._2.name) - val epsilon: Double = 1E-16 + private[ml] val epsilon: Double = 1E-16 /** * Wrapper of family and link combination used in the model. @@ -552,7 +556,7 @@ class GeneralizedLinearRegressionModel private[ml] ( @Since("2.0.0") val coefficients: Vector, @Since("2.0.0") val intercept: Double) extends RegressionModel[Vector, GeneralizedLinearRegressionModel] - with GeneralizedLinearRegressionBase { + with GeneralizedLinearRegressionBase with MLWritable { import GeneralizedLinearRegression._ @@ -574,4 +578,58 @@ class GeneralizedLinearRegressionModel private[ml] ( copyValues(new GeneralizedLinearRegressionModel(uid, coefficients, intercept), extra) .setParent(parent) } + +
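With `DefaultParamsWritable`/`MLWritable` wired in as above, the estimator and the fitted model follow the standard ML persistence pattern. A sketch, assuming a `training` DataFrame with "label" and "features" columns and throwaway paths (none of these names come from the patch):

```scala
import org.apache.spark.ml.regression.GeneralizedLinearRegression

val glr = new GeneralizedLinearRegression()
  .setFamily("gaussian")
  .setLink("identity")

// Persist and restore the (unfitted) estimator together with its params.
glr.write.overwrite().save("/tmp/glr-estimator")
val restored = GeneralizedLinearRegression.load("/tmp/glr-estimator")

// Fit and persist the fitted model as well.
val model = glr.fit(training)
model.write.overwrite().save("/tmp/glr-model")
```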
spark git commit: [SPARK-13523] [SQL] Reuse exchanges in a query
Repository: spark Updated Branches: refs/heads/master 0dd06485c -> 3dc9ae2e1 [SPARK-13523] [SQL] Reuse exchanges in a query ## What changes were proposed in this pull request? It's possible to have common parts in a query, for example a self join, and it is good to avoid computing the duplicated part twice to save CPU and memory (Broadcast or cache). Exchange materializes the underlying RDD by shuffle or collect, so it's a natural point to check for duplicates and reuse them. Duplicated exchanges mean they generate exactly the same result inside a query. In order to find the duplicated exchanges, we should be able to compare SparkPlans to check whether they produce the same result or not. We already have that for LogicalPlan, so we should move it into QueryPlan to make it available for SparkPlan. Once we can find the duplicated exchanges, we should replace all of them with the same SparkPlan object (possibly wrapped by ReusedExchange for explain); the plan tree then becomes a DAG. Since all the planners only work with trees, this rule should be the last one of the entire planning. After the rule, the plan looks like: ``` WholeStageCodegen : +- Project [id#0L] : +- BroadcastHashJoin [id#0L], [id#2L], Inner, BuildRight, None ::- Project [id#0L] :: +- BroadcastHashJoin [id#0L], [id#1L], Inner, BuildRight, None :: :- Range 0, 1, 4, 1024, [id#0L] :: +- INPUT :+- INPUT :- BroadcastExchange HashedRelationBroadcastMode(true,List(id#1L),List(id#1L)) : +- WholeStageCodegen : : +- Range 0, 1, 4, 1024, [id#1L] +- ReusedExchange [id#2L], BroadcastExchange HashedRelationBroadcastMode(true,List(id#1L),List(id#1L)) ``` ![bjoin](https://cloud.githubusercontent.com/assets/40902/13414787/209e8c5c-df0a-11e5-8a0f-edff69d89e83.png) For a three-way SortMergeJoin, ``` == Physical Plan == WholeStageCodegen : +- Project [id#0L] : +- SortMergeJoin [id#0L], [id#4L], None ::- INPUT :+- INPUT :- WholeStageCodegen : : +- Project [id#0L] : : +- SortMergeJoin [id#0L], [id#3L], None : ::- INPUT : :+- INPUT : :- WholeStageCodegen : : : +- Sort [id#0L ASC], false, 0 : : : +- INPUT : : +- Exchange hashpartitioning(id#0L, 200), None : : +- WholeStageCodegen : :: +- Range 0, 1, 4, 33554432, [id#0L] : +- WholeStageCodegen : : +- Sort [id#3L ASC], false, 0 : : +- INPUT : +- ReusedExchange [id#3L], Exchange hashpartitioning(id#0L, 200), None +- WholeStageCodegen : +- Sort [id#4L ASC], false, 0 : +- INPUT +- ReusedExchange [id#4L], Exchange hashpartitioning(id#0L, 200), None ``` ![sjoin](https://cloud.githubusercontent.com/assets/40902/13414790/27aea61c-df0a-11e5-8cbf-fbc985c31d95.png) If execute()/executeBroadcast() of the same ShuffleExchange or BroadcastExchange is called by different parents, it should cache the RDD/Broadcast and return the same one for all the parents. ## How was this patch tested? Added some unit tests for this. Also did some manual tests on TPCDS queries Q59 and Q64; we can see that some exchanges are re-used (this requires a change in PhysicalRDD for sameResult, done in #11514). Author: Davies Liu Closes #11403 from davies/dedup.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3dc9ae2e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3dc9ae2e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3dc9ae2e Branch: refs/heads/master Commit: 3dc9ae2e158e5b51df6f799767946fe1d190156b Parents: 0dd0648 Author: Davies Liu Authored: Wed Mar 9 12:04:29 2016 -0800 Committer: Davies Liu Committed: Wed Mar 9 12:04:29 2016 -0800 -- .../spark/sql/catalyst/plans/QueryPlan.scala| 63 +- .../catalyst/plans/logical/LogicalPlan.scala| 55 +--- .../catalyst/plans/physical/broadcastMode.scala | 9 ++ .../spark/sql/execution/SparkPlanInfo.scala | 22 - .../execution/aggregate/TungstenAggregate.scala | 4 + .../spark/sql/execution/basicOperators.scala| 3 + .../execution/exchange/BroadcastExchange.scala | 10 ++- .../spark/sql/execution/exchange/Exchange.scala | 92 .../execution/exchange/ShuffleExchange.scala| 29 +++--- .../sql/execution/joins/HashedRelation.scala| 15 +++- .../spark/sql/execution/ui/SparkPlanGraph.scala | 20 +++-- .../org/apache/spark/sql/internal/SQLConf.scala | 6 ++ .../spark/sql/internal/SessionState.scala | 6 +- .../org/apache/spark/sql/DataFrameSuite.scala | 38 +++- .../spark/sql/execution/ExchangeSuite.scala | 72 ++- .../spark/sql/execution/PlannerSuite.scala | 49 ++- 16 files changed, 403 insertions(+), 90 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3dc9ae2e/sql/catalyst/src/m
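A quick way to see the new rule in action is a self-join, where both sides of the join produce an identical exchange. A minimal sketch (the table size is arbitrary, and this assumes the reuse option introduced in SQLConf by this patch is left enabled):

```scala
val df = sqlContext.range(0, 1000000)

// Self-join: both join children would otherwise shuffle/broadcast exactly the same data.
val joined = df.join(df, "id")

// With exchange reuse, the physical plan shows a ReusedExchange node in place of the
// second, duplicated Exchange/BroadcastExchange.
joined.explain()
```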
spark git commit: [SPARK-13242] [SQL] codegen fallback in case-when if there many branches
Repository: spark Updated Branches: refs/heads/branch-1.6 95105b0e6 -> bea91a9e9 [SPARK-13242] [SQL] codegen fallback in case-when if there many branches ## What changes were proposed in this pull request? If there are many branches in a CaseWhen expression, the generated code could go above the 64K limit for single java method, will fail to compile. This PR change it to fallback to interpret mode if there are more than 20 branches. ## How was this patch tested? Add tests Author: Davies Liu Closes #11606 from davies/fix_when_16. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bea91a9e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bea91a9e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bea91a9e Branch: refs/heads/branch-1.6 Commit: bea91a9e94341b4cab1977911e91d56016c55cb3 Parents: 95105b0 Author: Davies Liu Authored: Wed Mar 9 12:05:34 2016 -0800 Committer: Davies Liu Committed: Wed Mar 9 12:05:34 2016 -0800 -- .../expressions/conditionalExpressions.scala| 23 ++-- .../expressions/CodeGenerationSuite.scala | 22 +++ 2 files changed, 43 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bea91a9e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala index 40b1eec..c4e5b84 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala @@ -110,6 +110,14 @@ trait CaseWhenLike extends Expression { // If no value is nullable and no elseValue is provided, the whole statement defaults to null. thenList.exists(_.nullable) || (elseValue.map(_.nullable).getOrElse(true)) } + + /** + * Whether should it fallback to interpret mode or not. + * @return + */ + protected def shouldFallback: Boolean = { +branches.length > 20 + } } // scalastyle:off @@ -119,7 +127,7 @@ trait CaseWhenLike extends Expression { * https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-ConditionalFunctions */ // scalastyle:on -case class CaseWhen(branches: Seq[Expression]) extends CaseWhenLike { +case class CaseWhen(branches: Seq[Expression]) extends CaseWhenLike with CodegenFallback { // Use private[this] Array to speed up evaluation. @transient private[this] lazy val branchesArr = branches.toArray @@ -157,6 +165,11 @@ case class CaseWhen(branches: Seq[Expression]) extends CaseWhenLike { } override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { +if (shouldFallback) { + // Fallback to interpreted mode if there are too many branches, as it may reach the + // 64K limit (limit on bytecode size for a single function). 
+ return super[CodegenFallback].genCode(ctx, ev) +} val len = branchesArr.length val got = ctx.freshName("got") @@ -213,7 +226,8 @@ case class CaseWhen(branches: Seq[Expression]) extends CaseWhenLike { * https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-ConditionalFunctions */ // scalastyle:on -case class CaseKeyWhen(key: Expression, branches: Seq[Expression]) extends CaseWhenLike { +case class CaseKeyWhen(key: Expression, branches: Seq[Expression]) + extends CaseWhenLike with CodegenFallback { // Use private[this] Array to speed up evaluation. @transient private[this] lazy val branchesArr = branches.toArray @@ -257,6 +271,11 @@ case class CaseKeyWhen(key: Expression, branches: Seq[Expression]) extends CaseW } override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { +if (shouldFallback) { + // Fallback to interpreted mode if there are too many branches, as it may reach the + // 64K limit (limit on bytecode size for a single function). + return super[CodegenFallback].genCode(ctx, ev) +} val keyEval = key.gen(ctx) val len = branchesArr.length val got = ctx.freshName("got") http://git-wip-us.apache.org/repos/asf/spark/blob/bea91a9e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expression
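The 20-branch threshold is easy to reach through the DataFrame API as well as SQL. A sketch of a long `when` chain that previously could push a single generated method past the 64KB bytecode limit; the column name and branch count are illustrative:

```scala
import org.apache.spark.sql.functions._

val df = sqlContext.range(0, 100).toDF("x")

// Build CASE WHEN x = 0 THEN 0 WHEN x = 1 THEN 1 ... ELSE -1 END with ~30 branches.
val bucketed = (1 to 30)
  .foldLeft(when(col("x") === 0, 0)) { (acc, i) => acc.when(col("x") === i, i) }
  .otherwise(-1)

// With more than 20 branches, evaluation now falls back to interpreted mode
// instead of generating one oversized Java method.
df.select(bucketed.as("bucket")).show()
```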
spark git commit: [SPARK-13527][SQL] Prune Filters based on Constraints
Repository: spark Updated Branches: refs/heads/master 3dc9ae2e1 -> c6aa356cd [SPARK-13527][SQL] Prune Filters based on Constraints What changes were proposed in this pull request? Remove all the deterministic conditions in a [[Filter]] that are contained in the Child's Constraints. For example, the first query can be simplified to the second one. ```scala val queryWithUselessFilter = tr1 .where("tr1.a".attr > 10 || "tr1.c".attr < 10) .join(tr2.where('d.attr < 100), Inner, Some("tr1.a".attr === "tr2.a".attr)) .where( ("tr1.a".attr > 10 || "tr1.c".attr < 10) && 'd.attr < 100 && "tr2.a".attr === "tr1.a".attr) ``` ```scala val query = tr1 .where("tr1.a".attr > 10 || "tr1.c".attr < 10) .join(tr2.where('d.attr < 100), Inner, Some("tr1.a".attr === "tr2.a".attr)) ``` How was this patch tested? Six test cases are added. Author: gatorsmile Closes #11406 from gatorsmile/FilterRemoval. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c6aa356c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c6aa356c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c6aa356c Branch: refs/heads/master Commit: c6aa356cd831ea2d159568b699bd5b791f3d8f25 Parents: 3dc9ae2 Author: gatorsmile Authored: Wed Mar 9 12:50:55 2016 -0800 Committer: Michael Armbrust Committed: Wed Mar 9 12:50:55 2016 -0800 -- .../sql/catalyst/optimizer/Optimizer.scala | 26 +++- .../optimizer/BooleanSimplificationSuite.scala | 2 +- .../catalyst/optimizer/PruneFiltersSuite.scala | 136 +++ .../catalyst/optimizer/SetOperationSuite.scala | 2 +- .../datasources/parquet/ParquetFilters.scala| 2 +- 5 files changed, 160 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c6aa356c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 586bf3d..650b4ee 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -86,7 +86,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] { BooleanSimplification, SimplifyConditionals, RemoveDispensableExpressions, - SimplifyFilters, + PruneFilters, SimplifyCasts, SimplifyCaseConversionExpressions, EliminateSerialization) :: @@ -827,11 +827,12 @@ object CombineFilters extends Rule[LogicalPlan] { } /** - * Removes filters that can be evaluated trivially. This is done either by eliding the filter for - * cases where it will always evaluate to `true`, or substituting a dummy empty relation when the - * filter will always evaluate to `false`. + * Removes filters that can be evaluated trivially. This can be done through the following ways: + * 1) by eliding the filter for cases where it will always evaluate to `true`. + * 2) by substituting a dummy empty relation when the filter will always evaluate to `false`. + * 3) by eliminating the always-true conditions given the constraints on the child's output. */ -object SimplifyFilters extends Rule[LogicalPlan] { +object PruneFilters extends Rule[LogicalPlan] with PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = plan transform { // If the filter condition always evaluate to true, remove the filter. 
case Filter(Literal(true, BooleanType), child) => child @@ -839,6 +840,21 @@ object SimplifyFilters extends Rule[LogicalPlan] { // replace the input with an empty relation. case Filter(Literal(null, _), child) => LocalRelation(child.output, data = Seq.empty) case Filter(Literal(false, BooleanType), child) => LocalRelation(child.output, data = Seq.empty) +// If any deterministic condition is guaranteed to be true given the constraints on the child's +// output, remove the condition +case f @ Filter(fc, p: LogicalPlan) => + val (prunedPredicates, remainingPredicates) = +splitConjunctivePredicates(fc).partition { cond => + cond.deterministic && p.constraints.contains(cond) +} + if (prunedPredicates.isEmpty) { +f + } else if (remainingPredicates.isEmpty) { +p + } else { +val newCond = remainingPredicates.reduce(And) +Filter(newCond, p) + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/c6aa356c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala -
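The same effect is visible from the DataFrame API: a filter that merely restates a constraint already guaranteed by its child is dropped from the optimized plan. A sketch under assumed table and column names (mirroring the `tr1`/`tr2` pattern in the commit message):

```scala
import org.apache.spark.sql.functions.col

val tr1 = sqlContext.range(0, 100).toDF("a")
val tr2 = sqlContext.range(0, 100).toDF("d")

// The trailing `a > 10` repeats a constraint already implied by the inner filter,
// so PruneFilters should remove it from the optimized plan.
val query = tr1.filter(col("a") > 10)
  .join(tr2.filter(col("d") < 50), col("a") === col("d"))
  .filter(col("a") > 10)

println(query.queryExecution.optimizedPlan)
```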
spark git commit: [SPARK-11861][ML] Add feature importances for decision trees
Repository: spark Updated Branches: refs/heads/master c6aa356cd -> e1772d3f1 [SPARK-11861][ML] Add feature importances for decision trees This patch adds an API entry point for single decision tree feature importances. Author: sethah Closes #9912 from sethah/SPARK-11861. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e1772d3f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e1772d3f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e1772d3f Branch: refs/heads/master Commit: e1772d3f19bed7e69a80de7900ed22d3eeb05300 Parents: c6aa356 Author: sethah Authored: Wed Mar 9 14:44:51 2016 -0800 Committer: Joseph K. Bradley Committed: Wed Mar 9 14:44:51 2016 -0800 -- .../classification/DecisionTreeClassifier.scala | 19 + .../classification/RandomForestClassifier.scala | 4 +-- .../ml/regression/DecisionTreeRegressor.scala | 19 + .../ml/regression/RandomForestRegressor.scala | 4 +-- .../spark/ml/tree/impl/RandomForest.scala | 30 .../DecisionTreeClassifierSuite.scala | 21 ++ .../RandomForestClassifierSuite.scala | 10 ++- .../org/apache/spark/ml/impl/TreeTests.scala| 13 + .../regression/DecisionTreeRegressorSuite.scala | 20 + .../regression/RandomForestRegressorSuite.scala | 13 ++--- 10 files changed, 126 insertions(+), 27 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e1772d3f/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala index 8c4cec1..7f0397f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala @@ -169,6 +169,25 @@ final class DecisionTreeClassificationModel private[ml] ( s"DecisionTreeClassificationModel (uid=$uid) of depth $depth with $numNodes nodes" } + /** + * Estimate of the importance of each feature. + * + * This generalizes the idea of "Gini" importance to other losses, + * following the explanation of Gini importance from "Random Forests" documentation + * by Leo Breiman and Adele Cutler, and following the implementation from scikit-learn. + * + * This feature importance is calculated as follows: + * - importance(feature j) = sum (over nodes which split on feature j) of the gain, + * where gain is scaled by the number of instances passing through node + * - Normalize importances for tree to sum to 1. + * + * Note: Feature importance for single decision trees can have high variance due to + * correlated predictor variables. Consider using a [[RandomForestClassifier]] + * to determine feature importance instead. 
+ */ + @Since("2.0.0") + lazy val featureImportances: Vector = RandomForest.featureImportances(this, numFeatures) + /** (private[ml]) Convert to a model in the old API */ private[ml] def toOld: OldDecisionTreeModel = { new OldDecisionTreeModel(rootNode.toOld(1), OldAlgo.Classification) http://git-wip-us.apache.org/repos/asf/spark/blob/e1772d3f/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala index f7d662d..5da04d3 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala @@ -230,10 +230,10 @@ final class RandomForestClassificationModel private[ml] ( * - Average over trees: * - importance(feature j) = sum (over nodes which split on feature j) of the gain, * where gain is scaled by the number of instances passing through node - * - Normalize importances for tree based on total number of training instances used - * to build tree. + * - Normalize importances for tree to sum to 1. * - Normalize feature importance vector to sum to 1. */ + @Since("1.5.0") lazy val featureImportances: Vector = RandomForest.featureImportances(trees, numFeatures) /** (private[ml]) Convert to a model in the old API */ http://git-wip-us.apache.org/repos/asf/spark/blob/e1772d3f/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala ---
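Usage mirrors the existing random forest API: fit a single tree and read the `featureImportances` vector off the model. A sketch, assuming a `training` DataFrame with "label" and "features" columns:

```scala
import org.apache.spark.ml.classification.DecisionTreeClassifier

val dt = new DecisionTreeClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setMaxDepth(5)

val model = dt.fit(training)

// Gain-weighted importance per feature, normalized to sum to 1 for the single tree.
println(model.featureImportances)
```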
spark git commit: [SPARK-13781][SQL] Use ExpressionSets in ConstraintPropagationSuite
Repository: spark Updated Branches: refs/heads/master e1772d3f1 -> dbf2a7cfa [SPARK-13781][SQL] Use ExpressionSets in ConstraintPropagationSuite ## What changes were proposed in this pull request? This PR is a small follow up on https://github.com/apache/spark/pull/11338 (https://issues.apache.org/jira/browse/SPARK-13092) to use `ExpressionSet` as part of the verification logic in `ConstraintPropagationSuite`. ## How was this patch tested? No new tests added. Just changes the verification logic in `ConstraintPropagationSuite`. Author: Sameer Agarwal Closes #11611 from sameeragarwal/expression-set. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dbf2a7cf Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dbf2a7cf Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dbf2a7cf Branch: refs/heads/master Commit: dbf2a7cfad067d2c553d8b8831e04aace12fcee1 Parents: e1772d3 Author: Sameer Agarwal Authored: Wed Mar 9 15:27:18 2016 -0800 Committer: Michael Armbrust Committed: Wed Mar 9 15:27:18 2016 -0800 -- .../plans/ConstraintPropagationSuite.scala | 50 ++-- 1 file changed, 25 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/dbf2a7cf/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala index b68432b..868ad93 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala @@ -32,9 +32,9 @@ class ConstraintPropagationSuite extends SparkFunSuite { private def resolveColumn(plan: LogicalPlan, columnName: String): Expression = plan.resolveQuoted(columnName, caseInsensitiveResolution).get - private def verifyConstraints(found: Set[Expression], expected: Set[Expression]): Unit = { -val missing = expected.filterNot(i => found.map(_.semanticEquals(i)).reduce(_ || _)) -val extra = found.filterNot(i => expected.map(_.semanticEquals(i)).reduce(_ || _)) + private def verifyConstraints(found: ExpressionSet, expected: ExpressionSet): Unit = { +val missing = expected -- found +val extra = found -- expected if (missing.nonEmpty || extra.nonEmpty) { fail( s""" @@ -58,18 +58,18 @@ class ConstraintPropagationSuite extends SparkFunSuite { verifyConstraints(tr .where('a.attr > 10) .analyze.constraints, - Set(resolveColumn(tr, "a") > 10, -IsNotNull(resolveColumn(tr, "a" + ExpressionSet(Seq(resolveColumn(tr, "a") > 10, +IsNotNull(resolveColumn(tr, "a") verifyConstraints(tr .where('a.attr > 10) .select('c.attr, 'a.attr) .where('c.attr < 100) .analyze.constraints, - Set(resolveColumn(tr, "a") > 10, + ExpressionSet(Seq(resolveColumn(tr, "a") > 10, resolveColumn(tr, "c") < 100, IsNotNull(resolveColumn(tr, "a")), -IsNotNull(resolveColumn(tr, "c" +IsNotNull(resolveColumn(tr, "c") } test("propagating constraints in aggregate") { @@ -81,10 +81,10 @@ class ConstraintPropagationSuite extends SparkFunSuite { .groupBy('a, 'c, 'b)('a, 'c.as("c1"), count('a).as("a3")).select('c1, 'a).analyze verifyConstraints(aliasedRelation.analyze.constraints, - Set(resolveColumn(aliasedRelation.analyze, "c1") > 10, + ExpressionSet(Seq(resolveColumn(aliasedRelation.analyze, "c1") > 10, IsNotNull(resolveColumn(aliasedRelation.analyze, "c1")), 
resolveColumn(aliasedRelation.analyze, "a") < 5, -IsNotNull(resolveColumn(aliasedRelation.analyze, "a" +IsNotNull(resolveColumn(aliasedRelation.analyze, "a") } test("propagating constraints in aliases") { @@ -95,11 +95,11 @@ class ConstraintPropagationSuite extends SparkFunSuite { val aliasedRelation = tr.where('a.attr > 10).select('a.as('x), 'b, 'b.as('y), 'a.as('z)) verifyConstraints(aliasedRelation.analyze.constraints, - Set(resolveColumn(aliasedRelation.analyze, "x") > 10, + ExpressionSet(Seq(resolveColumn(aliasedRelation.analyze, "x") > 10, IsNotNull(resolveColumn(aliasedRelation.analyze, "x")), resolveColumn(aliasedRelation.analyze, "b") <=> resolveColumn(aliasedRelation.analyze, "y"), resolveColumn(aliasedRelation.analyze, "z") > 10, -IsNotNull(resolveColumn(aliasedRelation.analyze, "z" +IsNotNull(resolveColumn(aliasedRelation
[spark] Git Push Summary
Repository: spark Updated Tags: refs/tags/v1.6.1-rc1 [deleted] 15de51c23
[spark] Git Push Summary
Repository: spark Updated Tags: refs/tags/v1.6.1 [created] 152252f15
[spark] Git Push Summary
Repository: spark Updated Tags: refs/tags/v1.6.1 [deleted] 152252f15
[spark] Git Push Summary
Repository: spark Updated Tags: refs/tags/v1.6.1 [created] 15de51c23
spark git commit: [SPARK-13747][SQL] Fix concurrent query with fork-join pool
Repository: spark Updated Branches: refs/heads/master dbf2a7cfa -> 37fcda3e6 [SPARK-13747][SQL] Fix concurrent query with fork-join pool ## What changes were proposed in this pull request? Fix this use case, which was already fixed in SPARK-10548 in 1.6 but was broken in master due to #9264: ``` (1 to 100).par.foreach { _ => sc.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b").count() } ``` This threw `IllegalArgumentException` consistently before this patch. For more detail, see the JIRA. ## How was this patch tested? New test in `SQLExecutionSuite`. Author: Andrew Or Closes #11586 from andrewor14/fix-concurrent-sql. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/37fcda3e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/37fcda3e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/37fcda3e Branch: refs/heads/master Commit: 37fcda3e6cf1707fb7a348a4d47231849ef8abf6 Parents: dbf2a7c Author: Andrew Or Authored: Wed Mar 9 17:34:28 2016 -0800 Committer: Shixiong Zhu Committed: Wed Mar 9 17:34:28 2016 -0800 -- .../org/apache/spark/scheduler/DAGScheduler.scala | 7 ++- .../spark/sql/execution/SQLExecutionSuite.scala | 14 ++ 2 files changed, 20 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/37fcda3e/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index e2eaef5..b576d4c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -613,7 +613,12 @@ class DAGScheduler( properties: Properties): Unit = { val start = System.nanoTime val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties) -Await.ready(waiter.completionFuture, atMost = Duration.Inf) +// Note: Do not call Await.ready(future) because that calls `scala.concurrent.blocking`, +// which causes concurrent SQL executions to fail if a fork-join pool is used. Note that +// due to idiosyncrasies in Scala, `awaitPermission` is not actually used anywhere so it's +// safe to pass in null here. For more detail, see SPARK-13747. 
+val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait] +waiter.completionFuture.ready(Duration.Inf)(awaitPermission) waiter.completionFuture.value.get match { case scala.util.Success(_) => logInfo("Job %d finished: %s, took %f s".format http://git-wip-us.apache.org/repos/asf/spark/blob/37fcda3e/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala index 824d89e..c9f517c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala @@ -49,6 +49,20 @@ class SQLExecutionSuite extends SparkFunSuite { } } + test("concurrent query execution with fork-join pool (SPARK-13747)") { +val sc = new SparkContext("local[*]", "test") +val sqlContext = new SQLContext(sc) +import sqlContext.implicits._ +try { + // Should not throw IllegalArgumentException + (1 to 100).par.foreach { _ => +sc.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b").count() + } +} finally { + sc.stop() +} + } + /** * Trigger SPARK-10548 by mocking a parent and its child thread executing queries concurrently. */ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
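For reference, a minimal standalone sketch (plain Scala, not Spark code) of the wait-without-`Await.ready` pattern the patch adopts; the object and method names are illustrative only:
```
import scala.concurrent.{CanAwait, Future}
import scala.concurrent.duration.Duration

object NonBlockingWait {
  // Wait for a future without going through Await.ready, so that
  // scala.concurrent.blocking is never invoked on the calling thread.
  def awaitResult[T](future: Future[T]): T = {
    // As the patch comment notes, Future.ready never actually consults the
    // CanAwait permit, so passing null here is safe (see SPARK-13747).
    val awaitPermission = null.asInstanceOf[CanAwait]
    future.ready(Duration.Inf)(awaitPermission)
    // The future is complete at this point; .get rethrows if it failed.
    future.value.get.get
  }
}
```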
spark git commit: [SPARK-13778][CORE] Set the executor state for a worker when removing it
Repository: spark Updated Branches: refs/heads/master 37fcda3e6 -> 40e067675 [SPARK-13778][CORE] Set the executor state for a worker when removing it ## What changes were proposed in this pull request? When a worker is lost, the executors on this worker are also lost. But Master's ApplicationPage still displays their states as running. This patch just sets the executor state to `LOST` when a worker is lost. ## How was this patch tested? manual tests Author: Shixiong Zhu Closes #11609 from zsxwing/SPARK-13778. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/40e06767 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/40e06767 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/40e06767 Branch: refs/heads/master Commit: 40e0676757c6c7ff367b6738fb42762a29657e94 Parents: 37fcda3 Author: Shixiong Zhu Authored: Wed Mar 9 17:54:34 2016 -0800 Committer: Andrew Or Committed: Wed Mar 9 17:54:34 2016 -0800 -- core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/40e06767/core/src/main/scala/org/apache/spark/deploy/master/Master.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 0f11f68..ff8d29f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -763,6 +763,7 @@ private[deploy] class Master( logInfo("Telling app of lost executor: " + exec.id) exec.application.driver.send(ExecutorUpdated( exec.id, ExecutorState.LOST, Some("worker lost"), None)) + exec.state = ExecutorState.LOST exec.application.removeExecutor(exec) } for (driver <- worker.drivers.values) { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-13775] History page sorted by completed time desc by default.
Repository: spark Updated Branches: refs/heads/master 40e067675 -> 238447db5 [SPARK-13775] History page sorted by completed time desc by default. ## What changes were proposed in this pull request? Originally the page is sorted by AppID by default. After tests with users' feedback, we think it might be best to sort by completed time (desc). ## How was this patch tested? Manually test, with screenshot as follows. ![sorted-by-complete-time-desc](https://cloud.githubusercontent.com/assets/11683054/13647686/d6dea924-e5fa-11e5-8fc5-68e039b74b6f.png) Author: zhuol Closes #11608 from zhuoliu/13775. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/238447db Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/238447db Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/238447db Branch: refs/heads/master Commit: 238447db5694a236e57fa21e22a5d6d45efa436e Parents: 40e0676 Author: zhuol Authored: Wed Mar 9 17:58:09 2016 -0800 Committer: Andrew Or Committed: Wed Mar 9 17:58:09 2016 -0800 -- core/src/main/resources/org/apache/spark/ui/static/historypage.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/238447db/core/src/main/resources/org/apache/spark/ui/static/historypage.js -- diff --git a/core/src/main/resources/org/apache/spark/ui/static/historypage.js b/core/src/main/resources/org/apache/spark/ui/static/historypage.js index 4ff0831..2976099 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/historypage.js +++ b/core/src/main/resources/org/apache/spark/ui/static/historypage.js @@ -165,7 +165,7 @@ $(document).ready(function() { {name: 'eighth'}, ], "autoWidth": false, -"order": [[ 0, "desc" ]] +"order": [[ 4, "desc" ]] }; var rowGroupConf = { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [MINOR] Fix typo in 'hypot' docstring
Repository: spark Updated Branches: refs/heads/master 238447db5 -> 5f7dbdba6 [MINOR] Fix typo in 'hypot' docstring Minor typo: docstring for pyspark.sql.functions: hypot has extra characters N/A Author: Tristan Reid Closes #11616 from tristanreid/master. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5f7dbdba Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5f7dbdba Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5f7dbdba Branch: refs/heads/master Commit: 5f7dbdba6fec7615d3813365228ea093585e91f0 Parents: 238447d Author: Tristan Reid Authored: Wed Mar 9 18:05:00 2016 -0800 Committer: Andrew Or Committed: Wed Mar 9 18:05:03 2016 -0800 -- python/pyspark/sql/functions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5f7dbdba/python/pyspark/sql/functions.py -- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 88924e2..dee3d53 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -142,7 +142,7 @@ _functions_1_6 = { _binary_mathfunctions = { 'atan2': 'Returns the angle theta from the conversion of rectangular coordinates (x, y) to' + 'polar coordinates (r, theta).', -'hypot': 'Computes `sqrt(a^2^ + b^2^)` without intermediate overflow or underflow.', +'hypot': 'Computes `sqrt(a^2 + b^2)` without intermediate overflow or underflow.', 'pow': 'Returns the value of the first argument raised to the power of the second argument.', } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-13492][MESOS] Configurable Mesos framework webui URL.
Repository: spark Updated Branches: refs/heads/master 5f7dbdba6 -> a4a0addcc [SPARK-13492][MESOS] Configurable Mesos framework webui URL. ## What changes were proposed in this pull request? Previously the Mesos framework webui URL was being derived only from the Spark UI address leaving no possibility to configure it. This commit makes it configurable. If unset it falls back to the previous behavior. Motivation: This change is necessary in order to be able to install Spark on DCOS and to be able to give it a custom service link. The configured `webui_url` is configured to point to a reverse proxy in the DCOS environment. ## How was this patch tested? Locally, using unit tests and on DCOS testing and stable revision. Author: Sergiusz Urbaniak Closes #11369 from s-urbaniak/sur-webui-url. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a4a0addc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a4a0addc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a4a0addc Branch: refs/heads/master Commit: a4a0addccffb7cd0ece7947d55ce2538afa54c97 Parents: 5f7dbdb Author: Sergiusz Urbaniak Authored: Wed Mar 9 18:10:01 2016 -0800 Committer: Andrew Or Committed: Wed Mar 9 18:10:01 2016 -0800 -- .../spark/ui/static/historypage-template.html | 2 +- .../org/apache/spark/ui/static/historypage.js | 8 ++--- .../deploy/mesos/MesosClusterDispatcher.scala | 2 +- .../mesos/MesosClusterDispatcherArguments.scala | 2 +- .../mesos/CoarseMesosSchedulerBackend.scala | 3 +- .../cluster/mesos/MesosSchedulerBackend.scala | 3 +- .../CoarseMesosSchedulerBackendSuite.scala | 30 .../mesos/MesosSchedulerBackendSuite.scala | 37 +++- docs/running-on-mesos.md| 16 + 9 files changed, 93 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a4a0addc/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html -- diff --git a/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html b/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html index e5ed5b3..5a7a252 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html +++ b/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html @@ -67,7 +67,7 @@ {{id}} {{name}} {{#attempts}} - {{attemptId}} + {{attemptId}} {{startTime}} {{endTime}} {{duration}} http://git-wip-us.apache.org/repos/asf/spark/blob/a4a0addc/core/src/main/resources/org/apache/spark/ui/static/historypage.js -- diff --git a/core/src/main/resources/org/apache/spark/ui/static/historypage.js b/core/src/main/resources/org/apache/spark/ui/static/historypage.js index 2976099..6096513 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/historypage.js +++ b/core/src/main/resources/org/apache/spark/ui/static/historypage.js @@ -110,7 +110,7 @@ $(document).ready(function() { requestedIncomplete = getParameterByName("showIncomplete", searchString); requestedIncomplete = (requestedIncomplete == "true" ? 
true : false); -$.getJSON("/api/v1/applications", function(response,status,jqXHR) { +$.getJSON("api/v1/applications", function(response,status,jqXHR) { var array = []; var hasMultipleAttempts = false; for (i in response) { @@ -139,9 +139,9 @@ $(document).ready(function() { var url = null if (maxAttemptId == null) { -url = "/history/" + id + "/" +url = "history/" + id + "/" } else { -url = "/history/" + id + "/" + maxAttemptId + "/" +url = "history/" + id + "/" + maxAttemptId + "/" } var app_clone = {"id" : id, "name" : name, "url" : url, "attempts" : [attempt]}; @@ -150,7 +150,7 @@ $(document).ready(function() { } var data = {"applications": array} - $.get("/static/historypage-template.html", function(template) { + $.get("static/historypage-template.html", function(template) { historySummary.append(Mustache.render($(template).filter("#history-summary-template").html(),data)); var selector = "#history-summary-table"; var conf = { http://git-wip-us.apache.org/repos/asf/spark/blob/a4a0addc/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/Me
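As a rough illustration of how such a setting would be supplied, a hedged sketch follows; the configuration key is an assumption to verify against the docs/running-on-mesos.md entry added in this commit, not something quoted from the patch:
```
import org.apache.spark.SparkConf

object MesosWebUiUrlExample {
  // Hypothetical configuration sketch: point the Mesos framework webui URL at a
  // reverse proxy instead of letting it fall back to the Spark UI address.
  // NOTE: the key "spark.mesos.driver.webui.url" is assumed, not verified.
  val conf: SparkConf = new SparkConf()
    .setAppName("webui-url-example")
    .setMaster("mesos://zk://zk1.example.com:2181/mesos")
    .set("spark.mesos.driver.webui.url", "http://reverse-proxy.example.com/spark-ui")
}
```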
spark git commit: [SPARK-13760][SQL] Fix BigDecimal constructor for FloatType
Repository: spark Updated Branches: refs/heads/master a4a0addcc -> 926e9c45a [SPARK-13760][SQL] Fix BigDecimal constructor for FloatType ## What changes were proposed in this pull request? A very minor change for using `BigDecimal.decimal(f: Float)` instead of `BigDecimal(f: float)`. The latter is deprecated and can result in inconsistencies due to an implicit conversion to `Double`. ## How was this patch tested? N/A cc yhuai Author: Sameer Agarwal Closes #11597 from sameeragarwal/bigdecimal. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/926e9c45 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/926e9c45 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/926e9c45 Branch: refs/heads/master Commit: 926e9c45a21c5b71ef0832d63b8dae7d4f3d8826 Parents: a4a0add Author: Sameer Agarwal Authored: Wed Mar 9 18:16:29 2016 -0800 Committer: Yin Huai Committed: Wed Mar 9 18:16:29 2016 -0800 -- .../apache/spark/sql/catalyst/expressions/mathExpressions.scala| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/926e9c45/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala index bc2df0f..bc83f9e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala @@ -748,7 +748,7 @@ case class Round(child: Expression, scale: Expression) if (f.isNaN || f.isInfinite) { f } else { - BigDecimal(f).setScale(_scale, HALF_UP).toFloat + BigDecimal.decimal(f).setScale(_scale, HALF_UP).toFloat } case DoubleType => val d = input1.asInstanceOf[Double] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
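To see why the two constructors disagree, a small hedged Scala sketch (the printed values are illustrative, not taken from the patch):
```
object BigDecimalFloatDemo {
  def main(args: Array[String]): Unit = {
    val f = 1.1f
    // Deprecated constructor: the Float is widened to Double first, so the result
    // carries the Double representation of 1.1f (roughly 1.100000023841858).
    println(BigDecimal(f))
    // BigDecimal.decimal parses the Float's own string form, yielding exactly 1.1.
    println(BigDecimal.decimal(f))
  }
}
```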
spark git commit: [SPARK-13760][SQL] Fix BigDecimal constructor for FloatType
Repository: spark Updated Branches: refs/heads/branch-1.6 bea91a9e9 -> 8a1bd5834 [SPARK-13760][SQL] Fix BigDecimal constructor for FloatType ## What changes were proposed in this pull request? A very minor change for using `BigDecimal.decimal(f: Float)` instead of `BigDecimal(f: float)`. The latter is deprecated and can result in inconsistencies due to an implicit conversion to `Double`. ## How was this patch tested? N/A cc yhuai Author: Sameer Agarwal Closes #11597 from sameeragarwal/bigdecimal. (cherry picked from commit 926e9c45a21c5b71ef0832d63b8dae7d4f3d8826) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8a1bd583 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8a1bd583 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8a1bd583 Branch: refs/heads/branch-1.6 Commit: 8a1bd5834cec61a882c54fdf56385c76b221cb4f Parents: bea91a9 Author: Sameer Agarwal Authored: Wed Mar 9 18:16:29 2016 -0800 Committer: Yin Huai Committed: Wed Mar 9 18:17:16 2016 -0800 -- .../apache/spark/sql/catalyst/expressions/mathExpressions.scala| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8a1bd583/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala index 28f616f..4825bd2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala @@ -739,7 +739,7 @@ case class Round(child: Expression, scale: Expression) if (f.isNaN || f.isInfinite) { f } else { - BigDecimal(f).setScale(_scale, HALF_UP).toFloat + BigDecimal.decimal(f).setScale(_scale, HALF_UP).toFloat } case DoubleType => val d = input1.asInstanceOf[Double] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: Revert "[SPARK-13760][SQL] Fix BigDecimal constructor for FloatType"
Repository: spark Updated Branches: refs/heads/master 926e9c45a -> 790646125 Revert "[SPARK-13760][SQL] Fix BigDecimal constructor for FloatType" This reverts commit 926e9c45a21c5b71ef0832d63b8dae7d4f3d8826. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/79064612 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/79064612 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/79064612 Branch: refs/heads/master Commit: 790646125edd8b853e2ad2425112590e78799bd3 Parents: 926e9c4 Author: Yin Huai Authored: Wed Mar 9 18:41:38 2016 -0800 Committer: Yin Huai Committed: Wed Mar 9 18:41:38 2016 -0800 -- .../apache/spark/sql/catalyst/expressions/mathExpressions.scala| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/79064612/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala index bc83f9e..bc2df0f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala @@ -748,7 +748,7 @@ case class Round(child: Expression, scale: Expression) if (f.isNaN || f.isInfinite) { f } else { - BigDecimal.decimal(f).setScale(_scale, HALF_UP).toFloat + BigDecimal(f).setScale(_scale, HALF_UP).toFloat } case DoubleType => val d = input1.asInstanceOf[Double] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: Revert "[SPARK-13760][SQL] Fix BigDecimal constructor for FloatType"
Repository: spark Updated Branches: refs/heads/branch-1.6 8a1bd5834 -> 60cb27040 Revert "[SPARK-13760][SQL] Fix BigDecimal constructor for FloatType" This reverts commit 926e9c45a21c5b71ef0832d63b8dae7d4f3d8826. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/60cb2704 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/60cb2704 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/60cb2704 Branch: refs/heads/branch-1.6 Commit: 60cb27040c3cae531f71985f84f4c0321aa91c94 Parents: 8a1bd58 Author: Yin Huai Authored: Wed Mar 9 18:41:38 2016 -0800 Committer: Yin Huai Committed: Wed Mar 9 18:43:54 2016 -0800 -- .../apache/spark/sql/catalyst/expressions/mathExpressions.scala| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/60cb2704/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala index 4825bd2..28f616f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala @@ -739,7 +739,7 @@ case class Round(child: Expression, scale: Expression) if (f.isNaN || f.isInfinite) { f } else { - BigDecimal.decimal(f).setScale(_scale, HALF_UP).toFloat + BigDecimal(f).setScale(_scale, HALF_UP).toFloat } case DoubleType => val d = input1.asInstanceOf[Double] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-13766][SQL] Consistent file extensions for files written by internal data sources
Repository: spark Updated Branches: refs/heads/master 790646125 -> aa0eba2c3 [SPARK-13766][SQL] Consistent file extensions for files written by internal data sources ## What changes were proposed in this pull request? https://issues.apache.org/jira/browse/SPARK-13766 This PR makes the file extensions (written by internal datasource) consistent. **Before** - TEXT, CSV and JSON ``` [.COMPRESSION_CODEC_NAME] ``` - Parquet ``` [.COMPRESSION_CODEC_NAME].parquet ``` - ORC ``` .orc ``` **After** - TEXT, CSV and JSON ``` .txt[.COMPRESSION_CODEC_NAME] .csv[.COMPRESSION_CODEC_NAME] .json[.COMPRESSION_CODEC_NAME] ``` - Parquet ``` [.COMPRESSION_CODEC_NAME].parquet ``` - ORC ``` [.COMPRESSION_CODEC_NAME].orc ``` When the compression codec is set, - For Parquet and ORC, each still stays in Parquet and ORC format but just have compressed data internally. So, I think it is okay to name `.parquet` and `.orc` at the end. - For Text, CSV and JSON, each does not stays in each format but it has different data format according to compression codec. So, each has the names `.json`, `.csv` and `.txt` before the compression extension. ## How was this patch tested? Unit tests are used and `./dev/run_tests` for coding style tests. Author: hyukjinkwon Closes #11604 from HyukjinKwon/SPARK-13766. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aa0eba2c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aa0eba2c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aa0eba2c Branch: refs/heads/master Commit: aa0eba2c354dc57dd83a427daa68d6171f292a83 Parents: 7906461 Author: hyukjinkwon Authored: Wed Mar 9 19:12:46 2016 -0800 Committer: Reynold Xin Committed: Wed Mar 9 19:12:46 2016 -0800 -- .../execution/datasources/csv/CSVRelation.scala| 2 +- .../execution/datasources/json/JSONRelation.scala | 2 +- .../datasources/parquet/ParquetRelation.scala | 3 +++ .../execution/datasources/text/DefaultSource.scala | 2 +- .../sql/execution/datasources/csv/CSVSuite.scala | 4 ++-- .../sql/execution/datasources/json/JsonSuite.scala | 4 ++-- .../sql/execution/datasources/text/TextSuite.scala | 4 ++-- .../apache/spark/sql/hive/orc/OrcRelation.scala| 17 - .../sql/hive/orc/OrcHadoopFsRelationSuite.scala| 2 +- 9 files changed, 29 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/aa0eba2c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala index d7ce9a0..0e6b985 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala @@ -140,7 +140,7 @@ private[sql] class CsvOutputWriter( val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID") val taskAttemptId = context.getTaskAttemptID val split = taskAttemptId.getTaskID.getId -new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension") +new Path(path, f"part-r-$split%05d-$uniqueWriteJobId.csv$extension") } }.getRecordWriter(context) } http://git-wip-us.apache.org/repos/asf/spark/blob/aa0eba2c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala index 497e3c5..05b44d1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala @@ -167,7 +167,7 @@ private[json] class JsonOutputWriter( val taskAttemptId = context.getTaskAttemptID val split = taskAttemptId.getTaskID.getId val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("") -new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension") +new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString.json$extension") } }.getRecordWriter(context) } http://git-wip-us.apache.org/repos/asf/spark/blob/aa0eba2c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala -- diff --git
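A hedged usage sketch of what the new naming means for a compressed text-based write; the `compression` option and the resulting file names are assumptions based on the description above, not quoted from the patch:
```
import org.apache.spark.sql.SQLContext

object JsonExtensionExample {
  // Assumed behaviour after this patch: a gzip-compressed JSON write produces part
  // files named like part-r-00000-<uuid>.json.gz (format extension before the codec
  // extension) instead of part-r-00000-<uuid>.gz as before.
  def writeGzippedJson(sqlContext: SQLContext, outputDir: String): Unit = {
    val df = sqlContext.createDataFrame(Seq((1, "a"), (2, "b"))).toDF("id", "value")
    df.write.option("compression", "gzip").json(outputDir)
  }
}
```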
spark git commit: [SPARK-13794][SQL] Rename DataFrameWriter.stream() DataFrameWriter.startStream()
Repository: spark Updated Branches: refs/heads/master aa0eba2c3 -> 8a3acb792 [SPARK-13794][SQL] Rename DataFrameWriter.stream() DataFrameWriter.startStream() ## What changes were proposed in this pull request? The new name makes it more obvious with the verb "start" that we are actually starting some execution. ## How was this patch tested? This is just a rename. Existing unit tests should cover it. Author: Reynold Xin Closes #11627 from rxin/SPARK-13794. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8a3acb79 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8a3acb79 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8a3acb79 Branch: refs/heads/master Commit: 8a3acb792d3a816dd0017fb4b79fc05152472b83 Parents: aa0eba2 Author: Reynold Xin Authored: Wed Mar 9 21:04:56 2016 -0800 Committer: Reynold Xin Committed: Wed Mar 9 21:04:56 2016 -0800 -- .../org/apache/spark/sql/DataFrameWriter.scala | 8 +++ .../streaming/DataFrameReaderWriterSuite.scala | 22 ++-- 2 files changed, 15 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8a3acb79/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 78f30f4..3349b84 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -206,7 +206,7 @@ final class DataFrameWriter private[sql](df: DataFrame) { } /** - * Specifies the name of the [[ContinuousQuery]] that can be started with `stream()`. + * Specifies the name of the [[ContinuousQuery]] that can be started with `startStream()`. * This name must be unique among all the currently active queries in the associated SQLContext. 
* * @since 2.0.0 @@ -223,8 +223,8 @@ final class DataFrameWriter private[sql](df: DataFrame) { * * @since 2.0.0 */ - def stream(path: String): ContinuousQuery = { -option("path", path).stream() + def startStream(path: String): ContinuousQuery = { +option("path", path).startStream() } /** @@ -234,7 +234,7 @@ final class DataFrameWriter private[sql](df: DataFrame) { * * @since 2.0.0 */ - def stream(): ContinuousQuery = { + def startStream(): ContinuousQuery = { val dataSource = DataSource( df.sqlContext, http://git-wip-us.apache.org/repos/asf/spark/blob/8a3acb79/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala index f060c6f..0878277 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala @@ -72,7 +72,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .stream() .write .format("org.apache.spark.sql.streaming.test") - .stream() + .startStream() .stop() } @@ -82,7 +82,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .stream() .write .format("org.apache.spark.sql.streaming.test") - .stream() + .startStream() .stop() } @@ -108,7 +108,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .option("opt1", "1") .options(Map("opt2" -> "2")) .options(map) - .stream() + .startStream() .stop() assert(LastOptions.parameters("opt1") == "1") @@ -123,14 +123,14 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B df.write .format("org.apache.spark.sql.streaming.test") - .stream() + .startStream() .stop() assert(LastOptions.partitionColumns == Nil) df.write .format("org.apache.spark.sql.streaming.test") .partitionBy("a") - .stream() + .startStream() .stop() assert(LastOptions.partitionColumns == Seq("a")) @@ -138,7 +138,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B df.write .format("org.apache.spark.sql.streaming.test") .partitionBy("A") -.stream() +.startStream() .stop() assert(LastOptions.partitionColumns == Seq("a")) } @@ -147,7
spark git commit: [SPARK-7420][STREAMING][TESTS] Enable test: o.a.s.streaming.JobGeneratorSuite "Do not clear received…
Repository: spark Updated Branches: refs/heads/master 8a3acb792 -> 8bcad28a5 [SPARK-7420][STREAMING][TESTS] Enable test: o.a.s.streaming.JobGeneratorSuite "Do not clear received… ## How was this patch tested? unit test Author: proflin Closes #11626 from lw-lin/SPARK-7420. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8bcad28a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8bcad28a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8bcad28a Branch: refs/heads/master Commit: 8bcad28a5a6788c96bf1c302eb6f18d37b798b03 Parents: 8a3acb7 Author: proflin Authored: Wed Mar 9 21:12:27 2016 -0800 Committer: Reynold Xin Committed: Wed Mar 9 21:12:27 2016 -0800 -- .../org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8bcad28a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala index 9b6cd4b..a2dbae1 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala @@ -56,8 +56,7 @@ class JobGeneratorSuite extends TestSuiteBase { // 4. allow subsequent batches to be generated (to allow premature deletion of 3rd batch metadata) // 5. verify whether 3rd batch's block metadata still exists // - // TODO: SPARK-7420 enable this test - ignore("SPARK-6222: Do not clear received block data too soon") { + test("SPARK-6222: Do not clear received block data too soon") { import JobGeneratorSuite._ val checkpointDir = Utils.createTempDir() val testConf = conf - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-13706][ML] Add Python Example for Train Validation Split
Repository: spark Updated Branches: refs/heads/master 8bcad28a5 -> 3e3c3d58d [SPARK-13706][ML] Add Python Example for Train Validation Split ## What changes were proposed in this pull request? This pull request adds a python example for train validation split. ## How was this patch tested? This was style tested through lint-python, generally tested with ./dev/run-tests, and run in notebook and shell environments. It was viewed in docs locally with jekyll serve. This contribution is my original work and I license it to Spark under its open source license. Author: JeremyNixon Closes #11547 from JeremyNixon/tvs_example. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3e3c3d58 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3e3c3d58 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3e3c3d58 Branch: refs/heads/master Commit: 3e3c3d58d8d42b42e930d42eb70b0e84d02967eb Parents: 8bcad28 Author: JeremyNixon Authored: Thu Mar 10 09:09:56 2016 +0200 Committer: Nick Pentreath Committed: Thu Mar 10 09:18:15 2016 +0200 -- docs/ml-guide.md| 4 ++ .../main/python/ml/train_validation_split.py| 68 2 files changed, 72 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3e3c3d58/docs/ml-guide.md -- diff --git a/docs/ml-guide.md b/docs/ml-guide.md index a5a825f6..9916787 100644 --- a/docs/ml-guide.md +++ b/docs/ml-guide.md @@ -316,4 +316,8 @@ The `ParamMap` which produces the best evaluation metric is selected as the best {% include_example java/org/apache/spark/examples/ml/JavaModelSelectionViaTrainValidationSplitExample.java %} + +{% include_example python/ml/train_validation_split.py %} + + http://git-wip-us.apache.org/repos/asf/spark/blob/3e3c3d58/examples/src/main/python/ml/train_validation_split.py -- diff --git a/examples/src/main/python/ml/train_validation_split.py b/examples/src/main/python/ml/train_validation_split.py new file mode 100644 index 000..161a200 --- /dev/null +++ b/examples/src/main/python/ml/train_validation_split.py @@ -0,0 +1,68 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from pyspark import SparkContext +# $example on$ +from pyspark.ml.evaluation import RegressionEvaluator +from pyspark.ml.regression import LinearRegression +from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit +from pyspark.sql import SQLContext +# $example off$ + +""" +This example demonstrates applying TrainValidationSplit to split data +and preform model selection. +Run with: + + bin/spark-submit examples/src/main/python/ml/train_validation_split.py +""" + +if __name__ == "__main__": +sc = SparkContext(appName="TrainValidationSplit") +sqlContext = SQLContext(sc) +# $example on$ +# Prepare training and test data. 
+data = sqlContext.read.format("libsvm")\ +.load("data/mllib/sample_linear_regression_data.txt") +train, test = data.randomSplit([0.7, 0.3]) +lr = LinearRegression(maxIter=10, regParam=0.1) + +# We use a ParamGridBuilder to construct a grid of parameters to search over. +# TrainValidationSplit will try all combinations of values and determine best model using +# the evaluator. +paramGrid = ParamGridBuilder()\ +.addGrid(lr.regParam, [0.1, 0.01]) \ +.addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\ +.build() + +# In this case the estimator is simply the linear regression. +# A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator. +tvs = TrainValidationSplit(estimator=lr, + estimatorParamMaps=paramGrid, + evaluator=RegressionEvaluator(), + # 80% of the data will be used for training, 20% for validation. + trainRatio=0.8) + +# Run TrainValidationSplit, and c
spark git commit: [MINOR][SQL] Replace DataFrameWriter.stream() with startStream() in comments.
Repository: spark Updated Branches: refs/heads/master 3e3c3d58d -> 9525c563d [MINOR][SQL] Replace DataFrameWriter.stream() with startStream() in comments. ## What changes were proposed in this pull request? According to #11627 , this PR replace `DataFrameWriter.stream()` with `startStream()` in comments of `ContinuousQueryListener.java`. ## How was this patch tested? Manual. (It changes on comments.) Author: Dongjoon Hyun Closes #11629 from dongjoon-hyun/minor_rename. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9525c563 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9525c563 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9525c563 Branch: refs/heads/master Commit: 9525c563de9c446e108c1e9535238d99cc34cab9 Parents: 3e3c3d5 Author: Dongjoon Hyun Authored: Wed Mar 9 23:54:00 2016 -0800 Committer: Reynold Xin Committed: Wed Mar 9 23:54:00 2016 -0800 -- .../org/apache/spark/sql/util/ContinuousQueryListener.scala| 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9525c563/sql/core/src/main/scala/org/apache/spark/sql/util/ContinuousQueryListener.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/util/ContinuousQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/util/ContinuousQueryListener.scala index 73c78d1..2c5358c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/util/ContinuousQueryListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/util/ContinuousQueryListener.scala @@ -32,9 +32,9 @@ abstract class ContinuousQueryListener { /** * Called when a query is started. * @note This is called synchronously with - * [[org.apache.spark.sql.DataFrameWriter `DataFrameWriter.stream()`]], - * that is, `onQueryStart` will be called on all listeners before `DataFrameWriter.stream()` - * returns the corresponding [[ContinuousQuery]]. + * [[org.apache.spark.sql.DataFrameWriter `DataFrameWriter.startStream()`]], + * that is, `onQueryStart` will be called on all listeners before + * `DataFrameWriter.startStream()` returns the corresponding [[ContinuousQuery]]. */ def onQueryStarted(queryStarted: QueryStarted) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org