Repository: drill
Updated Branches:
  refs/heads/master 9f4fff800 -> e9b6e8f3d


DRILL-4593: Remove OldAssignmentCreator in FileSystemPlugin

+ Remove dead code in ParquetGroupScan

this closes #473


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/e9b6e8f3
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/e9b6e8f3
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/e9b6e8f3

Branch: refs/heads/master
Commit: e9b6e8f3ddadbd308b85ed6d88bcf878147ee77e
Parents: 10afc70
Author: vkorukanti <ve...@dremio.com>
Authored: Thu Apr 7 14:23:07 2016 -0700
Committer: vkorukanti <ve...@dremio.com>
Committed: Wed Apr 13 10:36:21 2016 -0700

----------------------------------------------------------------------
 .../drill/exec/store/kudu/KuduGroupScan.java    |   2 +-
 .../org/apache/drill/exec/ExecConstants.java    |   3 -
 .../server/options/SystemOptionManager.java     |   1 -
 .../exec/store/dfs/easy/EasyGroupScan.java      |   2 +-
 .../exec/store/parquet/ParquetGroupScan.java    |  39 +----
 .../exec/store/schedule/AssignmentCreator.java  |  13 +-
 .../store/schedule/OldAssignmentCreator.java    | 141 -------------------
 .../drill/exec/store/store/TestAssignment.java  |   2 +-
 8 files changed, 7 insertions(+), 196 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/e9b6e8f3/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java
index ff4295d..873f216 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java
@@ -190,7 +190,7 @@ public class KuduGroupScan extends AbstractGroupScan {
    */
   @Override
   public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
-    assignments = AssignmentCreator.getMappings(incomingEndpoints, kuduWorkList, storagePlugin.getContext());
+    assignments = AssignmentCreator.getMappings(incomingEndpoints, kuduWorkList);
   }
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/e9b6e8f3/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 963934d..a490116 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -267,9 +267,6 @@ public interface ExecConstants {
   OptionValidator NEW_VIEW_DEFAULT_PERMS_VALIDATOR =
       new StringValidator(NEW_VIEW_DEFAULT_PERMS_KEY, "700");
 
-  String USE_OLD_ASSIGNMENT_CREATOR = "exec.schedule.assignment.old";
-  OptionValidator USE_OLD_ASSIGNMENT_CREATOR_VALIDATOR = new BooleanValidator(USE_OLD_ASSIGNMENT_CREATOR, false);
-
   String CTAS_PARTITIONING_HASH_DISTRIBUTE = "store.partition.hash_distribute";
   BooleanValidator CTAS_PARTITIONING_HASH_DISTRIBUTE_VALIDATOR = new BooleanValidator(CTAS_PARTITIONING_HASH_DISTRIBUTE, false);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/e9b6e8f3/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index a596d3a..0abdb76 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -124,7 +124,6 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       ExecConstants.HASH_AGG_TABLE_FACTOR,
       ExecConstants.AVERAGE_FIELD_WIDTH,
       ExecConstants.NEW_VIEW_DEFAULT_PERMS_VALIDATOR,
-      ExecConstants.USE_OLD_ASSIGNMENT_CREATOR_VALIDATOR,
       ExecConstants.CTAS_PARTITIONING_HASH_DISTRIBUTE_VALIDATOR,
       ExecConstants.ADMIN_USERS_VALIDATOR,
       ExecConstants.ADMIN_USER_GROUPS_VALIDATOR,

http://git-wip-us.apache.org/repos/asf/drill/blob/e9b6e8f3/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index ebea2f4..7a80db3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -195,7 +195,7 @@ public class EasyGroupScan extends AbstractFileGroupScan{
 
   @Override
   public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
-    mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks, formatPlugin.getContext());
+    mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks);
   }
 
   private void createMappings(List<EndpointAffinity> affinities) {

http://git-wip-us.apache.org/repos/asf/drill/blob/e9b6e8f3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 47172cc..5950b74 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -34,7 +34,6 @@ import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.metrics.DrillMetrics;
 import org.apache.drill.exec.physical.EndpointAffinity;
 import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
 import org.apache.drill.exec.physical.base.AbstractFileGroupScan;
@@ -46,7 +45,6 @@ import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.store.ParquetOutputRecordWriter;
 import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.TimedRunnable;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.DrillPathFilter;
 import org.apache.drill.exec.store.dfs.FileSelection;
@@ -59,7 +57,6 @@ import org.apache.drill.exec.store.parquet.Metadata.ParquetTableMetadataBase;
 import org.apache.drill.exec.store.parquet.Metadata.RowGroupMetadata;
 import org.apache.drill.exec.store.schedule.AffinityCreator;
 import org.apache.drill.exec.store.schedule.AssignmentCreator;
-import org.apache.drill.exec.store.schedule.BlockMapBuilder;
 import org.apache.drill.exec.store.schedule.CompleteWork;
 import org.apache.drill.exec.store.schedule.EndpointByteMap;
 import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
@@ -87,14 +84,12 @@ import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.joda.time.DateTimeUtils;
 
-import com.codahale.metrics.MetricRegistry;
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
@@ -104,12 +99,8 @@ import com.google.common.collect.Sets;
 @JsonTypeName("parquet-scan")
 public class ParquetGroupScan extends AbstractFileGroupScan {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetGroupScan.class);
-  static final MetricRegistry metrics = DrillMetrics.getInstance();
-  static final String READ_FOOTER_TIMER = MetricRegistry.name(ParquetGroupScan.class, "readFooter");
-
 
   private final List<ReadEntryWithPath> entries;
-  private final Stopwatch watch = Stopwatch.createUnstarted();
   private final ParquetFormatPlugin formatPlugin;
   private final ParquetFormatConfig formatConfig;
   private final DrillFileSystem fs;
@@ -716,8 +707,6 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
               if (column.getNulls() != null) {
                 Long newCount = rowCount - column.getNulls();
                 columnValueCounts.put(schemaPath, columnValueCounts.get(schemaPath) + newCount);
-              } else {
-
               }
             }
           } else {
@@ -790,36 +779,10 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     }
   }
 
-  private class BlockMapper extends TimedRunnable<Void> {
-    private final BlockMapBuilder bmb;
-    private final RowGroupInfo rgi;
-
-    public BlockMapper(BlockMapBuilder bmb, RowGroupInfo rgi) {
-      super();
-      this.bmb = bmb;
-      this.rgi = rgi;
-    }
-
-    @Override
-    protected Void runInner() throws Exception {
-      EndpointByteMap ebm = bmb.getEndpointByteMap(rgi);
-      rgi.setEndpointByteMap(ebm);
-      return null;
-    }
-
-    @Override
-    protected IOException convertToIOException(Exception e) {
-      return new IOException(String.format(
-          "Failure while trying to get block locations for file %s starting at %d.", rgi.getPath(),
-          rgi.getStart()));
-    }
-
-  }
-
   @Override
   public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) throws PhysicalOperatorSetupException {
 
-    this.mappings = AssignmentCreator.getMappings(incomingEndpoints, rowGroupInfos, formatPlugin.getContext());
+    this.mappings = AssignmentCreator.getMappings(incomingEndpoints, rowGroupInfos);
   }
 
   @Override public ParquetRowGroupScan getSpecificScan(int minorFragmentId) {

http://git-wip-us.apache.org/repos/asf/drill/blob/e9b6e8f3/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
index 632cf66..eed200e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
@@ -27,9 +27,7 @@ import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.server.DrillbitContext;
 
 import com.carrotsearch.hppc.cursors.ObjectLongCursor;
 import com.google.common.base.Stopwatch;
@@ -91,14 +89,9 @@ public class AssignmentCreator<T extends CompleteWork> {
    * @param units the list of work units to be assigned
    * @return A multimap that maps each minor fragment id to a list of work units
    */
-  public static <T extends CompleteWork> ListMultimap<Integer,T> getMappings(List<DrillbitEndpoint> incomingEndpoints, List<T> units, DrillbitContext context) {
-    boolean useOldAssignmentCode = context == null ? false : context.getOptionManager().getOption(ExecConstants.USE_OLD_ASSIGNMENT_CREATOR).bool_val;
-    if (useOldAssignmentCode) {
-      return OldAssignmentCreator.getMappings(incomingEndpoints, units);
-    } else {
-      AssignmentCreator<T> creator = new AssignmentCreator<>(incomingEndpoints, units);
-      return creator.getMappings();
-    }
+  public static <T extends CompleteWork> ListMultimap<Integer,T> getMappings(List<DrillbitEndpoint> incomingEndpoints, List<T> units) {
+    AssignmentCreator<T> creator = new AssignmentCreator<>(incomingEndpoints, units);
+    return creator.getMappings();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/drill/blob/e9b6e8f3/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/OldAssignmentCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/OldAssignmentCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/OldAssignmentCreator.java
deleted file mode 100644
index 48bb5f3..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/OldAssignmentCreator.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.schedule;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Lists;
-
-/**
- * The OldAssignmentCreator is responsible for assigning a set of work units to the available slices.
- */
-public class OldAssignmentCreator<T extends CompleteWork> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AssignmentCreator.class);
-
-  static final double[] ASSIGNMENT_CUTOFFS = { 0.99, 0.50, 0.25, 0.00 };
-  private final ArrayListMultimap<Integer, T> mappings;
-  private final List<DrillbitEndpoint> endpoints;
-
-
-
-
-  /**
-   * Given a set of endpoints to assign work to, attempt to evenly assign work based on affinity of work units to
-   * Drillbits.
-   *
-   * @param incomingEndpoints
-   *          The set of nodes to assign work to. Note that nodes can be listed multiple times if we want to have
-   *          multiple slices on a node working on the task simultaneously.
-   * @param units
-   *          The work units to assign.
-   * @return ListMultimap of Integer > List<CompleteWork> (based on their incoming order) to with
-   */
-  public static <T extends CompleteWork> ListMultimap<Integer, T> getMappings(List<DrillbitEndpoint> incomingEndpoints,
-                                                                              List<T> units) {
-    OldAssignmentCreator<T> creator = new OldAssignmentCreator<T>(incomingEndpoints, units);
-    return creator.mappings;
-  }
-
-   OldAssignmentCreator(List<DrillbitEndpoint> incomingEndpoints, List<T> units) {
-    logger.debug("Assigning {} units to {} endpoints", units.size(), incomingEndpoints.size());
-    Stopwatch watch = Stopwatch.createUnstarted();
-
-    Preconditions.checkArgument(incomingEndpoints.size() <= units.size(), String.format("Incoming endpoints %d "
-        + "is greater than number of row groups %d", incomingEndpoints.size(), units.size()));
-    this.mappings = ArrayListMultimap.create();
-    this.endpoints = Lists.newLinkedList(incomingEndpoints);
-
-    ArrayList<T> rowGroupList = new ArrayList<>(units);
-    for (double cutoff : ASSIGNMENT_CUTOFFS) {
-      scanAndAssign(rowGroupList, cutoff, false, false);
-    }
-    scanAndAssign(rowGroupList, 0.0, true, false);
-    scanAndAssign(rowGroupList, 0.0, true, true);
-
-    logger.debug("Took {} ms to apply assignments", watch.elapsed(TimeUnit.MILLISECONDS));
-    Preconditions.checkState(rowGroupList.isEmpty(), "All readEntries should be assigned by now, but some are still unassigned");
-    Preconditions.checkState(!units.isEmpty());
-
-  }
-
-  /**
-   *
-   * @param mappings
-   *          the mapping between fragment/endpoint and rowGroup
-   * @param endpoints
-   *          the list of drillbits, ordered by the corresponding fragment
-   * @param workunits
-   *          the list of rowGroups to assign
-   * @param requiredPercentage
-   *          the percentage of max bytes required to make an assignment
-   * @param assignAll
-   *          if true, will assign even if no affinity
-   */
-  private void scanAndAssign(List<T> workunits, double requiredPercentage, boolean assignAllToEmpty, boolean assignAll) {
-    Collections.sort(workunits);
-    int fragmentPointer = 0;
-    final boolean requireAffinity = requiredPercentage > 0;
-    int maxAssignments = (int) (workunits.size() / endpoints.size());
-
-    if (maxAssignments < 1) {
-      maxAssignments = 1;
-    }
-
-    for (Iterator<T> iter = workunits.iterator(); iter.hasNext();) {
-      T unit = iter.next();
-      for (int i = 0; i < endpoints.size(); i++) {
-        int minorFragmentId = (fragmentPointer + i) % endpoints.size();
-        DrillbitEndpoint currentEndpoint = endpoints.get(minorFragmentId);
-        EndpointByteMap endpointByteMap = unit.getByteMap();
-        boolean haveAffinity = endpointByteMap.isSet(currentEndpoint);
-
-        if (assignAll
-            || (assignAllToEmpty && !mappings.containsKey(minorFragmentId))
-            || (!endpointByteMap.isEmpty() && (!requireAffinity || haveAffinity)
-            && (!mappings.containsKey(minorFragmentId) || mappings.get(minorFragmentId).size() < maxAssignments) && (!requireAffinity || endpointByteMap
-            .get(currentEndpoint) >= endpointByteMap.getMaxBytes() * requiredPercentage))) {
-
-          mappings.put(minorFragmentId, unit);
-          logger.debug("Assigned unit: {} to minorFragmentId: {}", unit, minorFragmentId);
-          // logger.debug("Assigned rowGroup {} to minorFragmentId {} endpoint {}", rowGroupInfo.getRowGroupIndex(),
-          // minorFragmentId, endpoints.get(minorFragmentId).getAddress());
-          // if (bytesPerEndpoint.get(currentEndpoint) != null) {
-          // // assignmentAffinityStats.update(bytesPerEndpoint.get(currentEndpoint) / rowGroupInfo.getLength());
-          // } else {
-          // // assignmentAffinityStats.update(0);
-          // }
-          iter.remove();
-          fragmentPointer = (minorFragmentId + 1) % endpoints.size();
-          break;
-        }
-      }
-
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/e9b6e8f3/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
index 1efc793..65d8cf7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
@@ -63,7 +63,7 @@ public class TestAssignment {
       incomingEndpoints.add(incomingEndpointsIterator.next());
     }
 
-    ListMultimap<Integer, CompleteFileWork> mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks, null);
+    ListMultimap<Integer, CompleteFileWork> mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks);
     System.out.println(mappings.keySet().size());
     for (int i = 0; i < width; i++) {
       Assert.assertTrue("no mapping for entry " + i, mappings.get(i) != null && mappings.get(i).size() > 0);

Reply via email to