This is an automated email from the ASF dual-hosted git repository.

samarth pushed a commit to branch spark_druid_connector
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/spark_druid_connector by this push:
     new 5895ebb  Set up, confs, mixins, and clients. (#11474)
5895ebb is described below

commit 5895ebbeea8fa86980562d56b11873dfa974e79f
Author: Julian Jaffe <46732276+julianjaffepinter...@users.noreply.github.com>
AuthorDate: Fri Jul 30 11:36:45 2021 -0700

    Set up, confs, mixins, and clients. (#11474)
    
    * Set up, confs, mixins, and clients.
    
    Initial module set up (e.g. pom, checkstyle, exclusions, licenses, etc.),
    support classes (mix-in traits for logging and the try-with-resources
    pattern & configuration helpers), and an HTTP client for talking to a
    Druid cluster.
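
    As a rough sketch (not part of the commit) of how these pieces fit
    together. The broker host, port, and dataSource below are illustrative
    placeholders:

        import org.apache.druid.spark.clients.DruidClient
        import org.apache.druid.spark.configuration.Configuration

        // Configuration is case-insensitive and dot-namespaced; DruidClient.apply
        // dives into the "broker" namespace for host, port, retry, and timeout settings.
        val conf = Configuration(Map(
          "broker.host" -> "localhost",   // placeholder broker host
          "broker.port" -> "8082"         // placeholder broker port
        ))

        // Builds an HTTP client via HttpClientHolder and issues a merged
        // SegmentMetadataQuery to discover column types and multi-value flags.
        val client = DruidClient(conf)
        val schema: Map[String, (String, Boolean)] = client.getSchema("wikipedia", None)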
---
 .travis.yml                                        |   2 +-
 LICENSE                                            |   7 +
 NOTICE                                             |  32 ++
 codestyle/scalastyle_config.xml                    | 139 +++++++
 codestyle/spotbugs-exclude.xml                     |   4 +
 hooks/pre-push.sh                                  |   2 +-
 pom.xml                                            |   7 +-
 spark/pom.xml                                      | 444 +++++++++++++++++++++
 .../apache/druid/spark/clients/DruidClient.scala   | 208 ++++++++++
 .../druid/spark/clients/HttpClientHolder.scala     |  51 +++
 .../druid/spark/configuration/Configuration.scala  | 209 ++++++++++
 .../configuration/DruidConfigurationKeys.scala     |  35 ++
 .../org/apache/druid/spark/mixins/Logging.scala    | 116 ++++++
 .../druid/spark/mixins/TryWithResources.scala      | 329 +++++++++++++++
 .../scala/org/apache/druid/spark/package.scala     |  27 ++
 .../spark/configuration/ConfigurationSuite.scala   |  76 ++++
 16 files changed, 1685 insertions(+), 3 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 4211f18..4f8e68a 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -74,7 +74,7 @@ jobs:
       script: ${MVN} animal-sniffer:check --fail-at-end
 
     - name: "checkstyle"
-      script: ${MVN} checkstyle:checkstyle --fail-at-end
+      script: ${MVN} checkstyle:checkstyle --fail-at-end -pl '!spark' && ${MVN} scalastyle:check --fail-at-end -pl 'spark'
 
     - name: "enforcer checks"
       script: ${MVN} enforcer:enforce --fail-at-end
diff --git a/LICENSE b/LICENSE
index aa65ab9..7318c8d 100644
--- a/LICENSE
+++ b/LICENSE
@@ -279,6 +279,13 @@ SOURCE/JAVA-CORE
     This product contains lpad and rpad methods adapted from Apache Flink.
       * core/src/main/java/org/apache/druid/java/util/common/StringUtils.java
 
+    This product contains Scala logging and serializable Hadoop configuration utilities adapted from Apache Spark.
+      * spark/src/main/scala/org/apache/druid/spark/mixins/Logging
+
+    This product contains a Druid client wrapper adapted from Imply Data's druid-hadoop-input-format.
+      * spark/src/main/scala/org/apache/druid/spark/clients/DruidClient
+      * spark/src/main/scala/org/apache/druid/spark/clients/HttpClientHolder
+
 
 MIT License
 ================================
diff --git a/NOTICE b/NOTICE
index 7761a54..64e3a2b 100644
--- a/NOTICE
+++ b/NOTICE
@@ -102,3 +102,35 @@ ConciseSet to use IntBuffers.
 
 
 
+
+================= Apache Spark 2.4.7 =================
+
+Apache Spark
+Copyright 2014 and onwards The Apache Software Foundation.
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+
+Export Control Notice
+---------------------
+
+This distribution includes cryptographic software. The country in which you currently reside may have
+restrictions on the import, possession, use, and/or re-export to another country, of encryption software.
+BEFORE using any encryption software, please check your country's laws, regulations and policies concerning
+the import, possession, or use, and re-export of encryption software, to see if this is permitted. See
+<http://www.wassenaar.org/> for more information.
+
+The U.S. Government Department of Commerce, Bureau of Industry and Security (BIS), has classified this
+software as Export Commodity Control Number (ECCN) 5D002.C.1, which includes information security software
+using or performing cryptographic functions with asymmetric algorithms. The form and manner of this Apache
+Software Foundation distribution makes it eligible for export under the License Exception ENC Technology
+Software Unrestricted (TSU) exception (see the BIS Export Administration Regulations, Section 740.13) for
+both object code and source code.
+
+The following provides more details on the included cryptographic software:
+
+This software uses Apache Commons Crypto (https://commons.apache.org/proper/commons-crypto/) to
+support authentication, and encryption and decryption of data sent across the network between
+services.
+
diff --git a/codestyle/scalastyle_config.xml b/codestyle/scalastyle_config.xml
new file mode 100644
index 0000000..9e2ed4d
--- /dev/null
+++ b/codestyle/scalastyle_config.xml
@@ -0,0 +1,139 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<scalastyle commentFilter="enabled">
+ <name>Apache Druid Scalastyle configuration</name>
+ <check level="warning" class="org.scalastyle.file.FileTabChecker" 
enabled="true"></check>
+ <check level="warning" class="org.scalastyle.file.FileLengthChecker" 
enabled="true">
+  <parameters>
+   <parameter name="maxFileLength"><![CDATA[800]]></parameter>
+  </parameters>
+ </check>
+ <check level="error" class="org.scalastyle.file.HeaderMatchesChecker" 
enabled="true">
+  <parameters>
+   <parameter name="header"><![CDATA[/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */]]></parameter>
+  </parameters>
+ </check>
+ <check level="warning" 
class="org.scalastyle.scalariform.SpacesAfterPlusChecker" enabled="true"/>
+ <check level="warning" class="org.scalastyle.file.WhitespaceEndOfLineChecker" 
enabled="true"/>
+ <check level="warning" 
class="org.scalastyle.scalariform.SpacesBeforePlusChecker" enabled="true"/>
+ <check level="warning" 
class="org.scalastyle.scalariform.SpaceAfterCommentStartChecker" 
enabled="true"/>
+ <check level="warning" class="org.scalastyle.file.FileLineLengthChecker" 
enabled="true">
+  <parameters>
+   <parameter name="maxLineLength"><![CDATA[120]]></parameter>
+   <parameter name="tabSize"><![CDATA[4]]></parameter>
+  </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.ClassNamesChecker" 
enabled="true">
+  <parameters>
+   <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
+  </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.ObjectNamesChecker" 
enabled="true">
+  <parameters>
+   <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
+  </parameters>
+ </check>
+ <check level="warning" 
class="org.scalastyle.scalariform.PackageObjectNamesChecker" enabled="true">
+  <parameters>
+   <parameter name="regex"><![CDATA[^[a-z][A-Za-z]*$]]></parameter>
+  </parameters>
+ </check>
+ <check level="warning" 
class="org.scalastyle.scalariform.EqualsHashCodeChecker" enabled="true"/>
+ <check level="warning" 
class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="true">
+  <parameters>
+   <parameter name="illegalImports"><![CDATA[sun._,java.awt._]]></parameter>
+  </parameters>
+ </check>
+ <check level="warning" 
class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
+  <parameters>
+   <parameter name="maxParameters"><![CDATA[8]]></parameter>
+  </parameters>
+ </check>
+ <check level="warning" 
class="org.scalastyle.scalariform.NoWhitespaceBeforeLeftBracketChecker" 
enabled="true"></check>
+ <check level="warning" 
class="org.scalastyle.scalariform.NoWhitespaceAfterLeftBracketChecker" 
enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.ReturnChecker" 
enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.NullChecker" 
enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.NoCloneChecker" 
enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.NoFinalizeChecker" 
enabled="true"></check>
+ <check level="warning" 
class="org.scalastyle.scalariform.CovariantEqualsChecker" 
enabled="true"></check>
+ <check level="warning" 
class="org.scalastyle.scalariform.StructuralTypeChecker" enabled="true"></check>
+ <check level="warning" 
class="org.scalastyle.scalariform.NumberOfTypesChecker" enabled="true">
+  <parameters>
+   <parameter name="maxTypes"><![CDATA[30]]></parameter>
+  </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.UppercaseLChecker" 
enabled="true"></check>
+ <check level="warning" 
class="org.scalastyle.scalariform.SimplifyBooleanExpressionChecker" 
enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.IfBraceChecker" 
enabled="true">
+  <parameters>
+   <parameter name="singleLineAllowed"><![CDATA[true]]></parameter>
+   <parameter name="doubleLineAllowed"><![CDATA[false]]></parameter>
+  </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.MethodLengthChecker" 
enabled="true">
+  <parameters>
+   <parameter name="maxLength"><![CDATA[50]]></parameter>
+  </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.MethodNamesChecker" 
enabled="true">
+  <parameters>
+   <parameter name="regex"><![CDATA[^[a-z][A-Za-z0-9]*$]]></parameter>
+  </parameters>
+ </check>
+ <check level="warning" 
class="org.scalastyle.scalariform.NumberOfMethodsInTypeChecker" enabled="true">
+  <parameters>
+   <parameter name="maxMethods"><![CDATA[30]]></parameter>
+  </parameters>
+ </check>
+ <check level="error" 
class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" 
enabled="true"></check>
+ <check level="warning" class="org.scalastyle.file.NewLineAtEofChecker" 
enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.RedundantIfChecker" 
enabled="true"></check>
+ <check customId="println" level="error" 
class="org.scalastyle.scalariform.TokenChecker" enabled="true">
+  <parameters>
+   <parameter name="regex"><![CDATA[println]]></parameter>
+  </parameters>
+ </check>
+ <check level="warning" 
class="org.scalastyle.scalariform.DeprecatedJavaChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.EmptyClassChecker" 
enabled="true"></check>
+ <check level="warning" 
class="org.scalastyle.scalariform.ClassTypeParameterChecker" enabled="true">
+  <parameters>
+   <parameter name="regex"><![CDATA[^[A-Z_]$]]></parameter>
+  </parameters>
+ </check>
+ <check level="error" 
class="org.scalastyle.scalariform.UnderscoreImportChecker" 
enabled="true"></check>
+ <check level="warning" 
class="org.scalastyle.scalariform.LowercasePatternMatchChecker" 
enabled="true"></check>
+ <check level="warning" 
class="org.scalastyle.scalariform.ImportGroupingChecker" enabled="true"></check>
+</scalastyle>
\ No newline at end of file
diff --git a/codestyle/spotbugs-exclude.xml b/codestyle/spotbugs-exclude.xml
index cb6cb9c..f70c128 100644
--- a/codestyle/spotbugs-exclude.xml
+++ b/codestyle/spotbugs-exclude.xml
@@ -44,6 +44,10 @@
             <Class name="org.apache.druid.server.AsyncQueryForwardingServlet" 
/>
         </And>
     </Match>
+    <!-- Spot Bugs doesn't work for Scala -->
+    <Match>
+        <Package name="~org\.apache\.druid\.spark.*"/>
+    </Match>
 
     <Bug pattern="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION"/>
     <Bug pattern="BC_UNCONFIRMED_CAST"/>
diff --git a/hooks/pre-push.sh b/hooks/pre-push.sh
index a0928db..31b40ba 100755
--- a/hooks/pre-push.sh
+++ b/hooks/pre-push.sh
@@ -14,4 +14,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-mvn checkstyle:checkstyle --fail-at-end
+mvn checkstyle:checkstyle --fail-at-end -pl '!spark' && mvn scalastyle:check --fail-at-end -pl spark
diff --git a/pom.xml b/pom.xml
index eb066a4..9786eac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -105,6 +105,7 @@
         <protobuf.version>3.11.0</protobuf.version>
         <resilience4j.version>1.3.1</resilience4j.version>
         <slf4j.version>1.7.12</slf4j.version>
+        <surefire.version>2.22.2</surefire.version>
        <!-- If compiling with different hadoop version also modify default hadoop coordinates in TaskConfig.java -->
         <hadoop.compile.version>2.8.5</hadoop.compile.version>
         <mockito.version>3.8.0</mockito.version>
@@ -142,6 +143,8 @@
         <!-- Core cloud functionality -->
         <module>cloud/aws-common</module>
         <module>cloud/gcp-common</module>
+        <!-- Spark Connectors -->
+        <module>spark</module>
         <!-- Core extensions -->
         <module>extensions-core/kubernetes-extensions</module>
         <module>extensions-core/avro-extensions</module>
@@ -1517,7 +1520,7 @@
                 <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-surefire-plugin</artifactId>
-                    <version>2.22.2</version>
+                    <version>${surefire.version}</version>
                     <configuration>
                        <!-- locale settings must be set on the command line before startup -->
                         <!-- set default options -->
@@ -1777,6 +1780,8 @@
                     <plugin>
                         <groupId>org.apache.maven.plugins</groupId>
                         <artifactId>maven-surefire-plugin</artifactId>
+                        <!-- Explicitly setting version here so that IntelliJ uses the right schema for inspections -->
+                        <version>${surefire.version}</version>
                         <executions>
                             <execution>
                                 <phase>test</phase>
diff --git a/spark/pom.xml b/spark/pom.xml
new file mode 100644
index 0000000..6dcae79
--- /dev/null
+++ b/spark/pom.xml
@@ -0,0 +1,444 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>druid-spark</artifactId>
+    <name>druid-spark</name>
+    <description>Spark connectors for reading data from and writing data to Druid clusters</description>
+
+    <parent>
+        <groupId>org.apache.druid</groupId>
+        <artifactId>druid</artifactId>
+        <version>0.22.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <properties>
+        <scala.version>${scala.major.version}.12</scala.version>
+        <scala.major.version>2.12</scala.major.version>
+        <spark.version>2.4.8</spark.version>
+        <!-- These two properties allow -Dcheckstyle.skip to suppress scalastyle checks as well -->
+        <checkstyle.skip>false</checkstyle.skip>
+        <scalastyle.skip>${checkstyle.skip}</scalastyle.skip>
+    </properties>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>io.dropwizard.metrics</groupId>
+                <artifactId>metrics-core</artifactId>
+                <version>4.0.2</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.druid</groupId>
+            <artifactId>druid-core</artifactId>
+            <version>${project.parent.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.jayway.jsonpath</groupId>
+                    <artifactId>json-path</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.opencsv</groupId>
+                    <artifactId>opencsv</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.airlift</groupId>
+                    <artifactId>airline</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.timeandspace</groupId>
+                    <artifactId>cron-scheduler</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>net.thisptr</groupId>
+                    <artifactId>jackson-jq</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.druid</groupId>
+                    <artifactId>druid-console</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.gridkit.lab</groupId>
+                    <artifactId>jvm-attach-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.hyperic</groupId>
+                    <artifactId>sigar</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.jboss.logging</groupId>
+                    <artifactId>jboss-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mozilla</groupId>
+                    <artifactId>rhino</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.skife.config</groupId>
+                    <artifactId>config-magic</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.druid</groupId>
+            <artifactId>druid-processing</artifactId>
+            <version>${project.parent.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.ibm.icu</groupId>
+                    <artifactId>icu4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.maven</groupId>
+                    <artifactId>maven-artifact</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mozilla</groupId>
+                    <artifactId>rhino</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.skife.config</groupId>
+                    <artifactId>config-magic</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.code.findbugs</groupId>
+            <artifactId>jsr305</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.http-client</groupId>
+            <artifactId>google-http-client</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-lang</groupId>
+            <artifactId>commons-lang</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>javax.validation</groupId>
+            <artifactId>validation-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>joda-time</groupId>
+            <artifactId>joda-time</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-dbcp2</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Spark -->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.major.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.major.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-unsafe_${scala.major.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-catalyst_${scala.major.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-reflect</artifactId>
+            <version>${scala.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scalap</artifactId>
+            <version>${scala.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Tests -->
+        <dependency>
+            <groupId>org.apache.druid</groupId>
+            <artifactId>druid-core</artifactId>
+            <version>${project.parent.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.hyperic</groupId>
+                    <artifactId>sigar</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_${scala.major.version}</artifactId>
+            <version>3.1.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scalactic</groupId>
+            <artifactId>scalactic_${scala.major.version}</artifactId>
+            <version>3.1.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.easymock</groupId>
+            <artifactId>easymock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- scala build -->
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <version>3.2.2</version>
+                <executions>
+                    <execution>
+                        <id>eclipse-add-source</id>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>scala-compile-first</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>scala-test-compile-first</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>attach-scaladocs</id>
+                        <phase>verify</phase>
+                        <goals>
+                            <goal>doc-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <scalaVersion>${scala.version}</scalaVersion>
+                    <args>
+                        <arg>-unchecked</arg>
+                        <arg>-deprecation</arg>
+                        <arg>-feature</arg>
+                    </args>
+                    <jvmArgs>
+                        <jvmArg>-Xms1024m</jvmArg>
+                        <jvmArg>-Xmx1024m</jvmArg>
+                    </jvmArgs>
+                    <javacArgs>
+                        <javacArg>-source</javacArg>
+                        <javacArg>${java.version}</javacArg>
+                        <javacArg>-target</javacArg>
+                        <javacArg>${java.version}</javacArg>
+                        <javacArg>-Xlint:all,-serial,-path</javacArg>
+                    </javacArgs>
+                </configuration>
+            </plugin>
+            <!-- disable surefire -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <skipTests>true</skipTests>
+                </configuration>
+            </plugin>
+            <!-- enable scalatest -->
+            <plugin>
+                <groupId>org.scalatest</groupId>
+                <artifactId>scalatest-maven-plugin</artifactId>
+                <version>1.0</version>
+                <configuration>
+                    <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+                    <junitxml>.</junitxml>
+                    <parallel>false</parallel>
+                    <argLine>-Dderby.stream.error.field=org.apache.commons.io.output.NullOutputStream.NULL_OUTPUT_STREAM\
+                        -Dlog4j.configuration=${project.basedir}/test/resources/log4j2.xml
+                    </argLine>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>test</id>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.scalastyle</groupId>
+                <artifactId>scalastyle-maven-plugin</artifactId>
+                <version>1.0.0</version>
+                <configuration>
+                    <verbose>false</verbose>
+                    <failOnViolation>true</failOnViolation>
+                    <includeTestSourceDirectory>true</includeTestSourceDirectory>
+                    <failOnWarning>false</failOnWarning>
+                    <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>
+                    <testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory>
+                    <configLocation>${project.basedir}/../codestyle/scalastyle_config.xml</configLocation>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>validate</id>
+                        <phase>validate</phase>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/spark/src/main/scala/org/apache/druid/spark/clients/DruidClient.scala b/spark/src/main/scala/org/apache/druid/spark/clients/DruidClient.scala
new file mode 100644
index 0000000..577f87e
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/clients/DruidClient.scala
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.clients
+
+import com.fasterxml.jackson.core.`type`.TypeReference
+import com.google.common.net.HostAndPort
+import org.apache.druid.java.util.common.{IAE, ISE, Intervals, JodaUtils, StringUtils}
+import org.apache.druid.java.util.http.client.response.{StringFullResponseHandler,
+  StringFullResponseHolder}
+import org.apache.druid.java.util.http.client.{HttpClient, Request}
+import org.apache.druid.query.Druids
+import org.apache.druid.query.metadata.metadata.{ColumnAnalysis, SegmentAnalysis,
+  SegmentMetadataQuery}
+import org.apache.druid.spark.MAPPER
+import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys}
+import org.apache.druid.spark.mixins.Logging
+import org.jboss.netty.handler.codec.http.{HttpMethod, HttpResponseStatus}
+import org.joda.time.{Duration, Interval}
+
+import java.net.URL
+import java.util.{List => JList}
+import javax.ws.rs.core.MediaType
+import scala.annotation.tailrec
+import scala.collection.JavaConverters.{asScalaBufferConverter, mapAsJavaMapConverter,
+  mapAsScalaMapConverter, seqAsJavaListConverter}
+
+/**
+  * Going with SegmentMetadataQueries despite the significant overhead because there's no other way
+  * to get accurate information about Druid columns.
+  *
+  * This is pulled pretty directly from the druid-hadoop-input-format project on GitHub. There is
+  * likely substantial room for improvement.
+  */
+class DruidClient(
+                   httpClient: HttpClient,
+                   hostAndPort: HostAndPort,
+                   numRetries: Int,
+                   retryWaitSeconds: Int,
+                   timeoutWaitMilliseconds: Int
+                 ) extends Logging {
+  private val druidBaseQueryURL: HostAndPort => String =
+    (hostAndPort: HostAndPort) => s"http://$hostAndPort/druid/v2/"
+
+  private val DefaultSegmentMetadataInterval = List[Interval](Intervals.utc(
+    JodaUtils.MIN_INSTANT,
+    JodaUtils.MAX_INSTANT
+  ))
+
+  /**
+    * The SQL system catalog tables are incorrect for multivalue columns and don't have accurate
+    * type info, so we need to fall back to segmentMetadataQueries. Given a DATASOURCE and a range
+    * of INTERVALS to query over, return a map from column name to a tuple of
+    * (columnType, hasMultipleValues). Note that this is a very expensive operation over large
+    * numbers of segments. If possible, this method should only be called with the most granular
+    * starting and ending intervals instead of over a larger interval.
+    *
+    * @param dataSource The Druid dataSource to fetch the schema for.
+    * @param intervals  The intervals to return the schema for, or None to query all segments.
+    * @return A map from column name to data type and whether or not the column is multi-value
+    *         for the schema of DATASOURCE.
+    */
+  def getSchema(dataSource: String, intervals: Option[List[Interval]]): Map[String, (String, Boolean)] = {
+    val queryInterval = intervals.getOrElse(DefaultSegmentMetadataInterval)
+    val body = Druids.newSegmentMetadataQueryBuilder()
+      .dataSource(dataSource)
+      .intervals(queryInterval.asJava)
+      .analysisTypes(SegmentMetadataQuery.AnalysisType.SIZE)
+      .merge(true)
+      .context(Map[String, AnyRef](
+        "timeout" -> Int.box(timeoutWaitMilliseconds)
+      ).asJava)
+      .build()
+    val response = sendRequestWithRetry(
+      druidBaseQueryURL(hostAndPort), numRetries, Option(MAPPER.writeValueAsBytes(body))
+    )
+    val segments =
+      MAPPER.readValue[JList[SegmentAnalysis]](
+        response.getContent, new TypeReference[JList[SegmentAnalysis]]() {})
+    if (segments.size() == 0) {
+      throw new IAE(
+        s"No segments found for intervals [${intervals.mkString(",")}] on 
$dataSource"
+      )
+    }
+    // Since we're setting merge to true, there should only be one item in the list
+    if (segments.size() > 1) {
+      throw new ISE("Merged segment metadata response had more than one row!")
+    }
+    log.debug(segments.asScala.map(_.toString).mkString("SegmentAnalysis: [", ", ", "]"))
+    /*
+     * If a dimension has multiple types within the spanned interval, the resulting column
+     * analysis will have the type "STRING" and be an error message. We abuse that here to infer
+     * a string type for the dimension and widen the type for the resulting DataFrame.
+     */
+    val columns = segments.asScala.head.getColumns.asScala.toMap
+    columns.foreach{ case(key, column) =>
+      if (column.isError) {
+        log.warn(s"Multiple column types found for dimension $key in interval" 
+
+          s" ${queryInterval.mkString("[", ", ", "]")}! Falling back to STRING 
type")
+      }
+    }
+    columns.map{ case (name: String, col: ColumnAnalysis) =>
+      name -> (col.getType, col.isHasMultipleValues)
+    }
+  }
+
+  /*
+   * Marking this method as tail recursive because it is for now. If a finally block,
+   * special error handling, or more involved set up and tear down code is added, this
+   * method may no longer be tail recursive and so compilation will fail. Because the
+   * number of retries is user-configurable and will likely be relatively small,
+   * latency in communication with a Druid cluster for Segment Metadata will be dominated
+   * by the query time, and Scala will optimize tail recursive calls regardless of annotation,
+   * future developers shouldn't be concerned if they need to remove this annotation.
+   */
+  @tailrec
+  private def sendRequestWithRetry(
+                                    url: String,
+                                    retryCount: Int,
+                                    content: Option[Array[Byte]] = None
+                                  ): StringFullResponseHolder = {
+    try {
+      sendRequest(url, content)
+    } catch {
+      case e: Exception =>
+        if (retryCount > 0) {
+          logInfo(s"Got exception: ${e.getMessage}, retrying ...")
+          Thread.sleep(retryWaitSeconds * 1000)
+          sendRequestWithRetry(url, retryCount - 1, content)
+        } else {
+          throw e
+        }
+    }
+  }
+
+  private def sendRequest(url: String, content: Option[Array[Byte]]): StringFullResponseHolder = {
+    try {
+      val request = buildRequest(url, content)
+      var response = httpClient.go(
+        request,
+        new StringFullResponseHandler(StringUtils.UTF8_CHARSET),
+        Duration.millis(timeoutWaitMilliseconds)
+      ).get
+      if (response.getStatus == HttpResponseStatus.TEMPORARY_REDIRECT) {
+        val newUrl = response.getResponse.headers().get("Location")
+        logInfo(s"Got a redirect, new location: $newUrl")
+        response = httpClient.go(
+          buildRequest(newUrl, content), new StringFullResponseHandler(StringUtils.UTF8_CHARSET)
+        ).get
+      }
+      if (!(response.getStatus == HttpResponseStatus.OK)) {
+        throw new ISE(
+          s"Error getting response for %s, status[%s] content[%s]",
+          url,
+          response.getStatus,
+          response.getContent
+        )
+      }
+      response
+    } catch {
+      case e: Exception =>
+        throw e
+    }
+  }
+
+  def buildRequest(url: String, content: Option[Array[Byte]]): Request = {
+    content.map(
+      new Request(HttpMethod.POST, new URL(url))
+        .setHeader("Content-Type", MediaType.APPLICATION_JSON)
+        .setContent(_)
+    ).getOrElse(
+      new Request(HttpMethod.GET, new URL(url))
+    )
+  }
+}
+
+object DruidClient {
+  // TODO: Add support for Kerberized etc. clients
+  def apply(conf: Configuration): DruidClient = {
+    val brokerConf = conf.dive(DruidConfigurationKeys.brokerPrefix)
+    new DruidClient(
+      HttpClientHolder.create.get,
+      HostAndPort.fromParts(
+        brokerConf.get(DruidConfigurationKeys.brokerHostDefaultKey),
+        brokerConf.getInt(DruidConfigurationKeys.brokerPortDefaultKey)),
+      brokerConf.getInt(DruidConfigurationKeys.numRetriesDefaultKey),
+      brokerConf.getInt(DruidConfigurationKeys.retryWaitSecondsDefaultKey),
+      brokerConf.getInt(DruidConfigurationKeys.timeoutMillisecondsDefaultKey)
+    )
+  }
+}
+
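A hedged usage note for the client above: getSchema takes an Option[List[Interval]], and the
scaladoc recommends restricting the scan to the narrowest intervals possible because merged
SegmentMetadataQueries are expensive. A minimal sketch, given a DruidClient built as in the
sketch near the top of this message (dataSource and interval are placeholders):

    import org.apache.druid.java.util.common.Intervals

    // Limit the segment metadata scan to one month of segments.
    val schema = client.getSchema("wikipedia", Some(List(Intervals.of("2021-01-01/2021-02-01"))))
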
diff --git a/spark/src/main/scala/org/apache/druid/spark/clients/HttpClientHolder.scala b/spark/src/main/scala/org/apache/druid/spark/clients/HttpClientHolder.scala
new file mode 100644
index 0000000..b13d2c8
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/clients/HttpClientHolder.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.clients
+
+import com.google.common.base.Throwables
+import org.apache.druid.java.util.common.lifecycle.Lifecycle
+import org.apache.druid.java.util.http.client.{HttpClient, HttpClientConfig, HttpClientInit}
+
+import java.io.{Closeable, IOException}
+
+object HttpClientHolder {
+  def create: HttpClientHolder = {
+    val lifecycle = new Lifecycle
+    val httpClient = HttpClientInit.createClient(HttpClientConfig.builder.build, lifecycle)
+    try {
+      lifecycle.start()
+    } catch {
+      case e: Exception =>
+        throw Throwables.propagate(e)
+    }
+    new HttpClientHolder(lifecycle, httpClient)
+  }
+}
+
+class HttpClientHolder(val lifecycle: Lifecycle, val client: HttpClient) extends Closeable {
+  def get: HttpClient = {
+    client
+  }
+
+  @throws[IOException]
+  override def close(): Unit = {
+    lifecycle.stop()
+  }
+}
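
A note on the holder above: HttpClientHolder.create starts a Druid Lifecycle and close() stops it,
so callers are expected to close the holder once they are done with the client. A minimal sketch:

    val holder = HttpClientHolder.create
    try {
      val httpClient = holder.get  // org.apache.druid.java.util.http.client.HttpClient
      // ... issue requests with httpClient.go(...) ...
    } finally {
      holder.close()               // stops the underlying Lifecycle
    }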
diff --git a/spark/src/main/scala/org/apache/druid/spark/configuration/Configuration.scala b/spark/src/main/scala/org/apache/druid/spark/configuration/Configuration.scala
new file mode 100644
index 0000000..71f31de
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/configuration/Configuration.scala
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.configuration
+
+import org.apache.druid.java.util.common.StringUtils
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+import scala.collection.JavaConverters.mapAsScalaMapConverter
+
+/**
+  * A simple wrapper around a properties map that can also "dive" into a sub-Configuration if keys are stored using
+  * dotted separators. Because this class is designed to make working with Spark DataSourceOptions easier and DSOs are
+  * case-insensitive, this class is also case-insensitive.
+  *
+  * @param properties A Map of String property names to String property values to back this Configuration.
+  */
+class Configuration(properties: Map[String, String]) extends Serializable {
+  def getAs[T](key: String): T = {
+    properties(StringUtils.toLowerCase(key)).asInstanceOf[T]
+  }
+
+  def get(key: String): Option[String] = {
+    properties.get(StringUtils.toLowerCase(key))
+  }
+
+  def get(keyWithDefault: (String, String)): String = {
+    this.get(StringUtils.toLowerCase(keyWithDefault._1)).getOrElse(keyWithDefault._2)
+  }
+
+  def getString(key: String): String = {
+    properties.getOrElse(StringUtils.toLowerCase(key), "")
+  }
+
+  def getInt(key: String, default: Int): Int = {
+    properties.get(StringUtils.toLowerCase(key)).fold(default)(_.toInt)
+  }
+
+  def getInt(keyWithDefault: (String, Int)): Int = {
+    this.getInt(StringUtils.toLowerCase(keyWithDefault._1), keyWithDefault._2)
+  }
+
+  def getLong(key: String, default: Long): Long = {
+    properties.get(StringUtils.toLowerCase(key)).fold(default)(_.toLong)
+  }
+
+  def getLong(keyWithDefault: (String, Long)): Long = {
+    this.getLong(StringUtils.toLowerCase(keyWithDefault._1), keyWithDefault._2)
+  }
+
+  def getBoolean(key: String, default: Boolean): Boolean = {
+    properties.get(StringUtils.toLowerCase(key)).fold(default)(_.toBoolean)
+  }
+
+  def getBoolean(keyWithDefault: (String, Boolean)): Boolean = {
+    this.getBoolean(StringUtils.toLowerCase(keyWithDefault._1), keyWithDefault._2)
+  }
+
+  def apply(key: String): String = {
+    this.get(key) match {
+      case Some(v) => v
+      case None => throw new NoSuchElementException(s"Key $key not found!")
+    }
+  }
+
+  def isPresent(key: String): Boolean = {
+    properties.isDefinedAt(StringUtils.toLowerCase(key))
+  }
+
+  def isPresent(paths: String*): Boolean = {
+    properties.isDefinedAt(StringUtils.toLowerCase(paths.mkString(Configuration.SEPARATOR)))
+  }
+
+  /**
+    * Given a prefix PREFIX, return a Configuration object containing every key in this
+    * configuration that starts with PREFIX. Keys in the resulting Configuration will have PREFIX
+    * removed from their start.
+    *
+    * For example, if a Configuration contains the key `druid.broker.host` and `dive("druid")` is
+    * called on it, the resulting Configuration will contain the key `broker.host` with the same
+    * value as `druid.broker.host` had in the original Configuration.
+    *
+    * @param prefix The namespace to "dive" into.
+    * @return A Configuration containing the keys in this Configuration that start with PREFIX,
+    *         stripped of their leading PREFIX.
+    */
+  def dive(prefix: String): Configuration = {
+    new Configuration(properties
+      .filterKeys(_.startsWith(s"$prefix${Configuration.SEPARATOR}"))
+      .map{case (k, v) => k.substring(prefix.length + 1) -> v})
+  }
+
+  /**
+    * Given a number of prefixes, return a Configuration object containing every key in this
+    * configuration that starts with `prefixes.mkString(Configuration.SEPARATOR)`. Keys in the
+    * resulting Configuration will have the concatenation of PREFIXES removed from their start.
+    *
+    * Note that this is the equivalent of chaining `dive` calls, not chaining `merge` calls.
+    *
+    * @param prefixes The namespaces, in order, to "dive" into.
+    * @return A Configuration containing the keys that start with every prefix in PREFIXES joined
+    *         by periods, stripped of the leading prefixes matching the prefixes in PREFIXES.
+    */
+  def dive(prefixes: String*): Configuration = {
+    prefixes.foldLeft(this){case (conf, prefix) => conf.dive(prefix)}
+  }
+
+  /**
+    * Combine this configuration with another Configuration. If keys collide between these
+    * configurations, the corresponding values in OTHER will be selected.
+    *
+    * @param other A Configuration to merge with this Configuration.
+    * @return A Configuration containing the union of keys between this Configuration and OTHER.
+    *         If keys collide between the two Configurations, the values in OTHER will be kept.
+    */
+  def merge(other: Configuration): Configuration = {
+    new Configuration(this.properties ++ other.toMap)
+  }
+
+  /**
+    * Combine this configuration with another Configuration, moving the other Configuration to the provided name space.
+    * If keys collide between this configuration and the newly-namespaced OTHER, the corresponding values in OTHER will
+    * be selected.
+    *
+    * @param namespace The name space to merge OTHER under.
+    * @param other The Configuration to merge with this Configuration.
+    * @return A new Configuration object containing all the keys in this Configuration, plus the keys in OTHER
+    *         namespaced under NAMESPACE.
+    */
+  def merge(namespace: String, other: Configuration): Configuration = {
+    this.merge(Configuration(namespace, other.toMap))
+  }
+
+  /**
+    * Add the properties specified in PROPERTIES to this Configuration's properties, moving the new properties to the
+    * provided name space. If this Configuration already contains keys under the provided name space and those keys
+    * collide with the properties specified in PROPERTIES, the corresponding values in PROPERTIES will be selected.
+    *
+    * @param namespace The name space to merge the properties specified in PROPERTIES under.
+    * @param properties The map of properties to values to combine with the properties from this Configuration.
+    * @return A new Configuration object containing all the keys in this Configuration, plus the properties in
+    *         PROPERTIES namespaced under NAMESPACE.
+    */
+  def merge(namespace: String, properties: Map[String, String]): Configuration = {
+    this.merge(Configuration(namespace, properties))
+  }
+
+  override def equals(obj: Any): Boolean = {
+    obj.isInstanceOf[Configuration] && this.toMap == obj.asInstanceOf[Configuration].toMap
+  }
+
+  override def hashCode(): Int = {
+    this.properties.hashCode()
+  }
+
+  def toMap: Map[String, String] = {
+    this.properties
+  }
+
+  override def toString: String = {
+    this.toMap.mkString("Configuration{", "; ", "}")
+  }
+}
+
+object Configuration {
+  def apply(properties: Map[String, String]): Configuration = {
+    new Configuration(properties.map{case (k, v) => StringUtils.toLowerCase(k) -> v})
+  }
+
+  def apply(namespace: String, properties: Map[String, String]): Configuration = {
+    new Configuration(properties.map{case(k, v) => StringUtils.toLowerCase(s"$namespace$SEPARATOR$k") -> v})
+  }
+
+  def apply(dso: DataSourceOptions): Configuration = {
+    new Configuration(dso.asMap().asScala.toMap)
+  }
+
+  def fromKeyValue(key: String, value: String): Configuration = {
+    new Configuration(Map[String, String](StringUtils.toLowerCase(key) -> value))
+  }
+
+  /**
+    * Get the key corresponding to each element of PATHS interpreted as a namespace or property.
+    *
+    * @param paths The parent namespaces and property as individual strings to convert into a single configuration key.
+    * @return The path to a property through its parent namespaces as a single configuration key.
+    */
+  def toKey(paths: String*): String = {
+    StringUtils.toLowerCase(paths.mkString(Configuration.SEPARATOR))
+  }
+
+  private val SEPARATOR = "."
+}
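
To make the dive and merge semantics above concrete, a small sketch using the example key from the
scaladoc (property values are placeholders):

    val conf = Configuration(Map("druid.broker.host" -> "broker.example.com"))

    // dive("druid") strips the leading "druid." namespace; keys are lower-cased and case-insensitive.
    val druidConf = conf.dive("druid")
    druidConf.get("broker.host")     // Some("broker.example.com")

    // merge(namespace, other) moves the other Configuration's keys under the given namespace.
    val merged = conf.merge("druid", Configuration(Map("broker.port" -> "8082")))
    merged.get("druid.broker.port")  // Some("8082")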
diff --git a/spark/src/main/scala/org/apache/druid/spark/configuration/DruidConfigurationKeys.scala b/spark/src/main/scala/org/apache/druid/spark/configuration/DruidConfigurationKeys.scala
new file mode 100644
index 0000000..cfa1d74
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/configuration/DruidConfigurationKeys.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.configuration
+
+object DruidConfigurationKeys {
+  // Druid Client Configs
+  val brokerPrefix: String = "broker"
+  val brokerHostKey: String = "host"
+  val brokerPortKey: String = "port"
+  val numRetriesKey: String = "numRetries"
+  val retryWaitSecondsKey: String = "retryWaitSeconds"
+  val timeoutMillisecondsKey: String = "timeoutMilliseconds"
+  private[spark] val brokerHostDefaultKey: (String, String) = (brokerHostKey, "localhost")
+  private[spark] val brokerPortDefaultKey: (String, Int) = (brokerPortKey, 8082)
+  private[spark] val numRetriesDefaultKey: (String, Int) = (numRetriesKey, 5)
+  private[spark] val retryWaitSecondsDefaultKey: (String, Int) = (retryWaitSecondsKey, 5)
+  private[spark] val timeoutMillisecondsDefaultKey: (String, Int) = (timeoutMillisecondsKey, 300000)
+}
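
The *DefaultKey tuples above pair a key with its default value so they can be passed directly to
the Configuration getters (they are private[spark], so this only compiles inside the
org.apache.druid.spark package). A short sketch with an empty configuration, so the defaults win:

    val brokerConf = Configuration(Map.empty[String, String])
    brokerConf.get(DruidConfigurationKeys.brokerHostDefaultKey)     // "localhost"
    brokerConf.getInt(DruidConfigurationKeys.brokerPortDefaultKey)  // 8082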
diff --git a/spark/src/main/scala/org/apache/druid/spark/mixins/Logging.scala b/spark/src/main/scala/org/apache/druid/spark/mixins/Logging.scala
new file mode 100644
index 0000000..055c22b
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/mixins/Logging.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.mixins
+
+import org.apache.druid.java.util.common.logger.Logger
+
+/**
+  * Simplified version of org.apache.spark.internal.Logging. Uses
+  * org.apache.druid.java.util.common.logger.Logger instead of slf4j's Logger directly.
+  */
+trait Logging {
+
+  @transient private var logger: Logger = _
+
+  private lazy val logName = this.getClass.getName.stripSuffix("$")
+
+  /**
+    * Return the configured underlying logger
+    *
+    * @return the configured underlying logger
+    */
+  protected def log: Logger = {
+    if (logger == null) {
+      logger = new Logger(logName)
+    }
+    logger
+  }
+
+  /**
+    * Log a message at the TRACE level.
+    *
+    * @param msg the message string to be logged
+    */
+  protected def logTrace(msg: => String): Unit = {
+    if (log.isTraceEnabled) {
+      log.trace(msg)
+    }
+  }
+
+  /**
+    * Log a message at the DEBUG level.
+    *
+    * @param msg the message string to be logged
+    */
+  protected def logDebug(msg: => String): Unit = {
+    if (log.isDebugEnabled) {
+      log.debug(msg)
+    }
+  }
+
+  /**
+    * Log a message at the INFO level.
+    *
+    * @param msg the message string to be logged
+    */
+  protected def logInfo(msg: => String): Unit = {
+    if (log.isInfoEnabled) {
+      log.info(msg)
+    }
+  }
+
+  /**
+    * Log a message at the WARN level.
+    *
+    * @param msg the message string to be logged
+    */
+  protected def logWarn(msg: => String): Unit = {
+    log.warn(msg)
+  }
+
+  /**
+    * Log a message with an exception at the WARN level.
+    *
+    * @param msg       the message string to be logged
+    * @param exception the exception to log in addition to the message
+    */
+  protected def logWarn(msg: => String, exception: Throwable): Unit = {
+    log.warn(exception, msg)
+  }
+
+  /**
+    * Log a message at the ERROR level.
+    *
+    * @param msg the message string to be logged
+    */
+  protected def logError(msg: => String): Unit = {
+    log.error(msg)
+  }
+
+  /**
+    * Log a message with an exception at the ERROR level.
+    *
+    * @param msg       the message string to be logged
+    * @param exception the exception to log in addition to the message
+    */
+  protected def logError(msg: => String, exception: Throwable): Unit = {
+    log.error(exception, msg)
+  }
+}
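As a usage sketch (the SegmentMetadataFetcher class and its body are hypothetical, not part of this patch): mixing in the trait gives a class a lazily created Druid Logger named after it, and the by-name msg parameters mean interpolated strings are only built when the corresponding level is enabled.

    import org.apache.druid.spark.mixins.Logging

    class SegmentMetadataFetcher extends Logging {  // hypothetical example class
      def fetch(dataSource: String): Unit = {
        // The interpolation below is only evaluated if DEBUG is enabled, thanks to the by-name parameter.
        logDebug(s"Fetching segment metadata for $dataSource")
        try {
          // ... call out to the cluster ...
        } catch {
          case e: Exception =>
            logError(s"Failed to fetch segment metadata for $dataSource", e)
            throw e
        }
      }
    }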
diff --git a/spark/src/main/scala/org/apache/druid/spark/mixins/TryWithResources.scala b/spark/src/main/scala/org/apache/druid/spark/mixins/TryWithResources.scala
new file mode 100644
index 0000000..6b0ff24
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/mixins/TryWithResources.scala
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.mixins
+
+import scala.util.control.{ControlThrowable, NonFatal}
+
+/**
+  * Utility trait to ape the try-with-resources construct in Java. This is quick and dirty and has its flaws. If a
+  * more robust approach is needed, the better-files approach (https://github.com/pathikrit/better-files#lightweight-arm)
+  * should be considered.
+  *
+  * Scala doesn't support varargs of a generic type, so overloading is used to support managing multiple resources
+  * with a sequence. Ideally Scala would support macro programming to generate tryWithResources functions for tuples
+  * of [_ <: AutoCloseable] so users could un-tuple multiple resources as named variables without needing to manually
+  * generate separate methods for each length up to 22, but since it doesn't I've created methods for up to five
+  * resources at once. These methods can be nested, or the method taking a Sequence of AutoCloseables can be used for
+  * arbitrary resources, with the caveat that named unpacking is not possible over Sequences. If there were a way to
+  * enforce type bounds on the elements of subclasses of Product, we could hack up a type that unioned AutoCloseable
+  * and Products whose elements are all subtypes of AutoCloseable, but since there isn't, this is a quick fix. Note
+  * that shared code for the tuple methods is not refactored into a single private method because the common supertype
+  * of tuples is Product, which carries no element type information.
+  */
+trait TryWithResources {
+  /**
+    * A helper function to duplicate Java's try-with-resources construction. Mix in this trait and then call like so:
+    * val resource = new ResourceImplementingAutoCloseable()
+    * tryWithResources(resource){r =>
+    *   r.doSomething()
+    * }
+    *
+    * or, if desired,
+    *
+    * tryWithResources(new ResourceImplementingAutoCloseable()){ resource =>
+    *   resource.doSomething()
+    * }
+    *
+    * @param resource The AutoCloseable resource to use in FUNC.
+    * @param func     The function block to execute (think of this as the try block).
+    * @tparam T Any subtype of AutoCloseable.
+    * @tparam V The result type of FUNC.
+    * @return The result of executing FUNC with RESOURCE.
+    */
+  def tryWithResources[T <: AutoCloseable, V](resource: T)(func: T => V): V = {
+    try {
+      func(resource)
+    } catch {
+      // Clean up for InterruptedExceptions and ControlThrowables, not just NonFatal exceptions
+      case e@(NonFatal(_) | _: InterruptedException | _: ControlThrowable) =>
+        try {
+          resource.close()
+        } catch {
+          case NonFatal(e2) =>
+            // e2 isn't fatal, let the original exception e take precedence
+            e.addSuppressed(e2)
+          case e2: Throwable =>
+            if (NonFatal(e)) {
+              // e2 is fatal but e isn't, suppress e
+              e2.addSuppressed(e)
+              throw e2
+            }
+            // Both exceptions are fatal, suppress the also-fatal e2 that occurred while closing
+            e.addSuppressed(e2)
+        }
+        throw e
+    } finally {
+      resource.close()
+    }
+  }
+
+  /**
+    * A helper function to duplicate Java's try-with-resources construction. Unfortunately Scala doesn't support varargs
+    * for generic types and I don't feel like writing 22 separate functions, so callers will have to keep track of the
+    * exact ordering of elements in the resource sequence or nest the provided tuple methods.
+    *
+    * To use, mix in this trait and then call like so:
+    * val fileResource = new ResourceImplementingAutoCloseable()
+    * val writerResource = new OtherAutoCloseable()
+    * tryWithResources(Seq(fileResource, writerResource)){resources =>
+    *   val file = resources(0)
+    *   val writer = resources(1)
+    *   writer.write(file, data)
+    * }
+    *
+    * @param resources A list of AutoCloseable resources to use in FUNC.
+    * @param func      The function block to execute (think of this as the try block).
+    * @tparam T Any subtype of AutoCloseable.
+    * @tparam V The result type of FUNC.
+    * @return The result of executing FUNC with RESOURCES.
+    */
+  def tryWithResources[T <: AutoCloseable, V](resources: Seq[T])(func: Seq[T] => V): V = {
+    try {
+      func(resources)
+    } catch {
+      // Clean up for InterruptedExceptions and ControlThrowables, not just NonFatal exceptions
+      case e@(NonFatal(_) | _: InterruptedException | _: ControlThrowable) =>
+        throw closeAllResourcesMergingExceptions(resources, e)
+    } finally {
+      closeAllResourcesFinally(resources)
+    }
+  }
+
+  /**
+    * A helper function to duplicate Java's try-with-resources construction. Mix in this trait and then call like so:
+    * val resources = (new ResourceImplementingAutoCloseable(), new ResourceImplementingAutoCloseable())
+    * tryWithResources(resources){
+    *   case (first, second) =>
+    *     first.doSomething()
+    *     second.doSomething()
+    * }
+    *
+    * or, if desired,
+    *
+    * tryWithResources(new ResourceImplementingAutoCloseable(), new ResourceImplementingAutoCloseable){
+    *   case (first, second) =>
+    *     first.doSomething()
+    *     second.doSomething
+    * }
+    *
+    * @param resources A tuple of AutoCloseable resources to use in FUNC.
+    * @param func      The function block to execute (think of this as the try block).
+    * @tparam T Any subtype of AutoCloseable.
+    * @tparam V The result type of FUNC.
+    * @return The result of executing FUNC with RESOURCES.
+    */
+  def tryWithResources[T <: AutoCloseable, V](resources: (T, T))(func: ((T, T)) => V): V = {
+    val closeableResources = resources.productIterator.toSeq.map(_.asInstanceOf[AutoCloseable])
+    try {
+      func(resources)
+    } catch {
+      // Clean up for InterruptedExceptions and ControlThrowables, not just NonFatal exceptions
+      case e@(NonFatal(_) | _: InterruptedException | _: ControlThrowable) =>
+        throw closeAllResourcesMergingExceptions(closeableResources, e)
+    } finally {
+      closeAllResourcesFinally(closeableResources)
+    }
+  }
+
+  /**
+    * A helper function to duplicate Java's try-with-resources construction. Mix in this trait and then call like so:
+    * val resources = (new ResourceImplementingAutoCloseable(), new ResourceImplementingAutoCloseable(), ...)
+    * tryWithResources(resources){
+    *   case (first, second, ...) =>
+    *     first.doSomething()
+    *     second.doSomething()
+    *     ...
+    * }
+    *
+    * or, if desired,
+    *
+    * tryWithResources(new ResourceImplementingAutoCloseable(), new ResourceImplementingAutoCloseable, ...){
+    *   case (first, second, ...) =>
+    *     first.doSomething()
+    *     second.doSomething()
+    *     ...
+    * }
+    *
+    * @param resources A tuple of AutoCloseable resources to use in FUNC.
+    * @param func      The function block to execute (think of this as the try block).
+    * @tparam T Any subtype of AutoCloseable.
+    * @tparam V The result type of FUNC.
+    * @return The result of executing FUNC with RESOURCES.
+    */
+  def tryWithResources[T <: AutoCloseable, V](resources: (T, T, T))(func: ((T, T, T)) => V): V = {
+    val closeableResources = resources.productIterator.toSeq.map(_.asInstanceOf[AutoCloseable])
+    try {
+      func(resources)
+    } catch {
+      // Clean up for InterruptedExceptions and ControlThrowables, not just NonFatal exceptions
+      case e@(NonFatal(_) | _: InterruptedException | _: ControlThrowable) =>
+        throw closeAllResourcesMergingExceptions(closeableResources, e)
+    } finally {
+      closeAllResourcesFinally(closeableResources)
+    }
+  }
+
+  /**
+    * A helper function to duplicate Java's try-with-resources construction. Mix in this trait and then call like so:
+    * val resources = (new ResourceImplementingAutoCloseable(), new ResourceImplementingAutoCloseable(), ...)
+    * tryWithResources(resources){
+    *   case (first, second, ...) =>
+    *     first.doSomething()
+    *     second.doSomething()
+    *     ...
+    * }
+    *
+    * or, if desired,
+    *
+    * tryWithResources(new ResourceImplementingAutoCloseable(), new ResourceImplementingAutoCloseable, ...){
+    *   case (first, second, ...) =>
+    *     first.doSomething()
+    *     second.doSomething()
+    *     ...
+    * }
+    *
+    * @param resources A tuple of AutoCloseable resources to use in FUNC.
+    * @param func      The function block to execute (think of this as the try block).
+    * @tparam T Any subtype of AutoCloseable.
+    * @tparam V The result type of FUNC.
+    * @return The result of executing FUNC with RESOURCES.
+    */
+  def tryWithResources[T <: AutoCloseable, V](resources: (T, T, T, T))(func: ((T, T, T, T)) => V): V = {
+    val closeableResources = resources.productIterator.toSeq.map(_.asInstanceOf[AutoCloseable])
+    try {
+      func(resources)
+    } catch {
+      // Clean up for InterruptedExceptions and ControlThrowables, not just NonFatal exceptions
+      case e@(NonFatal(_) | _: InterruptedException | _: ControlThrowable) =>
+        throw closeAllResourcesMergingExceptions(closeableResources, e)
+    } finally {
+      closeAllResourcesFinally(closeableResources)
+    }
+  }
+
+  /**
+    * A helper function to duplicate Java's try-with-resources construction. Mix in this trait and then call like so:
+    * val resources = (new ResourceImplementingAutoCloseable(), new ResourceImplementingAutoCloseable(), ...)
+    * tryWithResources(resources){
+    *   case (first, second, ...) =>
+    *     first.doSomething()
+    *     second.doSomething()
+    *     ...
+    * }
+    *
+    * or, if desired,
+    *
+    * tryWithResources(new ResourceImplementingAutoCloseable(), new ResourceImplementingAutoCloseable, ...){
+    *   case (first, second, ...) =>
+    *     first.doSomething()
+    *     second.doSomething()
+    *     ...
+    * }
+    *
+    * @param resources A tuple of AutoCloseable resources to use in FUNC.
+    * @param func      The function block to execute (think of this as the try block).
+    * @tparam T Any subtype of AutoCloseable.
+    * @tparam V The result type of FUNC.
+    * @return The result of executing FUNC with RESOURCES.
+    */
+  def tryWithResources[T <: AutoCloseable, V](resources: (T, T, T, T, T))(func: ((T, T, T, T, T)) => V): V = {
+    val closeableResources = resources.productIterator.toSeq.map(_.asInstanceOf[AutoCloseable])
+    try {
+      func(resources)
+    } catch {
+      // Clean up for InterruptedExceptions and ControlThrowables, not just NonFatal exceptions
+      case e@(NonFatal(_) | _: InterruptedException | _: ControlThrowable) =>
+        throw closeAllResourcesMergingExceptions(closeableResources, e)
+    } finally {
+      closeAllResourcesFinally(closeableResources)
+    }
+  }
+
+  /**
+    * Given a list of closeables RESOURCES and a throwable EXCEPTION, close all supplied resources, merging any
+    * additional errors that occur. The main point here is to ensure every resource in resources is closed; I'm not
+    * sure how useful the "merge" logic actually is when pulled out to a list of closeables instead of just a single
+    * one.
+    *
+    * @param resources The list of resources to close.
+    * @param exception The exception to merge additional throwables into.
+    * @return The final throwable resulting from "merging" EXCEPTION with any additional throwables raised while closing
+    *         the resources in RESOURCES.
+    */
+  private def closeAllResourcesMergingExceptions(resources: Seq[AutoCloseable], exception: Throwable): Throwable = {
+    resources.foldRight(exception)((resource, ex) =>
+      try {
+        resource.close()
+        ex
+      } catch {
+        case NonFatal(e2) =>
+          // e2 isn't fatal, let the original exception ex take precedence
+          ex.addSuppressed(e2)
+          ex
+        case e2: Throwable =>
+          if (NonFatal(ex)) {
+            // e2 is fatal but ex isn't, suppress ex
+            e2.addSuppressed(ex)
+            e2
+          } else {
+            // Both exceptions are fatal, suppress the also-fatal e2 that occurred while closing
+            ex.addSuppressed(e2)
+            ex
+          }
+      }
+    )
+  }
+
+  /**
+    * Given RESOURCES, attempts to close all of them even in the face of errors. Arbitrarily, the last exception
+    * encountered is thrown, with earlier exceptions suppressed.
+    *
+    * @param resources The list of resources to close.
+    */
+  private def closeAllResourcesFinally(resources: Seq[AutoCloseable]): Unit = {
+    // Using foldRight to iterate over resources to ensure we don't short circuit and leave resources unclosed if an
+    // earlier resource throws an exception on .close().
+    val exceptionOption = resources
+      .foldRight(None.asInstanceOf[Option[Throwable]])((resource, exOpt) =>
+        try {
+          resource.close()
+          exOpt
+        } catch {
+          case e: Throwable =>
+            Some(exOpt.fold(e) { ex =>
+              ex.addSuppressed(e)
+              ex
+            })
+        }
+      )
+    if (exceptionOption.isDefined) {
+      throw exceptionOption.get
+    }
+  }
+}
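A brief sketch of the single-resource and two-tuple overloads in use (the CopyFirstLine object and its file handling are hypothetical, included only to illustrate the API above):

    import java.io.{BufferedReader, FileReader, FileWriter}
    import org.apache.druid.spark.mixins.TryWithResources

    object CopyFirstLine extends TryWithResources {  // hypothetical example
      def copy(in: String, out: String): Unit = {
        // Single-resource overload: the reader is closed whether or not the block throws.
        val firstLine = tryWithResources(new BufferedReader(new FileReader(in))) { reader =>
          Option(reader.readLine()).getOrElse("")
        }
        // Tuple overload: both writers are closed, and any exception thrown while closing
        // is suppressed into the original failure rather than masking it.
        tryWithResources((new FileWriter(out), new FileWriter(out + ".bak"))) {
          case (primary, backup) =>
            primary.write(firstLine)
            backup.write(firstLine)
        }
      }
    }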
diff --git a/spark/src/main/scala/org/apache/druid/spark/package.scala b/spark/src/main/scala/org/apache/druid/spark/package.scala
new file mode 100644
index 0000000..22b440a
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/package.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.druid.jackson.DefaultObjectMapper
+
+package object spark {
+  private[spark] val MAPPER: ObjectMapper = new DefaultObjectMapper()
+}
diff --git a/spark/src/test/scala/org/apache/druid/spark/configuration/ConfigurationSuite.scala b/spark/src/test/scala/org/apache/druid/spark/configuration/ConfigurationSuite.scala
new file mode 100644
index 0000000..0bdb9cd
--- /dev/null
+++ b/spark/src/test/scala/org/apache/druid/spark/configuration/ConfigurationSuite.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.configuration
+
+import org.scalatest.funsuite.AnyFunSuite
+import org.scalatest.matchers.should.Matchers
+
+class ConfigurationSuite extends AnyFunSuite with Matchers {
+  private val testConf: Configuration = Configuration(Map[String, String](
+    Configuration.toKey("test", "sub-conf", "property") -> "testProp",
+    Configuration.toKey("test", "property") -> "quizProp",
+    "property" -> "examProperty"
+  ))
+
+  test("isPresent should correctly handle single keys as well as paths to 
properties") {
+    testConf.isPresent("property") should be(true)
+    testConf.isPresent("test", "sub-conf", "property") should be(true)
+    testConf.isPresent("test") should be(false)
+    testConf.isPresent("exam", "sub-conf", "property") should be(false)
+    testConf.isPresent("test.property") should be(true)
+    testConf.isPresent("test.property.property") should be(false)
+  }
+
+  test("dive should correctly return sub-configurations") {
+    val subConf = testConf.dive("test")
+    subConf.getString("property") should equal("quizProp")
+    subConf.dive("sub-conf") should 
equal(Configuration.fromKeyValue("property", "testProp"))
+    subConf.dive("sub-conf") should equal(testConf.dive("test", "sub-conf"))
+  }
+
+  test("dive should return empty maps when called on uncontained namespaces") {
+    testConf.dive("exam").toMap.isEmpty should be(true)
+  }
+
+  test("Configurations should be case-insensitive") {
+    testConf.getString("pRoPeRtY") should equal(testConf.getString("property"))
+  }
+
+  test("merge should correctly combine Configurations") {
+    val otherConf = Configuration(Map[String, String](
+      Configuration.toKey("other", "conf", "key") -> "value",
+      "property" -> "new property"
+    ))
+    val mergedConf = testConf.merge(otherConf)
+    mergedConf.getString(Configuration.toKey("other", "conf", "key")) should 
equal("value")
+    mergedConf.getString("property") should equal("new property")
+  }
+
+  test("merge should correctly namespace merged Configurations") {
+    val otherConf = Configuration(Map[String, String](
+      "key" -> "1",
+      "property" -> "new property"
+    ))
+    val mergedConf = testConf.merge("test", otherConf)
+    mergedConf.getString(Configuration.toKey("test", "property")) should 
equal("new property")
+    mergedConf.getInt(Configuration.toKey("test", "key"), -1) should equal(1)
+    mergedConf("property") should equal("examProperty")
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org
