Author: thomasm
Date: Thu Nov 19 13:35:59 2015
New Revision: 1715177

URL: http://svn.apache.org/viewvc?rev=1715177&view=rev
Log:
OAK-2843 Broadcasting cache

Added:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/TCPBroadcaster.java
Modified:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/MultiGenerationMap.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/UDPBroadcaster.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/BroadcastTest.java

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/MultiGenerationMap.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/MultiGenerationMap.java?rev=1715177&r1=1715176&r2=1715177&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/MultiGenerationMap.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/MultiGenerationMap.java
 Thu Nov 19 13:35:59 2015
@@ -44,7 +44,12 @@ public class MultiGenerationMap<K, V> im
     
     @Override
     public V put(K key, V value) {
-        return write.put(key, value);
+        CacheMap<K, V> m = write;
+        if (m == null) {
+            // closed concurrently
+            return null;
+        }
+        return m.put(key, value);
     }
 
     @SuppressWarnings("unchecked")

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java?rev=1715177&r1=1715176&r2=1715177&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java
 Thu Nov 19 13:35:59 2015
@@ -113,10 +113,13 @@ class NodeCache<K, V> implements Cache<K
                 }
             });
         }
-        if (value == null) {
-            map.remove(key);
-        } else {
-            map.put(key, value);
+        MultiGenerationMap<K, V> m = map;
+        if (m != null) {
+            if (value == null) {
+                m.remove(key);
+            } else {
+                m.put(key, value);
+            }
         }
     }
     

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java?rev=1715177&r1=1715176&r2=1715177&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
 Thu Nov 19 13:35:59 2015
@@ -29,6 +29,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
 import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.Broadcaster;
 import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.InMemoryBroadcaster;
+import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.TCPBroadcaster;
 import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.UDPBroadcaster;
 import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
 import org.h2.mvstore.FileStore;
@@ -197,6 +198,9 @@ public class PersistentCache implements
         } else if (broadcast.startsWith("udp:")) {
             String config = broadcast.substring("udp:".length(), 
broadcast.length());
             broadcaster = new UDPBroadcaster(config);
+        } else if (broadcast.startsWith("tcp:")) {
+            String config = broadcast.substring("tcp:".length(), 
broadcast.length());
+            broadcaster = new TCPBroadcaster(config);
         } else {
             throw new IllegalArgumentException("Unknown broadcaster type " + 
broadcast);
         }

Added: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/TCPBroadcaster.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/TCPBroadcaster.java?rev=1715177&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/TCPBroadcaster.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/TCPBroadcaster.java
 Thu Nov 19 13:35:59 2015
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A broadcast mechanism that uses TCP. It is mainly used for testing.
+ */
+public class TCPBroadcaster implements Broadcaster {
+
+    static final Logger LOG = LoggerFactory.getLogger(TCPBroadcaster.class);
+    private static final int TIMEOUT = 100;
+    private static final int MAX_BUFFER_SIZE = 64;
+    private static final AtomicInteger NEXT_ID = new AtomicInteger();
+
+    private final byte[] key;
+    private final int id = NEXT_ID.incrementAndGet();
+    private final CopyOnWriteArrayList<Listener> listeners = new 
CopyOnWriteArrayList<Listener>();
+    private final ServerSocket serverSocket;
+    private final ArrayList<Client> clients = new ArrayList<Client>();
+    private final Thread discoverThread;
+    private final Thread acceptThread;
+    private final Thread sendThread;
+    private final LinkedBlockingDeque<ByteBuffer> sendBuffer = new 
LinkedBlockingDeque<ByteBuffer>();
+    private volatile boolean stop;
+    
+    /**
+     * Creates a broadcaster from a configuration string and starts its
+     * accept, discover and send threads.
+     * <p>
+     * The configuration is a list of settings separated by ";":
+     * "ports &lt;start&gt; &lt;end&gt;" (default 9800 to 9810),
+     * "key &lt;secret&gt;" (default: empty) and
+     * "sendTo &lt;host&gt; ..." (default: localhost). Other settings are
+     * ignored. The first port of the range that can be opened is used to
+     * listen; for each target host, one client per port of the range is
+     * registered. Hosts for which no client can be created are logged and
+     * skipped.
+     *
+     * @param config the configuration
+     * @throws RuntimeException if the configuration is malformed, the port
+     *             range is empty, or no port of the range could be opened
+     */
+    public TCPBroadcaster(String config) {
+        LOG.info("Init " + config);
+        try {
+            int startPort = 9800;
+            int endPort = 9810;
+            String secret = "";
+            String[] sendTo = {"sendTo", "localhost"};
+            for (String p : config.split(";")) {
+                if (p.startsWith("ports ")) {
+                    String[] ports = p.split(" ");
+                    startPort = Integer.parseInt(ports[1]);
+                    endPort = Integer.parseInt(ports[2]);
+                } else if (p.startsWith("key ")) {
+                    secret = p.split(" ")[1];
+                } else if (p.startsWith("sendTo ")) {
+                    sendTo = p.split(" ");
+                }
+            }
+            // the first element is the "sendTo" keyword itself, not a host
+            sendTo[0] = null;
+            MessageDigest messageDigest = MessageDigest.getInstance("SHA-256");
+            // only the digest of the configured key is kept and exchanged
+            this.key = messageDigest.digest(secret.getBytes("UTF-8"));
+            IOException lastException = null;
+            ServerSocket server = null;
+            for (int port = startPort; port <= endPort; port++) {
+                if (server == null) {
+                    // listen on the first port of the range that is free
+                    try {
+                        server = new ServerSocket(port);
+                    } catch (IOException e) {
+                        LOG.debug("Cannot open port " + port);
+                        lastException = e;
+                        // ignore, try the next port
+                    }
+                }
+                for (String send : sendTo) {
+                    if (send != null && !send.isEmpty()) {
+                        try {
+                            clients.add(new Client(send, port));
+                        } catch (IOException e) {
+                            LOG.debug("Cannot connect to " + send + " " + port);
+                            // ignore
+                        }
+                    }
+                }
+            }
+            if (server == null) {
+                if (lastException != null) {
+                    throw lastException;
+                }
+                // the range was empty (start > end), so no port was tried;
+                // fail with a clear message instead of a NullPointerException
+                throw new IllegalArgumentException(
+                        "Empty port range " + startPort + " to " + endPort);
+            }
+            // a short accept timeout lets the accept loop notice the stop flag
+            server.setSoTimeout(TIMEOUT);
+            serverSocket = server;
+            LOG.info("Listening on port " + server.getLocalPort());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        acceptThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                accept();
+            }
+        }, "Oak TCPBroadcaster: accept #" + id);
+        acceptThread.setDaemon(true);
+        acceptThread.start();
+
+        discoverThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                discover();
+            }
+        }, "Oak TCPBroadcaster: discover #" + id);
+        discoverThread.setDaemon(true);
+        discoverThread.start();
+
+        sendThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                send();
+            }
+        }, "Oak TCPBroadcaster: send #" + id);
+        sendThread.setDaemon(true);
+        sendThread.start();
+
+    }
+    
+    /**
+     * Accepts incoming connections until the broadcaster is stopped, and
+     * starts one daemon thread per connection that reads the messages of
+     * that connection. The server socket is closed when the loop ends.
+     */
+    void accept() {
+        while (!stop) {
+            try {
+                final Socket socket = serverSocket.accept();
+                Runnable r = new Runnable() {
+                    @Override
+                    public void run() {
+                        receive(socket);
+                    }
+                };
+                Thread t = new Thread(r, "Oak TCPBroadcaster: listener");
+                t.setDaemon(true);
+                t.start();
+            } catch (SocketTimeoutException e) {
+                // expected: the accept timeout lets the loop re-check "stop"
+            } catch (IOException e) {
+                if (!stop) {
+                    LOG.warn("Receive failed", e);
+                }
+                // ignore
+            }
+        }
+        try {
+            serverSocket.close();
+        } catch (IOException e) {
+            LOG.debug("Closed");
+            // ignore
+        }
+    }
+
+    /**
+     * Reads from one accepted connection: first the key of the sender, then
+     * length-prefixed messages, each of which is passed to all listeners.
+     * The socket is closed when this method returns, whatever the reason.
+     *
+     * @param socket the accepted connection
+     */
+    private void receive(Socket socket) {
+        try {
+            DataInputStream in = new DataInputStream(socket.getInputStream());
+            byte[] testKey = new byte[key.length];
+            in.readFully(testKey);
+            if (!MessageDigest.isEqual(testKey, key)) {
+                LOG.debug("Key mismatch");
+                return;
+            }
+            while (!socket.isClosed()) {
+                int len = in.readInt();
+                if (len < 0) {
+                    // a corrupt length would otherwise end this thread with
+                    // a NegativeArraySizeException
+                    LOG.debug("Invalid message length " + len);
+                    return;
+                }
+                // NOTE(review): the length is not bounded from above, so a
+                // peer that knows the key can request a very large array
+                byte[] data = new byte[len];
+                in.readFully(data);
+                ByteBuffer buff = ByteBuffer.wrap(data);
+                int start = buff.position();
+                for (Listener l : listeners) {
+                    // each listener sees the message from the beginning
+                    buff.position(start);
+                    l.receive(buff);
+                }
+            }
+        } catch (IOException e) {
+            // this includes the end of stream when the peer disconnects
+            LOG.debug("Connection closed", e);
+        } finally {
+            try {
+                socket.close();
+            } catch (IOException e) {
+                // ignore
+            }
+        }
+    }
+    
+    /**
+     * Repeatedly tries to connect all clients that are not connected yet,
+     * until the broadcaster is stopped.
+     */
+    void discover() {
+        while (!stop) {
+            for (Client c : clients) {
+                c.tryConnect(key);
+                if (stop) {
+                    return;
+                }
+            }
+            // tryConnect returns immediately for clients that are already
+            // connected (and usually also when the connection is refused),
+            // so without a pause this loop would spin at full CPU
+            try {
+                Thread.sleep(10);
+            } catch (InterruptedException e) {
+                // keep the interrupt status and stop discovering
+                Thread.currentThread().interrupt();
+                return;
+            }
+        }
+    }
+    
+    /**
+     * The loop of the send thread: takes queued messages from the tail of
+     * the send buffer, waiting at most 10 ms for each, and sends them to
+     * the clients until the broadcaster is stopped.
+     */
+    void send() {
+        while (!stop) {
+            ByteBuffer next;
+            try {
+                next = sendBuffer.pollLast(10, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                // ignore
+                continue;
+            }
+            if (next == null || stop) {
+                // nothing was queued in time, or stopped while waiting
+                continue;
+            }
+            sendBuffer(next);
+        }
+    }
+    
+    /**
+     * Queues a copy of the remaining content of the buffer for sending.
+     * While the queue holds more than MAX_BUFFER_SIZE entries, entries are
+     * removed from its tail before the new message is added at the head.
+     *
+     * @param buff the message; its remaining bytes are consumed
+     */
+    @Override
+    public void send(ByteBuffer buff) {
+        ByteBuffer copy = ByteBuffer.allocate(buff.remaining());
+        copy.put(buff);
+        copy.flip();
+        while (sendBuffer.size() > MAX_BUFFER_SIZE) {
+            // the tail is where the send thread reads from
+            sendBuffer.pollLast();
+        }
+        sendBuffer.addFirst(copy);
+    }
+    
+    /**
+     * Sends the remaining content of the buffer to every client, stopping
+     * early if the broadcaster is stopped in between.
+     *
+     * @param buff the message; its remaining bytes are consumed
+     */
+    private void sendBuffer(ByteBuffer buff) {
+        // remaining() rather than limit(): get(byte[]) reads from the
+        // current position, so sizing the array by the limit would throw a
+        // BufferUnderflowException for a buffer whose position is not 0
+        byte[] data = new byte[buff.remaining()];
+        buff.get(data);
+        for (Client c : clients) {
+            c.send(data);
+            if (stop) {
+                break;
+            }
+        }
+    }
+
+    @Override
+    public void addListener(Listener listener) { // registers a listener; the receiving side passes every incoming message to each registered listener
+        listeners.add(listener);
+    }
+
+    @Override
+    public void removeListener(Listener listener) { // unregisters a listener that was added with addListener
+        listeners.remove(listener);
+    }
+
+    @Override
+    public void close() {
+        if (!stop) {
+            LOG.debug("Stopping");
+            this.stop = true;
+            try {
+                serverSocket.close();
+            } catch (IOException e) {
+                // ignore
+            }
+            try {
+                acceptThread.join();
+            } catch (InterruptedException e) {
+                // ignore
+            }
+            try {
+                sendThread.join();
+            } catch (InterruptedException e) {
+                // ignore
+            }
+            try {
+                discoverThread.join();
+            } catch (InterruptedException e) {
+                // ignore
+            }
+        }
+    }
+
+    public boolean isRunning() { // true until close() sets the stop flag
+        return !stop;
+    }
+    
+    /**
+     * An outgoing connection to one target address and port. The
+     * connection is established by tryConnect and dropped again when
+     * writing fails, so that a later tryConnect can re-establish it.
+     */
+    static class Client {
+        final InetAddress address;
+        final int port;
+        /**
+         * The stream of the current connection, or null if not connected.
+         * Volatile because it is set by the thread that connects and read
+         * by the thread that sends.
+         */
+        volatile DataOutputStream out;
+
+        /**
+         * @param name the host name or address of the target
+         * @param port the port of the target
+         * @throws UnknownHostException if the host can not be resolved
+         */
+        Client(String name, int port) throws UnknownHostException {
+            this.address = InetAddress.getByName(name);
+            this.port = port;
+        }
+
+        /**
+         * Writes one length-prefixed message, if currently connected. On
+         * failure the connection is closed and forgotten.
+         *
+         * @param data the message
+         */
+        void send(byte[] data) {
+            DataOutputStream o = out;
+            if (o == null) {
+                // not connected: the message is dropped for this client
+                return;
+            }
+            synchronized (o) {
+                try {
+                    o.writeInt(data.length);
+                    o.write(data);
+                    o.flush();
+                } catch (IOException e) {
+                    LOG.debug("Writing failed, port " + port, e);
+                    try {
+                        o.close();
+                    } catch (IOException e1) {
+                        // ignore
+                    }
+                    out = null;
+                }
+            }
+        }
+
+        /**
+         * Connects and sends the key, unless already connected. Failures
+         * are expected (for example if nobody listens on the port) and
+         * leave the client disconnected.
+         *
+         * @param key the key to send as the first bytes of the connection
+         */
+        void tryConnect(byte[] key) {
+            if (out != null || address == null) {
+                return;
+            }
+            Socket socket = new Socket();
+            try {
+                socket.connect(new InetSocketAddress(address, port), TIMEOUT);
+                DataOutputStream o = new DataOutputStream(
+                        new BufferedOutputStream(socket.getOutputStream()));
+                o.write(key);
+                o.flush();
+                out = o;
+                // NOTE(review): this logs the first byte of the key digest
+                LOG.info("Connected to " + address + " port " + port + " k " +
+                        key[0]);
+            } catch (IOException e) {
+                // ok, done - but release the socket, which was leaked before
+                try {
+                    socket.close();
+                } catch (IOException e1) {
+                    // ignore
+                }
+            }
+        }
+    }
+
+}

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/UDPBroadcaster.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/UDPBroadcaster.java?rev=1715177&r1=1715176&r2=1715177&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/UDPBroadcaster.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/UDPBroadcaster.java
 Thu Nov 19 13:35:59 2015
@@ -114,6 +114,7 @@ public class UDPBroadcaster implements B
             if (!stop) {
                 LOG.warn("join group failed", e);
             }
+            stop = true;
             return;
         }              
         while (!stop) {
@@ -246,4 +247,8 @@ public class UDPBroadcaster implements B
         }
     }
 
+    public boolean isRunning() { // false once the stop flag is set, which this change also does when joining the group fails
+        return !stop;
+    }
+
 }

Modified: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/BroadcastTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/BroadcastTest.java?rev=1715177&r1=1715176&r2=1715177&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/BroadcastTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/BroadcastTest.java
 Thu Nov 19 13:35:59 2015
@@ -21,19 +21,128 @@ package org.apache.jackrabbit.oak.plugin
 import static org.junit.Assert.assertNull;
 
 import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Random;
 import java.util.concurrent.Callable;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.jackrabbit.oak.cache.CacheLIRS;
 import org.apache.jackrabbit.oak.plugins.document.PathRev;
 import org.apache.jackrabbit.oak.plugins.document.Revision;
+import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.Broadcaster;
+import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.TCPBroadcaster;
 import org.apache.jackrabbit.oak.plugins.document.util.StringValue;
 import org.junit.Assert;
 import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.PatternLayout;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.ConsoleAppender;
 
 import com.google.common.cache.Cache;
 
 public class BroadcastTest {
+    
+    public static void main(String... args) throws Exception { // manual entry point for the listen tool and the benchmark
+        listen(); // NOTE(review): listen() ends with Thread.sleep(Integer.MAX_VALUE), so benchmark() is only reached after that sleep returns
+        benchmark();
+    }
+    
+    private static void benchmark() throws IOException { // manual benchmark: times 2000 put / invalidate / get rounds while the number of open caches grows from 1 to 19
+        FileUtils.deleteDirectory(new File("target/broadcastTest"));
+        new File("target/broadcastTest").mkdirs();     
+        String type = "tcp:key 1;ports 9700 9800"; // all caches use the TCP broadcaster with the same key and port range
+        ArrayList<PersistentCache> nodeList = new ArrayList<PersistentCache>();
+        for (int nodes = 1; nodes < 20; nodes++) { // each iteration opens one more cache; the earlier ones stay open
+            PersistentCache pc = new PersistentCache("target/broadcastTest/p" 
+ nodes + ",broadcast=" + type);
+            Cache<PathRev, StringValue> cache = openCache(pc);
+            String key = "/test" + Math.random();
+            PathRev k = new PathRev(key, new Revision(0, 0, 0));
+            long time = System.currentTimeMillis();
+            for (int i = 0; i < 2000; i++) {
+                cache.put(k, new StringValue("Hello World " + i));
+                cache.invalidate(k);
+                cache.getIfPresent(k);
+            }
+            time = System.currentTimeMillis() - time; // elapsed milliseconds for the 2000 rounds on the newest cache
+            System.out.println("nodes: " + nodes + " time: " + time);
+            nodeList.add(pc); // kept so that all caches can be closed after the last measurement
+        }
+        for (PersistentCache c : nodeList) { // NOTE(review): not reached if an iteration throws, which leaves the caches open
+            c.close();
+        }
+    }
+    
+    private static void listen() throws InterruptedException { // manual tool: sends 10 test messages, then keeps printing every message a TCPBroadcaster receives
+        String config = "key 123"; // no "ports" or "sendTo" setting, so the broadcaster defaults apply
+
+        
+        ConsoleAppender<ILoggingEvent> ca = new 
ConsoleAppender<ILoggingEvent>();
+        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+        ca.setContext(lc);
+        PatternLayout pl = new PatternLayout();
+        pl.setPattern("%msg%n"); // message text only, one per line
+        pl.setContext(lc);
+        pl.start();
+        ca.setLayout(pl);
+        ca.start();
+        
+        ch.qos.logback.classic.Logger logger = (ch.qos.logback.classic.Logger) 
LoggerFactory.getLogger(TCPBroadcaster.class);
+        logger.addAppender(ca); // print the TCPBroadcaster log output, including debug messages, on the console
+        logger.setLevel(Level.DEBUG);
+        
+        TCPBroadcaster receiver = new TCPBroadcaster(config);
+        receiver.addListener(new Broadcaster.Listener() {
+
+            @Override
+            public void receive(ByteBuffer buff) { // prints the message in a readable form and leaves the buffer position as it was
+                int end = buff.position(); // despite the name, this is the position on entry; it is restored at the end
+                StringBuilder sb = new StringBuilder();
+                while (buff.remaining() > 0) {
+                    char c = (char) buff.get();
+                    if (c >= ' ' && c < 128) { // printable ASCII is shown as is
+                        sb.append(c);
+                    } else if (c <= 9) { // byte values 0 to 9 are shown as the digits '0' to '9'
+                        sb.append((char) ('0' + c));
+                    } else { // everything else is shown as '.'
+                        sb.append('.');
+                    }
+                }
+                String dateTime = new 
Timestamp(System.currentTimeMillis()).toString().substring(0, 19);
+                System.out.println(dateTime + " Received " + sb);
+                buff.position(end);
+            }
+            
+        });
+        Random r = new Random();
+        int x = r.nextInt(); // random marker that is part of every test message
+        System.out.println("Sending " + x);
+        for (int i = 0; i < 10; i++) {
+            Thread.sleep(10);
+            ByteBuffer buff = ByteBuffer.allocate(1024);
+            buff.putInt(0);
+            buff.putInt(x);
+            buff.put(new byte[100]);
+            buff.flip(); // the message is the 108 bytes written so far
+            receiver.send(buff);
+            if (!receiver.isRunning()) {
+                System.out.println("Did not start or already stopped");
+                break;
+            }
+        }
+        Thread.sleep(Integer.MAX_VALUE); // keeps the process alive (about 24.8 days) so that received messages are still printed
+    }
+    
+    @Test
+    public void broadcastTCP() throws Exception { // runs the shared broadcast scenario with the TCP broadcaster; NOTE(review): broadcast(...) is defined outside this hunk, so the meaning of 90 is not visible here
+        broadcast("tcp:key 123", 90);
+    }
 
     @Test
     public void broadcastInMemory() throws Exception {
@@ -134,7 +243,6 @@ public class BroadcastTest {
             @Override
             public Boolean call() {
                 V v = map.getIfPresent(key);
-//System.out.println("key " + key + " result " + v + " map " + map);           
     
                 if (value == null) {
                     return  v == null;
                 } 


Reply via email to