This is an automated email from the ASF dual-hosted git repository.

samarth pushed a commit to branch spark_druid_connector
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/spark_druid_connector by this push: new 5895ebb Set up, confs, mixins, and clients. (#11474) 5895ebb is described below commit 5895ebbeea8fa86980562d56b11873dfa974e79f Author: Julian Jaffe <46732276+julianjaffepinter...@users.noreply.github.com> AuthorDate: Fri Jul 30 11:36:45 2021 -0700 Set up, confs, mixins, and clients. (#11474) * Set up, confs, mixins, and clients. Initial module set up (e.g. pom, checkstyle, exclusions, licenses, etc.), support classes (mix-ins traits for logging and the try-with-resources pattern & configuration helpers), and an HTTP client for talking to a Druid cluster. --- .travis.yml | 2 +- LICENSE | 7 + NOTICE | 32 ++ codestyle/scalastyle_config.xml | 139 +++++++ codestyle/spotbugs-exclude.xml | 4 + hooks/pre-push.sh | 2 +- pom.xml | 7 +- spark/pom.xml | 444 +++++++++++++++++++++ .../apache/druid/spark/clients/DruidClient.scala | 208 ++++++++++ .../druid/spark/clients/HttpClientHolder.scala | 51 +++ .../druid/spark/configuration/Configuration.scala | 209 ++++++++++ .../configuration/DruidConfigurationKeys.scala | 35 ++ .../org/apache/druid/spark/mixins/Logging.scala | 116 ++++++ .../druid/spark/mixins/TryWithResources.scala | 329 +++++++++++++++ .../scala/org/apache/druid/spark/package.scala | 27 ++ .../spark/configuration/ConfigurationSuite.scala | 76 ++++ 16 files changed, 1685 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 4211f18..4f8e68a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -74,7 +74,7 @@ jobs: script: ${MVN} animal-sniffer:check --fail-at-end - name: "checkstyle" - script: ${MVN} checkstyle:checkstyle --fail-at-end + script: ${MVN} checkstyle:checkstyle --fail-at-end -pl '!spark' && ${MVN} scalastyle:check --fail-at-end -pl 'spark' - name: "enforcer checks" script: ${MVN} enforcer:enforce --fail-at-end diff --git a/LICENSE b/LICENSE index aa65ab9..7318c8d 100644 --- a/LICENSE +++ b/LICENSE @@ -279,6 +279,13 @@ SOURCE/JAVA-CORE This product contains lpad and rpad methods adapted from Apache Flink. * core/src/main/java/org/apache/druid/java/util/common/StringUtils.java + This product contains Scala logging and serializable Hadoop configuration utilities adapted from Apache Spark. + * spark/src/main/scala/org/apache/druid/spark/mixins/Logging + + This product contains a Druid client wrapper adapted from Imply Data's druid-hadoop-input-format. + * spark/src/main/scala/org/apache/druid/spark/clients/DruidClient + * spark/src/main/scala/org/apache/druid/spark/clients/HttpClientHolder + MIT License ================================ diff --git a/NOTICE b/NOTICE index 7761a54..64e3a2b 100644 --- a/NOTICE +++ b/NOTICE @@ -102,3 +102,35 @@ ConciseSet to use IntBuffers. + +================= Apache Spark 2.4.7 ================= + +Apache Spark +Copyright 2014 and onwards The Apache Software Foundation. + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + + +Export Control Notice +--------------------- + +This distribution includes cryptographic software. The country in which you currently reside may have +restrictions on the import, possession, use, and/or re-export to another country, of encryption software. +BEFORE using any encryption software, please check your country's laws, regulations and policies concerning +the import, possession, or use, and re-export of encryption software, to see if this is permitted. See +<http://www.wassenaar.org/> for more information. + +The U.S. 
Government Department of Commerce, Bureau of Industry and Security (BIS), has classified this +software as Export Commodity Control Number (ECCN) 5D002.C.1, which includes information security software +using or performing cryptographic functions with asymmetric algorithms. The form and manner of this Apache +Software Foundation distribution makes it eligible for export under the License Exception ENC Technology +Software Unrestricted (TSU) exception (see the BIS Export Administration Regulations, Section 740.13) for +both object code and source code. + +The following provides more details on the included cryptographic software: + +This software uses Apache Commons Crypto (https://commons.apache.org/proper/commons-crypto/) to +support authentication, and encryption and decryption of data sent across the network between +services. + diff --git a/codestyle/scalastyle_config.xml b/codestyle/scalastyle_config.xml new file mode 100644 index 0000000..9e2ed4d --- /dev/null +++ b/codestyle/scalastyle_config.xml @@ -0,0 +1,139 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> +<scalastyle commentFilter="enabled"> + <name>Apache Druid Scalastyle configuration</name> + <check level="warning" class="org.scalastyle.file.FileTabChecker" enabled="true"></check> + <check level="warning" class="org.scalastyle.file.FileLengthChecker" enabled="true"> + <parameters> + <parameter name="maxFileLength"><![CDATA[800]]></parameter> + </parameters> + </check> + <check level="error" class="org.scalastyle.file.HeaderMatchesChecker" enabled="true"> + <parameters> + <parameter name="header"><![CDATA[/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */]]></parameter> + </parameters> + </check> + <check level="warning" class="org.scalastyle.scalariform.SpacesAfterPlusChecker" enabled="true"/> + <check level="warning" class="org.scalastyle.file.WhitespaceEndOfLineChecker" enabled="true"/> + <check level="warning" class="org.scalastyle.scalariform.SpacesBeforePlusChecker" enabled="true"/> + <check level="warning" class="org.scalastyle.scalariform.SpaceAfterCommentStartChecker" enabled="true"/> + <check level="warning" class="org.scalastyle.file.FileLineLengthChecker" enabled="true"> + <parameters> + <parameter name="maxLineLength"><![CDATA[120]]></parameter> + <parameter name="tabSize"><![CDATA[4]]></parameter> + </parameters> + </check> + <check level="warning" class="org.scalastyle.scalariform.ClassNamesChecker" enabled="true"> + <parameters> + <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter> + </parameters> + </check> + <check level="warning" class="org.scalastyle.scalariform.ObjectNamesChecker" enabled="true"> + <parameters> + <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter> + </parameters> + </check> + <check level="warning" class="org.scalastyle.scalariform.PackageObjectNamesChecker" enabled="true"> + <parameters> + <parameter name="regex"><![CDATA[^[a-z][A-Za-z]*$]]></parameter> + </parameters> + </check> + <check level="warning" class="org.scalastyle.scalariform.EqualsHashCodeChecker" enabled="true"/> + <check level="warning" class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="true"> + <parameters> + <parameter name="illegalImports"><![CDATA[sun._,java.awt._]]></parameter> + </parameters> + </check> + <check level="warning" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true"> + <parameters> + <parameter name="maxParameters"><![CDATA[8]]></parameter> + </parameters> + </check> + <check level="warning" class="org.scalastyle.scalariform.NoWhitespaceBeforeLeftBracketChecker" enabled="true"></check> + <check level="warning" class="org.scalastyle.scalariform.NoWhitespaceAfterLeftBracketChecker" enabled="true"></check> + <check level="warning" class="org.scalastyle.scalariform.ReturnChecker" enabled="true"></check> + <check level="warning" class="org.scalastyle.scalariform.NullChecker" enabled="true"></check> + <check level="warning" class="org.scalastyle.scalariform.NoCloneChecker" enabled="true"></check> + <check level="warning" class="org.scalastyle.scalariform.NoFinalizeChecker" enabled="true"></check> + <check level="warning" class="org.scalastyle.scalariform.CovariantEqualsChecker" enabled="true"></check> + <check level="warning" class="org.scalastyle.scalariform.StructuralTypeChecker" enabled="true"></check> + <check level="warning" class="org.scalastyle.scalariform.NumberOfTypesChecker" enabled="true"> + <parameters> + <parameter name="maxTypes"><![CDATA[30]]></parameter> + </parameters> + </check> + <check level="warning" class="org.scalastyle.scalariform.UppercaseLChecker" enabled="true"></check> + <check level="warning" class="org.scalastyle.scalariform.SimplifyBooleanExpressionChecker" enabled="true"></check> + <check level="warning" class="org.scalastyle.scalariform.IfBraceChecker" enabled="true"> + <parameters> + <parameter name="singleLineAllowed"><![CDATA[true]]></parameter> + <parameter name="doubleLineAllowed"><![CDATA[false]]></parameter> + </parameters> + </check> + <check level="warning" class="org.scalastyle.scalariform.MethodLengthChecker" enabled="true"> + <parameters> + <parameter name="maxLength"><![CDATA[50]]></parameter> + </parameters> + 
</check> + <check level="warning" class="org.scalastyle.scalariform.MethodNamesChecker" enabled="true"> + <parameters> + <parameter name="regex"><![CDATA[^[a-z][A-Za-z0-9]*$]]></parameter> + </parameters> + </check> + <check level="warning" class="org.scalastyle.scalariform.NumberOfMethodsInTypeChecker" enabled="true"> + <parameters> + <parameter name="maxMethods"><![CDATA[30]]></parameter> + </parameters> + </check> + <check level="error" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" enabled="true"></check> + <check level="warning" class="org.scalastyle.file.NewLineAtEofChecker" enabled="true"></check> + <check level="warning" class="org.scalastyle.scalariform.RedundantIfChecker" enabled="true"></check> + <check customId="println" level="error" class="org.scalastyle.scalariform.TokenChecker" enabled="true"> + <parameters> + <parameter name="regex"><![CDATA[println]]></parameter> + </parameters> + </check> + <check level="warning" class="org.scalastyle.scalariform.DeprecatedJavaChecker" enabled="true"></check> + <check level="warning" class="org.scalastyle.scalariform.EmptyClassChecker" enabled="true"></check> + <check level="warning" class="org.scalastyle.scalariform.ClassTypeParameterChecker" enabled="true"> + <parameters> + <parameter name="regex"><![CDATA[^[A-Z_]$]]></parameter> + </parameters> + </check> + <check level="error" class="org.scalastyle.scalariform.UnderscoreImportChecker" enabled="true"></check> + <check level="warning" class="org.scalastyle.scalariform.LowercasePatternMatchChecker" enabled="true"></check> + <check level="warning" class="org.scalastyle.scalariform.ImportGroupingChecker" enabled="true"></check> +</scalastyle> \ No newline at end of file diff --git a/codestyle/spotbugs-exclude.xml b/codestyle/spotbugs-exclude.xml index cb6cb9c..f70c128 100644 --- a/codestyle/spotbugs-exclude.xml +++ b/codestyle/spotbugs-exclude.xml @@ -44,6 +44,10 @@ <Class name="org.apache.druid.server.AsyncQueryForwardingServlet" /> </And> </Match> + <!-- Spot Bugs doesn't work for Scala --> + <Match> + <Package name="~org\.apache\.druid\.spark.*"/> + </Match> <Bug pattern="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION"/> <Bug pattern="BC_UNCONFIRMED_CAST"/> diff --git a/hooks/pre-push.sh b/hooks/pre-push.sh index a0928db..31b40ba 100755 --- a/hooks/pre-push.sh +++ b/hooks/pre-push.sh @@ -14,4 +14,4 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-mvn checkstyle:checkstyle --fail-at-end +mvn checkstyle:checkstyle --fail-at-end -pl '!spark' && mvn scalastyle:check --fail-at-end -pl spark diff --git a/pom.xml b/pom.xml index eb066a4..9786eac 100644 --- a/pom.xml +++ b/pom.xml @@ -105,6 +105,7 @@ <protobuf.version>3.11.0</protobuf.version> <resilience4j.version>1.3.1</resilience4j.version> <slf4j.version>1.7.12</slf4j.version> + <surefire.version>2.22.2</surefire.version> <!-- If compiling with different hadoop version also modify default hadoop coordinates in TaskConfig.java --> <hadoop.compile.version>2.8.5</hadoop.compile.version> <mockito.version>3.8.0</mockito.version> @@ -142,6 +143,8 @@ <!-- Core cloud functionality --> <module>cloud/aws-common</module> <module>cloud/gcp-common</module> + <!-- Spark Connectors --> + <module>spark</module> <!-- Core extensions --> <module>extensions-core/kubernetes-extensions</module> <module>extensions-core/avro-extensions</module> @@ -1517,7 +1520,7 @@ <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> - <version>2.22.2</version> + <version>${surefire.version}</version> <configuration> <!-- locale settings must be set on the command line before startup --> <!-- set default options --> @@ -1777,6 +1780,8 @@ <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> + <!-- Explicitly setting version here so that intelliJ uses the right schema for inspections --> + <version>${surefire.version}</version> <executions> <execution> <phase>test</phase> diff --git a/spark/pom.xml b/spark/pom.xml new file mode 100644 index 0000000..6dcae79 --- /dev/null +++ b/spark/pom.xml @@ -0,0 +1,444 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. 
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <artifactId>druid-spark</artifactId> + <name>druid-spark</name> + <description>Spark connectors for reading data from and writing data to Druid clusters</description> + + <parent> + <groupId>org.apache.druid</groupId> + <artifactId>druid</artifactId> + <version>0.22.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <properties> + <scala.version>${scala.major.version}.12</scala.version> + <scala.major.version>2.12</scala.major.version> + <spark.version>2.4.8</spark.version> + <!-- These two properties allow -Dcheckstyle.skip to suppress scalastyle checks as well --> + <checkstyle.skip>false</checkstyle.skip> + <scalastyle.skip>${checkstyle.skip}</scalastyle.skip> + </properties> + + <dependencyManagement> + <dependencies> + <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-core</artifactId> + <version>4.0.2</version> + </dependency> + </dependencies> + </dependencyManagement> + + <dependencies> + <dependency> + <groupId>org.apache.druid</groupId> + <artifactId>druid-core</artifactId> + <version>${project.parent.version}</version> + <exclusions> + <exclusion> + <groupId>com.jayway.jsonpath</groupId> + <artifactId>json-path</artifactId> + </exclusion> + <exclusion> + <groupId>com.opencsv</groupId> + <artifactId>opencsv</artifactId> + </exclusion> + <exclusion> + <groupId>io.airlift</groupId> + <artifactId>airline</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>io.timeandspace</groupId> + <artifactId>cron-scheduler</artifactId> + </exclusion> + <exclusion> + <groupId>net.thisptr</groupId> + <artifactId>jackson-jq</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.druid</groupId> + <artifactId>druid-console</artifactId> + </exclusion> + <exclusion> + <groupId>org.gridkit.lab</groupId> + <artifactId>jvm-attach-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.hyperic</groupId> + <artifactId>sigar</artifactId> + </exclusion> + <exclusion> + <groupId>org.jboss.logging</groupId> + <artifactId>jboss-logging</artifactId> + </exclusion> + <exclusion> + <groupId>org.mozilla</groupId> + <artifactId>rhino</artifactId> + </exclusion> + <exclusion> + <groupId>org.skife.config</groupId> + <artifactId>config-magic</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.druid</groupId> + <artifactId>druid-processing</artifactId> + <version>${project.parent.version}</version> + <exclusions> + <exclusion> + <groupId>com.ibm.icu</groupId> + <artifactId>icu4j</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.maven</groupId> + <artifactId>maven-artifact</artifactId> + </exclusion> + <exclusion> + <groupId>org.mozilla</groupId> + <artifactId>rhino</artifactId> + </exclusion> + <exclusion> + <groupId>org.skife.config</groupId> + <artifactId>config-magic</artifactId> + </exclusion> + </exclusions> + </dependency> + + + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + 
<groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.google.http-client</groupId> + <artifactId>google-http-client</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>javax.validation</groupId> + <artifactId>validation-api</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>joda-time</groupId> + <artifactId>joda-time</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-dbcp2</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + </exclusions> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <scope>provided</scope> + </dependency> + + <!-- Spark --> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.major.version}</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>com.fasterxml.jackson</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_${scala.major.version}</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-unsafe_${scala.major.version}</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-catalyst_${scala.major.version}</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>${scala.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-reflect</artifactId> + <version>${scala.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scalap</artifactId> + <version>${scala.version}</version> + 
<scope>provided</scope> + </dependency> + + <!-- Tests --> + <dependency> + <groupId>org.apache.druid</groupId> + <artifactId>druid-core</artifactId> + <version>${project.parent.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.hyperic</groupId> + <artifactId>sigar</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_${scala.major.version}</artifactId> + <version>3.1.1</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.scalactic</groupId> + <artifactId>scalactic_${scala.major.version}</artifactId> + <version>3.1.1</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.easymock</groupId> + <artifactId>easymock</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-core</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <!-- scala build --> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <version>3.2.2</version> + <executions> + <execution> + <id>eclipse-add-source</id> + <goals> + <goal>add-source</goal> + </goals> + </execution> + <execution> + <id>scala-compile-first</id> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + <execution> + <id>scala-test-compile-first</id> + <phase>process-test-resources</phase> + <goals> + <goal>testCompile</goal> + </goals> + </execution> + <execution> + <id>attach-scaladocs</id> + <phase>verify</phase> + <goals> + <goal>doc-jar</goal> + </goals> + </execution> + </executions> + <configuration> + <scalaVersion>${scala.version}</scalaVersion> + <args> + <arg>-unchecked</arg> + <arg>-deprecation</arg> + <arg>-feature</arg> + </args> + <jvmArgs> + <jvmArg>-Xms1024m</jvmArg> + <jvmArg>-Xmx1024m</jvmArg> + </jvmArgs> + <javacArgs> + <javacArg>-source</javacArg> + <javacArg>${java.version}</javacArg> + <javacArg>-target</javacArg> + <javacArg>${java.version}</javacArg> + <javacArg>-Xlint:all,-serial,-path</javacArg> + </javacArgs> + </configuration> + </plugin> + <!-- disable surefire --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <skipTests>true</skipTests> + </configuration> + </plugin> + <!-- enable scalatest --> + <plugin> + <groupId>org.scalatest</groupId> + <artifactId>scalatest-maven-plugin</artifactId> + <version>1.0</version> + <configuration> + <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory> + <junitxml>.</junitxml> + <parallel>false</parallel> + <argLine>-Dderby.stream.error.field=org.apache.commons.io.output.NullOutputStream.NULL_OUTPUT_STREAM\ + -Dlog4j.configuration=${project.basedir}/test/resources/log4j2.xml + </argLine> + </configuration> + <executions> + <execution> + <id>test</id> + <goals> + <goal>test</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.scalastyle</groupId> + <artifactId>scalastyle-maven-plugin</artifactId> + <version>1.0.0</version> + <configuration> + <verbose>false</verbose> + 
<failOnViolation>true</failOnViolation> + <includeTestSourceDirectory>true</includeTestSourceDirectory> + <failOnWarning>false</failOnWarning> + <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory> + <testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory> + <configLocation>${project.basedir}/../codestyle/scalastyle_config.xml</configLocation> + </configuration> + <executions> + <execution> + <id>validate</id> + <phase>validate</phase> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> diff --git a/spark/src/main/scala/org/apache/druid/spark/clients/DruidClient.scala b/spark/src/main/scala/org/apache/druid/spark/clients/DruidClient.scala new file mode 100644 index 0000000..577f87e --- /dev/null +++ b/spark/src/main/scala/org/apache/druid/spark/clients/DruidClient.scala @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.spark.clients + +import com.fasterxml.jackson.core.`type`.TypeReference +import com.google.common.net.HostAndPort +import org.apache.druid.java.util.common.{IAE, ISE, Intervals, JodaUtils, StringUtils} +import org.apache.druid.java.util.http.client.response.{StringFullResponseHandler, + StringFullResponseHolder} +import org.apache.druid.java.util.http.client.{HttpClient, Request} +import org.apache.druid.query.Druids +import org.apache.druid.query.metadata.metadata.{ColumnAnalysis, SegmentAnalysis, + SegmentMetadataQuery} +import org.apache.druid.spark.MAPPER +import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys} +import org.apache.druid.spark.mixins.Logging +import org.jboss.netty.handler.codec.http.{HttpMethod, HttpResponseStatus} +import org.joda.time.{Duration, Interval} + +import java.net.URL +import java.util.{List => JList} +import javax.ws.rs.core.MediaType +import scala.annotation.tailrec +import scala.collection.JavaConverters.{asScalaBufferConverter, mapAsJavaMapConverter, + mapAsScalaMapConverter, seqAsJavaListConverter} + +/** + * Going with SegmentMetadataQueries despite the significant overhead because there's no other way + * to get accurate information about Druid columns. + * + * This is pulled pretty directly from the druid-hadoop-input-format project on GitHub. There is + * likely substantial room for improvement. 
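(Editorial aside, not part of the patch: a minimal sketch of how this client might be wired up once the classes introduced below are in place. The datasource name and broker address are placeholders; the option keys follow the defaults declared in DruidConfigurationKeys later in this commit.)

import org.apache.druid.spark.clients.DruidClient
import org.apache.druid.spark.configuration.Configuration

// Build connector options; keys are case-insensitive and namespaced with dots.
val conf = Configuration(Map(
  "broker.host" -> "localhost",   // placeholder broker host
  "broker.port" -> "8082"         // default broker port
))

// The companion object reads the "broker" namespace and constructs an HTTP-backed client.
val client = DruidClient(conf)

// Returns column name -> (Druid type, hasMultipleValues); None means "query all segments".
val schema: Map[String, (String, Boolean)] = client.getSchema("wikipedia", None)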
+ */ +class DruidClient( + httpClient: HttpClient, + hostAndPort: HostAndPort, + numRetries: Int, + retryWaitSeconds: Int, + timeoutWaitMilliseconds: Int + ) extends Logging { + private val druidBaseQueryURL: HostAndPort => String = + (hostAndPort: HostAndPort) => s"http://$hostAndPort/druid/v2/" + + private val DefaultSegmentMetadataInterval = List[Interval](Intervals.utc( + JodaUtils.MIN_INSTANT, + JodaUtils.MAX_INSTANT + )) + + /** + * The SQL system catalog tables are incorrect for multivalue columns and don't have accurate + * type info, so we need to fall back to segmentMetadataQueries. Given a DATASOURCE and a range + * of INTERVALS to query over, return a map from column name to a tuple of + * (columnType, hasMultipleValues). Note that this is a very expensive operation over large + * numbers of segments. If possible, this method should only be called with the most granular + * starting and ending intervals instead of over a larger interval. + * + * @param dataSource The Druid dataSource to fetch the schema for. + * @param intervals The intervals to return the schema for, or None to query all segments. + * @return A map from column name to data type and whether or not the column is multi-value + * for the schema of DATASOURCE. + */ + def getSchema(dataSource: String, intervals: Option[List[Interval]]): Map[String, (String, Boolean)] = { + val queryInterval = intervals.getOrElse(DefaultSegmentMetadataInterval) + val body = Druids.newSegmentMetadataQueryBuilder() + .dataSource(dataSource) + .intervals(queryInterval.asJava) + .analysisTypes(SegmentMetadataQuery.AnalysisType.SIZE) + .merge(true) + .context(Map[String, AnyRef]( + "timeout" -> Int.box(timeoutWaitMilliseconds) + ).asJava) + .build() + val response = sendRequestWithRetry( + druidBaseQueryURL(hostAndPort), numRetries, Option(MAPPER.writeValueAsBytes(body)) + ) + val segments = + MAPPER.readValue[JList[SegmentAnalysis]]( + response.getContent, new TypeReference[JList[SegmentAnalysis]]() {}) + if (segments.size() == 0) { + throw new IAE( + s"No segments found for intervals [${intervals.mkString(",")}] on $dataSource" + ) + } + // Since we're setting merge to true, there should only be one item in the list + if (segments.size() > 1) { + throw new ISE("Merged segment metadata response had more than one row!") + } + log.debug(segments.asScala.map(_.toString).mkString("SegmentAnalysis: [", ", ", "]")) + /* + * If a dimension has multiple types within the spanned interval, the resulting column + * analysis will have the type "STRING" and be an error message. We abuse that here to infer + * a string type for the dimension and widen the type for the resulting DataFrame. + */ + val columns = segments.asScala.head.getColumns.asScala.toMap + columns.foreach{ case(key, column) => + if (column.isError) { + log.warn(s"Multiple column types found for dimension $key in interval" + + s" ${queryInterval.mkString("[", ", ", "]")}! Falling back to STRING type") + } + } + columns.map{ case (name: String, col: ColumnAnalysis) => + name -> (col.getType, col.isHasMultipleValues) + } + } + + /* + * Marking this method as tail recursive because it is for now. If a finally block, + * special error handling, or more involved set up and tear down code is added, this + * method may no longer be tail recursive and so compilation will fail. 
Because the + * number of retries is user-configurable and will likely be relatively small, + * latency in communication with a Druid cluster for Segment Metadata will be dominated + * by the query time, and Scala will optimize tail recursive calls regardless of annotation, + * future developers shouldn't be concerned if they need to remove this annotation. + */ + @tailrec + private def sendRequestWithRetry( + url: String, + retryCount: Int, + content: Option[Array[Byte]] = None + ): StringFullResponseHolder = { + try { + sendRequest(url, content) + } catch { + case e: Exception => + if (retryCount > 0) { + logInfo(s"Got exception: ${e.getMessage}, retrying ...") + Thread.sleep(retryWaitSeconds * 1000) + sendRequestWithRetry(url, retryCount - 1, content) + } else { + throw e + } + } + } + + private def sendRequest(url: String, content: Option[Array[Byte]]): StringFullResponseHolder = { + try { + val request = buildRequest(url, content) + var response = httpClient.go( + request, + new StringFullResponseHandler(StringUtils.UTF8_CHARSET), + Duration.millis(timeoutWaitMilliseconds) + ).get + if (response.getStatus == HttpResponseStatus.TEMPORARY_REDIRECT) { + val newUrl = response.getResponse.headers().get("Location") + logInfo(s"Got a redirect, new location: $newUrl") + response = httpClient.go( + buildRequest(newUrl, content), new StringFullResponseHandler(StringUtils.UTF8_CHARSET) + ).get + } + if (!(response.getStatus == HttpResponseStatus.OK)) { + throw new ISE( + s"Error getting response for %s, status[%s] content[%s]", + url, + response.getStatus, + response.getContent + ) + } + response + } catch { + case e: Exception => + throw e + } + } + + def buildRequest(url: String, content: Option[Array[Byte]]): Request = { + content.map( + new Request(HttpMethod.POST, new URL(url)) + .setHeader("Content-Type", MediaType.APPLICATION_JSON) + .setContent(_) + ).getOrElse( + new Request(HttpMethod.GET, new URL(url)) + ) + } +} + +object DruidClient { + // TODO: Add support for Kerberized etc. clients + def apply(conf: Configuration): DruidClient = { + val brokerConf = conf.dive(DruidConfigurationKeys.brokerPrefix) + new DruidClient( + HttpClientHolder.create.get, + HostAndPort.fromParts( + brokerConf.get(DruidConfigurationKeys.brokerHostDefaultKey), + brokerConf.getInt(DruidConfigurationKeys.brokerPortDefaultKey)), + brokerConf.getInt(DruidConfigurationKeys.numRetriesDefaultKey), + brokerConf.getInt(DruidConfigurationKeys.retryWaitSecondsDefaultKey), + brokerConf.getInt(DruidConfigurationKeys.timeoutMillisecondsDefaultKey) + ) + } +} + diff --git a/spark/src/main/scala/org/apache/druid/spark/clients/HttpClientHolder.scala b/spark/src/main/scala/org/apache/druid/spark/clients/HttpClientHolder.scala new file mode 100644 index 0000000..b13d2c8 --- /dev/null +++ b/spark/src/main/scala/org/apache/druid/spark/clients/HttpClientHolder.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.spark.clients + +import com.google.common.base.Throwables +import org.apache.druid.java.util.common.lifecycle.Lifecycle +import org.apache.druid.java.util.http.client.{HttpClient, HttpClientConfig, HttpClientInit} + +import java.io.{Closeable, IOException} + +object HttpClientHolder { + def create: HttpClientHolder = { + val lifecycle = new Lifecycle + val httpClient = HttpClientInit.createClient(HttpClientConfig.builder.build, lifecycle) + try { + lifecycle.start() + } catch { + case e: Exception => + throw Throwables.propagate(e) + } + new HttpClientHolder(lifecycle, httpClient) + } +} + +class HttpClientHolder(val lifecycle: Lifecycle, val client: HttpClient) extends Closeable { + def get: HttpClient = { + client + } + + @throws[IOException] + override def close(): Unit = { + lifecycle.stop() + } +} diff --git a/spark/src/main/scala/org/apache/druid/spark/configuration/Configuration.scala b/spark/src/main/scala/org/apache/druid/spark/configuration/Configuration.scala new file mode 100644 index 0000000..71f31de --- /dev/null +++ b/spark/src/main/scala/org/apache/druid/spark/configuration/Configuration.scala @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.spark.configuration + +import org.apache.druid.java.util.common.StringUtils +import org.apache.spark.sql.sources.v2.DataSourceOptions + +import scala.collection.JavaConverters.mapAsScalaMapConverter + +/** + * A simple wrapper around a properties map that can also "dive" into a sub-Configuration if keys are stored using + * dotted separators. Because this class is designed to make working with Spark DataSourceOptions easier and DSOs are + * case-insensitive, this class is also case-insensitive. + * + * @param properties A Map of String property names to String property values to back this Configuration. 
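(Editorial aside, not part of the patch: a quick sketch of the case-insensitive lookup and namespace "diving" described above, using the accessors defined further down in this file.)

import org.apache.druid.spark.configuration.Configuration

val conf = Configuration(Map(
  "druid.broker.host" -> "localhost",
  "druid.broker.port" -> "8082"
))

// Keys are lower-cased on construction, so lookups are case-insensitive.
conf.isPresent("DRUID.Broker.Host")               // true

// "Diving" strips the leading namespaces, leaving just "host" and "port".
val brokerConf = conf.dive("druid", "broker")
val host = brokerConf.get(("host", "localhost"))  // "localhost"
val port = brokerConf.getInt(("port", 8082))      // 8082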
+ */ +class Configuration(properties: Map[String, String]) extends Serializable { + def getAs[T](key: String): T = { + properties(StringUtils.toLowerCase(key)).asInstanceOf[T] + } + + def get(key: String): Option[String] = { + properties.get(StringUtils.toLowerCase(key)) + } + + def get(keyWithDefault: (String, String)): String = { + this.get(StringUtils.toLowerCase(keyWithDefault._1)).getOrElse(keyWithDefault._2) + } + + def getString(key: String): String = { + properties.getOrElse(StringUtils.toLowerCase(key), "") + } + + def getInt(key: String, default: Int): Int = { + properties.get(StringUtils.toLowerCase(key)).fold(default)(_.toInt) + } + + def getInt(keyWithDefault: (String, Int)): Int = { + this.getInt(StringUtils.toLowerCase(keyWithDefault._1), keyWithDefault._2) + } + + def getLong(key: String, default: Long): Long = { + properties.get(StringUtils.toLowerCase(key)).fold(default)(_.toLong) + } + + def getLong(keyWithDefault: (String, Long)): Long = { + this.getLong(StringUtils.toLowerCase(keyWithDefault._1), keyWithDefault._2) + } + + def getBoolean(key: String, default: Boolean): Boolean = { + properties.get(StringUtils.toLowerCase(key)).fold(default)(_.toBoolean) + } + + def getBoolean(keyWithDefault: (String, Boolean)): Boolean = { + this.getBoolean(StringUtils.toLowerCase(keyWithDefault._1), keyWithDefault._2) + } + + def apply(key: String): String = { + this.get(key) match { + case Some(v) => v + case None => throw new NoSuchElementException(s"Key $key not found!") + } + } + + def isPresent(key: String): Boolean = { + properties.isDefinedAt(StringUtils.toLowerCase(key)) + } + + def isPresent(paths: String*): Boolean = { + properties.isDefinedAt(StringUtils.toLowerCase(paths.mkString(Configuration.SEPARATOR))) + } + + /** + * Given a prefix PREFIX, return a Configuration object containing every key in this + * configuration that starts with PREFIX. Keys in the resulting Configuration will have PREFIX + * removed from their start. + * + * For example, if a Configuration contains the key `druid.broker.host` and `dive("druid")` is + * called on it, the resulting Configuration will contain the key `broker.host` with the same + * value as `druid.broker.host` had in the original Configuration. + * + * @param prefix The namespace to "dive" into. + * @return A Configuration containing the keys in this Configuration that start with PREFIX, + * stripped of their leading PREFIX. + */ + def dive(prefix: String): Configuration = { + new Configuration(properties + .filterKeys(_.startsWith(s"$prefix${Configuration.SEPARATOR}")) + .map{case (k, v) => k.substring(prefix.length + 1) -> v}) + } + + /** + * Given a number of prefixes, return a Configuration object containing every key in this + * configuration that starts with `prefixes.mkString(Configuration.SEPARATOR)`. Keys in the + * resulting Configuration will have the concatenation of PREFIXES removed from their start. + * + * Note that this is the equivalent of chaining `dive` calls, not chaining `merge` calls. + * + * @param prefixes The namespaces, in order, to "dive" into. + * @return A Configuration containing the keys that start with every prefix in PREFIXES joined + * by periods, stripped of the leading prefixes matching the prefixes in PREFIXES. + */ + def dive(prefixes: String*): Configuration = { + prefixes.foldLeft(this){case (conf, prefix) => conf.dive(prefix)} + } + + /** + * Combine this configuration with another Configuration. 
If keys collide between these + * configurations, the corresponding values in OTHER will be selected. + * + * @param other A Configuration to merge with this Configuration. + * @return A Configuration containing the union of keys between this Configuration and OTHER. + * If keys collide between the two Configurations, the values in OTHER will be kept. + */ + def merge(other: Configuration): Configuration = { + new Configuration(this.properties ++ other.toMap) + } + + /** + * Combine this configuration with another Configuration, moving the other Configuration to the provided name space. + * If keys collide between this configuration and the newly-namespaced OTHER, the corresponding values in OTHER will + * be selected. + * + * @param namespace The name space to merge OTHER under. + * @param other The Configuration to merge with this Configuration. + * @return A new Configuration object containing all the keys in this Configuration, plus the keys in OTHER + * namespaced under NAMESPACE. + */ + def merge(namespace: String, other: Configuration): Configuration = { + this.merge(Configuration(namespace, other.toMap)) + } + + /** + * Add the properties specified in PROPERTIES to this Configuration's properties, moving the new properties to the + * provided name space. If this Configuration already contains keys under the provided name space and those keys + * collide with the properties specified in PROPERTIES, the corresponding values in PROPERTIES will be selected. + * + * @param namespace The name space to merge the properties specified in PROPERTIES under. + * @param properties The map of properties to values to combine with the properties from this Configuration. + * @return A new Configuration object containing all the keys in this Configuration, plus the properties in + * PROPERTIES namespaced under NAMESPACE. + */ + def merge(namespace: String, properties: Map[String, String]): Configuration = { + this.merge(Configuration(namespace, properties)) + } + + override def equals(obj: Any): Boolean = { + obj.isInstanceOf[Configuration] && this.toMap == obj.asInstanceOf[Configuration].toMap + } + + override def hashCode(): Int = { + this.properties.hashCode() + } + + def toMap: Map[String, String] = { + this.properties + } + + override def toString: String = { + this.toMap.mkString("Configuration{", "; ", "}") + } +} + +object Configuration { + def apply(properties: Map[String, String]): Configuration = { + new Configuration(properties.map{case (k, v) => StringUtils.toLowerCase(k) -> v}) + } + + def apply(namespace: String, properties: Map[String, String]): Configuration = { + new Configuration(properties.map{case(k, v) => StringUtils.toLowerCase(s"$namespace$SEPARATOR$k") -> v}) + } + + def apply(dso: DataSourceOptions): Configuration = { + new Configuration(dso.asMap().asScala.toMap) + } + + def fromKeyValue(key: String, value: String): Configuration = { + new Configuration(Map[String, String](StringUtils.toLowerCase(key) -> value)) + } + + /** + * Get the key corresponding to each element of PATHS interpreted as a namespace or property. + * + * @param paths The parent namespaces and property as individual strings to convert into a single configuration key. + * @return The path to a property through its parent namespaces as a single configuration key. + */ + def toKey(paths: String*): String = { + StringUtils.toLowerCase(paths.mkString(Configuration.SEPARATOR)) + } + + private val SEPARATOR = "." 
+} diff --git a/spark/src/main/scala/org/apache/druid/spark/configuration/DruidConfigurationKeys.scala b/spark/src/main/scala/org/apache/druid/spark/configuration/DruidConfigurationKeys.scala new file mode 100644 index 0000000..cfa1d74 --- /dev/null +++ b/spark/src/main/scala/org/apache/druid/spark/configuration/DruidConfigurationKeys.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.spark.configuration + +object DruidConfigurationKeys { + // Druid Client Configs + val brokerPrefix: String = "broker" + val brokerHostKey: String = "host" + val brokerPortKey: String = "port" + val numRetriesKey: String = "numRetries" + val retryWaitSecondsKey: String = "retryWaitSeconds" + val timeoutMillisecondsKey: String = "timeoutMilliseconds" + private[spark] val brokerHostDefaultKey: (String, String) = (brokerHostKey, "localhost") + private[spark] val brokerPortDefaultKey: (String, Int) = (brokerPortKey, 8082) + private[spark] val numRetriesDefaultKey: (String, Int) = (numRetriesKey, 5) + private[spark] val retryWaitSecondsDefaultKey: (String, Int) = (retryWaitSecondsKey, 5) + private[spark] val timeoutMillisecondsDefaultKey: (String, Int) = (timeoutMillisecondsKey, 300000) +} diff --git a/spark/src/main/scala/org/apache/druid/spark/mixins/Logging.scala b/spark/src/main/scala/org/apache/druid/spark/mixins/Logging.scala new file mode 100644 index 0000000..055c22b --- /dev/null +++ b/spark/src/main/scala/org/apache/druid/spark/mixins/Logging.scala @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.spark.mixins + +import org.apache.druid.java.util.common.logger.Logger + +/** + * Simplified version of org.apache.spark.internal.Logging. Uses + * org.apache.druid.java.util.common.logger.Logger instead of slf4j's Logger directly. 
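(Editorial aside, not part of the patch: a sketch of mixing this trait into a class. The class name is hypothetical; the log* helpers are the ones defined in the trait body below, and the trace/debug/info messages are call-by-name, so they are only built when the corresponding level is enabled.)

import org.apache.druid.spark.mixins.Logging

class SegmentScanner(basePath: String) extends Logging {   // hypothetical example class
  def scan(segmentId: String): Unit = {
    // Built lazily; only evaluated if DEBUG logging is enabled.
    logDebug(s"Resolving segment $segmentId under $basePath")
    logInfo(s"Scanning segment $segmentId")
  }
}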
+ */ +trait Logging { + + @transient private var logger: Logger = _ + + private lazy val logName = this.getClass.getName.stripSuffix("$") + + /** + * Return the configured underlying logger + * + * @return the configured underlying logger + */ + protected def log: Logger = { + if (logger == null) { + logger= new Logger(logName) + } + logger + } + + /** + * Log a message at the TRACE level. + * + * @param msg the message string to be logged + */ + protected def logTrace(msg: => String): Unit = { + if (log.isTraceEnabled) { + log.trace(msg) + } + } + + /** + * Log a message at the DEBUG level. + * + * @param msg the message string to be logged + */ + protected def logDebug(msg: => String): Unit = { + if (log.isDebugEnabled) { + log.debug(msg) + } + } + + /** + * Log a message at the INFO level. + * + * @param msg the message string to be logged + */ + protected def logInfo(msg: => String): Unit = { + if (log.isInfoEnabled) { + log.info(msg) + } + } + + /** + * Log a message at the WARN level. + * + * @param msg the message string to be logged + */ + protected def logWarn(msg: => String): Unit = { + log.warn(msg) + } + + /** + * Log a message with an exception at the WARN level. + * + * @param msg the message string to be logged + * @param exception the exception to log in addition to the message + */ + protected def logWarn(msg: => String, exception: Throwable): Unit = { + log.warn(exception, msg) + } + + /** + * Log a message at the ERROR level. + * + * @param msg the message string to be logged + */ + protected def logError(msg: => String): Unit = { + log.error(msg) + } + + /** + * Log a message with an exception at the ERROR level. + * + * @param msg the message string to be logged + * @param exception the exception to log in addition to the message + */ + protected def logError(msg: => String, exception: Throwable): Unit = { + log.error(exception, msg) + } +} diff --git a/spark/src/main/scala/org/apache/druid/spark/mixins/TryWithResources.scala b/spark/src/main/scala/org/apache/druid/spark/mixins/TryWithResources.scala new file mode 100644 index 0000000..6b0ff24 --- /dev/null +++ b/spark/src/main/scala/org/apache/druid/spark/mixins/TryWithResources.scala @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.spark.mixins + +import scala.util.control.{ControlThrowable, NonFatal} + +/** + * Utility trait to ape the try-with-resources construct in Java. This is quick and dirty and has its flaws. If a more + * robust approach is needed, the better-files approach (https://github.com/pathikrit/better-files#lightweight-arm) + * should be considered. + * + * Scala doesn't support varargs of generic type, so using overloading to support managing multiple resources with a + * sequence. 
Ideally Scala would support macro programming to generate tryWithResources functions for + * tuples of [_ <: AutoCloseable] so users could un-tuple multiple resources as named variables without needing to + * manually generate separate methods for each length up to 22, but since it doesn't I've created methods for up to + * five resources at once. These methods can be nested or the method taking a Sequence of AutoCloseables can be used + * for arbitrary resources, with the caveat that named unpacking is not possible over Sequences. If there was a way to + * enforce type bounds on the elements of subclasses of Produce we could hack up a type that unioned AutoCloseable and + * Products whose elements are all subtypes of AutoCloseable, but since there isn't this is a quick fix. Note that + * shared code for the tuple methods is not refactored to a single private method because the common supertype of + * tuples is Product, with no type information. + */ +trait TryWithResources { + /** + * A helper function to duplicate Java's try-with-resources construction. Mix in this trait and then call like so: + * val resource = new ResourceImplementingAutoCloseable() + * tryWithResources(resource){r => + * r.doSomething() + * } + * + * or, if desired, + * + * tryWithResources(new ResourceImplementingAutoCloseable()){ resouce => + * resource.doSomething() + * } + * + * @param resource The AutoCloseable resource to use in FUNC. + * @param func The function block to execute (think of this as the try block). + * @tparam T Any subtype of AutoCloseable. + * @tparam V The result type of FUNC. + * @return The result of executing FUNC with RESOURCE. + */ + def tryWithResources[T <: AutoCloseable, V](resource: T)(func: T => V): V = { + try { + func(resource) + } catch { + // Clean up for InterruptedExceptions and ControlThrowables, not just NonFatal exceptions + case e@(NonFatal(_) | _: InterruptedException | _: ControlThrowable) => + try { + resource.close() + } catch { + case NonFatal(e2) => + // e2 isn't fatal, let the original exception e take precedence + e.addSuppressed(e2) + case e2: Throwable => + if (NonFatal(e)) { + // e2 is fatal but e isn't, suppress e + e2.addSuppressed(e) + throw e2 + } + // Both exceptions are fatal, suppress the also-fatal e2 that occurred while closing + e.addSuppressed(e2) + } + throw e + } finally { + resource.close() + } + } + + /** + * A helper function to duplicate Java's try-with-resources construction. Unfortunately Scala doesn't support varargs + * for generic types and I don't feel like writing 22 separate functions, so callers will have to keep track of the + * exact ordering of elements in the resource sequence or nest the provided tuple methods. + * + * To use, mix in this trait and then call like so: + * val fileResource = new ResourceImplementingAutoCloseable() + * val writerResource = new OtherAutoCloseable() + * tryWithResources(Seq(fileResource, writerResource)){resources => + * val file = resources(0) + * val writer = resources(1) + * writer.write(file, data) + * } + * + * @param resources A list of AutoCloseable resources to use in FUNC. + * @param func The function block to execute (think of this as the try block). + * @tparam T Any subtype of AutoCloseable. + * @tparam V The result type of FUNC. + * @return The result of executing FUNC with RESOURCES. 
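(Editorial aside, not part of the patch: a concrete, self-contained sketch of the single-resource form defined above, using standard java.io readers. The reader is closed whether or not readLine throws.)

import java.io.{BufferedReader, StringReader}
import org.apache.druid.spark.mixins.TryWithResources

object FirstLineExample extends TryWithResources {   // hypothetical example object
  def firstLine(text: String): String =
    tryWithResources(new BufferedReader(new StringReader(text))) { reader =>
      reader.readLine()   // reader.close() runs even if this throws
    }
}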
+ */ + def tryWithResources[T <: AutoCloseable, V](resources: Seq[T])(func: Seq[T] => V): V = { + try { + func(resources) + } catch { + // Clean up for InterruptedExceptions and ControlThrowables, not just NonFatal exceptions + case e@(NonFatal(_) | _: InterruptedException | _: ControlThrowable) => + throw closeAllResourcesMergingExceptions(resources, e) + } finally { + closeAllResourcesFinally(resources) + } + } + + /** + * A helper function to duplicate Java's try-with-resources construction. Mix in this trait and then call like so: + * val resources = (new ResourceImplementingAutoCloseable(), new ResourceImplementingAutoCloseable()) + * tryWithResources(resources){ + * case (first, second) => + * first.doSomething() + * second.doSomething() + * } + * + * or, if desired, + * + * tryWithResources(new ResourceImplementingAutoCloseable(), new ResourceImplementingAutoCloseable){ + * case (first, second) => + * first.doSomething() + * second.doSomething + * } + * + * @param resources A tuple of AutoCloseable resources to use in FUNC. + * @param func The function block to execute (think of this as the try block). + * @tparam T Any subtype of AutoCloseable. + * @tparam V The result type of FUNC. + * @return The result of executing FUNC with RESOURCES. + */ + def tryWithResources[T <: AutoCloseable, V](resources: (T, T))(func: ((T, T)) => V): V = { + val closeableResources = resources.productIterator.toSeq.map(_.asInstanceOf[AutoCloseable]) + try { + func(resources) + } catch { + // Clean up for InterruptedExceptions and ControlThrowables, not just NonFatal exceptions + case e@(NonFatal(_) | _: InterruptedException | _: ControlThrowable) => + throw closeAllResourcesMergingExceptions(closeableResources, e) + } finally { + closeAllResourcesFinally(closeableResources) + } + } + + /** + * A helper function to duplicate Java's try-with-resources construction. Mix in this trait and then call like so: + * val resources = (new ResourceImplementingAutoCloseable(), new ResourceImplementingAutoCloseable(), ...) + * tryWithResources(resources){ + * case (first, second, ...) => + * first.doSomething() + * second.doSomething() + * ... + * } + * + * or, if desired, + * + * tryWithResources(new ResourceImplementingAutoCloseable(), new ResourceImplementingAutoCloseable, ...){ + * case (first, second, ...) => + * first.doSomething() + * second.doSomething() + * ... + * } + * + * @param resources A tuple of AutoCloseable resources to use in FUNC. + * @param func The function block to execute (think of this as the try block). + * @tparam T Any subtype of AutoCloseable. + * @tparam V The result type of FUNC. + * @return The result of executing FUNC with RESOURCES. + */ + def tryWithResources[T <: AutoCloseable, V](resources: (T, T, T))(func: ((T, T, T)) => V): V = { + val closeableResources = resources.productIterator.toSeq.map(_.asInstanceOf[AutoCloseable]) + try { + func(resources) + } catch { + // Clean up for InterruptedExceptions and ControlThrowables, not just NonFatal exceptions + case e@(NonFatal(_) | _: InterruptedException | _: ControlThrowable) => + throw closeAllResourcesMergingExceptions(closeableResources, e) + } finally { + closeAllResourcesFinally(closeableResources) + } + } + + /** + * A helper function to duplicate Java's try-with-resources construction. Mix in this trait and then call like so: + * val resources = (new ResourceImplementingAutoCloseable(), new ResourceImplementingAutoCloseable(), ...) + * tryWithResources(resources){ + * case (first, second, ...) 
=> + * first.doSomething() + * second.doSomething() + * ... + * } + * + * or, if desired, + * + * tryWithResources(new ResourceImplementingAutoCloseable(), new ResourceImplementingAutoCloseable, ...){ + * case (first, second, ...) => + * first.doSomething() + * second.doSomething() + * ... + * } + * + * @param resources A tuple of AutoCloseable resources to use in FUNC. + * @param func The function block to execute (think of this as the try block). + * @tparam T Any subtype of AutoCloseable. + * @tparam V The result type of FUNC. + * @return The result of executing FUNC with RESOURCES. + */ + def tryWithResources[T <: AutoCloseable, V](resources: (T, T, T, T))(func: ((T, T, T, T)) => V): V = { + val closeableResources = resources.productIterator.toSeq.map(_.asInstanceOf[AutoCloseable]) + try { + func(resources) + } catch { + // Clean up for InterruptedExceptions and ControlThrowables, not just NonFatal exceptions + case e@(NonFatal(_) | _: InterruptedException | _: ControlThrowable) => + throw closeAllResourcesMergingExceptions(closeableResources, e) + } finally { + closeAllResourcesFinally(closeableResources) + } + } + + /** + * A helper function to duplicate Java's try-with-resources construction. Mix in this trait and then call like so: + * val resources = (new ResourceImplementingAutoCloseable(), new ResourceImplementingAutoCloseable(), ...) + * tryWithResources(resources){ + * case (first, second, ...) => + * first.doSomething() + * second.doSomething() + * ... + * } + * + * or, if desired, + * + * tryWithResources(new ResourceImplementingAutoCloseable(), new ResourceImplementingAutoCloseable, ...){ + * case (first, second, ...) => + * first.doSomething() + * second.doSomething() + * ... + * } + * + * @param resources A tuple of AutoCloseable resources to use in FUNC. + * @param func The function block to execute (think of this as the try block). + * @tparam T Any subtype of AutoCloseable. + * @tparam V The result type of FUNC. + * @return The result of executing FUNC with RESOURCES. + */ + def tryWithResources[T <: AutoCloseable, V](resources: (T, T, T, T, T))(func: ((T, T, T, T, T)) => V): V = { + val closeableResources = resources.productIterator.toSeq.map(_.asInstanceOf[AutoCloseable]) + try { + func(resources) + } catch { + // Clean up for InterruptedExceptions and ControlThrowables, not just NonFatal exceptions + case e@(NonFatal(_) | _: InterruptedException | _: ControlThrowable) => + throw closeAllResourcesMergingExceptions(closeableResources, e) + } finally { + closeAllResourcesFinally(closeableResources) + } + } + + /** + * Given a list of closeables RESOURCES and a throwable EXCEPTION, close all supplied resources, merging any + * additional errors that occur. The main point here is to ensure every resource in resources is closed; I'm not sure + * how useful the "merge" logic actually is when pulled out to a list of closeables instead of just a single one. + * + * @param resources The list of resources to close. + * @param exception The exception to merge additional throwables into. + * @return The final throwable resulting from "merging" EXCEPTION with any additional throwables raised while closing + * the resources in RESOURCES. 
+ */ + private def closeAllResourcesMergingExceptions(resources: Seq[AutoCloseable], exception: Throwable): Throwable = { + resources.foldRight(exception)((resource, ex) => + try { + resource.close() + ex + } catch { + case NonFatal(e2) => + // e2 isn't fatal, let the original exception e take precedence + ex.addSuppressed(e2) + ex + case e2: Throwable => + if (NonFatal(ex)) { + // e2 is fatal but e isn't, suppress e + e2.addSuppressed(ex) + e2 + } else { + // Both exceptions are fatal, suppress the also-fatal e2 that occurred while closing + ex.addSuppressed(e2) + ex + } + } + ) + } + + /** + * Given RESOURCES, attempts to close all of them even in the face of errors. Arbitrarily, the last exception + * encountered is thrown, with earlier exceptions suppressed. + * + * @param resources The list of resources to close. + */ + private def closeAllResourcesFinally(resources: Seq[AutoCloseable]): Unit = { + // Using foldRight to iterate over resources to ensure we don't short circuit and leave resources unclosed if an + // earlier resource throws an exception on .close(). + val exceptionOption = resources + .foldRight(None.asInstanceOf[Option[Throwable]])((resource, exOpt) => + try { + resource.close() + exOpt + } catch { + case e: Throwable => + Some(exOpt.fold(e) { ex => + ex.addSuppressed(e) + ex + }) + } + ) + if (exceptionOption.isDefined) { + throw exceptionOption.get + } + } +} diff --git a/spark/src/main/scala/org/apache/druid/spark/package.scala b/spark/src/main/scala/org/apache/druid/spark/package.scala new file mode 100644 index 0000000..22b440a --- /dev/null +++ b/spark/src/main/scala/org/apache/druid/spark/package.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid + +import com.fasterxml.jackson.databind.ObjectMapper +import org.apache.druid.jackson.DefaultObjectMapper + +package object spark { + private[spark] val MAPPER: ObjectMapper = new DefaultObjectMapper() +} diff --git a/spark/src/test/scala/org/apache/druid/spark/configuration/ConfigurationSuite.scala b/spark/src/test/scala/org/apache/druid/spark/configuration/ConfigurationSuite.scala new file mode 100644 index 0000000..0bdb9cd --- /dev/null +++ b/spark/src/test/scala/org/apache/druid/spark/configuration/ConfigurationSuite.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.spark.configuration + +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers + +class ConfigurationSuite extends AnyFunSuite with Matchers { + private val testConf: Configuration = Configuration(Map[String, String]( + Configuration.toKey("test", "sub-conf", "property") -> "testProp", + Configuration.toKey("test", "property") -> "quizProp", + "property" -> "examProperty" + )) + + test("isPresent should correctly handle single keys as well as paths to properties") { + testConf.isPresent("property") should be(true) + testConf.isPresent("test", "sub-conf", "property") should be(true) + testConf.isPresent("test") should be(false) + testConf.isPresent("exam", "sub-conf", "property") should be(false) + testConf.isPresent("test.property") should be(true) + testConf.isPresent("test.property.property") should be(false) + } + + test("dive should correctly return sub-configurations") { + val subConf = testConf.dive("test") + subConf.getString("property") should equal("quizProp") + subConf.dive("sub-conf") should equal(Configuration.fromKeyValue("property", "testProp")) + subConf.dive("sub-conf") should equal(testConf.dive("test", "sub-conf")) + } + + test("dive should return empty maps when called on uncontained namespaces") { + testConf.dive("exam").toMap.isEmpty should be(true) + } + + test("Configurations should be case-insensitive") { + testConf.getString("pRoPeRtY") should equal(testConf.getString("property")) + } + + test("merge should correctly combine Configurations") { + val otherConf = Configuration(Map[String, String]( + Configuration.toKey("other", "conf", "key") -> "value", + "property" -> "new property" + )) + val mergedConf = testConf.merge(otherConf) + mergedConf.getString(Configuration.toKey("other", "conf", "key")) should equal("value") + mergedConf.getString("property") should equal("new property") + } + + test("merge should correctly namespace merged Configurations") { + val otherConf = Configuration(Map[String, String]( + "key" -> "1", + "property" -> "new property" + )) + val mergedConf = testConf.merge("test", otherConf) + mergedConf.getString(Configuration.toKey("test", "property")) should equal("new property") + mergedConf.getInt(Configuration.toKey("test", "key"), -1) should equal(1) + mergedConf("property") should equal("examProperty") + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org
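
As a quick illustration of the mix-in introduced in this commit, here is a minimal, hypothetical sketch of how downstream Scala code might use TryWithResources. The example object, method names, and file paths below are illustrative stand-ins and are not part of the change above; only the trait and its tryWithResources overloads come from this patch.

    import java.io.{BufferedReader, FileReader}
    import org.apache.druid.spark.mixins.TryWithResources

    // Hypothetical caller, not part of this commit: mixes in the trait so readers are closed
    // even if reading throws, mirroring Java's try-with-resources.
    object TryWithResourcesExample extends TryWithResources {
      def firstLine(path: String): String = {
        // Single-resource overload: the reader is closed in a finally block.
        tryWithResources(new BufferedReader(new FileReader(path))) { reader =>
          reader.readLine()
        }
      }

      def firstLines(pathA: String, pathB: String): (String, String) = {
        // Two-element tuple overload: non-fatal exceptions thrown while closing are attached to
        // the original failure as suppressed exceptions rather than masking it.
        tryWithResources((new BufferedReader(new FileReader(pathA)), new BufferedReader(new FileReader(pathB)))) {
          case (a, b) => (a.readLine(), b.readLine())
        }
      }
    }

For more than five resources, the Seq overload or nested calls cover the same ground, as the trait's Scaladoc notes, at the cost of positional rather than named access to each resource.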