[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r346617966

## File path: core/src/main/java/org/apache/druid/data/input/Firehose.java

@@ -74,13 +74,13 @@
    *
    * @return an InputRowPlusRaw which may contain any of: an InputRow, the raw data, or a ParseException

Review comment: javadoc for `@return` needs to be updated

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
With regards, Apache Git Services
- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r346619475

## File path: core/src/main/java/org/apache/druid/data/input/InputRowListPlusJson.java

@@ -62,23 +95,19 @@ public InputRow getInputRow()
   }
   @Nullable
-  public ParseException getParseException()
-  {
-    return parseException;
-  }
-
-  public boolean isEmpty()
+  public String getRawJson()
   {
-    return inputRow == null && raw == null && parseException == null;
+    return rawJson;
   }
-  public static InputRowPlusRaw of(@Nullable InputRow inputRow, @Nullable byte[] raw)
+  @Nullable
+  public ParseException getParseException()
   {
-    return new InputRowPlusRaw(inputRow, raw, null);
+    return parseException;
   }
-  public static InputRowPlusRaw of(@Nullable byte[] raw, @Nullable ParseException parseException)
+  public boolean isEmpty()
   {
-    return new InputRowPlusRaw(null, raw, parseException);
+    return (inputRows == null || inputRows.isEmpty()) && raw == null && rawJson == null && parseException == null;

Review comment: Should this also check if `rawJson.isEmpty()`?
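A minimal sketch of the stricter check the comment asks about, assuming an empty `rawJson` string should count the same as a missing one. `RowsPlusJsonSketch` is a hypothetical stand-in, not the class from the pull request: it stores rows as strings and the exception as a plain `Exception` so the example stays self-contained.

```java
import java.util.List;

// Hypothetical stand-in for the class under review. Field names follow the diff,
// but the element and exception types are simplified for illustration.
final class RowsPlusJsonSketch
{
  private final List<String> inputRows;   // the real class holds InputRow objects
  private final byte[] raw;
  private final String rawJson;
  private final Exception parseException; // the real class holds a ParseException

  RowsPlusJsonSketch(List<String> inputRows, byte[] raw, String rawJson, Exception parseException)
  {
    this.inputRows = inputRows;
    this.raw = raw;
    this.rawJson = rawJson;
    this.parseException = parseException;
  }

  // Treats a null list or string the same as an empty one (an assumption of this sketch).
  boolean isEmpty()
  {
    return (inputRows == null || inputRows.isEmpty())
           && raw == null
           && (rawJson == null || rawJson.isEmpty())
           && parseException == null;
  }
}
```

Whether an empty string should be treated as "no raw JSON" depends on how callers populate the field, which the quoted diff does not show.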
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r344919111

## File path: core/src/main/java/org/apache/druid/data/input/Unformattable.java

@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+
+import javax.annotation.Nullable;
+import java.io.File;
+
+public interface Unformattable

Review comment: Do you want to add javadocs (i.e., what does it mean to be "unformattable")?
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r344918837

## File path: core/src/main/java/org/apache/druid/data/input/Formattable.java

@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.InputFormat;
+import org.apache.druid.data.input.impl.TimestampSpec;
+
+import javax.annotation.Nullable;
+import java.io.File;
+
+public interface Formattable

Review comment: Do you want to add javadocs (i.e., what does it mean to be "formattable")?
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r344423212 ## File path: core/src/main/java/org/apache/druid/data/input/impl/FirehoseToInputSourceReaderAdaptor.java ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.impl; + +import org.apache.druid.data.input.Firehose; +import org.apache.druid.data.input.FirehoseFactory; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowPlusRaw; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.java.util.common.parsers.CloseableIterator; + +import java.io.File; +import java.io.IOException; + +public class FirehoseToInputSourceReaderAdaptor implements InputSourceReader +{ + private final FirehoseFactory firehoseFactory; + private final InputRowParser inputRowParser; + private final File temporaryDirectory; + + public FirehoseToInputSourceReaderAdaptor( + FirehoseFactory firehoseFactory, + InputRowParser inputRowPlusRaw, + File temporaryDirectory + ) + { +this.firehoseFactory = firehoseFactory; +this.inputRowParser = inputRowPlusRaw; +this.temporaryDirectory = temporaryDirectory; + } + + @Override + public CloseableIterator read() throws IOException + { +return new CloseableIterator() +{ + final Firehose firehose = firehoseFactory.connect(inputRowParser, temporaryDirectory); + + @Override + public boolean hasNext() + { +try { + return firehose.hasMore(); +} +catch (IOException e) { + throw new RuntimeException(e); Review comment: I believe it was created for use cases like conforming to the Iterator interface.
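The quoted hunk wraps a checked `IOException` so that `hasNext()` can satisfy `java.util.Iterator`, which declares no checked exceptions. The sketch below shows that pattern with `java.io.UncheckedIOException` from the standard library; whether that is the type the comment has in mind is an assumption, since the earlier part of the thread is not quoted. `CheckedSource` and `CheckedSourceIterator` are hypothetical names, loosely modeled on `Firehose.hasMore()`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical source whose methods throw a checked exception.
interface CheckedSource<T>
{
  boolean hasMore() throws IOException;

  T next() throws IOException;
}

// Adapts a CheckedSource to Iterator by rethrowing IOException unchecked.
final class CheckedSourceIterator<T> implements Iterator<T>
{
  private final CheckedSource<T> source;

  CheckedSourceIterator(CheckedSource<T> source)
  {
    this.source = source;
  }

  @Override
  public boolean hasNext()
  {
    try {
      return source.hasMore();
    }
    catch (IOException e) {
      // Preserves the original exception as the cause.
      throw new UncheckedIOException(e);
    }
  }

  @Override
  public T next()
  {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    try {
      return source.next();
    }
    catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Compared with a bare `RuntimeException`, callers can catch `UncheckedIOException` specifically and recover the `IOException` through `getCause()`.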
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343346519

## File path: core/src/main/java/org/apache/druid/data/input/impl/TimestampSpec.java

@@ -57,10 +58,10 @@
   @JsonCreator
   public TimestampSpec(
-      @JsonProperty("column") String timestampColumn,
-      @JsonProperty("format") String format,
+      @JsonProperty("column") @Nullable String timestampColumn,

Review comment: Thanks for adding!
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343351095

## File path: indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java

@@ -294,7 +294,7 @@ public IndexMergerV9 getIndexMergerV9()
     return indexMergerV9;
   }
-  public File getFirehoseTemporaryDir()
+  public File getIndexingTmpDir()
   {
     return new File(taskWorkDir, "firehose");

Review comment: Perhaps rename the temporary directory as well
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343349303

## File path: indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java

@@ -579,8 +579,9 @@ public void intoConfiguration(Job job)
   public void verify()
   {
     Preconditions.checkNotNull(schema.getDataSchema().getDataSource(), "dataSource");
+    Preconditions.checkNotNull(schema.getDataSchema().getParser(), "inputRowParser");
     Preconditions.checkNotNull(schema.getDataSchema().getParser().getParseSpec(), "parseSpec");
-    Preconditions.checkNotNull(schema.getDataSchema().getParser().getParseSpec().getTimestampSpec(), "timestampSpec");
+    Preconditions.checkNotNull(schema.getDataSchema().getNonNullTimestampSpec(), "timestampSpec");

Review comment: Checking this one for null seems redundant
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343377888 ## File path: server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java ## @@ -115,83 +158,75 @@ static void validateDatasourceName(String dataSource) Preconditions.checkArgument(!dataSource.contains("/"), "dataSource cannot contain the '/' character."); } + private static DimensionsSpec computeDimensionsSpec( + TimestampSpec timestampSpec, + DimensionsSpec dimensionsSpec, + AggregatorFactory[] aggregators + ) + { +final Set dimensionExclusions = new HashSet<>(); + +final String timestampColumn = timestampSpec.getTimestampColumn(); +if (!(dimensionsSpec.hasCustomDimensions() && dimensionsSpec.getDimensionNames().contains(timestampColumn))) { + dimensionExclusions.add(timestampColumn); +} + +for (AggregatorFactory aggregator : aggregators) { + dimensionExclusions.addAll(aggregator.requiredFields()); + dimensionExclusions.add(aggregator.getName()); +} + +final Set metSet = Arrays.stream(aggregators).map(AggregatorFactory::getName).collect(Collectors.toSet()); +final Set dimSet = new HashSet<>(dimensionsSpec.getDimensionNames()); +final Set overlap = Sets.intersection(metSet, dimSet); +if (!overlap.isEmpty()) { + throw new IAE( + "Cannot have overlapping dimensions and metrics of the same name. Please change the name of the metric. Overlap: %s", + overlap + ); +} + +return dimensionsSpec.withDimensionExclusions(Sets.difference(dimensionExclusions, dimSet)); + } + @JsonProperty public String getDataSource() { return dataSource; } - @JsonProperty("parser") - public Map getParserMap() + @Nullable + @JsonProperty + public TimestampSpec getTimestampSpec() Review comment: Suggestion: Make this private, since all clients of this class other than Jackson should be using `getNonNullTimestampSpec()`. 
Also, maybe rename this method to something like so that the API that's actually used has the nicer short name `getTimestampSpec()`.
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343378240 ## File path: server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java ## @@ -115,83 +158,75 @@ static void validateDatasourceName(String dataSource) Preconditions.checkArgument(!dataSource.contains("/"), "dataSource cannot contain the '/' character."); } + private static DimensionsSpec computeDimensionsSpec( + TimestampSpec timestampSpec, + DimensionsSpec dimensionsSpec, + AggregatorFactory[] aggregators + ) + { +final Set dimensionExclusions = new HashSet<>(); + +final String timestampColumn = timestampSpec.getTimestampColumn(); +if (!(dimensionsSpec.hasCustomDimensions() && dimensionsSpec.getDimensionNames().contains(timestampColumn))) { + dimensionExclusions.add(timestampColumn); +} + +for (AggregatorFactory aggregator : aggregators) { + dimensionExclusions.addAll(aggregator.requiredFields()); + dimensionExclusions.add(aggregator.getName()); +} + +final Set metSet = Arrays.stream(aggregators).map(AggregatorFactory::getName).collect(Collectors.toSet()); +final Set dimSet = new HashSet<>(dimensionsSpec.getDimensionNames()); +final Set overlap = Sets.intersection(metSet, dimSet); +if (!overlap.isEmpty()) { + throw new IAE( + "Cannot have overlapping dimensions and metrics of the same name. Please change the name of the metric. 
Overlap: %s", + overlap + ); +} + +return dimensionsSpec.withDimensionExclusions(Sets.difference(dimensionExclusions, dimSet)); + } + @JsonProperty public String getDataSource() { return dataSource; } - @JsonProperty("parser") - public Map getParserMap() + @Nullable + @JsonProperty + public TimestampSpec getTimestampSpec() { -return parser; +return timestampSpec; } - @JsonIgnore - public InputRowParser getParser() + public TimestampSpec getNonNullTimestampSpec() { -if (parser == null) { - log.warn("No parser has been specified"); - return null; -} - -if (cachedParser != null) { - return cachedParser; -} - -final InputRowParser inputRowParser = transformSpec.decorate( -jsonMapper.convertValue(this.parser, InputRowParser.class) -); - -final Set dimensionExclusions = new HashSet<>(); -for (AggregatorFactory aggregator : aggregators) { - dimensionExclusions.addAll(aggregator.requiredFields()); - dimensionExclusions.add(aggregator.getName()); +if (timestampSpec == null) { + timestampSpec = Preconditions.checkNotNull(getParser(), "inputRowParser").getParseSpec().getTimestampSpec(); } +return timestampSpec; + } -if (inputRowParser.getParseSpec() != null) { - final DimensionsSpec dimensionsSpec = inputRowParser.getParseSpec().getDimensionsSpec(); - final TimestampSpec timestampSpec = inputRowParser.getParseSpec().getTimestampSpec(); - - // exclude timestamp from dimensions by default, unless explicitly included in the list of dimensions - if (timestampSpec != null) { -final String timestampColumn = timestampSpec.getTimestampColumn(); -if (!(dimensionsSpec.hasCustomDimensions() && dimensionsSpec.getDimensionNames().contains(timestampColumn))) { - dimensionExclusions.add(timestampColumn); -} - } - if (dimensionsSpec != null) { -final Set metSet = new HashSet<>(); -for (AggregatorFactory aggregator : aggregators) { - metSet.add(aggregator.getName()); -} -final Set dimSet = Sets.newHashSet(dimensionsSpec.getDimensionNames()); -final Set overlap = Sets.intersection(metSet, 
dimSet); -if (!overlap.isEmpty()) { - throw new IAE( - "Cannot have overlapping dimensions and metrics of the same name. Please change the name of the metric. Overlap: %s", - overlap - ); -} + @Nullable + @JsonProperty + public DimensionsSpec getDimensionsSpec() Review comment: Similar suggestion as `getTimestampSpec()`
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343935241 ## File path: core/src/test/java/org/apache/druid/data/input/impl/NoopFirehoseFactory.java ## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import org.apache.druid.data.input.FiniteFirehoseFactory; +import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.SplitHintSpec; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.stream.Stream; + +public class NoopFirehoseFactory implements FiniteFirehoseFactory +{ + @Override + public String toString() + { +return "NoopFirehoseFactory{}"; + } + + @Override + public Stream getSplits( + @Nullable SplitHintSpec splitHintSpec + ) throws IOException + { +return null; + } + + @Override + public int getNumSplits(@Nullable SplitHintSpec splitHintSpec) throws IOException Review comment: Can remove `throws IOException` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
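The suggestion to drop `throws IOException` relies on a general Java rule: an overriding method may declare fewer checked exceptions than the method it overrides. The sketch below illustrates the rule with hypothetical types (`SplitCounter` stands in for the interface being implemented, and the return value of `0` is an assumption, not taken from the pull request).

```java
import java.io.IOException;

// Hypothetical interface whose method is allowed to throw a checked exception.
interface SplitCounter
{
  int getNumSplits() throws IOException;
}

// A no-op implementation never performs I/O, so it can omit the throws clause.
final class NoopSplitCounter implements SplitCounter
{
  @Override
  public int getNumSplits()
  {
    return 0; // placeholder value chosen for this sketch
  }
}
```

Code that holds the concrete type, such as a test calling `new NoopSplitCounter().getNumSplits()`, then needs no `try`/`catch` or `throws` of its own; code that holds the interface type still has to handle `IOException`.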
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343366132 ## File path: server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java ## @@ -53,38 +57,53 @@ private static final Logger log = new Logger(DataSchema.class); private static final Pattern INVALIDCHARS = Pattern.compile("(?s).*[^\\S ].*"); private final String dataSource; - private final Map parser; private final AggregatorFactory[] aggregators; private final GranularitySpec granularitySpec; private final TransformSpec transformSpec; + private final Map parserMap; + private final ObjectMapper objectMapper; - private final ObjectMapper jsonMapper; + // The below fields can be initialized lazily from parser for backward compatibility. + private TimestampSpec timestampSpec; + private DimensionsSpec dimensionsSpec; - private InputRowParser cachedParser; + // This is used for backward compatibility + private InputRowParser inputRowParser; @JsonCreator public DataSchema( @JsonProperty("dataSource") String dataSource, - @JsonProperty("parser") Map parser, + @JsonProperty("timestampSpec") @Nullable TimestampSpec timestampSpec, // can be null in old task spec + @JsonProperty("dimensionsSpec") @Nullable DimensionsSpec dimensionsSpec, // can be null in old task spec @JsonProperty("metricsSpec") AggregatorFactory[] aggregators, @JsonProperty("granularitySpec") GranularitySpec granularitySpec, @JsonProperty("transformSpec") TransformSpec transformSpec, - @JacksonInject ObjectMapper jsonMapper + @Deprecated @JsonProperty("parser") @Nullable Map parserMap, + @JacksonInject ObjectMapper objectMapper ) { -this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "null ObjectMapper."); -this.parser = parser; -this.transformSpec = transformSpec == null ? 
TransformSpec.NONE : transformSpec; - validateDatasourceName(dataSource); this.dataSource = dataSource; +this.timestampSpec = timestampSpec; +this.dimensionsSpec = dimensionsSpec == null + ? null + : computeDimensionsSpec( + Preconditions.checkNotNull(timestampSpec, "timestampSpec"), Review comment: The original code allowed a null `timestampSpec` and simply skipped the logic that might add `timestampColumn` to `dimensionExclusions`. Why is `timestampSpec` now required to be non-null instead of keeping the original behavior?
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343921764 ## File path: core/src/main/java/org/apache/druid/data/input/SplitSource.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input; + +import com.google.common.base.Predicate; +import org.apache.druid.guice.annotations.ExtensionPoint; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; + +/** + * SplitSource abstracts an {@link InputSplit} and knows how to read bytes from the given split. + */ +@ExtensionPoint +public interface SplitSource +{ + Logger LOG = new Logger(SplitSource.class); + + int DEFAULT_FETCH_BUFFER_SIZE = 4 * 1024; // 4 KB + int DEFAULT_MAX_FETCH_RETRY = 2; // 3 tries including the initial try + + /** + * CleanableFile is the result type of {@link #fetch}. + * It should clean up any temporary resource on {@link #close()}. 
+ */ + interface CleanableFile extends Closeable + { +File file(); + } + + InputSplit getSplit(); + + /** + * Opens an {@link InputStream} on the split directly. + * This is the basic way to read the given split. + * + * @see #fetch as an alternative way to read data. + */ + InputStream open() throws IOException; + + /** + * Fetches the split into the local storage. + * This method might be preferred instead of {@link #open()}, for example + * + * - {@link org.apache.druid.data.input.impl.InputFormat} requires expensive random access on remote storage. + * - Holding a connection until you consume the entire InputStream is expensive. + * + * @param temporaryDirectory to store temp data. This directory will be removed automatically once + * the task finishes. + * @param fetchBuffer is used to fetch remote split into local storage. + * + * @see FileUtils#copyLarge + */ + default CleanableFile fetch(File temporaryDirectory, byte[] fetchBuffer) throws IOException + { +final File tempFile = File.createTempFile("druid-split", ".tmp", temporaryDirectory); +LOG.debug("Fetching split into file[%s]", tempFile.getAbsolutePath()); +FileUtils.copyLarge( +open(), Review comment: This opens an `InputStream` that is not closed later. Since it only needs to be opened during the copy here, how about moving it into a try-with-resources?
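A rough sketch of the try-with-resources shape the comment suggests. It is not the pull request's code: `FetchSketch` and `StreamOpener` are hypothetical names, and a plain read/write loop stands in for Druid's `FileUtils.copyLarge`, whose exact signature is not shown in the quoted hunk.

```java
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;

final class FetchSketch
{
  // Hypothetical stand-in for the open() method of the interface under review.
  interface StreamOpener
  {
    InputStream open() throws IOException;
  }

  // Copies the opened stream into a temp file. Both streams are closed when the
  // try block exits, whether the copy succeeds or throws.
  static File fetch(StreamOpener opener, File temporaryDirectory, byte[] fetchBuffer) throws IOException
  {
    final File tempFile = File.createTempFile("druid-split", ".tmp", temporaryDirectory);
    try (InputStream in = opener.open();
         OutputStream out = Files.newOutputStream(tempFile.toPath())) {
      int bytesRead;
      while ((bytesRead = in.read(fetchBuffer)) != -1) {
        out.write(fetchBuffer, 0, bytesRead);
      }
    }
    return tempFile;
  }
}
```

Because the stream is declared in the resource list, it is scoped to the copy itself, which matches the observation that it only needs to be open while the bytes are being transferred.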
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343340884 ## File path: core/src/test/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptorTest.java ## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input; + +import com.google.common.collect.Iterables; +import org.apache.druid.data.input.impl.CSVParseSpec; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.InputFormat; +import org.apache.druid.data.input.impl.StringInputRowParser; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.java.util.common.parsers.ParseException; +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Stream; + +public class FirehoseFactoryToInputSourceAdaptorTest +{ + @Test + public void testUnimplementedInputFormat() throws IOException + { +final List lines = new ArrayList<>(); +for (int i = 0; i < 10; i++) { + lines.add(StringUtils.format("%d,name_%d,%d", 20190101 + i, i, i + 100)); +} +final TestFirehoseFactory firehoseFactory = new TestFirehoseFactory(lines); +final StringInputRowParser inputRowParser = new StringInputRowParser( +new TestCsvParseSpec( +new TimestampSpec(null, "MMdd", null), +new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("timestamp", "name", "score"))), +",", +Arrays.asList("timestamp", "name", "score"), +false, +0 +), +StringUtils.UTF8_STRING +); +final FirehoseFactoryToInputSourceAdaptor inputSourceAdaptor = new FirehoseFactoryToInputSourceAdaptor( +firehoseFactory, +inputRowParser +); +final InputSourceReader reader = inputSourceAdaptor.reader( +inputRowParser.getParseSpec().getTimestampSpec(), +inputRowParser.getParseSpec().getDimensionsSpec(), +null, +null +); +final List result = new ArrayList<>(); +try (CloseableIterator iterator = 
reader.read()) { + while (iterator.hasNext()) { +result.add(iterator.next()); + } +} +Assert.assertEquals(10, result.size()); +for (int i = 0; i < 10; i++) { + Assert.assertEquals(DateTimes.of(StringUtils.format("2019-01-%02d", 1 + i)), result.get(i).getTimestamp()); + Assert.assertEquals( + StringUtils.format("name_%d", i), + Iterables.getOnlyElement(result.get(i).getDimension("name")) + ); + Assert.assertEquals( + StringUtils.format("%d", i + 100), + Iterables.getOnlyElement(result.get(i).getDimension("score")) + ); +} + } + + private static class TestCsvParseSpec extends CSVParseSpec Review comment: Suggestion: Rename the class to something like `UnimplementedInputFormatCsvParseSpec`. Currently, looking at just the body of `testUnimplementedInputFormat`, it's not apparent where the unimplemented input format is coming from. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org
[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343922643 ## File path: core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonParser.Feature; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.data.input.SplitReader; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; + +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; + +public class JsonInputFormat extends NestedInputFormat +{ + private final Map featureSpec; + private final ObjectMapper objectMapper; + + @JsonCreator + public JsonInputFormat( + @JsonProperty("flattenSpec") JSONPathSpec flattenSpec, Review comment: This can be `@Nullable`. This is an automated message from the Apache Git Service. 
[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r342782214 ## File path: core/src/main/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptor.java ## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input; + +import com.google.common.base.Preconditions; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.FirehoseToInputSourceReaderAdaptor; +import org.apache.druid.data.input.impl.InputFormat; +import org.apache.druid.data.input.impl.InputRowParser; +import org.apache.druid.data.input.impl.SplittableInputSource; +import org.apache.druid.data.input.impl.TimestampSpec; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.util.stream.Stream; + +public class FirehoseFactoryToInputSourceAdaptor implements SplittableInputSource +{ + private final FirehoseFactory firehoseFactory; + private final InputRowParser inputRowParser; + + public FirehoseFactoryToInputSourceAdaptor(FirehoseFactory firehoseFactory, InputRowParser inputRowParser) + { +this.firehoseFactory = firehoseFactory; +this.inputRowParser = Preconditions.checkNotNull(inputRowParser, "inputRowParser"); + } + + @Override + public boolean isSplittable() + { +return firehoseFactory.isSplittable(); + } + + @Override + public Stream createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) + throws IOException + { +if (firehoseFactory.isSplittable()) { + return ((FiniteFirehoseFactory) firehoseFactory).getSplits(splitHintSpec); +} else { + throw new UnsupportedOperationException(); Review comment: Is supporting unsplittable `Firehose`s future work? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org
[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343928317 ## File path: core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.impl; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.filefilter.TrueFileFilter; +import org.apache.commons.io.filefilter.WildcardFileFilter; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.SplitHintSpec; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.FileNotFoundException; +import java.util.Iterator; +import java.util.Objects; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +public class LocalInputSource implements SplittableInputSource +{ + private final File baseDir; + private final String filter; + + @JsonCreator + public LocalInputSource( + @JsonProperty("baseDir") File baseDir, + @JsonProperty("filter") String filter + ) + { +this.baseDir = baseDir; +this.filter = filter; + } + + @JsonProperty + public File getBaseDir() + { +return baseDir; + } + + @JsonProperty + public String getFilter() + { +return filter; + } + + @Override + public Stream> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) + { +return StreamSupport.stream(Spliterators.spliteratorUnknownSize(getFileIterator(), Spliterator.DISTINCT), false) +.map(InputSplit::new); + } + + @Override + public int getNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) + { +final Iterator fileIterator = getFileIterator(); +int num = 0; +while (fileIterator.hasNext()) { + fileIterator.next(); + num++; +} Review comment: Can replace with [`Iterators#size`](https://guava.dev/releases/16.0/api/docs/com/google/common/collect/Iterators.html#size(java.util.Iterator)) This is an automated message from the Apache Git Service. 
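The counting loop in `getNumSplits` above could collapse to a single call. A minimal sketch of that shape, assuming Guava is available in the real code: so that the snippet runs without dependencies, a hypothetical `sizeOf` helper stands in for Guava's `Iterators.size`, and `SplitCountSketch` and the `List<String>` input are illustrative names, not part of the pull request.

```java
import java.util.Iterator;
import java.util.List;

// Illustrative sketch only; SplitCountSketch and sizeOf are hypothetical names.
class SplitCountSketch
{
  // Stand-in for Guava's Iterators.size(Iterator): drains the iterator and returns the count.
  static int sizeOf(Iterator<?> iterator)
  {
    int count = 0;
    while (iterator.hasNext()) {
      iterator.next();
      count++;
    }
    return count;
  }

  // Shape of getNumSplits after the suggested change. With Guava on the classpath
  // the body would be `return Iterators.size(getFileIterator());`.
  static int getNumSplits(List<String> files)
  {
    return sizeOf(files.iterator());
  }
}
```

Either way the iterator is left exhausted, so the file iterator has to be created fresh for each call, as the original method already does.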
[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343935145 ## File path: core/src/test/java/org/apache/druid/data/input/impl/NoopFirehoseFactory.java ## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import org.apache.druid.data.input.FiniteFirehoseFactory; +import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.SplitHintSpec; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.stream.Stream; + +public class NoopFirehoseFactory implements FiniteFirehoseFactory +{ + @Override + public String toString() + { +return "NoopFirehoseFactory{}"; + } + + @Override Review comment: Can add `@Nullable` and remove `throws IOException` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343419111 ## File path: core/src/main/java/org/apache/druid/data/input/impl/FileSource.java ## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.SplitSource; +import org.apache.druid.utils.CompressionUtils; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.RandomAccessFile; +import java.nio.channels.Channels; +import java.nio.channels.FileChannel; + +public class FileSource implements SplitSource +{ + private final InputSplit split; + private final FileChannel channel; + + FileSource(InputSplit split) throws FileNotFoundException + { +this.split = split; +final RandomAccessFile file = new RandomAccessFile(split.get(), "r"); Review comment: When does this opened file handle get closed? 
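One possible answer to the file-handle question above is to give the source an explicit `close()` and have callers use try-with-resources. The following is only a sketch under that assumption: `CloseableFileSourceSketch`, `open`, `isOpen`, and `demo` are hypothetical names, and this is not the `FileSource` from the pull request.

```java
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.Files;

// Hypothetical sketch; not the FileSource class under review.
class CloseableFileSourceSketch implements Closeable
{
  private final RandomAccessFile file;
  private final FileChannel channel;

  CloseableFileSourceSketch(File path) throws IOException
  {
    this.file = new RandomAccessFile(path, "r");
    this.channel = file.getChannel();
  }

  InputStream open()
  {
    return Channels.newInputStream(channel);
  }

  boolean isOpen()
  {
    return channel.isOpen();
  }

  @Override
  public void close() throws IOException
  {
    // Closing the RandomAccessFile also closes its associated channel.
    file.close();
  }

  // Writes one byte to a temporary file, reads it back through the sketch,
  // and reports whether the byte matched and the handle was released afterwards.
  static boolean demo()
  {
    try {
      final File tmp = File.createTempFile("file-source-sketch", ".bin");
      tmp.deleteOnExit();
      Files.write(tmp.toPath(), new byte[]{65});

      final CloseableFileSourceSketch source = new CloseableFileSourceSketch(tmp);
      final int first;
      try (CloseableFileSourceSketch s = source) {
        first = s.open().read();
      }
      return first == 65 && !source.isOpen();
    }
    catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

With this shape the owner of the source decides when the handle is released, rather than relying on whoever consumes the stream.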
[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343942960

## File path: core/src/main/java/org/apache/druid/java/util/common/FileUtils.java ##

@@ -306,6 +306,33 @@ public void close()
     }
   }

+  public static long copyLarge(

Review comment: Since this is almost the same as the method above, it'd be good to factor out the common parts to a private helper method. Also, since it's a library function used in several places, it would be good to add a unit test to `FileUtilsTest`.
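The refactoring suggested above might look roughly like the sketch below: two public overloads that differ only in how they obtain a buffer, both delegating to one private copy loop. The signatures here are assumptions made for illustration and are not the ones in Druid's `FileUtils`; `CopyLargeSketch`, `copyWithBuffer`, and `demo` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Hypothetical sketch; these signatures are not the ones in Druid's FileUtils.
class CopyLargeSketch
{
  // Public overload that allocates a default-sized buffer.
  static long copyLarge(InputStream in, OutputStream out) throws IOException
  {
    return copyWithBuffer(in, out, new byte[8192]);
  }

  // Public overload that lets the caller supply (and reuse) the buffer.
  static long copyLarge(InputStream in, OutputStream out, byte[] buffer) throws IOException
  {
    return copyWithBuffer(in, out, buffer);
  }

  // Shared loop: reads until end of stream and returns the number of bytes copied.
  private static long copyWithBuffer(InputStream in, OutputStream out, byte[] buffer) throws IOException
  {
    long total = 0;
    int n;
    while ((n = in.read(buffer)) != -1) {
      out.write(buffer, 0, n);
      total += n;
    }
    return total;
  }

  // Copies `size` zero bytes between in-memory streams and returns the reported count.
  static long demo(int size)
  {
    try {
      final ByteArrayOutputStream out = new ByteArrayOutputStream();
      final long copied = copyLarge(new ByteArrayInputStream(new byte[size]), out, new byte[1024]);
      if (out.size() != size) {
        throw new IllegalStateException("output size does not match input size");
      }
      return copied;
    }
    catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

A unit test along the lines of `demo` (copy a known payload, then check both the returned count and the bytes written) would cover both overloads through the shared helper.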
[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343881013 ## File path: core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java ## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.impl; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import org.apache.druid.data.input.SplitReader; +import org.apache.druid.indexer.Checks; +import org.apache.druid.indexer.Property; + +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +public class CsvInputFormat implements InputFormat +{ + private final String listDelimiter; + private final List columns; + private final boolean findColumnsFromHeader; + private final int skipHeaderRows; + + @JsonCreator + public CsvInputFormat( + @JsonProperty("columns") @Nullable List columns, + @JsonProperty("listDelimiter") String listDelimiter, + @Deprecated @JsonProperty("hasHeaderRow") @Nullable Boolean hasHeaderRow, + @JsonProperty("findColumnsFromHeader") @Nullable Boolean findColumnsFromHeader, Review comment: Does this need to be `@Nullable`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org
[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343935199 ## File path: core/src/test/java/org/apache/druid/data/input/impl/NoopFirehoseFactory.java ## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import org.apache.druid.data.input.FiniteFirehoseFactory; +import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.SplitHintSpec; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.stream.Stream; + +public class NoopFirehoseFactory implements FiniteFirehoseFactory +{ + @Override + public String toString() + { +return "NoopFirehoseFactory{}"; + } + + @Override + public Stream getSplits( + @Nullable SplitHintSpec splitHintSpec + ) throws IOException + { +return null; + } + + @Override + public int getNumSplits(@Nullable SplitHintSpec splitHintSpec) throws IOException + { +return 0; + } + + @Override Review comment: Can add `@Nullable` This is an automated message from the Apache Git Service. 
[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343410250

## File path: core/src/main/java/org/apache/druid/data/input/impl/JSONParseSpec.java ##

@@ -73,6 +74,13 @@ public void verify(List usedCols)
     return new JSONPathParser(getFlattenSpec(), objectMapper);
   }

+  @Nullable

Review comment: Can remove `@Nullable`
[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343936612 ## File path: core/src/test/java/org/apache/druid/data/input/impl/JsonReaderTest.java ## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.impl; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.SplitReader; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; +import org.apache.druid.java.util.common.parsers.JSONPathFieldType; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; + +public class JsonReaderTest +{ + @Test + public void testParseRow() throws IOException + { +final JsonInputFormat format = new JsonInputFormat( +new JSONPathSpec( +true, +ImmutableList.of( +new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"), +new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"), +new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"), +new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"), +new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"), +new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2") +) +), +null +); + +final ByteSource source = new ByteSource( + StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":1}}") +); + +final SplitReader reader = format.createReader( +new TimestampSpec("timestamp", "iso", null), +new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))) +); +try (CloseableIterator iterator = reader.read(source, null)) { + while (iterator.hasNext()) { +final InputRow row = iterator.next(); +Assert.assertEquals(DateTimes.of("2019-01-01"), row.getTimestamp()); +Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); 
+Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); +Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz"))); +Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg"))); +Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg"))); + +Assert.assertTrue(row.getDimension("root_baz2").isEmpty()); +Assert.assertTrue(row.getDimension("path_omg2").isEmpty()); +Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + } +} Review comment: Also assert that the expected number of 1 iteration occurred This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org
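The reason for the count assertion requested above is that assertions placed inside a `while (iterator.hasNext())` loop pass vacuously when the reader yields no rows. A minimal sketch of the pattern, with hypothetical names (`IterationCountSketch`, `countRows`) and a plain `String` iterator standing in for the `InputRow` iterator:

```java
import java.util.Iterator;

// Hypothetical sketch; a String iterator stands in for CloseableIterator of InputRow.
class IterationCountSketch
{
  // Runs the per-row check on every element and returns how many rows were seen,
  // so the caller can assert on the count after the loop.
  static int countRows(Iterator<String> iterator)
  {
    int numResults = 0;
    while (iterator.hasNext()) {
      final String row = iterator.next();
      if (row.isEmpty()) {
        throw new AssertionError("unexpected empty row");
      }
      numResults++;
    }
    return numResults;
  }
}
```

In the JUnit test this would amount to incrementing a counter inside the loop and adding something like `Assert.assertEquals(1, numResults)` after the try block.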
[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343932828 ## File path: core/src/main/java/org/apache/druid/data/input/impl/SplitIteratingReader.java ## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowPlusRaw; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.SplitReader; +import org.apache.druid.data.input.SplitSource; +import org.apache.druid.java.util.common.parsers.CloseableIterator; + +import java.io.File; +import java.io.IOException; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.function.Function; +import java.util.stream.Stream; + +/** + * InputSourceReader iterating multiple {@link SplitSource}s. 
+ */ +public class SplitIteratingReader implements InputSourceReader +{ + private final TimestampSpec timestampSpec; + private final DimensionsSpec dimensionsSpec; + private final InputFormat inputFormat; + private final Iterator> sourceIterator; + private final File temporaryDirectory; + + public SplitIteratingReader( + TimestampSpec timestampSpec, + DimensionsSpec dimensionsSpec, + InputFormat inputFormat, + Stream> sourceStream, + File temporaryDirectory + ) + { +this.timestampSpec = timestampSpec; +this.dimensionsSpec = dimensionsSpec; +this.inputFormat = inputFormat; +this.sourceIterator = sourceStream.iterator(); +this.temporaryDirectory = temporaryDirectory; + } + + @Override + public CloseableIterator read() + { +return createIterator(reader -> { + try { +return reader.read(sourceIterator.next(), temporaryDirectory); + } + catch (IOException e) { +throw new RuntimeException(e); + } +}); + } + + @Override + public CloseableIterator sample() + { +return createIterator(reader -> { + try { +return reader.sample(sourceIterator.next(), temporaryDirectory); + } + catch (IOException e) { +throw new RuntimeException(e); + } +}); + } + + private CloseableIterator createIterator(Function> rowPopulator) + { +return new CloseableIterator() +{ + CloseableIterator rowIterator = null; + + @Override + public boolean hasNext() + { +checkRowIterator(); +return rowIterator != null && rowIterator.hasNext(); + } + + @Override + public R next() + { +if (!hasNext()) { + throw new NoSuchElementException(); +} +return rowIterator.next(); + } + + private void checkRowIterator() Review comment: Consider renaming method to something that indicates `rowIterator` may be mutated This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343948330

## File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java ##

@@ -1015,6 +1044,16 @@ public IndexIngestionSpec(
   {
     super(dataSchema, ioConfig, tuningConfig);

+    Checks.checkOneNotNullOrEmpty(
+        ImmutableList.of(
+            new Property<>("parser", dataSchema.getParserMap()),
+            new Property<>("inputFormat", ioConfig.getInputFormat())
+        )
+    );
+    if (dataSchema.getParserMap() != null && ioConfig.getInputSource() != null) {
+      throw new IAE("Cannot use parser and inputSource together. Try use inputFormat instead of parser.");

Review comment: Typo: Try use inputFormat -> Try using inputFormat
[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343908366 ## File path: core/src/test/java/org/apache/druid/data/input/impl/CsvReaderTest.java ## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.impl; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.SplitReader; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.junit.Assert; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public class CsvReaderTest +{ + private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec("ts", "auto", null); + private static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec( + DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "name")) + ); + + @Test + public void testWithoutHeaders() throws IOException + { +final ByteSource source = writeData( +ImmutableList.of( +"2019-01-01T00:00:10Z,name_1,5", +"2019-01-01T00:00:20Z,name_2,10", +"2019-01-01T00:00:30Z,name_3,15" +) +); +final CsvInputFormat format = new CsvInputFormat(ImmutableList.of("ts", "name", "score"), null, false, 0); +assertResult(source, format); + } + + @Test + public void testFindColumn() throws IOException + { +final ByteSource source = writeData( +ImmutableList.of( +"ts,name,score", +"2019-01-01T00:00:10Z,name_1,5", +"2019-01-01T00:00:20Z,name_2,10", +"2019-01-01T00:00:30Z,name_3,15" +) +); +final CsvInputFormat format = new CsvInputFormat(ImmutableList.of(), null, true, 0); +assertResult(source, format); + } + + @Test + public void testSkipHeaders() throws IOException + { +final ByteSource source = writeData( +ImmutableList.of( +"this,is,a,row,to,skip", +"2019-01-01T00:00:10Z,name_1,5", +"2019-01-01T00:00:20Z,name_2,10", +"2019-01-01T00:00:30Z,name_3,15" +) +); +final CsvInputFormat format = new CsvInputFormat(ImmutableList.of("ts", "name", "score"), null, 
false, 1); +assertResult(source, format); + } + + @Test + public void testFindColumnAndSkipHeaders() throws IOException Review comment: This test is related to the behavior change comment in `CsvReader`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org
[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343425624 ## File path: core/src/main/java/org/apache/druid/data/input/impl/FirehoseToInputSourceReaderAdaptor.java ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.impl; + +import org.apache.druid.data.input.Firehose; +import org.apache.druid.data.input.FirehoseFactory; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowPlusRaw; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.java.util.common.parsers.CloseableIterator; + +import java.io.File; +import java.io.IOException; + +public class FirehoseToInputSourceReaderAdaptor implements InputSourceReader +{ + private final FirehoseFactory firehoseFactory; + private final InputRowParser inputRowParser; + private final File temporaryDirectory; + + public FirehoseToInputSourceReaderAdaptor( + FirehoseFactory firehoseFactory, + InputRowParser inputRowPlusRaw, + File temporaryDirectory + ) + { +this.firehoseFactory = firehoseFactory; +this.inputRowParser = inputRowPlusRaw; +this.temporaryDirectory = temporaryDirectory; + } + + @Override + public CloseableIterator read() throws IOException + { +return new CloseableIterator() +{ + final Firehose firehose = firehoseFactory.connect(inputRowParser, temporaryDirectory); + + @Override + public boolean hasNext() + { +try { + return firehose.hasMore(); +} +catch (IOException e) { + throw new RuntimeException(e); +} + } + + @Override + public InputRow next() + { +try { + return firehose.nextRow(); +} +catch (IOException e) { + throw new RuntimeException(e); Review comment: ditto This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org
[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343414360 ## File path: core/src/main/java/org/apache/druid/data/input/SplitSource.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input; + +import com.google.common.base.Predicate; +import org.apache.druid.guice.annotations.ExtensionPoint; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; + +/** + * SplitSource abstracts an {@link InputSplit} and knows how to read bytes from the given split. + */ +@ExtensionPoint +public interface SplitSource +{ + Logger LOG = new Logger(SplitSource.class); + + int DEFAULT_FETCH_BUFFER_SIZE = 4 * 1024; // 4 KB Review comment: Unused? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343380177 ## File path: server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java ## @@ -212,25 +247,78 @@ public TransformSpec getTransformSpec() return transformSpec; } + @Deprecated + @JsonProperty("parser") + @Nullable + @JsonInclude(Include.NON_NULL) + public Map getParserMap() + { +return parserMap; + } + + @Nullable + public InputRowParser getParser() + { +if (inputRowParser == null) { + if (parserMap == null) { +return null; + } + //noinspection unchecked + inputRowParser = transformSpec.decorate(objectMapper.convertValue(this.parserMap, InputRowParser.class)); + ParseSpec parseSpec = inputRowParser.getParseSpec(); + parseSpec = parseSpec.withDimensionsSpec( + computeDimensionsSpec(parseSpec.getTimestampSpec(), parseSpec.getDimensionsSpec(), aggregators) + ); + if (timestampSpec != null) { +parseSpec = parseSpec.withTimestampSpec(timestampSpec); + } + if (dimensionsSpec != null) { +parseSpec = parseSpec.withDimensionsSpec(dimensionsSpec); + } + inputRowParser = inputRowParser.withParseSpec(parseSpec); +} +return inputRowParser; + } + public DataSchema withGranularitySpec(GranularitySpec granularitySpec) { -return new DataSchema(dataSource, parser, aggregators, granularitySpec, transformSpec, jsonMapper); +return new DataSchema( +dataSource, +timestampSpec, +dimensionsSpec, +aggregators, +granularitySpec, +transformSpec, +parserMap, +objectMapper +); } public DataSchema withTransformSpec(TransformSpec transformSpec) { -return new DataSchema(dataSource, parser, aggregators, granularitySpec, transformSpec, jsonMapper); +return new DataSchema( +dataSource, +timestampSpec, +dimensionsSpec, +aggregators, +granularitySpec, +transformSpec, +parserMap, +objectMapper +); } @Override public String toString() { return "DataSchema{" + "dataSource='" + dataSource + '\'' + - ", parser=" + parser 
+ ", aggregators=" + Arrays.toString(aggregators) + ", granularitySpec=" + granularitySpec + ", transformSpec=" + transformSpec + + (parserMap == null ? "" : ", parserMap=" + parserMap) + Review comment: Why not print `parserMap=null` if it's null? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org
[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343956499 ## File path: indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingTest.java ## @@ -90,24 +91,29 @@ @RunWith(Parameterized.class) public class MultiPhaseParallelIndexingTest extends AbstractParallelIndexSupervisorTaskTest { - @Parameterized.Parameters(name = "{0}") + @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}") public static Iterable constructorFeeder() { return ImmutableList.of( -new Object[]{LockGranularity.TIME_CHUNK}, -new Object[]{LockGranularity.SEGMENT} +new Object[]{LockGranularity.TIME_CHUNK, false}, +new Object[]{LockGranularity.TIME_CHUNK, true}, +new Object[]{LockGranularity.SEGMENT, false}, Review comment: Similar comment to `IndexingTest` about skipping this permutation
[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343425515 ## File path: core/src/main/java/org/apache/druid/data/input/impl/FirehoseToInputSourceReaderAdaptor.java ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.impl; + +import org.apache.druid.data.input.Firehose; +import org.apache.druid.data.input.FirehoseFactory; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowPlusRaw; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.java.util.common.parsers.CloseableIterator; + +import java.io.File; +import java.io.IOException; + +public class FirehoseToInputSourceReaderAdaptor implements InputSourceReader +{ + private final FirehoseFactory firehoseFactory; + private final InputRowParser inputRowParser; + private final File temporaryDirectory; + + public FirehoseToInputSourceReaderAdaptor( + FirehoseFactory firehoseFactory, + InputRowParser inputRowPlusRaw, + File temporaryDirectory + ) + { +this.firehoseFactory = firehoseFactory; +this.inputRowParser = inputRowPlusRaw; +this.temporaryDirectory = temporaryDirectory; + } + + @Override + public CloseableIterator read() throws IOException + { +return new CloseableIterator() +{ + final Firehose firehose = firehoseFactory.connect(inputRowParser, temporaryDirectory); + + @Override + public boolean hasNext() + { +try { + return firehose.hasMore(); +} +catch (IOException e) { + throw new RuntimeException(e); Review comment: Maybe use `UncheckedIOException` instead? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org
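A minimal sketch of the `UncheckedIOException` suggestion above. `RowSource` and `UncheckedRowIterator` are hypothetical stand-ins for `Firehose` and the anonymous iterator in the diff, not classes from the PR. Unlike a bare `RuntimeException`, `UncheckedIOException` lets callers catch I/O failures specifically, and `getCause()` is typed as `IOException`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;

// Hypothetical stand-in for a Firehose-like source whose reads may fail with IOException.
interface RowSource
{
  boolean hasMore() throws IOException;

  String nextRow() throws IOException;
}

// Adapts RowSource to Iterator, rethrowing IOException as UncheckedIOException
// because Iterator methods cannot declare checked exceptions.
final class UncheckedRowIterator implements Iterator<String>
{
  private final RowSource source;

  UncheckedRowIterator(RowSource source)
  {
    this.source = source;
  }

  @Override
  public boolean hasNext()
  {
    try {
      return source.hasMore();
    }
    catch (IOException e) {
      // Keeps the original IOException as the cause.
      throw new UncheckedIOException(e);
    }
  }

  @Override
  public String next()
  {
    try {
      return source.nextRow();
    }
    catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```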
[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343906675 ## File path: core/src/main/java/org/apache/druid/data/input/impl/CsvReader.java ## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.impl; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.base.Strings; +import com.google.common.collect.Iterables; +import com.opencsv.CSVParser; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.TextReader; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.collect.Utils; +import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.druid.java.util.common.parsers.ParserUtils; +import org.apache.druid.java.util.common.parsers.Parsers; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +public class CsvReader extends TextReader +{ + private final CSVParser parser = new CSVParser(); + private final boolean hasHeaderRow; + private final int skipHeaderRows; + private final Function multiValueFunction; + @Nullable + private List columns; + + CsvReader( + TimestampSpec timestampSpec, + DimensionsSpec dimensionsSpec, + String listDelimiter, + @Nullable List columns, + boolean hasHeaderRow, + int skipHeaderRows + ) + { +super(timestampSpec, dimensionsSpec); +this.hasHeaderRow = hasHeaderRow; +this.skipHeaderRows = skipHeaderRows; +final String finalListDelimeter = listDelimiter == null ? Parsers.DEFAULT_LIST_DELIMITER : listDelimiter; +this.multiValueFunction = ParserUtils.getMultiValueFunction(finalListDelimeter, Splitter.on(finalListDelimeter)); +this.columns = hasHeaderRow ? 
null : columns; // columns will be overriden by header row + +if (this.columns != null) { + for (String column : this.columns) { +Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column); + } + verify(this.columns, dimensionsSpec.getDimensionNames()); +} else { + Preconditions.checkArgument( + hasHeaderRow, + "If columns field is not set, the first row of your data must have your header" + + " and hasHeaderRow must be set to true." + ); +} + } + + @Override + public InputRow readLine(String line) throws IOException, ParseException + { +final String[] parsed = parser.parseLine(line); +final Map zipped = Utils.zipMapPartial( +Preconditions.checkNotNull(columns, "columns"), +Iterables.transform(Arrays.asList(parsed), multiValueFunction) +); +return MapInputRowParser.parse(getTimestampSpec(), getDimensionsSpec(), zipped); + } + + @Override + public int getNumHeaderLines() + { +return (hasHeaderRow ? 1 : 0) + skipHeaderRows; Review comment: Is this an intentional behavior change? From the current [docs](https://druid.apache.org/docs/latest/ingestion/data-formats.html): > Also, you can skip some header rows by setting skipHeaderRows in your parseSpec. If both skipHeaderRows and hasHeaderRow options are set, skipHeaderRows is first applied. For example, if you set skipHeaderRows to 2 and hasHeaderRow to true, Druid will skip the first two lines and then extract column information from the third line. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org
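For reference, a small sketch of the ordering described in the quoted docs: `skipHeaderRows` lines are dropped first, and only then is the next line treated as the header. `HeaderRowSketch` and its methods are hypothetical helpers written for this illustration; they are not part of `CsvReader` and say nothing about what the PR actually does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that models the documented ordering only.
final class HeaderRowSketch
{
  // 0-based index of the line holding column names, or -1 when there is no header row.
  // The skipped lines come first, so the header is the line right after them.
  static int headerLineIndex(boolean hasHeaderRow, int skipHeaderRows)
  {
    return hasHeaderRow ? skipHeaderRows : -1;
  }

  // Total number of leading lines that are not data rows.
  static int numHeaderLines(boolean hasHeaderRow, int skipHeaderRows)
  {
    return skipHeaderRows + (hasHeaderRow ? 1 : 0);
  }

  // Lines left over once the skipped rows and the optional header row are removed.
  static List<String> dataLines(List<String> lines, boolean hasHeaderRow, int skipHeaderRows)
  {
    final int start = Math.min(numHeaderLines(hasHeaderRow, skipHeaderRows), lines.size());
    return new ArrayList<>(lines.subList(start, lines.size()));
  }
}
```

With `skipHeaderRows` = 2 and `hasHeaderRow` = true, the header is read from the third line. The total count of non-data lines is the same under either ordering, so the open question in the comment is which line supplies the column names.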
[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343937356 ## File path: core/src/test/java/org/apache/druid/data/input/impl/JsonReaderTest.java ## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.impl; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.SplitReader; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; +import org.apache.druid.java.util.common.parsers.JSONPathFieldType; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; + +public class JsonReaderTest +{ + @Test + public void testParseRow() throws IOException + { +final JsonInputFormat format = new JsonInputFormat( +new JSONPathSpec( +true, +ImmutableList.of( +new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"), +new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"), +new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"), +new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"), +new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"), +new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2") +) +), +null +); + +final ByteSource source = new ByteSource( + StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":1}}") +); + +final SplitReader reader = format.createReader( +new TimestampSpec("timestamp", "iso", null), +new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))) +); +try (CloseableIterator iterator = reader.read(source, null)) { + while (iterator.hasNext()) { +final InputRow row = iterator.next(); +Assert.assertEquals(DateTimes.of("2019-01-01"), row.getTimestamp()); +Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); 
+Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); +Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz"))); +Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg"))); +Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg"))); + +Assert.assertTrue(row.getDimension("root_baz2").isEmpty()); +Assert.assertTrue(row.getDimension("path_omg2").isEmpty()); +Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + } +} + } + + @Test + public void testParseRowWithConditional() throws IOException + { +final JsonInputFormat format = new JsonInputFormat( +new JSONPathSpec( +true, +ImmutableList.of( +new JSONPathFieldSpec(JSONPathFieldType.PATH, "foo", "$.[?(@.maybe_object)].maybe_object.foo.test"), +new JSONPathFieldSpec(JSONPathFieldType.PATH, "baz", "$.maybe_object_2.foo.test"), +new JSONPathFieldSpec(JSONPathFieldType.PATH, "bar", "$.[?(@.something_else)].something_else.foo") +) +), +null +); + +final ByteSource source = new ByteSource( +StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"something_else\": {\"foo\": \"test\"}}") +); + +final SplitReader reader = format.createReader( +new TimestampSpec("timestamp", "iso", null), +new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("foo"))) +); + +try (CloseableIterator iterator = reader.read(source, null)) { + while
[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343924073 ## File path: core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.impl; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonParser.Feature; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.data.input.SplitReader; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; + +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; + +public class JsonInputFormat extends NestedInputFormat +{ + private final Map featureSpec; + private final ObjectMapper objectMapper; + + @JsonCreator + public JsonInputFormat( + @JsonProperty("flattenSpec") JSONPathSpec flattenSpec, + @JsonProperty("featureSpec") @Nullable Map featureSpec Review comment: I noticed that a description of `featureSpec` is missing from the docs. Please add one when the docs are updated.
[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343950067 ## File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIngestionSpec.java ## @@ -39,6 +44,18 @@ public ParallelIndexIngestionSpec( { super(dataSchema, ioConfig, tuningConfig); +Checks.checkOneNotNullOrEmpty( +ImmutableList.of( +new Property<>("parser", dataSchema.getParserMap()), +new Property<>("inputFormat", ioConfig.getInputFormat()) +) +); +if (dataSchema.getParserMap() != null && ioConfig.getInputSource() != null) { + if (!(ioConfig.getInputSource() instanceof FirehoseFactoryToInputSourceAdaptor)) { +throw new IAE("Cannot use parser and inputSource together. Try use inputFormat instead of parser."); Review comment: Typo: Try use inputFormat -> Try using inputFormat
[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343956969 ## File path: indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java ## @@ -67,21 +68,26 @@ @RunWith(Parameterized.class) public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSupervisorTaskTest { - @Parameterized.Parameters(name = "{0}") + @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}") public static Iterable constructorFeeder() { return ImmutableList.of( -new Object[]{LockGranularity.TIME_CHUNK}, -new Object[]{LockGranularity.SEGMENT} +new Object[]{LockGranularity.TIME_CHUNK, false}, +new Object[]{LockGranularity.TIME_CHUNK, true}, +new Object[]{LockGranularity.SEGMENT, false}, Review comment: Similar comment to `IndexingTest` about skipping this permutation
[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343948966 ## File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java ## @@ -1043,30 +1082,90 @@ public IndexTuningConfig getTuningConfig() } @JsonTypeName("index") - public static class IndexIOConfig implements IOConfig + public static class IndexIOConfig implements BatchIOConfig { private static final boolean DEFAULT_APPEND_TO_EXISTING = false; private final FirehoseFactory firehoseFactory; +private final InputSource inputSource; +private final InputFormat inputFormat; private final boolean appendToExisting; @JsonCreator public IndexIOConfig( -@JsonProperty("firehose") FirehoseFactory firehoseFactory, +@Deprecated @JsonProperty("firehose") @Nullable FirehoseFactory firehoseFactory, +@JsonProperty("inputSource") @Nullable InputSource inputSource, +@JsonProperty("inputFormat") @Nullable InputFormat inputFormat, @JsonProperty("appendToExisting") @Nullable Boolean appendToExisting ) { + Checks.checkOneNotNullOrEmpty( + ImmutableList.of(new Property<>("firehose", firehoseFactory), new Property<>("inputSource", inputSource)) + ); + if (firehoseFactory != null && inputFormat != null) { +throw new IAE("Cannot use firehose and inputFormat together. Try use inputSource instead of firehose."); Review comment: Typo: Try use inputSource -> Try using inputSource
[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343940070 ## File path: core/src/test/java/org/apache/druid/data/input/impl/CsvInputFormatTest.java ## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.impl; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; + +public class CsvInputFormatTest +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testSerde() throws IOException + { +final ObjectMapper mapper = new ObjectMapper(); +final CsvInputFormat format = new CsvInputFormat(Collections.singletonList("a"), "|", true, 10); +final byte[] bytes = mapper.writeValueAsBytes(format); +final CsvInputFormat fromJson = (CsvInputFormat) mapper.readValue(bytes, InputFormat.class); +Assert.assertEquals(format, fromJson); + } + + @Test + public void testColumnMissing() + { +final CsvInputFormat format = new CsvInputFormat(Collections.singletonList("a"), ",", false, 0); +expectedException.expect(IllegalArgumentException.class); +expectedException.expectMessage("column[b] not in columns"); +format.createReader( +new TimestampSpec("timestamp", "auto", null), +new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("a", "b"))) +); + } + + @Test + public void testComma() Review comment: I think it's better to test that it fails if one of the column names defined in the `CsvInputFormat` has a comma (i.e., error message: `Column[%s] has a comma, it cannot`). Otherwise, it's testing the same behavior as `testColumnMissing()`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org
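A minimal sketch of the behavior the comment above asks `testComma()` to cover: a column name containing a comma is rejected, which is a different failure from a dimension missing from the column list. `CsvColumnCheckSketch` and `checkNoCommas` are hypothetical names, not part of `CsvInputFormat`, and the message reproduces only the prefix quoted in the review comment, since the rest of the real message is not shown there.

```java
import java.util.List;

// Hypothetical column-name validator; it stands in for whatever check
// CsvInputFormat performs and is not Druid code.
final class CsvColumnCheckSketch
{
  private CsvColumnCheckSketch() {}

  // Throws if any configured column name contains a comma.
  static void checkNoCommas(List<String> columns)
  {
    for (String column : columns) {
      if (column.contains(",")) {
        // Only the message prefix quoted in the review comment is used here.
        throw new IllegalArgumentException(String.format("Column[%s] has a comma", column));
      }
    }
  }
}
```

A test built on this idea would construct the format with a column such as `"a,b"` and expect an `IllegalArgumentException`, rather than repeating the missing-column setup from `testColumnMissing()`.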
[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343934796 ## File path: core/src/test/java/org/apache/druid/data/input/impl/NoopInputSource.java ## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import org.apache.druid.data.input.InputSource; +import org.apache.druid.data.input.InputSourceReader; + +import javax.annotation.Nullable; +import java.io.File; + +public class NoopInputSource implements InputSource +{ + @Override Review comment: Can add `@Nullable` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org
[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343955244 ## File path: indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java ## @@ -115,47 +118,49 @@ @Rule public ExpectedException expectedException = ExpectedException.none(); + private static final TimestampSpec DEFAULT_TIMESTAMP_SPEC = new TimestampSpec("ts", "auto", null); + private static final DimensionsSpec DEFAULT_DIMENSIONS_SPEC = new DimensionsSpec( + DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim")) + ); private static final ParseSpec DEFAULT_PARSE_SPEC = new CSVParseSpec( - new TimestampSpec( - "ts", - "auto", - null - ), - new DimensionsSpec( - DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim")), - new ArrayList<>(), - new ArrayList<>() - ), + DEFAULT_TIMESTAMP_SPEC, + DEFAULT_DIMENSIONS_SPEC, null, Arrays.asList("ts", "dim", "val"), false, 0 ); + private static final InputFormat DEFAULT_INPUT_FORMAT = DEFAULT_PARSE_SPEC.toInputFormat(); - @Parameterized.Parameters(name = "{0}") + @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}") public static Iterable constructorFeeder() { return ImmutableList.of( -new Object[]{LockGranularity.TIME_CHUNK}, -new Object[]{LockGranularity.SEGMENT} +new Object[]{LockGranularity.TIME_CHUNK, false}, +new Object[]{LockGranularity.TIME_CHUNK, true}, +new Object[]{LockGranularity.SEGMENT, false}, +new Object[]{LockGranularity.SEGMENT, true} Review comment: This is a relatively slow test (~15 seconds per parameterized run), so all the permutations may be overkill. Perhaps remove (SEGMENT, false), which will still give coverage of both lock granularities and both with/without the input format API. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org
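The suggestion above can be checked with a small sketch: after dropping (SEGMENT, false), the three remaining combinations still exercise each lock granularity and each value of the flag at least once. The enum below is a local, hypothetical copy of the two constant names that appear in the diff, and `coversEveryValue` is an illustrative helper, not part of `IndexTaskTest`.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, self-contained version of the reduced parameter list.
final class ParameterCoverageSketch
{
  // Local stand-in for Druid's LockGranularity, limited to the names used in the diff.
  enum LockGranularity { TIME_CHUNK, SEGMENT }

  private ParameterCoverageSketch() {}

  // The parameter list with (SEGMENT, false) removed, as the review comment proposes.
  static List<Object[]> constructorFeeder()
  {
    return Arrays.asList(
        new Object[]{LockGranularity.TIME_CHUNK, false},
        new Object[]{LockGranularity.TIME_CHUNK, true},
        new Object[]{LockGranularity.SEGMENT, true}
    );
  }

  // True if every granularity and both boolean values appear in at least one parameter row.
  static boolean coversEveryValue(List<Object[]> parameters)
  {
    final Set<Object> granularities = new HashSet<>();
    final Set<Object> flags = new HashSet<>();
    for (Object[] row : parameters) {
      granularities.add(row[0]);
      flags.add(row[1]);
    }
    return granularities.size() == LockGranularity.values().length && flags.size() == 2;
  }
}
```

The trade-off is that the (SEGMENT, false) pairing itself is no longer run, so a bug that appears only in that combination would go unnoticed.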
[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343934419 ## File path: core/src/test/java/org/apache/druid/data/input/impl/SplitIteratingReaderTest.java ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.impl; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputSplit; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.io.Writer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; + +public class SplitIteratingReaderTest +{ + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Test + public void test() throws IOException + { +final List files = new ArrayList<>(); +for (int i = 0; i < 5; i++) { + final File file = temporaryFolder.newFile("test_" + i); + files.add(file); + try (Writer writer = Files.newBufferedWriter(file.toPath(), StandardCharsets.UTF_8)) { +writer.write(StringUtils.format("%d,%s,%d\n", 20190101 + i, "name_" + i, i)); +writer.write(StringUtils.format("%d,%s,%d", 20190102 + i, "name_" + (i + 1), i + 1)); + } +} +final SplitIteratingReader firehose = new SplitIteratingReader<>( +new TimestampSpec("time", "MMdd", null), +new DimensionsSpec( +DimensionsSpec.getDefaultSchemas(ImmutableList.of("time", "name", "score")) +), +new CsvInputFormat( +ImmutableList.of("time", "name", "score"), +null, +false, +0 +), +files.stream().flatMap(file -> { + try { +return ImmutableList.of(new FileSource(new InputSplit<>(file))).stream(); + } + catch (IOException e) { +throw new RuntimeException(e); + } +}), +temporaryFolder.newFolder() +); + +try (CloseableIterator iterator = firehose.read()) { + int i = 0; + while (iterator.hasNext()) { +InputRow row = iterator.next(); 
+Assert.assertEquals(DateTimes.of(StringUtils.format("2019-01-%02d", i + 1)), row.getTimestamp()); +Assert.assertEquals(StringUtils.format("name_%d", i), Iterables.getOnlyElement(row.getDimension("name"))); +Assert.assertEquals(Integer.toString(i), Iterables.getOnlyElement(row.getDimension("score"))); + +Assert.assertTrue(iterator.hasNext()); +row = iterator.next(); +Assert.assertEquals(DateTimes.of(StringUtils.format("2019-01-%02d", i + 2)), row.getTimestamp()); +Assert.assertEquals(StringUtils.format("name_%d", i + 1), Iterables.getOnlyElement(row.getDimension("name"))); +Assert.assertEquals(Integer.toString(i + 1), Iterables.getOnlyElement(row.getDimension("score"))); +i++; + } +} Review comment: Can assert `i` is 5 here. Ideally, also add a named constant for the number of files that's used for both the setup and the assert. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org
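A sketch of why the final count assertion matters: a loop of the form `while (iterator.hasNext())` passes vacuously when the reader returns no rows, so the test should also assert how many files were consumed, using the same constant that drives the setup. The class below is hypothetical and works on in-memory strings instead of `SplitIteratingReader`; only the two-rows-per-file layout is taken from the test above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical in-memory analogue of the test loop, not Druid code.
final class RowCountSketch
{
  // Single named constant shared by the setup and by the final assertion.
  static final int NUM_FILES = 5;

  private RowCountSketch() {}

  // Builds two CSV rows per "file", following the layout written in the test setup.
  static List<String> makeRows()
  {
    final List<String> rows = new ArrayList<>();
    for (int i = 0; i < NUM_FILES; i++) {
      rows.add(String.format("%d,name_%d,%d", 20190101 + i, i, i));
      rows.add(String.format("%d,name_%d,%d", 20190102 + i, i + 1, i + 1));
    }
    return rows;
  }

  // Consumes two rows per iteration and returns how many files were read.
  static int countFilesRead(Iterator<String> iterator)
  {
    int i = 0;
    while (iterator.hasNext()) {
      iterator.next();
      if (!iterator.hasNext()) {
        throw new IllegalStateException("Expected a second row for file " + i);
      }
      iterator.next();
      i++;
    }
    return i;
  }
}
```

In the real test, the equivalent check would be an `Assert.assertEquals(NUM_FILES, i)` placed after the try-with-resources block; an empty iterator yields a count of 0 here, which is exactly the case that assertion would catch.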