josh-mckenzie commented on a change in pull request #1379:
URL: https://github.com/apache/cassandra/pull/1379#discussion_r785045772
##########
File path:
src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
##########
@@ -84,21 +84,46 @@ public void discard(CommitLogSegment segment, boolean
delete)
/**
* Delete the oldest hard-linked CDC commit log segment to free up space.
+ * @param bytesToFree, the minimum space to free up
* @return total deleted file size in bytes
*/
- public long deleteOldestLinkedCDCCommitLogSegment()
+ public long deleteOldLinkedCDCCommitLogSegment(long bytesToFree)
{
+ if (bytesToFree <= 0)
+ return 0;
+
File cdcDir = new File(DatabaseDescriptor.getCDCLogLocation());
Preconditions.checkState(cdcDir.isDirectory(), "The CDC directory does
not exist.");
File[] files = cdcDir.tryList(f ->
CommitLogDescriptor.isValid(f.name()));
- Preconditions.checkState(files != null && files.length > 0,
- "There should be at least 1 CDC commit log
segment.");
+ if (files == null || files.length == 0)
+ {
+ logger.warn("Skip deleting due to no CDC commit log segments
found.");
+ return 0;
+ }
List<File> sorted = Arrays.stream(files)
-
.sorted(Comparator.comparingLong(File::lastModified))
+ // commit log file name (contains id)
increases monotonically
Review comment:
While true today, I'm a little concerned about this effectively undocumented
coupling: there is a comment here, but nothing on the other side indicates the
dependency. Could we formalize or comment in the CommitLogSegment naming /
generation code that we depend on that behavior here, and add a {@link X} in
the JavaDoc to tie the two together for future maintainers?
##########
File path:
src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
##########
@@ -84,21 +84,46 @@ public void discard(CommitLogSegment segment, boolean
delete)
/**
* Delete the oldest hard-linked CDC commit log segment to free up space.
+ * @param bytesToFree, the minimum space to free up
* @return total deleted file size in bytes
*/
- public long deleteOldestLinkedCDCCommitLogSegment()
+ public long deleteOldLinkedCDCCommitLogSegment(long bytesToFree)
{
+ if (bytesToFree <= 0)
+ return 0;
+
File cdcDir = new File(DatabaseDescriptor.getCDCLogLocation());
Preconditions.checkState(cdcDir.isDirectory(), "The CDC directory does
not exist.");
File[] files = cdcDir.tryList(f ->
CommitLogDescriptor.isValid(f.name()));
- Preconditions.checkState(files != null && files.length > 0,
- "There should be at least 1 CDC commit log
segment.");
+ if (files == null || files.length == 0)
+ {
+ logger.warn("Skip deleting due to no CDC commit log segments
found.");
+ return 0;
+ }
List<File> sorted = Arrays.stream(files)
-
.sorted(Comparator.comparingLong(File::lastModified))
+ // commit log file name (contains id)
increases monotonically
+ .sorted(Comparator.comparing(File::name))
.collect(Collectors.toList());
- File oldestCdcFile = sorted.get(0);
- File cdcIndexFile =
CommitLogDescriptor.inferCdcIndexFile(oldestCdcFile);
- return deleteCDCFiles(oldestCdcFile, cdcIndexFile);
+ long bytesDeleted = 0;
+ long bytesRemaining = 0;
+ boolean deletionCompleted = false;
+ // keep deleting from old to new until it reaches to the goal or the
current writting segment
Review comment:
nit: writing
##########
File path:
src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
##########
@@ -158,6 +185,16 @@ public void shutdown()
return alloc;
}
+ // Non-blocking mode is just enabled for CDC. The segment is still marked
as FORBIDDEN.
Review comment:
nit: rephrase to "Non-blocking mode has just recently been enabled for
CDC" if that's the intent here. Reads like it's "only" set for CDC which is a
little confusing.
##########
File path:
src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
##########
@@ -84,21 +84,46 @@ public void discard(CommitLogSegment segment, boolean
delete)
/**
* Delete the oldest hard-linked CDC commit log segment to free up space.
+ * @param bytesToFree, the minimum space to free up
* @return total deleted file size in bytes
*/
- public long deleteOldestLinkedCDCCommitLogSegment()
+ public long deleteOldLinkedCDCCommitLogSegment(long bytesToFree)
{
+ if (bytesToFree <= 0)
+ return 0;
+
File cdcDir = new File(DatabaseDescriptor.getCDCLogLocation());
Preconditions.checkState(cdcDir.isDirectory(), "The CDC directory does
not exist.");
File[] files = cdcDir.tryList(f ->
CommitLogDescriptor.isValid(f.name()));
- Preconditions.checkState(files != null && files.length > 0,
- "There should be at least 1 CDC commit log
segment.");
+ if (files == null || files.length == 0)
+ {
+ logger.warn("Skip deleting due to no CDC commit log segments
found.");
+ return 0;
+ }
List<File> sorted = Arrays.stream(files)
-
.sorted(Comparator.comparingLong(File::lastModified))
+ // commit log file name (contains id)
increases monotonically
+ .sorted(Comparator.comparing(File::name))
.collect(Collectors.toList());
- File oldestCdcFile = sorted.get(0);
- File cdcIndexFile =
CommitLogDescriptor.inferCdcIndexFile(oldestCdcFile);
- return deleteCDCFiles(oldestCdcFile, cdcIndexFile);
+ long bytesDeleted = 0;
+ long bytesRemaining = 0;
+ boolean deletionCompleted = false;
+ // keep deleting from old to new until it reaches to the goal or the
current writting segment
+ for (File linkedCdcFile : sorted)
+ {
+ // only evaluate/update when deletionCompleted is false
+ deletionCompleted = deletionCompleted
+ || (bytesDeleted >= bytesToFree ||
linkedCdcFile.equals(allocatingFrom().getCDCFile()));
Review comment:
I found this conditional a little convoluted to parse. Maybe we do
something like:
```
// only evaluate/update when deletionCompleted is false
if (!deletionCompleted)
deletionCompleted = bytesDeleted >= bytesToFree ||
linkedCdcFile.equals(allocatingFrom().getCDCFile());
```
##########
File path:
src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
##########
@@ -84,21 +84,46 @@ public void discard(CommitLogSegment segment, boolean
delete)
/**
* Delete the oldest hard-linked CDC commit log segment to free up space.
+ * @param bytesToFree, the minimum space to free up
* @return total deleted file size in bytes
Review comment:
I think we either need to amend this javadoc @return to match that we're
returning the bytes remaining or we need to juggle around variable names below.
Currently this seems incorrect.
It looks like we're expecting CDC bytes remaining in
CommitLogSegmentManagerCDC.processNewSegment, so I think this is probably just
a vestigial javadoc entry.
##########
File path:
src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
##########
@@ -295,18 +334,23 @@ void processNewSegment(CommitLogSegment segment)
void processDiscardedSegment(CommitLogSegment segment)
{
- // See synchronization in CommitLogSegment.setCDCState
- synchronized(segment.cdcStateLock)
+ if (!segment.getCDCFile().exists())
+ {
+ logger.debug("Skip updating size. The CDC commit log segement
has been deleted already.");
Review comment:
Recommend revising this to something like "Not processing discarded
CommitLogSegment {}; this segment appears to have been deleted already.",
segment)
##########
File path:
src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
##########
@@ -264,29 +301,31 @@ public void start()
*/
void processNewSegment(CommitLogSegment segment)
{
- // See synchronization in CommitLogSegment.setCDCState
- synchronized(segment.cdcStateLock)
+ int segmentSize = defaultSegmentSize();
+ long allowance = allowableCDCBytes();
+ boolean blocking = DatabaseDescriptor.getCDCBlockWrites();
+
+ synchronized (segment.cdcStateLock)
{
- int segmentSize = defaultSegmentSize();
- long allowance = allowableCDCBytes();
- boolean blocking = DatabaseDescriptor.getCDCBlockWrites();
segment.setCDCState(blocking && segmentSize +
sizeInProgress.get() > allowance
? CDCState.FORBIDDEN
: CDCState.PERMITTED);
- // Remove the oldest cdc segment file when exceeding the CDC
storage allowance
- while (!blocking && segmentSize + sizeInProgress.get() >
allowance)
- {
- long releasedSize =
segmentManager.deleteOldestLinkedCDCCommitLogSegment();
- sizeInProgress.getAndAdd(-releasedSize);
- logger.debug("Freed up {} bytes after deleting the oldest
CDC commit log segment in non-blocking mode. " +
- "Total on-disk CDC size: {}; allowed CDC
size: {}",
- releasedSize, sizeInProgress.get() +
segmentSize, allowance);
- }
-
// Aggresively count in the (estimated) size of new segments.
if (segment.getCDCState() == CDCState.PERMITTED)
- sizeInProgress.getAndAdd(segmentSize);
+ addSize(segmentSize);
+ }
+
+ // Remove the oldest cdc segment file when exceeding the CDC
storage allowance
+ if (!blocking && sizeInProgress.get() > allowance)
+ {
+ long bytesToFree = sizeInProgress.get() - allowance;
+ long remaningSize =
segmentManager.deleteOldLinkedCDCCommitLogSegment(bytesToFree);
+ long releasedSize = sizeInProgress.get() - remaningSize;
+ sizeInProgress.getAndSet(remaningSize);
+ logger.debug("Freed up {} ({}) bytes after deleting the oldest
CDC commit log segments in non-blocking mode. " +
+ "Total on-disk CDC size: {}; allowed CDC size:
{}",
Review comment:
Should we be passing `remainingSize` as the argument for "Total on-disk CDC
size:"? Passing bytesToFree would log sizeInProgress - allowance there, which
_seems_ off.
##########
File path:
src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
##########
@@ -264,29 +301,31 @@ public void start()
*/
void processNewSegment(CommitLogSegment segment)
{
- // See synchronization in CommitLogSegment.setCDCState
- synchronized(segment.cdcStateLock)
+ int segmentSize = defaultSegmentSize();
+ long allowance = allowableCDCBytes();
+ boolean blocking = DatabaseDescriptor.getCDCBlockWrites();
+
+ synchronized (segment.cdcStateLock)
{
- int segmentSize = defaultSegmentSize();
- long allowance = allowableCDCBytes();
- boolean blocking = DatabaseDescriptor.getCDCBlockWrites();
segment.setCDCState(blocking && segmentSize +
sizeInProgress.get() > allowance
? CDCState.FORBIDDEN
: CDCState.PERMITTED);
- // Remove the oldest cdc segment file when exceeding the CDC
storage allowance
- while (!blocking && segmentSize + sizeInProgress.get() >
allowance)
- {
- long releasedSize =
segmentManager.deleteOldestLinkedCDCCommitLogSegment();
- sizeInProgress.getAndAdd(-releasedSize);
- logger.debug("Freed up {} bytes after deleting the oldest
CDC commit log segment in non-blocking mode. " +
- "Total on-disk CDC size: {}; allowed CDC
size: {}",
- releasedSize, sizeInProgress.get() +
segmentSize, allowance);
- }
-
// Aggresively count in the (estimated) size of new segments.
Review comment:
nit: Spelling. Should be "Aggressively" (that's on me 😄 )
##########
File path:
src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
##########
@@ -264,29 +301,31 @@ public void start()
*/
void processNewSegment(CommitLogSegment segment)
{
- // See synchronization in CommitLogSegment.setCDCState
- synchronized(segment.cdcStateLock)
+ int segmentSize = defaultSegmentSize();
+ long allowance = allowableCDCBytes();
+ boolean blocking = DatabaseDescriptor.getCDCBlockWrites();
+
+ synchronized (segment.cdcStateLock)
{
- int segmentSize = defaultSegmentSize();
- long allowance = allowableCDCBytes();
- boolean blocking = DatabaseDescriptor.getCDCBlockWrites();
segment.setCDCState(blocking && segmentSize +
sizeInProgress.get() > allowance
? CDCState.FORBIDDEN
: CDCState.PERMITTED);
- // Remove the oldest cdc segment file when exceeding the CDC
storage allowance
Review comment:
Keep the "See synchronization in CommitLogSegment.setCDCState" bread crumb
comment above the synchronized block so people know where the other side of
this synchronization is if they come back to modify it later.
##########
File path:
src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
##########
@@ -264,29 +301,31 @@ public void start()
*/
void processNewSegment(CommitLogSegment segment)
{
- // See synchronization in CommitLogSegment.setCDCState
- synchronized(segment.cdcStateLock)
+ int segmentSize = defaultSegmentSize();
+ long allowance = allowableCDCBytes();
+ boolean blocking = DatabaseDescriptor.getCDCBlockWrites();
+
+ synchronized (segment.cdcStateLock)
{
- int segmentSize = defaultSegmentSize();
- long allowance = allowableCDCBytes();
- boolean blocking = DatabaseDescriptor.getCDCBlockWrites();
segment.setCDCState(blocking && segmentSize +
sizeInProgress.get() > allowance
? CDCState.FORBIDDEN
: CDCState.PERMITTED);
- // Remove the oldest cdc segment file when exceeding the CDC
storage allowance
- while (!blocking && segmentSize + sizeInProgress.get() >
allowance)
- {
- long releasedSize =
segmentManager.deleteOldestLinkedCDCCommitLogSegment();
- sizeInProgress.getAndAdd(-releasedSize);
- logger.debug("Freed up {} bytes after deleting the oldest
CDC commit log segment in non-blocking mode. " +
- "Total on-disk CDC size: {}; allowed CDC
size: {}",
- releasedSize, sizeInProgress.get() +
segmentSize, allowance);
- }
-
// Aggresively count in the (estimated) size of new segments.
if (segment.getCDCState() == CDCState.PERMITTED)
- sizeInProgress.getAndAdd(segmentSize);
+ addSize(segmentSize);
+ }
+
+ // Remove the oldest cdc segment file when exceeding the CDC
storage allowance
+ if (!blocking && sizeInProgress.get() > allowance)
+ {
+ long bytesToFree = sizeInProgress.get() - allowance;
+ long remaningSize =
segmentManager.deleteOldLinkedCDCCommitLogSegment(bytesToFree);
Review comment:
nit: spelling. `remainingSize`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]