Chad has uploaded a new change for review.
https://gerrit.wikimedia.org/r/158279
Change subject: WIP: Rewrite BlobStore support
......................................................................
WIP: Rewrite BlobStore support
Upstream Elasticsearch is making breaking changes to its BlobStore API, so this plugin has to be rewritten to match.
Change-Id: Ic159ca0c658a79f0fcc60fd58f8763cc149d271e
---
M pom.xml
M
src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepository.java
M
src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepositoryModule.java
D
src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/AbstractSwiftBlobContainer.java
A
src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobContainer.java
M
src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobStore.java
D
src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftImmutableBlobContainer.java
7 files changed, 155 insertions(+), 248 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/search/repository-swift
refs/changes/79/158279/1
diff --git a/pom.xml b/pom.xml
index 4ced71e..e547bd0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,7 +37,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <elasticsearch.version>1.3.2</elasticsearch.version>
+ <elasticsearch.version>2.0.0-SNAPSHOT</elasticsearch.version>
</properties>
<dependencies>
@@ -49,7 +49,7 @@
<dependency>
<groupId>org.javaswift</groupId>
<artifactId>joss</artifactId>
- <version>0.9.8</version>
+ <version>0.9.9-SNAPSHOT</version>
</dependency>
<!-- testing only -->
<dependency>
@@ -113,11 +113,6 @@
<source>1.7</source>
<target>1.7</target>
</configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-clean-plugin</artifactId>
- <version>2.4.1</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
diff --git
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepository.java
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepository.java
index 8e04759..691a3b7 100644
---
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepository.java
+++
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepository.java
@@ -1,14 +1,10 @@
package org.wikimedia.elasticsearch.swift.repositories;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryName;
@@ -66,14 +62,9 @@
String password =
repositorySettings.settings().get("swift_password", "");
String tenantName =
repositorySettings.settings().get("swift_tenantname", "");
String authMethod =
repositorySettings.settings().get("swift_authmethod", "");
-
- int concurrentStreams =
repositorySettings.settings().getAsInt("concurrent_streams",
componentSettings.getAsInt("concurrent_streams", 5));
- ExecutorService concurrentStreamPool = EsExecutors.newScaling(
- 1, concurrentStreams, 5, TimeUnit.SECONDS,
EsExecutors.daemonThreadFactory(settings, "[swift_stream]"));
-
Account account =
SwiftAccountFactory.createAccount(swiftService, url, username, password,
tenantName, authMethod);
- blobStore = new SwiftBlobStore(settings, account, container,
concurrentStreamPool);
+ blobStore = new SwiftBlobStore(settings, account, container);
this.chunkSize =
repositorySettings.settings().getAsBytesSize("chunk_size",
componentSettings.getAsBytesSize("chunk_size",
new ByteSizeValue(5, ByteSizeUnit.GB)));
diff --git
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepositoryModule.java
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepositoryModule.java
index dbb9e43..f382f4a 100644
---
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepositoryModule.java
+++
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepositoryModule.java
@@ -9,13 +9,6 @@
* Swift repository module. Binds us to things.
*/
public class SwiftRepositoryModule extends AbstractModule {
- /**
- * Constructor. Super boring.
- */
- public SwiftRepositoryModule() {
- super();
- }
-
/**
* Do the binding.
*/
diff --git
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/AbstractSwiftBlobContainer.java
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/AbstractSwiftBlobContainer.java
deleted file mode 100644
index 5db6376..0000000
---
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/AbstractSwiftBlobContainer.java
+++ /dev/null
@@ -1,143 +0,0 @@
-package org.wikimedia.elasticsearch.swift.repositories.blobstore;
-
-import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.blobstore.BlobMetaData;
-import org.elasticsearch.common.blobstore.BlobPath;
-import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
-import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
-import org.elasticsearch.common.collect.ImmutableMap;
-import org.javaswift.joss.model.Directory;
-import org.javaswift.joss.model.DirectoryOrObject;
-import org.javaswift.joss.model.StoredObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Collection;
-
-/**
- * Swift's implementation of the AbstractBlobContainer
- */
-public class AbstractSwiftBlobContainer extends AbstractBlobContainer {
- // Our local swift blob store instance
- protected final SwiftBlobStore blobStore;
-
- // The root path for blobs. Used by buildKey to build full blob names
- protected final String keyPath;
-
- /**
- * Constructor
- * @param path The BlobPath to find blobs in
- * @param blobStore The blob store to use for operations
- */
- protected AbstractSwiftBlobContainer(BlobPath path, SwiftBlobStore
blobStore) {
- super(path);
- this.blobStore = blobStore;
- String keyPath = path.buildAsString("/");
- if (!keyPath.isEmpty()) {
- keyPath = keyPath + "/";
- }
- this.keyPath = keyPath;
- }
-
- /**
- * Does a blob exist? Self-explanatory.
- */
- @Override
- public boolean blobExists(String blobName) {
- return blobStore.swift().getObject(buildKey(blobName)).exists();
- }
-
- /**
- * Read a given blob into the listener
- * @param blobName The blob name to read
- * @param listener The listener to report our read info back to
- */
- @Override
- public void readBlob(final String blobName, final ReadBlobListener
listener) {
- blobStore.executor().execute(new Runnable() {
- @Override
- public void run() {
- InputStream is;
- try {
- // This is the interesting bit. Fetch the blob and then
turn it
- // into an InputStream for reading below
- StoredObject object =
blobStore.swift().getObject(buildKey(blobName));
- is = object.downloadObjectAsInputStream();
- } catch (Exception e) {
- listener.onFailure(e);
- return;
- }
- byte[] buffer = new byte[blobStore.bufferSizeInBytes()];
- try {
- int bytesRead;
- while ((bytesRead = is.read(buffer)) != -1) {
- listener.onPartial(buffer, 0, bytesRead);
- }
- listener.onCompleted();
- } catch (Exception e) {
- try {
- is.close();
- } catch (IOException e1) {
- // ignore
- }
- listener.onFailure(e);
- }
- }
- });
- }
-
- /**
- * Delete a blob. Straightforward.
- * @param blobName A blob to delete
- */
- @Override
- public boolean deleteBlob(String blobName) throws IOException {
- StoredObject object = blobStore.swift().getObject(buildKey(blobName));
- if (object.exists()) {
- object.delete();
- }
- return true;
- }
-
- /**
- * Get the blobs matching a given prefix
- * @param blobNamePrefix The prefix to look for blobs with
- */
- @Override
- public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(@Nullable
String blobNamePrefix) throws IOException {
- ImmutableMap.Builder<String, BlobMetaData> blobsBuilder =
ImmutableMap.builder();
-
- Collection<DirectoryOrObject> files;
- if (blobNamePrefix != null) {
- files = blobStore.swift().listDirectory(new
Directory(buildKey(blobNamePrefix), '/'));
- } else {
- files = blobStore.swift().listDirectory(new Directory(keyPath,
'/'));
- }
- if (files != null && !files.isEmpty()) {
- for (DirectoryOrObject object : files) {
- if (object.isObject()) {
- String name = object.getName().substring(keyPath.length());
- blobsBuilder.put(name, new PlainBlobMetaData(name,
object.getAsObject().getContentLength()));
- }
- }
- }
-
- return blobsBuilder.build();
- }
-
- /**
- * Get all the blobs
- */
- @Override
- public ImmutableMap<String, BlobMetaData> listBlobs() throws IOException {
- return listBlobsByPrefix(null);
- }
-
- /**
- * Build a key for a blob name, based on the keyPath
- * @param blobName The blob name to build a key for
- */
- protected String buildKey(String blobName) {
- return keyPath + blobName;
- }
-}
diff --git
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobContainer.java
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobContainer.java
new file mode 100644
index 0000000..b4f57dd
--- /dev/null
+++
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobContainer.java
@@ -0,0 +1,146 @@
+package org.wikimedia.elasticsearch.swift.repositories.blobstore;
+
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.blobstore.BlobMetaData;
+import org.elasticsearch.common.blobstore.BlobPath;
+import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
+import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
+import org.elasticsearch.common.collect.ImmutableMap;
+import org.javaswift.joss.model.Directory;
+import org.javaswift.joss.model.DirectoryOrObject;
+import org.javaswift.joss.model.StoredObject;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collection;
+
+/**
+ * Swift's implementation of the AbstractBlobContainer
+ */
+public class SwiftBlobContainer extends AbstractBlobContainer {
+ // Our local swift blob store instance
+ protected final SwiftBlobStore blobStore;
+
+ // The root path for blobs. Used by buildKey to build full blob names
+ protected final String keyPath;
+
+ /**
+ * Constructor
+ *
+ * @param path
+ * The BlobPath to find blobs in
+ * @param blobStore
+ * The blob store to use for operations
+ */
+ protected SwiftBlobContainer(BlobPath path, SwiftBlobStore blobStore) {
+ super(path);
+ this.blobStore = blobStore;
+ String keyPath = path.buildAsString("/");
+ if (!keyPath.isEmpty()) {
+ keyPath = keyPath + "/";
+ }
+ this.keyPath = keyPath;
+ }
+
+ /**
+ * Does a blob exist? Self-explanatory.
+ */
+ @Override
+ public boolean blobExists(String blobName) {
+ return blobStore.swift().getObject(buildKey(blobName)).exists();
+ }
+
+ /**
+ * Delete a blob. Straightforward.
+ *
+ * @param blobName
+ * A blob to delete
+ */
+ @Override
+ public boolean deleteBlob(String blobName) throws IOException {
+ StoredObject object =
blobStore.swift().getObject(buildKey(blobName));
+ if (object.exists()) {
+ object.delete();
+ }
+ return true;
+ }
+
+ /**
+ * Get the blobs matching a given prefix
+ *
+ * @param blobNamePrefix
+ * The prefix to look for blobs with
+ */
+ @Override
+ public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(
+ @Nullable String blobNamePrefix) throws IOException {
+ ImmutableMap.Builder<String, BlobMetaData> blobsBuilder =
ImmutableMap
+ .builder();
+
+ Collection<DirectoryOrObject> files;
+ if (blobNamePrefix != null) {
+ files = blobStore.swift().listDirectory(
+ new Directory(buildKey(blobNamePrefix),
'/'));
+ } else {
+ files = blobStore.swift()
+ .listDirectory(new Directory(keyPath,
'/'));
+ }
+ if (files != null && !files.isEmpty()) {
+ for (DirectoryOrObject object : files) {
+ if (object.isObject()) {
+ String name =
object.getName().substring(keyPath.length());
+ blobsBuilder.put(name, new
PlainBlobMetaData(name, object
+
.getAsObject().getContentLength()));
+ }
+ }
+ }
+
+ return blobsBuilder.build();
+ }
+
+ /**
+ * Get all the blobs
+ */
+ @Override
+ public ImmutableMap<String, BlobMetaData> listBlobs() throws
IOException {
+ return listBlobsByPrefix(null);
+ }
+
+ /**
+ * Build a key for a blob name, based on the keyPath
+ *
+ * @param blobName
+ * The blob name to build a key for
+ */
+ protected String buildKey(String blobName) {
+ return keyPath + blobName;
+ }
+
+ /**
+ * Fetch a given blob into a BufferedInputStream
+ *
+ * @param blobName
+ * The blob name to read
+ */
+ @Override
+ public InputStream openInput(String blobName) throws IOException {
+ return new BufferedInputStream(
+
blobStore.swift().getObject(buildKey(blobName)).downloadObjectAsInputStream(),
+ blobStore.bufferSizeInBytes());
+ }
+
+ @Override
+ public OutputStream createOutput(String blobName) throws IOException {
+ final StoredObject object =
blobStore.swift().getObject(buildKey(blobName));
+ return new BufferedOutputStream(new FilterOutputStream(null) {
+ @Override
+ public void close() throws IOException {
+ super.close();
+ }
+ }, blobStore.bufferSizeInBytes());
+ }
+}
diff --git
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobStore.java
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobStore.java
index 40dacb0..1eb4be5 100644
---
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobStore.java
+++
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobStore.java
@@ -1,8 +1,8 @@
package org.wikimedia.elasticsearch.swift.repositories.blobstore;
+import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
-import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -11,15 +11,10 @@
import org.javaswift.joss.model.Container;
import org.javaswift.joss.model.StoredObject;
-import java.util.concurrent.Executor;
-
/**
* Our blob store
*/
public class SwiftBlobStore extends AbstractComponent implements BlobStore {
- // Executor for our operations. Sorta like a dedicated Thread pool.
- private final Executor executor;
-
// How much to buffer our blobs by
private final int bufferSizeInBytes;
@@ -31,11 +26,9 @@
* @param settings Settings for our repository. Only care about buffer
size.
* @param auth
* @param container
- * @param executor
*/
- public SwiftBlobStore(Settings settings, Account auth, String container,
Executor executor) {
+ public SwiftBlobStore(Settings settings, Account auth, String container) {
super(settings);
- this.executor = executor;
this.bufferSizeInBytes = (int)settings.getAsBytesSize("buffer_size",
new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();
swift = auth.getContainer(container);
@@ -53,13 +46,6 @@
}
/**
- * Get the executor
- */
- public Executor executor() {
- return executor;
- }
-
- /**
* Get our buffer size
*/
public int bufferSizeInBytes() {
@@ -70,10 +56,10 @@
* Factory for getting blob containers for a path
* @param path The blob path to search
*/
- @Override
- public ImmutableBlobContainer immutableBlobContainer(BlobPath path) {
- return new SwiftImmutableBlobContainer(path, this);
- }
+ @Override
+ public BlobContainer blobContainer(BlobPath path) {
+ return new SwiftBlobContainer(path, this);
+ }
/**
* Delete an arbitrary BlobPath from our store.
diff --git
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftImmutableBlobContainer.java
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftImmutableBlobContainer.java
deleted file mode 100644
index 6f69f6b..0000000
---
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftImmutableBlobContainer.java
+++ /dev/null
@@ -1,61 +0,0 @@
-package org.wikimedia.elasticsearch.swift.repositories.blobstore;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.elasticsearch.common.blobstore.BlobPath;
-import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
-import org.elasticsearch.common.blobstore.support.BlobStores;
-import org.javaswift.joss.model.StoredObject;
-
-/**
- * The final implementation of Swift blob storage. Implements the write
functionality
- */
-public class SwiftImmutableBlobContainer extends AbstractSwiftBlobContainer
implements ImmutableBlobContainer {
- /**
- * Constructor. Just call the parent.
- * @param path The BlobPath to find blobs in
- * @param blobStore The blob store to use for operations
- */
- protected SwiftImmutableBlobContainer(BlobPath path, SwiftBlobStore
blobStore) {
- super(path, blobStore);
- }
-
- /**
- * Write a blob!
- * @param blobName The name of the blob we're writing
- * @param is The data we're writing
- * @param sizeInBytes How big the data is. We don't care with Swift.
- * @param listener The listener for write results (failure, etc)
- */
- @Override
- public void writeBlob(final String blobName, final InputStream is, final
long sizeInBytes, final WriterListener listener) {
- blobStore.executor().execute(new Runnable() {
- @Override
- public void run() {
- try {
- // need to remove old file if already exist
- StoredObject object =
blobStore.swift().getObject(buildKey(blobName));
- if (object.exists()) {
- object.delete();
- }
- object.uploadObject(is);
- listener.onCompleted();
- } catch (Exception e) {
- listener.onFailure(e);
- }
- }
- });
- }
-
- /**
- * Write a blob! Basically boilerplate from other implementations.
- * @param blobName The name of the blob we're writing
- * @param is The data we're writing
- * @param sizeInBytes How big the data is. We don't care with Swift.
- */
- @Override
- public void writeBlob(String blobName, InputStream is, long sizeInBytes)
throws IOException {
- BlobStores.syncWriteBlob(this, blobName, is, sizeInBytes);
- }
-}
--
To view, visit https://gerrit.wikimedia.org/r/158279
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic159ca0c658a79f0fcc60fd58f8763cc149d271e
Gerrit-PatchSet: 1
Gerrit-Project: search/repository-swift
Gerrit-Branch: master
Gerrit-Owner: Chad <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits