Re: [PR] NIFI-15570: Keep track of Content Claims where the last Claim in a Re… [nifi]

2026-03-16 Thread via GitHub


exceptionfactory merged PR #10874:
URL: https://github.com/apache/nifi/pull/10874


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] NIFI-15570: Keep track of Content Claims where the last Claim in a Re… [nifi]

2026-03-13 Thread via GitHub


markap14 commented on PR #10874:
URL: https://github.com/apache/nifi/pull/10874#issuecomment-4057945835

   Thanks for the feedback @exceptionfactory . Updated.





Re: [PR] NIFI-15570: Keep track of Content Claims where the last Claim in a Re… [nifi]

2026-03-13 Thread via GitHub


markap14 commented on code in PR #10874:
URL: https://github.com/apache/nifi/pull/10874#discussion_r2933633470


##
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java:
##
@@ -897,6 +905,272 @@ protected boolean archive(Path curPath) {
 }
 }
 
+@Test
+public void testTruncationCandidateMarkedOnlyForLargeNonStartClaim() 
throws IOException {
+final long maxClaimLength = 
DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(), 
DataUnit.B).longValue();

Review Comment:
   We read several properties from nifi.properties, including max appendable 
claim size, repo location, archive enabled, etc. So we want to get this from 
properties, not just declare a constant. But we can move it into the 
@BeforeEach so that we just define it once instead of adding the parsing code 
to several methods.






Re: [PR] NIFI-15570: Keep track of Content Claims where the last Claim in a Re… [nifi]

2026-03-13 Thread via GitHub


exceptionfactory commented on code in PR #10874:
URL: https://github.com/apache/nifi/pull/10874#discussion_r2933628688


##
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java:
##
@@ -99,9 +101,10 @@ public class FileSystemRepository implements 
ContentRepository {
 private final List containerNames;
 private final AtomicLong index;
 
-private final ScheduledExecutorService executor = new FlowEngine(4, 
"FileSystemRepository Workers", true);
+private final ScheduledExecutorService executor = new FlowEngine(6, 
"FileSystemRepository Workers", true);

Review Comment:
   Thanks for confirming.






Re: [PR] NIFI-15570: Keep track of Content Claims where the last Claim in a Re… [nifi]

2026-03-13 Thread via GitHub


exceptionfactory commented on code in PR #10874:
URL: https://github.com/apache/nifi/pull/10874#discussion_r2933627645


##
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java:
##
@@ -810,4 +810,289 @@ public String changePartitionName(String swapLocation, 
String newPartitionName)
 return swapLocation;
 }
 }
+
+// 
=
+// Truncation Feature: Helpers
+// 
=
+
+/**
+ * Creates a mock queue + connection + queueProvider wired together, 
suitable for runtime truncation tests.
+ * Returns [claimManager, queueProvider, queue].
+ */
+private record RuntimeRepoContext(StandardResourceClaimManager 
claimManager, TestQueueProvider queueProvider, FlowFileQueue queue) {
+}
+
+private RuntimeRepoContext createRuntimeRepoContext() {
+final StandardResourceClaimManager claimManager = new 
StandardResourceClaimManager();
+final TestQueueProvider queueProvider = new TestQueueProvider();
+final Connection connection = Mockito.mock(Connection.class);
+when(connection.getIdentifier()).thenReturn("1234");
+
when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
+when(queue.getIdentifier()).thenReturn("1234");
+when(connection.getFlowFileQueue()).thenReturn(queue);
+queueProvider.addConnection(connection);
+return new RuntimeRepoContext(claimManager, queueProvider, queue);
+}
+
+private StandardContentClaim createClaim(final ResourceClaim rc, final 
long offset, final long length, final boolean truncationCandidate) {
+final StandardContentClaim claim = new StandardContentClaim(rc, 
offset);
+claim.setLength(length);
+if (truncationCandidate) {
+claim.setTruncationCandidate(true);
+}
+return claim;
+}
+
+private void createAndDeleteFlowFile(final WriteAheadFlowFileRepository 
repo, final FlowFileQueue queue,
+ final ContentClaim claim) throws 
IOException {
+final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
+.id(1L)
+.addAttribute("uuid", UUID.randomUUID().toString())
+.contentClaim(claim)
+.build();
+
+final StandardRepositoryRecord createRecord = new 
StandardRepositoryRecord(queue);
+createRecord.setWorking(flowFile, false);
+createRecord.setDestination(queue);
+repo.updateRepository(List.of(createRecord));
+
+final StandardRepositoryRecord deleteRecord = new 
StandardRepositoryRecord(queue, flowFile);
+deleteRecord.markForDelete();
+repo.updateRepository(List.of(deleteRecord));
+}
+
+/**
+ * Writes FlowFiles (one per claim) to a new repo, closes it, then 
recovers into a fresh repo
+ * and returns the recovered FlowFileRecords.
+ */
+private List writeAndRecover(final ContentClaim... claims) 
throws IOException {
+final ResourceClaimManager writeClaimManager = new 
StandardResourceClaimManager();
+final TestQueueProvider writeQueueProvider = new TestQueueProvider();
+final Connection writeConnection = Mockito.mock(Connection.class);
+when(writeConnection.getIdentifier()).thenReturn("1234");
+
when(writeConnection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+final FlowFileSwapManager swapMgr = new MockFlowFileSwapManager();
+final FlowFileQueue writeQueue = new StandardFlowFileQueue("1234", 
null, null, null, swapMgr, null, 1, "0 sec", 0L, "0 B");
+when(writeConnection.getFlowFileQueue()).thenReturn(writeQueue);
+writeQueueProvider.addConnection(writeConnection);
+
+try (final WriteAheadFlowFileRepository repo = new 
WriteAheadFlowFileRepository(niFiProperties)) {
+repo.initialize(writeClaimManager);
+repo.loadFlowFiles(writeQueueProvider);
+
+final List records = new ArrayList<>();
+for (int i = 0; i < claims.length; i++) {
+final FlowFileRecord ff = new StandardFlowFileRecord.Builder()
+.id(i + 1L)
+.addAttribute("uuid", "----" + 
String.format("%012d", i + 1))
+.contentClaim(claims[i])
+.build();
+final StandardRepositoryRecord rec = new 
StandardRepositoryRecord(writeQueue);
+rec.setWorking(ff, false);
+rec.setDestination(writeQueue);
+records.add(rec);
+}
+repo.updateRepository(records);
+}
+
+   

Re: [PR] NIFI-15570: Keep track of Content Claims where the last Claim in a Re… [nifi]

2026-03-13 Thread via GitHub


markap14 commented on code in PR #10874:
URL: https://github.com/apache/nifi/pull/10874#discussion_r2933623258


##
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java:
##
@@ -153,6 +158,8 @@ public WriteAheadFlowFileRepository(final NiFiProperties 
nifiProperties) {
 retainOrphanedFlowFiles = orphanedFlowFileProperty == null || 
Boolean.parseBoolean(orphanedFlowFileProperty);
 
 this.maxCharactersToCache = 
nifiProperties.getIntegerProperty(FLOWFILE_REPO_CACHE_SIZE, DEFAULT_CACHE_SIZE);
+final long maxAppendableClaimLength = 
DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(), 
DataUnit.B).longValue();
+truncationThreshold = Math.min(1_000_000, maxAppendableClaimLength);

Review Comment:
   It is just a "magic number". If someone sets Max Appendable to something 
large like 10 MB, we still want to consider truncating smaller values. 1 MB 
seems reasonable.
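
To make the effect of the cap concrete, here is a minimal sketch of how the threshold behaves for different Max Appendable values. The class and method names are hypothetical and not part of NiFi; the eligibility rule mirrors the `loadFlowFiles` condition quoted in this PR (non-zero offset and length above the threshold).

```java
// Hypothetical sketch; names are illustrative and not part of NiFi.
final class TruncationThresholdSketch {
    // Cap taken from the diff above (1_000_000 bytes).
    static final long MAX_THRESHOLD_BYTES = 1_000_000L;

    // The threshold is the smaller of the cap and the configured max appendable claim length.
    static long truncationThreshold(final long maxAppendableClaimLength) {
        return Math.min(MAX_THRESHOLD_BYTES, maxAppendableClaimLength);
    }

    // A claim is considered only if it does not start the Resource Claim and exceeds the threshold.
    static boolean isCandidate(final long offset, final long length, final long threshold) {
        return offset > 0 && length > threshold;
    }
}
```

With a large Max Appendable value the cap applies, so a 2,000,000-byte claim at a non-zero offset would still be considered; with a small value, the configured length itself becomes the threshold.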






Re: [PR] NIFI-15570: Keep track of Content Claims where the last Claim in a Re… [nifi]

2026-03-13 Thread via GitHub


markap14 commented on code in PR #10874:
URL: https://github.com/apache/nifi/pull/10874#discussion_r2933440725


##
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java:
##
@@ -99,9 +101,10 @@ public class FileSystemRepository implements 
ContentRepository {
 private final List containerNames;
 private final AtomicLong index;
 
-private final ScheduledExecutorService executor = new FlowEngine(4, 
"FileSystemRepository Workers", true);
+private final ScheduledExecutorService executor = new FlowEngine(6, 
"FileSystemRepository Workers", true);

Review Comment:
   Yes, just adding additional tasks to it and want to avoid starvation.






Re: [PR] NIFI-15570: Keep track of Content Claims where the last Claim in a Re… [nifi]

2026-03-13 Thread via GitHub


markap14 commented on code in PR #10874:
URL: https://github.com/apache/nifi/pull/10874#discussion_r2933436997


##
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java:
##
@@ -777,6 +828,18 @@ public long loadFlowFiles(final QueueProvider 
queueProvider) throws IOException
 
 continue;
 } else if (claim != null) {
+// If the claim exceeds the max appendable claim length on its 
own and doesn't start the Resource Claim,
+// we will consider it to be eligible for truncation. However, 
if there are multiple FlowFiles sharing the
+// same claim, we cannot truncate it because doing so would 
affect the other FlowFiles.
+if (claim.getOffset() > 0 && claim.getLength() > 
truncationThreshold && claim instanceof final StandardContentClaim scc) {

Review Comment:
   It should never be anything other than `StandardContentClaim`. The 
`instanceof` check is purely "defensive" because we need to cast.
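
As an illustration of that defensive pattern, the sketch below uses stand-in types (all names here are hypothetical, not the NiFi classes). The pattern variable `scc` replaces an explicit cast, and any other implementation of the interface is simply skipped rather than causing a `ClassCastException`. It assumes the caller has already checked for `null`, as the surrounding `else if (claim != null)` branch does.

```java
// Hypothetical stand-ins for the NiFi types; only the shape of the check matters here.
interface ClaimSketch {
    long getOffset();

    long getLength();
}

final class StandardClaimSketch implements ClaimSketch {
    private final long offset;
    private final long length;
    private boolean truncationCandidate;

    StandardClaimSketch(final long offset, final long length) {
        this.offset = offset;
        this.length = length;
    }

    @Override
    public long getOffset() {
        return offset;
    }

    @Override
    public long getLength() {
        return length;
    }

    void setTruncationCandidate(final boolean candidate) {
        this.truncationCandidate = candidate;
    }

    boolean isTruncationCandidate() {
        return truncationCandidate;
    }
}

final class DefensiveCastSketch {
    // Marks the claim and returns true only when it is eligible and of the expected concrete type.
    static boolean markIfEligible(final ClaimSketch claim, final long threshold) {
        if (claim.getOffset() > 0 && claim.getLength() > threshold && claim instanceof final StandardClaimSketch scc) {
            scc.setTruncationCandidate(true);
            return true;
        }
        return false;
    }
}
```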






Re: [PR] NIFI-15570: Keep track of Content Claims where the last Claim in a Re… [nifi]

2026-03-13 Thread via GitHub


markap14 commented on code in PR #10874:
URL: https://github.com/apache/nifi/pull/10874#discussion_r2933429177


##
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java:
##
@@ -810,4 +810,289 @@ public String changePartitionName(String swapLocation, 
String newPartitionName)
 return swapLocation;
 }
 }
+
+// 
=
+// Truncation Feature: Helpers
+// 
=
+
+/**
+ * Creates a mock queue + connection + queueProvider wired together, 
suitable for runtime truncation tests.
+ * Returns [claimManager, queueProvider, queue].
+ */
+private record RuntimeRepoContext(StandardResourceClaimManager 
claimManager, TestQueueProvider queueProvider, FlowFileQueue queue) {
+}
+
+private RuntimeRepoContext createRuntimeRepoContext() {
+final StandardResourceClaimManager claimManager = new 
StandardResourceClaimManager();
+final TestQueueProvider queueProvider = new TestQueueProvider();
+final Connection connection = Mockito.mock(Connection.class);
+when(connection.getIdentifier()).thenReturn("1234");
+
when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
+when(queue.getIdentifier()).thenReturn("1234");
+when(connection.getFlowFileQueue()).thenReturn(queue);
+queueProvider.addConnection(connection);
+return new RuntimeRepoContext(claimManager, queueProvider, queue);
+}
+
+private StandardContentClaim createClaim(final ResourceClaim rc, final 
long offset, final long length, final boolean truncationCandidate) {
+final StandardContentClaim claim = new StandardContentClaim(rc, 
offset);
+claim.setLength(length);
+if (truncationCandidate) {
+claim.setTruncationCandidate(true);
+}
+return claim;
+}
+
+private void createAndDeleteFlowFile(final WriteAheadFlowFileRepository 
repo, final FlowFileQueue queue,
+ final ContentClaim claim) throws 
IOException {
+final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
+.id(1L)
+.addAttribute("uuid", UUID.randomUUID().toString())
+.contentClaim(claim)
+.build();
+
+final StandardRepositoryRecord createRecord = new 
StandardRepositoryRecord(queue);
+createRecord.setWorking(flowFile, false);
+createRecord.setDestination(queue);
+repo.updateRepository(List.of(createRecord));
+
+final StandardRepositoryRecord deleteRecord = new 
StandardRepositoryRecord(queue, flowFile);
+deleteRecord.markForDelete();
+repo.updateRepository(List.of(deleteRecord));
+}
+
+/**
+ * Writes FlowFiles (one per claim) to a new repo, closes it, then 
recovers into a fresh repo
+ * and returns the recovered FlowFileRecords.
+ */
+private List writeAndRecover(final ContentClaim... claims) 
throws IOException {
+final ResourceClaimManager writeClaimManager = new 
StandardResourceClaimManager();
+final TestQueueProvider writeQueueProvider = new TestQueueProvider();
+final Connection writeConnection = Mockito.mock(Connection.class);
+when(writeConnection.getIdentifier()).thenReturn("1234");
+
when(writeConnection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+final FlowFileSwapManager swapMgr = new MockFlowFileSwapManager();
+final FlowFileQueue writeQueue = new StandardFlowFileQueue("1234", 
null, null, null, swapMgr, null, 1, "0 sec", 0L, "0 B");
+when(writeConnection.getFlowFileQueue()).thenReturn(writeQueue);
+writeQueueProvider.addConnection(writeConnection);
+
+try (final WriteAheadFlowFileRepository repo = new 
WriteAheadFlowFileRepository(niFiProperties)) {
+repo.initialize(writeClaimManager);
+repo.loadFlowFiles(writeQueueProvider);
+
+final List records = new ArrayList<>();
+for (int i = 0; i < claims.length; i++) {
+final FlowFileRecord ff = new StandardFlowFileRecord.Builder()
+.id(i + 1L)
+.addAttribute("uuid", "----" + 
String.format("%012d", i + 1))
+.contentClaim(claims[i])
+.build();
+final StandardRepositoryRecord rec = new 
StandardRepositoryRecord(writeQueue);
+rec.setWorking(ff, false);
+rec.setDestination(writeQueue);
+records.add(rec);
+}
+repo.updateRepository(records);
+}
+
+// 

Re: [PR] NIFI-15570: Keep track of Content Claims where the last Claim in a Re… [nifi]

2026-03-13 Thread via GitHub


markap14 commented on code in PR #10874:
URL: https://github.com/apache/nifi/pull/10874#discussion_r2933417283


##
nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java:
##
@@ -32,6 +32,7 @@ public class StandardResourceClaimManager implements 
ResourceClaimManager {
 private static final Logger logger = 
LoggerFactory.getLogger(StandardResourceClaimManager.class);
 private final ConcurrentMap claimantCounts = 
new ConcurrentHashMap<>();
 private final BlockingQueue destructableClaims = new 
LinkedBlockingQueue<>(5);
+private final BlockingQueue truncatableClaims = new 
LinkedBlockingQueue<>(10);

Review Comment:
   No, no particular reason. Just wanted a big value that's small enough to not 
cause heap exhaustion.
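
For readers less familiar with the trade-off, a bounded `LinkedBlockingQueue` rejects new entries once it is full instead of growing until the heap is exhausted. The sketch below shows that behavior with a timed `offer`; the wrapper class, the `String` element type, and the tiny capacity are assumptions for illustration only, not the values used in this PR.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; capacity and timeout are illustrative, not the values used by NiFi.
final class BoundedQueueSketch {
    private final BlockingQueue<String> claims;

    BoundedQueueSketch(final int capacity) {
        this.claims = new LinkedBlockingQueue<>(capacity);
    }

    // Returns false, rather than growing without bound, when the queue stays full for the whole timeout.
    boolean offer(final String claim, final long timeoutMillis) {
        try {
            return claims.offer(claim, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    int size() {
        return claims.size();
    }
}
```

The caller decides what a rejected offer means; in `markTruncatable` above, the claim is just not queued for truncation and a message is logged.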






Re: [PR] NIFI-15570: Keep track of Content Claims where the last Claim in a Re… [nifi]

2026-03-13 Thread via GitHub


exceptionfactory commented on code in PR #10874:
URL: https://github.com/apache/nifi/pull/10874#discussion_r2931926951


##
nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java:
##
@@ -161,6 +162,30 @@ public void markDestructable(final ResourceClaim claim) {
 }
 }
 
+@Override
+public void markTruncatable(final ContentClaim contentClaim) {
+if (contentClaim == null) {
+return;
+}
+
+final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
+synchronized (resourceClaim) {
+if (isDestructable(resourceClaim)) {
+return;
+}
+
+logger.debug("Marking {} as truncatable", contentClaim);
+try {
+if (!truncatableClaims.offer(contentClaim, 1, 
TimeUnit.MINUTES)) {
+logger.debug("Unable to mark {} as truncatable because the 
queue is full.", contentClaim);

Review Comment:
   This seems better suited to the `INFO` level, and it would be useful to 
include the queue size.
   ```suggestion
   logger.info("Unable to mark {} as truncatable because maximum queue size [{}] reached", contentClaim, truncatableClaims.size());
   ```



##
nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateTruncatableFlowFiles.java:
##
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+@DefaultSchedule(period = "10 mins")
+public class GenerateTruncatableFlowFiles extends AbstractProcessor {
+
+static final PropertyDescriptor BATCH_COUNT = new 
PropertyDescriptor.Builder()
+.name("Batch Count")
+.description("The maximum number of batches to generate. Each batch 
produces 10 FlowFiles (9 small + 1 large). "
+ + "Once this many batches have been generated, no more 
FlowFiles will be produced until the processor is stopped and restarted.")

Review Comment:
   Recommend using a multi-line string
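
A possible shape for that change, as a sketch only: a Java text block (Java 15+) with `\` line continuations yields the same single-line description as the concatenated literals. The holder class and constant names are hypothetical, and the line breaks chosen inside the text block are arbitrary.

```java
// Hypothetical holder class; only the two string forms matter here.
final class DescriptionTextBlockSketch {
    // Original form: adjacent string literals joined with '+'.
    static final String CONCATENATED = "The maximum number of batches to generate. Each batch produces 10 FlowFiles (9 small + 1 large). "
            + "Once this many batches have been generated, no more FlowFiles will be produced until the processor is stopped and restarted.";

    // Text block form: a trailing backslash suppresses the line break, so no newline ends up in the value.
    static final String TEXT_BLOCK = """
            The maximum number of batches to generate. Each batch produces 10 FlowFiles (9 small + 1 large). \
            Once this many batches have been generated, no more FlowFiles will be produced \
            until the processor is stopped and restarted.""";
}
```

Without the trailing backslashes the text block would contain newline characters, which may or may not be wanted in a property description.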



##
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java:
##
@@ -810,4 +810,289 @@ public String changePartitionName(String swapLocation, 
String newPartitionName)
 return swapLocation;
 }
 }
+
+// 
=
+// Truncation Feature: Helpers
+// 
=
+
+/**
+ * Creates a mock queue + connection + queueProvider wired together, 
suitable for runtime truncation tests.
+ * Returns [claimManager, queueProvider, queue].
+ */
+private record RuntimeRepoContext(StandardResourceClaimManager 
claimManager, TestQueueProvider queueProvider, FlowFileQueue queue) {
+}
+
+private RuntimeRepoContext createRuntimeRepoContext() {
+final StandardResourceClaimManager claimManager = new 
StandardResourceClaimManager();
+final TestQueueProvider queueProvider = new TestQueueProvider();
+final Connection connection = Mockito.mock(Connection.class);
+when(connection.getIdentifier()).thenReturn("1234");
+
when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
+  

Re: [PR] NIFI-15570: Keep track of Content Claims where the last Claim in a Re… [nifi]

2026-02-09 Thread via GitHub


markap14 commented on code in PR #10874:
URL: https://github.com/apache/nifi/pull/10874#discussion_r2783113669


##
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java:
##
@@ -1035,6 +1050,120 @@ public void purge() {
 resourceClaimManager.purge();
 }
 
+
+private class TruncateClaims implements Runnable {
+
+@Override
+public void run() {
+final Map truncationActivationCache = new 
HashMap<>();
+
+// Go through any known truncation claims and truncate them now if 
truncation is enabled for their container.
+for (final String container : containerNames) {
+if (isTruncationActiveForContainer(container, 
truncationActivationCache)) {
+final List toTruncate = 
truncationClaimManager.removeTruncationClaims(container);
+if (toTruncate.isEmpty()) {
+continue;
+}
+
+truncateClaims(toTruncate, truncationActivationCache);
+}
+}
+
+// Drain any Truncation Claims from the Resource Claim Manager.
+// If able, truncate those claims. Otherwise, save those claims in 
the Truncation Claim Manager to be truncated on the next run.
+// This prevents us from having a case where we could truncate a 
big claim but we don't because we're not yet running out of disk space,
+// but then we later start to run out of disk space and lost the 
opportunity to truncate that big claim.
+while (true) {
+final List toTruncate = new ArrayList<>();
+resourceClaimManager.drainTruncatableClaims(toTruncate, 
10_000);
+if (toTruncate.isEmpty()) {
+return;
+}
+
+truncateClaims(toTruncate, truncationActivationCache);
+}
+}
+
+private void truncateClaims(final List toTruncate, final 
Map truncationActivationCache) {
+final Map> claimsSkipped = new 
HashMap<>();
+
+for (final ContentClaim claim : toTruncate) {
+final String container = 
claim.getResourceClaim().getContainer();
+if (!isTruncationActiveForContainer(container, 
truncationActivationCache)) {
+LOG.debug("Will not truncate {} because truncation is not 
active for container {}; will save for later truncation.", claim, container);
+claimsSkipped.computeIfAbsent(container, key -> new 
ArrayList<>()).add(claim);
+continue;
+}
+
+if (claim.isTruncationCandidate()) {
+truncate(claim);
+}
+}
+
+claimsSkipped.forEach(truncationClaimManager::addTruncationClaims);
+}
+
+private boolean isTruncationActiveForContainer(final String container, 
final Map activationCache) {
+// If not archiving data, we consider truncation always active.
+if (!archiveData) {
+return true;
+}
+
+final Boolean cachedValue = activationCache.get(container);
+if (cachedValue != null) {
+return cachedValue;
+}
+
+if (!isArchiveClearedOnLastRun(container)) {
+LOG.debug("Truncation is not active for container {} because 
the archive was not cleared on the last run.", container);
+activationCache.put(container, false);
+return false;
+}
+
+final long usableSpace;
+try {
+usableSpace = getContainerUsableSpace(container);
+} catch (final IOException ioe) {
+LOG.warn("Failed to determine usable space for container {}. 
Will not truncate claims for this container.", container, ioe);
+return false;
+}
+
+final Long minUsableSpace = 
minUsableContainerBytesForArchive.get(container);
+if (minUsableSpace != null && usableSpace < minUsableSpace) {
+LOG.debug("Truncate is active for Container {} because usable 
space of {} bytes is below the desired threshold of {} bytes.",
+container, usableSpace, minUsableSpace);
+
+activationCache.put(container, true);
+return true;
+}
+
+activationCache.put(container, false);
+return false;
+}
+
+private void truncate(final ContentClaim claim) {

Review Comment:
   Thanks for reviewing @pvillard31!
   In short, no, that should not be possible. The only way we will ever queue 
up the `ContentClaim` for truncation is if the FlowFile Repository is synched 
to disk (typically on checkpoint but also possible on every commit if fsync 
property in `nifi.properties` is set to `true`) an

Re: [PR] NIFI-15570: Keep track of Content Claims where the last Claim in a Re… [nifi]

2026-02-09 Thread via GitHub


pvillard31 commented on PR #10874:
URL: https://github.com/apache/nifi/pull/10874#issuecomment-3870495672

   As a side note, the integration test failure was caused by another commit 
and is now fixed if you rebase on main.





Re: [PR] NIFI-15570: Keep track of Content Claims where the last Claim in a Re… [nifi]

2026-02-09 Thread via GitHub


pvillard31 commented on code in PR #10874:
URL: https://github.com/apache/nifi/pull/10874#discussion_r2781573813


##
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java:
##
@@ -1035,6 +1050,120 @@ public void purge() {
 resourceClaimManager.purge();
 }
 
+
+private class TruncateClaims implements Runnable {
+
+@Override
+public void run() {
+final Map truncationActivationCache = new 
HashMap<>();
+
+// Go through any known truncation claims and truncate them now if 
truncation is enabled for their container.
+for (final String container : containerNames) {
+if (isTruncationActiveForContainer(container, 
truncationActivationCache)) {
+final List toTruncate = 
truncationClaimManager.removeTruncationClaims(container);
+if (toTruncate.isEmpty()) {
+continue;
+}
+
+truncateClaims(toTruncate, truncationActivationCache);
+}
+}
+
+// Drain any Truncation Claims from the Resource Claim Manager.
+// If able, truncate those claims. Otherwise, save those claims in 
the Truncation Claim Manager to be truncated on the next run.
+// This prevents us from having a case where we could truncate a 
big claim but we don't because we're not yet running out of disk space,
+// but then we later start to run out of disk space and lost the 
opportunity to truncate that big claim.
+while (true) {
+final List toTruncate = new ArrayList<>();
+resourceClaimManager.drainTruncatableClaims(toTruncate, 
10_000);
+if (toTruncate.isEmpty()) {
+return;
+}
+
+truncateClaims(toTruncate, truncationActivationCache);
+}
+}
+
+private void truncateClaims(final List toTruncate, final 
Map truncationActivationCache) {
+final Map> claimsSkipped = new 
HashMap<>();
+
+for (final ContentClaim claim : toTruncate) {
+final String container = 
claim.getResourceClaim().getContainer();
+if (!isTruncationActiveForContainer(container, 
truncationActivationCache)) {
+LOG.debug("Will not truncate {} because truncation is not 
active for container {}; will save for later truncation.", claim, container);
+claimsSkipped.computeIfAbsent(container, key -> new 
ArrayList<>()).add(claim);
+continue;
+}
+
+if (claim.isTruncationCandidate()) {
+truncate(claim);
+}
+}
+
+claimsSkipped.forEach(truncationClaimManager::addTruncationClaims);
+}
+
+private boolean isTruncationActiveForContainer(final String container, 
final Map activationCache) {
+// If not archiving data, we consider truncation always active.
+if (!archiveData) {
+return true;
+}
+
+final Boolean cachedValue = activationCache.get(container);
+if (cachedValue != null) {
+return cachedValue;
+}
+
+if (!isArchiveClearedOnLastRun(container)) {
+LOG.debug("Truncation is not active for container {} because 
the archive was not cleared on the last run.", container);
+activationCache.put(container, false);
+return false;
+}
+
+final long usableSpace;
+try {
+usableSpace = getContainerUsableSpace(container);
+} catch (final IOException ioe) {
+LOG.warn("Failed to determine usable space for container {}. 
Will not truncate claims for this container.", container, ioe);
+return false;
+}
+
+final Long minUsableSpace = 
minUsableContainerBytesForArchive.get(container);
+if (minUsableSpace != null && usableSpace < minUsableSpace) {
+LOG.debug("Truncate is active for Container {} because usable 
space of {} bytes is below the desired threshold of {} bytes.",
+container, usableSpace, minUsableSpace);
+
+activationCache.put(container, true);
+return true;
+}
+
+activationCache.put(container, false);
+return false;
+}
+
+private void truncate(final ContentClaim claim) {

Review Comment:
   The truncate method doesn't verify that the claimant count is still 0 before 
truncating. If a clone operation increments the claimant count while the 
truncation task is mid-flight, we could truncate content that is still 
referenced. Isn't it a concern?
   
   Wondering if we could have a race condition:
   1. `Tr