[GitHub] drill pull request #1223: DRILL-6324: Unnest initial implementation

2018-04-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/drill/pull/1223


---


[GitHub] drill pull request #1223: DRILL-6324: Unnest initial implementation

2018-04-20 Thread parthchandra
Github user parthchandra commented on a diff in the pull request:

https://github.com/apache/drill/pull/1223#discussion_r183105915
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.unnest;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.LateralContract;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Contains the actual unnest operation. Unnest is a simple transfer 
operation in this implementation.
+ * For use as a table function, we will need to change the logic of the 
unnest method to operate on
+ * more than one row at a time and remove any dependence on Lateral
+ * {@link org.apache.drill.exec.physical.impl.flatten.FlattenTemplate}.
+ * This class follows the pattern of other operators that generate code at 
runtime. Normally this class
+ * would be abstract and have placeholders for doSetup and doEval. Unnest 
however, doesn't require code
+ * generation so we can simply implement the code in a simple class that 
looks similar to the code gen
+ * templates used by other operators but does not implement the doSetup 
and doEval methods.
+ */
+public class UnnestImpl implements Unnest {
+  private static final Logger logger = 
LoggerFactory.getLogger(UnnestImpl.class);
+
+  private static final int OUTPUT_ROW_COUNT = ValueVector.MAX_ROW_COUNT;
+
+  private ImmutableList transfers;
+  private LateralContract lateral; // corresponding lateral Join (or other 
operator implementing the Lateral Contract)
+  private SelectionVectorMode svMode;
+  private RepeatedValueVector fieldToUnnest;
+  private RepeatedValueVector.RepeatedAccessor accessor;
+
+  /**
+   * The output batch limit starts at OUTPUT_ROW_COUNT, but may be 
decreased
+   * if records are found to be large.
+   */
+  private int outputLimit = OUTPUT_ROW_COUNT;
+
+
+  // The index in the unnest column that is being processed. We start at 
zero and continue until
+  // InnerValueCount is reached or if the batch limit is reached
+  // this allows for groups to be written between batches if we run out of 
space, for cases where we have finished
+  // a batch on the boundary it will be set to 0
+  private int innerValueIndex = 0;
+
+  @Override
+  public void setUnnestField(RepeatedValueVector unnestField) {
+this.fieldToUnnest = unnestField;
+this.accessor = 
RepeatedValueVector.RepeatedAccessor.class.cast(unnestField.getAccessor());
+  }
+
+  @Override
+  public RepeatedValueVector getUnnestField() {
+return fieldToUnnest;
+  }
+
+  @Override
+  public void setOutputCount(int outputCount) {
+outputLimit = outputCount;
+  }
+
+  @Override
+  public final int unnestRecords(final int recordCount) {
+switch (svMode) {
+  case FOUR_BYTE:
+throw new UnsupportedOperationException("Unnest does not support 
selection vector inputs.");
+
+  case TWO_BYTE:
+throw new UnsupportedOperationException("Unnest does not support 
selection vector inputs.");
+
+  case NONE:
+if (innerValueIndex == -1) {
+  innerValueIndex = 0;
+}
+
+// Current record being processed in the incoming record batch. We 
could keep
+// track of it ourselves, but it is better to check with the 

[GitHub] drill pull request #1223: DRILL-6324: Unnest initial implementation

2018-04-20 Thread parthchandra
Github user parthchandra commented on a diff in the pull request:

https://github.com/apache/drill/pull/1223#discussion_r183105769
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/Unnest.java
 ---
@@ -0,0 +1,49 @@
+package org.apache.drill.exec.physical.impl.unnest;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.LateralContract;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+
+import java.util.List;
+
+/**
+ * Placeholder for future unnest implementation that may require code 
generation. The current implementation does not
+ * require any.
+ * @see UnnestImpl
+ */
+public interface Unnest {
+  //TemplateClassDefinition TEMPLATE_DEFINITION = new 
TemplateClassDefinition(Unnest.class, UnnestImpl
+  // .class);
+
+  void setup(FragmentContext context, RecordBatch incoming, RecordBatch 
outgoing, List transfers,
+  LateralContract lateral) throws SchemaChangeException;
+
+  int unnestRecords(int recordCount);
--- End diff --

Done


---


[GitHub] drill pull request #1223: DRILL-6324: Unnest initial implementation

2018-04-20 Thread parthchandra
Github user parthchandra commented on a diff in the pull request:

https://github.com/apache/drill/pull/1223#discussion_r183106563
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
 ---
@@ -0,0 +1,451 @@
+package org.apache.drill.exec.physical.impl.unnest;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.physical.base.LateralContract;
+import org.apache.drill.exec.physical.config.UnnestPOP;
+import org.apache.drill.exec.record.AbstractSingleRecordBatch;
+import org.apache.drill.exec.record.AbstractTableFunctionRecordBatch;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchMemoryManager;
+import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.RepeatedMapVector;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static 
org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+
+// TODO - handle the case where a user tries to unnest a scalar, should 
just return the column as is
+public class UnnestRecordBatch extends 
AbstractTableFunctionRecordBatch {
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(UnnestRecordBatch.class);
+
+  private Unnest unnest;
+  private boolean hasRemainder = false; // set to true if there is data 
left over for the current row AND if we want
+// to keep processing it. Kill may 
be called by a limit in a subquery that
+// requires us to stop processing 
the current row, but not stop processing
+// the data.
+  // In some cases we need to return a predetermined state from a call to 
next. These are:
+  // 1) Kill is called due to an error occurring in the processing of the 
query. IterOutcome should be NONE
+  // 2) Kill is called by LIMIT to stop processing of the current row 
(this occurs when the LIMIT is part of a subquery
+  //    between UNNEST and LATERAL). IterOutcome should be EMIT
+  // 3) Kill is called by LIMIT downstream from LATERAL. IterOutcome 
should be NONE
+  private IterOutcome nextState = OK;
+  private int remainderIndex = 0;
+  private int recordCount;
+  private MaterializedField unnestFieldMetadata;
+  private final UnnestMemoryManager memoryManager;
+
+  public enum Metric implements MetricDef {
+INPUT_BATCH_COUNT,
+AVG_INPUT_BATCH_BYTES,
+AVG_INPUT_ROW_BYTES,
+INPUT_RECORD_COUNT,
+OUTPUT_BATCH_COUNT,
+

[GitHub] drill pull request #1223: DRILL-6324: Unnest initial implementation

2018-04-20 Thread parthchandra
Github user parthchandra commented on a diff in the pull request:

https://github.com/apache/drill/pull/1223#discussion_r183105882
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
 ---
@@ -0,0 +1,137 @@
+package org.apache.drill.exec.physical.impl.unnest;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.LateralContract;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Contains the actual unnest operation. Unnest is a simple transfer 
operation in this implementation.
+ * For use as a table function, we will need to change the logic of the 
unnest method to operate on
+ * more than one row at a time and remove any dependence on Lateral
+ * {@link org.apache.drill.exec.physical.impl.flatten.FlattenTemplate}.
+ * This class follows the pattern of other operators that generate code at 
runtime. Normally this class
+ * would be abstract and have placeholders for doSetup and doEval. Unnest 
however, doesn't require code
+ * generation so we can simply implement the code in a simple class that 
looks similar to the code gen
+ * templates used by other operators but does not implement the doSetup 
and doEval methods.
+ */
+public class UnnestImpl implements Unnest {
+  private static final Logger logger = 
LoggerFactory.getLogger(UnnestImpl.class);
+
+  private static final int OUTPUT_ROW_COUNT = ValueVector.MAX_ROW_COUNT;
+
+  private ImmutableList transfers;
+  private LateralContract lateral; // corresponding lateral Join (or other 
operator implementing the Lateral Contract)
+  private SelectionVectorMode svMode;
+  private RepeatedValueVector fieldToUnnest;
+  private RepeatedValueVector.RepeatedAccessor accessor;
+
+  /**
+   * The output batch limit starts at OUTPUT_ROW_COUNT, but may be 
decreased
+   * if records are found to be large.
+   */
+  private int outputLimit = OUTPUT_ROW_COUNT;
+
+
+  // The index in the unnest column that is being processed. We start at 
zero and continue until
+  // InnerValueCount is reached or if the batch limit is reached
+  // this allows for groups to be written between batches if we run out of 
space, for cases where we have finished
+  // a batch on the boundary it will be set to 0
+  private int innerValueIndex = 0;
+
+  @Override
+  public void setUnnestField(RepeatedValueVector unnestField) {
+this.fieldToUnnest = unnestField;
+this.accessor = 
RepeatedValueVector.RepeatedAccessor.class.cast(unnestField.getAccessor());
+  }
+
+  @Override
+  public RepeatedValueVector getUnnestField() {
+return fieldToUnnest;
+  }
+
+  @Override
+  public void setOutputCount(int outputCount) {
+outputLimit = outputCount;
+  }
+
+  @Override
+  public final int unnestRecords(final int recordCount) {
+switch (svMode) {
--- End diff --

It is not. Changed this and put a precondition check instead
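
The change described above, replacing the switch on svMode with a single precondition check, might look roughly like the sketch below. This is not the code from the pull request: the class UnnestPreconditionSketch, its nested SelectionVectorMode enum, and the checkArgument helper are hypothetical stand-ins (the real operator would presumably use Guava's Preconditions and Drill's BatchSchema.SelectionVectorMode), and the unnest logic itself is reduced to a placeholder. Where the check was actually placed in the PR is not shown in this thread.

```java
public class UnnestPreconditionSketch {

  // Hypothetical stand-in for Drill's BatchSchema.SelectionVectorMode.
  enum SelectionVectorMode { NONE, TWO_BYTE, FOUR_BYTE }

  private final SelectionVectorMode svMode;

  UnnestPreconditionSketch(SelectionVectorMode svMode) {
    this.svMode = svMode;
  }

  // Hypothetical helper that behaves like Guava's Preconditions.checkArgument:
  // it throws IllegalArgumentException when the condition is false.
  static void checkArgument(boolean condition, String message) {
    if (!condition) {
      throw new IllegalArgumentException(message);
    }
  }

  // One up-front check replaces the three-way switch on svMode.
  int unnestRecords(int recordCount) {
    checkArgument(svMode == SelectionVectorMode.NONE,
        "Unnest does not support selection vector inputs.");
    // Placeholder: the real method transfers values from the repeated vector.
    return recordCount;
  }

  public static void main(String[] args) {
    UnnestPreconditionSketch plain =
        new UnnestPreconditionSketch(SelectionVectorMode.NONE);
    System.out.println(plain.unnestRecords(3));

    try {
      new UnnestPreconditionSketch(SelectionVectorMode.TWO_BYTE).unnestRecords(3);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

One difference from the original switch is worth noting: the sketch throws IllegalArgumentException rather than UnsupportedOperationException, which is what a checkArgument-style precondition would do.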


---


[GitHub] drill pull request #1223: DRILL-6324: Unnest initial implementation

2018-04-20 Thread sohami
Github user sohami commented on a diff in the pull request:

https://github.com/apache/drill/pull/1223#discussion_r183107099
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
 ---
@@ -0,0 +1,451 @@
+package org.apache.drill.exec.physical.impl.unnest;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.physical.base.LateralContract;
+import org.apache.drill.exec.physical.config.UnnestPOP;
+import org.apache.drill.exec.record.AbstractSingleRecordBatch;
+import org.apache.drill.exec.record.AbstractTableFunctionRecordBatch;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchMemoryManager;
+import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.RepeatedMapVector;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static 
org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+
+// TODO - handle the case where a user tries to unnest a scalar, should 
just return the column as is
+public class UnnestRecordBatch extends 
AbstractTableFunctionRecordBatch {
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(UnnestRecordBatch.class);
+
+  private Unnest unnest;
+  private boolean hasRemainder = false; // set to true if there is data 
left over for the current row AND if we want
+// to keep processing it. Kill may 
be called by a limit in a subquery that
+// requires us to stop processing 
the current row, but not stop processing
+// the data.
+  // In some cases we need to return a predetermined state from a call to 
next. These are:
+  // 1) Kill is called due to an error occurring in the processing of the 
query. IterOutcome should be NONE
+  // 2) Kill is called by LIMIT to stop processing of the current row 
(this occurs when the LIMIT is part of a subquery
+  //    between UNNEST and LATERAL). IterOutcome should be EMIT
+  // 3) Kill is called by LIMIT downstream from LATERAL. IterOutcome 
should be NONE
+  private IterOutcome nextState = OK;
+  private int remainderIndex = 0;
+  private int recordCount;
+  private MaterializedField unnestFieldMetadata;
+  private final UnnestMemoryManager memoryManager;
+
+  public enum Metric implements MetricDef {
+INPUT_BATCH_COUNT,
+AVG_INPUT_BATCH_BYTES,
+AVG_INPUT_ROW_BYTES,
+INPUT_RECORD_COUNT,
+OUTPUT_BATCH_COUNT,
+AVG_OUTPUT_BATCH_BYTES,
  

[GitHub] drill pull request #1223: DRILL-6324: Unnest initial implementation

2018-04-20 Thread sohami
Github user sohami commented on a diff in the pull request:

https://github.com/apache/drill/pull/1223#discussion_r183110851
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
 ---
@@ -0,0 +1,451 @@
+package org.apache.drill.exec.physical.impl.unnest;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.physical.base.LateralContract;
+import org.apache.drill.exec.physical.config.UnnestPOP;
+import org.apache.drill.exec.record.AbstractSingleRecordBatch;
+import org.apache.drill.exec.record.AbstractTableFunctionRecordBatch;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchMemoryManager;
+import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.RepeatedMapVector;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static 
org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+
+// TODO - handle the case where a user tries to unnest a scalar, should 
just return the column as is
+public class UnnestRecordBatch extends 
AbstractTableFunctionRecordBatch {
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(UnnestRecordBatch.class);
+
+  private Unnest unnest;
+  private boolean hasRemainder = false; // set to true if there is data 
left over for the current row AND if we want
+// to keep processing it. Kill may 
be called by a limit in a subquery that
+// requires us to stop processing 
the current row, but not stop processing
+// the data.
+  // In some cases we need to return a predetermined state from a call to 
next. These are:
+  // 1) Kill is called due to an error occurring in the processing of the 
query. IterOutcome should be NONE
+  // 2) Kill is called by LIMIT to stop processing of the current row 
(this occurs when the LIMIT is part of a subquery
+  //    between UNNEST and LATERAL). IterOutcome should be EMIT
+  // 3) Kill is called by LIMIT downstream from LATERAL. IterOutcome 
should be NONE
+  private IterOutcome nextState = OK;
+  private int remainderIndex = 0;
+  private int recordCount;
+  private MaterializedField unnestFieldMetadata;
+  private final UnnestMemoryManager memoryManager;
+
+  public enum Metric implements MetricDef {
+INPUT_BATCH_COUNT,
+AVG_INPUT_BATCH_BYTES,
+AVG_INPUT_ROW_BYTES,
+INPUT_RECORD_COUNT,
+OUTPUT_BATCH_COUNT,
+AVG_OUTPUT_BATCH_BYTES,
  

[GitHub] drill pull request #1223: DRILL-6324: Unnest initial implementation

2018-04-20 Thread sohami
Github user sohami commented on a diff in the pull request:

https://github.com/apache/drill/pull/1223#discussion_r183106122
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
 ---
@@ -0,0 +1,451 @@
+package org.apache.drill.exec.physical.impl.unnest;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.physical.base.LateralContract;
+import org.apache.drill.exec.physical.config.UnnestPOP;
+import org.apache.drill.exec.record.AbstractSingleRecordBatch;
+import org.apache.drill.exec.record.AbstractTableFunctionRecordBatch;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchMemoryManager;
+import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.RepeatedMapVector;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static 
org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+
+// TODO - handle the case where a user tries to unnest a scalar, should 
just return the column as is
+public class UnnestRecordBatch extends 
AbstractTableFunctionRecordBatch {
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(UnnestRecordBatch.class);
+
+  private Unnest unnest;
+  private boolean hasRemainder = false; // set to true if there is data 
left over for the current row AND if we want
+// to keep processing it. Kill may 
be called by a limit in a subquery that
+// requires us to stop processing 
the current row, but not stop processing
+// the data.
+  // In some cases we need to return a predetermined state from a call to 
next. These are:
+  // 1) Kill is called due to an error occurring in the processing of the 
query. IterOutcome should be NONE
+  // 2) Kill is called by LIMIT to stop processing of the current row 
(this occurs when the LIMIT is part of a subquery
+  //    between UNNEST and LATERAL). IterOutcome should be EMIT
+  // 3) Kill is called by LIMIT downstream from LATERAL. IterOutcome 
should be NONE
+  private IterOutcome nextState = OK;
+  private int remainderIndex = 0;
+  private int recordCount;
+  private MaterializedField unnestFieldMetadata;
+  private final UnnestMemoryManager memoryManager;
+
+  public enum Metric implements MetricDef {
+INPUT_BATCH_COUNT,
+AVG_INPUT_BATCH_BYTES,
+AVG_INPUT_ROW_BYTES,
+INPUT_RECORD_COUNT,
+OUTPUT_BATCH_COUNT,
+AVG_OUTPUT_BATCH_BYTES,
  

[GitHub] drill pull request #1223: DRILL-6324: Unnest initial implementation

2018-04-20 Thread sohami
Github user sohami commented on a diff in the pull request:

https://github.com/apache/drill/pull/1223#discussion_r183107693
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
 ---
@@ -0,0 +1,451 @@
+package org.apache.drill.exec.physical.impl.unnest;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.physical.base.LateralContract;
+import org.apache.drill.exec.physical.config.UnnestPOP;
+import org.apache.drill.exec.record.AbstractSingleRecordBatch;
+import org.apache.drill.exec.record.AbstractTableFunctionRecordBatch;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchMemoryManager;
+import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.RepeatedMapVector;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+
+// TODO - handle the case where a user tries to unnest a scalar, should just return the column as is
+public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnnestRecordBatch.class);
+
+  private Unnest unnest;
+  private boolean hasRemainder = false; // set to true if there is data left over for the current row AND if we want
+                                        // to keep processing it. Kill may be called by a limit in a subquery that
+                                        // requires us to stop processing the current row, but not stop processing
+                                        // the data.
+  // In some cases we need to return a predetermined state from a call to next. These are:
+  // 1) Kill is called due to an error occurring in the processing of the query. IterOutcome should be NONE
+  // 2) Kill is called by LIMIT to stop processing of the current row (This occurs when the LIMIT is part of a subquery
+  //    between UNNEST and LATERAL). IterOutcome should be EMIT
+  // 3) Kill is called by LIMIT downstream from LATERAL. IterOutcome should be NONE
+  private IterOutcome nextState = OK;
+  private int remainderIndex = 0;
+  private int recordCount;
+  private MaterializedField unnestFieldMetadata;
+  private final UnnestMemoryManager memoryManager;
+
+  public enum Metric implements MetricDef {
+    INPUT_BATCH_COUNT,
+    AVG_INPUT_BATCH_BYTES,
+    AVG_INPUT_ROW_BYTES,
+    INPUT_RECORD_COUNT,
+    OUTPUT_BATCH_COUNT,
+    AVG_OUTPUT_BATCH_BYTES,
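
The three kill cases listed in the quoted comment on `nextState` can be read as a small decision table. The sketch below is only an illustration of that comment, not code from the PR: `Outcome`, `KillReason`, and `outcomeAfterKill` are hypothetical names, with `Outcome` standing in for the `IterOutcome` values the comment mentions.

```java
public class KillStateSketch {
  // Hypothetical stand-in for the IterOutcome values named in the comment.
  enum Outcome { OK, EMIT, NONE }

  // Hypothetical labels for the three kill cases listed in the comment.
  enum KillReason {
    QUERY_ERROR,                      // case 1
    LIMIT_BETWEEN_UNNEST_AND_LATERAL, // case 2
    LIMIT_DOWNSTREAM_OF_LATERAL       // case 3
  }

  // Returns the predetermined outcome the next call to next() should report.
  static Outcome outcomeAfterKill(KillReason reason) {
    switch (reason) {
      case LIMIT_BETWEEN_UNNEST_AND_LATERAL:
        // Only the current row is abandoned; processing of the data continues.
        return Outcome.EMIT;
      case QUERY_ERROR:
      case LIMIT_DOWNSTREAM_OF_LATERAL:
      default:
        // Processing stops entirely.
        return Outcome.NONE;
    }
  }

  public static void main(String[] args) {
    for (KillReason reason : KillReason.values()) {
      System.out.println(reason + " -> " + outcomeAfterKill(reason));
    }
  }
}
```

Only case 2 keeps the operator alive, which is consistent with the comment on `hasRemainder` that a kill may stop the current row without stopping the data.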
  

[GitHub] drill pull request #1223: DRILL-6324: Unnest initial implementation

2018-04-20 Thread sohami
Github user sohami commented on a diff in the pull request:

https://github.com/apache/drill/pull/1223#discussion_r182873443
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
 ---
@@ -0,0 +1,451 @@
  

[GitHub] drill pull request #1223: DRILL-6324: Unnest initial implementation

2018-04-20 Thread sohami
Github user sohami commented on a diff in the pull request:

https://github.com/apache/drill/pull/1223#discussion_r183128175
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
 ---
@@ -0,0 +1,451 @@
  

[GitHub] drill pull request #1223: DRILL-6324: Unnest initial implementation

2018-04-20 Thread sohami
Github user sohami commented on a diff in the pull request:

https://github.com/apache/drill/pull/1223#discussion_r182873668
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
 ---
@@ -0,0 +1,451 @@
  

[GitHub] drill pull request #1223: DRILL-6324: Unnest initial implementation

2018-04-20 Thread sohami
Github user sohami commented on a diff in the pull request:

https://github.com/apache/drill/pull/1223#discussion_r183114381
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
 ---
@@ -0,0 +1,451 @@
  

[GitHub] drill pull request #1223: DRILL-6324: Unnest initial implementation

2018-04-20 Thread sohami
Github user sohami commented on a diff in the pull request:

https://github.com/apache/drill/pull/1223#discussion_r183116678
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
 ---
@@ -0,0 +1,451 @@
  

[GitHub] drill pull request #1223: DRILL-6324: Unnest initial implementation

2018-04-20 Thread sohami
Github user sohami commented on a diff in the pull request:

https://github.com/apache/drill/pull/1223#discussion_r182640359
  
--- Diff: 
exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestingLateralJoinBatch.java
 ---
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.unnest;
+
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.LateralJoinPOP;
+import org.apache.drill.exec.physical.impl.join.LateralJoinBatch;
+import org.apache.drill.exec.record.RecordBatch;
+
+/**
+ * Create a derived class so we can access the protected ctor
+ */
+public class TestingLateralJoinBatch extends LateralJoinBatch{
--- End diff --

No longer needed: the constructor of LateralJoinBatch was made public.


---


[GitHub] drill pull request #1223: DRILL-6324: Unnest initial implementation

2018-04-20 Thread sohami
Github user sohami commented on a diff in the pull request:

https://github.com/apache/drill/pull/1223#discussion_r182628778
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.unnest;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.LateralContract;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Contains the actual unnest operation. Unnest is a simple transfer operation in this implementation.
+ * For use as a table function, we will need to change the logic of the unnest method to operate on
+ * more than one row at a time and remove any dependence on Lateral
+ * {@link org.apache.drill.exec.physical.impl.flatten.FlattenTemplate}.
+ * This class follows the pattern of other operators that generate code at runtime. Normally this class
+ * would be abstract and have placeholders for doSetup and doEval. Unnest, however, doesn't require code
+ * generation so we can simply implement the code in a simple class that looks similar to the code gen
+ * templates used by other operators but does not implement the doSetup and doEval methods.
+ */
+public class UnnestImpl implements Unnest {
+  private static final Logger logger = LoggerFactory.getLogger(UnnestImpl.class);
+
+  private static final int OUTPUT_ROW_COUNT = ValueVector.MAX_ROW_COUNT;
+
+  private ImmutableList transfers;
+  private LateralContract lateral; // corresponding lateral Join (or other operator implementing the Lateral Contract)
+  private SelectionVectorMode svMode;
+  private RepeatedValueVector fieldToUnnest;
+  private RepeatedValueVector.RepeatedAccessor accessor;
+
+  /**
+   * The output batch limit starts at OUTPUT_ROW_COUNT, but may be decreased
+   * if records are found to be large.
+   */
+  private int outputLimit = OUTPUT_ROW_COUNT;
+
+
+  // The index in the unnest column that is being processed. We start at zero and continue until
+  // InnerValueCount is reached or if the batch limit is reached.
+  // This allows for groups to be written between batches if we run out of space. For cases where we have finished
+  // a batch on the boundary it will be set to 0.
+  private int innerValueIndex = 0;
+
+  @Override
+  public void setUnnestField(RepeatedValueVector unnestField) {
+    this.fieldToUnnest = unnestField;
+    this.accessor = RepeatedValueVector.RepeatedAccessor.class.cast(unnestField.getAccessor());
+  }
+
+  @Override
+  public RepeatedValueVector getUnnestField() {
+    return fieldToUnnest;
+  }
+
+  @Override
+  public void setOutputCount(int outputCount) {
+    outputLimit = outputCount;
+  }
+
+  @Override
+  public final int unnestRecords(final int recordCount) {
+    switch (svMode) {
+      case FOUR_BYTE:
+        throw new UnsupportedOperationException("Unnest does not support selection vector inputs.");
+
+      case TWO_BYTE:
+        throw new UnsupportedOperationException("Unnest does not support selection vector inputs.");
+
+      case NONE:
+        if (innerValueIndex == -1) {
+          innerValueIndex = 0;
+        }
+
+        // Current record being processed in the incoming record batch. We could keep
+        // track of it ourselves, but it is better to check with the 
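
The quoted comments on `outputLimit` and `innerValueIndex` describe emitting one row's array in pieces when it does not fit in a single output batch. A minimal sketch of that bookkeeping follows, assuming a plain `int[]` in place of a `RepeatedValueVector`. The class `UnnestChunkSketch` and its methods are hypothetical and are not the PR's `unnestRecords` implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class UnnestChunkSketch {
  private final int outputLimit;   // maximum number of values emitted per output batch
  private int innerValueIndex = 0; // next element of the current row's array to emit

  public UnnestChunkSketch(int outputLimit) {
    this.outputLimit = outputLimit;
  }

  // Emits up to outputLimit values, resuming where the previous call stopped.
  public List<Integer> nextBatch(int[] rowValues) {
    int end = Math.min(rowValues.length, innerValueIndex + outputLimit);
    List<Integer> out = new ArrayList<>();
    for (int i = innerValueIndex; i < end; i++) {
      out.add(rowValues[i]);
    }
    // Reset to 0 once the row is exhausted, including when it ends on a batch boundary.
    innerValueIndex = (end == rowValues.length) ? 0 : end;
    return out;
  }

  // True while part of the current row is still waiting to be emitted.
  public boolean hasRemainder() {
    return innerValueIndex != 0;
  }

  public static void main(String[] args) {
    UnnestChunkSketch sketch = new UnnestChunkSketch(2);
    int[] row = {10, 20, 30, 40, 50};
    do {
      System.out.println(sketch.nextBatch(row));
    } while (sketch.hasRemainder());
  }
}
```

With a limit of 2 and a five-element array, the row is emitted over three calls, and the index returns to 0 after the last one.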

[GitHub] drill pull request #1223: DRILL-6324: Unnest initial implementation

2018-04-20 Thread sohami
Github user sohami commented on a diff in the pull request:

https://github.com/apache/drill/pull/1223#discussion_r182628483
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.unnest;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.LateralContract;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Contains the actual unnest operation. Unnest is a simple transfer 
operation in this implementation.
+ * For use as a table function, we will need to change the logic of the 
unnest method to operate on
+ * more than one row at a time and remove any dependence on Lateral
+ * {@link org.apache.drill.exec.physical.impl.flatten.FlattenTemplate}.
+ * This class follows the pattern of other operators that generate code at 
runtime. Normally this class
+ * would be abstract and have placeholders for doSetup and doEval. Unnest 
however, doesn't require code
+ * generation so we can simply implement the code in a simple class that 
looks similar to the code gen
+ * templates used by other operators but does not implement the doSetup 
and doEval methods.
+ */
+public class UnnestImpl implements Unnest {
+  private static final Logger logger = 
LoggerFactory.getLogger(UnnestImpl.class);
+
+  private static final int OUTPUT_ROW_COUNT = ValueVector.MAX_ROW_COUNT;
+
+  private ImmutableList transfers;
+  private LateralContract lateral; // corresponding lateral Join (or other 
operator implementing the Lateral Contract)
+  private SelectionVectorMode svMode;
+  private RepeatedValueVector fieldToUnnest;
+  private RepeatedValueVector.RepeatedAccessor accessor;
+
+  /**
+   * The output batch limit starts at OUTPUT_ROW_COUNT, but may be 
decreased
+   * if records are found to be large.
+   */
+  private int outputLimit = OUTPUT_ROW_COUNT;
--- End diff --

`OUTPUT_ROW_COUNT` is not required; you can initialize `outputLimit` 
directly with `ValueVector.MAX_ROW_COUNT`.
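
A minimal sketch of that suggestion, assuming nothing else in the class reads the alias. `ValueVectorStandIn` and `UnnestLimitSketch` are hypothetical stand-ins so the snippet compiles without Drill on the classpath, and the value 65536 is an assumption made for illustration only:

```java
// Hypothetical stand-in for Drill's ValueVector; only the constant is modelled.
final class ValueVectorStandIn {
  // Assumed value, used here purely for illustration.
  static final int MAX_ROW_COUNT = 65536;

  private ValueVectorStandIn() {}
}

// Simplified sketch of the field under review, without the OUTPUT_ROW_COUNT alias.
class UnnestLimitSketch {
  // The output batch limit starts at MAX_ROW_COUNT but may be decreased later.
  private int outputLimit = ValueVectorStandIn.MAX_ROW_COUNT;

  void setOutputCount(int outputCount) {
    outputLimit = outputCount;
  }

  int getOutputLimit() {
    return outputLimit;
  }
}
```

The Javadoc on `outputLimit` would then refer to `ValueVector.MAX_ROW_COUNT` instead of `OUTPUT_ROW_COUNT`.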


---


[GitHub] drill pull request #1223: DRILL-6324: Unnest initial implementation

2018-04-20 Thread sohami
Github user sohami commented on a diff in the pull request:

https://github.com/apache/drill/pull/1223#discussion_r182639747
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockLateralJoinPOP.java
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mock;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractStore;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Store;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import java.util.Collections;
+import java.util.List;
+
+@JsonTypeName("mock-lj")
+public class MockLateralJoinPOP extends AbstractStore {
--- End diff --

It doesn't look like this class is used anywhere.


---


[GitHub] drill pull request #1223: DRILL-6324: Unnest initial implementation

2018-04-20 Thread sohami
Github user sohami commented on a diff in the pull request:

https://github.com/apache/drill/pull/1223#discussion_r183120789
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.unnest;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.LateralContract;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Contains the actual unnest operation. Unnest is a simple transfer 
operation in this implementation.
+ * For use as a table function, we will need to change the logic of the 
unnest method to operate on
+ * more than one row at a time and remove any dependence on Lateral
+ * {@link org.apache.drill.exec.physical.impl.flatten.FlattenTemplate}.
+ * This class follows the pattern of other operators that generate code at 
runtime. Normally this class
+ * would be abstract and have placeholders for doSetup and doEval. Unnest 
however, doesn't require code
+ * generation so we can simply implement the code in a simple class that 
looks similar to the code gen
+ * templates used by other operators but does not implement the doSetup 
and doEval methods.
+ */
+public class UnnestImpl implements Unnest {
+  private static final Logger logger = 
LoggerFactory.getLogger(UnnestImpl.class);
+
+  private static final int OUTPUT_ROW_COUNT = ValueVector.MAX_ROW_COUNT;
+
+  private ImmutableList transfers;
+  private LateralContract lateral; // corresponding lateral Join (or other 
operator implementing the Lateral Contract)
+  private SelectionVectorMode svMode;
+  private RepeatedValueVector fieldToUnnest;
+  private RepeatedValueVector.RepeatedAccessor accessor;
+
+  /**
+   * The output batch limit starts at OUTPUT_ROW_COUNT, but may be 
decreased
+   * if records are found to be large.
+   */
+  private int outputLimit = OUTPUT_ROW_COUNT;
+
+
+  // The index in the unnest column that is being processed. We start at 
zero and continue until
+  // InnerValueCount is reached or the batch limit is reached.
+  // This allows groups to be written across batches if we run out of 
space. For cases where we have finished
+  // a batch on the boundary, it will be set to 0.
+  private int innerValueIndex = 0;
+
+  @Override
+  public void setUnnestField(RepeatedValueVector unnestField) {
+this.fieldToUnnest = unnestField;
+this.accessor = 
RepeatedValueVector.RepeatedAccessor.class.cast(unnestField.getAccessor());
+  }
+
+  @Override
+  public RepeatedValueVector getUnnestField() {
+return fieldToUnnest;
+  }
+
+  @Override
+  public void setOutputCount(int outputCount) {
+outputLimit = outputCount;
+  }
+
+  @Override
+  public final int unnestRecords(final int recordCount) {
+switch (svMode) {
+  case FOUR_BYTE:
+throw new UnsupportedOperationException("Unnest does not support 
selection vector inputs.");
+
+  case TWO_BYTE:
+throw new UnsupportedOperationException("Unnest does not support 
selection vector inputs.");
+
+  case NONE:
+if (innerValueIndex == -1) {
+  innerValueIndex = 0;
+}
+
+// Current record being processed in the incoming record batch. We 
could keep
+// track of it ourselves, but it is better to check with the 

[GitHub] drill pull request #1223: DRILL-6324: Unnest initial implementation

2018-04-20 Thread sohami
Github user sohami commented on a diff in the pull request:

https://github.com/apache/drill/pull/1223#discussion_r182628991
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.unnest;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.LateralContract;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Contains the actual unnest operation. Unnest is a simple transfer 
operation in this implementation.
+ * For use as a table function, we will need to change the logic of the 
unnest method to operate on
+ * more than one row at a time and remove any dependence on Lateral
+ * {@link org.apache.drill.exec.physical.impl.flatten.FlattenTemplate}.
+ * This class follows the pattern of other operators that generate code at 
runtime. Normally this class
+ * would be abstract and have placeholders for doSetup and doEval. Unnest 
however, doesn't require code
+ * generation so we can simply implement the code in a simple class that 
looks similar to the code gen
+ * templates used by other operators but does not implement the doSetup 
and doEval methods.
+ */
+public class UnnestImpl implements Unnest {
+  private static final Logger logger = 
LoggerFactory.getLogger(UnnestImpl.class);
+
+  private static final int OUTPUT_ROW_COUNT = ValueVector.MAX_ROW_COUNT;
+
+  private ImmutableList transfers;
+  private LateralContract lateral; // corresponding lateral Join (or other 
operator implementing the Lateral Contract)
+  private SelectionVectorMode svMode;
+  private RepeatedValueVector fieldToUnnest;
+  private RepeatedValueVector.RepeatedAccessor accessor;
+
+  /**
+   * The output batch limit starts at OUTPUT_ROW_COUNT, but may be 
decreased
+   * if records are found to be large.
+   */
+  private int outputLimit = OUTPUT_ROW_COUNT;
+
+
+  // The index in the unnest column that is being processed. We start at 
zero and continue until
+  // InnerValueCount is reached or the batch limit is reached.
+  // This allows groups to be written across batches if we run out of 
space. For cases where we have finished
+  // a batch on the boundary, it will be set to 0.
+  private int innerValueIndex = 0;
+
+  @Override
+  public void setUnnestField(RepeatedValueVector unnestField) {
+this.fieldToUnnest = unnestField;
+this.accessor = 
RepeatedValueVector.RepeatedAccessor.class.cast(unnestField.getAccessor());
+  }
+
+  @Override
+  public RepeatedValueVector getUnnestField() {
+return fieldToUnnest;
+  }
+
+  @Override
+  public void setOutputCount(int outputCount) {
+outputLimit = outputCount;
+  }
+
+  @Override
+  public final int unnestRecords(final int recordCount) {
+switch (svMode) {
+  case FOUR_BYTE:
+throw new UnsupportedOperationException("Unnest does not support 
selection vector inputs.");
+
+  case TWO_BYTE:
+throw new UnsupportedOperationException("Unnest does not support 
selection vector inputs.");
+
+  case NONE:
+if (innerValueIndex == -1) {
+  innerValueIndex = 0;
+}
+
+// Current record being processed in the incoming record batch. We 
could keep
+// track of it ourselves, but it is better to check with the 

[GitHub] drill pull request #1223: DRILL-6324: Unnest initial implementation

2018-04-20 Thread sohami
Github user sohami commented on a diff in the pull request:

https://github.com/apache/drill/pull/1223#discussion_r183117233
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
 ---
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.unnest;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.physical.base.LateralContract;
+import org.apache.drill.exec.physical.config.UnnestPOP;
+import org.apache.drill.exec.record.AbstractSingleRecordBatch;
+import org.apache.drill.exec.record.AbstractTableFunctionRecordBatch;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchMemoryManager;
+import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.RepeatedMapVector;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static 
org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+
+// TODO - handle the case where a user tries to unnest a scalar, should 
just return the column as is
+public class UnnestRecordBatch extends 
AbstractTableFunctionRecordBatch {
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(UnnestRecordBatch.class);
+
+  private Unnest unnest;
+  private boolean hasRemainder = false; // set to true if there is data 
left over for the current row AND if we want
+// to keep processing it. Kill may 
be called by a limit in a subquery that
+// requires us to stop processing 
the current row, but not stop processing
+// the data.
+  // In some cases we need to return a predetermined state from a call to 
next. These are:
+  // 1) Kill is called due to an error occurring in the processing of the 
query. IterOutcome should be NONE
+  // 2) Kill is called by LIMIT to stop processing of the current row 
(This occurs when the LIMIT is part of a subquery
+  //between UNNEST and LATERAL). IterOutcome should be EMIT
+  // 3) Kill is called by LIMIT downstream from LATERAL. IterOutcome 
should be NONE
+  private IterOutcome nextState = OK;
+  private int remainderIndex = 0;
+  private int recordCount;
+  private MaterializedField unnestFieldMetadata;
+  private final UnnestMemoryManager memoryManager;
+
+  public enum Metric implements MetricDef {
+INPUT_BATCH_COUNT,
+AVG_INPUT_BATCH_BYTES,
+AVG_INPUT_ROW_BYTES,
+INPUT_RECORD_COUNT,
+OUTPUT_BATCH_COUNT,
+AVG_OUTPUT_BATCH_BYTES,
  

[GitHub] drill pull request #1223: DRILL-6324: Unnest initial implementation

2018-04-20 Thread sohami
Github user sohami commented on a diff in the pull request:

https://github.com/apache/drill/pull/1223#discussion_r182628559
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.unnest;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.LateralContract;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Contains the actual unnest operation. Unnest is a simple transfer 
operation in this implementation.
+ * For use as a table function, we will need to change the logic of the 
unnest method to operate on
+ * more than one row at a time and remove any dependence on Lateral
+ * {@link org.apache.drill.exec.physical.impl.flatten.FlattenTemplate}.
--- End diff --

I'm a little confused about why `FlattenTemplate` is referenced here.


---


[GitHub] drill pull request #1223: DRILL-6324: Unnest initial implementation

2018-04-19 Thread amansinha100
Github user amansinha100 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1223#discussion_r182903822
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.unnest;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.LateralContract;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Contains the actual unnest operation. Unnest is a simple transfer 
operation in this implementation.
+ * For use as a table function, we will need to change the logic of the 
unnest method to operate on
+ * more than one row at a time and remove any dependence on Lateral
+ * {@link org.apache.drill.exec.physical.impl.flatten.FlattenTemplate}.
+ * This class follows the pattern of other operators that generate code at 
runtime. Normally this class
+ * would be abstract and have placeholders for doSetup and doEval. Unnest 
however, doesn't require code
+ * generation so we can simply implement the code in a simple class that 
looks similar to the code gen
+ * templates used by other operators but does not implement the doSetup 
and doEval methods.
+ */
+public class UnnestImpl implements Unnest {
+  private static final Logger logger = 
LoggerFactory.getLogger(UnnestImpl.class);
+
+  private static final int OUTPUT_ROW_COUNT = ValueVector.MAX_ROW_COUNT;
+
+  private ImmutableList transfers;
+  private LateralContract lateral; // corresponding lateral Join (or other 
operator implementing the Lateral Contract)
+  private SelectionVectorMode svMode;
+  private RepeatedValueVector fieldToUnnest;
+  private RepeatedValueVector.RepeatedAccessor accessor;
+
+  /**
+   * The output batch limit starts at OUTPUT_ROW_COUNT, but may be 
decreased
+   * if records are found to be large.
+   */
+  private int outputLimit = OUTPUT_ROW_COUNT;
+
+
+  // The index in the unnest column that is being processed. We start at 
zero and continue until
+  // InnerValueCount is reached or the batch limit is reached.
+  // This allows groups to be written across batches if we run out of 
space. For cases where we have finished
+  // a batch on the boundary, it will be set to 0.
+  private int innerValueIndex = 0;
+
+  @Override
+  public void setUnnestField(RepeatedValueVector unnestField) {
+this.fieldToUnnest = unnestField;
+this.accessor = 
RepeatedValueVector.RepeatedAccessor.class.cast(unnestField.getAccessor());
+  }
+
+  @Override
+  public RepeatedValueVector getUnnestField() {
+return fieldToUnnest;
+  }
+
+  @Override
+  public void setOutputCount(int outputCount) {
+outputLimit = outputCount;
+  }
+
+  @Override
+  public final int unnestRecords(final int recordCount) {
+switch (svMode) {
+  case FOUR_BYTE:
+throw new UnsupportedOperationException("Unnest does not support 
selection vector inputs.");
+
+  case TWO_BYTE:
+throw new UnsupportedOperationException("Unnest does not support 
selection vector inputs.");
+
+  case NONE:
+if (innerValueIndex == -1) {
+  innerValueIndex = 0;
+}
+
+// Current record being processed in the incoming record batch. We 
could keep
+// track of it ourselves, but it is better to check with the 

[GitHub] drill pull request #1223: DRILL-6324: Unnest initial implementation

2018-04-19 Thread amansinha100
Github user amansinha100 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1223#discussion_r182904205
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.unnest;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.LateralContract;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Contains the actual unnest operation. Unnest is a simple transfer 
operation in this implementation.
+ * For use as a table function, we will need to change the logic of the 
unnest method to operate on
+ * more than one row at a time and remove any dependence on Lateral
+ * {@link org.apache.drill.exec.physical.impl.flatten.FlattenTemplate}.
+ * This class follows the pattern of other operators that generate code at 
runtime. Normally this class
+ * would be abstract and have placeholders for doSetup and doEval. Unnest 
however, doesn't require code
+ * generation so we can simply implement the code in a simple class that 
looks similar to the code gen
+ * templates used by other operators but does not implement the doSetup 
and doEval methods.
+ */
+public class UnnestImpl implements Unnest {
+  private static final Logger logger = 
LoggerFactory.getLogger(UnnestImpl.class);
+
+  private static final int OUTPUT_ROW_COUNT = ValueVector.MAX_ROW_COUNT;
+
+  private ImmutableList transfers;
+  private LateralContract lateral; // corresponding lateral Join (or other 
operator implementing the Lateral Contract)
+  private SelectionVectorMode svMode;
+  private RepeatedValueVector fieldToUnnest;
+  private RepeatedValueVector.RepeatedAccessor accessor;
+
+  /**
+   * The output batch limit starts at OUTPUT_ROW_COUNT, but may be 
decreased
+   * if records are found to be large.
+   */
+  private int outputLimit = OUTPUT_ROW_COUNT;
+
+
+  // The index in the unnest column that is being processed. We start at 
zero and continue until
+  // InnerValueCount is reached or the batch limit is reached.
+  // This allows groups to be written across batches if we run out of 
space. For cases where we have finished
+  // a batch on the boundary, it will be set to 0.
+  private int innerValueIndex = 0;
+
+  @Override
+  public void setUnnestField(RepeatedValueVector unnestField) {
+this.fieldToUnnest = unnestField;
+this.accessor = 
RepeatedValueVector.RepeatedAccessor.class.cast(unnestField.getAccessor());
+  }
+
+  @Override
+  public RepeatedValueVector getUnnestField() {
+return fieldToUnnest;
+  }
+
+  @Override
+  public void setOutputCount(int outputCount) {
+outputLimit = outputCount;
+  }
+
+  @Override
+  public final int unnestRecords(final int recordCount) {
+switch (svMode) {
--- End diff --

Is this additional check needed, since setup() would have already checked 
svMode?
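
One way the check could live only in setup() is sketched below. This is a simplified illustration, not the PR's code: `SvModeSketch` and `UnnestSetupSketch` are hypothetical names, setup() takes only the mode rather than the real argument list, and the body of unnestRecords() is a placeholder.

```java
// Hypothetical stand-in for BatchSchema.SelectionVectorMode.
enum SvModeSketch { NONE, TWO_BYTE, FOUR_BYTE }

class UnnestSetupSketch {
  private boolean ready;

  // Reject unsupported selection vector modes once, before any rows are processed.
  void setup(SvModeSketch svMode) {
    if (svMode != SvModeSketch.NONE) {
      throw new UnsupportedOperationException(
          "Unnest does not support selection vector inputs.");
    }
    ready = true;
  }

  // No per-call switch on the mode: setup() has already validated it.
  int unnestRecords(int recordCount) {
    if (!ready) {
      throw new IllegalStateException("setup() must be called first");
    }
    return recordCount; // placeholder for the actual transfer logic
  }
}
```

With this shape, the per-batch path has a single branch and the two identical `throw` cases collapse into one test.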


---


[GitHub] drill pull request #1223: DRILL-6324: Unnest initial implementation

2018-04-19 Thread amansinha100
Github user amansinha100 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1223#discussion_r182902544
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/Unnest.java
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.unnest;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.LateralContract;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+
+import java.util.List;
+
+/**
+ * Placeholder for future unnest implementation that may require code 
generation. Current implementation does not
+ * require any
+ * @see UnnestImpl
+ */
+public interface Unnest {
+  //TemplateClassDefinition TEMPLATE_DEFINITION = new 
TemplateClassDefinition(Unnest.class, UnnestImpl
+  // .class);
+
+  void setup(FragmentContext context, RecordBatch incoming, RecordBatch 
outgoing, List transfers,
+  LateralContract lateral) throws SchemaChangeException;
+
+  int unnestRecords(int recordCount);
--- End diff --

It would be good to add a one-line Javadoc for each of these APIs.


---


[GitHub] drill pull request #1223: DRILL-6324: Unnest initial implementation

2018-04-19 Thread amansinha100
Github user amansinha100 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1223#discussion_r182908307
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
 ---
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.unnest;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.physical.base.LateralContract;
+import org.apache.drill.exec.physical.config.UnnestPOP;
+import org.apache.drill.exec.record.AbstractSingleRecordBatch;
+import org.apache.drill.exec.record.AbstractTableFunctionRecordBatch;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchMemoryManager;
+import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.RepeatedMapVector;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+
+// TODO - handle the case where a user tries to unnest a scalar, should just return the column as is
+public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnnestRecordBatch.class);
+
+  private Unnest unnest;
+  private boolean hasRemainder = false; // set to true if there is data left over for the current row AND if we want
+                                        // to keep processing it. Kill may be called by a limit in a subquery that
+                                        // requires us to stop processing the current row, but not stop processing
+                                        // the data.
+  // In some cases we need to return a predetermined state from a call to next. These are:
+  // 1) Kill is called due to an error occurring in the processing of the query. IterOutcome should be NONE
+  // 2) Kill is called by LIMIT to stop processing of the current row (this occurs when the LIMIT is part of a
+  //    subquery between UNNEST and LATERAL). IterOutcome should be EMIT
+  // 3) Kill is called by LIMIT downstream from LATERAL. IterOutcome should be NONE
+  private IterOutcome nextState = OK;
+  private int remainderIndex = 0;
+  private int recordCount;
+  private MaterializedField unnestFieldMetadata;
+  private final UnnestMemoryManager memoryManager;
+
+  public enum Metric implements MetricDef {
+INPUT_BATCH_COUNT,
+AVG_INPUT_BATCH_BYTES,
+AVG_INPUT_ROW_BYTES,
+INPUT_RECORD_COUNT,
+OUTPUT_BATCH_COUNT,
+

[GitHub] drill pull request #1223: Drill 6324: Unnest initial implementation

2018-04-18 Thread parthchandra
GitHub user parthchandra opened a pull request:

https://github.com/apache/drill/pull/1223

Drill 6324: Unnest initial implementation

Implementation of the unnest operator that works in sync with the Lateral 
Join operator. The code is based on the Flatten implementation except that it 
does not do the cross join that flatten does. As a result, the operator does 
not need any code generation.
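
To picture the difference from Flatten described above, here is a minimal sketch in plain Java. It is not taken from the pull request: the class `UnnestSketch`, both method names, and the use of `java.util.List` in place of Drill's value vectors are assumptions made purely for illustration. The idea is that a flatten-style operator pairs every array element with its parent row's other columns itself, while an unnest-style operator emits only the elements of the row currently being processed and leaves the pairing to the join operator.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: plain Java lists stand in for Drill's value vectors,
// and none of these names exist in the Drill code base.
final class UnnestSketch {

  // Flatten-style: each array element is combined with a copy of the parent
  // row's other column, which is the cross join the description mentions.
  static List<String> flattenStyle(List<String> names, List<List<Integer>> arrays) {
    List<String> out = new ArrayList<>();
    for (int row = 0; row < arrays.size(); row++) {
      for (Integer element : arrays.get(row)) {
        out.add(names.get(row) + ":" + element);
      }
    }
    return out;
  }

  // Unnest-style: only the elements of the current row are emitted; joining
  // them back to the parent row is assumed to be the join operator's job.
  static List<Integer> unnestStyle(List<List<Integer>> arrays, int currentRow) {
    return new ArrayList<>(arrays.get(currentRow));
  }
}
```

With names `["a", "b"]` and arrays `[[1, 2], [3]]`, the flatten-style method produces one combined entry per element across all rows, whereas the unnest-style method called for row 0 returns just that row's two elements.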
Commit #2 adds specific handling for a kill that may result from a limit being reached by a downstream operator.
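
The three kill cases listed in the code comments of the UnnestRecordBatch diff quoted in this thread can be summarised as a small lookup. The sketch below is an assumption-laden illustration, not the operator's actual code: the `KillReason` enum and the `nextStateAfterKill` method are hypothetical, and the local `IterOutcome` enum only mirrors the three outcome names those comments mention.

```java
// Illustrative only: a stand-alone mapping of the kill cases named in the
// diff comments. KillReason and nextStateAfterKill are hypothetical names.
final class KillStateSketch {

  // Local stand-in carrying only the outcome names used in the comments.
  enum IterOutcome { OK, EMIT, NONE }

  // Hypothetical classification of why kill was called.
  enum KillReason {
    QUERY_ERROR,
    LIMIT_BETWEEN_UNNEST_AND_LATERAL,
    LIMIT_DOWNSTREAM_OF_LATERAL
  }

  // Returns the predetermined state the next call to next() should report.
  static IterOutcome nextStateAfterKill(KillReason reason) {
    switch (reason) {
      case LIMIT_BETWEEN_UNNEST_AND_LATERAL:
        // Stop the current row only; later rows are still processed.
        return IterOutcome.EMIT;
      case QUERY_ERROR:
      case LIMIT_DOWNSTREAM_OF_LATERAL:
        // Stop processing altogether.
        return IterOutcome.NONE;
      default:
        throw new IllegalArgumentException("Unknown kill reason: " + reason);
    }
  }
}
```

Only the case where the limit sits between UNNEST and LATERAL keeps the operator alive for subsequent rows; the other two end processing.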

@sohami, @amansinha100 please review.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/parthchandra/drill DRILL-6324

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/drill/pull/1223.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1223


commit 7056e66b6e05f3a8f6cb9c4326d4b0a73cd87122
Author: Parth Chandra 
Date:   2018-02-08T05:13:13Z

DRILL-6324: Unnest - Initial Implementation

- Based on Flatten
- Implement unnestRecords in UnnestTemplate
- Remove unnecessary code inherited from Flatten/Project. Add schema change handling.
- Fix build failure after rebase since RecordBatchSizer used by UNNEST was relocated to a different package
- Add unit tests
- Handling of input row splitting across multiple batches. Also do not kill incoming in killIncoming.
- Schema change generated by Unnest

commit b5aaa143089da127396b2abc766cdb14b2843817
Author: Parth Chandra 
Date:   2018-02-26T15:58:31Z

DRILL-6324: Unnest - kill handling, remove codegen, and unit test for non-array columns

commit 298f90654a487ab340582ce0cd5a0bf665536b9f
Author: Parth Chandra 
Date:   2018-03-07T08:23:14Z

DRILL-6324: Unnest - Add tests with real Unnest and real Lateral.

commit 8d9e02b4f11cda8458288c873bc2a94765569c43
Author: Parth Chandra 
Date:   2018-03-26T11:16:33Z

DRILL-6324: Unnest - code cleanup, more comments, fix license headers,
and more logging.

Refactor Unnest to allow setting the incoming batch after construction
Fix compilation after rebase




---