maedhroz commented on code in PR #2299:
URL: https://github.com/apache/cassandra/pull/2299#discussion_r1187655369


##########
test/unit/org/apache/cassandra/io/filesystem/ListenableFileSystem.java:
##########
@@ -0,0 +1,850 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.filesystem;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.AsynchronousFileChannel;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.FileSystem;
+import java.nio.file.FileSystems;
+import java.nio.file.OpenOption;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.nio.file.attribute.FileAttribute;
+import java.nio.file.spi.FileSystemProvider;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.memory.MemoryUtil;
+
+public class ListenableFileSystem extends ForwardingFileSystem
+{
+    @FunctionalInterface
+    public interface PathFilter
+    {
+        boolean accept(Path entry) throws IOException;
+    }
+
+    public interface Listener
+    {
+    }
+
+    public interface OnPreOpen extends Listener
+    {
+        void preOpen(Path path, Set<? extends OpenOption> options, 
FileAttribute<?>[] attrs) throws IOException;
+    }
+
+    public interface OnPostOpen extends Listener
+    {
+        void postOpen(Path path, Set<? extends OpenOption> options, 
FileAttribute<?>[] attrs, FileChannel channel) throws IOException;
+    }
+
+    public interface OnPreRead extends Listener
+    {
+        void preRead(Path path, FileChannel channel, long position, ByteBuffer 
dst) throws IOException;
+    }
+
+    public interface OnPostRead extends Listener
+    {
+        void postRead(Path path, FileChannel channel, long position, 
ByteBuffer dst, int read) throws IOException;
+    }
+
+    public interface OnPreTransferTo extends Listener
+    {
+        void preTransferTo(Path path, FileChannel channel, long position, long 
count, WritableByteChannel target) throws IOException;
+    }
+
+    public interface OnPostTransferTo extends Listener
+    {
+        void postTransferTo(Path path, FileChannel channel, long position, 
long count, WritableByteChannel target, long transfered) throws IOException;
+    }
+
+    public interface OnPreTransferFrom extends Listener
+    {
+        void preTransferFrom(Path path, FileChannel channel, 
ReadableByteChannel src, long position, long count) throws IOException;
+    }
+
+    public interface OnPostTransferFrom extends Listener
+    {
+        void postTransferFrom(Path path, FileChannel channel, 
ReadableByteChannel src, long position, long count, long transfered) throws 
IOException;
+    }
+
+    public interface OnPreWrite extends Listener
+    {
+        void preWrite(Path path, FileChannel channel, long position, 
ByteBuffer src) throws IOException;
+    }
+
+    public interface OnPostWrite extends Listener
+    {
+        void postWrite(Path path, FileChannel channel, long position, 
ByteBuffer src, int wrote) throws IOException;
+    }
+
+    public interface OnPrePosition extends Listener
+    {
+        void prePosition(Path path, FileChannel channel, long position, long 
newPosition) throws IOException;
+    }
+
+    public interface OnPostPosition extends Listener
+    {
+        void postPosition(Path path, FileChannel channel, long position, long 
newPosition) throws IOException;
+    }
+
+    public interface OnPreTruncate extends Listener
+    {
+        void preTruncate(Path path, FileChannel channel, long size, long 
targetSize) throws IOException;
+    }
+
+    public interface OnPostTruncate extends Listener
+    {
+        void postTruncate(Path path, FileChannel channel, long size, long 
targetSize, long newSize) throws IOException;
+    }
+
+    public interface OnPreForce extends Listener
+    {
+        void preForce(Path path, FileChannel channel, boolean metaData) throws 
IOException;
+    }
+
+    public interface OnPostForce extends Listener
+    {
+        void postForce(Path path, FileChannel channel, boolean metaData) 
throws IOException;
+    }
+
+    public interface Unsubscribable extends AutoCloseable
+    {
+        @Override
+        void close();
+    }
+
+    private final List<OnPreOpen> onPreOpen = new CopyOnWriteArrayList<>();
+    private final List<OnPostOpen> onPostOpen = new CopyOnWriteArrayList<>();
+    private final List<OnPreTransferTo> onPreTransferTo = new 
CopyOnWriteArrayList<>();
+    private final List<OnPostTransferTo> onPostTransferTo = new 
CopyOnWriteArrayList<>();
+    private final List<OnPreRead> onPreRead = new CopyOnWriteArrayList<>();
+    private final List<OnPostRead> onPostRead = new CopyOnWriteArrayList<>();
+    private final List<OnPreWrite> onPreWrite = new CopyOnWriteArrayList<>();
+    private final List<OnPostWrite> onPostWrite = new CopyOnWriteArrayList<>();
+    private final List<OnPreTransferFrom> onPreTransferFrom = new 
CopyOnWriteArrayList<>();
+    private final List<OnPostTransferFrom> onPostTransferFrom = new 
CopyOnWriteArrayList<>();
+
+    private final List<OnPreForce> onPreForce = new CopyOnWriteArrayList<>();
+    private final List<OnPostForce> onPostForce = new CopyOnWriteArrayList<>();
+    private final List<OnPreTruncate> onPreTruncate = new 
CopyOnWriteArrayList<>();
+    private final List<OnPostTruncate> onPostTruncate = new 
CopyOnWriteArrayList<>();
+    private final List<OnPrePosition> onPrePosition = new 
CopyOnWriteArrayList<>();
+    private final List<OnPostPosition> onPostPosition = new 
CopyOnWriteArrayList<>();
+    private final List<List<? extends Listener>> lists = 
Arrays.asList(onPreOpen, onPostOpen,
+                                                                       
onPreRead, onPostRead,
+                                                                       
onPreTransferTo, onPostTransferTo,
+                                                                       
onPreWrite, onPostWrite,
+                                                                       
onPreTransferFrom, onPostTransferFrom,
+                                                                       
onPreForce, onPostForce,
+                                                                       
onPreTruncate, onPostTruncate,
+                                                                       
onPrePosition, onPostPosition);
+    private final ListenableFileSystemProvider provider;
+
+    public ListenableFileSystem(FileSystem delegate)
+    {
+        super(delegate);
+        this.provider = new ListenableFileSystemProvider(super.provider());
+    }
+
+    public Unsubscribable listen(Listener listener)
+    {
+        boolean match = false;
+        if (listener instanceof OnPreOpen)
+        {
+            onPreOpen.add((OnPreOpen) listener);
+            match = true;
+        }
+        if (listener instanceof OnPostOpen)
+        {
+            onPostOpen.add((OnPostOpen) listener);
+            match = true;
+        }
+        if (listener instanceof OnPreRead)
+        {
+            onPreRead.add((OnPreRead) listener);
+            match = true;
+        }
+        if (listener instanceof OnPostRead)
+        {
+            onPostRead.add((OnPostRead) listener);
+            match = true;
+        }
+        if (listener instanceof OnPreTransferTo)
+        {
+            onPreTransferTo.add((OnPreTransferTo) listener);
+            match = true;
+        }
+        if (listener instanceof OnPostTransferTo)
+        {
+            onPostTransferTo.add((OnPostTransferTo) listener);
+            match = true;
+        }
+        if (listener instanceof OnPreWrite)
+        {
+            onPreWrite.add((OnPreWrite) listener);
+            match = true;
+        }
+        if (listener instanceof OnPostWrite)
+        {
+            onPostWrite.add((OnPostWrite) listener);
+            match = true;
+        }
+        if (listener instanceof OnPreTransferFrom)
+        {
+            onPreTransferFrom.add((OnPreTransferFrom) listener);
+            match = true;
+        }
+        if (listener instanceof OnPostTransferFrom)
+        {
+            onPostTransferFrom.add((OnPostTransferFrom) listener);
+            match = true;
+        }
+        if (listener instanceof OnPreForce)
+        {
+            onPreForce.add((OnPreForce) listener);
+            match = true;
+        }
+        if (listener instanceof OnPostForce)
+        {
+            onPostForce.add((OnPostForce) listener);
+            match = true;
+        }
+        if (listener instanceof OnPreTruncate)
+        {
+            onPreTruncate.add((OnPreTruncate) listener);
+            match = true;
+        }
+        if (listener instanceof OnPostTruncate)
+        {
+            onPostTruncate.add((OnPostTruncate) listener);
+            match = true;
+        }
+        if (listener instanceof OnPrePosition)
+        {
+            onPrePosition.add((OnPrePosition) listener);
+            match = true;
+        }
+        if (listener instanceof OnPostPosition)
+        {
+            onPostPosition.add((OnPostPosition) listener);
+            match = true;
+        }
+        if (!match)

Review Comment:
   nit: What if, instead of tracking a boolean here, we just track which list 
(e.g. `List<? extends Listener> match`) we add to, and then have the closure 
remove specifically from that list, instead of trying to remove it from 
everything? (i.e. `match` is null when there's no match above, and we throw and 
do nothing)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to