http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounters.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounters.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounters.java deleted file mode 100644 index 57a853f..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounters.java +++ /dev/null @@ -1,228 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.NoSuchElementException; -import org.apache.hadoop.mapreduce.Counter; -import org.apache.hadoop.mapreduce.CounterGroup; -import org.apache.hadoop.mapreduce.Counters; -import org.apache.hadoop.mapreduce.FileSystemCounter; -import org.apache.hadoop.mapreduce.counters.AbstractCounters; -import org.apache.hadoop.mapreduce.counters.Limits; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter; -import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Counter; -import org.apache.ignite.internal.util.typedef.T2; - -/** - * Hadoop counters adapter. - */ -public class HadoopMapReduceCounters extends Counters { - /** */ - private final Map<T2<String,String>,HadoopLongCounter> cntrs = new HashMap<>(); - - /** - * Creates new instance based on given counters. - * - * @param cntrs Counters to adapt. 
- */ - public HadoopMapReduceCounters(org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters cntrs) { - for (HadoopCounter cntr : cntrs.all()) - if (cntr instanceof HadoopLongCounter) - this.cntrs.put(new T2<>(cntr.group(), cntr.name()), (HadoopLongCounter) cntr); - } - - /** {@inheritDoc} */ - @Override public synchronized CounterGroup addGroup(CounterGroup grp) { - return addGroup(grp.getName(), grp.getDisplayName()); - } - - /** {@inheritDoc} */ - @Override public CounterGroup addGroup(String name, String displayName) { - return new HadoopMapReduceCounterGroup(this, name); - } - - /** {@inheritDoc} */ - @Override public Counter findCounter(String grpName, String cntrName) { - return findCounter(grpName, cntrName, true); - } - - /** {@inheritDoc} */ - @Override public synchronized Counter findCounter(Enum<?> key) { - return findCounter(key.getDeclaringClass().getName(), key.name(), true); - } - - /** {@inheritDoc} */ - @Override public synchronized Counter findCounter(String scheme, FileSystemCounter key) { - return findCounter(String.format("FileSystem Counter (%s)", scheme), key.name()); - } - - /** {@inheritDoc} */ - @Override public synchronized Iterable<String> getGroupNames() { - Collection<String> res = new HashSet<>(); - - for (HadoopCounter counter : cntrs.values()) - res.add(counter.group()); - - return res; - } - - /** {@inheritDoc} */ - @Override public Iterator<CounterGroup> iterator() { - final Iterator<String> iter = getGroupNames().iterator(); - - return new Iterator<CounterGroup>() { - @Override public boolean hasNext() { - return iter.hasNext(); - } - - @Override public CounterGroup next() { - if (!hasNext()) - throw new NoSuchElementException(); - - return new HadoopMapReduceCounterGroup(HadoopMapReduceCounters.this, iter.next()); - } - - @Override public void remove() { - throw new UnsupportedOperationException("not implemented"); - } - }; - } - - /** {@inheritDoc} */ - @Override public synchronized CounterGroup getGroup(String 
grpName) { - return new HadoopMapReduceCounterGroup(this, grpName); - } - - /** {@inheritDoc} */ - @Override public synchronized int countCounters() { - return cntrs.size(); - } - - /** {@inheritDoc} */ - @Override public synchronized void write(DataOutput out) throws IOException { - throw new UnsupportedOperationException("not implemented"); - } - - /** {@inheritDoc} */ - @Override public synchronized void readFields(DataInput in) throws IOException { - throw new UnsupportedOperationException("not implemented"); - } - - /** {@inheritDoc} */ - @Override public synchronized void incrAllCounters(AbstractCounters<Counter, CounterGroup> other) { - for (CounterGroup group : other) { - for (Counter counter : group) { - findCounter(group.getName(), counter.getName()).increment(counter.getValue()); - } - } - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object genericRight) { - if (!(genericRight instanceof HadoopMapReduceCounters)) - return false; - - return cntrs.equals(((HadoopMapReduceCounters) genericRight).cntrs); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return cntrs.hashCode(); - } - - /** {@inheritDoc} */ - @Override public void setWriteAllCounters(boolean snd) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public boolean getWriteAllCounters() { - return true; - } - - /** {@inheritDoc} */ - @Override public Limits limits() { - return null; - } - - /** - * Returns size of a group. - * - * @param grpName Name of the group. - * @return amount of counters in the given group. - */ - public int groupSize(String grpName) { - int res = 0; - - for (HadoopCounter counter : cntrs.values()) { - if (grpName.equals(counter.group())) - res++; - } - - return res; - } - - /** - * Returns counters iterator for specified group. - * - * @param grpName Name of the group to iterate. - * @return Counters iterator. 
- */ - public Iterator<Counter> iterateGroup(String grpName) { - Collection<Counter> grpCounters = new ArrayList<>(); - - for (HadoopLongCounter counter : cntrs.values()) { - if (grpName.equals(counter.group())) - grpCounters.add(new HadoopV2Counter(counter)); - } - - return grpCounters.iterator(); - } - - /** - * Find a counter in the group. - * - * @param grpName The name of the counter group. - * @param cntrName The name of the counter. - * @param create Create the counter if not found if true. - * @return The counter that was found or added or {@code null} if create is false. - */ - public Counter findCounter(String grpName, String cntrName, boolean create) { - T2<String, String> key = new T2<>(grpName, cntrName); - - HadoopLongCounter internalCntr = cntrs.get(key); - - if (internalCntr == null & create) { - internalCntr = new HadoopLongCounter(grpName,cntrName); - - cntrs.put(key, new HadoopLongCounter(grpName,cntrName)); - } - - return internalCntr == null ? null : new HadoopV2Counter(internalCntr); - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java deleted file mode 100644 index b9c20c3..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java +++ /dev/null @@ -1,223 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.configuration.HadoopConfiguration; -import org.apache.ignite.hadoop.mapreduce.IgniteHadoopMapReducePlanner; -import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; -import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker; -import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffle; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopEmbeddedTaskExecutor; -import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; - -import java.io.IOException; -import java.util.List; -import java.util.ListIterator; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Hadoop processor. - */ -public class HadoopProcessor extends HadoopProcessorAdapter { - /** Job ID counter. */ - private final AtomicInteger idCtr = new AtomicInteger(); - - /** Hadoop context. */ - @GridToStringExclude - private HadoopContext hctx; - - /** Hadoop facade for public API. */ - @GridToStringExclude - private Hadoop hadoop; - - /** - * Constructor. - * - * @param ctx Kernal context. 
- */ - public HadoopProcessor(GridKernalContext ctx) { - super(ctx); - } - - /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { - if (ctx.isDaemon()) - return; - - HadoopConfiguration cfg = ctx.config().getHadoopConfiguration(); - - if (cfg == null) - cfg = new HadoopConfiguration(); - else - cfg = new HadoopConfiguration(cfg); - - initializeDefaults(cfg); - - hctx = new HadoopContext( - ctx, - cfg, - new HadoopJobTracker(), - new HadoopEmbeddedTaskExecutor(), - // TODO: IGNITE-404: Uncomment when fixed. - //cfg.isExternalExecution() ? new HadoopExternalTaskExecutor() : new HadoopEmbeddedTaskExecutor(), - new HadoopShuffle()); - - for (HadoopComponent c : hctx.components()) - c.start(hctx); - - hadoop = new HadoopImpl(this); - - ctx.addNodeAttribute(HadoopAttributes.NAME, new HadoopAttributes(cfg)); - } - - /** {@inheritDoc} */ - @Override public void onKernalStart() throws IgniteCheckedException { - super.onKernalStart(); - - if (hctx == null) - return; - - for (HadoopComponent c : hctx.components()) - c.onKernalStart(); - } - - /** {@inheritDoc} */ - @Override public void onKernalStop(boolean cancel) { - super.onKernalStop(cancel); - - if (hctx == null) - return; - - List<HadoopComponent> components = hctx.components(); - - for (ListIterator<HadoopComponent> it = components.listIterator(components.size()); it.hasPrevious();) { - HadoopComponent c = it.previous(); - - c.onKernalStop(cancel); - } - } - - /** {@inheritDoc} */ - @Override public void stop(boolean cancel) throws IgniteCheckedException { - super.stop(cancel); - - if (hctx == null) - return; - - List<HadoopComponent> components = hctx.components(); - - for (ListIterator<HadoopComponent> it = components.listIterator(components.size()); it.hasPrevious();) { - HadoopComponent c = it.previous(); - - c.stop(cancel); - } - } - - /** - * Gets Hadoop context. - * - * @return Hadoop context. 
- */ - public HadoopContext context() { - return hctx; - } - - /** {@inheritDoc} */ - @Override public Hadoop hadoop() { - if (hadoop == null) - throw new IllegalStateException("Hadoop accelerator is disabled (Hadoop is not in classpath, " + - "is HADOOP_HOME environment variable set?)"); - - return hadoop; - } - - /** {@inheritDoc} */ - @Override public HadoopConfiguration config() { - return hctx.configuration(); - } - - /** {@inheritDoc} */ - @Override public HadoopJobId nextJobId() { - return new HadoopJobId(ctx.localNodeId(), idCtr.incrementAndGet()); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> submit(HadoopJobId jobId, HadoopJobInfo jobInfo) { - return hctx.jobTracker().submit(jobId, jobInfo); - } - - /** {@inheritDoc} */ - @Override public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException { - return hctx.jobTracker().status(jobId); - } - - /** {@inheritDoc} */ - @Override public HadoopCounters counters(HadoopJobId jobId) throws IgniteCheckedException { - return hctx.jobTracker().jobCounters(jobId); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> finishFuture(HadoopJobId jobId) throws IgniteCheckedException { - return hctx.jobTracker().finishFuture(jobId); - } - - /** {@inheritDoc} */ - @Override public boolean kill(HadoopJobId jobId) throws IgniteCheckedException { - return hctx.jobTracker().killJob(jobId); - } - - /** {@inheritDoc} */ - @Override public void validateEnvironment() throws IgniteCheckedException { - // Perform some static checks as early as possible, so that any recoverable exceptions are thrown here. 
- try { - HadoopLocations loc = HadoopClasspathUtils.locations(); - - if (!F.isEmpty(loc.home())) - U.quietAndInfo(log, HadoopClasspathUtils.HOME + " is set to " + loc.home()); - - U.quietAndInfo(log, "Resolved Hadoop classpath locations: " + loc.common() + ", " + loc.hdfs() + ", " + - loc.mapred()); - } - catch (IOException ioe) { - throw new IgniteCheckedException(ioe.getMessage(), ioe); - } - - HadoopClassLoader.hadoopUrls(); - } - - /** - * Initializes default hadoop configuration. - * - * @param cfg Hadoop configuration. - */ - private void initializeDefaults(HadoopConfiguration cfg) { - if (cfg.getMapReducePlanner() == null) - cfg.setMapReducePlanner(new IgniteHadoopMapReducePlanner()); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopProcessor.class, this); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java deleted file mode 100644 index ed39ce5..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java +++ /dev/null @@ -1,542 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileWriter; -import java.io.FilenameFilter; -import java.io.IOException; -import java.io.InputStreamReader; -import java.net.URL; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Date; -import java.util.Scanner; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.X; -import org.apache.ignite.internal.util.typedef.internal.U; - -import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR; -import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT; - -/** - * Setup tool to configure Hadoop client. - */ -public class HadoopSetup { - /** */ - public static final String WINUTILS_EXE = "winutils.exe"; - - /** */ - private static final FilenameFilter IGNITE_JARS = new FilenameFilter() { - @Override public boolean accept(File dir, String name) { - return name.startsWith("ignite-") && name.endsWith(".jar"); - } - }; - - /** - * The main method. - * @param ignore Params. 
- */ - public static void main(String[] ignore) { - X.println( - " __________ ________________ ", - " / _/ ___/ |/ / _/_ __/ __/ ", - " _/ // (7 7 // / / / / _/ ", - "/___/\\___/_/|_/___/ /_/ /___/ ", - " for Apache Hadoop ", - " ", - "ver. " + ACK_VER_STR, - COPYRIGHT); - - configureHadoop(); - } - - /** - * This operation prepares the clean unpacked Hadoop distributive to work as client with Ignite-Hadoop. - * It performs these operations: - * <ul> - * <li>Check for setting of HADOOP_HOME environment variable.</li> - * <li>Try to resolve HADOOP_COMMON_HOME or evaluate it relative to HADOOP_HOME.</li> - * <li>In Windows check if winutils.exe exists and try to fix issue with some restrictions.</li> - * <li>In Windows check new line character issues in CMD scripts.</li> - * <li>Scan Hadoop lib directory to detect Ignite JARs. If these don't exist tries to create ones.</li> - * </ul> - */ - private static void configureHadoop() { - String igniteHome = U.getIgniteHome(); - - println("IGNITE_HOME is set to '" + igniteHome + "'."); - - checkIgniteHome(igniteHome); - - String homeVar = "HADOOP_HOME"; - String hadoopHome = System.getenv(homeVar); - - if (F.isEmpty(hadoopHome)) { - homeVar = "HADOOP_PREFIX"; - hadoopHome = System.getenv(homeVar); - } - - if (F.isEmpty(hadoopHome)) - exit("Neither HADOOP_HOME nor HADOOP_PREFIX environment variable is set. 
Please set one of them to a " + - "valid Hadoop installation directory and run setup tool again.", null); - - hadoopHome = hadoopHome.replaceAll("\"", ""); - - println(homeVar + " is set to '" + hadoopHome + "'."); - - String hiveHome = System.getenv("HIVE_HOME"); - - if (!F.isEmpty(hiveHome)) { - hiveHome = hiveHome.replaceAll("\"", ""); - - println("HIVE_HOME is set to '" + hiveHome + "'."); - } - - File hadoopDir = new File(hadoopHome); - - if (!hadoopDir.exists()) - exit("Hadoop installation folder does not exist.", null); - - if (!hadoopDir.isDirectory()) - exit("HADOOP_HOME must point to a directory.", null); - - if (!hadoopDir.canRead()) - exit("Hadoop installation folder can not be read. Please check permissions.", null); - - final File hadoopCommonDir; - - String hadoopCommonHome = System.getenv("HADOOP_COMMON_HOME"); - - if (F.isEmpty(hadoopCommonHome)) { - hadoopCommonDir = new File(hadoopDir, "share/hadoop/common"); - - println("HADOOP_COMMON_HOME is not set, will use '" + hadoopCommonDir.getPath() + "'."); - } - else { - println("HADOOP_COMMON_HOME is set to '" + hadoopCommonHome + "'."); - - hadoopCommonDir = new File(hadoopCommonHome); - } - - if (!hadoopCommonDir.canRead()) - exit("Failed to read Hadoop common dir '" + hadoopCommonDir + "'.", null); - - final File hadoopCommonLibDir = new File(hadoopCommonDir, "lib"); - - if (!hadoopCommonLibDir.canRead()) - exit("Failed to read Hadoop 'lib' folder in '" + hadoopCommonLibDir.getPath() + "'.", null); - - if (U.isWindows()) { - checkJavaPathSpaces(); - - final File hadoopBinDir = new File(hadoopDir, "bin"); - - if (!hadoopBinDir.canRead()) - exit("Failed to read subdirectory 'bin' in HADOOP_HOME.", null); - - File winutilsFile = new File(hadoopBinDir, WINUTILS_EXE); - - if (!winutilsFile.exists()) { - if (ask("File '" + WINUTILS_EXE + "' does not exist. " + - "It may be replaced by a stub. 
Create it?")) { - println("Creating file stub '" + winutilsFile.getAbsolutePath() + "'."); - - boolean ok = false; - - try { - ok = winutilsFile.createNewFile(); - } - catch (IOException ignore) { - // No-op. - } - - if (!ok) - exit("Failed to create '" + WINUTILS_EXE + "' file. Please check permissions.", null); - } - else - println("Ok. But Hadoop client probably will not work on Windows this way..."); - } - - processCmdFiles(hadoopDir, "bin", "sbin", "libexec"); - } - - File igniteLibs = new File(new File(igniteHome), "libs"); - - if (!igniteLibs.exists()) - exit("Ignite 'libs' folder is not found.", null); - - Collection<File> jarFiles = new ArrayList<>(); - - addJarsInFolder(jarFiles, igniteLibs); - addJarsInFolder(jarFiles, new File(igniteLibs, "ignite-hadoop")); - addJarsInFolder(jarFiles, new File(igniteLibs, "ignite-hadoop-impl")); - - boolean jarsLinksCorrect = true; - - for (File file : jarFiles) { - File link = new File(hadoopCommonLibDir, file.getName()); - - jarsLinksCorrect &= isJarLinkCorrect(link, file); - - if (!jarsLinksCorrect) - break; - } - - if (!jarsLinksCorrect) { - if (ask("Ignite JAR files are not found in Hadoop 'lib' directory. " + - "Create appropriate symbolic links?")) { - File[] oldIgniteJarFiles = hadoopCommonLibDir.listFiles(IGNITE_JARS); - - if (oldIgniteJarFiles.length > 0 && ask("The Hadoop 'lib' directory contains JARs from other Ignite " + - "installation. They must be deleted to continue. 
Continue?")) { - for (File file : oldIgniteJarFiles) { - println("Deleting file '" + file.getAbsolutePath() + "'."); - - if (!file.delete()) - exit("Failed to delete file '" + file.getPath() + "'.", null); - } - } - - for (File file : jarFiles) { - File targetFile = new File(hadoopCommonLibDir, file.getName()); - - try { - println("Creating symbolic link '" + targetFile.getAbsolutePath() + "'."); - - Files.createSymbolicLink(targetFile.toPath(), file.toPath()); - } - catch (IOException e) { - if (U.isWindows()) { - warn("Ability to create symbolic links is required!"); - warn("On Windows platform you have to grant permission 'Create symbolic links'"); - warn("to your user or run the Accelerator as Administrator."); - } - - exit("Creating symbolic link failed! Check permissions.", e); - } - } - } - else - println("Ok. But Hadoop client will not be able to talk to Ignite cluster without those JARs in classpath..."); - } - - File hadoopEtc = new File(hadoopDir, "etc" + File.separator + "hadoop"); - - File igniteHadoopCfg = igniteHadoopConfig(igniteHome); - - if (!igniteHadoopCfg.canRead()) - exit("Failed to read Ignite Hadoop 'config' folder at '" + igniteHadoopCfg.getAbsolutePath() + "'.", null); - - if (hadoopEtc.canWrite()) { // TODO Bigtop - if (ask("Replace 'core-site.xml' and 'mapred-site.xml' files with preconfigured templates " + - "(existing files will be backed up)?")) { - replaceWithBackup(new File(igniteHadoopCfg, "core-site.ignite.xml"), - new File(hadoopEtc, "core-site.xml")); - - replaceWithBackup(new File(igniteHadoopCfg, "mapred-site.ignite.xml"), - new File(hadoopEtc, "mapred-site.xml")); - } - else - println("Ok. You can configure them later, the templates are available at Ignite's 'docs' directory..."); - } - - if (!F.isEmpty(hiveHome)) { - File hiveConfDir = new File(hiveHome + File.separator + "conf"); - - if (!hiveConfDir.canWrite()) - warn("Can not write to '" + hiveConfDir.getAbsolutePath() + "'. 
To run Hive queries you have to " + - "configure 'hive-site.xml' manually. The template is available at Ignite's 'docs' directory."); - else if (ask("Replace 'hive-site.xml' with preconfigured template (existing file will be backed up)?")) - replaceWithBackup(new File(igniteHadoopCfg, "hive-site.ignite.xml"), - new File(hiveConfDir, "hive-site.xml")); - else - println("Ok. You can configure it later, the template is available at Ignite's 'docs' directory..."); - } - - println("Apache Hadoop setup is complete."); - } - - /** - * Get Ignite Hadoop config directory. - * - * @param igniteHome Ignite home. - * @return Ignite Hadoop config directory. - */ - private static File igniteHadoopConfig(String igniteHome) { - Path path = Paths.get(igniteHome, "modules", "hadoop", "config"); - - if (!Files.exists(path)) - path = Paths.get(igniteHome, "config", "hadoop"); - - if (Files.exists(path)) - return path.toFile(); - else - return new File(igniteHome, "docs"); - } - - /** - * @param jarFiles Jars. - * @param folder Folder. - */ - private static void addJarsInFolder(Collection<File> jarFiles, File folder) { - if (!folder.exists()) - exit("Folder '" + folder.getAbsolutePath() + "' is not found.", null); - - jarFiles.addAll(Arrays.asList(folder.listFiles(IGNITE_JARS))); - } - - /** - * Checks that JAVA_HOME does not contain space characters. - */ - private static void checkJavaPathSpaces() { - String javaHome = System.getProperty("java.home"); - - if (javaHome.contains(" ")) { - warn("Java installation path contains space characters!"); - warn("Hadoop client will not be able to start using '" + javaHome + "'."); - warn("Please install JRE to path which does not contain spaces and point JAVA_HOME to that installation."); - } - } - - /** - * Checks Ignite home. - * - * @param igniteHome Ignite home. 
- */ - private static void checkIgniteHome(String igniteHome) { - URL jarUrl = U.class.getProtectionDomain().getCodeSource().getLocation(); - - try { - Path jar = Paths.get(jarUrl.toURI()); - Path igHome = Paths.get(igniteHome); - - if (!jar.startsWith(igHome)) - exit("Ignite JAR files are not under IGNITE_HOME.", null); - } - catch (Exception e) { - exit(e.getMessage(), e); - } - } - - /** - * Replaces target file with source file. - * - * @param from From. - * @param to To. - */ - private static void replaceWithBackup(File from, File to) { - if (!from.canRead()) - exit("Failed to read source file '" + from.getAbsolutePath() + "'.", null); - - println("Replacing file '" + to.getAbsolutePath() + "'."); - - try { - U.copy(from, renameToBak(to), true); - } - catch (IOException e) { - exit("Failed to replace file '" + to.getAbsolutePath() + "'.", e); - } - } - - /** - * Renames file for backup. - * - * @param file File. - * @return File. - */ - private static File renameToBak(File file) { - DateFormat fmt = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss"); - - if (file.exists() && !file.renameTo(new File(file.getAbsolutePath() + "." + fmt.format(new Date()) + ".bak"))) - exit("Failed to rename file '" + file.getPath() + "'.", null); - - return file; - } - - /** - * Checks if link is correct. - * - * @param link Symbolic link. - * @param correctTarget Correct link target. - * @return {@code true} If link target is correct. - */ - private static boolean isJarLinkCorrect(File link, File correctTarget) { - if (!Files.isSymbolicLink(link.toPath())) - return false; // It is a real file or it does not exist. - - Path target = null; - - try { - target = Files.readSymbolicLink(link.toPath()); - } - catch (IOException e) { - exit("Failed to read symbolic link: " + link.getAbsolutePath(), e); - } - - return Files.exists(target) && target.toFile().equals(correctTarget); - } - - /** - * Writes the question end read the boolean answer from the console. 
- * - * @param question Question to write. - * @return {@code true} if user inputs 'Y' or 'y', {@code false} otherwise. - */ - private static boolean ask(String question) { - X.println(); - X.print(" < " + question + " (Y/N): "); - - String answer = null; - - if (!F.isEmpty(System.getenv("IGNITE_HADOOP_SETUP_YES"))) - answer = "Y"; - else { - BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); - - try { - answer = br.readLine(); - } - catch (IOException e) { - exit("Failed to read answer: " + e.getMessage(), e); - } - } - - if (answer != null && "Y".equals(answer.toUpperCase().trim())) { - X.println(" > Yes."); - - return true; - } - else { - X.println(" > No."); - - return false; - } - } - - /** - * Exit with message. - * - * @param msg Exit message. - */ - private static void exit(String msg, Exception e) { - X.println(" "); - X.println(" # " + msg); - X.println(" # Setup failed, exiting... "); - - if (e != null && !F.isEmpty(System.getenv("IGNITE_HADOOP_SETUP_DEBUG"))) - e.printStackTrace(); - - System.exit(1); - } - - /** - * Prints message. - * - * @param msg Message. - */ - private static void println(String msg) { - X.println(" > " + msg); - } - - /** - * Prints warning. - * - * @param msg Message. - */ - private static void warn(String msg) { - X.println(" ! " + msg); - } - - /** - * Checks that CMD files have valid MS Windows new line characters. If not, writes question to console and reads the - * answer. If it's 'Y' then backups original files and corrects invalid new line characters. - * - * @param rootDir Root directory to process. - * @param dirs Directories inside of the root to process. - */ - private static void processCmdFiles(File rootDir, String... 
dirs) { - boolean answer = false; - - for (String dir : dirs) { - File subDir = new File(rootDir, dir); - - File[] cmdFiles = subDir.listFiles(new FilenameFilter() { - @Override public boolean accept(File dir, String name) { - return name.toLowerCase().endsWith(".cmd"); - } - }); - - for (File file : cmdFiles) { - String content = null; - - try (Scanner scanner = new Scanner(file)) { - content = scanner.useDelimiter("\\Z").next(); - } - catch (FileNotFoundException e) { - exit("Failed to read file '" + file + "'.", e); - } - - boolean invalid = false; - - for (int i = 0; i < content.length(); i++) { - if (content.charAt(i) == '\n' && (i == 0 || content.charAt(i - 1) != '\r')) { - invalid = true; - - break; - } - } - - if (invalid) { - answer = answer || ask("One or more *.CMD files has invalid new line character. Replace them?"); - - if (!answer) { - println("Ok. But Windows most probably will fail to execute them..."); - - return; - } - - println("Fixing newline characters in file '" + file.getAbsolutePath() + "'."); - - renameToBak(file); - - try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) { - for (int i = 0; i < content.length(); i++) { - if (content.charAt(i) == '\n' && (i == 0 || content.charAt(i - 1) != '\r')) - writer.write("\r"); - - writer.write(content.charAt(i)); - } - } - catch (IOException e) { - exit("Failed to write file '" + file.getPath() + "': " + e.getMessage(), e); - } - } - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskCancelledException.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskCancelledException.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskCancelledException.java deleted file mode 100644 index 
1dc8674..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskCancelledException.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.IgniteException; - -/** - * Exception that throws when the task is cancelling. - */ -public class HadoopTaskCancelledException extends IgniteException { - /** */ - private static final long serialVersionUID = 0L; - - /** - * @param msg Exception message. 
- */ - public HadoopTaskCancelledException(String msg) { - super(msg); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java deleted file mode 100644 index 08c3cb5..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java +++ /dev/null @@ -1,364 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutput; -import java.io.ObjectOutputStream; -import java.io.PrintStream; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.JobPriority; -import org.apache.hadoop.mapreduce.JobStatus; -import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.hadoop.v2.HadoopSplitWrapper; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.jetbrains.annotations.Nullable; - -/** - * Hadoop utility methods. - */ -public class HadoopUtils { - /** Property to store timestamp of new job id request. */ - public static final String REQ_NEW_JOBID_TS_PROPERTY = "ignite.job.requestNewIdTs"; - - /** Property to store timestamp of response of new job id request. */ - public static final String RESPONSE_NEW_JOBID_TS_PROPERTY = "ignite.job.responseNewIdTs"; - - /** Property to store timestamp of job submission. */ - public static final String JOB_SUBMISSION_START_TS_PROPERTY = "ignite.job.submissionStartTs"; - - /** Property to set custom writer of job statistics. */ - public static final String JOB_COUNTER_WRITER_PROPERTY = "ignite.counters.writer"; - - /** Staging constant. */ - private static final String STAGING_CONSTANT = ".staging"; - - /** Old mapper class attribute. */ - private static final String OLD_MAP_CLASS_ATTR = "mapred.mapper.class"; - - /** Old reducer class attribute. 
*/ - private static final String OLD_REDUCE_CLASS_ATTR = "mapred.reducer.class"; - - /** - * Constructor. - */ - private HadoopUtils() { - // No-op. - } - - /** - * Wraps native split. - * - * @param id Split ID. - * @param split Split. - * @param hosts Hosts. - * @throws IOException If failed. - */ - public static HadoopSplitWrapper wrapSplit(int id, Object split, String[] hosts) throws IOException { - ByteArrayOutputStream arr = new ByteArrayOutputStream(); - ObjectOutput out = new ObjectOutputStream(arr); - - assert split instanceof Writable; - - ((Writable)split).write(out); - - out.flush(); - - return new HadoopSplitWrapper(id, split.getClass().getName(), arr.toByteArray(), hosts); - } - - /** - * Unwraps native split. - * - * @param o Wrapper. - * @return Split. - */ - public static Object unwrapSplit(HadoopSplitWrapper o) { - try { - Writable w = (Writable)HadoopUtils.class.getClassLoader().loadClass(o.className()).newInstance(); - - w.readFields(new ObjectInputStream(new ByteArrayInputStream(o.bytes()))); - - return w; - } - catch (Exception e) { - throw new IllegalStateException(e); - } - } - - /** - * Convert Ignite job status to Hadoop job status. - * - * @param status Ignite job status. - * @return Hadoop job status. 
- */ - public static JobStatus status(HadoopJobStatus status, Configuration conf) { - JobID jobId = new JobID(status.jobId().globalId().toString(), status.jobId().localId()); - - float setupProgress = 0; - float mapProgress = 0; - float reduceProgress = 0; - float cleanupProgress = 0; - - JobStatus.State state = JobStatus.State.RUNNING; - - switch (status.jobPhase()) { - case PHASE_SETUP: - setupProgress = 0.42f; - - break; - - case PHASE_MAP: - setupProgress = 1; - mapProgress = 1f - status.pendingMapperCnt() / (float)status.totalMapperCnt(); - - break; - - case PHASE_REDUCE: - setupProgress = 1; - mapProgress = 1; - - if (status.totalReducerCnt() > 0) - reduceProgress = 1f - status.pendingReducerCnt() / (float)status.totalReducerCnt(); - else - reduceProgress = 1f; - - break; - - case PHASE_CANCELLING: - case PHASE_COMPLETE: - if (!status.isFailed()) { - setupProgress = 1; - mapProgress = 1; - reduceProgress = 1; - cleanupProgress = 1; - - state = JobStatus.State.SUCCEEDED; - } - else - state = JobStatus.State.FAILED; - - break; - - default: - assert false; - } - - return new JobStatus(jobId, setupProgress, mapProgress, reduceProgress, cleanupProgress, state, - JobPriority.NORMAL, status.user(), status.jobName(), jobFile(conf, status.user(), jobId).toString(), "N/A"); - } - - /** - * Gets staging area directory. - * - * @param conf Configuration. - * @param usr User. - * @return Staging area directory. - */ - public static Path stagingAreaDir(Configuration conf, String usr) { - return new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR, MRJobConfig.DEFAULT_MR_AM_STAGING_DIR) - + Path.SEPARATOR + usr + Path.SEPARATOR + STAGING_CONSTANT); - } - - /** - * Gets job file. - * - * @param conf Configuration. - * @param usr User. - * @param jobId Job ID. - * @return Job file. 
- */ - public static Path jobFile(Configuration conf, String usr, JobID jobId) { - return new Path(stagingAreaDir(conf, usr), jobId.toString() + Path.SEPARATOR + MRJobConfig.JOB_CONF_FILE); - } - - /** - * Checks the attribute in configuration is not set. - * - * @param attr Attribute name. - * @param msg Message for creation of exception. - * @throws IgniteCheckedException If attribute is set. - */ - public static void ensureNotSet(Configuration cfg, String attr, String msg) throws IgniteCheckedException { - if (cfg.get(attr) != null) - throw new IgniteCheckedException(attr + " is incompatible with " + msg + " mode."); - } - - /** - * Creates JobInfo from hadoop configuration. - * - * @param cfg Hadoop configuration. - * @return Job info. - * @throws IgniteCheckedException If failed. - */ - public static HadoopDefaultJobInfo createJobInfo(Configuration cfg) throws IgniteCheckedException { - JobConf jobConf = new JobConf(cfg); - - boolean hasCombiner = jobConf.get("mapred.combiner.class") != null - || jobConf.get(MRJobConfig.COMBINE_CLASS_ATTR) != null; - - int numReduces = jobConf.getNumReduceTasks(); - - jobConf.setBooleanIfUnset("mapred.mapper.new-api", jobConf.get(OLD_MAP_CLASS_ATTR) == null); - - if (jobConf.getUseNewMapper()) { - String mode = "new map API"; - - ensureNotSet(jobConf, "mapred.input.format.class", mode); - ensureNotSet(jobConf, OLD_MAP_CLASS_ATTR, mode); - - if (numReduces != 0) - ensureNotSet(jobConf, "mapred.partitioner.class", mode); - else - ensureNotSet(jobConf, "mapred.output.format.class", mode); - } - else { - String mode = "map compatibility"; - - ensureNotSet(jobConf, MRJobConfig.INPUT_FORMAT_CLASS_ATTR, mode); - ensureNotSet(jobConf, MRJobConfig.MAP_CLASS_ATTR, mode); - - if (numReduces != 0) - ensureNotSet(jobConf, MRJobConfig.PARTITIONER_CLASS_ATTR, mode); - else - ensureNotSet(jobConf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, mode); - } - - if (numReduces != 0) { - jobConf.setBooleanIfUnset("mapred.reducer.new-api", 
jobConf.get(OLD_REDUCE_CLASS_ATTR) == null); - - if (jobConf.getUseNewReducer()) { - String mode = "new reduce API"; - - ensureNotSet(jobConf, "mapred.output.format.class", mode); - ensureNotSet(jobConf, OLD_REDUCE_CLASS_ATTR, mode); - } - else { - String mode = "reduce compatibility"; - - ensureNotSet(jobConf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, mode); - ensureNotSet(jobConf, MRJobConfig.REDUCE_CLASS_ATTR, mode); - } - } - - Map<String, String> props = new HashMap<>(); - - for (Map.Entry<String, String> entry : jobConf) - props.put(entry.getKey(), entry.getValue()); - - return new HadoopDefaultJobInfo(jobConf.getJobName(), jobConf.getUser(), hasCombiner, numReduces, props); - } - - /** - * Throws new {@link IgniteCheckedException} with original exception is serialized into string. - * This is needed to transfer error outside the current class loader. - * - * @param e Original exception. - * @return IgniteCheckedException New exception. - */ - public static IgniteCheckedException transformException(Throwable e) { - ByteArrayOutputStream os = new ByteArrayOutputStream(); - - e.printStackTrace(new PrintStream(os, true)); - - return new IgniteCheckedException(os.toString()); - } - - /** - * Returns work directory for job execution. - * - * @param locNodeId Local node ID. - * @param jobId Job ID. - * @return Working directory for job. - * @throws IgniteCheckedException If Failed. - */ - public static File jobLocalDir(UUID locNodeId, HadoopJobId jobId) throws IgniteCheckedException { - return new File(new File(U.resolveWorkDirectory("hadoop", false), "node-" + locNodeId), "job_" + jobId); - } - - /** - * Returns subdirectory of job working directory for task execution. - * - * @param locNodeId Local node ID. - * @param info Task info. - * @return Working directory for task. - * @throws IgniteCheckedException If Failed. 
- */ - public static File taskLocalDir(UUID locNodeId, HadoopTaskInfo info) throws IgniteCheckedException { - File jobLocDir = jobLocalDir(locNodeId, info.jobId()); - - return new File(jobLocDir, info.type() + "_" + info.taskNumber() + "_" + info.attempt()); - } - - /** - * Creates {@link Configuration} in a correct class loader context to avoid caching - * of inappropriate class loader in the Configuration object. - * @return New instance of {@link Configuration}. - */ - public static Configuration safeCreateConfiguration() { - final ClassLoader oldLdr = setContextClassLoader(Configuration.class.getClassLoader()); - - try { - return new Configuration(); - } - finally { - restoreContextClassLoader(oldLdr); - } - } - - - - /** - * Set context class loader. - * - * @param newLdr New class loader. - * @return Old class loader. - */ - @Nullable public static ClassLoader setContextClassLoader(@Nullable ClassLoader newLdr) { - ClassLoader oldLdr = Thread.currentThread().getContextClassLoader(); - - if (newLdr != oldLdr) - Thread.currentThread().setContextClassLoader(newLdr); - - return oldLdr; - } - - /** - * Restore context class loader. - * - * @param oldLdr Original class loader. 
- */ - public static void restoreContextClassLoader(@Nullable ClassLoader oldLdr) { - ClassLoader newLdr = Thread.currentThread().getContextClassLoader(); - - if (newLdr != oldLdr) - Thread.currentThread().setContextClassLoader(oldLdr); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java deleted file mode 100644 index 3f682d3..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop.counter; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.jetbrains.annotations.Nullable; - -/** - * Default Hadoop counter implementation. - */ -public abstract class HadoopCounterAdapter implements HadoopCounter, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Counter group name. */ - private String grp; - - /** Counter name. */ - private String name; - - /** - * Default constructor required by {@link Externalizable}. - */ - protected HadoopCounterAdapter() { - // No-op. - } - - /** - * Creates new counter with given group and name. - * - * @param grp Counter group name. - * @param name Counter name. - */ - protected HadoopCounterAdapter(String grp, String name) { - assert grp != null : "counter must have group"; - assert name != null : "counter must have name"; - - this.grp = grp; - this.name = name; - } - - /** {@inheritDoc} */ - @Override public String name() { - return name; - } - - /** {@inheritDoc} */ - @Override @Nullable public String group() { - return grp; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeUTF(grp); - out.writeUTF(name); - writeValue(out); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - grp = in.readUTF(); - name = in.readUTF(); - readValue(in); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - - HadoopCounterAdapter cntr = (HadoopCounterAdapter)o; - - if (!grp.equals(cntr.grp)) - return false; - if (!name.equals(cntr.name)) - return false; - - return true; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = 
grp.hashCode(); - res = 31 * res + name.hashCode(); - return res; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopCounterAdapter.class, this); - } - - /** - * Writes value of this counter to output. - * - * @param out Output. - * @throws IOException If failed. - */ - protected abstract void writeValue(ObjectOutput out) throws IOException; - - /** - * Read value of this counter from input. - * - * @param in Input. - * @throws IOException If failed. - */ - protected abstract void readValue(ObjectInput in) throws IOException; -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java deleted file mode 100644 index f3b5463..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.counter; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.lang.reflect.Constructor; -import java.util.Collection; -import java.util.concurrent.ConcurrentMap; -import org.apache.ignite.IgniteException; -import org.apache.ignite.internal.util.lang.GridTuple3; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.jsr166.ConcurrentHashMap8; - -/** - * Default in-memory counters store. - */ -public class HadoopCountersImpl implements HadoopCounters, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final ConcurrentMap<CounterKey, HadoopCounter> cntrsMap = new ConcurrentHashMap8<>(); - - /** - * Default constructor. Creates new instance without counters. - */ - public HadoopCountersImpl() { - // No-op. - } - - /** - * Creates new instance that contain given counters. - * - * @param cntrs Counters to store. - */ - public HadoopCountersImpl(Iterable<HadoopCounter> cntrs) { - addCounters(cntrs, true); - } - - /** - * Copy constructor. - * - * @param cntrs Counters to copy. - */ - public HadoopCountersImpl(HadoopCounters cntrs) { - this(cntrs.all()); - } - - /** - * Creates counter instance. - * - * @param cls Class of the counter. - * @param grp Group name. - * @param name Counter name. - * @return Counter. - */ - private <T extends HadoopCounter> T createCounter(Class<? extends HadoopCounter> cls, String grp, - String name) { - try { - Constructor constructor = cls.getConstructor(String.class, String.class); - - return (T)constructor.newInstance(grp, name); - } - catch (Exception e) { - throw new IgniteException(e); - } - } - - /** - * Adds counters collection in addition to existing counters. 
- * - * @param cntrs Counters to add. - * @param cp Whether to copy counters or not. - */ - private void addCounters(Iterable<HadoopCounter> cntrs, boolean cp) { - assert cntrs != null; - - for (HadoopCounter cntr : cntrs) { - if (cp) { - HadoopCounter cntrCp = createCounter(cntr.getClass(), cntr.group(), cntr.name()); - - cntrCp.merge(cntr); - - cntr = cntrCp; - } - - cntrsMap.put(new CounterKey(cntr.getClass(), cntr.group(), cntr.name()), cntr); - } - } - - /** {@inheritDoc} */ - @Override public <T extends HadoopCounter> T counter(String grp, String name, Class<T> cls) { - assert cls != null; - - CounterKey mapKey = new CounterKey(cls, grp, name); - - T cntr = (T)cntrsMap.get(mapKey); - - if (cntr == null) { - cntr = createCounter(cls, grp, name); - - T old = (T)cntrsMap.putIfAbsent(mapKey, cntr); - - if (old != null) - return old; - } - - return cntr; - } - - /** {@inheritDoc} */ - @Override public Collection<HadoopCounter> all() { - return cntrsMap.values(); - } - - /** {@inheritDoc} */ - @Override public void merge(HadoopCounters other) { - for (HadoopCounter counter : other.all()) - counter(counter.group(), counter.name(), counter.getClass()).merge(counter); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeCollection(out, cntrsMap.values()); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - addCounters(U.<HadoopCounter>readCollection(in), false); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - HadoopCountersImpl counters = (HadoopCountersImpl)o; - - return cntrsMap.equals(counters.cntrsMap); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return cntrsMap.hashCode(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - 
return S.toString(HadoopCountersImpl.class, this, "counters", cntrsMap.values()); - } - - /** - * The tuple of counter identifier components for more readable code. - */ - private static class CounterKey extends GridTuple3<Class<? extends HadoopCounter>, String, String> { - /** */ - private static final long serialVersionUID = 0L; - - /** - * Constructor. - * - * @param cls Class of the counter. - * @param grp Group name. - * @param name Counter name. - */ - private CounterKey(Class<? extends HadoopCounter> cls, String grp, String name) { - super(cls, grp, name); - } - - /** - * Empty constructor required by {@link Externalizable}. - */ - public CounterKey() { - // No-op. - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java deleted file mode 100644 index 0d61e0d..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.counter; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; - -/** - * Standard hadoop counter to use via original Hadoop API in Hadoop jobs. - */ -public class HadoopLongCounter extends HadoopCounterAdapter { - /** */ - private static final long serialVersionUID = 0L; - - /** The counter value. */ - private long val; - - /** - * Default constructor required by {@link Externalizable}. - */ - public HadoopLongCounter() { - // No-op. - } - - /** - * Constructor. - * - * @param grp Group name. - * @param name Counter name. - */ - public HadoopLongCounter(String grp, String name) { - super(grp, name); - } - - /** {@inheritDoc} */ - @Override protected void writeValue(ObjectOutput out) throws IOException { - out.writeLong(val); - } - - /** {@inheritDoc} */ - @Override protected void readValue(ObjectInput in) throws IOException { - val = in.readLong(); - } - - /** {@inheritDoc} */ - @Override public void merge(HadoopCounter cntr) { - val += ((HadoopLongCounter)cntr).val; - } - - /** - * Gets current value of this counter. - * - * @return Current value. - */ - public long value() { - return val; - } - - /** - * Sets current value by the given value. - * - * @param val Value to set. - */ - public void value(long val) { - this.val = val; - } - - /** - * Increment this counter by the given value. - * - * @param i Value to increase this counter by. 
- */ - public void increment(long i) { - val += i; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java deleted file mode 100644 index dedc6b3..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java +++ /dev/null @@ -1,288 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop.counter; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.ArrayList; -import java.util.Collection; -import java.util.UUID; -import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskType; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.T2; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.jetbrains.annotations.Nullable; - -import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.JOB_SUBMISSION_START_TS_PROPERTY; -import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.REQ_NEW_JOBID_TS_PROPERTY; -import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.RESPONSE_NEW_JOBID_TS_PROPERTY; - -/** - * Counter for the job statistics accumulation. - */ -public class HadoopPerformanceCounter extends HadoopCounterAdapter { - /** */ - private static final long serialVersionUID = 0L; - - /** The group name for this counter. */ - private static final String GROUP_NAME = "SYSTEM"; - - /** The counter name for this counter. */ - private static final String COUNTER_NAME = "PERFORMANCE"; - - /** Events collections. */ - private Collection<T2<String,Long>> evts = new ArrayList<>(); - - /** Node id to insert into the event info. */ - private UUID nodeId; - - /** */ - private int reducerNum; - - /** */ - private volatile Long firstShuffleMsg; - - /** */ - private volatile Long lastShuffleMsg; - - /** - * Default constructor required by {@link Externalizable}. - */ - public HadoopPerformanceCounter() { - // No-op. - } - - /** - * Constructor. - * - * @param grp Group name. - * @param name Counter name. 
- */ - public HadoopPerformanceCounter(String grp, String name) { - super(grp, name); - } - - /** - * Constructor to create instance to use this as helper. - * - * @param nodeId Id of the work node. - */ - public HadoopPerformanceCounter(UUID nodeId) { - this.nodeId = nodeId; - } - - /** {@inheritDoc} */ - @Override protected void writeValue(ObjectOutput out) throws IOException { - U.writeCollection(out, evts); - } - - /** {@inheritDoc} */ - @Override protected void readValue(ObjectInput in) throws IOException { - try { - evts = U.readCollection(in); - } - catch (ClassNotFoundException e) { - throw new IOException(e); - } - } - - /** {@inheritDoc} */ - @Override public void merge(HadoopCounter cntr) { - evts.addAll(((HadoopPerformanceCounter)cntr).evts); - } - - /** - * Gets the events collection. - * - * @return Collection of event. - */ - public Collection<T2<String, Long>> evts() { - return evts; - } - - /** - * Generate name that consists of some event information. - * - * @param info Task info. - * @param evtType The type of the event. - * @return String contains necessary event information. - */ - private String eventName(HadoopTaskInfo info, String evtType) { - return eventName(info.type().toString(), info.taskNumber(), evtType); - } - - /** - * Generate name that consists of some event information. - * - * @param taskType Task type. - * @param taskNum Number of the task. - * @param evtType The type of the event. - * @return String contains necessary event information. - */ - private String eventName(String taskType, int taskNum, String evtType) { - assert nodeId != null; - - return taskType + " " + taskNum + " " + evtType + " " + nodeId; - } - - /** - * Adds event of the task submission (task instance creation). - * - * @param info Task info. - * @param ts Timestamp of the event. - */ - public void onTaskSubmit(HadoopTaskInfo info, long ts) { - evts.add(new T2<>(eventName(info, "submit"), ts)); - } - - /** - * Adds event of the task preparation. 
- * - * @param info Task info. - * @param ts Timestamp of the event. - */ - public void onTaskPrepare(HadoopTaskInfo info, long ts) { - evts.add(new T2<>(eventName(info, "prepare"), ts)); - } - - /** - * Adds event of the task finish. - * - * @param info Task info. - * @param ts Timestamp of the event. - */ - public void onTaskFinish(HadoopTaskInfo info, long ts) { - if (info.type() == HadoopTaskType.REDUCE && lastShuffleMsg != null) { - evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "start"), firstShuffleMsg)); - evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "finish"), lastShuffleMsg)); - - lastShuffleMsg = null; - } - - evts.add(new T2<>(eventName(info, "finish"), ts)); - } - - /** - * Adds event of the task run. - * - * @param info Task info. - * @param ts Timestamp of the event. - */ - public void onTaskStart(HadoopTaskInfo info, long ts) { - evts.add(new T2<>(eventName(info, "start"), ts)); - } - - /** - * Adds event of the job preparation. - * - * @param ts Timestamp of the event. - */ - public void onJobPrepare(long ts) { - assert nodeId != null; - - evts.add(new T2<>("JOB prepare " + nodeId, ts)); - } - - /** - * Adds event of the job start. - * - * @param ts Timestamp of the event. - */ - public void onJobStart(long ts) { - assert nodeId != null; - - evts.add(new T2<>("JOB start " + nodeId, ts)); - } - - /** - * Adds client submission events from job info. - * - * @param info Job info. - */ - public void clientSubmissionEvents(HadoopJobInfo info) { - assert nodeId != null; - - addEventFromProperty("JOB requestId", info, REQ_NEW_JOBID_TS_PROPERTY); - addEventFromProperty("JOB responseId", info, RESPONSE_NEW_JOBID_TS_PROPERTY); - addEventFromProperty("JOB submit", info, JOB_SUBMISSION_START_TS_PROPERTY); - } - - /** - * Adds event with timestamp from some property in job info. - * - * @param evt Event type and phase. - * @param info Job info. - * @param propName Property name to get timestamp. 
- */ - private void addEventFromProperty(String evt, HadoopJobInfo info, String propName) { - String val = info.property(propName); - - if (!F.isEmpty(val)) { - try { - evts.add(new T2<>(evt + " " + nodeId, Long.parseLong(val))); - } - catch (NumberFormatException e) { - throw new IllegalStateException("Invalid value '" + val + "' of property '" + propName + "'", e); - } - } - } - - /** - * Registers shuffle message event. - * - * @param reducerNum Number of reducer that receives the data. - * @param ts Timestamp of the event. - */ - public void onShuffleMessage(int reducerNum, long ts) { - this.reducerNum = reducerNum; - - if (firstShuffleMsg == null) - firstShuffleMsg = ts; - - lastShuffleMsg = ts; - } - - /** - * Gets system predefined performance counter from the HadoopCounters object. - * - * @param cntrs HadoopCounters object. - * @param nodeId Node id for methods that adds events. It may be null if you don't use ones. - * @return Predefined performance counter. - */ - public static HadoopPerformanceCounter getCounter(HadoopCounters cntrs, @Nullable UUID nodeId) { - HadoopPerformanceCounter cntr = cntrs.counter(GROUP_NAME, COUNTER_NAME, HadoopPerformanceCounter.class); - - if (nodeId != null) - cntr.nodeId(nodeId); - - return cntrs.counter(GROUP_NAME, COUNTER_NAME, HadoopPerformanceCounter.class); - } - - /** - * Sets the nodeId field. - * - * @param nodeId Node id. 
- */ - private void nodeId(UUID nodeId) { - this.nodeId = nodeId; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopBasicFileSystemFactoryDelegate.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopBasicFileSystemFactoryDelegate.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopBasicFileSystemFactoryDelegate.java deleted file mode 100644 index 82769be..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopBasicFileSystemFactoryDelegate.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */

package org.apache.ignite.internal.processors.hadoop.delegate;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.ignite.IgniteException;
import org.apache.ignite.hadoop.fs.BasicHadoopFileSystemFactory;
import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
import org.apache.ignite.hadoop.util.UserNameMapper;
import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
import org.apache.ignite.internal.processors.hadoop.common.delegate.HadoopFileSystemFactoryDelegate;
import org.apache.ignite.internal.processors.igfs.IgfsUtils;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lifecycle.LifecycleAware;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Arrays;

/**
 * Basic Hadoop file system factory delegate.
 * <p>
 * {@link #start()} builds the Hadoop configuration and the target URI from the proxy factory;
 * {@link #get(String)} then creates file systems for the (optionally mapped) user name.
 */
public class HadoopBasicFileSystemFactoryDelegate implements HadoopFileSystemFactoryDelegate {
    /** Proxy. */
    protected final HadoopFileSystemFactory proxy;

    /** Configuration of the secondary filesystem, never null. */
    protected Configuration cfg;

    /** Resulting URI. */
    protected URI fullUri;

    /** User name mapper. */
    private UserNameMapper usrNameMapper;

    /**
     * Constructor.
     *
     * @param proxy Proxy.
     */
    public HadoopBasicFileSystemFactoryDelegate(BasicHadoopFileSystemFactory proxy) {
        this.proxy = proxy;
    }

    /** {@inheritDoc} */
    @Override public FileSystem get(String name) throws IOException {
        String name0 = IgfsUtils.fixUserName(name);

        // The mapper output is passed through fixUserName as well before it is used.
        if (usrNameMapper != null)
            name0 = IgfsUtils.fixUserName(usrNameMapper.map(name0));

        return getWithMappedName(name0);
    }

    /**
     * Internal file system create routine.
     *
     * @param usrName User name.
     * @return File system.
     * @throws IOException If failed.
     */
    protected FileSystem getWithMappedName(String usrName) throws IOException {
        assert cfg != null;

        try {
            // FileSystem.get() might delegate to ServiceLoader to get the list of file system implementation.
            // And ServiceLoader is known to be sensitive to context classloader. Therefore, we change context
            // classloader to classloader of current class to avoid strange class-cast-exceptions.
            ClassLoader oldLdr = HadoopUtils.setContextClassLoader(getClass().getClassLoader());

            try {
                return create(usrName);
            }
            finally {
                HadoopUtils.restoreContextClassLoader(oldLdr);
            }
        }
        catch (InterruptedException e) {
            // Keep the interrupt status visible to callers further up the stack.
            Thread.currentThread().interrupt();

            throw new IOException("Failed to create file system due to interrupt.", e);
        }
    }

    /**
     * Internal file system creation routine, invoked in correct class loader context.
     *
     * @param usrName User name.
     * @return File system.
     * @throws IOException If failed.
     * @throws InterruptedException if the current thread is interrupted.
     */
    protected FileSystem create(String usrName) throws IOException, InterruptedException {
        return FileSystem.get(fullUri, cfg, usrName);
    }

    /** {@inheritDoc} */
    @Override public void start() throws IgniteException {
        BasicHadoopFileSystemFactory proxy0 = (BasicHadoopFileSystemFactory)proxy;

        cfg = HadoopUtils.safeCreateConfiguration();

        if (proxy0.getConfigPaths() != null) {
            for (String cfgPath : proxy0.getConfigPaths()) {
                if (cfgPath == null)
                    throw new NullPointerException("Configuration path cannot be null: " +
                        Arrays.toString(proxy0.getConfigPaths()));
                else {
                    URL url = U.resolveIgniteUrl(cfgPath);

                    if (url == null) {
                        // If secConfPath is given, it should be resolvable:
                        throw new IgniteException("Failed to resolve secondary file system configuration path " +
                            "(ensure that it exists locally and you have read access to it): " + cfgPath);
                    }

                    cfg.addResource(url);
                }
            }
        }

        // If secondary fs URI is not given explicitly, try to get it from the configuration:
        if (proxy0.getUri() == null)
            fullUri = FileSystem.getDefaultUri(cfg);
        else {
            try {
                fullUri = new URI(proxy0.getUri());
            }
            catch (URISyntaxException use) {
                // Keep the parse failure as the cause so the offending position is not lost.
                throw new IgniteException("Failed to resolve secondary file system URI: " + proxy0.getUri(), use);
            }
        }

        usrNameMapper = proxy0.getUserNameMapper();

        // instanceof is false for null, so no separate null check is needed.
        if (usrNameMapper instanceof LifecycleAware)
            ((LifecycleAware)usrNameMapper).start();
    }

    /** {@inheritDoc} */
    @Override public void stop() throws IgniteException {
        if (usrNameMapper instanceof LifecycleAware)
            ((LifecycleAware)usrNameMapper).stop();
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopCachingFileSystemFactoryDelegate.java
----------------------------------------------------------------------
diff --git
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopCachingFileSystemFactoryDelegate.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopCachingFileSystemFactoryDelegate.java
deleted file mode 100644
index 04bbeb8..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopCachingFileSystemFactoryDelegate.java
+++ /dev/null
@@ -1,75 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.hadoop.delegate;

import org.apache.hadoop.fs.FileSystem;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory;
import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils;
import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap;

import java.io.IOException;

/**
 * Caching Hadoop file system factory delegate.
 * <p>
 * File systems are created through the parent delegate and kept in a map keyed by user name.
 */
public class HadoopCachingFileSystemFactoryDelegate extends HadoopBasicFileSystemFactoryDelegate {
    /** Per-user file system cache. */
    private final HadoopLazyConcurrentMap<String, FileSystem> cache = new HadoopLazyConcurrentMap<>(
        new HadoopLazyConcurrentMap.ValueFactory<String, FileSystem>() {
            @Override public FileSystem createValue(String key) throws IOException {
                // Go to the parent implementation explicitly: the override below reads from this cache.
                return HadoopCachingFileSystemFactoryDelegate.super.getWithMappedName(key);
            }
        }
    );

    /**
     * Constructor.
     *
     * @param proxy Proxy.
     */
    public HadoopCachingFileSystemFactoryDelegate(CachingHadoopFileSystemFactory proxy) {
        super(proxy);
    }

    /** {@inheritDoc} */
    @Override public FileSystem getWithMappedName(String name) throws IOException {
        return cache.getOrCreate(name);
    }

    /** {@inheritDoc} */
    @Override public void start() throws IgniteException {
        super.start();

        // Disable caching.
        cfg.setBoolean(HadoopFileSystemsUtils.disableFsCachePropertyName(fullUri.getScheme()), true);
    }

    /**
     * {@inheritDoc}
     * <p>
     * The per-user cache is closed even if the parent {@code stop()} fails. In that case the parent
     * failure is rethrown and a cache close failure is attached to it as a suppressed exception.
     */
    @Override public void stop() throws IgniteException {
        Throwable stopErr = null;

        try {
            super.stop();
        }
        catch (RuntimeException | Error e) {
            stopErr = e;

            throw e;
        }
        finally {
            try {
                cache.close();
            }
            catch (IgniteCheckedException ice) {
                if (stopErr == null)
                    throw new IgniteException(ice);

                // Do not mask the primary failure that is already propagating.
                stopErr.addSuppressed(ice);
            }
        }
    }
}