Ian Maxon has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/387

Change subject: Remove old Hadoop compat layer
......................................................................

Remove old Hadoop compat layer

This module is:
a) not functional
b) dependent on edu.uci.ics.dcache, which is obsolete
Hence, for now, we should remove it from the source tree.

Change-Id: I98b18f24a20dcd8dc75e828e47fb0ab98179a5bf
---
D hyracks/hyracks-hadoop-compat/pom.xml
D hyracks/hyracks-hadoop-compat/src/main/assembly/binary-assembly.xml
D hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/client/HyracksClient.java
D hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/client/HyracksRunningJob.java
D hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/driver/CompatibilityLayer.java
D hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/CompatibilityConfig.java
D hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/ConfigurationConstants.java
D hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/DCacheHandler.java
D hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/HadoopAdapter.java
D hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/Utilities.java
M hyracks/pom.xml
11 files changed, 1 insertion(+), 1402 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/hyracks refs/changes/87/387/1

diff --git a/hyracks/hyracks-hadoop-compat/pom.xml b/hyracks/hyracks-hadoop-compat/pom.xml
deleted file mode 100644
index 2e15200..0000000
--- a/hyracks/hyracks-hadoop-compat/pom.xml
+++ /dev/null
@@ -1,116 +0,0 @@
-<!--
- ! Licensed to the Apache Software Foundation (ASF) under one
- ! or more contributor license agreements.  See the NOTICE file
- ! distributed with this work for additional information
- ! regarding copyright ownership.  The ASF licenses this file
- ! to you under the Apache License, Version 2.0 (the
- ! "License"); you may not use this file except in compliance
- ! with the License.  You may obtain a copy of the License at
- !
- !   http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing,
- ! software distributed under the License is distributed on an
- ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ! KIND, either express or implied.  See the License for the
- ! specific language governing permissions and limitations
- ! under the License.
- !-->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <artifactId>hyracks-hadoop-compat</artifactId>
-  <name>hyracks-hadoop-compat</name>
-
-  <parent>
-    <groupId>org.apache.hyracks</groupId>
-    <artifactId>hyracks</artifactId>
-    <version>0.2.16-SNAPSHOT</version>
-  </parent>
-
-  <licenses>
-    <license>
-      <name>Apache License, Version 2.0</name>
-      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
-      <distribution>repo</distribution>
-      <comments>A business-friendly OSS license</comments>
-    </license>
-  </licenses>
-
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <version>2.0.2</version>
-        <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
-          <fork>true</fork>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>appassembler-maven-plugin</artifactId>
-        <version>1.3</version>
-        <executions>
-          <execution>
-            <configuration>
-              <programs>
-                <program>
-                  <mainClass>org.apache.hyracks.hadoop.compat.driver.CompatibilityLayer</mainClass>
-                  <name>hadoop-compat</name>
-                </program>
-              </programs>
-              <repositoryLayout>flat</repositoryLayout>
-              <repositoryName>lib</repositoryName>
-            </configuration>
-            <phase>package</phase>
-            <goals>
-              <goal>assemble</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <artifactId>maven-assembly-plugin</artifactId>
-        <version>2.2-beta-5</version>
-        <executions>
-          <execution>
-            <configuration>
-              <descriptors>
-                <descriptor>src/main/assembly/binary-assembly.xml</descriptor>
-              </descriptors>
-            </configuration>
-            <phase>package</phase>
-            <goals>
-              <goal>attached</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-  <dependencies>
-       <dependency>
-               <groupId>org.apache.hadoop</groupId>
-               <artifactId>hadoop-client</artifactId>
-               <type>jar</type>
-               <scope>compile</scope>
-       </dependency>
-    <dependency>
-        <groupId>edu.uci.ics.dcache</groupId>
-        <artifactId>dcache-client</artifactId>
-        <version>0.0.1</version>
-        <scope>compile</scope>
-    </dependency>
-    <dependency>
-       <groupId>org.apache.hyracks</groupId>
-       <artifactId>hyracks-dataflow-hadoop</artifactId>
-       <version>0.2.16-SNAPSHOT</version>
-       <type>jar</type>
-       <scope>compile</scope>
-    </dependency>
-  </dependencies>
-</project>
diff --git a/hyracks/hyracks-hadoop-compat/src/main/assembly/binary-assembly.xml b/hyracks/hyracks-hadoop-compat/src/main/assembly/binary-assembly.xml
deleted file mode 100644
index d4a2ea1..0000000
--- a/hyracks/hyracks-hadoop-compat/src/main/assembly/binary-assembly.xml
+++ /dev/null
@@ -1,38 +0,0 @@
-<!--
- ! Licensed to the Apache Software Foundation (ASF) under one
- ! or more contributor license agreements.  See the NOTICE file
- ! distributed with this work for additional information
- ! regarding copyright ownership.  The ASF licenses this file
- ! to you under the Apache License, Version 2.0 (the
- ! "License"); you may not use this file except in compliance
- ! with the License.  You may obtain a copy of the License at
- !
- !   http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing,
- ! software distributed under the License is distributed on an
- ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ! KIND, either express or implied.  See the License for the
- ! specific language governing permissions and limitations
- ! under the License.
- !-->
-
-<assembly>
-  <id>binary-assembly</id>
-  <formats>
-    <format>zip</format>
-    <format>dir</format>
-  </formats>
-  <includeBaseDirectory>false</includeBaseDirectory>
-  <fileSets>
-    <fileSet>
-      <directory>target/appassembler/bin</directory>
-      <outputDirectory>bin</outputDirectory>
-      <fileMode>0755</fileMode>
-    </fileSet>
-    <fileSet>
-      <directory>target/appassembler/lib</directory>
-      <outputDirectory>lib</outputDirectory>
-    </fileSet>
-  </fileSets>
-</assembly>
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/client/HyracksClient.java b/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/client/HyracksClient.java
deleted file mode 100644
index d0ef2a9..0000000
--- a/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/client/HyracksClient.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.hadoop.compat.client;
-
-import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.Properties;
-import java.util.Set;
-
-import org.apache.hyracks.api.client.HyracksConnection;
-import org.apache.hyracks.api.job.JobFlag;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.api.job.JobStatus;
-import org.apache.hyracks.hadoop.compat.util.ConfigurationConstants;
-import org.apache.hyracks.hadoop.compat.util.Utilities;
-
-public class HyracksClient {
-
-    private static HyracksConnection connection;
-    private static final String jobProfilingKey = "jobProfilingKey";
-    Set<String> systemLibs;
-
-    public HyracksClient(Properties clusterProperties) throws Exception {
-        initialize(clusterProperties);
-    }
-
-    private void initialize(Properties properties) throws Exception {
-        String clusterController = (String) properties.get(ConfigurationConstants.clusterControllerHost);
-        connection = new HyracksConnection(clusterController, 1098);
-        systemLibs = new HashSet<String>();
-        for (String systemLib : ConfigurationConstants.systemLibs) {
-            String systemLibPath = properties.getProperty(systemLib);
-            if (systemLibPath != null) {
-                systemLibs.add(systemLibPath);
-            }
-        }
-    }
-
-    public HyracksClient(String clusterConf, char delimiter) throws Exception {
-        Properties properties = Utilities.getProperties(clusterConf, delimiter);
-        initialize(properties);
-    }
-
-    public JobStatus getJobStatus(JobId jobId) throws Exception {
-        return connection.getJobStatus(jobId);
-    }
-
-    public HyracksRunningJob submitJob(String applicationName, JobSpecification spec) throws Exception {
-        String jobProfilingVal = System.getenv(jobProfilingKey);
-        boolean doProfiling = ("true".equalsIgnoreCase(jobProfilingVal));
-        JobId jobId;
-        if (doProfiling) {
-            System.out.println("PROFILING");
-            jobId = connection.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
-        } else {
-            jobId = connection.startJob(spec);
-        }
-        HyracksRunningJob runningJob = new HyracksRunningJob(jobId, spec, this);
-        return runningJob;
-    }
-
-    public HyracksRunningJob submitJob(String applicationName, JobSpecification spec, Set<String> userLibs)
-            throws Exception {
-        return submitJob(applicationName, spec);
-    }
-
-    public void waitForCompleton(JobId jobId) throws Exception {
-        connection.waitForCompletion(jobId);
-    }
-}
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/client/HyracksRunningJob.java b/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/client/HyracksRunningJob.java
deleted file mode 100644
index 921de2d..0000000
--- a/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/client/HyracksRunningJob.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.hadoop.compat.client;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.JobStatus;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapred.TaskCompletionEvent;
-
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.api.job.JobId;
-
-public class HyracksRunningJob implements RunningJob {
-
-    JobId jobId;
-    IOperatorDescriptorRegistry spec;
-    HyracksClient hyracksClient;
-
-    public JobId getJobId() {
-        return jobId;
-    }
-
-    public void setJobId(JobId jobId) {
-        this.jobId = jobId;
-    }
-
-    public IOperatorDescriptorRegistry getSpec() {
-        return spec;
-    }
-
-    public void setSpec(IOperatorDescriptorRegistry spec) {
-        this.spec = spec;
-    }
-
-    public HyracksRunningJob(JobId jobId, IOperatorDescriptorRegistry jobSpec, HyracksClient hyracksClient) {
-        this.spec = jobSpec;
-        this.jobId = jobId;
-        this.hyracksClient = hyracksClient;
-    }
-
-    @Override
-    public float cleanupProgress() throws IOException {
-        return 0;
-    }
-
-    @Override
-    public Counters getCounters() throws IOException {
-        return new Counters();
-    }
-
-    @Override
-    public JobID getID() {
-        return new JobID(this.jobId.toString(), 1);
-    }
-
-    @Override
-    public String getJobFile() {
-        return "";
-    }
-
-    @Override
-    public String getJobID() {
-        return this.jobId.toString();
-    }
-
-    @Override
-    public String getJobName() {
-        return this.jobId.toString();
-    }
-
-    @Override
-    public int getJobState() throws IOException {
-        return isComplete() ? 2 : 1;
-    }
-
-    @Override
-    public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom) throws IOException {
-        return new TaskCompletionEvent[0];
-    }
-
-    @Override
-    public String getTrackingURL() {
-        return " running on hyrax, remote kill is not supported ";
-    }
-
-    @Override
-    public boolean isComplete() throws IOException {
-        org.apache.hyracks.api.job.JobStatus status = null;
-        try {
-            status = hyracksClient.getJobStatus(jobId);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-
-        return status.equals(org.apache.hyracks.api.job.JobStatus.TERMINATED);
-    }
-
-    @Override
-    public boolean isSuccessful() throws IOException {
-        return isComplete();
-    }
-
-    @Override
-    public void killJob() throws IOException {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public void killTask(String taskId, boolean shouldFail) throws IOException {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public float mapProgress() throws IOException {
-        return 0;
-    }
-
-    @Override
-    public float reduceProgress() throws IOException {
-        return 0;
-    }
-
-    @Override
-    public void setJobPriority(String priority) throws IOException {
-
-    }
-
-    @Override
-    public float setupProgress() throws IOException {
-        return 0;
-    }
-
-    @Override
-    public void waitForCompletion() throws IOException {
-        while (!isComplete()) {
-            try {
-                Thread.sleep(5000);
-            } catch (InterruptedException ie) {
-            }
-        }
-    }
-
-    @Override
-    public String[] getTaskDiagnostics(TaskAttemptID arg0) throws IOException {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public void killTask(TaskAttemptID arg0, boolean arg1) throws IOException {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public Configuration getConfiguration() {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public String getFailureInfo() throws IOException {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public String getHistoryUrl() throws IOException {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public JobStatus getJobStatus() throws IOException {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public boolean isRetired() throws IOException {
-        // TODO Auto-generated method stub
-        return false;
-    }
-
-}
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/driver/CompatibilityLayer.java b/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/driver/CompatibilityLayer.java
deleted file mode 100644
index 87b0127..0000000
--- a/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/driver/CompatibilityLayer.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.hadoop.compat.driver;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.kohsuke.args4j.CmdLineParser;
-
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.hadoop.compat.client.HyracksClient;
-import org.apache.hyracks.hadoop.compat.client.HyracksRunningJob;
-import org.apache.hyracks.hadoop.compat.util.CompatibilityConfig;
-import org.apache.hyracks.hadoop.compat.util.ConfigurationConstants;
-import org.apache.hyracks.hadoop.compat.util.DCacheHandler;
-import org.apache.hyracks.hadoop.compat.util.HadoopAdapter;
-import org.apache.hyracks.hadoop.compat.util.Utilities;
-
-public class CompatibilityLayer {
-
-    HyracksClient hyracksClient;
-    DCacheHandler dCacheHander = null;
-    Properties clusterConf;
-    HadoopAdapter hadoopAdapter;
-
-    private static char configurationFileDelimiter = '=';
-    private static final String dacheKeyPrefix = "dcache.key";
-
-    public CompatibilityLayer(CompatibilityConfig clConfig) throws Exception {
-        initialize(clConfig);
-    }
-
-    private void initialize(CompatibilityConfig clConfig) throws Exception {
-        clusterConf = Utilities.getProperties(clConfig.clusterConf, configurationFileDelimiter);
-        hadoopAdapter = new HadoopAdapter(clusterConf.getProperty(ConfigurationConstants.namenodeURL));
-        hyracksClient = new HyracksClient(clusterConf);
-        dCacheHander = new DCacheHandler(clusterConf.getProperty(ConfigurationConstants.dcacheServerConfiguration));
-    }
-
-    public HyracksRunningJob submitJob(JobConf conf, Set<String> userLibs) throws Exception {
-        List<JobConf> jobConfs = new ArrayList<JobConf>();
-        jobConfs.add(conf);
-        String applicationName = conf.getJobName() + System.currentTimeMillis();
-        JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
-        HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(applicationName, spec, userLibs);
-        return hyracksRunningJob;
-    }
-
-    public HyracksRunningJob submitJobs(String applicationName, String[] jobFiles, Set<String> userLibs)
-            throws Exception {
-        List<JobConf> jobConfs = constructHadoopJobConfs(jobFiles);
-        populateDCache(jobFiles[0]);
-        JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
-        HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(applicationName, spec, userLibs);
-        return hyracksRunningJob;
-    }
-
-    public HyracksRunningJob submitJobs(String applicationName, String[] jobFiles) throws Exception {
-        List<JobConf> jobConfs = constructHadoopJobConfs(jobFiles);
-        populateDCache(jobFiles[0]);
-        JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
-        HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(applicationName, spec);
-        return hyracksRunningJob;
-    }
-
-    private void populateDCache(String jobFile) throws IOException {
-        Map<String, String> dcacheTasks = preparePreLaunchDCacheTasks(jobFile);
-        String tempDir = "/tmp";
-        if (dcacheTasks.size() > 0) {
-            for (String key : dcacheTasks.keySet()) {
-                String destPath = tempDir + "/" + key + System.currentTimeMillis();
-                hadoopAdapter.getHDFSClient().copyToLocalFile(new Path(dcacheTasks.get(key)), new Path(destPath));
-                System.out.println(" source :" + dcacheTasks.get(key));
-                System.out.println(" dest :" + destPath);
-                System.out.println(" key :" + key);
-                System.out.println(" value :" + destPath);
-                dCacheHander.put(key, destPath);
-            }
-        }
-    }
-
-    private String getApplicationNameForHadoopJob(JobConf jobConf) {
-        String jar = jobConf.getJar();
-        if (jar != null) {
-            return jar.substring(jar.lastIndexOf("/") >= 0 ? jar.lastIndexOf("/") + 1 : 0);
-        } else {
-            return "" + System.currentTimeMillis();
-        }
-    }
-
-    private Map<String, String> initializeCustomProperties(Properties properties, String prefix) {
-        Map<String, String> foundProperties = new HashMap<String, String>();
-        Set<Entry<Object, Object>> entrySet = properties.entrySet();
-        for (Entry entry : entrySet) {
-            String key = (String) entry.getKey();
-            String value = (String) entry.getValue();
-            if ((key.startsWith(prefix))) {
-                String actualKey = key.substring(prefix.length() + 1); // "cut off '<prefix>.' from the beginning"
-                foundProperties.put(actualKey, value);
-            }
-        }
-        return foundProperties;
-    }
-
-    public Map<String, String> preparePreLaunchDCacheTasks(String jobFile) {
-        Properties jobProperties = Utilities.getProperties(jobFile, ',');
-        Map<String, String> dcacheTasks = new HashMap<String, String>();
-        Map<String, String> dcacheKeys = initializeCustomProperties(jobProperties, dacheKeyPrefix);
-        for (String key : dcacheKeys.keySet()) {
-            String sourcePath = dcacheKeys.get(key);
-            if (sourcePath != null) {
-                dcacheTasks.put(key, sourcePath);
-            }
-        }
-        return dcacheTasks;
-    }
-
-    public void waitForCompletion(JobId jobId) throws Exception {
-        hyracksClient.waitForCompleton(jobId);
-    }
-
-    private List<JobConf> constructHadoopJobConfs(String[] jobFiles) throws Exception {
-        List<JobConf> jobConfs = new ArrayList<JobConf>();
-        for (String jobFile : jobFiles) {
-            jobConfs.add(constructHadoopJobConf(jobFile));
-        }
-        return jobConfs;
-    }
-
-    private JobConf constructHadoopJobConf(String jobFile) {
-        Properties jobProperties = Utilities.getProperties(jobFile, '=');
-        JobConf conf = new JobConf(hadoopAdapter.getConf());
-        for (Entry entry : jobProperties.entrySet()) {
-            conf.set((String) entry.getKey(), (String) entry.getValue());
-            System.out.println((String) entry.getKey() + " : " + (String) entry.getValue());
-        }
-        return conf;
-    }
-
-    private String[] getJobs(CompatibilityConfig clConfig) {
-        return clConfig.jobFiles == null ? new String[0] : clConfig.jobFiles.split(",");
-    }
-
-    public static void main(String args[]) throws Exception {
-        long startTime = System.nanoTime();
-        CompatibilityConfig clConfig = new CompatibilityConfig();
-        CmdLineParser cp = new CmdLineParser(clConfig);
-        try {
-            cp.parseArgument(args);
-        } catch (Exception e) {
-            System.err.println(e.getMessage());
-            cp.printUsage(System.err);
-            return;
-        }
-        CompatibilityLayer compatLayer = new CompatibilityLayer(clConfig);
-        String applicationName = clConfig.applicationName;
-        String[] jobFiles = compatLayer.getJobs(clConfig);
-        String[] userLibraries = null;
-        if (clConfig.userLibs != null) {
-            userLibraries = clConfig.userLibs.split(",");
-        }
-        try {
-            HyracksRunningJob hyraxRunningJob = null;
-            if (userLibraries != null) {
-                Set<String> userLibs = new HashSet<String>();
-                for (String userLib : userLibraries) {
-                    userLibs.add(userLib);
-                }
-                hyraxRunningJob = compatLayer.submitJobs(applicationName, jobFiles, userLibs);
-            } else {
-                hyraxRunningJob = compatLayer.submitJobs(applicationName, jobFiles);
-            }
-            compatLayer.waitForCompletion(hyraxRunningJob.getJobId());
-            long end_time = System.nanoTime();
-            System.out.println("TOTAL TIME (from Launch to Completion):"
-                    + ((end_time - startTime) / (float) 1000000000.0) + " 
seconds.");
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw e;
-        }
-    }
-}
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/CompatibilityConfig.java b/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/CompatibilityConfig.java
deleted file mode 100644
index fb4b607..0000000
--- a/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/CompatibilityConfig.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.hadoop.compat.util;
-
-import org.kohsuke.args4j.Option;
-
-public class CompatibilityConfig {
-
-       @Option(name = "-cluster", required = true, usage = "Defines the path to the configuration file that provides the following info: +"
-                       + " (1) Address of HyracksClusterController service"
-                       + " (2) Address of Hadoop namenode service")
-       public String clusterConf;
-
-       @Option(name = "-jobFiles", usage = "Comma separated list of jobFiles. "
-                       + "Each job file defines the hadoop job + "
-                       + "The order in the list defines the sequence in which"
-                       + "the jobs are to be executed")
-       public String jobFiles;
-
-       @Option(name = "-applicationName", usage = " The application as part of which the job executes")
-       public String applicationName;
-
-       @Option(name = "-userLibs", usage = " A comma separated list of jar files that are required to be addedd to classpath when running ")
-       public String userLibs;
-}
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/ConfigurationConstants.java b/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/ConfigurationConstants.java
deleted file mode 100644
index 88cef77..0000000
--- a/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/ConfigurationConstants.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.hadoop.compat.util;
-
-public class ConfigurationConstants {
-
-    public static final String clusterControllerHost = "clusterControllerHost";
-    public static final String namenodeURL = "fs.default.name";
-    public static final String dcacheServerConfiguration = "dcacheServerConfiguration";
-    public static final String[] systemLibs = new String[] { "hyracksDataflowStdLib", "hyracksDataflowCommonLib",
-            "hyracksDataflowHadoopLib", "hadoopCoreLib"};
-
-}
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/DCacheHandler.java b/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/DCacheHandler.java
deleted file mode 100644
index 39a43fc..0000000
--- a/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/DCacheHandler.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.hadoop.compat.util;
-
-import java.io.IOException;
-import java.util.LinkedHashSet;
-import java.util.Set;
-
-import edu.uci.ics.dcache.client.DCacheClient;
-import edu.uci.ics.dcache.client.DCacheClientConfig;
-
-public class DCacheHandler {
-
-    String clientPropertyFile;
-    String key;
-    String valueFilePath;
-
-    public enum Operation {
-        GET,
-        PUT,
-        DELETE
-    }
-
-    private static DCacheClient dCacheClient;
-    private static final String[] operations = { "GET", "PUT", "DELETE" };
-
-    public DCacheHandler(String clientPropertyFile) throws Exception {
-        this.clientPropertyFile = clientPropertyFile;
-        init();
-    }
-
-    private void init() throws Exception {
-        dCacheClient = DCacheClient.get();
-        DCacheClientConfig dcacheClientConfig = new DCacheClientConfig();
-        dCacheClient.init(dcacheClientConfig);
-    }
-
-    public static DCacheClient getInstance(String clientPropertyFile) {
-        if (dCacheClient == null) {
-            dCacheClient = DCacheClient.get();
-        }
-        return dCacheClient;
-    }
-
-    public String getClientPropertyFile() {
-        return clientPropertyFile;
-    }
-
-    public void put(String key, String value) throws IOException {
-        dCacheClient.set(key, value);
-        System.out.println(" added to cache " + key + " : " + value);
-    }
-
-    public Object get(String key) throws IOException {
-        return dCacheClient.get(key);
-    }
-
-    public void delete(String key) throws IOException {
-        dCacheClient.delete(key);
-    }
-
-    public Object performOperation(String operation, String[] args) throws Exception {
-        Object returnValue = null;
-        int operationIndex = getOperation(operation);
-        switch (operationIndex) {
-            case 0:
-                returnValue = dCacheClient.get(args[2]);
-                System.out.println(" get from cache " + returnValue);
-                break;
-            case 1:
-                dCacheClient.set(args[2], args[3]);
-                System.out.println(" added to cache " + args[2] + " : " + 
args[3]);
-                break;
-            case 2:
-                dCacheClient.delete(args[2]);
-                System.out.println(" removed from cache " + args[2]);
-                break;
-            default:
-                System.out.println("Error : Operation not supported !");
-                break;
-        }
-        return returnValue;
-    }
-
-    private int getOperation(String operation) {
-        for (int i = 0; i < operations.length; i++) {
-            if (operations[i].equalsIgnoreCase(operation)) {
-                return i;
-            }
-        }
-        return -1;
-    }
-
-}
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/HadoopAdapter.java b/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/HadoopAdapter.java
deleted file mode 100644
index d2f9c19..0000000
--- a/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/HadoopAdapter.java
+++ /dev/null
@@ -1,427 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.hadoop.compat.util;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.HashMap;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.VersionedProtocol;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Partitioner;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.JobContextImpl;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
-import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.hadoop.HadoopMapperOperatorDescriptor;
-import org.apache.hyracks.dataflow.hadoop.HadoopReducerOperatorDescriptor;
-import org.apache.hyracks.dataflow.hadoop.HadoopWriteOperatorDescriptor;
-import org.apache.hyracks.dataflow.hadoop.data.HadoopHashTuplePartitionComputerFactory;
-import org.apache.hyracks.dataflow.hadoop.data.HadoopPartitionerTuplePartitionComputerFactory;
-import org.apache.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
-import org.apache.hyracks.dataflow.hadoop.util.ClasspathBasedHadoopClassFactory;
-import org.apache.hyracks.dataflow.hadoop.util.DatatypeHelper;
-import org.apache.hyracks.dataflow.hadoop.util.IHadoopClassFactory;
-import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
-
-public class HadoopAdapter {
-
-       public static final String FS_DEFAULT_NAME = "fs.default.name";
-       private JobConf jobConf;
-       private Map<OperatorDescriptorId, Integer> operatorInstanceCount = new HashMap<OperatorDescriptorId, Integer>();
-       public static final String HYRACKS_EX_SORT_FRAME_LIMIT = "HYRACKS_EX_SORT_FRAME_LIMIT";
-       public static final int DEFAULT_EX_SORT_FRAME_LIMIT = 4096;
-       public static final int DEFAULT_MAX_MAPPERS = 40;
-       public static final int DEFAULT_MAX_REDUCERS = 40;
-       public static final String MAX_MAPPERS_KEY = "maxMappers";
-       public static final String MAX_REDUCERS_KEY = "maxReducers";
-       public static final String EX_SORT_FRAME_LIMIT_KEY = "sortFrameLimit";
-
-       private int maxMappers = DEFAULT_MAX_MAPPERS;
-       private int maxReducers = DEFAULT_MAX_REDUCERS;
-       private int exSortFrame = DEFAULT_EX_SORT_FRAME_LIMIT;
-
-       class NewHadoopConstants {
-               public static final String INPUT_FORMAT_CLASS_ATTR = "mapreduce.inputformat.class";
-               public static final String MAP_CLASS_ATTR = "mapreduce.map.class";
-               public static final String COMBINE_CLASS_ATTR = "mapreduce.combine.class";
-               public static final String REDUCE_CLASS_ATTR = "mapreduce.reduce.class";
-               public static final String OUTPUT_FORMAT_CLASS_ATTR = "mapreduce.outputformat.class";
-               public static final String PARTITIONER_CLASS_ATTR = "mapreduce.partitioner.class";
-       }
-
-       public HadoopAdapter(String namenodeUrl) {
-               jobConf = new JobConf(true);
-               jobConf.set(FS_DEFAULT_NAME, namenodeUrl);
-               if (System.getenv(MAX_MAPPERS_KEY) != null) {
-                       maxMappers = Integer.parseInt(System.getenv(MAX_MAPPERS_KEY));
-               }
-               if (System.getenv(MAX_REDUCERS_KEY) != null) {
-                       maxReducers = Integer.parseInt(System.getenv(MAX_REDUCERS_KEY));
-               }
-               if (System.getenv(EX_SORT_FRAME_LIMIT_KEY) != null) {
-                       exSortFrame = Integer.parseInt(System
-                                       .getenv(EX_SORT_FRAME_LIMIT_KEY));
-               }
-       }
-
-       private String getEnvironmentVariable(String key, String def) {
-               String ret = System.getenv(key);
-               return ret != null ? ret : def;
-       }
-
-       public JobConf getConf() {
-               return jobConf;
-       }
-       //TODO: Why is there now a type mismatch? Why does a bounded wildcard fix it?
-       public static VersionedProtocol getProtocol(Class<? extends VersionedProtocol> protocolClass,
-                       InetSocketAddress inetAddress, JobConf jobConf) throws IOException {
-               VersionedProtocol versionedProtocol = RPC.getProxy(protocolClass,
-                               ClientProtocol.versionID, inetAddress, jobConf);
-               return versionedProtocol;
-       }
-
-       private static RecordDescriptor getHadoopRecordDescriptor(
-                       String className1, String className2) {
-               RecordDescriptor recordDescriptor = null;
-               try {
-                       recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
-                                       (Class<? extends Writable>) Class.forName(className1),
-                                       (Class<? extends Writable>) Class.forName(className2));
-               } catch (ClassNotFoundException cnfe) {
-                       cnfe.printStackTrace();
-               }
-               return recordDescriptor;
-       }
-
-       private Object[] getInputSplits(JobConf conf) throws IOException,
-                       ClassNotFoundException, InterruptedException {
-               if (conf.getUseNewMapper()) {
-                       return getNewInputSplits(conf);
-               } else {
-                       return getOldInputSplits(conf);
-               }
-       }
-
-       private org.apache.hadoop.mapreduce.InputSplit[] getNewInputSplits(
-                       JobConf conf) throws ClassNotFoundException, IOException,
-                       InterruptedException {
-               org.apache.hadoop.mapreduce.InputSplit[] splits = null;
-               JobContext context = new JobContextImpl(conf, null);
-               org.apache.hadoop.mapreduce.InputFormat inputFormat = ReflectionUtils
-                               .newInstance(context.getInputFormatClass(), conf);
-               List<org.apache.hadoop.mapreduce.InputSplit> inputSplits = inputFormat
-                               .getSplits(context);
-               return inputSplits
-                               .toArray(new org.apache.hadoop.mapreduce.InputSplit[] {});
-       }
-
-       private InputSplit[] getOldInputSplits(JobConf conf) throws IOException {
-               InputFormat inputFormat = conf.getInputFormat();
-               return inputFormat.getSplits(conf, conf.getNumMapTasks());
-       }
-
-       private void configurePartitionCountConstraint(JobSpecification spec,
-                       IOperatorDescriptor operator, int instanceCount) {
-               PartitionConstraintHelper.addPartitionCountConstraint(spec, operator,
-                               instanceCount);
-               operatorInstanceCount.put(operator.getOperatorId(), instanceCount);
-       }
-
-       public HadoopMapperOperatorDescriptor getMapper(JobConf conf,
-                       JobSpecification spec, IOperatorDescriptor previousOp)
-                       throws Exception {
-               boolean selfRead = previousOp == null;
-               IHadoopClassFactory classFactory = new ClasspathBasedHadoopClassFactory();
-               HadoopMapperOperatorDescriptor mapOp = null;
-               if (selfRead) {
-                       Object[] splits = getInputSplits(conf, maxMappers);
-                       mapOp = new HadoopMapperOperatorDescriptor(spec, conf, splits,
-                                       classFactory);
-                       configurePartitionCountConstraint(spec, mapOp, splits.length);
-               } else {
-                       configurePartitionCountConstraint(spec, mapOp,
-                                       getInstanceCount(previousOp));
-                       mapOp = new HadoopMapperOperatorDescriptor(spec, conf, classFactory);
-                       spec.connect(new OneToOneConnectorDescriptor(spec), previousOp, 0,
-                                       mapOp, 0);
-               }
-               return mapOp;
-       }
-
-       public HadoopReducerOperatorDescriptor getReducer(JobConf conf,
-                       IOperatorDescriptorRegistry spec, boolean useAsCombiner) {
-               HadoopReducerOperatorDescriptor reduceOp = new HadoopReducerOperatorDescriptor(
-                               spec, conf, null, new ClasspathBasedHadoopClassFactory(),
-                               useAsCombiner);
-               return reduceOp;
-       }
-
-       public FileSystem getHDFSClient() {
-               FileSystem fileSystem = null;
-               try {
-                       fileSystem = FileSystem.get(jobConf);
-               } catch (IOException ioe) {
-                       ioe.printStackTrace();
-               }
-               return fileSystem;
-       }
-
-       public JobSpecification getJobSpecification(List<JobConf> jobConfs)
-                       throws Exception {
-               JobSpecification spec = null;
-               if (jobConfs.size() == 1) {
-                       spec = getJobSpecification(jobConfs.get(0));
-               } else {
-                       spec = getPipelinedSpec(jobConfs);
-               }
-               return spec;
-       }
-
-       private IOperatorDescriptor configureOutput(
-                       IOperatorDescriptor previousOperator, JobConf conf,
-                       JobSpecification spec) throws Exception {
-               int instanceCountPreviousOperator = operatorInstanceCount
-                               .get(previousOperator.getOperatorId());
-               int numOutputters = conf.getNumReduceTasks() != 0 ? conf
-                               .getNumReduceTasks() : instanceCountPreviousOperator;
-               HadoopWriteOperatorDescriptor writer = null;
-               writer = new HadoopWriteOperatorDescriptor(spec, conf, numOutputters);
-               configurePartitionCountConstraint(spec, writer, numOutputters);
-               spec.connect(new OneToOneConnectorDescriptor(spec), previousOperator,
-                               0, writer, 0);
-               return writer;
-       }
-
-       private int getInstanceCount(IOperatorDescriptor operator) {
-               return operatorInstanceCount.get(operator.getOperatorId());
-       }
-
-       private IOperatorDescriptor addCombiner(
-                       IOperatorDescriptor previousOperator, JobConf jobConf,
-                       JobSpecification spec) throws Exception {
-               boolean useCombiner = (jobConf.getCombinerClass() != null);
-               IOperatorDescriptor mapSideOutputOp = previousOperator;
-               if (useCombiner) {
-                       System.out.println("Using Combiner:"
-                                       + jobConf.getCombinerClass().getName());
-                       IOperatorDescriptor mapSideCombineSortOp = getExternalSorter(
-                                       jobConf, spec);
-                       configurePartitionCountConstraint(spec, mapSideCombineSortOp,
-                                       getInstanceCount(previousOperator));
-
-                       HadoopReducerOperatorDescriptor mapSideCombineReduceOp = getReducer(
-                                       jobConf, spec, true);
-                       configurePartitionCountConstraint(spec, mapSideCombineReduceOp,
-                                       getInstanceCount(previousOperator));
-                       spec.connect(new OneToOneConnectorDescriptor(spec),
-                                       previousOperator, 0, mapSideCombineSortOp, 0);
-                       spec.connect(new OneToOneConnectorDescriptor(spec),
-                                       mapSideCombineSortOp, 0, mapSideCombineReduceOp, 0);
-                       mapSideOutputOp = mapSideCombineReduceOp;
-               }
-               return mapSideOutputOp;
-       }
-
-       private int getNumReduceTasks(JobConf jobConf) {
-               int numReduceTasks = Math.min(maxReducers, jobConf.getNumReduceTasks());
-               return numReduceTasks;
-       }
-
-       private IOperatorDescriptor addReducer(
-                       IOperatorDescriptor previousOperator, JobConf jobConf,
-                       JobSpecification spec) throws Exception {
-               IOperatorDescriptor mrOutputOperator = previousOperator;
-               if (jobConf.getNumReduceTasks() != 0) {
-                       IOperatorDescriptor sorter = getExternalSorter(jobConf, spec);
-                       HadoopReducerOperatorDescriptor reducer = getReducer(jobConf, spec,
-                                       false);
-                       int numReduceTasks = getNumReduceTasks(jobConf);
-                       configurePartitionCountConstraint(spec, sorter, numReduceTasks);
-                       configurePartitionCountConstraint(spec, reducer, numReduceTasks);
-
-                       IConnectorDescriptor mToNConnectorDescriptor = getMtoNHashPartitioningConnector(
-                                       jobConf, spec);
-                       spec.connect(mToNConnectorDescriptor, previousOperator, 0, sorter,
-                                       0);
-                       spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0,
-                                       reducer, 0);
-                       mrOutputOperator = reducer;
-               }
-               return mrOutputOperator;
-       }
-
-       private long getInputSize(Object[] splits, JobConf conf)
-                       throws IOException, InterruptedException {
-               long totalInputSize = 0;
-               if (conf.getUseNewMapper()) {
-                       for (org.apache.hadoop.mapreduce.InputSplit split : (org.apache.hadoop.mapreduce.InputSplit[]) splits) {
-                               totalInputSize += split.getLength();
-                       }
-               } else {
-                       for (InputSplit split : (InputSplit[]) splits) {
-                               totalInputSize += split.getLength();
-                       }
-               }
-               return totalInputSize;
-       }
-
-       private Object[] getInputSplits(JobConf conf, int desiredMaxMappers)
-                       throws Exception {
-               Object[] splits = getInputSplits(conf);
-               if (splits.length > desiredMaxMappers) {
-                       long totalInputSize = getInputSize(splits, conf);
-                       long goalSize = (totalInputSize / desiredMaxMappers);
-                       conf.setLong("mapred.min.split.size", goalSize);
-                       conf.setNumMapTasks(desiredMaxMappers);
-                       splits = getInputSplits(conf);
-               }
-               return splits;
-       }
-
-       public JobSpecification getPipelinedSpec(List<JobConf> jobConfs)
-                       throws Exception {
-               JobSpecification spec = new JobSpecification();
-               Iterator<JobConf> iterator = jobConfs.iterator();
-               JobConf firstMR = iterator.next();
-               IOperatorDescriptor mrOutputOp = configureMapReduce(null, spec, firstMR);
-               while (iterator.hasNext())
-                       for (JobConf currentJobConf : jobConfs) {
-                               mrOutputOp = configureMapReduce(mrOutputOp, spec,
-                                               currentJobConf);
-                       }
-               configureOutput(mrOutputOp, jobConfs.get(jobConfs.size() - 1), spec);
-               return spec;
-       }
-
-       public JobSpecification getJobSpecification(JobConf conf) throws Exception {
-               JobSpecification spec = new JobSpecification();
-               IOperatorDescriptor mrOutput = configureMapReduce(null, spec, conf);
-               IOperatorDescriptor printer = configureOutput(mrOutput, conf, spec);
-               spec.addRoot(printer);
-               System.out.println(spec);
-               return spec;
-       }
-
-       private IOperatorDescriptor configureMapReduce(
-                       IOperatorDescriptor previousOuputOp, JobSpecification spec,
-                       JobConf conf) throws Exception {
-               IOperatorDescriptor mapper = getMapper(conf, spec, previousOuputOp);
-               IOperatorDescriptor mapSideOutputOp = addCombiner(mapper, conf, spec);
-               IOperatorDescriptor reducer = addReducer(mapSideOutputOp, conf, spec);
-               return reducer;
-       }
-
-       public static InMemorySortOperatorDescriptor getInMemorySorter(
-                       JobConf conf, IOperatorDescriptorRegistry spec) {
-               InMemorySortOperatorDescriptor inMemorySortOp = null;
-               RecordDescriptor recordDescriptor = getHadoopRecordDescriptor(conf
-                               .getMapOutputKeyClass().getName(), conf
-                               .getMapOutputValueClass().getName());
-               Class<? extends RawComparator> rawComparatorClass = null;
-               WritableComparator writableComparator = WritableComparator.get(conf
-                               .getMapOutputKeyClass().asSubclass(WritableComparable.class));
-               WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(
-                               writableComparator.getClass());
-               inMemorySortOp = new InMemorySortOperatorDescriptor(spec,
-                               new int[] { 0 },
-                               new IBinaryComparatorFactory[] { comparatorFactory },
-                               recordDescriptor);
-               return inMemorySortOp;
-       }
-
-       public static ExternalSortOperatorDescriptor getExternalSorter(
-                       JobConf conf, IOperatorDescriptorRegistry spec) {
-               ExternalSortOperatorDescriptor externalSortOp = null;
-               RecordDescriptor recordDescriptor = getHadoopRecordDescriptor(conf
-                               .getMapOutputKeyClass().getName(), conf
-                               .getMapOutputValueClass().getName());
-               Class<? extends RawComparator> rawComparatorClass = null;
-               WritableComparator writableComparator = WritableComparator.get(conf
-                               .getMapOutputKeyClass().asSubclass(WritableComparable.class));
-               WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(
-                               writableComparator.getClass());
-               externalSortOp = new ExternalSortOperatorDescriptor(spec, conf.getInt(
-                               HYRACKS_EX_SORT_FRAME_LIMIT, DEFAULT_EX_SORT_FRAME_LIMIT),
-                               new int[] { 0 },
-                               new IBinaryComparatorFactory[] { comparatorFactory },
-                               recordDescriptor);
-               return externalSortOp;
-       }
-
-       public static MToNPartitioningConnectorDescriptor getMtoNHashPartitioningConnector(
-                       JobConf conf, IConnectorDescriptorRegistry spec) {
-
-               Class mapOutputKeyClass = conf.getMapOutputKeyClass();
-               Class mapOutputValueClass = conf.getMapOutputValueClass();
-
-               MToNPartitioningConnectorDescriptor connectorDescriptor = null;
-               ITuplePartitionComputerFactory factory = null;
-               conf.getMapOutputKeyClass();
-               if (conf.getPartitionerClass() != null
-                               && !conf.getPartitionerClass().getName().startsWith(
-                                               "org.apache.hadoop")) {
-                       Class<? extends Partitioner> partitioner = conf
-                                       .getPartitionerClass();
-                       factory = new HadoopPartitionerTuplePartitionComputerFactory(
-                                       partitioner, DatatypeHelper
-                                                       .createSerializerDeserializer(mapOutputKeyClass),
-                                       DatatypeHelper
-                                                       .createSerializerDeserializer(mapOutputValueClass));
-               } else {
-                       RecordDescriptor recordDescriptor = DatatypeHelper
-                                       .createKeyValueRecordDescriptor(mapOutputKeyClass,
-                                                       mapOutputValueClass);
-                       ISerializerDeserializer mapOutputKeySerializerDerserializer = DatatypeHelper
-                                       .createSerializerDeserializer(mapOutputKeyClass);
-                       factory = new HadoopHashTuplePartitionComputerFactory(
-                                       mapOutputKeySerializerDerserializer);
-               }
-               connectorDescriptor = new MToNPartitioningConnectorDescriptor(spec,
-                               factory);
-               return connectorDescriptor;
-       }
-
-}
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/Utilities.java b/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/Utilities.java
deleted file mode 100644
index 8f326b8..0000000
--- a/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/Utilities.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.hadoop.compat.util;
-
-import java.io.BufferedReader;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.HashSet;
-import java.util.Properties;
-import java.util.Set;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipOutputStream;
-
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.Counters.Counter;
-
-public class Utilities {
-
-    public static Properties getProperties(String filePath, char delimiter) {
-        Properties properties = new Properties();
-        try {
-            FileInputStream fins = new FileInputStream(new File(filePath));
-            DataInputStream dins = new DataInputStream(fins);
-            BufferedReader br = new BufferedReader(new InputStreamReader(dins));
-            String strLine;
-            while ((strLine = br.readLine()) != null) {
-                int split = strLine.indexOf(delimiter);
-                if (split >= 0) {
-                    properties.put((strLine.substring(0, split)).trim(), strLine.substring(split + 1, strLine.length())
-                            .trim());
-                }
-            }
-        } catch (IOException ioe) {
-            ioe.printStackTrace();
-        }
-        return properties;
-    }
-
-    public static File getHyracksArchive(String applicationName, Set<String> libJars) {
-       String target = applicationName + ".zip";
-        // Create a buffer for reading the files
-        byte[] buf = new byte[1024];
-        Set<String> fileNames = new HashSet<String>();
-        try {
-            ZipOutputStream out = new ZipOutputStream(new FileOutputStream(target));
-            for (String libJar : libJars) {
-                String fileName = libJar.substring(libJar.lastIndexOf("/") + 1);
-                if(fileNames.contains(fileName)){
-                    continue;
-                }
-                FileInputStream in = new FileInputStream(libJar);
-                out.putNextEntry(new ZipEntry(fileName));
-                int len;
-                while ((len = in.read(buf)) > 0) {
-                    out.write(buf, 0, len);
-                }
-                out.closeEntry();
-                in.close();
-                fileNames.add(fileName);
-            }
-            out.close();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-        File har = new File(target);
-        har.deleteOnExit();
-        return har;
-    }
-    
-    public static Reporter createReporter() {
-        Reporter reporter = new Reporter() {
-
-            @Override
-            public void progress() {
-
-            }
-
-            @Override
-            public void setStatus(String arg0) {
-
-            }
-
-            @Override
-            public void incrCounter(String arg0, String arg1, long arg2) {
-
-            }
-
-            @Override
-            public void incrCounter(Enum<?> arg0, long arg1) {
-
-            }
-
-            @Override
-            public InputSplit getInputSplit() throws UnsupportedOperationException {
-                return null;
-            }
-
-            @Override
-            public Counter getCounter(String arg0, String arg1) {
-                return null;
-            }
-
-            @Override
-            public Counter getCounter(Enum<?> arg0) {
-                return null;
-            }
-
-            @Override
-            public float getProgress() {
-                // TODO Auto-generated method stub
-                return 0f;
-            }
-        };
-        return reporter;
-    }
-}
diff --git a/hyracks/pom.xml b/hyracks/pom.xml
index 1e9b523..aa1ae28 100644
--- a/hyracks/pom.xml
+++ b/hyracks/pom.xml
@@ -92,7 +92,7 @@
     <module>hyracks-server</module>
     <module>hyracks-examples</module>
     <module>hyracks-documentation</module>
-    <module>hyracks-hadoop-compat</module>
+    <!--module>hyracks-hadoop-compat</module-->
     <module>hyracks-maven-plugins</module>
     <module>hyracks-hdfs</module>
     <module>hyracks-dist</module>

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/387
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I98b18f24a20dcd8dc75e828e47fb0ab98179a5bf
Gerrit-PatchSet: 1
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: Ian Maxon <[email protected]>
