http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java deleted file mode 100644 index 9902142..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java +++ /dev/null @@ -1,524 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.igfs; - -import java.io.IOException; -import java.util.Collection; -import java.util.Map; -import org.apache.commons.logging.Log; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.igfs.IgfsBlockLocation; -import org.apache.ignite.igfs.IgfsException; -import org.apache.ignite.igfs.IgfsFile; -import org.apache.ignite.igfs.IgfsPath; -import org.apache.ignite.igfs.IgfsPathSummary; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.igfs.common.IgfsControlResponse; -import org.apache.ignite.internal.igfs.common.IgfsHandshakeRequest; -import org.apache.ignite.internal.igfs.common.IgfsMessage; -import org.apache.ignite.internal.igfs.common.IgfsPathControlRequest; -import org.apache.ignite.internal.igfs.common.IgfsStatusRequest; -import org.apache.ignite.internal.igfs.common.IgfsStreamControlRequest; -import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse; -import org.apache.ignite.internal.processors.igfs.IgfsInputStreamDescriptor; -import org.apache.ignite.internal.processors.igfs.IgfsStatus; -import org.apache.ignite.internal.processors.igfs.IgfsUtils; -import org.apache.ignite.internal.util.future.GridFinishedFuture; -import org.apache.ignite.internal.util.lang.GridClosureException; -import org.apache.ignite.lang.IgniteClosure; -import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentHashMap8; - -import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.AFFINITY; -import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.CLOSE; -import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.DELETE; -import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.INFO; -import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.LIST_FILES; -import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.LIST_PATHS; -import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.MAKE_DIRECTORIES; -import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.OPEN_APPEND; -import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.OPEN_CREATE; -import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.OPEN_READ; -import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.PATH_SUMMARY; -import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.READ_BLOCK; -import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.RENAME; -import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.SET_TIMES; -import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.UPDATE; -import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.WRITE_BLOCK; - -/** - * Communication with external process (TCP or shmem). - */ -public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener { - /** Expected result is boolean. */ - private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, Boolean> BOOL_RES = createClosure(); - - /** Expected result is boolean. */ - private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, Long> LONG_RES = createClosure(); - - /** Expected result is {@code IgfsFile}. */ - private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, IgfsFile> FILE_RES = createClosure(); - - /** Expected result is {@code IgfsHandshakeResponse} */ - private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, - IgfsHandshakeResponse> HANDSHAKE_RES = createClosure(); - - /** Expected result is {@code IgfsStatus} */ - private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, IgfsStatus> STATUS_RES = - createClosure(); - - /** Expected result is {@code IgfsFile}. */ - private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, - IgfsInputStreamDescriptor> STREAM_DESCRIPTOR_RES = createClosure(); - - /** Expected result is {@code IgfsFile}. */ - private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, - Collection<IgfsFile>> FILE_COL_RES = createClosure(); - - /** Expected result is {@code IgfsFile}. */ - private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, - Collection<IgfsPath>> PATH_COL_RES = createClosure(); - - /** Expected result is {@code IgfsPathSummary}. */ - private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, IgfsPathSummary> SUMMARY_RES = - createClosure(); - - /** Expected result is {@code IgfsFile}. */ - private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, - Collection<IgfsBlockLocation>> BLOCK_LOCATION_COL_RES = createClosure(); - - /** Grid name. */ - private final String grid; - - /** IGFS name. */ - private final String igfs; - - /** The user this out proc is performing on behalf of. */ - private final String userName; - - /** Client log. */ - private final Log log; - - /** Client IO. */ - private final HadoopIgfsIpcIo io; - - /** Event listeners. */ - private final Map<Long, HadoopIgfsStreamEventListener> lsnrs = new ConcurrentHashMap8<>(); - - /** - * Constructor for TCP endpoint. - * - * @param host Host. - * @param port Port. - * @param grid Grid name. - * @param igfs IGFS name. - * @param log Client logger. - * @throws IOException If failed. - */ - public HadoopIgfsOutProc(String host, int port, String grid, String igfs, Log log, String user) throws IOException { - this(host, port, grid, igfs, false, log, user); - } - - /** - * Constructor for shmem endpoint. - * - * @param port Port. - * @param grid Grid name. - * @param igfs IGFS name. - * @param log Client logger. - * @throws IOException If failed. - */ - public HadoopIgfsOutProc(int port, String grid, String igfs, Log log, String user) throws IOException { - this(null, port, grid, igfs, true, log, user); - } - - /** - * Constructor. - * - * @param host Host. - * @param port Port. - * @param grid Grid name. - * @param igfs IGFS name. - * @param shmem Shared memory flag. - * @param log Client logger. - * @throws IOException If failed. - */ - private HadoopIgfsOutProc(String host, int port, String grid, String igfs, boolean shmem, Log log, String user) - throws IOException { - assert host != null && !shmem || host == null && shmem : - "Invalid arguments [host=" + host + ", port=" + port + ", shmem=" + shmem + ']'; - - String endpoint = host != null ? host + ":" + port : "shmem:" + port; - - this.grid = grid; - this.igfs = igfs; - this.log = log; - this.userName = IgfsUtils.fixUserName(user); - - io = HadoopIgfsIpcIo.get(log, endpoint); - - io.addEventListener(this); - } - - /** {@inheritDoc} */ - @Override public IgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException { - final IgfsHandshakeRequest req = new IgfsHandshakeRequest(); - - req.gridName(grid); - req.igfsName(igfs); - req.logDirectory(logDir); - - return io.send(req).chain(HANDSHAKE_RES).get(); - } - - /** {@inheritDoc} */ - @Override public void close(boolean force) { - assert io != null; - - io.removeEventListener(this); - - if (force) - io.forceClose(); - else - io.release(); - } - - /** {@inheritDoc} */ - @Override public IgfsFile info(IgfsPath path) throws IgniteCheckedException { - final IgfsPathControlRequest msg = new IgfsPathControlRequest(); - - msg.command(INFO); - msg.path(path); - msg.userName(userName); - - return io.send(msg).chain(FILE_RES).get(); - } - - /** {@inheritDoc} */ - @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException { - final IgfsPathControlRequest msg = new IgfsPathControlRequest(); - - msg.command(UPDATE); - msg.path(path); - msg.properties(props); - msg.userName(userName); - - return io.send(msg).chain(FILE_RES).get(); - } - - /** {@inheritDoc} */ - @Override public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException { - final IgfsPathControlRequest msg = new IgfsPathControlRequest(); - - msg.command(SET_TIMES); - msg.path(path); - msg.accessTime(accessTime); - msg.modificationTime(modificationTime); - msg.userName(userName); - - return io.send(msg).chain(BOOL_RES).get(); - } - - /** {@inheritDoc} */ - @Override public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException { - final IgfsPathControlRequest msg = new IgfsPathControlRequest(); - - msg.command(RENAME); - msg.path(src); - msg.destinationPath(dest); - msg.userName(userName); - - return io.send(msg).chain(BOOL_RES).get(); - } - - /** {@inheritDoc} */ - @Override public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException { - final IgfsPathControlRequest msg = new IgfsPathControlRequest(); - - msg.command(DELETE); - msg.path(path); - msg.flag(recursive); - msg.userName(userName); - - return io.send(msg).chain(BOOL_RES).get(); - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) - throws IgniteCheckedException { - final IgfsPathControlRequest msg = new IgfsPathControlRequest(); - - msg.command(AFFINITY); - msg.path(path); - msg.start(start); - msg.length(len); - msg.userName(userName); - - return io.send(msg).chain(BLOCK_LOCATION_COL_RES).get(); - } - - /** {@inheritDoc} */ - @Override public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException { - final IgfsPathControlRequest msg = new IgfsPathControlRequest(); - - msg.command(PATH_SUMMARY); - msg.path(path); - msg.userName(userName); - - return io.send(msg).chain(SUMMARY_RES).get(); - } - - /** {@inheritDoc} */ - @Override public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException { - final IgfsPathControlRequest msg = new IgfsPathControlRequest(); - - msg.command(MAKE_DIRECTORIES); - msg.path(path); - msg.properties(props); - msg.userName(userName); - - return io.send(msg).chain(BOOL_RES).get(); - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException { - final IgfsPathControlRequest msg = new IgfsPathControlRequest(); - - msg.command(LIST_FILES); - msg.path(path); - msg.userName(userName); - - return io.send(msg).chain(FILE_COL_RES).get(); - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException { - final IgfsPathControlRequest msg = new IgfsPathControlRequest(); - - msg.command(LIST_PATHS); - msg.path(path); - msg.userName(userName); - - return io.send(msg).chain(PATH_COL_RES).get(); - } - - /** {@inheritDoc} */ - @Override public IgfsStatus fsStatus() throws IgniteCheckedException { - return io.send(new IgfsStatusRequest()).chain(STATUS_RES).get(); - } - - /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException { - final IgfsPathControlRequest msg = new IgfsPathControlRequest(); - - msg.command(OPEN_READ); - msg.path(path); - msg.flag(false); - msg.userName(userName); - - IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get(); - - return new HadoopIgfsStreamDelegate(this, rmtDesc.streamId(), rmtDesc.length()); - } - - /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate open(IgfsPath path, - int seqReadsBeforePrefetch) throws IgniteCheckedException { - final IgfsPathControlRequest msg = new IgfsPathControlRequest(); - - msg.command(OPEN_READ); - msg.path(path); - msg.flag(true); - msg.sequentialReadsBeforePrefetch(seqReadsBeforePrefetch); - msg.userName(userName); - - IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get(); - - return new HadoopIgfsStreamDelegate(this, rmtDesc.streamId(), rmtDesc.length()); - } - - /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate, - int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException { - final IgfsPathControlRequest msg = new IgfsPathControlRequest(); - - msg.command(OPEN_CREATE); - msg.path(path); - msg.flag(overwrite); - msg.colocate(colocate); - msg.properties(props); - msg.replication(replication); - msg.blockSize(blockSize); - msg.userName(userName); - - Long streamId = io.send(msg).chain(LONG_RES).get(); - - return new HadoopIgfsStreamDelegate(this, streamId); - } - - /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create, - @Nullable Map<String, String> props) throws IgniteCheckedException { - final IgfsPathControlRequest msg = new IgfsPathControlRequest(); - - msg.command(OPEN_APPEND); - msg.path(path); - msg.flag(create); - msg.properties(props); - msg.userName(userName); - - Long streamId = io.send(msg).chain(LONG_RES).get(); - - return new HadoopIgfsStreamDelegate(this, streamId); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<byte[]> readData(HadoopIgfsStreamDelegate desc, long pos, int len, - final @Nullable byte[] outBuf, final int outOff, final int outLen) { - assert len > 0; - - final IgfsStreamControlRequest msg = new IgfsStreamControlRequest(); - - msg.command(READ_BLOCK); - msg.streamId((long) desc.target()); - msg.position(pos); - msg.length(len); - - try { - return io.send(msg, outBuf, outOff, outLen); - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(e); - } - } - - /** {@inheritDoc} */ - @Override public void writeData(HadoopIgfsStreamDelegate desc, byte[] data, int off, int len) - throws IOException { - final IgfsStreamControlRequest msg = new IgfsStreamControlRequest(); - - msg.command(WRITE_BLOCK); - msg.streamId((long) desc.target()); - msg.data(data); - msg.position(off); - msg.length(len); - - try { - io.sendPlain(msg); - } - catch (IgniteCheckedException e) { - throw HadoopIgfsUtils.cast(e); - } - } - - /** {@inheritDoc} */ - @Override public void flush(HadoopIgfsStreamDelegate delegate) throws IOException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void closeStream(HadoopIgfsStreamDelegate desc) throws IOException { - final IgfsStreamControlRequest msg = new IgfsStreamControlRequest(); - - msg.command(CLOSE); - msg.streamId((long)desc.target()); - - try { - io.send(msg).chain(BOOL_RES).get(); - } - catch (IgniteCheckedException e) { - throw HadoopIgfsUtils.cast(e); - } - } - - /** {@inheritDoc} */ - @Override public void addEventListener(HadoopIgfsStreamDelegate desc, - HadoopIgfsStreamEventListener lsnr) { - long streamId = desc.target(); - - HadoopIgfsStreamEventListener lsnr0 = lsnrs.put(streamId, lsnr); - - assert lsnr0 == null || lsnr0 == lsnr; - - if (log.isDebugEnabled()) - log.debug("Added stream event listener [streamId=" + streamId + ']'); - } - - /** {@inheritDoc} */ - @Override public void removeEventListener(HadoopIgfsStreamDelegate desc) { - long streamId = desc.target(); - - HadoopIgfsStreamEventListener lsnr0 = lsnrs.remove(streamId); - - if (lsnr0 != null && log.isDebugEnabled()) - log.debug("Removed stream event listener [streamId=" + streamId + ']'); - } - - /** {@inheritDoc} */ - @Override public void onClose() { - for (HadoopIgfsStreamEventListener lsnr : lsnrs.values()) { - try { - lsnr.onClose(); - } - catch (IgniteCheckedException e) { - log.warn("Got exception from stream event listener (will ignore): " + lsnr, e); - } - } - } - - /** {@inheritDoc} */ - @Override public void onError(long streamId, String errMsg) { - HadoopIgfsStreamEventListener lsnr = lsnrs.get(streamId); - - if (lsnr != null) - lsnr.onError(errMsg); - else - log.warn("Received write error response for not registered output stream (will ignore) " + - "[streamId= " + streamId + ']'); - } - - /** - * Creates conversion closure for given type. - * - * @param <T> Type of expected result. - * @return Conversion closure. - */ - @SuppressWarnings("unchecked") - private static <T> IgniteClosure<IgniteInternalFuture<IgfsMessage>, T> createClosure() { - return new IgniteClosure<IgniteInternalFuture<IgfsMessage>, T>() { - @Override public T apply(IgniteInternalFuture<IgfsMessage> fut) { - try { - IgfsControlResponse res = (IgfsControlResponse)fut.get(); - - if (res.hasError()) - res.throwError(); - - return (T)res.response(); - } - catch (IgfsException | IgniteCheckedException e) { - throw new GridClosureException(e); - } - } - }; - } - - /** {@inheritDoc} */ - @Override public String user() { - return userName; - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutputStream.java deleted file mode 100644 index 8f7458b..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutputStream.java +++ /dev/null @@ -1,201 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.igfs; - -import java.io.IOException; -import java.io.OutputStream; -import org.apache.commons.logging.Log; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.igfs.common.IgfsLogger; -import org.jetbrains.annotations.NotNull; - -/** - * IGFS Hadoop output stream implementation. - */ -public class HadoopIgfsOutputStream extends OutputStream implements HadoopIgfsStreamEventListener { - /** Log instance. */ - private Log log; - - /** Client logger. */ - private IgfsLogger clientLog; - - /** Log stream ID. */ - private long logStreamId; - - /** Server stream delegate. */ - private HadoopIgfsStreamDelegate delegate; - - /** Closed flag. */ - private volatile boolean closed; - - /** Flag set if stream was closed due to connection breakage. */ - private boolean connBroken; - - /** Error message. */ - private volatile String errMsg; - - /** Read time. */ - private long writeTime; - - /** User time. */ - private long userTime; - - /** Last timestamp. */ - private long lastTs; - - /** Amount of written bytes. */ - private long total; - - /** - * Creates light output stream. - * - * @param delegate Server stream delegate. - * @param log Logger to use. - * @param clientLog Client logger. - */ - public HadoopIgfsOutputStream(HadoopIgfsStreamDelegate delegate, Log log, - IgfsLogger clientLog, long logStreamId) { - this.delegate = delegate; - this.log = log; - this.clientLog = clientLog; - this.logStreamId = logStreamId; - - lastTs = System.nanoTime(); - - delegate.hadoop().addEventListener(delegate, this); - } - - /** - * Read start. - */ - private void writeStart() { - long now = System.nanoTime(); - - userTime += now - lastTs; - - lastTs = now; - } - - /** - * Read end. - */ - private void writeEnd() { - long now = System.nanoTime(); - - writeTime += now - lastTs; - - lastTs = now; - } - - /** {@inheritDoc} */ - @Override public void write(@NotNull byte[] b, int off, int len) throws IOException { - check(); - - writeStart(); - - try { - delegate.hadoop().writeData(delegate, b, off, len); - - total += len; - } - finally { - writeEnd(); - } - } - - /** {@inheritDoc} */ - @Override public void write(int b) throws IOException { - write(new byte[] {(byte)b}); - - total++; - } - - /** {@inheritDoc} */ - @Override public void flush() throws IOException { - delegate.hadoop().flush(delegate); - } - - /** {@inheritDoc} */ - @Override public void close() throws IOException { - if (!closed) { - if (log.isDebugEnabled()) - log.debug("Closing output stream: " + delegate); - - writeStart(); - - delegate.hadoop().closeStream(delegate); - - markClosed(false); - - writeEnd(); - - if (clientLog.isLogEnabled()) - clientLog.logCloseOut(logStreamId, userTime, writeTime, total); - - if (log.isDebugEnabled()) - log.debug("Closed output stream [delegate=" + delegate + ", writeTime=" + writeTime / 1000 + - ", userTime=" + userTime / 1000 + ']'); - } - else if(connBroken) - throw new IOException( - "Failed to close stream, because connection was broken (data could have been lost)."); - } - - /** - * Marks stream as closed. - * - * @param connBroken {@code True} if connection with server was lost. - */ - private void markClosed(boolean connBroken) { - // It is ok to have race here. - if (!closed) { - closed = true; - - delegate.hadoop().removeEventListener(delegate); - - this.connBroken = connBroken; - } - } - - /** - * @throws IOException If check failed. - */ - private void check() throws IOException { - String errMsg0 = errMsg; - - if (errMsg0 != null) - throw new IOException(errMsg0); - - if (closed) { - if (connBroken) - throw new IOException("Server connection was lost."); - else - throw new IOException("Stream is closed."); - } - } - - /** {@inheritDoc} */ - @Override public void onClose() throws IgniteCheckedException { - markClosed(true); - } - - /** {@inheritDoc} */ - @Override public void onError(String errMsg) { - this.errMsg = errMsg; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java deleted file mode 100644 index 90f6bca..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.igfs; - -import java.util.Map; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.ignite.IgniteException; -import org.apache.ignite.internal.processors.igfs.IgfsUtils; - -/** - * Hadoop file system properties. - */ -public class HadoopIgfsProperties { - /** Username. */ - private String usrName; - - /** Group name. */ - private String grpName; - - /** Permissions. */ - private FsPermission perm; - - /** - * Constructor. - * - * @param props Properties. - * @throws IgniteException In case of error. - */ - public HadoopIgfsProperties(Map<String, String> props) throws IgniteException { - usrName = props.get(IgfsUtils.PROP_USER_NAME); - grpName = props.get(IgfsUtils.PROP_GROUP_NAME); - - String permStr = props.get(IgfsUtils.PROP_PERMISSION); - - if (permStr != null) { - try { - perm = new FsPermission((short)Integer.parseInt(permStr, 8)); - } - catch (NumberFormatException ignore) { - throw new IgniteException("Permissions cannot be parsed: " + permStr); - } - } - } - - /** - * Get user name. - * - * @return User name. - */ - public String userName() { - return usrName; - } - - /** - * Get group name. - * - * @return Group name. - */ - public String groupName() { - return grpName; - } - - /** - * Get permission. - * - * @return Permission. - */ - public FsPermission permission() { - return perm; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyInputStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyInputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyInputStream.java deleted file mode 100644 index 5cee947..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyInputStream.java +++ /dev/null @@ -1,337 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.igfs; - -import java.io.IOException; -import java.io.InputStream; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.PositionedReadable; -import org.apache.hadoop.fs.Seekable; -import org.apache.ignite.internal.igfs.common.IgfsLogger; - -/** - * Secondary Hadoop file system input stream wrapper. - */ -public class HadoopIgfsProxyInputStream extends InputStream implements Seekable, PositionedReadable { - /** Actual input stream to the secondary file system. */ - private final FSDataInputStream is; - - /** Client logger. */ - private final IgfsLogger clientLog; - - /** Log stream ID. */ - private final long logStreamId; - - /** Read time. */ - private long readTime; - - /** User time. */ - private long userTime; - - /** Last timestamp. */ - private long lastTs; - - /** Amount of read bytes. */ - private long total; - - /** Closed flag. */ - private boolean closed; - - /** - * Constructor. - * - * @param is Actual input stream to the secondary file system. - * @param clientLog Client log. - */ - public HadoopIgfsProxyInputStream(FSDataInputStream is, IgfsLogger clientLog, long logStreamId) { - assert is != null; - assert clientLog != null; - - this.is = is; - this.clientLog = clientLog; - this.logStreamId = logStreamId; - - lastTs = System.nanoTime(); - } - - /** {@inheritDoc} */ - @Override public synchronized int read(byte[] b) throws IOException { - readStart(); - - int res; - - try { - res = is.read(b); - } - finally { - readEnd(); - } - - if (res != -1) - total += res; - - return res; - } - - /** {@inheritDoc} */ - @Override public synchronized int read(byte[] b, int off, int len) throws IOException { - readStart(); - - int res; - - try { - res = super.read(b, off, len); - } - finally { - readEnd(); - } - - if (res != -1) - total += res; - - return res; - } - - /** {@inheritDoc} */ - @Override public synchronized long skip(long n) throws IOException { - readStart(); - - long res; - - try { - res = is.skip(n); - } - finally { - readEnd(); - } - - if (clientLog.isLogEnabled()) - clientLog.logSkip(logStreamId, res); - - return res; - } - - /** {@inheritDoc} */ - @Override public synchronized int available() throws IOException { - readStart(); - - try { - return is.available(); - } - finally { - readEnd(); - } - } - - /** {@inheritDoc} */ - @Override public synchronized void close() throws IOException { - if (!closed) { - closed = true; - - readStart(); - - try { - is.close(); - } - finally { - readEnd(); - } - - if (clientLog.isLogEnabled()) - clientLog.logCloseIn(logStreamId, userTime, readTime, total); - } - } - - /** {@inheritDoc} */ - @Override public synchronized void mark(int readLimit) { - readStart(); - - try { - is.mark(readLimit); - } - finally { - readEnd(); - } - - if (clientLog.isLogEnabled()) - clientLog.logMark(logStreamId, readLimit); - } - - /** {@inheritDoc} */ - @Override public synchronized void reset() throws IOException { - readStart(); - - try { - is.reset(); - } - finally { - readEnd(); - } - - if (clientLog.isLogEnabled()) - clientLog.logReset(logStreamId); - } - - /** {@inheritDoc} */ - @Override public synchronized boolean markSupported() { - readStart(); - - try { - return is.markSupported(); - } - finally { - readEnd(); - } - } - - /** {@inheritDoc} */ - @Override public synchronized int read() throws IOException { - readStart(); - - int res; - - try { - res = is.read(); - } - finally { - readEnd(); - } - - if (res != -1) - total++; - - return res; - } - - /** {@inheritDoc} */ - @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException { - readStart(); - - int res; - - try { - res = is.read(pos, buf, off, len); - } - finally { - readEnd(); - } - - if (res != -1) - total += res; - - if (clientLog.isLogEnabled()) - clientLog.logRandomRead(logStreamId, pos, res); - - return res; - } - - /** {@inheritDoc} */ - @Override public synchronized void readFully(long pos, byte[] buf, int off, int len) throws IOException { - readStart(); - - try { - is.readFully(pos, buf, off, len); - } - finally { - readEnd(); - } - - total += len; - - if (clientLog.isLogEnabled()) - clientLog.logRandomRead(logStreamId, pos, len); - } - - /** {@inheritDoc} */ - @Override public synchronized void readFully(long pos, byte[] buf) throws IOException { - readStart(); - - try { - is.readFully(pos, buf); - } - finally { - readEnd(); - } - - total += buf.length; - - if (clientLog.isLogEnabled()) - clientLog.logRandomRead(logStreamId, pos, buf.length); - } - - /** {@inheritDoc} */ - @Override public synchronized void seek(long pos) throws IOException { - readStart(); - - try { - is.seek(pos); - } - finally { - readEnd(); - } - - if (clientLog.isLogEnabled()) - clientLog.logSeek(logStreamId, pos); - } - - /** {@inheritDoc} */ - @Override public synchronized long getPos() throws IOException { - readStart(); - - try { - return is.getPos(); - } - finally { - readEnd(); - } - } - - /** {@inheritDoc} */ - @Override public synchronized boolean seekToNewSource(long targetPos) throws IOException { - readStart(); - - try { - return is.seekToNewSource(targetPos); - } - finally { - readEnd(); - } - } - - /** - * Read start. - */ - private void readStart() { - long now = System.nanoTime(); - - userTime += now - lastTs; - - lastTs = now; - } - - /** - * Read end. - */ - private void readEnd() { - long now = System.nanoTime(); - - readTime += now - lastTs; - - lastTs = now; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyOutputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyOutputStream.java deleted file mode 100644 index eade0f0..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyOutputStream.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.igfs; - -import java.io.IOException; -import java.io.OutputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.ignite.internal.igfs.common.IgfsLogger; - -/** - * Secondary Hadoop file system output stream wrapper. - */ -public class HadoopIgfsProxyOutputStream extends OutputStream { - /** Actual output stream. */ - private FSDataOutputStream os; - - /** Client logger. */ - private final IgfsLogger clientLog; - - /** Log stream ID. */ - private final long logStreamId; - - /** Read time. */ - private long writeTime; - - /** User time. */ - private long userTime; - - /** Last timestamp. */ - private long lastTs; - - /** Amount of written bytes. */ - private long total; - - /** Closed flag. */ - private boolean closed; - - /** - * Constructor. - * - * @param os Actual output stream. - * @param clientLog Client logger. - * @param logStreamId Log stream ID. - */ - public HadoopIgfsProxyOutputStream(FSDataOutputStream os, IgfsLogger clientLog, long logStreamId) { - assert os != null; - assert clientLog != null; - - this.os = os; - this.clientLog = clientLog; - this.logStreamId = logStreamId; - - lastTs = System.nanoTime(); - } - - /** {@inheritDoc} */ - @Override public synchronized void write(int b) throws IOException { - writeStart(); - - try { - os.write(b); - } - finally { - writeEnd(); - } - - total++; - } - - /** {@inheritDoc} */ - @Override public synchronized void write(byte[] b) throws IOException { - writeStart(); - - try { - os.write(b); - } - finally { - writeEnd(); - } - - total += b.length; - } - - /** {@inheritDoc} */ - @Override public synchronized void write(byte[] b, int off, int len) throws IOException { - writeStart(); - - try { - os.write(b, off, len); - } - finally { - writeEnd(); - } - - total += len; - } - - /** {@inheritDoc} */ - @Override public synchronized void flush() throws IOException { - writeStart(); - - try { - os.flush(); - } - finally { - writeEnd(); - } - } - - /** {@inheritDoc} */ - @Override public synchronized void close() throws IOException { - if (!closed) { - closed = true; - - writeStart(); - - try { - os.close(); - } - finally { - writeEnd(); - } - - if (clientLog.isLogEnabled()) - clientLog.logCloseOut(logStreamId, userTime, writeTime, total); - } - } - - /** - * Read start. - */ - private void writeStart() { - long now = System.nanoTime(); - - userTime += now - lastTs; - - lastTs = now; - } - - /** - * Read end. - */ - private void writeEnd() { - long now = System.nanoTime(); - - writeTime += now - lastTs; - - lastTs = now; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsSecondaryFileSystemPositionedReadable.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsSecondaryFileSystemPositionedReadable.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsSecondaryFileSystemPositionedReadable.java deleted file mode 100644 index a0577ce..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsSecondaryFileSystemPositionedReadable.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.igfs; - -import java.io.IOException; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PositionedReadable; -import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable; -import org.apache.ignite.internal.util.typedef.internal.U; - -/** - * Secondary file system input stream wrapper which actually opens input stream only in case it is explicitly - * requested. - * <p> - * The class is expected to be used only from synchronized context and therefore is not tread-safe. - */ -public class HadoopIgfsSecondaryFileSystemPositionedReadable implements IgfsSecondaryFileSystemPositionedReadable { - /** Secondary file system. */ - private final FileSystem fs; - - /** Path to the file to open. */ - private final Path path; - - /** Buffer size. */ - private final int bufSize; - - /** Actual input stream. */ - private FSDataInputStream in; - - /** Cached error occurred during output stream open. */ - private IOException err; - - /** Flag indicating that the stream was already opened. */ - private boolean opened; - - /** - * Constructor. - * - * @param fs Secondary file system. - * @param path Path to the file to open. - * @param bufSize Buffer size. - */ - public HadoopIgfsSecondaryFileSystemPositionedReadable(FileSystem fs, Path path, int bufSize) { - assert fs != null; - assert path != null; - - this.fs = fs; - this.path = path; - this.bufSize = bufSize; - } - - /** Get input stream. */ - private PositionedReadable in() throws IOException { - if (opened) { - if (err != null) - throw err; - } - else { - opened = true; - - try { - in = fs.open(path, bufSize); - - if (in == null) - throw new IOException("Failed to open input stream (file system returned null): " + path); - } - catch (IOException e) { - err = e; - - throw err; - } - } - - return in; - } - - /** - * Close wrapped input stream in case it was previously opened. - */ - @Override public void close() { - U.closeQuiet(in); - } - - /** {@inheritDoc} */ - @Override public int read(long pos, byte[] buf, int off, int len) throws IOException { - return in().read(pos, buf, off, len); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamDelegate.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamDelegate.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamDelegate.java deleted file mode 100644 index 37b58ab..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamDelegate.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.igfs; - -import org.apache.ignite.internal.util.typedef.internal.S; - -/** - * IGFS Hadoop stream descriptor. - */ -public class HadoopIgfsStreamDelegate { - /** RPC handler. */ - private final HadoopIgfsEx hadoop; - - /** Target. */ - private final Object target; - - /** Optional stream length. */ - private final long len; - - /** - * Constructor. - * - * @param target Target. - */ - public HadoopIgfsStreamDelegate(HadoopIgfsEx hadoop, Object target) { - this(hadoop, target, -1); - } - - /** - * Constructor. - * - * @param target Target. - * @param len Optional length. - */ - public HadoopIgfsStreamDelegate(HadoopIgfsEx hadoop, Object target, long len) { - assert hadoop != null; - assert target != null; - - this.hadoop = hadoop; - this.target = target; - this.len = len; - } - - /** - * @return RPC handler. - */ - public HadoopIgfsEx hadoop() { - return hadoop; - } - - /** - * @return Stream target. - */ - @SuppressWarnings("unchecked") - public <T> T target() { - return (T) target; - } - - /** - * @return Length. - */ - public long length() { - return len; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return System.identityHashCode(target); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object obj) { - return obj != null && obj instanceof HadoopIgfsStreamDelegate && - target == ((HadoopIgfsStreamDelegate)obj).target; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopIgfsStreamDelegate.class, this); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamEventListener.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamEventListener.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamEventListener.java deleted file mode 100644 index d81f765..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamEventListener.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.igfs; - -import org.apache.ignite.IgniteCheckedException; - -/** - * IGFS input stream event listener. - */ -public interface HadoopIgfsStreamEventListener { - /** - * Callback invoked when the stream is being closed. - * - * @throws IgniteCheckedException If failed. - */ - public void onClose() throws IgniteCheckedException; - - /** - * Callback invoked when remote error occurs. - * - * @param errMsg Error message. - */ - public void onError(String errMsg); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java deleted file mode 100644 index fa5cbc5..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.igfs; - -import java.io.FileNotFoundException; -import java.io.IOException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.AbstractFileSystem; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.ParentNotDirectoryException; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathExistsException; -import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.igfs.IgfsDirectoryNotEmptyException; -import org.apache.ignite.igfs.IgfsParentNotDirectoryException; -import org.apache.ignite.igfs.IgfsPathAlreadyExistsException; -import org.apache.ignite.igfs.IgfsPathNotFoundException; -import org.jetbrains.annotations.Nullable; - -/** - * Utility constants and methods for IGFS Hadoop file system. - */ -public class HadoopIgfsUtils { - /** Parameter name for endpoint no embed mode flag. */ - public static final String PARAM_IGFS_ENDPOINT_NO_EMBED = "fs.igfs.%s.endpoint.no_embed"; - - /** Parameter name for endpoint no shared memory flag. */ - public static final String PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM = "fs.igfs.%s.endpoint.no_local_shmem"; - - /** Parameter name for endpoint no local TCP flag. */ - public static final String PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP = "fs.igfs.%s.endpoint.no_local_tcp"; - - /** - * Get string parameter. - * - * @param cfg Configuration. - * @param name Parameter name. - * @param authority Authority. - * @param dflt Default value. - * @return String value. - */ - public static String parameter(Configuration cfg, String name, String authority, String dflt) { - return cfg.get(String.format(name, authority != null ? authority : ""), dflt); - } - - /** - * Get integer parameter. - * - * @param cfg Configuration. - * @param name Parameter name. - * @param authority Authority. - * @param dflt Default value. - * @return Integer value. - * @throws IOException In case of parse exception. - */ - public static int parameter(Configuration cfg, String name, String authority, int dflt) throws IOException { - String name0 = String.format(name, authority != null ? authority : ""); - - try { - return cfg.getInt(name0, dflt); - } - catch (NumberFormatException ignore) { - throw new IOException("Failed to parse parameter value to integer: " + name0); - } - } - - /** - * Get boolean parameter. - * - * @param cfg Configuration. - * @param name Parameter name. - * @param authority Authority. - * @param dflt Default value. - * @return Boolean value. - */ - public static boolean parameter(Configuration cfg, String name, String authority, boolean dflt) { - return cfg.getBoolean(String.format(name, authority != null ? authority : ""), dflt); - } - - /** - * Cast Ignite exception to appropriate IO exception. - * - * @param e Exception to cast. - * @return Casted exception. - */ - public static IOException cast(IgniteCheckedException e) { - return cast(e, null); - } - - /** - * Cast Ignite exception to appropriate IO exception. - * - * @param e Exception to cast. - * @param path Path for exceptions. - * @return Casted exception. - */ - @SuppressWarnings("unchecked") - public static IOException cast(IgniteCheckedException e, @Nullable String path) { - assert e != null; - - // First check for any nested IOException; if exists - re-throw it. - if (e.hasCause(IOException.class)) - return e.getCause(IOException.class); - else if (e.hasCause(IgfsPathNotFoundException.class)) - return new FileNotFoundException(path); // TODO: Or PathNotFoundException? - else if (e.hasCause(IgfsParentNotDirectoryException.class)) - return new ParentNotDirectoryException(path); - else if (path != null && e.hasCause(IgfsDirectoryNotEmptyException.class)) - return new PathIsNotEmptyDirectoryException(path); - else if (path != null && e.hasCause(IgfsPathAlreadyExistsException.class)) - return new PathExistsException(path); - else { - String msg = e.getMessage(); - - return msg == null ? new IOException(e) : new IOException(msg, e); - } - } - - /** - * Deletes all files from the given file system. - * - * @param fs The file system to clean up. - * @throws IOException On error. - */ - public static void clear(FileSystem fs) throws IOException { - // Delete root contents: - FileStatus[] statuses = fs.listStatus(new Path("/")); - - if (statuses != null) { - for (FileStatus stat: statuses) - fs.delete(stat.getPath(), true); - } - } - - /** - * Deletes all files from the given file system. - * - * @param fs The file system to clean up. - * @throws IOException On error. - */ - public static void clear(AbstractFileSystem fs) throws IOException { - // Delete root contents: - FileStatus[] statuses = fs.listStatus(new Path("/")); - - if (statuses != null) { - for (FileStatus stat: statuses) - fs.delete(stat.getPath(), true); - } - } - - /** - * Constructor. - */ - private HadoopIgfsUtils() { - // No-op. - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java deleted file mode 100644 index f4ee97f..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java +++ /dev/null @@ -1,552 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.igfs; - -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import org.apache.commons.logging.Log; -import org.apache.hadoop.conf.Configuration; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteFileSystem; -import org.apache.ignite.IgniteIllegalStateException; -import org.apache.ignite.Ignition; -import org.apache.ignite.igfs.IgfsBlockLocation; -import org.apache.ignite.igfs.IgfsFile; -import org.apache.ignite.igfs.IgfsPath; -import org.apache.ignite.igfs.IgfsPathSummary; -import org.apache.ignite.internal.processors.igfs.IgfsEx; -import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse; -import org.apache.ignite.internal.processors.igfs.IgfsStatus; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.X; -import org.apache.ignite.internal.util.typedef.internal.SB; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.jetbrains.annotations.Nullable; - -import static org.apache.ignite.IgniteState.STARTED; -import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint.LOCALHOST; -import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_EMBED; -import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM; -import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP; -import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.parameter; - -/** - * Wrapper for IGFS server. - */ -public class HadoopIgfsWrapper implements HadoopIgfs { - /** Delegate. */ - private final AtomicReference<Delegate> delegateRef = new AtomicReference<>(); - - /** Authority. */ - private final String authority; - - /** Connection string. */ - private final HadoopIgfsEndpoint endpoint; - - /** Log directory. */ - private final String logDir; - - /** Configuration. */ - private final Configuration conf; - - /** Logger. */ - private final Log log; - - /** The user name this wrapper works on behalf of. */ - private final String userName; - - /** - * Constructor. - * - * @param authority Authority (connection string). - * @param logDir Log directory for server. - * @param conf Configuration. - * @param log Current logger. - */ - public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log, String user) - throws IOException { - try { - this.authority = authority; - this.endpoint = new HadoopIgfsEndpoint(authority); - this.logDir = logDir; - this.conf = conf; - this.log = log; - this.userName = user; - } - catch (IgniteCheckedException e) { - throw new IOException("Failed to parse endpoint: " + authority, e); - } - } - - /** {@inheritDoc} */ - @Override public IgfsHandshakeResponse handshake(String logDir) throws IOException { - return withReconnectHandling(new FileSystemClosure<IgfsHandshakeResponse>() { - @Override public IgfsHandshakeResponse apply(HadoopIgfsEx hadoop, - IgfsHandshakeResponse hndResp) { - return hndResp; - } - }); - } - - /** {@inheritDoc} */ - @Override public void close(boolean force) { - Delegate delegate = delegateRef.get(); - - if (delegate != null && delegateRef.compareAndSet(delegate, null)) - delegate.close(force); - } - - /** {@inheritDoc} */ - @Override public IgfsFile info(final IgfsPath path) throws IOException { - return withReconnectHandling(new FileSystemClosure<IgfsFile>() { - @Override public IgfsFile apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) - throws IgniteCheckedException, IOException { - return hadoop.info(path); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public IgfsFile update(final IgfsPath path, final Map<String, String> props) throws IOException { - return withReconnectHandling(new FileSystemClosure<IgfsFile>() { - @Override public IgfsFile apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) - throws IgniteCheckedException, IOException { - return hadoop.update(path, props); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime) - throws IOException { - return withReconnectHandling(new FileSystemClosure<Boolean>() { - @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) - throws IgniteCheckedException, IOException { - return hadoop.setTimes(path, accessTime, modificationTime); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IOException { - return withReconnectHandling(new FileSystemClosure<Boolean>() { - @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) - throws IgniteCheckedException, IOException { - return hadoop.rename(src, dest); - } - }, src); - } - - /** {@inheritDoc} */ - @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IOException { - return withReconnectHandling(new FileSystemClosure<Boolean>() { - @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) - throws IgniteCheckedException, IOException { - return hadoop.delete(path, recursive); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath path, final long start, - final long len) throws IOException { - return withReconnectHandling(new FileSystemClosure<Collection<IgfsBlockLocation>>() { - @Override public Collection<IgfsBlockLocation> apply(HadoopIgfsEx hadoop, - IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { - return hadoop.affinity(path, start, len); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IOException { - return withReconnectHandling(new FileSystemClosure<IgfsPathSummary>() { - @Override public IgfsPathSummary apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) - throws IgniteCheckedException, IOException { - return hadoop.contentSummary(path); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public Boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IOException { - return withReconnectHandling(new FileSystemClosure<Boolean>() { - @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) - throws IgniteCheckedException, IOException { - return hadoop.mkdirs(path, props); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsFile> listFiles(final IgfsPath path) throws IOException { - return withReconnectHandling(new FileSystemClosure<Collection<IgfsFile>>() { - @Override public Collection<IgfsFile> apply(HadoopIgfsEx hadoop, - IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { - return hadoop.listFiles(path); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsPath> listPaths(final IgfsPath path) throws IOException { - return withReconnectHandling(new FileSystemClosure<Collection<IgfsPath>>() { - @Override public Collection<IgfsPath> apply(HadoopIgfsEx hadoop, - IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { - return hadoop.listPaths(path); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public IgfsStatus fsStatus() throws IOException { - return withReconnectHandling(new FileSystemClosure<IgfsStatus>() { - @Override public IgfsStatus apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) - throws IgniteCheckedException, IOException { - return hadoop.fsStatus(); - } - }); - } - - /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate open(final IgfsPath path) throws IOException { - return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() { - @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop, - IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { - return hadoop.open(path); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch) - throws IOException { - return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() { - @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop, - IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { - return hadoop.open(path, seqReadsBeforePrefetch); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate create(final IgfsPath path, final boolean overwrite, - final boolean colocate, final int replication, final long blockSize, @Nullable final Map<String, String> props) - throws IOException { - return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() { - @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop, - IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { - return hadoop.create(path, overwrite, colocate, replication, blockSize, props); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate append(final IgfsPath path, final boolean create, - @Nullable final Map<String, String> props) throws IOException { - return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() { - @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop, - IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { - return hadoop.append(path, create, props); - } - }, path); - } - - /** - * Execute closure which is not path-specific. - * - * @param clo Closure. - * @return Result. - * @throws IOException If failed. - */ - private <T> T withReconnectHandling(FileSystemClosure<T> clo) throws IOException { - return withReconnectHandling(clo, null); - } - - /** - * Execute closure. - * - * @param clo Closure. - * @param path Path for exceptions. - * @return Result. - * @throws IOException If failed. - */ - private <T> T withReconnectHandling(final FileSystemClosure<T> clo, @Nullable IgfsPath path) - throws IOException { - Exception err = null; - - for (int i = 0; i < 2; i++) { - Delegate curDelegate = null; - - boolean close = false; - boolean force = false; - - try { - curDelegate = delegate(); - - assert curDelegate != null; - - close = curDelegate.doomed; - - return clo.apply(curDelegate.hadoop, curDelegate.hndResp); - } - catch (HadoopIgfsCommunicationException e) { - if (curDelegate != null && !curDelegate.doomed) { - // Try getting rid fo faulty delegate ASAP. - delegateRef.compareAndSet(curDelegate, null); - - close = true; - force = true; - } - - if (log.isDebugEnabled()) - log.debug("Failed to send message to a server: " + e); - - err = e; - } - catch (IgniteCheckedException e) { - throw HadoopIgfsUtils.cast(e, path != null ? path.toString() : null); - } - finally { - if (close) { - assert curDelegate != null; - - curDelegate.close(force); - } - } - } - - List<Throwable> list = X.getThrowableList(err); - - Throwable cause = list.get(list.size() - 1); - - throw new IOException("Failed to communicate with IGFS: " - + (cause.getMessage() == null ? cause.toString() : cause.getMessage()), err); - } - - /** - * Get delegate creating it if needed. - * - * @return Delegate. - */ - private Delegate delegate() throws HadoopIgfsCommunicationException { - // These fields will contain possible exceptions from shmem and TCP endpoints. - Exception errShmem = null; - Exception errTcp = null; - - // 1. If delegate is set, return it immediately. - Delegate curDelegate = delegateRef.get(); - - if (curDelegate != null) - return curDelegate; - - // 2. Guess that we are in the same VM. - boolean skipInProc = parameter(conf, PARAM_IGFS_ENDPOINT_NO_EMBED, authority, false); - - if (!skipInProc) { - IgfsEx igfs = getIgfsEx(endpoint.grid(), endpoint.igfs()); - - if (igfs != null) { - HadoopIgfsEx hadoop = null; - - try { - hadoop = new HadoopIgfsInProc(igfs, log, userName); - - curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); - } - catch (IOException | IgniteCheckedException e) { - if (e instanceof HadoopIgfsCommunicationException) - if (hadoop != null) - hadoop.close(true); - - if (log.isDebugEnabled()) - log.debug("Failed to connect to in-process IGFS, fallback to IPC mode.", e); - } - } - } - - // 3. Try connecting using shmem. - boolean skipLocShmem = parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM, authority, false); - - if (curDelegate == null && !skipLocShmem && !U.isWindows()) { - HadoopIgfsEx hadoop = null; - - try { - hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log, userName); - - curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); - } - catch (IOException | IgniteCheckedException e) { - if (e instanceof HadoopIgfsCommunicationException) - hadoop.close(true); - - if (log.isDebugEnabled()) - log.debug("Failed to connect to IGFS using shared memory [port=" + endpoint.port() + ']', e); - - errShmem = e; - } - } - - // 4. Try local TCP connection. - boolean skipLocTcp = parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP, authority, false); - - if (curDelegate == null && !skipLocTcp) { - HadoopIgfsEx hadoop = null; - - try { - hadoop = new HadoopIgfsOutProc(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.igfs(), - log, userName); - - curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); - } - catch (IOException | IgniteCheckedException e) { - if (e instanceof HadoopIgfsCommunicationException) - hadoop.close(true); - - if (log.isDebugEnabled()) - log.debug("Failed to connect to IGFS using TCP [host=" + endpoint.host() + - ", port=" + endpoint.port() + ']', e); - - errTcp = e; - } - } - - // 5. Try remote TCP connection. - if (curDelegate == null && (skipLocTcp || !F.eq(LOCALHOST, endpoint.host()))) { - HadoopIgfsEx hadoop = null; - - try { - hadoop = new HadoopIgfsOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.igfs(), - log, userName); - - curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); - } - catch (IOException | IgniteCheckedException e) { - if (e instanceof HadoopIgfsCommunicationException) - hadoop.close(true); - - if (log.isDebugEnabled()) - log.debug("Failed to connect to IGFS using TCP [host=" + endpoint.host() + - ", port=" + endpoint.port() + ']', e); - - errTcp = e; - } - } - - if (curDelegate != null) { - if (!delegateRef.compareAndSet(null, curDelegate)) - curDelegate.doomed = true; - - return curDelegate; - } - else { - SB errMsg = new SB("Failed to connect to IGFS [endpoint=igfs://" + authority + ", attempts=["); - - if (errShmem != null) - errMsg.a("[type=SHMEM, port=" + endpoint.port() + ", err=" + errShmem + "], "); - - errMsg.a("[type=TCP, host=" + endpoint.host() + ", port=" + endpoint.port() + ", err=" + errTcp + "]] "); - - errMsg.a("(ensure that IGFS is running and have IPC endpoint enabled; ensure that " + - "ignite-shmem-1.0.0.jar is in Hadoop classpath if you use shared memory endpoint)."); - - throw new HadoopIgfsCommunicationException(errMsg.toString()); - } - } - - /** - * File system operation closure. - */ - private static interface FileSystemClosure<T> { - /** - * Call closure body. - * - * @param hadoop RPC handler. - * @param hndResp Handshake response. - * @return Result. - * @throws IgniteCheckedException If failed. - * @throws IOException If failed. - */ - public T apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException; - } - - /** - * Delegate. - */ - private static class Delegate { - /** RPC handler. */ - private final HadoopIgfsEx hadoop; - - /** Handshake request. */ - private final IgfsHandshakeResponse hndResp; - - /** Close guard. */ - private final AtomicBoolean closeGuard = new AtomicBoolean(); - - /** Whether this delegate must be closed at the end of the next invocation. */ - private boolean doomed; - - /** - * Constructor. - * - * @param hadoop Hadoop. - * @param hndResp Handshake response. - */ - private Delegate(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) { - this.hadoop = hadoop; - this.hndResp = hndResp; - } - - /** - * Close underlying RPC handler. - * - * @param force Force flag. - */ - private void close(boolean force) { - if (closeGuard.compareAndSet(false, true)) - hadoop.close(force); - } - } - - /** - * Helper method to find Igfs of the given name in the given Ignite instance. - * - * @param gridName The name of the grid to check. - * @param igfsName The name of Igfs. - * @return The file system instance, or null if not found. - */ - private static IgfsEx getIgfsEx(@Nullable String gridName, @Nullable String igfsName) { - if (Ignition.state(gridName) == STARTED) { - try { - for (IgniteFileSystem fs : Ignition.ignite(gridName).fileSystems()) { - if (F.eq(fs.name(), igfsName)) - return (IgfsEx)fs; - } - } - catch (IgniteIllegalStateException ignore) { - // May happen if the grid state has changed: - } - } - - return null; - } -} \ No newline at end of file