This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new bdb73bb  [SPARK-36613][SQL][SS] Use EnumSet as the implementation of Table.capabilities method return value
bdb73bb is described below

commit bdb73bbc277519f0c7ffa3ab856cf87515c12934
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Sun Sep 5 08:23:05 2021 -0500

    [SPARK-36613][SQL][SS] Use EnumSet as the implementation of Table.capabilities method return value

    ### What changes were proposed in this pull request?
    The `Table.capabilities` method returns a `java.util.Set` of the `TableCapability` enumeration type, which is currently implemented with `java.util.HashSet`. Such a set can be replaced with `java.util.EnumSet`, because `EnumSet` implementations can be much more efficient than other sets.

    ### Why are the changes needed?
    Use a more appropriate data structure.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?

    - Pass GA or Jenkins tests.
    - Add a new benchmark to compare the `create` and `contains` operations of `EnumSet` and `HashSet`.

    Closes #33867 from LuciferYang/SPARK-36613.

    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
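Every file in the diff below applies the same mechanical change. As a minimal sketch of the before/after shape (the `ExampleTable` class and its capability choice are hypothetical, not part of the patch):

```scala
import java.util

import org.apache.spark.sql.connector.catalog.{Table, TableCapability}
import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.types.StructType

// Hypothetical table, used only to illustrate the pattern of the change.
class ExampleTable extends Table {
  override def name(): String = "example"

  override def schema(): StructType = new StructType()

  // Before: a Scala Set converted to a Java set view, with hash-based lookup:
  //   override def capabilities(): util.Set[TableCapability] =
  //     scala.collection.JavaConverters.setAsJavaSetConverter(
  //       Set(BATCH_READ, BATCH_WRITE)).asJava
  // After: an EnumSet, which stores members as a bit vector keyed by ordinal:
  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(BATCH_READ, BATCH_WRITE)
}
```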
 .../spark/sql/kafka010/KafkaSourceProvider.scala   |   4 +-
 .../EnumTypeSetBenchmark-jdk11-results.txt         | 104 ++++++++++++
 .../benchmarks/EnumTypeSetBenchmark-results.txt    | 104 ++++++++++++
 .../spark/sql/connector/catalog/V1Table.scala      |   3 +-
 .../CreateTablePartitioningValidationSuite.scala   |   3 +-
 .../connector/catalog/EnumTypeSetBenchmark.scala   | 176 +++++++++++++++++++++
 .../sql/connector/catalog/InMemoryTable.scala      |   5 +-
 .../datasources/noop/NoopDataSource.scala          |   6 +-
 .../sql/execution/datasources/v2/FileTable.scala   |   2 +-
 .../execution/datasources/v2/jdbc/JDBCTable.scala  |   2 +-
 .../spark/sql/execution/streaming/console.scala    |   4 +-
 .../spark/sql/execution/streaming/memory.scala     |   3 +-
 .../streaming/sources/ForeachWriterTable.scala     |   4 +-
 .../streaming/sources/RateStreamProvider.scala     |   4 +-
 .../sources/TextSocketSourceProvider.scala         |   3 +-
 .../sql/execution/streaming/sources/memory.scala   |   3 +-
 .../spark/sql/connector/JavaSimpleBatchTable.java  |   5 +-
 .../connector/JavaSimpleWritableDataSource.java    |   6 +-
 .../spark/sql/connector/DataSourceV2Suite.scala    |   4 +-
 .../connector/FileDataSourceV2FallBackSuite.scala  |   5 +-
 .../spark/sql/connector/LocalScanSuite.scala       |   5 +-
 .../sql/connector/SimpleWritableDataSource.scala   |   2 +-
 .../sql/connector/TableCapabilityCheckSuite.scala  |   8 +-
 .../spark/sql/connector/V1ReadFallbackSuite.scala  |   4 +-
 .../spark/sql/connector/V1WriteFallbackSuite.scala |   4 +-
 .../sources/StreamingDataSourceV2Suite.scala       |  17 +-
 .../streaming/test/DataStreamTableAPISuite.scala   |   7 +-
 .../sql/streaming/util/BlockOnStopSource.scala     |   4 +-
 28 files changed, 433 insertions(+), 68 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 4a75ab0..640996d 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -408,8 +408,8 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     // ACCEPT_ANY_SCHEMA is needed because of the following reasons:
     // * Kafka writer validates the schema instead of the SQL analyzer (the schema is fixed)
     // * Read schema differs from write schema (please see Kafka integration guide)
-    Set(BATCH_READ, BATCH_WRITE, MICRO_BATCH_READ, CONTINUOUS_READ, STREAMING_WRITE,
-      ACCEPT_ANY_SCHEMA).asJava
+    ju.EnumSet.of(BATCH_READ, BATCH_WRITE, MICRO_BATCH_READ, CONTINUOUS_READ, STREAMING_WRITE,
+      ACCEPT_ANY_SCHEMA)
   }

   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
diff --git a/sql/catalyst/benchmarks/EnumTypeSetBenchmark-jdk11-results.txt b/sql/catalyst/benchmarks/EnumTypeSetBenchmark-jdk11-results.txt
new file mode 100644
index 0000000..4c961c1
--- /dev/null
+++ b/sql/catalyst/benchmarks/EnumTypeSetBenchmark-jdk11-results.txt
@@ -0,0 +1,104 @@
+OpenJDK 64-Bit Server VM 11+28 on Linux 4.14.0_1-0-0-42
+Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz
+Test contains use empty Set:    Best Time(ms)  Avg Time(ms)  Stdev(ms)   Rate(M/s)  Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------
+Use HashSet                                 1             1          0      1443.3          0.7       1.0X
+Use EnumSet                                 0             0          0  15873015.9          0.0   10997.8X
+
+OpenJDK 64-Bit Server VM 11+28 on Linux 4.14.0_1-0-0-42
+Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz
+Test contains use 1 item Set:   Best Time(ms)  Avg Time(ms)  Stdev(ms)   Rate(M/s)  Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------
+Use HashSet                                12            13          1        81.7         12.2       1.0X
+Use EnumSet                                 0             0          0  15873015.9          0.0  194348.9X
+
+OpenJDK 64-Bit Server VM 11+28 on Linux 4.14.0_1-0-0-42
+Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz
+Test contains use 3 items Set:  Best Time(ms)  Avg Time(ms)  Stdev(ms)   Rate(M/s)  Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------
+Use HashSet                                19            21          2        52.2         19.2       1.0X
+Use EnumSet                                 0             0          0  15384615.4          0.0  294667.3X
+
+OpenJDK 64-Bit Server VM 11+28 on Linux 4.14.0_1-0-0-42
+Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz
+Test contains use 5 items Set:  Best Time(ms)  Avg Time(ms)  Stdev(ms)   Rate(M/s)  Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------
+Use HashSet                                17            18          1        58.8         17.0       1.0X
+Use EnumSet                                 0             0          0  15151515.2          0.0  257812.2X
+
+OpenJDK 64-Bit Server VM 11+28 on Linux 4.14.0_1-0-0-42
+Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz
+Test contains use 10 items Set:  Best Time(ms)  Avg Time(ms)  Stdev(ms)   Rate(M/s)  Per Row(ns)   Relative
+-----------------------------------------------------------------------------------------------------------
+Use HashSet                                 18            20          2        56.1         17.8       1.0X
+Use EnumSet                                  0             0          0  14925373.1          0.0  266144.0X
+
+OpenJDK 64-Bit Server VM 11+28 on Linux 4.14.0_1-0-0-42
+Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz
+Test create empty Set:          Best Time(ms)  Avg Time(ms)  Stdev(ms)   Rate(M/s)  Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------
+Use HashSet                                 1             1          0       127.0          7.9       1.0X
+Use EnumSet                                 2             2          1        56.2         17.8       0.4X
+
+OpenJDK 64-Bit Server VM 11+28 on Linux 4.14.0_1-0-0-42
+Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz
+Test create 1 item Set:         Best Time(ms)  Avg Time(ms)  Stdev(ms)   Rate(M/s)  Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------
+Use HashSet                                18            19          3         5.7        175.3       1.0X
+Use EnumSet                                 2             2          1        49.7         20.1       8.7X
+
+OpenJDK 64-Bit Server VM 11+28 on Linux 4.14.0_1-0-0-42
+Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz
+Test create 3 items Set:        Best Time(ms)  Avg Time(ms)  Stdev(ms)   Rate(M/s)  Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------
+Use HashSet                                47            48          1         2.1        471.2       1.0X
+Use EnumSet                                 2             3          1        49.0         20.4      23.1X
+
+OpenJDK 64-Bit Server VM 11+28 on Linux 4.14.0_1-0-0-42
+Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz
+Test create 5 items Set:        Best Time(ms)  Avg Time(ms)  Stdev(ms)   Rate(M/s)  Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------
+Use HashSet                               121           121          1         0.8       1208.6       1.0X
+Use EnumSet                                 2             2          0        61.6         16.2      74.4X
+
+OpenJDK 64-Bit Server VM 11+28 on Linux 4.14.0_1-0-0-42
+Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz
+Test create 10 items Set:       Best Time(ms)  Avg Time(ms)  Stdev(ms)   Rate(M/s)  Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------
+Use HashSet                               127           128          1         0.8       1267.4       1.0X
+Use EnumSet                                 1             2          1        76.1         13.1      96.4X
+
+OpenJDK 64-Bit Server VM 11+28 on Linux 4.14.0_1-0-0-42
+Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz
+Test create and contains use empty Set:  Best Time(ms)  Avg Time(ms)  Stdev(ms)   Rate(M/s)  Per Row(ns)   Relative
+-------------------------------------------------------------------------------------------------------------------
+Use HashSet                                          1             1          0       764.8          1.3       1.0X
+Use EnumSet                                          0             0          0  31250000.0          0.0   40859.0X
+
+OpenJDK 64-Bit Server VM 11+28 on Linux 4.14.0_1-0-0-42
+Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz
+Test create and contains use 1 item Set:  Best Time(ms)  Avg Time(ms)  Stdev(ms)   Rate(M/s)  Per Row(ns)    Relative
+----------------------------------------------------------------------------------------------------------------------
+Use HashSet                                          39            40          2        25.4         39.4        1.0X
+Use EnumSet                                           0             0          0  27777777.8          0.0  1095342.4X
+
+OpenJDK 64-Bit Server VM 11+28 on Linux 4.14.0_1-0-0-42
+Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz
+Test create and contains use 3 items Set:  Best Time(ms)  Avg Time(ms)  Stdev(ms)   Rate(M/s)  Per Row(ns)    Relative
+-----------------------------------------------------------------------------------------------------------------------
+Use HashSet                                           68            75          6        14.7         68.0        1.0X
+Use EnumSet                                            0             0          0  27777777.8          0.0  1890060.3X
+
+OpenJDK 64-Bit Server VM 11+28 on Linux 4.14.0_1-0-0-42
+Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz
+Test create and contains use 5 items Set:  Best Time(ms)  Avg Time(ms)  Stdev(ms)   Rate(M/s)  Per Row(ns)    Relative
+-----------------------------------------------------------------------------------------------------------------------
+Use HashSet                                          124           125          2         8.1        123.6        1.0X
+Use EnumSet                                            0             0          0  25641025.6          0.0  3168058.5X
+
+OpenJDK 64-Bit Server VM 11+28 on Linux 4.14.0_1-0-0-42
+Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz
+Test create and contains use 10 items Set:  Best Time(ms)  Avg Time(ms)  Stdev(ms)   Rate(M/s)  Per Row(ns)    Relative
+------------------------------------------------------------------------------------------------------------------------
+Use HashSet                                           139           141          2         7.2        138.8        1.0X
+Use EnumSet                                             0             0          0  28571428.6          0.0  3966840.5X
diff --git a/sql/catalyst/benchmarks/EnumTypeSetBenchmark-results.txt b/sql/catalyst/benchmarks/EnumTypeSetBenchmark-results.txt
new file mode 100644
index 0000000..e2bf484
--- /dev/null
+++ b/sql/catalyst/benchmarks/EnumTypeSetBenchmark-results.txt
@@ -0,0 +1,104 @@
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Linux 4.14.0_1-0-0-42
+Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz
+Test contains use empty Set:    Best Time(ms)  Avg Time(ms)  Stdev(ms)   Rate(M/s)  Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------
+Use HashSet                                 1             1          0      1316.2          0.8       1.0X
+Use EnumSet                                 0             0          0  16393442.6          0.0   12454.9X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Linux 4.14.0_1-0-0-42
+Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz
+Test contains use 1 item Set:   Best Time(ms)  Avg Time(ms)  Stdev(ms)   Rate(M/s)  Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------
+Use HashSet                                12            14          2        85.0         11.8       1.0X
+Use EnumSet                                 0             0          0  15625000.0          0.0  183921.0X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Linux 4.14.0_1-0-0-42
+Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz
+Test contains use 3 items Set:  Best Time(ms)  Avg Time(ms)  Stdev(ms)   Rate(M/s)  Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------
+Use HashSet                                19            19          0        53.4         18.7       1.0X
+Use EnumSet                                 0             0          0  15873015.9          0.0  297149.4X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Linux 4.14.0_1-0-0-42
+Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz
+Test contains use 5 items Set:  Best Time(ms)  Avg Time(ms)  Stdev(ms)   Rate(M/s)  Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------
+Use HashSet                                17            17          0        57.4         17.4       1.0X
+Use EnumSet                                 0             0          0  15873015.9          0.0  276457.7X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Linux 4.14.0_1-0-0-42
+Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz
+Test contains use 10 items Set:  Best Time(ms)  Avg Time(ms)  Stdev(ms)   Rate(M/s)  Per Row(ns)   Relative
+-----------------------------------------------------------------------------------------------------------
+Use HashSet                                 19            19          0        52.4         19.1       1.0X
+Use EnumSet                                  0             0          0  15873015.9          0.0  302849.4X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Linux 4.14.0_1-0-0-42
+Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz
+Test create empty Set:          Best Time(ms)  Avg Time(ms)  Stdev(ms)   Rate(M/s)  Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------
+Use HashSet                                 1             1          0       135.6          7.4       1.0X
+Use EnumSet                                 2             2          1        64.4         15.5       0.5X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Linux 4.14.0_1-0-0-42
+Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz
+Test create 1 item Set:         Best Time(ms)  Avg Time(ms)  Stdev(ms)   Rate(M/s)  Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------
+Use HashSet                                17            18          1         5.9        169.7       1.0X
+Use EnumSet                                 2             3          1        55.6         18.0       9.4X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Linux 4.14.0_1-0-0-42
+Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz
+Test create 3 items Set:        Best Time(ms)  Avg Time(ms)  Stdev(ms)   Rate(M/s)  Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------
+Use HashSet                                38            38          0         2.6        378.0       1.0X
+Use EnumSet                                 2             2          1        55.7         18.0      21.0X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Linux 4.14.0_1-0-0-42
+Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz
+Test create 5 items Set:        Best Time(ms)  Avg Time(ms)  Stdev(ms)   Rate(M/s)  Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------
+Use HashSet                                71            72          5         1.4        705.5       1.0X
+Use EnumSet                                 2             2          1        57.2         17.5      40.4X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Linux 4.14.0_1-0-0-42
+Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz
+Test create 10 items Set:       Best Time(ms)  Avg Time(ms)  Stdev(ms)   Rate(M/s)  Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------
+Use HashSet                                85            85          0         1.2        847.9       1.0X
+Use EnumSet                                 1             2          1        70.0         14.3      59.4X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Linux 4.14.0_1-0-0-42
+Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz
+Test create and contains use empty Set:  Best Time(ms)  Avg Time(ms)  Stdev(ms)   Rate(M/s)  Per Row(ns)   Relative
+-------------------------------------------------------------------------------------------------------------------
+Use HashSet                                          1             1          0       704.4          1.4       1.0X
+Use EnumSet                                          0             0          0  34482758.6          0.0   48954.9X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Linux 4.14.0_1-0-0-42
+Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz
+Test create and contains use 1 item Set:  Best Time(ms)  Avg Time(ms)  Stdev(ms)   Rate(M/s)  Per Row(ns)    Relative
+----------------------------------------------------------------------------------------------------------------------
+Use HashSet                                          35            35          0        28.4         35.2        1.0X
+Use EnumSet                                           0             0          0  29411764.7          0.0  1034921.5X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Linux 4.14.0_1-0-0-42
+Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz
+Test create and contains use 3 items Set:  Best Time(ms)  Avg Time(ms)  Stdev(ms)   Rate(M/s)  Per Row(ns)    Relative
+-----------------------------------------------------------------------------------------------------------------------
+Use HashSet                                           54            54          1        18.6         53.8        1.0X
+Use EnumSet                                            0             0          0  30303030.3          0.0  1630614.5X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Linux 4.14.0_1-0-0-42
+Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz
+Test create and contains use 5 items Set:  Best Time(ms)  Avg Time(ms)  Stdev(ms)   Rate(M/s)  Per Row(ns)    Relative
+-----------------------------------------------------------------------------------------------------------------------
+Use HashSet                                          105           106          1         9.5        105.2        1.0X
+Use EnumSet                                            0             0          0  28571428.6          0.0  3007082.0X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Linux 4.14.0_1-0-0-42
+Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz
+Test create and contains use 10 items Set:  Best Time(ms)  Avg Time(ms)  Stdev(ms)   Rate(M/s)  Per Row(ns)    Relative
+------------------------------------------------------------------------------------------------------------------------
+Use HashSet                                           125           126          1         8.0        124.6        1.0X
+Use EnumSet                                             0             0          0  31250000.0          0.0  3894392.1X
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala
index 5c0d0ce..a8a1b9a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala
@@ -76,7 +76,8 @@ private[sql] case class V1Table(v1Table: CatalogTable) extends Table {

   override def name: String = v1Table.identifier.quoted

-  override def capabilities: util.Set[TableCapability] = new util.HashSet[TableCapability]()
+  override def capabilities: util.Set[TableCapability] =
+    util.EnumSet.noneOf(classOf[TableCapability])

   override def toString: String = s"V1Table($name)"
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala
index c869524..aee8e31 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala
@@ -156,5 +156,6 @@ private[sql] case object TestRelation2 extends LeafNode with NamedRelation {
 private[sql] case object TestTable2 extends Table {
   override def name: String = "table_name"
   override def schema: StructType = CreateTablePartitioningValidationSuite.schema
-  override def capabilities: util.Set[TableCapability] = new util.HashSet[TableCapability]()
+  override def capabilities: util.Set[TableCapability] =
+    util.EnumSet.noneOf(classOf[TableCapability])
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/EnumTypeSetBenchmark.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/EnumTypeSetBenchmark.scala
new file mode 100644
index 0000000..a23ff6e
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/EnumTypeSetBenchmark.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog
+
+import java.util
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.spark.sql.connector.catalog.TableCapability._
+
+/**
+ * Benchmark for EnumSet vs HashSet holding an enumeration type.
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt:
+ *      bin/spark-submit --class <this class> --jars <spark core test jar> <spark catalyst test jar>
+ *   2. build/sbt "catalyst/test:runMain <this class>"
+ *   3. generate result:
+ *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "catalyst/test:runMain <this class>"
+ *      Results will be written to "benchmarks/EnumTypeSetBenchmark-results.txt".
+ * }}}
+ */
+object EnumTypeSetBenchmark extends BenchmarkBase {
+
+  def emptyHashSet(): util.Set[TableCapability] = Collections.emptySet()
+
+  def emptyEnumSet(): util.Set[TableCapability] =
+    util.EnumSet.noneOf(classOf[TableCapability])
+
+  def oneItemHashSet(): util.Set[TableCapability] = Set(TRUNCATE).asJava
+
+  def oneItemEnumSet(): util.Set[TableCapability] = util.EnumSet.of(TRUNCATE)
+
+  def threeItemsHashSet(): util.Set[TableCapability] =
+    Set(BATCH_READ, TRUNCATE, V1_BATCH_WRITE).asJava
+
+  def threeItemsEnumSet(): util.Set[TableCapability] =
+    util.EnumSet.of(BATCH_READ, TRUNCATE, V1_BATCH_WRITE)
+
+  def fiveItemsHashSet(): util.Set[TableCapability] =
+    Set(BATCH_READ, CONTINUOUS_READ, TRUNCATE, V1_BATCH_WRITE, OVERWRITE_BY_FILTER).asJava
+
+  def fiveItemsEnumSet(): util.Set[TableCapability] =
+    util.EnumSet.of(BATCH_READ, CONTINUOUS_READ, TRUNCATE, V1_BATCH_WRITE, OVERWRITE_BY_FILTER)
+
+  def allItemsHashSet(): util.Set[TableCapability] =
+    Set(BATCH_READ, MICRO_BATCH_READ, CONTINUOUS_READ, BATCH_WRITE, STREAMING_WRITE, TRUNCATE,
+      OVERWRITE_BY_FILTER, OVERWRITE_DYNAMIC, ACCEPT_ANY_SCHEMA, V1_BATCH_WRITE).asJava
+
+  def allItemsEnumSet(): util.Set[TableCapability] =
+    util.EnumSet.allOf(classOf[TableCapability])
+
+  def testCreateSetWithEnumType(
+      valuesPerIteration: Int,
+      sizeLiteral: String,
+      createHashSetFunctions: () => util.Set[TableCapability],
+      createEnumSetFunctions: () => util.Set[TableCapability]): Unit = {
+
+    val benchmark =
+      new Benchmark(s"Test create $sizeLiteral Set", valuesPerIteration, output = output)
+
+    benchmark.addCase("Use HashSet") { _: Int =>
+      for (_ <- 0L until valuesPerIteration) { createHashSetFunctions.apply() }
+    }
+
+    benchmark.addCase("Use EnumSet") { _: Int =>
+      for (_ <- 0L until valuesPerIteration) { createEnumSetFunctions.apply() }
+    }
+    benchmark.run()
+  }
+
+  def testContainsOperation(
+      valuesPerIteration: Int,
+      sizeLiteral: String,
+      hashSet: util.Set[TableCapability],
+      enumSet: util.Set[TableCapability]): Unit = {
+
+    val capabilities = TableCapability.values()
+
+    val benchmark = new Benchmark(
+      s"Test contains use $sizeLiteral Set",
+      valuesPerIteration * capabilities.length,
+      output = output)
+
+    benchmark.addCase("Use HashSet") { _: Int =>
+      for (_ <- 0L until valuesPerIteration) {
+        capabilities.foreach(hashSet.contains)
+      }
+    }
+
+    benchmark.addCase("Use EnumSet") { _: Int =>
+      for (_ <- 0L until valuesPerIteration) {
+        capabilities.foreach(enumSet.contains)
+      }
+    }
+    benchmark.run()
+  }
+
+  def testCreateAndContainsOperation(
+      valuesPerIteration: Int,
+      sizeLiteral: String,
+      createHashSetFunctions: () => util.Set[TableCapability],
+      createEnumSetFunctions: () => util.Set[TableCapability]): Unit = {
+
+    val capabilities = TableCapability.values()
+
+    val benchmark = new Benchmark(
+      s"Test create and contains use $sizeLiteral Set",
+      valuesPerIteration * capabilities.length,
+      output = output)
+
+    benchmark.addCase("Use HashSet") { _: Int =>
+      for (_ <- 0L until valuesPerIteration) {
+        capabilities.foreach(createHashSetFunctions.apply().contains)
+      }
+    }
+
+    benchmark.addCase("Use EnumSet") { _: Int =>
+      for (_ <- 0L until valuesPerIteration) {
+        capabilities.foreach(createEnumSetFunctions.apply().contains)
+      }
+    }
+    benchmark.run()
+  }
+
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+
+    val valuesPerIteration = 100000
+
+    // Test Contains
+    testContainsOperation(valuesPerIteration, "empty", emptyHashSet(), emptyEnumSet())
+    testContainsOperation(valuesPerIteration, "1 item", oneItemHashSet(), oneItemEnumSet())
+    testContainsOperation(valuesPerIteration,
+      "3 items", threeItemsHashSet(), threeItemsEnumSet())
+    testContainsOperation(valuesPerIteration, "5 items", fiveItemsHashSet(), fiveItemsEnumSet())
+    testContainsOperation(valuesPerIteration, s"${TableCapability.values().length} items",
+      allItemsHashSet(), allItemsEnumSet())
+
+    // Test Create
+    testCreateSetWithEnumType(valuesPerIteration,
+      "empty", () => emptyHashSet(), () => emptyEnumSet())
+    testCreateSetWithEnumType(valuesPerIteration,
+      "1 item", () => oneItemHashSet(), () => oneItemEnumSet())
+    testCreateSetWithEnumType(valuesPerIteration, "3 items",
+      () => threeItemsHashSet(), () => threeItemsEnumSet())
+    testCreateSetWithEnumType(valuesPerIteration, "5 items",
+      () => fiveItemsHashSet(), () => fiveItemsEnumSet())
+    testCreateSetWithEnumType(valuesPerIteration, s"${TableCapability.values().length} items",
+      () => allItemsHashSet(), () => allItemsEnumSet())
+
+    // Test Create and Contains
+    testCreateAndContainsOperation(valuesPerIteration, "empty",
+      () => emptyHashSet(), () => emptyEnumSet())
+    testCreateAndContainsOperation(valuesPerIteration, "1 item",
+      () => oneItemHashSet(), () => oneItemEnumSet())
+    testCreateAndContainsOperation(valuesPerIteration, "3 items",
+      () => threeItemsHashSet(), () => threeItemsEnumSet())
+    testCreateAndContainsOperation(valuesPerIteration, "5 items",
+      () => fiveItemsHashSet(), () => fiveItemsEnumSet())
+    testCreateAndContainsOperation(valuesPerIteration, s"${TableCapability.values().length} items",
+      () => allItemsHashSet(), () => allItemsEnumSet())
+  }
+}
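The `contains` gap in the result files above follows from `EnumSet`'s representation: for enum types with at most 64 constants, OpenJDK's `RegularEnumSet` keeps the whole set in a single `long` bit vector, so membership is one mask test with no hashing or bucket traversal. A minimal sketch of that idea (illustrative only, not the JDK's actual code):

```scala
// Illustrative only: the bit-vector idea behind EnumSet's O(1) contains.
object TinyEnumSetDemo {
  // Membership of up to 64 enum constants packed into one Long.
  final case class TinyEnumSet(bits: Long) {
    def contains(e: Enum[_]): Boolean = (bits & (1L << e.ordinal())) != 0L
    def add(e: Enum[_]): TinyEnumSet = TinyEnumSet(bits | (1L << e.ordinal()))
  }

  def main(args: Array[String]): Unit = {
    import java.time.DayOfWeek
    val weekend = TinyEnumSet(0L).add(DayOfWeek.SATURDAY).add(DayOfWeek.SUNDAY)
    println(weekend.contains(DayOfWeek.SUNDAY)) // true, via a single mask test
    println(weekend.contains(DayOfWeek.MONDAY)) // false
  }
}
```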
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
index 2f3c5a3..3ebeacf 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
@@ -22,7 +22,6 @@ import java.time.temporal.ChronoUnit
 import java.util
 import java.util.OptionalLong

-import scala.collection.JavaConverters._
 import scala.collection.mutable

 import org.scalatest.Assertions._
@@ -222,13 +221,13 @@ class InMemoryTable(
     this
   }

-  override def capabilities: util.Set[TableCapability] = Set(
+  override def capabilities: util.Set[TableCapability] = util.EnumSet.of(
     TableCapability.BATCH_READ,
     TableCapability.BATCH_WRITE,
     TableCapability.STREAMING_WRITE,
     TableCapability.OVERWRITE_BY_FILTER,
     TableCapability.OVERWRITE_DYNAMIC,
-    TableCapability.TRUNCATE).asJava
+    TableCapability.TRUNCATE)

   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
     new InMemoryScanBuilder(schema)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
index 79e4150..1455a55 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.datasources.noop

 import java.util

-import scala.collection.JavaConverters._
-
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability}
 import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, LogicalWriteInfo, PhysicalWriteInfo, SupportsTruncate, Write, WriteBuilder, WriterCommitMessage}
@@ -44,11 +42,11 @@ private[noop] object NoopTable extends Table with SupportsWrite {
   override def name(): String = "noop-table"
   override def schema(): StructType = new StructType()
   override def capabilities(): util.Set[TableCapability] = {
-    Set(
+    util.EnumSet.of(
       TableCapability.BATCH_WRITE,
       TableCapability.STREAMING_WRITE,
       TableCapability.TRUNCATE,
-      TableCapability.ACCEPT_ANY_SCHEMA).asJava
+      TableCapability.ACCEPT_ANY_SCHEMA)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
index 507feb2..29a2e1b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
@@ -149,5 +149,5 @@ abstract class FileTable(
 }

 object FileTable {
-  private val CAPABILITIES = Set(BATCH_READ, BATCH_WRITE).asJava
+  private val CAPABILITIES = util.EnumSet.of(BATCH_READ, BATCH_WRITE)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
index 5e11ea6..d88ec2f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
@@ -34,7 +34,7 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt
   override def name(): String = ident.toString

   override def capabilities(): util.Set[TableCapability] = {
-    Set(BATCH_READ, V1_BATCH_WRITE, TRUNCATE).asJava
+    util.EnumSet.of(BATCH_READ, V1_BATCH_WRITE, TRUNCATE)
   }

   override def newScanBuilder(options: CaseInsensitiveStringMap): JDBCScanBuilder = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
index c1f5bd3..67585bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.streaming

 import java.util

-import scala.collection.JavaConverters._
-
 import org.apache.spark.sql._
 import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability}
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, SupportsTruncate, Write, WriteBuilder}
@@ -69,7 +67,7 @@ object ConsoleTable extends Table with SupportsWrite {
   override def schema(): StructType = StructType(Nil)

   override def capabilities(): util.Set[TableCapability] = {
-    Set(TableCapability.STREAMING_WRITE).asJava
+    util.EnumSet.of(TableCapability.STREAMING_WRITE)
   }

   override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index ee1cb12..dd09a38 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -21,7 +21,6 @@ import java.util
 import java.util.concurrent.atomic.AtomicInteger
 import javax.annotation.concurrent.GuardedBy

-import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer

 import org.apache.spark.internal.Logging
@@ -116,7 +115,7 @@ class MemoryStreamTable(val stream: MemoryStreamBase[_]) extends Table with Supp
   override def schema(): StructType = stream.fullSchema()

   override def capabilities(): util.Set[TableCapability] = {
-    Set(TableCapability.MICRO_BATCH_READ, TableCapability.CONTINUOUS_READ).asJava
+    util.EnumSet.of(TableCapability.MICRO_BATCH_READ, TableCapability.CONTINUOUS_READ)
   }

   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
index 4b71b33..6ebf2cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.streaming.sources

 import java.util

-import scala.collection.JavaConverters._
-
 import org.apache.spark.sql.{ForeachWriter, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -51,7 +49,7 @@ case class ForeachWriterTable[T](
   override def schema(): StructType = StructType(Nil)

   override def capabilities(): util.Set[TableCapability] = {
-    Set(TableCapability.STREAMING_WRITE).asJava
+    util.EnumSet.of(TableCapability.STREAMING_WRITE)
   }

   override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
index a093bf5..dd43769 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.streaming.sources

 import java.util

-import scala.collection.JavaConverters._
-
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability}
@@ -90,7 +88,7 @@ class RateStreamTable(
   override def schema(): StructType = RateStreamProvider.SCHEMA

   override def capabilities(): util.Set[TableCapability] = {
-    Set(TableCapability.MICRO_BATCH_READ, TableCapability.CONTINUOUS_READ).asJava
+    util.EnumSet.of(TableCapability.MICRO_BATCH_READ, TableCapability.CONTINUOUS_READ)
   }

   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = () => new Scan {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
index d9bd916..7023a86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
@@ -21,7 +21,6 @@ import java.text.SimpleDateFormat
 import java.util
 import java.util.Locale

-import scala.collection.JavaConverters._
 import scala.util.{Failure, Success, Try}

 import org.apache.spark.internal.Logging
@@ -83,7 +82,7 @@ class TextSocketTable(host: String, port: Int, numPartitions: Int, includeTimest
   }

   override def capabilities(): util.Set[TableCapability] = {
-    Set(TableCapability.MICRO_BATCH_READ, TableCapability.CONTINUOUS_READ).asJava
+    util.EnumSet.of(TableCapability.MICRO_BATCH_READ, TableCapability.CONTINUOUS_READ)
   }

   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = () => new Scan {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala
index 4bd2b33..7037bcf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.streaming.sources
 import java.util
 import javax.annotation.concurrent.GuardedBy

-import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
@@ -49,7 +48,7 @@ class MemorySink extends Table with SupportsWrite with Logging {
   override def schema(): StructType = StructType(Nil)

   override def capabilities(): util.Set[TableCapability] = {
-    Set(TableCapability.STREAMING_WRITE).asJava
+    util.EnumSet.of(TableCapability.STREAMING_WRITE)
   }

   override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleBatchTable.java b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleBatchTable.java
index 4d147ac..5e2d0e5 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleBatchTable.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleBatchTable.java
@@ -17,8 +17,7 @@

 package test.org.apache.spark.sql.connector;

-import java.util.Collections;
-import java.util.HashSet;
+import java.util.EnumSet;
 import java.util.Set;

 import org.apache.spark.sql.connector.TestingV2Source;
@@ -29,7 +28,7 @@ import org.apache.spark.sql.types.StructType;

 abstract class JavaSimpleBatchTable implements Table, SupportsRead {
   private static final Set<TableCapability> CAPABILITIES =
-      new HashSet<>(Collections.singletonList(TableCapability.BATCH_READ));
+      EnumSet.of(TableCapability.BATCH_READ);

   @Override
   public StructType schema() {
     return TestingV2Source.schema();
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java
index 74140d7..8aac554 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java
@@ -21,7 +21,7 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.Arrays;
-import java.util.HashSet;
+import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.Set;

@@ -221,10 +221,10 @@ public class JavaSimpleWritableDataSource implements TestingV2Source {

     @Override
     public Set<TableCapability> capabilities() {
-      return new HashSet<>(Arrays.asList(
+      return EnumSet.of(
           TableCapability.BATCH_READ,
           TableCapability.BATCH_WRITE,
-          TableCapability.TRUNCATE));
+          TableCapability.TRUNCATE);
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
index b42d48d..6eeaa55 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -21,8 +21,6 @@ import java.io.File
 import java.util
 import java.util.OptionalLong

-import scala.collection.JavaConverters._
-
 import test.org.apache.spark.sql.connector._

 import org.apache.spark.SparkException
@@ -466,7 +464,7 @@ abstract class SimpleBatchTable extends Table with SupportsRead {

   override def name(): String = this.getClass.toString

-  override def capabilities(): util.Set[TableCapability] = Set(BATCH_READ).asJava
+  override def capabilities(): util.Set[TableCapability] = util.EnumSet.of(BATCH_READ)
 }

 abstract class SimpleScanBuilder extends ScanBuilder
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
index be0dae2..5156bd4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
@@ -16,7 +16,6 @@
  */
 package org.apache.spark.sql.connector

-import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer

 import org.apache.spark.SparkConf
@@ -56,7 +55,7 @@ class DummyReadOnlyFileTable extends Table with SupportsRead {
   }

   override def capabilities(): java.util.Set[TableCapability] =
-    Set(TableCapability.BATCH_READ, TableCapability.ACCEPT_ANY_SCHEMA).asJava
+    java.util.EnumSet.of(TableCapability.BATCH_READ, TableCapability.ACCEPT_ANY_SCHEMA)
 }

 class DummyWriteOnlyFileDataSourceV2 extends FileDataSourceV2 {
@@ -79,7 +78,7 @@ class DummyWriteOnlyFileTable extends Table with SupportsWrite {
     throw new AnalysisException("Dummy file writer")

   override def capabilities(): java.util.Set[TableCapability] =
-    Set(TableCapability.BATCH_WRITE, TableCapability.ACCEPT_ANY_SCHEMA).asJava
+    java.util.EnumSet.of(TableCapability.BATCH_WRITE, TableCapability.ACCEPT_ANY_SCHEMA)
 }

 class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/LocalScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/LocalScanSuite.scala
index db71eeb..0946670 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/LocalScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/LocalScanSuite.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.connector

 import java.util

-import scala.collection.JavaConverters._
-
 import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.catalog.{BasicInMemoryTableCatalog, Identifier, SupportsRead, Table, TableCapability}
@@ -78,7 +76,8 @@ object TestLocalScanTable {
 class TestLocalScanTable(override val name: String) extends Table with SupportsRead {
   override def schema(): StructType = TestLocalScanTable.schema

-  override def capabilities(): util.Set[TableCapability] = Set(TableCapability.BATCH_READ).asJava
+  override def capabilities(): util.Set[TableCapability] =
+    util.EnumSet.of(TableCapability.BATCH_READ)

   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
     new TestLocalScanBuilder
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/SimpleWritableDataSource.scala
index bb2acec..99c322a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/SimpleWritableDataSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/SimpleWritableDataSource.scala
@@ -139,7 +139,7 @@ class SimpleWritableDataSource extends TestingV2Source {
     }

     override def capabilities(): util.Set[TableCapability] =
-      Set(BATCH_READ, BATCH_WRITE, TRUNCATE).asJava
+      util.EnumSet.of(BATCH_READ, BATCH_WRITE, TRUNCATE)
   }

   override def getTable(options: CaseInsensitiveStringMap): Table = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/TableCapabilityCheckSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/TableCapabilityCheckSuite.scala
index ce94d3b..a12065e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/TableCapabilityCheckSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/TableCapabilityCheckSuite.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.connector

 import java.util

-import scala.collection.JavaConverters._
-
 import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext}
 import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, NamedRelation}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
@@ -217,7 +215,11 @@ private case object TestRelation extends LeafNode with NamedRelation {
 private case class CapabilityTable(_capabilities: TableCapability*) extends Table {
   override def name(): String = "capability_test_table"
   override def schema(): StructType = TableCapabilityCheckSuite.schema
-  override def capabilities(): util.Set[TableCapability] = _capabilities.toSet.asJava
+  override def capabilities(): util.Set[TableCapability] = {
+    val set = util.EnumSet.noneOf(classOf[TableCapability])
+    _capabilities.foreach(set.add)
+    set
+  }
 }

 private class TestStreamSourceProvider extends StreamSourceProvider {
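A note on the `CapabilityTable` change just above: `util.EnumSet.copyOf` cannot replace `_capabilities.toSet.asJava` directly, because `EnumSet.copyOf(Collection)` throws `IllegalArgumentException` when the source collection is empty and is not itself an `EnumSet`, and `CapabilityTable` may be constructed with no capabilities. A minimal sketch of that pattern as a reusable helper (the `CapabilitySets` object is hypothetical, not in the patch):

```scala
import java.util

import org.apache.spark.sql.connector.catalog.TableCapability

// Hypothetical helper, not part of the patch.
object CapabilitySets {
  // noneOf + add handles the zero-capability case that EnumSet.copyOf rejects.
  def toEnumSet(capabilities: TableCapability*): util.Set[TableCapability] = {
    val set = util.EnumSet.noneOf(classOf[TableCapability])
    capabilities.foreach(set.add)
    set
  }
}
```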
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala
index 847953e..ff1bd29 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.connector

 import java.util

-import scala.collection.JavaConverters._
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, QueryTest, Row, SparkSession, SQLContext}
 import org.apache.spark.sql.connector.catalog.{BasicInMemoryTableCatalog, Identifier, SupportsRead, Table, TableCapability}
@@ -132,7 +130,7 @@ class TableWithV1ReadFallback(override val name: String) extends Table with Supp
   override def schema(): StructType = V1ReadFallbackCatalog.schema

   override def capabilities(): util.Set[TableCapability] = {
-    Set(TableCapability.BATCH_READ).asJava
+    util.EnumSet.of(TableCapability.BATCH_READ)
   }

   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
index 7effc74..9fbaf78 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
@@ -331,11 +331,11 @@ class InMemoryTableWithV1Fallback(
     }
   }

-  override def capabilities: util.Set[TableCapability] = Set(
+  override def capabilities: util.Set[TableCapability] = util.EnumSet.of(
     TableCapability.BATCH_READ,
     TableCapability.V1_BATCH_WRITE,
     TableCapability.OVERWRITE_BY_FILTER,
-    TableCapability.TRUNCATE).asJava
+    TableCapability.TRUNCATE)

   @volatile private var dataMap: mutable.Map[Seq[Any], Seq[Row]] = mutable.Map.empty
   private val partFieldNames = partitioning.flatMap(_.references).toSeq.flatMap(_.fieldNames)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index f0b12e3..251a02d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -18,9 +18,6 @@
 package org.apache.spark.sql.streaming.sources

 import java.util
-import java.util.Collections
-
-import scala.collection.JavaConverters._

 import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spark.sql.connector.catalog.{SessionConfigSupport, SupportsRead, SupportsWrite, Table, TableCapability, TableProvider}
@@ -93,7 +90,7 @@ trait FakeStreamingWriteTable extends Table with SupportsWrite {
   override def name(): String = "fake"
   override def schema(): StructType = StructType(Seq())
   override def capabilities(): util.Set[TableCapability] = {
-    Set(STREAMING_WRITE).asJava
+    util.EnumSet.of(STREAMING_WRITE)
   }
   override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
     new FakeWriteBuilder
@@ -114,7 +111,7 @@ class FakeReadMicroBatchOnly
   override def name(): String = "fake"
   override def schema(): StructType = StructType(Seq())
   override def capabilities(): util.Set[TableCapability] = {
-    Set(MICRO_BATCH_READ).asJava
+    util.EnumSet.of(MICRO_BATCH_READ)
   }
   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
     new FakeScanBuilder
@@ -137,7 +134,7 @@ class FakeReadContinuousOnly
   override def name(): String = "fake"
   override def schema(): StructType = StructType(Seq())
   override def capabilities(): util.Set[TableCapability] = {
-    Set(CONTINUOUS_READ).asJava
+    util.EnumSet.of(CONTINUOUS_READ)
   }
   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
     new FakeScanBuilder
@@ -154,7 +151,7 @@ class FakeReadBothModes extends DataSourceRegister with SimpleTableProvider {
   override def name(): String = "fake"
   override def schema(): StructType = StructType(Seq())
   override def capabilities(): util.Set[TableCapability] = {
-    Set(MICRO_BATCH_READ, CONTINUOUS_READ).asJava
+    util.EnumSet.of(MICRO_BATCH_READ, CONTINUOUS_READ)
   }
   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
     new FakeScanBuilder
@@ -170,7 +167,8 @@ class FakeReadNeitherMode extends DataSourceRegister with SimpleTableProvider {
     new Table {
       override def name(): String = "fake"
       override def schema(): StructType = StructType(Nil)
-      override def capabilities(): util.Set[TableCapability] = Collections.emptySet()
+      override def capabilities(): util.Set[TableCapability] =
+        util.EnumSet.noneOf(classOf[TableCapability])
     }
   }
 }
@@ -198,7 +196,8 @@ class FakeNoWrite extends DataSourceRegister with SimpleTableProvider {
     new Table {
       override def name(): String = "fake"
       override def schema(): StructType = StructType(Nil)
-      override def capabilities(): util.Set[TableCapability] = Collections.emptySet()
+      override def capabilities(): util.Set[TableCapability] =
+        util.EnumSet.noneOf(classOf[TableCapability])
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
index 49e5218..62e944c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
@@ -20,8 +20,6 @@ package org.apache.spark.sql.streaming.test
 import java.io.File
 import java.util

-import scala.collection.JavaConverters._
-
 import org.scalatest.BeforeAndAfter

 import org.apache.spark.sql.{AnalysisException, Row}
@@ -412,7 +410,7 @@ class InMemoryStreamTable(override val name: String) extends Table with Supports
   override def schema(): StructType = stream.fullSchema()

   override def capabilities(): util.Set[TableCapability] = {
-    Set(TableCapability.MICRO_BATCH_READ, TableCapability.CONTINUOUS_READ).asJava
+    util.EnumSet.of(TableCapability.MICRO_BATCH_READ, TableCapability.CONTINUOUS_READ)
   }

   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
@@ -423,7 +421,8 @@ class InMemoryStreamTable(override val name: String) extends Table with Supports
 class NonStreamV2Table(override val name: String)
     extends Table with SupportsRead with V2TableWithV1Fallback {
   override def schema(): StructType = StructType(Nil)
-  override def capabilities(): util.Set[TableCapability] = Set(TableCapability.BATCH_READ).asJava
+  override def capabilities(): util.Set[TableCapability] =
+    util.EnumSet.of(TableCapability.BATCH_READ)

   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new FakeScanBuilder

   override def v1Table: CatalogTable = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockOnStopSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockOnStopSource.scala
index c594a85..a720cc9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockOnStopSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockOnStopSource.scala
@@ -20,8 +20,6 @@ package org.apache.spark.sql.streaming.util
 import java.util
 import java.util.concurrent.CountDownLatch

-import scala.collection.JavaConverters._
-
 import org.apache.zookeeper.KeeperException.UnimplementedException

 import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
@@ -97,7 +95,7 @@ class BlockOnStopSourceTable(latch: CountDownLatch) extends Table with SupportsR

   override def name(): String = "blockingSource"

-  override def capabilities(): util.Set[TableCapability] = Set(CONTINUOUS_READ).asJava
+  override def capabilities(): util.Set[TableCapability] = util.EnumSet.of(CONTINUOUS_READ)

   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
     new ScanBuilder {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org