http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java index 1b96f28..a38c3c2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java @@ -32,8 +32,9 @@ import com.carrotsearch.hppc.cursors.IntDoubleCursor; import com.carrotsearch.hppc.cursors.IntLongCursor; import com.carrotsearch.hppc.procedures.IntDoubleProcedure; import com.carrotsearch.hppc.procedures.IntLongProcedure; +import com.google.common.annotations.VisibleForTesting; -public class OperatorStats implements OperatorStatReceiver { +public class OperatorStats { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorStats.class); protected final int operatorId; @@ -89,7 +90,8 @@ public class OperatorStats implements OperatorStatReceiver { } } - private OperatorStats(int operatorId, int operatorType, int inputCount, BufferAllocator allocator) { + @VisibleForTesting + public OperatorStats(int operatorId, int operatorType, int inputCount, BufferAllocator allocator) { super(); this.allocator = allocator; this.operatorId = operatorId; @@ -169,7 +171,6 @@ public class OperatorStats implements OperatorStatReceiver { inProcessing = false; } - @Override public synchronized void startWait() { assert !inWait : assertionError("starting waiting"); stopProcessing(); @@ -177,7 +178,6 @@ public class OperatorStats implements OperatorStatReceiver { waitMark = System.nanoTime(); } - @Override public synchronized void stopWait() { assert inWait : assertionError("stopping waiting"); startProcessing(); @@ -203,7 +203,6 @@ public class OperatorStats implements OperatorStatReceiver { .toString(); } - public OperatorProfile getProfile() { final OperatorProfile.Builder b = OperatorProfile // .newBuilder() // @@ -213,14 +212,11 @@ public class OperatorStats implements OperatorStatReceiver { .setProcessNanos(processingNanos) .setWaitNanos(waitNanos); - if(allocator != null){ + if (allocator != null) { b.setPeakLocalMemoryAllocated(allocator.getPeakMemoryAllocation()); } - - addAllMetrics(b); - return b.build(); } @@ -249,7 +245,6 @@ public class OperatorStats implements OperatorStatReceiver { public void apply(int key, long value) { builder.addMetric(MetricValue.newBuilder().setMetricId(key).setLongValue(value)); } - } public void addLongMetrics(OperatorProfile.Builder builder) { @@ -278,22 +273,62 @@ public class OperatorStats implements OperatorStatReceiver { } } - @Override + /** + * Set a stat to the specified long value. Creates the stat + * if the stat does not yet exist. + * + * @param metric the metric to update + * @param value the value to set + */ + public void addLongStat(MetricDef metric, long value){ longMetrics.putOrAdd(metric.metricId(), value, value); } - @Override + @VisibleForTesting + public long getLongStat(MetricDef metric) { + return longMetrics.get(metric.metricId()); + } + + /** + * Add a double value to the existing value. Creates the stat + * (with an initial value of zero) if the stat does not yet + * exist. + * + * @param metric the metric to update + * @param value the value to add to the existing value + */ + public void addDoubleStat(MetricDef metric, double value){ doubleMetrics.putOrAdd(metric.metricId(), value, value); } - @Override + @VisibleForTesting + public double getDoubleStat(MetricDef metric) { + return doubleMetrics.get(metric.metricId()); + } + + /** + * Add a long value to the existing value. Creates the stat + * (with an initial value of zero) if the stat does not yet + * exist. + * + * @param metric the metric to update + * @param value the value to add to the existing value + */ + public void setLongStat(MetricDef metric, long value){ longMetrics.put(metric.metricId(), value); } - @Override + /** + * Set a stat to the specified double value. Creates the stat + * if the stat does not yet exist. + * + * @param metric the metric to update + * @param value the value to set + */ + public void setDoubleStat(MetricDef metric, double value){ doubleMetrics.put(metric.metricId(), value); } @@ -313,5 +348,4 @@ public class OperatorStats implements OperatorStatReceiver { public long getProcessingNanos() { return processingNanos; } - }
http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java index d17c337..d42680a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java @@ -40,7 +40,7 @@ public interface GroupScan extends Scan, HasAffinity{ * 2) NULL is interpreted as ALL_COLUMNS. * How to handle skipAll query is up to each storage plugin, with different policy in corresponding RecordReader. */ - public static final List<SchemaPath> ALL_COLUMNS = ImmutableList.of(SchemaPath.getSimplePath("*")); + public static final List<SchemaPath> ALL_COLUMNS = ImmutableList.of(SchemaPath.STAR_COLUMN); public static final long NO_COLUMN_STATS = -1; http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java index 0871621..b418fd4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java @@ -75,7 +75,7 @@ public class ImplCreator { // to true. if (AssertionUtil.isAssertionsEnabled() || - context.getOptionSet().getOption(ExecConstants.ENABLE_ITERATOR_VALIDATOR) || + context.getOptions().getOption(ExecConstants.ENABLE_ITERATOR_VALIDATOR) || context.getConfig().getBoolean(ExecConstants.ENABLE_ITERATOR_VALIDATION)) { root = IteratorValidatorInjector.rewritePlanWithIteratorValidator(context, root); } http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index 77e9ea4..e0d1545 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -96,7 +96,7 @@ public class ScanBatch implements CloseableRecordBatch { this.readers = readerList.iterator(); this.implicitColumns = implicitColumnList.iterator(); if (!readers.hasNext()) { - throw UserException.systemError( + throw UserException.internalError( new ExecutionSetupException("A scan batch must contain at least one reader.")) .build(logger); } @@ -110,7 +110,7 @@ public class ScanBatch implements CloseableRecordBatch { if (!verifyImplcitColumns(readerList.size(), implicitColumnList)) { Exception ex = new ExecutionSetupException("Either implicit column list does not have same cardinality as reader list, " + "or implicit columns are not same across all the record readers!"); - throw UserException.systemError(ex) + throw UserException.internalError(ex) .addContext("Setup failed for", readerList.get(0).getClass().getSimpleName()) .build(logger); } @@ -210,11 +210,13 @@ public class ScanBatch implements CloseableRecordBatch { logger.error("Close failed for reader " + currentReaderClassName, e2); } } - throw UserException.systemError(e) + throw UserException.internalError(e) .addContext("Setup failed for", currentReaderClassName) .build(logger); + } catch (UserException ex) { + throw ex; } catch (Exception ex) { - throw UserException.systemError(ex).build(logger); + throw UserException.internalError(ex).build(logger); } finally { oContext.getStats().stopProcessing(); } @@ -254,7 +256,7 @@ public class ScanBatch implements CloseableRecordBatch { } } catch(SchemaChangeException e) { // No exception should be thrown here. - throw UserException.systemError(e) + throw UserException.internalError(e) .addContext("Failure while allocating implicit vectors") .build(logger); } http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java index 34c0f94..442a753 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java @@ -336,7 +336,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { private PriorityQueue createNewPriorityQueue(VectorAccessible batch, int limit) throws SchemaChangeException, ClassTransformationException, IOException { return createNewPriorityQueue( - mainMapping, leftMapping, rightMapping, context.getOptionSet(), context.getFunctionRegistry(), context.getDrillbitContext().getCompiler(), + mainMapping, leftMapping, rightMapping, context.getOptions(), context.getFunctionRegistry(), context.getDrillbitContext().getCompiler(), config.getOrderings(), batch, unionTypeEnabled, codegenDump, limit, oContext.getAllocator(), schema.getSelectionVectorMode()); } http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index 3abf0fc..be0f61f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -300,7 +300,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { return false; } final NameSegment expr = ((SchemaPath)ex.getExpr()).getRootSegment(); - return expr.getPath().contains(StarColumnHelper.STAR_COLUMN); + return expr.getPath().contains(SchemaPath.WILDCARD); } private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaChangeException { @@ -542,7 +542,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { final NameSegment expr = ((SchemaPath) ex.getExpr()).getRootSegment(); final NameSegment ref = ex.getRef().getRootSegment(); final boolean refHasPrefix = ref.getPath().contains(StarColumnHelper.PREFIX_DELIMITER); - final boolean exprContainsStar = expr.getPath().contains(StarColumnHelper.STAR_COLUMN); + final boolean exprContainsStar = expr.getPath().contains(SchemaPath.WILDCARD); if (refHasPrefix || exprContainsStar) { needed = true; @@ -596,10 +596,10 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { final NameSegment ref = ex.getRef().getRootSegment(); final boolean exprHasPrefix = expr.getPath().contains(StarColumnHelper.PREFIX_DELIMITER); final boolean refHasPrefix = ref.getPath().contains(StarColumnHelper.PREFIX_DELIMITER); - final boolean exprIsStar = expr.getPath().equals(StarColumnHelper.STAR_COLUMN); - final boolean refContainsStar = ref.getPath().contains(StarColumnHelper.STAR_COLUMN); - final boolean exprContainsStar = expr.getPath().contains(StarColumnHelper.STAR_COLUMN); - final boolean refEndsWithStar = ref.getPath().endsWith(StarColumnHelper.STAR_COLUMN); + final boolean exprIsStar = expr.getPath().equals(SchemaPath.WILDCARD); + final boolean refContainsStar = ref.getPath().contains(SchemaPath.WILDCARD); + final boolean exprContainsStar = expr.getPath().contains(SchemaPath.WILDCARD); + final boolean refEndsWithStar = ref.getPath().endsWith(SchemaPath.WILDCARD); String exprPrefix = EMPTY_STRING; String exprSuffix = expr.getPath(); http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java index ac6a462..e75619e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java @@ -17,6 +17,11 @@ */ package org.apache.drill.exec.physical.impl.validate; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP; + import java.util.Iterator; import org.apache.drill.common.expression.SchemaPath; @@ -30,11 +35,8 @@ import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.WritableBatch; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; -import org.apache.drill.exec.util.BatchPrinter; import org.apache.drill.exec.vector.VectorValidator; -import static org.apache.drill.exec.record.RecordBatch.IterOutcome.*; - public class IteratorValidatorBatchIterator implements CloseableRecordBatch { private static final org.slf4j.Logger logger = http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java index 2288419..4199191 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java @@ -38,7 +38,7 @@ public class IteratorValidatorCreator implements BatchCreator<IteratorValidator> Preconditions.checkArgument(children.size() == 1); RecordBatch child = children.iterator().next(); IteratorValidatorBatchIterator iter = new IteratorValidatorBatchIterator(child); - boolean validateBatches = context.getOptionSet().getOption(ExecConstants.ENABLE_VECTOR_VALIDATOR) || + boolean validateBatches = context.getOptions().getOption(ExecConstants.ENABLE_VECTOR_VALIDATOR) || context.getConfig().getBoolean(ExecConstants.ENABLE_VECTOR_VALIDATION); iter.enableBatchValidation(validateBatches); logger.trace("Iterator validation enabled for " + child.getClass().getSimpleName() + http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java index 2054c9b..9150fe3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java @@ -486,7 +486,19 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { @Override public void close() { + + // Sanity check: if close is called twice, just ignore + // the second call. + + if (sortImpl == null) { + return; + } + RuntimeException ex = null; + + // If we got far enough to have a results iterator, close + // that first. + try { if (resultsIterator != null) { resultsIterator.close(); @@ -495,6 +507,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { } catch (RuntimeException e) { ex = (ex == null) ? e : ex; } + + // Then close the "guts" of the sort operation. + try { if (sortImpl != null) { sortImpl.close(); @@ -506,14 +521,22 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { // The call to super.close() clears out the output container. // Doing so requires the allocator here, so it must be closed - // after the super call. + // (when closing the operator context) after the super call. try { super.close(); } catch (RuntimeException e) { ex = (ex == null) ? e : ex; } - // Note: allocator is closed by the FragmentManager + + // Finally close the operator context (which closes the + // child allocator.) + + try { + oContext.close(); + } catch (RuntimeException e) { + ex = ex == null ? e : ex; + } if (ex != null) { throw ex; } http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java index dee24dc..bca28f1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java @@ -142,7 +142,7 @@ public class MergeSortWrapper extends BaseSortWrapper implements SortResults { } private MSorter createNewMSorter(List<Ordering> orderings, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping) { - CodeGenerator<MSorter> cg = CodeGenerator.get(MSorter.TEMPLATE_DEFINITION, context.getFragmentContext().getOptionSet()); + CodeGenerator<MSorter> cg = CodeGenerator.get(MSorter.TEMPLATE_DEFINITION, context.getFragmentContext().getOptions()); cg.plainJavaCapable(true); // Uncomment out this line to debug the generated code. http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java index 4d21b11..dda42a2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java @@ -80,7 +80,7 @@ public class PriorityQueueCopierWrapper extends BaseSortWrapper { private PriorityQueueCopier newCopier(VectorAccessible batch) { // Generate the copier code and obtain the resulting class - CodeGenerator<PriorityQueueCopier> cg = CodeGenerator.get(PriorityQueueCopier.TEMPLATE_DEFINITION, context.getFragmentContext().getOptionSet()); + CodeGenerator<PriorityQueueCopier> cg = CodeGenerator.get(PriorityQueueCopier.TEMPLATE_DEFINITION, context.getFragmentContext().getOptions()); ClassGenerator<PriorityQueueCopier> g = cg.getRoot(); cg.plainJavaCapable(true); // Uncomment out this line to debug the generated code. http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java index 2d53c3b..9fb478e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java @@ -36,6 +36,8 @@ import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; +import com.google.common.annotations.VisibleForTesting; + /** * Implementation of the external sort which is wrapped into the Drill * "next" protocol by the {@link ExternalSortBatch} class. @@ -105,7 +107,6 @@ public class SortImpl { public VectorContainer getContainer() { return dest; } } - /** * Return results for a single input batch. No merge is needed; * the original (sorted) input batch is simply passed as the result. @@ -200,7 +201,7 @@ public class SortImpl { allocator = opContext.getAllocator(); config = sortConfig; memManager = new SortMemoryManager(config, allocator.getLimit()); - metrics = new SortMetrics(opContext.getStatsWriter()); + metrics = new SortMetrics(opContext.getStats()); bufferedBatches = new BufferedBatches(opContext); // Request leniency from the allocator. Leniency @@ -215,6 +216,9 @@ public class SortImpl { logger.debug("Config: Is allocator lenient? {}", allowed); } + @VisibleForTesting + public OperatorContext opContext() { return context; } + public void setSchema(BatchSchema schema) { bufferedBatches.setSchema(schema); spilledRuns.setSchema(schema); @@ -541,6 +545,11 @@ public class SortImpl { } catch (RuntimeException e) { ex = ex == null ? e : ex; } + + // Note: don't close the operator context here. It must + // remain open until all containers are cleared, which + // is done in the ExternalSortBatch class. + if (ex != null) { throw ex; } http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMetrics.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMetrics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMetrics.java index 8d20cca..ae436bd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMetrics.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMetrics.java @@ -17,7 +17,7 @@ */ package org.apache.drill.exec.physical.impl.xsort.managed; -import org.apache.drill.exec.ops.OperatorStatReceiver; +import org.apache.drill.exec.ops.OperatorStats; public class SortMetrics { @@ -38,12 +38,12 @@ public class SortMetrics { */ private long minimumBufferSpace; - private OperatorStatReceiver stats; + private OperatorStats stats; private int spillCount; private int mergeCount; private long writeBytes; - public SortMetrics(OperatorStatReceiver stats) { + public SortMetrics(OperatorStats stats) { assert stats != null; this.stats = stats; } http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java index 1d43128..a9785ca 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java @@ -78,7 +78,7 @@ public class SorterWrapper extends BaseSortWrapper { private SingleBatchSorter newSorter(VectorAccessible batch) { CodeGenerator<SingleBatchSorter> cg = CodeGenerator.get( - SingleBatchSorter.TEMPLATE_DEFINITION, context.getFragmentContext().getOptionSet()); + SingleBatchSorter.TEMPLATE_DEFINITION, context.getFragmentContext().getOptions()); ClassGenerator<SingleBatchSorter> g = cg.getRoot(); cg.plainJavaCapable(true); // Uncomment out this line to debug the generated code. http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/planner/StarColumnHelper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/StarColumnHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/StarColumnHelper.java index 672af42..87cbf86 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/StarColumnHelper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/StarColumnHelper.java @@ -20,18 +20,16 @@ package org.apache.drill.exec.planner; import java.util.List; import java.util.Map; - import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexNode; +import org.apache.drill.common.expression.SchemaPath; public class StarColumnHelper { public final static String PREFIX_DELIMITER = "\u00a6\u00a6"; - public final static String STAR_COLUMN = "**"; - - public final static String PREFIXED_STAR_COLUMN = PREFIX_DELIMITER + STAR_COLUMN; + public final static String PREFIXED_STAR_COLUMN = PREFIX_DELIMITER + SchemaPath.WILDCARD; public static boolean containsStarColumn(RelDataType type) { if (! type.isStruct()) { @@ -41,7 +39,7 @@ public class StarColumnHelper { List<String> fieldNames = type.getFieldNames(); for (String s : fieldNames) { - if (s.startsWith(STAR_COLUMN)) { + if (s.startsWith(SchemaPath.WILDCARD)) { return true; } } @@ -58,7 +56,7 @@ public class StarColumnHelper { if (expr instanceof RexInputRef) { String name = inputRowType.getFieldNames().get(((RexInputRef) expr).getIndex()); - if (name.startsWith(STAR_COLUMN)) { + if (name.startsWith(SchemaPath.WILDCARD)) { return true; } } @@ -72,7 +70,7 @@ public class StarColumnHelper { } public static boolean isNonPrefixedStarColumn(String fieldName) { - return fieldName.startsWith(STAR_COLUMN); + return fieldName.startsWith(SchemaPath.WILDCARD); } public static boolean isStarColumn(String fieldName) { http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java index 7b52eda..0cc016b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java @@ -33,7 +33,6 @@ import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; -import org.apache.drill.exec.expr.fn.impl.DateUtility; import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers; import org.apache.drill.exec.expr.fn.interpreter.InterpreterEvaluator; import org.apache.drill.exec.expr.holders.BigIntHolder; @@ -74,6 +73,7 @@ import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.planner.sql.TypeInferenceUtils; +import org.apache.drill.exec.vector.DateUtilities; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -315,7 +315,7 @@ public class DrillConstExecutor implements RexExecutor { milliseconds = intervalDayOut.milliseconds; } return rexBuilder.makeLiteral( - new BigDecimal(days * (long) DateUtility.daysToStandardMillis + milliseconds), + new BigDecimal(days * (long) DateUtilities.daysToStandardMillis + milliseconds), TypeInferenceUtils.createCalciteTypeWithNullability(typeFactory, SqlTypeName.INTERVAL_DAY, newCall.getType().isNullable()), false); } http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PreProcessLogicalRel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PreProcessLogicalRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PreProcessLogicalRel.java index 1230498..37e4ca1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PreProcessLogicalRel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PreProcessLogicalRel.java @@ -26,8 +26,8 @@ import org.apache.calcite.rel.logical.LogicalJoin; import org.apache.calcite.rex.RexShuttle; import org.apache.calcite.rex.RexUtil; import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.exception.UnsupportedOperatorCollector; -import org.apache.drill.exec.planner.StarColumnHelper; import org.apache.drill.exec.planner.sql.DrillOperatorTable; import org.apache.drill.exec.planner.sql.parser.DrillCalciteWrapperUtility; import org.apache.drill.exec.util.ApproximateStringMatcher; @@ -203,7 +203,7 @@ public class PreProcessLogicalRel extends RelShuttleImpl { public RelNode visit(LogicalUnion union) { for(RelNode child : union.getInputs()) { for(RelDataTypeField dataField : child.getRowType().getFieldList()) { - if(dataField.getName().contains(StarColumnHelper.STAR_COLUMN)) { + if(dataField.getName().contains(SchemaPath.WILDCARD)) { unsupportedOperatorCollector.setException(SqlUnsupportedException.ExceptionType.RELATIONAL, "Union-All over schema-less tables must specify the columns explicitly\n" + "See Apache Drill JIRA: DRILL-2414"); http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SplitUpComplexExpressions.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SplitUpComplexExpressions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SplitUpComplexExpressions.java index 394cde3..f323991 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SplitUpComplexExpressions.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SplitUpComplexExpressions.java @@ -30,8 +30,8 @@ import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.tools.RelConversionException; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; -import org.apache.drill.exec.planner.StarColumnHelper; import org.apache.drill.exec.planner.physical.Prel; import org.apache.drill.exec.planner.physical.PrelUtil; import org.apache.drill.exec.planner.physical.ProjectPrel; @@ -107,7 +107,7 @@ public class SplitUpComplexExpressions extends BasePrelVisitor<Prel, Object, Rel RexBuilder builder = new RexBuilder(factory); allExprs.add(builder.makeInputRef( new RelDataTypeDrillImpl(new RelDataTypeHolder(), factory), index)); - if(fieldNames.get(index).contains(StarColumnHelper.STAR_COLUMN)) { + if(fieldNames.get(index).contains(SchemaPath.WILDCARD)) { relDataTypes.add(new RelDataTypeFieldImpl(fieldNames.get(index), allExprs.size(), factory.createSqlType(SqlTypeName.ANY))); } else { relDataTypes.add(new RelDataTypeFieldImpl("EXPR$" + exprIndex, allExprs.size(), factory.createSqlType(SqlTypeName.ANY))); http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java index 69458d4..c2227c4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java @@ -30,7 +30,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.tools.RelConversionException; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.UserException; -import org.apache.drill.exec.planner.StarColumnHelper; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.planner.common.DrillRelOptUtil; import org.apache.drill.exec.planner.logical.DrillRelFactories; import org.apache.drill.exec.store.AbstractSchema; @@ -157,7 +157,7 @@ public class SqlHandlerUtil { .message("Partition column %s is not in the SELECT list of CTAS!", col) .build(logger); } else { - if (field.getName().startsWith(StarColumnHelper.STAR_COLUMN)) { + if (field.getName().startsWith(SchemaPath.WILDCARD)) { colRefStarNames.add(col); final List<RexNode> operands = Lists.newArrayList(); @@ -191,10 +191,12 @@ public class SqlHandlerUtil { final List<RexNode> refs = new AbstractList<RexNode>() { + @Override public int size() { return originalFieldSize + colRefStarExprs.size(); } + @Override public RexNode get(int index) { if (index < originalFieldSize) { return RexInputRef.of(index, inputRowType.getFieldList()); http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/record/ExpandableHyperContainer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/ExpandableHyperContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/ExpandableHyperContainer.java index 377c7af..9037340 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/ExpandableHyperContainer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/ExpandableHyperContainer.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -27,6 +27,10 @@ public class ExpandableHyperContainer extends VectorContainer { public ExpandableHyperContainer(VectorAccessible batch) { super(); + build(batch); + } + + private void build(VectorAccessible batch) { if (batch.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.FOUR_BYTE) { for (VectorWrapper<?> w : batch) { ValueVector[] hyperVector = w.getValueVectors(); @@ -42,17 +46,7 @@ public class ExpandableHyperContainer extends VectorContainer { public void addBatch(VectorAccessible batch) { if (wrappers.size() == 0) { - if (batch.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.FOUR_BYTE) { - for (VectorWrapper<?> w : batch) { - ValueVector[] hyperVector = w.getValueVectors(); - this.add(hyperVector, true); - } - } else { - for (VectorWrapper<?> w : batch) { - ValueVector[] hyperVector = { w.getValueVector() }; - this.add(hyperVector, true); - } - } + build(batch); return; } if (batch.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.FOUR_BYTE) { http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java index 3e6bf64..f180b40 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java @@ -61,6 +61,8 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp this.allocator = Preconditions.checkNotNull(allocator); } + public BufferAllocator allocator() { return allocator; } + /** * Load a record batch from a single buffer. * @@ -88,7 +90,7 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp // Set up to recognize previous fields that no longer exist. final Map<String, ValueVector> oldFields = CaseInsensitiveMap.newHashMap(); - for(final VectorWrapper<?> wrapper : container) { + for (final VectorWrapper<?> wrapper : container) { final ValueVector vector = wrapper.getValueVector(); oldFields.put(vector.getField().getName(), vector); } @@ -97,7 +99,7 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp try { final List<SerializedField> fields = def.getFieldList(); int bufOffset = 0; - for(final SerializedField field : fields) { + for (final SerializedField field : fields) { final MaterializedField fieldDef = MaterializedField.create(field); ValueVector vector = oldFields.remove(fieldDef.getName()); @@ -105,7 +107,7 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp // Field did not exist previously--is schema change. schemaChanged = true; vector = TypeHelper.getNewVector(fieldDef, allocator); - } else if (!vector.getField().getType().equals(fieldDef.getType())) { + } else if (! vector.getField().getType().equals(fieldDef.getType())) { // Field had different type before--is schema change. // clear previous vector vector.clear(); @@ -125,7 +127,9 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp } // Load the vector. - if (field.getValueCount() == 0) { + if (buf == null) { + // Schema only + } else if (field.getValueCount() == 0) { AllocationHelper.allocate(vector, 0, 0, 0); } else { vector.load(field, buf.slice(bufOffset, field.getBufferLength())); @@ -151,9 +155,9 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp } throw cause; } finally { - if (!oldFields.isEmpty()) { + if (! oldFields.isEmpty()) { schemaChanged = true; - for (final ValueVector vector:oldFields.values()) { + for (final ValueVector vector : oldFields.values()) { vector.clear(); } } @@ -269,5 +273,4 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp container.clear(); resetRecordCount(); } - } http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java index e1a1031..67b2522 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java @@ -105,9 +105,6 @@ public class SchemaUtil { if (field.getType().getMinorType() == MinorType.UNION) { UnionVector u = (UnionVector) tp.getTo(); for (MinorType t : field.getType().getSubTypeList()) { - if (u.getField().getType().getSubTypeList().contains(t)) { - continue; - } u.addSubType(t); } } @@ -116,22 +113,7 @@ public class SchemaUtil { ValueVector newVector = TypeHelper.getNewVector(field, allocator); Preconditions.checkState(field.getType().getMinorType() == MinorType.UNION, "Can only convert vector to Union vector"); UnionVector u = (UnionVector) newVector; - final ValueVector vv = u.addVector(tp.getTo()); - MinorType type = v.getField().getType().getMinorType(); - for (int i = 0; i < valueCount; i++) { - if (!vv.getAccessor().isNull(i)) { - u.getMutator().setType(i, type); - } else { - u.getMutator().setType(i, MinorType.LATE); - } - } - for (MinorType t : field.getType().getSubTypeList()) { - if (u.getField().getType().getSubTypeList().contains(t)) { - continue; - } - u.addSubType(t); - } - u.getMutator().setValueCount(valueCount); + u.setFirstType(tp.getTo(), valueCount); return u; } } else { http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java index 9564f11..c46efaf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java @@ -18,9 +18,6 @@ package org.apache.drill.exec.record; import java.lang.reflect.Array; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Set; @@ -42,13 +39,14 @@ import com.google.common.collect.Sets; public class VectorContainer implements VectorAccessible { + private final BufferAllocator allocator; protected final List<VectorWrapper<?>> wrappers = Lists.newArrayList(); private BatchSchema schema; private int recordCount = -1; - private BufferAllocator allocator; private boolean schemaChanged = true; // Schema has changed since last built. Must rebuild schema public VectorContainer() { + allocator = null; } public VectorContainer(OperatorContext oContext) { @@ -336,9 +334,13 @@ public class VectorContainer implements VectorAccessible { } public void clear() { - schema = null; zeroVectors(); + removeAll(); + } + + public void removeAll() { wrappers.clear(); + schema = null; } public void setRecordCount(int recordCount) { @@ -365,13 +367,17 @@ public class VectorContainer implements VectorAccessible { /** * Clears the contained vectors. (See {@link ValueVector#clear}). + * Note that the name <tt>zeroVector()</tt> in a value vector is + * used for the action to set all vectors to zero. Here it means + * to free the vector's memory. Sigh... */ + public void zeroVectors() { VectorAccessibleUtilities.clear(this); } public int getNumberOfColumns() { - return this.wrappers.size(); + return wrappers.size(); } public void allocateNew() { @@ -415,4 +421,30 @@ public class VectorContainer implements VectorAccessible { merged.schemaChanged = false; return merged; } + + /** + * Exchange buffers between two identical vector containers. + * The schemas must be identical in both column schemas and + * order. That is, after this call, data is exchanged between + * the containers. Requires that both containers be owned + * by the same allocator. + * + * @param other the target container with buffers to swap + */ + + public void exchange(VectorContainer other) { + assert schema.isEquivalent(other.schema); + assert wrappers.size() == other.wrappers.size(); + assert allocator != null && allocator == other.allocator; + for (int i = 0; i < wrappers.size(); i++) { + wrappers.get(i).getValueVector().exchange( + other.wrappers.get(i).getValueVector()); + } + int temp = recordCount; + recordCount = other.recordCount; + other.recordCount = temp; + boolean temp2 = schemaChanged; + schemaChanged = other.schemaChanged; + other.schemaChanged = temp2; + } } http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java index b3b46c2..c806669 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java @@ -177,7 +177,7 @@ public class WritableBatch implements AutoCloseable { return b; } - public static WritableBatch get(RecordBatch batch) { + public static WritableBatch get(VectorAccessible batch) { if (batch.getSchema() != null && batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE) { throw new UnsupportedOperationException("Only batches without hyper selections vectors are writable."); } @@ -198,5 +198,4 @@ public class WritableBatch implements AutoCloseable { drillBuf.release(1); } } - } http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java index 42f3473..7244148 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java @@ -86,11 +86,11 @@ public class SelectionVector2 implements AutoCloseable { } public void setBuffer(DrillBuf bufferHandle) { - /* clear the existing buffer */ - clear(); + /* clear the existing buffer */ + clear(); - this.buffer = bufferHandle; - buffer.retain(1); + this.buffer = bufferHandle; + buffer.retain(1); } public char getIndex(int index) { @@ -106,7 +106,7 @@ public class SelectionVector2 implements AutoCloseable { } public void setIndex(int index, int value) { - buffer.setChar(index, value); + buffer.setChar(index * RECORD_SIZE, value); } public boolean allocateNewSafe(int size) { http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java index bd077fb..b51fdca 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,10 +20,10 @@ package org.apache.drill.exec.record.selection; import io.netty.buffer.ByteBuf; import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.record.DeadBuf; public class SelectionVector4 implements AutoCloseable { - // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVector4.class); private ByteBuf data; private int recordCount; @@ -31,8 +31,9 @@ public class SelectionVector4 implements AutoCloseable { private int length; public SelectionVector4(ByteBuf vector, int recordCount, int batchRecordCount) throws SchemaChangeException { - if (recordCount > Integer.MAX_VALUE /4) { - throw new SchemaChangeException(String.format("Currently, Drill can only support allocations up to 2gb in size. You requested an allocation of %d bytes.", recordCount * 4)); + if (recordCount > Integer.MAX_VALUE / 4) { + throw new SchemaChangeException(String.format("Currently, Drill can only support allocations up to 2gb in size. " + + "You requested an allocation of %d bytes.", recordCount * 4L)); } this.recordCount = recordCount; this.start = 0; @@ -40,6 +41,17 @@ public class SelectionVector4 implements AutoCloseable { this.data = vector; } + public SelectionVector4(BufferAllocator allocator, int recordCount) { + if (recordCount > Integer.MAX_VALUE / 4) { + throw new IllegalStateException(String.format("Currently, Drill can only support allocations up to 2gb in size. " + + "You requested an allocation of %d bytes.", recordCount * 4L)); + } + this.recordCount = recordCount; + this.start = 0; + this.length = recordCount; + this.data = allocator.buffer(recordCount * 4); + } + public int getTotalCount() { return recordCount; } @@ -54,15 +66,15 @@ public class SelectionVector4 implements AutoCloseable { } public void set(int index, int compound) { - data.setInt(index*4, compound); + data.setInt(index * 4, compound); } public void set(int index, int recordBatch, int recordIndex) { - data.setInt(index*4, (recordBatch << 16) | (recordIndex & 65535)); + data.setInt(index * 4, (recordBatch << 16) | (recordIndex & 65535)); } public int get(int index) { - return data.getInt( (start+index)*4); + return data.getInt((start+index) * 4); } /** http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java index 4b71b0f..f9d44cc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java @@ -17,24 +17,25 @@ */ package org.apache.drill.exec.store; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.io.Files; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + import org.apache.commons.lang3.ArrayUtils; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.map.CaseInsensitiveMap; import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.FragmentContextInterface; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.server.options.OptionValue; import org.apache.drill.exec.store.dfs.easy.FileWork; import org.apache.drill.exec.util.Utilities; import org.apache.hadoop.fs.Path; -import java.util.List; -import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.io.Files; public class ColumnExplorer { @@ -46,13 +47,12 @@ public class ColumnExplorer { private final Map<String, ImplicitFileColumns> allImplicitColumns; private final Map<String, ImplicitFileColumns> selectedImplicitColumns; - /** * Helper class that encapsulates logic for sorting out columns * between actual table columns, partition columns and implicit file columns. * Also populates map with implicit columns names as keys and their values */ - public ColumnExplorer(FragmentContext context, List<SchemaPath> columns) { + public ColumnExplorer(FragmentContextInterface context, List<SchemaPath> columns) { this(context.getOptions(), columns); } @@ -62,7 +62,7 @@ public class ColumnExplorer { * Also populates map with implicit columns names as keys and their values */ public ColumnExplorer(OptionManager optionManager, List<SchemaPath> columns) { - this.partitionDesignator = optionManager.getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val; + this.partitionDesignator = optionManager.getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL); this.columns = columns; this.isStarQuery = columns != null && Utilities.isStarQuery(columns); this.selectedPartitionColumns = Lists.newArrayList(); @@ -74,7 +74,8 @@ public class ColumnExplorer { } /** - * Creates case insensitive map with implicit file columns as keys and appropriate ImplicitFileColumns enum as values + * Creates case insensitive map with implicit file columns as keys and + * appropriate ImplicitFileColumns enum as values */ public static Map<String, ImplicitFileColumns> initImplicitFileColumns(OptionManager optionManager) { Map<String, ImplicitFileColumns> map = CaseInsensitiveMap.newHashMap(); @@ -94,8 +95,8 @@ public class ColumnExplorer { * @param column column * @return true if given column is partition, false otherwise */ - public static boolean isPartitionColumn(OptionManager optionManager, SchemaPath column){ - String partitionDesignator = optionManager.getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val; + public static boolean isPartitionColumn(OptionManager optionManager, SchemaPath column) { + String partitionDesignator = optionManager.getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL); String path = column.getRootSegmentPath(); return isPartitionColumn(partitionDesignator, path); } @@ -252,11 +253,11 @@ public class ColumnExplorer { this.name = name; } + public String optionName() { return name; } + /** * Using file path calculates value for each implicit file column */ public abstract String getValue(Path path); - } - } http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/store/ResourceInputStream.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ResourceInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ResourceInputStream.java index 98e460a..1aa278a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ResourceInputStream.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ResourceInputStream.java @@ -96,5 +96,4 @@ public class ResourceInputStream extends ByteArrayInputStream implements Seekabl throw new EOFException(); } } - } http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java index 489e03c..e97316c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,7 +17,6 @@ */ package org.apache.drill.exec.store.dfs; -import org.apache.drill.exec.ops.OperatorStatReceiver; import org.apache.drill.exec.ops.OperatorStats; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.fs.FSDataInputStream; @@ -39,13 +38,14 @@ import java.util.EnumSet; public class DrillFSDataInputStream extends FSDataInputStream { private final FSDataInputStream underlyingIs; private final OpenFileTracker openFileTracker; - private final OperatorStatReceiver operatorStats; + private final OperatorStats operatorStats; - public DrillFSDataInputStream(FSDataInputStream in, OperatorStatReceiver operatorStats) throws IOException { + public DrillFSDataInputStream(FSDataInputStream in, OperatorStats operatorStats) throws IOException { this(in, operatorStats, null); } - public DrillFSDataInputStream(FSDataInputStream in, OperatorStatReceiver operatorStats, + @SuppressWarnings("resource") + public DrillFSDataInputStream(FSDataInputStream in, OperatorStats operatorStats, OpenFileTracker openFileTracker) throws IOException { super(new WrappedInputStream(in, operatorStats)); underlyingIs = in; @@ -194,9 +194,9 @@ public class DrillFSDataInputStream extends FSDataInputStream { */ private static class WrappedInputStream extends InputStream implements Seekable, PositionedReadable { final FSDataInputStream is; - final OperatorStatReceiver operatorStats; + final OperatorStats operatorStats; - WrappedInputStream(FSDataInputStream is, OperatorStatReceiver operatorStats) { + WrappedInputStream(FSDataInputStream is, OperatorStats operatorStats) { this.is = is; this.operatorStats = operatorStats; } http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java index fc540aa..52e1a96 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java @@ -26,7 +26,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; -import org.apache.drill.exec.ops.OperatorStatReceiver; +import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.util.AssertionUtil; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -80,14 +80,14 @@ public class DrillFileSystem extends FileSystem implements OpenFileTracker { private final ConcurrentMap<DrillFSDataInputStream, DebugStackTrace> openedFiles = Maps.newConcurrentMap(); private final FileSystem underlyingFs; - private final OperatorStatReceiver operatorStats; + private final OperatorStats operatorStats; private final CompressionCodecFactory codecFactory; public DrillFileSystem(Configuration fsConf) throws IOException { this(fsConf, null); } - public DrillFileSystem(Configuration fsConf, OperatorStatReceiver operatorStats) throws IOException { + public DrillFileSystem(Configuration fsConf, OperatorStats operatorStats) throws IOException { this.underlyingFs = FileSystem.get(fsConf); this.codecFactory = new CompressionCodecFactory(fsConf); this.operatorStats = operatorStats; http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/FileWork.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/FileWork.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/FileWork.java index 80bcef2..587201e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/FileWork.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/FileWork.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,11 +17,8 @@ */ package org.apache.drill.exec.store.dfs.easy; - public interface FileWork { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileWork.class); - - public String getPath(); - public long getStart(); - public long getLength(); + String getPath(); + long getStart(); + long getLength(); } http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/HeaderBuilder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/HeaderBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/HeaderBuilder.java index 8910c26..ef8f861 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/HeaderBuilder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/HeaderBuilder.java @@ -24,6 +24,8 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import org.apache.drill.common.exceptions.UserException; + import com.google.common.base.Charsets; /** @@ -67,23 +69,6 @@ public class HeaderBuilder extends TextOutput { public static final String ANONYMOUS_COLUMN_PREFIX = "column_"; - /** - * Exception that reports header errors. Is an unchecked exception - * to avoid cluttering the normal field reader interface. - */ - public static class HeaderError extends RuntimeException { - - private static final long serialVersionUID = 1L; - - public HeaderError(String msg) { - super(msg); - } - - public HeaderError(int colIndex, String msg) { - super("Column " + (colIndex + 1) + ": " + msg); - } - } - public final List<String> headers = new ArrayList<>(); public final ByteBuffer currentField = ByteBuffer.allocate(MAX_HEADER_LEN); @@ -204,14 +189,18 @@ public class HeaderBuilder extends TextOutput { try { currentField.put(data); } catch (BufferOverflowException e) { - throw new HeaderError(headers.size(), "Column exceeds maximum length of " + MAX_HEADER_LEN); + throw UserException.dataReadError() + .message("Column exceeds maximum length of %d", MAX_HEADER_LEN) + .build(logger); } } @Override public void finishRecord() { if (headers.isEmpty()) { - throw new HeaderError("The file must define at least one header."); + throw UserException.dataReadError() + .message("The file must define at least one header.") + .build(logger); } // Force headers to be unique. http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java index d218846..7a7ad0a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java @@ -372,15 +372,18 @@ final class TextReader { throw new TextParsingException(context, "Cannot use newline character within quoted string"); } - if(success){ + if (success) { if (recordsToRead > 0 && context.currentRecord() >= recordsToRead) { context.stop(); } return true; - }else{ + } else { return false; } + } catch (UserException ex) { + stopParsing(); + throw ex; } catch (StreamFinishedPseudoException ex) { stopParsing(); return false; http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java index eadbeb0..a611c6f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java @@ -34,7 +34,6 @@ import org.apache.drill.exec.store.TimedRunnable; import org.apache.drill.exec.util.DrillFileSystemUtil; import org.apache.drill.exec.store.dfs.MetadataContext; import org.apache.drill.exec.util.ImpersonationUtil; -import org.apache.drill.exec.util.Utilities; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -431,7 +430,7 @@ public class Metadata { List<RowGroupMetadata_v3> rowGroupMetadataList = Lists.newArrayList(); ArrayList<SchemaPath> ALL_COLS = new ArrayList<>(); - ALL_COLS.add(Utilities.STAR_COLUMN); + ALL_COLS.add(SchemaPath.STAR_COLUMN); boolean autoCorrectCorruptDates = formatConfig.areCorruptDatesAutoCorrected(); ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = ParquetReaderUtility.detectCorruptDates(metadata, ALL_COLS, autoCorrectCorruptDates); if (logger.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java index 773f3d3..3935919 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java @@ -34,7 +34,6 @@ import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.store.parquet.ParquetReaderUtility; -import org.apache.drill.exec.util.Utilities; import org.apache.drill.exec.vector.NullableIntVector; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.format.SchemaElement; @@ -226,7 +225,7 @@ public class ParquetSchema { for (int i = 0; i < columnsFound.length; i++) { SchemaPath col = projectedColumns.get(i); assert col != null; - if ( ! columnsFound[i] && ! col.equals(Utilities.STAR_COLUMN)) { + if ( ! columnsFound[i] && ! col.equals(SchemaPath.STAR_COLUMN)) { nullFilledVectors.add(createMissingColumn(col, output)); } } http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java index 35358c2..9125e96 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java @@ -31,7 +31,6 @@ import java.util.Collection; public class Utilities { - public static final SchemaPath STAR_COLUMN = SchemaPath.getSimplePath("*"); public static final String COL_NULL_ERROR = "Columns cannot be null. Use star column to select all fields."; public static String getFileNameForQueryFragment(FragmentContext context, String location, String tag) { @@ -87,7 +86,7 @@ public class Utilities { return Iterables.tryFind(Preconditions.checkNotNull(projected, COL_NULL_ERROR), new Predicate<SchemaPath>() { @Override public boolean apply(SchemaPath path) { - return Preconditions.checkNotNull(path).equals(STAR_COLUMN); + return Preconditions.checkNotNull(path).equals(SchemaPath.STAR_COLUMN); } }).isPresent(); } http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/sql/TimePrintMillis.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/sql/TimePrintMillis.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/sql/TimePrintMillis.java index 2611b86..d85d75b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/sql/TimePrintMillis.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/sql/TimePrintMillis.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,8 +19,8 @@ package org.apache.drill.exec.vector.accessor.sql; import java.sql.Time; -import org.apache.drill.exec.expr.fn.impl.DateUtility; +@SuppressWarnings("serial") public class TimePrintMillis extends Time { private static final String[] leadingZeroes = {"", "0", "00"}; @@ -33,7 +33,7 @@ public class TimePrintMillis extends Time { @Override public String toString () { - int millis = (int) (getTime() % DateUtility.secondsToMillis); + int millis = (int) (getTime() % org.apache.drill.exec.vector.DateUtilities.secondsToMillis); StringBuilder time = new StringBuilder().append(super.toString()); if (millis > 0) { http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/VectorOutput.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/VectorOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/VectorOutput.java index bf1448e..fec9e66 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/VectorOutput.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/VectorOutput.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,7 +20,6 @@ package org.apache.drill.exec.vector.complex.fn; import java.io.IOException; import org.apache.drill.common.exceptions.UserException; -import org.apache.drill.exec.expr.fn.impl.DateUtility; import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers; import org.apache.drill.exec.expr.holders.BigIntHolder; import org.apache.drill.exec.expr.holders.DateHolder; @@ -30,6 +29,7 @@ import org.apache.drill.exec.expr.holders.TimeHolder; import org.apache.drill.exec.expr.holders.TimeStampHolder; import org.apache.drill.exec.expr.holders.VarBinaryHolder; import org.apache.drill.exec.expr.holders.VarCharHolder; +import org.apache.drill.exec.vector.DateUtilities; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter; import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter; import org.apache.drill.exec.vector.complex.writer.BigIntWriter; @@ -258,9 +258,9 @@ abstract class VectorOutput { IntervalWriter intervalWriter = writer.interval(); if(!isNull){ final Period p = ISOPeriodFormat.standard().parsePeriod(parser.getValueAsString()); - int months = DateUtility.monthsFromPeriod(p); + int months = DateUtilities.monthsFromPeriod(p); int days = p.getDays(); - int millis = DateUtility.millisFromPeriod(p); + int millis = DateUtilities.periodToMillis(p); intervalWriter.writeInterval(months, days, millis); } } @@ -295,6 +295,7 @@ abstract class VectorOutput { return innerRun(); } + @SuppressWarnings("resource") @Override public void writeBinary(boolean isNull) throws IOException { VarBinaryWriter bin = writer.varBinary(fieldName); @@ -326,6 +327,7 @@ abstract class VectorOutput { @Override public void writeTime(boolean isNull) throws IOException { + @SuppressWarnings("resource") TimeWriter t = writer.time(fieldName); if(!isNull){ DateTimeFormatter f = ISODateTimeFormat.time(); @@ -333,6 +335,7 @@ abstract class VectorOutput { } } + @SuppressWarnings("resource") @Override public void writeTimestamp(boolean isNull) throws IOException { TimeStampWriter ts = writer.timeStamp(fieldName); @@ -359,15 +362,16 @@ abstract class VectorOutput { IntervalWriter intervalWriter = writer.interval(fieldName); if(!isNull){ final Period p = ISOPeriodFormat.standard().parsePeriod(parser.getValueAsString()); - int months = DateUtility.monthsFromPeriod(p); + int months = DateUtilities.monthsFromPeriod(p); int days = p.getDays(); - int millis = DateUtility.millisFromPeriod(p); + int millis = DateUtilities.periodToMillis(p); intervalWriter.writeInterval(months, days, millis); } } @Override public void writeInteger(boolean isNull) throws IOException { + @SuppressWarnings("resource") BigIntWriter intWriter = writer.bigInt(fieldName); if(!isNull){ intWriter.writeBigInt(Long.parseLong(parser.getValueAsString())); http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java index 34c8c6c..22cd618 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java @@ -255,8 +255,11 @@ public class TestStarQueries extends BaseTestQuery { public void testStarView1() throws Exception { test("use dfs.tmp"); test("create view vt1 as select * from cp.`tpch/region.parquet` r, cp.`tpch/nation.parquet` n where r.r_regionkey = n.n_regionkey"); - test("select * from vt1"); - test("drop view vt1"); + try { + test("select * from vt1"); + } finally { + test("drop view vt1"); + } } @Test // select star for a SchemaTable. @@ -271,9 +274,12 @@ public class TestStarQueries extends BaseTestQuery { "join (select * from cp.`tpch/nation.parquet`) t2 " + "on t1.name = t2.n_name"; - test("alter session set `planner.enable_broadcast_join` = false"); - test(query); - test("alter session set `planner.enable_broadcast_join` = true"); + try { + alterSession("planner.enable_broadcast_join", false); + test(query); + } finally { + resetSessionOption("planner.enable_broadcast_join"); + } test(query); }