Author: thomasm
Date: Thu Nov 19 13:35:59 2015
New Revision: 1715177
URL: http://svn.apache.org/viewvc?rev=1715177&view=rev
Log:
OAK-2843 Broadcasting cache
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/TCPBroadcaster.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/MultiGenerationMap.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/UDPBroadcaster.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/BroadcastTest.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/MultiGenerationMap.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/MultiGenerationMap.java?rev=1715177&r1=1715176&r2=1715177&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/MultiGenerationMap.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/MultiGenerationMap.java
Thu Nov 19 13:35:59 2015
@@ -44,7 +44,12 @@ public class MultiGenerationMap<K, V> im
@Override
public V put(K key, V value) {
- return write.put(key, value);
+ CacheMap<K, V> m = write;
+ if (m == null) {
+ // closed concurrently
+ return null;
+ }
+ return m.put(key, value);
}
@SuppressWarnings("unchecked")
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java?rev=1715177&r1=1715176&r2=1715177&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java
Thu Nov 19 13:35:59 2015
@@ -113,10 +113,13 @@ class NodeCache<K, V> implements Cache<K
}
});
}
- if (value == null) {
- map.remove(key);
- } else {
- map.put(key, value);
+ MultiGenerationMap<K, V> m = map;
+ if (m != null) {
+ if (value == null) {
+ m.remove(key);
+ } else {
+ m.put(key, value);
+ }
}
}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java?rev=1715177&r1=1715176&r2=1715177&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
Thu Nov 19 13:35:59 2015
@@ -29,6 +29,7 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
import
org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.Broadcaster;
import
org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.InMemoryBroadcaster;
+import
org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.TCPBroadcaster;
import
org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.UDPBroadcaster;
import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
import org.h2.mvstore.FileStore;
@@ -197,6 +198,9 @@ public class PersistentCache implements
} else if (broadcast.startsWith("udp:")) {
String config = broadcast.substring("udp:".length(),
broadcast.length());
broadcaster = new UDPBroadcaster(config);
+ } else if (broadcast.startsWith("tcp:")) {
+ String config = broadcast.substring("tcp:".length(),
broadcast.length());
+ broadcaster = new TCPBroadcaster(config);
} else {
throw new IllegalArgumentException("Unknown broadcaster type " +
broadcast);
}
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/TCPBroadcaster.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/TCPBroadcaster.java?rev=1715177&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/TCPBroadcaster.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/TCPBroadcaster.java
Thu Nov 19 13:35:59 2015
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A broadcast mechanism that uses TCP. It is mainly used for testing.
+ */
+public class TCPBroadcaster implements Broadcaster {
+
+ static final Logger LOG = LoggerFactory.getLogger(TCPBroadcaster.class);
+ private static final int TIMEOUT = 100;
+ private static final int MAX_BUFFER_SIZE = 64;
+ private static final AtomicInteger NEXT_ID = new AtomicInteger();
+
+ private final byte[] key;
+ private final int id = NEXT_ID.incrementAndGet();
+ private final CopyOnWriteArrayList<Listener> listeners = new
CopyOnWriteArrayList<Listener>();
+ private final ServerSocket serverSocket;
+ private final ArrayList<Client> clients = new ArrayList<Client>();
+ private final Thread discoverThread;
+ private final Thread acceptThread;
+ private final Thread sendThread;
+ private final LinkedBlockingDeque<ByteBuffer> sendBuffer = new
LinkedBlockingDeque<ByteBuffer>();
+ private volatile boolean stop;
+
+ public TCPBroadcaster(String config) {
+ LOG.info("Init " + config);
+ MessageDigest messageDigest;
+ try {
+ String[] parts = config.split(";");
+ int startPort = 9800;
+ int endPort = 9810;
+ String key = "";
+ String[] sendTo = {"sendTo", "localhost"};
+ for (String p : parts) {
+ if (p.startsWith("ports ")) {
+ String[] ports = p.split(" ");
+ startPort = Integer.parseInt(ports[1]);
+ endPort = Integer.parseInt(ports[2]);
+ } else if (p.startsWith("key ")) {
+ key = p.split(" ")[1];
+ } else if (p.startsWith("sendTo ")) {
+ sendTo = p.split(" ");
+ }
+ }
+ sendTo[0] = null;
+ messageDigest = MessageDigest.getInstance("SHA-256");
+ this.key = messageDigest.digest(key.getBytes("UTF-8"));
+ IOException lastException = null;
+ ServerSocket server = null;
+ for (int port = startPort; port <= endPort; port++) {
+ if (server == null) {
+ try {
+ server = new ServerSocket(port);
+ } catch (IOException e) {
+ LOG.debug("Cannot open port " + port);
+ lastException = e;
+ // ignore
+ }
+ }
+ for (String send : sendTo) {
+ if (send != null && !send.isEmpty()) {
+ try {
+ Client c = new Client(send, port);
+ clients.add(c);
+ } catch (IOException e) {
+ LOG.debug("Cannot connect to " + send + " " +
port);
+ // ignore
+ }
+ }
+ }
+ }
+ if (server == null && lastException != null) {
+ throw lastException;
+ }
+ server.setSoTimeout(TIMEOUT);
+ serverSocket = server;
+ LOG.info("Listening on port " + server.getLocalPort());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ acceptThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ accept();
+ }
+ }, "Oak TCPBroadcaster: accept #" + id);
+ acceptThread.setDaemon(true);
+ acceptThread.start();
+
+ discoverThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ discover();
+ }
+ }, "Oak TCPBroadcaster: discover #" + id);
+ discoverThread.setDaemon(true);
+ discoverThread.start();
+
+ sendThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ send();
+ }
+ }, "Oak TCPBroadcaster: send #" + id);
+ sendThread.setDaemon(true);
+ sendThread.start();
+
+ }
+
+ void accept() {
+ while (!stop) {
+ try {
+ final Socket socket = serverSocket.accept();
+ Runnable r = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ final DataInputStream in = new
DataInputStream(socket.getInputStream());
+ byte[] testKey = new byte[key.length];
+ in.readFully(testKey);
+ if
(ByteBuffer.wrap(testKey).compareTo(ByteBuffer.wrap(key)) != 0) {
+ LOG.debug("Key mismatch");
+ socket.close();
+ return;
+ }
+ while (!socket.isClosed()) {
+ int len = in.readInt();
+ byte[] data = new byte[len];
+ in.readFully(data);
+ ByteBuffer buff = ByteBuffer.wrap(data);
+ int start = buff.position();
+ for (Listener l : listeners) {
+ buff.position(start);
+ l.receive(buff);
+ }
+ }
+ } catch (IOException e) {
+e.printStackTrace();
+ // ignore
+ }
+ }
+ };
+ Thread t = new Thread(r, "Oak TCPBroadcaster: listener");
+ t.setDaemon(true);
+ t.start();
+ } catch (SocketTimeoutException e) {
+ // ignore
+ } catch (IOException e) {
+ if (!stop) {
+ LOG.warn("Receive failed", e);
+ }
+ // ignore
+ }
+ }
+ try {
+ serverSocket.close();
+ } catch (IOException e) {
+ LOG.debug("Closed");
+ // ignore
+ }
+ }
+
+ void discover() {
+ while (!stop) {
+ for (Client c : clients) {
+ c.tryConnect(key);
+ if (stop) {
+ break;
+ }
+ }
+ }
+ }
+
+ void send() {
+ while (!stop) {
+ try {
+ ByteBuffer buff = sendBuffer.pollLast(10,
TimeUnit.MILLISECONDS);
+ if (buff != null && !stop) {
+ sendBuffer(buff);
+ }
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+
+ @Override
+ public void send(ByteBuffer buff) {
+ ByteBuffer b = ByteBuffer.allocate(buff.remaining());
+ b.put(buff);
+ b.flip();
+ while (sendBuffer.size() > MAX_BUFFER_SIZE) {
+ sendBuffer.pollLast();
+ }
+ sendBuffer.push(b);
+ }
+
+ private void sendBuffer(ByteBuffer buff) {
+ int len = buff.limit();
+ byte[] data = new byte[len];
+ buff.get(data);
+ for (Client c : clients) {
+ c.send(data);
+ if (stop) {
+ break;
+ }
+ }
+ }
+
+ @Override
+ public void addListener(Listener listener) {
+ listeners.add(listener);
+ }
+
+ @Override
+ public void removeListener(Listener listener) {
+ listeners.remove(listener);
+ }
+
+ @Override
+ public void close() {
+ if (!stop) {
+ LOG.debug("Stopping");
+ this.stop = true;
+ try {
+ serverSocket.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ try {
+ acceptThread.join();
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ try {
+ sendThread.join();
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ try {
+ discoverThread.join();
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+
+ public boolean isRunning() {
+ return !stop;
+ }
+
+ static class Client {
+ final InetAddress address;
+ final int port;
+ DataOutputStream out;
+ Client(String name, int port) throws UnknownHostException {
+ this.address = InetAddress.getByName(name);
+ this.port = port;
+ }
+ void send(byte[] data) {
+ DataOutputStream o = out;
+ if (o != null) {
+ synchronized (o) {
+ try {
+ o.writeInt(data.length);
+ o.write(data);
+ o.flush();
+ } catch (IOException e) {
+ LOG.debug("Writing failed, port " + port, e);
+ try {
+ o.close();
+ } catch (IOException e1) {
+ // ignore
+ }
+ out = null;
+ }
+ }
+ }
+ }
+ void tryConnect(byte[] key) {
+ DataOutputStream o = out;
+ if (o != null || address == null) {
+ return;
+ }
+ Socket socket = new Socket();
+ try {
+ socket.connect(new InetSocketAddress(address, port), TIMEOUT);
+ o = new DataOutputStream(new
BufferedOutputStream(socket.getOutputStream()));
+ o.write(key);
+ o.flush();
+ out = o;
+ LOG.info("Connected to " + address + " port " + port + " k " +
key[0]);
+ } catch (IOException e) {
+ // ok, done
+ }
+ }
+ }
+
+}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/UDPBroadcaster.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/UDPBroadcaster.java?rev=1715177&r1=1715176&r2=1715177&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/UDPBroadcaster.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/UDPBroadcaster.java
Thu Nov 19 13:35:59 2015
@@ -114,6 +114,7 @@ public class UDPBroadcaster implements B
if (!stop) {
LOG.warn("join group failed", e);
}
+ stop = true;
return;
}
while (!stop) {
@@ -246,4 +247,8 @@ public class UDPBroadcaster implements B
}
}
+ public boolean isRunning() {
+ return !stop;
+ }
+
}
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/BroadcastTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/BroadcastTest.java?rev=1715177&r1=1715176&r2=1715177&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/BroadcastTest.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/BroadcastTest.java
Thu Nov 19 13:35:59 2015
@@ -21,19 +21,128 @@ package org.apache.jackrabbit.oak.plugin
import static org.junit.Assert.assertNull;
import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Random;
import java.util.concurrent.Callable;
import org.apache.commons.io.FileUtils;
import org.apache.jackrabbit.oak.cache.CacheLIRS;
import org.apache.jackrabbit.oak.plugins.document.PathRev;
import org.apache.jackrabbit.oak.plugins.document.Revision;
+import
org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.Broadcaster;
+import
org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.TCPBroadcaster;
import org.apache.jackrabbit.oak.plugins.document.util.StringValue;
import org.junit.Assert;
import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.PatternLayout;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.ConsoleAppender;
import com.google.common.cache.Cache;
public class BroadcastTest {
+
+ public static void main(String... args) throws Exception {
+ listen();
+ benchmark();
+ }
+
+ private static void benchmark() throws IOException {
+ FileUtils.deleteDirectory(new File("target/broadcastTest"));
+ new File("target/broadcastTest").mkdirs();
+ String type = "tcp:key 1;ports 9700 9800";
+ ArrayList<PersistentCache> nodeList = new ArrayList<PersistentCache>();
+ for (int nodes = 1; nodes < 20; nodes++) {
+ PersistentCache pc = new PersistentCache("target/broadcastTest/p"
+ nodes + ",broadcast=" + type);
+ Cache<PathRev, StringValue> cache = openCache(pc);
+ String key = "/test" + Math.random();
+ PathRev k = new PathRev(key, new Revision(0, 0, 0));
+ long time = System.currentTimeMillis();
+ for (int i = 0; i < 2000; i++) {
+ cache.put(k, new StringValue("Hello World " + i));
+ cache.invalidate(k);
+ cache.getIfPresent(k);
+ }
+ time = System.currentTimeMillis() - time;
+ System.out.println("nodes: " + nodes + " time: " + time);
+ nodeList.add(pc);
+ }
+ for (PersistentCache c : nodeList) {
+ c.close();
+ }
+ }
+
+ private static void listen() throws InterruptedException {
+ String config = "key 123";
+
+
+ ConsoleAppender<ILoggingEvent> ca = new
ConsoleAppender<ILoggingEvent>();
+ LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+ ca.setContext(lc);
+ PatternLayout pl = new PatternLayout();
+ pl.setPattern("%msg%n");
+ pl.setContext(lc);
+ pl.start();
+ ca.setLayout(pl);
+ ca.start();
+
+ ch.qos.logback.classic.Logger logger = (ch.qos.logback.classic.Logger)
LoggerFactory.getLogger(TCPBroadcaster.class);
+ logger.addAppender(ca);
+ logger.setLevel(Level.DEBUG);
+
+ TCPBroadcaster receiver = new TCPBroadcaster(config);
+ receiver.addListener(new Broadcaster.Listener() {
+
+ @Override
+ public void receive(ByteBuffer buff) {
+ int end = buff.position();
+ StringBuilder sb = new StringBuilder();
+ while (buff.remaining() > 0) {
+ char c = (char) buff.get();
+ if (c >= ' ' && c < 128) {
+ sb.append(c);
+ } else if (c <= 9) {
+ sb.append((char) ('0' + c));
+ } else {
+ sb.append('.');
+ }
+ }
+ String dateTime = new
Timestamp(System.currentTimeMillis()).toString().substring(0, 19);
+ System.out.println(dateTime + " Received " + sb);
+ buff.position(end);
+ }
+
+ });
+ Random r = new Random();
+ int x = r.nextInt();
+ System.out.println("Sending " + x);
+ for (int i = 0; i < 10; i++) {
+ Thread.sleep(10);
+ ByteBuffer buff = ByteBuffer.allocate(1024);
+ buff.putInt(0);
+ buff.putInt(x);
+ buff.put(new byte[100]);
+ buff.flip();
+ receiver.send(buff);
+ if (!receiver.isRunning()) {
+ System.out.println("Did not start or already stopped");
+ break;
+ }
+ }
+ Thread.sleep(Integer.MAX_VALUE);
+ }
+
+ @Test
+ public void broadcastTCP() throws Exception {
+ broadcast("tcp:key 123", 90);
+ }
@Test
public void broadcastInMemory() throws Exception {
@@ -134,7 +243,6 @@ public class BroadcastTest {
@Override
public Boolean call() {
V v = map.getIfPresent(key);
-//System.out.println("key " + key + " result " + v + " map " + map);
if (value == null) {
return v == null;
}