Re: [PR] NIFI-15570: Keep track of Content Claims where the last Claim in a Re… [nifi]
exceptionfactory merged PR #10874: URL: https://github.com/apache/nifi/pull/10874 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] NIFI-15570: Keep track of Content Claims where the last Claim in a Re… [nifi]
markap14 commented on PR #10874: URL: https://github.com/apache/nifi/pull/10874#issuecomment-4057945835 Thanks for the feedback @exceptionfactory . Updated.
Re: [PR] NIFI-15570: Keep track of Content Claims where the last Claim in a Re… [nifi]
markap14 commented on code in PR #10874:
URL: https://github.com/apache/nifi/pull/10874#discussion_r2933633470
##
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java:
##
@@ -897,6 +905,272 @@ protected boolean archive(Path curPath) {
         }
     }
+    @Test
+    public void testTruncationCandidateMarkedOnlyForLargeNonStartClaim() throws IOException {
+        final long maxClaimLength = DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(), DataUnit.B).longValue();
Review Comment:
We read several properties from nifi.properties, including max appendable
claim size, repo location, archive enabled, etc. So we want to get this from
properties, not just declare a constant. But we can move it into the
@BeforeEach so that we just define it once instead of adding the parsing code
to several methods.
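The idea is to parse the configured size once and reuse the result in every test, instead of repeating the parsing code. A minimal sketch of that pattern: `setUp` plays the role of a JUnit `@BeforeEach` method, and `parseDataSizeBytes` is a hypothetical stand-in for `DataUnit.parseDataSize(..., DataUnit.B)` that assumes 1024-based units. None of these names come from the PR.

```java
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative only: parse the configured claim size once and reuse it in every test. */
final class ClaimSizeConfig {
    // Hypothetical stand-in for DataUnit.parseDataSize(value, DataUnit.B); assumes 1024-based units.
    private static final Pattern DATA_SIZE =
            Pattern.compile("(\\d+(?:\\.\\d+)?)\\s*(B|KB|MB|GB|TB)", Pattern.CASE_INSENSITIVE);
    private static final Map<String, Long> MULTIPLIERS =
            Map.of("B", 1L, "KB", 1L << 10, "MB", 1L << 20, "GB", 1L << 30, "TB", 1L << 40);

    private long maxClaimLength;

    static long parseDataSizeBytes(final String value) {
        final Matcher matcher = DATA_SIZE.matcher(value.trim());
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Invalid data size: " + value);
        }
        final double amount = Double.parseDouble(matcher.group(1));
        final long multiplier = MULTIPLIERS.get(matcher.group(2).toUpperCase(Locale.ROOT));
        return (long) (amount * multiplier);
    }

    // Plays the role of a JUnit @BeforeEach method: the parsing lives in exactly one place.
    void setUp(final String maxAppendableClaimSize) {
        maxClaimLength = parseDataSizeBytes(maxAppendableClaimSize);
    }

    long maxClaimLength() {
        return maxClaimLength;
    }
}
```

Each test then reads `maxClaimLength()` rather than calling the parser itself.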
Re: [PR] NIFI-15570: Keep track of Content Claims where the last Claim in a Re… [nifi]
exceptionfactory commented on code in PR #10874:
URL: https://github.com/apache/nifi/pull/10874#discussion_r2933628688
##
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java:
##
@@ -99,9 +101,10 @@ public class FileSystemRepository implements ContentRepository {
     private final List<String> containerNames;
     private final AtomicLong index;
-    private final ScheduledExecutorService executor = new FlowEngine(4, "FileSystemRepository Workers", true);
+    private final ScheduledExecutorService executor = new FlowEngine(6, "FileSystemRepository Workers", true);
Review Comment:
Thanks for confirming.
Re: [PR] NIFI-15570: Keep track of Content Claims where the last Claim in a Re… [nifi]
exceptionfactory commented on code in PR #10874:
URL: https://github.com/apache/nifi/pull/10874#discussion_r2933627645
##
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java:
##
@@ -810,4 +810,289 @@ public String changePartitionName(String swapLocation, String newPartitionName)
             return swapLocation;
         }
     }
+
+    // ======================================================================
+    // Truncation Feature: Helpers
+    // ======================================================================
+
+    /**
+     * Creates a mock queue + connection + queueProvider wired together, suitable for runtime truncation tests.
+     * Returns [claimManager, queueProvider, queue].
+     */
+    private record RuntimeRepoContext(StandardResourceClaimManager claimManager, TestQueueProvider queueProvider, FlowFileQueue queue) {
+    }
+
+    private RuntimeRepoContext createRuntimeRepoContext() {
+        final StandardResourceClaimManager claimManager = new StandardResourceClaimManager();
+        final TestQueueProvider queueProvider = new TestQueueProvider();
+        final Connection connection = Mockito.mock(Connection.class);
+        when(connection.getIdentifier()).thenReturn("1234");
+        when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+        final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
+        when(queue.getIdentifier()).thenReturn("1234");
+        when(connection.getFlowFileQueue()).thenReturn(queue);
+        queueProvider.addConnection(connection);
+        return new RuntimeRepoContext(claimManager, queueProvider, queue);
+    }
+
+    private StandardContentClaim createClaim(final ResourceClaim rc, final long offset, final long length, final boolean truncationCandidate) {
+        final StandardContentClaim claim = new StandardContentClaim(rc, offset);
+        claim.setLength(length);
+        if (truncationCandidate) {
+            claim.setTruncationCandidate(true);
+        }
+        return claim;
+    }
+
+    private void createAndDeleteFlowFile(final WriteAheadFlowFileRepository repo, final FlowFileQueue queue,
+                                         final ContentClaim claim) throws IOException {
+        final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
+                .id(1L)
+                .addAttribute("uuid", UUID.randomUUID().toString())
+                .contentClaim(claim)
+                .build();
+
+        final StandardRepositoryRecord createRecord = new StandardRepositoryRecord(queue);
+        createRecord.setWorking(flowFile, false);
+        createRecord.setDestination(queue);
+        repo.updateRepository(List.of(createRecord));
+
+        final StandardRepositoryRecord deleteRecord = new StandardRepositoryRecord(queue, flowFile);
+        deleteRecord.markForDelete();
+        repo.updateRepository(List.of(deleteRecord));
+    }
+
+    /**
+     * Writes FlowFiles (one per claim) to a new repo, closes it, then recovers into a fresh repo
+     * and returns the recovered FlowFileRecords.
+     */
+    private List<FlowFileRecord> writeAndRecover(final ContentClaim... claims) throws IOException {
+        final ResourceClaimManager writeClaimManager = new StandardResourceClaimManager();
+        final TestQueueProvider writeQueueProvider = new TestQueueProvider();
+        final Connection writeConnection = Mockito.mock(Connection.class);
+        when(writeConnection.getIdentifier()).thenReturn("1234");
+        when(writeConnection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+        final FlowFileSwapManager swapMgr = new MockFlowFileSwapManager();
+        final FlowFileQueue writeQueue = new StandardFlowFileQueue("1234", null, null, null, swapMgr, null, 1, "0 sec", 0L, "0 B");
+        when(writeConnection.getFlowFileQueue()).thenReturn(writeQueue);
+        writeQueueProvider.addConnection(writeConnection);
+
+        try (final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository(niFiProperties)) {
+            repo.initialize(writeClaimManager);
+            repo.loadFlowFiles(writeQueueProvider);
+
+            final List records = new ArrayList<>();
+            for (int i = 0; i < claims.length; i++) {
+                final FlowFileRecord ff = new StandardFlowFileRecord.Builder()
+                        .id(i + 1L)
+                        .addAttribute("uuid", "----" + String.format("%012d", i + 1))
+                        .contentClaim(claims[i])
+                        .build();
+                final StandardRepositoryRecord rec = new StandardRepositoryRecord(writeQueue);
+                rec.setWorking(ff, false);
+                rec.setDestination(writeQueue);
+                records.add(rec);
+            }
+            repo.updateRepository(records);
+        }
+
+
Re: [PR] NIFI-15570: Keep track of Content Claims where the last Claim in a Re… [nifi]
markap14 commented on code in PR #10874:
URL: https://github.com/apache/nifi/pull/10874#discussion_r2933623258
##
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java:
##
@@ -153,6 +158,8 @@ public WriteAheadFlowFileRepository(final NiFiProperties nifiProperties) {
         retainOrphanedFlowFiles = orphanedFlowFileProperty == null || Boolean.parseBoolean(orphanedFlowFileProperty);
         this.maxCharactersToCache = nifiProperties.getIntegerProperty(FLOWFILE_REPO_CACHE_SIZE, DEFAULT_CACHE_SIZE);
+        final long maxAppendableClaimLength = DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(), DataUnit.B).longValue();
+        truncationThreshold = Math.min(1_000_000, maxAppendableClaimLength);
Review Comment:
It's just a "magic number"... if someone goes in and sets Max Appendable to
something large like 10 MB, we still want to consider truncating smaller
values. 1 MB seems reasonable.
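To make the effect of the cap concrete, here is a tiny sketch of the same `Math.min` expression. The class and constant names are hypothetical, and the sample inputs assume 1024-based units for the configured size.

```java
/** Illustrative only: mirrors the threshold expression in the WriteAheadFlowFileRepository hunk. */
final class TruncationThreshold {
    // The cap discussed in the review: 1,000,000 bytes (a decimal megabyte).
    static final long MAX_TRUNCATION_THRESHOLD = 1_000_000L;

    static long truncationThreshold(final long maxAppendableClaimLength) {
        // A large Max Appendable setting does not raise the bar for truncation above the cap;
        // a small setting lowers it to the configured length.
        return Math.min(MAX_TRUNCATION_THRESHOLD, maxAppendableClaimLength);
    }
}
```

With a 10 MB setting (10,485,760 bytes) the threshold stays at 1,000,000 bytes. With a 50 KB setting it drops to 51,200 bytes. Note that `1_000_000` is slightly below a 1024-based megabyte (1,048,576 bytes).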
Re: [PR] NIFI-15570: Keep track of Content Claims where the last Claim in a Re… [nifi]
markap14 commented on code in PR #10874:
URL: https://github.com/apache/nifi/pull/10874#discussion_r2933440725
##
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java:
##
@@ -99,9 +101,10 @@ public class FileSystemRepository implements ContentRepository {
Review Comment:
Yes, just adding additional tasks to it and want to avoid starvation.
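The starvation concern can be shown in isolation. This sketch uses the JDK's scheduled thread pool (`Executors.newScheduledThreadPool`) as a stand-in for NiFi's `FlowEngine`. It assumes, for illustration only, that each task can hold its worker thread for a long time. When there are more such tasks than threads, the extra tasks never get to run.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Illustrates thread starvation when a scheduled pool has fewer threads than long-running tasks. */
final class PoolStarvationDemo {

    /** Schedules taskCount blocking tasks and reports whether all of them started within the timeout. */
    static boolean allTasksStarted(final int poolSize, final int taskCount, final long timeoutMillis) {
        final ScheduledExecutorService executor = Executors.newScheduledThreadPool(poolSize);
        final CountDownLatch started = new CountDownLatch(taskCount);
        final CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < taskCount; i++) {
                executor.schedule(() -> {
                    started.countDown();
                    try {
                        // Simulates a long-running worker task that keeps its thread busy.
                        release.await();
                    } catch (final InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }, 0, TimeUnit.MILLISECONDS);
            }
            return started.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // Unblock running tasks and discard any that never started.
            release.countDown();
            executor.shutdownNow();
        }
    }
}
```

With six such tasks, a pool of four leaves two waiting indefinitely, while a pool of six starts them all.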
Re: [PR] NIFI-15570: Keep track of Content Claims where the last Claim in a Re… [nifi]
markap14 commented on code in PR #10874:
URL: https://github.com/apache/nifi/pull/10874#discussion_r2933436997
##
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java:
##
@@ -777,6 +828,18 @@ public long loadFlowFiles(final QueueProvider queueProvider) throws IOException
                     continue;
                 } else if (claim != null) {
+                    // If the claim exceeds the max appendable claim length on its own and doesn't start the Resource Claim,
+                    // we will consider it to be eligible for truncation. However, if there are multiple FlowFiles sharing the
+                    // same claim, we cannot truncate it because doing so would affect the other FlowFiles.
+                    if (claim.getOffset() > 0 && claim.getLength() > truncationThreshold && claim instanceof final StandardContentClaim scc) {
Review Comment:
It should never be anything other than `StandardContentClaim`. That check is
purely "defensive" because we need to cast.
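The eligibility rule combines an offset check and a size check with a pattern-matching `instanceof`. The `instanceof` exists only to obtain a typed reference for the cast. Below is a simplified sketch with hypothetical `ClaimView` and `SketchContentClaim` types; the real `ContentClaim` and `StandardContentClaim` carry far more state. The sketch also leaves out the shared-claim check mentioned in the code comment.

```java
/** Hypothetical minimal view of a content claim. */
interface ClaimView {
    long getOffset();
    long getLength();
}

/** Hypothetical stand-in for StandardContentClaim: the only type that can be flagged. */
final class SketchContentClaim implements ClaimView {
    private final long offset;
    private final long length;
    private boolean truncationCandidate;

    SketchContentClaim(final long offset, final long length) {
        this.offset = offset;
        this.length = length;
    }

    @Override public long getOffset() { return offset; }
    @Override public long getLength() { return length; }
    boolean isTruncationCandidate() { return truncationCandidate; }
    void setTruncationCandidate(final boolean candidate) { this.truncationCandidate = candidate; }
}

final class TruncationEligibility {
    /** Flags claims that do not start their resource claim and exceed the threshold. */
    static boolean markIfEligible(final ClaimView claim, final long truncationThreshold) {
        // The pattern match is purely defensive: it both checks the type and binds the cast.
        if (claim.getOffset() > 0 && claim.getLength() > truncationThreshold
                && claim instanceof final SketchContentClaim scc) {
            scc.setTruncationCandidate(true);
            return true;
        }
        return false;
    }
}
```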
Re: [PR] NIFI-15570: Keep track of Content Claims where the last Claim in a Re… [nifi]
markap14 commented on code in PR #10874:
URL: https://github.com/apache/nifi/pull/10874#discussion_r2933429177
##
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java:
##
@@ -810,4 +810,289 @@ public String changePartitionName(String swapLocation, String newPartitionName)
             return swapLocation;
         }
     }
Re: [PR] NIFI-15570: Keep track of Content Claims where the last Claim in a Re… [nifi]
markap14 commented on code in PR #10874:
URL: https://github.com/apache/nifi/pull/10874#discussion_r2933417283
##
nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java:
##
@@ -32,6 +32,7 @@ public class StandardResourceClaimManager implements ResourceClaimManager {
     private static final Logger logger = LoggerFactory.getLogger(StandardResourceClaimManager.class);
     private final ConcurrentMap claimantCounts = new ConcurrentHashMap<>();
     private final BlockingQueue<ResourceClaim> destructableClaims = new LinkedBlockingQueue<>(5);
+    private final BlockingQueue<ContentClaim> truncatableClaims = new LinkedBlockingQueue<>(10);
Review Comment:
No, no particular reason. Just wanted a big value that's small enough to not
cause heap exhaustion.
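A capacity-bounded `LinkedBlockingQueue` never holds more than its capacity, so a burst of truncation candidates cannot grow the heap without limit. The cost is that `offer(e, timeout, unit)` returns `false` once the queue stays full. The sketch below uses `String` IDs in place of `ContentClaim` objects, and the class name is hypothetical. Its `drain` method shows the JDK's `drainTo(collection, maxElements)`. That is one plausible way a batched drain such as `drainTruncatableClaims(toTruncate, 10_000)` could work, but this is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Illustrates how a capacity-bounded queue caps memory: offers fail once it is full. */
final class BoundedClaimQueue {
    private final BlockingQueue<String> queue;

    BoundedClaimQueue(final int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    /** Waits up to the timeout for space, then gives up instead of growing without bound. */
    boolean offer(final String claimId, final long timeoutMillis) {
        try {
            return queue.offer(claimId, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Removes up to maxElements queued entries in FIFO order. */
    List<String> drain(final int maxElements) {
        final List<String> drained = new ArrayList<>();
        queue.drainTo(drained, maxElements);
        return drained;
    }

    int size() {
        return queue.size();
    }
}
```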
Re: [PR] NIFI-15570: Keep track of Content Claims where the last Claim in a Re… [nifi]
exceptionfactory commented on code in PR #10874:
URL: https://github.com/apache/nifi/pull/10874#discussion_r2931926951
##
nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java:
##
@@ -161,6 +162,30 @@ public void markDestructable(final ResourceClaim claim) {
         }
     }
+    @Override
+    public void markTruncatable(final ContentClaim contentClaim) {
+        if (contentClaim == null) {
+            return;
+        }
+
+        final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
+        synchronized (resourceClaim) {
+            if (isDestructable(resourceClaim)) {
+                return;
+            }
+
+            logger.debug("Marking {} as truncatable", contentClaim);
+            try {
+                if (!truncatableClaims.offer(contentClaim, 1, TimeUnit.MINUTES)) {
+                    logger.debug("Unable to mark {} as truncatable because the queue is full.", contentClaim);
Review Comment:
It seems like this would be better as an `INFO` level, and it would be
useful to include the queue size.
```suggestion
                    logger.info("Unable to mark {} as truncatable because maximum queue size [{}] reached", contentClaim, truncatableClaims.size());
```
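SLF4J binds `{}` placeholders to arguments strictly left to right, so in this message the claim must be passed before the queue size. The minimal stand-in formatter below makes that binding order visible. It is hypothetical and ignores SLF4J's escaping and trailing-`Throwable` handling.

```java
/** Hypothetical minimal formatter that fills "{}" placeholders left to right, as SLF4J does. */
final class PlaceholderFormat {
    static String format(final String pattern, final Object... args) {
        final StringBuilder out = new StringBuilder();
        int argIndex = 0;
        int from = 0;
        int at;
        while ((at = pattern.indexOf("{}", from)) >= 0) {
            out.append(pattern, from, at);
            // Each placeholder consumes the next argument; surplus placeholders are left as-is.
            out.append(argIndex < args.length ? String.valueOf(args[argIndex++]) : "{}");
            from = at + 2;
        }
        out.append(pattern.substring(from));
        return out.toString();
    }
}
```

For example, `format("Unable to mark {} as truncatable because maximum queue size [{}] reached", "claim-1", 10)` yields `Unable to mark claim-1 as truncatable because maximum queue size [10] reached`. If the arguments were swapped, the size would appear where the claim belongs.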
##
nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateTruncatableFlowFiles.java:
##
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+@DefaultSchedule(period = "10 mins")
+public class GenerateTruncatableFlowFiles extends AbstractProcessor {
+
+    static final PropertyDescriptor BATCH_COUNT = new PropertyDescriptor.Builder()
+            .name("Batch Count")
+            .description("The maximum number of batches to generate. Each batch produces 10 FlowFiles (9 small + 1 large). "
+                    + "Once this many batches have been generated, no more FlowFiles will be produced until the processor is stopped and restarted.")
Review Comment:
Recommend using a multi-line string
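Here is a sketch of the description as a multi-line string, using a Java text block (Java 15+). A trailing `\` escape joins lines without inserting a newline, so the resulting text is identical to the concatenated version. The class and field names are illustrative only.

```java
/** Shows the Batch Count description written both ways; the two strings are equal. */
final class BatchCountDescription {
    // Concatenated form, as in the current revision of the PR.
    static final String CONCATENATED = "The maximum number of batches to generate. Each batch produces 10 FlowFiles (9 small + 1 large). "
            + "Once this many batches have been generated, no more FlowFiles will be produced until the processor is stopped and restarted.";

    // Text block form: common indentation is stripped, and each trailing backslash suppresses the line break.
    static final String TEXT_BLOCK = """
            The maximum number of batches to generate. Each batch produces 10 FlowFiles (9 small + 1 large). \
            Once this many batches have been generated, no more FlowFiles will be produced until the processor \
            is stopped and restarted.""";
}
```

If the description is meant to keep its line breaks, you can drop the trailing backslashes; the newlines then become part of the string.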
##
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java:
##
@@ -810,4 +810,289 @@ public String changePartitionName(String swapLocation, String newPartitionName)
             return swapLocation;
         }
     }
Re: [PR] NIFI-15570: Keep track of Content Claims where the last Claim in a Re… [nifi]
markap14 commented on code in PR #10874:
URL: https://github.com/apache/nifi/pull/10874#discussion_r2783113669
##
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java:
##
@@ -1035,6 +1050,120 @@ public void purge() {
         resourceClaimManager.purge();
     }
+
+    private class TruncateClaims implements Runnable {
+
+        @Override
+        public void run() {
+            final Map<String, Boolean> truncationActivationCache = new HashMap<>();
+
+            // Go through any known truncation claims and truncate them now if truncation is enabled for their container.
+            for (final String container : containerNames) {
+                if (isTruncationActiveForContainer(container, truncationActivationCache)) {
+                    final List<ContentClaim> toTruncate = truncationClaimManager.removeTruncationClaims(container);
+                    if (toTruncate.isEmpty()) {
+                        continue;
+                    }
+
+                    truncateClaims(toTruncate, truncationActivationCache);
+                }
+            }
+
+            // Drain any Truncation Claims from the Resource Claim Manager.
+            // If able, truncate those claims. Otherwise, save those claims in the Truncation Claim Manager to be truncated on the next run.
+            // This prevents us from having a case where we could truncate a big claim but we don't because we're not yet running out of disk space,
+            // but then we later start to run out of disk space and lose the opportunity to truncate that big claim.
+            while (true) {
+                final List<ContentClaim> toTruncate = new ArrayList<>();
+                resourceClaimManager.drainTruncatableClaims(toTruncate, 10_000);
+                if (toTruncate.isEmpty()) {
+                    return;
+                }
+
+                truncateClaims(toTruncate, truncationActivationCache);
+            }
+        }
+
+        private void truncateClaims(final List<ContentClaim> toTruncate, final Map<String, Boolean> truncationActivationCache) {
+            final Map<String, List<ContentClaim>> claimsSkipped = new HashMap<>();
+
+            for (final ContentClaim claim : toTruncate) {
+                final String container = claim.getResourceClaim().getContainer();
+                if (!isTruncationActiveForContainer(container, truncationActivationCache)) {
+                    LOG.debug("Will not truncate {} because truncation is not active for container {}; will save for later truncation.", claim, container);
+                    claimsSkipped.computeIfAbsent(container, key -> new ArrayList<>()).add(claim);
+                    continue;
+                }
+
+                if (claim.isTruncationCandidate()) {
+                    truncate(claim);
+                }
+            }
+
+            claimsSkipped.forEach(truncationClaimManager::addTruncationClaims);
+        }
+
+        private boolean isTruncationActiveForContainer(final String container, final Map<String, Boolean> activationCache) {
+            // If not archiving data, we consider truncation always active.
+            if (!archiveData) {
+                return true;
+            }
+
+            final Boolean cachedValue = activationCache.get(container);
+            if (cachedValue != null) {
+                return cachedValue;
+            }
+
+            if (!isArchiveClearedOnLastRun(container)) {
+                LOG.debug("Truncation is not active for container {} because the archive was not cleared on the last run.", container);
+                activationCache.put(container, false);
+                return false;
+            }
+
+            final long usableSpace;
+            try {
+                usableSpace = getContainerUsableSpace(container);
+            } catch (final IOException ioe) {
+                LOG.warn("Failed to determine usable space for container {}. Will not truncate claims for this container.", container, ioe);
+                return false;
+            }
+
+            final Long minUsableSpace = minUsableContainerBytesForArchive.get(container);
+            if (minUsableSpace != null && usableSpace < minUsableSpace) {
+                LOG.debug("Truncate is active for Container {} because usable space of {} bytes is below the desired threshold of {} bytes.",
+                        container, usableSpace, minUsableSpace);
+
+                activationCache.put(container, true);
+                return true;
+            }
+
+            activationCache.put(container, false);
+            return false;
+        }
+
+        private void truncate(final ContentClaim claim) {
Review Comment:
Thanks for reviewing @pvillard31!
In short, no, that should not be possible. The only way we will ever queue
up the `ContentClaim` for truncation is if the FlowFile Repository is synced
to disk (typically on checkpoint, but also possible on every commit if the fsync
property in `nifi.properties` is set to `true`) an
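The `TruncateClaims.run()` hunk has a two-phase structure. First, it retries claims that were deferred per container. Second, it drains newly queued claims in batches of up to 10,000, deferring any claim whose container does not currently have truncation active. That structure can be modelled in isolation. The sketch below is simplified: `TruncationPass`, its `Claim` record, and the `isTruncationActive` predicate are hypothetical stand-ins for the repository types and the archive/disk-space check. "Truncating" here only records the claim, and the `isTruncationCandidate()` filter is omitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Predicate;

/** Simplified, hypothetical model of one TruncateClaims run: truncate now or defer per container. */
final class TruncationPass {
    record Claim(String container, String id) { }

    final BlockingQueue<Claim> incoming = new LinkedBlockingQueue<>();   // stand-in for the Resource Claim Manager queue
    final Map<String, List<Claim>> deferred = new HashMap<>();          // stand-in for the Truncation Claim Manager
    final List<Claim> truncated = new ArrayList<>();                    // what would have been truncated

    void run(final List<String> containers, final Predicate<String> isTruncationActive) {
        // The per-container decision is computed at most once per run, like truncationActivationCache.
        final Map<String, Boolean> activeCache = new HashMap<>();
        final Predicate<String> active = c -> activeCache.computeIfAbsent(c, key -> isTruncationActive.test(key));

        // Phase 1: retry previously deferred claims for containers that are now active.
        for (final String container : containers) {
            if (active.test(container)) {
                final List<Claim> previous = deferred.remove(container);
                if (previous != null) {
                    process(previous, active);
                }
            }
        }

        // Phase 2: drain newly queued claims in batches until the queue is empty.
        while (true) {
            final List<Claim> batch = new ArrayList<>();
            incoming.drainTo(batch, 10_000);
            if (batch.isEmpty()) {
                return;
            }
            process(batch, active);
        }
    }

    private void process(final List<Claim> claims, final Predicate<String> active) {
        for (final Claim claim : claims) {
            if (active.test(claim.container())) {
                truncated.add(claim);
            } else {
                // Keep the claim for a later run instead of losing the chance to truncate it.
                deferred.computeIfAbsent(claim.container(), k -> new ArrayList<>()).add(claim);
            }
        }
    }
}
```

A claim for an inactive container is therefore not dropped. It is picked up in phase 1 of the first later run in which its container becomes active.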
Re: [PR] NIFI-15570: Keep track of Content Claims where the last Claim in a Re… [nifi]
pvillard31 commented on PR #10874: URL: https://github.com/apache/nifi/pull/10874#issuecomment-3870495672 As a side note, the integration test failure was caused by another commit and is now fixed if you rebase on main.
Re: [PR] NIFI-15570: Keep track of Content Claims where the last Claim in a Re… [nifi]
pvillard31 commented on code in PR #10874:
URL: https://github.com/apache/nifi/pull/10874#discussion_r2781573813
##
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java:
##
@@ -1035,6 +1050,120 @@ public void purge() {
resourceClaimManager.purge();
}
Review Comment:
The truncate method doesn't verify that the claimant count is still 0 before
truncating. If a clone operation increments the claimant count while the
truncation task is mid-flight, we could truncate content that is still
referenced. Isn't that a concern?
I'm wondering whether we could have a race condition:
1. `Tr
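The concern here is a classic check-then-act race. The sketch below illustrates only that general pattern. It is not the PR's implementation, and markap14's reply above argues that this window should not occur in practice. It uses a hypothetical `GuardedClaim` that merges the resource claim and its content claim into one object. Claimant-count changes and the truncation decision share a single lock, so the count is re-checked at the moment of truncation.

```java
/** Hypothetical model: claimant-count updates and truncation are serialized by one lock. */
final class GuardedClaim {
    private int claimantCount;
    private boolean truncated;

    /** For example, a clone adding a reference; refused once the content has been truncated. */
    synchronized boolean incrementClaimantCount() {
        if (truncated) {
            return false;
        }
        claimantCount++;
        return true;
    }

    synchronized void decrementClaimantCount() {
        if (claimantCount > 0) {
            claimantCount--;
        }
    }

    /** Re-checks the count under the same lock, so no increment can slip in between check and act. */
    synchronized boolean truncateIfUnreferenced() {
        if (claimantCount > 0 || truncated) {
            return false;
        }
        truncated = true;   // stands in for truncating the underlying file
        return true;
    }
}
```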
