This is an automated email from the ASF dual-hosted git repository.

djoshi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b1411a4  Fix streaming stats during entire sstable streaming
b1411a4 is described below

commit b1411a43180e0085ae4741f4da567a08b5a28f17
Author: Stefan Miklosovic <stefan.mikloso...@instaclustr.com>
AuthorDate: Mon Apr 6 10:11:42 2020 +0200

    Fix streaming stats during entire sstable streaming
    
    Patch by Stefan Miklosovic; Reviewed by Dinesh Joshi for CASSANDRA-15694
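    
    In short: when an entire sstable is streamed, a single stream carries
    every component file listed in the sstable's component manifest (Data,
    Index, Summary, and so on), so per-file progress stats must count the
    manifest size rather than 1 per stream. Below is a minimal sketch of the
    counting rule, with hypothetical names (StreamFileCounter is illustrative
    only, not a class in this patch):
    
        // Minimal sketch; assumes the caller knows whether a stream carries
        // a whole sstable and how many component files its manifest lists.
        final class StreamFileCounter
        {
            private int totalFiles = 0;
    
            // An entire-sstable stream contributes one file per manifest
            // component; a partial (section-based) stream contributes one.
            void add(boolean entireSSTable, int manifestComponents)
            {
                totalFiles += entireSSTable ? manifestComponents : 1;
            }
    
            int totalFiles()
            {
                return totalFiles;
            }
        }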
---
 .../db/streaming/CassandraIncomingFile.java        |  10 +
 .../db/streaming/CassandraOutgoingFile.java        |  21 +-
 .../apache/cassandra/streaming/IncomingStream.java |   1 +
 .../apache/cassandra/streaming/OutgoingStream.java |   1 +
 .../cassandra/streaming/StreamReceiveTask.java     |   2 +-
 .../cassandra/streaming/StreamTransferTask.java    |  16 +-
 ...ntireSSTableStreamingCorrectFilesCountTest.java | 236 +++++++++++++++++++++
 .../streaming/StreamTransferTaskTest.java          |   4 +-
 8 files changed, 278 insertions(+), 13 deletions(-)

diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
index c65ca62..807d935 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
@@ -46,6 +46,7 @@ public class CassandraIncomingFile implements IncomingStream
 
     private volatile SSTableMultiWriter sstable;
     private volatile long size = -1;
+    private volatile int numFiles = 1;
 
     private static final Logger logger = LoggerFactory.getLogger(CassandraIncomingFile.class);
 
@@ -64,7 +65,10 @@ public class CassandraIncomingFile implements IncomingStream
 
         IStreamReader reader;
         if (streamHeader.isEntireSSTable)
+        {
+            reader = new CassandraEntireSSTableStreamReader(header, streamHeader, session);
+            numFiles = streamHeader.componentManifest.components().size();
+        }
         else if (streamHeader.isCompressed())
             reader = new CassandraCompressedStreamReader(header, streamHeader, session);
         else
@@ -88,6 +92,12 @@ public class CassandraIncomingFile implements IncomingStream
     }
 
     @Override
+    public int getNumFiles()
+    {
+        return numFiles;
+    }
+
+    @Override
     public TableId getTableId()
     {
         Preconditions.checkState(sstable != null, "Stream hasn't been read yet");
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
index 237c0af..0917fba 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
@@ -59,7 +59,7 @@ public class CassandraOutgoingFile implements OutgoingStream
     private final boolean keepSSTableLevel;
     private final ComponentManifest manifest;
 
-    private final boolean shouldStreamEntireSStable;
+    private final boolean shouldStreamEntireSSTable;
 
     public CassandraOutgoingFile(StreamOperation operation, Ref<SSTableReader> ref,
                                  List<SSTableReader.PartitionPositionBounds> sections, List<Range<Token>> normalizedRanges,
@@ -72,7 +72,7 @@ public class CassandraOutgoingFile implements OutgoingStream
         this.sections = sections;
         this.filename = ref.get().getFilename();
         this.manifest = getComponentManifest(ref.get());
-        this.shouldStreamEntireSStable = shouldStreamEntireSSTable();
+        this.shouldStreamEntireSSTable = computeShouldStreamEntireSSTables();
 
         SSTableReader sstable = ref.get();
         keepSSTableLevel = operation == StreamOperation.BOOTSTRAP || operation == StreamOperation.REBUILD;
@@ -85,7 +85,7 @@ public class CassandraOutgoingFile implements OutgoingStream
                                  .withSections(sections)
                                 .withCompressionMetadata(sstable.compression ? sstable.getCompressionMetadata() : null)
                                 .withSerializationHeader(sstable.header.toComponent())
-                                 .isEntireSSTable(shouldStreamEntireSStable)
+                                 .isEntireSSTable(shouldStreamEntireSSTable)
                                  .withComponentManifest(manifest)
                                  .withFirstKey(sstable.first)
                                  .withTableId(sstable.metadata().id)
@@ -137,6 +137,12 @@ public class CassandraOutgoingFile implements OutgoingStream
     }
 
     @Override
+    public int getNumFiles()
+    {
+        return shouldStreamEntireSSTable ? getManifestSize() : 1;
+    }
+
+    @Override
     public long getRepairedAt()
     {
         return ref.get().getRepairedAt();
@@ -148,6 +154,11 @@ public class CassandraOutgoingFile implements OutgoingStream
         return ref.get().getPendingRepair();
     }
 
+    public int getManifestSize()
+    {
+        return manifest.components().size();
+    }
+
     @Override
     public void write(StreamSession session, DataOutputStreamPlus out, int version) throws IOException
     {
@@ -155,7 +166,7 @@ public class CassandraOutgoingFile implements OutgoingStream
         CassandraStreamHeader.serializer.serialize(header, out, version);
         out.flush();
 
-        if (shouldStreamEntireSStable && out instanceof AsyncStreamingOutputPlus)
+        if (shouldStreamEntireSSTable && out instanceof AsyncStreamingOutputPlus)
         {
             CassandraEntireSSTableStreamWriter writer = new CassandraEntireSSTableStreamWriter(sstable, session, manifest);
             writer.write((AsyncStreamingOutputPlus) out);
@@ -171,7 +182,7 @@ public class CassandraOutgoingFile implements OutgoingStream
     }
 
     @VisibleForTesting
-    public boolean shouldStreamEntireSSTable()
+    public boolean computeShouldStreamEntireSSTables()
     {
         // don't stream if full sstable transfers are disabled or legacy counter shards are present
         if (!DatabaseDescriptor.streamEntireSSTables() || ref.get().getSSTableMetadata().hasLegacyCounterShards)
diff --git a/src/java/org/apache/cassandra/streaming/IncomingStream.java b/src/java/org/apache/cassandra/streaming/IncomingStream.java
index 18bebf5..55fbd4f 100644
--- a/src/java/org/apache/cassandra/streaming/IncomingStream.java
+++ b/src/java/org/apache/cassandra/streaming/IncomingStream.java
@@ -41,5 +41,6 @@ public interface IncomingStream
 
     String getName();
     long getSize();
+    int getNumFiles();
     TableId getTableId();
 }
diff --git a/src/java/org/apache/cassandra/streaming/OutgoingStream.java b/src/java/org/apache/cassandra/streaming/OutgoingStream.java
index e71b985..4a58cae 100644
--- a/src/java/org/apache/cassandra/streaming/OutgoingStream.java
+++ b/src/java/org/apache/cassandra/streaming/OutgoingStream.java
@@ -49,4 +49,5 @@ public interface OutgoingStream
     String getName();
     long getSize();
     TableId getTableId();
+    int getNumFiles();
 }
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 87d6ce0..25977a5 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -82,7 +82,7 @@ public class StreamReceiveTask extends StreamTask
             return;
         }
 
-        remoteStreamsReceived++;
+        remoteStreamsReceived += stream.getNumFiles();
         bytesReceived += stream.getSize();
         Preconditions.checkArgument(tableId.equals(stream.getTableId()));
         logger.debug("received {} of {} total files {} of total bytes {}", 
remoteStreamsReceived, totalStreams,
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index ba05acd..0f7a834 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -17,16 +17,20 @@
  */
 package org.apache.cassandra.streaming;
 
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,7 +56,8 @@ public class StreamTransferTask extends StreamTask
     protected final Map<Integer, OutgoingStreamMessage> streams = new HashMap<>();
     private final Map<Integer, ScheduledFuture> timeoutTasks = new HashMap<>();
 
-    private long totalSize;
+    private long totalSize = 0;
+    private int totalFiles = 0;
 
     public StreamTransferTask(StreamSession session, TableId tableId)
     {
@@ -66,6 +71,7 @@ public class StreamTransferTask extends StreamTask
         message = StreamHook.instance.reportOutgoingStream(session, stream, message);
         streams.put(message.header.sequenceNumber, message);
         totalSize += message.stream.getSize();
+        totalFiles += message.stream.getNumFiles();
     }
 
     /**
@@ -125,7 +131,7 @@ public class StreamTransferTask extends StreamTask
 
     public synchronized int getTotalNumberOfFiles()
     {
-        return streams.size();
+        return totalFiles;
     }
 
     public long getTotalSize()
diff --git a/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java b/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
new file mode 100644
index 0000000..a57fcbc
--- /dev/null
+++ b/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.streaming.CassandraOutgoingFile;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.net.AsyncStreamingOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.SharedDefaultFileRegion;
+import org.apache.cassandra.schema.CompactionParams;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+public class EntireSSTableStreamingCorrectFilesCountTest
+{
+    public static final String KEYSPACE = "EntireSSTableStreamingCorrectFilesCountTest";
+    public static final String CF_STANDARD = "Standard1";
+
+    private static SSTableReader sstable;
+    private static ColumnFamilyStore store;
+    private static RangesAtEndpoint rangesAtEndpoint;
+
+    @BeforeClass
+    public static void defineSchemaAndPrepareSSTable()
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF_STANDARD)
+                                                // LeveledCompactionStrategy is important here,
+                                                // streaming of entire SSTables works currently only with this strategy
+                                                .compaction(CompactionParams.lcs(Collections.emptyMap()))
+                                                .partitioner(ByteOrderedPartitioner.instance));
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        store = keyspace.getColumnFamilyStore(CF_STANDARD);
+
+        // insert data and compact to a single sstable
+        CompactionManager.instance.disableAutoCompaction();
+
+        for (int j = 0; j < 10; j++)
+        {
+            new RowUpdateBuilder(store.metadata(), j, String.valueOf(j))
+            .clustering("0")
+            .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
+            .build()
+            .applyUnsafe();
+        }
+
+        store.forceBlockingFlush();
+        CompactionManager.instance.performMaximal(store, false);
+
+        sstable = store.getLiveSSTables().iterator().next();
+
+        Token start = ByteOrderedPartitioner.instance.getTokenFactory().fromString(Long.toHexString(0));
+        Token end = ByteOrderedPartitioner.instance.getTokenFactory().fromString(Long.toHexString(100));
+
+        rangesAtEndpoint = RangesAtEndpoint.toDummyList(Collections.singleton(new Range<>(start, end)));
+    }
+
+    @Test
+    public void test() throws Exception
+    {
+        FileCountingStreamEventHandler streamEventHandler = new FileCountingStreamEventHandler();
+        StreamSession session = setupStreamingSessionForTest(streamEventHandler);
+        Collection<OutgoingStream> outgoingStreams = store.getStreamManager().createOutgoingStreams(session,
+                                                                                                     rangesAtEndpoint,
+                                                                                                     NO_PENDING_REPAIR,
+                                                                                                     PreviewKind.NONE);
+
+        session.addTransferStreams(outgoingStreams);
+        DataOutputStreamPlus out = constructDataOutputStream();
+
+        for (OutgoingStream outgoingStream : outgoingStreams)
+            outgoingStream.write(session, out, MessagingService.VERSION_40);
+
+        int totalNumberOfFiles = session.transfers.get(store.metadata.id).getTotalNumberOfFiles();
+
+        assertEquals(CassandraOutgoingFile.getComponentManifest(sstable).components().size(), totalNumberOfFiles);
+        assertEquals(streamEventHandler.fileNames.size(), totalNumberOfFiles);
+    }
+
+    private DataOutputStreamPlus constructDataOutputStream()
+    {
+        // This is needed as Netty releases the ByteBuffers as soon as the channel is flushed
+        ByteBuf serializedFile = Unpooled.buffer(8192);
+        EmbeddedChannel channel = createMockNettyChannel(serializedFile);
+        return new AsyncStreamingOutputPlus(channel)
+        {
+            public void flush() throws IOException
+            {
+                // NO-OP
+            }
+        };
+    }
+
+    private EmbeddedChannel createMockNettyChannel(ByteBuf serializedFile)
+    {
+        WritableByteChannel wbc = new WritableByteChannel()
+        {
+            private boolean isOpen = true;
+
+            public int write(ByteBuffer src)
+            {
+                int size = src.limit();
+                serializedFile.writeBytes(src);
+                return size;
+            }
+
+            public boolean isOpen()
+            {
+                return isOpen;
+            }
+
+            public void close()
+            {
+                isOpen = false;
+            }
+        };
+
+        return new EmbeddedChannel(new ChannelOutboundHandlerAdapter()
+        {
+            @Override
+            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception
+            {
+                ((SharedDefaultFileRegion) msg).transferTo(wbc, 0);
+                super.write(ctx, msg, promise);
+            }
+        });
+    }
+
+
+    private StreamSession setupStreamingSessionForTest(StreamEventHandler streamEventHandler)
+    {
+        StreamCoordinator streamCoordinator = new StreamCoordinator(StreamOperation.BOOTSTRAP,
+                                                                    1,
+                                                                    new DefaultConnectionFactory(),
+                                                                    false,
+                                                                    null,
+                                                                    PreviewKind.NONE);
+
+        StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(),
+                                                            StreamOperation.BOOTSTRAP,
+                                                            Collections.singleton(streamEventHandler),
+                                                            streamCoordinator);
+
+        InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
+        streamCoordinator.addSessionInfo(new SessionInfo(peer,
+                                                         0,
+                                                         peer,
+                                                         Collections.emptyList(),
+                                                         Collections.emptyList(),
+                                                         StreamSession.State.INITIALIZED));
+
+        StreamSession session = streamCoordinator.getOrCreateNextSession(peer);
+        session.init(future);
+
+        return session;
+    }
+
+    private static final class FileCountingStreamEventHandler implements StreamEventHandler
+    {
+        final Collection<String> fileNames = new ArrayList<>();
+
+        public void handleStreamEvent(StreamEvent event)
+        {
+            if (event.eventType == StreamEvent.Type.FILE_PROGRESS && event instanceof StreamEvent.ProgressEvent)
+            {
+                StreamEvent.ProgressEvent progressEvent = ((StreamEvent.ProgressEvent) event);
+                fileNames.add(progressEvent.progress.fileName);
+            }
+        }
+
+        public void onSuccess(@Nullable StreamState streamState)
+        {
+            assert streamState != null;
+            assertFalse(streamState.hasFailedSession());
+        }
+
+        public void onFailure(Throwable throwable)
+        {
+            fail();
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 2361125..2f4feff 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -95,7 +95,7 @@ public class StreamTransferTaskTest
             ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
             task.addTransferStream(new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.selfRef(), sstable.getPositionsForRanges(ranges), ranges, 1));
         }
-        assertEquals(2, task.getTotalNumberOfFiles());
+        assertEquals(14, task.getTotalNumberOfFiles());
 
         // if file sending completes before timeout then the task should be canceled.
         Future f = task.scheduleTimeout(0, 0, TimeUnit.NANOSECONDS);
@@ -147,7 +147,7 @@ public class StreamTransferTaskTest
             refs.add(ref);
             task.addTransferStream(new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, ref, sstable.getPositionsForRanges(ranges), ranges, 1));
         }
-        assertEquals(2, task.getTotalNumberOfFiles());
+        assertEquals(14, task.getTotalNumberOfFiles());
 
         //add task to stream session, so it is aborted when stream session fails
         session.transfers.put(TableId.generate(), task);

