http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/persistence/PersistenceModule.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/persistence/PersistenceModule.java b/src/org/waveprotocol/box/server/persistence/PersistenceModule.java deleted file mode 100644 index 2ce3a32..0000000 --- a/src/org/waveprotocol/box/server/persistence/PersistenceModule.java +++ /dev/null @@ -1,154 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.persistence; - -import com.google.inject.AbstractModule; -import com.google.inject.Inject; -import com.google.inject.Singleton; -import com.typesafe.config.Config; -import org.waveprotocol.box.server.persistence.file.FileAccountStore; -import org.waveprotocol.box.server.persistence.file.FileAttachmentStore; -import org.waveprotocol.box.server.persistence.file.FileDeltaStore; -import org.waveprotocol.box.server.persistence.file.FileSignerInfoStore; -import org.waveprotocol.box.server.persistence.memory.MemoryDeltaStore; -import org.waveprotocol.box.server.persistence.memory.MemoryStore; -import org.waveprotocol.box.server.persistence.mongodb.MongoDbProvider; -import org.waveprotocol.box.server.waveserver.DeltaStore; -import org.waveprotocol.wave.crypto.CertPathStore; - -/** - * Module for setting up the different persistence stores. - * - *<p> - * The valid names for the cert store are 'memory', 'file' and 'mongodb' - * - *<p> - *The valid names for the attachment store are 'disk' and 'mongodb' - * - *<p> - *The valid names for the account store are 'memory', 'file' and 'mongodb'. - * - * @author [email protected] (Lennard de Rijk) - */ -public class PersistenceModule extends AbstractModule { - - private final String signerInfoStoreType; - - private final String attachmentStoreType; - - private final String accountStoreType; - - private final String deltaStoreType; - - private MongoDbProvider mongoDbProvider; - - private final String mongoDBHost; - - private final String mongoDBPort; - - private final String mongoDBdatabase; - - - @Inject - public PersistenceModule(Config config) { - this.signerInfoStoreType = config.getString("core.signer_info_store_type"); - this.attachmentStoreType = config.getString("core.attachment_store_type"); - this.accountStoreType = config.getString("core.account_store_type"); - this.deltaStoreType = config.getString("core.delta_store_type"); - this.mongoDBHost = config.getString("core.mongodb_host"); - this.mongoDBPort = config.getString("core.mongodb_port"); - this.mongoDBdatabase = config.getString("core.mongodb_database"); - } - - /** - * Returns a {@link MongoDbProvider} instance. - */ - public MongoDbProvider getMongoDbProvider() { - if (mongoDbProvider == null) { - mongoDbProvider = new MongoDbProvider(mongoDBHost, mongoDBPort, mongoDBdatabase); - } - return mongoDbProvider; - } - - @Override - protected void configure() { - bindCertPathStore(); - bindAttachmentStore(); - bindAccountStore(); - bindDeltaStore(); - } - - /** - * Binds the CertPathStore implementation to the store specified in the - * properties. - */ - private void bindCertPathStore() { - if (signerInfoStoreType.equalsIgnoreCase("memory")) { - bind(CertPathStore.class).to(MemoryStore.class).in(Singleton.class); - } else if (signerInfoStoreType.equalsIgnoreCase("file")) { - bind(CertPathStore.class).to(FileSignerInfoStore.class).in(Singleton.class); - } else if (signerInfoStoreType.equalsIgnoreCase("mongodb")) { - MongoDbProvider mongoDbProvider = getMongoDbProvider(); - bind(CertPathStore.class).toInstance(mongoDbProvider.provideMongoDbStore()); - } else { - throw new RuntimeException( - "Invalid certificate path store type: '" + signerInfoStoreType + "'"); - } - } - - private void bindAttachmentStore() { - if (attachmentStoreType.equalsIgnoreCase("disk")) { - bind(AttachmentStore.class).to(FileAttachmentStore.class).in(Singleton.class); - } else if (attachmentStoreType.equalsIgnoreCase("mongodb")) { - MongoDbProvider mongoDbProvider = getMongoDbProvider(); - bind(AttachmentStore.class).toInstance(mongoDbProvider.provideMongoDbStore()); - } else { - throw new RuntimeException("Invalid attachment store type: '" + attachmentStoreType + "'"); - } - } - - private void bindAccountStore() { - if (accountStoreType.equalsIgnoreCase("memory")) { - bind(AccountStore.class).to(MemoryStore.class).in(Singleton.class); - } else if (accountStoreType.equalsIgnoreCase("file")) { - bind(AccountStore.class).to(FileAccountStore.class).in(Singleton.class); - } else if (accountStoreType.equalsIgnoreCase("fake")) { - bind(AccountStore.class).to(FakePermissiveAccountStore.class).in(Singleton.class); - } else if (accountStoreType.equalsIgnoreCase("mongodb")) { - MongoDbProvider mongoDbProvider = getMongoDbProvider(); - bind(AccountStore.class).toInstance(mongoDbProvider.provideMongoDbStore()); - } else { - throw new RuntimeException("Invalid account store type: '" + accountStoreType + "'"); - } - } - - private void bindDeltaStore() { - if (deltaStoreType.equalsIgnoreCase("memory")) { - bind(DeltaStore.class).to(MemoryDeltaStore.class).in(Singleton.class); - } else if (deltaStoreType.equalsIgnoreCase("file")) { - bind(DeltaStore.class).to(FileDeltaStore.class).in(Singleton.class); - } else if (deltaStoreType.equalsIgnoreCase("mongodb")) { - MongoDbProvider mongoDbProvider = getMongoDbProvider(); - bind(DeltaStore.class).toInstance(mongoDbProvider.provideMongoDbDeltaStore()); - } else { - throw new RuntimeException("Invalid delta store type: '" + deltaStoreType + "'"); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/persistence/PersistenceStartException.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/persistence/PersistenceStartException.java b/src/org/waveprotocol/box/server/persistence/PersistenceStartException.java deleted file mode 100644 index aaa693a..0000000 --- a/src/org/waveprotocol/box/server/persistence/PersistenceStartException.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.persistence; - -/** - * {@link Exception} thrown when a persistence system fails to start. - * - * @author [email protected] (Lennard de Rijk) - * - */ -public class PersistenceStartException extends RuntimeException { - - public PersistenceStartException(String message, Throwable e) { - super(message, e); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/persistence/SignerInfoStore.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/persistence/SignerInfoStore.java b/src/org/waveprotocol/box/server/persistence/SignerInfoStore.java deleted file mode 100644 index 7e0f9e4..0000000 --- a/src/org/waveprotocol/box/server/persistence/SignerInfoStore.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.persistence; - -import org.waveprotocol.wave.crypto.CertPathStore; - -/** - * Extends {@link CertPathStore} to include initialization method. - * For use in persistence implementations. - * - * @author [email protected](Tad Glines) - */ -public interface SignerInfoStore extends CertPathStore { - void initializeSignerInfoStore() throws PersistenceException; -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/persistence/file/DeltaIndex.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/persistence/file/DeltaIndex.java b/src/org/waveprotocol/box/server/persistence/file/DeltaIndex.java deleted file mode 100644 index a34250b..0000000 --- a/src/org/waveprotocol/box/server/persistence/file/DeltaIndex.java +++ /dev/null @@ -1,213 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.persistence.file; - -import com.google.common.base.Preconditions; - -import org.waveprotocol.wave.model.util.Pair; - -import java.io.EOFException; -import java.io.File; -import java.io.IOException; -import java.io.RandomAccessFile; - -/** - * An index for quickly accessing deltas. The index is an array of longs, one for each version. - * - * The index must return the offset of a delta applied at a version, and of a delta leading to - * a version. - * - * Internal format: - * - * Let's assume that operations are 10 bytes long. Deltas are separated by |. - * <pre> - * Deltas: | 0 1 2 | 3 | 4 5 | - * offset 0 30 40 60 - * - * Index: 0 -1 -1 30 40 -41 - * </pre> - * The file contains a negative value for any version for which there is not a delta. This will - * happen whenever the previous delta contains multiple ops. This negative value is ~offset of - * the delta containing the op, so that finding the delta leading to a version is easy: just read - * the previous index entry. - * - * @author [email protected] (Joseph Gentle) - */ -public class DeltaIndex { - /** Returned from methods when there is no record for a specified version. */ - public static final int NO_RECORD_FOR_VERSION = -1; - - private static final int RECORD_LENGTH = 8; - private final File fileRef; - private RandomAccessFile file; - - public DeltaIndex(File indexFile) { - this.fileRef = indexFile; - } - - /** - * Open the index. - * - * @param baseCollection the collection which the index indexes. - * @throws IOException - */ - public void openForCollection(FileDeltaCollection baseCollection) throws IOException { - if (!fileRef.exists()) { - fileRef.mkdirs(); - rebuildIndexFromDeltas(baseCollection); - } else { - // TODO(josephg): For now, we just rebuild the index anyway. - rebuildIndexFromDeltas(baseCollection); - } - } - - private void checkOpen() { - Preconditions.checkState(file != null, "Index file not open"); - } - - /** - * Rebuild the index based on a delta collection. This will wipe the index file. - * - * @param collection - * @throws IOException - */ - public void rebuildIndexFromDeltas(FileDeltaCollection collection) throws IOException { - if (file != null) { - file.close(); - } - - if (fileRef.exists()) { - fileRef.delete(); - } - - file = FileUtils.getOrCreateFile(fileRef); - - for (Pair<Pair<Long, Integer>, Long> pair : collection.getOffsetsIterator()) { - addDelta(pair.first.first, pair.first.second, pair.second); - } - } - - /** - * Get the delta file offset for the specified version. - * - * @param version - * @return the offset on success, NO_RECORD_FOR_VERSION if there's no record. - * @throws IOException - */ - public long getOffsetForVersion(long version) throws IOException { - if (!seekToPosition(version)) { - return NO_RECORD_FOR_VERSION; - } - long offset = file.readLong(); - return offset < 0 ? NO_RECORD_FOR_VERSION : offset; - } - - /** - * Get the delta file offset for the specified end version. - * - * @param version - * @return the offset on success, NO_RECORD_FOR_VERSION if there's no record. - * @throws IOException - */ - public long getOffsetForEndVersion(long version) throws IOException { - if (!seekToPosition(version - 1)) { - return NO_RECORD_FOR_VERSION; - } - long offset = file.readLong(); - try { - if (file.readLong() < 0) { - // user tried to read something which isn't an end version - return NO_RECORD_FOR_VERSION; - } - } catch (EOFException e) { - // it's ok to hit the end of the file, for the last end version - } - return offset < 0 ? ~offset : offset; - } - - /** - * Seeks to the corresponding version, if it is valid. - * - * @param version version to seek to. - * @return true iff the position is valid - * @throws IOException - */ - private boolean seekToPosition(long version) throws IOException { - if (version < 0) { - return false; - } - checkOpen(); - - long position = version * RECORD_LENGTH; - if (position >= file.length()) { - return false; - } - - file.seek(position); - return true; - } - - /** - * Indexes a new delta. - * - * @param version the version at which the delta is applied - * @param numOperations number of operations in the delta - * @param offset offset at which the delta is stored - */ - public void addDelta(long version, int numOperations, long offset) - throws IOException { - checkOpen(); - - long position = version * RECORD_LENGTH; - // We're expected to append the new delta - long fileLength = file.length(); - Preconditions.checkState(position == fileLength, - "position = %d, file=%d", position, fileLength); - file.seek(position); - file.writeLong(offset); - // fill in the additional positions with the 1-complement of the offset, - for (int i = 1; i < numOperations; i++) { - file.writeLong(~offset); - } - } - - /** - * @return number of records in the index - */ - public long length() { - checkOpen(); - - long fileLength; - try { - fileLength = file.length(); - } catch (IOException e) { - // This shouldn't happen in practice. - throw new RuntimeException("IO error reading index file length", e); - } - return fileLength / RECORD_LENGTH; - } - - public void close() throws IOException { - if (file != null) { - file.close(); - file = null; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/persistence/file/FileAccountStore.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/persistence/file/FileAccountStore.java b/src/org/waveprotocol/box/server/persistence/file/FileAccountStore.java deleted file mode 100644 index 1e8b41c..0000000 --- a/src/org/waveprotocol/box/server/persistence/file/FileAccountStore.java +++ /dev/null @@ -1,135 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.persistence.file; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; -import com.google.inject.Inject; -import com.typesafe.config.Config; -import org.waveprotocol.box.server.account.AccountData; -import org.waveprotocol.box.server.persistence.AccountStore; -import org.waveprotocol.box.server.persistence.PersistenceException; -import org.waveprotocol.box.server.persistence.protos.ProtoAccountDataSerializer; -import org.waveprotocol.box.server.persistence.protos.ProtoAccountStoreData.ProtoAccountData; -import org.waveprotocol.wave.model.wave.ParticipantId; -import org.waveprotocol.wave.util.logging.Log; - -import java.io.*; -import java.util.Map; - -/** - * A flat file based implementation of {@link AccountStore} - * - * @author [email protected] (Tad Glines) - */ -public class FileAccountStore implements AccountStore { - private static final String ACCOUNT_FILE_EXTENSION = ".account"; - private final String accountStoreBasePath; - private final Map<ParticipantId, AccountData> accounts = Maps.newHashMap(); - - private static final Log LOG = Log.get(FileAccountStore.class); - - @Inject - public FileAccountStore(Config config) { - this.accountStoreBasePath = config.getString("core.account_store_directory"); - } - - @Override - public void initializeAccountStore() throws PersistenceException { - FileUtils.performDirectoryChecks(accountStoreBasePath, ACCOUNT_FILE_EXTENSION, "account store", - LOG); - } - - @Override - public AccountData getAccount(ParticipantId id) throws PersistenceException { - synchronized (accounts) { - AccountData account = accounts.get(id); - if (account == null) { - account = readAccount(id); - if (account != null) { - accounts.put(id, account); - } - } - return account; - } - } - - @Override - public void putAccount(AccountData account) throws PersistenceException { - synchronized (accounts) { - Preconditions.checkNotNull(account); - writeAccount(account); - accounts.put(account.getId(), account); - } - } - - @Override - public void removeAccount(ParticipantId id) throws PersistenceException { - synchronized (accounts) { - File file = new File(participantIdToFileName(id)); - if (file.exists()) { - if (!file.delete()) { - throw new PersistenceException("Failed to delete account data associated with " - + id.getAddress()); - } - } - accounts.remove(id); - } - } - - private String participantIdToFileName(ParticipantId id) { - return accountStoreBasePath + File.separator + id.getAddress().toLowerCase() - + ACCOUNT_FILE_EXTENSION; - } - - private AccountData readAccount(ParticipantId id) throws PersistenceException { - File accountFile = new File(participantIdToFileName(id)); - FileInputStream file = null; - try { - if (!accountFile.exists()) { - return null; - } - file = new FileInputStream(accountFile); - ProtoAccountData data = ProtoAccountData.newBuilder().mergeFrom(file).build(); - return ProtoAccountDataSerializer.deserialize(data); - } catch (IOException e) { - LOG.severe("Failed to read account data from file: " + accountFile.getAbsolutePath(), e); - throw new PersistenceException(e); - } finally { - FileUtils.closeAndIgnoreException(file, accountFile, LOG); - } - } - - private void writeAccount(AccountData account) throws PersistenceException { - File accountFile = new File(participantIdToFileName(account.getId())); - OutputStream file = null; - try { - file = new FileOutputStream(accountFile); - ProtoAccountData data = ProtoAccountDataSerializer.serialize(account); - file.write(data.toByteArray()); - file.flush(); - } catch (IOException e) { - LOG.severe("Failed to write account data to file: " + accountFile.getAbsolutePath(), e); - throw new PersistenceException(e); - } finally { - FileUtils.closeAndIgnoreException(file, accountFile, LOG); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/persistence/file/FileAttachmentStore.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/persistence/file/FileAttachmentStore.java b/src/org/waveprotocol/box/server/persistence/file/FileAttachmentStore.java deleted file mode 100644 index 64097ea..0000000 --- a/src/org/waveprotocol/box/server/persistence/file/FileAttachmentStore.java +++ /dev/null @@ -1,176 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.persistence.file; - -import com.google.inject.Inject; -import com.typesafe.config.Config; -import org.waveprotocol.box.attachment.AttachmentMetadata; -import org.waveprotocol.box.attachment.AttachmentProto; -import org.waveprotocol.box.attachment.proto.AttachmentMetadataProtoImpl; -import org.waveprotocol.box.server.persistence.AttachmentStore; -import org.waveprotocol.wave.media.model.AttachmentId; -import org.waveprotocol.wave.model.util.CharBase64; - -import java.io.*; - -/** - * An implementation of AttachmentStore which uses files on disk - * - * @author [email protected] (Joseph Gentle) - * @author [email protected] (A. Kaplanov) - */ -public class FileAttachmentStore implements AttachmentStore { - - private final static String META_EXT = ".meta"; - private final static String THUMBNAIL_EXT = ".thumbnail"; - - /** - * The directory in which the attachments are stored. - */ - private final String basePath; - - @Inject - public FileAttachmentStore(Config config) { - this.basePath = config.getString("core.attachment_store_directory"); - new File(basePath).mkdirs(); - } - - @Override - public AttachmentMetadata getMetadata(AttachmentId attachmentId) throws IOException { - File file = new File(getMetadataPath(attachmentId)); - if (!file.exists()) { - return null; - } - AttachmentProto.AttachmentMetadata protoMetadata = - AttachmentProto.AttachmentMetadata.parseFrom(new FileInputStream(file)); - return new AttachmentMetadataProtoImpl(protoMetadata); - } - - @Override - public AttachmentData getAttachment(AttachmentId attachmentId) throws IOException { - final File file = new File(getAttachmentPath(attachmentId)); - if (!file.exists()) { - return null; - } - return new AttachmentData() { - - @Override - public InputStream getInputStream() throws IOException { - return new FileInputStream(file); - } - - @Override - public long getSize() { - return file.length(); - } - }; - } - - @Override - public AttachmentData getThumbnail(AttachmentId attachmentId) throws IOException { - final File file = new File(getThumbnailPath(attachmentId)); - if (!file.exists()) { - return null; - } - return new AttachmentData() { - - @Override - public InputStream getInputStream() throws IOException { - return new FileInputStream(file); - } - - @Override - public long getSize() { - return file.length(); - } - }; - } - - @Override - public void storeMetadata(AttachmentId attachmentId, AttachmentMetadata metaData) throws IOException { - File file = new File(getMetadataPath(attachmentId)); - if (file.exists()) { - throw new IOException("Attachment already exist"); - } - FileOutputStream stream = new FileOutputStream(file); - AttachmentMetadataProtoImpl proto = new AttachmentMetadataProtoImpl(metaData); - proto.getPB().writeTo(stream); - stream.close(); - } - - @Override - public void storeAttachment(AttachmentId attachmentId, InputStream data) throws IOException { - File file = new File(getAttachmentPath(attachmentId)); - if (file.exists()) { - throw new IOException("Attachment already exist"); - } - FileOutputStream stream = new FileOutputStream(file); - writeTo(data, stream); - stream.close(); - } - - @Override - public void storeThumbnail(AttachmentId attachmentId, InputStream data) throws IOException { - File file = new File(getThumbnailPath(attachmentId)); - if (file.exists()) { - throw new IOException("Attachment already exist"); - } - FileOutputStream stream = new FileOutputStream(file); - writeTo(data, stream); - stream.close(); - } - - @Override - public void deleteAttachment(AttachmentId attachmentId) { - File file = new File(getAttachmentPath(attachmentId)); - if (file.exists()) { - file.delete(); - } - } - - private String getMetadataPath(AttachmentId attachmentId) { - return basePath + File.separatorChar + encodeId(attachmentId) + META_EXT; - } - - private String getAttachmentPath(AttachmentId attachmentId) { - return basePath + File.separatorChar + encodeId(attachmentId); - } - - private String getThumbnailPath(AttachmentId attachmentId) { - return basePath + File.separatorChar + encodeId(attachmentId) + THUMBNAIL_EXT; - } - - private static void writeTo(InputStream source, OutputStream dest) throws IOException { - byte[] buffer = new byte[256]; - int length; - while ((length = source.read(buffer)) != -1) { - dest.write(buffer, 0, length); - } - } - - private static String encodeId(AttachmentId id) { - try { - return CharBase64.encode(id.serialise().getBytes("UTF-8")); - } catch (UnsupportedEncodingException e) { - throw new RuntimeException(e); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/persistence/file/FileDeltaCollection.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/persistence/file/FileDeltaCollection.java b/src/org/waveprotocol/box/server/persistence/file/FileDeltaCollection.java deleted file mode 100644 index 65683c4..0000000 --- a/src/org/waveprotocol/box/server/persistence/file/FileDeltaCollection.java +++ /dev/null @@ -1,574 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.persistence.file; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; - -import org.waveprotocol.box.server.persistence.PersistenceException; -import org.waveprotocol.box.server.persistence.protos.ProtoDeltaStoreDataSerializer; -import org.waveprotocol.box.server.persistence.protos.ProtoDeltaStoreData.ProtoTransformedWaveletDelta; -import org.waveprotocol.box.server.waveserver.AppliedDeltaUtil; -import org.waveprotocol.box.server.waveserver.ByteStringMessage; -import org.waveprotocol.box.server.waveserver.WaveletDeltaRecord; -import org.waveprotocol.box.server.waveserver.DeltaStore.DeltasAccess; -import org.waveprotocol.wave.federation.Proto.ProtocolAppliedWaveletDelta; -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.operation.wave.TransformedWaveletDelta; -import org.waveprotocol.wave.model.util.Pair; -import org.waveprotocol.wave.model.version.HashedVersion; -import org.waveprotocol.wave.util.logging.Log; - -import java.io.File; -import java.io.IOException; -import java.io.OutputStream; -import java.io.RandomAccessFile; -import java.nio.channels.Channels; -import java.util.Arrays; -import java.util.Collection; -import java.util.Iterator; - -/** - * A flat file based implementation of DeltasAccess. This class provides a storage backend for the - * deltas in a single wavelet. - * - * The file starts with a header. The header contains the version of the file protocol. After the - * version, the file contains a sequence of delta records. Each record contains a header followed - * by a WaveletDeltaRecord. - * - * A particular FileDeltaCollection instance assumes that it's <em>the only one</em> reading and - * writing a particular wavelet. The methods are <em>not</em> multithread-safe. - * - * See this document for design specifics: - * https://sites.google.com/a/waveprotocol.org/wave-protocol/protocol/design-proposals/wave-store-design-for-wave-in-a-box - * - * @author [email protected] (Joseph Gentle) - */ -public class FileDeltaCollection implements DeltasAccess { - public static final String DELTAS_FILE_SUFFIX = ".deltas"; - public static final String INDEX_FILE_SUFFIX = ".index"; - - private static final byte[] FILE_MAGIC_BYTES = new byte[]{'W', 'A', 'V', 'E'}; - private static final int FILE_PROTOCOL_VERSION = 1; - private static final int FILE_HEADER_LENGTH = 8; - - private static final int DELTA_PROTOCOL_VERSION = 1; - - private static final Log LOG = Log.get(FileDeltaCollection.class); - - private final WaveletName waveletName; - private final RandomAccessFile file; - private final DeltaIndex index; - - private HashedVersion endVersion; - private boolean isOpen; - - /** - * A single record in the delta file. - */ - private class DeltaHeader { - /** Length in bytes of the header */ - public static final int HEADER_LENGTH = 12; - - /** The protocol version of the remaining fields. For now, must be 1. */ - public final int protoVersion; - - /** The length of the applied delta segment, in bytes. */ - public final int appliedDeltaLength; - public final int transformedDeltaLength; - - public DeltaHeader(int protoVersion, int appliedDeltaLength, int transformedDeltaLength) { - this.protoVersion = protoVersion; - this.appliedDeltaLength = appliedDeltaLength; - this.transformedDeltaLength = transformedDeltaLength; - } - - public void checkVersion() throws IOException { - if (protoVersion != DELTA_PROTOCOL_VERSION) { - throw new IOException("Invalid delta header"); - } - } - } - - /** - * Opens a file delta collection. - * - * @param waveletName name of the wavelet to open - * @param basePath base path of files - * @return an open collection - * @throws IOException - */ - public static FileDeltaCollection open(WaveletName waveletName, String basePath) - throws IOException { - Preconditions.checkNotNull(waveletName, "null wavelet name"); - - RandomAccessFile deltaFile = FileUtils.getOrCreateFile(deltasFile(basePath, waveletName)); - setOrCheckFileHeader(deltaFile); - DeltaIndex index = new DeltaIndex(indexFile(basePath, waveletName)); - - FileDeltaCollection collection = new FileDeltaCollection(waveletName, deltaFile, index); - - index.openForCollection(collection); - collection.initializeEndVersionAndTruncateTrailingJunk(); - return collection; - } - - /** - * Delete the delta files from disk. - * - * @throws PersistenceException - */ - public static void delete(WaveletName waveletName, String basePath) throws PersistenceException { - String error = ""; - File deltas = deltasFile(basePath, waveletName); - - if (deltas.exists()) { - if (!deltas.delete()) { - error += "Could not delete deltas file: " + deltas.getAbsolutePath() + ". "; - } - } - - File index = indexFile(basePath, waveletName); - if (index.exists()) { - if (!index.delete()) { - error += "Could not delete index file: " + index.getAbsolutePath(); - } - } - if (!error.isEmpty()) { - throw new PersistenceException(error); - } - } - - /** - * Create a new file delta collection for the given wavelet. - * - * @param waveletName name of the wavelet - * @param deltaFile the file of deltas - * @param index index into deltas - */ - public FileDeltaCollection(WaveletName waveletName, RandomAccessFile deltaFile, - DeltaIndex index) { - this.waveletName = waveletName; - this.file = deltaFile; - this.index = index; - this.isOpen = true; - } - - @Override - public WaveletName getWaveletName() { - return waveletName; - } - - @Override - public HashedVersion getEndVersion() { - return endVersion; - } - - @Override - public WaveletDeltaRecord getDelta(long version) throws IOException { - checkIsOpen(); - return seekToRecord(version) ? readRecord() : null; - } - - @Override - public WaveletDeltaRecord getDeltaByEndVersion(long version) throws IOException { - checkIsOpen(); - return seekToEndRecord(version) ? readRecord() : null; - } - - @Override - public ByteStringMessage<ProtocolAppliedWaveletDelta> getAppliedDelta(long version) - throws IOException { - checkIsOpen(); - return seekToRecord(version) ? readAppliedDeltaFromRecord() : null; - } - - @Override - public TransformedWaveletDelta getTransformedDelta(long version) throws IOException { - checkIsOpen(); - return seekToRecord(version) ? readTransformedDeltaFromRecord() : null; - } - - @Override - public HashedVersion getAppliedAtVersion(long version) throws IOException { - checkIsOpen(); - ByteStringMessage<ProtocolAppliedWaveletDelta> applied = getAppliedDelta(version); - - return (applied != null) ? AppliedDeltaUtil.getHashedVersionAppliedAt(applied) : null; - } - - @Override - public HashedVersion getResultingVersion(long version) throws IOException { - checkIsOpen(); - TransformedWaveletDelta transformed = getTransformedDelta(version); - - return (transformed != null) ? transformed.getResultingVersion() : null; - } - - @Override - public void close() throws IOException { - file.close(); - index.close(); - endVersion = null; - isOpen = false; - } - - @Override - public void append(Collection<WaveletDeltaRecord> deltas) throws PersistenceException { - checkIsOpen(); - try { - file.seek(file.length()); - - WaveletDeltaRecord lastDelta = null; - for (WaveletDeltaRecord delta : deltas) { - index.addDelta(delta.getTransformedDelta().getAppliedAtVersion(), - delta.getTransformedDelta().size(), - file.getFilePointer()); - writeDelta(delta); - lastDelta = delta; - } - - // fsync() before returning. - file.getChannel().force(true); - endVersion = lastDelta.getTransformedDelta().getResultingVersion(); - } catch (IOException e) { - throw new PersistenceException(e); - } - } - - @Override - public boolean isEmpty() { - checkIsOpen(); - return index.length() == 0; - } - - /** - * Creates a new iterator to move over the positions of the deltas in the file. - * - * Each pair returned is ((version, numOperations), offset). - * @throws IOException - */ - Iterable<Pair<Pair<Long,Integer>, Long>> getOffsetsIterator() throws IOException { - checkIsOpen(); - - return new Iterable<Pair<Pair<Long, Integer>, Long>>() { - @Override - public Iterator<Pair<Pair<Long, Integer>, Long>> iterator() { - return new Iterator<Pair<Pair<Long, Integer>, Long>>() { - Pair<Pair<Long, Integer>, Long> nextRecord; - long nextPosition = FILE_HEADER_LENGTH; - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - @Override - public Pair<Pair<Long, Integer>, Long> next() { - Pair<Pair<Long, Integer>, Long> record = nextRecord; - nextRecord = null; - return record; - } - - @Override - public boolean hasNext() { - // We're using hasNext to prime the next call to next(). This works because in practice - // any call to next() is preceeded by at least one call to hasNext(). - // We need to actually read the record here because hasNext() should return false - // if there's any incomplete data at the end of the file. - try { - if (file.length() <= nextPosition) { - // End of file. - return false; - } - } catch (IOException e) { - throw new RuntimeException("Could not get file position", e); - } - - if (nextRecord == null) { - // Read the next record - try { - file.seek(nextPosition); - TransformedWaveletDelta transformed = readTransformedDeltaFromRecord(); - nextRecord = Pair.of(Pair.of(transformed.getAppliedAtVersion(), - transformed.size()), nextPosition); - nextPosition = file.getFilePointer(); - } catch (IOException e) { - // The next entry is invalid. There was probably a write error / crash. - LOG.severe("Error reading delta file for " + waveletName + " starting at " + - nextPosition, e); - return false; - } - } - - return true; - } - }; - } - }; - } - - @VisibleForTesting - static final File deltasFile(String basePath, WaveletName waveletName) { - String waveletPathPrefix = FileUtils.waveletNameToPathSegment(waveletName); - return new File(basePath, waveletPathPrefix + DELTAS_FILE_SUFFIX); - } - - @VisibleForTesting - static final File indexFile(String basePath, WaveletName waveletName) { - String waveletPathPrefix = FileUtils.waveletNameToPathSegment(waveletName); - return new File(basePath, waveletPathPrefix + INDEX_FILE_SUFFIX); - } - - /** - * Checks that a file has a valid deltas header, adding the header if the - * file is shorter than the header. - */ - private static void setOrCheckFileHeader(RandomAccessFile file) throws IOException { - Preconditions.checkNotNull(file); - file.seek(0); - - if (file.length() < FILE_HEADER_LENGTH) { - // The file is new. Insert a header. - file.write(FILE_MAGIC_BYTES); - file.writeInt(FILE_PROTOCOL_VERSION); - } else { - byte[] magic = new byte[4]; - file.readFully(magic); - if (!Arrays.equals(FILE_MAGIC_BYTES, magic)) { - throw new IOException("Delta file magic bytes are incorrect"); - } - - int version = file.readInt(); - if (version != FILE_PROTOCOL_VERSION) { - throw new IOException(String.format("File protocol version mismatch - expected %d got %d", - FILE_PROTOCOL_VERSION, version)); - } - } - } - - private void checkIsOpen() { - Preconditions.checkState(isOpen, "Delta collection closed"); - } - - /** - * Seek to the start of a delta record. Returns false if the record doesn't exist. - */ - private boolean seekToRecord(long version) throws IOException { - Preconditions.checkArgument(version >= 0, "Version can't be negative"); - long offset = index.getOffsetForVersion(version); - return seekTo(offset); - } - - /** - * Seek to the start of a delta record given its end version. - * Returns false if the record doesn't exist. - */ - private boolean seekToEndRecord(long version) throws IOException { - Preconditions.checkArgument(version >= 0, "Version can't be negative"); - long offset = index.getOffsetForEndVersion(version); - return seekTo(offset); - } - - private boolean seekTo(long offset) throws IOException { - if (offset == DeltaIndex.NO_RECORD_FOR_VERSION) { - // There's no record for the specified version. - return false; - } else { - file.seek(offset); - return true; - } - } - - /** - * Read a record and return it. - */ - private WaveletDeltaRecord readRecord() throws IOException { - DeltaHeader header = readDeltaHeader(); - - ByteStringMessage<ProtocolAppliedWaveletDelta> appliedDelta = - readAppliedDelta(header.appliedDeltaLength); - TransformedWaveletDelta transformedDelta = readTransformedWaveletDelta( - header.transformedDeltaLength); - - return new WaveletDeltaRecord(AppliedDeltaUtil.getHashedVersionAppliedAt(appliedDelta), - appliedDelta, transformedDelta); - } - - /** - * Reads a record, and only parses & returns the applied data field. - */ - private ByteStringMessage<ProtocolAppliedWaveletDelta> readAppliedDeltaFromRecord() - throws IOException { - DeltaHeader header = readDeltaHeader(); - - ByteStringMessage<ProtocolAppliedWaveletDelta> appliedDelta = - readAppliedDelta(header.appliedDeltaLength); - file.skipBytes(header.transformedDeltaLength); - - return appliedDelta; - } - - /** - * Reads a record, and only parses & returns the transformed data field. - */ - private TransformedWaveletDelta readTransformedDeltaFromRecord() throws IOException { - DeltaHeader header = readDeltaHeader(); - - file.skipBytes(header.appliedDeltaLength); - TransformedWaveletDelta transformedDelta = readTransformedWaveletDelta( - header.transformedDeltaLength); - - return transformedDelta; - } - - - // *** Low level data reading methods - - /** Read a header from the file. Does not move the file pointer before reading. */ - private DeltaHeader readDeltaHeader() throws IOException { - int version = file.readInt(); - if (version != DELTA_PROTOCOL_VERSION) { - throw new IOException("Delta header invalid"); - } - int appliedDeltaLength = file.readInt(); - int transformedDeltaLength = file.readInt(); - DeltaHeader deltaHeader = new DeltaHeader(version, appliedDeltaLength, transformedDeltaLength); - deltaHeader.checkVersion(); - // Verify the file size. - long remaining = file.length() - file.getFilePointer(); - long missing = (appliedDeltaLength + transformedDeltaLength) - remaining; - if (missing > 0) { - throw new IOException("File is corrupted, missing " + missing + " bytes"); - } - return deltaHeader; - } - - /** - * Write a header to the current location in the file - */ - private void writeDeltaHeader(DeltaHeader header) throws IOException { - file.writeInt(header.protoVersion); - file.writeInt(header.appliedDeltaLength); - file.writeInt(header.transformedDeltaLength); - } - - /** - * Read the applied delta at the current file position. After method call, - * file position is directly after applied delta field. - */ - private ByteStringMessage<ProtocolAppliedWaveletDelta> readAppliedDelta(int length) - throws IOException { - if (length == 0) { - return null; - } - - byte[] bytes = new byte[length]; - file.readFully(bytes); - try { - return ByteStringMessage.parseProtocolAppliedWaveletDelta(ByteString.copyFrom(bytes)); - } catch (InvalidProtocolBufferException e) { - throw new IOException(e); - } - } - - /** - * Write an applied delta to the current position in the file. Returns number of bytes written. - */ - private int writeAppliedDelta(ByteStringMessage<ProtocolAppliedWaveletDelta> delta) - throws IOException { - if (delta != null) { - byte[] bytes = delta.getByteArray(); - file.write(bytes); - return bytes.length; - } else { - return 0; - } - } - - /** - * Read a {@link TransformedWaveletDelta} from the current location in the file. - */ - private TransformedWaveletDelta readTransformedWaveletDelta(int transformedDeltaLength) - throws IOException { - if(transformedDeltaLength < 0) { - throw new IOException("Invalid delta length"); - } - - byte[] bytes = new byte[transformedDeltaLength]; - file.readFully(bytes); - ProtoTransformedWaveletDelta delta; - try { - delta = ProtoTransformedWaveletDelta.parseFrom(bytes); - } catch (InvalidProtocolBufferException e) { - throw new IOException(e); - } - return ProtoDeltaStoreDataSerializer.deserialize(delta); - } - - /** - * Write a {@link TransformedWaveletDelta} to the file at the current location. - * @return length of written data - */ - private int writeTransformedWaveletDelta(TransformedWaveletDelta delta) throws IOException { - long startingPosition = file.getFilePointer(); - ProtoTransformedWaveletDelta protoDelta = ProtoDeltaStoreDataSerializer.serialize(delta); - OutputStream stream = Channels.newOutputStream(file.getChannel()); - protoDelta.writeTo(stream); - return (int) (file.getFilePointer() - startingPosition); - } - - /** - * Read a delta to the file. Does not move the file pointer before writing. Returns number of - * bytes written. - */ - private long writeDelta(WaveletDeltaRecord delta) throws IOException { - // We'll write zeros in place of the header and come back & write it at the end. - long headerPointer = file.getFilePointer(); - file.write(new byte[DeltaHeader.HEADER_LENGTH]); - - int appliedLength = writeAppliedDelta(delta.getAppliedDelta()); - int transformedLength = writeTransformedWaveletDelta(delta.getTransformedDelta()); - - long endPointer = file.getFilePointer(); - file.seek(headerPointer); - writeDeltaHeader(new DeltaHeader(DELTA_PROTOCOL_VERSION, appliedLength, transformedLength)); - file.seek(endPointer); - - return endPointer - headerPointer; - } - - /** - * Reads the last complete record in the deltas file and truncates any trailing junk. - */ - private void initializeEndVersionAndTruncateTrailingJunk() throws IOException { - long numRecords = index.length(); - if (numRecords >= 1) { - endVersion = getDeltaByEndVersion(numRecords).getResultingVersion(); - } else { - endVersion = null; - } - // The file's position should be at the end. Truncate any - // trailing junk such as from a partially completed write. - file.setLength(file.getFilePointer()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/persistence/file/FileDeltaStore.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/persistence/file/FileDeltaStore.java b/src/org/waveprotocol/box/server/persistence/file/FileDeltaStore.java deleted file mode 100644 index 2677391..0000000 --- a/src/org/waveprotocol/box/server/persistence/file/FileDeltaStore.java +++ /dev/null @@ -1,181 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.persistence.file; - -import com.google.common.collect.ImmutableSet; -import com.google.inject.Inject; -import com.typesafe.config.Config; -import org.waveprotocol.box.common.ExceptionalIterator; -import org.waveprotocol.box.server.persistence.PersistenceException; -import org.waveprotocol.box.server.waveserver.DeltaStore; -import org.waveprotocol.box.stat.Timed; -import org.waveprotocol.wave.model.id.WaveId; -import org.waveprotocol.wave.model.id.WaveletId; -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.version.HashedVersion; -import org.waveprotocol.wave.util.logging.Log; - -import java.io.File; -import java.io.FilenameFilter; -import java.io.IOException; -import java.util.Iterator; -import java.util.NoSuchElementException; - -/** - * A flat file based implementation of DeltaStore. - * - * The delta store lives at some base directory. The directory structure looks like this: - * base/encoded-wave-id/encoded-wavelet-id.delta - * base/encoded-wave-id/encoded-wavelet-id.index - * - * See design doc: - * https://sites.google.com/a/waveprotocol.org/wave-protocol/protocol/design-proposals/wave-store-design-for-wave-in-a-box - * - - * @author [email protected] (Joseph Gentle) - */ -public class FileDeltaStore implements DeltaStore { - private static final Log LOG = Log.get(FileDeltaStore.class); - /** - * The directory in which the wavelets are stored - */ - final private String basePath; - - @Inject - public FileDeltaStore(Config config) { - this.basePath = config.getString("core.delta_store_directory"); - } - - @Timed - @Override - public FileDeltaCollection open(WaveletName waveletName) throws PersistenceException { - try { - return FileDeltaCollection.open(waveletName, basePath); - } catch (IOException e) { - throw new PersistenceException("Failed to open deltas for wavelet " + waveletName, e); - } - } - - @Override - public void delete(WaveletName waveletName) throws PersistenceException { - FileDeltaCollection.delete(waveletName, basePath); - } - - @Timed - @Override - public ImmutableSet<WaveletId> lookup(WaveId waveId) throws PersistenceException { - String waveDirectory = FileUtils.waveIdToPathSegment(waveId); - File waveDir = new File(basePath, waveDirectory); - if (!waveDir.exists()) { - return ImmutableSet.of(); - } - - File[] deltaFiles = waveDir.listFiles(new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return name.endsWith(FileDeltaCollection.DELTAS_FILE_SUFFIX); - } - }); - - ImmutableSet.Builder<WaveletId> results = ImmutableSet.builder(); - for(File deltaFile : deltaFiles) { - String name = deltaFile.getName(); - String encodedWaveletId = - name.substring(0, name.lastIndexOf(FileDeltaCollection.DELTAS_FILE_SUFFIX)); - WaveletId waveletId = FileUtils.waveletIdFromPathSegment(encodedWaveletId); - FileDeltaCollection deltas = open(WaveletName.of(waveId, waveletId)); - HashedVersion endVersion = deltas.getEndVersion(); - if (endVersion != null && endVersion.getVersion() > 0) { - results.add(waveletId); - } - try { - deltas.close(); - } catch (IOException e) { - LOG.info("Failed to close deltas file " + name, e); - } - } - - return results.build(); - } - - @Timed - @Override - public ExceptionalIterator<WaveId, PersistenceException> getWaveIdIterator() { - File baseDir = new File(basePath); - if (!baseDir.exists()) { - return ExceptionalIterator.Empty.create(); - } - - File[] waveDirs = baseDir.listFiles(new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - try { - FileUtils.waveIdFromPathSegment(name); - return true; - } catch (IllegalArgumentException e) { - return false; - } - } - }); - - final ImmutableSet.Builder<WaveId> results = ImmutableSet.builder(); - for(File waveDir : waveDirs) { - results.add(FileUtils.waveIdFromPathSegment(waveDir.getName())); - } - - return new ExceptionalIterator<WaveId, PersistenceException>() { - private final Iterator<WaveId> iterator = results.build().iterator(); - private boolean nextFetched = false; - private WaveId nextWaveId = null; - - private void fetchNext() throws PersistenceException { - while(!nextFetched) { - if (iterator.hasNext()) { - nextWaveId = iterator.next(); - if (!lookup(nextWaveId).isEmpty()) { - nextFetched = true; - } - } else { - nextFetched = true; - nextWaveId = null; - } - } - } - - @Override - public boolean hasNext() throws PersistenceException { - fetchNext(); - return nextWaveId != null; - } - - @Override - public WaveId next() throws PersistenceException { - fetchNext(); - if (nextWaveId == null) { - throw new NoSuchElementException(); - } else { - nextFetched = false; - return nextWaveId; - } - } - - }; - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/persistence/file/FileSignerInfoStore.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/persistence/file/FileSignerInfoStore.java b/src/org/waveprotocol/box/server/persistence/file/FileSignerInfoStore.java deleted file mode 100644 index 8dd53ac..0000000 --- a/src/org/waveprotocol/box/server/persistence/file/FileSignerInfoStore.java +++ /dev/null @@ -1,110 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.persistence.file; - -import com.google.inject.Inject; -import com.typesafe.config.Config; -import org.apache.commons.codec.binary.Hex; -import org.waveprotocol.box.server.persistence.PersistenceException; -import org.waveprotocol.box.server.persistence.SignerInfoStore; -import org.waveprotocol.wave.crypto.CertPathStore; -import org.waveprotocol.wave.crypto.DefaultCertPathStore; -import org.waveprotocol.wave.crypto.SignatureException; -import org.waveprotocol.wave.crypto.SignerInfo; -import org.waveprotocol.wave.federation.Proto.ProtocolSignerInfo; -import org.waveprotocol.wave.util.logging.Log; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; - -/** - * A flat file based implementation of {@link SignerInfoStore} - * - * @author [email protected] (Tad Glines) - */ -public class FileSignerInfoStore implements SignerInfoStore { - private static final String SIGNER_FILE_EXTENSION = ".signer"; - private final String signerInfoStoreBasePath; - private final CertPathStore certPathStore = new DefaultCertPathStore(); - - private static final Log LOG = Log.get(FileSignerInfoStore.class); - - @Inject - public FileSignerInfoStore(Config config) { - this.signerInfoStoreBasePath = config.getString("core.signer_info_store_directory"); - } - - private String signerIdToFileName(byte[] id) { - return signerInfoStoreBasePath + File.separator + new String(Hex.encodeHex(id)) - + SIGNER_FILE_EXTENSION; - } - - @Override - public void initializeSignerInfoStore() throws PersistenceException { - FileUtils.performDirectoryChecks(signerInfoStoreBasePath, SIGNER_FILE_EXTENSION, - "signer info store", LOG); - } - - @Override - public SignerInfo getSignerInfo(byte[] signerId) throws SignatureException { - synchronized(certPathStore) { - SignerInfo signerInfo = certPathStore.getSignerInfo(signerId); - File signerFile = new File(signerIdToFileName(signerId)); - if (signerInfo == null) { - if (signerFile.exists()) { - FileInputStream file = null; - try { - file = new FileInputStream(signerFile); - ProtocolSignerInfo data = ProtocolSignerInfo.newBuilder().mergeFrom(file).build(); - signerInfo = new SignerInfo(data); - } catch (SignatureException | IOException e) { - throw new SignatureException("Failed to parse signer info from file: " - + signerFile.getAbsolutePath(), e); - } finally { - FileUtils.closeAndIgnoreException(file, signerFile, LOG); - } - } - } - return signerInfo; - } - } - - @Override - public void putSignerInfo(ProtocolSignerInfo protoSignerInfo) throws SignatureException { - synchronized(certPathStore) { - SignerInfo signerInfo = new SignerInfo(protoSignerInfo); - File signerFile = new File(signerIdToFileName(signerInfo.getSignerId())); - FileOutputStream file = null; - try { - file = new FileOutputStream(signerFile); - file.write(protoSignerInfo.toByteArray()); - file.flush(); - certPathStore.putSignerInfo(protoSignerInfo); - } catch (IOException e) { - throw new SignatureException("Failed to write signer info to file: " - + signerFile.getAbsolutePath(), e); - } finally { - FileUtils.closeAndIgnoreException(file, signerFile, LOG); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/persistence/file/FileUtils.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/persistence/file/FileUtils.java b/src/org/waveprotocol/box/server/persistence/file/FileUtils.java deleted file mode 100644 index 3f63824..0000000 --- a/src/org/waveprotocol/box/server/persistence/file/FileUtils.java +++ /dev/null @@ -1,332 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.persistence.file; - -import com.google.common.base.Preconditions; - -import org.apache.commons.codec.DecoderException; -import org.apache.commons.codec.binary.Hex; -import org.waveprotocol.box.server.persistence.PersistenceException; -import org.waveprotocol.wave.model.id.WaveId; -import org.waveprotocol.wave.model.id.WaveletId; -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.util.Pair; -import org.waveprotocol.wave.util.logging.Log; - -import java.io.BufferedReader; -import java.io.Closeable; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.FileReader; -import java.io.FilenameFilter; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.io.UnsupportedEncodingException; - -/** - * Utility methods for file stores. - * - * @author [email protected] (Joseph Gentle) - */ -public class FileUtils { - private static final String SEPARATOR = "_"; - - /** - * Converts an arbitrary string into a format that can be stored safely on the filesystem. - * - * @param str the string to encode - * @return the encoded string - */ - public static String toFilenameFriendlyString(String str) { - byte[] bytes; - try { - bytes = str.getBytes("UTF-8"); - } catch (UnsupportedEncodingException e) { - // This should never happen. - throw new IllegalStateException("UTF-8 not supported", e); - } - - return new String(Hex.encodeHex(bytes)); - } - - /** - * Decodes a string that was encoded using toFilenameFriendlyString. - * - * @param encoded the encoded string - * @return the decoded string - * @throws DecoderException the string's encoding is invalid - */ - public static String fromFilenameFriendlyString(String encoded) throws DecoderException { - byte[] bytes = Hex.decodeHex(encoded.toCharArray()); - try { - return new String(bytes, "UTF-8"); - } catch (UnsupportedEncodingException e) { - // This should never happen. - throw new IllegalStateException("UTF-8 not supported", e); - } - } - - /** Decode a path segment pair. Throws IllegalArgumentException if the encoding is invalid */ - private static Pair<String, String> decodePathSegmentPair(String pathSegment) { - String[] components = pathSegment.split(SEPARATOR); - Preconditions.checkArgument(components.length == 2, "WaveId path name invalid"); - try { - return new Pair<String, String>(fromFilenameFriendlyString(components[0]), - fromFilenameFriendlyString(components[1])); - } catch (DecoderException e) { - throw new IllegalArgumentException("Wave path component encoding invalid"); - } - } - - /** - * Creates a filename-friendly pathname for the given waveId. - * - * The format is DOMAIN + '_' + ID where both the domain and the id are encoded - * to a pathname friendly format. - * - * @param waveId the waveId to encode - * @return a path segment which corresponds to the waveId - */ - public static String waveIdToPathSegment(WaveId waveId) { - String domain = toFilenameFriendlyString(waveId.getDomain()); - String id = toFilenameFriendlyString(waveId.getId()); - return domain + SEPARATOR + id; - } - - /** - * Converts a path segment created using waveIdToPathSegment back to a wave id - * - * @param pathSegment - * @return the decoded WaveId - * @throws IllegalArgumentException the encoding on the path segment is invalid - */ - public static WaveId waveIdFromPathSegment(String pathSegment) { - Pair<String, String> segments = decodePathSegmentPair(pathSegment); - return WaveId.of(segments.first, segments.second); - } - - /** - * Creates a filename-friendly path segment for a waveId. - * - * The format is "domain_id", encoded in a pathname friendly format. - * @param waveletId - * @return the decoded WaveletId - */ - public static String waveletIdToPathSegment(WaveletId waveletId) { - String domain = toFilenameFriendlyString(waveletId.getDomain()); - String id = toFilenameFriendlyString(waveletId.getId()); - return domain + SEPARATOR + id; - } - - /** - * Converts a path segment created using waveIdToPathSegment back to a wave id. - * - * @param pathSegment - * @return the decoded waveletId - * @throws IllegalArgumentException the encoding on the path segment is invalid - */ - public static WaveletId waveletIdFromPathSegment(String pathSegment) { - Pair<String, String> segments = decodePathSegmentPair(pathSegment); - return WaveletId.of(segments.first, segments.second); - } - - /** - * Creates a filename-friendly path segment for a wavelet name. - * - * @return the filename-friendly path segment representing the wavelet - */ - public static String waveletNameToPathSegment(WaveletName waveletName) { - return waveIdToPathSegment(waveletName.waveId) - + File.separatorChar - + waveletIdToPathSegment(waveletName.waveletId); - } - - /** - * Get a file for random binary access. If the file doesn't exist, it will be created. - * - * Calls to write() will not flush automatically. Call file.getChannel().force(true) to force - * writes to flush to disk. - * - * @param fileRef the file to open - * @return an opened RandomAccessFile wrapping the requested file - * @throws IOException an error occurred opening or creating the file - */ - public static RandomAccessFile getOrCreateFile(File fileRef) throws IOException { - if (!fileRef.exists()) { - fileRef.getParentFile().mkdirs(); - fileRef.createNewFile(); - } - - RandomAccessFile file; - try { - file = new RandomAccessFile(fileRef, "rw"); - } catch (FileNotFoundException e) { - // This should never happen. - throw new IllegalStateException("Java said the file exists, but it can't open it", e); - } - - return file; - } - - /** Create and return a new temporary directory */ - public static File createTemporaryDirectory() throws IOException { - // We want a temporary directory. createTempFile will make a file with a - // good temporary path. Its a bit nasty, but we'll create the file, then - // delete it and create a directory with the same name. - - File dir = File.createTempFile("fedoneattachments", null); - - if (!dir.delete() || !dir.mkdir()) { - throw new IOException("Could not make temporary directory for attachment store: " - + dir); - } - - return dir.getAbsoluteFile(); - } - - /** - * Close the closeable and log, but ignore, any exception thrown. - */ - public static void closeAndIgnoreException(Closeable closeable, File file, Log LOG) { - if (closeable != null) { - try { - closeable.close(); - } catch (IOException e) { - // This should never happen in practice. But just in case... log it. - LOG.warning("Failed to close file: " + file.getAbsolutePath(), e); - } - } - } - - /** - * Create dir if it doesn't exist, and perform checks to make sure that the dir's contents are - * listable, that files can be created in the dir, and that files in the dir are readable. - */ - public static void performDirectoryChecks(String dir, final String extension, String dirType, - Log LOG) throws PersistenceException { - File baseDir = createDirIfNotExists(dir, dirType); - - // Make sure we can read files by trying to read one of the files. - File[] files = baseDir.listFiles(new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return name.endsWith(extension); - } - }); - - if (files == null) { - throw new PersistenceException(String.format( - "Configured %s directory (%s) does not appear to be readable!", dirType, dir)); - } - - /* - * If file list isn't empty, try opening the first file in the list to make sure it - * is readable. If the first file is readable, then it is likely that the rest will - * be readable as well. - */ - if (files.length > 0) { - try { - FileInputStream file = new FileInputStream(files[0]); - file.read(); - } catch (IOException e) { - throw new PersistenceException( - String.format( - "Failed to read '%s' in configured %s directory '%s'. " - + "The directory's contents do not appear to be readable.", - dirType, files[0].getName(), dir), - e); - } - } - - // Make sure the dir is writable. - try { - File tmp = File.createTempFile("tempInitialization", ".temp", baseDir); - FileOutputStream stream = new FileOutputStream(tmp); - stream.write(new byte[]{'H','e','l','l','o'}); - stream.close(); - tmp.delete(); - } catch (IOException e) { - throw new PersistenceException(String.format( - "Configured %s directory (%s) does not appear to be writable!", dirType, dir), e); - } - } - - /** - * Creates a directory if it doesn't exist. - * - * @param dir the directory location. - * @param dirType the directory type description (only useful for logs). - * @return the File directory (it's created if doesn't exist). - * @throws PersistenceException if the directory doesn't exist and cannot be - * created or when the path described by <code>dir</code> is not a - * directory. - */ - public static File createDirIfNotExists(String dir, String dirType) throws PersistenceException { - File baseDir = new File(dir); - - // Make sure the dir exists. - if (!baseDir.exists()) { - // It doesn't so try and create it. - if (!baseDir.mkdirs()) { - throw new PersistenceException(String.format( - "Configured %s directory (%s) doesn't exist and could not be created!", dirType, dir)); - } - } - - // Make sure the dir is a directory. - if (!baseDir.isDirectory()) { - throw new PersistenceException(String.format( - "Configured %s path (%s) isn't a directory!", dirType, dir)); - } - return baseDir; - } - - /** - * Opens a file and read the content to a string. - * - * @param fileName the file to read. - * @return a string with the content of the file. - * @throws IOException if the method failed to open the file for reading. - */ - public static String getStringFromFile(String fileName) throws IOException { - StringBuilder stringBuilder = new StringBuilder(); - File file = new File(fileName); - FileReader fileReader = new FileReader(file); - BufferedReader bufferedReader = new BufferedReader(fileReader); - - String line; - while ((line = bufferedReader.readLine()) != null) { - stringBuilder.append(line); - } - fileReader.close(); - - return stringBuilder.toString(); - } - - public static boolean isDirExistsAndNonEmpty(String dir) { - File baseDir = new File(dir); - if (!(baseDir.exists() && baseDir.isDirectory()) || baseDir.list().length == 0) { - return false; - } - return true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/persistence/lucene/FSIndexDirectory.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/persistence/lucene/FSIndexDirectory.java b/src/org/waveprotocol/box/server/persistence/lucene/FSIndexDirectory.java deleted file mode 100644 index eaaab39..0000000 --- a/src/org/waveprotocol/box/server/persistence/lucene/FSIndexDirectory.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.waveprotocol.box.server.persistence.lucene; - -import com.google.inject.Inject; -import com.typesafe.config.Config; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FSDirectory; -import org.waveprotocol.box.server.persistence.PersistenceException; -import org.waveprotocol.box.server.persistence.file.FileUtils; -import org.waveprotocol.box.server.waveserver.IndexException; - -import java.io.File; -import java.io.IOException; - -/** - * File system based {@link IndexDirectory}. - * - * @author A. Kaplanov - */ -public class FSIndexDirectory implements IndexDirectory { - - private Directory directory; - - @Inject - public FSIndexDirectory(Config config) { - String directoryName = config.getString("core.index_directory"); - if (directory == null) { - File file; - try { - file = FileUtils.createDirIfNotExists(directoryName, ""); - } catch (PersistenceException e) { - throw new IndexException("Cannot create index directory " + directoryName, e); - } - try { - directory = FSDirectory.open(file); - } catch (IOException e) { - throw new IndexException("Cannot open index directory " + directoryName, e); - } - } - } - - @Override - public Directory getDirectory() throws IndexException { - return directory; - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/persistence/lucene/IndexDirectory.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/persistence/lucene/IndexDirectory.java b/src/org/waveprotocol/box/server/persistence/lucene/IndexDirectory.java deleted file mode 100644 index 1ce1438..0000000 --- a/src/org/waveprotocol/box/server/persistence/lucene/IndexDirectory.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.waveprotocol.box.server.persistence.lucene; - -import org.apache.lucene.store.Directory; -import org.waveprotocol.box.server.waveserver.IndexException; - -/** - * Provides the directory to store/access the index files. - * - * @author A. Kaplanov - */ -public interface IndexDirectory { - - Directory getDirectory() throws IndexException; -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/persistence/lucene/RAMIndexDirectory.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/persistence/lucene/RAMIndexDirectory.java b/src/org/waveprotocol/box/server/persistence/lucene/RAMIndexDirectory.java deleted file mode 100644 index 4e3d295..0000000 --- a/src/org/waveprotocol/box/server/persistence/lucene/RAMIndexDirectory.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.waveprotocol.box.server.persistence.lucene; - -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.RAMDirectory; -import org.waveprotocol.box.server.waveserver.IndexException; - -/** - * RAM based {@link IndexDirectory}. - * - * @author A. Kaplanov - */ -public class RAMIndexDirectory implements IndexDirectory { - - private final Directory directory = new RAMDirectory(); - - @Override - public Directory getDirectory() throws IndexException { - return directory; - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/persistence/memory/MemoryDeltaCollection.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/persistence/memory/MemoryDeltaCollection.java b/src/org/waveprotocol/box/server/persistence/memory/MemoryDeltaCollection.java deleted file mode 100644 index da6a85e..0000000 --- a/src/org/waveprotocol/box/server/persistence/memory/MemoryDeltaCollection.java +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.persistence.memory; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; -import com.google.protobuf.InvalidProtocolBufferException; - -import org.waveprotocol.box.server.waveserver.ByteStringMessage; -import org.waveprotocol.box.server.waveserver.WaveletDeltaRecord; -import org.waveprotocol.box.server.waveserver.DeltaStore.DeltasAccess; -import org.waveprotocol.wave.federation.Proto.ProtocolAppliedWaveletDelta; -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.operation.wave.TransformedWaveletDelta; -import org.waveprotocol.wave.model.version.HashedVersion; - -import java.util.Collection; -import java.util.Map; - -/** - * An in-memory implementation of DeltasAccess - * - * @author [email protected] (Joseph Gentle) - */ -public class MemoryDeltaCollection implements DeltasAccess { - private final Map<Long, WaveletDeltaRecord> deltas = Maps.newHashMap(); - private final Map<Long, WaveletDeltaRecord> endDeltas = Maps.newHashMap(); - private final WaveletName waveletName; - - private HashedVersion endVersion = null; - - public MemoryDeltaCollection(WaveletName waveletName) { - Preconditions.checkNotNull(waveletName); - this.waveletName = waveletName; - } - - @Override - public boolean isEmpty() { - return deltas.isEmpty(); - } - - @Override - public WaveletName getWaveletName() { - return waveletName; - } - - @Override - public HashedVersion getEndVersion() { - return endVersion; - } - - @Override - public WaveletDeltaRecord getDelta(long version) { - return deltas.get(version); - } - - @Override - public WaveletDeltaRecord getDeltaByEndVersion(long version) { - return endDeltas.get(version); - } - - @Override - public HashedVersion getAppliedAtVersion(long version) throws InvalidProtocolBufferException { - WaveletDeltaRecord delta = getDelta(version); - return (delta != null) ? delta.getAppliedAtVersion() : null; - } - - @Override - public HashedVersion getResultingVersion(long version) { - WaveletDeltaRecord delta = getDelta(version); - return (delta != null) ? delta.getTransformedDelta().getResultingVersion() : null; - } - - @Override - public ByteStringMessage<ProtocolAppliedWaveletDelta> getAppliedDelta(long version) { - WaveletDeltaRecord delta = getDelta(version); - return (delta != null) ? delta.getAppliedDelta() : null; - } - - @Override - public TransformedWaveletDelta getTransformedDelta(long version) { - WaveletDeltaRecord delta = getDelta(version); - return (delta != null) ? delta.getTransformedDelta() : null; - } - - @Override - public void close() { - // Does nothing. - } - - @Override - public void append(Collection<WaveletDeltaRecord> newDeltas) { - for (WaveletDeltaRecord delta : newDeltas) { - // Before: ... | D | - // start end - // After: ... | D | D + 1 | - // start end - long startVersion = delta.getTransformedDelta().getAppliedAtVersion(); - Preconditions.checkState( - (startVersion == 0 && endVersion == null) || - (startVersion == endVersion.getVersion())); - deltas.put(startVersion, delta); - endVersion = delta.getTransformedDelta().getResultingVersion(); - endDeltas.put(endVersion.getVersion(), delta); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/persistence/memory/MemoryDeltaStore.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/persistence/memory/MemoryDeltaStore.java b/src/org/waveprotocol/box/server/persistence/memory/MemoryDeltaStore.java deleted file mode 100644 index db1606a..0000000 --- a/src/org/waveprotocol/box/server/persistence/memory/MemoryDeltaStore.java +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.persistence.memory; - -import com.google.common.collect.ImmutableSet; - -import org.waveprotocol.box.common.ExceptionalIterator; -import org.waveprotocol.box.server.persistence.FileNotFoundPersistenceException; -import org.waveprotocol.box.server.persistence.PersistenceException; -import org.waveprotocol.box.server.waveserver.DeltaStore; -import org.waveprotocol.wave.model.id.WaveId; -import org.waveprotocol.wave.model.id.WaveletId; -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.util.CollectionUtils; - -import java.util.Map; - -/** - * A simple in-memory implementation of DeltaStore. - * - * @author [email protected] (Joseph Gentle) - */ -public class MemoryDeltaStore implements DeltaStore { - /** - * The actual data. - * Note: We don't remove map entries in the top-level map when all wavelets in a wave are deleted. - * This is a very small memory leak that won't cause problems in practice. - */ - private final Map<WaveId, Map<WaveletId, MemoryDeltaCollection>> data = - CollectionUtils.newHashMap(); - - private Map<WaveletId, MemoryDeltaCollection> getOrCreateWaveData(WaveId waveId) { - Map<WaveletId, MemoryDeltaCollection> waveData = data.get(waveId); - if (waveData == null) { - waveData = CollectionUtils.newHashMap(); - data.put(waveId, waveData); - } - - return waveData; - } - - @Override - public void delete(WaveletName waveletName) throws PersistenceException { - Map<WaveletId, MemoryDeltaCollection> waveData = data.get(waveletName.waveId); - if (waveData == null) { - throw new FileNotFoundPersistenceException("WaveletData unknown"); - } else { - if (waveData.remove(waveletName.waveletId) == null) { - // Nothing was removed. - throw new FileNotFoundPersistenceException("Nothing was removed"); - } - } - } - - @Override - public ImmutableSet<WaveletId> lookup(WaveId waveId) { - Map<WaveletId, MemoryDeltaCollection> waveData = data.get(waveId); - if (waveData == null) { - return ImmutableSet.of(); - } else { - ImmutableSet.Builder<WaveletId> builder = ImmutableSet.builder(); - for (MemoryDeltaCollection collection : waveData.values()) { - if (!collection.isEmpty()) { - builder.add(collection.getWaveletName().waveletId); - } - } - return builder.build(); - } - } - - @Override - public DeltasAccess open(WaveletName waveletName) { - Map<WaveletId, MemoryDeltaCollection> waveData = getOrCreateWaveData(waveletName.waveId); - - MemoryDeltaCollection collection = waveData.get(waveletName.waveletId); - if (collection == null) { - collection = new MemoryDeltaCollection(waveletName); - waveData.put(waveletName.waveletId, collection); - } - - return collection; - } - - @Override - public ExceptionalIterator<WaveId, PersistenceException> getWaveIdIterator() { - ImmutableSet.Builder<WaveId> builder = ImmutableSet.builder(); - // Filter out empty waves - for (Map.Entry<WaveId, Map<WaveletId, MemoryDeltaCollection>> e : data.entrySet()) { - if (!e.getValue().isEmpty()) { - for (MemoryDeltaCollection collection : e.getValue().values()) { - if (!collection.isEmpty()) { - builder.add(e.getKey()); - break; - } - } - } - } - return ExceptionalIterator.FromIterator.create(builder.build().iterator()); - } -}
