[GitHub] [nifi] turcsanyip commented on a diff in pull request #5916: NIFI-9846 Implement pagination listing for Azure List processors

2022-04-04 Thread GitBox


turcsanyip commented on code in PR #5916:
URL: https://github.com/apache/nifi/pull/5916#discussion_r841468038


##
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java:
##
@@ -199,34 +201,44 @@ protected boolean isListingResetNecessary(final PropertyDescriptor property) {
 }
 
 @Override
-protected List<BlobInfo> performListing(ProcessContext context, Long minTimestamp, ListingMode listingMode) throws IOException {
-String containerName = context.getProperty(CONTAINER).evaluateAttributeExpressions().getValue();
-String prefix = context.getProperty(BLOB_NAME_PREFIX).evaluateAttributeExpressions().getValue();
+protected List<BlobInfo> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode) throws IOException {
+final String containerName = context.getProperty(CONTAINER).evaluateAttributeExpressions().getValue();
+final String prefix = context.getProperty(BLOB_NAME_PREFIX).evaluateAttributeExpressions().getValue();
+final long minimumTimestamp = minTimestamp == null ? 0 : minTimestamp;
 
 try {
-List<BlobInfo> listing = new ArrayList<>();
+final List<BlobInfo> listing = new ArrayList<>();
 
-BlobContainerClient containerClient = storageClient.getBlobContainerClient(containerName);
+final BlobContainerClient containerClient = storageClient.getBlobContainerClient(containerName);
 
-ListBlobsOptions options = new ListBlobsOptions()
+final ListBlobsOptions options = new ListBlobsOptions()
 .setPrefix(prefix);
 
-for (BlobItem blob : containerClient.listBlobs(options, null)) {
-BlobItemProperties properties = blob.getProperties();
-
-Builder builder = new Builder()
-.containerName(containerName)
-.blobName(blob.getName())
-.primaryUri(String.format("%s/%s", containerClient.getBlobContainerUrl(), blob.getName()))
-.etag(properties.getETag())
-.blobType(properties.getBlobType().toString())
-.contentType(properties.getContentType())
-.contentLanguage(properties.getContentLanguage())
-.lastModifiedTime(properties.getLastModified().toInstant().toEpochMilli())
-.length(properties.getContentLength());
-
-listing.add(builder.build());
-}
+final Iterator<PagedResponse<BlobItem>> result = containerClient.listBlobs(options, null).iterableByPage().iterator();
+String continuationToken;
+
+do {
+final PagedResponse<BlobItem> pagedResult = result.next();
+continuationToken = pagedResult.getContinuationToken();

Review Comment:
   It seems to me that the items can be processed with a simple item iterator (instead of the paged iterator), without tracking the continuation token.
   Based on this documentation:
   https://docs.microsoft.com/en-us/azure/developer/java/sdk/pagination
   
   > Make it possible to easily iterate over each element in the collection individually, ignoring any need for manual pagination or tracking of continuation tokens.
   
   > Regardless of whether you iterate by page or by each item, there's no difference in performance or the number of calls made to the service.
   
   This applies to the v12 Blob and the ADLS processors, as they use the new SDK.
   So I think only the timestamp filtering needs to be added here.
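To illustrate the quoted claim without the Azure SDK, here is a minimal stdlib-only sketch. `BlobEntry`, `Page` and `FakeBlobService` are hypothetical stand-ins for `BlobItem`, `PagedResponse<BlobItem>` and the Blob service; they are not SDK types. The same simulated container is listed twice: once by tracking the continuation token manually, as the diff does, and once through an item iterator that fetches pages lazily. Both apply the same timestamp filter.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

class PagedListingSketch {

    // Hypothetical stand-in for a listed blob (not the SDK's BlobItem).
    record BlobEntry(String name, long lastModifiedMillis) {}

    // Hypothetical stand-in for one page: its items and the token for the next page (null on the last page).
    record Page(List<BlobEntry> items, String continuationToken) {}

    // Simulated service returning at most pageSize items per call; counts the calls made.
    static class FakeBlobService {
        private final List<BlobEntry> blobs;
        private final int pageSize;
        int calls = 0;

        FakeBlobService(List<BlobEntry> blobs, int pageSize) {
            this.blobs = blobs;
            this.pageSize = pageSize;
        }

        Page fetchPage(String token) {
            calls++;
            int start = token == null ? 0 : Integer.parseInt(token);
            int end = Math.min(start + pageSize, blobs.size());
            String next = end < blobs.size() ? String.valueOf(end) : null;
            return new Page(blobs.subList(start, end), next);
        }
    }

    // Style 1: track the continuation token manually (the pattern in the diff).
    static List<BlobEntry> listByPage(FakeBlobService service, long minimumTimestamp) {
        List<BlobEntry> listing = new ArrayList<>();
        String token = null;
        do {
            Page page = service.fetchPage(token);
            token = page.continuationToken();
            for (BlobEntry blob : page.items()) {
                if (blob.lastModifiedMillis() >= minimumTimestamp) {
                    listing.add(blob);
                }
            }
        } while (token != null);
        return listing;
    }

    // Style 2: an item-level Iterable that fetches the next page only when the current one is exhausted.
    static Iterable<BlobEntry> items(FakeBlobService service) {
        return () -> new Iterator<>() {
            private Page page = service.fetchPage(null);
            private int index = 0;

            @Override
            public boolean hasNext() {
                while (index >= page.items().size() && page.continuationToken() != null) {
                    page = service.fetchPage(page.continuationToken());
                    index = 0;
                }
                return index < page.items().size();
            }

            @Override
            public BlobEntry next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return page.items().get(index++);
            }
        };
    }

    // The caller only filters by timestamp; the token never appears here.
    static List<BlobEntry> listByItem(FakeBlobService service, long minimumTimestamp) {
        List<BlobEntry> listing = new ArrayList<>();
        for (BlobEntry blob : items(service)) {
            if (blob.lastModifiedMillis() >= minimumTimestamp) {
                listing.add(blob);
            }
        }
        return listing;
    }

    public static void main(String[] args) {
        List<BlobEntry> blobs = List.of(
                new BlobEntry("a", 1000), new BlobEntry("b", 2000), new BlobEntry("c", 3000),
                new BlobEntry("d", 2000), new BlobEntry("e", 500));
        FakeBlobService pagedService = new FakeBlobService(blobs, 2);
        FakeBlobService itemService = new FakeBlobService(blobs, 2);
        List<BlobEntry> byPage = listByPage(pagedService, 2000);
        List<BlobEntry> byItem = listByItem(itemService, 2000);
        System.out.println(byPage.equals(byItem)); // prints true
        System.out.println(pagedService.calls + " vs " + itemService.calls); // prints 3 vs 3
    }
}
```

Both styles return the same entries with the same number of service calls. In the v12 SDK, the `PagedIterable` returned by `listBlobs(options, null)` can be iterated item by item directly, as the removed loop did, which plays the role of `items(...)` in this sketch.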



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





turcsanyip commented on code in PR #5916:
URL: https://github.com/apache/nifi/pull/5916#discussion_r841463319


##
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java:
##
@@ -183,41 +185,53 @@ protected String getDefaultTimePrecision() {
 final String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue();
 final String prefix = Optional.ofNullable(context.getProperty(PROP_PREFIX).evaluateAttributeExpressions().getValue()).orElse("");
 final List<BlobInfo> listing = new ArrayList<>();
+final long minimumTimestamp = minTimestamp == null ? 0 : minTimestamp;
+
 try {
 final CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger(), null);
 final CloudBlobContainer container = blobClient.getContainerReference(containerName);
 
 final OperationContext operationContext = new OperationContext();
 AzureStorageUtils.setProxy(operationContext, context);
 
-for (final ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), null, operationContext)) {
-if (blob instanceof CloudBlob) {
-final CloudBlob cloudBlob = (CloudBlob) blob;
-final BlobProperties properties = cloudBlob.getProperties();
-final StorageUri uri = cloudBlob.getSnapshotQualifiedStorageUri();
-
-final Builder builder = new BlobInfo.Builder()
-  .primaryUri(uri.getPrimaryUri().toString())
-  .blobName(cloudBlob.getName())
-  .containerName(containerName)
-  .contentType(properties.getContentType())
-  .contentLanguage(properties.getContentLanguage())
-  .etag(properties.getEtag())
-  .lastModifiedTime(properties.getLastModified().getTime())
-  .length(properties.getLength());
-  .length(properties.getLength());
-
-if (uri.getSecondaryUri() != null) {
-builder.secondaryUri(uri.getSecondaryUri().toString());
-}
-
-if (blob instanceof CloudBlockBlob) {
-builder.blobType(AzureStorageUtils.BLOCK);
-} else {
-builder.blobType(AzureStorageUtils.PAGE);
+ResultContinuation continuationToken = null;
+
+do {
+final ResultSegment<ListBlobItem> result = container.listBlobsSegmented(prefix, true, EnumSet.of(BlobListingDetails.METADATA), null, continuationToken, null, operationContext);
+continuationToken = result.getContinuationToken();
+
+for (final ListBlobItem blob : result.getResults()) {
+if (blob instanceof CloudBlob) {
+final CloudBlob cloudBlob = (CloudBlob) blob;
+final BlobProperties properties = cloudBlob.getProperties();
+
+if (properties.getLastModified().getTime() > minimumTimestamp) {

Review Comment:
   `>=` needs to be used instead, because new files can be uploaded to Azure with the same timestamp after the listing has run (Azure timestamps have only second precision).
   `AbstractListProcessor` stores the files already listed at the previous maximum timestamp, so it can filter those files out later in the process.
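The boundary case can be sketched with a small simulation. This is a simplified, hypothetical model, not `AbstractListProcessor`'s actual implementation: `ListingState` keeps the newest emitted timestamp plus the names emitted at exactly that timestamp, and `runListing` filters with either `>` or `>=` before skipping names already emitted at the boundary. Timestamps are whole seconds expressed in milliseconds, assuming Azure's one-second precision.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class TimestampBoundarySketch {

    // Hypothetical stand-in for a listed blob; values are whole seconds in millis.
    record BlobEntry(String name, long lastModifiedMillis) {}

    // Hypothetical, simplified listing state (not NiFi's actual implementation):
    // the newest emitted timestamp and the names emitted at exactly that timestamp.
    static class ListingState {
        long maxTimestamp = 0;
        Set<String> namesAtMax = new HashSet<>();
    }

    // One listing run; 'inclusive' selects >= (true) or > (false) for the timestamp filter.
    static List<String> runListing(List<BlobEntry> remote, ListingState state, boolean inclusive) {
        List<BlobEntry> emitted = new ArrayList<>();
        long newMax = state.maxTimestamp;
        for (BlobEntry blob : remote) {
            long ts = blob.lastModifiedMillis();
            boolean newEnough = inclusive ? ts >= state.maxTimestamp : ts > state.maxTimestamp;
            // With >=, entries at the boundary reappear; skip the ones already emitted.
            boolean alreadyListed = ts == state.maxTimestamp && state.namesAtMax.contains(blob.name());
            if (newEnough && !alreadyListed) {
                emitted.add(blob);
                newMax = Math.max(newMax, ts);
            }
        }
        // Keep the old boundary names only if the boundary timestamp did not move.
        Set<String> atNewMax = newMax == state.maxTimestamp ? new HashSet<>(state.namesAtMax) : new HashSet<>();
        List<String> names = new ArrayList<>();
        for (BlobEntry blob : emitted) {
            names.add(blob.name());
            if (blob.lastModifiedMillis() == newMax) {
                atNewMax.add(blob.name());
            }
        }
        state.maxTimestamp = newMax;
        state.namesAtMax = atNewMax;
        return names;
    }

    public static void main(String[] args) {
        List<BlobEntry> firstListing = List.of(new BlobEntry("a.csv", 5000));
        // b.csv is uploaded after the first run, within the same second as a.csv.
        List<BlobEntry> secondListing = List.of(new BlobEntry("a.csv", 5000), new BlobEntry("b.csv", 5000));

        ListingState inclusiveState = new ListingState();
        runListing(firstListing, inclusiveState, true);
        System.out.println(">= second run: " + runListing(secondListing, inclusiveState, true)); // prints ">= second run: [b.csv]"

        ListingState strictState = new ListingState();
        runListing(firstListing, strictState, false);
        System.out.println("> second run: " + runListing(secondListing, strictState, false)); // prints "> second run: []"
    }
}
```

With `>`, `b.csv` is never emitted, since every later run also starts from 5000. With `>=`, `a.csv` reappears in the raw listing but is skipped by the stored boundary names, so nothing is emitted twice.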



##
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java:
##
@@ -199,34 +201,44 @@ protected boolean isListingResetNecessary(final PropertyDescriptor property) {
 }
 
 @Override
-protected List<BlobInfo> performListing(ProcessContext context, Long minTimestamp, ListingMode listingMode) throws IOException {
-String containerName = context.getProperty(CONTAINER).evaluateAttributeExpressions().getValue();
-String prefix = context.getProperty(BLOB_NAME_PREFIX).evaluateAttributeExpressions().getValue();
+protected List<BlobInfo> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode) throws IOException {
+final String containerName = context.getProperty(CONTAINER).evaluateAttributeExpressions().getValue();
+final String prefix = context.getProperty(BLOB_NAME_PREFIX).evaluateAttributeExpressions().getValue();
+final long minimumTimestamp = minTimestamp == null ? 0 : minTimestamp;
 
 try {
-List<BlobInfo> listing = new ArrayList<>();
+final List<BlobInfo> listing = new ArrayList<>();
 
-BlobContainerClient containerClient =