This is an automated email from the ASF dual-hosted git repository.

mreutegg pushed a commit to branch OAK-11807
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git

commit 572c21196f5ee13673864eea9e2acd418af519c5
Author: Marcel Reutegger <marcel.reuteg...@gmail.com>
AuthorDate: Mon Jul 14 18:02:02 2025 +0200

    OAK-11807: Duplicate header after failed SegmentBufferWriter.flush()
    
    Do not modify the instance field `length` when calculating the total
length, which includes the header.
---
 .../oak/segment/SegmentBufferWriter.java           |  12 +-
 .../jackrabbit/oak/segment/FailedFlushTest.java    | 173 +++++++++++++++++++++
 2 files changed, 179 insertions(+), 6 deletions(-)

diff --git 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriter.java
 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriter.java
index 2c57c7d158..a3717ef6b0 100644
--- 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriter.java
+++ 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriter.java
@@ -327,22 +327,22 @@ public class SegmentBufferWriter implements 
WriteOperationHandler {
                         segment.getSegmentId(), referencedSegmentIdCount, 
recordNumberCount, length, totalLength));
             }
 
-            statistics.size = length = totalLength;
+            statistics.size = totalLength;
 
             int pos = HEADER_SIZE;
-            if (pos + length <= buffer.length) {
+            if (pos + totalLength <= buffer.length) {
                 // the whole segment fits to the space *after* the referenced
                 // segment identifiers we've already written, so we can safely
                 // copy those bits ahead even if concurrent code is still
                 // reading from that part of the buffer
-                arraycopy(buffer, 0, buffer, buffer.length - length, pos);
-                pos += buffer.length - length;
+                arraycopy(buffer, 0, buffer, buffer.length - totalLength, pos);
+                pos += buffer.length - totalLength;
             } else {
                 // this might leave some empty space between the header and
                 // the record data, but this case only occurs when the
                 // segment is >252kB in size and the maximum overhead is <<4kB,
                 // which is acceptable
-                length = buffer.length;
+                totalLength = buffer.length;
             }
 
             for (SegmentId segmentId : segmentReferences) {
@@ -358,7 +358,7 @@ public class SegmentBufferWriter implements 
WriteOperationHandler {
 
             SegmentId segmentId = segment.getSegmentId();
             LOG.debug("Writing data segment: {} ", statistics);
-            store.writeSegment(segmentId, buffer, buffer.length - length, 
length);
+            store.writeSegment(segmentId, buffer, buffer.length - totalLength, 
totalLength);
             newSegment(store);
         }
     }
diff --git 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/FailedFlushTest.java
 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/FailedFlushTest.java
new file mode 100644
index 0000000000..805dd5dc89
--- /dev/null
+++ 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/FailedFlushTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.file.FileStore;
+import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder;
+import org.apache.jackrabbit.oak.segment.file.tar.SegmentTarManager;
+import org.apache.jackrabbit.oak.segment.file.tar.SegmentTarWriter;
+import org.apache.jackrabbit.oak.segment.file.tar.TarPersistence;
+import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
+import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitor;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveWriter;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import static 
org.apache.jackrabbit.oak.segment.DefaultSegmentWriterBuilder.defaultSegmentWriterBuilder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class FailedFlushTest {
+
+    private DefaultSegmentWriter writer;
+
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder(new File("target"));
+
+    private final RandomStringUtils randomStrings = 
RandomStringUtils.insecure();
+    private FileStore store;
+    private boolean failAfterSegmentWrite = false;
+    private boolean failBeforeSegmentWrite = false;
+    private final Map<UUID, Set<Integer>> segmentId2Size = new HashMap<>();
+
+    @Before
+    public void setUp() throws Exception {
+        store = createFileStore();
+        writer = defaultSegmentWriterBuilder("test").build(store);
+    }
+
+    @Test
+    public void repeatedFlushFailure() throws Exception {
+        for (int i = 0; i < 1000; i++) {
+            writer.writeString(randomStrings.nextAlphanumeric(16));
+        }
+
+        failBeforeSegmentWrite = true;
+        for (int i = 0; i < 100; i++) {
+            try {
+                writer.flush();
+                fail("This flush must fail");
+            } catch (IOException e) {
+                // expected
+            }
+        }
+        failBeforeSegmentWrite = false;
+
+        // must succeed now
+        writer.flush();
+    }
+
+    @Test
+    public void flushTwiceAfterSegmentStored() throws Exception {
+        for (int i = 0; i < 10; i++) {
+            writer.writeString(randomStrings.nextAlphanumeric(16));
+        }
+
+        failAfterSegmentWrite = true;
+        try {
+            writer.flush();
+            fail("This flush must fail");
+        } catch (IOException e) {
+            // expected
+        }
+        failAfterSegmentWrite = false;
+        writer.flush();
+
+        // expect two segments:
+        // - first segment written by FileStore with initial node
+        // - second segment written by this test
+        // TarPersistence counts duplicate segments as one segment
+        assertEquals(2, store.getSegmentCount());
+        for (Map.Entry<UUID, Set<Integer>> entry : segmentId2Size.entrySet()) {
+            UUID segmentId = entry.getKey();
+            Set<Integer> sizes = entry.getValue();
+            assertEquals("Same segment (" + segmentId + ") with different 
sizes: " + sizes, 1, sizes.size());
+        }
+    }
+
+    private FileStore createFileStore() throws Exception {
+        File dir = folder.newFolder("segment-store");
+        return 
FileStoreBuilder.fileStoreBuilder(dir).withCustomPersistence(new 
TarPersistence(dir) {
+            @Override
+            public SegmentArchiveManager createArchiveManager(boolean 
memoryMapping, boolean offHeapAccess,
+                                                              IOMonitor 
ioMonitor, FileStoreMonitor fileStoreMonitor,
+                                                              
RemoteStoreMonitor remoteStoreMonitor) {
+                return new TestArchiveManager(dir, fileStoreMonitor, 
ioMonitor, memoryMapping, offHeapAccess);
+            }
+        }).build();
+    }
+
+    private class TestArchiveManager extends SegmentTarManager {
+
+        private final File segmentStoreDir;
+        private final FileStoreMonitor fileStoreMonitor;
+        private final IOMonitor ioMonitor;
+
+        TestArchiveManager(File segmentStoreDir, FileStoreMonitor 
fileStoreMonitor, IOMonitor ioMonitor,
+                           boolean memoryMapping, boolean offHeapAccess) {
+            super(segmentStoreDir, fileStoreMonitor, ioMonitor, memoryMapping, 
offHeapAccess);
+            this.segmentStoreDir = segmentStoreDir;
+            this.fileStoreMonitor = fileStoreMonitor;
+            this.ioMonitor = ioMonitor;
+        }
+
+        @Override
+        public @NotNull SegmentArchiveWriter create(String archiveName) {
+            return new TestArchiveWriter(new File(segmentStoreDir, 
archiveName), fileStoreMonitor, ioMonitor);
+        }
+    }
+
+    private class TestArchiveWriter extends SegmentTarWriter {
+        TestArchiveWriter(File file, FileStoreMonitor monitor, IOMonitor 
ioMonitor) {
+            super(file, monitor, ioMonitor);
+        }
+
+        @Override
+        public void writeSegment(long msb, long lsb, byte[] data, int offset, 
int size,
+                                 int generation, int fullGeneration, boolean 
compacted) throws IOException {
+            if (failBeforeSegmentWrite) {
+                throw new IOException("Simulated failure before segment 
write");
+            }
+            super.writeSegment(msb, lsb, data, offset, size, generation, 
fullGeneration, compacted);
+            Buffer segmentData = Buffer.wrap(data, offset, size);
+            segmentId2Size.computeIfAbsent(new UUID(msb, lsb), uuid -> new 
HashSet<>()).add(size);
+            if (store != null) {
+                SegmentId id = new SegmentId(store, msb, lsb);
+                Segment s = new Segment(store.getSegmentIdProvider(), id, 
segmentData);
+                System.out.println(s);
+            }
+            if (failAfterSegmentWrite) {
+                throw new IOException("Simulated failure after segment write");
+            }
+        }
+    }
+}

Reply via email to