turcsanyip commented on code in PR #5916:
URL: https://github.com/apache/nifi/pull/5916#discussion_r841463319
##
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java:
##
@@ -183,41 +185,53 @@ protected String getDefaultTimePrecision() {
final String containerName =
context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue();
final String prefix =
Optional.ofNullable(context.getProperty(PROP_PREFIX).evaluateAttributeExpressions().getValue()).orElse("");
final List listing = new ArrayList<>();
+final long minimumTimestamp = minTimestamp == null ? 0 : minTimestamp;
+
try {
final CloudBlobClient blobClient =
AzureStorageUtils.createCloudBlobClient(context, getLogger(), null);
final CloudBlobContainer container =
blobClient.getContainerReference(containerName);
final OperationContext operationContext = new OperationContext();
AzureStorageUtils.setProxy(operationContext, context);
-for (final ListBlobItem blob : container.listBlobs(prefix, true,
EnumSet.of(BlobListingDetails.METADATA), null, operationContext)) {
-if (blob instanceof CloudBlob) {
-final CloudBlob cloudBlob = (CloudBlob) blob;
-final BlobProperties properties =
cloudBlob.getProperties();
-final StorageUri uri =
cloudBlob.getSnapshotQualifiedStorageUri();
-
-final Builder builder = new BlobInfo.Builder()
-
.primaryUri(uri.getPrimaryUri().toString())
- .blobName(cloudBlob.getName())
- .containerName(containerName)
-
.contentType(properties.getContentType())
-
.contentLanguage(properties.getContentLanguage())
- .etag(properties.getEtag())
-
.lastModifiedTime(properties.getLastModified().getTime())
- .length(properties.getLength());
-
-if (uri.getSecondaryUri() != null) {
-builder.secondaryUri(uri.getSecondaryUri().toString());
-}
-
-if (blob instanceof CloudBlockBlob) {
-builder.blobType(AzureStorageUtils.BLOCK);
-} else {
-builder.blobType(AzureStorageUtils.PAGE);
+ResultContinuation continuationToken = null;
+
+do {
+final ResultSegment result =
container.listBlobsSegmented(prefix, true,
EnumSet.of(BlobListingDetails.METADATA), null, continuationToken, null,
operationContext);
+continuationToken = result.getContinuationToken();
+
+for (final ListBlobItem blob : result.getResults()) {
+if (blob instanceof CloudBlob) {
+final CloudBlob cloudBlob = (CloudBlob) blob;
+final BlobProperties properties =
cloudBlob.getProperties();
+
+if (properties.getLastModified().getTime() >
minimumTimestamp) {
Review Comment:
`>=` needs to be used instead because new files can be uploaded to Azure
with the same timestamp (second precision on Azure) after the listing.
`AbstractListProcessor` stores the files already listed with the previous max
timestamp and can filter out those files later in the process.
##
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java:
##
@@ -199,34 +201,44 @@ protected boolean isListingResetNecessary(final
PropertyDescriptor property) {
}
@Override
-protected List performListing(ProcessContext context, Long
minTimestamp, ListingMode listingMode) throws IOException {
-String containerName =
context.getProperty(CONTAINER).evaluateAttributeExpressions().getValue();
-String prefix =
context.getProperty(BLOB_NAME_PREFIX).evaluateAttributeExpressions().getValue();
+protected List performListing(final ProcessContext context,
final Long minTimestamp, final ListingMode listingMode) throws IOException {
+final String containerName =
context.getProperty(CONTAINER).evaluateAttributeExpressions().getValue();
+final String prefix =
context.getProperty(BLOB_NAME_PREFIX).evaluateAttributeExpressions().getValue();
+final long minimumTimestamp = minTimestamp == null ? 0 : minTimestamp;
try {
-List listing = new ArrayList<>();
+final List listing = new ArrayList<>();
-BlobContainerClient containerClient =