iamaleksey commented on code in PR #2256:
URL: https://github.com/apache/cassandra/pull/2256#discussion_r1183601806


##########
src/java/org/apache/cassandra/journal/ActiveSegment.java:
##########
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.journal;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.LockSupport;
+
+import com.codahale.metrics.Timer;
+import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.concurrent.Ref;
+import org.apache.cassandra.utils.concurrent.WaitQueue;
+
+final class ActiveSegment<K> extends Segment<K>
+{
+    final FileChannel channel;
+
+    // OpOrder used to order appends wrt flush
+    private final OpOrder appendOrder = new OpOrder();
+
+    // position in the buffer we are allocating from
+    private final AtomicInteger allocatePosition = new AtomicInteger(0);
+
+    /*
+     * Everything before this offset has been written and flushed.
+     */
+    private volatile int lastFlushedOffset = 0;
+
+    /*
+     * End position of the buffer; initially set to its capacity and
+     * updated to point to the last written position as the segment is being closed;
+     * no need to be volatile as writes are protected by appendOrder barrier.
+     */
+    private int endOfBuffer;
+
+    // a signal that writers can wait on to be notified of a completed flush in BATCH and GROUP FlushMode
+    private final WaitQueue flushComplete = WaitQueue.newWaitQueue();
+
+    private final Ref<Segment<K>> selfRef;
+
+    private ActiveSegment(
+        Descriptor descriptor, Params params, SyncedOffsets syncedOffsets, Index<K> index, Metadata metadata, KeySupport<K> keySupport)
+    {
+        super(descriptor, syncedOffsets, index, metadata, keySupport);
+
+        try
+        {
+            channel = FileChannel.open(file.toPath(), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);
+            buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, params.segmentSize());
+            endOfBuffer = buffer.capacity();
+            selfRef = new Ref<>(this, new Tidier(descriptor, channel, buffer, syncedOffsets));
+        }
+        catch (IOException e)
+        {
+            throw new JournalWriteError(descriptor, file, e);
+        }
+    }
+
+    static <K> ActiveSegment<K> create(Descriptor descriptor, Params params, KeySupport<K> keySupport)
+    {
+        SyncedOffsets syncedOffsets = SyncedOffsets.active(descriptor, true);
+        Index<K> index = InMemoryIndex.create(keySupport);
+        Metadata metadata = Metadata.create();
+        return new ActiveSegment<>(descriptor, params, syncedOffsets, index, metadata, keySupport);
+    }
+
+    /**
+     * Read the entry at the specified offset into the entry holder.
+     * Expects the caller to acquire the ref to the segment and the record to exist.
+     */
+    @Override
+    boolean read(int offset, EntrySerializer.EntryHolder<K> into)
+    {
+        ByteBuffer duplicate = buffer.duplicate().position(offset).limit(buffer.capacity());
+        try
+        {
+            EntrySerializer.read(into, keySupport, duplicate, descriptor.userVersion);
+        }
+        catch (IOException e)
+        {
+            throw new JournalReadError(descriptor, file, e);
+        }
+        return true;
+    }
+
+    /**
+     * Stop writing to this file, flush and close it. Does nothing if the file is already closed.
+     */
+    @Override
+    public synchronized void close()
+    {
+        close(true);
+    }
+
+    /**
+     * @return true if the closed segment was definitely empty, false otherwise
+     */
+    private synchronized boolean close(boolean persistComponents)
+    {
+        boolean isEmpty = discardUnusedTail();
+        if (!isEmpty)
+        {
+            flush();
+            if (persistComponents) persistComponents();
+        }
+        release();
+        return isEmpty;
+    }
+
+    /**
+     * Close and discard a pre-allocated, available segment that has never been exposed.
+     */
+    void closeAndDiscard()
+    {
+        boolean isEmpty = close(false);
+        if (!isEmpty) throw new IllegalStateException();
+        discard();
+    }
+
+    void closeAndIfEmptyDiscard()
+    {
+        boolean isEmpty = close(true);
+        if (isEmpty) discard();
+    }
+
+    void persistComponents()
+    {
+        index.persist(descriptor);
+        metadata.persist(descriptor);

Review Comment:
   Done, thanks.
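
For context on the diff above: allocatePosition implements a lock-free bump
allocator over the mapped buffer, with writers claiming regions via CAS and
endOfBuffer capping the usable range. Below is a minimal standalone sketch of
that pattern, using only the JDK; BumpAllocator and claim() are illustrative
names, not Cassandra's API, and the OpOrder coordination between appends and
flushes is deliberately omitted.

    import java.nio.ByteBuffer;
    import java.util.concurrent.atomic.AtomicInteger;

    // Illustrative sketch only: demonstrates the CAS-based bump-allocation
    // pattern suggested by ActiveSegment's allocatePosition/endOfBuffer.
    final class BumpAllocator
    {
        private final ByteBuffer buffer;
        private final AtomicInteger allocatePosition = new AtomicInteger(0);
        private final int endOfBuffer;

        BumpAllocator(int capacity)
        {
            this.buffer = ByteBuffer.allocateDirect(capacity);
            this.endOfBuffer = capacity;
        }

        // Claim size bytes; returns a buffer view over the claimed region,
        // or null if the segment is full and the caller must roll over to
        // a fresh segment.
        ByteBuffer claim(int size)
        {
            while (true)
            {
                int prev = allocatePosition.get();
                int next = prev + size;
                if (next > endOfBuffer)
                    return null; // segment full
                if (allocatePosition.compareAndSet(prev, next))
                {
                    ByteBuffer view = buffer.duplicate();
                    view.position(prev).limit(next);
                    return view;
                }
            }
        }
    }

Because the CAS is the only synchronization on the hot path, concurrent
writers never block one another. In the real class, the appendOrder OpOrder
exists so that a flush can wait for in-flight writes into already-claimed
regions to complete before forcing the mapped buffer to disk, after which
waiters on flushComplete are signalled.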



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

