SAMZA-1380: Create Utility Class for interacting with Azure Blob Storage. PR 1: AzureClient + AzureConfig; PR 2: LeaseBlobManager; **PR 3: BlobUtils + JobModelBundle** (current PR).
Author: PawasChhokra <Jaimatadi1$> Author: PawasChhokra <pawas2...@gmail.com> Reviewers: Navina Ramesh <nav...@apache.org> Closes #257 from PawasChhokra/BlobUtils Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a9866d62 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a9866d62 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a9866d62 Branch: refs/heads/0.14.0 Commit: a9866d629278e156291354bb95985e20c625089c Parents: f5c5cb2 Author: Pawas Chhokra <pawas2...@gmail.com> Authored: Mon Aug 7 16:28:05 2017 -0700 Committer: navina <nav...@apache.org> Committed: Mon Aug 7 16:28:05 2017 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/samza/BlobUtils.java | 280 +++++++++++++++++++ .../java/org/apache/samza/JobModelBundle.java | 61 ++++ .../java/org/apache/samza/LeaseBlobManager.java | 1 - 3 files changed, 341 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/a9866d62/samza-azure/src/main/java/org/apache/samza/BlobUtils.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/BlobUtils.java b/samza-azure/src/main/java/org/apache/samza/BlobUtils.java new file mode 100644 index 0000000..a798384 --- /dev/null +++ b/samza-azure/src/main/java/org/apache/samza/BlobUtils.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza; + +import com.microsoft.azure.storage.AccessCondition; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlobClient; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import com.microsoft.azure.storage.blob.CloudPageBlob; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.List; +import org.apache.samza.job.model.JobModel; +import org.apache.samza.serializers.model.SamzaObjectMapper; +import org.eclipse.jetty.http.HttpStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Client side class that has reference to Azure blob storage. + * Used for writing and reading from the blob. + * Every write requires a valid lease ID. + */ +public class BlobUtils { + + private static final Logger LOG = LoggerFactory.getLogger(BlobUtils.class); + private static final long JOB_MODEL_BLOCK_SIZE = 1024000; + private static final long BARRIER_STATE_BLOCK_SIZE = 1024; + private static final long PROCESSOR_LIST_BLOCK_SIZE = 1024; + private CloudBlobClient blobClient; + private CloudBlobContainer container; + private CloudPageBlob blob; + + /** + * Creates an object of BlobUtils. It creates the container and page blob if they don't exist already. + * @param client Client handle for access to Azure Storage account. + * @param containerName Name of container inside which we want the blob to reside. 
+ * @param blobName Name of the blob to be managed. + * @param length Length of the page blob. + * @throws AzureException If an Azure storage service error occurred, or when the container name or blob name is invalid. + */ + public BlobUtils(AzureClient client, String containerName, String blobName, long length) { + this.blobClient = client.getBlobClient(); + try { + this.container = blobClient.getContainerReference(containerName); + container.createIfNotExists(); + this.blob = container.getPageBlobReference(blobName); + if (!blob.exists()) { + blob.create(length, AccessCondition.generateIfNotExistsCondition(), null, null); + } + } catch (URISyntaxException e) { + LOG.error("Container name: " + containerName + " or blob name: " + blobName + " invalid.", e); + throw new AzureException(e); + } catch (StorageException e) { + int httpStatusCode = e.getHttpStatusCode(); + if (httpStatusCode == HttpStatus.CONFLICT_409) { + LOG.info("The blob you're trying to create exists already.", e); + } else { + LOG.error("Azure Storage Exception!", e); + throw new AzureException(e); + } + } + } + + /** + * Writes the job model to the blob. + * Write is successful only if the lease ID passed is valid and the processor holds the lease. + * Called by the leader. + * @param prevJM Previous job model version that the processor was operating on. + * @param currJM Current job model version that the processor is operating on. + * @param prevJMV Previous job model version that the processor was operating on. + * @param currJMV Current job model version that the processor is operating on. + * @param leaseId LeaseID of the lease that the processor holds on the blob. Null if there is no lease. + * @return true if write to the blob is successful, false if leaseID is null or an Azure storage service error or IO exception occurred. 
+ */ + public boolean publishJobModel(JobModel prevJM, JobModel currJM, String prevJMV, String currJMV, String leaseId) { + try { + if (leaseId == null) { + return false; + } + JobModelBundle bundle = new JobModelBundle(prevJM, currJM, prevJMV, currJMV); + byte[] data = SamzaObjectMapper.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsBytes(bundle); + byte[] pageData = Arrays.copyOf(data, (int) JOB_MODEL_BLOCK_SIZE); + InputStream is = new ByteArrayInputStream(pageData); + blob.uploadPages(is, 0, JOB_MODEL_BLOCK_SIZE, AccessCondition.generateLeaseCondition(leaseId), null, null); + LOG.info("Uploaded {} jobModel to blob", bundle.getCurrJobModel()); + return true; + } catch (StorageException | IOException e) { + LOG.error("JobModel publish failed for version = " + currJMV, e); + return false; + } + } + + /** + * Reads the current job model from the blob. + * @return The current job model published on the blob. Returns null when job model details not found on the blob. + * @throws AzureException in getJobModelBundle() if an Azure storage service error occurred. + * @throws SamzaException in getJobModelBundle() if data retrieved from blob could not be parsed by SamzaObjectMapper. + */ + public JobModel getJobModel() { + LOG.info("Reading the job model from blob."); + JobModelBundle jmBundle = getJobModelBundle(); + if (jmBundle == null) { + LOG.error("Job Model details don't exist on the blob."); + return null; + } + JobModel jm = jmBundle.getCurrJobModel(); + return jm; + } + + /** + * Reads the current job model version from the blob . + * @return Current job model version published on the blob. Returns null when job model details not found on the blob. + * @throws AzureException in getJobModelBundle() if an Azure storage service error occurred. + * @throws SamzaException in getJobModelBundle() if data retrieved from blob could not be parsed by SamzaObjectMapper. 
+ */ + public String getJobModelVersion() { + LOG.info("Reading the job model version from blob."); + JobModelBundle jmBundle = getJobModelBundle(); + if (jmBundle == null) { + LOG.error("Job Model details don't exist on the blob."); + return null; + } + String jmVersion = jmBundle.getCurrJobModelVersion(); + return jmVersion; + } + + /** + * Writes the barrier state to the blob. + * Write is successful only if the lease ID passed is valid and the processor holds the lease. + * Called only by the leader. + * @param state Barrier state to be published to the blob. + * @param leaseId LeaseID of the valid lease that the processor holds on the blob. Null if there is no lease. + * @return true if write to the blob is successful, false if leaseID is null or an Azure storage service error or IO exception occurred. + */ + public boolean publishBarrierState(String state, String leaseId) { + try { + if (leaseId == null) { + return false; + } + byte[] data = SamzaObjectMapper.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsBytes(state); + byte[] pageData = Arrays.copyOf(data, (int) BARRIER_STATE_BLOCK_SIZE); + InputStream is = new ByteArrayInputStream(pageData); + + //uploadPages is only successful when the AccessCondition provided has an active and valid lease ID. It fails otherwise. + blob.uploadPages(is, JOB_MODEL_BLOCK_SIZE, BARRIER_STATE_BLOCK_SIZE, AccessCondition.generateLeaseCondition(leaseId), null, null); + LOG.info("Uploaded barrier state {} to blob", state); + return true; + } catch (StorageException | IOException e) { + LOG.error("Barrier state " + state + " publish failed", e); + return false; + } + } + + /** + * Reads the current barrier state from the blob. + * @return Barrier state published on the blob. + * @throws AzureException If an Azure storage service error occurred. + * @throws SamzaException If data retrieved from blob could not be parsed by SamzaObjectMapper. 
+ */ + public String getBarrierState() { + LOG.info("Reading the barrier state from blob."); + byte[] data = new byte[(int) BARRIER_STATE_BLOCK_SIZE]; + try { + blob.downloadRangeToByteArray(JOB_MODEL_BLOCK_SIZE, BARRIER_STATE_BLOCK_SIZE, data, 0); + } catch (StorageException e) { + LOG.error("Failed to read barrier state from blob.", e); + throw new AzureException(e); + } + String state; + try { + state = SamzaObjectMapper.getObjectMapper().readValue(data, String.class); + } catch (IOException e) { + LOG.error("Failed to parse byte data: " + data + " for barrier state retrieved from the blob.", e); + throw new SamzaException(e); + } + return state; + } + + /** + * Writes the list of live processors in the system to the blob. + * Write is successful only if the lease ID passed is valid and the processor holds the lease. + * Called only by the leader. + * @param processors List of live processors to be published on the blob. + * @param leaseId LeaseID of the valid lease that the processor holds on the blob. Null if there is no lease. + * @return true if write to the blob is successful, false if leaseID is null or an Azure storage service error or IO exception occurred. 
+ */ + public boolean publishLiveProcessorList(List<String> processors, String leaseId) { + try { + if (leaseId == null) { + return false; + } + byte[] data = SamzaObjectMapper.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsBytes(processors); + byte[] pageData = Arrays.copyOf(data, (int) BARRIER_STATE_BLOCK_SIZE); + InputStream is = new ByteArrayInputStream(pageData); + blob.uploadPages(is, JOB_MODEL_BLOCK_SIZE + BARRIER_STATE_BLOCK_SIZE, PROCESSOR_LIST_BLOCK_SIZE, AccessCondition.generateLeaseCondition(leaseId), null, null); + LOG.info("Uploaded list of live processors to blob."); + return true; + } catch (StorageException | IOException e) { + LOG.error("Processor list: " + processors + "publish failed", e); + return false; + } + } + + /** + * Reads the list of live processors published on the blob. + * @return String list of live processors. + * @throws AzureException If an Azure storage service error occurred. + * @throws SamzaException If data retrieved from blob could not be parsed by SamzaObjectMapper. 
+ */ + public List<String> getLiveProcessorList() { + LOG.info("Read the the list of live processors from blob."); + byte[] data = new byte[(int) PROCESSOR_LIST_BLOCK_SIZE]; + try { + blob.downloadRangeToByteArray(JOB_MODEL_BLOCK_SIZE + BARRIER_STATE_BLOCK_SIZE, PROCESSOR_LIST_BLOCK_SIZE, data, 0); + } catch (StorageException e) { + LOG.error("Failed to read the list of live processors from the blob.", new AzureException(e)); + throw new AzureException(e); + } + List<String> list; + try { + list = SamzaObjectMapper.getObjectMapper().readValue(data, List.class); + } catch (IOException e) { + LOG.error("Failed to parse byte data: " + data + " for live processor list retrieved from the blob", new SamzaException(e)); + throw new SamzaException(e); + } + return list; + } + + public CloudBlobClient getBlobClient() { + return this.blobClient; + } + + public CloudBlobContainer getBlobContainer() { + return this.container; + } + + public CloudPageBlob getBlob() { + return this.blob; + } + + private JobModelBundle getJobModelBundle() { + byte[] data = new byte[(int) JOB_MODEL_BLOCK_SIZE]; + try { + blob.downloadRangeToByteArray(0, JOB_MODEL_BLOCK_SIZE, data, 0); + } catch (StorageException e) { + LOG.error("Failed to read JobModel details from the blob.", e); + throw new AzureException(e); + } + try { + JobModelBundle jmBundle = SamzaObjectMapper.getObjectMapper().readValue(data, JobModelBundle.class); + return jmBundle; + } catch (IOException e) { + LOG.error("Failed to parse byte data: " + data + " for JobModel details retrieved from the blob", e); + throw new SamzaException(e); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/a9866d62/samza-azure/src/main/java/org/apache/samza/JobModelBundle.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/JobModelBundle.java b/samza-azure/src/main/java/org/apache/samza/JobModelBundle.java new file mode 100644 index 
0000000..3ff971f --- /dev/null +++ b/samza-azure/src/main/java/org/apache/samza/JobModelBundle.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza; + +import org.apache.samza.job.model.JobModel; + + +/** + * Bundle class for current and previous - job model and job model version. + * Used for publishing updated data to the blob in one go. 
+ */ +public class JobModelBundle { + + private JobModel prevJobModel; + private JobModel currJobModel; + private String prevJobModelVersion; + private String currJobModelVersion; + + public JobModelBundle() {} + + public JobModelBundle(JobModel prevJM, JobModel currJM, String prevJMV, String currJMV) { + prevJobModel = prevJM; + currJobModel = currJM; + prevJobModelVersion = prevJMV; + currJobModelVersion = currJMV; + } + + public JobModel getCurrJobModel() { + return currJobModel; + } + + public JobModel getPrevJobModel() { + return prevJobModel; + } + + public String getCurrJobModelVersion() { + return currJobModelVersion; + } + + public String getPrevJobModelVersion() { + return prevJobModelVersion; + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/a9866d62/samza-azure/src/main/java/org/apache/samza/LeaseBlobManager.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/LeaseBlobManager.java b/samza-azure/src/main/java/org/apache/samza/LeaseBlobManager.java index 3d6b13b..5375662 100644 --- a/samza-azure/src/main/java/org/apache/samza/LeaseBlobManager.java +++ b/samza-azure/src/main/java/org/apache/samza/LeaseBlobManager.java @@ -95,5 +95,4 @@ public class LeaseBlobManager { return false; } } - } \ No newline at end of file