This is an automated email from the ASF dual-hosted git repository. jonwei pushed a commit to branch 0.17.0-incubating in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.17.0-incubating by this push:
     new 5c4774e  Using annotation to distinguish Hadoop Configuration in each module (#9013) (#9024)
5c4774e is described below

commit 5c4774ea184becc57d1fc22018344a5d89205a5b
Author: Jihoon Son <jihoon...@apache.org>
AuthorDate: Thu Dec 12 20:02:50 2019 -0800

    Using annotation to distinguish Hadoop Configuration in each module (#9013) (#9024)

    * Multibinding for NodeRole
    * Fix endpoints
    * fix doc
    * fix test
    * Using annotation to distinguish Hadoop Configuration in each module
---
 .../druid/firehose/hdfs/HdfsFirehoseFactory.java   |  3 +-
 .../src/main/java/org/apache/druid/guice/Hdfs.java | 40 ++++++++++++++++++++++
 .../druid/inputsource/hdfs/HdfsInputSource.java    |  3 +-
 .../druid/storage/hdfs/HdfsDataSegmentKiller.java  |  3 +-
 .../druid/storage/hdfs/HdfsDataSegmentPuller.java  |  3 +-
 .../druid/storage/hdfs/HdfsDataSegmentPusher.java  |  7 +++-
 .../hdfs/HdfsFileTimestampVersionFinder.java       |  3 +-
 .../storage/hdfs/HdfsStorageAuthentication.java    |  3 +-
 .../druid/storage/hdfs/HdfsStorageDruidModule.java |  4 +--
 .../druid/storage/hdfs/tasklog/HdfsTaskLogs.java   |  3 +-
 .../druid/data/input/orc/OrcExtensionsModule.java  |  3 +-
 .../druid/data/input/orc/OrcInputFormat.java       |  3 +-
 .../org/apache/druid/data/input/orc/guice/Orc.java | 40 ++++++++++++++++++++++
 .../input/parquet/ParquetExtensionsModule.java     |  3 +-
 .../data/input/parquet/ParquetInputFormat.java     | 10 ++++--
 .../druid/data/input/parquet/ParquetReader.java    |  6 +++-
 .../druid/data/input/parquet/guice/Parquet.java    | 40 ++++++++++++++++++++++
 .../data/input/parquet/BaseParquetReaderTest.java  |  3 +-
 18 files changed, 163 insertions(+), 17 deletions(-)

diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java
index 962da5e..ee5bf03 100644
---
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java @@ -27,6 +27,7 @@ import org.apache.druid.data.input.FiniteFirehoseFactory; import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory; +import org.apache.druid.guice.Hdfs; import org.apache.druid.inputsource.hdfs.HdfsInputSource; import org.apache.druid.storage.hdfs.HdfsDataSegmentPuller; import org.apache.druid.utils.CompressionUtils; @@ -46,7 +47,7 @@ public class HdfsFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<Pa @JsonCreator public HdfsFirehoseFactory( - @JacksonInject Configuration conf, + @JacksonInject @Hdfs Configuration conf, @JsonProperty("paths") Object inputPaths, @JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes, @JsonProperty("maxFetchCapacityBytes") Long maxFetchCapacityBytes, diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/guice/Hdfs.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/guice/Hdfs.java new file mode 100644 index 0000000..40ab41b --- /dev/null +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/guice/Hdfs.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.guice; + +import com.google.inject.BindingAnnotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Each extension module needs to properly bind whatever it will use, but sometimes different modules need to bind the + * same class which will lead to the duplicate injection error. To avoid this problem, each module is supposed to bind + * different instances. This is a binding annotation for druid-hdfs-storage extension. Any binding for the same type + * should be distinguished by using this annotation. 
+ */ +@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@BindingAnnotation +public @interface Hdfs +{ +} diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java index 1409aa6..fe4a18b 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java @@ -32,6 +32,7 @@ import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.data.input.impl.InputEntityIteratingReader; import org.apache.druid.data.input.impl.SplittableInputSource; +import org.apache.druid.guice.Hdfs; import org.apache.druid.java.util.common.IAE; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -72,7 +73,7 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn @JsonCreator public HdfsInputSource( @JsonProperty(PROP_PATHS) Object inputPaths, - @JacksonInject Configuration configuration + @JacksonInject @Hdfs Configuration configuration ) { this.inputPaths = coerceInputPathsToList(inputPaths, PROP_PATHS); diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentKiller.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentKiller.java index 2801677..08cd97f 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentKiller.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentKiller.java @@ -22,6 +22,7 @@ package org.apache.druid.storage.hdfs; import com.google.common.base.Preconditions; import com.google.inject.Inject; import 
org.apache.commons.lang.StringUtils; +import org.apache.druid.guice.Hdfs; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.segment.loading.DataSegmentKiller; import org.apache.druid.segment.loading.SegmentLoadingException; @@ -43,7 +44,7 @@ public class HdfsDataSegmentKiller implements DataSegmentKiller private final Path storageDirectory; @Inject - public HdfsDataSegmentKiller(final Configuration config, final HdfsDataSegmentPusherConfig pusherConfig) + public HdfsDataSegmentKiller(@Hdfs final Configuration config, final HdfsDataSegmentPusherConfig pusherConfig) { this.config = config; this.storageDirectory = new Path(pusherConfig.getStorageDirectory()); diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java index 1a526c0..342b82a 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java @@ -22,6 +22,7 @@ package org.apache.druid.storage.hdfs; import com.google.common.base.Predicate; import com.google.common.io.ByteSource; import com.google.inject.Inject; +import org.apache.druid.guice.Hdfs; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.RetryUtils; @@ -178,7 +179,7 @@ public class HdfsDataSegmentPuller implements URIDataPuller protected final Configuration config; @Inject - public HdfsDataSegmentPuller(final Configuration config) + public HdfsDataSegmentPuller(@Hdfs final Configuration config) { this.config = config; } diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java 
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java index 3fe756d..bde39d4 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java @@ -26,6 +26,7 @@ import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; import org.apache.druid.common.utils.UUIDUtils; +import org.apache.druid.guice.Hdfs; import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; @@ -60,7 +61,11 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher private final Supplier<String> fullyQualifiedStorageDirectory; @Inject - public HdfsDataSegmentPusher(HdfsDataSegmentPusherConfig config, Configuration hadoopConfig, ObjectMapper jsonMapper) + public HdfsDataSegmentPusher( + HdfsDataSegmentPusherConfig config, + @Hdfs Configuration hadoopConfig, + ObjectMapper jsonMapper + ) { this.hadoopConfig = hadoopConfig; this.jsonMapper = jsonMapper; diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsFileTimestampVersionFinder.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsFileTimestampVersionFinder.java index d0530d2..4daa94a 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsFileTimestampVersionFinder.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsFileTimestampVersionFinder.java @@ -21,6 +21,7 @@ package org.apache.druid.storage.hdfs; import com.google.inject.Inject; import org.apache.druid.data.SearchableVersionedDataFinder; +import org.apache.druid.guice.Hdfs; import org.apache.druid.java.util.common.RetryUtils; import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.fs.FileStatus; @@ -36,7 +37,7 @@ import java.util.regex.Pattern; public class HdfsFileTimestampVersionFinder extends HdfsDataSegmentPuller implements SearchableVersionedDataFinder<URI> { @Inject - public HdfsFileTimestampVersionFinder(Configuration config) + public HdfsFileTimestampVersionFinder(@Hdfs Configuration config) { super(config); } diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageAuthentication.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageAuthentication.java index 93fbf48..096875f 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageAuthentication.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageAuthentication.java @@ -22,6 +22,7 @@ package org.apache.druid.storage.hdfs; import com.google.common.base.Strings; import com.google.inject.Inject; +import org.apache.druid.guice.Hdfs; import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; @@ -40,7 +41,7 @@ public class HdfsStorageAuthentication private final Configuration hadoopConf; @Inject - public HdfsStorageAuthentication(HdfsKerberosConfig hdfsKerberosConfig, Configuration hadoopConf) + public HdfsStorageAuthentication(HdfsKerberosConfig hdfsKerberosConfig, @Hdfs Configuration hadoopConf) { this.hdfsKerberosConfig = hdfsKerberosConfig; this.hadoopConf = hadoopConf; diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java index 6e88f8e..d4242bc 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java +++ 
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java @@ -28,6 +28,7 @@ import com.google.inject.multibindings.MapBinder; import org.apache.druid.data.SearchableVersionedDataFinder; import org.apache.druid.firehose.hdfs.HdfsFirehoseFactory; import org.apache.druid.guice.Binders; +import org.apache.druid.guice.Hdfs; import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.LifecycleModule; @@ -108,7 +109,7 @@ public class HdfsStorageDruidModule implements DruidModule } } - binder.bind(Configuration.class).toInstance(conf); + binder.bind(Configuration.class).annotatedWith(Hdfs.class).toInstance(conf); JsonConfigProvider.bind(binder, "druid.storage", HdfsDataSegmentPusherConfig.class); Binders.taskLogsBinder(binder).addBinding("hdfs").to(HdfsTaskLogs.class); @@ -117,6 +118,5 @@ public class HdfsStorageDruidModule implements DruidModule JsonConfigProvider.bind(binder, "druid.hadoop.security.kerberos", HdfsKerberosConfig.class); binder.bind(HdfsStorageAuthentication.class).in(ManageLifecycle.class); LifecycleModule.register(binder, HdfsStorageAuthentication.class); - } } diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java index 4e2b68f..7289901 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java @@ -23,6 +23,7 @@ import com.google.common.base.Optional; import com.google.common.io.ByteSource; import com.google.common.io.ByteStreams; import com.google.inject.Inject; +import org.apache.druid.guice.Hdfs; import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.logger.Logger; import 
org.apache.druid.tasklogs.TaskLogs; @@ -51,7 +52,7 @@ public class HdfsTaskLogs implements TaskLogs private final Configuration hadoopConfig; @Inject - public HdfsTaskLogs(HdfsTaskLogsConfig config, Configuration hadoopConfig) + public HdfsTaskLogs(HdfsTaskLogsConfig config, @Hdfs Configuration hadoopConfig) { this.config = config; this.hadoopConfig = hadoopConfig; diff --git a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcExtensionsModule.java b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcExtensionsModule.java index e57995b..78082cb 100644 --- a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcExtensionsModule.java +++ b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcExtensionsModule.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.inject.Binder; import com.google.inject.Inject; +import org.apache.druid.data.input.orc.guice.Orc; import org.apache.druid.initialization.DruidModule; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -89,6 +90,6 @@ public class OrcExtensionsModule implements DruidModule } } - binder.bind(Configuration.class).toInstance(conf); + binder.bind(Configuration.class).annotatedWith(Orc.class).toInstance(conf); } } diff --git a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcInputFormat.java b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcInputFormat.java index bf60fb9..1744fef 100644 --- a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcInputFormat.java +++ b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcInputFormat.java @@ -26,6 +26,7 @@ import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputEntityReader; 
import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.impl.NestedInputFormat; +import org.apache.druid.data.input.orc.guice.Orc; import org.apache.druid.java.util.common.parsers.JSONPathSpec; import org.apache.hadoop.conf.Configuration; @@ -42,7 +43,7 @@ public class OrcInputFormat extends NestedInputFormat public OrcInputFormat( @JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec, @JsonProperty("binaryAsString") @Nullable Boolean binaryAsString, - @JacksonInject Configuration conf + @JacksonInject @Orc Configuration conf ) { super(flattenSpec); diff --git a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/guice/Orc.java b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/guice/Orc.java new file mode 100644 index 0000000..ff1be32 --- /dev/null +++ b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/guice/Orc.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.orc.guice; + +import com.google.inject.BindingAnnotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Each extension module needs to properly bind whatever it will use, but sometimes different modules need to bind the + * same class which will lead to the duplicate injection error. To avoid this problem, each module is supposed to bind + * different instances. This is a binding annotation for druid-orc-extensions extension. Any binding for the same type + * should be distinguished by using this annotation. + */ +@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@BindingAnnotation +public @interface Orc +{ +} diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetExtensionsModule.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetExtensionsModule.java index dd2cf4c..17fe50d 100644 --- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetExtensionsModule.java +++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetExtensionsModule.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.inject.Binder; import com.google.inject.Inject; import org.apache.druid.data.input.parquet.avro.ParquetAvroHadoopInputRowParser; +import org.apache.druid.data.input.parquet.guice.Parquet; import org.apache.druid.data.input.parquet.simple.ParquetHadoopInputRowParser; import org.apache.druid.data.input.parquet.simple.ParquetParseSpec; import org.apache.druid.initialization.DruidModule; @@ -98,6 +99,6 @@ public class ParquetExtensionsModule implements DruidModule } } - binder.bind(Configuration.class).toInstance(conf); + 
binder.bind(Configuration.class).annotatedWith(Parquet.class).toInstance(conf); } } diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java index fbd9341..c49581a 100644 --- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java +++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java @@ -19,13 +19,16 @@ package org.apache.druid.data.input.parquet; +import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputEntityReader; import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.impl.NestedInputFormat; +import org.apache.druid.data.input.parquet.guice.Parquet; import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.hadoop.conf.Configuration; import javax.annotation.Nullable; import java.io.File; @@ -35,15 +38,18 @@ import java.util.Objects; public class ParquetInputFormat extends NestedInputFormat { private final boolean binaryAsString; + private final Configuration conf; @JsonCreator public ParquetInputFormat( @JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec, - @JsonProperty("binaryAsString") @Nullable Boolean binaryAsString + @JsonProperty("binaryAsString") @Nullable Boolean binaryAsString, + @JacksonInject @Parquet Configuration conf ) { super(flattenSpec); this.binaryAsString = binaryAsString == null ? 
false : binaryAsString; + this.conf = conf; } @JsonProperty @@ -65,7 +71,7 @@ public class ParquetInputFormat extends NestedInputFormat File temporaryDirectory ) throws IOException { - return new ParquetReader(inputRowSchema, source, temporaryDirectory, getFlattenSpec(), binaryAsString); + return new ParquetReader(conf, inputRowSchema, source, temporaryDirectory, getFlattenSpec(), binaryAsString); } @Override diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java index a98f929..2c76253 100644 --- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java +++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java @@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.parsers.JSONPathSpec; import org.apache.druid.java.util.common.parsers.ObjectFlattener; import org.apache.druid.java.util.common.parsers.ObjectFlatteners; import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.example.data.Group; import org.apache.parquet.hadoop.example.GroupReadSupport; @@ -51,6 +52,7 @@ public class ParquetReader extends IntermediateRowParsingReader<Group> private final Closer closer; ParquetReader( + Configuration conf, InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory, @@ -69,7 +71,9 @@ public class ParquetReader extends IntermediateRowParsingReader<Group> final ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader(); try { Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); - reader = closer.register(org.apache.parquet.hadoop.ParquetReader.builder(new GroupReadSupport(), path).build()); + reader = 
closer.register(org.apache.parquet.hadoop.ParquetReader.builder(new GroupReadSupport(), path) + .withConf(conf) + .build()); } finally { Thread.currentThread().setContextClassLoader(currentClassLoader); diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/guice/Parquet.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/guice/Parquet.java new file mode 100644 index 0000000..a6f46c6 --- /dev/null +++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/guice/Parquet.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.parquet.guice; + +import com.google.inject.BindingAnnotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Each extension module needs to properly bind whatever it will use, but sometimes different modules need to bind the + * same class which will lead to the duplicate injection error. To avoid this problem, each module is supposed to bind + * different instances. 
This is a binding annotation for druid-parquet-extensions extension. Any binding for the same + * type should be distinguished by using this annotation. + */ +@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@BindingAnnotation +public @interface Parquet +{ +} diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/BaseParquetReaderTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/BaseParquetReaderTest.java index ac22f77..87f9229 100644 --- a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/BaseParquetReaderTest.java +++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/BaseParquetReaderTest.java @@ -28,6 +28,7 @@ import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.impl.FileEntity; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.hadoop.conf.Configuration; import java.io.File; import java.io.IOException; @@ -51,7 +52,7 @@ class BaseParquetReaderTest ) throws IOException { FileEntity entity = new FileEntity(new File(parquetFile)); - ParquetInputFormat parquet = new ParquetInputFormat(flattenSpec, binaryAsString); + ParquetInputFormat parquet = new ParquetInputFormat(flattenSpec, binaryAsString, new Configuration()); return parquet.createReader(schema, entity, null); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org