SAMZA-1376: Create a leasing utility class for blobs in Azure navina PR 1: AzureClient + AzureConfig **PR 2: LeaseBlobManager** (current PR)
Author: PawasChhokra <Jaimatadi1$> Author: PawasChhokra <pawas2...@gmail.com> Reviewers: Navina Ramesh <nav...@apache.org>, Shanthoosh Venkataraman <svenkatara...@linkedin.com> Closes #256 from PawasChhokra/LeaseUtils Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f5c5cb22 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f5c5cb22 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f5c5cb22 Branch: refs/heads/0.14.0 Commit: f5c5cb2233f6e141c34ea372a49a7b07f163a359 Parents: cf5efe7 Author: Pawas Chhokra <pawas2...@gmail.com> Authored: Mon Aug 7 14:15:23 2017 -0700 Committer: navina <nav...@apache.org> Committed: Mon Aug 7 14:15:23 2017 -0700 ---------------------------------------------------------------------- .gitignore | 2 +- .../main/java/org/apache/samza/AzureClient.java | 16 +++- .../main/java/org/apache/samza/AzureConfig.java | 10 +- .../java/org/apache/samza/AzureException.java | 43 +++++++++ .../java/org/apache/samza/LeaseBlobManager.java | 99 ++++++++++++++++++++ 5 files changed, 160 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/f5c5cb22/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index d4dcaa1..7cbffe7 100644 --- a/.gitignore +++ b/.gitignore @@ -28,4 +28,4 @@ docs/learn/documentation/*/rest/javadocs out/ *.patch **.pyc -samza-shell/src/main/visualizer/plan.json +samza-shell/src/main/visualizer/plan.json \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/f5c5cb22/samza-azure/src/main/java/org/apache/samza/AzureClient.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/AzureClient.java b/samza-azure/src/main/java/org/apache/samza/AzureClient.java index b5884cd..7c12055 100644 --- a/samza-azure/src/main/java/org/apache/samza/AzureClient.java +++ b/samza-azure/src/main/java/org/apache/samza/AzureClient.java @@ -38,19 +38,25 @@ public class AzureClient { private final CloudTableClient tableClient; private final CloudBlobClient blobClient; + /** + * Creates a reference to the Azure Storage account according to the connection string that the client passes. + * Also creates references to Azure Blob Storage and Azure Table Storage. + * @param storageConnectionString Connection string to conenct to Azure Storage Account, format: "DefaultEndpointsProtocol=<https>;AccountName=<>;AccountKey=<>" + * @throws AzureException If an Azure storage service error occurred, or when the storageConnectionString is invalid. + */ AzureClient(String storageConnectionString) { try { account = CloudStorageAccount.parse(storageConnectionString); blobClient = account.createCloudBlobClient(); tableClient = account.createCloudTableClient(); } catch (IllegalArgumentException | URISyntaxException e) { - LOG.error("\nConnection string {} specifies an invalid URI.", storageConnectionString); + LOG.error("Connection string {} specifies an invalid URI.", storageConnectionString); LOG.error("Please confirm the connection string is in the Azure connection string format."); - throw new SamzaException(e); + throw new AzureException(e); } catch (InvalidKeyException e) { - LOG.error("\nConnection string {} specifies an invalid key.", storageConnectionString); + LOG.error("Connection string {} specifies an invalid key.", storageConnectionString); LOG.error("Please confirm the AccountName and AccountKey in the connection string are valid."); - throw new SamzaException(e); + throw new AzureException(e); } } @@ -61,4 +67,4 @@ public class AzureClient { public CloudTableClient getTableClient() { return tableClient; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/f5c5cb22/samza-azure/src/main/java/org/apache/samza/AzureConfig.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/AzureConfig.java b/samza-azure/src/main/java/org/apache/samza/AzureConfig.java index b88d3c0..32b8082 100644 --- a/samza-azure/src/main/java/org/apache/samza/AzureConfig.java +++ b/samza-azure/src/main/java/org/apache/samza/AzureConfig.java @@ -25,16 +25,19 @@ import org.apache.samza.config.ConfigException; import org.apache.samza.config.MapConfig; +/** + * Config class for reading all user defined parameters for Azure driven coordination services. + */ public class AzureConfig extends MapConfig { // Connection string for Azure Storage Account, format: "DefaultEndpointsProtocol=<https>;AccountName=<>;AccountKey=<>" public static final String AZURE_STORAGE_CONNECT = "azure.storage.connect"; public static final String AZURE_PAGEBLOB_LENGTH = "job.coordinator.azure.blob.length"; + public static final long DEFAULT_AZURE_PAGEBLOB_LENGTH = 5120000; private static String containerName; private static String blobName; private static String tableName; - public static final long DEFAULT_AZURE_PAGEBLOB_LENGTH = 5120000; public AzureConfig(Config config) { super(config); @@ -60,6 +63,7 @@ public class AzureConfig extends MapConfig { public String getAzureBlobName() { return blobName; } + public long getAzureBlobLength() { return getLong(AZURE_PAGEBLOB_LENGTH, DEFAULT_AZURE_PAGEBLOB_LENGTH); } @@ -67,6 +71,4 @@ public class AzureConfig extends MapConfig { public String getAzureTableName() { return tableName; } - -} - +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/f5c5cb22/samza-azure/src/main/java/org/apache/samza/AzureException.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/AzureException.java b/samza-azure/src/main/java/org/apache/samza/AzureException.java new file mode 100644 index 0000000..12da984 --- /dev/null +++ b/samza-azure/src/main/java/org/apache/samza/AzureException.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza; + +/** + * Unchecked exception that Azure throws when something goes wrong . + */ +public class AzureException extends RuntimeException { + + public AzureException() { + super(); + } + + public AzureException(String s, Throwable t) { + super(s, t); + } + + public AzureException(String s) { + super(s); + } + + public AzureException(Throwable t) { + super(t); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/f5c5cb22/samza-azure/src/main/java/org/apache/samza/LeaseBlobManager.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/LeaseBlobManager.java b/samza-azure/src/main/java/org/apache/samza/LeaseBlobManager.java new file mode 100644 index 0000000..3d6b13b --- /dev/null +++ b/samza-azure/src/main/java/org/apache/samza/LeaseBlobManager.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza; + +import com.microsoft.azure.storage.AccessCondition; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudPageBlob; +import org.eclipse.jetty.http.HttpStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Helper class for lease blob operations. + */ +public class LeaseBlobManager { + + private static final Logger LOG = LoggerFactory.getLogger(LeaseBlobManager.class); + private CloudPageBlob leaseBlob; + + public LeaseBlobManager(CloudPageBlob leaseBlob) { + this.leaseBlob = leaseBlob; + } + + /** + * Acquires a lease on a blob. The lease ID is NULL initially. + * @param leaseTimeInSec The time in seconds you want to acquire the lease for. + * @param leaseId Proposed ID you want to acquire the lease with, null if not proposed. + * @return String that represents lease ID. Null if acquireLease is unsuccessful because the blob is leased already. + * @throws AzureException If a Azure storage service error occurred. This includes the case where the blob you're trying to lease does not exist. + */ + public String acquireLease(int leaseTimeInSec, String leaseId) { + try { + String id = leaseBlob.acquireLease(leaseTimeInSec, leaseId); + LOG.info("Acquired lease with lease id = " + id); + return id; + } catch (StorageException storageException) { + int httpStatusCode = storageException.getHttpStatusCode(); + if (httpStatusCode == HttpStatus.CONFLICT_409) { + LOG.info("The blob you're trying to acquire is leased already.", storageException); + } else if (httpStatusCode == HttpStatus.NOT_FOUND_404) { + LOG.error("The blob you're trying to lease does not exist.", storageException); + throw new AzureException(storageException); + } else { + LOG.error("Error acquiring lease!", storageException); + throw new AzureException(storageException); + } + } + return null; + } + + /** + * Renews the lease on the blob. + * @param leaseId ID of the lease to be renewed. + * @return True if lease was renewed successfully, false otherwise. + */ + public boolean renewLease(String leaseId) { + try { + leaseBlob.renewLease(AccessCondition.generateLeaseCondition(leaseId)); + return true; + } catch (StorageException storageException) { + LOG.error("Wasn't able to renew lease with lease id: " + leaseId, storageException); + return false; + } + } + + /** + * Releases the lease on the blob. + * @param leaseId ID of the lease to be released. + * @return True if released successfully, false otherwise. + */ + public boolean releaseLease(String leaseId) { + try { + leaseBlob.releaseLease(AccessCondition.generateLeaseCondition(leaseId)); + return true; + } catch (StorageException storageException) { + LOG.error("Wasn't able to release lease with lease id: " + leaseId, storageException); + return false; + } + } + +} \ No newline at end of file