smiklosovic commented on code in PR #2058: URL: https://github.com/apache/cassandra/pull/2058#discussion_r1063646283
########## src/java/org/apache/cassandra/db/streaming/FileStreamMetricsListener.java: ########## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.streaming; + +import com.google.common.annotations.VisibleForTesting; + +import com.codahale.metrics.Counter; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.metrics.StreamingMetrics; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.utils.FBUtilities; + +/** + * Used to keep track of the bytes streamed for a given file as bytes of the file are transferred and report them to StreamingMetrics. + */ +public class FileStreamMetricsListener +{ + public static final FileStreamMetricsListener NO_OP = new FileStreamMetricsListener(0L, FBUtilities.getLocalAddressAndPort(), ProgressInfo.Direction.IN){ Review Comment: Shouldn't the curly bracket be on a new line too? There is also a space missing: `){` vs `) {`. It is also a little bit counter-intuitive to have a direction `IN` used for the NO_OP listener. Why not `OUT`? I think it would be preferable to define a `Direction` called `NO_OP` or `NA` (Not Available) for this purpose. 
However, by calling this constructor, we are running through the logic which interacts with `StreamingMetrics` etc. I do not think that is necessary at all. We could create an empty constructor for `FileStreamMetricsListener` and initialise class properties with nulls for counters and 0 for totalSize in it. That way we avoid messing with `Direction` too. ########## src/java/org/apache/cassandra/db/streaming/FileStreamMetricsListener.java: ########## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.streaming; + +import com.google.common.annotations.VisibleForTesting; + +import com.codahale.metrics.Counter; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.metrics.StreamingMetrics; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.utils.FBUtilities; + +/** + * Used to keep track of the bytes streamed for a given file as bytes of the file are transferred and report them to StreamingMetrics. 
+ */ +public class FileStreamMetricsListener +{ + public static final FileStreamMetricsListener NO_OP = new FileStreamMetricsListener(0L, FBUtilities.getLocalAddressAndPort(), ProgressInfo.Direction.IN){ + @Override + public void onStreamingBytesTransferred(long cumulativeBytesTransferred) + { + //no-op + } + }; + + private final long totalSize; + private final Counter totalOutgoingBytesStreamed; + private final Counter peerOutgoingBytesStreamed; + private long lastSeenBytes = 0L; + + public FileStreamMetricsListener(long totalSize, InetAddressAndPort peer, ProgressInfo.Direction direction) + { + this.totalSize = totalSize; + if (direction == ProgressInfo.Direction.OUT) + { + this.totalOutgoingBytesStreamed = StreamingMetrics.totalOutgoingBytes; + this.peerOutgoingBytesStreamed = StreamingMetrics.get(peer).outgoingBytes; + } + else + { + this.totalOutgoingBytesStreamed = StreamingMetrics.totalIncomingBytes; + this.peerOutgoingBytesStreamed = StreamingMetrics.get(peer).incomingBytes; + } + Review Comment: nit: redundant empty line ########## src/java/org/apache/cassandra/db/streaming/FileStreamMetricsListener.java: ########## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import com.google.common.annotations.VisibleForTesting; + +import com.codahale.metrics.Counter; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.metrics.StreamingMetrics; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.utils.FBUtilities; + +/** + * Used to keep track of the bytes streamed for a given file as bytes of the file are transferred and report them to StreamingMetrics. + */ +public class FileStreamMetricsListener +{ + public static final FileStreamMetricsListener NO_OP = new FileStreamMetricsListener(0L, FBUtilities.getLocalAddressAndPort(), ProgressInfo.Direction.IN){ + @Override + public void onStreamingBytesTransferred(long cumulativeBytesTransferred) + { + //no-op + } + }; + + private final long totalSize; + private final Counter totalOutgoingBytesStreamed; + private final Counter peerOutgoingBytesStreamed; + private long lastSeenBytes = 0L; + + public FileStreamMetricsListener(long totalSize, InetAddressAndPort peer, ProgressInfo.Direction direction) Review Comment: non null annotation for `direction` maybe as it was done previously? If we introduce `Direction.NA` then `else` would need to be changed to `else if` for `IN`. ########## src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java: ########## @@ -172,10 +174,11 @@ public void write(StreamSession session, StreamingDataOutputPlus out, int versio out.flush(); CassandraStreamWriter writer = header.isCompressed() ? 
- new CassandraCompressedStreamWriter(sstable, header, session) : - new CassandraStreamWriter(sstable, header, session); + new CassandraCompressedStreamWriter(sstable, header, session, fileStreamMetricsListener) : + new CassandraStreamWriter(sstable, header, session, fileStreamMetricsListener); writer.write(out); } + fileStreamMetricsListener.onStreamSuccessful(); Review Comment: what if `onStreamSuccessful` returned boolean so it would be called `isStreamSuccessful` and if it is not it would throw an exception? This method already throws `IOException` so it seems to be quite logical to me that if streaming was not successful we would throw as well. Right now we are doing `assert` in it which can throw `AssertionError` but that is not subclass of `IOException`. ########## src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java: ########## @@ -57,7 +57,6 @@ public CassandraCompressedStreamReader(StreamMessageHeader header, CassandraStre public SSTableMultiWriter read(DataInputPlus inputPlus) throws Throwable { long totalSize = totalSize(); - Review Comment: probably leave it as it was? ########## src/java/org/apache/cassandra/streaming/StreamSession.java: ########## @@ -200,6 +203,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber private final TimeUUID pendingRepair; private final PreviewKind previewKind; + private final Map<String, Long> lastSeenIncomingBytesStreamed; Review Comment: this does not seem to be necessary anymore with all other changes in this class? ########## src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java: ########## @@ -95,7 +97,6 @@ public SSTableMultiWriter read(DataInputPlus in) throws Throwable ComponentManifest manifest = header.componentManifest; long totalSize = manifest.totalSize(); - logger.debug("[Stream #{}] Started receiving sstable #{} from {}, size = {}, table = {}", Review Comment: probably leave it as it was? 
########## src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java: ########## @@ -109,7 +111,6 @@ public CassandraStreamReader(StreamMessageHeader header, CassandraStreamHeader s public SSTableMultiWriter read(DataInputPlus inputPlus) throws Throwable { long totalSize = totalSize(); - Review Comment: just leave it as it was? ########## src/java/org/apache/cassandra/streaming/StreamSession.java: ########## @@ -36,13 +36,16 @@ import java.util.function.Function; import java.util.stream.Collectors; +import javax.annotation.Nonnull; Review Comment: check unused imports maybe? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

