IGNITE-3859: IGFS: Support direct PROXY mode invocation in the open method; add proxy mode to IgfsInputStreamImpl. This closes #1065. This closes #1083.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5a35ee9d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5a35ee9d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5a35ee9d Branch: refs/heads/master Commit: 5a35ee9dad194b3009151b79f0ebd3976bb8fd22 Parents: 2474e2b Author: tledkov-gridgain <tled...@gridgain.com> Authored: Tue Sep 20 14:10:55 2016 +0500 Committer: tledkov-gridgain <tled...@gridgain.com> Committed: Tue Sep 20 14:10:55 2016 +0500 ---------------------------------------------------------------------- .../internal/processors/igfs/IgfsContext.java | 35 ++++++ .../processors/igfs/IgfsDataManager.java | 121 ++++++++----------- .../internal/processors/igfs/IgfsImpl.java | 82 ++++++++++--- .../processors/igfs/IgfsInputStreamImpl.java | 103 +++++++++++----- 4 files changed, 226 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5a35ee9d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java index 3e01246..3405b53 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java @@ -20,7 +20,10 @@ package org.apache.ignite.internal.processors.igfs; import java.util.LinkedList; import java.util.List; import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import 
org.apache.ignite.configuration.FileSystemConfiguration; import org.apache.ignite.internal.GridKernalContext; @@ -60,6 +63,12 @@ public class IgfsContext { /** Local cluster node. */ private volatile ClusterNode locNode; + /** IGFS executor service. */ + private ExecutorService igfsSvc; + + /** Logger. */ + protected IgniteLogger log; + /** * @param ctx Kernal context. * @param cfg IGFS configuration. @@ -85,6 +94,10 @@ public class IgfsContext { this.srvMgr = add(srvMgr); this.fragmentizerMgr = add(fragmentizerMgr); + log = ctx.log(IgfsContext.class); + + igfsSvc = ctx.getIgfsExecutorService(); + igfs = new IgfsImpl(this); } @@ -206,6 +219,28 @@ public class IgfsContext { } /** + * Executes runnable in IGFS executor service. If execution rejected, runnable will be executed + * in caller thread. + * + * @param r Runnable to execute. + */ + public void runInIgfsThreadPool(Runnable r) { + try { + igfsSvc.submit(r); + } + catch (RejectedExecutionException ignored) { + // This exception will happen if network speed is too low and data comes faster + // than we can send it to remote nodes. + try { + r.run(); + } + catch (Exception e) { + log.warning("Failed to execute IGFS runnable: " + r, e); + } + } + } + + /** * Adds manager to managers list. * * @param mgr Manager. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a35ee9d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java index d2183f9..2f704ae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java @@ -45,7 +45,6 @@ import org.apache.ignite.internal.processors.igfs.data.IgfsDataPutProcessor; import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.lang.GridPlainCallable; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CX1; import org.apache.ignite.internal.util.typedef.F; @@ -74,12 +73,9 @@ import java.util.LinkedList; import java.util.Map; import java.util.UUID; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -123,9 +119,6 @@ public class IgfsDataManager extends IgfsManager { /** Affinity key generator. */ private AtomicLong affKeyGen = new AtomicLong(); - /** IGFS executor service. */ - private ExecutorService igfsSvc; - /** Request ID counter for write messages. 
*/ private AtomicLong reqIdCtr = new AtomicLong(); @@ -183,8 +176,6 @@ public class IgfsDataManager extends IgfsManager { } }, EVT_NODE_LEFT, EVT_NODE_FAILED); - igfsSvc = igfsCtx.kernalContext().getIgfsExecutorService(); - delWorker = new AsyncDeleteWorker(igfsCtx.kernalContext().gridName(), "igfs-" + igfsName + "-delete-worker", log); } @@ -345,45 +336,11 @@ public class IgfsDataManager extends IgfsManager { if (oldRmtReadFut == null) { try { - if (log.isDebugEnabled()) - log.debug("Reading non-local data block in the secondary file system [path=" + - path + ", fileInfo=" + fileInfo + ", blockIdx=" + blockIdx + ']'); - - int blockSize = fileInfo.blockSize(); - - long pos = blockIdx * blockSize; // Calculate position for Hadoop - - res = new byte[blockSize]; - - int read = 0; - - synchronized (secReader) { - try { - // Delegate to the secondary file system. - while (read < blockSize) { - int r = secReader.read(pos + read, res, read, blockSize - read); - - if (r < 0) - break; - - read += r; - } - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to read data due to secondary file system " + - "exception: " + e.getMessage(), e); - } - } - - // If we did not read full block at the end of the file - trim it. - if (read != blockSize) - res = Arrays.copyOf(res, read); + res = secondaryDataBlock(path, blockIdx, secReader, fileInfo.blockSize()); rmtReadFut.onDone(res); putBlock(fileInfo.blockSize(), key, res); - - igfsCtx.metrics().addReadBlocks(1, 1); } catch (IgniteCheckedException e) { rmtReadFut.onDone(e); @@ -417,11 +374,59 @@ public class IgfsDataManager extends IgfsManager { } /** + * Get data block for specified block index from secondary reader. + * + * @param path Path reading from. + * @param blockIdx Block index. + * @param secReader Optional secondary file system reader. + * @param blockSize Block size. + * @return Requested data block or {@code null} if nothing found. + * @throws IgniteCheckedException If failed. 
+ */ + @Nullable public byte[] secondaryDataBlock(IgfsPath path, long blockIdx, + IgfsSecondaryFileSystemPositionedReadable secReader, int blockSize) throws IgniteCheckedException { + if (log.isDebugEnabled()) + log.debug("Reading non-local data block in the secondary file system [path=" + + path + ", blockIdx=" + blockIdx + ']'); + + long pos = blockIdx * blockSize; // Calculate position for Hadoop + + byte[] res = new byte[blockSize]; + + int read = 0; + + try { + // Delegate to the secondary file system. + while (read < blockSize) { + int r = secReader.read(pos + read, res, read, blockSize - read); + + if (r < 0) + break; + + read += r; + } + } + catch (IOException e) { + throw new IgniteCheckedException("Failed to read data due to secondary file system " + + "exception: " + e.getMessage(), e); + } + + // If we did not read full block at the end of the file - trim it. + if (read != blockSize) + res = Arrays.copyOf(res, read); + + igfsCtx.metrics().addReadBlocks(1, 1); + + return res; + } + + /** * Stores the given block in data cache. * * @param blockSize The size of the block. * @param key The data cache key of the block. * @param data The new value of the block. + * @throws IgniteCheckedException If failed. 
*/ private void putBlock(int blockSize, IgfsBlockKey key, byte[] data) throws IgniteCheckedException { if (data.length < blockSize) @@ -967,8 +972,8 @@ public class IgfsDataManager extends IgfsManager { } } else { - callIgfsLocalSafe(new GridPlainCallable<Object>() { - @Override @Nullable public Object call() throws Exception { + igfsCtx.runInIgfsThreadPool(new Runnable() { + @Override public void run() { storeBlocksAsync(blocks).listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> fut) { try { @@ -981,8 +986,6 @@ public class IgfsDataManager extends IgfsManager { } } }); - - return null; } }); } @@ -1070,28 +1073,6 @@ public class IgfsDataManager extends IgfsManager { } /** - * Executes callable in IGFS executor service. If execution rejected, callable will be executed - * in caller thread. - * - * @param c Callable to execute. - */ - private <T> void callIgfsLocalSafe(Callable<T> c) { - try { - igfsSvc.submit(c); - } - catch (RejectedExecutionException ignored) { - // This exception will happen if network speed is too low and data comes faster - // than we can send it to remote nodes. - try { - c.call(); - } - catch (Exception e) { - log.warning("Failed to execute IGFS callable: " + c, e); - } - } - } - - /** * @param blocks Blocks to write. * @return Future that will be completed after put is done. 
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/5a35ee9d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java index 45596a3..87a4699 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java @@ -47,6 +47,7 @@ import org.apache.ignite.igfs.IgfsPathSummary; import org.apache.ignite.igfs.mapreduce.IgfsRecordResolver; import org.apache.ignite.igfs.mapreduce.IgfsTask; import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; +import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; @@ -949,34 +950,79 @@ public final class IgfsImpl implements IgfsEx { IgfsMode mode = resolveMode(path); - if (mode != PRIMARY) { - assert IgfsUtils.isDualMode(mode); + switch (mode) { + case PRIMARY: { + IgfsEntryInfo info = meta.infoForPath(path); - IgfsSecondaryInputStreamDescriptor desc = meta.openDual(secondaryFs, path, bufSize0); + if (info == null) + throw new IgfsPathNotFoundException("File not found: " + path); - IgfsInputStreamImpl os = new IgfsInputStreamImpl(igfsCtx, path, desc.info(), - cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, desc.reader()); + if (!info.isFile()) + throw new IgfsPathIsDirectoryException("Failed to open file (not a file): " + path); - IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ); + // Input stream to read data from grid cache with separate blocks. 
+ IgfsInputStreamImpl os = new IgfsInputStreamImpl(igfsCtx, path, info, + cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, null, + info.length(), info.blockSize(), info.blocksCount(), false); - return os; - } + IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ); - IgfsEntryInfo info = meta.infoForPath(path); + return os; + } - if (info == null) - throw new IgfsPathNotFoundException("File not found: " + path); + case DUAL_ASYNC: + case DUAL_SYNC: { + assert IgfsUtils.isDualMode(mode); - if (!info.isFile()) - throw new IgfsPathIsDirectoryException("Failed to open file (not a file): " + path); + IgfsSecondaryInputStreamDescriptor desc = meta.openDual(secondaryFs, path, bufSize0); + + IgfsEntryInfo info = desc.info(); - // Input stream to read data from grid cache with separate blocks. - IgfsInputStreamImpl os = new IgfsInputStreamImpl(igfsCtx, path, info, - cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, null); + IgfsInputStreamImpl os = new IgfsInputStreamImpl(igfsCtx, path, info, + cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, desc.reader(), + info.length(), info.blockSize(), info.blocksCount(), false); + + IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ); + + return os; + } - IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ); + case PROXY: { + assert secondaryFs != null; - return os; + IgfsFile info = info(path); + + if (info == null) + throw new IgfsPathNotFoundException("File not found: " + path); + + if (!info.isFile()) + throw new IgfsPathIsDirectoryException("Failed to open file (not a file): " + path); + + IgfsSecondaryFileSystemPositionedReadable secReader = + new IgfsLazySecondaryFileSystemPositionedReadable(secondaryFs, path, bufSize); + + long len = info.length(); + + int blockSize = info.blockSize() > 0 ? 
info.blockSize() : cfg.getBlockSize(); + + long blockCnt = len / blockSize; + + if (len % blockSize != 0) + blockCnt++; + + IgfsInputStream os = new IgfsInputStreamImpl(igfsCtx, path, null, + cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, secReader, + info.length(), blockSize, blockCnt, true); + + IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ); + + return os; + } + + default: + assert false : "Unexpected mode " + mode; + return null; + } } }); } http://git-wip-us.apache.org/repos/asf/ignite/blob/5a35ee9d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java index 2f9f2fc..0d9f2cd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java @@ -28,6 +28,7 @@ import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadabl import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.util.GridConcurrentHashSet; +import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; @@ -109,21 +110,44 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar /** Time consumed on reading. */ private long time; + /** File Length. */ + private long len; + + /** Block size to read. */ + private int blockSize; + + /** Block size to read. 
*/ + private long blocksCnt; + + /** Proxy mode. */ + private boolean proxy; + /** * Constructs file output stream. - * - * @param igfsCtx IGFS context. + * @param igfsCtx IGFS context. * @param path Path to stored file. * @param fileInfo File info to write binary data to. * @param prefetchBlocks Number of blocks to prefetch. * @param seqReadsBeforePrefetch Amount of sequential reads before prefetch is triggered. * @param secReader Optional secondary file system reader. + * @param len File length. + * @param blockSize Block size. + * @param blocksCnt Blocks count. + * @param proxy Proxy mode flag. */ - IgfsInputStreamImpl(IgfsContext igfsCtx, IgfsPath path, IgfsEntryInfo fileInfo, int prefetchBlocks, - int seqReadsBeforePrefetch, @Nullable IgfsSecondaryFileSystemPositionedReadable secReader) { + IgfsInputStreamImpl( + IgfsContext igfsCtx, + IgfsPath path, + @Nullable IgfsEntryInfo fileInfo, + int prefetchBlocks, + int seqReadsBeforePrefetch, + @Nullable IgfsSecondaryFileSystemPositionedReadable secReader, + long len, + int blockSize, + long blocksCnt, + boolean proxy) { assert igfsCtx != null; assert path != null; - assert fileInfo != null; this.igfsCtx = igfsCtx; this.path = path; @@ -131,6 +155,10 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar this.prefetchBlocks = prefetchBlocks; this.seqReadsBeforePrefetch = seqReadsBeforePrefetch; this.secReader = secReader; + this.len = len; + this.blockSize = blockSize; + this.blocksCnt = blocksCnt; + this.proxy = proxy; log = igfsCtx.kernalContext().log(IgfsInputStream.class); @@ -154,7 +182,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar /** {@inheritDoc} */ @Override public long length() { - return fileInfo.length(); + return len; } /** {@inheritDoc} */ @@ -195,7 +223,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar /** {@inheritDoc} */ @Override public synchronized int available() throws IOException { - long l 
= fileInfo.length() - pos; + long l = len - pos; if (l < 0) return 0; @@ -240,7 +268,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar @SuppressWarnings("IfMayBeConditional") public synchronized byte[][] readChunks(long pos, int len) throws IOException { // Readable bytes in the file, starting from the specified position. - long readable = fileInfo.length() - pos; + long readable = this.len - pos; if (readable <= 0) return EMPTY_CHUNKS; @@ -254,8 +282,8 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar bytes += len; - int start = (int)(pos / fileInfo.blockSize()); - int end = (int)((pos + len - 1) / fileInfo.blockSize()); + int start = (int)(pos / blockSize); + int end = (int)((pos + len - 1) / blockSize); int chunkCnt = end - start + 1; @@ -264,7 +292,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar for (int i = 0; i < chunkCnt; i++) { byte[] block = blockFragmentizerSafe(start + i); - int blockOff = (int)(pos % fileInfo.blockSize()); + int blockOff = (int)(pos % blockSize); int blockLen = Math.min(len, block.length - blockOff); // If whole block can be used as result, do not do array copy. @@ -366,7 +394,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar return 0; // Fully read done: read zero bytes correctly. // Readable bytes in the file, starting from the specified position. - long readable = fileInfo.length() - pos; + long readable = this.len - pos; if (readable <= 0) return -1; // EOF. @@ -378,10 +406,10 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar assert len > 0; - byte[] block = blockFragmentizerSafe(pos / fileInfo.blockSize()); + byte[] block = blockFragmentizerSafe(pos / blockSize); // Skip bytes to expected position. 
- int blockOff = (int)(pos % fileInfo.blockSize()); + int blockOff = (int)(pos % blockSize); len = Math.min(len, block.length - blockOff); @@ -412,7 +440,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar ", blockIdx=" + blockIdx + ", errMsg=" + e.getMessage() + ']'); // This failure may be caused by file being fragmented. - if (fileInfo.fileMap() != null && !fileInfo.fileMap().ranges().isEmpty()) { + if (fileInfo != null && fileInfo.fileMap() != null && !fileInfo.fileMap().ranges().isEmpty()) { IgfsEntryInfo newInfo = igfsCtx.meta().info(fileInfo.id()); // File was deleted. @@ -459,7 +487,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar prevBlockIdx = blockIdx; - bytesFut = dataBlock(fileInfo, blockIdx); + bytesFut = dataBlock(blockIdx); assert bytesFut != null; @@ -470,10 +498,10 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar if (prefetchBlocks > 0 && seqReads >= seqReadsBeforePrefetch - 1) { for (int i = 1; i <= prefetchBlocks; i++) { // Ensure that we do not prefetch over file size. - if (fileInfo.blockSize() * (i + blockIdx) >= fileInfo.length()) + if (blockSize * (i + blockIdx) >= len) break; else if (locCache.get(blockIdx + i) == null) - addLocalCacheFuture(blockIdx + i, dataBlock(fileInfo, blockIdx + i)); + addLocalCacheFuture(blockIdx + i, dataBlock(blockIdx + i)); } } @@ -483,17 +511,17 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar throw new IgfsCorruptedFileException("Failed to retrieve file's data block (corrupted file?) " + "[path=" + path + ", blockIdx=" + blockIdx + ']'); - int blockSize = fileInfo.blockSize(); + int blockSize0 = blockSize; - if (blockIdx == fileInfo.blocksCount() - 1) - blockSize = (int)(fileInfo.length() % blockSize); + if (blockIdx == blocksCnt - 1) + blockSize0 = (int)(len % blockSize0); // If part of the file was reserved for writing, but was not actually written. 
- if (bytes.length < blockSize) + if (bytes.length < blockSize0) throw new IOException("Inconsistent file's data block (incorrectly written?)" + " [path=" + path + ", blockIdx=" + blockIdx + ", blockSize=" + bytes.length + - ", expectedBlockSize=" + blockSize + ", fileBlockSize=" + fileInfo.blockSize() + - ", fileLen=" + fileInfo.length() + ']'); + ", expectedBlockSize=" + blockSize0 + ", fileBlockSize=" + blockSize + + ", fileLen=" + len + ']'); return bytes; } @@ -538,14 +566,35 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar /** * Get data block for specified block index. * - * @param fileInfo File info. * @param blockIdx Block index. * @return Requested data block or {@code null} if nothing found. * @throws IgniteCheckedException If failed. */ - @Nullable protected IgniteInternalFuture<byte[]> dataBlock(IgfsEntryInfo fileInfo, long blockIdx) + @Nullable protected IgniteInternalFuture<byte[]> dataBlock(final long blockIdx) throws IgniteCheckedException { - return igfsCtx.data().dataBlock(fileInfo, path, blockIdx, secReader); + if (proxy) { + assert secReader != null; + + final GridFutureAdapter<byte[]> fut = new GridFutureAdapter<>(); + + igfsCtx.runInIgfsThreadPool(new Runnable() { + @Override public void run() { + try { + fut.onDone(igfsCtx.data().secondaryDataBlock(path, blockIdx, secReader, blockSize)); + } + catch (Throwable e) { + fut.onDone(null, e); + } + } + }); + + return fut; + } + else { + assert fileInfo != null; + + return igfsCtx.data().dataBlock(fileInfo, path, blockIdx, secReader); + } } /** {@inheritDoc} */