http://git-wip-us.apache.org/repos/asf/hive/blob/dca389b0/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index c3e7e5d..8fcadea 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -30,7 +30,6 @@ import com.google.common.collect.Interners;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
-import com.google.common.io.CharStreams;
 import io.druid.data.input.impl.DimensionSchema;
 import io.druid.data.input.impl.StringDimensionSchema;
 import io.druid.jackson.DefaultObjectMapper;
@@ -69,6 +68,7 @@ import io.druid.query.spec.MultipleIntervalSegmentSpec;
 import io.druid.segment.IndexIO;
 import io.druid.segment.IndexMergerV9;
 import io.druid.segment.IndexSpec;
+import io.druid.segment.data.BitmapSerdeFactory;
 import io.druid.segment.data.ConciseBitmapSerdeFactory;
 import io.druid.segment.data.RoaringBitmapSerdeFactory;
 import io.druid.segment.indexing.granularity.GranularitySpec;
@@ -92,6 +92,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
@@ -105,6 +106,7 @@ import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
+import org.joda.time.Period;
 import org.joda.time.chrono.ISOChronology;
 import org.skife.jdbi.v2.Folder3;
 import org.skife.jdbi.v2.Handle;
@@ -117,37 +119,52 @@ import org.skife.jdbi.v2.util.ByteArrayMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.io.OutputStream;
-import java.io.Reader;
 import java.net.InetAddress;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.UnknownHostException;
 import java.sql.SQLException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 import java.util.TimeZone;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-
+import java.util.stream.Collectors;
 
 /**
  * Utils class for Druid storage handler.
  */
 public final class DruidStorageHandlerUtils {
+  private DruidStorageHandlerUtils() {
+
+  }
 
   private static final Logger LOG = LoggerFactory.getLogger(DruidStorageHandlerUtils.class);
 
+  private static final String DRUID_ROLLUP = "druid.rollup";
+  private static final String DRUID_QUERY_GRANULARITY = "druid.query.granularity";
+  public static final String DRUID_QUERY_FETCH = "druid.query.fetch";
+  static final String DRUID_SEGMENT_DIRECTORY = "druid.storage.storageDirectory";
+  public static final String DRUID_SEGMENT_INTERMEDIATE_DIRECTORY = "druid.storage.storageDirectory.intermediate";
+  public static final String DRUID_SEGMENT_VERSION = "druid.segment.version";
+  public static final String DRUID_JOB_WORKING_DIRECTORY = "druid.job.workingDirectory";
+  static final String KAFKA_TOPIC = "kafka.topic";
+  static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
+  static final String DRUID_KAFKA_INGESTION_PROPERTY_PREFIX = "druid.kafka.ingestion.";
+  static final String DRUID_KAFKA_CONSUMER_PROPERTY_PREFIX = DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "consumer.";
+  /* Kafka Ingestion state - valid values - START/STOP/RESET */
+  static final String DRUID_KAFKA_INGESTION = "druid.kafka.ingestion";
   private static final int NUM_RETRIES = 8;
   private static final int SECONDS_BETWEEN_RETRIES = 2;
   private static final int DEFAULT_FS_BUFFER_SIZE = 1 << 18; // 256KB
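[Editor's illustration, not part of the patch.] A rough sketch of how the new Kafka ingestion constants are intended to be consumed: consumer-level settings arrive as table parameters under the "druid.kafka.ingestion.consumer." prefix and get stripped down to plain Kafka consumer properties. The parameter values below are invented.

import java.util.HashMap;
import java.util.Map;

public class KafkaIngestionPropsSketch {
  public static void main(String[] args) {
    // Hypothetical table parameters, mirroring the constants above.
    Map<String, String> params = new HashMap<>();
    params.put("druid.kafka.ingestion", "START");             // START/STOP/RESET per the comment above
    params.put("kafka.topic", "test_topic");
    params.put("kafka.bootstrap.servers", "localhost:9092");
    params.put("druid.kafka.ingestion.consumer.security.protocol", "PLAINTEXT");

    // Strip the consumer prefix to obtain plain Kafka consumer properties.
    final String prefix = "druid.kafka.ingestion.consumer.";
    Map<String, String> consumerProps = new HashMap<>();
    for (Map.Entry<String, String> e : params.entrySet()) {
      if (e.getKey().startsWith(prefix)) {
        consumerProps.put(e.getKey().substring(prefix.length()), e.getValue());
      }
    }
    System.out.println(consumerProps); // {security.protocol=PLAINTEXT}
  }
}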
@@ -157,44 +174,44 @@ public final class DruidStorageHandlerUtils {
   public static final String DEFAULT_TIMESTAMP_COLUMN = "__time";
   //Druid Json timestamp column name
   public static final String EVENT_TIMESTAMP_COLUMN = "timestamp";
-  public static final String INDEX_ZIP = "index.zip";
-  public static final String DESCRIPTOR_JSON = "descriptor.json";
-  public static final Interval DEFAULT_INTERVAL = new Interval(
-          new DateTime("1900-01-01", ISOChronology.getInstanceUTC()),
-          new DateTime("3000-01-01", ISOChronology.getInstanceUTC())
-  ).withChronology(ISOChronology.getInstanceUTC());
+  static final String INDEX_ZIP = "index.zip";
+  private static final String DESCRIPTOR_JSON = "descriptor.json";
+  private static final Interval
+      DEFAULT_INTERVAL =
+      new Interval(new DateTime("1900-01-01", ISOChronology.getInstanceUTC()),
+          new DateTime("3000-01-01", 
ISOChronology.getInstanceUTC())).withChronology(ISOChronology.getInstanceUTC());
 
   /**
-   * Mapper to use to serialize/deserialize Druid objects (JSON)
+   * Mapper to use to serialize/deserialize Druid objects (JSON).
    */
   public static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
 
   /**
-   * Mapper to use to serialize/deserialize Druid objects (SMILE)
+   * Mapper to use to serialize/deserialize Druid objects (SMILE).
    */
   public static final ObjectMapper SMILE_MAPPER = new DefaultObjectMapper(new SmileFactory());
   private static final int DEFAULT_MAX_TRIES = 10;
 
   static {
     // This is needed for serde of PagingSpec as it uses JacksonInject for injecting SelectQueryConfig
-    InjectableValues.Std injectableValues = new InjectableValues.Std()
-        .addValue(SelectQueryConfig.class, new SelectQueryConfig(false))
-        // Expressions macro table used when we deserialize the query from calcite plan
-        .addValue(ExprMacroTable.class, new ExprMacroTable(ImmutableList
-            .of(new LikeExprMacro(),
-                new RegexpExtractExprMacro(),
-                new TimestampCeilExprMacro(),
-                new TimestampExtractExprMacro(),
-                new TimestampFormatExprMacro(),
-                new TimestampParseExprMacro(),
-                new TimestampShiftExprMacro(),
-                new TimestampFloorExprMacro(),
-                new TrimExprMacro.BothTrimExprMacro(),
-                new TrimExprMacro.LeftTrimExprMacro(),
-                new TrimExprMacro.RightTrimExprMacro()
-            )))
-        .addValue(ObjectMapper.class, JSON_MAPPER)
-        .addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT);
+    InjectableValues.Std
+        injectableValues =
+        new InjectableValues.Std().addValue(SelectQueryConfig.class, new SelectQueryConfig(false))
+            // Expressions macro table used when we deserialize the query from calcite plan
+            .addValue(ExprMacroTable.class,
+                new ExprMacroTable(ImmutableList.of(new LikeExprMacro(),
+                    new RegexpExtractExprMacro(),
+                    new TimestampCeilExprMacro(),
+                    new TimestampExtractExprMacro(),
+                    new TimestampFormatExprMacro(),
+                    new TimestampParseExprMacro(),
+                    new TimestampShiftExprMacro(),
+                    new TimestampFloorExprMacro(),
+                    new TrimExprMacro.BothTrimExprMacro(),
+                    new TrimExprMacro.LeftTrimExprMacro(),
+                    new TrimExprMacro.RightTrimExprMacro())))
+            .addValue(ObjectMapper.class, JSON_MAPPER)
+            .addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT);
 
     JSON_MAPPER.setInjectableValues(injectableValues);
     SMILE_MAPPER.setInjectableValues(injectableValues);
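[Editor's illustration, not part of the patch.] For readers unfamiliar with the InjectableValues mechanism configured above, here is a minimal standalone Jackson sketch (no Druid types; the Config and Payload classes are invented): values registered by class are injected into @JacksonInject members at deserialization time instead of being read from the JSON.

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;

public class InjectableValuesSketch {
  public static class Config {
    public final boolean flag;
    public Config(boolean flag) { this.flag = flag; }
  }

  public static class Payload {
    @JacksonInject public Config config; // supplied by InjectableValues, not by the JSON
    public String name;
  }

  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    mapper.setInjectableValues(new InjectableValues.Std().addValue(Config.class, new Config(false)));
    Payload p = mapper.readValue("{\"name\":\"scan\"}", Payload.class);
    System.out.println(p.name + " " + p.config.flag); // scan false
  }
}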
@@ -206,110 +223,96 @@ public final class DruidStorageHandlerUtils {
     JSON_MAPPER.setTimeZone(TimeZone.getTimeZone("UTC"));
     try {
       // No operation emitter will be used by some internal druid classes.
-      EmittingLogger.registerEmitter(
-              new ServiceEmitter("druid-hive-indexer", 
InetAddress.getLocalHost().getHostName(),
-                      new NoopEmitter()
-              ));
+      EmittingLogger.registerEmitter(new ServiceEmitter("druid-hive-indexer",
+          InetAddress.getLocalHost().getHostName(),
+          new NoopEmitter()));
     } catch (UnknownHostException e) {
       throw Throwables.propagate(e);
     }
   }
 
   /**
-   * Used by druid to perform IO on indexes
+   * Used by druid to perform IO on indexes.
    */
-  public static final IndexIO INDEX_IO =
+  public static final IndexIO
+      INDEX_IO =
       new IndexIO(JSON_MAPPER, TmpFileSegmentWriteOutMediumFactory.instance(), () -> 0);
 
   /**
-   * Used by druid to merge indexes
+   * Used by druid to merge indexes.
    */
-  public static final IndexMergerV9 INDEX_MERGER_V9 = new 
IndexMergerV9(JSON_MAPPER,
-          
DruidStorageHandlerUtils.INDEX_IO,TmpFileSegmentWriteOutMediumFactory.instance()
-  );
+  public static final IndexMergerV9
+      INDEX_MERGER_V9 =
+      new IndexMergerV9(JSON_MAPPER, DruidStorageHandlerUtils.INDEX_IO, TmpFileSegmentWriteOutMediumFactory.instance());
 
   /**
-   * Generic Interner implementation used to read segments object from metadata storage
+   * Generic Interner implementation used to read segments object from metadata storage.
    */
-  public static final Interner<DataSegment> DATA_SEGMENT_INTERNER = Interners.newWeakInterner();
+  private static final Interner<DataSegment> DATA_SEGMENT_INTERNER = Interners.newWeakInterner();
 
   /**
    * Method that creates a request for Druid query using SMILE format.
    *
-   * @param address
-   * @param query
-   *
-   * @return
+   * @param address of the host target.
+   * @param query druid query.
    *
-   * @throws IOException
+   * @return Request object to be submitted.
    */
-  public static Request createSmileRequest(String address, io.druid.query.Query query)
-          throws IOException {
-    return new Request(HttpMethod.POST, new URL(String.format("%s/druid/v2/", "http://" + address)))
-            .setContent(SMILE_MAPPER.writeValueAsBytes(query))
-            .setHeader(HttpHeaders.Names.CONTENT_TYPE, SMILE_CONTENT_TYPE);
+  public static Request createSmileRequest(String address, io.druid.query.Query query) {
+    try {
+      return new Request(HttpMethod.POST, new URL(String.format("%s/druid/v2/", "http://" + address))).setContent(
+          SMILE_MAPPER.writeValueAsBytes(query)).setHeader(HttpHeaders.Names.CONTENT_TYPE, SMILE_CONTENT_TYPE);
+    } catch (MalformedURLException e) {
+      LOG.error("Malformed URL for address {}", address);
+      throw new RuntimeException(e);
+    } catch (JsonProcessingException e) {
+      LOG.error("Cannot serialize the query [{}]", query.toString());
+      throw new RuntimeException(e);
+    }
   }
 
   /**
    * Method that submits a request to an Http address and retrieves the result.
   * The caller is responsible for closing the stream once it finishes consuming it.
    *
-   * @param client
-   * @param request
+   * @param client Http Client will be used to submit request.
+   * @param request Http request to be submitted.
    *
-   * @return
+   * @return response object.
    *
-   * @throws IOException
+   * @throws IOException in case of request IO error.
    */
-  public static InputStream submitRequest(HttpClient client, Request request)
-          throws IOException {
-    InputStream response;
+  public static InputStream submitRequest(HttpClient client, Request request) throws IOException {
     try {
-      response = client.go(request, new InputStreamResponseHandler()).get();
-    } catch (ExecutionException e) {
-      throw new IOException(e.getCause());
-    } catch (InterruptedException e) {
+      return client.go(request, new InputStreamResponseHandler()).get();
+    } catch (ExecutionException | InterruptedException e) {
       throw new IOException(e.getCause());
     }
-    return response;
-  }
 
-  public static String getURL(HttpClient client, URL url) throws IOException {
-    try (Reader reader = new InputStreamReader(
-            DruidStorageHandlerUtils.submitRequest(client, new 
Request(HttpMethod.GET, url)))) {
-      return CharStreams.toString(reader);
-    }
   }
 
-  public static FullResponseHolder getResponseFromCurrentLeader(HttpClient 
client, Request request,
-          FullResponseHandler fullResponseHandler)
-          throws ExecutionException, InterruptedException {
-    FullResponseHolder responseHolder = client.go(request,
-            fullResponseHandler).get();
+  static FullResponseHolder getResponseFromCurrentLeader(HttpClient client,
+      Request request,
+      FullResponseHandler fullResponseHandler) throws ExecutionException, InterruptedException {
+    FullResponseHolder responseHolder = client.go(request, fullResponseHandler).get();
     if (HttpResponseStatus.TEMPORARY_REDIRECT.equals(responseHolder.getStatus())) {
       String redirectUrlStr = responseHolder.getResponse().headers().get("Location");
-      LOG.debug("Request[%s] received redirect response to location [%s].", 
request.getUrl(),
-              redirectUrlStr);
+      LOG.debug("Request[%s] received redirect response to location [%s].", 
request.getUrl(), redirectUrlStr);
       final URL redirectUrl;
       try {
         redirectUrl = new URL(redirectUrlStr);
       } catch (MalformedURLException ex) {
-        throw new ExecutionException(
-                String.format(
-                        "Malformed redirect location is found in response from 
url[%s], new location[%s].",
-                        request.getUrl(),
-                        redirectUrlStr),
-                ex
-        );
+        throw new ExecutionException(String.format(
+            "Malformed redirect location is found in response from url[%s], 
new location[%s].",
+            request.getUrl(),
+            redirectUrlStr), ex);
       }
-      responseHolder = client.go(withUrl(request, redirectUrl),
-              fullResponseHandler).get();
+      responseHolder = client.go(withUrl(request, redirectUrl), fullResponseHandler).get();
     }
     return responseHolder;
   }
 
-  private static Request withUrl(Request old, URL url)
-  {
+  private static Request withUrl(Request old, URL url) {
     Request req = new Request(old.getMethod(), url);
     req.addHeaderValues(old.getHeaders());
     if (old.hasContent()) {
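[Editor's illustration, not part of the patch.] Since the helpers above exchange SMILE rather than JSON, a small self-contained Jackson round trip may help show what SMILE_CONTENT_TYPE implies on the wire; this uses plain Jackson with the smile dataformat module and no Druid types.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import java.util.Collections;
import java.util.Map;

public class SmileRoundTripSketch {
  public static void main(String[] args) throws Exception {
    // A SmileFactory-backed mapper writes a compact binary encoding, not JSON text.
    ObjectMapper smileMapper = new ObjectMapper(new SmileFactory());
    byte[] body = smileMapper.writeValueAsBytes(Collections.singletonMap("queryType", "scan"));
    Map<?, ?> decoded = smileMapper.readValue(body, Map.class);
    System.out.println(body.length + " bytes -> " + decoded); // e.g. {queryType=scan}
  }
}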
@@ -320,68 +323,49 @@ public final class DruidStorageHandlerUtils {
 
   /**
   * @param taskDir path to the  directory containing the segments descriptor info
-   *                the descriptor path will be .../workingPath/task_id/{@link DruidStorageHandler#SEGMENTS_DESCRIPTOR_DIR_NAME}/*.json
+   *                the descriptor path will be
+   *                ../workingPath/task_id/{@link DruidStorageHandler#SEGMENTS_DESCRIPTOR_DIR_NAME}/*.json
    * @param conf    hadoop conf to get the file system
    *
    * @return List of DataSegments
    *
    * @throws IOException can be for the case we did not produce data.
    */
-  public static List<DataSegment> getCreatedSegments(Path taskDir, Configuration conf)
-          throws IOException {
+  public static List<DataSegment> getCreatedSegments(Path taskDir, Configuration conf) throws IOException {
     ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder();
     FileSystem fs = taskDir.getFileSystem(conf);
     FileStatus[] fss;
     fss = fs.listStatus(taskDir);
     for (FileStatus fileStatus : fss) {
-      final DataSegment segment = JSON_MAPPER
-              .readValue((InputStream) fs.open(fileStatus.getPath()), DataSegment.class);
+      final DataSegment segment = JSON_MAPPER.readValue((InputStream) fs.open(fileStatus.getPath()), DataSegment.class);
       publishedSegmentsBuilder.add(segment);
     }
     return publishedSegmentsBuilder.build();
   }
 
   /**
-   * This function will write to filesystem serialized from of segment descriptor
-   * if an existing file exists it will try to replace it.
+   * Writes to filesystem serialized form of segment descriptor; if an existing file exists it will try to replace it.
    *
-   * @param outputFS       filesystem
-   * @param segment        DataSegment object
-   * @param descriptorPath path
+   * @param outputFS       filesystem.
+   * @param segment        DataSegment object.
+   * @param descriptorPath path.
    *
-   * @throws IOException
+   * @throws IOException in case any IO issues occur.
    */
-  public static void writeSegmentDescriptor(
-          final FileSystem outputFS,
-          final DataSegment segment,
-          final Path descriptorPath
-  )
-          throws IOException {
-    final DataPusher descriptorPusher = (DataPusher) RetryProxy.create(
-            DataPusher.class, () -> {
-              try {
-                if (outputFS.exists(descriptorPath)) {
-                  if (!outputFS.delete(descriptorPath, false)) {
-                    throw new IOException(
-                            String.format("Failed to delete descriptor at 
[%s]", descriptorPath));
-                  }
-                }
-                try (final OutputStream descriptorOut = outputFS.create(
-                        descriptorPath,
-                        true,
-                        DEFAULT_FS_BUFFER_SIZE
-                )) {
-                  JSON_MAPPER.writeValue(descriptorOut, segment);
-                  descriptorOut.flush();
-                }
-              } catch (RuntimeException | IOException ex) {
-                throw ex;
-              }
-              return -1;
-            },
-            RetryPolicies
-                    .exponentialBackoffRetry(NUM_RETRIES, 
SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS)
-    );
+  public static void writeSegmentDescriptor(final FileSystem outputFS,
+      final DataSegment segment,
+      final Path descriptorPath) throws IOException {
+    final DataPusher descriptorPusher = (DataPusher) RetryProxy.create(DataPusher.class, () -> {
+      if (outputFS.exists(descriptorPath)) {
+        if (!outputFS.delete(descriptorPath, false)) {
+          throw new IOException(String.format("Failed to delete descriptor at [%s]", descriptorPath));
+        }
+      }
+      try (final OutputStream descriptorOut = outputFS.create(descriptorPath, true, DEFAULT_FS_BUFFER_SIZE)) {
+        JSON_MAPPER.writeValue(descriptorOut, segment);
+        descriptorOut.flush();
+      }
+    }, RetryPolicies.exponentialBackoffRetry(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS));
     descriptorPusher.push();
   }
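[Editor's illustration, not part of the patch.] The retry wiring above can be hard to read inline, so here is a standalone sketch of the same Hadoop RetryProxy pattern, with a hypothetical Flaky interface standing in for the DataPusher used above; the failure counter only exists to show the proxy retrying under exponential backoff.

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryProxy;

public class RetryProxySketch {
  // Hypothetical single-method interface, analogous to DataPusher.
  public interface Flaky {
    void push() throws IOException;
  }

  public static void main(String[] args) throws IOException {
    AtomicInteger attempts = new AtomicInteger();
    Flaky pusher = (Flaky) RetryProxy.create(Flaky.class, () -> {
      // Simulate two transient failures before succeeding.
      if (attempts.incrementAndGet() < 3) {
        throw new IOException("transient failure #" + attempts.get());
      }
    }, RetryPolicies.exponentialBackoffRetry(8, 2, TimeUnit.SECONDS));
    pusher.push();
    System.out.println("succeeded after " + attempts.get() + " attempts");
  }
}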
 
@@ -391,23 +375,17 @@ public final class DruidStorageHandlerUtils {
    *
    * @return all the active data sources in the metadata storage
    */
-  public static Collection<String> getAllDataSourceNames(SQLMetadataConnector connector,
-          final MetadataStorageTablesConfig metadataStorageTablesConfig
-  ) {
-    return connector.getDBI().withHandle(
-            (HandleCallback<List<String>>) handle -> handle.createQuery(
-                    String.format("SELECT DISTINCT(datasource) FROM %s WHERE used = true",
-                            metadataStorageTablesConfig.getSegmentsTable()
-                    ))
-                    .fold(Lists.<String>newArrayList(),
-                        (druidDataSources, stringObjectMap, foldController, statementContext) -> {
-                          druidDataSources.add(
-                                  MapUtils.getString(stringObjectMap, "datasource")
-                          );
-                          return druidDataSources;
-                        }
-                    )
-    );
+  static Collection<String> getAllDataSourceNames(SQLMetadataConnector connector,
+      final MetadataStorageTablesConfig metadataStorageTablesConfig) {
+    return connector.getDBI()
+        .withHandle((HandleCallback<List<String>>) handle -> handle.createQuery(String.format(
+            "SELECT DISTINCT(datasource) FROM %s WHERE used = true",
+            metadataStorageTablesConfig.getSegmentsTable()))
+            .fold(Lists.<String>newArrayList(),
+                (druidDataSources, stringObjectMap, foldController, statementContext) -> {
+                  druidDataSources.add(MapUtils.getString(stringObjectMap, "datasource"));
+                  return druidDataSources;
+                }));
   }
 
   /**
@@ -417,21 +395,19 @@ public final class DruidStorageHandlerUtils {
    *
    * @return true if the data source was successfully disabled false otherwise
    */
-  public static boolean disableDataSource(SQLMetadataConnector connector,
-          final MetadataStorageTablesConfig metadataStorageTablesConfig, final String dataSource
-  ) {
+  static boolean disableDataSource(SQLMetadataConnector connector,
+      final MetadataStorageTablesConfig metadataStorageTablesConfig,
+      final String dataSource) {
     try {
       if (!getAllDataSourceNames(connector, metadataStorageTablesConfig).contains(dataSource)) {
         LOG.warn("Cannot delete data source {}, does not exist", dataSource);
         return false;
       }
 
-      connector.getDBI().withHandle(
-              (HandleCallback<Void>) handle -> {
-                disableDataSourceWithHandle(handle, 
metadataStorageTablesConfig, dataSource);
-                return null;
-              }
-      );
+      connector.getDBI().withHandle((HandleCallback<Void>) handle -> {
+        disableDataSourceWithHandle(handle, metadataStorageTablesConfig, dataSource);
+        return null;
+      });
 
     } catch (Exception e) {
       LOG.error(String.format("Error removing dataSource %s", dataSource), e);
@@ -441,8 +417,8 @@ public final class DruidStorageHandlerUtils {
   }
 
   /**
-   * First computes the segments timeline to accommodate new segments for insert into case
-   * Then moves segments to druid deep storage with updated metadata/version
+   * First computes the segments timeline to accommodate new segments for insert into case.
+   * Then moves segments to druid deep storage with updated metadata/version.
    * ALL IS DONE IN ONE TRANSACTION
    *
    * @param connector DBI connector to commit
@@ -456,153 +432,129 @@ public final class DruidStorageHandlerUtils {
    * @return List of successfully published Druid segments.
   * This list has the updated versions and metadata about segments after move and timeline sorting
   *
-   * @throws CallbackFailedException
+   * @throws CallbackFailedException in case the connector can not add the segment to the DB.
    */
-  public static List<DataSegment> publishSegmentsAndCommit(final 
SQLMetadataConnector connector,
-          final MetadataStorageTablesConfig metadataStorageTablesConfig,
-          final String dataSource,
-          final List<DataSegment> segments,
-          boolean overwrite,
-          Configuration conf,
-          DataSegmentPusher dataSegmentPusher
-  ) throws CallbackFailedException {
-    return connector.getDBI().inTransaction(
-            (handle, transactionStatus) -> {
-              // We create the timeline for the existing and new segments
-              VersionedIntervalTimeline<String, DataSegment> timeline;
-              if (overwrite) {
-                // If we are overwriting, we disable existing sources
-                disableDataSourceWithHandle(handle, 
metadataStorageTablesConfig, dataSource);
-
-                // When overwriting, we just start with empty timeline,
-                // as we are overwriting segments with new versions
-                timeline = new VersionedIntervalTimeline<>(Ordering.natural());
-              } else {
-                // Append Mode
-                if (segments.isEmpty()) {
-                  // If there are no new segments, we can just bail out
-                  return Collections.EMPTY_LIST;
-                }
-                // Otherwise, build a timeline of existing segments in 
metadata storage
-                Interval indexedInterval = JodaUtils
-                        .umbrellaInterval(Iterables.transform(segments,
-                                input -> input.getInterval()
-                        ));
-                LOG.info("Building timeline for umbrella Interval [{}]", 
indexedInterval);
-                timeline = getTimelineForIntervalWithHandle(
-                        handle, dataSource, indexedInterval, 
metadataStorageTablesConfig);
-              }
-
-              final List<DataSegment> finalSegmentsToPublish = 
Lists.newArrayList();
-              for (DataSegment segment : segments) {
-                List<TimelineObjectHolder<String, DataSegment>> existingChunks 
= timeline
-                        .lookup(segment.getInterval());
-                if (existingChunks.size() > 1) {
-                  // Not possible to expand since we have more than one chunk 
with a single segment.
-                  // This is the case when user wants to append a segment with 
coarser granularity.
-                  // e.g If metadata storage already has segments for with 
granularity HOUR and segments to append have DAY granularity.
-                  // Druid shard specs does not support multiple partitions 
for same interval with different granularity.
-                  throw new IllegalStateException(
-                          String.format(
-                                  "Cannot allocate new segment for 
dataSource[%s], interval[%s], already have [%,d] chunks. Not possible to append 
new segment.",
-                                  dataSource,
-                                  segment.getInterval(),
-                                  existingChunks.size()
-                          )
-                  );
-                }
-                // Find out the segment with latest version and maximum 
partition number
-                SegmentIdentifier max = null;
-                final ShardSpec newShardSpec;
-                final String newVersion;
-                if (!existingChunks.isEmpty()) {
-                  // Some existing chunk, Find max
-                  TimelineObjectHolder<String, DataSegment> existingHolder = 
Iterables
-                          .getOnlyElement(existingChunks);
-                  for (PartitionChunk<DataSegment> existing : 
existingHolder.getObject()) {
-                    if (max == null ||
-                            max.getShardSpec().getPartitionNum() < 
existing.getObject()
-                                    .getShardSpec()
-                                    .getPartitionNum()) {
-                      max = 
SegmentIdentifier.fromDataSegment(existing.getObject());
-                    }
-                  }
-                }
-
-                if (max == null) {
-                  // No existing shard present in the database, use the 
current version.
-                  newShardSpec = segment.getShardSpec();
-                  newVersion = segment.getVersion();
-                } else {
-                  // use version of existing max segment to generate new shard 
spec
-                  newShardSpec = getNextPartitionShardSpec(max.getShardSpec());
-                  newVersion = max.getVersion();
-                }
-                DataSegment publishedSegment = publishSegmentWithShardSpec(
-                        segment,
-                        newShardSpec,
-                        newVersion,
-                        getPath(segment).getFileSystem(conf),
-                        dataSegmentPusher
-                );
-                finalSegmentsToPublish.add(publishedSegment);
-                timeline.add(
-                        publishedSegment.getInterval(),
-                        publishedSegment.getVersion(),
-                        
publishedSegment.getShardSpec().createChunk(publishedSegment)
-                );
-
-              }
-
-              // Publish new segments to metadata storage
-              final PreparedBatch batch = handle.prepareBatch(
-                      String.format(
-                              "INSERT INTO %1$s (id, dataSource, created_date, 
start, \"end\", partitioned, version, used, payload) "
-                                      + "VALUES (:id, :dataSource, 
:created_date, :start, :end, :partitioned, :version, :used, :payload)",
-                              metadataStorageTablesConfig.getSegmentsTable()
-                      )
-
-              );
-
-              for (final DataSegment segment : finalSegmentsToPublish) {
-
-                batch.add(
-                        new ImmutableMap.Builder<String, Object>()
-                                .put("id", segment.getIdentifier())
-                                .put("dataSource", segment.getDataSource())
-                                .put("created_date", new DateTime().toString())
-                                .put("start", 
segment.getInterval().getStart().toString())
-                                .put("end", 
segment.getInterval().getEnd().toString())
-                                .put("partitioned",
-                                        (segment.getShardSpec() instanceof 
NoneShardSpec) ?
-                                                false :
-                                                true
-                                )
-                                .put("version", segment.getVersion())
-                                .put("used", true)
-                                .put("payload", 
JSON_MAPPER.writeValueAsBytes(segment))
-                                .build()
-                );
-
-                LOG.info("Published {}", segment.getIdentifier());
-              }
-              batch.execute();
-
-              return finalSegmentsToPublish;
+  @SuppressWarnings("unchecked") static List<DataSegment> 
publishSegmentsAndCommit(final SQLMetadataConnector connector,
+      final MetadataStorageTablesConfig metadataStorageTablesConfig,
+      final String dataSource,
+      final List<DataSegment> segments,
+      boolean overwrite,
+      Configuration conf,
+      DataSegmentPusher dataSegmentPusher) throws CallbackFailedException {
+    return connector.getDBI().inTransaction((handle, transactionStatus) -> {
+      // We create the timeline for the existing and new segments
+      VersionedIntervalTimeline<String, DataSegment> timeline;
+      if (overwrite) {
+        // If we are overwriting, we disable existing sources
+        disableDataSourceWithHandle(handle, metadataStorageTablesConfig, dataSource);
+
+        // When overwriting, we just start with empty timeline,
+        // as we are overwriting segments with new versions
+        timeline = new VersionedIntervalTimeline<>(Ordering.natural());
+      } else {
+        // Append Mode
+        if (segments.isEmpty()) {
+          // If there are no new segments, we can just bail out
+          return Collections.EMPTY_LIST;
+        }
+        // Otherwise, build a timeline of existing segments in metadata storage
+        Interval
+            indexedInterval =
+            JodaUtils.umbrellaInterval(segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()));
+        LOG.info("Building timeline for umbrella Interval [{}]", indexedInterval);
+        timeline = getTimelineForIntervalWithHandle(handle, dataSource, indexedInterval, metadataStorageTablesConfig);
+      }
+
+      final List<DataSegment> finalSegmentsToPublish = Lists.newArrayList();
+      for (DataSegment segment : segments) {
+        List<TimelineObjectHolder<String, DataSegment>> existingChunks = timeline.lookup(segment.getInterval());
+        if (existingChunks.size() > 1) {
+          // Not possible to expand since we have more than one chunk with a single segment.
+          // This is the case when the user wants to append a segment with coarser granularity.
+          // e.g. If metadata storage already has segments with granularity HOUR and segments to append have DAY granularity.
+          // Druid shard specs do not support multiple partitions for the same interval with different granularity.
+          throw new IllegalStateException(String.format(
+              "Cannot allocate new segment for dataSource[%s], interval[%s], already have [%,d] chunks. "
+                  + "Not possible to append new segment.",
+              dataSource,
+              segment.getInterval(),
+              existingChunks.size()));
+        }
+        // Find out the segment with latest version and maximum partition number
+        SegmentIdentifier max = null;
+        final ShardSpec newShardSpec;
+        final String newVersion;
+        if (!existingChunks.isEmpty()) {
+          // Some existing chunk, Find max
+          TimelineObjectHolder<String, DataSegment> existingHolder = Iterables.getOnlyElement(existingChunks);
+          for (PartitionChunk<DataSegment> existing : existingHolder.getObject()) {
+            if (max == null || max.getShardSpec().getPartitionNum() < existing.getObject()
+                .getShardSpec()
+                .getPartitionNum()) {
+              max = SegmentIdentifier.fromDataSegment(existing.getObject());
             }
-    );
+          }
+        }
+
+        if (max == null) {
+          // No existing shard present in the database, use the current version.
+          newShardSpec = segment.getShardSpec();
+          newVersion = segment.getVersion();
+        } else {
+          // use version of existing max segment to generate new shard spec
+          newShardSpec = getNextPartitionShardSpec(max.getShardSpec());
+          newVersion = max.getVersion();
+        }
+        DataSegment
+            publishedSegment =
+            publishSegmentWithShardSpec(segment,
+                newShardSpec,
+                newVersion,
+                getPath(segment).getFileSystem(conf),
+                dataSegmentPusher);
+        finalSegmentsToPublish.add(publishedSegment);
+        timeline.add(publishedSegment.getInterval(),
+            publishedSegment.getVersion(),
+            publishedSegment.getShardSpec().createChunk(publishedSegment));
+
+      }
+
+      // Publish new segments to metadata storage
+      final PreparedBatch
+          batch =
+          handle.prepareBatch(String.format(
+              "INSERT INTO %1$s (id, dataSource, created_date, start, \"end\", 
partitioned, version, used, payload) "
+                  + "VALUES (:id, :dataSource, :created_date, :start, :end, 
:partitioned, :version, :used, :payload)",
+              metadataStorageTablesConfig.getSegmentsTable())
+
+          );
+
+      for (final DataSegment segment : finalSegmentsToPublish) {
+
+        batch.add(new ImmutableMap.Builder<String, Object>().put("id", 
segment.getIdentifier())
+            .put("dataSource", segment.getDataSource())
+            .put("created_date", new DateTime().toString())
+            .put("start", segment.getInterval().getStart().toString())
+            .put("end", segment.getInterval().getEnd().toString())
+            .put("partitioned", !(segment.getShardSpec() instanceof 
NoneShardSpec))
+            .put("version", segment.getVersion())
+            .put("used", true)
+            .put("payload", JSON_MAPPER.writeValueAsBytes(segment))
+            .build());
+
+        LOG.info("Published {}", segment.getIdentifier());
+      }
+      batch.execute();
+
+      return finalSegmentsToPublish;
+    });
   }
 
-  public static void disableDataSourceWithHandle(Handle handle,
-          MetadataStorageTablesConfig metadataStorageTablesConfig, String 
dataSource
-  ) {
-    handle.createStatement(
-            String.format("UPDATE %s SET used=false WHERE dataSource = 
:dataSource",
-                    metadataStorageTablesConfig.getSegmentsTable()
-            )
-    )
-            .bind("dataSource", dataSource)
-            .execute();
+  private static void disableDataSourceWithHandle(Handle handle,
+      MetadataStorageTablesConfig metadataStorageTablesConfig,
+      String dataSource) {
+    handle.createStatement(String.format("UPDATE %s SET used=false WHERE 
dataSource = :dataSource",
+        metadataStorageTablesConfig.getSegmentsTable())).bind("dataSource", 
dataSource).execute();
   }
 
   /**
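[Editor's illustration, not part of the patch.] To make the append-mode versioning above concrete, here is a toy sketch of how the next shard spec is derived from the existing maximum: linear shard specs simply advance the partition number, mirroring the private getNextPartitionShardSpec helper further down. The imports assume the io.druid.timeline.partition package used elsewhere in this class.

import io.druid.timeline.partition.LinearShardSpec;
import io.druid.timeline.partition.NumberedShardSpec;
import io.druid.timeline.partition.ShardSpec;

public class NextShardSpecSketch {
  // Re-implementation of the private helper, for illustration only.
  static ShardSpec next(ShardSpec max) {
    if (max instanceof LinearShardSpec) {
      return new LinearShardSpec(max.getPartitionNum() + 1);
    } else if (max instanceof NumberedShardSpec) {
      return new NumberedShardSpec(max.getPartitionNum(), ((NumberedShardSpec) max).getPartitions());
    }
    // Druid only supports appending to Linear and Numbered shard specs.
    throw new IllegalStateException("Cannot expand shard spec " + max);
  }

  public static void main(String[] args) {
    ShardSpec max = new LinearShardSpec(3);
    System.out.println(next(max).getPartitionNum()); // 4: new segment appended after partition 3
  }
}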
@@ -612,41 +564,29 @@ public final class DruidStorageHandlerUtils {
    *
    * @return List of all data segments part of the given data source
    */
-  public static List<DataSegment> getDataSegmentList(final 
SQLMetadataConnector connector,
-          final MetadataStorageTablesConfig metadataStorageTablesConfig, final 
String dataSource
-  ) {
-    List<DataSegment> segmentList = connector.retryTransaction(
-            (handle, status) -> handle
-                    .createQuery(String.format(
-                            "SELECT payload FROM %s WHERE dataSource = 
:dataSource",
-                            metadataStorageTablesConfig.getSegmentsTable()
-                    ))
-                    .setFetchSize(getStreamingFetchSize(connector))
-                    .bind("dataSource", dataSource)
-                    .map(ByteArrayMapper.FIRST)
-                    .fold(
-                            new ArrayList<>(),
-                            (Folder3<List<DataSegment>, byte[]>) (accumulator, 
payload, control, ctx) -> {
-                              try {
-                                final DataSegment segment = 
DATA_SEGMENT_INTERNER.intern(
-                                        JSON_MAPPER.readValue(
-                                                payload,
-                                                DataSegment.class
-                                        ));
-
-                                accumulator.add(segment);
-                                return accumulator;
-                              } catch (Exception e) {
-                                throw new SQLException(e.toString());
-                              }
-                            }
-                    )
-            , 3, DEFAULT_MAX_TRIES);
-    return segmentList;
+  static List<DataSegment> getDataSegmentList(final SQLMetadataConnector connector,
+      final MetadataStorageTablesConfig metadataStorageTablesConfig,
+      final String dataSource) {
+    return connector.retryTransaction((handle, status) -> handle.createQuery(String.format(
+        "SELECT payload FROM %s WHERE dataSource = :dataSource",
+        metadataStorageTablesConfig.getSegmentsTable()))
+        .setFetchSize(getStreamingFetchSize(connector))
+        .bind("dataSource", dataSource)
+        .map(ByteArrayMapper.FIRST)
+        .fold(new ArrayList<>(), (Folder3<List<DataSegment>, byte[]>) (accumulator, payload, control, ctx) -> {
+          try {
+            final DataSegment segment = DATA_SEGMENT_INTERNER.intern(JSON_MAPPER.readValue(payload, DataSegment.class));
+
+            accumulator.add(segment);
+            return accumulator;
+          } catch (Exception e) {
+            throw new SQLException(e.toString());
+          }
+        }), 3, DEFAULT_MAX_TRIES);
   }
 
   /**
-   * @param connector
+   * @param connector SQL DBI connector.
    *
    * @return streaming fetch size.
    */
@@ -658,112 +598,150 @@ public final class DruidStorageHandlerUtils {
   }
 
   /**
-   * @param pushedSegment
-   * @param segmentsDescriptorDir
+   * @param pushedSegment the pushed data segment object
+   * @param segmentsDescriptorDir actual directory path for descriptors.
    *
    * @return a sanitize file name
    */
-  public static Path makeSegmentDescriptorOutputPath(DataSegment pushedSegment,
-          Path segmentsDescriptorDir
-  ) {
-    return new Path(
-            segmentsDescriptorDir,
-            String.format("%s.json", 
pushedSegment.getIdentifier().replace(":", ""))
-    );
+  public static Path makeSegmentDescriptorOutputPath(DataSegment pushedSegment, Path segmentsDescriptorDir) {
+    return new Path(segmentsDescriptorDir, String.format("%s.json", pushedSegment.getIdentifier().replace(":", "")));
   }
 
   public static String createScanAllQuery(String dataSourceName, List<String> columns) throws JsonProcessingException {
     final ScanQuery.ScanQueryBuilder scanQueryBuilder = ScanQuery.newScanQueryBuilder();
-    final List<Interval> intervals = Arrays.asList(DEFAULT_INTERVAL);
-    ScanQuery scanQuery = scanQueryBuilder
-        .dataSource(dataSourceName)
-        .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
-        .intervals(new MultipleIntervalSegmentSpec(intervals))
-        .columns(columns)
-        .build();
+    final List<Interval> intervals = Collections.singletonList(DEFAULT_INTERVAL);
+    ScanQuery
+        scanQuery =
+        scanQueryBuilder.dataSource(dataSourceName)
+            .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
+            .intervals(new MultipleIntervalSegmentSpec(intervals))
+            .columns(columns)
+            .build();
     return JSON_MAPPER.writeValueAsString(scanQuery);
   }
+
+  @Nullable static Boolean getBooleanProperty(Table table, String propertyName) {
+    String val = getTableProperty(table, propertyName);
+    if (val == null) {
+      return null;
+    }
+    return Boolean.parseBoolean(val);
+  }
+
+  @Nullable static Integer getIntegerProperty(Table table, String propertyName) {
+    String val = getTableProperty(table, propertyName);
+    if (val == null) {
+      return null;
+    }
+    try {
+      return Integer.parseInt(val);
+    } catch (NumberFormatException e) {
+      throw new NumberFormatException(String.format("Exception while parsing 
property[%s] with Value [%s] as Integer",
+          propertyName,
+          val));
+    }
+  }
+
+  @Nullable static Long getLongProperty(Table table, String propertyName) {
+    String val = getTableProperty(table, propertyName);
+    if (val == null) {
+      return null;
+    }
+    try {
+      return Long.parseLong(val);
+    } catch (NumberFormatException e) {
+      throw new NumberFormatException(String.format("Exception while parsing 
property[%s] with Value [%s] as Long",
+          propertyName,
+          val));
+    }
+  }
+
+  @Nullable static Period getPeriodProperty(Table table, String propertyName) {
+    String val = getTableProperty(table, propertyName);
+    if (val == null) {
+      return null;
+    }
+    try {
+      return Period.parse(val);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException(String.format("Exception while 
parsing property[%s] with Value [%s] as Period",
+          propertyName,
+          val));
+    }
+  }
+
+  static String getTableProperty(Table table, String propertyName) {
+    return table.getParameters().get(propertyName);
+  }
+
   /**
-   * Simple interface for retry operations
+   * Simple interface for retry operations.
    */
   public interface DataPusher {
-    long push() throws IOException;
+    void push() throws IOException;
   }
 
   // Thanks, HBase Storage handler
-  public static void addDependencyJars(Configuration conf, Class<?>... classes) throws IOException {
+  @SuppressWarnings("SameParameterValue") static void addDependencyJars(Configuration conf, Class<?>... classes)
+      throws IOException {
     FileSystem localFs = FileSystem.getLocal(conf);
-    Set<String> jars = new HashSet<String>();
-    jars.addAll(conf.getStringCollection("tmpjars"));
+    Set<String> jars = new HashSet<>(conf.getStringCollection("tmpjars"));
     for (Class<?> clazz : classes) {
       if (clazz == null) {
         continue;
       }
-      String path = Utilities.jarFinderGetJar(clazz);
+      final String path = Utilities.jarFinderGetJar(clazz);
       if (path == null) {
-        throw new RuntimeException(
-                "Could not find jar for class " + clazz + " in order to ship it to the cluster.");
+        throw new RuntimeException("Could not find jar for class " + clazz + " in order to ship it to the cluster.");
       }
       if (!localFs.exists(new Path(path))) {
         throw new RuntimeException("Could not validate jar file " + path + " 
for class " + clazz);
       }
-      jars.add(path.toString());
+      jars.add(path);
     }
     if (jars.isEmpty()) {
       return;
     }
+    //noinspection ToArrayCallWithZeroLengthArrayArgument
     conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new 
String[jars.size()])));
   }
 
-  private static VersionedIntervalTimeline<String, DataSegment> 
getTimelineForIntervalWithHandle(
-          final Handle handle,
-          final String dataSource,
-          final Interval interval,
-          final MetadataStorageTablesConfig dbTables
-  ) throws IOException {
-    Query<Map<String, Object>> sql = handle.createQuery(
-            String.format(
-                    "SELECT payload FROM %s WHERE used = true AND dataSource = 
? AND start <= ? AND \"end\" >= ?",
-                    dbTables.getSegmentsTable()
-            )
-    ).bind(0, dataSource)
+  private static VersionedIntervalTimeline<String, DataSegment> getTimelineForIntervalWithHandle(final Handle handle,
+      final String dataSource,
+      final Interval interval,
+      final MetadataStorageTablesConfig dbTables) throws IOException {
+    Query<Map<String, Object>>
+        sql =
+        handle.createQuery(String.format(
+            "SELECT payload FROM %s WHERE used = true AND dataSource = ? AND 
start <= ? AND \"end\" >= ?",
+            dbTables.getSegmentsTable()))
+            .bind(0, dataSource)
             .bind(1, interval.getEnd().toString())
             .bind(2, interval.getStart().toString());
 
-    final VersionedIntervalTimeline<String, DataSegment> timeline = new 
VersionedIntervalTimeline<>(
-            Ordering.natural()
-    );
-    final ResultIterator<byte[]> dbSegments = sql
-            .map(ByteArrayMapper.FIRST)
-            .iterator();
-    try {
+    final VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(Ordering.natural());
+    try (ResultIterator<byte[]> dbSegments = sql.map(ByteArrayMapper.FIRST).iterator()) {
       while (dbSegments.hasNext()) {
         final byte[] payload = dbSegments.next();
-        DataSegment segment = JSON_MAPPER.readValue(
-                payload,
-                DataSegment.class
-        );
-        timeline.add(segment.getInterval(), segment.getVersion(),
-                segment.getShardSpec().createChunk(segment)
-        );
+        DataSegment segment = JSON_MAPPER.readValue(payload, DataSegment.class);
+        timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
       }
-    } finally {
-      dbSegments.close();
     }
     return timeline;
   }
 
-  public static DataSegmentPusher createSegmentPusherForDirectory(String segmentDirectory,
-          Configuration configuration) throws IOException {
+  public static DataSegmentPusher createSegmentPusherForDirectory(String segmentDirectory, Configuration configuration)
+      throws IOException {
     final HdfsDataSegmentPusherConfig hdfsDataSegmentPusherConfig = new HdfsDataSegmentPusherConfig();
     hdfsDataSegmentPusherConfig.setStorageDirectory(segmentDirectory);
-    return new HdfsDataSegmentPusher(
-            hdfsDataSegmentPusherConfig, configuration, JSON_MAPPER);
+    return new HdfsDataSegmentPusher(hdfsDataSegmentPusherConfig, configuration, JSON_MAPPER);
   }
 
-  public static DataSegment publishSegmentWithShardSpec(DataSegment segment, 
ShardSpec shardSpec,
-          String version, FileSystem fs, DataSegmentPusher dataSegmentPusher
-  ) throws IOException {
+  private static DataSegment publishSegmentWithShardSpec(DataSegment segment,
+      ShardSpec shardSpec,
+      String version,
+      FileSystem fs,
+      DataSegmentPusher dataSegmentPusher) throws IOException {
     boolean retry = true;
     DataSegment.Builder dataSegmentBuilder = new DataSegment.Builder(segment).version(version);
     Path finalPath = null;
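[Editor's illustration, not part of the patch.] A hypothetical caller of the new nullable table-property helpers; since they are package-private, this sketch assumes it lives in org.apache.hadoop.hive.druid. The property names and values are invented, and a missing property comes back as null rather than throwing.

package org.apache.hadoop.hive.druid;

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hive.metastore.api.Table;
import org.joda.time.Period;

public class TablePropertySketch {
  public static void main(String[] args) {
    Table table = new Table();
    Map<String, String> params = new HashMap<>();
    params.put("druid.rollup", "true");
    params.put("druid.kafka.ingestion.period", "PT30S"); // hypothetical property name
    table.setParameters(params);

    Boolean rollup = DruidStorageHandlerUtils.getBooleanProperty(table, "druid.rollup");
    Period period = DruidStorageHandlerUtils.getPeriodProperty(table, "druid.kafka.ingestion.period");
    Long missing = DruidStorageHandlerUtils.getLongProperty(table, "druid.not.set");
    System.out.println(rollup + " " + period + " " + missing); // true PT30S null
  }
}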
@@ -772,8 +750,9 @@ public final class DruidStorageHandlerUtils {
       dataSegmentBuilder.shardSpec(shardSpec);
       final Path intermediatePath = getPath(segment);
 
-      finalPath = new Path(dataSegmentPusher.getPathForHadoop(), dataSegmentPusher
-              .makeIndexPathName(dataSegmentBuilder.build(), DruidStorageHandlerUtils.INDEX_ZIP));
+      finalPath =
+          new Path(dataSegmentPusher.getPathForHadoop(),
+              dataSegmentPusher.makeIndexPathName(dataSegmentBuilder.build(), DruidStorageHandlerUtils.INDEX_ZIP));
       // Create parent if it does not exist, recreation is not an error
       fs.mkdirs(finalPath.getParent());
 
@@ -784,16 +763,13 @@ public final class DruidStorageHandlerUtils {
           retry = true;
         } else {
           throw new IOException(String.format(
-                  "Failed to rename intermediate segment[%s] to final 
segment[%s] is not present.",
-                  intermediatePath,
-                  finalPath
-          ));
+              "Failed to rename intermediate segment[%s] to final segment[%s] 
is not present.",
+              intermediatePath,
+              finalPath));
         }
       }
     }
-    DataSegment dataSegment = dataSegmentBuilder
-            .loadSpec(dataSegmentPusher.makeLoadSpec(finalPath.toUri()))
-            .build();
+    DataSegment dataSegment = dataSegmentBuilder.loadSpec(dataSegmentPusher.makeLoadSpec(finalPath.toUri())).build();
 
     writeSegmentDescriptor(fs, dataSegment, new Path(finalPath.getParent(), DruidStorageHandlerUtils.DESCRIPTOR_JSON));
 
@@ -804,61 +780,58 @@ public final class DruidStorageHandlerUtils {
     if (shardSpec instanceof LinearShardSpec) {
       return new LinearShardSpec(shardSpec.getPartitionNum() + 1);
     } else if (shardSpec instanceof NumberedShardSpec) {
-      return new NumberedShardSpec(shardSpec.getPartitionNum(),
-              ((NumberedShardSpec) shardSpec).getPartitions()
-      );
+      return new NumberedShardSpec(shardSpec.getPartitionNum(), ((NumberedShardSpec) shardSpec).getPartitions());
     } else {
       // Druid only support appending more partitions to Linear and Numbered ShardSpecs.
-      throw new IllegalStateException(
-              String.format(
-                      "Cannot expand shard spec [%s]",
-                      shardSpec
-              )
-      );
+      throw new IllegalStateException(String.format("Cannot expand shard spec 
[%s]", shardSpec));
     }
   }
 
-  public static Path getPath(DataSegment dataSegment) {
-    return new Path(String.valueOf(dataSegment.getLoadSpec().get("path")));
+  static Path getPath(DataSegment dataSegment) {
+    return new Path(String.valueOf(Objects.requireNonNull(dataSegment.getLoadSpec()).get("path")));
   }
 
   public static GranularitySpec getGranularitySpec(Configuration configuration, Properties tableProperties) {
-    final String segmentGranularity =
+    final String
+        segmentGranularity =
         tableProperties.getProperty(Constants.DRUID_SEGMENT_GRANULARITY) != null ?
             tableProperties.getProperty(Constants.DRUID_SEGMENT_GRANULARITY) :
             HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_DRUID_INDEXING_GRANULARITY);
-    final boolean rollup = tableProperties.getProperty(Constants.DRUID_ROLLUP) != null ?
-        Boolean.parseBoolean(tableProperties.getProperty(Constants.DRUID_SEGMENT_GRANULARITY)):
-        HiveConf.getBoolVar(configuration, HiveConf.ConfVars.HIVE_DRUID_ROLLUP);
-    return new UniformGranularitySpec(
-        Granularity.fromString(segmentGranularity),
-        Granularity.fromString(
-            tableProperties.getProperty(Constants.DRUID_QUERY_GRANULARITY) == null
-                ? "NONE"
-                : tableProperties.getProperty(Constants.DRUID_QUERY_GRANULARITY)),
+    final boolean
+        rollup =
+        tableProperties.getProperty(DRUID_ROLLUP) != null ?
+            Boolean.parseBoolean(tableProperties.getProperty(DRUID_ROLLUP)) :
+            HiveConf.getBoolVar(configuration, HiveConf.ConfVars.HIVE_DRUID_ROLLUP);
+    return new UniformGranularitySpec(Granularity.fromString(segmentGranularity),
+        Granularity.fromString(tableProperties.getProperty(DRUID_QUERY_GRANULARITY) == null ?
+            "NONE" :
+            tableProperties.getProperty(DRUID_QUERY_GRANULARITY)),
         rollup,
-        null
-    );
+        null);
   }
 
   public static IndexSpec getIndexSpec(Configuration jc) {
-    IndexSpec indexSpec;
+    final BitmapSerdeFactory bitmapSerdeFactory;
     if ("concise".equals(HiveConf.getVar(jc, 
HiveConf.ConfVars.HIVE_DRUID_BITMAP_FACTORY_TYPE))) {
-      indexSpec = new IndexSpec(new ConciseBitmapSerdeFactory(), null, null, 
null);
+      bitmapSerdeFactory = new ConciseBitmapSerdeFactory();
     } else {
-      indexSpec = new IndexSpec(new RoaringBitmapSerdeFactory(true), null, 
null, null);
+      bitmapSerdeFactory = new RoaringBitmapSerdeFactory(true);
     }
-    return indexSpec;
+    return new IndexSpec(bitmapSerdeFactory,
+        IndexSpec.DEFAULT_DIMENSION_COMPRESSION,
+        IndexSpec.DEFAULT_METRIC_COMPRESSION,
+        IndexSpec.DEFAULT_LONG_ENCODING);
   }
 
-  public static Pair<List<DimensionSchema>, AggregatorFactory[]> getDimensionsAndAggregates(Configuration jc, List<String> columnNames,
+  public static Pair<List<DimensionSchema>, AggregatorFactory[]> getDimensionsAndAggregates(List<String> columnNames,
       List<TypeInfo> columnTypes) {
     // Default, all columns that are not metrics or timestamp, are treated as dimensions
     final List<DimensionSchema> dimensions = new ArrayList<>();
     ImmutableList.Builder<AggregatorFactory> aggregatorFactoryBuilder = ImmutableList.builder();
     for (int i = 0; i < columnTypes.size(); i++) {
-      final PrimitiveObjectInspector.PrimitiveCategory primitiveCategory = ((PrimitiveTypeInfo) columnTypes
-          .get(i)).getPrimitiveCategory();
+      final PrimitiveObjectInspector.PrimitiveCategory
+          primitiveCategory =
+          ((PrimitiveTypeInfo) columnTypes.get(i)).getPrimitiveCategory();
       AggregatorFactory af;
       switch (primitiveCategory) {
       case BYTE:
@@ -874,38 +847,39 @@ public final class DruidStorageHandlerUtils {
         af = new DoubleSumAggregatorFactory(columnNames.get(i), columnNames.get(i));
         break;
       case DECIMAL:
-        throw new UnsupportedOperationException(
-            String.format("Druid does not support decimal column type cast column "
-                + "[%s] to double", columnNames.get(i)));
-
+        throw new UnsupportedOperationException(String.format("Druid does not support decimal column type cast column "
+            + "[%s] to double", columnNames.get(i)));
       case TIMESTAMP:
         // Granularity column
         String tColumnName = columnNames.get(i);
-        if (!tColumnName.equals(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME) &&
-            !tColumnName.equals(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) {
-          throw new IllegalArgumentException(
-              "Dimension " + tColumnName + " does not have STRING type: " +
-                  primitiveCategory);
+        if (!tColumnName.equals(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME) && !tColumnName.equals(
+            DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) {
+          throw new IllegalArgumentException("Dimension "
+              + tColumnName
+              + " does not have STRING type: "
+              + primitiveCategory);
         }
         continue;
       case TIMESTAMPLOCALTZ:
         // Druid timestamp column
         String tLocalTZColumnName = columnNames.get(i);
         if 
(!tLocalTZColumnName.equals(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) 
{
-          throw new IllegalArgumentException(
-              "Dimension " + tLocalTZColumnName + " does not have STRING type: 
" +
-                  primitiveCategory);
+          throw new IllegalArgumentException("Dimension "
+              + tLocalTZColumnName
+              + " does not have STRING type: "
+              + primitiveCategory);
         }
         continue;
       default:
         // Dimension
         String dColumnName = columnNames.get(i);
-        if 
(PrimitiveObjectInspectorUtils.getPrimitiveGrouping(primitiveCategory) !=
-            PrimitiveObjectInspectorUtils.PrimitiveGrouping.STRING_GROUP
+        if 
(PrimitiveObjectInspectorUtils.getPrimitiveGrouping(primitiveCategory)
+            != PrimitiveObjectInspectorUtils.PrimitiveGrouping.STRING_GROUP
             && primitiveCategory != 
PrimitiveObjectInspector.PrimitiveCategory.BOOLEAN) {
-          throw new IllegalArgumentException(
-              "Dimension " + dColumnName + " does not have STRING type: " +
-                  primitiveCategory);
+          throw new IllegalArgumentException("Dimension "
+              + dColumnName
+              + " does not have STRING type: "
+              + primitiveCategory);
         }
         dimensions.add(new StringDimensionSchema(dColumnName));
         continue;
@@ -913,7 +887,6 @@ public final class DruidStorageHandlerUtils {
       aggregatorFactoryBuilder.add(af);
     }
     ImmutableList<AggregatorFactory> aggregatorFactories = 
aggregatorFactoryBuilder.build();
-    return Pair.of(dimensions,
-        aggregatorFactories.toArray(new 
AggregatorFactory[aggregatorFactories.size()]));
+    return Pair.of(dimensions, aggregatorFactories.toArray(new 
AggregatorFactory[0]));
   }
 }
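
The bulk of the change to getDimensionsAndAggregates above is formatting, but the dispatch it performs is easy to lose in the diff noise. The stand-alone sketch below is an editor's illustration, not the patched Hive code: a hypothetical HiveType enum stands in for Hive's PrimitiveCategory, and plain strings stand in for Druid's AggregatorFactory and StringDimensionSchema classes. Integral types become long sums, floating-point types become double sums, DECIMAL is rejected, timestamp columns are reserved for the time/granularity columns, and everything else must be string-like and becomes a dimension.

// Editor's sketch only -- HiveType is a hypothetical stand-in for Hive's
// PrimitiveObjectInspector.PrimitiveCategory; strings stand in for Druid's
// AggregatorFactory / StringDimensionSchema.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DimensionAggregateSketch {
  enum HiveType { BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, DECIMAL, TIMESTAMP, STRING, BOOLEAN }

  public static void main(String[] args) {
    List<String> names = Arrays.asList("__time", "page", "added");
    List<HiveType> types = Arrays.asList(HiveType.TIMESTAMP, HiveType.STRING, HiveType.LONG);

    List<String> dimensions = new ArrayList<>();
    List<String> aggregators = new ArrayList<>();
    for (int i = 0; i < types.size(); i++) {
      switch (types.get(i)) {
      case BYTE: case SHORT: case INT: case LONG:
        aggregators.add("longSum(" + names.get(i) + ")");   // LongSumAggregatorFactory in the real code
        break;
      case FLOAT: case DOUBLE:
        aggregators.add("doubleSum(" + names.get(i) + ")"); // DoubleSumAggregatorFactory in the real code
        break;
      case DECIMAL:
        throw new UnsupportedOperationException(
            "Druid does not support decimal; cast column " + names.get(i) + " to double");
      case TIMESTAMP:
        break; // time / granularity column, neither dimension nor metric
      default:
        dimensions.add(names.get(i)); // string-like columns become dimensions
      }
    }
    System.out.println("dimensions=" + dimensions + ", aggregators=" + aggregators);
  }
}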

http://git-wip-us.apache.org/repos/asf/hive/blob/dca389b0/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
index ecb4360..862d7ca 100644
--- 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
+++ 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
@@ -6,17 +6,19 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.hadoop.hive.druid.io;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import io.druid.data.input.impl.DimensionSchema;
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.data.input.impl.InputRowParser;
@@ -43,6 +45,7 @@ import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordWriter;
@@ -64,9 +67,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-public class DruidOutputFormat<K, V> implements HiveOutputFormat<K, 
DruidWritable> {
+/**
+ * Druid Output format class used to write data as Native Druid Segment.
+ */
+public class DruidOutputFormat implements HiveOutputFormat<NullWritable, 
DruidWritable> {
 
-  protected static final Logger LOG = 
LoggerFactory.getLogger(DruidOutputFormat.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(DruidOutputFormat.class);
 
   @Override
   public FileSinkOperator.RecordWriter getHiveRecordWriter(
@@ -89,7 +95,7 @@ public class DruidOutputFormat<K, V> implements 
HiveOutputFormat<K, DruidWritabl
     final String dataSource = 
tableProperties.getProperty(Constants.DRUID_DATA_SOURCE) == null
         ? jc.get(Constants.DRUID_DATA_SOURCE)
         : tableProperties.getProperty(Constants.DRUID_DATA_SOURCE);
-    final String segmentDirectory = 
jc.get(Constants.DRUID_SEGMENT_INTERMEDIATE_DIRECTORY);
+    final String segmentDirectory = 
jc.get(DruidStorageHandlerUtils.DRUID_SEGMENT_INTERMEDIATE_DIRECTORY);
 
     final GranularitySpec granularitySpec = 
DruidStorageHandlerUtils.getGranularitySpec(jc, tableProperties);
 
@@ -111,7 +117,7 @@ public class DruidOutputFormat<K, V> implements 
HiveOutputFormat<K, DruidWritabl
     ArrayList<TypeInfo> columnTypes = 
TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
 
     Pair<List<DimensionSchema>, AggregatorFactory[]> dimensionsAndAggregates = 
DruidStorageHandlerUtils
-        .getDimensionsAndAggregates(jc, columnNames, columnTypes);
+        .getDimensionsAndAggregates(columnNames, columnTypes);
     final InputRowParser inputRowParser = new MapInputRowParser(new 
TimeAndDimsParseSpec(
             new 
TimestampSpec(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, "auto", null),
             new DimensionsSpec(dimensionsAndAggregates.lhs, Lists
@@ -121,8 +127,10 @@ public class DruidOutputFormat<K, V> implements 
HiveOutputFormat<K, DruidWritabl
             )
     ));
 
-    Map<String, Object> inputParser = DruidStorageHandlerUtils.JSON_MAPPER
-            .convertValue(inputRowParser, Map.class);
+    Map<String, Object>
+        inputParser =
+        DruidStorageHandlerUtils.JSON_MAPPER.convertValue(inputRowParser, new 
TypeReference<Map<String, Object>>() {
+        });
 
     final DataSchema dataSchema = new DataSchema(
             Preconditions.checkNotNull(dataSource, "Data source name is null"),
@@ -133,8 +141,8 @@ public class DruidOutputFormat<K, V> implements 
HiveOutputFormat<K, DruidWritabl
             DruidStorageHandlerUtils.JSON_MAPPER
     );
 
-    final String workingPath = jc.get(Constants.DRUID_JOB_WORKING_DIRECTORY);
-    final String version = jc.get(Constants.DRUID_SEGMENT_VERSION);
+    final String workingPath = 
jc.get(DruidStorageHandlerUtils.DRUID_JOB_WORKING_DIRECTORY);
+    final String version = 
jc.get(DruidStorageHandlerUtils.DRUID_SEGMENT_VERSION);
     String basePersistDirectory = HiveConf
             .getVar(jc, HiveConf.ConfVars.HIVE_DRUID_BASE_PERSIST_DIRECTORY);
     if (Strings.isNullOrEmpty(basePersistDirectory)) {
@@ -170,7 +178,7 @@ public class DruidOutputFormat<K, V> implements 
HiveOutputFormat<K, DruidWritabl
   }
 
   @Override
-  public RecordWriter<K, DruidWritable> getRecordWriter(
+  public RecordWriter<NullWritable, DruidWritable> getRecordWriter(
           FileSystem ignored, JobConf job, String name, Progressable progress
   ) throws IOException {
     throw new UnsupportedOperationException("please implement me !");
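
One small but easy-to-miss change in this file is the switch from convertValue(inputRowParser, Map.class) to an anonymous TypeReference, which keeps the generic Map<String, Object> type instead of a raw Map. A minimal, self-contained illustration of that Jackson idiom follows (editor's sketch only; the Parser bean is a hypothetical stand-in for Druid's InputRowParser, and jackson-databind is assumed on the classpath).

// Editor's sketch only -- Parser is a hypothetical stand-in for the
// InputRowParser converted in DruidOutputFormat.getHiveRecordWriter.
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;

public class ConvertValueSketch {
  public static class Parser {
    public String format = "timeAndDims";
    public String timestampColumn = "__time";
  }

  public static void main(String[] args) {
    ObjectMapper mapper = new ObjectMapper();
    // Map.class would yield a raw Map and an unchecked-assignment warning;
    // the anonymous TypeReference preserves Map<String, Object>.
    Map<String, Object> asMap =
        mapper.convertValue(new Parser(), new TypeReference<Map<String, Object>>() { });
    System.out.println(asMap);
  }
}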

http://git-wip-us.apache.org/repos/asf/hive/blob/dca389b0/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
----------------------------------------------------------------------
diff --git 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
index 1c989c1..ff766c4 100644
--- 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
+++ 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
@@ -59,6 +59,7 @@ import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URL;
@@ -77,7 +78,7 @@ public class DruidQueryBasedInputFormat extends 
InputFormat<NullWritable, DruidW
 
   protected static final Logger LOG = 
LoggerFactory.getLogger(DruidQueryBasedInputFormat.class);
 
-  public static DruidQueryRecordReader getDruidQueryReader(String 
druidQueryType) {
+  @Nullable public static DruidQueryRecordReader getDruidQueryReader(String 
druidQueryType) {
     switch (druidQueryType) {
     case Query.TIMESERIES:
       return new DruidTimeseriesQueryRecordReader();
@@ -89,8 +90,9 @@ public class DruidQueryBasedInputFormat extends 
InputFormat<NullWritable, DruidW
       return new DruidSelectQueryRecordReader();
     case Query.SCAN:
       return new DruidScanQueryRecordReader();
+    default:
+      return null;
     }
-    return null;
   }
 
   @Override
@@ -101,7 +103,7 @@ public class DruidQueryBasedInputFormat extends 
InputFormat<NullWritable, DruidW
 
   @Override
   public List<InputSplit> getSplits(JobContext context) throws IOException, 
InterruptedException {
-    return 
Arrays.<InputSplit>asList(getInputSplits(context.getConfiguration()));
+    return Arrays.asList(getInputSplits(context.getConfiguration()));
   }
 
   @SuppressWarnings("deprecation")
@@ -150,39 +152,35 @@ public class DruidQueryBasedInputFormat extends 
InputFormat<NullWritable, DruidW
     // Druid query with user timezone, as this is default Hive time semantics.
     // Then, create splits with the Druid queries.
     switch (druidQueryType) {
-      case Query.TIMESERIES:
-      case Query.TOPN:
-      case Query.GROUP_BY:
-        return new HiveDruidSplit[] {
-            new HiveDruidSplit(druidQuery, paths[0], new String[] { address }) 
};
-      case Query.SELECT:
-        SelectQuery selectQuery = 
DruidStorageHandlerUtils.JSON_MAPPER.readValue(
-                druidQuery, SelectQuery.class);
-        return distributeSelectQuery(conf, address, selectQuery, paths[0]);
-      case Query.SCAN:
-        ScanQuery scanQuery = DruidStorageHandlerUtils.JSON_MAPPER.readValue(
-            druidQuery, ScanQuery.class);
-        return distributeScanQuery(conf, address, scanQuery, paths[0]);
+    case Query.TIMESERIES:
+    case Query.TOPN:
+    case Query.GROUP_BY:
+      return new HiveDruidSplit[] {new HiveDruidSplit(druidQuery, paths[0], 
new String[] {address})};
+    case Query.SELECT:
+      SelectQuery selectQuery = 
DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, SelectQuery.class);
+      return distributeSelectQuery(address, selectQuery, paths[0]);
+    case Query.SCAN:
+      ScanQuery scanQuery = 
DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, ScanQuery.class);
+      return distributeScanQuery(address, scanQuery, paths[0]);
     default:
-        throw new IOException("Druid query type not recognized");
+      throw new IOException("Druid query type not recognized");
     }
   }
 
   /* New method that distributes the Select query by creating splits containing
    * information about different Druid nodes that have the data for the given
    * query. */
-  private static HiveDruidSplit[] distributeSelectQuery(Configuration conf, 
String address,
-      SelectQuery query, Path dummyPath) throws IOException {
+  private static HiveDruidSplit[] distributeSelectQuery(String address, 
SelectQuery query, Path dummyPath)
+      throws IOException {
     // If it has a limit, we use it and we do not distribute the query
-    final boolean isFetch = 
query.getContextBoolean(Constants.DRUID_QUERY_FETCH, false);
+    final boolean isFetch = 
query.getContextBoolean(DruidStorageHandlerUtils.DRUID_QUERY_FETCH, false);
     if (isFetch) {
-      return new HiveDruidSplit[] { new HiveDruidSplit(
-              DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), 
dummyPath,
-              new String[]{address} ) };
+      return new HiveDruidSplit[] {new 
HiveDruidSplit(DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query),
+          dummyPath,
+          new String[] {address})};
     }
 
-    final List<LocatedSegmentDescriptor> segmentDescriptors = 
fetchLocatedSegmentDescriptors(
-        address, query);
+    final List<LocatedSegmentDescriptor> segmentDescriptors = 
fetchLocatedSegmentDescriptors(address, query);
 
     // Create one input split for each segment
     final int numSplits = segmentDescriptors.size();
@@ -211,14 +209,15 @@ public class DruidQueryBasedInputFormat extends 
InputFormat<NullWritable, DruidW
   /* New method that distributes the Scan query by creating splits containing
    * information about different Druid nodes that have the data for the given
    * query. */
-  private static HiveDruidSplit[] distributeScanQuery(Configuration conf, 
String address,
-      ScanQuery query, Path dummyPath) throws IOException {
+  private static HiveDruidSplit[] distributeScanQuery(String address, 
ScanQuery query, Path dummyPath)
+      throws IOException {
     // If it has a limit, we use it and we do not distribute the query
     final boolean isFetch = query.getLimit() < Long.MAX_VALUE;
     if (isFetch) {
-      return new HiveDruidSplit[] { new HiveDruidSplit(
-          DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), 
dummyPath,
-          new String[]{address} ) };
+      return new HiveDruidSplit[] {new 
HiveDruidSplit(DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query),
+          dummyPath,
+          new String[] {address})
+      };
     }
 
     final List<LocatedSegmentDescriptor> segmentDescriptors = 
fetchLocatedSegmentDescriptors(
@@ -288,7 +287,7 @@ public class DruidQueryBasedInputFormat extends 
InputFormat<NullWritable, DruidW
           throws IOException {
     // We need to provide a different record reader for every type of Druid 
query.
     // The reason is that Druid results format is different for each type.
-    final DruidQueryRecordReader<?, ?> reader;
+    final DruidQueryRecordReader<?> reader;
     final String druidQueryType = job.get(Constants.DRUID_QUERY_TYPE);
     if (druidQueryType == null) {
       reader = new DruidScanQueryRecordReader(); // By default we use scan 
query as fallback.
@@ -314,7 +313,7 @@ public class DruidQueryBasedInputFormat extends 
InputFormat<NullWritable, DruidW
     if (druidQueryType == null) {
       return new DruidScanQueryRecordReader(); // By default, we use druid 
scan query as fallback.
     }
-    final DruidQueryRecordReader<?, ?> reader =
+    final DruidQueryRecordReader<?> reader =
             getDruidQueryReader(druidQueryType);
     if (reader == null) {
       throw new IOException("Druid query type " + druidQueryType + " not 
recognized");

http://git-wip-us.apache.org/repos/asf/hive/blob/dca389b0/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
index 400262a..671b8cf 100644
--- 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
+++ 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
@@ -17,13 +17,10 @@
  */
 package org.apache.hadoop.hive.druid.io;
 
-import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
 import com.google.common.base.Throwables;
-import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import io.druid.data.input.Committer;
@@ -49,6 +46,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
 import org.apache.hadoop.hive.druid.serde.DruidWritable;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.RecordWriter;
@@ -58,16 +56,20 @@ import org.joda.time.Interval;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
-public class DruidRecordWriter implements RecordWriter<NullWritable, 
DruidWritable>,
-        org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter {
+/**
+ * Druid Record Writer, implementing a file sink operator.
+ */
+public class DruidRecordWriter implements RecordWriter<NullWritable, 
DruidWritable>, FileSinkOperator.RecordWriter {
   protected static final Logger LOG = 
LoggerFactory.getLogger(DruidRecordWriter.class);
 
   private final DataSchema dataSchema;
@@ -88,36 +90,32 @@ public class DruidRecordWriter implements 
RecordWriter<NullWritable, DruidWritab
 
   private final Granularity segmentGranularity;
 
-  public DruidRecordWriter(
-          DataSchema dataSchema,
-          RealtimeTuningConfig realtimeTuningConfig,
-          DataSegmentPusher dataSegmentPusher,
-          int maxPartitionSize,
-          final Path segmentsDescriptorsDir,
-          final FileSystem fileSystem
-  ) {
-    File basePersistDir = new 
File(realtimeTuningConfig.getBasePersistDirectory(),
-            UUID.randomUUID().toString()
-    );
-    this.tuningConfig = Preconditions
-            
.checkNotNull(realtimeTuningConfig.withBasePersistDirectory(basePersistDir),
-                    "realtimeTuningConfig is null"
-            );
+  public DruidRecordWriter(DataSchema dataSchema,
+      RealtimeTuningConfig realtimeTuningConfig,
+      DataSegmentPusher dataSegmentPusher,
+      int maxPartitionSize,
+      final Path segmentsDescriptorsDir,
+      final FileSystem fileSystem) {
+    File basePersistDir = new 
File(realtimeTuningConfig.getBasePersistDirectory(), 
UUID.randomUUID().toString());
+    this.tuningConfig =
+        
Preconditions.checkNotNull(realtimeTuningConfig.withBasePersistDirectory(basePersistDir),
+            "realtimeTuningConfig is null");
     this.dataSchema = Preconditions.checkNotNull(dataSchema, "data schema is 
null");
 
-    appenderator = Appenderators
-            .createOffline(this.dataSchema, tuningConfig, new 
FireDepartmentMetrics(),
-                    dataSegmentPusher, DruidStorageHandlerUtils.JSON_MAPPER,
-                    DruidStorageHandlerUtils.INDEX_IO, 
DruidStorageHandlerUtils.INDEX_MERGER_V9
-            );
+    appenderator =
+        Appenderators.createOffline(this.dataSchema,
+            tuningConfig,
+            new FireDepartmentMetrics(),
+            dataSegmentPusher,
+            DruidStorageHandlerUtils.JSON_MAPPER,
+            DruidStorageHandlerUtils.INDEX_IO,
+            DruidStorageHandlerUtils.INDEX_MERGER_V9);
     this.maxPartitionSize = maxPartitionSize;
     appenderator.startJob();
-    this.segmentsDescriptorDir = Preconditions
-            .checkNotNull(segmentsDescriptorsDir, "segmentsDescriptorsDir is 
null");
+    this.segmentsDescriptorDir = 
Preconditions.checkNotNull(segmentsDescriptorsDir, "segmentsDescriptorsDir is 
null");
     this.fileSystem = Preconditions.checkNotNull(fileSystem, "file system is 
null");
-    this.segmentGranularity = this.dataSchema.getGranularitySpec()
-        .getSegmentGranularity();
-    committerSupplier = Suppliers.ofInstance(Committers.nil());
+    this.segmentGranularity = 
this.dataSchema.getGranularitySpec().getSegmentGranularity();
+    committerSupplier = Suppliers.ofInstance(Committers.nil())::get;
   }
 
   /**
@@ -132,19 +130,15 @@ public class DruidRecordWriter implements 
RecordWriter<NullWritable, DruidWritab
   private SegmentIdentifier getSegmentIdentifierAndMaybePush(long 
truncatedTime) {
 
     DateTime truncatedDateTime = 
segmentGranularity.bucketStart(DateTimes.utc(truncatedTime));
-      final Interval interval = new Interval(
-          truncatedDateTime,
-          segmentGranularity.increment(truncatedDateTime)
-      );
+    final Interval interval = new Interval(truncatedDateTime, 
segmentGranularity.increment(truncatedDateTime));
 
     SegmentIdentifier retVal;
     if (currentOpenSegment == null) {
-      currentOpenSegment = new SegmentIdentifier(
-              dataSchema.getDataSource(),
+      currentOpenSegment =
+          new SegmentIdentifier(dataSchema.getDataSource(),
               interval,
               tuningConfig.getVersioningPolicy().getVersion(interval),
-              new LinearShardSpec(0)
-      );
+              new LinearShardSpec(0));
       return currentOpenSegment;
     } else if (currentOpenSegment.getInterval().equals(interval)) {
       retVal = currentOpenSegment;
@@ -152,25 +146,24 @@ public class DruidRecordWriter implements 
RecordWriter<NullWritable, DruidWritab
       if (rowCount < maxPartitionSize) {
         return retVal;
       } else {
-        retVal = new SegmentIdentifier(
-                dataSchema.getDataSource(),
+        retVal =
+            new SegmentIdentifier(dataSchema.getDataSource(),
                 interval,
                 tuningConfig.getVersioningPolicy().getVersion(interval),
-                new 
LinearShardSpec(currentOpenSegment.getShardSpec().getPartitionNum() + 1)
-        );
+                new 
LinearShardSpec(currentOpenSegment.getShardSpec().getPartitionNum() + 1));
         pushSegments(Lists.newArrayList(currentOpenSegment));
         LOG.info("Creating new partition for segment {}, partition num {}",
-                retVal.getIdentifierAsString(), 
retVal.getShardSpec().getPartitionNum());
+            retVal.getIdentifierAsString(),
+            retVal.getShardSpec().getPartitionNum());
         currentOpenSegment = retVal;
         return retVal;
       }
     } else {
-      retVal = new SegmentIdentifier(
-              dataSchema.getDataSource(),
+      retVal =
+          new SegmentIdentifier(dataSchema.getDataSource(),
               interval,
               tuningConfig.getVersioningPolicy().getVersion(interval),
-              new LinearShardSpec(0)
-      );
+              new LinearShardSpec(0));
       pushSegments(Lists.newArrayList(currentOpenSegment));
       LOG.info("Creating segment {}", retVal.getIdentifierAsString());
       currentOpenSegment = retVal;
@@ -180,45 +173,30 @@ public class DruidRecordWriter implements 
RecordWriter<NullWritable, DruidWritab
 
   private void pushSegments(List<SegmentIdentifier> segmentsToPush) {
     try {
-      SegmentsAndMetadata segmentsAndMetadata = appenderator
-              .push(segmentsToPush, committerSupplier.get(), false).get();
+      SegmentsAndMetadata segmentsAndMetadata = 
appenderator.push(segmentsToPush, committerSupplier.get(), false).get();
       final HashSet<String> pushedSegmentIdentifierHashSet = new HashSet<>();
 
       for (DataSegment pushedSegment : segmentsAndMetadata.getSegments()) {
-        pushedSegmentIdentifierHashSet
-                
.add(SegmentIdentifier.fromDataSegment(pushedSegment).getIdentifierAsString());
-        final Path segmentDescriptorOutputPath = DruidStorageHandlerUtils
-                .makeSegmentDescriptorOutputPath(pushedSegment, 
segmentsDescriptorDir);
-        DruidStorageHandlerUtils
-                .writeSegmentDescriptor(fileSystem, pushedSegment, 
segmentDescriptorOutputPath);
-        LOG.info(
-                String.format(
-                        "Pushed the segment [%s] and persisted the descriptor 
located at [%s]",
-                        pushedSegment,
-                        segmentDescriptorOutputPath
-                )
-        );
+        
pushedSegmentIdentifierHashSet.add(SegmentIdentifier.fromDataSegment(pushedSegment).getIdentifierAsString());
+        final Path
+            segmentDescriptorOutputPath =
+            
DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(pushedSegment, 
segmentsDescriptorDir);
+        DruidStorageHandlerUtils.writeSegmentDescriptor(fileSystem, 
pushedSegment, segmentDescriptorOutputPath);
+        LOG.info(String.format("Pushed the segment [%s] and persisted the 
descriptor located at [%s]",
+            pushedSegment,
+            segmentDescriptorOutputPath));
       }
 
-      final HashSet<String> toPushSegmentsHashSet = new HashSet(
-              FluentIterable.from(segmentsToPush)
-                      .transform(new Function<SegmentIdentifier, String>() {
-                        @Nullable
-                        @Override
-                        public String apply(
-                                @Nullable SegmentIdentifier input
-                        ) {
-                          return input.getIdentifierAsString();
-                        }
-                      })
-                      .toList());
+      final Set<String>
+          toPushSegmentsHashSet =
+          segmentsToPush.stream()
+              .map(SegmentIdentifier::getIdentifierAsString)
+              .collect(Collectors.toCollection(HashSet::new));
 
       if (!pushedSegmentIdentifierHashSet.equals(toPushSegmentsHashSet)) {
-        throw new IllegalStateException(String.format(
-                "was asked to publish [%s] but was able to publish only [%s]",
-                Joiner.on(", ").join(toPushSegmentsHashSet),
-                Joiner.on(", ").join(pushedSegmentIdentifierHashSet)
-        ));
+        throw new IllegalStateException(String.format("was asked to publish 
[%s] but was able to publish only [%s]",
+            Joiner.on(", ").join(toPushSegmentsHashSet),
+            Joiner.on(", ").join(pushedSegmentIdentifierHashSet)));
       }
       for (SegmentIdentifier dataSegmentId : segmentsToPush) {
         LOG.info("Dropping segment {}", dataSegmentId.toString());
@@ -227,9 +205,7 @@ public class DruidRecordWriter implements 
RecordWriter<NullWritable, DruidWritab
 
       LOG.info(String.format("Published [%,d] segments.", 
segmentsToPush.size()));
     } catch (InterruptedException e) {
-      LOG.error(String.format("got interrupted, failed to push  [%,d] 
segments.",
-              segmentsToPush.size()
-      ), e);
+      LOG.error(String.format("got interrupted, failed to push  [%,d] 
segments.", segmentsToPush.size()), e);
       Thread.currentThread().interrupt();
     } catch (IOException | ExecutionException e) {
       LOG.error(String.format("Failed to push  [%,d] segments.", 
segmentsToPush.size()), e);
@@ -237,17 +213,17 @@ public class DruidRecordWriter implements 
RecordWriter<NullWritable, DruidWritab
     }
   }
 
-  @Override
-  public void write(Writable w) throws IOException {
+  @Override public void write(Writable w) throws IOException {
     DruidWritable record = (DruidWritable) w;
-    final long timestamp =
-        (long) 
record.getValue().get(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN);
-    final int partitionNumber = Math.toIntExact(
-        (long) 
record.getValue().getOrDefault(Constants.DRUID_SHARD_KEY_COL_NAME, -1l));
-    final InputRow inputRow = new MapBasedInputRow(timestamp,
-        
dataSchema.getParser().getParseSpec().getDimensionsSpec().getDimensionNames(),
-        record.getValue()
-    );
+    final long timestamp = (long) 
record.getValue().get(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN);
+    final int
+        partitionNumber =
+        Math.toIntExact((long) 
record.getValue().getOrDefault(Constants.DRUID_SHARD_KEY_COL_NAME, -1L));
+    final InputRow
+        inputRow =
+        new MapBasedInputRow(timestamp,
+            
dataSchema.getParser().getParseSpec().getDimensionsSpec().getDimensionNames(),
+            record.getValue());
 
     try {
 
@@ -258,38 +234,37 @@ public class DruidRecordWriter implements 
RecordWriter<NullWritable, DruidWritab
         Data with the same DRUID_SHARD_KEY_COL_NAME and Time interval will end 
in the same segment
         */
         DateTime truncatedDateTime = 
segmentGranularity.bucketStart(DateTimes.utc(timestamp));
-        final Interval interval = new Interval(
-            truncatedDateTime,
-            segmentGranularity.increment(truncatedDateTime)
-        );
+        final Interval interval = new Interval(truncatedDateTime, 
segmentGranularity.increment(truncatedDateTime));
 
         if (currentOpenSegment != null) {
           if (currentOpenSegment.getShardSpec().getPartitionNum() != 
partitionNumber
               || !currentOpenSegment.getInterval().equals(interval)) {
             pushSegments(ImmutableList.of(currentOpenSegment));
-            currentOpenSegment = new 
SegmentIdentifier(dataSchema.getDataSource(), interval,
-                tuningConfig.getVersioningPolicy().getVersion(interval),
-                new LinearShardSpec(partitionNumber)
-            );
+            currentOpenSegment =
+                new SegmentIdentifier(dataSchema.getDataSource(),
+                    interval,
+                    tuningConfig.getVersioningPolicy().getVersion(interval),
+                    new LinearShardSpec(partitionNumber));
           }
-        } else if (currentOpenSegment == null) {
-          currentOpenSegment = new 
SegmentIdentifier(dataSchema.getDataSource(), interval,
-              tuningConfig.getVersioningPolicy().getVersion(interval),
-              new LinearShardSpec(partitionNumber)
-          );
+        } else {
+          currentOpenSegment =
+              new SegmentIdentifier(dataSchema.getDataSource(),
+                  interval,
+                  tuningConfig.getVersioningPolicy().getVersion(interval),
+                  new LinearShardSpec(partitionNumber));
 
         }
-        appenderator.add(currentOpenSegment, inputRow, committerSupplier);
+        appenderator.add(currentOpenSegment, inputRow, committerSupplier::get);
 
       } else if (partitionNumber == -1 && maxPartitionSize != -1) {
         /*Case we are partitioning the segments based on time and max row per 
segment maxPartitionSize*/
-        appenderator
-            .add(getSegmentIdentifierAndMaybePush(timestamp), inputRow, 
committerSupplier);
+        appenderator.add(getSegmentIdentifierAndMaybePush(timestamp), 
inputRow, committerSupplier::get);
       } else {
         throw new IllegalArgumentException(String.format(
-            "partitionNumber and  maxPartitionSize should be mutually 
exclusive got partitionNum [%s] and maxPartitionSize [%s]",
-            partitionNumber, maxPartitionSize
-        ));
+            "partitionNumber and maxPartitionSize should be mutually exclusive 
"
+                + "got partitionNum [%s] and maxPartitionSize [%s]",
+            partitionNumber,
+            maxPartitionSize));
       }
 
     } catch (SegmentNotWritableException e) {
@@ -297,10 +272,9 @@ public class DruidRecordWriter implements 
RecordWriter<NullWritable, DruidWritab
     }
   }
 
-  @Override
-  public void close(boolean abort) throws IOException {
+  @Override public void close(boolean abort) throws IOException {
     try {
-      if (abort == false) {
+      if (!abort) {
         final List<SegmentIdentifier> segmentsToPush = Lists.newArrayList();
         segmentsToPush.addAll(appenderator.getSegments());
         pushSegments(segmentsToPush);
@@ -311,20 +285,18 @@ public class DruidRecordWriter implements 
RecordWriter<NullWritable, DruidWritab
     } finally {
       try {
         FileUtils.deleteDirectory(tuningConfig.getBasePersistDirectory());
-      } catch (Exception e){
+      } catch (Exception e) {
         LOG.error("error cleaning of base persist directory", e);
       }
       appenderator.close();
     }
   }
 
-  @Override
-  public void write(NullWritable key, DruidWritable value) throws IOException {
+  @Override public void write(NullWritable key, DruidWritable value) throws 
IOException {
     this.write(value);
   }
 
-  @Override
-  public void close(Reporter reporter) throws IOException {
+  @Override public void close(Reporter reporter) throws IOException {
     this.close(false);
   }
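
Two idioms dominate the DruidRecordWriter changes above: the Guava FluentIterable/Function pair is replaced with a plain java.util.stream pipeline, and Guava's Supplier is bridged to java.util.function.Supplier with a method reference (Suppliers.ofInstance(...)::get). The sketch below replays the first idiom in isolation (editor's illustration; SegmentId is a hypothetical stand-in for Druid's SegmentIdentifier).

// Editor's sketch only -- the real code compares the identifiers it asked
// pushSegments to publish against the ones the appenderator reported back.
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class PushedSegmentCheckSketch {
  static class SegmentId {
    private final String id;
    SegmentId(String id) { this.id = id; }
    String getIdentifierAsString() { return id; }
  }

  public static void main(String[] args) {
    List<SegmentId> toPush =
        Arrays.asList(new SegmentId("ds_2024-01-01_v1_0"), new SegmentId("ds_2024-01-02_v1_0"));
    Set<String> pushed =
        new HashSet<>(Arrays.asList("ds_2024-01-01_v1_0", "ds_2024-01-02_v1_0"));

    // FluentIterable.from(...).transform(new Function<...>() { ... }) becomes:
    Set<String> expected = toPush.stream()
        .map(SegmentId::getIdentifierAsString)
        .collect(Collectors.toCollection(HashSet::new));

    if (!pushed.equals(expected)) {
      throw new IllegalStateException(String.format(
          "was asked to publish [%s] but was able to publish only [%s]", expected, pushed));
    }
    System.out.println("all " + expected.size() + " segments pushed");
  }
}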
 
