http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java deleted file mode 100644 index bd8ed2d..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java +++ /dev/null @@ -1,1076 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.hadoop.fs.v2; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.AbstractFileSystem; -import org.apache.hadoop.fs.BlockLocation; -import org.apache.hadoop.fs.CreateFlag; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileChecksum; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FsServerDefaults; -import org.apache.hadoop.fs.FsStatus; -import org.apache.hadoop.fs.InvalidPathException; -import org.apache.hadoop.fs.Options; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.util.DataChecksum; -import org.apache.hadoop.util.Progressable; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.configuration.FileSystemConfiguration; -import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory; -import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; -import org.apache.ignite.igfs.IgfsBlockLocation; -import org.apache.ignite.igfs.IgfsFile; -import org.apache.ignite.igfs.IgfsMode; -import org.apache.ignite.igfs.IgfsPath; -import org.apache.ignite.internal.igfs.common.IgfsLogger; -import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint; -import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsInputStream; -import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsOutputStream; -import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProxyInputStream; -import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProxyOutputStream; -import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsStreamDelegate; -import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsWrapper; -import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse; -import org.apache.ignite.internal.processors.igfs.IgfsModeResolver; -import org.apache.ignite.internal.processors.igfs.IgfsPaths; -import org.apache.ignite.internal.processors.igfs.IgfsStatus; -import org.apache.ignite.internal.processors.igfs.IgfsUtils; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.T2; -import org.apache.ignite.internal.util.typedef.internal.A; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lifecycle.LifecycleAware; -import org.jetbrains.annotations.Nullable; - -import java.io.BufferedOutputStream; -import java.io.Closeable; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.OutputStream; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.EnumSet; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_BATCH_SIZE; -import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_DIR; -import static org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.getFsHadoopUser; -import static org.apache.ignite.igfs.IgfsMode.PROXY; -import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_COLOCATED_WRITES; -import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_BATCH_SIZE; -import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_DIR; -import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_ENABLED; -import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_PREFER_LOCAL_WRITES; -import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH; -import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.parameter; -import static org.apache.ignite.internal.processors.igfs.IgfsEx.IGFS_SCHEME; - -/** - * {@code IGFS} Hadoop 2.x file system driver over file system API. To use - * {@code IGFS} as Hadoop file system, you should configure this class - * in Hadoop's {@code core-site.xml} as follows: - * <pre name="code" class="xml"> - * <property> - * <name>fs.default.name</name> - * <value>igfs://ipc</value> - * </property> - * - * <property> - * <name>fs.igfs.impl</name> - * <value>org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem</value> - * </property> - * </pre> - * You should also add Ignite JAR and all libraries to Hadoop classpath. To - * do this, add following lines to {@code conf/hadoop-env.sh} script in Hadoop - * distribution: - * <pre name="code" class="bash"> - * export IGNITE_HOME=/path/to/Ignite/distribution - * export HADOOP_CLASSPATH=$IGNITE_HOME/ignite*.jar - * - * for f in $IGNITE_HOME/libs/*.jar; do - * export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$f; - * done - * </pre> - * <h1 class="header">Data vs Clients Nodes</h1> - * Hadoop needs to use its FileSystem remotely from client nodes as well as directly on - * data nodes. Client nodes are responsible for basic file system operations as well as - * accessing data nodes remotely. Usually, client nodes are started together - * with {@code job-submitter} or {@code job-scheduler} processes, while data nodes are usually - * started together with Hadoop {@code task-tracker} processes. - * <p> - * For sample client and data node configuration refer to {@code config/hadoop/default-config-client.xml} - * and {@code config/hadoop/default-config.xml} configuration files in Ignite installation. - */ -public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closeable { - /** Logger. */ - private static final Log LOG = LogFactory.getLog(IgniteHadoopFileSystem.class); - - /** Ensures that close routine is invoked at most once. */ - private final AtomicBoolean closeGuard = new AtomicBoolean(); - - /** Grid remote client. */ - private HadoopIgfsWrapper rmtClient; - - /** The name of the user this File System created on behalf of. */ - private final String user; - - /** Working directory. */ - private IgfsPath workingDir; - - /** URI. */ - private final URI uri; - - /** Authority. */ - private String uriAuthority; - - /** Client logger. */ - private IgfsLogger clientLog; - - /** Server block size. */ - private long grpBlockSize; - - /** Default replication factor. */ - private short dfltReplication; - - /** Secondary URI string. */ - private URI secondaryUri; - - /** Mode resolver. */ - private IgfsModeResolver modeRslvr; - - /** The secondary file system factory. */ - private HadoopFileSystemFactory factory; - - /** Whether custom sequential reads before prefetch value is provided. */ - private boolean seqReadsBeforePrefetchOverride; - - /** Custom-provided sequential reads before prefetch. */ - private int seqReadsBeforePrefetch; - - /** Flag that controls whether file writes should be colocated on data node. */ - private boolean colocateFileWrites; - - /** Prefer local writes. */ - private boolean preferLocFileWrites; - - /** - * @param name URI for file system. - * @param cfg Configuration. - * @throws URISyntaxException if name has invalid syntax. - * @throws IOException If initialization failed. - */ - public IgniteHadoopFileSystem(URI name, Configuration cfg) throws URISyntaxException, IOException { - super(HadoopIgfsEndpoint.normalize(name), IGFS_SCHEME, false, -1); - - uri = name; - - user = getFsHadoopUser(); - - try { - initialize(name, cfg); - } - catch (IOException e) { - // Close client if exception occurred. - if (rmtClient != null) - rmtClient.close(false); - - throw e; - } - - workingDir = new IgfsPath("/user/" + user); - } - - /** {@inheritDoc} */ - @Override public void checkPath(Path path) { - URI uri = path.toUri(); - - if (uri.isAbsolute()) { - if (!F.eq(uri.getScheme(), IGFS_SCHEME)) - throw new InvalidPathException("Wrong path scheme [expected=" + IGFS_SCHEME + ", actual=" + - uri.getAuthority() + ']'); - - if (!F.eq(uri.getAuthority(), uriAuthority)) - throw new InvalidPathException("Wrong path authority [expected=" + uriAuthority + ", actual=" + - uri.getAuthority() + ']'); - } - } - - /** - * Public setter that can be used by direct users of FS or Visor. - * - * @param colocateFileWrites Whether all ongoing file writes should be colocated. - */ - @SuppressWarnings("UnusedDeclaration") - public void colocateFileWrites(boolean colocateFileWrites) { - this.colocateFileWrites = colocateFileWrites; - } - - /** - * Enter busy state. - * - * @throws IOException If file system is stopped. - */ - private void enterBusy() throws IOException { - if (closeGuard.get()) - throw new IOException("File system is stopped."); - } - - /** - * Leave busy state. - */ - private void leaveBusy() { - // No-op. - } - - /** - * @param name URI passed to constructor. - * @param cfg Configuration passed to constructor. - * @throws IOException If initialization failed. - */ - @SuppressWarnings("ConstantConditions") - private void initialize(URI name, Configuration cfg) throws IOException { - enterBusy(); - - try { - if (rmtClient != null) - throw new IOException("File system is already initialized: " + rmtClient); - - A.notNull(name, "name"); - A.notNull(cfg, "cfg"); - - if (!IGFS_SCHEME.equals(name.getScheme())) - throw new IOException("Illegal file system URI [expected=" + IGFS_SCHEME + - "://[name]/[optional_path], actual=" + name + ']'); - - uriAuthority = name.getAuthority(); - - // Override sequential reads before prefetch if needed. - seqReadsBeforePrefetch = parameter(cfg, PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0); - - if (seqReadsBeforePrefetch > 0) - seqReadsBeforePrefetchOverride = true; - - // In Ignite replication factor is controlled by data cache affinity. - // We use replication factor to force the whole file to be stored on local node. - dfltReplication = (short)cfg.getInt("dfs.replication", 3); - - // Get file colocation control flag. - colocateFileWrites = parameter(cfg, PARAM_IGFS_COLOCATED_WRITES, uriAuthority, false); - preferLocFileWrites = cfg.getBoolean(PARAM_IGFS_PREFER_LOCAL_WRITES, false); - - // Get log directory. - String logDirCfg = parameter(cfg, PARAM_IGFS_LOG_DIR, uriAuthority, DFLT_IGFS_LOG_DIR); - - File logDirFile = U.resolveIgnitePath(logDirCfg); - - String logDir = logDirFile != null ? logDirFile.getAbsolutePath() : null; - - rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG, user); - - // Handshake. - IgfsHandshakeResponse handshake = rmtClient.handshake(logDir); - - grpBlockSize = handshake.blockSize(); - - IgfsPaths paths = handshake.secondaryPaths(); - - Boolean logEnabled = parameter(cfg, PARAM_IGFS_LOG_ENABLED, uriAuthority, false); - - if (handshake.sampling() != null ? handshake.sampling() : logEnabled) { - // Initiate client logger. - if (logDir == null) - throw new IOException("Failed to resolve log directory: " + logDirCfg); - - Integer batchSize = parameter(cfg, PARAM_IGFS_LOG_BATCH_SIZE, uriAuthority, DFLT_IGFS_LOG_BATCH_SIZE); - - clientLog = IgfsLogger.logger(uriAuthority, handshake.igfsName(), logDir, batchSize); - } - else - clientLog = IgfsLogger.disabledLogger(); - - try { - modeRslvr = new IgfsModeResolver(paths.defaultMode(), paths.pathModes()); - } - catch (IgniteCheckedException ice) { - throw new IOException(ice); - } - - boolean initSecondary = paths.defaultMode() == PROXY; - - if (!initSecondary && paths.pathModes() != null) { - for (T2<IgfsPath, IgfsMode> pathMode : paths.pathModes()) { - IgfsMode mode = pathMode.getValue(); - - if (mode == PROXY) { - initSecondary = true; - - break; - } - } - } - - if (initSecondary) { - try { - factory = (HadoopFileSystemFactory) paths.getPayload(getClass().getClassLoader()); - } - catch (IgniteCheckedException e) { - throw new IOException("Failed to get secondary file system factory.", e); - } - - if (factory == null) - throw new IOException("Failed to get secondary file system factory (did you set " + - IgniteHadoopIgfsSecondaryFileSystem.class.getName() + " as \"secondaryFIleSystem\" in " + - FileSystemConfiguration.class.getName() + "?)"); - - assert factory != null; - - if (factory instanceof LifecycleAware) - ((LifecycleAware) factory).start(); - - try { - FileSystem secFs = factory.get(user); - - secondaryUri = secFs.getUri(); - - A.ensure(secondaryUri != null, "Secondary file system uri should not be null."); - } - catch (IOException e) { - throw new IOException("Failed to connect to the secondary file system: " + secondaryUri, e); - } - } - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public void close() throws IOException { - if (closeGuard.compareAndSet(false, true)) { - if (rmtClient == null) - return; - - rmtClient.close(false); - - if (clientLog.isLogEnabled()) - clientLog.close(); - - if (factory instanceof LifecycleAware) - ((LifecycleAware) factory).stop(); - - // Reset initialized resources. - rmtClient = null; - } - } - - /** {@inheritDoc} */ - @Override public URI getUri() { - return uri; - } - - /** {@inheritDoc} */ - @Override public int getUriDefaultPort() { - return -1; - } - - /** {@inheritDoc} */ - @Override public FsServerDefaults getServerDefaults() throws IOException { - return new FsServerDefaults(grpBlockSize, (int)grpBlockSize, (int)grpBlockSize, dfltReplication, 64 * 1024, - false, 0, DataChecksum.Type.NULL); - } - - /** {@inheritDoc} */ - @Override public boolean setReplication(Path f, short replication) throws IOException { - return mode(f) == PROXY && secondaryFileSystem().setReplication(f, replication); - } - - /** {@inheritDoc} */ - @Override public void setTimes(Path f, long mtime, long atime) throws IOException { - if (mode(f) == PROXY) - secondaryFileSystem().setTimes(f, mtime, atime); - else { - if (mtime == -1 && atime == -1) - return; - - rmtClient.setTimes(convert(f), atime, mtime); - } - } - - /** {@inheritDoc} */ - @Override public FsStatus getFsStatus() throws IOException { - IgfsStatus status = rmtClient.fsStatus(); - - return new FsStatus(status.spaceTotal(), status.spaceUsed(), status.spaceTotal() - status.spaceUsed()); - } - - /** {@inheritDoc} */ - @Override public void setPermission(Path p, FsPermission perm) throws IOException { - enterBusy(); - - try { - A.notNull(p, "p"); - - if (mode(p) == PROXY) - secondaryFileSystem().setPermission(toSecondary(p), perm); - else { - if (rmtClient.update(convert(p), permission(perm)) == null) - throw new IOException("Failed to set file permission (file not found?)" + - " [path=" + p + ", perm=" + perm + ']'); - } - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public void setOwner(Path p, String usr, String grp) throws IOException { - A.notNull(p, "p"); - A.notNull(usr, "username"); - A.notNull(grp, "grpName"); - - enterBusy(); - - try { - if (mode(p) == PROXY) - secondaryFileSystem().setOwner(toSecondary(p), usr, grp); - else if (rmtClient.update(convert(p), F.asMap(IgfsUtils.PROP_USER_NAME, usr, - IgfsUtils.PROP_GROUP_NAME, grp)) == null) { - throw new IOException("Failed to set file permission (file not found?)" + - " [path=" + p + ", username=" + usr + ", grpName=" + grp + ']'); - } - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public FSDataInputStream open(Path f, int bufSize) throws IOException { - A.notNull(f, "f"); - - enterBusy(); - - try { - IgfsPath path = convert(f); - IgfsMode mode = modeRslvr.resolveMode(path); - - if (mode == PROXY) { - FSDataInputStream is = secondaryFileSystem().open(toSecondary(f), bufSize); - - if (clientLog.isLogEnabled()) { - // At this point we do not know file size, so we perform additional request to remote FS to get it. - FileStatus status = secondaryFileSystem().getFileStatus(toSecondary(f)); - - long size = status != null ? status.getLen() : -1; - - long logId = IgfsLogger.nextId(); - - clientLog.logOpen(logId, path, PROXY, bufSize, size); - - return new FSDataInputStream(new HadoopIgfsProxyInputStream(is, clientLog, logId)); - } - else - return is; - } - else { - HadoopIgfsStreamDelegate stream = seqReadsBeforePrefetchOverride ? - rmtClient.open(path, seqReadsBeforePrefetch) : rmtClient.open(path); - - long logId = -1; - - if (clientLog.isLogEnabled()) { - logId = IgfsLogger.nextId(); - - clientLog.logOpen(logId, path, mode, bufSize, stream.length()); - } - - if (LOG.isDebugEnabled()) - LOG.debug("Opening input stream [thread=" + Thread.currentThread().getName() + ", path=" + path + - ", bufSize=" + bufSize + ']'); - - HadoopIgfsInputStream igfsIn = new HadoopIgfsInputStream(stream, stream.length(), - bufSize, LOG, clientLog, logId); - - if (LOG.isDebugEnabled()) - LOG.debug("Opened input stream [path=" + path + ", delegate=" + stream + ']'); - - return new FSDataInputStream(igfsIn); - } - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @SuppressWarnings("deprecation") - @Override public FSDataOutputStream createInternal( - Path f, - EnumSet<CreateFlag> flag, - FsPermission perm, - int bufSize, - short replication, - long blockSize, - Progressable progress, - Options.ChecksumOpt checksumOpt, - boolean createParent - ) throws IOException { - A.notNull(f, "f"); - - enterBusy(); - - boolean overwrite = flag.contains(CreateFlag.OVERWRITE); - boolean append = flag.contains(CreateFlag.APPEND); - boolean create = flag.contains(CreateFlag.CREATE); - - OutputStream out = null; - - try { - IgfsPath path = convert(f); - IgfsMode mode = modeRslvr.resolveMode(path); - - if (LOG.isDebugEnabled()) - LOG.debug("Opening output stream in create [thread=" + Thread.currentThread().getName() + "path=" + - path + ", overwrite=" + overwrite + ", bufSize=" + bufSize + ']'); - - if (mode == PROXY) { - FSDataOutputStream os = secondaryFileSystem().create(toSecondary(f), perm, flag, bufSize, - replication, blockSize, progress); - - if (clientLog.isLogEnabled()) { - long logId = IgfsLogger.nextId(); - - if (append) - clientLog.logAppend(logId, path, PROXY, bufSize); // Don't have stream ID. - else - clientLog.logCreate(logId, path, PROXY, overwrite, bufSize, replication, blockSize); - - return new FSDataOutputStream(new HadoopIgfsProxyOutputStream(os, clientLog, logId)); - } - else - return os; - } - else { - Map<String, String> permMap = F.asMap(IgfsUtils.PROP_PERMISSION, toString(perm), - IgfsUtils.PROP_PREFER_LOCAL_WRITES, Boolean.toString(preferLocFileWrites)); - - // Create stream and close it in the 'finally' section if any sequential operation failed. - HadoopIgfsStreamDelegate stream; - - long logId = -1; - - if (append) { - stream = rmtClient.append(path, create, permMap); - - if (clientLog.isLogEnabled()) { - logId = IgfsLogger.nextId(); - - clientLog.logAppend(logId, path, mode, bufSize); - } - - if (LOG.isDebugEnabled()) - LOG.debug("Opened output stream in append [path=" + path + ", delegate=" + stream + ']'); - } - else { - stream = rmtClient.create(path, overwrite, colocateFileWrites, replication, blockSize, - permMap); - - if (clientLog.isLogEnabled()) { - logId = IgfsLogger.nextId(); - - clientLog.logCreate(logId, path, mode, overwrite, bufSize, replication, blockSize); - } - - if (LOG.isDebugEnabled()) - LOG.debug("Opened output stream in create [path=" + path + ", delegate=" + stream + ']'); - } - - assert stream != null; - - HadoopIgfsOutputStream igfsOut = new HadoopIgfsOutputStream(stream, LOG, - clientLog, logId); - - bufSize = Math.max(64 * 1024, bufSize); - - out = new BufferedOutputStream(igfsOut, bufSize); - - FSDataOutputStream res = new FSDataOutputStream(out, null, 0); - - // Mark stream created successfully. - out = null; - - return res; - } - } - finally { - // Close if failed during stream creation. - if (out != null) - U.closeQuiet(out); - - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public boolean supportsSymlinks() { - return false; - } - - /** {@inheritDoc} */ - @Override public void renameInternal(Path src, Path dst) throws IOException { - A.notNull(src, "src"); - A.notNull(dst, "dst"); - - enterBusy(); - - try { - IgfsPath srcPath = convert(src); - IgfsPath dstPath = convert(dst); - - IgfsMode srcMode = modeRslvr.resolveMode(srcPath); - - if (clientLog.isLogEnabled()) - clientLog.logRename(srcPath, srcMode, dstPath); - - if (srcMode == PROXY) - secondaryFileSystem().rename(toSecondary(src), toSecondary(dst)); - else - rmtClient.rename(srcPath, dstPath); - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public boolean delete(Path f, boolean recursive) throws IOException { - A.notNull(f, "f"); - - enterBusy(); - - try { - IgfsPath path = convert(f); - - IgfsMode mode = modeRslvr.resolveMode(path); - - if (mode == PROXY) { - if (clientLog.isLogEnabled()) - clientLog.logDelete(path, PROXY, recursive); - - return secondaryFileSystem().delete(toSecondary(f), recursive); - } - - boolean res = rmtClient.delete(path, recursive); - - if (clientLog.isLogEnabled()) - clientLog.logDelete(path, mode, recursive); - - return res; - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public void setVerifyChecksum(boolean verifyChecksum) throws IOException { - // Checksum has effect for secondary FS only. - if (factory != null) - secondaryFileSystem().setVerifyChecksum(verifyChecksum); - } - - /** {@inheritDoc} */ - @Override public FileChecksum getFileChecksum(Path f) throws IOException { - if (mode(f) == PROXY) - return secondaryFileSystem().getFileChecksum(f); - - return null; - } - - /** {@inheritDoc} */ - @Override public FileStatus[] listStatus(Path f) throws IOException { - A.notNull(f, "f"); - - enterBusy(); - - try { - IgfsPath path = convert(f); - IgfsMode mode = modeRslvr.resolveMode(path); - - if (mode == PROXY) { - FileStatus[] arr = secondaryFileSystem().listStatus(toSecondary(f)); - - if (arr == null) - throw new FileNotFoundException("File " + f + " does not exist."); - - for (int i = 0; i < arr.length; i++) - arr[i] = toPrimary(arr[i]); - - if (clientLog.isLogEnabled()) { - String[] fileArr = new String[arr.length]; - - for (int i = 0; i < arr.length; i++) - fileArr[i] = arr[i].getPath().toString(); - - clientLog.logListDirectory(path, PROXY, fileArr); - } - - return arr; - } - else { - Collection<IgfsFile> list = rmtClient.listFiles(path); - - if (list == null) - throw new FileNotFoundException("File " + f + " does not exist."); - - List<IgfsFile> files = new ArrayList<>(list); - - FileStatus[] arr = new FileStatus[files.size()]; - - for (int i = 0; i < arr.length; i++) - arr[i] = convert(files.get(i)); - - if (clientLog.isLogEnabled()) { - String[] fileArr = new String[arr.length]; - - for (int i = 0; i < arr.length; i++) - fileArr[i] = arr[i].getPath().toString(); - - clientLog.logListDirectory(path, mode, fileArr); - } - - return arr; - } - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public void mkdir(Path f, FsPermission perm, boolean createParent) throws IOException { - A.notNull(f, "f"); - - enterBusy(); - - try { - IgfsPath path = convert(f); - IgfsMode mode = modeRslvr.resolveMode(path); - - if (mode == PROXY) { - if (clientLog.isLogEnabled()) - clientLog.logMakeDirectory(path, PROXY); - - secondaryFileSystem().mkdirs(toSecondary(f), perm); - } - else { - rmtClient.mkdirs(path, permission(perm)); - - if (clientLog.isLogEnabled()) - clientLog.logMakeDirectory(path, mode); - } - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public FileStatus getFileStatus(Path f) throws IOException { - A.notNull(f, "f"); - - enterBusy(); - - try { - if (mode(f) == PROXY) - return toPrimary(secondaryFileSystem().getFileStatus(toSecondary(f))); - else { - IgfsFile info = rmtClient.info(convert(f)); - - if (info == null) - throw new FileNotFoundException("File not found: " + f); - - return convert(info); - } - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public BlockLocation[] getFileBlockLocations(Path path, long start, long len) throws IOException { - A.notNull(path, "path"); - - IgfsPath igfsPath = convert(path); - - enterBusy(); - - try { - if (modeRslvr.resolveMode(igfsPath) == PROXY) - return secondaryFileSystem().getFileBlockLocations(path, start, len); - else { - long now = System.currentTimeMillis(); - - List<IgfsBlockLocation> affinity = new ArrayList<>( - rmtClient.affinity(igfsPath, start, len)); - - BlockLocation[] arr = new BlockLocation[affinity.size()]; - - for (int i = 0; i < arr.length; i++) - arr[i] = convert(affinity.get(i)); - - if (LOG.isDebugEnabled()) - LOG.debug("Fetched file locations [path=" + path + ", fetchTime=" + - (System.currentTimeMillis() - now) + ", locations=" + Arrays.asList(arr) + ']'); - - return arr; - } - } - finally { - leaveBusy(); - } - } - - /** - * Resolve path mode. - * - * @param path HDFS path. - * @return Path mode. - */ - public IgfsMode mode(Path path) { - return modeRslvr.resolveMode(convert(path)); - } - - /** - * Convert the given path to path acceptable by the primary file system. - * - * @param path Path. - * @return Primary file system path. - */ - private Path toPrimary(Path path) { - return convertPath(path, getUri()); - } - - /** - * Convert the given path to path acceptable by the secondary file system. - * - * @param path Path. - * @return Secondary file system path. - */ - private Path toSecondary(Path path) { - assert factory != null; - assert secondaryUri != null; - - return convertPath(path, secondaryUri); - } - - /** - * Convert path using the given new URI. - * - * @param path Old path. - * @param newUri New URI. - * @return New path. - */ - private Path convertPath(Path path, URI newUri) { - assert newUri != null; - - if (path != null) { - URI pathUri = path.toUri(); - - try { - return new Path(new URI(pathUri.getScheme() != null ? newUri.getScheme() : null, - pathUri.getAuthority() != null ? newUri.getAuthority() : null, pathUri.getPath(), null, null)); - } - catch (URISyntaxException e) { - throw new IgniteException("Failed to construct secondary file system path from the primary file " + - "system path: " + path, e); - } - } - else - return null; - } - - /** - * Convert a file status obtained from the secondary file system to a status of the primary file system. - * - * @param status Secondary file system status. - * @return Primary file system status. - */ - private FileStatus toPrimary(FileStatus status) { - return status != null ? new FileStatus(status.getLen(), status.isDirectory(), status.getReplication(), - status.getBlockSize(), status.getModificationTime(), status.getAccessTime(), status.getPermission(), - status.getOwner(), status.getGroup(), toPrimary(status.getPath())) : null; - } - - /** - * Convert IGFS path into Hadoop path. - * - * @param path IGFS path. - * @return Hadoop path. - */ - private Path convert(IgfsPath path) { - return new Path(IGFS_SCHEME, uriAuthority, path.toString()); - } - - /** - * Convert Hadoop path into IGFS path. - * - * @param path Hadoop path. - * @return IGFS path. - */ - @Nullable private IgfsPath convert(Path path) { - if (path == null) - return null; - - return path.isAbsolute() ? new IgfsPath(path.toUri().getPath()) : - new IgfsPath(workingDir, path.toUri().getPath()); - } - - /** - * Convert IGFS affinity block location into Hadoop affinity block location. - * - * @param block IGFS affinity block location. - * @return Hadoop affinity block location. - */ - private BlockLocation convert(IgfsBlockLocation block) { - Collection<String> names = block.names(); - Collection<String> hosts = block.hosts(); - - return new BlockLocation( - names.toArray(new String[names.size()]) /* hostname:portNumber of data nodes */, - hosts.toArray(new String[hosts.size()]) /* hostnames of data nodes */, - block.start(), block.length() - ) { - @Override public String toString() { - try { - return "BlockLocation [offset=" + getOffset() + ", length=" + getLength() + - ", hosts=" + Arrays.asList(getHosts()) + ", names=" + Arrays.asList(getNames()) + ']'; - } - catch (IOException e) { - throw new RuntimeException(e); - } - } - }; - } - - /** - * Convert IGFS file information into Hadoop file status. - * - * @param file IGFS file information. - * @return Hadoop file status. - */ - private FileStatus convert(IgfsFile file) { - return new FileStatus( - file.length(), - file.isDirectory(), - dfltReplication, - file.groupBlockSize(), - file.modificationTime(), - file.accessTime(), - permission(file), - file.property(IgfsUtils.PROP_USER_NAME, user), - file.property(IgfsUtils.PROP_GROUP_NAME, "users"), - convert(file.path())) { - @Override public String toString() { - return "FileStatus [path=" + getPath() + ", isDir=" + isDirectory() + ", len=" + getLen() + "]"; - } - }; - } - - /** - * Convert Hadoop permission into IGFS file attribute. - * - * @param perm Hadoop permission. - * @return IGFS attributes. - */ - private Map<String, String> permission(FsPermission perm) { - if (perm == null) - perm = FsPermission.getDefault(); - - return F.asMap(IgfsUtils.PROP_PERMISSION, toString(perm)); - } - - /** - * @param perm Permission. - * @return String. - */ - private static String toString(FsPermission perm) { - return String.format("%04o", perm.toShort()); - } - - /** - * Convert IGFS file attributes into Hadoop permission. - * - * @param file File info. - * @return Hadoop permission. - */ - private FsPermission permission(IgfsFile file) { - String perm = file.property(IgfsUtils.PROP_PERMISSION, null); - - if (perm == null) - return FsPermission.getDefault(); - - try { - return new FsPermission((short)Integer.parseInt(perm, 8)); - } - catch (NumberFormatException ignore) { - return FsPermission.getDefault(); - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgniteHadoopFileSystem.class, this); - } - - /** - * Returns the user name this File System is created on behalf of. - * @return the user name - */ - public String user() { - return user; - } - - /** - * Gets cached or creates a {@link FileSystem}. - * - * @return The secondary file system. - */ - private FileSystem secondaryFileSystem() throws IOException{ - assert factory != null; - - return factory.get(user); - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/package-info.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/package-info.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/package-info.java deleted file mode 100644 index d8e70d1..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * <!-- Package description. --> - * Contains Ignite Hadoop 2.x <code>FileSystem</code> implementation. - */ -package org.apache.ignite.hadoop.fs.v2; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java deleted file mode 100644 index 583af35..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.hadoop.mapreduce; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Collections; -import java.util.concurrent.ConcurrentHashMap; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.MRConfig; -import org.apache.hadoop.mapreduce.protocol.ClientProtocol; -import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.client.GridClient; -import org.apache.ignite.internal.client.GridClientConfiguration; -import org.apache.ignite.internal.client.GridClientException; -import org.apache.ignite.internal.client.GridClientFactory; -import org.apache.ignite.internal.client.marshaller.jdk.GridClientJdkMarshaller; -import org.apache.ignite.internal.processors.hadoop.proto.HadoopClientProtocol; -import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.typedef.F; - -import static org.apache.ignite.internal.client.GridClientProtocol.TCP; - - -/** - * Ignite Hadoop client protocol provider. - */ -public class IgniteHadoopClientProtocolProvider extends ClientProtocolProvider { - /** Framework name used in configuration. */ - public static final String FRAMEWORK_NAME = "ignite"; - - /** Clients. */ - private static final ConcurrentHashMap<String, IgniteInternalFuture<GridClient>> cliMap = new ConcurrentHashMap<>(); - - /** {@inheritDoc} */ - @Override public ClientProtocol create(Configuration conf) throws IOException { - if (FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) { - String addr = conf.get(MRConfig.MASTER_ADDRESS); - - if (F.isEmpty(addr)) - throw new IOException("Failed to create client protocol because server address is not specified (is " + - MRConfig.MASTER_ADDRESS + " property set?)."); - - if (F.eq(addr, "local")) - throw new IOException("Local execution mode is not supported, please point " + - MRConfig.MASTER_ADDRESS + " to real Ignite node."); - - return createProtocol(addr, conf); - } - - return null; - } - - /** {@inheritDoc} */ - @Override public ClientProtocol create(InetSocketAddress addr, Configuration conf) throws IOException { - if (FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) - return createProtocol(addr.getHostString() + ":" + addr.getPort(), conf); - - return null; - } - - /** {@inheritDoc} */ - @Override public void close(ClientProtocol cliProto) throws IOException { - // No-op. - } - - /** - * Internal protocol creation routine. - * - * @param addr Address. - * @param conf Configuration. - * @return Client protocol. - * @throws IOException If failed. - */ - private static ClientProtocol createProtocol(String addr, Configuration conf) throws IOException { - return new HadoopClientProtocol(conf, client(addr)); - } - - /** - * Create client. - * - * @param addr Endpoint address. - * @return Client. - * @throws IOException If failed. - */ - private static GridClient client(String addr) throws IOException { - try { - IgniteInternalFuture<GridClient> fut = cliMap.get(addr); - - if (fut == null) { - GridFutureAdapter<GridClient> fut0 = new GridFutureAdapter<>(); - - IgniteInternalFuture<GridClient> oldFut = cliMap.putIfAbsent(addr, fut0); - - if (oldFut != null) - return oldFut.get(); - else { - GridClientConfiguration cliCfg = new GridClientConfiguration(); - - cliCfg.setProtocol(TCP); - cliCfg.setServers(Collections.singletonList(addr)); - cliCfg.setMarshaller(new GridClientJdkMarshaller()); - cliCfg.setMaxConnectionIdleTime(24 * 60 * 60 * 1000L); // 1 day. - cliCfg.setDaemon(true); - - try { - GridClient cli = GridClientFactory.start(cliCfg); - - fut0.onDone(cli); - - return cli; - } - catch (GridClientException e) { - fut0.onDone(e); - - throw new IOException("Failed to establish connection with Ignite node: " + addr, e); - } - } - } - else - return fut.get(); - } - catch (IgniteCheckedException e) { - throw new IOException("Failed to establish connection with Ignite node: " + addr, e); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java index d4a44fa..e1101c5 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java @@ -17,16 +17,6 @@ package org.apache.ignite.hadoop.mapreduce; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.UUID; - import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; @@ -38,13 +28,23 @@ import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; import org.apache.ignite.internal.processors.hadoop.HadoopJob; import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint; -import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan; import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner; +import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan; import org.apache.ignite.internal.processors.igfs.IgfsEx; import org.apache.ignite.internal.util.typedef.F; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.UUID; + import static org.apache.ignite.IgniteFileSystem.IGFS_SCHEME; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java index 27ffc19..2d1ac0b 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java @@ -24,11 +24,11 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.igfs.IgfsBlockLocation; import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils; import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock; import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; import org.apache.ignite.internal.processors.hadoop.HadoopJob; import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan; -import org.apache.ignite.internal.processors.hadoop.HadoopUtils; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint; import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner; import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan; @@ -116,7 +116,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc /** {@inheritDoc} */ @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> nodes, @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException { - List<HadoopInputSplit> splits = HadoopUtils.sortInputSplits(job.input()); + List<HadoopInputSplit> splits = HadoopCommonUtils.sortInputSplits(job.input()); int reducerCnt = job.info().reducers(); if (reducerCnt < 0) http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/package-info.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/package-info.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/package-info.java deleted file mode 100644 index 7635b9e..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * <!-- Package description. --> - * Ignite Hadoop Accelerator map-reduce classes. - */ -package org.apache.ignite.hadoop.mapreduce; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/hadoop/util/UserNameMapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/util/UserNameMapper.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/util/UserNameMapper.java index 26dc4b2..12669aa 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/util/UserNameMapper.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/util/UserNameMapper.java @@ -17,14 +17,12 @@ package org.apache.ignite.hadoop.util; -import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory; import org.jetbrains.annotations.Nullable; import java.io.Serializable; /** - * Hadoop file system name mapper. Used by {@link HadoopFileSystemFactory} implementation to pass proper user names - * to the underlying Hadoop file system. + * Hadoop file system name mapper. Ensures that correct user name is passed to the underlying Hadoop file system. */ public interface UserNameMapper extends Serializable { /** http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java deleted file mode 100644 index 23eaa18..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.configuration.HadoopConfiguration; -import org.apache.ignite.internal.IgniteNodeAttributes; -import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.jetbrains.annotations.Nullable; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.Arrays; - -/** - * Hadoop attributes. - */ -public class HadoopAttributes implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Attribute name. */ - public static final String NAME = IgniteNodeAttributes.ATTR_PREFIX + ".hadoop"; - - /** Map-reduce planner class name. */ - private String plannerCls; - - /** External executor flag. */ - private boolean extExec; - - /** Maximum parallel tasks. */ - private int maxParallelTasks; - - /** Maximum task queue size. */ - private int maxTaskQueueSize; - - /** Library names. */ - @GridToStringExclude - private String[] libNames; - - /** Number of cores. */ - private int cores; - - /** - * Get attributes for node (if any). - * - * @param node Node. - * @return Attributes or {@code null} if Hadoop Accelerator is not enabled for node. - */ - @Nullable public static HadoopAttributes forNode(ClusterNode node) { - return node.attribute(NAME); - } - - /** - * {@link Externalizable} support. - */ - public HadoopAttributes() { - // No-op. - } - - /** - * Constructor. - * - * @param cfg Configuration. - */ - public HadoopAttributes(HadoopConfiguration cfg) { - assert cfg != null; - assert cfg.getMapReducePlanner() != null; - - plannerCls = cfg.getMapReducePlanner().getClass().getName(); - - // TODO: IGNITE-404: Get from configuration when fixed. - extExec = false; - - maxParallelTasks = cfg.getMaxParallelTasks(); - maxTaskQueueSize = cfg.getMaxTaskQueueSize(); - libNames = cfg.getNativeLibraryNames(); - - // Cores count already passed in other attributes, we add it here for convenience. - cores = Runtime.getRuntime().availableProcessors(); - } - - /** - * @return Map reduce planner class name. - */ - public String plannerClassName() { - return plannerCls; - } - - /** - * @return External execution flag. - */ - public boolean externalExecution() { - return extExec; - } - - /** - * @return Maximum parallel tasks. - */ - public int maxParallelTasks() { - return maxParallelTasks; - } - - /** - * @return Maximum task queue size. - */ - public int maxTaskQueueSize() { - return maxTaskQueueSize; - } - - - /** - * @return Native library names. - */ - public String[] nativeLibraryNames() { - return libNames; - } - - /** - * @return Number of cores on machine. - */ - public int cores() { - return cores; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(plannerCls); - out.writeBoolean(extExec); - out.writeInt(maxParallelTasks); - out.writeInt(maxTaskQueueSize); - out.writeObject(libNames); - out.writeInt(cores); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - plannerCls = (String)in.readObject(); - extExec = in.readBoolean(); - maxParallelTasks = in.readInt(); - maxTaskQueueSize = in.readInt(); - libNames = (String[])in.readObject(); - cores = in.readInt(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopAttributes.class, this, "libNames", Arrays.toString(libNames)); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java new file mode 100644 index 0000000..83f94ce --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.TreeSet; + +/** + * Common Hadoop utility methods which do not depend on Hadoop API. + */ +public class HadoopCommonUtils { + /** + * Sort input splits by length. + * + * @param splits Splits. + * @return Sorted splits. + */ + public static List<HadoopInputSplit> sortInputSplits(Collection<HadoopInputSplit> splits) { + int id = 0; + + TreeSet<SplitSortWrapper> sortedSplits = new TreeSet<>(); + + for (HadoopInputSplit split : splits) { + long len = split instanceof HadoopFileBlock ? ((HadoopFileBlock)split).length() : 0; + + sortedSplits.add(new SplitSortWrapper(id++, split, len)); + } + + ArrayList<HadoopInputSplit> res = new ArrayList<>(sortedSplits.size()); + + for (SplitSortWrapper sortedSplit : sortedSplits) + res.add(sortedSplit.split); + + return res; + } + + /** + * Split wrapper for sorting. + */ + private static class SplitSortWrapper implements Comparable<SplitSortWrapper> { + /** Unique ID. */ + private final int id; + + /** Split. */ + private final HadoopInputSplit split; + + /** Split length. */ + private final long len; + + /** + * Constructor. + * + * @param id Unique ID. + * @param split Split. + * @param len Split length. + */ + public SplitSortWrapper(int id, HadoopInputSplit split, long len) { + this.id = id; + this.split = split; + this.len = len; + } + + /** {@inheritDoc} */ + @SuppressWarnings("NullableProblems") + @Override public int compareTo(SplitSortWrapper other) { + long res = len - other.len; + + if (res > 0) + return -1; + else if (res < 0) + return 1; + else + return id - other.id; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return id; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + return obj instanceof SplitSortWrapper && id == ((SplitSortWrapper)obj).id; + } + } + + /** + * Private constructor. + */ + private HadoopCommonUtils() { + // No-op. + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopComponent.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopComponent.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopComponent.java deleted file mode 100644 index aeda5c0..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopComponent.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; - -/** - * Abstract class for all hadoop components. - */ -public abstract class HadoopComponent { - /** Hadoop context. */ - protected HadoopContext ctx; - - /** Logger. */ - protected IgniteLogger log; - - /** - * @param ctx Hadoop context. - */ - public void start(HadoopContext ctx) throws IgniteCheckedException { - this.ctx = ctx; - - log = ctx.kernalContext().log(getClass()); - } - - /** - * Stops manager. - */ - public void stop(boolean cancel) { - // No-op. - } - - /** - * Callback invoked when all grid components are started. - */ - public void onKernalStart() throws IgniteCheckedException { - // No-op. - } - - /** - * Callback invoked before all grid components are stopped. - */ - public void onKernalStop(boolean cancel) { - // No-op. - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java deleted file mode 100644 index 42a3d72..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java +++ /dev/null @@ -1,201 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.UUID; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.configuration.HadoopConfiguration; -import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobMetadata; -import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker; -import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffle; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskExecutorAdapter; -import org.apache.ignite.internal.util.typedef.internal.CU; - -/** - * Hadoop accelerator context. - */ -public class HadoopContext { - /** Kernal context. */ - private GridKernalContext ctx; - - /** Hadoop configuration. */ - private HadoopConfiguration cfg; - - /** Job tracker. */ - private HadoopJobTracker jobTracker; - - /** External task executor. */ - private HadoopTaskExecutorAdapter taskExecutor; - - /** */ - private HadoopShuffle shuffle; - - /** Managers list. */ - private List<HadoopComponent> components = new ArrayList<>(); - - /** - * @param ctx Kernal context. - */ - public HadoopContext( - GridKernalContext ctx, - HadoopConfiguration cfg, - HadoopJobTracker jobTracker, - HadoopTaskExecutorAdapter taskExecutor, - HadoopShuffle shuffle - ) { - this.ctx = ctx; - this.cfg = cfg; - - this.jobTracker = add(jobTracker); - this.taskExecutor = add(taskExecutor); - this.shuffle = add(shuffle); - } - - /** - * Gets list of managers. - * - * @return List of managers. - */ - public List<HadoopComponent> components() { - return components; - } - - /** - * Gets kernal context. - * - * @return Grid kernal context instance. - */ - public GridKernalContext kernalContext() { - return ctx; - } - - /** - * Gets Hadoop configuration. - * - * @return Hadoop configuration. - */ - public HadoopConfiguration configuration() { - return cfg; - } - - /** - * Gets local node ID. Shortcut for {@code kernalContext().localNodeId()}. - * - * @return Local node ID. - */ - public UUID localNodeId() { - return ctx.localNodeId(); - } - - /** - * Gets local node order. - * - * @return Local node order. - */ - public long localNodeOrder() { - assert ctx.discovery() != null; - - return ctx.discovery().localNode().order(); - } - - /** - * @return Hadoop-enabled nodes. - */ - public Collection<ClusterNode> nodes() { - return ctx.discovery().cacheNodes(CU.SYS_CACHE_HADOOP_MR, ctx.discovery().topologyVersionEx()); - } - - /** - * @return {@code True} if - */ - public boolean jobUpdateLeader() { - long minOrder = Long.MAX_VALUE; - ClusterNode minOrderNode = null; - - for (ClusterNode node : nodes()) { - if (node.order() < minOrder) { - minOrder = node.order(); - minOrderNode = node; - } - } - - assert minOrderNode != null; - - return localNodeId().equals(minOrderNode.id()); - } - - /** - * @param meta Job metadata. - * @return {@code true} If local node is participating in job execution. - */ - public boolean isParticipating(HadoopJobMetadata meta) { - UUID locNodeId = localNodeId(); - - if (locNodeId.equals(meta.submitNodeId())) - return true; - - HadoopMapReducePlan plan = meta.mapReducePlan(); - - return plan.mapperNodeIds().contains(locNodeId) || plan.reducerNodeIds().contains(locNodeId) || jobUpdateLeader(); - } - - /** - * @return Jon tracker instance. - */ - public HadoopJobTracker jobTracker() { - return jobTracker; - } - - /** - * @return Task executor. - */ - public HadoopTaskExecutorAdapter taskExecutor() { - return taskExecutor; - } - - /** - * @return Shuffle. - */ - public HadoopShuffle shuffle() { - return shuffle; - } - - /** - * @return Map-reduce planner. - */ - public HadoopMapReducePlanner planner() { - return cfg.getMapReducePlanner(); - } - - /** - * Adds component. - * - * @param c Component to add. - * @return Added manager. - */ - private <C extends HadoopComponent> C add(C c) { - components.add(c); - - return c; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java deleted file mode 100644 index ae17ac8..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.lang.reflect.Constructor; -import java.util.HashMap; -import java.util.Map; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.jetbrains.annotations.Nullable; - -/** - * Hadoop job info based on default Hadoop configuration. - */ -public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable { - /** */ - private static final long serialVersionUID = 5489900236464999951L; - - /** {@code true} If job has combiner. */ - private boolean hasCombiner; - - /** Number of reducers configured for job. */ - private int numReduces; - - /** Configuration. */ - private Map<String,String> props = new HashMap<>(); - - /** Job name. */ - private String jobName; - - /** User name. */ - private String user; - - /** - * Default constructor required by {@link Externalizable}. - */ - public HadoopDefaultJobInfo() { - // No-op. - } - - /** - * Constructor. - * - * @param jobName Job name. - * @param user User name. - * @param hasCombiner {@code true} If job has combiner. - * @param numReduces Number of reducers configured for job. - * @param props All other properties of the job. - */ - public HadoopDefaultJobInfo(String jobName, String user, boolean hasCombiner, int numReduces, - Map<String, String> props) { - this.jobName = jobName; - this.user = user; - this.hasCombiner = hasCombiner; - this.numReduces = numReduces; - this.props = props; - } - - /** {@inheritDoc} */ - @Nullable @Override public String property(String name) { - return props.get(name); - } - - /** {@inheritDoc} */ - @Override public HadoopJob createJob(Class<? extends HadoopJob> jobCls, HadoopJobId jobId, IgniteLogger log, - @Nullable String[] libNames, HadoopHelper helper) throws IgniteCheckedException { - assert jobCls != null; - - try { - Constructor<? extends HadoopJob> constructor = jobCls.getConstructor(HadoopJobId.class, - HadoopDefaultJobInfo.class, IgniteLogger.class, String[].class, HadoopHelper.class); - - return constructor.newInstance(jobId, this, log, libNames, helper); - } - catch (Throwable t) { - if (t instanceof Error) - throw (Error)t; - - throw new IgniteCheckedException(t); - } - } - - /** {@inheritDoc} */ - @Override public boolean hasCombiner() { - return hasCombiner; - } - - /** {@inheritDoc} */ - @Override public boolean hasReducer() { - return reducers() > 0; - } - - /** {@inheritDoc} */ - @Override public int reducers() { - return numReduces; - } - - /** {@inheritDoc} */ - @Override public String jobName() { - return jobName; - } - - /** {@inheritDoc} */ - @Override public String user() { - return user; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, jobName); - U.writeString(out, user); - - out.writeBoolean(hasCombiner); - out.writeInt(numReduces); - - U.writeStringMap(out, props); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - jobName = U.readString(in); - user = U.readString(in); - - hasCombiner = in.readBoolean(); - numReduces = in.readInt(); - - props = U.readStringMap(in); - } - - /** - * @return Properties of the job. - */ - public Map<String, String> properties() { - return props; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java deleted file mode 100644 index ed2657e..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.configuration.HadoopConfiguration; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; -import org.apache.ignite.internal.util.GridSpinBusyLock; -import org.jetbrains.annotations.Nullable; - -/** - * Hadoop facade implementation. - */ -public class HadoopImpl implements Hadoop { - /** Hadoop processor. */ - private final HadoopProcessor proc; - - /** Busy lock. */ - private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); - - /** - * Constructor. - * - * @param proc Hadoop processor. - */ - HadoopImpl(HadoopProcessor proc) { - this.proc = proc; - } - - /** {@inheritDoc} */ - @Override public HadoopConfiguration configuration() { - return proc.config(); - } - - /** {@inheritDoc} */ - @Override public HadoopJobId nextJobId() { - if (busyLock.enterBusy()) { - try { - return proc.nextJobId(); - } - finally { - busyLock.leaveBusy(); - } - } - else - throw new IllegalStateException("Failed to get next job ID (grid is stopping)."); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> submit(HadoopJobId jobId, HadoopJobInfo jobInfo) { - if (busyLock.enterBusy()) { - try { - return proc.submit(jobId, jobInfo); - } - finally { - busyLock.leaveBusy(); - } - } - else - throw new IllegalStateException("Failed to submit job (grid is stopping)."); - } - - /** {@inheritDoc} */ - @Nullable @Override public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException { - if (busyLock.enterBusy()) { - try { - return proc.status(jobId); - } - finally { - busyLock.leaveBusy(); - } - } - else - throw new IllegalStateException("Failed to get job status (grid is stopping)."); - } - - /** {@inheritDoc} */ - @Nullable @Override public HadoopCounters counters(HadoopJobId jobId) throws IgniteCheckedException { - if (busyLock.enterBusy()) { - try { - return proc.counters(jobId); - } - finally { - busyLock.leaveBusy(); - } - } - else - throw new IllegalStateException("Failed to get job counters (grid is stopping)."); - } - - /** {@inheritDoc} */ - @Nullable @Override public IgniteInternalFuture<?> finishFuture(HadoopJobId jobId) throws IgniteCheckedException { - if (busyLock.enterBusy()) { - try { - return proc.finishFuture(jobId); - } - finally { - busyLock.leaveBusy(); - } - } - else - throw new IllegalStateException("Failed to get job finish future (grid is stopping)."); - } - - /** {@inheritDoc} */ - @Override public boolean kill(HadoopJobId jobId) throws IgniteCheckedException { - if (busyLock.enterBusy()) { - try { - return proc.kill(jobId); - } - finally { - busyLock.leaveBusy(); - } - } - else - throw new IllegalStateException("Failed to kill job (grid is stopping)."); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java deleted file mode 100644 index 4e03e17..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.Iterator; -import org.apache.hadoop.mapreduce.Counter; -import org.apache.hadoop.mapreduce.CounterGroup; -import org.apache.hadoop.mapreduce.counters.CounterGroupBase; - -/** - * Hadoop +counter group adapter. - */ -class HadoopMapReduceCounterGroup implements CounterGroup { - /** Counters. */ - private final HadoopMapReduceCounters cntrs; - - /** Group name. */ - private final String name; - - /** - * Creates new instance. - * - * @param cntrs Client counters instance. - * @param name Group name. - */ - HadoopMapReduceCounterGroup(HadoopMapReduceCounters cntrs, String name) { - this.cntrs = cntrs; - this.name = name; - } - - /** {@inheritDoc} */ - @Override public String getName() { - return name; - } - - /** {@inheritDoc} */ - @Override public String getDisplayName() { - return name; - } - - /** {@inheritDoc} */ - @Override public void setDisplayName(String displayName) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void addCounter(Counter counter) { - addCounter(counter.getName(), counter.getDisplayName(), 0); - } - - /** {@inheritDoc} */ - @Override public Counter addCounter(String name, String displayName, long value) { - final Counter counter = cntrs.findCounter(this.name, name); - - counter.setValue(value); - - return counter; - } - - /** {@inheritDoc} */ - @Override public Counter findCounter(String counterName, String displayName) { - return cntrs.findCounter(name, counterName); - } - - /** {@inheritDoc} */ - @Override public Counter findCounter(String counterName, boolean create) { - return cntrs.findCounter(name, counterName, create); - } - - /** {@inheritDoc} */ - @Override public Counter findCounter(String counterName) { - return cntrs.findCounter(name, counterName); - } - - /** {@inheritDoc} */ - @Override public int size() { - return cntrs.groupSize(name); - } - - /** {@inheritDoc} */ - @Override public void incrAllCounters(CounterGroupBase<Counter> rightGroup) { - for (final Counter counter : rightGroup) - cntrs.findCounter(name, counter.getName()).increment(counter.getValue()); - } - - /** {@inheritDoc} */ - @Override public CounterGroupBase<Counter> getUnderlyingGroup() { - return this; - } - - /** {@inheritDoc} */ - @Override public Iterator<Counter> iterator() { - return cntrs.iterateGroup(name); - } - - /** {@inheritDoc} */ - @Override public void write(DataOutput out) throws IOException { - throw new UnsupportedOperationException("not implemented"); - } - - /** {@inheritDoc} */ - @Override public void readFields(DataInput in) throws IOException { - throw new UnsupportedOperationException("not implemented"); - } -} \ No newline at end of file