Ian Maxon has uploaded a new change for review.

https://asterix-gerrit.ics.uci.edu/387
Change subject: Remove old Hadoop compat layer
......................................................................

Remove old Hadoop compat layer

This module is:
a) not functional
b) dependent on edu.uci.ics.dcache, which is obsolete

Hence, for now, we should remove it from the source tree.

Change-Id: I98b18f24a20dcd8dc75e828e47fb0ab98179a5bf
---
D hyracks/hyracks-hadoop-compat/pom.xml
D hyracks/hyracks-hadoop-compat/src/main/assembly/binary-assembly.xml
D hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/client/HyracksClient.java
D hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/client/HyracksRunningJob.java
D hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/driver/CompatibilityLayer.java
D hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/CompatibilityConfig.java
D hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/ConfigurationConstants.java
D hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/DCacheHandler.java
D hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/HadoopAdapter.java
D hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/Utilities.java
M hyracks/pom.xml
11 files changed, 1 insertion(+), 1,402 deletions(-)

git pull ssh://asterix-gerrit.ics.uci.edu:29418/hyracks refs/changes/87/387/1

diff --git a/hyracks/hyracks-hadoop-compat/pom.xml b/hyracks/hyracks-hadoop-compat/pom.xml deleted file mode 100644 index 2e15200..0000000 --- a/hyracks/hyracks-hadoop-compat/pom.xml +++ /dev/null @@ -1,116 +0,0 @@ -<!-- - ! Licensed to the Apache Software Foundation (ASF) under one - ! or more contributor license agreements. See the NOTICE file - ! distributed with this work for additional information - ! regarding copyright ownership. The ASF licenses this file - ! to you under the Apache License, Version 2.0 (the - ! "License"); you may not use this file except in compliance - ! with the License. You may obtain a copy of the License at - ! - ! http://www.apache.org/licenses/LICENSE-2.0 - ! - ! Unless required by applicable law or agreed to in writing, - ! software distributed under the License is distributed on an - ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - ! KIND, either express or implied. See the License for the - ! specific language governing permissions and limitations - ! under the License.
- !--> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - <artifactId>hyracks-hadoop-compat</artifactId> - <name>hyracks-hadoop-compat</name> - - <parent> - <groupId>org.apache.hyracks</groupId> - <artifactId>hyracks</artifactId> - <version>0.2.16-SNAPSHOT</version> - </parent> - - <licenses> - <license> - <name>Apache License, Version 2.0</name> - <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> - <distribution>repo</distribution> - <comments>A business-friendly OSS license</comments> - </license> - </licenses> - - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-compiler-plugin</artifactId> - <version>2.0.2</version> - <configuration> - <source>1.7</source> - <target>1.7</target> - <fork>true</fork> - </configuration> - </plugin> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>appassembler-maven-plugin</artifactId> - <version>1.3</version> - <executions> - <execution> - <configuration> - <programs> - <program> - <mainClass>org.apache.hyracks.hadoop.compat.driver.CompatibilityLayer</mainClass> - <name>hadoop-compat</name> - </program> - </programs> - <repositoryLayout>flat</repositoryLayout> - <repositoryName>lib</repositoryName> - </configuration> - <phase>package</phase> - <goals> - <goal>assemble</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <version>2.2-beta-5</version> - <executions> - <execution> - <configuration> - <descriptors> - <descriptor>src/main/assembly/binary-assembly.xml</descriptor> - </descriptors> - </configuration> - <phase>package</phase> - <goals> - <goal>attached</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> - <dependencies> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <type>jar</type> - <scope>compile</scope> - </dependency> - <dependency> - <groupId>edu.uci.ics.dcache</groupId> - <artifactId>dcache-client</artifactId> - <version>0.0.1</version> - <scope>compile</scope> - </dependency> - <dependency> - <groupId>org.apache.hyracks</groupId> - <artifactId>hyracks-dataflow-hadoop</artifactId> - <version>0.2.16-SNAPSHOT</version> - <type>jar</type> - <scope>compile</scope> - </dependency> - </dependencies> -</project> diff --git a/hyracks/hyracks-hadoop-compat/src/main/assembly/binary-assembly.xml b/hyracks/hyracks-hadoop-compat/src/main/assembly/binary-assembly.xml deleted file mode 100644 index d4a2ea1..0000000 --- a/hyracks/hyracks-hadoop-compat/src/main/assembly/binary-assembly.xml +++ /dev/null @@ -1,38 +0,0 @@ -<!-- - ! Licensed to the Apache Software Foundation (ASF) under one - ! or more contributor license agreements. See the NOTICE file - ! distributed with this work for additional information - ! regarding copyright ownership. The ASF licenses this file - ! to you under the Apache License, Version 2.0 (the - ! "License"); you may not use this file except in compliance - ! with the License. You may obtain a copy of the License at - ! - ! http://www.apache.org/licenses/LICENSE-2.0 - ! - ! Unless required by applicable law or agreed to in writing, - ! software distributed under the License is distributed on an - ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - ! KIND, either express or implied. See the License for the - ! 
specific language governing permissions and limitations - ! under the License. - !--> - -<assembly> - <id>binary-assembly</id> - <formats> - <format>zip</format> - <format>dir</format> - </formats> - <includeBaseDirectory>false</includeBaseDirectory> - <fileSets> - <fileSet> - <directory>target/appassembler/bin</directory> - <outputDirectory>bin</outputDirectory> - <fileMode>0755</fileMode> - </fileSet> - <fileSet> - <directory>target/appassembler/lib</directory> - <outputDirectory>lib</outputDirectory> - </fileSet> - </fileSets> -</assembly> diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/client/HyracksClient.java b/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/client/HyracksClient.java deleted file mode 100644 index d0ef2a9..0000000 --- a/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/client/HyracksClient.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.hadoop.compat.client; - -import java.util.EnumSet; -import java.util.HashSet; -import java.util.Properties; -import java.util.Set; - -import org.apache.hyracks.api.client.HyracksConnection; -import org.apache.hyracks.api.job.JobFlag; -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.api.job.JobSpecification; -import org.apache.hyracks.api.job.JobStatus; -import org.apache.hyracks.hadoop.compat.util.ConfigurationConstants; -import org.apache.hyracks.hadoop.compat.util.Utilities; - -public class HyracksClient { - - private static HyracksConnection connection; - private static final String jobProfilingKey = "jobProfilingKey"; - Set<String> systemLibs; - - public HyracksClient(Properties clusterProperties) throws Exception { - initialize(clusterProperties); - } - - private void initialize(Properties properties) throws Exception { - String clusterController = (String) properties.get(ConfigurationConstants.clusterControllerHost); - connection = new HyracksConnection(clusterController, 1098); - systemLibs = new HashSet<String>(); - for (String systemLib : ConfigurationConstants.systemLibs) { - String systemLibPath = properties.getProperty(systemLib); - if (systemLibPath != null) { - systemLibs.add(systemLibPath); - } - } - } - - public HyracksClient(String clusterConf, char delimiter) throws Exception { - Properties properties = Utilities.getProperties(clusterConf, delimiter); - initialize(properties); - } - - public JobStatus getJobStatus(JobId jobId) throws Exception { - return connection.getJobStatus(jobId); - } - - public HyracksRunningJob submitJob(String applicationName, JobSpecification spec) throws Exception { - String jobProfilingVal = System.getenv(jobProfilingKey); - boolean doProfiling = 
("true".equalsIgnoreCase(jobProfilingVal)); - JobId jobId; - if (doProfiling) { - System.out.println("PROFILING"); - jobId = connection.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME)); - } else { - jobId = connection.startJob(spec); - } - HyracksRunningJob runningJob = new HyracksRunningJob(jobId, spec, this); - return runningJob; - } - - public HyracksRunningJob submitJob(String applicationName, JobSpecification spec, Set<String> userLibs) - throws Exception { - return submitJob(applicationName, spec); - } - - public void waitForCompleton(JobId jobId) throws Exception { - connection.waitForCompletion(jobId); - } -} diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/client/HyracksRunningJob.java b/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/client/HyracksRunningJob.java deleted file mode 100644 index 921de2d..0000000 --- a/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/client/HyracksRunningJob.java +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hyracks.hadoop.compat.client; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapred.Counters; -import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapred.JobStatus; -import org.apache.hadoop.mapred.RunningJob; -import org.apache.hadoop.mapred.TaskAttemptID; -import org.apache.hadoop.mapred.TaskCompletionEvent; - -import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; -import org.apache.hyracks.api.job.JobId; - -public class HyracksRunningJob implements RunningJob { - - JobId jobId; - IOperatorDescriptorRegistry spec; - HyracksClient hyracksClient; - - public JobId getJobId() { - return jobId; - } - - public void setJobId(JobId jobId) { - this.jobId = jobId; - } - - public IOperatorDescriptorRegistry getSpec() { - return spec; - } - - public void setSpec(IOperatorDescriptorRegistry spec) { - this.spec = spec; - } - - public HyracksRunningJob(JobId jobId, IOperatorDescriptorRegistry jobSpec, HyracksClient hyracksClient) { - this.spec = jobSpec; - this.jobId = jobId; - this.hyracksClient = hyracksClient; - } - - @Override - public float cleanupProgress() throws IOException { - return 0; - } - - @Override - public Counters getCounters() throws IOException { - return new Counters(); - } - - @Override - public JobID getID() { - return new JobID(this.jobId.toString(), 1); - } - - @Override - public String getJobFile() { - return ""; - } - - @Override - public String getJobID() { - return this.jobId.toString(); - } - - @Override - public String getJobName() { - return this.jobId.toString(); - } - - @Override - public int getJobState() throws IOException { - return isComplete() ? 2 : 1; - } - - @Override - public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom) throws IOException { - return new TaskCompletionEvent[0]; - } - - @Override - public String getTrackingURL() { - return " running on hyrax, remote kill is not supported "; - } - - @Override - public boolean isComplete() throws IOException { - org.apache.hyracks.api.job.JobStatus status = null; - try { - status = hyracksClient.getJobStatus(jobId); - } catch (Exception e) { - e.printStackTrace(); - } - - return status.equals(org.apache.hyracks.api.job.JobStatus.TERMINATED); - } - - @Override - public boolean isSuccessful() throws IOException { - return isComplete(); - } - - @Override - public void killJob() throws IOException { - // TODO Auto-generated method stub - - } - - @Override - public void killTask(String taskId, boolean shouldFail) throws IOException { - // TODO Auto-generated method stub - - } - - @Override - public float mapProgress() throws IOException { - return 0; - } - - @Override - public float reduceProgress() throws IOException { - return 0; - } - - @Override - public void setJobPriority(String priority) throws IOException { - - } - - @Override - public float setupProgress() throws IOException { - return 0; - } - - @Override - public void waitForCompletion() throws IOException { - while (!isComplete()) { - try { - Thread.sleep(5000); - } catch (InterruptedException ie) { - } - } - } - - @Override - public String[] getTaskDiagnostics(TaskAttemptID arg0) throws IOException { - // TODO Auto-generated method stub - return null; - } - - @Override - public void killTask(TaskAttemptID arg0, boolean arg1) throws IOException { - // TODO Auto-generated method stub - - } - - @Override - public Configuration getConfiguration() { - // TODO Auto-generated method stub - return null; - } - - @Override - public String 
getFailureInfo() throws IOException { - // TODO Auto-generated method stub - return null; - } - - @Override - public String getHistoryUrl() throws IOException { - // TODO Auto-generated method stub - return null; - } - - @Override - public JobStatus getJobStatus() throws IOException { - // TODO Auto-generated method stub - return null; - } - - @Override - public boolean isRetired() throws IOException { - // TODO Auto-generated method stub - return false; - } - -} diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/driver/CompatibilityLayer.java b/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/driver/CompatibilityLayer.java deleted file mode 100644 index 87b0127..0000000 --- a/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/driver/CompatibilityLayer.java +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.hadoop.compat.driver; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.Set; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobConf; -import org.kohsuke.args4j.CmdLineParser; - -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.api.job.JobSpecification; -import org.apache.hyracks.hadoop.compat.client.HyracksClient; -import org.apache.hyracks.hadoop.compat.client.HyracksRunningJob; -import org.apache.hyracks.hadoop.compat.util.CompatibilityConfig; -import org.apache.hyracks.hadoop.compat.util.ConfigurationConstants; -import org.apache.hyracks.hadoop.compat.util.DCacheHandler; -import org.apache.hyracks.hadoop.compat.util.HadoopAdapter; -import org.apache.hyracks.hadoop.compat.util.Utilities; - -public class CompatibilityLayer { - - HyracksClient hyracksClient; - DCacheHandler dCacheHander = null; - Properties clusterConf; - HadoopAdapter hadoopAdapter; - - private static char configurationFileDelimiter = '='; - private static final String dacheKeyPrefix = "dcache.key"; - - public CompatibilityLayer(CompatibilityConfig clConfig) throws Exception { - initialize(clConfig); - } - - private void initialize(CompatibilityConfig clConfig) throws Exception { - clusterConf = Utilities.getProperties(clConfig.clusterConf, configurationFileDelimiter); - hadoopAdapter = new HadoopAdapter(clusterConf.getProperty(ConfigurationConstants.namenodeURL)); - hyracksClient = new HyracksClient(clusterConf); - dCacheHander = new DCacheHandler(clusterConf.getProperty(ConfigurationConstants.dcacheServerConfiguration)); - } - - public HyracksRunningJob submitJob(JobConf conf, 
Set<String> userLibs) throws Exception { - List<JobConf> jobConfs = new ArrayList<JobConf>(); - jobConfs.add(conf); - String applicationName = conf.getJobName() + System.currentTimeMillis(); - JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs); - HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(applicationName, spec, userLibs); - return hyracksRunningJob; - } - - public HyracksRunningJob submitJobs(String applicationName, String[] jobFiles, Set<String> userLibs) - throws Exception { - List<JobConf> jobConfs = constructHadoopJobConfs(jobFiles); - populateDCache(jobFiles[0]); - JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs); - HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(applicationName, spec, userLibs); - return hyracksRunningJob; - } - - public HyracksRunningJob submitJobs(String applicationName, String[] jobFiles) throws Exception { - List<JobConf> jobConfs = constructHadoopJobConfs(jobFiles); - populateDCache(jobFiles[0]); - JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs); - HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(applicationName, spec); - return hyracksRunningJob; - } - - private void populateDCache(String jobFile) throws IOException { - Map<String, String> dcacheTasks = preparePreLaunchDCacheTasks(jobFile); - String tempDir = "/tmp"; - if (dcacheTasks.size() > 0) { - for (String key : dcacheTasks.keySet()) { - String destPath = tempDir + "/" + key + System.currentTimeMillis(); - hadoopAdapter.getHDFSClient().copyToLocalFile(new Path(dcacheTasks.get(key)), new Path(destPath)); - System.out.println(" source :" + dcacheTasks.get(key)); - System.out.println(" dest :" + destPath); - System.out.println(" key :" + key); - System.out.println(" value :" + destPath); - dCacheHander.put(key, destPath); - } - } - } - - private String getApplicationNameForHadoopJob(JobConf jobConf) { - String jar = jobConf.getJar(); - if (jar != null) { - return jar.substring(jar.lastIndexOf("/") >= 0 ? jar.lastIndexOf("/") + 1 : 0); - } else { - return "" + System.currentTimeMillis(); - } - } - - private Map<String, String> initializeCustomProperties(Properties properties, String prefix) { - Map<String, String> foundProperties = new HashMap<String, String>(); - Set<Entry<Object, Object>> entrySet = properties.entrySet(); - for (Entry entry : entrySet) { - String key = (String) entry.getKey(); - String value = (String) entry.getValue(); - if ((key.startsWith(prefix))) { - String actualKey = key.substring(prefix.length() + 1); // "cut off '<prefix>.' 
from the beginning" - foundProperties.put(actualKey, value); - } - } - return foundProperties; - } - - public Map<String, String> preparePreLaunchDCacheTasks(String jobFile) { - Properties jobProperties = Utilities.getProperties(jobFile, ','); - Map<String, String> dcacheTasks = new HashMap<String, String>(); - Map<String, String> dcacheKeys = initializeCustomProperties(jobProperties, dacheKeyPrefix); - for (String key : dcacheKeys.keySet()) { - String sourcePath = dcacheKeys.get(key); - if (sourcePath != null) { - dcacheTasks.put(key, sourcePath); - } - } - return dcacheTasks; - } - - public void waitForCompletion(JobId jobId) throws Exception { - hyracksClient.waitForCompleton(jobId); - } - - private List<JobConf> constructHadoopJobConfs(String[] jobFiles) throws Exception { - List<JobConf> jobConfs = new ArrayList<JobConf>(); - for (String jobFile : jobFiles) { - jobConfs.add(constructHadoopJobConf(jobFile)); - } - return jobConfs; - } - - private JobConf constructHadoopJobConf(String jobFile) { - Properties jobProperties = Utilities.getProperties(jobFile, '='); - JobConf conf = new JobConf(hadoopAdapter.getConf()); - for (Entry entry : jobProperties.entrySet()) { - conf.set((String) entry.getKey(), (String) entry.getValue()); - System.out.println((String) entry.getKey() + " : " + (String) entry.getValue()); - } - return conf; - } - - private String[] getJobs(CompatibilityConfig clConfig) { - return clConfig.jobFiles == null ? new String[0] : clConfig.jobFiles.split(","); - } - - public static void main(String args[]) throws Exception { - long startTime = System.nanoTime(); - CompatibilityConfig clConfig = new CompatibilityConfig(); - CmdLineParser cp = new CmdLineParser(clConfig); - try { - cp.parseArgument(args); - } catch (Exception e) { - System.err.println(e.getMessage()); - cp.printUsage(System.err); - return; - } - CompatibilityLayer compatLayer = new CompatibilityLayer(clConfig); - String applicationName = clConfig.applicationName; - String[] jobFiles = compatLayer.getJobs(clConfig); - String[] userLibraries = null; - if (clConfig.userLibs != null) { - userLibraries = clConfig.userLibs.split(","); - } - try { - HyracksRunningJob hyraxRunningJob = null; - if (userLibraries != null) { - Set<String> userLibs = new HashSet<String>(); - for (String userLib : userLibraries) { - userLibs.add(userLib); - } - hyraxRunningJob = compatLayer.submitJobs(applicationName, jobFiles, userLibs); - } else { - hyraxRunningJob = compatLayer.submitJobs(applicationName, jobFiles); - } - compatLayer.waitForCompletion(hyraxRunningJob.getJobId()); - long end_time = System.nanoTime(); - System.out.println("TOTAL TIME (from Launch to Completion):" - + ((end_time - startTime) / (float) 1000000000.0) + " seconds."); - } catch (Exception e) { - e.printStackTrace(); - throw e; - } - } -} diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/CompatibilityConfig.java b/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/CompatibilityConfig.java deleted file mode 100644 index fb4b607..0000000 --- a/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/CompatibilityConfig.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.hadoop.compat.util; - -import org.kohsuke.args4j.Option; - -public class CompatibilityConfig { - - @Option(name = "-cluster", required = true, usage = "Defines the path to the configuration file that provides the following info: +" - + " (1) Address of HyracksClusterController service" - + " (2) Address of Hadoop namenode service") - public String clusterConf; - - @Option(name = "-jobFiles", usage = "Comma separated list of jobFiles. " - + "Each job file defines the hadoop job + " - + "The order in the list defines the sequence in which" - + "the jobs are to be executed") - public String jobFiles; - - @Option(name = "-applicationName", usage = " The application as part of which the job executes") - public String applicationName; - - @Option(name = "-userLibs", usage = " A comma separated list of jar files that are required to be addedd to classpath when running ") - public String userLibs; -} diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/ConfigurationConstants.java b/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/ConfigurationConstants.java deleted file mode 100644 index 88cef77..0000000 --- a/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/ConfigurationConstants.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hyracks.hadoop.compat.util; - -public class ConfigurationConstants { - - public static final String clusterControllerHost = "clusterControllerHost"; - public static final String namenodeURL = "fs.default.name"; - public static final String dcacheServerConfiguration = "dcacheServerConfiguration"; - public static final String[] systemLibs = new String[] { "hyracksDataflowStdLib", "hyracksDataflowCommonLib", - "hyracksDataflowHadoopLib", "hadoopCoreLib"}; - -} diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/DCacheHandler.java b/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/DCacheHandler.java deleted file mode 100644 index 39a43fc..0000000 --- a/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/DCacheHandler.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.hadoop.compat.util; - -import java.io.IOException; -import java.util.LinkedHashSet; -import java.util.Set; - -import edu.uci.ics.dcache.client.DCacheClient; -import edu.uci.ics.dcache.client.DCacheClientConfig; - -public class DCacheHandler { - - String clientPropertyFile; - String key; - String valueFilePath; - - public enum Operation { - GET, - PUT, - DELETE - } - - private static DCacheClient dCacheClient; - private static final String[] operations = { "GET", "PUT", "DELETE" }; - - public DCacheHandler(String clientPropertyFile) throws Exception { - this.clientPropertyFile = clientPropertyFile; - init(); - } - - private void init() throws Exception { - dCacheClient = DCacheClient.get(); - DCacheClientConfig dcacheClientConfig = new DCacheClientConfig(); - dCacheClient.init(dcacheClientConfig); - } - - public static DCacheClient getInstance(String clientPropertyFile) { - if (dCacheClient == null) { - dCacheClient = DCacheClient.get(); - } - return dCacheClient; - } - - public String getClientPropertyFile() { - return clientPropertyFile; - } - - public void put(String key, String value) throws IOException { - dCacheClient.set(key, value); - System.out.println(" added to cache " + key + " : " + value); - } - - public Object get(String key) throws IOException { - return dCacheClient.get(key); - } - - public void delete(String key) throws IOException { - dCacheClient.delete(key); - } - - public Object performOperation(String operation, String[] args) throws Exception { - Object returnValue = null; - int operationIndex = getOperation(operation); - switch (operationIndex) { - case 0: - returnValue = dCacheClient.get(args[2]); - System.out.println(" get from cache " + returnValue); - break; - case 1: - dCacheClient.set(args[2], args[3]); - System.out.println(" added to cache " + args[2] + 
" : " + args[3]); - break; - case 2: - dCacheClient.delete(args[2]); - System.out.println(" removed from cache " + args[2]); - break; - default: - System.out.println("Error : Operation not supported !"); - break; - } - return returnValue; - } - - private int getOperation(String operation) { - for (int i = 0; i < operations.length; i++) { - if (operations[i].equalsIgnoreCase(operation)) { - return i; - } - } - return -1; - } - -} diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/HadoopAdapter.java b/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/HadoopAdapter.java deleted file mode 100644 index d2f9c19..0000000 --- a/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/HadoopAdapter.java +++ /dev/null @@ -1,427 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.hadoop.compat.util; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.HashMap; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hdfs.protocol.ClientProtocol; -import org.apache.hadoop.io.RawComparator; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.WritableComparator; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.VersionedProtocol; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Partitioner; -import org.apache.hadoop.mapred.JobContext; -import org.apache.hadoop.mapred.JobContextImpl; -import org.apache.hadoop.util.ReflectionUtils; - -import org.apache.hyracks.api.constraints.PartitionConstraintHelper; -import org.apache.hyracks.api.dataflow.IConnectorDescriptor; -import org.apache.hyracks.api.dataflow.IOperatorDescriptor; -import org.apache.hyracks.api.dataflow.OperatorDescriptorId; -import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; -import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.job.IConnectorDescriptorRegistry; -import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; -import org.apache.hyracks.api.job.JobSpecification; -import org.apache.hyracks.dataflow.hadoop.HadoopMapperOperatorDescriptor; -import org.apache.hyracks.dataflow.hadoop.HadoopReducerOperatorDescriptor; -import org.apache.hyracks.dataflow.hadoop.HadoopWriteOperatorDescriptor; -import 
org.apache.hyracks.dataflow.hadoop.data.HadoopHashTuplePartitionComputerFactory; -import org.apache.hyracks.dataflow.hadoop.data.HadoopPartitionerTuplePartitionComputerFactory; -import org.apache.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory; -import org.apache.hyracks.dataflow.hadoop.util.ClasspathBasedHadoopClassFactory; -import org.apache.hyracks.dataflow.hadoop.util.DatatypeHelper; -import org.apache.hyracks.dataflow.hadoop.util.IHadoopClassFactory; -import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor; -import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor; -import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor; -import org.apache.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor; - -public class HadoopAdapter { - - public static final String FS_DEFAULT_NAME = "fs.default.name"; - private JobConf jobConf; - private Map<OperatorDescriptorId, Integer> operatorInstanceCount = new HashMap<OperatorDescriptorId, Integer>(); - public static final String HYRACKS_EX_SORT_FRAME_LIMIT = "HYRACKS_EX_SORT_FRAME_LIMIT"; - public static final int DEFAULT_EX_SORT_FRAME_LIMIT = 4096; - public static final int DEFAULT_MAX_MAPPERS = 40; - public static final int DEFAULT_MAX_REDUCERS = 40; - public static final String MAX_MAPPERS_KEY = "maxMappers"; - public static final String MAX_REDUCERS_KEY = "maxReducers"; - public static final String EX_SORT_FRAME_LIMIT_KEY = "sortFrameLimit"; - - private int maxMappers = DEFAULT_MAX_MAPPERS; - private int maxReducers = DEFAULT_MAX_REDUCERS; - private int exSortFrame = DEFAULT_EX_SORT_FRAME_LIMIT; - - class NewHadoopConstants { - public static final String INPUT_FORMAT_CLASS_ATTR = "mapreduce.inputformat.class"; - public static final String MAP_CLASS_ATTR = "mapreduce.map.class"; - public static final String COMBINE_CLASS_ATTR = "mapreduce.combine.class"; - public static final String REDUCE_CLASS_ATTR = "mapreduce.reduce.class"; - public static final String OUTPUT_FORMAT_CLASS_ATTR = "mapreduce.outputformat.class"; - public static final String PARTITIONER_CLASS_ATTR = "mapreduce.partitioner.class"; - } - - public HadoopAdapter(String namenodeUrl) { - jobConf = new JobConf(true); - jobConf.set(FS_DEFAULT_NAME, namenodeUrl); - if (System.getenv(MAX_MAPPERS_KEY) != null) { - maxMappers = Integer.parseInt(System.getenv(MAX_MAPPERS_KEY)); - } - if (System.getenv(MAX_REDUCERS_KEY) != null) { - maxReducers = Integer.parseInt(System.getenv(MAX_REDUCERS_KEY)); - } - if (System.getenv(EX_SORT_FRAME_LIMIT_KEY) != null) { - exSortFrame = Integer.parseInt(System - .getenv(EX_SORT_FRAME_LIMIT_KEY)); - } - } - - private String getEnvironmentVariable(String key, String def) { - String ret = System.getenv(key); - return ret != null ? ret : def; - } - - public JobConf getConf() { - return jobConf; - } - //TODO: Why is there now a type mismatch? Why does a bounded wildcard fix it? - public static VersionedProtocol getProtocol(Class<? extends VersionedProtocol> protocolClass, - InetSocketAddress inetAddress, JobConf jobConf) throws IOException { - VersionedProtocol versionedProtocol = RPC.getProxy(protocolClass, - ClientProtocol.versionID, inetAddress, jobConf); - return versionedProtocol; - } - - private static RecordDescriptor getHadoopRecordDescriptor( - String className1, String className2) { - RecordDescriptor recordDescriptor = null; - try { - recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor( - (Class<? 
extends Writable>) Class.forName(className1), - (Class<? extends Writable>) Class.forName(className2)); - } catch (ClassNotFoundException cnfe) { - cnfe.printStackTrace(); - } - return recordDescriptor; - } - - private Object[] getInputSplits(JobConf conf) throws IOException, - ClassNotFoundException, InterruptedException { - if (conf.getUseNewMapper()) { - return getNewInputSplits(conf); - } else { - return getOldInputSplits(conf); - } - } - - private org.apache.hadoop.mapreduce.InputSplit[] getNewInputSplits( - JobConf conf) throws ClassNotFoundException, IOException, - InterruptedException { - org.apache.hadoop.mapreduce.InputSplit[] splits = null; - JobContext context = new JobContextImpl(conf, null); - org.apache.hadoop.mapreduce.InputFormat inputFormat = ReflectionUtils - .newInstance(context.getInputFormatClass(), conf); - List<org.apache.hadoop.mapreduce.InputSplit> inputSplits = inputFormat - .getSplits(context); - return inputSplits - .toArray(new org.apache.hadoop.mapreduce.InputSplit[] {}); - } - - private InputSplit[] getOldInputSplits(JobConf conf) throws IOException { - InputFormat inputFormat = conf.getInputFormat(); - return inputFormat.getSplits(conf, conf.getNumMapTasks()); - } - - private void configurePartitionCountConstraint(JobSpecification spec, - IOperatorDescriptor operator, int instanceCount) { - PartitionConstraintHelper.addPartitionCountConstraint(spec, operator, - instanceCount); - operatorInstanceCount.put(operator.getOperatorId(), instanceCount); - } - - public HadoopMapperOperatorDescriptor getMapper(JobConf conf, - JobSpecification spec, IOperatorDescriptor previousOp) - throws Exception { - boolean selfRead = previousOp == null; - IHadoopClassFactory classFactory = new ClasspathBasedHadoopClassFactory(); - HadoopMapperOperatorDescriptor mapOp = null; - if (selfRead) { - Object[] splits = getInputSplits(conf, maxMappers); - mapOp = new HadoopMapperOperatorDescriptor(spec, conf, splits, - classFactory); - configurePartitionCountConstraint(spec, mapOp, splits.length); - } else { - configurePartitionCountConstraint(spec, mapOp, - getInstanceCount(previousOp)); - mapOp = new HadoopMapperOperatorDescriptor(spec, conf, classFactory); - spec.connect(new OneToOneConnectorDescriptor(spec), previousOp, 0, - mapOp, 0); - } - return mapOp; - } - - public HadoopReducerOperatorDescriptor getReducer(JobConf conf, - IOperatorDescriptorRegistry spec, boolean useAsCombiner) { - HadoopReducerOperatorDescriptor reduceOp = new HadoopReducerOperatorDescriptor( - spec, conf, null, new ClasspathBasedHadoopClassFactory(), - useAsCombiner); - return reduceOp; - } - - public FileSystem getHDFSClient() { - FileSystem fileSystem = null; - try { - fileSystem = FileSystem.get(jobConf); - } catch (IOException ioe) { - ioe.printStackTrace(); - } - return fileSystem; - } - - public JobSpecification getJobSpecification(List<JobConf> jobConfs) - throws Exception { - JobSpecification spec = null; - if (jobConfs.size() == 1) { - spec = getJobSpecification(jobConfs.get(0)); - } else { - spec = getPipelinedSpec(jobConfs); - } - return spec; - } - - private IOperatorDescriptor configureOutput( - IOperatorDescriptor previousOperator, JobConf conf, - JobSpecification spec) throws Exception { - int instanceCountPreviousOperator = operatorInstanceCount - .get(previousOperator.getOperatorId()); - int numOutputters = conf.getNumReduceTasks() != 0 ? 
conf - .getNumReduceTasks() : instanceCountPreviousOperator; - HadoopWriteOperatorDescriptor writer = null; - writer = new HadoopWriteOperatorDescriptor(spec, conf, numOutputters); - configurePartitionCountConstraint(spec, writer, numOutputters); - spec.connect(new OneToOneConnectorDescriptor(spec), previousOperator, - 0, writer, 0); - return writer; - } - - private int getInstanceCount(IOperatorDescriptor operator) { - return operatorInstanceCount.get(operator.getOperatorId()); - } - - private IOperatorDescriptor addCombiner( - IOperatorDescriptor previousOperator, JobConf jobConf, - JobSpecification spec) throws Exception { - boolean useCombiner = (jobConf.getCombinerClass() != null); - IOperatorDescriptor mapSideOutputOp = previousOperator; - if (useCombiner) { - System.out.println("Using Combiner:" - + jobConf.getCombinerClass().getName()); - IOperatorDescriptor mapSideCombineSortOp = getExternalSorter( - jobConf, spec); - configurePartitionCountConstraint(spec, mapSideCombineSortOp, - getInstanceCount(previousOperator)); - - HadoopReducerOperatorDescriptor mapSideCombineReduceOp = getReducer( - jobConf, spec, true); - configurePartitionCountConstraint(spec, mapSideCombineReduceOp, - getInstanceCount(previousOperator)); - spec.connect(new OneToOneConnectorDescriptor(spec), - previousOperator, 0, mapSideCombineSortOp, 0); - spec.connect(new OneToOneConnectorDescriptor(spec), - mapSideCombineSortOp, 0, mapSideCombineReduceOp, 0); - mapSideOutputOp = mapSideCombineReduceOp; - } - return mapSideOutputOp; - } - - private int getNumReduceTasks(JobConf jobConf) { - int numReduceTasks = Math.min(maxReducers, jobConf.getNumReduceTasks()); - return numReduceTasks; - } - - private IOperatorDescriptor addReducer( - IOperatorDescriptor previousOperator, JobConf jobConf, - JobSpecification spec) throws Exception { - IOperatorDescriptor mrOutputOperator = previousOperator; - if (jobConf.getNumReduceTasks() != 0) { - IOperatorDescriptor sorter = getExternalSorter(jobConf, spec); - HadoopReducerOperatorDescriptor reducer = getReducer(jobConf, spec, - false); - int numReduceTasks = getNumReduceTasks(jobConf); - configurePartitionCountConstraint(spec, sorter, numReduceTasks); - configurePartitionCountConstraint(spec, reducer, numReduceTasks); - - IConnectorDescriptor mToNConnectorDescriptor = getMtoNHashPartitioningConnector( - jobConf, spec); - spec.connect(mToNConnectorDescriptor, previousOperator, 0, sorter, - 0); - spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, - reducer, 0); - mrOutputOperator = reducer; - } - return mrOutputOperator; - } - - private long getInputSize(Object[] splits, JobConf conf) - throws IOException, InterruptedException { - long totalInputSize = 0; - if (conf.getUseNewMapper()) { - for (org.apache.hadoop.mapreduce.InputSplit split : (org.apache.hadoop.mapreduce.InputSplit[]) splits) { - totalInputSize += split.getLength(); - } - } else { - for (InputSplit split : (InputSplit[]) splits) { - totalInputSize += split.getLength(); - } - } - return totalInputSize; - } - - private Object[] getInputSplits(JobConf conf, int desiredMaxMappers) - throws Exception { - Object[] splits = getInputSplits(conf); - if (splits.length > desiredMaxMappers) { - long totalInputSize = getInputSize(splits, conf); - long goalSize = (totalInputSize / desiredMaxMappers); - conf.setLong("mapred.min.split.size", goalSize); - conf.setNumMapTasks(desiredMaxMappers); - splits = getInputSplits(conf); - } - return splits; - } - - public JobSpecification getPipelinedSpec(List<JobConf> jobConfs) - 
throws Exception { - JobSpecification spec = new JobSpecification(); - Iterator<JobConf> iterator = jobConfs.iterator(); - JobConf firstMR = iterator.next(); - IOperatorDescriptor mrOutputOp = configureMapReduce(null, spec, firstMR); - while (iterator.hasNext()) - for (JobConf currentJobConf : jobConfs) { - mrOutputOp = configureMapReduce(mrOutputOp, spec, - currentJobConf); - } - configureOutput(mrOutputOp, jobConfs.get(jobConfs.size() - 1), spec); - return spec; - } - - public JobSpecification getJobSpecification(JobConf conf) throws Exception { - JobSpecification spec = new JobSpecification(); - IOperatorDescriptor mrOutput = configureMapReduce(null, spec, conf); - IOperatorDescriptor printer = configureOutput(mrOutput, conf, spec); - spec.addRoot(printer); - System.out.println(spec); - return spec; - } - - private IOperatorDescriptor configureMapReduce( - IOperatorDescriptor previousOuputOp, JobSpecification spec, - JobConf conf) throws Exception { - IOperatorDescriptor mapper = getMapper(conf, spec, previousOuputOp); - IOperatorDescriptor mapSideOutputOp = addCombiner(mapper, conf, spec); - IOperatorDescriptor reducer = addReducer(mapSideOutputOp, conf, spec); - return reducer; - } - - public static InMemorySortOperatorDescriptor getInMemorySorter( - JobConf conf, IOperatorDescriptorRegistry spec) { - InMemorySortOperatorDescriptor inMemorySortOp = null; - RecordDescriptor recordDescriptor = getHadoopRecordDescriptor(conf - .getMapOutputKeyClass().getName(), conf - .getMapOutputValueClass().getName()); - Class<? extends RawComparator> rawComparatorClass = null; - WritableComparator writableComparator = WritableComparator.get(conf - .getMapOutputKeyClass().asSubclass(WritableComparable.class)); - WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory( - writableComparator.getClass()); - inMemorySortOp = new InMemorySortOperatorDescriptor(spec, - new int[] { 0 }, - new IBinaryComparatorFactory[] { comparatorFactory }, - recordDescriptor); - return inMemorySortOp; - } - - public static ExternalSortOperatorDescriptor getExternalSorter( - JobConf conf, IOperatorDescriptorRegistry spec) { - ExternalSortOperatorDescriptor externalSortOp = null; - RecordDescriptor recordDescriptor = getHadoopRecordDescriptor(conf - .getMapOutputKeyClass().getName(), conf - .getMapOutputValueClass().getName()); - Class<? extends RawComparator> rawComparatorClass = null; - WritableComparator writableComparator = WritableComparator.get(conf - .getMapOutputKeyClass().asSubclass(WritableComparable.class)); - WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory( - writableComparator.getClass()); - externalSortOp = new ExternalSortOperatorDescriptor(spec, conf.getInt( - HYRACKS_EX_SORT_FRAME_LIMIT, DEFAULT_EX_SORT_FRAME_LIMIT), - new int[] { 0 }, - new IBinaryComparatorFactory[] { comparatorFactory }, - recordDescriptor); - return externalSortOp; - } - - public static MToNPartitioningConnectorDescriptor getMtoNHashPartitioningConnector( - JobConf conf, IConnectorDescriptorRegistry spec) { - - Class mapOutputKeyClass = conf.getMapOutputKeyClass(); - Class mapOutputValueClass = conf.getMapOutputValueClass(); - - MToNPartitioningConnectorDescriptor connectorDescriptor = null; - ITuplePartitionComputerFactory factory = null; - conf.getMapOutputKeyClass(); - if (conf.getPartitionerClass() != null - && !conf.getPartitionerClass().getName().startsWith( - "org.apache.hadoop")) { - Class<? 
extends Partitioner> partitioner = conf - .getPartitionerClass(); - factory = new HadoopPartitionerTuplePartitionComputerFactory( - partitioner, DatatypeHelper - .createSerializerDeserializer(mapOutputKeyClass), - DatatypeHelper - .createSerializerDeserializer(mapOutputValueClass)); - } else { - RecordDescriptor recordDescriptor = DatatypeHelper - .createKeyValueRecordDescriptor(mapOutputKeyClass, - mapOutputValueClass); - ISerializerDeserializer mapOutputKeySerializerDerserializer = DatatypeHelper - .createSerializerDeserializer(mapOutputKeyClass); - factory = new HadoopHashTuplePartitionComputerFactory( - mapOutputKeySerializerDerserializer); - } - connectorDescriptor = new MToNPartitioningConnectorDescriptor(spec, - factory); - return connectorDescriptor; - } - -} diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/Utilities.java b/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/Utilities.java deleted file mode 100644 index 8f326b8..0000000 --- a/hyracks/hyracks-hadoop-compat/src/main/java/org/apache/hyracks/hadoop/compat/util/Utilities.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hyracks.hadoop.compat.util; - -import java.io.BufferedReader; -import java.io.DataInputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.HashSet; -import java.util.Properties; -import java.util.Set; -import java.util.zip.ZipEntry; -import java.util.zip.ZipOutputStream; - -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.Counters.Counter; - -public class Utilities { - - public static Properties getProperties(String filePath, char delimiter) { - Properties properties = new Properties(); - try { - FileInputStream fins = new FileInputStream(new File(filePath)); - DataInputStream dins = new DataInputStream(fins); - BufferedReader br = new BufferedReader(new InputStreamReader(dins)); - String strLine; - while ((strLine = br.readLine()) != null) { - int split = strLine.indexOf(delimiter); - if (split >= 0) { - properties.put((strLine.substring(0, split)).trim(), strLine.substring(split + 1, strLine.length()) - .trim()); - } - } - } catch (IOException ioe) { - ioe.printStackTrace(); - } - return properties; - } - - public static File getHyracksArchive(String applicationName, Set<String> libJars) { - String target = applicationName + ".zip"; - // Create a buffer for reading the files - byte[] buf = new byte[1024]; - Set<String> fileNames = new HashSet<String>(); - try { - ZipOutputStream out = new ZipOutputStream(new FileOutputStream(target)); - for (String libJar : libJars) { - String fileName = libJar.substring(libJar.lastIndexOf("/") + 1); - if(fileNames.contains(fileName)){ - continue; - } - FileInputStream in = new FileInputStream(libJar); - out.putNextEntry(new ZipEntry(fileName)); - int len; - while ((len = in.read(buf)) > 0) { - out.write(buf, 0, len); - } - out.closeEntry(); - in.close(); - fileNames.add(fileName); - } - out.close(); - } catch (IOException e) { - e.printStackTrace(); - } - File har = new File(target); - har.deleteOnExit(); - return har; - } - - public static Reporter createReporter() { - Reporter reporter = new Reporter() { - - @Override - public void progress() { - - } - - @Override - public void setStatus(String arg0) { - - } - - @Override - public void incrCounter(String arg0, String arg1, long arg2) { - - } - - @Override - public void incrCounter(Enum<?> arg0, long arg1) { - - } - - @Override - public InputSplit getInputSplit() throws UnsupportedOperationException { - return null; - } - - @Override - public Counter getCounter(String arg0, String arg1) { - return null; - } - - @Override - public Counter getCounter(Enum<?> arg0) { - return null; - } - - @Override - public float getProgress() { - // TODO Auto-generated method stub - return 0f; - } - }; - return reporter; - } -} diff --git a/hyracks/pom.xml b/hyracks/pom.xml index 1e9b523..aa1ae28 100644 --- a/hyracks/pom.xml +++ b/hyracks/pom.xml @@ -92,7 +92,7 @@ <module>hyracks-server</module> <module>hyracks-examples</module> <module>hyracks-documentation</module> - <module>hyracks-hadoop-compat</module> + <!--module>hyracks-hadoop-compat</module--> <module>hyracks-maven-plugins</module> <module>hyracks-hdfs</module> <module>hyracks-dist</module> -- To view, visit https://asterix-gerrit.ics.uci.edu/387 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I98b18f24a20dcd8dc75e828e47fb0ab98179a5bf Gerrit-PatchSet: 1 Gerrit-Project: hyracks 
Gerrit-Branch: master
Gerrit-Owner: Ian Maxon <[email protected]>
