http://git-wip-us.apache.org/repos/asf/hive/blob/dca389b0/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java index c3e7e5d..8fcadea 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -30,7 +30,6 @@ import com.google.common.collect.Interners; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; -import com.google.common.io.CharStreams; import io.druid.data.input.impl.DimensionSchema; import io.druid.data.input.impl.StringDimensionSchema; import io.druid.jackson.DefaultObjectMapper; @@ -69,6 +68,7 @@ import io.druid.query.spec.MultipleIntervalSegmentSpec; import io.druid.segment.IndexIO; import io.druid.segment.IndexMergerV9; import io.druid.segment.IndexSpec; +import io.druid.segment.data.BitmapSerdeFactory; import io.druid.segment.data.ConciseBitmapSerdeFactory; import io.druid.segment.data.RoaringBitmapSerdeFactory; import io.druid.segment.indexing.granularity.GranularitySpec; @@ -92,6 +92,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; @@ -105,6 +106,7 @@ import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.joda.time.DateTime; import org.joda.time.Interval; +import org.joda.time.Period; import org.joda.time.chrono.ISOChronology; import org.skife.jdbi.v2.Folder3; import org.skife.jdbi.v2.Handle; @@ -117,37 +119,52 @@ import org.skife.jdbi.v2.util.ByteArrayMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; -import java.io.InputStreamReader; import java.io.OutputStream; -import java.io.Reader; import java.net.InetAddress; import java.net.MalformedURLException; import java.net.URL; import java.net.UnknownHostException; import java.sql.SQLException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Properties; import java.util.Set; import java.util.TimeZone; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; - +import 
java.util.stream.Collectors; /** * Utils class for Druid storage handler. */ public final class DruidStorageHandlerUtils { + private DruidStorageHandlerUtils () { + + } private static final Logger LOG = LoggerFactory.getLogger(DruidStorageHandlerUtils.class); + private static final String DRUID_ROLLUP = "druid.rollup"; + private static final String DRUID_QUERY_GRANULARITY = "druid.query.granularity"; + public static final String DRUID_QUERY_FETCH = "druid.query.fetch"; + static final String DRUID_SEGMENT_DIRECTORY = "druid.storage.storageDirectory"; + public static final String DRUID_SEGMENT_INTERMEDIATE_DIRECTORY = "druid.storage.storageDirectory.intermediate"; + public static final String DRUID_SEGMENT_VERSION = "druid.segment.version"; + public static final String DRUID_JOB_WORKING_DIRECTORY = "druid.job.workingDirectory"; + static final String KAFKA_TOPIC = "kafka.topic"; + static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; + static final String DRUID_KAFKA_INGESTION_PROPERTY_PREFIX = "druid.kafka.ingestion."; + static final String DRUID_KAFKA_CONSUMER_PROPERTY_PREFIX = DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "consumer."; + /* Kafka Ingestion state - valid values - START/STOP/RESET */ + static final String DRUID_KAFKA_INGESTION = "druid.kafka.ingestion"; private static final int NUM_RETRIES = 8; private static final int SECONDS_BETWEEN_RETRIES = 2; private static final int DEFAULT_FS_BUFFER_SIZE = 1 << 18; // 256KB @@ -157,44 +174,44 @@ public final class DruidStorageHandlerUtils { public static final String DEFAULT_TIMESTAMP_COLUMN = "__time"; //Druid Json timestamp column name public static final String EVENT_TIMESTAMP_COLUMN = "timestamp"; - public static final String INDEX_ZIP = "index.zip"; - public static final String DESCRIPTOR_JSON = "descriptor.json"; - public static final Interval DEFAULT_INTERVAL = new Interval( - new DateTime("1900-01-01", ISOChronology.getInstanceUTC()), - new DateTime("3000-01-01", ISOChronology.getInstanceUTC()) - ).withChronology(ISOChronology.getInstanceUTC()); + static final String INDEX_ZIP = "index.zip"; + private static final String DESCRIPTOR_JSON = "descriptor.json"; + private static final Interval + DEFAULT_INTERVAL = + new Interval(new DateTime("1900-01-01", ISOChronology.getInstanceUTC()), + new DateTime("3000-01-01", ISOChronology.getInstanceUTC())).withChronology(ISOChronology.getInstanceUTC()); /** - * Mapper to use to serialize/deserialize Druid objects (JSON) + * Mapper to use to serialize/deserialize Druid objects (JSON). */ public static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper(); /** - * Mapper to use to serialize/deserialize Druid objects (SMILE) + * Mapper to use to serialize/deserialize Druid objects (SMILE). 
*/ public static final ObjectMapper SMILE_MAPPER = new DefaultObjectMapper(new SmileFactory()); private static final int DEFAULT_MAX_TRIES = 10; static { // This is needed for serde of PagingSpec as it uses JacksonInject for injecting SelectQueryConfig - InjectableValues.Std injectableValues = new InjectableValues.Std() - .addValue(SelectQueryConfig.class, new SelectQueryConfig(false)) - // Expressions macro table used when we deserialize the query from calcite plan - .addValue(ExprMacroTable.class, new ExprMacroTable(ImmutableList - .of(new LikeExprMacro(), - new RegexpExtractExprMacro(), - new TimestampCeilExprMacro(), - new TimestampExtractExprMacro(), - new TimestampFormatExprMacro(), - new TimestampParseExprMacro(), - new TimestampShiftExprMacro(), - new TimestampFloorExprMacro(), - new TrimExprMacro.BothTrimExprMacro(), - new TrimExprMacro.LeftTrimExprMacro(), - new TrimExprMacro.RightTrimExprMacro() - ))) - .addValue(ObjectMapper.class, JSON_MAPPER) - .addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT); + InjectableValues.Std + injectableValues = + new InjectableValues.Std().addValue(SelectQueryConfig.class, new SelectQueryConfig(false)) + // Expressions macro table used when we deserialize the query from calcite plan + .addValue(ExprMacroTable.class, + new ExprMacroTable(ImmutableList.of(new LikeExprMacro(), + new RegexpExtractExprMacro(), + new TimestampCeilExprMacro(), + new TimestampExtractExprMacro(), + new TimestampFormatExprMacro(), + new TimestampParseExprMacro(), + new TimestampShiftExprMacro(), + new TimestampFloorExprMacro(), + new TrimExprMacro.BothTrimExprMacro(), + new TrimExprMacro.LeftTrimExprMacro(), + new TrimExprMacro.RightTrimExprMacro()))) + .addValue(ObjectMapper.class, JSON_MAPPER) + .addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT); JSON_MAPPER.setInjectableValues(injectableValues); SMILE_MAPPER.setInjectableValues(injectableValues); @@ -206,110 +223,96 @@ public final class DruidStorageHandlerUtils { JSON_MAPPER.setTimeZone(TimeZone.getTimeZone("UTC")); try { // No operation emitter will be used by some internal druid classes. - EmittingLogger.registerEmitter( - new ServiceEmitter("druid-hive-indexer", InetAddress.getLocalHost().getHostName(), - new NoopEmitter() - )); + EmittingLogger.registerEmitter(new ServiceEmitter("druid-hive-indexer", + InetAddress.getLocalHost().getHostName(), + new NoopEmitter())); } catch (UnknownHostException e) { throw Throwables.propagate(e); } } /** - * Used by druid to perform IO on indexes + * Used by druid to perform IO on indexes. */ - public static final IndexIO INDEX_IO = + public static final IndexIO + INDEX_IO = new IndexIO(JSON_MAPPER, TmpFileSegmentWriteOutMediumFactory.instance(), () -> 0); /** - * Used by druid to merge indexes + * Used by druid to merge indexes. */ - public static final IndexMergerV9 INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, - DruidStorageHandlerUtils.INDEX_IO,TmpFileSegmentWriteOutMediumFactory.instance() - ); + public static final IndexMergerV9 + INDEX_MERGER_V9 = + new IndexMergerV9(JSON_MAPPER, DruidStorageHandlerUtils.INDEX_IO, TmpFileSegmentWriteOutMediumFactory.instance()); /** - * Generic Interner implementation used to read segments object from metadata storage + * Generic Interner implementation used to read segments object from metadata storage. 
*/ - public static final Interner<DataSegment> DATA_SEGMENT_INTERNER = Interners.newWeakInterner(); + private static final Interner<DataSegment> DATA_SEGMENT_INTERNER = Interners.newWeakInterner(); /** * Method that creates a request for Druid query using SMILE format. * - * @param address - * @param query - * - * @return + * @param address of the host target. + * @param query druid query. * - * @throws IOException + * @return Request object to be submitted. */ - public static Request createSmileRequest(String address, io.druid.query.Query query) - throws IOException { - return new Request(HttpMethod.POST, new URL(String.format("%s/druid/v2/", "http://" + address))) - .setContent(SMILE_MAPPER.writeValueAsBytes(query)) - .setHeader(HttpHeaders.Names.CONTENT_TYPE, SMILE_CONTENT_TYPE); + public static Request createSmileRequest(String address, io.druid.query.Query query) { + try { + return new Request(HttpMethod.POST, new URL(String.format("%s/druid/v2/", "http://" + address))).setContent( + SMILE_MAPPER.writeValueAsBytes(query)).setHeader(HttpHeaders.Names.CONTENT_TYPE, SMILE_CONTENT_TYPE); + } catch (MalformedURLException e) { + LOG.error("URL Malformed address {}", address); + throw new RuntimeException(e); + } catch (JsonProcessingException e) { + LOG.error("can not Serialize the Query [{}]", query.toString()); + throw new RuntimeException(e); + } } /** * Method that submits a request to an Http address and retrieves the result. * The caller is responsible for closing the stream once it finishes consuming it. * - * @param client - * @param request + * @param client Http Client will be used to submit request. + * @param request Http request to be submitted. * - * @return + * @return response object. * - * @throws IOException + * @throws IOException in case of request IO error. 
*/ - public static InputStream submitRequest(HttpClient client, Request request) - throws IOException { - InputStream response; + public static InputStream submitRequest(HttpClient client, Request request) throws IOException { try { - response = client.go(request, new InputStreamResponseHandler()).get(); - } catch (ExecutionException e) { - throw new IOException(e.getCause()); - } catch (InterruptedException e) { + return client.go(request, new InputStreamResponseHandler()).get(); + } catch (ExecutionException | InterruptedException e) { throw new IOException(e.getCause()); } - return response; - } - public static String getURL(HttpClient client, URL url) throws IOException { - try (Reader reader = new InputStreamReader( - DruidStorageHandlerUtils.submitRequest(client, new Request(HttpMethod.GET, url)))) { - return CharStreams.toString(reader); - } } - public static FullResponseHolder getResponseFromCurrentLeader(HttpClient client, Request request, - FullResponseHandler fullResponseHandler) - throws ExecutionException, InterruptedException { - FullResponseHolder responseHolder = client.go(request, - fullResponseHandler).get(); + static FullResponseHolder getResponseFromCurrentLeader(HttpClient client, + Request request, + FullResponseHandler fullResponseHandler) throws ExecutionException, InterruptedException { + FullResponseHolder responseHolder = client.go(request, fullResponseHandler).get(); if (HttpResponseStatus.TEMPORARY_REDIRECT.equals(responseHolder.getStatus())) { String redirectUrlStr = responseHolder.getResponse().headers().get("Location"); - LOG.debug("Request[%s] received redirect response to location [%s].", request.getUrl(), - redirectUrlStr); + LOG.debug("Request[%s] received redirect response to location [%s].", request.getUrl(), redirectUrlStr); final URL redirectUrl; try { redirectUrl = new URL(redirectUrlStr); } catch (MalformedURLException ex) { - throw new ExecutionException( - String.format( - "Malformed redirect location is found in response from url[%s], new location[%s].", - request.getUrl(), - redirectUrlStr), - ex - ); + throw new ExecutionException(String.format( + "Malformed redirect location is found in response from url[%s], new location[%s].", + request.getUrl(), + redirectUrlStr), ex); } - responseHolder = client.go(withUrl(request, redirectUrl), - fullResponseHandler).get(); + responseHolder = client.go(withUrl(request, redirectUrl), fullResponseHandler).get(); } return responseHolder; } - private static Request withUrl(Request old, URL url) - { + private static Request withUrl(Request old, URL url) { Request req = new Request(old.getMethod(), url); req.addHeaderValues(old.getHeaders()); if (old.hasContent()) { @@ -320,68 +323,49 @@ public final class DruidStorageHandlerUtils { /** * @param taskDir path to the directory containing the segments descriptor info - * the descriptor path will be .../workingPath/task_id/{@link DruidStorageHandler#SEGMENTS_DESCRIPTOR_DIR_NAME}/*.json + * the descriptor path will be + * ../workingPath/task_id/{@link DruidStorageHandler#SEGMENTS_DESCRIPTOR_DIR_NAME}/*.json * @param conf hadoop conf to get the file system * * @return List of DataSegments * * @throws IOException can be for the case we did not produce data. 
*/ - public static List<DataSegment> getCreatedSegments(Path taskDir, Configuration conf) - throws IOException { + public static List<DataSegment> getCreatedSegments(Path taskDir, Configuration conf) throws IOException { ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder(); FileSystem fs = taskDir.getFileSystem(conf); FileStatus[] fss; fss = fs.listStatus(taskDir); for (FileStatus fileStatus : fss) { - final DataSegment segment = JSON_MAPPER - .readValue((InputStream) fs.open(fileStatus.getPath()), DataSegment.class); + final DataSegment segment = JSON_MAPPER.readValue((InputStream) fs.open(fileStatus.getPath()), DataSegment.class); publishedSegmentsBuilder.add(segment); } return publishedSegmentsBuilder.build(); } /** - * This function will write to filesystem serialized from of segment descriptor - * if an existing file exists it will try to replace it. + * Writes to filesystem serialized form of segment descriptor if an existing file exists it will try to replace it. * - * @param outputFS filesystem - * @param segment DataSegment object - * @param descriptorPath path + * @param outputFS filesystem. + * @param segment DataSegment object. + * @param descriptorPath path. * - * @throws IOException + * @throws IOException in case any IO issues occur. */ - public static void writeSegmentDescriptor( - final FileSystem outputFS, - final DataSegment segment, - final Path descriptorPath - ) - throws IOException { - final DataPusher descriptorPusher = (DataPusher) RetryProxy.create( - DataPusher.class, () -> { - try { - if (outputFS.exists(descriptorPath)) { - if (!outputFS.delete(descriptorPath, false)) { - throw new IOException( - String.format("Failed to delete descriptor at [%s]", descriptorPath)); - } - } - try (final OutputStream descriptorOut = outputFS.create( - descriptorPath, - true, - DEFAULT_FS_BUFFER_SIZE - )) { - JSON_MAPPER.writeValue(descriptorOut, segment); - descriptorOut.flush(); - } - } catch (RuntimeException | IOException ex) { - throw ex; - } - return -1; - }, - RetryPolicies - .exponentialBackoffRetry(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS) - ); + public static void writeSegmentDescriptor(final FileSystem outputFS, + final DataSegment segment, + final Path descriptorPath) throws IOException { + final DataPusher descriptorPusher = (DataPusher) RetryProxy.create(DataPusher.class, () -> { + if (outputFS.exists(descriptorPath)) { + if (!outputFS.delete(descriptorPath, false)) { + throw new IOException(String.format("Failed to delete descriptor at [%s]", descriptorPath)); + } + } + try (final OutputStream descriptorOut = outputFS.create(descriptorPath, true, DEFAULT_FS_BUFFER_SIZE)) { + JSON_MAPPER.writeValue(descriptorOut, segment); + descriptorOut.flush(); + } + }, RetryPolicies.exponentialBackoffRetry(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS)); descriptorPusher.push(); } @@ -391,23 +375,17 @@ public final class DruidStorageHandlerUtils { * * @return all the active data sources in the metadata storage */ - public static Collection<String> getAllDataSourceNames(SQLMetadataConnector connector, - final MetadataStorageTablesConfig metadataStorageTablesConfig - ) { - return connector.getDBI().withHandle( - (HandleCallback<List<String>>) handle -> handle.createQuery( - String.format("SELECT DISTINCT(datasource) FROM %s WHERE used = true", - metadataStorageTablesConfig.getSegmentsTable() - )) - .fold(Lists.<String>newArrayList(), - (druidDataSources, stringObjectMap, foldController, statementContext) -> { - 
druidDataSources.add( - MapUtils.getString(stringObjectMap, "datasource") - ); - return druidDataSources; - } - ) - ); + static Collection<String> getAllDataSourceNames(SQLMetadataConnector connector, + final MetadataStorageTablesConfig metadataStorageTablesConfig) { + return connector.getDBI() + .withHandle((HandleCallback<List<String>>) handle -> handle.createQuery(String.format( + "SELECT DISTINCT(datasource) FROM %s WHERE used = true", + metadataStorageTablesConfig.getSegmentsTable())) + .fold(Lists.<String>newArrayList(), + (druidDataSources, stringObjectMap, foldController, statementContext) -> { + druidDataSources.add(MapUtils.getString(stringObjectMap, "datasource")); + return druidDataSources; + })); } /** @@ -417,21 +395,19 @@ public final class DruidStorageHandlerUtils { * * @return true if the data source was successfully disabled false otherwise */ - public static boolean disableDataSource(SQLMetadataConnector connector, - final MetadataStorageTablesConfig metadataStorageTablesConfig, final String dataSource - ) { + static boolean disableDataSource(SQLMetadataConnector connector, + final MetadataStorageTablesConfig metadataStorageTablesConfig, + final String dataSource) { try { if (!getAllDataSourceNames(connector, metadataStorageTablesConfig).contains(dataSource)) { LOG.warn("Cannot delete data source {}, does not exist", dataSource); return false; } - connector.getDBI().withHandle( - (HandleCallback<Void>) handle -> { - disableDataSourceWithHandle(handle, metadataStorageTablesConfig, dataSource); - return null; - } - ); + connector.getDBI().withHandle((HandleCallback<Void>) handle -> { + disableDataSourceWithHandle(handle, metadataStorageTablesConfig, dataSource); + return null; + }); } catch (Exception e) { LOG.error(String.format("Error removing dataSource %s", dataSource), e); @@ -441,8 +417,8 @@ public final class DruidStorageHandlerUtils { } /** - * First computes the segments timeline to accommodate new segments for insert into case - * Then moves segments to druid deep storage with updated metadata/version + * First computes the segments timeline to accommodate new segments for insert into case. + * Then moves segments to druid deep storage with updated metadata/version. * ALL IS DONE IN ONE TRANSACTION * * @param connector DBI connector to commit @@ -456,153 +432,129 @@ public final class DruidStorageHandlerUtils { * @return List of successfully published Druid segments. * This list has the updated versions and metadata about segments after move and timeline sorting * - * @throws CallbackFailedException + * @throws CallbackFailedException in case the connector can not add the segment to the DB. 
*/ - public static List<DataSegment> publishSegmentsAndCommit(final SQLMetadataConnector connector, - final MetadataStorageTablesConfig metadataStorageTablesConfig, - final String dataSource, - final List<DataSegment> segments, - boolean overwrite, - Configuration conf, - DataSegmentPusher dataSegmentPusher - ) throws CallbackFailedException { - return connector.getDBI().inTransaction( - (handle, transactionStatus) -> { - // We create the timeline for the existing and new segments - VersionedIntervalTimeline<String, DataSegment> timeline; - if (overwrite) { - // If we are overwriting, we disable existing sources - disableDataSourceWithHandle(handle, metadataStorageTablesConfig, dataSource); - - // When overwriting, we just start with empty timeline, - // as we are overwriting segments with new versions - timeline = new VersionedIntervalTimeline<>(Ordering.natural()); - } else { - // Append Mode - if (segments.isEmpty()) { - // If there are no new segments, we can just bail out - return Collections.EMPTY_LIST; - } - // Otherwise, build a timeline of existing segments in metadata storage - Interval indexedInterval = JodaUtils - .umbrellaInterval(Iterables.transform(segments, - input -> input.getInterval() - )); - LOG.info("Building timeline for umbrella Interval [{}]", indexedInterval); - timeline = getTimelineForIntervalWithHandle( - handle, dataSource, indexedInterval, metadataStorageTablesConfig); - } - - final List<DataSegment> finalSegmentsToPublish = Lists.newArrayList(); - for (DataSegment segment : segments) { - List<TimelineObjectHolder<String, DataSegment>> existingChunks = timeline - .lookup(segment.getInterval()); - if (existingChunks.size() > 1) { - // Not possible to expand since we have more than one chunk with a single segment. - // This is the case when user wants to append a segment with coarser granularity. - // e.g If metadata storage already has segments for with granularity HOUR and segments to append have DAY granularity. - // Druid shard specs does not support multiple partitions for same interval with different granularity. - throw new IllegalStateException( - String.format( - "Cannot allocate new segment for dataSource[%s], interval[%s], already have [%,d] chunks. Not possible to append new segment.", - dataSource, - segment.getInterval(), - existingChunks.size() - ) - ); - } - // Find out the segment with latest version and maximum partition number - SegmentIdentifier max = null; - final ShardSpec newShardSpec; - final String newVersion; - if (!existingChunks.isEmpty()) { - // Some existing chunk, Find max - TimelineObjectHolder<String, DataSegment> existingHolder = Iterables - .getOnlyElement(existingChunks); - for (PartitionChunk<DataSegment> existing : existingHolder.getObject()) { - if (max == null || - max.getShardSpec().getPartitionNum() < existing.getObject() - .getShardSpec() - .getPartitionNum()) { - max = SegmentIdentifier.fromDataSegment(existing.getObject()); - } - } - } - - if (max == null) { - // No existing shard present in the database, use the current version. 
- newShardSpec = segment.getShardSpec(); - newVersion = segment.getVersion(); - } else { - // use version of existing max segment to generate new shard spec - newShardSpec = getNextPartitionShardSpec(max.getShardSpec()); - newVersion = max.getVersion(); - } - DataSegment publishedSegment = publishSegmentWithShardSpec( - segment, - newShardSpec, - newVersion, - getPath(segment).getFileSystem(conf), - dataSegmentPusher - ); - finalSegmentsToPublish.add(publishedSegment); - timeline.add( - publishedSegment.getInterval(), - publishedSegment.getVersion(), - publishedSegment.getShardSpec().createChunk(publishedSegment) - ); - - } - - // Publish new segments to metadata storage - final PreparedBatch batch = handle.prepareBatch( - String.format( - "INSERT INTO %1$s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) " - + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", - metadataStorageTablesConfig.getSegmentsTable() - ) - - ); - - for (final DataSegment segment : finalSegmentsToPublish) { - - batch.add( - new ImmutableMap.Builder<String, Object>() - .put("id", segment.getIdentifier()) - .put("dataSource", segment.getDataSource()) - .put("created_date", new DateTime().toString()) - .put("start", segment.getInterval().getStart().toString()) - .put("end", segment.getInterval().getEnd().toString()) - .put("partitioned", - (segment.getShardSpec() instanceof NoneShardSpec) ? - false : - true - ) - .put("version", segment.getVersion()) - .put("used", true) - .put("payload", JSON_MAPPER.writeValueAsBytes(segment)) - .build() - ); - - LOG.info("Published {}", segment.getIdentifier()); - } - batch.execute(); - - return finalSegmentsToPublish; + @SuppressWarnings("unchecked") static List<DataSegment> publishSegmentsAndCommit(final SQLMetadataConnector connector, + final MetadataStorageTablesConfig metadataStorageTablesConfig, + final String dataSource, + final List<DataSegment> segments, + boolean overwrite, + Configuration conf, + DataSegmentPusher dataSegmentPusher) throws CallbackFailedException { + return connector.getDBI().inTransaction((handle, transactionStatus) -> { + // We create the timeline for the existing and new segments + VersionedIntervalTimeline<String, DataSegment> timeline; + if (overwrite) { + // If we are overwriting, we disable existing sources + disableDataSourceWithHandle(handle, metadataStorageTablesConfig, dataSource); + + // When overwriting, we just start with empty timeline, + // as we are overwriting segments with new versions + timeline = new VersionedIntervalTimeline<>(Ordering.natural()); + } else { + // Append Mode + if (segments.isEmpty()) { + // If there are no new segments, we can just bail out + return Collections.EMPTY_LIST; + } + // Otherwise, build a timeline of existing segments in metadata storage + Interval + indexedInterval = + JodaUtils.umbrellaInterval(segments.stream().map(DataSegment::getInterval).collect(Collectors.toList())); + LOG.info("Building timeline for umbrella Interval [{}]", indexedInterval); + timeline = getTimelineForIntervalWithHandle(handle, dataSource, indexedInterval, metadataStorageTablesConfig); + } + + final List<DataSegment> finalSegmentsToPublish = Lists.newArrayList(); + for (DataSegment segment : segments) { + List<TimelineObjectHolder<String, DataSegment>> existingChunks = timeline.lookup(segment.getInterval()); + if (existingChunks.size() > 1) { + // Not possible to expand since we have more than one chunk with a single segment. 
+ // This is the case when user wants to append a segment with coarser granularity. + // e.g If metadata storage already has segments for with granularity HOUR and segments to append have DAY granularity. + // Druid shard specs does not support multiple partitions for same interval with different granularity. + throw new IllegalStateException(String.format( + "Cannot allocate new segment for dataSource[%s], interval[%s], already have [%,d] chunks. " + + "Not possible to append new segment.", + dataSource, + segment.getInterval(), + existingChunks.size())); + } + // Find out the segment with latest version and maximum partition number + SegmentIdentifier max = null; + final ShardSpec newShardSpec; + final String newVersion; + if (!existingChunks.isEmpty()) { + // Some existing chunk, Find max + TimelineObjectHolder<String, DataSegment> existingHolder = Iterables.getOnlyElement(existingChunks); + for (PartitionChunk<DataSegment> existing : existingHolder.getObject()) { + if (max == null || max.getShardSpec().getPartitionNum() < existing.getObject() + .getShardSpec() + .getPartitionNum()) { + max = SegmentIdentifier.fromDataSegment(existing.getObject()); } - ); + } + } + + if (max == null) { + // No existing shard present in the database, use the current version. + newShardSpec = segment.getShardSpec(); + newVersion = segment.getVersion(); + } else { + // use version of existing max segment to generate new shard spec + newShardSpec = getNextPartitionShardSpec(max.getShardSpec()); + newVersion = max.getVersion(); + } + DataSegment + publishedSegment = + publishSegmentWithShardSpec(segment, + newShardSpec, + newVersion, + getPath(segment).getFileSystem(conf), + dataSegmentPusher); + finalSegmentsToPublish.add(publishedSegment); + timeline.add(publishedSegment.getInterval(), + publishedSegment.getVersion(), + publishedSegment.getShardSpec().createChunk(publishedSegment)); + + } + + // Publish new segments to metadata storage + final PreparedBatch + batch = + handle.prepareBatch(String.format( + "INSERT INTO %1$s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) " + + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", + metadataStorageTablesConfig.getSegmentsTable()) + + ); + + for (final DataSegment segment : finalSegmentsToPublish) { + + batch.add(new ImmutableMap.Builder<String, Object>().put("id", segment.getIdentifier()) + .put("dataSource", segment.getDataSource()) + .put("created_date", new DateTime().toString()) + .put("start", segment.getInterval().getStart().toString()) + .put("end", segment.getInterval().getEnd().toString()) + .put("partitioned", !(segment.getShardSpec() instanceof NoneShardSpec)) + .put("version", segment.getVersion()) + .put("used", true) + .put("payload", JSON_MAPPER.writeValueAsBytes(segment)) + .build()); + + LOG.info("Published {}", segment.getIdentifier()); + } + batch.execute(); + + return finalSegmentsToPublish; + }); } - public static void disableDataSourceWithHandle(Handle handle, - MetadataStorageTablesConfig metadataStorageTablesConfig, String dataSource - ) { - handle.createStatement( - String.format("UPDATE %s SET used=false WHERE dataSource = :dataSource", - metadataStorageTablesConfig.getSegmentsTable() - ) - ) - .bind("dataSource", dataSource) - .execute(); + private static void disableDataSourceWithHandle(Handle handle, + MetadataStorageTablesConfig metadataStorageTablesConfig, + String dataSource) { + handle.createStatement(String.format("UPDATE %s SET used=false WHERE 
dataSource = :dataSource", + metadataStorageTablesConfig.getSegmentsTable())).bind("dataSource", dataSource).execute(); } /** @@ -612,41 +564,29 @@ public final class DruidStorageHandlerUtils { * * @return List of all data segments part of the given data source */ - public static List<DataSegment> getDataSegmentList(final SQLMetadataConnector connector, - final MetadataStorageTablesConfig metadataStorageTablesConfig, final String dataSource - ) { - List<DataSegment> segmentList = connector.retryTransaction( - (handle, status) -> handle - .createQuery(String.format( - "SELECT payload FROM %s WHERE dataSource = :dataSource", - metadataStorageTablesConfig.getSegmentsTable() - )) - .setFetchSize(getStreamingFetchSize(connector)) - .bind("dataSource", dataSource) - .map(ByteArrayMapper.FIRST) - .fold( - new ArrayList<>(), - (Folder3<List<DataSegment>, byte[]>) (accumulator, payload, control, ctx) -> { - try { - final DataSegment segment = DATA_SEGMENT_INTERNER.intern( - JSON_MAPPER.readValue( - payload, - DataSegment.class - )); - - accumulator.add(segment); - return accumulator; - } catch (Exception e) { - throw new SQLException(e.toString()); - } - } - ) - , 3, DEFAULT_MAX_TRIES); - return segmentList; + static List<DataSegment> getDataSegmentList(final SQLMetadataConnector connector, + final MetadataStorageTablesConfig metadataStorageTablesConfig, + final String dataSource) { + return connector.retryTransaction((handle, status) -> handle.createQuery(String.format( + "SELECT payload FROM %s WHERE dataSource = :dataSource", + metadataStorageTablesConfig.getSegmentsTable())) + .setFetchSize(getStreamingFetchSize(connector)) + .bind("dataSource", dataSource) + .map(ByteArrayMapper.FIRST) + .fold(new ArrayList<>(), (Folder3<List<DataSegment>, byte[]>) (accumulator, payload, control, ctx) -> { + try { + final DataSegment segment = DATA_SEGMENT_INTERNER.intern(JSON_MAPPER.readValue(payload, DataSegment.class)); + + accumulator.add(segment); + return accumulator; + } catch (Exception e) { + throw new SQLException(e.toString()); + } + }), 3, DEFAULT_MAX_TRIES); } /** - * @param connector + * @param connector SQL DBI connector. * * @return streaming fetch size. */ @@ -658,112 +598,150 @@ public final class DruidStorageHandlerUtils { } /** - * @param pushedSegment - * @param segmentsDescriptorDir + * @param pushedSegment the pushed data segment object + * @param segmentsDescriptorDir actual directory path for descriptors. 
* * @return a sanitize file name */ - public static Path makeSegmentDescriptorOutputPath(DataSegment pushedSegment, - Path segmentsDescriptorDir - ) { - return new Path( - segmentsDescriptorDir, - String.format("%s.json", pushedSegment.getIdentifier().replace(":", "")) - ); + public static Path makeSegmentDescriptorOutputPath(DataSegment pushedSegment, Path segmentsDescriptorDir) { + return new Path(segmentsDescriptorDir, String.format("%s.json", pushedSegment.getIdentifier().replace(":", ""))); } public static String createScanAllQuery(String dataSourceName, List<String> columns) throws JsonProcessingException { final ScanQuery.ScanQueryBuilder scanQueryBuilder = ScanQuery.newScanQueryBuilder(); - final List<Interval> intervals = Arrays.asList(DEFAULT_INTERVAL); - ScanQuery scanQuery = scanQueryBuilder - .dataSource(dataSourceName) - .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) - .intervals(new MultipleIntervalSegmentSpec(intervals)) - .columns(columns) - .build(); + final List<Interval> intervals = Collections.singletonList(DEFAULT_INTERVAL); + ScanQuery + scanQuery = + scanQueryBuilder.dataSource(dataSourceName) + .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .intervals(new MultipleIntervalSegmentSpec(intervals)) + .columns(columns) + .build(); return JSON_MAPPER.writeValueAsString(scanQuery); } + + @Nullable static Boolean getBooleanProperty(Table table, String propertyName) { + String val = getTableProperty(table, propertyName); + if (val == null) { + return null; + } + return Boolean.parseBoolean(val); + } + + @Nullable static Integer getIntegerProperty(Table table, String propertyName) { + String val = getTableProperty(table, propertyName); + if (val == null) { + return null; + } + try { + return Integer.parseInt(val); + } catch (NumberFormatException e) { + throw new NumberFormatException(String.format("Exception while parsing property[%s] with Value [%s] as Integer", + propertyName, + val)); + } + } + + @Nullable static Long getLongProperty(Table table, String propertyName) { + String val = getTableProperty(table, propertyName); + if (val == null) { + return null; + } + try { + return Long.parseLong(val); + } catch (NumberFormatException e) { + throw new NumberFormatException(String.format("Exception while parsing property[%s] with Value [%s] as Long", + propertyName, + val)); + } + } + + @Nullable static Period getPeriodProperty(Table table, String propertyName) { + String val = getTableProperty(table, propertyName); + if (val == null) { + return null; + } + try { + return Period.parse(val); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException(String.format("Exception while parsing property[%s] with Value [%s] as Period", + propertyName, + val)); + } + } + + static String getTableProperty(Table table, String propertyName) { + return table.getParameters().get(propertyName); + } + /** - * Simple interface for retry operations + * Simple interface for retry operations. */ public interface DataPusher { - long push() throws IOException; + void push() throws IOException; } // Thanks, HBase Storage handler - public static void addDependencyJars(Configuration conf, Class<?>... classes) throws IOException { + @SuppressWarnings("SameParameterValue") static void addDependencyJars(Configuration conf, Class<?>... 
classes) + throws IOException { FileSystem localFs = FileSystem.getLocal(conf); - Set<String> jars = new HashSet<String>(); - jars.addAll(conf.getStringCollection("tmpjars")); + Set<String> jars = new HashSet<>(conf.getStringCollection("tmpjars")); for (Class<?> clazz : classes) { if (clazz == null) { continue; } - String path = Utilities.jarFinderGetJar(clazz); + final String path = Utilities.jarFinderGetJar(clazz); if (path == null) { - throw new RuntimeException( - "Could not find jar for class " + clazz + " in order to ship it to the cluster."); + throw new RuntimeException("Could not find jar for class " + clazz + " in order to ship it to the cluster."); } if (!localFs.exists(new Path(path))) { throw new RuntimeException("Could not validate jar file " + path + " for class " + clazz); } - jars.add(path.toString()); + jars.add(path); } if (jars.isEmpty()) { return; } + //noinspection ToArrayCallWithZeroLengthArrayArgument conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()]))); }
- private static VersionedIntervalTimeline<String, DataSegment> getTimelineForIntervalWithHandle( - final Handle handle, - final String dataSource, - final Interval interval, - final MetadataStorageTablesConfig dbTables - ) throws IOException { - Query<Map<String, Object>> sql = handle.createQuery( - String.format( - "SELECT payload FROM %s WHERE used = true AND dataSource = ? AND start <= ? AND \"end\" >= ?", - dbTables.getSegmentsTable() - ) - ).bind(0, dataSource) + private static VersionedIntervalTimeline<String, DataSegment> getTimelineForIntervalWithHandle(final Handle handle, + final String dataSource, + final Interval interval, + final MetadataStorageTablesConfig dbTables) throws IOException { + Query<Map<String, Object>> + sql = + handle.createQuery(String.format( + "SELECT payload FROM %s WHERE used = true AND dataSource = ? AND start <= ? AND \"end\" >= ?", + dbTables.getSegmentsTable())) + .bind(0, dataSource) .bind(1, interval.getEnd().toString()) .bind(2, interval.getStart().toString()); - final VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>( - Ordering.natural() - ); - final ResultIterator<byte[]> dbSegments = sql - .map(ByteArrayMapper.FIRST) - .iterator(); - try { + final VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(Ordering.natural()); + try (ResultIterator<byte[]> dbSegments = sql.map(ByteArrayMapper.FIRST).iterator()) { while (dbSegments.hasNext()) { final byte[] payload = dbSegments.next(); - DataSegment segment = JSON_MAPPER.readValue( - payload, - DataSegment.class - ); - timeline.add(segment.getInterval(), segment.getVersion(), - segment.getShardSpec().createChunk(segment) - ); + DataSegment segment = JSON_MAPPER.readValue(payload, DataSegment.class); + timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment)); } - } finally { - dbSegments.close(); } return timeline; }
- public static DataSegmentPusher createSegmentPusherForDirectory(String segmentDirectory, - Configuration configuration) throws IOException { + public static DataSegmentPusher createSegmentPusherForDirectory(String segmentDirectory, Configuration configuration) + throws IOException { final HdfsDataSegmentPusherConfig hdfsDataSegmentPusherConfig = new HdfsDataSegmentPusherConfig(); hdfsDataSegmentPusherConfig.setStorageDirectory(segmentDirectory); - return new HdfsDataSegmentPusher( - hdfsDataSegmentPusherConfig, configuration, JSON_MAPPER); + return new HdfsDataSegmentPusher(hdfsDataSegmentPusherConfig, configuration, JSON_MAPPER); }
- public static DataSegment publishSegmentWithShardSpec(DataSegment segment, ShardSpec shardSpec, - String version, FileSystem fs, DataSegmentPusher dataSegmentPusher - ) throws IOException { + private static DataSegment publishSegmentWithShardSpec(DataSegment segment, + ShardSpec shardSpec, + String version, + FileSystem fs, + DataSegmentPusher dataSegmentPusher) throws IOException { boolean retry = true; DataSegment.Builder dataSegmentBuilder = new DataSegment.Builder(segment).version(version); Path finalPath = null; @@ -772,8 +750,9 @@ public final class DruidStorageHandlerUtils { dataSegmentBuilder.shardSpec(shardSpec); final Path intermediatePath = getPath(segment); - finalPath = new Path(dataSegmentPusher.getPathForHadoop(), dataSegmentPusher - .makeIndexPathName(dataSegmentBuilder.build(), DruidStorageHandlerUtils.INDEX_ZIP)); + finalPath = + new Path(dataSegmentPusher.getPathForHadoop(), + dataSegmentPusher.makeIndexPathName(dataSegmentBuilder.build(), DruidStorageHandlerUtils.INDEX_ZIP)); // Create parent if it does not exist, recreation is not an error fs.mkdirs(finalPath.getParent()); @@ -784,16 +763,13 @@ retry = true; } else { throw new IOException(String.format( - "Failed to rename intermediate segment[%s] to final segment[%s] is not present.", - intermediatePath, - finalPath - )); + "Failed to rename intermediate segment[%s] to final segment[%s] is not present.", + intermediatePath, + finalPath)); } } } - DataSegment dataSegment = dataSegmentBuilder - .loadSpec(dataSegmentPusher.makeLoadSpec(finalPath.toUri())) - .build(); + DataSegment dataSegment = dataSegmentBuilder.loadSpec(dataSegmentPusher.makeLoadSpec(finalPath.toUri())).build(); writeSegmentDescriptor(fs, dataSegment, new Path(finalPath.getParent(), DruidStorageHandlerUtils.DESCRIPTOR_JSON));
@@ -804,61 +780,58 @@ public final class DruidStorageHandlerUtils { if (shardSpec instanceof LinearShardSpec) { return new LinearShardSpec(shardSpec.getPartitionNum() + 1); } else if (shardSpec instanceof NumberedShardSpec) { - return new NumberedShardSpec(shardSpec.getPartitionNum(), - ((NumberedShardSpec) shardSpec).getPartitions() - ); + return new NumberedShardSpec(shardSpec.getPartitionNum(), ((NumberedShardSpec) shardSpec).getPartitions()); } else { // Druid only support appending more partitions to Linear and Numbered ShardSpecs. - throw new IllegalStateException( - String.format( - "Cannot expand shard spec [%s]", - shardSpec - ) - ); + throw new IllegalStateException(String.format("Cannot expand shard spec [%s]", shardSpec)); } } - public static Path getPath(DataSegment dataSegment) { - return new Path(String.valueOf(dataSegment.getLoadSpec().get("path"))); + static Path getPath(DataSegment dataSegment) { + return new Path(String.valueOf(Objects.requireNonNull(dataSegment.getLoadSpec()).get("path"))); } public static GranularitySpec getGranularitySpec(Configuration configuration, Properties tableProperties) { - final String segmentGranularity = + final String + segmentGranularity = tableProperties.getProperty(Constants.DRUID_SEGMENT_GRANULARITY) != null ? tableProperties.getProperty(Constants.DRUID_SEGMENT_GRANULARITY) : HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_DRUID_INDEXING_GRANULARITY); - final boolean rollup = tableProperties.getProperty(Constants.DRUID_ROLLUP) != null ? - Boolean.parseBoolean(tableProperties.getProperty(Constants.DRUID_SEGMENT_GRANULARITY)): - HiveConf.getBoolVar(configuration, HiveConf.ConfVars.HIVE_DRUID_ROLLUP); - return new UniformGranularitySpec( - Granularity.fromString(segmentGranularity), - Granularity.fromString( - tableProperties.getProperty(Constants.DRUID_QUERY_GRANULARITY) == null - ? "NONE" - : tableProperties.getProperty(Constants.DRUID_QUERY_GRANULARITY)), + final boolean + rollup = + tableProperties.getProperty(DRUID_ROLLUP) != null ? + Boolean.parseBoolean(tableProperties.getProperty(DRUID_ROLLUP)) : + HiveConf.getBoolVar(configuration, HiveConf.ConfVars.HIVE_DRUID_ROLLUP); + return new UniformGranularitySpec(Granularity.fromString(segmentGranularity), + Granularity.fromString(tableProperties.getProperty(DRUID_QUERY_GRANULARITY) == null ? 
+ "NONE" : + tableProperties.getProperty(DRUID_QUERY_GRANULARITY)), rollup, - null - ); + null); } public static IndexSpec getIndexSpec(Configuration jc) { - IndexSpec indexSpec; + final BitmapSerdeFactory bitmapSerdeFactory; if ("concise".equals(HiveConf.getVar(jc, HiveConf.ConfVars.HIVE_DRUID_BITMAP_FACTORY_TYPE))) { - indexSpec = new IndexSpec(new ConciseBitmapSerdeFactory(), null, null, null); + bitmapSerdeFactory = new ConciseBitmapSerdeFactory(); } else { - indexSpec = new IndexSpec(new RoaringBitmapSerdeFactory(true), null, null, null); + bitmapSerdeFactory = new RoaringBitmapSerdeFactory(true); } - return indexSpec; + return new IndexSpec(bitmapSerdeFactory, + IndexSpec.DEFAULT_DIMENSION_COMPRESSION, + IndexSpec.DEFAULT_METRIC_COMPRESSION, + IndexSpec.DEFAULT_LONG_ENCODING); } - public static Pair<List<DimensionSchema>, AggregatorFactory[]> getDimensionsAndAggregates(Configuration jc, List<String> columnNames, + public static Pair<List<DimensionSchema>, AggregatorFactory[]> getDimensionsAndAggregates(List<String> columnNames, List<TypeInfo> columnTypes) { // Default, all columns that are not metrics or timestamp, are treated as dimensions final List<DimensionSchema> dimensions = new ArrayList<>(); ImmutableList.Builder<AggregatorFactory> aggregatorFactoryBuilder = ImmutableList.builder(); for (int i = 0; i < columnTypes.size(); i++) { - final PrimitiveObjectInspector.PrimitiveCategory primitiveCategory = ((PrimitiveTypeInfo) columnTypes - .get(i)).getPrimitiveCategory(); + final PrimitiveObjectInspector.PrimitiveCategory + primitiveCategory = + ((PrimitiveTypeInfo) columnTypes.get(i)).getPrimitiveCategory(); AggregatorFactory af; switch (primitiveCategory) { case BYTE: @@ -874,38 +847,39 @@ public final class DruidStorageHandlerUtils { af = new DoubleSumAggregatorFactory(columnNames.get(i), columnNames.get(i)); break; case DECIMAL: - throw new UnsupportedOperationException( - String.format("Druid does not support decimal column type cast column " - + "[%s] to double", columnNames.get(i))); - + throw new UnsupportedOperationException(String.format("Druid does not support decimal column type cast column " + + "[%s] to double", columnNames.get(i))); case TIMESTAMP: // Granularity column String tColumnName = columnNames.get(i); - if (!tColumnName.equals(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME) && - !tColumnName.equals(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) { - throw new IllegalArgumentException( - "Dimension " + tColumnName + " does not have STRING type: " + - primitiveCategory); + if (!tColumnName.equals(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME) && !tColumnName.equals( + DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) { + throw new IllegalArgumentException("Dimension " + + tColumnName + + " does not have STRING type: " + + primitiveCategory); } continue; case TIMESTAMPLOCALTZ: // Druid timestamp column String tLocalTZColumnName = columnNames.get(i); if (!tLocalTZColumnName.equals(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) { - throw new IllegalArgumentException( - "Dimension " + tLocalTZColumnName + " does not have STRING type: " + - primitiveCategory); + throw new IllegalArgumentException("Dimension " + + tLocalTZColumnName + + " does not have STRING type: " + + primitiveCategory); } continue; default: // Dimension String dColumnName = columnNames.get(i); - if (PrimitiveObjectInspectorUtils.getPrimitiveGrouping(primitiveCategory) != - PrimitiveObjectInspectorUtils.PrimitiveGrouping.STRING_GROUP + if 
(PrimitiveObjectInspectorUtils.getPrimitiveGrouping(primitiveCategory) + != PrimitiveObjectInspectorUtils.PrimitiveGrouping.STRING_GROUP && primitiveCategory != PrimitiveObjectInspector.PrimitiveCategory.BOOLEAN) { - throw new IllegalArgumentException( - "Dimension " + dColumnName + " does not have STRING type: " + - primitiveCategory); + throw new IllegalArgumentException("Dimension " + + dColumnName + + " does not have STRING type: " + + primitiveCategory); } dimensions.add(new StringDimensionSchema(dColumnName)); continue; @@ -913,7 +887,6 @@ public final class DruidStorageHandlerUtils { aggregatorFactoryBuilder.add(af); } ImmutableList<AggregatorFactory> aggregatorFactories = aggregatorFactoryBuilder.build(); - return Pair.of(dimensions, - aggregatorFactories.toArray(new AggregatorFactory[aggregatorFactories.size()])); + return Pair.of(dimensions, aggregatorFactories.toArray(new AggregatorFactory[0])); } }
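The core of the publishSegmentsAndCommit() append path above is a shard-allocation rule: within one interval, take the existing chunk with the highest partition number, reuse its version, and bump the partition; if the interval has no existing shards, the incoming segment keeps its own shard spec and version. A minimal, dependency-free Java sketch of that rule follows; Shard is a hypothetical stand-in for Druid's LinearShardSpec/SegmentIdentifier and is not part of this patch.

import java.util.Comparator;
import java.util.List;

final class AppendShardRule {
  // Stand-in for a linear shard spec: ordered by partition number within one interval.
  record Shard(int partitionNum, String version) { }

  // Mirrors the logic in publishSegmentsAndCommit/getNextPartitionShardSpec:
  // with existing shards, reuse the max shard's version and bump its partition
  // number; with none, the incoming segment is published as-is.
  static Shard allocate(List<Shard> existing, Shard incoming) {
    return existing.stream()
        .max(Comparator.comparingInt(Shard::partitionNum))
        .map(max -> new Shard(max.partitionNum() + 1, max.version()))
        .orElse(incoming);
  }

  public static void main(String[] args) {
    List<Shard> existing = List.of(new Shard(0, "v1"), new Shard(1, "v1"));
    // Appending into an interval that already holds v1 shards yields partition 2
    // at version v1, regardless of the incoming segment's own version.
    System.out.println(allocate(existing, new Shard(0, "v2"))); // Shard[partitionNum=2, version=v1]
  }
}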
http://git-wip-us.apache.org/repos/asf/hive/blob/dca389b0/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java index ecb4360..862d7ca 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java @@ -6,17 +6,19 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.hadoop.hive.druid.io; +import com.fasterxml.jackson.core.type.TypeReference; import io.druid.data.input.impl.DimensionSchema; import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.InputRowParser; @@ -43,6 +45,7 @@ import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordWriter; @@ -64,9 +67,12 @@ import java.util.List; import java.util.Map; import java.util.Properties; -public class DruidOutputFormat<K, V> implements HiveOutputFormat<K, DruidWritable> { +/** + * Druid Output format class used to write data as Native Druid Segment. + */ +public class DruidOutputFormat implements HiveOutputFormat<NullWritable, DruidWritable> { - protected static final Logger LOG = LoggerFactory.getLogger(DruidOutputFormat.class); + private static final Logger LOG = LoggerFactory.getLogger(DruidOutputFormat.class); @Override public FileSinkOperator.RecordWriter getHiveRecordWriter( @@ -89,7 +95,7 @@ public class DruidOutputFormat<K, V> implements HiveOutputFormat<K, DruidWritabl final String dataSource = tableProperties.getProperty(Constants.DRUID_DATA_SOURCE) == null ? 
jc.get(Constants.DRUID_DATA_SOURCE) : tableProperties.getProperty(Constants.DRUID_DATA_SOURCE); - final String segmentDirectory = jc.get(Constants.DRUID_SEGMENT_INTERMEDIATE_DIRECTORY); + final String segmentDirectory = jc.get(DruidStorageHandlerUtils.DRUID_SEGMENT_INTERMEDIATE_DIRECTORY); final GranularitySpec granularitySpec = DruidStorageHandlerUtils.getGranularitySpec(jc, tableProperties); @@ -111,7 +117,7 @@ public class DruidOutputFormat<K, V> implements HiveOutputFormat<K, DruidWritabl ArrayList<TypeInfo> columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); Pair<List<DimensionSchema>, AggregatorFactory[]> dimensionsAndAggregates = DruidStorageHandlerUtils - .getDimensionsAndAggregates(jc, columnNames, columnTypes); + .getDimensionsAndAggregates(columnNames, columnTypes); final InputRowParser inputRowParser = new MapInputRowParser(new TimeAndDimsParseSpec( new TimestampSpec(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, "auto", null), new DimensionsSpec(dimensionsAndAggregates.lhs, Lists @@ -121,8 +127,10 @@ public class DruidOutputFormat<K, V> implements HiveOutputFormat<K, DruidWritabl ) )); - Map<String, Object> inputParser = DruidStorageHandlerUtils.JSON_MAPPER - .convertValue(inputRowParser, Map.class); + Map<String, Object> + inputParser = + DruidStorageHandlerUtils.JSON_MAPPER.convertValue(inputRowParser, new TypeReference<Map<String, Object>>() { + }); final DataSchema dataSchema = new DataSchema( Preconditions.checkNotNull(dataSource, "Data source name is null"), @@ -133,8 +141,8 @@ public class DruidOutputFormat<K, V> implements HiveOutputFormat<K, DruidWritabl DruidStorageHandlerUtils.JSON_MAPPER ); - final String workingPath = jc.get(Constants.DRUID_JOB_WORKING_DIRECTORY); - final String version = jc.get(Constants.DRUID_SEGMENT_VERSION); + final String workingPath = jc.get(DruidStorageHandlerUtils.DRUID_JOB_WORKING_DIRECTORY); + final String version = jc.get(DruidStorageHandlerUtils.DRUID_SEGMENT_VERSION); String basePersistDirectory = HiveConf .getVar(jc, HiveConf.ConfVars.HIVE_DRUID_BASE_PERSIST_DIRECTORY); if (Strings.isNullOrEmpty(basePersistDirectory)) { @@ -170,7 +178,7 @@ public class DruidOutputFormat<K, V> implements HiveOutputFormat<K, DruidWritabl } @Override - public RecordWriter<K, DruidWritable> getRecordWriter( + public RecordWriter<NullWritable, DruidWritable> getRecordWriter( FileSystem ignored, JobConf job, String name, Progressable progress ) throws IOException { throw new UnsupportedOperationException("please implement me !"); http://git-wip-us.apache.org/repos/asf/hive/blob/dca389b0/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java index 1c989c1..ff766c4 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java @@ -59,6 +59,7 @@ import org.jboss.netty.handler.codec.http.HttpMethod; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; import java.net.URL; @@ -77,7 +78,7 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW protected static final Logger LOG = 
LoggerFactory.getLogger(DruidQueryBasedInputFormat.class); - public static DruidQueryRecordReader getDruidQueryReader(String druidQueryType) { + @Nullable public static DruidQueryRecordReader getDruidQueryReader(String druidQueryType) { switch (druidQueryType) { case Query.TIMESERIES: return new DruidTimeseriesQueryRecordReader(); @@ -89,8 +90,9 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW return new DruidSelectQueryRecordReader(); case Query.SCAN: return new DruidScanQueryRecordReader(); + default: + return null; } - return null; } @Override @@ -101,7 +103,7 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW @Override public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { - return Arrays.<InputSplit>asList(getInputSplits(context.getConfiguration())); + return Arrays.asList(getInputSplits(context.getConfiguration())); } @SuppressWarnings("deprecation") @@ -150,39 +152,35 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW // Druid query with user timezone, as this is default Hive time semantics. // Then, create splits with the Druid queries. switch (druidQueryType) { - case Query.TIMESERIES: - case Query.TOPN: - case Query.GROUP_BY: - return new HiveDruidSplit[] { - new HiveDruidSplit(druidQuery, paths[0], new String[] { address }) }; - case Query.SELECT: - SelectQuery selectQuery = DruidStorageHandlerUtils.JSON_MAPPER.readValue( - druidQuery, SelectQuery.class); - return distributeSelectQuery(conf, address, selectQuery, paths[0]); - case Query.SCAN: - ScanQuery scanQuery = DruidStorageHandlerUtils.JSON_MAPPER.readValue( - druidQuery, ScanQuery.class); - return distributeScanQuery(conf, address, scanQuery, paths[0]); + case Query.TIMESERIES: + case Query.TOPN: + case Query.GROUP_BY: + return new HiveDruidSplit[] {new HiveDruidSplit(druidQuery, paths[0], new String[] {address})}; + case Query.SELECT: + SelectQuery selectQuery = DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, SelectQuery.class); + return distributeSelectQuery(address, selectQuery, paths[0]); + case Query.SCAN: + ScanQuery scanQuery = DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, ScanQuery.class); + return distributeScanQuery(address, scanQuery, paths[0]); default: - throw new IOException("Druid query type not recognized"); + throw new IOException("Druid query type not recognized"); } } /* New method that distributes the Select query by creating splits containing * information about different Druid nodes that have the data for the given * query. 
    */
-  private static HiveDruidSplit[] distributeSelectQuery(Configuration conf, String address,
-      SelectQuery query, Path dummyPath) throws IOException {
+  private static HiveDruidSplit[] distributeSelectQuery(String address, SelectQuery query, Path dummyPath)
+      throws IOException {
     // If it has a limit, we use it and we do not distribute the query
-    final boolean isFetch = query.getContextBoolean(Constants.DRUID_QUERY_FETCH, false);
+    final boolean isFetch = query.getContextBoolean(DruidStorageHandlerUtils.DRUID_QUERY_FETCH, false);
     if (isFetch) {
-      return new HiveDruidSplit[] { new HiveDruidSplit(
-          DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath,
-          new String[]{address} ) };
+      return new HiveDruidSplit[] {new HiveDruidSplit(DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query),
+          dummyPath,
+          new String[] {address})};
     }

-    final List<LocatedSegmentDescriptor> segmentDescriptors = fetchLocatedSegmentDescriptors(
-        address, query);
+    final List<LocatedSegmentDescriptor> segmentDescriptors = fetchLocatedSegmentDescriptors(address, query);

     // Create one input split for each segment
     final int numSplits = segmentDescriptors.size();
@@ -211,14 +209,15 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
   /* New method that distributes the Scan query by creating splits containing
    * information about different Druid nodes that have the data for the given
    * query. */
-  private static HiveDruidSplit[] distributeScanQuery(Configuration conf, String address,
-      ScanQuery query, Path dummyPath) throws IOException {
+  private static HiveDruidSplit[] distributeScanQuery(String address, ScanQuery query, Path dummyPath)
+      throws IOException {
     // If it has a limit, we use it and we do not distribute the query
     final boolean isFetch = query.getLimit() < Long.MAX_VALUE;
     if (isFetch) {
-      return new HiveDruidSplit[] { new HiveDruidSplit(
-          DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath,
-          new String[]{address} ) };
+      return new HiveDruidSplit[] {new HiveDruidSplit(DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query),
+          dummyPath,
+          new String[] {address})
+      };
     }

     final List<LocatedSegmentDescriptor> segmentDescriptors = fetchLocatedSegmentDescriptors(
@@ -288,7 +287,7 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
       throws IOException {
     // We need to provide a different record reader for every type of Druid query.
     // The reason is that Druid results format is different for each type.
-    final DruidQueryRecordReader<?, ?> reader;
+    final DruidQueryRecordReader<?> reader;
     final String druidQueryType = job.get(Constants.DRUID_QUERY_TYPE);
     if (druidQueryType == null) {
       reader = new DruidScanQueryRecordReader(); // By default we use scan query as fallback.
@@ -314,7 +313,7 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
     if (druidQueryType == null) {
       return new DruidScanQueryRecordReader(); // By default, we use druid scan query as fallback.
     }
-    final DruidQueryRecordReader<?, ?> reader =
+    final DruidQueryRecordReader<?> reader =
         getDruidQueryReader(druidQueryType);
     if (reader == null) {
       throw new IOException("Druid query type " + druidQueryType + " not recognized");

http://git-wip-us.apache.org/repos/asf/hive/blob/dca389b0/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
----------------------------------------------------------------------

diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
index 400262a..671b8cf 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
@@ -17,13 +17,10 @@
  */
 package org.apache.hadoop.hive.druid.io;

-import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
 import com.google.common.base.Throwables;
-import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import io.druid.data.input.Committer;
@@ -49,6 +46,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
 import org.apache.hadoop.hive.druid.serde.DruidWritable;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.RecordWriter;
@@ -58,16 +56,20 @@ import org.joda.time.Interval;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;

-public class DruidRecordWriter implements RecordWriter<NullWritable, DruidWritable>,
-    org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter {
+/**
+ * Druid Record Writer, implementing a file sink operator.
+ */
+public class DruidRecordWriter implements RecordWriter<NullWritable, DruidWritable>, FileSinkOperator.RecordWriter {
   protected static final Logger LOG = LoggerFactory.getLogger(DruidRecordWriter.class);

   private final DataSchema dataSchema;
@@ -88,36 +90,32 @@ public class DruidRecordWriter implements RecordWriter<NullWritable, DruidWritab
   private final Granularity segmentGranularity;

-  public DruidRecordWriter(
-      DataSchema dataSchema,
-      RealtimeTuningConfig realtimeTuningConfig,
-      DataSegmentPusher dataSegmentPusher,
-      int maxPartitionSize,
-      final Path segmentsDescriptorsDir,
-      final FileSystem fileSystem
-  ) {
-    File basePersistDir = new File(realtimeTuningConfig.getBasePersistDirectory(),
-        UUID.randomUUID().toString()
-    );
-    this.tuningConfig = Preconditions
-        .checkNotNull(realtimeTuningConfig.withBasePersistDirectory(basePersistDir),
-            "realtimeTuningConfig is null"
-        );
+  public DruidRecordWriter(DataSchema dataSchema,
+      RealtimeTuningConfig realtimeTuningConfig,
+      DataSegmentPusher dataSegmentPusher,
+      int maxPartitionSize,
+      final Path segmentsDescriptorsDir,
+      final FileSystem fileSystem) {
+    File basePersistDir = new File(realtimeTuningConfig.getBasePersistDirectory(), UUID.randomUUID().toString());
+    this.tuningConfig =
+        Preconditions.checkNotNull(realtimeTuningConfig.withBasePersistDirectory(basePersistDir),
+            "realtimeTuningConfig is null");
     this.dataSchema = Preconditions.checkNotNull(dataSchema, "data schema is null");
-    appenderator = Appenderators
-        .createOffline(this.dataSchema, tuningConfig, new FireDepartmentMetrics(),
-            dataSegmentPusher, DruidStorageHandlerUtils.JSON_MAPPER,
-            DruidStorageHandlerUtils.INDEX_IO, DruidStorageHandlerUtils.INDEX_MERGER_V9
-        );
+    appenderator =
+        Appenderators.createOffline(this.dataSchema,
+            tuningConfig,
+            new FireDepartmentMetrics(),
+            dataSegmentPusher,
+            DruidStorageHandlerUtils.JSON_MAPPER,
+            DruidStorageHandlerUtils.INDEX_IO,
+            DruidStorageHandlerUtils.INDEX_MERGER_V9);
     this.maxPartitionSize = maxPartitionSize;
     appenderator.startJob();
-    this.segmentsDescriptorDir = Preconditions
-        .checkNotNull(segmentsDescriptorsDir, "segmentsDescriptorsDir is null");
+    this.segmentsDescriptorDir = Preconditions.checkNotNull(segmentsDescriptorsDir, "segmentsDescriptorsDir is null");
     this.fileSystem = Preconditions.checkNotNull(fileSystem, "file system is null");
-    this.segmentGranularity = this.dataSchema.getGranularitySpec()
-        .getSegmentGranularity();
-    committerSupplier = Suppliers.ofInstance(Committers.nil());
+    this.segmentGranularity = this.dataSchema.getGranularitySpec().getSegmentGranularity();
+    committerSupplier = Suppliers.ofInstance(Committers.nil())::get;
   }

   /**
@@ -132,19 +130,15 @@ public class DruidRecordWriter implements RecordWriter<NullWritable, DruidWritab
   private SegmentIdentifier getSegmentIdentifierAndMaybePush(long truncatedTime) {
     DateTime truncatedDateTime = segmentGranularity.bucketStart(DateTimes.utc(truncatedTime));
-    final Interval interval = new Interval(
-        truncatedDateTime,
-        segmentGranularity.increment(truncatedDateTime)
-    );
+    final Interval interval = new Interval(truncatedDateTime, segmentGranularity.increment(truncatedDateTime));

     SegmentIdentifier retVal;
     if (currentOpenSegment == null) {
-      currentOpenSegment = new SegmentIdentifier(
-          dataSchema.getDataSource(),
+      currentOpenSegment =
+          new SegmentIdentifier(dataSchema.getDataSource(),
               interval,
               tuningConfig.getVersioningPolicy().getVersion(interval),
-          new LinearShardSpec(0)
-      );
+              new LinearShardSpec(0));
       return currentOpenSegment;
     } else if (currentOpenSegment.getInterval().equals(interval)) {
       retVal = currentOpenSegment;
@@ -152,25 +146,24 @@ public class DruidRecordWriter implements RecordWriter<NullWritable, DruidWritab
       if (rowCount < maxPartitionSize) {
         return retVal;
       } else {
-        retVal = new SegmentIdentifier(
-            dataSchema.getDataSource(),
+        retVal =
+            new SegmentIdentifier(dataSchema.getDataSource(),
                 interval,
                 tuningConfig.getVersioningPolicy().getVersion(interval),
-            new LinearShardSpec(currentOpenSegment.getShardSpec().getPartitionNum() + 1)
-        );
+                new LinearShardSpec(currentOpenSegment.getShardSpec().getPartitionNum() + 1));
         pushSegments(Lists.newArrayList(currentOpenSegment));
         LOG.info("Creating new partition for segment {}, partition num {}",
-            retVal.getIdentifierAsString(), retVal.getShardSpec().getPartitionNum());
+            retVal.getIdentifierAsString(),
+            retVal.getShardSpec().getPartitionNum());
         currentOpenSegment = retVal;
         return retVal;
       }
     } else {
-      retVal = new SegmentIdentifier(
-          dataSchema.getDataSource(),
+      retVal =
+          new SegmentIdentifier(dataSchema.getDataSource(),
              interval,
              tuningConfig.getVersioningPolicy().getVersion(interval),
-          new LinearShardSpec(0)
-      );
+             new LinearShardSpec(0));
       pushSegments(Lists.newArrayList(currentOpenSegment));
       LOG.info("Creating segment {}", retVal.getIdentifierAsString());
       currentOpenSegment = retVal;
@@ -180,45 +173,30 @@ public class DruidRecordWriter implements RecordWriter<NullWritable, DruidWritab
   private void pushSegments(List<SegmentIdentifier> segmentsToPush) {
     try {
-      SegmentsAndMetadata segmentsAndMetadata = appenderator
-          .push(segmentsToPush, committerSupplier.get(), false).get();
+      SegmentsAndMetadata segmentsAndMetadata = appenderator.push(segmentsToPush, committerSupplier.get(), false).get();
       final HashSet<String> pushedSegmentIdentifierHashSet = new HashSet<>();

       for (DataSegment pushedSegment : segmentsAndMetadata.getSegments()) {
-        pushedSegmentIdentifierHashSet
-            .add(SegmentIdentifier.fromDataSegment(pushedSegment).getIdentifierAsString());
-        final Path segmentDescriptorOutputPath = DruidStorageHandlerUtils
-            .makeSegmentDescriptorOutputPath(pushedSegment, segmentsDescriptorDir);
-        DruidStorageHandlerUtils
-            .writeSegmentDescriptor(fileSystem, pushedSegment, segmentDescriptorOutputPath);
-        LOG.info(
-            String.format(
-                "Pushed the segment [%s] and persisted the descriptor located at [%s]",
-                pushedSegment,
-                segmentDescriptorOutputPath
-            )
-        );
+        pushedSegmentIdentifierHashSet.add(SegmentIdentifier.fromDataSegment(pushedSegment).getIdentifierAsString());
+        final Path
+            segmentDescriptorOutputPath =
+            DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(pushedSegment, segmentsDescriptorDir);
+        DruidStorageHandlerUtils.writeSegmentDescriptor(fileSystem, pushedSegment, segmentDescriptorOutputPath);
+        LOG.info(String.format("Pushed the segment [%s] and persisted the descriptor located at [%s]",
+            pushedSegment,
+            segmentDescriptorOutputPath));
       }

-      final HashSet<String> toPushSegmentsHashSet = new HashSet(
-          FluentIterable.from(segmentsToPush)
-              .transform(new Function<SegmentIdentifier, String>() {
-                @Nullable
-                @Override
-                public String apply(
-                    @Nullable SegmentIdentifier input
-                ) {
-                  return input.getIdentifierAsString();
-                }
-              })
-              .toList());
+      final Set<String>
+          toPushSegmentsHashSet =
+          segmentsToPush.stream()
+              .map(SegmentIdentifier::getIdentifierAsString)
+              .collect(Collectors.toCollection(HashSet::new));

       if (!pushedSegmentIdentifierHashSet.equals(toPushSegmentsHashSet)) {
-        throw new IllegalStateException(String.format(
-            "was asked to publish [%s] but was able to publish only [%s]",
-            Joiner.on(", ").join(toPushSegmentsHashSet),
-            Joiner.on(", ").join(pushedSegmentIdentifierHashSet)
-        ));
+        throw new IllegalStateException(String.format("was asked to publish [%s] but was able to publish only [%s]",
+            Joiner.on(", ").join(toPushSegmentsHashSet),
+            Joiner.on(", ").join(pushedSegmentIdentifierHashSet)));
       }
       for (SegmentIdentifier dataSegmentId : segmentsToPush) {
         LOG.info("Dropping segment {}", dataSegmentId.toString());
@@ -227,9 +205,7 @@ public class DruidRecordWriter implements RecordWriter<NullWritable, DruidWritab
       LOG.info(String.format("Published [%,d] segments.", segmentsToPush.size()));
     } catch (InterruptedException e) {
-      LOG.error(String.format("got interrupted, failed to push [%,d] segments.",
-          segmentsToPush.size()
-      ), e);
+      LOG.error(String.format("got interrupted, failed to push [%,d] segments.", segmentsToPush.size()), e);
       Thread.currentThread().interrupt();
     } catch (IOException | ExecutionException e) {
       LOG.error(String.format("Failed to push [%,d] segments.", segmentsToPush.size()), e);
@@ -237,17 +213,17 @@ public class DruidRecordWriter implements RecordWriter<NullWritable, DruidWritab
     }
   }

-  @Override
-  public void write(Writable w) throws IOException {
+  @Override public void write(Writable w) throws IOException {
     DruidWritable record = (DruidWritable) w;
-    final long timestamp =
-        (long) record.getValue().get(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN);
-    final int partitionNumber = Math.toIntExact(
-        (long) record.getValue().getOrDefault(Constants.DRUID_SHARD_KEY_COL_NAME, -1l));
-    final InputRow inputRow = new MapBasedInputRow(timestamp,
-        dataSchema.getParser().getParseSpec().getDimensionsSpec().getDimensionNames(),
-        record.getValue()
-    );
+    final long timestamp = (long) record.getValue().get(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN);
+    final int
+        partitionNumber =
+        Math.toIntExact((long) record.getValue().getOrDefault(Constants.DRUID_SHARD_KEY_COL_NAME, -1L));
+    final InputRow
+        inputRow =
+        new MapBasedInputRow(timestamp,
+            dataSchema.getParser().getParseSpec().getDimensionsSpec().getDimensionNames(),
+            record.getValue());

     try {
@@ -258,38 +234,37 @@ public class DruidRecordWriter implements RecordWriter<NullWritable, DruidWritab
       Data with the same DRUID_SHARD_KEY_COL_NAME and Time interval will end in the same segment */
        DateTime truncatedDateTime = segmentGranularity.bucketStart(DateTimes.utc(timestamp));
-       final Interval interval = new Interval(
-           truncatedDateTime,
-           segmentGranularity.increment(truncatedDateTime)
-       );
+       final Interval interval = new Interval(truncatedDateTime, segmentGranularity.increment(truncatedDateTime));

        if (currentOpenSegment != null) {
          if (currentOpenSegment.getShardSpec().getPartitionNum() != partitionNumber
              || !currentOpenSegment.getInterval().equals(interval)) {
            pushSegments(ImmutableList.of(currentOpenSegment));
-           currentOpenSegment = new SegmentIdentifier(dataSchema.getDataSource(), interval,
-               tuningConfig.getVersioningPolicy().getVersion(interval),
-               new LinearShardSpec(partitionNumber)
-           );
+           currentOpenSegment =
+               new SegmentIdentifier(dataSchema.getDataSource(),
+                   interval,
+                   tuningConfig.getVersioningPolicy().getVersion(interval),
+                   new LinearShardSpec(partitionNumber));
          }
-       } else if (currentOpenSegment == null) {
-         currentOpenSegment = new SegmentIdentifier(dataSchema.getDataSource(), interval,
-             tuningConfig.getVersioningPolicy().getVersion(interval),
-             new LinearShardSpec(partitionNumber)
-         );
+       } else {
+         currentOpenSegment =
+             new SegmentIdentifier(dataSchema.getDataSource(),
+                 interval,
+                 tuningConfig.getVersioningPolicy().getVersion(interval),
+                 new LinearShardSpec(partitionNumber));
        }
-       appenderator.add(currentOpenSegment, inputRow, committerSupplier);
+       appenderator.add(currentOpenSegment, inputRow, committerSupplier::get);
      } else if (partitionNumber == -1 && maxPartitionSize != -1) {
        /*Case we are partitioning the segments based on time and max row per segment maxPartitionSize*/
-       appenderator
-           .add(getSegmentIdentifierAndMaybePush(timestamp), inputRow, committerSupplier);
+       appenderator.add(getSegmentIdentifierAndMaybePush(timestamp), inputRow, committerSupplier::get);
      } else {
        throw new IllegalArgumentException(String.format(
-           "partitionNumber and maxPartitionSize should be mutually exclusive got partitionNum [%s] and maxPartitionSize [%s]",
-           partitionNumber, maxPartitionSize
-       ));
+           "partitionNumber and maxPartitionSize should be mutually exclusive " + "got partitionNum [%s] and maxPartitionSize [%s]",
+           partitionNumber,
+           maxPartitionSize));
      }
     } catch (SegmentNotWritableException e) {
@@ -297,10 +272,9 @@ public class DruidRecordWriter implements RecordWriter<NullWritable, DruidWritab
     }
   }

-  @Override
-  public void close(boolean abort) throws IOException {
+  @Override public void close(boolean abort) throws IOException {
     try {
-      if (abort == false) {
+      if (!abort) {
        final List<SegmentIdentifier> segmentsToPush = Lists.newArrayList();
        segmentsToPush.addAll(appenderator.getSegments());
        pushSegments(segmentsToPush);
@@ -311,20 +285,18 @@ public class DruidRecordWriter implements RecordWriter<NullWritable, DruidWritab
     } finally {
       try {
         FileUtils.deleteDirectory(tuningConfig.getBasePersistDirectory());
-      } catch (Exception e){
+      } catch (Exception e) {
         LOG.error("error cleaning of base persist directory", e);
       }
       appenderator.close();
     }
   }

-  @Override
-  public void write(NullWritable key, DruidWritable value) throws IOException {
+  @Override public void write(NullWritable key, DruidWritable value) throws IOException {
     this.write(value);
   }

-  @Override
-  public void close(Reporter reporter) throws IOException {
+  @Override public void close(Reporter reporter) throws IOException {
     this.close(false);
   }
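----------------------------------------------------------------------

A note on the committerSupplier change in DruidRecordWriter above: the field moves from Guava's com.google.common.base.Supplier to java.util.function.Supplier, and the two types are bridged in both directions with "::get" method references (Suppliers.ofInstance(Committers.nil())::get when the field is assigned, and committerSupplier::get where Druid's appenderator.add(...) still expects a Guava-typed supplier). A minimal, self-contained sketch of that bridging pattern follows; the Committer interface below is a stand-in invented for this sketch so it compiles on its own, not Druid's io.druid.data.input.Committer:

    import com.google.common.base.Suppliers;
    import java.util.function.Supplier;

    public class SupplierBridgeSketch {

      // Stand-in for a committer type; only here so the sketch is self-contained.
      interface Committer {
        void commit();
      }

      public static void main(String[] args) {
        Committer nil = () -> { /* no-op commit */ };

        // A Guava supplier, as produced by Suppliers.ofInstance(...).
        com.google.common.base.Supplier<Committer> guavaSupplier = Suppliers.ofInstance(nil);

        // '::get' adapts the Guava supplier to java.util.function.Supplier...
        Supplier<Committer> jdkSupplier = guavaSupplier::get;

        // ...and the same trick works in reverse when a Guava-typed parameter is expected.
        com.google.common.base.Supplier<Committer> backToGuava = jdkSupplier::get;

        System.out.println(jdkSupplier.get() == nil && backToGuava.get() == nil); // prints true
      }
    }

Since both interfaces expose a single no-arg get(), a method reference satisfies either one, letting the field use the JDK type while older Guava-typed APIs keep working.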
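The write(...) method above also encodes two mutually exclusive partitioning modes: an explicit shard key per row (partitionNumber read from DRUID_SHARD_KEY_COL_NAME), or time-bucketed segments capped at maxPartitionSize rows, where getSegmentIdentifierAndMaybePush(...) reuses the open segment until the interval changes or the row cap is hit. Below is a simplified, hypothetical sketch of that second mode; SegmentKey, segmentFor, and the fixed hourly granularity are inventions of this sketch, whereas the real code uses Druid's SegmentIdentifier, LinearShardSpec, and the table's configured segment granularity, and pushes the previous segment before rolling over:

    import java.util.concurrent.TimeUnit;

    public class SegmentRolloverSketch {

      private static final long HOUR_MILLIS = TimeUnit.HOURS.toMillis(1);

      /** (intervalStart, partitionNum) stands in for Druid's SegmentIdentifier plus LinearShardSpec. */
      static final class SegmentKey {
        final long intervalStart;
        final int partitionNum;

        SegmentKey(long intervalStart, int partitionNum) {
          this.intervalStart = intervalStart;
          this.partitionNum = partitionNum;
        }
      }

      private final int maxPartitionSize;
      private SegmentKey current;
      private int rowsInCurrent;

      SegmentRolloverSketch(int maxPartitionSize) {
        this.maxPartitionSize = maxPartitionSize;
      }

      /** Returns the segment the next row belongs to, rolling over on interval change or row cap. */
      SegmentKey segmentFor(long timestampMillis) {
        long bucketStart = (timestampMillis / HOUR_MILLIS) * HOUR_MILLIS; // like segmentGranularity.bucketStart
        if (current == null || current.intervalStart != bucketStart) {
          current = new SegmentKey(bucketStart, 0);   // new interval: back to partition 0 (push elided)
          rowsInCurrent = 0;
        } else if (rowsInCurrent >= maxPartitionSize) {
          current = new SegmentKey(bucketStart, current.partitionNum + 1); // same interval, next partition
          rowsInCurrent = 0;
        }
        rowsInCurrent++;
        return current;
      }
    }

With maxPartitionSize = 2 and three rows in the same hour, segmentFor returns partition 0, 0, then 1, mirroring how the writer opens LinearShardSpec(partitionNum + 1) once a segment reaches the cap.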