Chad has submitted this change and it was merged.
Change subject: Rewrite BlobStore support
......................................................................
Rewrite BlobStore support
Upstream is breaking everything, so we'll have to as well. It's
actually much cleaner and less abstracted.
Begin tracking Elasticsearch 1.4 in master for the 0.7 release.
Change-Id: Ic159ca0c658a79f0fcc60fd58f8763cc149d271e
---
M pom.xml
M src/main/java/org/wikimedia/elasticsearch/swift/SwiftRepositoryPlugin.java
M
src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftAccountFactory.java
M
src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepository.java
M
src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepositoryModule.java
M src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftService.java
M
src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobContainer.java
M
src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobStore.java
D
src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftImmutableBlobContainer.java
9 files changed, 233 insertions(+), 319 deletions(-)
Approvals:
Manybubbles: Looks good to me, approved
jenkins-bot: Verified
diff --git a/pom.xml b/pom.xml
index 4ced71e..8a26d9d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,7 +37,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <elasticsearch.version>1.3.2</elasticsearch.version>
+ <elasticsearch.version>1.4.0</elasticsearch.version>
</properties>
<dependencies>
diff --git
a/src/main/java/org/wikimedia/elasticsearch/swift/SwiftRepositoryPlugin.java
b/src/main/java/org/wikimedia/elasticsearch/swift/SwiftRepositoryPlugin.java
index cb668fb..0ad94de 100644
--- a/src/main/java/org/wikimedia/elasticsearch/swift/SwiftRepositoryPlugin.java
+++ b/src/main/java/org/wikimedia/elasticsearch/swift/SwiftRepositoryPlugin.java
@@ -15,7 +15,7 @@
* Our base plugin stuff.
*/
public class SwiftRepositoryPlugin extends AbstractPlugin {
- // Elasticsearch settings
+ // Elasticsearch settings
private final Settings settings;
/**
@@ -45,9 +45,9 @@
/**
* Register our services, if needed.
*/
- @Override
- @SuppressWarnings("rawtypes")
- public Collection<Class<? extends LifecycleComponent>> services() {
+ @Override
+ @SuppressWarnings("rawtypes")
+ public Collection<Class<? extends LifecycleComponent>> services() {
Collection<Class<? extends LifecycleComponent>> services =
Lists.newArrayList();
if (settings.getAsBoolean("swift.repository.enabled", true)) {
services.add(SwiftService.class);
@@ -55,10 +55,10 @@
return services;
}
- /**
- * Load our repository module into the list, if enabled
- * @param repositoriesModule The repositories module to register
ourselves with
- */
+ /**
+ * Load our repository module into the list, if enabled
+ * @param repositoriesModule The repositories module to register ourselves
with
+ */
public void onModule(RepositoriesModule repositoriesModule) {
if (settings.getAsBoolean("swift.repository.enabled", true)) {
repositoriesModule.registerRepository(SwiftRepository.TYPE,
SwiftRepositoryModule.class);
diff --git
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftAccountFactory.java
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftAccountFactory.java
index 58cdf64..eec617b 100644
---
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftAccountFactory.java
+++
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftAccountFactory.java
@@ -4,17 +4,17 @@
public class SwiftAccountFactory {
- public static Account createAccount(SwiftService swiftService, String
url, String username, String password, String tenantName, String authMethod) {
- if ("KEYSTONE".equals(authMethod.toUpperCase())) {
- return swiftService.swiftKeyStone(url, username,
password, tenantName);
- }
+ public static Account createAccount(SwiftService swiftService, String url,
String username, String password, String tenantName, String authMethod) {
+ if ("KEYSTONE".equals(authMethod.toUpperCase())) {
+ return swiftService.swiftKeyStone(url, username, password,
tenantName);
+ }
- if ("TEMPAUTH".equals(authMethod.toUpperCase())) {
- return swiftService.swiftTempAuth(url, username,
password);
- }
+ if ("TEMPAUTH".equals(authMethod.toUpperCase())) {
+ return swiftService.swiftTempAuth(url, username, password);
+ }
- return swiftService.swiftBasic(url, username, password);
+ return swiftService.swiftBasic(url, username, password);
- }
+ }
}
diff --git
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepository.java
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepository.java
index 8e04759..1d4ecd7 100644
---
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepository.java
+++
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepository.java
@@ -1,14 +1,10 @@
package org.wikimedia.elasticsearch.swift.repositories;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryName;
@@ -21,95 +17,90 @@
* The blob store repository. A glorified settings wrapper.
*/
public class SwiftRepository extends BlobStoreRepository {
- // The internal "type" for Elasticsearch
- public final static String TYPE = "swift";
+ // The internal "type" for Elasticsearch
+ public final static String TYPE = "swift";
- // Our blob store instance
- private final SwiftBlobStore blobStore;
+ // Our blob store instance
+ private final SwiftBlobStore blobStore;
- // Base path for blobs
- private final BlobPath basePath;
+ // Base path for blobs
+ private final BlobPath basePath;
- // Chunk size.
- private final ByteSizeValue chunkSize;
+ // Chunk size.
+ private final ByteSizeValue chunkSize;
- // Are we compressing our snapshots?
- private final boolean compress;
+ // Are we compressing our snapshots?
+ private final boolean compress;
- /**
- * Constructs new BlobStoreRepository
- *
- * @param name
- * repository name
- * @param repositorySettings
- * repository settings
- * @param indexShardRepository
- * an instance of IndexShardRepository
- * @param swiftService
- * an instance of SwiftService
- */
- @Inject
- protected SwiftRepository(RepositoryName name, RepositorySettings
repositorySettings, IndexShardRepository indexShardRepository, SwiftService
swiftService) {
- super(name.getName(), repositorySettings, indexShardRepository);
+ /**
+ * Constructs new BlobStoreRepository
+ *
+ * @param name
+ * repository name
+ * @param repositorySettings
+ * repository settings
+ * @param indexShardRepository
+ * an instance of IndexShardRepository
+ * @param swiftService
+ * an instance of SwiftService
+ */
+ @Inject
+ protected SwiftRepository(RepositoryName name, RepositorySettings
repositorySettings, IndexShardRepository indexShardRepository, SwiftService
swiftService) {
+ super(name.getName(), repositorySettings, indexShardRepository);
- String url = repositorySettings.settings().get("swift_url");
- if (url == null) {
- throw new RepositoryException(name.name(), "No url
defined for swift repository");
- }
+ String url = repositorySettings.settings().get("swift_url");
+ if (url == null) {
+ throw new RepositoryException(name.name(), "No url defined for
swift repository");
+ }
- String container =
repositorySettings.settings().get("swift_container");
- if (container == null) {
- throw new RepositoryException(name.name(), "No
container defined for swift repository");
- }
+ String container =
repositorySettings.settings().get("swift_container");
+ if (container == null) {
+ throw new RepositoryException(name.name(), "No container defined
for swift repository");
+ }
- String username =
repositorySettings.settings().get("swift_username", "");
- String password =
repositorySettings.settings().get("swift_password", "");
- String tenantName =
repositorySettings.settings().get("swift_tenantname", "");
- String authMethod =
repositorySettings.settings().get("swift_authmethod", "");
+ String username = repositorySettings.settings().get("swift_username",
"");
+ String password = repositorySettings.settings().get("swift_password",
"");
+ String tenantName =
repositorySettings.settings().get("swift_tenantname", "");
+ String authMethod =
repositorySettings.settings().get("swift_authmethod", "");
+ Account account = SwiftAccountFactory.createAccount(swiftService, url,
username, password, tenantName, authMethod);
- int concurrentStreams =
repositorySettings.settings().getAsInt("concurrent_streams",
componentSettings.getAsInt("concurrent_streams", 5));
- ExecutorService concurrentStreamPool = EsExecutors.newScaling(
- 1, concurrentStreams, 5, TimeUnit.SECONDS,
EsExecutors.daemonThreadFactory(settings, "[swift_stream]"));
+ blobStore = new SwiftBlobStore(settings, account, container);
- Account account =
SwiftAccountFactory.createAccount(swiftService, url, username, password,
tenantName, authMethod);
+ this.chunkSize =
repositorySettings.settings().getAsBytesSize("chunk_size",
+ componentSettings.getAsBytesSize("chunk_size", new
ByteSizeValue(5, ByteSizeUnit.GB)));
+ this.compress = repositorySettings.settings().getAsBoolean("compress",
componentSettings.getAsBoolean("compress", false));
+ this.basePath = BlobPath.cleanPath();
+ }
- blobStore = new SwiftBlobStore(settings, account, container,
concurrentStreamPool);
+ /**
+ * Get the blob store
+ */
+ @Override
+ protected BlobStore blobStore() {
+ return blobStore;
+ }
- this.chunkSize =
repositorySettings.settings().getAsBytesSize("chunk_size",
- componentSettings.getAsBytesSize("chunk_size",
new ByteSizeValue(5, ByteSizeUnit.GB)));
- this.compress =
repositorySettings.settings().getAsBoolean("compress",
componentSettings.getAsBoolean("compress", false));
- this.basePath = BlobPath.cleanPath();
- }
+ /**
+ * Get the base blob path
+ */
+ @Override
+ protected BlobPath basePath() {
+ return basePath;
+ }
- /**
- * Get the blob store
- */
- @Override
- protected BlobStore blobStore() {
- return blobStore;
- }
+ /**
+ * Get the chunk size
+ */
+ @Override
+ protected ByteSizeValue chunkSize() {
+ return chunkSize;
+ }
- /**
- * Get the base blob path
- */
- @Override
- protected BlobPath basePath() {
- return basePath;
- }
-
- /**
- * Get the chunk size
- */
- @Override
- protected ByteSizeValue chunkSize() {
- return chunkSize;
- }
-
- /**
- * Are we compressing our snapshots?
- */
- @Override
- protected boolean isCompress() {
- return compress;
- }
+ /**
+ * Are we compressing our snapshots?
+ */
+ @Override
+ protected boolean isCompress() {
+ return compress;
+ }
}
diff --git
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepositoryModule.java
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepositoryModule.java
index dbb9e43..f382f4a 100644
---
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepositoryModule.java
+++
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepositoryModule.java
@@ -9,13 +9,6 @@
* Swift repository module. Binds us to things.
*/
public class SwiftRepositoryModule extends AbstractModule {
- /**
- * Constructor. Super boring.
- */
- public SwiftRepositoryModule() {
- super();
- }
-
/**
* Do the binding.
*/
diff --git
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftService.java
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftService.java
index 6642344..cbcb90c 100644
---
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftService.java
+++
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftService.java
@@ -12,103 +12,103 @@
import org.javaswift.joss.model.Account;
public class SwiftService extends AbstractLifecycleComponent<SwiftService> {
- // The account we'll be connecting to Swift with
- private Account swiftUser;
+ // The account we'll be connecting to Swift with
+ private Account swiftUser;
- /**
- * Constructor
- *
- * @param settings
- * Settings for our repository. Injected.
- */
- @Inject
- public SwiftService(Settings settings) {
- super(settings);
- }
+ /**
+ * Constructor
+ *
+ * @param settings
+ * Settings for our repository. Injected.
+ */
+ @Inject
+ public SwiftService(Settings settings) {
+ super(settings);
+ }
- /**
- * Create a Swift account object and connect it to Swift
- *
- * @param url
- * The auth url (eg: localhost:8080/auth/v1.0/)
- * @param username
- * The username
- * @param password
- * The password
- */
- public synchronized Account swiftBasic(String url, String username,
String password) {
- if (swiftUser != null) {
- return swiftUser;
- }
+ /**
+ * Create a Swift account object and connect it to Swift
+ *
+ * @param url
+ * The auth url (eg: localhost:8080/auth/v1.0/)
+ * @param username
+ * The username
+ * @param password
+ * The password
+ */
+ public synchronized Account swiftBasic(String url, String username, String
password) {
+ if (swiftUser != null) {
+ return swiftUser;
+ }
- try {
- AccountConfig conf = getStandardConfig(url, username,
password, AuthenticationMethod.BASIC);
- swiftUser = new AccountFactory(conf).createAccount();
- } catch (CommandException ce) {
- throw new ElasticsearchIllegalArgumentException("Unable
to authenticate to Swift Basic " + url + "/" + username + "/" + password, ce);
- }
- return swiftUser;
- }
+ try {
+ AccountConfig conf = getStandardConfig(url, username, password,
AuthenticationMethod.BASIC);
+ swiftUser = new AccountFactory(conf).createAccount();
+ } catch (CommandException ce) {
+ throw new ElasticsearchIllegalArgumentException("Unable to
authenticate to Swift Basic " + url + "/" + username + "/" + password, ce);
+ }
+ return swiftUser;
+ }
- public synchronized Account swiftKeyStone(String url, String username,
String password, String tenantName) {
- if (swiftUser != null) {
- return swiftUser;
- }
+ public synchronized Account swiftKeyStone(String url, String username,
String password, String tenantName) {
+ if (swiftUser != null) {
+ return swiftUser;
+ }
- try {
- AccountConfig conf = getStandardConfig(url, username,
password, AuthenticationMethod.KEYSTONE);
- conf.setTenantName(tenantName);
- swiftUser = new AccountFactory(conf).createAccount();
- } catch (CommandException ce) {
- throw new ElasticsearchIllegalArgumentException(
- "Unable to authenticate to Swift
Keystone " + url + "/" + username + "/" + password + "/" + tenantName, ce);
- }
- return swiftUser;
- }
+ try {
+ AccountConfig conf = getStandardConfig(url, username, password,
AuthenticationMethod.KEYSTONE);
+ conf.setTenantName(tenantName);
+ swiftUser = new AccountFactory(conf).createAccount();
+ } catch (CommandException ce) {
+ throw new ElasticsearchIllegalArgumentException(
+ "Unable to authenticate to Swift Keystone " + url + "/" +
username + "/" + password + "/" + tenantName, ce);
+ }
+ return swiftUser;
+ }
- public synchronized Account swiftTempAuth(String url, String username,
String password) {
- if (swiftUser != null) {
- return swiftUser;
- }
+ public synchronized Account swiftTempAuth(String url, String username,
String password) {
+ if (swiftUser != null) {
+ return swiftUser;
+ }
- try {
- AccountConfig conf = getStandardConfig(url, username,
password, AuthenticationMethod.TEMPAUTH);
- swiftUser = new AccountFactory(conf).createAccount();
- } catch (CommandException ce) {
- throw new ElasticsearchIllegalArgumentException("Unable
to authenticate to Swift Temp", ce);
- }
- return swiftUser;
- }
+ try {
+ AccountConfig conf = getStandardConfig(url, username, password,
AuthenticationMethod.TEMPAUTH);
+ swiftUser = new AccountFactory(conf).createAccount();
+ } catch (CommandException ce) {
+ throw new ElasticsearchIllegalArgumentException("Unable to
authenticate to Swift Temp", ce);
+ }
+ return swiftUser;
+ }
- private AccountConfig getStandardConfig(String url, String username,
String password, AuthenticationMethod method) {
- AccountConfig conf = new AccountConfig();
- conf.setAuthUrl(url);
- conf.setUsername(username);
- conf.setPassword(password);
- conf.setAuthenticationMethod(method);
- conf.setAllowContainerCaching(false);
- conf.setAllowCaching(false);
- return conf;
- }
+ private AccountConfig getStandardConfig(String url, String username,
String password, AuthenticationMethod method) {
+ AccountConfig conf = new AccountConfig();
+ conf.setAuthUrl(url);
+ conf.setUsername(username);
+ conf.setPassword(password);
+ conf.setAuthenticationMethod(method);
+ conf.setAllowContainerCaching(false);
+ conf.setAllowCaching(false);
+ return conf;
+ }
- /**
- * Start the service. No-op here.
- */
- @Override
- protected void doStart() throws ElasticsearchException {
- }
+ /**
+ * Start the service. No-op here.
+ */
+ @Override
+ protected void doStart() throws ElasticsearchException {
+ }
- /**
- * Stop the service. No-op here.
- */
- @Override
- protected void doStop() throws ElasticsearchException {
- }
+ /**
+ * Stop the service. No-op here.
+ */
+ @Override
+ protected void doStop() throws ElasticsearchException {
+ }
- /**
- * Close the service. No-op here.
- */
- @Override
- protected void doClose() throws ElasticsearchException {
- }
+ /**
+ * Close the service. No-op here.
+ */
+ @Override
+ protected void doClose() throws ElasticsearchException {
+ }
}
diff --git
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobContainer.java
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobContainer.java
index b20e8c0..3171385 100644
---
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobContainer.java
+++
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobContainer.java
@@ -10,15 +10,19 @@
import org.javaswift.joss.model.DirectoryOrObject;
import org.javaswift.joss.model.StoredObject;
+import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
import java.util.Collection;
/**
* Swift's implementation of the AbstractBlobContainer
*/
public class SwiftBlobContainer extends AbstractBlobContainer {
- // Our local swift blob store instance
+ // Our local swift blob store instance
protected final SwiftBlobStore blobStore;
// The root path for blobs. Used by buildKey to build full blob names
@@ -44,46 +48,7 @@
*/
@Override
public boolean blobExists(String blobName) {
- return blobStore.swift().getObject(buildKey(blobName)).exists();
- }
-
- /**
- * Read a given blob into the listener
- * @param blobName The blob name to read
- * @param listener The listener to report our read info back to
- */
- @Override
- public void readBlob(final String blobName, final ReadBlobListener
listener) {
- blobStore.executor().execute(new Runnable() {
- @Override
- public void run() {
- InputStream is;
- try {
- // This is the interesting bit. Fetch the blob and then
turn it
- // into an InputStream for reading below
- StoredObject object =
blobStore.swift().getObject(buildKey(blobName));
- is = object.downloadObjectAsInputStream();
- } catch (Exception e) {
- listener.onFailure(e);
- return;
- }
- byte[] buffer = new byte[blobStore.bufferSizeInBytes()];
- try {
- int bytesRead;
- while ((bytesRead = is.read(buffer)) != -1) {
- listener.onPartial(buffer, 0, bytesRead);
- }
- listener.onCompleted();
- } catch (Exception e) {
- try {
- is.close();
- } catch (IOException e1) {
- // ignore
- }
- listener.onFailure(e);
- }
- }
- });
+ return blobStore.swift().getObject(buildKey(blobName)).exists();
}
/**
@@ -92,10 +57,10 @@
*/
@Override
public boolean deleteBlob(String blobName) throws IOException {
- StoredObject object = blobStore.swift().getObject(buildKey(blobName));
- if (object.exists()) {
- object.delete();
- }
+ StoredObject object = blobStore.swift().getObject(buildKey(blobName));
+ if (object.exists()) {
+ object.delete();
+ }
return true;
}
@@ -140,4 +105,44 @@
protected String buildKey(String blobName) {
return keyPath + blobName;
}
+
+ /**
+ * Fetch a given blob into a BufferedInputStream
+ * @param blobName The blob name to read
+ */
+ @Override
+ public InputStream openInput(String blobName) throws IOException {
+ return new BufferedInputStream(
+
blobStore.swift().getObject(buildKey(blobName)).downloadObjectAsInputStream(),
+ blobStore.bufferSizeInBytes());
+ }
+
+ @Override
+ public OutputStream createOutput(final String blobName) throws IOException
{
+ // need to remove old file if already exist
+ deleteBlob(blobName);
+
+ final PipedInputStream in = new PipedInputStream();
+
+ // We'll need to store this thread and make sure it terminates when the
output stream is closed.
+ final Thread transport = new Thread(new Runnable(){
+ public void run(){
+ blobStore.swift().getObject(buildKey(blobName)).uploadObject(in);
+ }
+ });
+ transport.start();
+
+ return new PipedOutputStream(in) {
+ @Override
+ public void close() throws IOException {
+ try {
+ // Close output, close the thread
+ super.close();
+ transport.join();
+ } catch(InterruptedException e) {
+ throw new IOException("Swift input/output shenanigans.", e);
+ }
+ }
+ };
+ }
}
diff --git
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobStore.java
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobStore.java
index 40dacb0..61877e6 100644
---
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobStore.java
+++
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobStore.java
@@ -1,8 +1,8 @@
package org.wikimedia.elasticsearch.swift.repositories.blobstore;
+import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
-import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -11,15 +11,10 @@
import org.javaswift.joss.model.Container;
import org.javaswift.joss.model.StoredObject;
-import java.util.concurrent.Executor;
-
/**
* Our blob store
*/
public class SwiftBlobStore extends AbstractComponent implements BlobStore {
- // Executor for our operations. Sorta like a dedicated Thread pool.
- private final Executor executor;
-
// How much to buffer our blobs by
private final int bufferSizeInBytes;
@@ -31,11 +26,9 @@
* @param settings Settings for our repository. Only care about buffer
size.
* @param auth
* @param container
- * @param executor
*/
- public SwiftBlobStore(Settings settings, Account auth, String container,
Executor executor) {
+ public SwiftBlobStore(Settings settings, Account auth, String container) {
super(settings);
- this.executor = executor;
this.bufferSizeInBytes = (int)settings.getAsBytesSize("buffer_size",
new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();
swift = auth.getContainer(container);
@@ -53,13 +46,6 @@
}
/**
- * Get the executor
- */
- public Executor executor() {
- return executor;
- }
-
- /**
* Get our buffer size
*/
public int bufferSizeInBytes() {
@@ -71,8 +57,8 @@
* @param path The blob path to search
*/
@Override
- public ImmutableBlobContainer immutableBlobContainer(BlobPath path) {
- return new SwiftImmutableBlobContainer(path, this);
+ public BlobContainer blobContainer(BlobPath path) {
+ return new SwiftBlobContainer(path, this);
}
/**
@@ -81,7 +67,7 @@
*/
@Override
public void delete(BlobPath path) {
- String keyPath = path.buildAsString("/");
+ String keyPath = path.buildAsString("/");
if (!keyPath.isEmpty()) {
keyPath = keyPath + "/";
}
diff --git
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftImmutableBlobContainer.java
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftImmutableBlobContainer.java
deleted file mode 100644
index 8ea65a4..0000000
---
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftImmutableBlobContainer.java
+++ /dev/null
@@ -1,61 +0,0 @@
-package org.wikimedia.elasticsearch.swift.repositories.blobstore;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.elasticsearch.common.blobstore.BlobPath;
-import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
-import org.elasticsearch.common.blobstore.support.BlobStores;
-import org.javaswift.joss.model.StoredObject;
-
-/**
- * The final implementation of Swift blob storage. Implements the write
functionality
- */
-public class SwiftImmutableBlobContainer extends SwiftBlobContainer implements
ImmutableBlobContainer {
- /**
- * Constructor. Just call the parent.
- * @param path The BlobPath to find blobs in
- * @param blobStore The blob store to use for operations
- */
- protected SwiftImmutableBlobContainer(BlobPath path, SwiftBlobStore
blobStore) {
- super(path, blobStore);
- }
-
- /**
- * Write a blob!
- * @param blobName The name of the blob we're writing
- * @param is The data we're writing
- * @param sizeInBytes How big the data is. We don't care with Swift.
- * @param listener The listener for write results (failure, etc)
- */
- @Override
- public void writeBlob(final String blobName, final InputStream is, final
long sizeInBytes, final WriterListener listener) {
- blobStore.executor().execute(new Runnable() {
- @Override
- public void run() {
- try {
- // need to remove old file if already exist
- StoredObject object =
blobStore.swift().getObject(buildKey(blobName));
- if (object.exists()) {
- object.delete();
- }
- object.uploadObject(is);
- listener.onCompleted();
- } catch (Exception e) {
- listener.onFailure(e);
- }
- }
- });
- }
-
- /**
- * Write a blob! Basically boilerplate from other implementations.
- * @param blobName The name of the blob we're writing
- * @param is The data we're writing
- * @param sizeInBytes How big the data is. We don't care with Swift.
- */
- @Override
- public void writeBlob(String blobName, InputStream is, long sizeInBytes)
throws IOException {
- BlobStores.syncWriteBlob(this, blobName, is, sizeInBytes);
- }
-}
--
To view, visit https://gerrit.wikimedia.org/r/158279
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: Ic159ca0c658a79f0fcc60fd58f8763cc149d271e
Gerrit-PatchSet: 11
Gerrit-Project: search/repository-swift
Gerrit-Branch: master
Gerrit-Owner: Chad <[email protected]>
Gerrit-Reviewer: BearND <[email protected]>
Gerrit-Reviewer: Chad <[email protected]>
Gerrit-Reviewer: Manybubbles <[email protected]>
Gerrit-Reviewer: jenkins-bot <>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits