/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.hadoop;

import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.HadoopConfiguration;
import org.apache.ignite.hadoop.mapreduce.IgniteHadoopMapReducePlanner;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker;
import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffle;
import org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopEmbeddedTaskExecutor;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;

import java.io.IOException;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hadoop processor: assembles the Hadoop accelerator subsystem (job tracker,
 * task executor and shuffle), drives its lifecycle alongside the kernal, and
 * exposes the public {@code Hadoop} facade that delegates to the job tracker.
 */
public class HadoopProcessor extends HadoopProcessorAdapter {
    /** Monotonic counter used to produce node-locally unique job IDs. */
    private final AtomicInteger idCtr = new AtomicInteger();

    /** Hadoop context holding all subsystem components; {@code null} on daemon nodes. */
    @GridToStringExclude
    private HadoopContext hctx;

    /** Hadoop facade for public API; {@code null} until {@link #start()} completes. */
    @GridToStringExclude
    private Hadoop hadoop;

    /**
     * Constructor.
     *
     * @param ctx Kernal context.
     */
    public HadoopProcessor(GridKernalContext ctx) {
        super(ctx);
    }

    /** {@inheritDoc} */
    @Override public void start() throws IgniteCheckedException {
        // The Hadoop subsystem is never started on daemon nodes.
        if (ctx.isDaemon())
            return;

        HadoopConfiguration userCfg = ctx.config().getHadoopConfiguration();

        // Always operate on a copy so the user-supplied configuration is never mutated.
        HadoopConfiguration cfg = userCfg == null ? new HadoopConfiguration() : new HadoopConfiguration(userCfg);

        initializeDefaults(cfg);

        hctx = new HadoopContext(
            ctx,
            cfg,
            new HadoopJobTracker(),
            new HadoopEmbeddedTaskExecutor(),
            // TODO: IGNITE-404: Uncomment when fixed.
            //cfg.isExternalExecution() ? new HadoopExternalTaskExecutor() : new HadoopEmbeddedTaskExecutor(),
            new HadoopShuffle());

        for (HadoopComponent comp : hctx.components())
            comp.start(hctx);

        hadoop = new HadoopImpl(this);

        ctx.addNodeAttribute(HadoopAttributes.NAME, new HadoopAttributes(cfg));
    }

    /** {@inheritDoc} */
    @Override public void onKernalStart() throws IgniteCheckedException {
        super.onKernalStart();

        // Nothing was started (daemon node) - nothing to notify.
        if (hctx == null)
            return;

        for (HadoopComponent comp : hctx.components())
            comp.onKernalStart();
    }

    /** {@inheritDoc} */
    @Override public void onKernalStop(boolean cancel) {
        super.onKernalStop(cancel);

        if (hctx == null)
            return;

        List<HadoopComponent> comps = hctx.components();

        // Notify components in the order opposite to the start order.
        for (int i = comps.size() - 1; i >= 0; i--)
            comps.get(i).onKernalStop(cancel);
    }

    /** {@inheritDoc} */
    @Override public void stop(boolean cancel) throws IgniteCheckedException {
        super.stop(cancel);

        if (hctx == null)
            return;

        List<HadoopComponent> comps = hctx.components();

        // Stop components in the order opposite to the start order.
        for (int i = comps.size() - 1; i >= 0; i--)
            comps.get(i).stop(cancel);
    }

    /**
     * Gets Hadoop context.
     *
     * @return Hadoop context.
     */
    public HadoopContext context() {
        return hctx;
    }

    /** {@inheritDoc} */
    @Override public Hadoop hadoop() {
        if (hadoop == null)
            throw new IllegalStateException("Hadoop accelerator is disabled (Hadoop is not in classpath, " +
                "is HADOOP_HOME environment variable set?)");

        return hadoop;
    }

    /** {@inheritDoc} */
    @Override public HadoopConfiguration config() {
        return hctx.configuration();
    }

    /** {@inheritDoc} */
    @Override public HadoopJobId nextJobId() {
        return new HadoopJobId(ctx.localNodeId(), idCtr.incrementAndGet());
    }

    /** {@inheritDoc} */
    @Override public IgniteInternalFuture<?> submit(HadoopJobId jobId, HadoopJobInfo jobInfo) {
        return hctx.jobTracker().submit(jobId, jobInfo);
    }

    /** {@inheritDoc} */
    @Override public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException {
        return hctx.jobTracker().status(jobId);
    }

    /** {@inheritDoc} */
    @Override public HadoopCounters counters(HadoopJobId jobId) throws IgniteCheckedException {
        return hctx.jobTracker().jobCounters(jobId);
    }

    /** {@inheritDoc} */
    @Override public IgniteInternalFuture<?> finishFuture(HadoopJobId jobId) throws IgniteCheckedException {
        return hctx.jobTracker().finishFuture(jobId);
    }

    /** {@inheritDoc} */
    @Override public boolean kill(HadoopJobId jobId) throws IgniteCheckedException {
        return hctx.jobTracker().killJob(jobId);
    }

    /** {@inheritDoc} */
    @Override public void validateEnvironment() throws IgniteCheckedException {
        // Perform some static checks as early as possible, so that any recoverable exceptions are thrown here.
        try {
            HadoopLocations loc = HadoopClasspathUtils.locations();

            if (!F.isEmpty(loc.home()))
                U.quietAndInfo(log, HadoopClasspathUtils.HOME + " is set to " + loc.home());

            U.quietAndInfo(log, "Resolved Hadoop classpath locations: " + loc.common() + ", " + loc.hdfs() + ", " +
                loc.mapred());
        }
        catch (IOException ioe) {
            throw new IgniteCheckedException(ioe.getMessage(), ioe);
        }

        HadoopClassLoader.hadoopUrls();
    }

    /**
     * Initializes default hadoop configuration.
     *
     * @param cfg Hadoop configuration.
     */
    private void initializeDefaults(HadoopConfiguration cfg) {
        if (cfg.getMapReducePlanner() == null)
            cfg.setMapReducePlanner(new IgniteHadoopMapReducePlanner());
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(HadoopProcessor.class, this);
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.hadoop;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.Scanner;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;

import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;

/**
 * Setup tool to configure Hadoop client.
 */
public class HadoopSetup {
    /** Name of the Windows utility executable Hadoop requires. */
    public static final String WINUTILS_EXE = "winutils.exe";

    /** Filter matching Ignite JAR files. */
    private static final FilenameFilter IGNITE_JARS = new FilenameFilter() {
        @Override public boolean accept(File dir, String name) {
            return name.startsWith("ignite-") && name.endsWith(".jar");
        }
    };

    /**
     * The main method.
     * @param ignore Params.
     */
    public static void main(String[] ignore) {
        X.println(
            "   __________  ________________ ",
            "  /  _/ ___/ |/ /  _/_  __/ __/ ",
            " _/ // (7 7    // /  / / / _/   ",
            "/___/\\___/_/|_/___/ /_/ /___/  ",
            "                for Apache Hadoop        ",
            " ",
            "ver. " + ACK_VER_STR,
            COPYRIGHT);

        configureHadoop();
    }

    /**
     * This operation prepares the clean unpacked Hadoop distributive to work as client with Ignite-Hadoop.
     * It performs these operations:
     * <ul>
     *     <li>Check for setting of HADOOP_HOME environment variable.</li>
     *     <li>Try to resolve HADOOP_COMMON_HOME or evaluate it relative to HADOOP_HOME.</li>
     *     <li>In Windows check if winutils.exe exists and try to fix issue with some restrictions.</li>
     *     <li>In Windows check new line character issues in CMD scripts.</li>
     *     <li>Scan Hadoop lib directory to detect Ignite JARs. If these don't exist tries to create ones.</li>
     * </ul>
     */
    private static void configureHadoop() {
        String igniteHome = U.getIgniteHome();

        println("IGNITE_HOME is set to '" + igniteHome + "'.");

        checkIgniteHome(igniteHome);

        String homeVar = "HADOOP_HOME";
        String hadoopHome = System.getenv(homeVar);

        if (F.isEmpty(hadoopHome)) {
            homeVar = "HADOOP_PREFIX";
            hadoopHome = System.getenv(homeVar);
        }

        if (F.isEmpty(hadoopHome))
            exit("Neither HADOOP_HOME nor HADOOP_PREFIX environment variable is set. Please set one of them to a " +
                "valid Hadoop installation directory and run setup tool again.", null);

        // Strip quotes that Windows users often leave in environment variables.
        hadoopHome = hadoopHome.replaceAll("\"", "");

        println(homeVar + " is set to '" + hadoopHome + "'.");

        String hiveHome = System.getenv("HIVE_HOME");

        if (!F.isEmpty(hiveHome)) {
            hiveHome = hiveHome.replaceAll("\"", "");

            println("HIVE_HOME is set to '" + hiveHome + "'.");
        }

        File hadoopDir = new File(hadoopHome);

        if (!hadoopDir.exists())
            exit("Hadoop installation folder does not exist.", null);

        if (!hadoopDir.isDirectory())
            exit("HADOOP_HOME must point to a directory.", null);

        if (!hadoopDir.canRead())
            exit("Hadoop installation folder can not be read. Please check permissions.", null);

        final File hadoopCommonDir;

        String hadoopCommonHome = System.getenv("HADOOP_COMMON_HOME");

        if (F.isEmpty(hadoopCommonHome)) {
            hadoopCommonDir = new File(hadoopDir, "share/hadoop/common");

            println("HADOOP_COMMON_HOME is not set, will use '" + hadoopCommonDir.getPath() + "'.");
        }
        else {
            println("HADOOP_COMMON_HOME is set to '" + hadoopCommonHome + "'.");

            hadoopCommonDir = new File(hadoopCommonHome);
        }

        if (!hadoopCommonDir.canRead())
            exit("Failed to read Hadoop common dir '" + hadoopCommonDir + "'.", null);

        final File hadoopCommonLibDir = new File(hadoopCommonDir, "lib");

        if (!hadoopCommonLibDir.canRead())
            exit("Failed to read Hadoop 'lib' folder in '" + hadoopCommonLibDir.getPath() + "'.", null);

        if (U.isWindows()) {
            checkJavaPathSpaces();

            final File hadoopBinDir = new File(hadoopDir, "bin");

            if (!hadoopBinDir.canRead())
                exit("Failed to read subdirectory 'bin' in HADOOP_HOME.", null);

            File winutilsFile = new File(hadoopBinDir, WINUTILS_EXE);

            if (!winutilsFile.exists()) {
                if (ask("File '" + WINUTILS_EXE + "' does not exist. " +
                    "It may be replaced by a stub. Create it?")) {
                    println("Creating file stub '" + winutilsFile.getAbsolutePath() + "'.");

                    boolean ok = false;

                    try {
                        ok = winutilsFile.createNewFile();
                    }
                    catch (IOException ignore) {
                        // No-op: reported below through the 'ok' flag.
                    }

                    if (!ok)
                        exit("Failed to create '" + WINUTILS_EXE + "' file. Please check permissions.", null);
                }
                else
                    println("Ok. But Hadoop client probably will not work on Windows this way...");
            }

            processCmdFiles(hadoopDir, "bin", "sbin", "libexec");
        }

        File igniteLibs = new File(new File(igniteHome), "libs");

        if (!igniteLibs.exists())
            exit("Ignite 'libs' folder is not found.", null);

        Collection<File> jarFiles = new ArrayList<>();

        addJarsInFolder(jarFiles, igniteLibs);
        addJarsInFolder(jarFiles, new File(igniteLibs, "ignite-hadoop"));
        addJarsInFolder(jarFiles, new File(igniteLibs, "ignite-hadoop-impl"));

        boolean jarsLinksCorrect = true;

        for (File file : jarFiles) {
            File link = new File(hadoopCommonLibDir, file.getName());

            jarsLinksCorrect &= isJarLinkCorrect(link, file);

            if (!jarsLinksCorrect)
                break;
        }

        if (!jarsLinksCorrect) {
            if (ask("Ignite JAR files are not found in Hadoop 'lib' directory. " +
                "Create appropriate symbolic links?")) {
                File[] oldIgniteJarFiles = hadoopCommonLibDir.listFiles(IGNITE_JARS);

                // listFiles() returns null on I/O error or if the path is not a directory;
                // treat that the same as "no stale JARs found" instead of failing with NPE.
                if (oldIgniteJarFiles != null && oldIgniteJarFiles.length > 0 &&
                    ask("The Hadoop 'lib' directory contains JARs from other Ignite " +
                    "installation. They must be deleted to continue. Continue?")) {
                    for (File file : oldIgniteJarFiles) {
                        println("Deleting file '" + file.getAbsolutePath() + "'.");

                        if (!file.delete())
                            exit("Failed to delete file '" + file.getPath() + "'.", null);
                    }
                }

                for (File file : jarFiles) {
                    File targetFile = new File(hadoopCommonLibDir, file.getName());

                    try {
                        println("Creating symbolic link '" + targetFile.getAbsolutePath() + "'.");

                        Files.createSymbolicLink(targetFile.toPath(), file.toPath());
                    }
                    catch (IOException e) {
                        if (U.isWindows()) {
                            warn("Ability to create symbolic links is required!");
                            warn("On Windows platform you have to grant permission 'Create symbolic links'");
                            warn("to your user or run the Accelerator as Administrator.");
                        }

                        exit("Creating symbolic link failed! Check permissions.", e);
                    }
                }
            }
            else
                println("Ok. But Hadoop client will not be able to talk to Ignite cluster without those JARs in classpath...");
        }

        File hadoopEtc = new File(hadoopDir, "etc" + File.separator + "hadoop");

        File igniteHadoopCfg = igniteHadoopConfig(igniteHome);

        if (!igniteHadoopCfg.canRead())
            exit("Failed to read Ignite Hadoop 'config' folder at '" + igniteHadoopCfg.getAbsolutePath() + "'.", null);

        if (hadoopEtc.canWrite()) { // TODO Bigtop
            if (ask("Replace 'core-site.xml' and 'mapred-site.xml' files with preconfigured templates " +
                "(existing files will be backed up)?")) {
                replaceWithBackup(new File(igniteHadoopCfg, "core-site.ignite.xml"),
                    new File(hadoopEtc, "core-site.xml"));

                replaceWithBackup(new File(igniteHadoopCfg, "mapred-site.ignite.xml"),
                    new File(hadoopEtc, "mapred-site.xml"));
            }
            else
                println("Ok. You can configure them later, the templates are available at Ignite's 'docs' directory...");
        }

        if (!F.isEmpty(hiveHome)) {
            File hiveConfDir = new File(hiveHome + File.separator + "conf");

            if (!hiveConfDir.canWrite())
                warn("Can not write to '" + hiveConfDir.getAbsolutePath() + "'. To run Hive queries you have to " +
                    "configure 'hive-site.xml' manually. The template is available at Ignite's 'docs' directory.");
            else if (ask("Replace 'hive-site.xml' with preconfigured template (existing file will be backed up)?"))
                replaceWithBackup(new File(igniteHadoopCfg, "hive-site.ignite.xml"),
                    new File(hiveConfDir, "hive-site.xml"));
            else
                println("Ok. You can configure it later, the template is available at Ignite's 'docs' directory...");
        }

        println("Apache Hadoop setup is complete.");
    }

    /**
     * Get Ignite Hadoop config directory.
     *
     * @param igniteHome Ignite home.
     * @return Ignite Hadoop config directory.
     */
    private static File igniteHadoopConfig(String igniteHome) {
        Path path = Paths.get(igniteHome, "modules", "hadoop", "config");

        if (!Files.exists(path))
            path = Paths.get(igniteHome, "config", "hadoop");

        if (Files.exists(path))
            return path.toFile();
        else
            return new File(igniteHome, "docs");
    }

    /**
     * Collects Ignite JARs located directly inside the given folder.
     *
     * @param jarFiles Collection to add found JARs to.
     * @param folder Folder to scan.
     */
    private static void addJarsInFolder(Collection<File> jarFiles, File folder) {
        if (!folder.exists())
            exit("Folder '" + folder.getAbsolutePath() + "' is not found.", null);

        File[] jars = folder.listFiles(IGNITE_JARS);

        // listFiles() returns null on I/O error; do not pass null to Arrays.asList().
        if (jars != null)
            jarFiles.addAll(Arrays.asList(jars));
    }

    /**
     * Checks that JAVA_HOME does not contain space characters.
     */
    private static void checkJavaPathSpaces() {
        String javaHome = System.getProperty("java.home");

        if (javaHome.contains(" ")) {
            warn("Java installation path contains space characters!");
            warn("Hadoop client will not be able to start using '" + javaHome + "'.");
            warn("Please install JRE to path which does not contain spaces and point JAVA_HOME to that installation.");
        }
    }

    /**
     * Checks Ignite home.
     *
     * @param igniteHome Ignite home.
     */
    private static void checkIgniteHome(String igniteHome) {
        URL jarUrl = U.class.getProtectionDomain().getCodeSource().getLocation();

        try {
            Path jar = Paths.get(jarUrl.toURI());
            Path igHome = Paths.get(igniteHome);

            if (!jar.startsWith(igHome))
                exit("Ignite JAR files are not under IGNITE_HOME.", null);
        }
        catch (Exception e) {
            exit(e.getMessage(), e);
        }
    }

    /**
     * Replaces target file with source file.
     *
     * @param from From.
     * @param to To.
     */
    private static void replaceWithBackup(File from, File to) {
        if (!from.canRead())
            exit("Failed to read source file '" + from.getAbsolutePath() + "'.", null);

        println("Replacing file '" + to.getAbsolutePath() + "'.");

        try {
            U.copy(from, renameToBak(to), true);
        }
        catch (IOException e) {
            exit("Failed to replace file '" + to.getAbsolutePath() + "'.", e);
        }
    }

    /**
     * Renames file for backup.
     *
     * @param file File.
     * @return File.
     */
    private static File renameToBak(File file) {
        DateFormat fmt = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");

        if (file.exists() && !file.renameTo(new File(file.getAbsolutePath() + "." + fmt.format(new Date()) + ".bak")))
            exit("Failed to rename file '" + file.getPath() + "'.", null);

        return file;
    }

    /**
     * Checks if link is correct.
     *
     * @param link Symbolic link.
     * @param correctTarget Correct link target.
     * @return {@code true} If link target is correct.
     */
    private static boolean isJarLinkCorrect(File link, File correctTarget) {
        if (!Files.isSymbolicLink(link.toPath()))
            return false; // It is a real file or it does not exist.

        Path target = null;

        try {
            target = Files.readSymbolicLink(link.toPath());
        }
        catch (IOException e) {
            exit("Failed to read symbolic link: " + link.getAbsolutePath(), e);
        }

        return Files.exists(target) && target.toFile().equals(correctTarget);
    }

    /**
     * Writes the question end read the boolean answer from the console.
     *
     * @param question Question to write.
     * @return {@code true} if user inputs 'Y' or 'y', {@code false} otherwise.
     */
    private static boolean ask(String question) {
        X.println();
        X.print(" <  " + question + " (Y/N): ");

        String answer = null;

        // IGNITE_HADOOP_SETUP_YES allows non-interactive (scripted) runs.
        if (!F.isEmpty(System.getenv("IGNITE_HADOOP_SETUP_YES")))
            answer = "Y";
        else {
            BufferedReader br = new BufferedReader(new InputStreamReader(System.in));

            try {
                answer = br.readLine();
            }
            catch (IOException e) {
                exit("Failed to read answer: " + e.getMessage(), e);
            }
        }

        if (answer != null && "Y".equals(answer.toUpperCase().trim())) {
            X.println(" >  Yes.");

            return true;
        }
        else {
            X.println(" >  No.");

            return false;
        }
    }

    /**
     * Exit with message.
     *
     * @param msg Exit message.
     * @param e Optional exception whose stack trace is printed when IGNITE_HADOOP_SETUP_DEBUG is set.
     */
    private static void exit(String msg, Exception e) {
        X.println("    ");
        X.println("  # " + msg);
        X.println("  # Setup failed, exiting... ");

        if (e != null && !F.isEmpty(System.getenv("IGNITE_HADOOP_SETUP_DEBUG")))
            e.printStackTrace();

        System.exit(1);
    }

    /**
     * Prints message.
     *
     * @param msg Message.
     */
    private static void println(String msg) {
        X.println(" >  " + msg);
    }

    /**
     * Prints warning.
     *
     * @param msg Message.
     */
    private static void warn(String msg) {
        X.println(" !  " + msg);
    }

    /**
     * Checks that CMD files have valid MS Windows new line characters. If not, writes question to console and reads the
     * answer. If it's 'Y' then backups original files and corrects invalid new line characters.
     *
     * @param rootDir Root directory to process.
     * @param dirs Directories inside of the root to process.
     */
    private static void processCmdFiles(File rootDir, String... dirs) {
        boolean answer = false;

        for (String dir : dirs) {
            File subDir = new File(rootDir, dir);

            File[] cmdFiles = subDir.listFiles(new FilenameFilter() {
                @Override public boolean accept(File dir, String name) {
                    return name.toLowerCase().endsWith(".cmd");
                }
            });

            // listFiles() returns null if the subdirectory is missing or unreadable; skip it.
            if (cmdFiles == null)
                continue;

            for (File file : cmdFiles) {
                String content = null;

                try (Scanner scanner = new Scanner(file)) {
                    content = scanner.useDelimiter("\\Z").next();
                }
                catch (FileNotFoundException e) {
                    exit("Failed to read file '" + file + "'.", e);
                }

                boolean invalid = false;

                // Detect bare '\n' line endings (not preceded by '\r').
                for (int i = 0; i < content.length(); i++) {
                    if (content.charAt(i) == '\n' && (i == 0 || content.charAt(i - 1) != '\r')) {
                        invalid = true;

                        break;
                    }
                }

                if (invalid) {
                    answer = answer || ask("One or more *.CMD files has invalid new line character. Replace them?");

                    if (!answer) {
                        println("Ok. But Windows most probably will fail to execute them...");

                        return;
                    }

                    println("Fixing newline characters in file '" + file.getAbsolutePath() + "'.");

                    renameToBak(file);

                    try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) {
                        for (int i = 0; i < content.length(); i++) {
                            if (content.charAt(i) == '\n' && (i == 0 || content.charAt(i - 1) != '\r'))
                                writer.write("\r");

                            writer.write(content.charAt(i));
                        }
                    }
                    catch (IOException e) {
                        exit("Failed to write file '" + file.getPath() + "': " + e.getMessage(), e);
                    }
                }
            }
        }
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.hadoop;

import org.apache.ignite.IgniteException;

/**
 * Exception thrown when a Hadoop task is cancelled. Unchecked so it can
 * propagate through task execution code without cluttering signatures.
 */
public class HadoopTaskCancelledException extends IgniteException {
    /** Serialization version. */
    private static final long serialVersionUID = 0L;

    /**
     * Constructor.
     *
     * @param msg Exception message.
     */
    public HadoopTaskCancelledException(String msg) {
        super(msg);
    }
}
+ */ + +package org.apache.ignite.internal.processors.hadoop; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutput; +import java.io.ObjectOutputStream; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; +import java.util.UUID; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.JobPriority; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.hadoop.v2.HadoopSplitWrapper; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +/** + * Hadoop utility methods. + */ +public class HadoopUtils { + /** Property to store timestamp of new job id request. */ + public static final String REQ_NEW_JOBID_TS_PROPERTY = "ignite.job.requestNewIdTs"; + + /** Property to store timestamp of response of new job id request. */ + public static final String RESPONSE_NEW_JOBID_TS_PROPERTY = "ignite.job.responseNewIdTs"; + + /** Property to store timestamp of job submission. */ + public static final String JOB_SUBMISSION_START_TS_PROPERTY = "ignite.job.submissionStartTs"; + + /** Property to set custom writer of job statistics. */ + public static final String JOB_COUNTER_WRITER_PROPERTY = "ignite.counters.writer"; + + /** Staging constant. */ + private static final String STAGING_CONSTANT = ".staging"; + + /** Old mapper class attribute. 
*/ + private static final String OLD_MAP_CLASS_ATTR = "mapred.mapper.class"; + + /** Old reducer class attribute. */ + private static final String OLD_REDUCE_CLASS_ATTR = "mapred.reducer.class"; + + /** + * Constructor. + */ + private HadoopUtils() { + // No-op. + } + + /** + * Wraps native split. + * + * @param id Split ID. + * @param split Split. + * @param hosts Hosts. + * @throws IOException If failed. + */ + public static HadoopSplitWrapper wrapSplit(int id, Object split, String[] hosts) throws IOException { + ByteArrayOutputStream arr = new ByteArrayOutputStream(); + ObjectOutput out = new ObjectOutputStream(arr); + + assert split instanceof Writable; + + ((Writable)split).write(out); + + out.flush(); + + return new HadoopSplitWrapper(id, split.getClass().getName(), arr.toByteArray(), hosts); + } + + /** + * Unwraps native split. + * + * @param o Wrapper. + * @return Split. + */ + public static Object unwrapSplit(HadoopSplitWrapper o) { + try { + Writable w = (Writable)HadoopUtils.class.getClassLoader().loadClass(o.className()).newInstance(); + + w.readFields(new ObjectInputStream(new ByteArrayInputStream(o.bytes()))); + + return w; + } + catch (Exception e) { + throw new IllegalStateException(e); + } + } + + /** + * Convert Ignite job status to Hadoop job status. + * + * @param status Ignite job status. + * @return Hadoop job status. 
+ */ + public static JobStatus status(HadoopJobStatus status, Configuration conf) { + JobID jobId = new JobID(status.jobId().globalId().toString(), status.jobId().localId()); + + float setupProgress = 0; + float mapProgress = 0; + float reduceProgress = 0; + float cleanupProgress = 0; + + JobStatus.State state = JobStatus.State.RUNNING; + + switch (status.jobPhase()) { + case PHASE_SETUP: + setupProgress = 0.42f; + + break; + + case PHASE_MAP: + setupProgress = 1; + mapProgress = 1f - status.pendingMapperCnt() / (float)status.totalMapperCnt(); + + break; + + case PHASE_REDUCE: + setupProgress = 1; + mapProgress = 1; + + if (status.totalReducerCnt() > 0) + reduceProgress = 1f - status.pendingReducerCnt() / (float)status.totalReducerCnt(); + else + reduceProgress = 1f; + + break; + + case PHASE_CANCELLING: + case PHASE_COMPLETE: + if (!status.isFailed()) { + setupProgress = 1; + mapProgress = 1; + reduceProgress = 1; + cleanupProgress = 1; + + state = JobStatus.State.SUCCEEDED; + } + else + state = JobStatus.State.FAILED; + + break; + + default: + assert false; + } + + return new JobStatus(jobId, setupProgress, mapProgress, reduceProgress, cleanupProgress, state, + JobPriority.NORMAL, status.user(), status.jobName(), jobFile(conf, status.user(), jobId).toString(), "N/A"); + } + + /** + * Gets staging area directory. + * + * @param conf Configuration. + * @param usr User. + * @return Staging area directory. + */ + public static Path stagingAreaDir(Configuration conf, String usr) { + return new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR, MRJobConfig.DEFAULT_MR_AM_STAGING_DIR) + + Path.SEPARATOR + usr + Path.SEPARATOR + STAGING_CONSTANT); + } + + /** + * Gets job file. + * + * @param conf Configuration. + * @param usr User. + * @param jobId Job ID. + * @return Job file. 
+ */ + public static Path jobFile(Configuration conf, String usr, JobID jobId) { + return new Path(stagingAreaDir(conf, usr), jobId.toString() + Path.SEPARATOR + MRJobConfig.JOB_CONF_FILE); + } + + /** + * Checks the attribute in configuration is not set. + * + * @param attr Attribute name. + * @param msg Message for creation of exception. + * @throws IgniteCheckedException If attribute is set. + */ + public static void ensureNotSet(Configuration cfg, String attr, String msg) throws IgniteCheckedException { + if (cfg.get(attr) != null) + throw new IgniteCheckedException(attr + " is incompatible with " + msg + " mode."); + } + + /** + * Creates JobInfo from hadoop configuration. + * + * @param cfg Hadoop configuration. + * @return Job info. + * @throws IgniteCheckedException If failed. + */ + public static HadoopDefaultJobInfo createJobInfo(Configuration cfg) throws IgniteCheckedException { + JobConf jobConf = new JobConf(cfg); + + boolean hasCombiner = jobConf.get("mapred.combiner.class") != null + || jobConf.get(MRJobConfig.COMBINE_CLASS_ATTR) != null; + + int numReduces = jobConf.getNumReduceTasks(); + + jobConf.setBooleanIfUnset("mapred.mapper.new-api", jobConf.get(OLD_MAP_CLASS_ATTR) == null); + + if (jobConf.getUseNewMapper()) { + String mode = "new map API"; + + ensureNotSet(jobConf, "mapred.input.format.class", mode); + ensureNotSet(jobConf, OLD_MAP_CLASS_ATTR, mode); + + if (numReduces != 0) + ensureNotSet(jobConf, "mapred.partitioner.class", mode); + else + ensureNotSet(jobConf, "mapred.output.format.class", mode); + } + else { + String mode = "map compatibility"; + + ensureNotSet(jobConf, MRJobConfig.INPUT_FORMAT_CLASS_ATTR, mode); + ensureNotSet(jobConf, MRJobConfig.MAP_CLASS_ATTR, mode); + + if (numReduces != 0) + ensureNotSet(jobConf, MRJobConfig.PARTITIONER_CLASS_ATTR, mode); + else + ensureNotSet(jobConf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, mode); + } + + if (numReduces != 0) { + jobConf.setBooleanIfUnset("mapred.reducer.new-api", 
jobConf.get(OLD_REDUCE_CLASS_ATTR) == null); + + if (jobConf.getUseNewReducer()) { + String mode = "new reduce API"; + + ensureNotSet(jobConf, "mapred.output.format.class", mode); + ensureNotSet(jobConf, OLD_REDUCE_CLASS_ATTR, mode); + } + else { + String mode = "reduce compatibility"; + + ensureNotSet(jobConf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, mode); + ensureNotSet(jobConf, MRJobConfig.REDUCE_CLASS_ATTR, mode); + } + } + + Map<String, String> props = new HashMap<>(); + + for (Map.Entry<String, String> entry : jobConf) + props.put(entry.getKey(), entry.getValue()); + + return new HadoopDefaultJobInfo(jobConf.getJobName(), jobConf.getUser(), hasCombiner, numReduces, props); + } + + /** + * Throws new {@link IgniteCheckedException} with original exception is serialized into string. + * This is needed to transfer error outside the current class loader. + * + * @param e Original exception. + * @return IgniteCheckedException New exception. + */ + public static IgniteCheckedException transformException(Throwable e) { + ByteArrayOutputStream os = new ByteArrayOutputStream(); + + e.printStackTrace(new PrintStream(os, true)); + + return new IgniteCheckedException(os.toString()); + } + + /** + * Returns work directory for job execution. + * + * @param locNodeId Local node ID. + * @param jobId Job ID. + * @return Working directory for job. + * @throws IgniteCheckedException If Failed. + */ + public static File jobLocalDir(UUID locNodeId, HadoopJobId jobId) throws IgniteCheckedException { + return new File(new File(U.resolveWorkDirectory("hadoop", false), "node-" + locNodeId), "job_" + jobId); + } + + /** + * Returns subdirectory of job working directory for task execution. + * + * @param locNodeId Local node ID. + * @param info Task info. + * @return Working directory for task. + * @throws IgniteCheckedException If Failed. 
+ */ + public static File taskLocalDir(UUID locNodeId, HadoopTaskInfo info) throws IgniteCheckedException { + File jobLocDir = jobLocalDir(locNodeId, info.jobId()); + + return new File(jobLocDir, info.type() + "_" + info.taskNumber() + "_" + info.attempt()); + } + + /** + * Creates {@link Configuration} in a correct class loader context to avoid caching + * of inappropriate class loader in the Configuration object. + * @return New instance of {@link Configuration}. + */ + public static Configuration safeCreateConfiguration() { + final ClassLoader oldLdr = setContextClassLoader(Configuration.class.getClassLoader()); + + try { + return new Configuration(); + } + finally { + restoreContextClassLoader(oldLdr); + } + } + + + + /** + * Set context class loader. + * + * @param newLdr New class loader. + * @return Old class loader. + */ + @Nullable public static ClassLoader setContextClassLoader(@Nullable ClassLoader newLdr) { + ClassLoader oldLdr = Thread.currentThread().getContextClassLoader(); + + if (newLdr != oldLdr) + Thread.currentThread().setContextClassLoader(newLdr); + + return oldLdr; + } + + /** + * Restore context class loader. + * + * @param oldLdr Original class loader. 
+ */ + public static void restoreContextClassLoader(@Nullable ClassLoader oldLdr) { + ClassLoader newLdr = Thread.currentThread().getContextClassLoader(); + + if (newLdr != oldLdr) + Thread.currentThread().setContextClassLoader(oldLdr); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java new file mode 100644 index 0000000..3f682d3 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.counter; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +/** + * Default Hadoop counter implementation. + */ +public abstract class HadoopCounterAdapter implements HadoopCounter, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Counter group name. */ + private String grp; + + /** Counter name. */ + private String name; + + /** + * Default constructor required by {@link Externalizable}. + */ + protected HadoopCounterAdapter() { + // No-op. + } + + /** + * Creates new counter with given group and name. + * + * @param grp Counter group name. + * @param name Counter name. + */ + protected HadoopCounterAdapter(String grp, String name) { + assert grp != null : "counter must have group"; + assert name != null : "counter must have name"; + + this.grp = grp; + this.name = name; + } + + /** {@inheritDoc} */ + @Override public String name() { + return name; + } + + /** {@inheritDoc} */ + @Override @Nullable public String group() { + return grp; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeUTF(grp); + out.writeUTF(name); + writeValue(out); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + grp = in.readUTF(); + name = in.readUTF(); + readValue(in); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + HadoopCounterAdapter cntr = (HadoopCounterAdapter)o; + + if (!grp.equals(cntr.grp)) + return false; + if (!name.equals(cntr.name)) + return false; + + return true; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = 
grp.hashCode(); + res = 31 * res + name.hashCode(); + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopCounterAdapter.class, this); + } + + /** + * Writes value of this counter to output. + * + * @param out Output. + * @throws IOException If failed. + */ + protected abstract void writeValue(ObjectOutput out) throws IOException; + + /** + * Read value of this counter from input. + * + * @param in Input. + * @throws IOException If failed. + */ + protected abstract void readValue(ObjectInput in) throws IOException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java new file mode 100644 index 0000000..f3b5463 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.counter; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.lang.reflect.Constructor; +import java.util.Collection; +import java.util.concurrent.ConcurrentMap; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.util.lang.GridTuple3; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jsr166.ConcurrentHashMap8; + +/** + * Default in-memory counters store. + */ +public class HadoopCountersImpl implements HadoopCounters, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final ConcurrentMap<CounterKey, HadoopCounter> cntrsMap = new ConcurrentHashMap8<>(); + + /** + * Default constructor. Creates new instance without counters. + */ + public HadoopCountersImpl() { + // No-op. + } + + /** + * Creates new instance that contain given counters. + * + * @param cntrs Counters to store. + */ + public HadoopCountersImpl(Iterable<HadoopCounter> cntrs) { + addCounters(cntrs, true); + } + + /** + * Copy constructor. + * + * @param cntrs Counters to copy. + */ + public HadoopCountersImpl(HadoopCounters cntrs) { + this(cntrs.all()); + } + + /** + * Creates counter instance. + * + * @param cls Class of the counter. + * @param grp Group name. + * @param name Counter name. + * @return Counter. + */ + private <T extends HadoopCounter> T createCounter(Class<? extends HadoopCounter> cls, String grp, + String name) { + try { + Constructor constructor = cls.getConstructor(String.class, String.class); + + return (T)constructor.newInstance(grp, name); + } + catch (Exception e) { + throw new IgniteException(e); + } + } + + /** + * Adds counters collection in addition to existing counters. 
+ * + * @param cntrs Counters to add. + * @param cp Whether to copy counters or not. + */ + private void addCounters(Iterable<HadoopCounter> cntrs, boolean cp) { + assert cntrs != null; + + for (HadoopCounter cntr : cntrs) { + if (cp) { + HadoopCounter cntrCp = createCounter(cntr.getClass(), cntr.group(), cntr.name()); + + cntrCp.merge(cntr); + + cntr = cntrCp; + } + + cntrsMap.put(new CounterKey(cntr.getClass(), cntr.group(), cntr.name()), cntr); + } + } + + /** {@inheritDoc} */ + @Override public <T extends HadoopCounter> T counter(String grp, String name, Class<T> cls) { + assert cls != null; + + CounterKey mapKey = new CounterKey(cls, grp, name); + + T cntr = (T)cntrsMap.get(mapKey); + + if (cntr == null) { + cntr = createCounter(cls, grp, name); + + T old = (T)cntrsMap.putIfAbsent(mapKey, cntr); + + if (old != null) + return old; + } + + return cntr; + } + + /** {@inheritDoc} */ + @Override public Collection<HadoopCounter> all() { + return cntrsMap.values(); + } + + /** {@inheritDoc} */ + @Override public void merge(HadoopCounters other) { + for (HadoopCounter counter : other.all()) + counter(counter.group(), counter.name(), counter.getClass()).merge(counter); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeCollection(out, cntrsMap.values()); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + addCounters(U.<HadoopCounter>readCollection(in), false); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + HadoopCountersImpl counters = (HadoopCountersImpl)o; + + return cntrsMap.equals(counters.cntrsMap); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return cntrsMap.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + 
return S.toString(HadoopCountersImpl.class, this, "counters", cntrsMap.values()); + } + + /** + * The tuple of counter identifier components for more readable code. + */ + private static class CounterKey extends GridTuple3<Class<? extends HadoopCounter>, String, String> { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Constructor. + * + * @param cls Class of the counter. + * @param grp Group name. + * @param name Counter name. + */ + private CounterKey(Class<? extends HadoopCounter> cls, String grp, String name) { + super(cls, grp, name); + } + + /** + * Empty constructor required by {@link Externalizable}. + */ + public CounterKey() { + // No-op. + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java new file mode 100644 index 0000000..0d61e0d --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.counter; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; + +/** + * Standard hadoop counter to use via original Hadoop API in Hadoop jobs. + */ +public class HadoopLongCounter extends HadoopCounterAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** The counter value. */ + private long val; + + /** + * Default constructor required by {@link Externalizable}. + */ + public HadoopLongCounter() { + // No-op. + } + + /** + * Constructor. + * + * @param grp Group name. + * @param name Counter name. + */ + public HadoopLongCounter(String grp, String name) { + super(grp, name); + } + + /** {@inheritDoc} */ + @Override protected void writeValue(ObjectOutput out) throws IOException { + out.writeLong(val); + } + + /** {@inheritDoc} */ + @Override protected void readValue(ObjectInput in) throws IOException { + val = in.readLong(); + } + + /** {@inheritDoc} */ + @Override public void merge(HadoopCounter cntr) { + val += ((HadoopLongCounter)cntr).val; + } + + /** + * Gets current value of this counter. + * + * @return Current value. + */ + public long value() { + return val; + } + + /** + * Sets current value by the given value. + * + * @param val Value to set. + */ + public void value(long val) { + this.val = val; + } + + /** + * Increment this counter by the given value. + * + * @param i Value to increase this counter by. 
+ */ + public void increment(long i) { + val += i; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java new file mode 100644 index 0000000..dedc6b3 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.counter; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.ArrayList; +import java.util.Collection; +import java.util.UUID; +import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskType; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.JOB_SUBMISSION_START_TS_PROPERTY; +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.REQ_NEW_JOBID_TS_PROPERTY; +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.RESPONSE_NEW_JOBID_TS_PROPERTY; + +/** + * Counter for the job statistics accumulation. + */ +public class HadoopPerformanceCounter extends HadoopCounterAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** The group name for this counter. */ + private static final String GROUP_NAME = "SYSTEM"; + + /** The counter name for this counter. */ + private static final String COUNTER_NAME = "PERFORMANCE"; + + /** Events collections. */ + private Collection<T2<String,Long>> evts = new ArrayList<>(); + + /** Node id to insert into the event info. */ + private UUID nodeId; + + /** */ + private int reducerNum; + + /** */ + private volatile Long firstShuffleMsg; + + /** */ + private volatile Long lastShuffleMsg; + + /** + * Default constructor required by {@link Externalizable}. + */ + public HadoopPerformanceCounter() { + // No-op. + } + + /** + * Constructor. + * + * @param grp Group name. + * @param name Counter name. 
+ */ + public HadoopPerformanceCounter(String grp, String name) { + super(grp, name); + } + + /** + * Constructor to create instance to use this as helper. + * + * @param nodeId Id of the work node. + */ + public HadoopPerformanceCounter(UUID nodeId) { + this.nodeId = nodeId; + } + + /** {@inheritDoc} */ + @Override protected void writeValue(ObjectOutput out) throws IOException { + U.writeCollection(out, evts); + } + + /** {@inheritDoc} */ + @Override protected void readValue(ObjectInput in) throws IOException { + try { + evts = U.readCollection(in); + } + catch (ClassNotFoundException e) { + throw new IOException(e); + } + } + + /** {@inheritDoc} */ + @Override public void merge(HadoopCounter cntr) { + evts.addAll(((HadoopPerformanceCounter)cntr).evts); + } + + /** + * Gets the events collection. + * + * @return Collection of event. + */ + public Collection<T2<String, Long>> evts() { + return evts; + } + + /** + * Generate name that consists of some event information. + * + * @param info Task info. + * @param evtType The type of the event. + * @return String contains necessary event information. + */ + private String eventName(HadoopTaskInfo info, String evtType) { + return eventName(info.type().toString(), info.taskNumber(), evtType); + } + + /** + * Generate name that consists of some event information. + * + * @param taskType Task type. + * @param taskNum Number of the task. + * @param evtType The type of the event. + * @return String contains necessary event information. + */ + private String eventName(String taskType, int taskNum, String evtType) { + assert nodeId != null; + + return taskType + " " + taskNum + " " + evtType + " " + nodeId; + } + + /** + * Adds event of the task submission (task instance creation). + * + * @param info Task info. + * @param ts Timestamp of the event. + */ + public void onTaskSubmit(HadoopTaskInfo info, long ts) { + evts.add(new T2<>(eventName(info, "submit"), ts)); + } + + /** + * Adds event of the task preparation. 
+ * + * @param info Task info. + * @param ts Timestamp of the event. + */ + public void onTaskPrepare(HadoopTaskInfo info, long ts) { + evts.add(new T2<>(eventName(info, "prepare"), ts)); + } + + /** + * Adds event of the task finish. + * + * @param info Task info. + * @param ts Timestamp of the event. + */ + public void onTaskFinish(HadoopTaskInfo info, long ts) { + if (info.type() == HadoopTaskType.REDUCE && lastShuffleMsg != null) { + evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "start"), firstShuffleMsg)); + evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "finish"), lastShuffleMsg)); + + lastShuffleMsg = null; + } + + evts.add(new T2<>(eventName(info, "finish"), ts)); + } + + /** + * Adds event of the task run. + * + * @param info Task info. + * @param ts Timestamp of the event. + */ + public void onTaskStart(HadoopTaskInfo info, long ts) { + evts.add(new T2<>(eventName(info, "start"), ts)); + } + + /** + * Adds event of the job preparation. + * + * @param ts Timestamp of the event. + */ + public void onJobPrepare(long ts) { + assert nodeId != null; + + evts.add(new T2<>("JOB prepare " + nodeId, ts)); + } + + /** + * Adds event of the job start. + * + * @param ts Timestamp of the event. + */ + public void onJobStart(long ts) { + assert nodeId != null; + + evts.add(new T2<>("JOB start " + nodeId, ts)); + } + + /** + * Adds client submission events from job info. + * + * @param info Job info. + */ + public void clientSubmissionEvents(HadoopJobInfo info) { + assert nodeId != null; + + addEventFromProperty("JOB requestId", info, REQ_NEW_JOBID_TS_PROPERTY); + addEventFromProperty("JOB responseId", info, RESPONSE_NEW_JOBID_TS_PROPERTY); + addEventFromProperty("JOB submit", info, JOB_SUBMISSION_START_TS_PROPERTY); + } + + /** + * Adds event with timestamp from some property in job info. + * + * @param evt Event type and phase. + * @param info Job info. + * @param propName Property name to get timestamp. 
+ */ + private void addEventFromProperty(String evt, HadoopJobInfo info, String propName) { + String val = info.property(propName); + + if (!F.isEmpty(val)) { + try { + evts.add(new T2<>(evt + " " + nodeId, Long.parseLong(val))); + } + catch (NumberFormatException e) { + throw new IllegalStateException("Invalid value '" + val + "' of property '" + propName + "'", e); + } + } + } + + /** + * Registers shuffle message event. + * + * @param reducerNum Number of reducer that receives the data. + * @param ts Timestamp of the event. + */ + public void onShuffleMessage(int reducerNum, long ts) { + this.reducerNum = reducerNum; + + if (firstShuffleMsg == null) + firstShuffleMsg = ts; + + lastShuffleMsg = ts; + } + + /** + * Gets system predefined performance counter from the HadoopCounters object. + * + * @param cntrs HadoopCounters object. + * @param nodeId Node id for methods that adds events. It may be null if you don't use ones. + * @return Predefined performance counter. + */ + public static HadoopPerformanceCounter getCounter(HadoopCounters cntrs, @Nullable UUID nodeId) { + HadoopPerformanceCounter cntr = cntrs.counter(GROUP_NAME, COUNTER_NAME, HadoopPerformanceCounter.class); + + if (nodeId != null) + cntr.nodeId(nodeId); + + return cntrs.counter(GROUP_NAME, COUNTER_NAME, HadoopPerformanceCounter.class); + } + + /** + * Sets the nodeId field. + * + * @param nodeId Node id. 
+ */ + private void nodeId(UUID nodeId) { + this.nodeId = nodeId; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java new file mode 100644 index 0000000..1ecbee5 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +import java.io.IOException; +import java.net.URI; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.ignite.IgniteException; +import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem; +import org.apache.ignite.internal.util.GridStringBuilder; +import org.apache.ignite.internal.util.typedef.F; +import org.jetbrains.annotations.Nullable; + +/** + * File system cache utility methods used by Map-Reduce tasks and jobs. + */ +public class HadoopFileSystemCacheUtils { + /** + * A common static factory method. Creates new HadoopLazyConcurrentMap. + * @return a new HadoopLazyConcurrentMap. + */ + public static HadoopLazyConcurrentMap<FsCacheKey, FileSystem> createHadoopLazyConcurrentMap() { + return new HadoopLazyConcurrentMap<>( + new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() { + @Override public FileSystem createValue(FsCacheKey key) throws IOException { + try { + assert key != null; + + // Explicitly disable FileSystem caching: + URI uri = key.uri(); + + String scheme = uri.getScheme(); + + // Copy the configuration to avoid altering the external object. + Configuration cfg = new Configuration(key.configuration()); + + String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(scheme); + + cfg.setBoolean(prop, true); + + return FileSystem.get(uri, cfg, key.user()); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IOException("Failed to create file system due to interrupt.", e); + } + } + } + ); + } + + /** + * Gets non-null user name as per the Hadoop viewpoint. + * @param cfg the Hadoop job configuration, may be null. + * @return the user name, never null. 
+ */ + private static String getMrHadoopUser(Configuration cfg) throws IOException { + String user = cfg.get(MRJobConfig.USER_NAME); + + if (user == null) + user = IgniteHadoopFileSystem.getFsHadoopUser(); + + return user; + } + + /** + * Common method to get the V1 file system in MapRed engine. + * It gets the filesystem for the user specified in the + * configuration with {@link MRJobConfig#USER_NAME} property. + * The file systems are created and cached in the given map upon first request. + * + * @param uri The file system uri. + * @param cfg The configuration. + * @param map The caching map. + * @return The file system. + * @throws IOException On error. + */ + public static FileSystem fileSystemForMrUserWithCaching(@Nullable URI uri, Configuration cfg, + HadoopLazyConcurrentMap<FsCacheKey, FileSystem> map) + throws IOException { + assert map != null; + assert cfg != null; + + final String usr = getMrHadoopUser(cfg); + + assert usr != null; + + if (uri == null) + uri = FileSystem.getDefaultUri(cfg); + + final FileSystem fs; + + try { + final FsCacheKey key = new FsCacheKey(uri, usr, cfg); + + fs = map.getOrCreate(key); + } + catch (IgniteException ie) { + throw new IOException(ie); + } + + assert fs != null; + assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, ((IgniteHadoopFileSystem)fs).user()); + + return fs; + } + + /** + * Takes Fs URI using logic similar to that used in FileSystem#get(1,2,3). + * @param uri0 The uri. + * @param cfg The cfg. + * @return Correct URI. + */ + private static URI fixUri(URI uri0, Configuration cfg) { + if (uri0 == null) + return FileSystem.getDefaultUri(cfg); + + String scheme = uri0.getScheme(); + String authority = uri0.getAuthority(); + + if (authority == null) { + URI dfltUri = FileSystem.getDefaultUri(cfg); + + if (scheme == null || (scheme.equals(dfltUri.getScheme()) && dfltUri.getAuthority() != null)) + return dfltUri; + } + + return uri0; + } + + /** + * Note that configuration is not a part of the key. 
+ * It is used solely to initialize the first instance + * that is created for the key. + */ + public static final class FsCacheKey { + /** */ + private final URI uri; + + /** */ + private final String usr; + + /** */ + private final String equalityKey; + + /** */ + private final Configuration cfg; + + /** + * Constructor + */ + public FsCacheKey(URI uri, String usr, Configuration cfg) { + assert uri != null; + assert usr != null; + assert cfg != null; + + this.uri = fixUri(uri, cfg); + this.usr = usr; + this.cfg = cfg; + + this.equalityKey = createEqualityKey(); + } + + /** + * Creates String key used for equality and hashing. + */ + private String createEqualityKey() { + GridStringBuilder sb = new GridStringBuilder("(").a(usr).a(")@"); + + if (uri.getScheme() != null) + sb.a(uri.getScheme().toLowerCase()); + + sb.a("://"); + + if (uri.getAuthority() != null) + sb.a(uri.getAuthority().toLowerCase()); + + return sb.toString(); + } + + /** + * The URI. + */ + public URI uri() { + return uri; + } + + /** + * The User. + */ + public String user() { + return usr; + } + + /** + * The Configuration. 
+ */ + public Configuration configuration() { + return cfg; + } + + /** {@inheritDoc} */ + @SuppressWarnings("SimplifiableIfStatement") + @Override public boolean equals(Object obj) { + if (obj == this) + return true; + + if (obj == null || getClass() != obj.getClass()) + return false; + + return equalityKey.equals(((FsCacheKey)obj).equalityKey); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return equalityKey.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return equalityKey; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java new file mode 100644 index 0000000..68c0dc4 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FsConstants; +import org.jetbrains.annotations.Nullable; + +/** + * Utilities for configuring file systems to support the separate working directory per each thread. + */ +public class HadoopFileSystemsUtils { + /** Name of the property for setting working directory on create new local FS instance. */ + public static final String LOC_FS_WORK_DIR_PROP = "fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".workDir"; + + /** + * Setup wrappers of filesystems to support the separate working directory. + * + * @param cfg Config for setup. + */ + public static void setupFileSystems(Configuration cfg) { + cfg.set("fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", HadoopLocalFileSystemV1.class.getName()); + cfg.set("fs.AbstractFileSystem." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", + HadoopLocalFileSystemV2.class.getName()); + } + + /** + * Gets the property name to disable file system cache. + * @param scheme The file system URI scheme. + * @return The property name. If scheme is null, + * returns "fs.null.impl.disable.cache". 
+ */ + public static String disableFsCachePropertyName(@Nullable String scheme) { + return String.format("fs.%s.impl.disable.cache", scheme); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java new file mode 100644 index 0000000..681cddb --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */

package org.apache.ignite.internal.processors.hadoop.fs;

import java.io.Closeable;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.jsr166.ConcurrentHashMap8;

/**
 * Maps values by keys.
 * Values are created lazily using {@link ValueFactory}.
 *
 * Despite of the name, does not depend on any Hadoop classes.
 */
public class HadoopLazyConcurrentMap<K, V extends Closeable> {
    /** The map storing the actual values. */
    private final ConcurrentMap<K, ValueWrapper> map = new ConcurrentHashMap8<>();

    /** The factory passed in by the client. Will be used for lazy value creation. */
    private final ValueFactory<K, V> factory;

    /** Lock used to close the objects: creation takes the read lock, close() the write lock. */
    private final ReadWriteLock closeLock = new ReentrantReadWriteLock();

    /** Flag indicating that this map is closed and cleared. Guarded by closeLock. */
    private boolean closed;

    /**
     * Constructor.
     * @param factory the factory to create new values lazily.
     */
    public HadoopLazyConcurrentMap(ValueFactory<K, V> factory) {
        this.factory = factory;

        // NOTE(review): presumably guards against this class being loaded by a child
        // class loader, which would duplicate the cache per deployment — TODO confirm.
        assert getClass().getClassLoader() == Ignite.class.getClassLoader();
    }

    /**
     * Gets cached or creates a new value of V.
     * Never returns null.
     * @param k the key to associate the value with.
     * @return the cached or newly created value, never null.
     * @throws IgniteException on error
     */
    public V getOrCreate(K k) {
        ValueWrapper w = map.get(k);

        if (w == null) {
            // Read lock ensures creation cannot interleave with close(), which
            // drains and closes all values under the write lock.
            closeLock.readLock().lock();

            try {
                if (closed)
                    throw new IllegalStateException("Failed to create value for key [" + k
                        + "]: the map is already closed.");

                final ValueWrapper wNew = new ValueWrapper(k);

                w = map.putIfAbsent(k, wNew);

                if (w == null) {
                    // This thread won the putIfAbsent race: run the factory exactly once.
                    wNew.init();

                    w = wNew;
                }
            }
            finally {
                closeLock.readLock().unlock();
            }
        }

        try {
            // Blocks until the winning thread completes init() for this key.
            V v = w.getValue();

            assert v != null;

            return v;
        }
        catch (IgniteCheckedException ie) {
            throw new IgniteException(ie);
        }
    }

    /**
     * Clears the map and closes all the values.
     * Subsequent getOrCreate() calls fail with IllegalStateException.
     * @throws IgniteCheckedException if closing any value failed (only the first failure is kept).
     */
    public void close() throws IgniteCheckedException {
        closeLock.writeLock().lock();

        try {
            if (closed)
                return;

            closed = true;

            Exception err = null;

            Set<K> keySet = map.keySet();

            for (K key : keySet) {
                V v = null;

                try {
                    v = map.get(key).getValue();
                }
                catch (IgniteCheckedException ignore) {
                    // No-op. A wrapper whose factory failed produced no value to close.
                }

                if (v != null) {
                    try {
                        v.close();
                    }
                    catch (Exception err0) {
                        // Keep only the first close failure; subsequent ones are dropped.
                        if (err == null)
                            err = err0;
                    }
                }
            }

            map.clear();

            if (err != null)
                throw new IgniteCheckedException(err);
        }
        finally {
            closeLock.writeLock().unlock();
        }
    }

    /**
     * Helper class that drives the lazy value creation.
     */
    private class ValueWrapper {
        /** Future completed with either the created value or the creation error. */
        private final GridFutureAdapter<V> fut = new GridFutureAdapter<>();

        /** the key */
        private final K key;

        /**
         * Creates new wrapper.
         */
        private ValueWrapper(K key) {
            this.key = key;
        }

        /**
         * Initializes the value using the factory.
         * NOTE(review): a factory failure is stored in the future, so later lookups for the
         * same key appear to rethrow the cached error rather than retry — confirm intended.
         */
        private void init() {
            try {
                final V v0 = factory.createValue(key);

                if (v0 == null)
                    throw new IgniteException("Failed to create non-null value. [key=" + key + ']');

                fut.onDone(v0);
            }
            catch (Throwable e) {
                // Complete the future exceptionally so waiters in getValue() fail fast.
                fut.onDone(e);
            }
        }

        /**
         * Gets the available value or blocks until the value is initialized.
         * @return the value, never null.
         * @throws IgniteCheckedException on error.
         */
        V getValue() throws IgniteCheckedException {
            return fut.get();
        }
    }

    /**
     * Interface representing the factory that creates map values.
     * @param <K> the type of the key.
     * @param <V> the type of the value.
     */
    public interface ValueFactory <K, V> {
        /**
         * Creates the new value. Should never return null.
         *
         * @param key the key to create value for
         * @return the value.
         * @throws IOException On failure.
         */
        public V createValue(K key) throws IOException;
    }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java
new file mode 100644
index 0000000..cbb007f
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java
@@ -0,0 +1,39 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +import java.io.File; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; + +/** + * Local file system replacement for Hadoop jobs. + */ +public class HadoopLocalFileSystemV1 extends LocalFileSystem { + /** + * Creates new local file system. + */ + public HadoopLocalFileSystemV1() { + super(new HadoopRawLocalFileSystem()); + } + + /** {@inheritDoc} */ + @Override public File pathToFile(Path path) { + return ((HadoopRawLocalFileSystem)getRaw()).convert(path); + } +} \ No newline at end of file