IGNITE-3294: Moved all logic to meta manager "create" method.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fd0e5bf6 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fd0e5bf6 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fd0e5bf6 Branch: refs/heads/ignite-3294 Commit: fd0e5bf63181972aa8b64f6b404aae4d9a11cc8c Parents: bab3b1b Author: vozerov-gridgain <voze...@gridgain.com> Authored: Fri Jun 10 11:47:42 2016 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Fri Jun 10 11:47:42 2016 +0300 ---------------------------------------------------------------------- .../processors/igfs/IgfsCreateResult.java | 66 ++++++++++++++ .../internal/processors/igfs/IgfsImpl.java | 53 ++++++----- .../processors/igfs/IgfsMetaManager.java | 38 ++++---- .../IgfsSecondaryFileSystemCreateContext.java | 92 ++++++++++++++++++++ .../IgfsSecondaryOutputStreamDescriptor.java | 59 ------------- .../igfs/IgfsMetaManagerSelfTest.java | 4 +- 6 files changed, 208 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/fd0e5bf6/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCreateResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCreateResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCreateResult.java new file mode 100644 index 0000000..0b09e02 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCreateResult.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +import java.io.OutputStream; + +/** + * IGFS file create result. + */ +public class IgfsCreateResult { + /** File info in the primary file system. */ + private final IgfsEntryInfo info; + + /** Output stream to the secondary file system. */ + private final OutputStream secondaryOut; + + /** + * Constructor. + * + * @param info File info in the primary file system. + * @param secondaryOut Output stream to the secondary file system. + */ + public IgfsCreateResult(IgfsEntryInfo info, @Nullable OutputStream secondaryOut) { + assert info != null; + + this.info = info; + this.secondaryOut = secondaryOut; + } + + /** + * @return File info in the primary file system. + */ + public IgfsEntryInfo info() { + return info; + } + + /** + * @return Output stream to the secondary file system. + */ + @Nullable public OutputStream secondaryOutputStream() { + return secondaryOut; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgfsCreateResult.class, this); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/fd0e5bf6/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java index 0808619..9f83f36 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java @@ -106,7 +106,6 @@ import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_DELETED; import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_READ; import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_DELETED; import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_READ; -import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_WRITE; import static org.apache.ignite.events.EventType.EVT_IGFS_META_UPDATED; import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC; import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC; @@ -1019,28 +1018,10 @@ public final class IgfsImpl implements IgfsEx { log.debug("Open file for writing [path=" + path + ", bufSize=" + bufSize + ", overwrite=" + overwrite + ", props=" + props + ']'); + // Resolve mode. final IgfsMode mode = resolveMode(path); - IgfsFileWorkerBatch batch; - - if (mode != PRIMARY) { - assert mode == DUAL_SYNC || mode == DUAL_ASYNC; - - await(path); - - IgfsSecondaryOutputStreamDescriptor desc = meta.createDual(secondaryFs, path, simpleCreate, - props, overwrite, bufSize, (short) replication, groupBlockSize(), affKey); - - batch = newBatch(path, desc.out()); - - IgfsOutputStreamImpl os = new IgfsOutputStreamImpl(igfsCtx, path, desc.info(), - bufferSize(bufSize), mode, batch); - - IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_WRITE); - - return os; - } - + // Prepare properties. final Map<String, String> dirProps, fileProps; if (props == null) { @@ -1051,19 +1032,37 @@ public final class IgfsImpl implements IgfsEx { else dirProps = fileProps = new HashMap<>(props); - IgfsEntryInfo res = meta.create( + // Prepare context for DUAL mode. + IgfsSecondaryFileSystemCreateContext secondaryCtx = null; + + if (mode != PRIMARY) + secondaryCtx = new IgfsSecondaryFileSystemCreateContext(secondaryFs, simpleCreate, + (short)replication, groupBlockSize(), bufSize); + + // Await for async ops completion if in DUAL mode. + if (mode != PRIMARY) + await(path); + + // Perform create. + IgfsCreateResult res = meta.create( path, dirProps, overwrite, cfg.getBlockSize(), affKey, - evictExclude(path, true), - fileProps + evictExclude(path, mode == PRIMARY), + fileProps, + secondaryCtx ); assert res != null; - return new IgfsOutputStreamImpl(igfsCtx, path, res, bufferSize(bufSize), mode, null); + // Create secondary file system batch. + OutputStream secondaryStream = res.secondaryOutputStream(); + + IgfsFileWorkerBatch batch = secondaryStream != null ? newBatch(path, secondaryStream) : null; + + return new IgfsOutputStreamImpl(igfsCtx, path, res.info(), bufferSize(bufSize), mode, batch); } }); } @@ -1094,9 +1093,9 @@ public final class IgfsImpl implements IgfsEx { await(path); - IgfsSecondaryOutputStreamDescriptor desc = meta.appendDual(secondaryFs, path, bufSize); + IgfsCreateResult desc = meta.appendDual(secondaryFs, path, bufSize); - batch = newBatch(path, desc.out()); + batch = newBatch(path, desc.secondaryOutputStream()); return new IgfsOutputStreamImpl(igfsCtx, path, desc.info(), bufferSize(bufSize), mode, batch); } http://git-wip-us.apache.org/repos/asf/ignite/blob/fd0e5bf6/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java index 3d82ebd..bff274f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java @@ -1792,7 +1792,7 @@ public class IgfsMetaManager extends IgfsManager { * @return Output stream descriptor. * @throws IgniteCheckedException If file creation failed. */ - public IgfsSecondaryOutputStreamDescriptor createDual(final IgfsSecondaryFileSystem fs, + public IgfsCreateResult createDual(final IgfsSecondaryFileSystem fs, final IgfsPath path, final boolean simpleCreate, @Nullable final Map<String, String> props, @@ -1839,19 +1839,19 @@ public class IgfsMetaManager extends IgfsManager { * @return Output stream descriptor. * @throws IgniteCheckedException If output stream open for append has failed. */ - public IgfsSecondaryOutputStreamDescriptor appendDual(final IgfsSecondaryFileSystem fs, final IgfsPath path, + public IgfsCreateResult appendDual(final IgfsSecondaryFileSystem fs, final IgfsPath path, final int bufSize) throws IgniteCheckedException { if (busyLock.enterBusy()) { try { assert fs != null; assert path != null; - SynchronizationTask<IgfsSecondaryOutputStreamDescriptor> task = - new SynchronizationTask<IgfsSecondaryOutputStreamDescriptor>() { + SynchronizationTask<IgfsCreateResult> task = + new SynchronizationTask<IgfsCreateResult>() { /** Output stream to the secondary file system. */ private OutputStream out; - @Override public IgfsSecondaryOutputStreamDescriptor onSuccess(Map<IgfsPath, + @Override public IgfsCreateResult onSuccess(Map<IgfsPath, IgfsEntryInfo> infos) throws Exception { validTxState(true); @@ -1890,10 +1890,10 @@ public class IgfsMetaManager extends IgfsManager { // Set lock and return. IgfsEntryInfo lockedInfo = invokeLock(info.id(), false); - return new IgfsSecondaryOutputStreamDescriptor(lockedInfo, out); + return new IgfsCreateResult(lockedInfo, out); } - @Override public IgfsSecondaryOutputStreamDescriptor onFailure(@Nullable Exception err) + @Override public IgfsCreateResult onFailure(@Nullable Exception err) throws IgniteCheckedException { U.closeQuiet(out); @@ -2811,19 +2811,25 @@ public class IgfsMetaManager extends IgfsManager { * @param affKey Affinity key. * @param evictExclude Evict exclude flag. * @param fileProps File properties. - * @return @return Resulting info. + * @param secondaryCtx Secondary file system create context. + * @return @return Operation result. * @throws IgniteCheckedException If failed. */ - IgfsEntryInfo create( + IgfsCreateResult create( final IgfsPath path, Map<String, String> dirProps, final boolean overwrite, final int blockSize, final @Nullable IgniteUuid affKey, final boolean evictExclude, - @Nullable Map<String, String> fileProps) throws IgniteCheckedException { + @Nullable Map<String, String> fileProps, + @Nullable IgfsSecondaryFileSystemCreateContext secondaryCtx) throws IgniteCheckedException { validTxState(false); + if (secondaryCtx != null) + return createDual(secondaryCtx.fileSystem(), path, secondaryCtx.simpleCreate(), fileProps, overwrite, + secondaryCtx.bufferSize(), secondaryCtx.replication(), secondaryCtx.blockSize(), affKey); + while (true) { if (busyLock.enterBusy()) { try { @@ -2897,7 +2903,7 @@ public class IgfsMetaManager extends IgfsManager { IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EventType.EVT_IGFS_FILE_OPENED_WRITE); - return newInfo; + return new IgfsCreateResult(newInfo, null); } else { // Create file and parent folders. @@ -2913,7 +2919,7 @@ public class IgfsMetaManager extends IgfsManager { // Generate events. generateCreateEvents(res.createdPaths(), true); - return res.info(); + return new IgfsCreateResult(res.info(), null); } } } @@ -3107,7 +3113,7 @@ public class IgfsMetaManager extends IgfsManager { /** * Synchronization task to create a file. */ - private class CreateFileSynchronizationTask implements SynchronizationTask<IgfsSecondaryOutputStreamDescriptor> { + private class CreateFileSynchronizationTask implements SynchronizationTask<IgfsCreateResult> { /** Secondary file system. */ private IgfsSecondaryFileSystem fs; @@ -3171,7 +3177,7 @@ public class IgfsMetaManager extends IgfsManager { } /** {@inheritDoc} */ - @Override public IgfsSecondaryOutputStreamDescriptor onSuccess(Map<IgfsPath, IgfsEntryInfo> infos) + @Override public IgfsCreateResult onSuccess(Map<IgfsPath, IgfsEntryInfo> infos) throws Exception { validTxState(true); @@ -3266,11 +3272,11 @@ public class IgfsMetaManager extends IgfsManager { if (oldId == null && evts.isRecordable(EventType.EVT_IGFS_FILE_CREATED)) pendingEvts.add(new IgfsEvent(path, locNode, EventType.EVT_IGFS_FILE_CREATED)); - return new IgfsSecondaryOutputStreamDescriptor(newInfo, out); + return new IgfsCreateResult(newInfo, out); } /** {@inheritDoc} */ - @Override public IgfsSecondaryOutputStreamDescriptor onFailure(Exception err) throws IgniteCheckedException { + @Override public IgfsCreateResult onFailure(Exception err) throws IgniteCheckedException { U.closeQuiet(out); U.error(log, "File create in DUAL mode failed [path=" + path + ", simpleCreate=" + http://git-wip-us.apache.org/repos/asf/ignite/blob/fd0e5bf6/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemCreateContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemCreateContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemCreateContext.java new file mode 100644 index 0000000..ef71c6c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemCreateContext.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; + +/** + * Context for secondary file system create request. + */ +public class IgfsSecondaryFileSystemCreateContext { + /** File system. */ + private final IgfsSecondaryFileSystem fs; + + /** Simple create flag. */ + private final boolean simpleCreate; + + /** Replication. */ + private final short replication; + + /** Block size. */ + private final long blockSize; + + /** Buffer size. */ + private final int bufSize; + + /** + * Constructor. + * + * @param fs File system. + * @param simpleCreate Simple create flag. + * @param blockSize Block size. + * @param replication Replication. + */ + public IgfsSecondaryFileSystemCreateContext(IgfsSecondaryFileSystem fs, boolean simpleCreate, short replication, + long blockSize, int bufSize) { + this.fs = fs; + this.simpleCreate = simpleCreate; + this.replication = replication; + this.blockSize = blockSize; + this.bufSize = bufSize; + } + + /** + * @return Secondary file system. + */ + public IgfsSecondaryFileSystem fileSystem() { + return fs; + } + + /** + * @return Simple crate flag. + */ + public boolean simpleCreate() { + return simpleCreate; + } + + /** + * @return Replication. + */ + public short replication() { + return replication; + } + + /** + * @return Block size. + */ + public long blockSize() { + return blockSize; + } + + /** + * @return Buffer size. + */ + public int bufferSize() { + return bufSize; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/fd0e5bf6/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryOutputStreamDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryOutputStreamDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryOutputStreamDescriptor.java deleted file mode 100644 index 6bbc2c0..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryOutputStreamDescriptor.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.igfs; - -import java.io.OutputStream; - -/** - * Descriptor of an output stream opened to the secondary file system. - */ -public class IgfsSecondaryOutputStreamDescriptor { - /** File info in the primary file system. */ - private final IgfsEntryInfo info; - - /** Output stream to the secondary file system. */ - private final OutputStream out; - - /** - * Constructor. - * - * @param info File info in the primary file system. - * @param out Output stream to the secondary file system. - */ - IgfsSecondaryOutputStreamDescriptor(IgfsEntryInfo info, OutputStream out) { - assert info != null; - assert out != null; - - this.info = info; - this.out = out; - } - - /** - * @return File info in the primary file system. - */ - IgfsEntryInfo info() { - return info; - } - - /** - * @return Output stream to the secondary file system. - */ - OutputStream out() { - return out; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/fd0e5bf6/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java index 8b88157..6053d3b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java @@ -142,7 +142,7 @@ public class IgfsMetaManagerSelfTest extends IgfsCommonAbstractTest { assertEmpty(mgr.directoryListing(ROOT_ID)); assertTrue(mgr.mkdirs(new IgfsPath("/dir"), IgfsImpl.DFLT_DIR_META)); - assertNotNull(mgr.create(new IgfsPath("/file"), null, false, 400, null, false, null)); + assertNotNull(mgr.create(new IgfsPath("/file"), null, false, 400, null, false, null, null)); IgfsListingEntry dirEntry = mgr.directoryListing(ROOT_ID).get("dir"); assertNotNull(dirEntry); @@ -214,7 +214,7 @@ public class IgfsMetaManagerSelfTest extends IgfsCommonAbstractTest { private IgfsEntryInfo createFileAndGetInfo(String path) throws IgniteCheckedException { IgfsPath p = path(path); - IgfsEntryInfo res = mgr.create(p, null, false, 400, null, false, null); + IgfsEntryInfo res = mgr.create(p, null, false, 400, null, false, null, null).info(); assert res != null; assert !res.isDirectory();