[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-14 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r346617966
 
 

 ##
 File path: core/src/main/java/org/apache/druid/data/input/Firehose.java
 ##
 @@ -74,13 +74,13 @@
*
* @return an InputRowPlusRaw which may contain any of: an InputRow, the raw data, or a ParseException
 
 Review comment:
   javadoc for `@return` needs to be updated


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-14 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r346619475
 
 

 ##
 File path: 
core/src/main/java/org/apache/druid/data/input/InputRowListPlusJson.java
 ##
 @@ -62,23 +95,19 @@ public InputRow getInputRow()
   }
 
   @Nullable
-  public ParseException getParseException()
-  {
-    return parseException;
-  }
-
-  public boolean isEmpty()
+  public String getRawJson()
   {
-    return inputRow == null && raw == null && parseException == null;
+    return rawJson;
   }
 
-  public static InputRowPlusRaw of(@Nullable InputRow inputRow, @Nullable byte[] raw)
+  @Nullable
+  public ParseException getParseException()
   {
-    return new InputRowPlusRaw(inputRow, raw, null);
+    return parseException;
   }
 
-  public static InputRowPlusRaw of(@Nullable byte[] raw, @Nullable ParseException parseException)
+  public boolean isEmpty()
   {
-    return new InputRowPlusRaw(null, raw, parseException);
+    return (inputRows == null || inputRows.isEmpty()) && raw == null && rawJson == null && parseException == null;
 
 Review comment:
   Should this also check if `rawJson.isEmpty()`?
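
   A minimal sketch of what the extra check could look like. The class below is a
   hypothetical stand-in, not the class from the PR: the field names follow the diff,
   while the simplified types and the constructor are assumptions made so the example
   compiles on its own. Whether an empty string should count as "empty" remains a
   decision for the PR.

```java
import java.util.List;

// Hypothetical stand-in for the class under review. Field names follow the diff;
// the simplified types and the constructor are assumptions for this sketch.
final class RowsPlusJsonSketch
{
  private final List<String> inputRows;   // stand-in for the parsed rows
  private final byte[] raw;
  private final String rawJson;
  private final Exception parseException; // stand-in for ParseException

  RowsPlusJsonSketch(List<String> inputRows, byte[] raw, String rawJson, Exception parseException)
  {
    this.inputRows = inputRows;
    this.raw = raw;
    this.rawJson = rawJson;
    this.parseException = parseException;
  }

  // Treats a null or zero-length rawJson the same way: as "no raw JSON present".
  boolean isEmpty()
  {
    return (inputRows == null || inputRows.isEmpty())
           && raw == null
           && (rawJson == null || rawJson.isEmpty())
           && parseException == null;
  }
}
```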





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-11 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r344919111
 
 

 ##
 File path: core/src/main/java/org/apache/druid/data/input/Unformattable.java
 ##
 @@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+
+import javax.annotation.Nullable;
+import java.io.File;
+
+public interface Unformattable
 
 Review comment:
   Do you want to add javadocs (i.e., what does it mean to be "unformattable")?





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-11 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r344918837
 
 

 ##
 File path: core/src/main/java/org/apache/druid/data/input/Formattable.java
 ##
 @@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.InputFormat;
+import org.apache.druid.data.input.impl.TimestampSpec;
+
+import javax.annotation.Nullable;
+import java.io.File;
+
+public interface Formattable
 
 Review comment:
   Do you want to add javadocs (i.e., what does it mean to be "formattable")?





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-08 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r344423212
 
 

 ##
 File path: 
core/src/main/java/org/apache/druid/data/input/impl/FirehoseToInputSourceReaderAdaptor.java
 ##
 @@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import org.apache.druid.data.input.Firehose;
+import org.apache.druid.data.input.FirehoseFactory;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowPlusRaw;
+import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+
+import java.io.File;
+import java.io.IOException;
+
+public class FirehoseToInputSourceReaderAdaptor implements InputSourceReader
+{
+  private final FirehoseFactory firehoseFactory;
+  private final InputRowParser inputRowParser;
+  private final File temporaryDirectory;
+
+  public FirehoseToInputSourceReaderAdaptor(
+      FirehoseFactory firehoseFactory,
+      InputRowParser inputRowPlusRaw,
+      File temporaryDirectory
+  )
+  {
+    this.firehoseFactory = firehoseFactory;
+    this.inputRowParser = inputRowPlusRaw;
+    this.temporaryDirectory = temporaryDirectory;
+  }
+
+  @Override
+  public CloseableIterator read() throws IOException
+  {
+    return new CloseableIterator()
+    {
+      final Firehose firehose = firehoseFactory.connect(inputRowParser, temporaryDirectory);
+
+      @Override
+      public boolean hasNext()
+      {
+        try {
+          return firehose.hasMore();
+        }
+        catch (IOException e) {
+          throw new RuntimeException(e);
 
 Review comment:
   I believe it was created for use cases like conforming to the Iterator 
interface.
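
   For context, a small sketch of the general pattern: `Iterator.hasNext()` and
   `Iterator.next()` cannot declare checked exceptions, so an `IOException` raised
   while looking ahead has to be rethrown unchecked. The class name is hypothetical,
   and the sketch uses the JDK's `java.io.UncheckedIOException` as one possible
   wrapper; the diff above wraps in a plain `RuntimeException`, and which class the
   comment refers to is not visible in this excerpt.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical adapter: exposes the lines of a BufferedReader through java.util.Iterator.
final class LineIteratorSketch implements Iterator<String>
{
  private final BufferedReader reader;
  private String nextLine;
  private boolean fetched; // true when nextLine holds a looked-ahead value

  LineIteratorSketch(BufferedReader reader)
  {
    this.reader = reader;
  }

  @Override
  public boolean hasNext()
  {
    if (!fetched) {
      try {
        nextLine = reader.readLine(); // readLine() declares IOException
      }
      catch (IOException e) {
        // hasNext() cannot declare IOException, so rethrow it unchecked.
        throw new UncheckedIOException(e);
      }
      fetched = true;
    }
    return nextLine != null;
  }

  @Override
  public String next()
  {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    fetched = false;
    return nextLine;
  }
}
```

   Callers that care about the I/O failure can still catch `UncheckedIOException`
   and unwrap it with `getCause()`.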





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343346519
 
 

 ##
 File path: 
core/src/main/java/org/apache/druid/data/input/impl/TimestampSpec.java
 ##
 @@ -57,10 +58,10 @@
 
   @JsonCreator
   public TimestampSpec(
-  @JsonProperty("column") String timestampColumn,
-  @JsonProperty("format") String format,
+  @JsonProperty("column") @Nullable String timestampColumn,
 
 Review comment:
   Thanks for adding!





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343351095
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
 ##
 @@ -294,7 +294,7 @@ public IndexMergerV9 getIndexMergerV9()
 return indexMergerV9;
   }
 
-  public File getFirehoseTemporaryDir()
+  public File getIndexingTmpDir()
   {
 return new File(taskWorkDir, "firehose");
 
 Review comment:
   Perhaps rename the temporary directory as well





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343349303
 
 

 ##
 File path: 
indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java
 ##
 @@ -579,8 +579,9 @@ public void intoConfiguration(Job job)
   public void verify()
   {
     Preconditions.checkNotNull(schema.getDataSchema().getDataSource(), "dataSource");
+    Preconditions.checkNotNull(schema.getDataSchema().getParser(), "inputRowParser");
     Preconditions.checkNotNull(schema.getDataSchema().getParser().getParseSpec(), "parseSpec");
-    Preconditions.checkNotNull(schema.getDataSchema().getParser().getParseSpec().getTimestampSpec(), "timestampSpec");
+    Preconditions.checkNotNull(schema.getDataSchema().getNonNullTimestampSpec(), "timestampSpec");
 
 Review comment:
   Checking this one for null seems redundant





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343377888
 
 

 ##
 File path: 
server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
 ##
 @@ -115,83 +158,75 @@ static void validateDatasourceName(String dataSource)
 Preconditions.checkArgument(!dataSource.contains("/"), "dataSource cannot 
contain the '/' character.");
   }
 
+  private static DimensionsSpec computeDimensionsSpec(
+  TimestampSpec timestampSpec,
+  DimensionsSpec dimensionsSpec,
+  AggregatorFactory[] aggregators
+  )
+  {
+final Set dimensionExclusions = new HashSet<>();
+
+final String timestampColumn = timestampSpec.getTimestampColumn();
+if (!(dimensionsSpec.hasCustomDimensions() && 
dimensionsSpec.getDimensionNames().contains(timestampColumn))) {
+  dimensionExclusions.add(timestampColumn);
+}
+
+for (AggregatorFactory aggregator : aggregators) {
+  dimensionExclusions.addAll(aggregator.requiredFields());
+  dimensionExclusions.add(aggregator.getName());
+}
+
+final Set metSet = 
Arrays.stream(aggregators).map(AggregatorFactory::getName).collect(Collectors.toSet());
+final Set dimSet = new 
HashSet<>(dimensionsSpec.getDimensionNames());
+final Set overlap = Sets.intersection(metSet, dimSet);
+if (!overlap.isEmpty()) {
+  throw new IAE(
+  "Cannot have overlapping dimensions and metrics of the same name. 
Please change the name of the metric. Overlap: %s",
+  overlap
+  );
+}
+
+return 
dimensionsSpec.withDimensionExclusions(Sets.difference(dimensionExclusions, 
dimSet));
+  }
+
   @JsonProperty
   public String getDataSource()
   {
 return dataSource;
   }
 
-  @JsonProperty("parser")
-  public Map getParserMap()
+  @Nullable
+  @JsonProperty
+  public TimestampSpec getTimestampSpec()
 
 Review comment:
   Suggestion: Make this private, since all clients of this class other than 
Jackson should be using `getNonNullTimestampSpec()`. Also, maybe rename this 
method to something like so that the API that's actually used has the nicer 
short name `getTimestampSpec()`.
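
   A rough sketch of the shape this suggestion describes, without Jackson so that it
   compiles on its own. Everything here is a simplified assumption: `SchemaSketch`
   stands in for `DataSchema`, `String` stands in for `TimestampSpec`, and
   `legacyParserTimestampSpec` stands in for the value derived from the deprecated
   parser. The private method name is a placeholder chosen for this sketch, since
   the name proposed in the comment did not survive in this excerpt.

```java
import java.util.Objects;

// Hypothetical, simplified stand-in for DataSchema.
final class SchemaSketch
{
  private String timestampSpec;                   // may be null in an old task spec
  private final String legacyParserTimestampSpec; // stand-in for the parser-derived value

  SchemaSketch(String timestampSpec, String legacyParserTimestampSpec)
  {
    this.timestampSpec = timestampSpec;
    this.legacyParserTimestampSpec = legacyParserTimestampSpec;
  }

  // Serialization-only accessor: private and allowed to return null.
  // In the PR this is the method that would carry @Nullable and @JsonProperty.
  // The name is a placeholder for this sketch.
  private String getTimestampSpecForSerialization()
  {
    return timestampSpec;
  }

  // The accessor regular callers use: never returns null, and falls back to the
  // legacy parser-derived value on first use.
  public String getTimestampSpec()
  {
    if (timestampSpec == null) {
      timestampSpec = Objects.requireNonNull(legacyParserTimestampSpec, "inputRowParser");
    }
    return timestampSpec;
  }
}
```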





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343378240
 
 

 ##
 File path: 
server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
 ##
 @@ -115,83 +158,75 @@ static void validateDatasourceName(String dataSource)
 Preconditions.checkArgument(!dataSource.contains("/"), "dataSource cannot 
contain the '/' character.");
   }
 
+  private static DimensionsSpec computeDimensionsSpec(
+  TimestampSpec timestampSpec,
+  DimensionsSpec dimensionsSpec,
+  AggregatorFactory[] aggregators
+  )
+  {
+final Set dimensionExclusions = new HashSet<>();
+
+final String timestampColumn = timestampSpec.getTimestampColumn();
+if (!(dimensionsSpec.hasCustomDimensions() && 
dimensionsSpec.getDimensionNames().contains(timestampColumn))) {
+  dimensionExclusions.add(timestampColumn);
+}
+
+for (AggregatorFactory aggregator : aggregators) {
+  dimensionExclusions.addAll(aggregator.requiredFields());
+  dimensionExclusions.add(aggregator.getName());
+}
+
+final Set metSet = 
Arrays.stream(aggregators).map(AggregatorFactory::getName).collect(Collectors.toSet());
+final Set dimSet = new 
HashSet<>(dimensionsSpec.getDimensionNames());
+final Set overlap = Sets.intersection(metSet, dimSet);
+if (!overlap.isEmpty()) {
+  throw new IAE(
+  "Cannot have overlapping dimensions and metrics of the same name. 
Please change the name of the metric. Overlap: %s",
+  overlap
+  );
+}
+
+return 
dimensionsSpec.withDimensionExclusions(Sets.difference(dimensionExclusions, 
dimSet));
+  }
+
   @JsonProperty
   public String getDataSource()
   {
 return dataSource;
   }
 
-  @JsonProperty("parser")
-  public Map getParserMap()
+  @Nullable
+  @JsonProperty
+  public TimestampSpec getTimestampSpec()
   {
-return parser;
+return timestampSpec;
   }
 
-  @JsonIgnore
-  public InputRowParser getParser()
+  public TimestampSpec getNonNullTimestampSpec()
   {
-if (parser == null) {
-  log.warn("No parser has been specified");
-  return null;
-}
-
-if (cachedParser != null) {
-  return cachedParser;
-}
-
-final InputRowParser inputRowParser = transformSpec.decorate(
-jsonMapper.convertValue(this.parser, InputRowParser.class)
-);
-
-final Set dimensionExclusions = new HashSet<>();
-for (AggregatorFactory aggregator : aggregators) {
-  dimensionExclusions.addAll(aggregator.requiredFields());
-  dimensionExclusions.add(aggregator.getName());
+if (timestampSpec == null) {
+  timestampSpec = Preconditions.checkNotNull(getParser(), 
"inputRowParser").getParseSpec().getTimestampSpec();
 }
+return timestampSpec;
+  }
 
-if (inputRowParser.getParseSpec() != null) {
-  final DimensionsSpec dimensionsSpec = 
inputRowParser.getParseSpec().getDimensionsSpec();
-  final TimestampSpec timestampSpec = 
inputRowParser.getParseSpec().getTimestampSpec();
-
-  // exclude timestamp from dimensions by default, unless explicitly 
included in the list of dimensions
-  if (timestampSpec != null) {
-final String timestampColumn = timestampSpec.getTimestampColumn();
-if (!(dimensionsSpec.hasCustomDimensions() && 
dimensionsSpec.getDimensionNames().contains(timestampColumn))) {
-  dimensionExclusions.add(timestampColumn);
-}
-  }
-  if (dimensionsSpec != null) {
-final Set metSet = new HashSet<>();
-for (AggregatorFactory aggregator : aggregators) {
-  metSet.add(aggregator.getName());
-}
-final Set dimSet = 
Sets.newHashSet(dimensionsSpec.getDimensionNames());
-final Set overlap = Sets.intersection(metSet, dimSet);
-if (!overlap.isEmpty()) {
-  throw new IAE(
-  "Cannot have overlapping dimensions and metrics of the same 
name. Please change the name of the metric. Overlap: %s",
-  overlap
-  );
-}
+  @Nullable
+  @JsonProperty
+  public DimensionsSpec getDimensionsSpec()
 
 Review comment:
   Similar suggestion as `getTimestampSpec()`





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343935241
 
 

 ##
 File path: 
core/src/test/java/org/apache/druid/data/input/impl/NoopFirehoseFactory.java
 ##
 @@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import org.apache.druid.data.input.FiniteFirehoseFactory;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.SplitHintSpec;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.stream.Stream;
+
+public class NoopFirehoseFactory implements FiniteFirehoseFactory
+{
+  @Override
+  public String toString()
+  {
+    return "NoopFirehoseFactory{}";
+  }
+
+  @Override
+  public Stream getSplits(
+      @Nullable SplitHintSpec splitHintSpec
+  ) throws IOException
+  {
+    return null;
+  }
+
+  @Override
+  public int getNumSplits(@Nullable SplitHintSpec splitHintSpec) throws IOException
 
 Review comment:
   Can remove `throws IOException`
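
   To illustrate why this is allowed: in Java an overriding method may declare fewer
   checked exceptions than the method it overrides. The interface and class below
   are hypothetical stand-ins that only mirror the shape of `getNumSplits`; the
   `String` parameter and the return value of 0 are assumptions for the sketch.

```java
import java.io.IOException;

// Hypothetical interface mirroring the shape of the method in the diff.
interface SplitCounterSketch
{
  int getNumSplits(String splitHintSpec) throws IOException;
}

// The implementation drops the throws clause because it performs no I/O.
final class NoopSplitCounterSketch implements SplitCounterSketch
{
  @Override
  public int getNumSplits(String splitHintSpec)
  {
    return 0; // assumed value for the sketch
  }
}
```

   Code that calls the method through the implementation type then needs no
   try/catch, while callers going through the interface type still have to handle
   `IOException`.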





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343366132
 
 

 ##
 File path: 
server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
 ##
 @@ -53,38 +57,53 @@
   private static final Logger log = new Logger(DataSchema.class);
   private static final Pattern INVALIDCHARS = Pattern.compile("(?s).*[^\\S 
].*");
   private final String dataSource;
-  private final Map parser;
   private final AggregatorFactory[] aggregators;
   private final GranularitySpec granularitySpec;
   private final TransformSpec transformSpec;
+  private final Map parserMap;
+  private final ObjectMapper objectMapper;
 
-  private final ObjectMapper jsonMapper;
+  // The below fields can be initialized lazily from parser for backward 
compatibility.
+  private TimestampSpec timestampSpec;
+  private DimensionsSpec dimensionsSpec;
 
-  private InputRowParser cachedParser;
+  // This is used for backward compatibility
+  private InputRowParser inputRowParser;
 
   @JsonCreator
   public DataSchema(
   @JsonProperty("dataSource") String dataSource,
-  @JsonProperty("parser") Map parser,
+  @JsonProperty("timestampSpec") @Nullable TimestampSpec timestampSpec, // 
can be null in old task spec
+  @JsonProperty("dimensionsSpec") @Nullable DimensionsSpec dimensionsSpec, 
// can be null in old task spec
   @JsonProperty("metricsSpec") AggregatorFactory[] aggregators,
   @JsonProperty("granularitySpec") GranularitySpec granularitySpec,
   @JsonProperty("transformSpec") TransformSpec transformSpec,
-  @JacksonInject ObjectMapper jsonMapper
+  @Deprecated @JsonProperty("parser") @Nullable Map 
parserMap,
+  @JacksonInject ObjectMapper objectMapper
   )
   {
-this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "null 
ObjectMapper.");
-this.parser = parser;
-this.transformSpec = transformSpec == null ? TransformSpec.NONE : 
transformSpec;
-
 validateDatasourceName(dataSource);
 this.dataSource = dataSource;
 
+    this.timestampSpec = timestampSpec;
+    this.dimensionsSpec = dimensionsSpec == null
+                          ? null
+                          : computeDimensionsSpec(
+                              Preconditions.checkNotNull(timestampSpec, "timestampSpec"),
 
 Review comment:
   In the original code, it would allow a null `timestampSpec` and then just 
skip the logic to potentially add `timestampColumn` to `dimensionExclusions`. 
Why is `timestampSpec` required to not be null now instead of keeping the 
original behavior?
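
   For reference, a simplified sketch of the original, null-tolerant behavior
   described above. The class and method names are hypothetical, plain strings stand
   in for the Druid spec types, and a null `timestampColumn` models a missing
   `timestampSpec`.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper; not the code from the PR.
final class DimensionExclusionSketch
{
  static Set<String> exclusions(String timestampColumn, List<String> customDimensions, List<String> metricFields)
  {
    // Metric names and their required fields are always excluded.
    final Set<String> result = new HashSet<>(metricFields);
    // A missing timestamp column simply skips this step instead of failing.
    // A present one is excluded unless it is listed explicitly as a dimension.
    if (timestampColumn != null && !customDimensions.contains(timestampColumn)) {
      result.add(timestampColumn);
    }
    return result;
  }
}
```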





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343921764
 
 

 ##
 File path: core/src/main/java/org/apache/druid/data/input/SplitSource.java
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import com.google.common.base.Predicate;
+import org.apache.druid.guice.annotations.ExtensionPoint;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * SplitSource abstracts an {@link InputSplit} and knows how to read bytes 
from the given split.
+ */
+@ExtensionPoint
+public interface SplitSource
+{
+  Logger LOG = new Logger(SplitSource.class);
+
+  int DEFAULT_FETCH_BUFFER_SIZE = 4 * 1024; // 4 KB
+  int DEFAULT_MAX_FETCH_RETRY = 2; // 3 tries including the initial try
+
+  /**
+   * CleanableFile is the result type of {@link #fetch}.
+   * It should clean up any temporary resource on {@link #close()}.
+   */
+  interface CleanableFile extends Closeable
+  {
+File file();
+  }
+
+  InputSplit getSplit();
+
+  /**
+   * Opens an {@link InputStream} on the split directly.
+   * This is the basic way to read the given split.
+   *
+   * @see #fetch as an alternative way to read data.
+   */
+  InputStream open() throws IOException;
+
+  /**
+   * Fetches the split into the local storage.
+   * This method might be preferred instead of {@link #open()}, for example
+   *
+   * - {@link org.apache.druid.data.input.impl.InputFormat} requires expensive random access on remote storage.
+   * - Holding a connection until you consume the entire InputStream is expensive.
+   *
+   * @param temporaryDirectory to store temp data. This directory will be removed automatically once
+   *                           the task finishes.
+   * @param fetchBuffer        is used to fetch remote split into local storage.
+   *
+   * @see FileUtils#copyLarge
+   */
+  default CleanableFile fetch(File temporaryDirectory, byte[] fetchBuffer) throws IOException
+  {
+    final File tempFile = File.createTempFile("druid-split", ".tmp", temporaryDirectory);
+    LOG.debug("Fetching split into file[%s]", tempFile.getAbsolutePath());
+    FileUtils.copyLarge(
+        open(),
 
 Review comment:
   This opens an `InputStream` that is not closed later. Since it only needs to 
be opened during the copy here, how about moving it into a try-with-resources?
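
   A minimal sketch of the try-with-resources shape, under some assumptions:
   `FetchSketch` and `StreamOpener` are hypothetical stand-ins for the interface in
   the diff, and the copy uses the JDK's `Files.copy` rather than Druid's
   `FileUtils.copyLarge`, so the fetch buffer and retry handling are left out.

```java
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

final class FetchSketch
{
  // Hypothetical stand-in for SplitSource#open().
  interface StreamOpener
  {
    InputStream open() throws IOException;
  }

  static File fetch(StreamOpener source, File temporaryDirectory) throws IOException
  {
    final File tempFile = File.createTempFile("druid-split", ".tmp", temporaryDirectory);
    // The stream is closed when this block exits, whether the copy succeeds or throws.
    try (InputStream in = source.open()) {
      // createTempFile already created the file, so it has to be replaced.
      Files.copy(in, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
    }
    return tempFile;
  }
}
```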





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343340884
 
 

 ##
 File path: 
core/src/test/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptorTest.java
 ##
 @@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import com.google.common.collect.Iterables;
+import org.apache.druid.data.input.impl.CSVParseSpec;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.InputFormat;
+import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Stream;
+
+public class FirehoseFactoryToInputSourceAdaptorTest
+{
+  @Test
+  public void testUnimplementedInputFormat() throws IOException
+  {
+    final List<String> lines = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      lines.add(StringUtils.format("%d,name_%d,%d", 20190101 + i, i, i + 100));
+    }
+    final TestFirehoseFactory firehoseFactory = new TestFirehoseFactory(lines);
+    final StringInputRowParser inputRowParser = new StringInputRowParser(
+        new TestCsvParseSpec(
+            new TimestampSpec(null, "yyyyMMdd", null),
+            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("timestamp", "name", "score"))),
+            ",",
+            Arrays.asList("timestamp", "name", "score"),
+            false,
+            0
+        ),
+        StringUtils.UTF8_STRING
+    );
+    final FirehoseFactoryToInputSourceAdaptor inputSourceAdaptor = new FirehoseFactoryToInputSourceAdaptor(
+        firehoseFactory,
+        inputRowParser
+    );
+    final InputSourceReader reader = inputSourceAdaptor.reader(
+        inputRowParser.getParseSpec().getTimestampSpec(),
+        inputRowParser.getParseSpec().getDimensionsSpec(),
+        null,
+        null
+    );
+    final List<InputRow> result = new ArrayList<>();
+    try (CloseableIterator<InputRow> iterator = reader.read()) {
+      while (iterator.hasNext()) {
+        result.add(iterator.next());
+      }
+    }
+    Assert.assertEquals(10, result.size());
+    for (int i = 0; i < 10; i++) {
+      Assert.assertEquals(DateTimes.of(StringUtils.format("2019-01-%02d", 1 + i)), result.get(i).getTimestamp());
+      Assert.assertEquals(
+          StringUtils.format("name_%d", i),
+          Iterables.getOnlyElement(result.get(i).getDimension("name"))
+      );
+      Assert.assertEquals(
+          StringUtils.format("%d", i + 100),
+          Iterables.getOnlyElement(result.get(i).getDimension("score"))
+      );
+    }
+  }
+
+  private static class TestCsvParseSpec extends CSVParseSpec
 
 Review comment:
   Suggestion: Rename the class to something like 
`UnimplementedInputFormatCsvParseSpec`. Currently, looking at just the body of 
`testUnimplementedInputFormat`, it's not apparent where the unimplemented input 
format is coming from.





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343922643
 
 

 ##
 File path: 
core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
 ##
 @@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonParser.Feature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.SplitReader;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+public class JsonInputFormat extends NestedInputFormat
+{
+  private final Map<String, Boolean> featureSpec;
+  private final ObjectMapper objectMapper;
+
+  @JsonCreator
+  public JsonInputFormat(
+      @JsonProperty("flattenSpec") JSONPathSpec flattenSpec,
 
 Review comment:
   This can be `@Nullable`.





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r342782214
 
 

 ##
 File path: 
core/src/main/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptor.java
 ##
 @@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.FirehoseToInputSourceReaderAdaptor;
+import org.apache.druid.data.input.impl.InputFormat;
+import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.data.input.impl.TimestampSpec;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.stream.Stream;
+
+public class FirehoseFactoryToInputSourceAdaptor implements SplittableInputSource
+{
+  private final FirehoseFactory firehoseFactory;
+  private final InputRowParser inputRowParser;
+
+  public FirehoseFactoryToInputSourceAdaptor(FirehoseFactory firehoseFactory, InputRowParser inputRowParser)
+  {
+    this.firehoseFactory = firehoseFactory;
+    this.inputRowParser = Preconditions.checkNotNull(inputRowParser, "inputRowParser");
+  }
+
+  @Override
+  public boolean isSplittable()
+  {
+    return firehoseFactory.isSplittable();
+  }
+
+  @Override
+  public Stream createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
+      throws IOException
+  {
+    if (firehoseFactory.isSplittable()) {
+      return ((FiniteFirehoseFactory) firehoseFactory).getSplits(splitHintSpec);
+    } else {
+      throw new UnsupportedOperationException();
 
 Review comment:
   Is supporting unsplittable `Firehose`s future work?





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343928317
 
 

 ##
 File path: 
core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java
 ##
 @@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.TrueFileFilter;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.SplitHintSpec;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+public class LocalInputSource implements SplittableInputSource
+{
+  private final File baseDir;
+  private final String filter;
+
+  @JsonCreator
+  public LocalInputSource(
+      @JsonProperty("baseDir") File baseDir,
+      @JsonProperty("filter") String filter
+  )
+  {
+    this.baseDir = baseDir;
+    this.filter = filter;
+  }
+
+  @JsonProperty
+  public File getBaseDir()
+  {
+    return baseDir;
+  }
+
+  @JsonProperty
+  public String getFilter()
+  {
+    return filter;
+  }
+
+  @Override
+  public Stream<InputSplit<File>> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
+  {
+    return StreamSupport.stream(Spliterators.spliteratorUnknownSize(getFileIterator(), Spliterator.DISTINCT), false)
+                        .map(InputSplit::new);
+  }
+
+  @Override
+  public int getNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
+  {
+    final Iterator<File> fileIterator = getFileIterator();
+    int num = 0;
+    while (fileIterator.hasNext()) {
+      fileIterator.next();
+      num++;
+    }
 
 Review comment:
   Can replace with 
[`Iterators#size`](https://guava.dev/releases/16.0/api/docs/com/google/common/collect/Iterators.html#size(java.util.Iterator))
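
For illustration, here is a minimal sketch of what the suggested call does. It assumes only the JDK: the `size` helper below is a hypothetical stand-in that mimics Guava's `Iterators.size` for small inputs, and the class name and sample file names are made up for the example.

```java
import java.util.Arrays;
import java.util.Iterator;

class IteratorSizeSketch
{
  // Hypothetical stand-in for Guava's Iterators.size(Iterator): consumes the
  // iterator and returns how many elements it still had.
  static int size(Iterator<?> iterator)
  {
    int count = 0;
    while (iterator.hasNext()) {
      iterator.next();
      count++;
    }
    return count;
  }

  public static void main(String[] args)
  {
    Iterator<String> files = Arrays.asList("a.csv", "b.csv", "c.csv").iterator();
    System.out.println(size(files));
    // The iterator is exhausted afterwards, just like after the manual loop.
    System.out.println(files.hasNext());
  }
}
```

With Guava on the classpath, the counting loop in `getNumSplits` could presumably shrink to `return Iterators.size(getFileIterator());`.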





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343935145
 
 

 ##
 File path: 
core/src/test/java/org/apache/druid/data/input/impl/NoopFirehoseFactory.java
 ##
 @@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import org.apache.druid.data.input.FiniteFirehoseFactory;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.SplitHintSpec;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.stream.Stream;
+
+public class NoopFirehoseFactory implements FiniteFirehoseFactory
+{
+  @Override
+  public String toString()
+  {
+    return "NoopFirehoseFactory{}";
+  }
+
+  @Override
 
 Review comment:
   Can add `@Nullable` and remove `throws IOException`
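
As a sketch of why the `throws` clause can go: in Java an overriding method may declare fewer checked exceptions than the method it overrides. The interface and class names below are hypothetical stand-ins, not the Druid types. The sketch leaves out `@Nullable` because that annotation is not part of the JDK, and it returns an empty stream rather than `null`.

```java
import java.io.IOException;
import java.util.stream.Stream;

// Hypothetical stand-in for an interface whose method declares a checked exception.
interface SplitProviderSketch
{
  Stream<String> getSplits() throws IOException;
}

// The override omits "throws IOException"; this is legal because an override
// may narrow (or drop) the checked exceptions of the overridden method.
class NoopSplitProviderSketch implements SplitProviderSketch
{
  @Override
  public Stream<String> getSplits()
  {
    return Stream.empty();
  }
}

class OverrideThrowsSketch
{
  public static void main(String[] args)
  {
    // No try/catch is needed here because the static type is the implementation.
    long count = new NoopSplitProviderSketch().getSplits().count();
    System.out.println(count);
  }
}
```

Callers that hold the interface type still have to handle `IOException`; only callers of the concrete class benefit.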





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343419111
 
 

 ##
 File path: core/src/main/java/org/apache/druid/data/input/impl/FileSource.java
 ##
 @@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.SplitSource;
+import org.apache.druid.utils.CompressionUtils;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+
+public class FileSource implements SplitSource
+{
+  private final InputSplit split;
+  private final FileChannel channel;
+
+  FileSource(InputSplit split) throws FileNotFoundException
+  {
+    this.split = split;
+    final RandomAccessFile file = new RandomAccessFile(split.get(), "r");
 
 Review comment:
   When does this opened file handle get closed?
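
One possible answer, sketched under assumptions: the class that opens the `RandomAccessFile` could keep a reference to it and implement `Closeable`, so callers can release the handle with try-with-resources. The class below is hypothetical and is not the `FileSource` from the PR.

```java
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.Files;

// Hypothetical holder that owns the file handle and releases it in close().
class ClosableFileSourceSketch implements Closeable
{
  private final RandomAccessFile file;
  private final FileChannel channel;

  ClosableFileSourceSketch(File path) throws IOException
  {
    this.file = new RandomAccessFile(path, "r");
    this.channel = file.getChannel();
  }

  InputStream open()
  {
    return Channels.newInputStream(channel);
  }

  boolean isOpen()
  {
    return channel.isOpen();
  }

  @Override
  public void close() throws IOException
  {
    // Closing the RandomAccessFile also closes its associated channel.
    file.close();
  }

  public static void main(String[] args) throws IOException
  {
    File tmp = File.createTempFile("file-source-sketch", ".txt");
    tmp.deleteOnExit();
    Files.write(tmp.toPath(), new byte[]{'o', 'k'});
    // try-with-resources guarantees close() runs even if reading fails.
    try (ClosableFileSourceSketch source = new ClosableFileSourceSketch(tmp)) {
      System.out.println(source.open().read());
    }
  }
}
```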





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343942960
 
 

 ##
 File path: core/src/main/java/org/apache/druid/java/util/common/FileUtils.java
 ##
 @@ -306,6 +306,33 @@ public void close()
 }
   }
 
+  public static long copyLarge(
 
 Review comment:
   Since this is almost the same as the method above, it'd be good to factor 
out the common parts to a private helper method. Also, since it's a library 
function used in several places, it would be good to add a unit test to 
`FileUtilsTest`.
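
A rough sketch of the refactoring idea, using hypothetical method names and signatures since the existing method is not shown in this hunk: both public entry points delegate to one private helper, which is then the single place to test and fix.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

class CopySketch
{
  // Hypothetical public variant 1: copy with a default buffer size.
  static long copy(InputStream in, OutputStream out) throws IOException
  {
    return copyInternal(in, out, new byte[8192]);
  }

  // Hypothetical public variant 2: copy with a caller-supplied buffer.
  static long copyLarge(InputStream in, OutputStream out, byte[] buffer) throws IOException
  {
    return copyInternal(in, out, buffer);
  }

  // Shared private helper holding the common copy loop.
  private static long copyInternal(InputStream in, OutputStream out, byte[] buffer) throws IOException
  {
    if (buffer.length == 0) {
      throw new IllegalArgumentException("buffer must not be empty");
    }
    long total = 0;
    int read;
    while ((read = in.read(buffer)) != -1) {
      out.write(buffer, 0, read);
      total += read;
    }
    return total;
  }

  public static void main(String[] args) throws IOException
  {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    long copied = copyLarge(new ByteArrayInputStream(new byte[]{1, 2, 3, 4, 5}), out, new byte[2]);
    System.out.println(copied + " bytes copied, " + out.size() + " bytes written");
  }
}
```

A unit test along the lines of the `main` method above, with a buffer smaller than the input, would exercise the loop more than once.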





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343881013
 
 

 ##
 File path: 
core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java
 ##
 @@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.SplitReader;
+import org.apache.druid.indexer.Checks;
+import org.apache.druid.indexer.Property;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+public class CsvInputFormat implements InputFormat
+{
+  private final String listDelimiter;
+  private final List<String> columns;
+  private final boolean findColumnsFromHeader;
+  private final int skipHeaderRows;
+
+  @JsonCreator
+  public CsvInputFormat(
+      @JsonProperty("columns") @Nullable List<String> columns,
+      @JsonProperty("listDelimiter") String listDelimiter,
+      @Deprecated @JsonProperty("hasHeaderRow") @Nullable Boolean hasHeaderRow,
+      @JsonProperty("findColumnsFromHeader") @Nullable Boolean findColumnsFromHeader,
 
 Review comment:
   Does this need to be `@Nullable`?





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343935199
 
 

 ##
 File path: 
core/src/test/java/org/apache/druid/data/input/impl/NoopFirehoseFactory.java
 ##
 @@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import org.apache.druid.data.input.FiniteFirehoseFactory;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.SplitHintSpec;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.stream.Stream;
+
+public class NoopFirehoseFactory implements FiniteFirehoseFactory
+{
+  @Override
+  public String toString()
+  {
+    return "NoopFirehoseFactory{}";
+  }
+
+  @Override
+  public Stream getSplits(
+      @Nullable SplitHintSpec splitHintSpec
+  ) throws IOException
+  {
+    return null;
+  }
+
+  @Override
+  public int getNumSplits(@Nullable SplitHintSpec splitHintSpec) throws IOException
+  {
+    return 0;
+  }
+
+  @Override
 
 Review comment:
   Can add `@Nullable`





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343410250
 
 

 ##
 File path: 
core/src/main/java/org/apache/druid/data/input/impl/JSONParseSpec.java
 ##
 @@ -73,6 +74,13 @@ public void verify(List usedCols)
 return new JSONPathParser(getFlattenSpec(), objectMapper);
   }
 
+  @Nullable
 
 Review comment:
   Can remove `@Nullable`





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343936612
 
 

 ##
 File path: 
core/src/test/java/org/apache/druid/data/input/impl/JsonReaderTest.java
 ##
 @@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.SplitReader;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+
+public class JsonReaderTest
+{
+  @Test
+  public void testParseRow() throws IOException
+  {
+    final JsonInputFormat format = new JsonInputFormat(
+        new JSONPathSpec(
+            true,
+            ImmutableList.of(
+                new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"),
+                new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"),
+                new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"),
+                new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"),
+                new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"),
+                new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2")
+            )
+        ),
+        null
+    );
+
+    final ByteSource source = new ByteSource(
+        StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":1}}")
+    );
+
+    final SplitReader reader = format.createReader(
+        new TimestampSpec("timestamp", "iso", null),
+        new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo")))
+    );
+    try (CloseableIterator<InputRow> iterator = reader.read(source, null)) {
+      while (iterator.hasNext()) {
+        final InputRow row = iterator.next();
+        Assert.assertEquals(DateTimes.of("2019-01-01"), row.getTimestamp());
+        Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
+        Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
+        Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
+        Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg")));
+        Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg")));
+
+        Assert.assertTrue(row.getDimension("root_baz2").isEmpty());
+        Assert.assertTrue(row.getDimension("path_omg2").isEmpty());
+        Assert.assertTrue(row.getDimension("jq_omg2").isEmpty());
+      }
+    }
 
 Review comment:
   Also assert that exactly one iteration occurred, as expected.
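
The reason for the extra assertion is that assertions placed inside a `while (iterator.hasNext())` loop pass vacuously when the iterator is empty. A minimal JDK-only sketch of the pattern follows; the class name, helper, and sample row are made up for illustration.

```java
import java.util.Arrays;
import java.util.Iterator;

class IterationCountSketch
{
  // Runs a per-row check on every element and returns how many rows were seen,
  // so the caller can also verify that the loop body ran at all.
  static int checkAll(Iterator<String> rows)
  {
    int numRows = 0;
    while (rows.hasNext()) {
      String row = rows.next();
      if (row.isEmpty()) {
        throw new AssertionError("unexpected empty row");
      }
      numRows++;
    }
    return numRows;
  }

  public static void main(String[] args)
  {
    int numRows = checkAll(Arrays.asList("{\"foo\":\"x\"}").iterator());
    // Without this check, an empty iterator would make the test pass silently.
    if (numRows != 1) {
      throw new AssertionError("expected exactly 1 row but saw " + numRows);
    }
  }
}
```

In the JUnit test this would presumably become a counter incremented inside the loop, followed by `Assert.assertEquals(1, numRows);` after the `try` block.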





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343932828
 
 

 ##
 File path: 
core/src/main/java/org/apache/druid/data/input/impl/SplitIteratingReader.java
 ##
 @@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowPlusRaw;
+import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.SplitReader;
+import org.apache.druid.data.input.SplitSource;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+/**
+ * InputSourceReader iterating multiple {@link SplitSource}s.
+ */
+public class SplitIteratingReader implements InputSourceReader
+{
+  private final TimestampSpec timestampSpec;
+  private final DimensionsSpec dimensionsSpec;
+  private final InputFormat inputFormat;
+  private final Iterator> sourceIterator;
+  private final File temporaryDirectory;
+
+  public SplitIteratingReader(
+  TimestampSpec timestampSpec,
+  DimensionsSpec dimensionsSpec,
+  InputFormat inputFormat,
+  Stream> sourceStream,
+  File temporaryDirectory
+  )
+  {
+    this.timestampSpec = timestampSpec;
+    this.dimensionsSpec = dimensionsSpec;
+    this.inputFormat = inputFormat;
+    this.sourceIterator = sourceStream.iterator();
+    this.temporaryDirectory = temporaryDirectory;
+  }
+
+  @Override
+  public CloseableIterator<InputRow> read()
+  {
+    return createIterator(reader -> {
+      try {
+        return reader.read(sourceIterator.next(), temporaryDirectory);
+      }
+      catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
+  @Override
+  public CloseableIterator<InputRowPlusRaw> sample()
+  {
+    return createIterator(reader -> {
+      try {
+        return reader.sample(sourceIterator.next(), temporaryDirectory);
+      }
+      catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
+  private <R> CloseableIterator<R> createIterator(Function<SplitReader, CloseableIterator<R>> rowPopulator)
+  {
+    return new CloseableIterator<R>()
+    {
+      CloseableIterator<R> rowIterator = null;
+
+      @Override
+      public boolean hasNext()
+      {
+        checkRowIterator();
+        return rowIterator != null && rowIterator.hasNext();
+      }
+
+      @Override
+      public R next()
+      {
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
+        return rowIterator.next();
+      }
+
+      private void checkRowIterator()
 
 Review comment:
   Consider renaming this method to something that indicates `rowIterator` may be mutated.
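
   The body of `checkRowIterator` is not shown in the hunk above, so the following is only a minimal sketch of the suggestion. It assumes the method advances to the next source once the current row iterator is exhausted. `ConcatenatingIterator` and `advanceRowIteratorIfExhausted` are hypothetical names, and a plain `List` stands in for a split.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-in for the anonymous iterator in SplitIteratingReader.
// Each List<T> plays the role of one split; this is not the Druid API.
class ConcatenatingIterator<T> implements Iterator<T>
{
  private final Iterator<List<T>> sources;
  private Iterator<T> rowIterator = null;

  ConcatenatingIterator(Iterator<List<T>> sources)
  {
    this.sources = sources;
  }

  @Override
  public boolean hasNext()
  {
    advanceRowIteratorIfExhausted();
    return rowIterator != null && rowIterator.hasNext();
  }

  @Override
  public T next()
  {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return rowIterator.next();
  }

  // The name states the side effect: rowIterator may be replaced here.
  // Empty sources are skipped until one with rows is found or none remain.
  private void advanceRowIteratorIfExhausted()
  {
    while ((rowIterator == null || !rowIterator.hasNext()) && sources.hasNext()) {
      rowIterator = sources.next().iterator();
    }
  }
}
```

   A name like this makes it clearer at the call site in `hasNext()` that the call is not a pure check.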





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343948330
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
 ##
 @@ -1015,6 +1044,16 @@ public IndexIngestionSpec(
 {
   super(dataSchema, ioConfig, tuningConfig);
 
+  Checks.checkOneNotNullOrEmpty(
+  ImmutableList.of(
+  new Property<>("parser", dataSchema.getParserMap()),
+  new Property<>("inputFormat", ioConfig.getInputFormat())
+  )
+  );
+  if (dataSchema.getParserMap() != null && ioConfig.getInputSource() != 
null) {
+throw new IAE("Cannot use parser and inputSource together. Try use 
inputFormat instead of parser.");
 
 Review comment:
   Typo: Try use inputFormat -> Try using inputFormat





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343908366
 
 

 ##
 File path: 
core/src/test/java/org/apache/druid/data/input/impl/CsvReaderTest.java
 ##
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.SplitReader;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class CsvReaderTest
+{
+  private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec("ts", 
"auto", null);
+  private static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec(
+  DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "name"))
+  );
+
+  @Test
+  public void testWithoutHeaders() throws IOException
+  {
+final ByteSource source = writeData(
+ImmutableList.of(
+"2019-01-01T00:00:10Z,name_1,5",
+"2019-01-01T00:00:20Z,name_2,10",
+"2019-01-01T00:00:30Z,name_3,15"
+)
+);
+final CsvInputFormat format = new CsvInputFormat(ImmutableList.of("ts", 
"name", "score"), null, false, 0);
+assertResult(source, format);
+  }
+
+  @Test
+  public void testFindColumn() throws IOException
+  {
+final ByteSource source = writeData(
+ImmutableList.of(
+"ts,name,score",
+"2019-01-01T00:00:10Z,name_1,5",
+"2019-01-01T00:00:20Z,name_2,10",
+"2019-01-01T00:00:30Z,name_3,15"
+)
+);
+final CsvInputFormat format = new CsvInputFormat(ImmutableList.of(), null, 
true, 0);
+assertResult(source, format);
+  }
+
+  @Test
+  public void testSkipHeaders() throws IOException
+  {
+final ByteSource source = writeData(
+ImmutableList.of(
+"this,is,a,row,to,skip",
+"2019-01-01T00:00:10Z,name_1,5",
+"2019-01-01T00:00:20Z,name_2,10",
+"2019-01-01T00:00:30Z,name_3,15"
+)
+);
+final CsvInputFormat format = new CsvInputFormat(ImmutableList.of("ts", 
"name", "score"), null, false, 1);
+assertResult(source, format);
+  }
+
+  @Test
+  public void testFindColumnAndSkipHeaders() throws IOException
 
 Review comment:
   This test is related to the behavior change comment in `CsvReader`.





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343425624
 
 

 ##
 File path: 
core/src/main/java/org/apache/druid/data/input/impl/FirehoseToInputSourceReaderAdaptor.java
 ##
 @@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import org.apache.druid.data.input.Firehose;
+import org.apache.druid.data.input.FirehoseFactory;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowPlusRaw;
+import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+
+import java.io.File;
+import java.io.IOException;
+
+public class FirehoseToInputSourceReaderAdaptor implements InputSourceReader
+{
+  private final FirehoseFactory firehoseFactory;
+  private final InputRowParser inputRowParser;
+  private final File temporaryDirectory;
+
+  public FirehoseToInputSourceReaderAdaptor(
+  FirehoseFactory firehoseFactory,
+  InputRowParser inputRowPlusRaw,
+  File temporaryDirectory
+  )
+  {
+this.firehoseFactory = firehoseFactory;
+this.inputRowParser = inputRowPlusRaw;
+this.temporaryDirectory = temporaryDirectory;
+  }
+
+  @Override
+  public CloseableIterator read() throws IOException
+  {
+return new CloseableIterator()
+{
+  final Firehose firehose = firehoseFactory.connect(inputRowParser, 
temporaryDirectory);
+
+  @Override
+  public boolean hasNext()
+  {
+try {
+  return firehose.hasMore();
+}
+catch (IOException e) {
+  throw new RuntimeException(e);
+}
+  }
+
+  @Override
+  public InputRow next()
+  {
+try {
+  return firehose.nextRow();
+}
+catch (IOException e) {
+  throw new RuntimeException(e);
 
 Review comment:
   Ditto: maybe use `UncheckedIOException` instead?





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343414360
 
 

 ##
 File path: core/src/main/java/org/apache/druid/data/input/SplitSource.java
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import com.google.common.base.Predicate;
+import org.apache.druid.guice.annotations.ExtensionPoint;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * SplitSource abstracts an {@link InputSplit} and knows how to read bytes 
from the given split.
+ */
+@ExtensionPoint
+public interface SplitSource
+{
+  Logger LOG = new Logger(SplitSource.class);
+
+  int DEFAULT_FETCH_BUFFER_SIZE = 4 * 1024; // 4 KB
 
 Review comment:
   Unused?





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343380177
 
 

 ##
 File path: 
server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
 ##
 @@ -212,25 +247,78 @@ public TransformSpec getTransformSpec()
 return transformSpec;
   }
 
+  @Deprecated
+  @JsonProperty("parser")
+  @Nullable
+  @JsonInclude(Include.NON_NULL)
+  public Map getParserMap()
+  {
+return parserMap;
+  }
+
+  @Nullable
+  public InputRowParser getParser()
+  {
+if (inputRowParser == null) {
+  if (parserMap == null) {
+return null;
+  }
+  //noinspection unchecked
+  inputRowParser = 
transformSpec.decorate(objectMapper.convertValue(this.parserMap, 
InputRowParser.class));
+  ParseSpec parseSpec = inputRowParser.getParseSpec();
+  parseSpec = parseSpec.withDimensionsSpec(
+  computeDimensionsSpec(parseSpec.getTimestampSpec(), 
parseSpec.getDimensionsSpec(), aggregators)
+  );
+  if (timestampSpec != null) {
+parseSpec = parseSpec.withTimestampSpec(timestampSpec);
+  }
+  if (dimensionsSpec != null) {
+parseSpec = parseSpec.withDimensionsSpec(dimensionsSpec);
+  }
+  inputRowParser = inputRowParser.withParseSpec(parseSpec);
+}
+return inputRowParser;
+  }
+
   public DataSchema withGranularitySpec(GranularitySpec granularitySpec)
   {
-return new DataSchema(dataSource, parser, aggregators, granularitySpec, 
transformSpec, jsonMapper);
+return new DataSchema(
+dataSource,
+timestampSpec,
+dimensionsSpec,
+aggregators,
+granularitySpec,
+transformSpec,
+parserMap,
+objectMapper
+);
   }
 
   public DataSchema withTransformSpec(TransformSpec transformSpec)
   {
-return new DataSchema(dataSource, parser, aggregators, granularitySpec, 
transformSpec, jsonMapper);
+return new DataSchema(
+dataSource,
+timestampSpec,
+dimensionsSpec,
+aggregators,
+granularitySpec,
+transformSpec,
+parserMap,
+objectMapper
+);
   }
 
   @Override
   public String toString()
   {
 return "DataSchema{" +
"dataSource='" + dataSource + '\'' +
-   ", parser=" + parser +
", aggregators=" + Arrays.toString(aggregators) +
", granularitySpec=" + granularitySpec +
", transformSpec=" + transformSpec +
+   (parserMap == null ? "" : ", parserMap=" + parserMap) +
 
 Review comment:
   Why not print `parserMap=null` if it's null?
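
   For illustration, a minimal sketch of the unconditional form. It relies on Java string concatenation rendering a null reference as the text `null`. `ToStringSketch` and `describe` are hypothetical names, and only two of the fields are included.

```java
import java.util.Map;

// Hypothetical, reduced version of the toString() under review.
class ToStringSketch
{
  static String describe(String dataSource, Map<String, Object> parserMap)
  {
    // No null check needed: concatenating a null reference yields "null".
    return "DataSchema{" +
           "dataSource='" + dataSource + '\'' +
           ", parserMap=" + parserMap +
           '}';
  }
}
```

   With this form the field always shows up in the output, whether or not `parserMap` is set.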





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343956499
 
 

 ##
 File path: 
indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingTest.java
 ##
 @@ -90,24 +91,29 @@
 @RunWith(Parameterized.class)
 public class MultiPhaseParallelIndexingTest extends 
AbstractParallelIndexSupervisorTaskTest
 {
-  @Parameterized.Parameters(name = "{0}")
+  @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}")
   public static Iterable constructorFeeder()
   {
 return ImmutableList.of(
-new Object[]{LockGranularity.TIME_CHUNK},
-new Object[]{LockGranularity.SEGMENT}
+new Object[]{LockGranularity.TIME_CHUNK, false},
+new Object[]{LockGranularity.TIME_CHUNK, true},
+new Object[]{LockGranularity.SEGMENT, false},
 
 Review comment:
   Similar comment to `IndexingTest` about skipping this permutation





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343425515
 
 

 ##
 File path: 
core/src/main/java/org/apache/druid/data/input/impl/FirehoseToInputSourceReaderAdaptor.java
 ##
 @@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import org.apache.druid.data.input.Firehose;
+import org.apache.druid.data.input.FirehoseFactory;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowPlusRaw;
+import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+
+import java.io.File;
+import java.io.IOException;
+
+public class FirehoseToInputSourceReaderAdaptor implements InputSourceReader
+{
+  private final FirehoseFactory firehoseFactory;
+  private final InputRowParser inputRowParser;
+  private final File temporaryDirectory;
+
+  public FirehoseToInputSourceReaderAdaptor(
+  FirehoseFactory firehoseFactory,
+  InputRowParser inputRowPlusRaw,
+  File temporaryDirectory
+  )
+  {
+this.firehoseFactory = firehoseFactory;
+this.inputRowParser = inputRowPlusRaw;
+this.temporaryDirectory = temporaryDirectory;
+  }
+
+  @Override
+  public CloseableIterator read() throws IOException
+  {
+return new CloseableIterator()
+{
+  final Firehose firehose = firehoseFactory.connect(inputRowParser, 
temporaryDirectory);
+
+  @Override
+  public boolean hasNext()
+  {
+try {
+  return firehose.hasMore();
+}
+catch (IOException e) {
+  throw new RuntimeException(e);
 
 Review comment:
   Maybe use `UncheckedIOException` instead?
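
   A minimal sketch of what that could look like. `IoRowSource` and `IoRowIterator` are hypothetical stand-ins for `Firehose` and the anonymous iterator; only `java.io.UncheckedIOException` is a real JDK class here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;

// Hypothetical stand-in for Firehose: its methods declare IOException.
interface IoRowSource
{
  boolean hasMore() throws IOException;

  String nextRow() throws IOException;
}

// Iterator methods cannot declare checked exceptions, so the IOException is
// wrapped in UncheckedIOException instead of a bare RuntimeException.
class IoRowIterator implements Iterator<String>
{
  private final IoRowSource source;

  IoRowIterator(IoRowSource source)
  {
    this.source = source;
  }

  @Override
  public boolean hasNext()
  {
    try {
      return source.hasMore();
    }
    catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  @Override
  public String next()
  {
    try {
      return source.nextRow();
    }
    catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

   Callers can then catch `UncheckedIOException` specifically, and `getCause()` returns the original `IOException` without a cast.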





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343906675
 
 

 ##
 File path: core/src/main/java/org/apache/druid/data/input/impl/CsvReader.java
 ##
 @@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import com.google.common.collect.Iterables;
+import com.opencsv.CSVParser;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.TextReader;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.collect.Utils;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.java.util.common.parsers.ParserUtils;
+import org.apache.druid.java.util.common.parsers.Parsers;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class CsvReader extends TextReader
+{
+  private final CSVParser parser = new CSVParser();
+  private final boolean hasHeaderRow;
+  private final int skipHeaderRows;
+  private final Function multiValueFunction;
+  @Nullable
+  private List columns;
+
+  CsvReader(
+  TimestampSpec timestampSpec,
+  DimensionsSpec dimensionsSpec,
+  String listDelimiter,
+  @Nullable List columns,
+  boolean hasHeaderRow,
+  int skipHeaderRows
+  )
+  {
+super(timestampSpec, dimensionsSpec);
+this.hasHeaderRow = hasHeaderRow;
+this.skipHeaderRows = skipHeaderRows;
+final String finalListDelimeter = listDelimiter == null ? 
Parsers.DEFAULT_LIST_DELIMITER : listDelimiter;
+this.multiValueFunction = 
ParserUtils.getMultiValueFunction(finalListDelimeter, 
Splitter.on(finalListDelimeter));
+this.columns = hasHeaderRow ? null : columns; // columns will be overriden 
by header row
+
+if (this.columns != null) {
+  for (String column : this.columns) {
+Preconditions.checkArgument(!column.contains(","), "Column[%s] has a 
comma, it cannot", column);
+  }
+  verify(this.columns, dimensionsSpec.getDimensionNames());
+} else {
+  Preconditions.checkArgument(
+  hasHeaderRow,
+  "If columns field is not set, the first row of your data must have 
your header"
+  + " and hasHeaderRow must be set to true."
+  );
+}
+  }
+
+  @Override
+  public InputRow readLine(String line) throws IOException, ParseException
+  {
+final String[] parsed = parser.parseLine(line);
+final Map zipped = Utils.zipMapPartial(
+Preconditions.checkNotNull(columns, "columns"),
+Iterables.transform(Arrays.asList(parsed), multiValueFunction)
+);
+return MapInputRowParser.parse(getTimestampSpec(), getDimensionsSpec(), 
zipped);
+  }
+
+  @Override
+  public int getNumHeaderLines()
+  {
+return (hasHeaderRow ? 1 : 0) + skipHeaderRows;
 
 Review comment:
   Is this an intentional behavior change? From the current 
[docs](https://druid.apache.org/docs/latest/ingestion/data-formats.html):
   
   > Also, you can skip some header rows by setting skipHeaderRows in your 
parseSpec. If both skipHeaderRows and hasHeaderRow options are set, 
skipHeaderRows is first applied. For example, if you set skipHeaderRows to 2 
and hasHeaderRow to true, Druid will skip the first two lines and then extract 
column information from the third line.
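
   The documented ordering quoted above can be sketched as follows. This is an illustration of the docs text only, not of `CsvReader` or `TextReader`. `HeaderSkippingSketch`, `findColumns`, and `dataLines` are hypothetical names, and the naive comma split ignores quoting.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that mirrors the ordering described in the docs:
// skipHeaderRows is applied first, then the next line is read as the header.
class HeaderSkippingSketch
{
  // Returns the column names: the header line that follows the skipped lines
  // when hasHeaderRow is set, otherwise the columns supplied by the caller.
  static List<String> findColumns(
      List<String> lines,
      int skipHeaderRows,
      boolean hasHeaderRow,
      List<String> configuredColumns
  )
  {
    if (!hasHeaderRow) {
      return configuredColumns;
    }
    if (skipHeaderRows >= lines.size()) {
      throw new IllegalArgumentException("no header line left after skipping");
    }
    // Simplification: real CSV parsing must handle quoted commas.
    return Arrays.asList(lines.get(skipHeaderRows).split(","));
  }

  // Returns the lines that remain after the skipped lines and the header.
  static List<String> dataLines(List<String> lines, int skipHeaderRows, boolean hasHeaderRow)
  {
    final int numHeaderLines = skipHeaderRows + (hasHeaderRow ? 1 : 0);
    return lines.subList(Math.min(numHeaderLines, lines.size()), lines.size());
  }
}
```

   With `skipHeaderRows` set to 2 and `hasHeaderRow` set to true, the sketch takes column names from the third line and treats everything after it as data, which matches the docs example.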





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343937356
 
 

 ##
 File path: 
core/src/test/java/org/apache/druid/data/input/impl/JsonReaderTest.java
 ##
 @@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.SplitReader;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+
+public class JsonReaderTest
+{
+  @Test
+  public void testParseRow() throws IOException
+  {
+final JsonInputFormat format = new JsonInputFormat(
+new JSONPathSpec(
+true,
+ImmutableList.of(
+new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", 
"baz"),
+new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", 
"baz2"),
+new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", 
"$.o.mg"),
+new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", 
"$.o.mg2"),
+new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"),
+new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", 
".o.mg2")
+)
+),
+null
+);
+
+final ByteSource source = new ByteSource(
+
StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":1}}")
+);
+
+final SplitReader reader = format.createReader(
+new TimestampSpec("timestamp", "iso", null),
+new 
DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo")))
+);
+try (CloseableIterator iterator = reader.read(source, null)) {
+  while (iterator.hasNext()) {
+final InputRow row = iterator.next();
+Assert.assertEquals(DateTimes.of("2019-01-01"), row.getTimestamp());
+Assert.assertEquals("x", 
Iterables.getOnlyElement(row.getDimension("foo")));
+Assert.assertEquals("4", 
Iterables.getOnlyElement(row.getDimension("baz")));
+Assert.assertEquals("4", 
Iterables.getOnlyElement(row.getDimension("root_baz")));
+Assert.assertEquals("1", 
Iterables.getOnlyElement(row.getDimension("path_omg")));
+Assert.assertEquals("1", 
Iterables.getOnlyElement(row.getDimension("jq_omg")));
+
+Assert.assertTrue(row.getDimension("root_baz2").isEmpty());
+Assert.assertTrue(row.getDimension("path_omg2").isEmpty());
+Assert.assertTrue(row.getDimension("jq_omg2").isEmpty());
+  }
+}
+  }
+
+  @Test
+  public void testParseRowWithConditional() throws IOException
+  {
+final JsonInputFormat format = new JsonInputFormat(
+new JSONPathSpec(
+true,
+ImmutableList.of(
+new JSONPathFieldSpec(JSONPathFieldType.PATH, "foo", 
"$.[?(@.maybe_object)].maybe_object.foo.test"),
+new JSONPathFieldSpec(JSONPathFieldType.PATH, "baz", 
"$.maybe_object_2.foo.test"),
+new JSONPathFieldSpec(JSONPathFieldType.PATH, "bar", 
"$.[?(@.something_else)].something_else.foo")
+)
+),
+null
+);
+
+final ByteSource source = new ByteSource(
+StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"something_else\": 
{\"foo\": \"test\"}}")
+);
+
+final SplitReader reader = format.createReader(
+new TimestampSpec("timestamp", "iso", null),
+new 
DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("foo")))
+);
+
+try (CloseableIterator iterator = reader.read(source, null)) {
+  while 

[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343924073
 
 

 ##
 File path: 
core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
 ##
 @@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonParser.Feature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.SplitReader;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+public class JsonInputFormat extends NestedInputFormat
+{
+  private final Map featureSpec;
+  private final ObjectMapper objectMapper;
+
+  @JsonCreator
+  public JsonInputFormat(
+  @JsonProperty("flattenSpec") JSONPathSpec flattenSpec,
+  @JsonProperty("featureSpec") @Nullable Map featureSpec
 
 Review comment:
   I noticed that a description of `featureSpec` is missing; it should be 
added when the docs are updated.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and 
InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343950067
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIngestionSpec.java
 ##
 @@ -39,6 +44,18 @@ public ParallelIndexIngestionSpec(
   {
 super(dataSchema, ioConfig, tuningConfig);
 
+Checks.checkOneNotNullOrEmpty(
+ImmutableList.of(
+new Property<>("parser", dataSchema.getParserMap()),
+new Property<>("inputFormat", ioConfig.getInputFormat())
+)
+);
+if (dataSchema.getParserMap() != null && ioConfig.getInputSource() != null) {
+  if (!(ioConfig.getInputSource() instanceof FirehoseFactoryToInputSourceAdaptor)) {
+throw new IAE("Cannot use parser and inputSource together. Try use inputFormat instead of parser.");
 
 Review comment:
   Typo: Try use inputFormat -> Try using inputFormat





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343956969
 
 

 ##
 File path: indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
 ##
 @@ -67,21 +68,26 @@
 @RunWith(Parameterized.class)
 public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSupervisorTaskTest
 {
-  @Parameterized.Parameters(name = "{0}")
+  @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}")
   public static Iterable constructorFeeder()
   {
 return ImmutableList.of(
-new Object[]{LockGranularity.TIME_CHUNK},
-new Object[]{LockGranularity.SEGMENT}
+new Object[]{LockGranularity.TIME_CHUNK, false},
+new Object[]{LockGranularity.TIME_CHUNK, true},
+new Object[]{LockGranularity.SEGMENT, false},
 
 Review comment:
   Similar comment to the one on `IndexTaskTest` about skipping this permutation





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343948966
 
 

 ##
 File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
 ##
 @@ -1043,30 +1082,90 @@ public IndexTuningConfig getTuningConfig()
   }
 
   @JsonTypeName("index")
-  public static class IndexIOConfig implements IOConfig
+  public static class IndexIOConfig implements BatchIOConfig
   {
 private static final boolean DEFAULT_APPEND_TO_EXISTING = false;
 
 private final FirehoseFactory firehoseFactory;
+private final InputSource inputSource;
+private final InputFormat inputFormat;
 private final boolean appendToExisting;
 
 @JsonCreator
 public IndexIOConfig(
-@JsonProperty("firehose") FirehoseFactory firehoseFactory,
+@Deprecated @JsonProperty("firehose") @Nullable FirehoseFactory firehoseFactory,
+@JsonProperty("inputSource") @Nullable InputSource inputSource,
+@JsonProperty("inputFormat") @Nullable InputFormat inputFormat,
 @JsonProperty("appendToExisting") @Nullable Boolean appendToExisting
 )
 {
+  Checks.checkOneNotNullOrEmpty(
+  ImmutableList.of(new Property<>("firehose", firehoseFactory), new Property<>("inputSource", inputSource))
+  );
+  if (firehoseFactory != null && inputFormat != null) {
+throw new IAE("Cannot use firehose and inputFormat together. Try use inputSource instead of firehose.");
 
 Review comment:
   Typo: Try use inputSource -> Try using inputSource
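
   The validation in the hunk above can be illustrated with a minimal, 
   stdlib-only sketch. Everything here is hypothetical: the class and method 
   names are made up, `IllegalArgumentException` stands in for Druid's `IAE`, 
   and the helper only assumes that `Checks.checkOneNotNullOrEmpty` requires 
   exactly one of the listed properties to be set. The error message uses the 
   corrected wording.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the PR's validation; not the actual Druid code.
final class IoConfigValidationSketch
{
  // Returns the name of the single non-null property.
  // Throws if none or more than one of the properties is set (assumed semantics).
  static String checkExactlyOneNotNull(Map<String, Object> properties)
  {
    String found = null;
    for (Map.Entry<String, Object> entry : properties.entrySet()) {
      if (entry.getValue() != null) {
        if (found != null) {
          throw new IllegalArgumentException(
              "Cannot set both [" + found + "] and [" + entry.getKey() + "]"
          );
        }
        found = entry.getKey();
      }
    }
    if (found == null) {
      throw new IllegalArgumentException("One of " + properties.keySet() + " must be set");
    }
    return found;
  }

  // Mirrors the two checks in the constructor: firehose and inputSource are
  // mutually exclusive, and inputFormat cannot be combined with a firehose.
  static void validate(Object firehose, Object inputSource, Object inputFormat)
  {
    Map<String, Object> exclusive = new LinkedHashMap<>();
    exclusive.put("firehose", firehose);
    exclusive.put("inputSource", inputSource);
    checkExactlyOneNotNull(exclusive);
    if (firehose != null && inputFormat != null) {
      throw new IllegalArgumentException(
          "Cannot use firehose and inputFormat together. Try using inputSource instead of firehose."
      );
    }
  }

  public static void main(String[] args)
  {
    validate(null, new Object(), new Object()); // inputSource + inputFormat: accepted
    validate(new Object(), null, null);         // firehose alone: accepted
    try {
      validate(new Object(), null, new Object());
    }
    catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```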





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343940070
 
 

 ##
 File path: core/src/test/java/org/apache/druid/data/input/impl/CsvInputFormatTest.java
 ##
 @@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+
+public class CsvInputFormatTest
+{
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void testSerde() throws IOException
+  {
+final ObjectMapper mapper = new ObjectMapper();
+final CsvInputFormat format = new CsvInputFormat(Collections.singletonList("a"), "|", true, 10);
+final byte[] bytes = mapper.writeValueAsBytes(format);
+final CsvInputFormat fromJson = (CsvInputFormat) mapper.readValue(bytes, InputFormat.class);
+Assert.assertEquals(format, fromJson);
+  }
+
+  @Test
+  public void testColumnMissing()
+  {
+final CsvInputFormat format = new CsvInputFormat(Collections.singletonList("a"), ",", false, 0);
+expectedException.expect(IllegalArgumentException.class);
+expectedException.expectMessage("column[b] not in columns");
+format.createReader(
+new TimestampSpec("timestamp", "auto", null),
+new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("a", "b")))
+);
+  }
+
+  @Test
+  public void testComma()
 
 Review comment:
   I think it's better to test that it fails if one of the column names 
defined in the `CsvInputFormat` has a comma (i.e., error message: `Column[%s] 
has a comma, it cannot`). Otherwise, it's testing the same behavior as 
`testColumnMissing()`.
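
   A stdlib-only sketch of the behavior such a test would pin down. The 
   validator below is hypothetical and is not the `CsvInputFormat` 
   implementation; its message is made up because the real one is only 
   partially quoted above. The point is that the failing input is a column 
   name that itself contains a comma, which is a different code path from a 
   dimension missing in the column list.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical validator; the real CsvInputFormat check and its message may differ.
final class ColumnNameCheckSketch
{
  // Rejects any column name that contains the list delimiter.
  static void checkColumnNames(List<String> columns)
  {
    for (String column : columns) {
      if (column.contains(",")) {
        throw new IllegalArgumentException("Column[" + column + "] contains a comma");
      }
    }
  }

  // Returns the exception message if the columns are rejected, or null if accepted.
  static String rejectionMessage(List<String> columns)
  {
    try {
      checkColumnNames(columns);
      return null;
    }
    catch (IllegalArgumentException e) {
      return e.getMessage();
    }
  }

  public static void main(String[] args)
  {
    // A column name containing a comma is rejected.
    System.out.println(rejectionMessage(Arrays.asList("a", "b,c")));
    // Plain column names are accepted, so no message is produced.
    System.out.println(rejectionMessage(Arrays.asList("a", "b")));
  }
}
```

   In the actual test, the same idea would presumably be expressed by 
   constructing a `CsvInputFormat` with a column such as `"a,"` and expecting 
   an `IllegalArgumentException` through the existing `ExpectedException` rule.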





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343934796
 
 

 ##
 File path: core/src/test/java/org/apache/druid/data/input/impl/NoopInputSource.java
 ##
 @@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.InputSourceReader;
+
+import javax.annotation.Nullable;
+import java.io.File;
+
+public class NoopInputSource implements InputSource
+{
+  @Override
 
 Review comment:
   Can add `@Nullable`





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343955244
 
 

 ##
 File path: indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
 ##
 @@ -115,47 +118,49 @@
   @Rule
   public ExpectedException expectedException = ExpectedException.none();
 
+  private static final TimestampSpec DEFAULT_TIMESTAMP_SPEC = new TimestampSpec("ts", "auto", null);
+  private static final DimensionsSpec DEFAULT_DIMENSIONS_SPEC = new DimensionsSpec(
+  DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim"))
+  );
   private static final ParseSpec DEFAULT_PARSE_SPEC = new CSVParseSpec(
-  new TimestampSpec(
-  "ts",
-  "auto",
-  null
-  ),
-  new DimensionsSpec(
-  DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim")),
-  new ArrayList<>(),
-  new ArrayList<>()
-  ),
+  DEFAULT_TIMESTAMP_SPEC,
+  DEFAULT_DIMENSIONS_SPEC,
   null,
   Arrays.asList("ts", "dim", "val"),
   false,
   0
   );
+  private static final InputFormat DEFAULT_INPUT_FORMAT = DEFAULT_PARSE_SPEC.toInputFormat();
 
-  @Parameterized.Parameters(name = "{0}")
+  @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}")
   public static Iterable constructorFeeder()
   {
 return ImmutableList.of(
-new Object[]{LockGranularity.TIME_CHUNK},
-new Object[]{LockGranularity.SEGMENT}
+new Object[]{LockGranularity.TIME_CHUNK, false},
+new Object[]{LockGranularity.TIME_CHUNK, true},
+new Object[]{LockGranularity.SEGMENT, false},
+new Object[]{LockGranularity.SEGMENT, true}
 
 Review comment:
   This is a relatively slow test (~15 seconds per parameterized run), so all 
the permutations may be overkill. Perhaps remove (SEGMENT, false); the 
remaining runs still cover both lock granularities and both values of 
`useInputFormatApi`.
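
   The coverage argument can be checked mechanically. The sketch below is 
   stdlib-only and hypothetical: it declares a local `LockGranularity` enum as 
   a stand-in for Druid's, and `coversEveryValue` is a made-up helper. It 
   confirms that dropping (SEGMENT, false) still leaves every value of each 
   parameter exercised by at least one run.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class ParameterCoverageSketch
{
  // Local stand-in for Druid's LockGranularity, declared here for illustration only.
  enum LockGranularity { TIME_CHUNK, SEGMENT }

  // The reduced parameter list: (SEGMENT, false) has been removed.
  static List<Object[]> reducedRuns()
  {
    return Arrays.asList(
        new Object[]{LockGranularity.TIME_CHUNK, false},
        new Object[]{LockGranularity.TIME_CHUNK, true},
        new Object[]{LockGranularity.SEGMENT, true}
    );
  }

  // True if every lock granularity and both boolean flag values appear in some run.
  static boolean coversEveryValue(List<Object[]> runs)
  {
    Set<Object> granularities = new HashSet<>();
    Set<Object> apiFlags = new HashSet<>();
    for (Object[] run : runs) {
      granularities.add(run[0]);
      apiFlags.add(run[1]);
    }
    return granularities.size() == LockGranularity.values().length && apiFlags.size() == 2;
  }

  public static void main(String[] args)
  {
    System.out.println(coversEveryValue(reducedRuns()));
  }
}
```

   Note that this covers each value individually, not every pair: the 
   combination of segment locking with the legacy (non input format) path is 
   no longer exercised, which is the trade-off being proposed.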





[GitHub] [incubator-druid] ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces

2019-11-07 Thread GitBox
ccaominh commented on a change in pull request #8823: Add InputSource and InputFormat interfaces
URL: https://github.com/apache/incubator-druid/pull/8823#discussion_r343934419
 
 

 ##
 File path: core/src/test/java/org/apache/druid/data/input/impl/SplitIteratingReaderTest.java
 ##
 @@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+
+public class SplitIteratingReaderTest
+{
+  @Rule
+  public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Test
+  public void test() throws IOException
+  {
+final List files = new ArrayList<>();
+for (int i = 0; i < 5; i++) {
+  final File file = temporaryFolder.newFile("test_" + i);
+  files.add(file);
+  try (Writer writer = Files.newBufferedWriter(file.toPath(), StandardCharsets.UTF_8)) {
+writer.write(StringUtils.format("%d,%s,%d\n", 20190101 + i, "name_" + i, i));
+writer.write(StringUtils.format("%d,%s,%d", 20190102 + i, "name_" + (i + 1), i + 1));
+  }
+}
+final SplitIteratingReader firehose = new SplitIteratingReader<>(
+new TimestampSpec("time", "MMdd", null),
+new DimensionsSpec(
+DimensionsSpec.getDefaultSchemas(ImmutableList.of("time", "name", "score"))
+),
+new CsvInputFormat(
+ImmutableList.of("time", "name", "score"),
+null,
+false,
+0
+),
+files.stream().flatMap(file -> {
+  try {
+return ImmutableList.of(new FileSource(new InputSplit<>(file))).stream();
+  }
+  catch (IOException e) {
+throw new RuntimeException(e);
+  }
+}),
+temporaryFolder.newFolder()
+);
+
+try (CloseableIterator iterator = firehose.read()) {
+  int i = 0;
+  while (iterator.hasNext()) {
+InputRow row = iterator.next();
+Assert.assertEquals(DateTimes.of(StringUtils.format("2019-01-%02d", i + 1)), row.getTimestamp());
+Assert.assertEquals(StringUtils.format("name_%d", i), Iterables.getOnlyElement(row.getDimension("name")));
+Assert.assertEquals(Integer.toString(i), Iterables.getOnlyElement(row.getDimension("score")));
+
+Assert.assertTrue(iterator.hasNext());
+row = iterator.next();
+Assert.assertEquals(DateTimes.of(StringUtils.format("2019-01-%02d", i + 2)), row.getTimestamp());
+Assert.assertEquals(StringUtils.format("name_%d", i + 1), Iterables.getOnlyElement(row.getDimension("name")));
+Assert.assertEquals(Integer.toString(i + 1), Iterables.getOnlyElement(row.getDimension("score")));
+i++;
+  }
+}
 
 Review comment:
   Can assert `i` is 5 here. Ideally, also add a named constant for the number 
of files that's used for both the setup and the assert.
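
   A minimal stdlib-only sketch of the suggestion, with hypothetical names 
   (`NUM_FILES`, `writeRows`, `countFiles`) and an in-memory list standing in 
   for the temporary files and the reader. The same constant drives both the 
   setup loop and the final assertion, so a reader that silently returned no 
   rows would fail the check instead of skipping the `while` loop and passing.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class NamedConstantSketch
{
  // Single source of truth for the number of input files (hypothetical name).
  static final int NUM_FILES = 5;

  // Setup: two rows per file, mirroring the two writer.write(...) calls in the test.
  static List<String> writeRows()
  {
    List<String> rows = new ArrayList<>();
    for (int i = 0; i < NUM_FILES; i++) {
      rows.add((20190101 + i) + ",name_" + i + "," + i);
      rows.add((20190102 + i) + ",name_" + (i + 1) + "," + (i + 1));
    }
    return rows;
  }

  // Consumes rows in pairs, as the test loop does, and returns how many files were seen.
  static int countFiles(List<String> rows)
  {
    int i = 0;
    Iterator<String> iterator = rows.iterator();
    while (iterator.hasNext()) {
      iterator.next();
      if (!iterator.hasNext()) {
        throw new IllegalStateException("file " + i + " is missing its second row");
      }
      iterator.next();
      i++;
    }
    return i;
  }

  public static void main(String[] args)
  {
    int filesSeen = countFiles(writeRows());
    // The assertion the review asks for: the loop must have run once per file.
    if (filesSeen != NUM_FILES) {
      throw new AssertionError("expected " + NUM_FILES + " files but saw " + filesSeen);
    }
    System.out.println(filesSeen);
  }
}
```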

