This is an automated email from the ASF dual-hosted git repository. himanshug pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push: new 14aec7f add config to optionally disable all compression in intermediate segment persists while ingestion (#7919) 14aec7f is described below commit 14aec7fceca90dfaf9b2ce4dae68186d04ffcc47 Author: Himanshu <g.himan...@gmail.com> AuthorDate: Wed Jul 10 12:22:24 2019 -0700 add config to optionally disable all compression in intermediate segment persists while ingestion (#7919) * disable all compression in intermediate segment persists while ingestion * more changes and build fix * by default retain existing indexingSpec for intermediate persisted segments * document indexSpecForIntermediatePersists index tuning config * fix build issues * update serde tests --- .../development/extensions-core/kafka-ingestion.md | 3 +- .../extensions-core/kinesis-ingestion.md | 3 +- docs/content/ingestion/hadoop.md | 3 +- docs/content/ingestion/native_tasks.md | 10 ++++++ .../MaterializedViewSupervisorSpec.java | 1 + .../indexing/kafka/KafkaIndexTaskTuningConfig.java | 4 +++ .../kafka/supervisor/KafkaSupervisorSpec.java | 1 + .../supervisor/KafkaSupervisorTuningConfig.java | 3 ++ .../druid/indexing/kafka/KafkaIndexTaskTest.java | 1 + .../kafka/KafkaIndexTaskTuningConfigTest.java | 12 ++++++- .../kafka/supervisor/KafkaSupervisorTest.java | 2 ++ .../KafkaSupervisorTuningConfigTest.java | 8 ++++- .../TestModifiedKafkaIndexTaskTuningConfig.java | 2 ++ .../kinesis/KinesisIndexTaskTuningConfig.java | 3 ++ .../kinesis/supervisor/KinesisSupervisorSpec.java | 1 + .../supervisor/KinesisSupervisorTuningConfig.java | 3 ++ .../indexing/kinesis/KinesisIndexTaskTest.java | 1 + .../kinesis/KinesisIndexTaskTuningConfigTest.java | 3 ++ .../kinesis/supervisor/KinesisSupervisorTest.java | 2 ++ .../TestModifiedKinesisIndexTaskTuningConfig.java | 3 ++ .../druid/indexer/HadoopDruidIndexerConfig.java | 5 +++ .../apache/druid/indexer/HadoopTuningConfig.java | 14 ++++++++ .../apache/druid/indexer/IndexGeneratorJob.java | 2 +- 
.../druid/indexer/BatchDeltaIngestionTest.java | 1 + .../indexer/DetermineHashedPartitionsJobTest.java | 1 + .../druid/indexer/DeterminePartitionsJobTest.java | 1 + .../indexer/HadoopDruidIndexerConfigTest.java | 2 ++ .../druid/indexer/HadoopTuningConfigTest.java | 2 ++ .../druid/indexer/IndexGeneratorJobTest.java | 1 + .../org/apache/druid/indexer/JobHelperTest.java | 1 + .../indexer/path/GranularityPathSpecTest.java | 1 + .../index/RealtimeAppenderatorTuningConfig.java | 12 +++++++ .../indexing/common/index/YeOldePlumberSchool.java | 2 +- .../druid/indexing/common/task/IndexTask.java | 40 +++++++++++++++++----- .../parallel/ParallelIndexSupervisorTask.java | 1 + .../batch/parallel/ParallelIndexTuningConfig.java | 4 ++- .../SeekableStreamIndexTaskTuningConfig.java | 13 +++++++ .../AppenderatorDriverRealtimeIndexTaskTest.java | 1 + .../indexing/common/task/CompactionTaskTest.java | 6 ++++ .../druid/indexing/common/task/IndexTaskTest.java | 4 +++ .../common/task/RealtimeIndexTaskTest.java | 1 + .../druid/indexing/common/task/TaskSerdeTest.java | 3 ++ .../ParallelIndexSupervisorTaskKillTest.java | 1 + .../ParallelIndexSupervisorTaskResourceTest.java | 1 + .../ParallelIndexSupervisorTaskSerdeTest.java | 1 + .../parallel/ParallelIndexSupervisorTaskTest.java | 2 ++ .../parallel/ParallelIndexTuningConfigTest.java | 1 + .../druid/indexing/overlord/TaskLifecycleTest.java | 4 +++ .../SeekableStreamSupervisorStateTest.java | 1 + .../segment/indexing/RealtimeTuningConfig.java | 14 ++++++++ .../realtime/appenderator/AppenderatorConfig.java | 2 ++ .../realtime/appenderator/AppenderatorImpl.java | 4 +-- .../segment/realtime/plumber/RealtimePlumber.java | 5 +-- .../segment/indexing/RealtimeTuningConfigTest.java | 10 ++++-- .../appenderator/AppenderatorPlumberTest.java | 1 + .../realtime/appenderator/AppenderatorTester.java | 1 + .../DefaultOfflineAppenderatorFactoryTest.java | 1 + .../plumber/RealtimePlumberSchoolTest.java | 1 + .../druid/segment/realtime/plumber/SinkTest.java | 
2 ++ .../druid/cli/validate/DruidJsonValidatorTest.java | 1 + 60 files changed, 215 insertions(+), 25 deletions(-) diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md index c070e46..ec1d046 100644 --- a/docs/content/development/extensions-core/kafka-ingestion.md +++ b/docs/content/development/extensions-core/kafka-ingestion.md @@ -139,7 +139,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |`maxTotalRows`|Long|The number of rows to aggregate across all segments; this number is post-aggregation rows. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == unlimited)| |`intermediatePersistPeriod`|ISO8601 Period|The period that determines the rate at which intermediate persists occur.|no (default == PT10M)| |`maxPendingPersists`|Integer|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0, meaning one persist can be running concurrently with ingestion, and none can be queued up)| -|`indexSpec`|Object|Tune how data is indexed, see 'IndexSpec' below for more details.|no| +|indexSpec|Object|Tune how data is indexed. See [IndexSpec](#indexspec) for more information.|no| +|indexSpecForIntermediatePersists|defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. 
However, disabling compression on intermediate segments might increase page cache use while they are in use before being merged into the final published segment. See [IndexSpec](#indexspec) for possible values.|no (defaul [...] |`reportParseExceptions`|Boolean|*DEPRECATED*. If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped. Setting `reportParseExceptions` to true will override existing configurations for `maxParseExceptions` and `maxSavedParseExceptions`, setting `maxParseExceptions` to 0 and limiting `maxSavedParseExceptions` to no more than 1.|no (default == false)| |`handoffConditionTimeout`|Long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.|no (default == 0)| |`resetOffsetAutomatically`|Boolean|Whether to reset the consumer offset if the next offset that it is trying to fetch is less than the earliest available offset for that particular partition. The consumer offset will be reset to either the earliest or latest offset depending on `useEarliestOffset` property of `KafkaSupervisorIOConfig` (see below). This situation typically occurs when messages in Kafka are no longer available for consumption and therefore won't be ingested into Druid. If [...] diff --git a/docs/content/development/extensions-core/kinesis-ingestion.md b/docs/content/development/extensions-core/kinesis-ingestion.md index 0578dd2..8c2ac33 100644 --- a/docs/content/development/extensions-core/kinesis-ingestion.md +++ b/docs/content/development/extensions-core/kinesis-ingestion.md @@ -135,7 +135,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |`maxTotalRows`|Long|The number of rows to aggregate across all segments; this number is post-aggregation rows.
Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == unlimited)| |`intermediatePersistPeriod`|ISO8601 Period|The period that determines the rate at which intermediate persists occur.|no (default == PT10M)| |`maxPendingPersists`|Integer|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0, meaning one persist can be running concurrently with ingestion, and none can be queued up)| -|`indexSpec`|Object|Tune how data is indexed, see 'IndexSpec' below for more details.|no| +|`indexSpec`|Object|Tune how data is indexed. See [IndexSpec](#indexspec) for more information.|no| +|`indexSpecForIntermediatePersists`|Object|Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. This can be used to disable dimension/metric compression on intermediate segments to reduce the memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are in use before being merged into the final published segment. See [IndexSpec](#indexspec) for possible values.|no (defaul [...] |`reportParseExceptions`|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|no (default == false)| |`handoffConditionTimeout`|Long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.|no (default == 0)| |`resetOffsetAutomatically`|Boolean|Whether to reset the consumer sequence numbers if the next sequence number that it is trying to fetch is less than the earliest available sequence number for that particular shard.
The sequence number will be reset to either the earliest or latest sequence number depending on `useEarliestOffset` property of `KinesisSupervisorIOConfig` (see below). This situation typically occurs when messages in Kinesis are no longer available for consumption and there [...] diff --git a/docs/content/ingestion/hadoop.md b/docs/content/ingestion/hadoop.md index b9a6d72..1c0bfbc 100644 --- a/docs/content/ingestion/hadoop.md +++ b/docs/content/ingestion/hadoop.md @@ -192,7 +192,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |combineText|Boolean|Use CombineTextInputFormat to combine multiple files into a file split. This can speed up Hadoop jobs when processing a large number of small files.|no (default == false)| |useCombiner|Boolean|Use Hadoop combiner to merge rows at mapper if possible.|no (default == false)| |jobProperties|Object|A map of properties to add to the Hadoop job configuration, see below for details.|no (default == null)| -|indexSpec|Object|Tune how data is indexed. See below for more information.|no| +|indexSpec|Object|Tune how data is indexed. See [IndexSpec](#indexspec) for more information.|no| +|indexSpecForIntermediatePersists|Object|Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. This can be used to disable dimension/metric compression on intermediate segments to reduce the memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are in use before being merged into the final published segment. See [IndexSpec](#indexspec) for possible values.|no (defaul [...] |numBackgroundPersistThreads|Integer|The number of new background threads to use for incremental persists. Using this feature causes a notable increase in memory pressure and cpu usage but will make the job finish more quickly.
If changing from the default of 0 (use current thread for persists), we recommend setting it to 1.|no (default == 0)| |forceExtendableShardSpecs|Boolean|Forces use of extendable shardSpecs. Hash-based partitioning always uses an extendable shardSpec. For single-dimension partitioning, this option should be set to true to use an extendable shardSpec. For partitioning, please check [Partitioning specification](#partitioning-specification). This option can be useful when you need to append more data to existing dataSource.|no (default = false)| |useExplicitVersion|Boolean|Forces HadoopIndexTask to use version.|no (default = false)| diff --git a/docs/content/ingestion/native_tasks.md b/docs/content/ingestion/native_tasks.md index ad7cac9..c5cd91b 100644 --- a/docs/content/ingestion/native_tasks.md +++ b/docs/content/ingestion/native_tasks.md @@ -185,6 +185,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |maxTotalRows|Total number of rows in segments waiting for being pushed. Used in determining when intermediate pushing should occur.|20000000|no| |numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set.|null|no| |indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no| +|indexSpecForIntermediatePersists|defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. This can be used to disable dimension/metric compression on intermediate segments to reduce the memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are in use before being merged into the final published segment. See [IndexSpec](#indexspec) for possible values.|same as in [...]
|maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no| |reportParseExceptions|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|false|no| |pushTimeout|Milliseconds to wait for pushing segments. It must be >= 0, where 0 means to wait forever.|0|no| @@ -375,6 +376,14 @@ An example of the result is "metricCompression": "lz4", "longEncoding": "longs" }, + "indexSpecForIntermediatePersists": { + "bitmap": { + "type": "concise" + }, + "dimensionCompression": "lz4", + "metricCompression": "lz4", + "longEncoding": "longs" + }, "maxPendingPersists": 0, "reportParseExceptions": false, "pushTimeout": 0, @@ -555,6 +564,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set.|null|no| |partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions. Only used with `forceGuaranteedRollup` = true, will be ignored otherwise.|null|no| |indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no| +|indexSpecForIntermediatePersists|defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. 
However, disabling compression on intermediate segments might increase page cache use while they are in use before being merged into the final published segment. See [IndexSpec](#indexspec) for possible values.|same as in [...] |maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no| |forceGuaranteedRollup|Forces guaranteeing the [perfect rollup](../ingestion/index.html#roll-up-modes). The perfect rollup optimizes the total size of generated segments and querying time while indexing time will be increased. If this is set to true, the index task will read the entire input data twice: once for finding the optimal number of partitions per time chunk and once for generating segments. Note that the result segments would be hash-partitioned. You can set `forceExtendableShard [...] |reportParseExceptions|DEPRECATED. If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.
Setting `reportParseExceptions` to true will override existing configurations for `maxParseExceptions` and `maxSavedParseExceptions`, setting `maxParseExceptions` to 0 and limiting `maxSavedParseExceptions` to no more than 1.|false|no| diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java index 4258fc9..874ae61 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java @@ -179,6 +179,7 @@ public class MaterializedViewSupervisorSpec implements SupervisorSpec tuningConfig.getPartitionsSpec(), tuningConfig.getShardSpecs(), tuningConfig.getIndexSpec(), + tuningConfig.getIndexSpecForIntermediatePersists(), tuningConfig.getRowFlushBoundary(), tuningConfig.getMaxBytesInMemory(), tuningConfig.isLeaveIntermediate(), diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java index 2104759..f23ee7b 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java @@ -41,6 +41,7 @@ public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningCon @JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory, @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists, 
@JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12. @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly, @Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions, @@ -62,6 +63,7 @@ public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningCon basePersistDirectory, maxPendingPersists, indexSpec, + indexSpecForIntermediatePersists, true, reportParseExceptions, handoffConditionTimeout, @@ -87,6 +89,7 @@ public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningCon dir, getMaxPendingPersists(), getIndexSpec(), + getIndexSpecForIntermediatePersists(), true, isReportParseExceptions(), getHandoffConditionTimeout(), @@ -112,6 +115,7 @@ public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningCon ", basePersistDirectory=" + getBasePersistDirectory() + ", maxPendingPersists=" + getMaxPendingPersists() + ", indexSpec=" + getIndexSpec() + + ", indexSpecForIntermediatePersists=" + getIndexSpecForIntermediatePersists() + ", reportParseExceptions=" + isReportParseExceptions() + ", handoffConditionTimeout=" + getHandoffConditionTimeout() + ", resetOffsetAutomatically=" + isResetOffsetAutomatically() + diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index ef6259f..60c187f 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -85,6 +85,7 @@ public class 
KafkaSupervisorSpec extends SeekableStreamSupervisorSpec null, null, null, + null, null ), ioConfig, diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java index 8e4c6e9..27e9b29 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java @@ -53,6 +53,7 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig @JsonProperty("basePersistDirectory") File basePersistDirectory, @JsonProperty("maxPendingPersists") Integer maxPendingPersists, @JsonProperty("indexSpec") IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12. 
@JsonProperty("buildV9Directly") Boolean buildV9Directly, @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, @@ -80,6 +81,7 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig basePersistDirectory, maxPendingPersists, indexSpec, + indexSpecForIntermediatePersists, true, reportParseExceptions, handoffConditionTimeout, @@ -186,6 +188,7 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig getBasePersistDirectory(), getMaxPendingPersists(), getIndexSpec(), + getIndexSpecForIntermediatePersists(), true, isReportParseExceptions(), getHandoffConditionTimeout(), diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index c9506d6..edf4c22 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -2456,6 +2456,7 @@ public class KafkaIndexTaskTest null, null, null, + null, true, reportParseExceptions, handoffConditionTimeout, diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java index 63b0e98..57b4f37 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java @@ -25,6 +25,7 @@ import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig; import org.apache.druid.indexing.kafka.test.TestModifiedKafkaIndexTaskTuningConfig; import 
org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.data.CompressionStrategy; import org.apache.druid.segment.indexing.TuningConfig; import org.joda.time.Period; import org.junit.Assert; @@ -65,6 +66,7 @@ public class KafkaIndexTaskTuningConfigTest Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod()); Assert.assertEquals(0, config.getMaxPendingPersists()); Assert.assertEquals(new IndexSpec(), config.getIndexSpec()); + Assert.assertEquals(new IndexSpec(), config.getIndexSpecForIntermediatePersists()); Assert.assertEquals(false, config.isReportParseExceptions()); Assert.assertEquals(0, config.getHandoffConditionTimeout()); } @@ -81,7 +83,9 @@ public class KafkaIndexTaskTuningConfigTest + " \"intermediatePersistPeriod\": \"PT1H\",\n" + " \"maxPendingPersists\": 100,\n" + " \"reportParseExceptions\": true,\n" - + " \"handoffConditionTimeout\": 100\n" + + " \"handoffConditionTimeout\": 100,\n" + + " \"indexSpec\": { \"metricCompression\" : \"NONE\" },\n" + + " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" }\n" + "}"; KafkaIndexTaskTuningConfig config = (KafkaIndexTaskTuningConfig) mapper.readValue( @@ -103,6 +107,8 @@ public class KafkaIndexTaskTuningConfigTest Assert.assertEquals(100, config.getMaxPendingPersists()); Assert.assertEquals(true, config.isReportParseExceptions()); Assert.assertEquals(100, config.getHandoffConditionTimeout()); + Assert.assertEquals(new IndexSpec(null, null, CompressionStrategy.NONE, null), config.getIndexSpec()); + Assert.assertEquals(new IndexSpec(null, CompressionStrategy.UNCOMPRESSED, null, null), config.getIndexSpecForIntermediatePersists()); } @Test @@ -117,6 +123,7 @@ public class KafkaIndexTaskTuningConfigTest new File("/tmp/xxx"), 4, new IndexSpec(), + new IndexSpec(), true, true, 5L, @@ -159,6 +166,7 @@ public class KafkaIndexTaskTuningConfigTest new File("/tmp/xxx"), 4, new IndexSpec(), + new 
IndexSpec(), true, true, 5L, @@ -206,6 +214,7 @@ public class KafkaIndexTaskTuningConfigTest new File("/tmp/xxx"), 4, new IndexSpec(), + new IndexSpec(), true, true, 5L, @@ -252,6 +261,7 @@ public class KafkaIndexTaskTuningConfigTest config.getBasePersistDirectory(), 0, config.getIndexSpec(), + config.getIndexSpecForIntermediatePersists(), true, config.isReportParseExceptions(), config.getHandoffConditionTimeout(), diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 05242da..e8e46ad 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -213,6 +213,7 @@ public class KafkaSupervisorTest extends EasyMockSupport new File("/test"), null, null, + null, true, false, null, @@ -3051,6 +3052,7 @@ public class KafkaSupervisorTest extends EasyMockSupport new File("/test"), null, null, + null, true, false, null, diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java index 3312a10..5859f90 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.indexing.kafka.KafkaIndexTaskModule; import 
org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.data.CompressionStrategy; import org.apache.druid.segment.indexing.TuningConfig; import org.joda.time.Duration; import org.joda.time.Period; @@ -63,6 +64,7 @@ public class KafkaSupervisorTuningConfigTest Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod()); Assert.assertEquals(0, config.getMaxPendingPersists()); Assert.assertEquals(new IndexSpec(), config.getIndexSpec()); + Assert.assertEquals(new IndexSpec(), config.getIndexSpecForIntermediatePersists()); Assert.assertEquals(false, config.isReportParseExceptions()); Assert.assertEquals(0, config.getHandoffConditionTimeout()); Assert.assertNull(config.getWorkerThreads()); @@ -90,7 +92,9 @@ public class KafkaSupervisorTuningConfigTest + " \"chatRetries\": 14,\n" + " \"httpTimeout\": \"PT15S\",\n" + " \"shutdownTimeout\": \"PT95S\",\n" - + " \"offsetFetchPeriod\": \"PT20S\"\n" + + " \"offsetFetchPeriod\": \"PT20S\",\n" + + " \"indexSpec\": { \"metricCompression\" : \"NONE\" },\n" + + " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" }\n" + "}"; KafkaSupervisorTuningConfig config = (KafkaSupervisorTuningConfig) mapper.readValue( @@ -116,6 +120,8 @@ public class KafkaSupervisorTuningConfigTest Assert.assertEquals(Duration.standardSeconds(15), config.getHttpTimeout()); Assert.assertEquals(Duration.standardSeconds(95), config.getShutdownTimeout()); Assert.assertEquals(Duration.standardSeconds(20), config.getOffsetFetchPeriod()); + Assert.assertEquals(new IndexSpec(null, null, CompressionStrategy.NONE, null), config.getIndexSpec()); + Assert.assertEquals(new IndexSpec(null, CompressionStrategy.UNCOMPRESSED, null, null), config.getIndexSpecForIntermediatePersists()); } } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java index 3cc124f..27e69e8 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java @@ -45,6 +45,7 @@ public class TestModifiedKafkaIndexTaskTuningConfig extends KafkaIndexTaskTuning @JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory, @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists, @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12. @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly, @Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions, @@ -67,6 +68,7 @@ public class TestModifiedKafkaIndexTaskTuningConfig extends KafkaIndexTaskTuning basePersistDirectory, maxPendingPersists, indexSpec, + indexSpecForIntermediatePersists, true, reportParseExceptions, handoffConditionTimeout, diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java index 534330e..f033a6d 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java @@ -58,6 +58,7 @@ public class KinesisIndexTaskTuningConfig extends 
SeekableStreamIndexTaskTuningC @JsonProperty("basePersistDirectory") File basePersistDirectory, @JsonProperty("maxPendingPersists") Integer maxPendingPersists, @JsonProperty("indexSpec") IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, @JsonProperty("buildV9Directly") Boolean buildV9Directly, @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, @@ -85,6 +86,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC basePersistDirectory, maxPendingPersists, indexSpec, + indexSpecForIntermediatePersists, true, reportParseExceptions, handoffConditionTimeout, @@ -160,6 +162,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC dir, getMaxPendingPersists(), getIndexSpec(), + getIndexSpecForIntermediatePersists(), true, isReportParseExceptions(), getHandoffConditionTimeout(), diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java index ec72c7d..e921fc9 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java @@ -96,6 +96,7 @@ public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec null, null, null, + null, null ), ioConfig, diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java index 
3c749be..144bb80 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java @@ -49,6 +49,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig @JsonProperty("basePersistDirectory") File basePersistDirectory, @JsonProperty("maxPendingPersists") Integer maxPendingPersists, @JsonProperty("indexSpec") IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, @JsonProperty("buildV9Directly") Boolean buildV9Directly, @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, @@ -81,6 +82,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig basePersistDirectory, maxPendingPersists, indexSpec, + indexSpecForIntermediatePersists, buildV9Directly, reportParseExceptions, handoffConditionTimeout, @@ -191,6 +193,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig getBasePersistDirectory(), getMaxPendingPersists(), getIndexSpec(), + getIndexSpecForIntermediatePersists(), true, isReportParseExceptions(), getHandoffConditionTimeout(), diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index d8bf83c..bb83c08 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -2629,6 +2629,7 @@ public class KinesisIndexTaskTest extends 
EasyMockSupport null, null, null, + null, true, reportParseExceptions, handoffConditionTimeout, diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java index 57ffebc..918cc2d 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java @@ -144,6 +144,7 @@ public class KinesisIndexTaskTuningConfigTest new File("/tmp/xxx"), 4, new IndexSpec(), + new IndexSpec(), true, true, 5L, @@ -202,6 +203,7 @@ public class KinesisIndexTaskTuningConfigTest new File("/tmp/xxx"), 4, new IndexSpec(), + new IndexSpec(), true, true, 5L, @@ -288,6 +290,7 @@ public class KinesisIndexTaskTuningConfigTest new File("/tmp/xxx"), 4, new IndexSpec(), + new IndexSpec(), true, true, 5L, diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index e6bb39c..be5a87e 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -168,6 +168,7 @@ public class KinesisSupervisorTest extends EasyMockSupport new File("/test"), null, null, + null, true, false, null, @@ -3678,6 +3679,7 @@ public class KinesisSupervisorTest extends EasyMockSupport new File("/test"), null, null, + null, true, false, null, diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java index 2485e97..e45168d 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java @@ -45,6 +45,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu @JsonProperty("basePersistDirectory") File basePersistDirectory, @JsonProperty("maxPendingPersists") Integer maxPendingPersists, @JsonProperty("indexSpec") IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, @JsonProperty("buildV9Directly") Boolean buildV9Directly, @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, @@ -73,6 +74,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu basePersistDirectory, maxPendingPersists, indexSpec, + indexSpecForIntermediatePersists, buildV9Directly, reportParseExceptions, handoffConditionTimeout, @@ -104,6 +106,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu base.getBasePersistDirectory(), base.getMaxPendingPersists(), base.getIndexSpec(), + base.getIndexSpecForIntermediatePersists(), base.getBuildV9Directly(), base.isReportParseExceptions(), base.getHandoffConditionTimeout(), diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java index fc9a5d9..4aa8cd2 100644 --- 
a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java @@ -299,6 +299,11 @@ public class HadoopDruidIndexerConfig return schema.getTuningConfig().getIndexSpec(); } + public IndexSpec getIndexSpecForIntermediatePersists() + { + return schema.getTuningConfig().getIndexSpecForIntermediatePersists(); + } + public boolean isOverwriteFiles() { return schema.getTuningConfig().isOverwriteFiles(); diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java index 5fd9b3d..e61f912 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java @@ -55,6 +55,7 @@ public class HadoopTuningConfig implements TuningConfig DEFAULT_PARTITIONS_SPEC, DEFAULT_SHARD_SPECS, DEFAULT_INDEX_SPEC, + DEFAULT_INDEX_SPEC, DEFAULT_ROW_FLUSH_BOUNDARY, 0L, false, @@ -81,6 +82,7 @@ public class HadoopTuningConfig implements TuningConfig private final PartitionsSpec partitionsSpec; private final Map<Long, List<HadoopyShardSpec>> shardSpecs; private final IndexSpec indexSpec; + private final IndexSpec indexSpecForIntermediatePersists; private final int rowFlushBoundary; private final long maxBytesInMemory; private final boolean leaveIntermediate; @@ -105,6 +107,7 @@ public class HadoopTuningConfig implements TuningConfig final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec, final @JsonProperty("shardSpecs") Map<Long, List<HadoopyShardSpec>> shardSpecs, final @JsonProperty("indexSpec") IndexSpec indexSpec, + final @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, final @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, final @JsonProperty("maxBytesInMemory") Long maxBytesInMemory, 
final @JsonProperty("leaveIntermediate") boolean leaveIntermediate, @@ -132,6 +135,8 @@ public class HadoopTuningConfig implements TuningConfig this.partitionsSpec = partitionsSpec == null ? DEFAULT_PARTITIONS_SPEC : partitionsSpec; this.shardSpecs = shardSpecs == null ? DEFAULT_SHARD_SPECS : shardSpecs; this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec; + this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ? + this.indexSpec : indexSpecForIntermediatePersists; this.rowFlushBoundary = maxRowsInMemory == null ? maxRowsInMemoryCOMPAT == null ? DEFAULT_ROW_FLUSH_BOUNDARY : maxRowsInMemoryCOMPAT : maxRowsInMemory; @@ -199,6 +204,12 @@ public class HadoopTuningConfig implements TuningConfig return indexSpec; } + @JsonProperty + public IndexSpec getIndexSpecForIntermediatePersists() + { + return indexSpecForIntermediatePersists; + } + @JsonProperty("maxRowsInMemory") public int getRowFlushBoundary() { @@ -314,6 +325,7 @@ public class HadoopTuningConfig implements TuningConfig partitionsSpec, shardSpecs, indexSpec, + indexSpecForIntermediatePersists, rowFlushBoundary, maxBytesInMemory, leaveIntermediate, @@ -343,6 +355,7 @@ public class HadoopTuningConfig implements TuningConfig partitionsSpec, shardSpecs, indexSpec, + indexSpecForIntermediatePersists, rowFlushBoundary, maxBytesInMemory, leaveIntermediate, @@ -372,6 +385,7 @@ public class HadoopTuningConfig implements TuningConfig partitionsSpec, specs, indexSpec, + indexSpecForIntermediatePersists, rowFlushBoundary, maxBytesInMemory, leaveIntermediate, diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java index 6c032b0..f4cad31 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java @@ -605,7 +605,7 @@ public class IndexGeneratorJob implements 
Jobby ) throws IOException { return HadoopDruidIndexerConfig.INDEX_MERGER_V9 - .persist(index, interval, file, config.getIndexSpec(), progressIndicator, null); + .persist(index, interval, file, config.getIndexSpecForIntermediatePersists(), progressIndicator, null); } protected File mergeQueryableIndex( diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java index 3fcf50a..d0d7180 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java @@ -462,6 +462,7 @@ public class BatchDeltaIngestionTest null, null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java index 59453fc..2649874 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java @@ -199,6 +199,7 @@ public class DetermineHashedPartitionsJobTest null, null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java index e9265bc..3d8b06b 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java @@ -262,6 +262,7 @@ public class DeterminePartitionsJobTest null, null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java 
b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java index 7573aea..9c723ac 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java @@ -86,6 +86,7 @@ public class HadoopDruidIndexerConfigTest null, null, null, + null, false, false, false, @@ -164,6 +165,7 @@ public class HadoopDruidIndexerConfigTest null, null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java index 2553583..ef29cb7 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java @@ -43,6 +43,7 @@ public class HadoopTuningConfigTest null, null, null, + null, 100, null, true, @@ -70,6 +71,7 @@ public class HadoopTuningConfigTest Assert.assertNotNull(actual.getPartitionsSpec()); Assert.assertEquals(ImmutableMap.<Long, List<HadoopyShardSpec>>of(), actual.getShardSpecs()); Assert.assertEquals(new IndexSpec(), actual.getIndexSpec()); + Assert.assertEquals(new IndexSpec(), actual.getIndexSpecForIntermediatePersists()); Assert.assertEquals(100, actual.getRowFlushBoundary()); Assert.assertEquals(true, actual.isLeaveIntermediate()); Assert.assertEquals(true, actual.isCleanupOnFailure()); diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java index 3925bcd..a7757ac 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java @@ -521,6 +521,7 @@ public class IndexGeneratorJobTest null, null, null, + null, 
maxRowsInMemory, maxBytesInMemory, true, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java index 3969e12..ff5aed6 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java @@ -114,6 +114,7 @@ public class JobHelperTest null, null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java index 7f2ce22..4da9ee4 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java @@ -62,6 +62,7 @@ public class GranularityPathSpecTest null, null, null, + null, false, false, false, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java index 9d2bed8..1676a2f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java @@ -63,6 +63,7 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera private final int maxPendingPersists; private final ShardSpec shardSpec; private final IndexSpec indexSpec; + private final IndexSpec indexSpecForIntermediatePersists; private final boolean reportParseExceptions; private final long publishAndHandoffTimeout; private final long alertTimeout; @@ -84,6 +85,7 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera 
@JsonProperty("maxPendingPersists") Integer maxPendingPersists, @JsonProperty("shardSpec") ShardSpec shardSpec, @JsonProperty("indexSpec") IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, @JsonProperty("publishAndHandoffTimeout") Long publishAndHandoffTimeout, @JsonProperty("alertTimeout") Long alertTimeout, @@ -106,6 +108,8 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera this.maxPendingPersists = maxPendingPersists == null ? defaultMaxPendingPersists : maxPendingPersists; this.shardSpec = shardSpec == null ? defaultShardSpec : shardSpec; this.indexSpec = indexSpec == null ? defaultIndexSpec : indexSpec; + this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ? + this.indexSpec : indexSpecForIntermediatePersists; this.reportParseExceptions = reportParseExceptions == null ? defaultReportParseExceptions : reportParseExceptions; @@ -196,6 +200,13 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera return indexSpec; } + @JsonProperty + @Override + public IndexSpec getIndexSpecForIntermediatePersists() + { + return indexSpecForIntermediatePersists; + } + @Override @JsonProperty public boolean isReportParseExceptions() @@ -253,6 +264,7 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera maxPendingPersists, shardSpec, indexSpec, + indexSpecForIntermediatePersists, reportParseExceptions, publishAndHandoffTimeout, alertTimeout, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/YeOldePlumberSchool.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/YeOldePlumberSchool.java index 6e0e1d8..f872042 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/YeOldePlumberSchool.java +++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/YeOldePlumberSchool.java @@ -236,7 +236,7 @@ public class YeOldePlumberSchool implements PlumberSchool indexMergerV9.persist( indexToPersist.getIndex(), dirToPersist, - config.getIndexSpec(), + config.getIndexSpecForIntermediatePersists(), config.getSegmentWriteOutMediumFactory() ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 27df20a..e766eff 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -1270,6 +1270,7 @@ public class IndexTask extends AbstractTask implements ChatHandler private final Integer numShards; private final List<String> partitionDimensions; private final IndexSpec indexSpec; + private final IndexSpec indexSpecForIntermediatePersists; private final File basePersistDirectory; private final int maxPendingPersists; @@ -1305,6 +1306,7 @@ public class IndexTask extends AbstractTask implements ChatHandler @JsonProperty("numShards") @Nullable Integer numShards, @JsonProperty("partitionDimensions") @Nullable List<String> partitionDimensions, @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists, // This parameter is left for compatibility when reading existing JSONs, to be removed in Druid 0.12. 
@JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly, @@ -1327,6 +1329,7 @@ public class IndexTask extends AbstractTask implements ChatHandler numShards, partitionDimensions, indexSpec, + indexSpecForIntermediatePersists, maxPendingPersists, forceGuaranteedRollup, reportParseExceptions, @@ -1346,7 +1349,7 @@ public class IndexTask extends AbstractTask implements ChatHandler private IndexTuningConfig() { - this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null); + this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null); } private IndexTuningConfig( @@ -1357,6 +1360,7 @@ public class IndexTask extends AbstractTask implements ChatHandler @Nullable Integer numShards, @Nullable List<String> partitionDimensions, @Nullable IndexSpec indexSpec, + @Nullable IndexSpec indexSpecForIntermediatePersists, @Nullable Integer maxPendingPersists, @Nullable Boolean forceGuaranteedRollup, @Nullable Boolean reportParseExceptions, @@ -1384,6 +1388,8 @@ public class IndexTask extends AbstractTask implements ChatHandler this.numShards = numShards == null || numShards.equals(-1) ? null : numShards; this.partitionDimensions = partitionDimensions == null ? Collections.emptyList() : partitionDimensions; this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec; + this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ? + this.indexSpec : indexSpecForIntermediatePersists; this.maxPendingPersists = maxPendingPersists == null ? DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists; this.forceGuaranteedRollup = forceGuaranteedRollup == null ? 
DEFAULT_GUARANTEE_ROLLUP : forceGuaranteedRollup; this.reportParseExceptions = reportParseExceptions == null @@ -1420,6 +1426,7 @@ public class IndexTask extends AbstractTask implements ChatHandler numShards, partitionDimensions, indexSpec, + indexSpecForIntermediatePersists, maxPendingPersists, forceGuaranteedRollup, reportParseExceptions, @@ -1442,6 +1449,7 @@ public class IndexTask extends AbstractTask implements ChatHandler numShards, partitionDimensions, indexSpec, + indexSpecForIntermediatePersists, maxPendingPersists, forceGuaranteedRollup, reportParseExceptions, @@ -1464,6 +1472,7 @@ public class IndexTask extends AbstractTask implements ChatHandler numShards, partitionDimensions, indexSpec, + indexSpecForIntermediatePersists, maxPendingPersists, forceGuaranteedRollup, reportParseExceptions, @@ -1533,6 +1542,13 @@ public class IndexTask extends AbstractTask implements ChatHandler return indexSpec; } + @JsonProperty + @Override + public IndexSpec getIndexSpecForIntermediatePersists() + { + return indexSpecForIntermediatePersists; + } + @Override public File getBasePersistDirectory() { @@ -1618,19 +1634,22 @@ public class IndexTask extends AbstractTask implements ChatHandler } IndexTuningConfig that = (IndexTuningConfig) o; return maxRowsInMemory == that.maxRowsInMemory && - Objects.equals(maxTotalRows, that.maxTotalRows) && + maxBytesInMemory == that.maxBytesInMemory && maxPendingPersists == that.maxPendingPersists && forceGuaranteedRollup == that.forceGuaranteedRollup && reportParseExceptions == that.reportParseExceptions && pushTimeout == that.pushTimeout && + logParseExceptions == that.logParseExceptions && + maxParseExceptions == that.maxParseExceptions && + maxSavedParseExceptions == that.maxSavedParseExceptions && Objects.equals(maxRowsPerSegment, that.maxRowsPerSegment) && + Objects.equals(maxTotalRows, that.maxTotalRows) && Objects.equals(numShards, that.numShards) && + Objects.equals(partitionDimensions, that.partitionDimensions) && 
Objects.equals(indexSpec, that.indexSpec) && + Objects.equals(indexSpecForIntermediatePersists, that.indexSpecForIntermediatePersists) && Objects.equals(basePersistDirectory, that.basePersistDirectory) && - Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory) && - logParseExceptions == that.logParseExceptions && - maxParseExceptions == that.maxParseExceptions && - maxSavedParseExceptions == that.maxSavedParseExceptions; + Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory); } @Override @@ -1639,18 +1658,21 @@ public class IndexTask extends AbstractTask implements ChatHandler return Objects.hash( maxRowsPerSegment, maxRowsInMemory, + maxBytesInMemory, maxTotalRows, numShards, + partitionDimensions, indexSpec, + indexSpecForIntermediatePersists, basePersistDirectory, maxPendingPersists, forceGuaranteedRollup, reportParseExceptions, pushTimeout, - segmentWriteOutMediumFactory, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + segmentWriteOutMediumFactory ); } @@ -1663,7 +1685,9 @@ public class IndexTask extends AbstractTask implements ChatHandler ", maxBytesInMemory=" + maxBytesInMemory + ", maxTotalRows=" + maxTotalRows + ", numShards=" + numShards + + ", partitionDimensions=" + partitionDimensions + ", indexSpec=" + indexSpec + + ", indexSpecForIntermediatePersists=" + indexSpecForIntermediatePersists + ", basePersistDirectory=" + basePersistDirectory + ", maxPendingPersists=" + maxPendingPersists + ", forceGuaranteedRollup=" + forceGuaranteedRollup + diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 2a29726..924be84 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -333,6 +333,7 @@ public class ParallelIndexSupervisorTask extends AbstractTask implements ChatHan tuningConfig.getNumShards(), null, tuningConfig.getIndexSpec(), + tuningConfig.getIndexSpecForIntermediatePersists(), tuningConfig.getMaxPendingPersists(), true, false, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java index 2c41549..9c480d2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java @@ -71,6 +71,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig null, null, null, + null, null ); } @@ -84,6 +85,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows, @JsonProperty("numShards") @Nullable Integer numShards, @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists, @JsonProperty("forceGuaranteedRollup") @Nullable Boolean forceGuaranteedRollup, @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions, @@ -109,6 +111,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig numShards, null, indexSpec, + indexSpecForIntermediatePersists, maxPendingPersists, null, forceGuaranteedRollup, @@ -188,7 +191,6 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig @Override public int hashCode() { - return Objects.hash( super.hashCode(), maxNumSubTasks, diff 
--git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java index b594e42..1d48ad9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java @@ -47,6 +47,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi @Deprecated private final int maxPendingPersists; private final IndexSpec indexSpec; + private final IndexSpec indexSpecForIntermediatePersists; private final boolean reportParseExceptions; private final long handoffConditionTimeout; private final boolean resetOffsetAutomatically; @@ -68,6 +69,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi @Nullable File basePersistDirectory, @Nullable Integer maxPendingPersists, @Nullable IndexSpec indexSpec, + @Nullable IndexSpec indexSpecForIntermediatePersists, // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12. @Deprecated @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly, @Deprecated @Nullable Boolean reportParseExceptions, @@ -96,6 +98,8 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi this.basePersistDirectory = defaults.getBasePersistDirectory(); this.maxPendingPersists = maxPendingPersists == null ? 0 : maxPendingPersists; this.indexSpec = indexSpec == null ? defaults.getIndexSpec() : indexSpec; + this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ? + this.indexSpec : indexSpecForIntermediatePersists; this.reportParseExceptions = reportParseExceptions == null ? 
defaults.isReportParseExceptions() : reportParseExceptions; @@ -187,6 +191,13 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi return indexSpec; } + @JsonProperty + @Override + public IndexSpec getIndexSpecForIntermediatePersists() + { + return indexSpecForIntermediatePersists; + } + /** * Always returns true, doesn't affect the version being built. */ @@ -281,6 +292,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi Objects.equals(intermediatePersistPeriod, that.intermediatePersistPeriod) && Objects.equals(basePersistDirectory, that.basePersistDirectory) && Objects.equals(indexSpec, that.indexSpec) && + Objects.equals(indexSpecForIntermediatePersists, that.indexSpecForIntermediatePersists) && Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory) && Objects.equals(intermediateHandoffPeriod, that.intermediateHandoffPeriod); } @@ -297,6 +309,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi basePersistDirectory, maxPendingPersists, indexSpec, + indexSpecForIntermediatePersists, reportParseExceptions, handoffConditionTimeout, resetOffsetAutomatically, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index c1d682e..d675069 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -1416,6 +1416,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest null, null, null, + null, reportParseExceptions, handoffTimeout, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index 69f10f6..bb69938 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -288,6 +288,7 @@ public class CompactionTaskTest CompressionStrategy.LZF, LongEncodingStrategy.LONGS ), + null, 5000, true, true, @@ -462,6 +463,7 @@ public class CompactionTaskTest CompressionStrategy.LZF, LongEncodingStrategy.LONGS ), + null, 5000, true, true, @@ -522,6 +524,7 @@ public class CompactionTaskTest CompressionStrategy.LZF, LongEncodingStrategy.LONGS ), + null, 5000, true, true, @@ -582,6 +585,7 @@ public class CompactionTaskTest CompressionStrategy.LZF, LongEncodingStrategy.LONGS ), + null, 5000, true, true, @@ -846,6 +850,7 @@ public class CompactionTaskTest CompressionStrategy.LZF, LongEncodingStrategy.LONGS ), + null, 5000, true, true, @@ -1034,6 +1039,7 @@ public class CompactionTaskTest CompressionStrategy.LZF, LongEncodingStrategy.LONGS ), + null, 5000, true, true, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index 97279f9..4bae4c8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -1004,6 +1004,7 @@ public class IndexTaskTest null, indexSpec, null, + null, true, true, false, @@ -1126,6 +1127,7 @@ public class IndexTaskTest null, indexSpec, null, + null, true, false, false, @@ -1241,6 +1243,7 @@ public class IndexTaskTest null, indexSpec, null, + null, true, true, false, @@ -1706,6 +1709,7 @@ public class IndexTaskTest partitionDimensions, indexSpec, null, + null, true, forceGuaranteedRollup, reportParseException, diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java index 6901543..daeeaab 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -835,6 +835,7 @@ public class RealtimeIndexTaskTest null, null, null, + null, true, 0, 0, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java index a09dd65..a58b295 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java @@ -202,6 +202,7 @@ public class TaskSerdeTest null, null, indexSpec, + null, 3, true, false, @@ -284,6 +285,7 @@ public class TaskSerdeTest null, null, indexSpec, + null, 3, true, false, @@ -393,6 +395,7 @@ public class TaskSerdeTest NoneShardSpec.instance(), indexSpec, null, + null, 0, 0, true, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java index 6f7c3b9..c7c8d0b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java @@ -188,6 +188,7 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu null, null, null, + null, numTotalSubTasks, null, null, diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java index 704c5fa..04aa5a7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java @@ -430,6 +430,7 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd null, null, null, + null, NUM_SUB_TASKS, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java index dd90963..0fa747f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java @@ -138,6 +138,7 @@ public class ParallelIndexSupervisorTaskSerdeTest null, null, null, + null, 2, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java index cfd02ac..cf7dd37 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java @@ -250,6 +250,7 @@ public class ParallelIndexSupervisorTaskTest 
extends AbstractParallelIndexSuperv null, null, null, + null, 1, null, null, @@ -290,6 +291,7 @@ public class ParallelIndexSupervisorTaskTest extends AbstractParallelIndexSuperv null, null, null, + null, 2, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java index d587c6c..2d39302 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java @@ -71,6 +71,7 @@ public class ParallelIndexTuningConfigTest CompressionStrategy.LZF, LongEncodingStrategy.LONGS ), + new IndexSpec(), 1, false, true, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 6bdfbe4..d8c0af3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -690,6 +690,7 @@ public class TaskLifecycleTest null, null, indexSpec, + null, 3, true, false, @@ -771,6 +772,7 @@ public class TaskLifecycleTest null, null, indexSpec, + null, 3, true, false, @@ -1160,6 +1162,7 @@ public class TaskLifecycleTest null, indexSpec, null, + null, false, null, null, @@ -1290,6 +1293,7 @@ public class TaskLifecycleTest null, null, null, + null, 0, 0, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index ae55fff..6381d0f 100644 --- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -661,6 +661,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport null, null, null, + null, null ) { diff --git a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java index 43be6e0..84edad1 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java @@ -82,6 +82,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig defaultMaxPendingPersists, defaultShardSpec, defaultIndexSpec, + defaultIndexSpec, true, 0, 0, @@ -103,6 +104,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig private final int maxPendingPersists; private final ShardSpec shardSpec; private final IndexSpec indexSpec; + private final IndexSpec indexSpecForIntermediatePersists; private final int persistThreadPriority; private final int mergeThreadPriority; private final boolean reportParseExceptions; @@ -125,6 +127,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig @JsonProperty("maxPendingPersists") Integer maxPendingPersists, @JsonProperty("shardSpec") ShardSpec shardSpec, @JsonProperty("indexSpec") IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12. 
@JsonProperty("buildV9Directly") Boolean buildV9Directly, @JsonProperty("persistThreadPriority") int persistThreadPriority, @@ -152,6 +155,8 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig this.maxPendingPersists = maxPendingPersists == null ? defaultMaxPendingPersists : maxPendingPersists; this.shardSpec = shardSpec == null ? defaultShardSpec : shardSpec; this.indexSpec = indexSpec == null ? defaultIndexSpec : indexSpec; + this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ? + this.indexSpec : indexSpecForIntermediatePersists; this.mergeThreadPriority = mergeThreadPriority; this.persistThreadPriority = persistThreadPriority; this.reportParseExceptions = reportParseExceptions == null @@ -233,6 +238,13 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig return indexSpec; } + @JsonProperty + @Override + public IndexSpec getIndexSpecForIntermediatePersists() + { + return indexSpecForIntermediatePersists; + } + /** * Always returns true, doesn't affect the version being built. 
*/ @@ -302,6 +314,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig maxPendingPersists, shardSpec, indexSpec, + indexSpecForIntermediatePersists, true, persistThreadPriority, mergeThreadPriority, @@ -326,6 +339,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig maxPendingPersists, shardSpec, indexSpec, + indexSpecForIntermediatePersists, true, persistThreadPriority, mergeThreadPriority, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java index 1e81792..2889a98 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java @@ -67,6 +67,8 @@ public interface AppenderatorConfig IndexSpec getIndexSpec(); + IndexSpec getIndexSpecForIntermediatePersists(); + File getBasePersistDirectory(); @Nullable diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index 647340b..136b24e 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -61,7 +61,6 @@ import org.apache.druid.query.QuerySegmentWalker; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMerger; -import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexSegment; import org.apache.druid.segment.Segment; @@ -1260,12 +1259,11 @@ public class AppenderatorImpl implements Appenderator final File persistedFile; final File persistDir = 
createPersistDirIfNeeded(identifier); - final IndexSpec indexSpec = tuningConfig.getIndexSpec(); persistedFile = indexMerger.persist( indexToPersist.getIndex(), identifier.getInterval(), new File(persistDir, String.valueOf(indexToPersist.getCount())), - indexSpec, + tuningConfig.getIndexSpecForIntermediatePersists(), tuningConfig.getSegmentWriteOutMediumFactory() ); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java index 8e0ad2b..7aa59f6 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java @@ -53,7 +53,6 @@ import org.apache.druid.query.QuerySegmentWalker; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMerger; -import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.Metadata; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexSegment; @@ -955,14 +954,12 @@ public class RealtimePlumber implements Plumber try { int numRows = indexToPersist.getIndex().size(); - final IndexSpec indexSpec = config.getIndexSpec(); - indexToPersist.getIndex().getMetadata().putAll(metadataElems); final File persistedFile = indexMerger.persist( indexToPersist.getIndex(), interval, new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())), - indexSpec, + config.getIndexSpecForIntermediatePersists(), config.getSegmentWriteOutMediumFactory() ); diff --git a/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java b/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java index cf57072..373c497 100644 --- a/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java +++ 
b/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java @@ -22,6 +22,7 @@ package org.apache.druid.segment.indexing; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.data.CompressionStrategy; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.joda.time.Period; import org.junit.Assert; @@ -87,6 +88,7 @@ public class RealtimeTuningConfigTest Assert.assertEquals(0, config.getHandoffConditionTimeout()); Assert.assertEquals(0, config.getAlertTimeout()); Assert.assertEquals(new IndexSpec(), config.getIndexSpec()); + Assert.assertEquals(new IndexSpec(), config.getIndexSpecForIntermediatePersists()); Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod()); Assert.assertEquals(new NumberedShardSpec(0, 1), config.getShardSpec()); Assert.assertEquals(0, config.getMaxPendingPersists()); @@ -111,7 +113,9 @@ public class RealtimeTuningConfigTest + " \"mergeThreadPriority\": 100,\n" + " \"reportParseExceptions\": true,\n" + " \"handoffConditionTimeout\": 100,\n" - + " \"alertTimeout\": 70\n" + + " \"alertTimeout\": 70,\n" + + " \"indexSpec\": { \"metricCompression\" : \"NONE\" },\n" + + " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" }\n" + "}"; ObjectMapper mapper = TestHelper.makeJsonMapper(); @@ -128,7 +132,6 @@ public class RealtimeTuningConfigTest Assert.assertEquals("/tmp/xxx", config.getBasePersistDirectory().toString()); Assert.assertEquals(100, config.getHandoffConditionTimeout()); Assert.assertEquals(70, config.getAlertTimeout()); - Assert.assertEquals(new IndexSpec(), config.getIndexSpec()); Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod()); Assert.assertEquals(new NumberedShardSpec(0, 1), config.getShardSpec()); Assert.assertEquals(100, config.getMaxPendingPersists()); @@ -137,5 +140,8 @@ public class 
RealtimeTuningConfigTest Assert.assertEquals(100, config.getPersistThreadPriority()); Assert.assertEquals(new Period("PT1H"), config.getWindowPeriod()); Assert.assertEquals(true, config.isReportParseExceptions()); + Assert.assertEquals(new IndexSpec(null, null, CompressionStrategy.NONE, null), config.getIndexSpec()); + Assert.assertEquals(new IndexSpec(null, CompressionStrategy.UNCOMPRESSED, null, null), config.getIndexSpecForIntermediatePersists()); + } } diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java index 15a650b..ee79dc1 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java @@ -72,6 +72,7 @@ public class AppenderatorPlumberTest null, null, null, + null, true, 0, 0, diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java index af706dd..0d6cac5 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java @@ -160,6 +160,7 @@ public class AppenderatorTester implements AutoCloseable null, null, null, + null, 0, 0, null, diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java index e02ca6d..b92bd12 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java +++ 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java @@ -144,6 +144,7 @@ public class DefaultOfflineAppenderatorFactoryTest null, null, null, + null, 0, 0, null, diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index 4052cc6..3179030 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -206,6 +206,7 @@ public class RealtimePlumberSchoolTest null, null, null, + null, true, 0, 0, diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java index de55360..c1a4066 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java @@ -75,6 +75,7 @@ public class SinkTest null, null, null, + null, 0, 0, null, @@ -229,6 +230,7 @@ public class SinkTest null, null, null, + null, 0, 0, null, diff --git a/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java b/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java index f9159fb..bb759b6 100644 --- a/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java +++ b/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java @@ -166,6 +166,7 @@ public class DruidJsonValidatorTest 1, NoneShardSpec.instance(), new IndexSpec(), + new IndexSpec(), null, 0, 0, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org