[GitHub] drill pull request #1121: DRILL-6153: Operator framework

2018-03-03 asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/drill/pull/1121


---


[GitHub] drill pull request #1121: DRILL-6153: Operator framework

2018-02-19 paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1121#discussion_r169203505
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java
 ---
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.protocol;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+
+/**
+ * State machine that drives the operator executable. Converts
+ * between the iterator protocol and the operator executable protocol.
+ * Implemented as a separate class in anticipation of eventually
+ * changing the record batch (iterator) protocol.
+ */
+
+public class OperatorDriver {
+  public enum State {
+
+/**
+ * Before the first call to next().
+ */
+
+START,
+
+/**
+ * The first call to next() has been made and schema (only)
+ * was returned. On the subsequent call to next(), return any
+ * data that might have accompanied that first batch.
+ */
+
+SCHEMA,
+
+/**
+ * The second call to next() has been made and there is more
+ * data to deliver on subsequent calls.
+ */
+
+RUN,
+
+/**
+ * No more data to deliver.
+ */
+
+END,
+
+/**
+ * An error occurred. Operation was cancelled.
+ */
+
+FAILED,
+
+/**
+ * close() called and resources are released.
+ */
+
+CLOSED }
--- End diff --

Fixed.


---


[GitHub] drill pull request #1121: DRILL-6153: Operator framework

2018-02-19 paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1121#discussion_r169203884
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/VectorContainerAccessor.java
 ---
@@ -0,0 +1,132 @@
+package org.apache.drill.exec.physical.impl.protocol;
+
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+public class VectorContainerAccessor implements BatchAccessor {
+
+  public static class ContainerAndSv2Accessor extends VectorContainerAccessor {
+
+    private SelectionVector2 sv2;
+
+    public void setSelectionVector(SelectionVector2 sv2) {
+      this.sv2 = sv2;
+    }
+
+    @Override
+    public SelectionVector2 getSelectionVector2() {
+      return sv2;
+    }
+  }
+
+  public static class ContainerAndSv4Accessor extends VectorContainerAccessor {
+
+    private SelectionVector4 sv4;
+
+    @Override
+    public SelectionVector4 getSelectionVector4() {
+      return sv4;
+    }
+  }
+
+  private VectorContainer container;
+  private SchemaTracker schemaTracker = new SchemaTracker();
+
+  /**
+   * Set the vector container. Done initially, and any time the schema of
+   * the container may have changed. May be called with the same container
+   * as the previous call, or a different one. A schema change occurs
+   * unless the vectors are identical across the two containers.
+   *
+   * @param container the container that holds vectors to be sent
+   * downstream
+   */
+
+  public void setContainer(VectorContainer container) {
+this.container = container;
+if (container != null) {
+  schemaTracker.trackSchema(container);
+}
+  }
+
+  @Override
+  public BatchSchema getSchema() {
+return container == null ? null : container.getSchema();
+  }
+
+  @Override
+  public int schemaVersion() { return schemaTracker.schemaVersion(); }
+
+  @Override
+  public int getRowCount() {
+return container == null ? 0 : container.getRecordCount();
+  }
+
+  @Override
+  public VectorContainer getOutgoingContainer() { return container; }
+
+  @Override
+  public TypedFieldId getValueVectorId(SchemaPath path) {
+return container.getValueVectorId(path);
+  }
+
+  @Override
+  public VectorWrapper getValueAccessorById(Class clazz, int... ids) {
+    return container.getValueAccessorById(clazz, ids);
+  }
+
+  @Override
+  public WritableBatch getWritableBatch() {
+return WritableBatch.get(container);
+  }
+
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+// Throws an exception by default
--- End diff --

We could. The reason it throws an exception is that, if the batch has no
selection vector and we ask for one anyway, that is an error under Drill
semantics. Put another way, the client should ask for a selection vector only
if one is available. This is existing behavior; this method simply wraps that
existing behavior.

So, the place to document the exception is on the `VectorContainer` method.

Improved the comment a bit.
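A minimal sketch of the semantics described above, using simplified hypothetical types (`SimpleBatchAccessor`, `PlainAccessor`, `Sv2Accessor`, `Sv2`) rather than Drill's real `BatchAccessor` and `SelectionVector2`. The base accessor fails fast when asked for a selection vector the batch does not have, and only the SV2 variant overrides that. `UnsupportedOperationException` is an assumption here; the real exception is whatever the wrapped `VectorContainer` method throws, which is also where a `@throws` tag would belong.

```java
// Hypothetical, simplified stand-in for a two-byte selection vector.
final class Sv2 {
  private final int[] indexes;

  Sv2(int... indexes) { this.indexes = indexes.clone(); }

  int getCount() { return indexes.length; }

  int getIndex(int i) { return indexes[i]; }
}

// Hypothetical, trimmed-down accessor interface (not Drill's BatchAccessor).
interface SimpleBatchAccessor {
  int getRowCount();

  /**
   * Returns the batch's selection vector.
   *
   * @throws UnsupportedOperationException if the batch has no SV2;
   *         asking for one in that case is a caller error
   */
  default Sv2 getSelectionVector2() {
    throw new UnsupportedOperationException("Batch has no SV2");
  }
}

// Plain batch: inherits the fail-fast default.
class PlainAccessor implements SimpleBatchAccessor {
  private final int rowCount;

  PlainAccessor(int rowCount) { this.rowCount = rowCount; }

  @Override
  public int getRowCount() { return rowCount; }
}

// Filtered batch: the only variant that actually carries an SV2.
class Sv2Accessor extends PlainAccessor {
  private Sv2 sv2;

  Sv2Accessor(int rowCount) { super(rowCount); }

  void setSelectionVector(Sv2 sv2) { this.sv2 = sv2; }

  @Override
  public Sv2 getSelectionVector2() { return sv2; }
}
```

Failing fast (instead of returning `null`) surfaces a protocol mistake at the call site rather than as a later `NullPointerException` deep in a downstream operator.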


---


[GitHub] drill pull request #1121: DRILL-6153: Operator framework

2018-02-19 paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1121#discussion_r169203498
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java
 ---
@@ -0,0 +1,222 @@
+package org.apache.drill.exec.physical.impl.protocol;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+
+/**
+ * State machine that drives the operator executable. Converts
+ * between the iterator protocol and the operator executable protocol.
+ * Implemented as a separate class in anticipation of eventually
+ * changing the record batch (iterator) protocol.
+ */
+
+public class OperatorDriver {
+  public enum State {
+
+/**
+ * Before the first call to next().
+ */
+
+START,
+
+/**
+ * The first call to next() has been made and schema (only)
+ * was returned. On the subsequent call to next(), return any
+ * data that might have accompanied that first batch.
+ */
+
+SCHEMA,
+
+/**
+ * The second call to next() has been made and there is more
+ * data to deliver on subsequent calls.
+ */
+
+RUN,
+
+/**
+ * No more data to deliver.
+ */
+
+END,
+
+/**
+ * An error occurred. Operation was cancelled.
+ */
+
+FAILED,
+
+/**
+ * close() called and resources are released.
+ */
+
+CLOSED }
+
+  private OperatorDriver.State state = State.START;
+
+  /**
+   * Operator context. The driver "owns" the context and is responsible
+   * for closing it.
+   */
+
+  private final OperatorContext opContext;
+  private final OperatorExec operatorExec;
+  private final BatchAccessor batchAccessor;
+  private int schemaVersion;
+
+  public OperatorDriver(OperatorContext opContext, OperatorExec opExec) {
+this.opContext = opContext;
+this.operatorExec = opExec;
+batchAccessor = operatorExec.batchAccessor();
+  }
+
+  /**
+   * Get the next batch. Performs initialization on the first call.
+   * @return the iteration outcome to send downstream
+   */
+
+  public IterOutcome next() {
+try {
+  switch (state) {
+  case START:
+return start();
+  case RUN:
+return doNext();
+   default:
--- End diff --

Fixed.


---


[GitHub] drill pull request #1121: DRILL-6153: Operator framework

2018-02-19 paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1121#discussion_r169203509
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java
 ---
@@ -0,0 +1,222 @@
+package org.apache.drill.exec.physical.impl.protocol;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+
+/**
+ * State machine that drives the operator executable. Converts
+ * between the iterator protocol and the operator executable protocol.
+ * Implemented as a separate class in anticipation of eventually
+ * changing the record batch (iterator) protocol.
+ */
+
+public class OperatorDriver {
+  public enum State {
+
+/**
+ * Before the first call to next().
+ */
+
+START,
+
+/**
+ * The first call to next() has been made and schema (only)
+ * was returned. On the subsequent call to next(), return any
+ * data that might have accompanied that first batch.
+ */
+
+SCHEMA,
+
+/**
+ * The second call to next() has been made and there is more
+ * data to deliver on subsequent calls.
+ */
+
+RUN,
+
+/**
+ * No more data to deliver.
+ */
+
+END,
+
+/**
+ * An error occurred. Operation was cancelled.
+ */
+
+FAILED,
+
+/**
+ * close() called and resources are released.
+ */
+
+CLOSED }
+
+  private OperatorDriver.State state = State.START;
+
+  /**
+   * Operator context. The driver "owns" the context and is responsible
+   * for closing it.
+   */
+
+  private final OperatorContext opContext;
+  private final OperatorExec operatorExec;
+  private final BatchAccessor batchAccessor;
+  private int schemaVersion;
+
+  public OperatorDriver(OperatorContext opContext, OperatorExec opExec) {
+this.opContext = opContext;
+this.operatorExec = opExec;
+batchAccessor = operatorExec.batchAccessor();
+  }
+
+  /**
+   * Get the next batch. Performs initialization on the first call.
+   * @return the iteration outcome to send downstream
+   */
+
+  public IterOutcome next() {
+try {
+  switch (state) {
+  case START:
+return start();
+  case RUN:
+return doNext();
+   default:
+OperatorRecordBatch.logger.debug("Extra call to next() in state " + state + ": " + operatorLabel());
+return IterOutcome.NONE;
+  }
+} catch (UserException e) {
+  cancelSilently();
+  state = State.FAILED;
+  throw e;
+} catch (Throwable t) {
+  cancelSilently();
+  state = State.FAILED;
+  throw UserException.executionError(t)
+.addContext("Exception thrown from", operatorLabel())
+.build(OperatorRecordBatch.logger);
+}
+  }
+
+  /**
+   * Cancels the operator before reaching EOF.
+   */
+
+  public void cancel() {
+try {
+  switch (state) {
+  case START:
+  case RUN:
+cancelSilently();
+break;
+  default:
+break;
+  }
+} finally {
+  state = State.FAILED;
--- End diff --

Added a Cancelled state, but nothing ever reads that state. The point of
FAILED is just to avoid confusion when calling `next()` after a failure or
cancellation.
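The state handling described here can be modeled in a minimal sketch. The names (`SketchDriver`, `CountingOperator`, `DriverState`) are hypothetical and this is a simplified model, not the PR's `OperatorDriver`: `cancel()` moves to a terminal state that is not `CLOSED`, extra `next()` calls simply report EOF, and `close()` releases resources exactly once when the fragment later calls it.

```java
// Hypothetical, simplified driver states (not the PR's exact enum).
enum DriverState { START, RUN, CANCELLED, FAILED, CLOSED }

// Stand-in operator that counts calls so double-close bugs are visible.
class CountingOperator {
  int cancelCalls;
  int closeCalls;

  void cancel() { cancelCalls++; }

  void close() { closeCalls++; }
}

class SketchDriver {
  private final CountingOperator op;
  private DriverState state = DriverState.START;

  SketchDriver(CountingOperator op) { this.op = op; }

  DriverState state() { return state; }

  // Returns true while data remains (in this sketch RUN always has more data).
  boolean next() {
    switch (state) {
      case START:
        state = DriverState.RUN;
        return true;
      case RUN:
        return true;
      default:
        // Extra call after cancellation or failure: just report EOF.
        return false;
    }
  }

  void cancel() {
    if (state == DriverState.START || state == DriverState.RUN) {
      op.cancel();
      // Deliberately not CLOSED: the fragment will still call close() later.
      state = DriverState.CANCELLED;
    }
  }

  void close() {
    if (state == DriverState.CLOSED) {
      return; // never release resources twice
    }
    try {
      op.close();
    } finally {
      state = DriverState.CLOSED;
    }
  }
}
```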

We definitely *do not* want cancellation to move to the `CLOSED` state. 
This is a bug that exists in several operators. If `cancel()` closes the 
operator, then the operator is closed twice: once when the downstream operator 
says it wants no more rows, and a second time when the fragment 

[GitHub] drill pull request #1121: DRILL-6153: Operator framework

2018-02-19 ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/1121#discussion_r169155001
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java
 ---
@@ -0,0 +1,222 @@
+package org.apache.drill.exec.physical.impl.protocol;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+
+/**
+ * State machine that drives the operator executable. Converts
+ * between the iterator protocol and the operator executable protocol.
+ * Implemented as a separate class in anticipation of eventually
+ * changing the record batch (iterator) protocol.
+ */
+
+public class OperatorDriver {
+  public enum State {
+
+/**
+ * Before the first call to next().
+ */
+
+START,
+
+/**
+ * The first call to next() has been made and schema (only)
+ * was returned. On the subsequent call to next(), return any
+ * data that might have accompanied that first batch.
+ */
+
+SCHEMA,
+
+/**
+ * The second call to next() has been made and there is more
+ * data to deliver on subsequent calls.
+ */
+
+RUN,
+
+/**
+ * No more data to deliver.
+ */
+
+END,
+
+/**
+ * An error occurred. Operation was cancelled.
+ */
+
+FAILED,
+
+/**
+ * close() called and resources are released.
+ */
+
+CLOSED }
--- End diff --

minor: put the closing brace on a separate line for better readability.


---


[GitHub] drill pull request #1121: DRILL-6153: Operator framework

2018-02-19 ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/1121#discussion_r169159418
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java
 ---
@@ -0,0 +1,222 @@
+package org.apache.drill.exec.physical.impl.protocol;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+
+/**
+ * State machine that drives the operator executable. Converts
+ * between the iterator protocol and the operator executable protocol.
+ * Implemented as a separate class in anticipation of eventually
+ * changing the record batch (iterator) protocol.
+ */
+
+public class OperatorDriver {
+  public enum State {
+
+/**
+ * Before the first call to next().
+ */
+
+START,
+
+/**
+ * The first call to next() has been made and schema (only)
+ * was returned. On the subsequent call to next(), return any
+ * data that might have accompanied that first batch.
+ */
+
+SCHEMA,
+
+/**
+ * The second call to next() has been made and there is more
+ * data to deliver on subsequent calls.
+ */
+
+RUN,
+
+/**
+ * No more data to deliver.
+ */
+
+END,
+
+/**
+ * An error occurred. Operation was cancelled.
+ */
+
+FAILED,
+
+/**
+ * close() called and resources are released.
+ */
+
+CLOSED }
+
+  private OperatorDriver.State state = State.START;
+
+  /**
+   * Operator context. The driver "owns" the context and is responsible
+   * for closing it.
+   */
+
+  private final OperatorContext opContext;
+  private final OperatorExec operatorExec;
+  private final BatchAccessor batchAccessor;
+  private int schemaVersion;
+
+  public OperatorDriver(OperatorContext opContext, OperatorExec opExec) {
+this.opContext = opContext;
+this.operatorExec = opExec;
+batchAccessor = operatorExec.batchAccessor();
+  }
+
+  /**
+   * Get the next batch. Performs initialization on the first call.
+   * @return the iteration outcome to send downstream
+   */
+
+  public IterOutcome next() {
+try {
+  switch (state) {
+  case START:
+return start();
+  case RUN:
+return doNext();
+   default:
+OperatorRecordBatch.logger.debug("Extra call to next() in state " + state + ": " + operatorLabel());
+return IterOutcome.NONE;
+  }
+} catch (UserException e) {
+  cancelSilently();
+  state = State.FAILED;
+  throw e;
+} catch (Throwable t) {
+  cancelSilently();
+  state = State.FAILED;
+  throw UserException.executionError(t)
+.addContext("Exception thrown from", operatorLabel())
+.build(OperatorRecordBatch.logger);
+}
+  }
+
+  /**
+   * Cancels the operator before reaching EOF.
+   */
+
+  public void cancel() {
+try {
+  switch (state) {
+  case START:
+  case RUN:
+cancelSilently();
+break;
+  default:
+break;
+  }
+} finally {
+  state = State.FAILED;
--- End diff --

I am thinking FAILED represents an internal failure within the operator.
Cancel means we are explicitly cancelling it (for whatever reason), i.e. the
operator is being asked to shut down or close. For cancel, should we move the
state to CLOSED instead of FAILED?


---


[GitHub] drill pull request #1121: DRILL-6153: Operator framework

2018-02-19 ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/1121#discussion_r169155210
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java
 ---
@@ -0,0 +1,222 @@
+package org.apache.drill.exec.physical.impl.protocol;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+
+/**
+ * State machine that drives the operator executable. Converts
+ * between the iterator protocol and the operator executable protocol.
+ * Implemented as a separate class in anticipation of eventually
+ * changing the record batch (iterator) protocol.
+ */
+
+public class OperatorDriver {
+  public enum State {
+
+/**
+ * Before the first call to next().
+ */
+
+START,
+
+/**
+ * The first call to next() has been made and schema (only)
+ * was returned. On the subsequent call to next(), return any
+ * data that might have accompanied that first batch.
+ */
+
+SCHEMA,
+
+/**
+ * The second call to next() has been made and there is more
+ * data to deliver on subsequent calls.
+ */
+
+RUN,
+
+/**
+ * No more data to deliver.
+ */
+
+END,
+
+/**
+ * An error occurred. Operation was cancelled.
+ */
+
+FAILED,
+
+/**
+ * close() called and resources are released.
+ */
+
+CLOSED }
+
+  private OperatorDriver.State state = State.START;
+
+  /**
+   * Operator context. The driver "owns" the context and is responsible
+   * for closing it.
+   */
+
+  private final OperatorContext opContext;
+  private final OperatorExec operatorExec;
+  private final BatchAccessor batchAccessor;
+  private int schemaVersion;
+
+  public OperatorDriver(OperatorContext opContext, OperatorExec opExec) {
+this.opContext = opContext;
+this.operatorExec = opExec;
+batchAccessor = operatorExec.batchAccessor();
+  }
+
+  /**
+   * Get the next batch. Performs initialization on the first call.
+   * @return the iteration outcome to send downstream
+   */
+
+  public IterOutcome next() {
+try {
+  switch (state) {
+  case START:
+return start();
+  case RUN:
+return doNext();
+   default:
--- End diff --

alignment


---


[GitHub] drill pull request #1121: DRILL-6153: Operator framework

2018-02-19 ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/1121#discussion_r169162527
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/VectorContainerAccessor.java
 ---
@@ -0,0 +1,132 @@
+package org.apache.drill.exec.physical.impl.protocol;
+
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+public class VectorContainerAccessor implements BatchAccessor {
+
+  public static class ContainerAndSv2Accessor extends VectorContainerAccessor {
+
+    private SelectionVector2 sv2;
+
+    public void setSelectionVector(SelectionVector2 sv2) {
+      this.sv2 = sv2;
+    }
+
+    @Override
+    public SelectionVector2 getSelectionVector2() {
+      return sv2;
+    }
+  }
+
+  public static class ContainerAndSv4Accessor extends VectorContainerAccessor {
+
+    private SelectionVector4 sv4;
+
+    @Override
+    public SelectionVector4 getSelectionVector4() {
+      return sv4;
+    }
+  }
+
+  private VectorContainer container;
+  private SchemaTracker schemaTracker = new SchemaTracker();
+
+  /**
+   * Set the vector container. Done initially, and any time the schema of
+   * the container may have changed. May be called with the same container
+   * as the previous call, or a different one. A schema change occurs
+   * unless the vectors are identical across the two containers.
+   *
+   * @param container the container that holds vectors to be sent
+   * downstream
+   */
+
+  public void setContainer(VectorContainer container) {
+this.container = container;
+if (container != null) {
+  schemaTracker.trackSchema(container);
+}
+  }
+
+  @Override
+  public BatchSchema getSchema() {
+return container == null ? null : container.getSchema();
+  }
+
+  @Override
+  public int schemaVersion() { return schemaTracker.schemaVersion(); }
+
+  @Override
+  public int getRowCount() {
+return container == null ? 0 : container.getRecordCount();
+  }
+
+  @Override
+  public VectorContainer getOutgoingContainer() { return container; }
+
+  @Override
+  public TypedFieldId getValueVectorId(SchemaPath path) {
+return container.getValueVectorId(path);
+  }
+
+  @Override
+  public VectorWrapper getValueAccessorById(Class clazz, int... ids) {
+    return container.getValueAccessorById(clazz, ids);
+  }
+
+  @Override
+  public WritableBatch getWritableBatch() {
+return WritableBatch.get(container);
+  }
+
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+// Throws an exception by default
--- End diff --

Should we make that explicit by documenting which exceptions it might throw?


---


[GitHub] drill pull request #1121: DRILL-6153: Operator framework

2018-02-15 paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1121#discussion_r168677109
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorExec.java
 ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.protocol;
+
+import org.apache.drill.exec.ops.OperatorContext;
+
+/**
+ * Core protocol for a Drill operator execution.
+ *
+ * Lifecycle
+ *
+ * 
+ * Creation via an operator-specific constructor in the
+ * corresponding RecordBatchCreator.
+ * bind() called to provide the operator services.
+ * buildSchema() called to define the schema before
+ * fetching the first record batch.
+ * next() called repeatedly to prepare each new record
+ * batch until EOF or until cancellation.
+ * cancel() called if the operator should quit early.
+ * close() called to release resources. Note that
+ * close() is called in response to:
+ *   EOF
+ *   After cancel()
+ *   After an exception is thrown.
+ * 
+ *
+ * Error Handling
+ *
+ * Any method can throw an (unchecked) exception. (Drill does not use
+ * checked exceptions.) Preferably, the code will throw a
+ * UserException that explains the error to the user. If any
+ * other kind of exception is thrown, then the enclosing class wraps it
+ * in a generic UserException that indicates that "something went
+ * wrong", which is less than ideal.
+ *
+ * Result Set
+ * The operator "publishes" a result set in response to returning
+ * true from next() by populating a
+ * {@link BatchAccessor} provided via {@link #batchAccessor()}. For
+ * compatibility with other Drill operators, the set of vectors within
+ * the batch must be the same from one batch to the next.
+ */
+
+public interface OperatorExec {
+
+  /**
+   * Bind this operator to the context. The context provides access
+   * to per-operator, per-fragment and per-Drillbit services.
+   * Also provides access to the operator definition (AKA "pop
+   * config") for this operator.
+   *
+   * @param context operator context
+   */
+
+  public void bind(OperatorContext context);
+
+  /**
+   * Provides a generic access mechanism to the batch's output data.
+   * This method is called after a successful return from
+   * {@link #buildSchema()} and {@link #next()}. The batch itself
+   * can be held in a standard {@link VectorContainer}, or in some
+   * other structure more convenient for this operator.
+   *
+   * @return the access for the batch's output container
+   */
+
+  BatchAccessor batchAccessor();
+
+  /**
+   * Retrieves the schema of the batch before the first actual batch
+   * of data. The schema is returned via an empty batch (no rows,
+   * only schema) from {@link #batchAccessor()}.
+   *
+   * @return true if a schema is available, false if the operator
+   * reached EOF before a schema was found
+   */
+
+  boolean buildSchema();
+
+  /**
+   * Retrieves the next batch of data. The data is returned via
+   * the {@link #batchAccessor()} method.
+   *
+   * @return true if another batch of data is available, false if
+   * EOF was reached and no more data is available
+   */
+
+  boolean next();
+
+  /**
+   * Alerts the operator that the query was cancelled. Generally
+   * optional, but allows the operator to realize that a cancellation
--- End diff --

An operator will work even if it ignores `cancel()`. Its `close()` method
should "do the right thing" in each of these cases:

* If the operator reached EOF
* If the operator did not reach EOF (still rows left)
* If the operator failed

The `cancel()` call is just a hint that "I won't be reading any more rows; 
feel free to release resources now if you like."
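To illustrate that contract, here is a minimal, self-contained sketch (not Drill code). It shows an operator that ignores `cancel()` entirely and relies on `close()` to release its resources in all three cases: EOF, early stop, and failure. `HintOperator`, its constructor parameters and the `holdingResources` flag are hypothetical stand-ins for a real `OperatorExec` and its allocated buffers.

```java
/**
 * Hypothetical, simplified operator used only to illustrate the
 * cancel()/close() contract; it is not part of Drill.
 */
class HintOperator implements AutoCloseable {
  private final int batchCount;  // batches available before EOF
  private final int failAt;      // batch index that throws, or -1 for none
  private int batchesRead;
  private boolean holdingResources = true; // stands in for allocated buffers

  HintOperator(int batchCount, int failAt) {
    this.batchCount = batchCount;
    this.failAt = failAt;
  }

  /** Returns true if a batch was produced, false at EOF. */
  boolean next() {
    if (batchesRead == failAt) {
      throw new IllegalStateException("simulated failure at batch " + failAt);
    }
    if (batchesRead >= batchCount) {
      return false;
    }
    batchesRead++;
    return true;
  }

  /** Optional hint that no more rows will be read; this operator ignores it. */
  void cancel() {
  }

  /** Releases everything, whether we hit EOF, stopped early, or failed. */
  @Override
  public void close() {
    holdingResources = false;
  }

  boolean holdingResources() {
    return holdingResources;
  }
}
```

Because `close()` here does not depend on `cancel()` having been called, a caller can always put `close()` in a `finally` block around the read loop.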


---


[GitHub] drill pull request #1121: DRILL-6153: Operator framework

2018-02-15 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1121#discussion_r168676813
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java
 ---
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.protocol;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+
+/**
+ * State machine that drives the operator executable. Converts
+ * between the iterator protocol and the operator executable protocol.
+ * Implemented as a separate class in anticipation of eventually
+ * changing the record batch (iterator) protocol.
+ */
+
+public class OperatorDriver {
+  public enum State { START, SCHEMA, RUN, END, FAILED, CLOSED }
+
+  private OperatorDriver.State state = State.START;
+
+  /**
+   * Operator context. The driver "owns" the context and is responsible
+   * for closing it.
+   */
+
+  private final OperatorContext opContext;
+  private final OperatorExec operatorExec;
+  private final BatchAccessor batchAccessor;
+  private int schemaVersion;
+
+  public OperatorDriver(OperatorContext opServicees, OperatorExec opExec) {
+this.opContext = opServicees;
+this.operatorExec = opExec;
+batchAccessor = operatorExec.batchAccessor();
+  }
+
+  /**
+   * Get the next batch. Performs initialization on the first call.
+   * @return the iteration outcome to send downstream
+   */
+
+  public IterOutcome next() {
+try {
+  switch (state) {
+  case START:
+return start();
+  case RUN:
+return doNext();
+   default:
+OperatorRecordBatch.logger.debug("Extra call to next() in state " + state + ": " + operatorLabel());
+return IterOutcome.NONE;
+  }
+} catch (UserException e) {
+  cancelSilently();
+  state = State.FAILED;
+  throw e;
+} catch (Throwable t) {
+  cancelSilently();
+  state = State.FAILED;
+  throw UserException.executionError(t)
+.addContext("Exception thrown from", operatorLabel())
+.build(OperatorRecordBatch.logger);
+}
+  }
+
+  /**
+   * Cancels the operator before reaching EOF.
+   */
+
+  public void cancel() {
+try {
+  switch (state) {
+  case START:
+  case RUN:
+cancelSilently();
+break;
+  default:
+break;
+  }
+} finally {
+  state = State.FAILED;
+}
+  }
+
+ /**
+   * Start the operator executor. Bind it to the various contexts.
+   * Then start the executor and fetch the first schema.
+   * @return result of the first batch, which should contain
+   * only a schema, or EOF
+   */
+
+  private IterOutcome start() {
--- End diff --

I explained the policy assumed here elsewhere; to recap:

* Implementations of the `OperatorExec` interface are responsible for error 
handling.
* Implementations should catch exceptions then translate them to a 
`UserException` with user-oriented information.
* If implementations do the above, this layer simply passes along the 
`UserException`.
* Otherwise, this class acts as a translation layer to convert generic 
unchecked exceptions into `UserException`s. This layer can't provide much 
context, but it can do a slightly better job than the fragment exec, which is 
the last line of defense.

As a result, this layer always throws `UserException`s.
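A minimal sketch of this translation policy follows. It uses a hypothetical `UserError` class in place of Drill's `UserException`, whose real builder calls (`executionError(...)`, `addContext(...)`, `build(logger)`) appear in the quoted diff. Errors that are already user-facing pass through unchanged. Anything else is wrapped, with the operator label as context. `ErrorTranslatingDriver` and the `BooleanSupplier` standing in for `OperatorExec.next()` are illustrative assumptions.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical stand-in for Drill's UserException. */
class UserError extends RuntimeException {
  UserError(String message, Throwable cause) {
    super(message, cause);
  }
}

/** Hypothetical translation layer that always surfaces failures as UserError. */
class ErrorTranslatingDriver {
  private final String operatorLabel;

  ErrorTranslatingDriver(String operatorLabel) {
    this.operatorLabel = operatorLabel;
  }

  /** Invokes the operator's next() step and translates any failure. */
  boolean next(BooleanSupplier operatorNext) {
    try {
      return operatorNext.getAsBoolean();
    } catch (UserError e) {
      // The operator already produced a user-oriented error: pass it along as-is.
      throw e;
    } catch (Throwable t) {
      // Generic failure: wrap it, adding what little context this layer has.
      throw new UserError("Exception thrown from " + operatorLabel, t);
    }
  }
}
```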


---


[GitHub] drill pull request #1121: DRILL-6153: Operator framework

2018-02-15 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1121#discussion_r168675063
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java
 ---
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.protocol;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+
+/**
+ * State machine that drives the operator executable. Converts
+ * between the iterator protocol and the operator executable protocol.
+ * Implemented as a separate class in anticipation of eventually
+ * changing the record batch (iterator) protocol.
+ */
+
+public class OperatorDriver {
+  public enum State { START, SCHEMA, RUN, END, FAILED, CLOSED }
--- End diff --

Next, the question about the `END` and `FAILED` states. `END` means we 
reached EOF and have no data left to deliver. Calling `next()` in the `END` 
state simply returns `NONE` (there is *still* no more data).

On the other hand, `FAILED` indicates that an error occurred. This seemed 
like a useful thing to know. But, as I review this version of the code, I see 
we are not actually using this state. So, rather than add a `CANCELLED` state, 
maybe we can collapse `FAILED` into `END`, which is just a signal to `next()` 
to return `NONE`.

Cleanup should be no different in the three cases (EOF, failure and 
cancellation): we must release all resources regardless of the reason that 
`close()` is called.

Thoughts?
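Here is one way the proposal could look. This is a hypothetical sketch, not the final Drill code. With `FAILED` folded into `END`, any terminal condition (EOF, error or cancellation) simply makes later `next()` calls return `NONE`, and `close()` does the same cleanup in every case. `DriverOutcome` is a simplified stand-in for `IterOutcome`. The `Iterator` stands in for `OperatorExec.next()`, and schema handling is omitted.

```java
import java.util.Iterator;

/** Simplified stand-in for IterOutcome. */
enum DriverOutcome { OK, NONE }

/**
 * Hypothetical driver with FAILED folded into END. The iterator stands in
 * for OperatorExec.next(); schema handling is omitted.
 */
class CollapsedDriver {
  enum State { START, RUN, END, CLOSED }

  private final Iterator<String> batches;
  private State state = State.START;

  CollapsedDriver(Iterator<String> batches) {
    this.batches = batches;
  }

  DriverOutcome next() {
    if (state == State.END || state == State.CLOSED) {
      return DriverOutcome.NONE; // still no more data
    }
    state = State.RUN;
    try {
      if (!batches.hasNext()) {
        state = State.END;       // normal EOF
        return DriverOutcome.NONE;
      }
      batches.next();
      return DriverOutcome.OK;
    } catch (RuntimeException e) {
      state = State.END;         // failure: formerly the FAILED state
      throw e;
    }
  }

  /** Cancellation also just ends the stream. */
  void cancel() {
    if (state != State.CLOSED) {
      state = State.END;
    }
  }

  /** Same cleanup no matter how we reached END. */
  void close() {
    state = State.CLOSED;
  }

  State state() {
    return state;
  }
}
```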


---


[GitHub] drill pull request #1121: DRILL-6153: Operator framework

2018-02-15 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1121#discussion_r168676906
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java
 ---
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.protocol;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+
+/**
+ * State machine that drives the operator executable. Converts
+ * between the iterator protocol and the operator executable protocol.
+ * Implemented as a separate class in anticipation of eventually
+ * changing the record batch (iterator) protocol.
+ */
+
+public class OperatorDriver {
+  public enum State { START, SCHEMA, RUN, END, FAILED, CLOSED }
+
+  private OperatorDriver.State state = State.START;
+
+  /**
+   * Operator context. The driver "owns" the context and is responsible
+   * for closing it.
+   */
+
+  private final OperatorContext opContext;
+  private final OperatorExec operatorExec;
+  private final BatchAccessor batchAccessor;
+  private int schemaVersion;
+
+  public OperatorDriver(OperatorContext opServicees, OperatorExec opExec) {
+this.opContext = opServicees;
+this.operatorExec = opExec;
+batchAccessor = operatorExec.batchAccessor();
+  }
+
+  /**
+   * Get the next batch. Performs initialization on the first call.
+   * @return the iteration outcome to send downstream
+   */
+
+  public IterOutcome next() {
+try {
+  switch (state) {
+  case START:
+return start();
+  case RUN:
+return doNext();
+   default:
+OperatorRecordBatch.logger.debug("Extra call to next() in state " + state + ": " + operatorLabel());
+return IterOutcome.NONE;
+  }
+} catch (UserException e) {
+  cancelSilently();
+  state = State.FAILED;
+  throw e;
+} catch (Throwable t) {
+  cancelSilently();
+  state = State.FAILED;
+  throw UserException.executionError(t)
+.addContext("Exception thrown from", operatorLabel())
+.build(OperatorRecordBatch.logger);
+}
+  }
+
+  /**
+   * Cancels the operator before reaching EOF.
+   */
+
+  public void cancel() {
+try {
+  switch (state) {
+  case START:
+  case RUN:
+cancelSilently();
+break;
+  default:
+break;
+  }
+} finally {
+  state = State.FAILED;
+}
+  }
+
+ /**
+   * Start the operator executor. Bind it to the various contexts.
+   * Then start the executor and fetch the first schema.
+   * @return result of the first batch, which should contain
+   * only a schema, or EOF
+   */
+
+  private IterOutcome start() {
+state = State.SCHEMA;
+if (operatorExec.buildSchema()) {
+  schemaVersion = batchAccessor.schemaVersion();
+  state = State.RUN;
+  return IterOutcome.OK_NEW_SCHEMA;
+} else {
+  state = State.END;
+  return IterOutcome.NONE;
+}
+  }
+
+  /**
+   * Fetch a record batch, detecting EOF and a new schema.
+   * @return the IterOutcome for the above cases
+   */
+
+  private IterOutcome doNext() {
+if (! operatorExec.next()) {
+  state = State.END;
+  return IterOutcome.NONE;
+}
+int newVersion = batchAccessor.schemaVersion();
+if (newVersion != schemaVersion) {
+  schemaVersion = newVersion;
+  return IterOutcome.OK_NEW_SCHEMA;
+}
+return IterOutcome.OK;
+  }
+
+  /**
+   * Implement a cancellation, and ignore any exception that is
+   * thrown. We're 

[GitHub] drill pull request #1121: DRILL-6153: Operator framework

2018-02-15 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1121#discussion_r168674650
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java
 ---
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.protocol;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+
+/**
+ * State machine that drives the operator executable. Converts
+ * between the iterator protocol and the operator executable protocol.
+ * Implemented as a separate class in anticipation of eventually
+ * changing the record batch (iterator) protocol.
+ */
+
+public class OperatorDriver {
+  public enum State { START, SCHEMA, RUN, END, FAILED, CLOSED }
--- End diff --

Thanks for the questions. Let's take them one-by-one.

The model here is that an operator follows the "fast schema" pattern:

* The first call to `next()` produces an empty batch with only the schema.
* The second call to `next()` returns the first data batch.

The states track where we are in this sequence:

* `START`: The operator has been created, but `next()` has not yet been 
called. When `next()` is called in the `START` state, return just the schema.
* `SCHEMA`: The schema only has been returned. On the next call to `next()` 
return the data (if any) associated with the first batch.
* `RUN`: Normal state for the second and subsequent `next()` calls.

Now, do we need "fast schema"? Maybe not. I *thought* that Drill was 
designed to return the schema quickly to the client before waiting for the 
first data batch. But, in subsequent testing, I discovered that few queries 
actually worked that way. (Some tests count the returned batches and assert 
that there should be only one, containing both data and schema...)

So, if we want "fast schema" then we need the three states. But, if we want 
the original behavior, then we can, in fact, remove the `SCHEMA` state.

Was there a reason for the "fast schema" path? Or, was that just a vestige 
of a never-completed feature?
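To make the two behaviors concrete, here is a hypothetical simulation (not Drill's code). It lists the outcomes a downstream operator would see for two data batches, with and without "fast schema". `SchemaOutcome` is a simplified stand-in for `IterOutcome`, and `Step` and `FastSchemaDemo` are illustrative names only. In fast-schema mode the first call returns an empty, schema-only batch. Otherwise the schema arrives together with the first data batch.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for the IterOutcome values used here. */
enum SchemaOutcome { OK_NEW_SCHEMA, OK, NONE }

/** One call to next(): the outcome plus the row count of the batch. */
final class Step {
  final SchemaOutcome outcome;
  final int rows;

  Step(SchemaOutcome outcome, int rows) {
    this.outcome = outcome;
    this.rows = rows;
  }

  @Override
  public String toString() {
    return outcome + "(" + rows + ")";
  }
}

/** Hypothetical simulation of what downstream sees from successive next() calls. */
final class FastSchemaDemo {
  static List<Step> run(int[] batchRows, boolean fastSchema) {
    List<Step> steps = new ArrayList<>();
    int i = 0;
    if (fastSchema) {
      // START: the first call returns an empty, schema-only batch.
      steps.add(new Step(SchemaOutcome.OK_NEW_SCHEMA, 0));
    } else if (batchRows.length > 0) {
      // Original behavior: the schema arrives with the first data batch.
      steps.add(new Step(SchemaOutcome.OK_NEW_SCHEMA, batchRows[0]));
      i = 1;
    }
    // RUN: the remaining data batches.
    for (; i < batchRows.length; i++) {
      steps.add(new Step(SchemaOutcome.OK, batchRows[i]));
    }
    steps.add(new Step(SchemaOutcome.NONE, 0)); // EOF
    return steps;
  }
}
```

In this sketch the fast-schema path produces one extra, empty batch. That extra batch is exactly what trips up tests that expect a single batch carrying both schema and data.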


---


[GitHub] drill pull request #1121: DRILL-6153: Operator framework

2018-02-15 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1121#discussion_r168676504
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java
 ---
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.protocol;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+
+/**
+ * State machine that drives the operator executable. Converts
+ * between the iterator protocol and the operator executable protocol.
+ * Implemented as a separate class in anticipation of eventually
+ * changing the record batch (iterator) protocol.
+ */
+
+public class OperatorDriver {
+  public enum State { START, SCHEMA, RUN, END, FAILED, CLOSED }
+
+  private OperatorDriver.State state = State.START;
+
+  /**
+   * Operator context. The driver "owns" the context and is responsible
+   * for closing it.
+   */
+
+  private final OperatorContext opContext;
+  private final OperatorExec operatorExec;
+  private final BatchAccessor batchAccessor;
+  private int schemaVersion;
+
+  public OperatorDriver(OperatorContext opServicees, OperatorExec opExec) {
--- End diff --

Fixed.


---


[GitHub] drill pull request #1121: DRILL-6153: Operator framework

2018-02-14 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/1121#discussion_r168324742
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java
 ---
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.protocol;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+
+/**
+ * State machine that drives the operator executable. Converts
+ * between the iterator protocol and the operator executable protocol.
+ * Implemented as a separate class in anticipation of eventually
+ * changing the record batch (iterator) protocol.
+ */
+
+public class OperatorDriver {
+  public enum State { START, SCHEMA, RUN, END, FAILED, CLOSED }
+
+  private OperatorDriver.State state = State.START;
+
+  /**
+   * Operator context. The driver "owns" the context and is responsible
+   * for closing it.
+   */
+
+  private final OperatorContext opContext;
+  private final OperatorExec operatorExec;
+  private final BatchAccessor batchAccessor;
+  private int schemaVersion;
+
+  public OperatorDriver(OperatorContext opServicees, OperatorExec opExec) {
+this.opContext = opServicees;
+this.operatorExec = opExec;
+batchAccessor = operatorExec.batchAccessor();
+  }
+
+  /**
+   * Get the next batch. Performs initialization on the first call.
+   * @return the iteration outcome to send downstream
+   */
+
+  public IterOutcome next() {
+try {
+  switch (state) {
+  case START:
+return start();
+  case RUN:
+return doNext();
+   default:
+OperatorRecordBatch.logger.debug("Extra call to next() in state " + state + ": " + operatorLabel());
+return IterOutcome.NONE;
+  }
+} catch (UserException e) {
+  cancelSilently();
+  state = State.FAILED;
+  throw e;
+} catch (Throwable t) {
+  cancelSilently();
+  state = State.FAILED;
+  throw UserException.executionError(t)
+.addContext("Exception thrown from", operatorLabel())
+.build(OperatorRecordBatch.logger);
+}
+  }
+
+  /**
+   * Cancels the operator before reaching EOF.
+   */
+
+  public void cancel() {
+try {
+  switch (state) {
+  case START:
+  case RUN:
+cancelSilently();
+break;
+  default:
+break;
+  }
+} finally {
+  state = State.FAILED;
+}
+  }
+
+ /**
+   * Start the operator executor. Bind it to the various contexts.
+   * Then start the executor and fetch the first schema.
+   * @return result of the first batch, which should contain
+   * only a schema, or EOF
+   */
+
+  private IterOutcome start() {
+state = State.SCHEMA;
+if (operatorExec.buildSchema()) {
+  schemaVersion = batchAccessor.schemaVersion();
+  state = State.RUN;
+  return IterOutcome.OK_NEW_SCHEMA;
+} else {
+  state = State.END;
+  return IterOutcome.NONE;
+}
+  }
+
+  /**
+   * Fetch a record batch, detecting EOF and a new schema.
+   * @return the IterOutcome for the above cases
+   */
+
+  private IterOutcome doNext() {
+if (! operatorExec.next()) {
+  state = State.END;
+  return IterOutcome.NONE;
+}
+int newVersion = batchAccessor.schemaVersion();
+if (newVersion != schemaVersion) {
+  schemaVersion = newVersion;
+  return IterOutcome.OK_NEW_SCHEMA;
+}
+return IterOutcome.OK;
+  }
+
+  /**
+   * Implement a cancellation, and ignore any exception that is
+   * thrown. We're 

[GitHub] drill pull request #1121: DRILL-6153: Operator framework

2018-02-14 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/1121#discussion_r168331779
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java
 ---
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.protocol;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+
+/**
+ * State machine that drives the operator executable. Converts
+ * between the iterator protocol and the operator executable protocol.
+ * Implemented as a separate class in anticipation of eventually
+ * changing the record batch (iterator) protocol.
+ */
+
+public class OperatorDriver {
+  public enum State { START, SCHEMA, RUN, END, FAILED, CLOSED }
--- End diff --

Do we need the SCHEMA state, given that you move from START to RUN, END or 
FAILED? Maybe combine START and SCHEMA into a single state, say GET_SCHEMA or 
something like that? Also, would it be good to have two states, FAILED and 
CANCELLED, to differentiate whether the query was cancelled or failed due to 
an error?
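A hypothetical sketch of the suggested state set follows, with an explicit transition table so that illegal transitions can be detected. `ProposedState` and `canMoveTo` are illustrative names only. The transitions into `CANCELLED` follow the quoted `cancel()` code, which cancels only in `START` or `RUN`. Allowing `CLOSED` from any non-closed state is an assumption.

```java
import java.util.EnumSet;

/** Hypothetical state set combining START and SCHEMA, and splitting out CANCELLED. */
enum ProposedState {
  GET_SCHEMA, RUN, END, FAILED, CANCELLED, CLOSED;

  /** States reachable from this one (assumed transition table). */
  EnumSet<ProposedState> successors() {
    switch (this) {
      case GET_SCHEMA:
        return EnumSet.of(RUN, END, FAILED, CANCELLED, CLOSED);
      case RUN:
        return EnumSet.of(END, FAILED, CANCELLED, CLOSED);
      case END:
      case FAILED:
      case CANCELLED:
        return EnumSet.of(CLOSED); // terminal states only allow cleanup
      default:
        return EnumSet.noneOf(ProposedState.class); // CLOSED goes nowhere
    }
  }

  boolean canMoveTo(ProposedState next) {
    return successors().contains(next);
  }
}
```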


---


[GitHub] drill pull request #1121: DRILL-6153: Operator framework

2018-02-14 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/1121#discussion_r168337843
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java
 ---
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.protocol;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+
+/**
+ * State machine that drives the operator executable. Converts
+ * between the iterator protocol and the operator executable protocol.
+ * Implemented as a separate class in anticipation of eventually
+ * changing the record batch (iterator) protocol.
+ */
+
+public class OperatorDriver {
+  public enum State { START, SCHEMA, RUN, END, FAILED, CLOSED }
+
+  private OperatorDriver.State state = State.START;
+
+  /**
+   * Operator context. The driver "owns" the context and is responsible
+   * for closing it.
+   */
+
+  private final OperatorContext opContext;
+  private final OperatorExec operatorExec;
+  private final BatchAccessor batchAccessor;
+  private int schemaVersion;
+
+  public OperatorDriver(OperatorContext opServicees, OperatorExec opExec) {
+this.opContext = opServicees;
+this.operatorExec = opExec;
+batchAccessor = operatorExec.batchAccessor();
+  }
+
+  /**
+   * Get the next batch. Performs initialization on the first call.
+   * @return the iteration outcome to send downstream
+   */
+
+  public IterOutcome next() {
+try {
+  switch (state) {
+  case START:
+return start();
+  case RUN:
+return doNext();
+   default:
+OperatorRecordBatch.logger.debug("Extra call to next() in state " + state + ": " + operatorLabel());
+return IterOutcome.NONE;
+  }
+} catch (UserException e) {
+  cancelSilently();
+  state = State.FAILED;
+  throw e;
+} catch (Throwable t) {
+  cancelSilently();
+  state = State.FAILED;
+  throw UserException.executionError(t)
+.addContext("Exception thrown from", operatorLabel())
+.build(OperatorRecordBatch.logger);
+}
+  }
+
+  /**
+   * Cancels the operator before reaching EOF.
+   */
+
+  public void cancel() {
+try {
+  switch (state) {
+  case START:
+  case RUN:
+cancelSilently();
+break;
+  default:
+break;
+  }
+} finally {
+  state = State.FAILED;
+}
+  }
+
+ /**
+   * Start the operator executor. Bind it to the various contexts.
+   * Then start the executor and fetch the first schema.
+   * @return result of the first batch, which should contain
+   * only a schema, or EOF
+   */
+
+  private IterOutcome start() {
--- End diff --

It would be good to document which exceptions each of these functions 
might throw.
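Here is a hypothetical example of what that documentation might look like. It uses a simplified stand-in interface rather than the real `OperatorExec`, and `UserFacingException` is an illustrative substitute for `UserException`. The `@throws` wording is an assumption, based on the error-handling policy described in this review: operators throw a user-facing exception for errors they understand, and the driver wraps anything else.

```java
/** Hypothetical stand-in for Drill's UserException. */
class UserFacingException extends RuntimeException {
  UserFacingException(String message) {
    super(message);
  }
}

/** Simplified, hypothetical mirror of the OperatorExec protocol. */
interface DocumentedOperator {
  /**
   * Retrieves the schema before the first batch of data.
   *
   * @return true if a schema is available, false at EOF
   * @throws UserFacingException if the operator cannot determine its
   *         schema; the message should explain the problem to the user
   */
  boolean buildSchema();

  /**
   * Retrieves the next batch of data.
   *
   * @return true if a batch is available, false at EOF
   * @throws UserFacingException for errors the operator understands
   *         (bad input, missing file, and so on); any other unchecked
   *         exception is treated as an internal error and wrapped by the driver
   */
  boolean next();
}
```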


---


[GitHub] drill pull request #1121: DRILL-6153: Operator framework

2018-02-14 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/1121#discussion_r168325261
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorExec.java
 ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.protocol;
+
+import org.apache.drill.exec.ops.OperatorContext;
+
+/**
+ * Core protocol for a Drill operator execution.
+ *
+ * Lifecycle
+ *
+ * 
+ * Creation via an operator-specific constructor in the
+ * corresponding RecordBatchCreator.
+ * bind() called to provide the operator services.
+ * buildSchema() called to define the schema before
+ * fetching the first record batch.
+ * next() called repeatedly to prepare each new record
+ * batch until EOF or until cancellation.
+ * cancel() called if the operator should quit early.
+ * close() called to release resources. Note that
+ * close() is called in response to:
+ *   EOF
+ *   After cancel()
+ *   After an exception is thrown.
+ * 
+ *
+ * Error Handling
+ *
+ * Any method can throw an (unchecked) exception. (Drill does not use
+ * checked exceptions.) Preferably, the code will throw a
+ * UserException that explains the error to the user. If any
+ * other kind of exception is thrown, then the enclosing class wraps it
+ * in a generic UserException that indicates that "something went
+ * wrong", which is less than ideal.
+ *
+ * Result Set
+ * The operator "publishes" a result set in response to returning
+ * true from next() by populating a
+ * {@link BatchAccesor} provided via {@link #batchAccessor()}. For
+ * compatibility with other Drill operators, the set of vectors within
+ * the batch must be the same from one batch to the next.
+ */
+
+public interface OperatorExec {
+
+  /**
+   * Bind this operator to the context. The context provides access
+   * to per-operator, per-fragment and per-Drillbit services.
+   * Also provides access to the operator definition (AKA "pop
+   * config") for this operator.
+   *
+   * @param context operator context
+   */
+
+  public void bind(OperatorContext context);
+
+  /**
+   * Provides a generic access mechanism to the batch's output data.
+   * This method is called after a successful return from
+   * {@link #buildSchema()} and {@link #next()}. The batch itself
+   * can be held in a standard {@link VectorContainer}, or in some
+   * other structure more convenient for this operator.
+   *
+   * @return the access for the batch's output container
+   */
+
+  BatchAccessor batchAccessor();
+
+  /**
+   * Retrieves the schema of the batch before the first actual batch
+   * of data. The schema is returned via an empty batch (no rows,
+   * only schema) from {@link #batchAccessor()}.
+   *
+   * @return true if a schema is available, false if the operator
+   * reached EOF before a schema was found
+   */
+
+  boolean buildSchema();
+
+  /**
+   * Retrieves the next batch of data. The data is returned via
+   * the {@link #batchAccessor()} method.
+   *
+   * @return true if another batch of data is available, false if
+   * EOF was reached and no more data is available
+   */
+
+  boolean next();
+
+  /**
+   * Alerts the operator that the query was cancelled. Generally
+   * optional, but allows the operator to realize that a cancellation
--- End diff --

Why is this optional?


---


[GitHub] drill pull request #1121: DRILL-6153: Operator framework

2018-02-14 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/1121#discussion_r168325765
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
 ---
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.protocol;
+
+import java.util.Iterator;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.FragmentContextInterface;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+/**
+ * Modular implementation of the standard Drill record batch iterator
+ * protocol. The protocol has two parts: control of the operator and
+ * access to the record batch. Each is encapsulated in separate
+ * implementation classes to allow easier customization for each
+ * situation. The operator internals are, themselves, abstracted to
+ * yet another class with the steps represented as method calls rather
+ * than as internal states as in the record batch iterator protocol.
+ * 
+ * Note that downstream operators make an assumption that the
+ * same vectors will appear from one batch to the next. That is,
+ * not only must the schema be the same, but if column "a" appears
+ * in two batches, the same value vector must back "a" in both
+ * batches. The TransferPair abstraction fails if different
+ * vectors appear across batches.
+ */
+
+public class OperatorRecordBatch implements CloseableRecordBatch {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorRecordBatch.class);
+
+  private final OperatorDriver driver;
+  private final BatchAccessor batchAccessor;
+
+  public OperatorRecordBatch(FragmentContext context, PhysicalOperator config, OperatorExec opExec) {
+    OperatorContext opContext = context.newOperatorContext(config);
+    opContext.getStats().startProcessing();
+
+    // Chicken-and-egg binding: the two objects must know about each other. Pass the
+    // context to the operator exec via a bind method.
+
+    try {
+      opExec.bind(opContext);
+      driver = new OperatorDriver(opContext, opExec);
+      batchAccessor = opExec.batchAccessor();
+    } catch (UserException e) {
+      opContext.close();
+      throw e;
+    } catch (Throwable t) {
+      opContext.close();
+      throw UserException.executionError(t)
+        .addContext("Exception thrown from", opExec.getClass().getSimpleName() + ".bind()")
+        .build(logger);
+    }
+    finally {
+      opContext.getStats().stopProcessing();
+    }
+  }
+
+  @Override
+  public FragmentContext getContext() {
+
+    // Backward compatibility with the full server context. Awkward for testing
+
+    FragmentContext fragmentContext = fragmentContext();
+    if (fragmentContext instanceof FragmentContext) {
--- End diff --

Not clear what we are doing here. Why can't we just return fragmentContext?
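As quoted, `fragmentContext` is declared as `FragmentContext`, so `fragmentContext instanceof FragmentContext` is true for every non-null value and the test adds nothing. The class also imports `FragmentContextInterface`, and the comment says the full server context is "awkward for testing". That suggests, though the diff does not show it, that `fragmentContext()` is meant to return the narrower interface and `getContext()` should downcast only when the full server context is present. The sketch below shows that pattern. Every type name in it is hypothetical, and throwing `UnsupportedOperationException` for a test context is an assumption, not the PR's behavior.

```java
/** Hypothetical narrow interface that lightweight test fixtures can implement. */
interface FragmentContextLike {
  String name();
}

/** Hypothetical full server-side context. */
class ServerFragmentContext implements FragmentContextLike {
  @Override public String name() { return "server"; }
}

/** Hypothetical test-only context. */
class TestFragmentContext implements FragmentContextLike {
  @Override public String name() { return "test"; }
}

class ContextHolder {
  private final FragmentContextLike context;

  ContextHolder(FragmentContextLike context) { this.context = context; }

  /** New code depends only on the narrow interface. */
  FragmentContextLike fragmentContext() { return context; }

  /**
   * Legacy accessor. The instanceof test is against the concrete class,
   * not the interface, so it actually distinguishes the two cases.
   */
  ServerFragmentContext getContext() {
    if (context instanceof ServerFragmentContext) {
      return (ServerFragmentContext) context;
    }
    throw new UnsupportedOperationException(
        "Full fragment context not available: " + context.name());
  }
}

public class ContextSketch {
  public static void main(String[] args) {
    ContextHolder server = new ContextHolder(new ServerFragmentContext());
    System.out.println(server.getContext().name());  // server
    ContextHolder test = new ContextHolder(new TestFragmentContext());
    try {
      test.getContext();
    } catch (UnsupportedOperationException e) {
      System.out.println(e.getMessage());  // Full fragment context not available: test
    }
  }
}
```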


---


[GitHub] drill pull request #1121: DRILL-6153: Operator framework

2018-02-14 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/1121#discussion_r168317696
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java
 ---
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.protocol;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+
+/**
+ * State machine that drives the operator executable. Converts
+ * between the iterator protocol and the operator executable protocol.
+ * Implemented as a separate class in anticipation of eventually
+ * changing the record batch (iterator) protocol.
+ */
+
+public class OperatorDriver {
+  public enum State { START, SCHEMA, RUN, END, FAILED, CLOSED }
+
+  private OperatorDriver.State state = State.START;
+
+  /**
+   * Operator context. The driver "owns" the context and is responsible
+   * for closing it.
+   */
+
+  private final OperatorContext opContext;
+  private final OperatorExec operatorExec;
+  private final BatchAccessor batchAccessor;
+  private int schemaVersion;
+
+  public OperatorDriver(OperatorContext opServicees, OperatorExec opExec) {
--- End diff --

Typo? opServices instead of opServicees?
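The driver's Javadoc says it converts between the iterator protocol and the operator executable protocol, using the states `START, SCHEMA, RUN, END, FAILED, CLOSED`. A minimal sketch of such a conversion follows, assuming that the first iterator `next()` calls `buildSchema()` and later calls delegate to the executable's `next()`. `Outcome` is a hypothetical, trimmed stand-in for `IterOutcome`, and `Exec` and `DriverSketch` are hypothetical names. The quoted driver also keeps a `schemaVersion` field, presumably to report mid-stream schema changes. The sketch omits that, and it rethrows errors instead of returning an error outcome.

```java
/** Hypothetical subset of the iterator outcomes. */
enum Outcome { OK_NEW_SCHEMA, OK, NONE }

/** Hypothetical minimal executable protocol. */
interface Exec {
  boolean buildSchema();
  boolean next();
  void close();
}

class DriverSketch {
  enum State { START, SCHEMA, RUN, END, FAILED, CLOSED }

  private State state = State.START;
  private final Exec exec;

  DriverSketch(Exec exec) { this.exec = exec; }

  State state() { return state; }

  /** One call of the iterator protocol's next(). */
  Outcome next() {
    if (state == State.FAILED || state == State.CLOSED) {
      throw new IllegalStateException("next() called in state " + state);
    }
    try {
      switch (state) {
        case START:
          // First call: report the schema only, no data.
          if (exec.buildSchema()) {
            state = State.SCHEMA;
            return Outcome.OK_NEW_SCHEMA;
          }
          state = State.END;
          return Outcome.NONE;
        case SCHEMA:
        case RUN:
          if (exec.next()) {
            state = State.RUN;
            return Outcome.OK;
          }
          state = State.END;
          return Outcome.NONE;
        default:
          // END: keep reporting EOF.
          return Outcome.NONE;
      }
    } catch (RuntimeException e) {
      state = State.FAILED;
      throw e;
    }
  }

  /** Closes the executable exactly once. */
  void close() {
    if (state != State.CLOSED) {
      exec.close();
      state = State.CLOSED;
    }
  }
}

public class DriverDemo {
  public static void main(String[] args) {
    int[] batches = {2};
    DriverSketch driver = new DriverSketch(new Exec() {
      public boolean buildSchema() { return true; }
      public boolean next() { return batches[0]-- > 0; }
      public void close() { }
    });
    Outcome o;
    while ((o = driver.next()) != Outcome.NONE) {
      System.out.println(o);  // OK_NEW_SCHEMA, then OK, OK
    }
    driver.close();
  }
}
```

Keeping the state machine in its own class means the operator executable only implements plain method calls. This matches the Javadoc's stated motivation of anticipating a future change to the record batch protocol.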


---


[GitHub] drill pull request #1121: DRILL-6153: Operator framework

2018-02-12 Thread paul-rogers
GitHub user paul-rogers opened a pull request:

https://github.com/apache/drill/pull/1121

DRILL-6153: Operator framework

Includes the core files for the operator framework revision. See [this writeup](https://github.com/paul-rogers/drill/wiki/BH-Operator-Framework) for details.

In this commit, nothing depends on this new code. Instead, it is the foundation for the revised scan operator, which will be added after the revised result set loader code is committed. Committing it now allows smaller PRs to proceed in parallel.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/paul-rogers/drill DRILL-6153

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/drill/pull/1121.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1121


commit 0e52bf4fc7bacf41565ca9a1055219bce3c279fe
Author: Paul Rogers 
Date:   2018-02-13T06:27:23Z

DRILL-6153: Operator framework




---