IGNITE-3260: IGFS: Delete messages are no longer passed.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/065d2e70
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/065d2e70
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/065d2e70

Branch: refs/heads/ignite-3038
Commit: 065d2e70c21418437eba5e725eaa8b1ebc3af6da
Parents: 0176af1
Author: vozerov-gridgain <voze...@gridgain.com>
Authored: Mon Jun 6 18:12:42 2016 +0300
Committer: vozerov-gridgain <voze...@gridgain.com>
Committed: Mon Jun 6 18:12:42 2016 +0300

----------------------------------------------------------------------
 .../internal/processors/igfs/IgfsAsyncImpl.java | 6 -
 .../processors/igfs/IgfsDataManager.java | 61 ++---
 .../processors/igfs/IgfsDeleteWorker.java | 42 ----
 .../ignite/internal/processors/igfs/IgfsEx.java | 9 -
 .../internal/processors/igfs/IgfsImpl.java | 249 +++++--------------
 .../internal/processors/igfs/IgfsUtils.java | 2 +-
 .../ignite/igfs/IgfsFragmentizerSelfTest.java | 2 -
 .../processors/igfs/IgfsSizeSelfTest.java | 133 ----------
 .../HadoopDefaultMapReducePlannerSelfTest.java | 6 -
 9 files changed, 83 insertions(+), 427 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
index 8653f90..7530557 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
@@ -33,7 +33,6 @@
 import org.apache.ignite.igfs.mapreduce.IgfsRecordResolver;
 import org.apache.ignite.igfs.mapreduce.IgfsTask;
 import
org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; import org.apache.ignite.internal.AsyncSupportAdapter; -import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; @@ -166,11 +165,6 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter<IgniteFileSystem> impleme } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> awaitDeletesAsync() throws IgniteCheckedException { - return igfs.awaitDeletesAsync(); - } - - /** {@inheritDoc} */ @Nullable @Override public String clientLogDirectory() { return igfs.clientLogDirectory(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java index 16fbeb8..57a8c6c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java @@ -33,7 +33,6 @@ import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper; import org.apache.ignite.igfs.IgfsOutOfSpaceException; import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable; -import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; @@ -1056,34 +1055,24 @@ public class IgfsDataManager extends IgfsManager { private void processPartialBlockWrite(IgniteUuid fileId, 
IgfsBlockKey colocatedKey, int startOff, byte[] data) throws IgniteCheckedException { if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax()) { - try { - igfs.awaitDeletesAsync().get(trashPurgeTimeout); - } - catch (IgniteFutureTimeoutCheckedException ignore) { - // Ignore. - } + final WriteCompletionFuture completionFut = pendingWrites.get(fileId); - // Additional size check. - if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax()) { - final WriteCompletionFuture completionFut = pendingWrites.get(fileId); - - if (completionFut == null) { - if (log.isDebugEnabled()) - log.debug("Missing completion future for file write request (most likely exception occurred " + - "which will be thrown upon stream close) [fileId=" + fileId + ']'); + if (completionFut == null) { + if (log.isDebugEnabled()) + log.debug("Missing completion future for file write request (most likely exception occurred " + + "which will be thrown upon stream close) [fileId=" + fileId + ']'); - return; - } + return; + } - IgfsOutOfSpaceException e = new IgfsOutOfSpaceException("Failed to write data block " + - "(IGFS maximum data size exceeded) [used=" + dataCachePrj.igfsDataSpaceUsed() + - ", allowed=" + dataCachePrj.igfsDataSpaceMax() + ']'); + IgfsOutOfSpaceException e = new IgfsOutOfSpaceException("Failed to write data block " + + "(IGFS maximum data size exceeded) [used=" + dataCachePrj.igfsDataSpaceUsed() + + ", allowed=" + dataCachePrj.igfsDataSpaceMax() + ']'); - completionFut.onDone(new IgniteCheckedException("Failed to write data (not enough space on node): " + - igfsCtx.kernalContext().localNodeId(), e)); + completionFut.onDone(new IgniteCheckedException("Failed to write data (not enough space on node): " + + igfsCtx.kernalContext().localNodeId(), e)); - return; - } + return; } // No affinity key present, just concat and return. 
@@ -1225,26 +1214,10 @@ public class IgfsDataManager extends IgfsManager { assert !blocks.isEmpty(); if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax()) { - try { - try { - igfs.awaitDeletesAsync().get(trashPurgeTimeout); - } - catch (IgniteFutureTimeoutCheckedException ignore) { - // Ignore. - } - - // Additional size check. - if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax()) - return new GridFinishedFuture<Object>( - new IgfsOutOfSpaceException("Failed to write data block (IGFS maximum data size " + - "exceeded) [used=" + dataCachePrj.igfsDataSpaceUsed() + - ", allowed=" + dataCachePrj.igfsDataSpaceMax() + ']')); - - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(new IgniteCheckedException("Failed to store data " + - "block due to unexpected exception.", e)); - } + return new GridFinishedFuture<Object>( + new IgfsOutOfSpaceException("Failed to write data block (IGFS maximum data size " + + "exceeded) [used=" + dataCachePrj.igfsDataSpaceUsed() + + ", allowed=" + dataCachePrj.igfsDataSpaceMax() + ']')); } return dataCachePrj.putAllAsync(blocks); http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java index bae9354..310090d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java @@ -19,13 +19,10 @@ package org.apache.ignite.internal.processors.igfs; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cluster.ClusterNode; import 
org.apache.ignite.internal.IgniteFutureCancelledCheckedException; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; -import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.util.future.GridCompoundFuture; -import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; @@ -37,8 +34,6 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import static org.apache.ignite.internal.GridTopic.TOPIC_IGFS; - /** * IGFS worker for removal from the trash directory. */ @@ -49,9 +44,6 @@ public class IgfsDeleteWorker extends IgfsThread { /** How many files/folders to delete at once (i.e in a single transaction). */ private static final int MAX_DELETE_BATCH = 100; - /** IGFS context. */ - private final IgfsContext igfsCtx; - /** Metadata manager. */ private final IgfsMetaManager meta; @@ -73,9 +65,6 @@ public class IgfsDeleteWorker extends IgfsThread { /** Cancellation flag. */ private volatile boolean cancelled; - /** Message topic. */ - private Object topic; - /** * Constructor. * @@ -84,15 +73,9 @@ public class IgfsDeleteWorker extends IgfsThread { IgfsDeleteWorker(IgfsContext igfsCtx) { super("igfs-delete-worker%" + igfsCtx.igfs().name() + "%" + igfsCtx.kernalContext().localNodeId() + "%"); - this.igfsCtx = igfsCtx; - meta = igfsCtx.meta(); data = igfsCtx.data(); - String igfsName = igfsCtx.igfs().name(); - - topic = F.isEmpty(igfsName) ? 
TOPIC_IGFS : TOPIC_IGFS.topic(igfsName); - assert meta != null; assert data != null; @@ -189,8 +172,6 @@ public class IgfsDeleteWorker extends IgfsThread { if (log.isDebugEnabled()) log.debug("Sending delete confirmation message [name=" + entry.getKey() + ", fileId=" + fileId + ']'); - - sendDeleteMessage(new IgfsDeleteMessage(fileId)); } } else @@ -201,8 +182,6 @@ public class IgfsDeleteWorker extends IgfsThread { } catch (IgniteCheckedException e) { U.error(log, "Failed to delete entry from the trash directory: " + entry.getKey(), e); - - sendDeleteMessage(new IgfsDeleteMessage(fileId, e)); } } } @@ -346,25 +325,4 @@ public class IgfsDeleteWorker extends IgfsThread { return true; // Directory entry was deleted concurrently. } } - - /** - * Send delete message to all meta cache nodes in the grid. - * - * @param msg Message to send. - */ - private void sendDeleteMessage(IgfsDeleteMessage msg) { - assert msg != null; - - Collection<ClusterNode> nodes = meta.metaCacheNodes(); - - for (ClusterNode node : nodes) { - try { - igfsCtx.send(node, topic, msg, GridIoPolicy.IGFS_POOL); - } - catch (IgniteCheckedException e) { - U.warn(log, "Failed to send IGFS delete message to node [nodeId=" + node.id() + - ", msg=" + msg + ", err=" + e.getMessage() + ']'); - } - } - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java index fb67e20..4c64bc9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java @@ -23,7 +23,6 @@ import org.apache.ignite.IgniteException; import 
org.apache.ignite.IgniteFileSystem; import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; -import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; @@ -100,14 +99,6 @@ public interface IgfsEx extends IgniteFileSystem { public long groupBlockSize(); /** - * Asynchronously await for all entries existing in trash to be removed. - * - * @return Future which will be completed when all entries existed in trash by the time of invocation are removed. - * @throws IgniteCheckedException If failed. - */ - public IgniteInternalFuture<?> awaitDeletesAsync() throws IgniteCheckedException; - - /** * Gets client file system log directory. * * @return Client file system log directory or {@code null} in case no client connections have been created yet. http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java index 9087ff0..262dfef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java @@ -32,10 +32,9 @@ import org.apache.ignite.compute.ComputeJobResultPolicy; import org.apache.ignite.compute.ComputeTaskSplitAdapter; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.FileSystemConfiguration; -import org.apache.ignite.events.DiscoveryEvent; -import org.apache.ignite.events.Event; import org.apache.ignite.events.IgfsEvent; import org.apache.ignite.igfs.IgfsBlockLocation; +import org.apache.ignite.igfs.IgfsException; import 
org.apache.ignite.igfs.IgfsFile; import org.apache.ignite.igfs.IgfsInvalidPathException; import org.apache.ignite.igfs.IgfsMetrics; @@ -51,9 +50,7 @@ import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; -import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; -import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.hadoop.HadoopPayloadAware; import org.apache.ignite.internal.processors.igfs.client.IgfsClientAffinityCallable; import org.apache.ignite.internal.processors.igfs.client.IgfsClientDeleteCallable; @@ -69,8 +66,6 @@ import org.apache.ignite.internal.processors.igfs.client.IgfsClientSummaryCallab import org.apache.ignite.internal.processors.igfs.client.IgfsClientUpdateCallable; import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.util.GridSpinBusyLock; -import org.apache.ignite.internal.util.future.GridCompoundFuture; -import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.F; @@ -100,11 +95,11 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_DELETED; @@ -114,14 +109,10 
@@ import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_DELETED; import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_READ; import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_WRITE; import static org.apache.ignite.events.EventType.EVT_IGFS_META_UPDATED; -import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; -import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC; import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC; import static org.apache.ignite.igfs.IgfsMode.PRIMARY; import static org.apache.ignite.igfs.IgfsMode.PROXY; -import static org.apache.ignite.internal.GridTopic.TOPIC_IGFS; -import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGFS; /** * Cache-based IGFS implementation. @@ -130,6 +121,9 @@ public final class IgfsImpl implements IgfsEx { /** Default permissions for file system entry. */ private static final String PERMISSION_DFLT_VAL = "0777"; + /** Index generator for async format threads. */ + private static final AtomicInteger FORMAT_THREAD_IDX_GEN = new AtomicInteger(); + /** Default directory metadata. */ static final Map<String, String> DFLT_DIR_META = F.asMap(IgfsUtils.PROP_PERMISSION, PERMISSION_DFLT_VAL); @@ -169,24 +163,12 @@ public final class IgfsImpl implements IgfsEx { /** Writers map. */ private final ConcurrentHashMap8<IgfsPath, IgfsFileWorkerBatch> workerMap = new ConcurrentHashMap8<>(); - /** Delete futures. */ - private final ConcurrentHashMap8<IgniteUuid, GridFutureAdapter<Object>> delFuts = new ConcurrentHashMap8<>(); - - /** Delete message listener. */ - private final GridMessageListener delMsgLsnr = new FormatMessageListener(); - - /** Format discovery listener. */ - private final GridLocalEventListener delDiscoLsnr = new FormatDiscoveryListener(); - /** Local metrics holder. */ private final IgfsLocalMetrics metrics = new IgfsLocalMetrics(); /** Client log directory. 
*/ private volatile String logDir; - /** Message topic. */ - private Object topic; - /** Eviction policy (if set). */ private IgfsPerBlockLruEvictionPolicy evictPlc; @@ -292,11 +274,6 @@ public final class IgfsImpl implements IgfsEx { } } - topic = F.isEmpty(name()) ? TOPIC_IGFS : TOPIC_IGFS.topic(name()); - - igfsCtx.kernalContext().io().addMessageListener(topic, delMsgLsnr); - igfsCtx.kernalContext().event().addLocalEventListener(delDiscoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED); - dualPool = secondaryFs != null ? new IgniteThreadPoolExecutor(4, Integer.MAX_VALUE, 5000L, new LinkedBlockingQueue<Runnable>(), new IgfsThreadFactory(cfg.getName()), null) : null; } @@ -332,9 +309,6 @@ public final class IgfsImpl implements IgfsEx { } } - igfsCtx.kernalContext().io().removeMessageListener(topic, delMsgLsnr); - igfsCtx.kernalContext().event().removeLocalEventListener(delDiscoLsnr); - // Restore interrupted flag. if (interrupted) Thread.currentThread().interrupt(); @@ -1381,7 +1355,25 @@ public final class IgfsImpl implements IgfsEx { /** {@inheritDoc} */ @Override public void format() { try { - formatAsync().get(); + IgniteUuid id = meta.format(); + + // If ID is null, then file system is already empty. + if (id == null) + return; + + while (true) { + if (enterBusy()) { + try { + if (!meta.exists(id)) + return; + } + finally { + busyLock.leaveBusy(); + } + } + + U.sleep(10); + } } catch (Exception e) { throw IgfsUtils.toIgfsException(e); @@ -1394,69 +1386,16 @@ public final class IgfsImpl implements IgfsEx { * @return Future. */ IgniteInternalFuture<?> formatAsync() { - try { - IgniteUuid id = meta.format(); - - if (id == null) - return new GridFinishedFuture<Object>(); - else { - GridFutureAdapter<Object> fut = new GridFutureAdapter<>(); - - GridFutureAdapter<Object> oldFut = delFuts.putIfAbsent(id, fut); - - if (oldFut != null) - return oldFut; - else { - if (!meta.exists(id)) { - // Safety in case response message was received before we put future into collection. 
- fut.onDone(); - - delFuts.remove(id, fut); - } + GridFutureAdapter<?> fut = new GridFutureAdapter<>(); - return fut; - } - } - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<Object>(e); - } - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> awaitDeletesAsync() throws IgniteCheckedException { - Collection<IgniteUuid> ids = meta.pendingDeletes(); + Thread t = new Thread(new FormatRunnable(fut), "igfs-format-" + cfg.getName() + "-" + + FORMAT_THREAD_IDX_GEN.incrementAndGet()); - if (!ids.isEmpty()) { - if (log.isDebugEnabled()) - log.debug("Constructing delete future for trash entries: " + ids); + t.setDaemon(true); - GridCompoundFuture<Object, Object> resFut = new GridCompoundFuture<>(); + t.start(); - for (IgniteUuid id : ids) { - GridFutureAdapter<Object> fut = new GridFutureAdapter<>(); - - IgniteInternalFuture<Object> oldFut = delFuts.putIfAbsent(id, fut); - - if (oldFut != null) - resFut.add(oldFut); - else { - if (meta.exists(id)) - resFut.add(fut); - else { - fut.onDone(); - - delFuts.remove(id, fut); - } - } - } - - resFut.markInitialized(); - - return resFut; - } - else - return new GridFinishedFuture<>(); + return fut; } /** @@ -1482,24 +1421,6 @@ public final class IgfsImpl implements IgfsEx { return new FileDescriptor(parentId, path.name(), fileInfo.id(), fileInfo.isFile()); } - /** - * Check whether IGFS with the same name exists among provided attributes. - * - * @param attrs Attributes. 
- * @return {@code True} in case IGFS with the same name exists among provided attributes - */ - private boolean sameIgfs(IgfsAttributes[] attrs) { - if (attrs != null) { - String igfsName = name(); - - for (IgfsAttributes attr : attrs) { - if (F.eq(igfsName, attr.igfsName())) - return true; - } - } - return false; - } - /** {@inheritDoc} */ @Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, @Nullable T arg) { @@ -1905,81 +1826,6 @@ public final class IgfsImpl implements IgfsEx { } } - /** - * Format message listener required for format action completion. - */ - private class FormatMessageListener implements GridMessageListener { - /** {@inheritDoc} */ - @SuppressWarnings("ThrowableResultOfMethodCallIgnored") - @Override public void onMessage(UUID nodeId, Object msg) { - if (msg instanceof IgfsDeleteMessage) { - ClusterNode node = igfsCtx.kernalContext().discovery().node(nodeId); - - if (node != null) { - if (sameIgfs((IgfsAttributes[]) node.attribute(ATTR_IGFS))) { - IgfsDeleteMessage msg0 = (IgfsDeleteMessage)msg; - - try { - msg0.finishUnmarshal(igfsCtx.kernalContext().config().getMarshaller(), null); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to unmarshal message (will ignore): " + msg0, e); - - return; - } - - assert msg0.id() != null; - - GridFutureAdapter<?> fut = delFuts.remove(msg0.id()); - - if (fut != null) { - if (msg0.error() == null) - fut.onDone(); - else - fut.onDone(msg0.error()); - } - } - } - } - } - } - - /** - * Discovery listener required for format actions completion. 
- */ - private class FormatDiscoveryListener implements GridLocalEventListener { - /** {@inheritDoc} */ - @Override public void onEvent(Event evt) { - assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED; - - DiscoveryEvent evt0 = (DiscoveryEvent)evt; - - if (evt0.eventNode() != null) { - if (sameIgfs((IgfsAttributes[]) evt0.eventNode().attribute(ATTR_IGFS))) { - Collection<IgniteUuid> rmv = new HashSet<>(); - - for (Map.Entry<IgniteUuid, GridFutureAdapter<Object>> fut : delFuts.entrySet()) { - IgniteUuid id = fut.getKey(); - - try { - if (!meta.exists(id)) { - fut.getValue().onDone(); - - rmv.add(id); - } - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to check file existence: " + id, e); - } - } - - for (IgniteUuid id : rmv) - delFuts.remove(id); - } - } - } - } - /** {@inheritDoc} */ @Override public IgniteUuid nextAffinityKey() { return safeOp(new Callable<IgniteUuid>() { @@ -2079,4 +1925,39 @@ public final class IgfsImpl implements IgfsEx { return t; } } + + /** + * Format runnable. + */ + private class FormatRunnable implements Runnable { + /** Target future. */ + private final GridFutureAdapter<?> fut; + + /** + * Constructor. + * + * @param fut Future. 
+ */ + public FormatRunnable(GridFutureAdapter<?> fut) { + this.fut = fut; + } + + /** {@inheritDoc} */ + @Override public void run() { + IgfsException err = null; + + try { + format(); + } + catch (Throwable err0) { + err = IgfsUtils.toIgfsException(err0); + } + finally { + if (err == null) + fut.onDone(); + else + fut.onDone(err); + } + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java index 6fa9877..cfe549f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java @@ -180,7 +180,7 @@ public class IgfsUtils { * @return Converted IGFS exception. */ @SuppressWarnings("ThrowableResultOfMethodCallIgnored") - public static IgfsException toIgfsException(Exception err) { + public static IgfsException toIgfsException(Throwable err) { IgfsException err0 = err instanceof IgfsException ? 
(IgfsException)err : null; IgfsException igfsErr = X.cause(err, IgfsException.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java index fd4ec17..4e0f12b 100644 --- a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java @@ -239,8 +239,6 @@ public class IgfsFragmentizerSelfTest extends IgfsFragmentizerAbstractSelfTest { igfs.format(); - igfs.awaitDeletesAsync().get(); - GridTestUtils.retryAssert(log, 50, 100, new CA() { @Override public void apply() { for (int i = 0; i < NODE_CNT; i++) { http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java index 3933e86..266945f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.igfs; import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cluster.ClusterNode; @@ -41,27 +40,21 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import 
org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; -import org.apache.ignite.transactions.Transaction; import org.jsr166.ThreadLocalRandom8; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.UUID; import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicReference; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; -import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; -import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; /** * {@link IgfsAttributes} test case. @@ -256,41 +249,6 @@ public class IgfsSizeSelfTest extends IgfsCommonAbstractTest { } /** - * Ensure that exception is not thrown in case PARTITIONED cache is oversized, but data is deleted concurrently. - * - * @throws Exception If failed. - */ - public void testPartitionedOversizeDelay() throws Exception { - cacheMode = PARTITIONED; - nearEnabled = true; - - checkOversizeDelay(); - } - - /** - * Ensure that exception is not thrown in case co-located cache is oversized, but data is deleted concurrently. - * - * @throws Exception If failed. - */ - public void testColocatedOversizeDelay() throws Exception { - cacheMode = PARTITIONED; - nearEnabled = false; - - checkOversizeDelay(); - } - - /** - * Ensure that exception is not thrown in case REPLICATED cache is oversized, but data is deleted concurrently. - * - * @throws Exception If failed. 
- */ - public void testReplicatedOversizeDelay() throws Exception { - cacheMode = REPLICATED; - - checkOversizeDelay(); - } - - /** * Ensure that IGFS size is correctly updated in case of preloading for PARTITIONED cache. * * @throws Exception If failed. @@ -484,97 +442,6 @@ public class IgfsSizeSelfTest extends IgfsCommonAbstractTest { } /** - * Ensure that exception is not thrown or thrown with some delay when there is something in trash directory. - * - * @throws Exception If failed. - */ - private void checkOversizeDelay() throws Exception { - final CountDownLatch latch = new CountDownLatch(1); - - igfsMaxData = 256; - trashPurgeTimeout = 2000; - - startUp(); - - IgfsImpl igfs = igfs(0); - - final IgfsPath path = new IgfsPath("/file"); - final IgfsPath otherPath = new IgfsPath("/fileOther"); - - // Fill cache with data up to it's limit. - IgfsOutputStream os = igfs.create(path, false); - os.write(chunk((int)igfsMaxData)); - os.close(); - - final IgniteCache<IgniteUuid, IgfsEntryInfo> metaCache = igfs.context().kernalContext().cache().jcache( - igfs.configuration().getMetaCacheName()); - - // Start a transaction in a separate thread which will lock file ID. - final IgniteUuid id = igfs.context().meta().fileId(path); - final IgfsEntryInfo info = igfs.context().meta().info(id); - - final AtomicReference<Throwable> err = new AtomicReference<>(); - - try { - new Thread(new Runnable() { - @Override public void run() { - try { - - try (Transaction tx = metaCache.unwrap(Ignite.class).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { - metaCache.get(id); - - latch.await(); - - U.sleep(1000); // Sleep here so that data manager could "see" oversize. - - tx.commit(); - } - } - catch (Throwable e) { - err.set(e); - } - } - }).start(); - - // Now add file ID to trash listing so that delete worker could "see" it. 
- IgniteUuid trashId = IgfsUtils.randomTrashId(); - - try (Transaction tx = metaCache.unwrap(Ignite.class).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { - Map<String, IgfsListingEntry> listing = Collections.singletonMap(path.name(), - new IgfsListingEntry(info)); - - // Clear root listing. - metaCache.put(IgfsUtils.ROOT_ID, IgfsUtils.createDirectory(IgfsUtils.ROOT_ID)); - - // Add file to trash listing. - IgfsEntryInfo trashInfo = metaCache.get(trashId); - - if (trashInfo == null) - metaCache.put(trashId, IgfsUtils.createDirectory(trashId).listing(listing)); - else - metaCache.put(trashId, trashInfo.listing(listing)); - - tx.commit(); - } - - assert metaCache.get(trashId) != null; - - // Now the file is locked and is located in trash, try adding some more data. - os = igfs.create(otherPath, false); - os.write(new byte[1]); - - latch.countDown(); - - os.close(); - - assert err.get() == null; - } - finally { - latch.countDown(); // Safety. - } - } - - /** * Ensure that IGFS size is correctly updated in case of preloading. * * @throws Exception If failed. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java index b38f3a2..ffa6f7d 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java @@ -41,7 +41,6 @@ import org.apache.ignite.igfs.mapreduce.IgfsTask; import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.IgniteClusterEx; import org.apache.ignite.internal.processors.cache.GridCacheUtilityKey; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; @@ -754,11 +753,6 @@ public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTes } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> awaitDeletesAsync() throws IgniteCheckedException { - return null; - } - - /** {@inheritDoc} */ @Nullable @Override public String clientLogDirectory() { return null; }