Author: mduerig
Date: Wed Nov 25 16:28:33 2015
New Revision: 1716466

URL: http://svn.apache.org/viewvc?rev=1716466&view=rev
Log:
OAK-1828: Improved SegmentWriter
Encapsulate the state of the segment being written and its coordination wrt. 
concurrent access in a separate class

Added:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBuilder.java
Modified:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/package-info.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentIdFactoryTest.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentSizeTest.java

Added: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBuilder.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBuilder.java?rev=1716466&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBuilder.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBuilder.java
 Wed Nov 25 16:28:33 2015
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.segment;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Maps.newLinkedHashMap;
+import static com.google.common.collect.Sets.newHashSet;
+import static java.lang.System.arraycopy;
+import static java.lang.System.currentTimeMillis;
+import static java.lang.System.identityHashCode;
+import static org.apache.jackrabbit.oak.plugins.segment.RecordType.VALUE;
+import static 
org.apache.jackrabbit.oak.plugins.segment.Segment.MAX_SEGMENT_SIZE;
+import static 
org.apache.jackrabbit.oak.plugins.segment.Segment.RECORD_ID_BYTES;
+import static 
org.apache.jackrabbit.oak.plugins.segment.Segment.SEGMENT_REFERENCE_LIMIT;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Charsets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TODO document: not thread safe!
+ * TODO find a better name for SegmentBuilder
+ */
+class SegmentBuilder {
+    private static final Logger LOG = 
LoggerFactory.getLogger(SegmentBuilder.class);
+
+    /**
+     * The set of root records (i.e. ones not referenced by other records)
+     * in this segment.
+     */
+    private final Map<RecordId, RecordType> roots = newLinkedHashMap();
+
+    /**
+     * Identifiers of the external blob references stored in this segment.
+     */
+    private final List<RecordId> blobrefs = newArrayList();
+
+    private final SegmentStore store;
+
+    private final SegmentTracker tracker;
+
+    /**
+     * Version of the segment storage format.
+     */
+    private final SegmentVersion version;
+
+    /**
+     * Id of this writer.
+     */
+    private final String wid;
+
+    /**
+     * The segment write buffer, filled from the end to the beginning
+     * (see OAK-629).
+     */
+    private byte[] buffer;
+
+    private Segment segment;
+
+    /**
+     * The number of bytes already written (or allocated). Counted from
+     * the <em>end</em> of the buffer.
+     */
+    private int length;
+
+    /**
+     * Current write position within the buffer. Grows up when raw data
+     * is written, but shifted downwards by the prepare methods.
+     */
+    private int position;
+
+    public SegmentBuilder(SegmentStore store, SegmentTracker tracker, 
SegmentVersion version, String wid) {
+        this.store = store;
+        this.tracker = tracker;
+        this.version = version;
+        this.wid = (wid == null
+                ? "w-" + identityHashCode(this)
+                : wid);
+
+        this.buffer = createNewBuffer(version);
+        newSegment(this.wid);
+    }
+
+    /**
+     * Allocate a new segment and write the segment meta data.
+     * The segment meta data is a string of the format {@code 
"{wid=W,sno=S,gc=G,t=T}"}
+     * where:
+     * <ul>
+     * <li>{@code W} is the writer id {@code wid}, </li>
+     * <li>{@code S} is a unique, increasing sequence number corresponding to 
the allocation order
+     * of the segments in this store, </li>
+     * <li>{@code G} is the garbage collection generation (i.e. the number of 
compaction cycles
+     * that have been run),</li>
+     * <li>{@code T} is a time stamp according to {@link 
System#currentTimeMillis()}.</li>
+     * </ul>
+     * The segment meta data is guaranteed to be the first string record in a 
segment.
+     * @param wid  the writer id
+     */
+    private void newSegment(String wid) {
+        this.segment = new Segment(tracker, buffer);
+        String metaInfo = "{\"wid\":\"" + wid + '"' +
+                ",\"sno\":" + tracker.getNextSegmentNo() +
+                ",\"gc\":" + tracker.getCompactionMap().getGeneration() +
+                ",\"t\":" + currentTimeMillis() + "}";
+
+        byte[] data = metaInfo.getBytes(Charsets.UTF_8);
+        if (data.length < Segment.SMALL_LIMIT) {
+            prepare(VALUE, data.length + 1, Collections.<RecordId>emptyList());
+            writeByte((byte) data.length);
+            writeBytes(data, 0, data.length);
+        } else {
+            prepare(VALUE, data.length + 2, Collections.<RecordId>emptyList());
+            writeShort((short) ((data.length - Segment.SMALL_LIMIT) | 0x8000));
+            writeBytes(data, 0, data.length);
+        }
+    }
+
+    static byte[] createNewBuffer(SegmentVersion v) {
+        byte[] buffer = new byte[Segment.MAX_SEGMENT_SIZE];
+        buffer[0] = '0';
+        buffer[1] = 'a';
+        buffer[2] = 'K';
+        buffer[3] = SegmentVersion.asByte(v);
+        buffer[4] = 0; // reserved
+        buffer[5] = 0; // refcount
+        return buffer;
+    }
+
+    public void writeByte(byte value) {
+        buffer[position++] = value;
+    }
+
+    public void writeShort(short value) {
+        buffer[position++] = (byte) (value >> 8);
+        buffer[position++] = (byte) value;
+    }
+
+    public void writeInt(int value) {
+        buffer[position++] = (byte) (value >> 24);
+        buffer[position++] = (byte) (value >> 16);
+        buffer[position++] = (byte) (value >> 8);
+        buffer[position++] = (byte) value;
+    }
+
+    public void writeLong(long value) {
+        writeInt((int) (value >> 32));
+        writeInt((int) value);
+    }
+
+    /**
+     * Write a record id, and marks the record id as referenced (removes it 
from
+     * the unreferenced set).
+     *
+     * @param listId the record id
+     */
+    public void writeRecordId(RecordId listId) {
+        checkNotNull(listId);
+        roots.remove(listId);
+
+        int offset = listId.getOffset();
+        checkState(0 <= offset && offset < MAX_SEGMENT_SIZE);
+        checkState(offset == align(offset));
+
+        buffer[position++] = (byte) getSegmentRef(listId.getSegmentId());
+        buffer[position++] = (byte) (offset >> (8 + 
Segment.RECORD_ALIGN_BITS));
+        buffer[position++] = (byte) (offset >> Segment.RECORD_ALIGN_BITS);
+    }
+
+    private int getSegmentRef(SegmentId segmentId) {
+        int refCount = segment.getRefCount();
+        if (refCount > SEGMENT_REFERENCE_LIMIT) {
+            throw new SegmentOverflowException(
+                    "Segment cannot have more than 255 references " + 
segment.getSegmentId());
+        }
+        for (int index = 0; index < refCount; index++) {
+            if (segmentId.equals(segment.getRefId(index))) {
+                return index;
+            }
+        }
+
+        ByteBuffer.wrap(buffer, refCount * 16, 16)
+                .putLong(segmentId.getMostSignificantBits())
+                .putLong(segmentId.getLeastSignificantBits());
+        buffer[Segment.REF_COUNT_OFFSET] = (byte) refCount;
+        return refCount;
+    }
+
+    public void writeBytes(byte[] data, int offset, int length) {
+        arraycopy(data, offset, buffer, position, length);
+        position += length;
+    }
+
+    public void addBlobRef(RecordId blobId) {
+        blobrefs.add(blobId);
+    }
+
+    /**
+     * Adds a segment header to the buffer and writes a segment to the segment
+     * store. This is done automatically (called from prepare) when there is 
not
+     * enough space for a record. It can also be called explicitly.
+     */
+    public void flush() {
+        // Id of the segment to be written in the file store. If the segment id
+        // is not null, a segment will be written outside of the synchronized 
block.
+        SegmentId segmentId = null;
+
+        // Buffer containing segment data, and offset and length to locate the
+        // segment data into the buffer. These variable will be initialized in
+        // the synchronized block.
+        byte[] segmentBuffer = null;
+        int segmentOffset = 0;
+        int segmentLength = 0;
+
+        if (length > 0) {
+            int refcount = segment.getRefCount();
+
+            int rootcount = roots.size();
+            buffer[Segment.ROOT_COUNT_OFFSET] = (byte) (rootcount >> 8);
+            buffer[Segment.ROOT_COUNT_OFFSET + 1] = (byte) rootcount;
+
+            int blobrefcount = blobrefs.size();
+            buffer[Segment.BLOBREF_COUNT_OFFSET] = (byte) (blobrefcount >> 8);
+            buffer[Segment.BLOBREF_COUNT_OFFSET + 1] = (byte) blobrefcount;
+
+            length = align(
+                    refcount * 16 + rootcount * 3 + blobrefcount * 2 + length,
+                    16);
+
+            checkState(length <= buffer.length);
+
+            int pos = refcount * 16;
+            if (pos + length <= buffer.length) {
+                // the whole segment fits to the space *after* the referenced
+                // segment identifiers we've already written, so we can safely
+                // copy those bits ahead even if concurrent code is still
+                // reading from that part of the buffer
+                arraycopy(buffer, 0, buffer, buffer.length - length, pos);
+                pos += buffer.length - length;
+            } else {
+                // this might leave some empty space between the header and
+                // the record data, but this case only occurs when the
+                // segment is >252kB in size and the maximum overhead is <<4kB,
+                // which is acceptable
+                length = buffer.length;
+            }
+
+            for (Map.Entry<RecordId, RecordType> entry : roots.entrySet()) {
+                int offset = entry.getKey().getOffset();
+                buffer[pos++] = (byte) entry.getValue().ordinal();
+                buffer[pos++] = (byte) (offset >> (8 + 
Segment.RECORD_ALIGN_BITS));
+                buffer[pos++] = (byte) (offset >> Segment.RECORD_ALIGN_BITS);
+            }
+
+            for (RecordId blobref : blobrefs) {
+                int offset = blobref.getOffset();
+                buffer[pos++] = (byte) (offset >> (8 + 
Segment.RECORD_ALIGN_BITS));
+                buffer[pos++] = (byte) (offset >> Segment.RECORD_ALIGN_BITS);
+            }
+
+            segmentId = segment.getSegmentId();
+            segmentBuffer = buffer;
+            segmentOffset = buffer.length - length;
+            segmentLength = length;
+
+            buffer = createNewBuffer(version);
+            roots.clear();
+            blobrefs.clear();
+            length = 0;
+            position = buffer.length;
+            newSegment(wid);
+        }
+
+        if (segmentId != null) {
+            LOG.debug("Writing data segment {} ({} bytes)", segmentId, 
segmentLength);
+            store.writeSegment(segmentId, segmentBuffer, segmentOffset, 
segmentLength);
+
+            // Keep this segment in memory as it's likely to be accessed soon
+            ByteBuffer data;
+            if (segmentOffset > 4096) {
+                data = ByteBuffer.allocate(segmentLength);
+                data.put(segmentBuffer, segmentOffset, segmentLength);
+                data.rewind();
+            } else {
+                data = ByteBuffer.wrap(segmentBuffer, segmentOffset, 
segmentLength);
+            }
+
+            // It is important to put the segment into the cache only *after* 
it has been
+            // written to the store since as soon as it is in the cache it 
becomes eligible
+            // for eviction, which might lead to SNFEs when it is not yet in 
the store at that point.
+            tracker.setSegment(segmentId, new Segment(tracker, segmentId, 
data));
+        }
+    }
+
+    /**
+     * Before writing a record (which are written backwards, from the end of 
the
+     * file to the beginning), this method is called, to ensure there is enough
+     * space. A new segment is also created if there is not enough space in the
+     * segment lookup table or elsewhere.
+     * <p>
+     * This method does not actually write into the segment, just allocates the
+     * space (flushing the segment if needed and starting a new one), and sets
+     * the write position (records are written from the end to the beginning,
+     * but within a record from left to right).
+     *
+     * @param type the record type (only used for root records)
+     * @param size the size of the record, excluding the size used for the
+     *            record ids
+     * @param ids the record ids
+     * @return a new record id
+     */
+    public RecordId prepare(RecordType type, int size, Collection<RecordId> 
ids) {
+        checkArgument(size >= 0);
+        checkNotNull(ids);
+
+        int idCount = ids.size();
+        int recordSize = align(size + idCount * RECORD_ID_BYTES);
+
+        // First compute the header and segment sizes based on the assumption
+        // that *all* identifiers stored in this record point to previously
+        // unreferenced segments.
+        int refCount = segment.getRefCount() + idCount;
+        int blobRefCount = blobrefs.size() + 1;
+        int rootCount = roots.size() + 1;
+        int headerSize = refCount * 16 + rootCount * 3 + blobRefCount * 2;
+        int segmentSize = align(headerSize + recordSize + length, 16);
+
+        // If the size estimate looks too big, recompute it with a more
+        // accurate refCount value. We skip doing this when possible to
+        // avoid the somewhat expensive list and set traversals.
+        if (segmentSize > buffer.length - 1
+                || refCount > Segment.SEGMENT_REFERENCE_LIMIT) {
+            refCount -= idCount;
+
+            Set<SegmentId> segmentIds = newHashSet();
+
+            // The set of old record ids in this segment
+            // that were previously root record ids, but will no longer be,
+            // because the record to be written references them.
+            // This needs to be a set, because the list of ids can
+            // potentially reference the same record multiple times
+            Set<RecordId> notRoots = new HashSet<RecordId>();
+            for (RecordId recordId : ids) {
+                SegmentId segmentId = recordId.getSegmentId();
+                if (!(segmentId.equals(segment.getSegmentId()))) {
+                    segmentIds.add(segmentId);
+                } else if (roots.containsKey(recordId)) {
+                    notRoots.add(recordId);
+                }
+            }
+            rootCount -= notRoots.size();
+
+            if (!segmentIds.isEmpty()) {
+                for (int refid = 1; refid < refCount; refid++) {
+                    segmentIds.remove(segment.getRefId(refid));
+                }
+                refCount += segmentIds.size();
+            }
+
+            headerSize = refCount * 16 + rootCount * 3 + blobRefCount * 2;
+            segmentSize = align(headerSize + recordSize + length, 16);
+        }
+
+        if (segmentSize > buffer.length - 1
+                || blobRefCount > 0xffff
+                || rootCount > 0xffff
+                || refCount > Segment.SEGMENT_REFERENCE_LIMIT) {
+            flush();
+        }
+
+        length += recordSize;
+        position = buffer.length - length;
+        checkState(position >= 0);
+
+        RecordId id = new RecordId(segment.getSegmentId(), position);
+        roots.put(id, type);
+        return id;
+    }
+
+    private static int align(int value) {
+        return align(value, 1 << Segment.RECORD_ALIGN_BITS);
+    }
+
+    private static int align(int value, int boundary) {
+        return (value + boundary - 1) & ~(boundary - 1);
+    }
+}

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java?rev=1716466&r1=1716465&r2=1716466&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
 Wed Nov 25 16:28:33 2015
@@ -26,20 +26,25 @@ import static com.google.common.collect.
 import static com.google.common.collect.Lists.newArrayList;
 import static com.google.common.collect.Lists.newArrayListWithExpectedSize;
 import static com.google.common.collect.Maps.newHashMap;
-import static com.google.common.collect.Maps.newLinkedHashMap;
 import static com.google.common.collect.Sets.newHashSet;
-import static java.lang.System.currentTimeMillis;
-import static java.lang.System.identityHashCode;
+import static java.lang.Thread.currentThread;
 import static java.util.Arrays.asList;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.nCopies;
+import static java.util.Collections.singleton;
 import static org.apache.jackrabbit.oak.api.Type.BINARIES;
 import static org.apache.jackrabbit.oak.api.Type.NAME;
 import static org.apache.jackrabbit.oak.api.Type.NAMES;
 import static 
org.apache.jackrabbit.oak.plugins.segment.MapRecord.BUCKETS_PER_LEVEL;
+import static org.apache.jackrabbit.oak.plugins.segment.RecordType.BLOCK;
+import static org.apache.jackrabbit.oak.plugins.segment.RecordType.BRANCH;
+import static org.apache.jackrabbit.oak.plugins.segment.RecordType.BUCKET;
+import static org.apache.jackrabbit.oak.plugins.segment.RecordType.LEAF;
+import static org.apache.jackrabbit.oak.plugins.segment.RecordType.LIST;
+import static org.apache.jackrabbit.oak.plugins.segment.RecordType.NODE;
+import static org.apache.jackrabbit.oak.plugins.segment.RecordType.TEMPLATE;
+import static org.apache.jackrabbit.oak.plugins.segment.RecordType.VALUE;
 import static 
org.apache.jackrabbit.oak.plugins.segment.Segment.MAX_SEGMENT_SIZE;
-import static 
org.apache.jackrabbit.oak.plugins.segment.Segment.RECORD_ID_BYTES;
-import static 
org.apache.jackrabbit.oak.plugins.segment.Segment.SEGMENT_REFERENCE_LIMIT;
 import static org.apache.jackrabbit.oak.plugins.segment.Segment.readString;
 import static org.apache.jackrabbit.oak.plugins.segment.SegmentVersion.V_11;
 
@@ -47,16 +52,14 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.SequenceInputStream;
-import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jcr.PropertyType;
 
@@ -84,45 +87,17 @@ import org.slf4j.LoggerFactory;
  * split: new segments are automatically created if and when needed).
  */
 public class SegmentWriter {
-
-    /** Logger instance */
-    private static final Logger log =
-            LoggerFactory.getLogger(SegmentWriter.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(SegmentWriter.class);
 
     static final int BLOCK_SIZE = 1 << 12; // 4kB
 
-    private static final AtomicInteger SEGMENT_COUNTER = new AtomicInteger();
-
-    static byte[] createNewBuffer(SegmentVersion v) {
-        byte[] buffer = new byte[Segment.MAX_SEGMENT_SIZE];
-        buffer[0] = '0';
-        buffer[1] = 'a';
-        buffer[2] = 'K';
-        buffer[3] = SegmentVersion.asByte(v);
-        buffer[4] = 0; // reserved
-        buffer[5] = 0; // refcount
-        return buffer;
-    }
-
-    private static int align(int value) {
-        return align(value, 1 << Segment.RECORD_ALIGN_BITS);
-    }
-
-    private static int align(int value, int boundary) {
-        return (value + boundary - 1) & ~(boundary - 1);
-    }
-
-
-    private final SegmentTracker tracker;
-
-    private final SegmentStore store;
+    private final SegmentBuilderPool segmentBuilderPool = new 
SegmentBuilderPool();
 
     /**
      * Cache of recently stored string and template records, used to
      * avoid storing duplicates of frequently occurring data.
      * Should only be accessed from synchronized blocks to prevent corruption.
      */
-    @SuppressWarnings("serial")
     private final Map<Object, RecordId> records =
         new LinkedHashMap<Object, RecordId>(15000, 0.75f, true) {
             @Override
@@ -131,45 +106,15 @@ public class SegmentWriter {
             }
         };
 
-    /**
-     * The set of root records (i.e. ones not referenced by other records)
-     * in this segment.
-     */
-    private final Map<RecordId, RecordType> roots = newLinkedHashMap();
-
-    /**
-     * Identifiers of the external blob references stored in this segment.
-     */
-    private final List<RecordId> blobrefs = newArrayList();
-
-    /**
-     * The segment write buffer, filled from the end to the beginning
-     * (see OAK-629).
-     */
-    private byte[] buffer;
-
-    /**
-     * The number of bytes already written (or allocated). Counted from
-     * the <em>end</em> of the buffer.
-     */
-    private int length = 0;
-
-    /**
-     * Current write position within the buffer. Grows up when raw data
-     * is written, but shifted downwards by the prepare methods.
-     */
-    private int position;
+    private final SegmentStore store;
 
-    private Segment segment;
+    private final SegmentTracker tracker;
 
     /**
      * Version of the segment storage format.
      */
     private final SegmentVersion version;
 
-    /**
-     * Id of this writer.
-     */
     private final String wid;
 
     public SegmentWriter(SegmentStore store, SegmentTracker tracker, 
SegmentVersion version) {
@@ -186,281 +131,83 @@ public class SegmentWriter {
         this.store = store;
         this.tracker = tracker;
         this.version = version;
-        this.buffer = createNewBuffer(version);
-        this.wid = wid == null
-            ? "w-" + identityHashCode(this)
-            : wid;
-        newSegment(wid);
+        this.wid = wid;
     }
 
-    /**
-     * Allocate a new segment and write the segment meta data.
-     * The segment meta data is a string of the format {@code 
"{wid=W,sno=S,gc=G,t=T}"}
-     * where:
-     * <ul>
-     * <li>{@code W} is the writer id {@code wid}, </li>
-     * <li>{@code S} is a unique, increasing sequence number corresponding to 
the allocation order
-     * of the segments in this store, </li>
-     * <li>{@code G} is the garbage collection generation (i.e. the number of 
compaction cycles
-     * that have been run),</li>
-     * <li>{@code T} is a time stamp according to {@link 
System#currentTimeMillis()}.</li>
-     * </ul>
-     * The segment meta data is guaranteed to be the first string record in a 
segment.
-     * @param wid  the writer id
-     */
-    private void newSegment(String wid) {
-        this.segment = new Segment(tracker, buffer);
-        writeString(
-            "{\"wid\":\"" + wid + '"' +
-            ",\"sno\":" + tracker.getNextSegmentNo() +
-            ",\"gc\":" + tracker.getCompactionMap().getGeneration() +
-            ",\"t\":" + currentTimeMillis() + "}");
+    SegmentTracker getTracker() {
+        return tracker;
     }
 
-    /**
-     * Adds a segment header to the buffer and writes a segment to the segment
-     * store. This is done automatically (called from prepare) when there is 
not
-     * enough space for a record. It can also be called explicitly.
-     */
     public void flush() {
-        // Id of the segment to be written in the file store. If the segment id
-        // is not null, a segment will be written outside of the synchronized 
block.
-        SegmentId segmentId = null;
-
-        // Buffer containing segment data, and offset and length to locate the
-        // segment data into the buffer. These variable will be initialized in
-        // the synchronized block.
-        byte[] segmentBuffer = null;
-        int segmentOffset = 0;
-        int segmentLength = 0;
-
-        synchronized (this) {
-            if (length > 0) {
-                int refcount = segment.getRefCount();
-
-                int rootcount = roots.size();
-                buffer[Segment.ROOT_COUNT_OFFSET] = (byte) (rootcount >> 8);
-                buffer[Segment.ROOT_COUNT_OFFSET + 1] = (byte) rootcount;
-
-                int blobrefcount = blobrefs.size();
-                buffer[Segment.BLOBREF_COUNT_OFFSET] = (byte) (blobrefcount >> 
8);
-                buffer[Segment.BLOBREF_COUNT_OFFSET + 1] = (byte) blobrefcount;
-
-                length = align(
-                        refcount * 16 + rootcount * 3 + blobrefcount * 2 + 
length,
-                        16);
-
-                checkState(length <= buffer.length);
-
-                int pos = refcount * 16;
-                if (pos + length <= buffer.length) {
-                    // the whole segment fits to the space *after* the 
referenced
-                    // segment identifiers we've already written, so we can 
safely
-                    // copy those bits ahead even if concurrent code is still
-                    // reading from that part of the buffer
-                    System.arraycopy(buffer, 0, buffer, buffer.length - 
length, pos);
-                    pos += buffer.length - length;
-                } else {
-                    // this might leave some empty space between the header and
-                    // the record data, but this case only occurs when the
-                    // segment is >252kB in size and the maximum overhead is 
<<4kB,
-                    // which is acceptable
-                    length = buffer.length;
-                }
-
-                for (Map.Entry<RecordId, RecordType> entry : roots.entrySet()) 
{
-                    int offset = entry.getKey().getOffset();
-                    buffer[pos++] = (byte) entry.getValue().ordinal();
-                    buffer[pos++] = (byte) (offset >> (8 + 
Segment.RECORD_ALIGN_BITS));
-                    buffer[pos++] = (byte) (offset >> 
Segment.RECORD_ALIGN_BITS);
-                }
-
-                for (RecordId blobref : blobrefs) {
-                    int offset = blobref.getOffset();
-                    buffer[pos++] = (byte) (offset >> (8 + 
Segment.RECORD_ALIGN_BITS));
-                    buffer[pos++] = (byte) (offset >> 
Segment.RECORD_ALIGN_BITS);
-                }
-
-                segmentId = segment.getSegmentId();
-                segmentBuffer = buffer;
-                segmentOffset = buffer.length - length;
-                segmentLength = length;
-
-                buffer = createNewBuffer(version);
-                roots.clear();
-                blobrefs.clear();
-                length = 0;
-                position = buffer.length;
-                newSegment(wid);
+        segmentBuilderPool.flush();
+    }
+
+    MapRecord writeMap(MapRecord base, Map<String, RecordId> changes) {
+        if (base != null && base.isDiff()) {
+            Segment segment = base.getSegment();
+            RecordId key = segment.readRecordId(base.getOffset(8));
+            String name = readString(key);
+            if (!changes.containsKey(name)) {
+                changes.put(name, segment.readRecordId(base.getOffset(8, 1)));
             }
+            base = new MapRecord(segment.readRecordId(base.getOffset(8, 2)));
         }
 
-        if (segmentId != null) {
-            log.debug("Writing data segment {} ({} bytes)", segmentId, 
segmentLength);
-            store.writeSegment(segmentId, segmentBuffer, segmentOffset, 
segmentLength);
-
-            // Keep this segment in memory as it's likely to be accessed soon
-            ByteBuffer data;
-            if (segmentOffset > 4096) {
-                data = ByteBuffer.allocate(segmentLength);
-                data.put(segmentBuffer, segmentOffset, segmentLength);
-                data.rewind();
-            } else {
-                data = ByteBuffer.wrap(segmentBuffer, segmentOffset, 
segmentLength);
+        if (base != null && changes.size() == 1) {
+            Map.Entry<String, RecordId> change =
+                changes.entrySet().iterator().next();
+            final RecordId value = change.getValue();
+            if (value != null) {
+                final MapEntry entry = base.getEntry(change.getKey());
+                if (entry != null) {
+                    if (value.equals(entry.getValue())) {
+                        return base;
+                    } else {
+                        final MapRecord baseMap = base;
+                        RecordId mapId = writeRecord(new RecordWriter(BRANCH, 
8, asList(
+                            entry.getKey(), value, baseMap.getRecordId())) {
+                            @Override
+                            protected void write(RecordId id, SegmentBuilder 
builder) {
+                                builder.writeInt(-1);
+                                builder.writeInt(entry.getHash());
+                                builder.writeRecordId(entry.getKey());
+                                builder.writeRecordId(value);
+                                builder.writeRecordId(baseMap.getRecordId());
+                            }
+                        });
+                        return new MapRecord(mapId);
+                    }
+                }
             }
-
-            // It is important to put the segment into the cache only *after* 
it has been
-            // written to the store since as soon as it is in the cache it 
becomes eligible
-            // for eviction, which might lead to SNFEs when it is not yet in 
the store at that point.
-            tracker.setSegment(segmentId, new Segment(tracker, segmentId, 
data));
         }
-    }
-
-    private RecordId prepare(RecordType type, int size) {
-        return prepare(type, size, Collections.<RecordId>emptyList());
-    }
 
-    /**
-     * Before writing a record (which are written backwards, from the end of 
the
-     * file to the beginning), this method is called, to ensure there is enough
-     * space. A new segment is also created if there is not enough space in the
-     * segment lookup table or elsewhere.
-     * <p>
-     * This method does not actually write into the segment, just allocates the
-     * space (flushing the segment if needed and starting a new one), and sets
-     * the write position (records are written from the end to the beginning,
-     * but within a record from left to right).
-     * 
-     * @param type the record type (only used for root records)
-     * @param size the size of the record, excluding the size used for the
-     *            record ids
-     * @param ids the record ids
-     * @return a new record id
-     */
-    private RecordId prepare(
-            RecordType type, int size, Collection<RecordId> ids) {
-        checkArgument(size >= 0);
-        checkNotNull(ids);
-
-        int idcount = ids.size();
-        int recordSize = align(size + idcount * RECORD_ID_BYTES);
-
-        // First compute the header and segment sizes based on the assumption
-        // that *all* identifiers stored in this record point to previously
-        // unreferenced segments.
-        int refcount = segment.getRefCount() + idcount;
-        int blobrefcount = blobrefs.size() + 1;
-        int rootcount = roots.size() + 1;
-        int headerSize = refcount * 16 + rootcount * 3 + blobrefcount * 2;
-        int segmentSize = align(headerSize + recordSize + length, 16);
-
-        // If the size estimate looks too big, recompute it with a more
-        // accurate refcount value. We skip doing this when possible to
-        // avoid the somewhat expensive list and set traversals.
-        if (segmentSize > buffer.length - 1
-                || refcount > Segment.SEGMENT_REFERENCE_LIMIT) {
-            refcount -= idcount;
-
-            Set<SegmentId> segmentIds = newHashSet();
-
-            // The set of old record ids in this segment
-            // that were previously root record ids, but will no longer be,
-            // because the record to be written references them.
-            // This needs to be a set, because the list of ids can
-            // potentially reference the same record multiple times
-            Set<RecordId> notRoots = new HashSet<RecordId>();
-            for (RecordId recordId : ids) {
-                SegmentId segmentId = recordId.getSegmentId();
-                if (!(segmentId.equals(segment.getSegmentId()))) {
-                    segmentIds.add(segmentId);
-                } else if (roots.containsKey(recordId)) {
-                    notRoots.add(recordId);
-                }
-            }
-            rootcount -= notRoots.size();
-
-            if (!segmentIds.isEmpty()) {
-                for (int refid = 1; refid < refcount; refid++) {
-                    segmentIds.remove(segment.getRefId(refid));
-                }
-                refcount += segmentIds.size();
-            }
-
-            headerSize = refcount * 16 + rootcount * 3 + blobrefcount * 2;
-            segmentSize = align(headerSize + recordSize + length, 16);
-        }
-
-        if (segmentSize > buffer.length - 1
-                || blobrefcount > 0xffff
-                || rootcount > 0xffff
-                || refcount > Segment.SEGMENT_REFERENCE_LIMIT) {
-            flush();
-        }
-
-        length += recordSize;
-        position = buffer.length - length;
-        checkState(position >= 0);
+        List<MapEntry> entries = newArrayList();
+        for (Map.Entry<String, RecordId> entry : changes.entrySet()) {
+            String key = entry.getKey();
 
-        RecordId id = new RecordId(segment.getSegmentId(), position);
-        roots.put(id, type);
-        return id;
-    }
+            RecordId keyId = null;
+            if (base != null) {
+                MapEntry e = base.getEntry(key);
+                if (e != null) {
+                    keyId = e.getKey();
+                }
+            }
+            if (keyId == null && entry.getValue() != null) {
+                keyId = writeString(key);
+            }
 
-    private synchronized int getSegmentRef(SegmentId segmentId) {
-        int refcount = segment.getRefCount();
-        if (refcount > SEGMENT_REFERENCE_LIMIT) {
-          throw new SegmentOverflowException(
-                  "Segment cannot have more than 255 references " + 
segment.getSegmentId());
-        }
-        for (int index = 0; index < refcount; index++) {
-            if (segmentId.equals(segment.getRefId(index))) {
-                return index;
+            if (keyId != null) {
+                entries.add(new MapEntry(key, keyId, entry.getValue()));
             }
         }
 
-        ByteBuffer.wrap(buffer, refcount * 16, 16)
-            .putLong(segmentId.getMostSignificantBits())
-            .putLong(segmentId.getLeastSignificantBits());
-        buffer[Segment.REF_COUNT_OFFSET] = (byte) refcount;
-        return refcount;
-    }
-
-    /**
-     * Write a record id, and marks the record id as referenced (removes it 
from
-     * the unreferenced set).
-     * 
-     * @param recordId the record id
-     */
-    private synchronized void writeRecordId(RecordId recordId) {
-        checkNotNull(recordId);
-        roots.remove(recordId);
-
-        int offset = recordId.getOffset();
-        checkState(0 <= offset && offset < MAX_SEGMENT_SIZE);
-        checkState(offset == align(offset));
-
-        buffer[position++] = (byte) getSegmentRef(recordId.getSegmentId());
-        buffer[position++] = (byte) (offset >> (8 + 
Segment.RECORD_ALIGN_BITS));
-        buffer[position++] = (byte) (offset >> Segment.RECORD_ALIGN_BITS);
-    }
-
-    private void writeInt(int value) {
-        buffer[position++] = (byte) (value >> 24);
-        buffer[position++] = (byte) (value >> 16);
-        buffer[position++] = (byte) (value >> 8);
-        buffer[position++] = (byte) value;
-    }
-
-    private void writeLong(long value) {
-        writeInt((int) (value >> 32));
-        writeInt((int) value);
+        return writeMapBucket(base, entries, 0);
     }
 
-    private MapRecord writeMapLeaf(
-            int level, Collection<MapEntry> entries) {
+    private MapRecord writeMapLeaf(final int level, Collection<MapEntry> 
entries) {
         checkNotNull(entries);
 
-        int size = entries.size();
+        final int size = entries.size();
         checkElementIndex(size, MapRecord.MAX_SIZE);
         checkPositionIndex(level, MapRecord.MAX_NUMBER_OF_LEVELS);
         checkArgument(size != 0 || level == MapRecord.MAX_NUMBER_OF_LEVELS);
@@ -472,65 +219,62 @@ public class SegmentWriter {
         }
 
         // copy the entries to an array so we can sort them before writing
-        MapEntry[] array = entries.toArray(new MapEntry[entries.size()]);
+        final MapEntry[] array = entries.toArray(new MapEntry[entries.size()]);
         Arrays.sort(array);
 
-        synchronized (this) {
-            RecordId id = prepare(RecordType.LEAF, 4 + size * 4, ids);
-            writeInt((level << MapRecord.SIZE_BITS) | size);
-            for (MapEntry entry : array) {
-                writeInt(entry.getHash());
-            }
-            for (MapEntry entry : array) {
-                writeRecordId(entry.getKey());
-                writeRecordId(entry.getValue());
+        RecordId mapId = writeRecord(new RecordWriter(LEAF, 4 + size * 4, ids) 
{
+            @Override
+            protected void write(RecordId id, SegmentBuilder builder) {
+                builder.writeInt((level << MapRecord.SIZE_BITS) | size);
+                for (MapEntry entry : array) {
+                    builder.writeInt(entry.getHash());
+                }
+                for (MapEntry entry : array) {
+                    builder.writeRecordId(entry.getKey());
+                    builder.writeRecordId(entry.getValue());
+                }
             }
-            return new MapRecord(id);
-        }
+        });
+        return new MapRecord(mapId);
     }
 
-    private MapRecord writeMapBranch(int level, int size, MapRecord[] buckets) 
{
+    private MapRecord writeMapBranch(final int level, final int size, 
MapRecord[] buckets) {
         int bitmap = 0;
-        List<RecordId> ids = Lists.newArrayListWithCapacity(buckets.length);
+        final List<RecordId> bucketIds = 
Lists.newArrayListWithCapacity(buckets.length);
         for (int i = 0; i < buckets.length; i++) {
             if (buckets[i] != null) {
                 bitmap |= 1L << i;
-                ids.add(buckets[i].getRecordId());
+                bucketIds.add(buckets[i].getRecordId());
             }
         }
 
-        synchronized (this) {
-            RecordId mapId = prepare(RecordType.BRANCH, 8, ids);
-            writeInt((level << MapRecord.SIZE_BITS) | size);
-            writeInt(bitmap);
-            for (RecordId id : ids) {
-                writeRecordId(id);
+        final int bits = bitmap;
+        RecordId mapId = writeRecord(new RecordWriter(BRANCH, 8, bucketIds) {
+            @Override
+            protected void write(RecordId id, SegmentBuilder builder) {
+                builder.writeInt((level << MapRecord.SIZE_BITS) | size);
+                builder.writeInt(bits);
+                for (RecordId buckedId : bucketIds) {
+                    builder.writeRecordId(buckedId);
+                }
             }
-            return new MapRecord(mapId);
-        }
-    }
-
-    private synchronized RecordId writeListBucket(List<RecordId> bucket) {
-        checkArgument(bucket.size() > 1);
-        RecordId bucketId = prepare(RecordType.BUCKET, 0, bucket);
-        for (RecordId id : bucket) {
-            writeRecordId(id);
-        }
-        return bucketId;
+        });
+        return new MapRecord(mapId);
     }
 
-    private synchronized MapRecord writeMapBucket(
-            MapRecord base, Collection<MapEntry> entries, int level) {
+    private MapRecord writeMapBucket(MapRecord base, Collection<MapEntry> 
entries, int level) {
         // when no changed entries, return the base map (if any) as-is
         if (entries == null || entries.isEmpty()) {
             if (base != null) {
                 return base;
             } else if (level == 0) {
-                synchronized (this) {
-                    RecordId id = prepare(RecordType.LEAF, 4);
-                    writeInt(0);
-                    return new MapRecord(id);
-                }
+                RecordId mapId = writeRecord(new RecordWriter(LEAF, 4) {
+                    @Override
+                    protected void write(RecordId id, SegmentBuilder builder) {
+                        builder.writeInt(0);
+                    }
+                });
+                return new MapRecord(mapId);
             } else {
                 return null;
             }
@@ -589,164 +333,25 @@ public class SegmentWriter {
             return writeMapBranch(level, newSize, buckets);
         } else if (newCount <= 1) {
             // up to one bucket contains entries, so return that as the new map
-            for (int i = 0; i < buckets.length; i++) {
-                if (buckets[i] != null) {
-                    return buckets[i];
+            for (MapRecord bucket : buckets) {
+                if (bucket != null) {
+                    return bucket;
                 }
             }
             // no buckets remaining, return empty map
             return writeMapBucket(null, null, level);
         } else {
             // combine all remaining entries into a leaf record
-            List<MapEntry> list = Lists.newArrayList();
-            for (int i = 0; i < buckets.length; i++) {
-                if (buckets[i] != null) {
-                    addAll(list, buckets[i].getEntries());
+            List<MapEntry> list = newArrayList();
+            for (MapRecord bucket : buckets) {
+                if (bucket != null) {
+                    addAll(list, bucket.getEntries());
                 }
             }
             return writeMapLeaf(level, list);
         }
     }
 
-    private static List<List<MapEntry>> splitToBuckets(
-            Collection<MapEntry> entries, int level) {
-        List<MapEntry> empty = null;
-        int mask = (1 << MapRecord.BITS_PER_LEVEL) - 1;
-        int shift = 32 - (level + 1) * MapRecord.BITS_PER_LEVEL;
-
-        List<List<MapEntry>> buckets =
-                newArrayList(nCopies(MapRecord.BUCKETS_PER_LEVEL, empty));
-        for (MapEntry entry : entries) {
-            int index = (entry.getHash() >> shift) & mask;
-            List<MapEntry> bucket = buckets.get(index);
-            if (bucket == null) {
-                bucket = newArrayList();
-                buckets.set(index, bucket);
-            }
-            bucket.add(entry);
-        }
-        return buckets;
-    }
-
-    private synchronized RecordId writeValueRecord(
-            long length, RecordId blocks) {
-        RecordId valueId = prepare(
-                RecordType.VALUE, 8, Collections.singleton(blocks));
-        writeLong((length - Segment.MEDIUM_LIMIT) | (0x3L << 62));
-        writeRecordId(blocks);
-        return valueId;
-    }
-
-    private synchronized RecordId writeValueRecord(int length, byte[] data) {
-        checkArgument(length < Segment.MEDIUM_LIMIT);
-        RecordId id;
-        if (length < Segment.SMALL_LIMIT) {
-            id = prepare(RecordType.VALUE, 1 + length);
-            buffer[position++] = (byte) length;
-        } else {
-            id = prepare(RecordType.VALUE, 2 + length);
-            int len = (length - Segment.SMALL_LIMIT) | 0x8000;
-            buffer[position++] = (byte) (len >> 8);
-            buffer[position++] = (byte) len;
-        }
-        System.arraycopy(data, 0, buffer, position, length);
-        position += length;
-        return id;
-    }
-
-    /**
-     * Write a reference to an external blob. This method handles blob IDs of
-     * every length, but behaves differently for small and large blob IDs.
-     *
-     * @param blobId Blob ID.
-     * @return Record ID pointing to the written blob ID.
-     * @see Segment#BLOB_ID_SMALL_LIMIT
-     */
-    private RecordId writeBlobId(String blobId) {
-        byte[] data = blobId.getBytes(Charsets.UTF_8);
-
-        if (data.length < Segment.BLOB_ID_SMALL_LIMIT) {
-            return writeSmallBlobId(data);
-        } else {
-            return writeLargeBlobId(blobId);
-        }
-    }
-
-    /**
-     * Write a large blob ID. A blob ID is considered large if the length of 
its
-     * binary representation is equal to or greater than {@code
-     * Segment.BLOB_ID_SMALL_LIMIT}.
-     *
-     * @param blobId Blob ID.
-     * @return A record ID pointing to the written blob ID.
-     */
-    private RecordId writeLargeBlobId(String blobId) {
-        RecordId stringRecord = writeString(blobId);
-
-        synchronized (this) {
-            RecordId blobIdRecord = prepare(RecordType.VALUE, 1, 
Collections.singletonList(stringRecord));
-
-            // The length uses a fake "length" field that is always equal to 
0xF0.
-            // This allows the code to take apart small from a large blob IDs.
-
-            buffer[position++] = (byte) 0xF0;
-            writeRecordId(stringRecord);
-
-            blobrefs.add(blobIdRecord);
-
-            return blobIdRecord;
-        }
-    }
-
-    /**
-     * Write a small blob ID. A blob ID is considered small if the length of 
its
-     * binary representation is less than {@code Segment.BLOB_ID_SMALL_LIMIT}.
-     *
-     * @param blobId Blob ID.
-     * @return A record ID pointing to the written blob ID.
-     */
-    private RecordId writeSmallBlobId(byte[] blobId) {
-        int length = blobId.length;
-
-        checkArgument(length < Segment.BLOB_ID_SMALL_LIMIT);
-
-        synchronized (this) {
-            RecordId id = prepare(RecordType.VALUE, 2 + length);
-
-            int masked = length | 0xE000;
-
-            buffer[position++] = (byte) (masked >> 8);
-            buffer[position++] = (byte) (masked);
-
-            System.arraycopy(blobId, 0, buffer, position, length);
-
-            position += length;
-
-            blobrefs.add(id);
-
-            return id;
-        }
-    }
-
-    /**
-     * Writes a block record containing the given block of bytes.
-     *
-     * @param bytes source buffer
-     * @param offset offset within the source buffer
-     * @param length number of bytes to write
-     * @return block record identifier
-     */
-    public synchronized RecordId writeBlock(
-            byte[] bytes, int offset, int length) {
-        checkNotNull(bytes);
-        checkPositionIndexes(offset, offset + length, bytes.length);
-
-        RecordId blockId = prepare(RecordType.BLOCK, length);
-        System.arraycopy(bytes, offset, buffer, position, length);
-        position += length;
-        return blockId;
-    }
-
     /**
      * Writes a list record containing the given list of record identifiers.
      *
@@ -756,12 +361,11 @@ public class SegmentWriter {
     public RecordId writeList(List<RecordId> list) {
         checkNotNull(list);
         checkArgument(!list.isEmpty());
-
         List<RecordId> thisLevel = list;
         while (thisLevel.size() > 1) {
-            List<RecordId> nextLevel = Lists.newArrayList();
+            List<RecordId> nextLevel = newArrayList();
             for (List<RecordId> bucket :
-                    Lists.partition(thisLevel, ListRecord.LEVEL_SIZE)) {
+                Lists.partition(thisLevel, ListRecord.LEVEL_SIZE)) {
                 if (bucket.size() > 1) {
                     nextLevel.add(writeListBucket(bucket));
                 } else {
@@ -773,63 +377,67 @@ public class SegmentWriter {
         return thisLevel.iterator().next();
     }
 
-    MapRecord writeMap(MapRecord base, Map<String, RecordId> changes) {
-        if (base != null && base.isDiff()) {
-            Segment segment = base.getSegment();
-            RecordId key = segment.readRecordId(base.getOffset(8));
-            String name = readString(key);
-            if (!changes.containsKey(name)) {
-                changes.put(name, segment.readRecordId(base.getOffset(8, 1)));
-            }
-            base = new MapRecord(segment.readRecordId(base.getOffset(8, 2)));
-        }
-
-        if (base != null && changes.size() == 1) {
-            Map.Entry<String, RecordId> change =
-                    changes.entrySet().iterator().next();
-            RecordId value = change.getValue();
-            if (value != null) {
-                MapEntry entry = base.getEntry(change.getKey());
-                if (entry != null) {
-                    if (value.equals(entry.getValue())) {
-                        return base;
-                    } else {
-                        synchronized (this) {
-                            RecordId id = prepare(RecordType.BRANCH, 8, asList(
-                                    entry.getKey(), value, 
base.getRecordId()));
-                            writeInt(-1);
-                            writeInt(entry.getHash());
-                            writeRecordId(entry.getKey());
-                            writeRecordId(value);
-                            writeRecordId(base.getRecordId());
-                            return new MapRecord(id);
-                        }
-                    }
+    private RecordId writeListBucket(final List<RecordId> bucket) {
+        checkArgument(bucket.size() > 1);
+        return writeRecord(new RecordWriter(BUCKET, 0, bucket) {
+            @Override
+            protected void write(RecordId id, SegmentBuilder builder) {
+                for (RecordId bucketId : bucket) {
+                    builder.writeRecordId(bucketId);
                 }
             }
-        }
+        });
+    }
 
-        List<MapEntry> entries = Lists.newArrayList();
-        for (Map.Entry<String, RecordId> entry : changes.entrySet()) {
-            String key = entry.getKey();
+    private static List<List<MapEntry>> splitToBuckets(Collection<MapEntry> 
entries, int level) {
+        List<MapEntry> empty = null;
+        int mask = (1 << MapRecord.BITS_PER_LEVEL) - 1;
+        int shift = 32 - (level + 1) * MapRecord.BITS_PER_LEVEL;
 
-            RecordId keyId = null;
-            if (base != null) {
-                MapEntry e = base.getEntry(key);
-                if (e != null) {
-                    keyId = e.getKey();
-                }
-            }
-            if (keyId == null && entry.getValue() != null) {
-                keyId = writeString(key);
+        List<List<MapEntry>> buckets =
+                newArrayList(nCopies(MapRecord.BUCKETS_PER_LEVEL, empty));
+        for (MapEntry entry : entries) {
+            int index = (entry.getHash() >> shift) & mask;
+            List<MapEntry> bucket = buckets.get(index);
+            if (bucket == null) {
+                bucket = newArrayList();
+                buckets.set(index, bucket);
             }
+            bucket.add(entry);
+        }
+        return buckets;
+    }
 
-            if (keyId != null) {
-                entries.add(new MapEntry(key, keyId, entry.getValue()));
+    private RecordId writeValueRecord(final long length, final RecordId 
blocks) {
+        return writeRecord(new RecordWriter(VALUE, 8, blocks) {
+            @Override
+            protected void write(RecordId id, SegmentBuilder builder) {
+                builder.writeLong((length - Segment.MEDIUM_LIMIT) | (0x3L << 
62));
+                builder.writeRecordId(blocks);
             }
-        }
+        });
+    }
 
-        return writeMapBucket(base, entries, 0);
+    private RecordId writeValueRecord(final int length, final byte[] data) {
+        checkArgument(length < Segment.MEDIUM_LIMIT);
+        RecordId id;
+        if (length < Segment.SMALL_LIMIT) {
+            return writeRecord(new RecordWriter(VALUE, 1 + length) {
+                @Override
+                protected void write(RecordId id, SegmentBuilder builder) {
+                    builder.writeByte((byte) length);
+                    builder.writeBytes(data, 0, length);
+                }
+            });
+        } else {
+            return writeRecord(new RecordWriter(VALUE, 2 + length) {
+                @Override
+                protected void write(RecordId id, SegmentBuilder builder) {
+                    builder.writeShort((short) ((length - Segment.SMALL_LIMIT) 
| 0x8000));
+                    builder.writeBytes(data, 0, length);
+                }
+            });
+        }
     }
 
     /**
@@ -839,30 +447,23 @@ public class SegmentWriter {
      * @return value record identifier
      */
     public RecordId writeString(String string) {
-        synchronized (this) {
-            RecordId id = records.get(string);
-            if (id != null) {
-                return id; // shortcut if the same string was recently stored
-            }
+        RecordId id = getRecord(string);
+        if (id != null) {
+            return id; // shortcut if the same string was recently stored
         }
 
         byte[] data = string.getBytes(Charsets.UTF_8);
 
         if (data.length < Segment.MEDIUM_LIMIT) {
             // only cache short strings to avoid excessive memory use
-            synchronized (this) {
-                RecordId id = records.get(string);
-                if (id == null) {
-                    id = writeValueRecord(data.length, data);
-                    records.put(string, id);
-                }
-                return id;
-            }
+            id = writeValueRecord(data.length, data);
+            putRecord(string, id);
+            return id;
         }
 
         int pos = 0;
         List<RecordId> blockIds = newArrayListWithExpectedSize(
-                data.length / BLOCK_SIZE + 1);
+            data.length / BLOCK_SIZE + 1);
 
         // write as many full bulk segments as possible
         while (pos + MAX_SEGMENT_SIZE <= data.length) {
@@ -886,7 +487,7 @@ public class SegmentWriter {
 
     public SegmentBlob writeBlob(Blob blob) throws IOException {
         if (blob instanceof SegmentBlob
-                && store.containsSegment(((SegmentBlob) 
blob).getRecordId().getSegmentId())) {
+            && store.containsSegment(((SegmentBlob) 
blob).getRecordId().getSegmentId())) {
             return (SegmentBlob) blob;
         }
 
@@ -897,13 +498,91 @@ public class SegmentWriter {
                 RecordId id = writeBlobId(blobId);
                 return new SegmentBlob(id);
             } else {
-                log.debug("No blob found for reference {}, inlining...", 
reference);
+                LOG.debug("No blob found for reference {}, inlining...", 
reference);
             }
         }
 
         return writeStream(blob.getNewStream());
     }
 
+    /**
+     * Write a reference to an external blob. This method handles blob IDs of
+     * every length, but behaves differently for small and large blob IDs.
+     *
+     * @param blobId Blob ID.
+     * @return Record ID pointing to the written blob ID.
+     * @see Segment#BLOB_ID_SMALL_LIMIT
+     */
+    private RecordId writeBlobId(String blobId) {
+        byte[] data = blobId.getBytes(Charsets.UTF_8);
+        if (data.length < Segment.BLOB_ID_SMALL_LIMIT) {
+            return writeSmallBlobId(data);
+        } else {
+            return writeLargeBlobId(blobId);
+        }
+    }
+
+    /**
+     * Write a large blob ID. A blob ID is considered large if the length of 
its
+     * binary representation is equal to or greater than {@code
+     * Segment.BLOB_ID_SMALL_LIMIT}.
+     *
+     * @param blobId Blob ID.
+     * @return A record ID pointing to the written blob ID.
+     */
+    private RecordId writeLargeBlobId(String blobId) {
+        final RecordId stringRecord = writeString(blobId);
+        return writeRecord(new RecordWriter(VALUE, 1, stringRecord) {
+            @Override
+            protected void write(RecordId id, SegmentBuilder builder) {
+                // The length uses a fake "length" field that is always equal 
to 0xF0.
+                // This allows the code to take apart small from a large blob 
IDs.
+                builder.writeByte((byte) 0xF0);
+                builder.writeRecordId(stringRecord);
+                builder.addBlobRef(id);
+            }
+        });
+    }
+
+    /**
+     * Write a small blob ID. A blob ID is considered small if the length of 
its
+     * binary representation is less than {@code Segment.BLOB_ID_SMALL_LIMIT}.
+     *
+     * @param blobId Blob ID.
+     * @return A record ID pointing to the written blob ID.
+     */
+    private RecordId writeSmallBlobId(final byte[] blobId) {
+        final int length = blobId.length;
+        checkArgument(length < Segment.BLOB_ID_SMALL_LIMIT);
+        return writeRecord(new RecordWriter(VALUE, 2 + length) {
+            @Override
+            protected void write(RecordId id, SegmentBuilder builder) {
+                builder.writeShort((short) (length | 0xE000));
+                builder.writeBytes(blobId, 0, length);
+                builder.addBlobRef(id);
+            }
+        });
+    }
+
+    /**
+     * Writes a block record containing the given block of bytes.
+     *
+     * @param bytes source buffer
+     * @param offset offset within the source buffer
+     * @param length number of bytes to write
+     * @return block record identifier
+     */
+    RecordId writeBlock(final byte[] bytes, final int offset, final int 
length) {
+        checkNotNull(bytes);
+        checkPositionIndexes(offset, offset + length, bytes.length);
+        return writeRecord(new RecordWriter(BLOCK, length) {
+            @Override
+            protected void write(RecordId id, SegmentBuilder builder) {
+                builder.writeBytes(bytes, offset, length);
+            }
+        });
+    }
+
     SegmentBlob writeExternalBlob(String blobId) {
         RecordId id = writeBlobId(blobId);
         return new SegmentBlob(id);
@@ -914,10 +593,6 @@ public class SegmentWriter {
         return new SegmentBlob(id);
     }
 
-    public synchronized void dropCache() {
-        records.clear();
-    }
-
     /**
      * Writes a stream value record. The given stream is consumed
      * <em>and closed</em> by this method.
@@ -964,7 +639,7 @@ public class SegmentWriter {
         while (n != 0) {
             SegmentId bulkId = store.getTracker().newBulkSegmentId();
             int len = align(n);
-            log.debug("Writing bulk segment {} ({} bytes)", bulkId, n);
+            LOG.debug("Writing bulk segment {} ({} bytes)", bulkId, n);
             store.writeSegment(bulkId, data, 0, len);
 
             for (int i = 0; i < n; i += BLOCK_SIZE) {
@@ -978,17 +653,16 @@ public class SegmentWriter {
         return writeValueRecord(length, writeList(blockIds));
     }
 
-    private RecordId writeProperty(PropertyState state) {
+    public RecordId writeProperty(PropertyState state) {
         Map<String, RecordId> previousValues = emptyMap();
         return writeProperty(state, previousValues);
     }
 
-    private RecordId writeProperty(
-            PropertyState state, Map<String, RecordId> previousValues) {
+    private RecordId writeProperty(PropertyState state, Map<String, RecordId> 
previousValues) {
         Type<?> type = state.getType();
-        int count = state.count();
+        final int count = state.count();
 
-        List<RecordId> valueIds = Lists.newArrayList();
+        List<RecordId> valueIds = newArrayList();
         for (int i = 0; i < count; i++) {
             if (type.tag() == PropertyType.BINARY) {
                 try {
@@ -1011,32 +685,33 @@ public class SegmentWriter {
         if (!type.isArray()) {
             return valueIds.iterator().next();
         } else if (count == 0) {
-            synchronized (this) {
-                RecordId propertyId = prepare(RecordType.LIST, 4);
-                writeInt(0);
-                return propertyId;
-            }
+            return writeRecord(new RecordWriter(LIST, 4) {
+                @Override
+                protected void write(RecordId id, SegmentBuilder builder) {
+                    builder.writeInt(0);
+                }
+            });
         } else {
-            RecordId listId = writeList(valueIds);
-            synchronized (this) {
-                RecordId propertyId = prepare(
-                        RecordType.LIST, 4, Collections.singleton(listId));
-                writeInt(count);
-                writeRecordId(listId);
-                return propertyId;
-            }
+            final RecordId listId = writeList(valueIds);
+            return writeRecord(new RecordWriter(LIST, 4, listId) {
+                @Override
+                public void write(RecordId id, SegmentBuilder builder) {
+                    builder.writeInt(count);
+                    builder.writeRecordId(listId);
+                }
+            });
         }
     }
 
-    public synchronized RecordId writeTemplate(Template template) {
+    public RecordId writeTemplate(Template template) {
         checkNotNull(template);
 
-        RecordId id = records.get(template);
+        RecordId id = getRecord(template);
         if (id != null) {
             return id; // shortcut if the same template was recently stored
         }
 
-        Collection<RecordId> ids = Lists.newArrayList();
+        Collection<RecordId> ids = newArrayList();
         int head = 0;
 
         RecordId primaryId = null;
@@ -1051,7 +726,7 @@ public class SegmentWriter {
         PropertyState mixinTypes = template.getMixinTypes();
         if (mixinTypes != null) {
             head |= 1 << 30;
-            mixinIds = Lists.newArrayList();
+            mixinIds = newArrayList();
             for (String mixin : mixinTypes.getValue(NAMES)) {
                 mixinIds.add(writeString(mixin));
             }
@@ -1087,7 +762,7 @@ public class SegmentWriter {
         }
 
         RecordId propNamesId = null;
-        if (segment.getSegmentVersion().onOrAfter(V_11)) {
+        if (version.onOrAfter(V_11)) {
             if (propertyNames.length > 0) {
                 propNamesId = writeList(Arrays.asList(propertyNames));
                 ids.add(propNamesId);
@@ -1098,60 +773,52 @@ public class SegmentWriter {
 
         checkState(propertyNames.length < (1 << 18));
         head |= propertyNames.length;
+        return writeTemplate(template, ids, propertyNames, propertyTypes, 
head, primaryId,
+                mixinIds, childNameId, propNamesId);
+    }
 
-        id = prepare(RecordType.TEMPLATE, 4 + propertyTypes.length, ids);
-        writeInt(head);
-        if (primaryId != null) {
-            writeRecordId(primaryId);
-        }
-        if (mixinIds != null) {
-            for (RecordId mixinId : mixinIds) {
-                writeRecordId(mixinId);
-            }
-        }
-        if (childNameId != null) {
-            writeRecordId(childNameId);
-        }
-        if (segment.getSegmentVersion().onOrAfter(V_11)) {
-            if (propNamesId != null) {
-                writeRecordId(propNamesId);
-            }
-        }
-        for (int i = 0; i < propertyNames.length; i++) {
-            if (!segment.getSegmentVersion().onOrAfter(V_11)) {
-                // V10 only
-                writeRecordId(propertyNames[i]);
+    public RecordId writeTemplate(Template template, final 
Collection<RecordId> ids,
+            final RecordId[] propertyNames, final byte[] propertyTypes, final 
int finalHead,
+            final RecordId finalPrimaryId, final List<RecordId> finalMixinIds, 
final RecordId
+            finalChildNameId, final RecordId finalPropNamesId) {
+        RecordId id = writeRecord(new RecordWriter(TEMPLATE, 4 + 
propertyTypes.length, ids) {
+            @Override
+            protected void write(RecordId id, SegmentBuilder builder) {
+                builder.writeInt(finalHead);
+                if (finalPrimaryId != null) {
+                    builder.writeRecordId(finalPrimaryId);
+                }
+                if (finalMixinIds != null) {
+                    for (RecordId mixinId : finalMixinIds) {
+                        builder.writeRecordId(mixinId);
+                    }
+                }
+                if (finalChildNameId != null) {
+                    builder.writeRecordId(finalChildNameId);
+                }
+                if (version.onOrAfter(V_11)) {
+                    if (finalPropNamesId != null) {
+                        builder.writeRecordId(finalPropNamesId);
+                    }
+                }
+                for (int i = 0; i < propertyNames.length; i++) {
+                    if (!version.onOrAfter(V_11)) {
+                        // V10 only
+                        builder.writeRecordId(propertyNames[i]);
+                    }
+                    builder.writeByte(propertyTypes[i]);
+                }
             }
-            buffer[position++] = propertyTypes[i];
-        }
-
-        records.put(template, id);
-
+        });
+        putRecord(template, id);
         return id;
     }
 
-    /**
-     * If the given node was compacted, return the compacted node, otherwise
-     * return the passed node. This is to avoid pointing to old nodes, if they
-     * have been compacted.
-     * 
-     * @param state the node
-     * @return the compacted node (if it was compacted)
-     */
-    private SegmentNodeState uncompact(SegmentNodeState state) {
-        RecordId id = tracker.getCompactionMap().get(state.getRecordId());
-        if (id != null) {
-            return new SegmentNodeState(id);
-        } else {
-            return state;
-        }
-    }
-
     public SegmentNodeState writeNode(NodeState state) {
         if (state instanceof SegmentNodeState) {
             SegmentNodeState sns = uncompact((SegmentNodeState) state);
             if (sns != state || store.containsSegment(
-                    sns.getRecordId().getSegmentId())) {
+                sns.getRecordId().getSegmentId())) {
                 return sns;
             }
         }
@@ -1165,7 +832,7 @@ public class SegmentWriter {
             if (base instanceof SegmentNodeState) {
                 SegmentNodeState sns = uncompact((SegmentNodeState) base);
                 if (sns != base || store.containsSegment(
-                        sns.getRecordId().getSegmentId())) {
+                    sns.getRecordId().getSegmentId())) {
                     before = sns;
                     beforeTemplate = before.getTemplate();
                 }
@@ -1180,7 +847,7 @@ public class SegmentWriter {
             templateId = writeTemplate(template);
         }
 
-        List<RecordId> ids = Lists.newArrayList();
+        final List<RecordId> ids = newArrayList();
         ids.add(templateId);
 
         String childName = template.getChildName();
@@ -1188,8 +855,8 @@ public class SegmentWriter {
             MapRecord base;
             final Map<String, RecordId> childNodes = Maps.newHashMap();
             if (before != null
-                    && before.getChildNodeCount(2) > 1
-                    && after.getChildNodeCount(2) > 1) {
+                && before.getChildNodeCount(2) > 1
+                && after.getChildNodeCount(2) > 1) {
                 base = before.getChildNodeMap();
                 after.compareAgainstBaseState(before, new 
DefaultNodeStateDiff() {
                     @Override
@@ -1199,7 +866,7 @@ public class SegmentWriter {
                     }
                     @Override
                     public boolean childNodeChanged(
-                            String name, NodeState before, NodeState after) {
+                        String name, NodeState before, NodeState after) {
                         childNodes.put(name, writeNode(after).getRecordId());
                         return true;
                     }
@@ -1213,8 +880,8 @@ public class SegmentWriter {
                 base = null;
                 for (ChildNodeEntry entry : state.getChildNodeEntries()) {
                     childNodes.put(
-                            entry.getName(),
-                            writeNode(entry.getNodeState()).getRecordId());
+                        entry.getName(),
+                        writeNode(entry.getNodeState()).getRecordId());
                 }
             }
             ids.add(writeMap(base, childNodes).getRecordId());
@@ -1222,16 +889,16 @@ public class SegmentWriter {
             
ids.add(writeNode(state.getChildNode(template.getChildName())).getRecordId());
         }
 
-        List<RecordId> pIds = Lists.newArrayList();
+        List<RecordId> pIds = newArrayList();
         for (PropertyTemplate pt : template.getPropertyTemplates()) {
             String name = pt.getName();
             PropertyState property = state.getProperty(name);
 
             if (property instanceof SegmentPropertyState
-                    && store.containsSegment(((SegmentPropertyState) 
property).getRecordId().getSegmentId())) {
+                && store.containsSegment(((SegmentPropertyState) 
property).getRecordId().getSegmentId())) {
                 pIds.add(((SegmentPropertyState) property).getRecordId());
             } else if (before == null
-                    || 
!store.containsSegment(before.getRecordId().getSegmentId())) {
+                || 
!store.containsSegment(before.getRecordId().getSegmentId())) {
                 pIds.add(writeProperty(property));
             } else {
                 // reuse previously stored property, if possible
@@ -1240,7 +907,7 @@ public class SegmentWriter {
                     pIds.add(writeProperty(property)); // new property
                 } else {
                     SegmentPropertyState bp = beforeTemplate.getProperty(
-                            before.getRecordId(), bt.getIndex());
+                        before.getRecordId(), bt.getIndex());
                     if (property.equals(bp)) {
                         pIds.add(bp.getRecordId()); // no changes
                     } else if (bp.isArray() && bp.getType() != BINARIES) {
@@ -1254,24 +921,142 @@ public class SegmentWriter {
         }
 
         if (!pIds.isEmpty()) {
-            if (segment.getSegmentVersion().onOrAfter(V_11)) {
+            if (version.onOrAfter(V_11)) {
                 ids.add(writeList(pIds));
             } else {
                 ids.addAll(pIds);
             }
         }
 
-        synchronized (this) {
-            RecordId recordId = prepare(RecordType.NODE, 0, ids);
-            for (RecordId id : ids) {
-                writeRecordId(id);
+        RecordId recordId = writeRecord(new RecordWriter(NODE, 0, ids) {
+            @Override
+            protected void write(RecordId id, SegmentBuilder builder) {
+                for (RecordId recordId : ids) {
+                    builder.writeRecordId(recordId);
+                }
+            }
+        });
+        return new SegmentNodeState(recordId);
+    }
+
+    /**
+     * If the given node was compacted, return the compacted node, otherwise
+     * return the passed node. This is to avoid pointing to old nodes, if they
+     * have been compacted.
+     *
+     * @param state the node
+     * @return the compacted node (if it was compacted)
+     */
+    private SegmentNodeState uncompact(SegmentNodeState state) {
+        RecordId id = tracker.getCompactionMap().get(state.getRecordId());
+        if (id != null) {
+            return new SegmentNodeState(id);
+        } else {
+            return state;
+        }
+    }
+
+    private RecordId writeRecord(RecordWriter recordWriter) {
+        SegmentBuilder builder = 
segmentBuilderPool.borrowBuilder(currentThread());
+        try {
+            RecordId id = builder.prepare(recordWriter.type, 
recordWriter.size, recordWriter.ids);
+            recordWriter.write(id, builder);
+            return id;
+        } finally {
+            segmentBuilderPool.returnBuilder(currentThread(), builder);
+        }
+    }
+
+    private abstract static class RecordWriter {
+        private final RecordType type;
+        private final int size;
+        private final Collection<RecordId> ids;
+
+        protected RecordWriter(RecordType type, int size, Collection<RecordId> 
ids) {
+            this.type = type;
+            this.size = size;
+            this.ids = ids;
+        }
+
+        protected RecordWriter(RecordType type, int size, RecordId id) {
+            this(type, size, singleton(id));
+        }
+
+        protected RecordWriter(RecordType type, int size) {
+            this(type, size, Collections.<RecordId>emptyList());
+        }
+
+        protected abstract void write(RecordId id, SegmentBuilder builder);
+    }
+
+    private RecordId getRecord(Object key) {
+        synchronized (records) {
+            return records.get(key);
+        }
+    }
+
+    private void putRecord(Object key, RecordId id) {
+        synchronized (records) {
+            records.put(key, id);
+        }
+    }
+
+    public void dropCache() {
+        synchronized (records) {
+            records.clear();
+        }
+    }
+
+    private class SegmentBuilderPool {
+        private final Set<SegmentBuilder> borrowed = newHashSet();
+        private final Map<Object, SegmentBuilder> builders = newHashMap();
+
+        public void flush() {
+            ArrayList<SegmentBuilder> toFlush = Lists.newArrayList();
+            synchronized (this) {
+                toFlush.addAll(builders.values());
+                builders.clear();
+                borrowed.clear();
+            }
+            // Call flush from outside a synchronized context to avoid
+            // deadlocks of that method calling SegmentStore.writeSegment
+            for (SegmentBuilder builder : toFlush) {
+                builder.flush();
+            }
+        }
+
+        public synchronized SegmentBuilder borrowBuilder(Object key) {
+            SegmentBuilder builder = builders.remove(key);
+            if (builder == null) {
+                builder = new SegmentBuilder(store, tracker, version, wid + 
"." + (key.hashCode() & 0xffff));
+            }
+            borrowed.add(builder);
+            return builder;
+        }
+
+        public void returnBuilder(Object key, SegmentBuilder builder) {
+            if (!tryReturn(key, builder)) {
+                // Delayed flush this builder as it was borrowed while flush() 
was called.
+                builder.flush();
+            }
+        }
+
+        private synchronized boolean tryReturn(Object key, SegmentBuilder 
builder) {
+            if (borrowed.remove(builder)) {
+                builders.put(key, builder);
+                return true;
+            } else {
+                return false;
             }
-            return new SegmentNodeState(recordId);
         }
     }
 
-    public SegmentTracker getTracker() {
-        return tracker;
+    private static int align(int value) {
+        return align(value, 1 << Segment.RECORD_ALIGN_BITS);
+    }
+
+    private static int align(int value, int boundary) {
+        return (value + boundary - 1) & ~(boundary - 1);
     }
 
 }

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/package-info.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/package-info.java?rev=1716466&r1=1716465&r2=1716466&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/package-info.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/package-info.java
 Wed Nov 25 16:28:33 2015
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-@Version("5.3.0")
+@Version("6.0.0")
 @Export(optional = "provide:=true")
 package org.apache.jackrabbit.oak.plugins.segment;
 

Modified: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentIdFactoryTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentIdFactoryTest.java?rev=1716466&r1=1716465&r2=1716466&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentIdFactoryTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentIdFactoryTest.java
 Wed Nov 25 16:28:33 2015
@@ -90,7 +90,7 @@ public class SegmentIdFactoryTest {
     @Test(expected = IllegalStateException.class)
     public void dataAIOOBE() {
         SegmentId id = factory.newDataSegmentId();
-        byte[] buffer = SegmentWriter.createNewBuffer(SegmentVersion.V_11);
+        byte[] buffer = SegmentBuilder.createNewBuffer(SegmentVersion.V_11);
         ByteBuffer data = ByteBuffer.allocate(Segment.MAX_SEGMENT_SIZE);
         data.put(buffer);
         data.rewind();

Modified: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentSizeTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentSizeTest.java?rev=1716466&r1=1716465&r2=1716466&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentSizeTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentSizeTest.java
 Wed Nov 25 16:28:33 2015
@@ -40,29 +40,29 @@ public class SegmentSizeTest {
     @Test
     public void testNodeSize() {
         NodeBuilder builder = EMPTY_NODE.builder();
-        assertEquals(96, getSize(builder));
+        assertEquals(112, getSize(builder));
         assertEquals(4, getAmortizedSize(builder));
 
         builder = EMPTY_NODE.builder();
         builder.setProperty("foo", "bar");
-        assertEquals(96, getSize(builder));
+        assertEquals(112, getSize(builder));
         assertEquals(8, getAmortizedSize(builder));
 
         builder = EMPTY_NODE.builder();
         builder.setProperty("foo", "bar");
         builder.setProperty("baz", 123);
-        assertEquals(128, getSize(builder));
+        assertEquals(144, getSize(builder));
         assertEquals(16, getAmortizedSize(builder));
 
         builder = EMPTY_NODE.builder();
         builder.child("foo");
-        assertEquals(112, getSize(builder));
+        assertEquals(128, getSize(builder));
         assertEquals(12, getAmortizedSize(builder));
 
         builder = EMPTY_NODE.builder();
         builder.child("foo");
         builder.child("bar");
-        assertEquals(144, getSize(builder));
+        assertEquals(160, getSize(builder));
         assertEquals(40, getAmortizedSize(builder));
     }
 
@@ -118,7 +118,7 @@ public class SegmentSizeTest {
     public void testAccessControlNodes() {
         NodeBuilder builder = EMPTY_NODE.builder();
         builder.setProperty("jcr:primaryType", "rep:ACL", Type.NAME);
-        assertEquals(96, getSize(builder));
+        assertEquals(112, getSize(builder));
         assertEquals(4, getAmortizedSize(builder));
 
         NodeBuilder deny = builder.child("deny");
@@ -126,7 +126,7 @@ public class SegmentSizeTest {
         deny.setProperty("rep:principalName", "everyone");
         deny.setProperty(PropertyStates.createProperty(
                 "rep:privileges", ImmutableList.of("jcr:read"), Type.NAMES));
-        assertEquals(224, getSize(builder));
+        assertEquals(240, getSize(builder));
         assertEquals(32, getAmortizedSize(builder));
 
         NodeBuilder allow = builder.child("allow");
@@ -143,7 +143,7 @@ public class SegmentSizeTest {
         deny0.setProperty("rep:glob", "*/activities/*");
         builder.setProperty(PropertyStates.createProperty(
                 "rep:privileges", ImmutableList.of("jcr:read"), Type.NAMES));
-        assertEquals(464, getSize(builder));
+        assertEquals(480, getSize(builder));
         assertEquals(124, getAmortizedSize(builder));
 
         NodeBuilder allow0 = builder.child("allow0");
@@ -151,7 +151,7 @@ public class SegmentSizeTest {
         allow0.setProperty("rep:principalName", "user-administrators");
         allow0.setProperty(PropertyStates.createProperty(
                 "rep:privileges", ImmutableList.of("jcr:all"), Type.NAMES));
-        assertEquals(528, getSize(builder));
+        assertEquals(544, getSize(builder));
         assertEquals(160, getAmortizedSize(builder));
     }
 


Reply via email to