This is an automated email from the ASF dual-hosted git repository.
reschke pushed a commit to branch 1.22
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/1.22 by this push:
new e21c928927 OAK-11419: oak-store-document - persistent cache: remove
unused UDP/TCP broadcasters
e21c928927 is described below
commit e21c9289279bd02f642c3fcd77d911ff6ed5b3ce
Author: Julian Reschke <[email protected]>
AuthorDate: Thu Mar 20 13:27:47 2025 +0100
OAK-11419: oak-store-document - persistent cache: remove unused UDP/TCP
broadcasters
---
.../document/persistentCache/PersistentCache.java | 8 -
.../persistentCache/broadcast/TCPBroadcaster.java | 453 ---------------------
.../persistentCache/broadcast/UDPBroadcaster.java | 260 ------------
.../document/persistentCache/BroadcastTest.java | 125 +-----
4 files changed, 7 insertions(+), 839 deletions(-)
diff --git
a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
index a6227debb1..95620c183c 100644
---
a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
+++
b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
@@ -33,8 +33,6 @@ import
org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.Dyna
import
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher;
import
org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.Broadcaster;
import
org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.InMemoryBroadcaster;
-import
org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.TCPBroadcaster;
-import
org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.UDPBroadcaster;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.h2.mvstore.FileStore;
import org.h2.mvstore.MVMap;
@@ -229,12 +227,6 @@ public class PersistentCache implements
Broadcaster.Listener {
return;
} else if (broadcast.equals("inMemory")) {
broadcaster = InMemoryBroadcaster.INSTANCE;
- } else if (broadcast.startsWith("udp:")) {
- String config = broadcast.substring("udp:".length(),
broadcast.length());
- broadcaster = new UDPBroadcaster(config);
- } else if (broadcast.startsWith("tcp:")) {
- String config = broadcast.substring("tcp:".length(),
broadcast.length());
- broadcaster = new TCPBroadcaster(config);
} else {
throw new IllegalArgumentException("Unknown broadcaster type " +
broadcast);
}
diff --git
a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/TCPBroadcaster.java
b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/TCPBroadcaster.java
deleted file mode 100644
index 283f0e7817..0000000000
---
a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/TCPBroadcaster.java
+++ /dev/null
@@ -1,453 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast;
-
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.SocketTimeoutException;
-import java.net.UnknownHostException;
-import java.nio.Buffer;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.security.MessageDigest;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A broadcast mechanism that uses TCP. It is mainly used for testing.
- */
-public class TCPBroadcaster implements Broadcaster {
-
- static final Logger LOG = LoggerFactory.getLogger(TCPBroadcaster.class);
- private static final int TIMEOUT = 100;
- private static final int MAX_BUFFER_SIZE = 64;
- private static final AtomicInteger NEXT_ID = new AtomicInteger();
- private static final Charset UTF8 = Charset.forName("UTF-8");
-
- private final int id = NEXT_ID.incrementAndGet();
-
- private final CopyOnWriteArrayList<Listener> listeners = new
CopyOnWriteArrayList<Listener>();
- private final ConcurrentHashMap<String, Client> clients = new
ConcurrentHashMap<String, Client>();
- private final ArrayBlockingQueue<ByteBuffer> sendBuffer = new
ArrayBlockingQueue<ByteBuffer>(MAX_BUFFER_SIZE * 2);
-
- private volatile DynamicBroadcastConfig broadcastConfig;
- private ServerSocket serverSocket;
- private Thread acceptThread;
- private Thread discoverThread;
- private Thread sendThread;
- private String ownListener;
- private String ownKeyUUID = UUID.randomUUID().toString();
- private byte[] ownKey = ownKeyUUID.getBytes(UTF8);
-
- private final AtomicBoolean stop = new AtomicBoolean(false);
-
- public TCPBroadcaster(String config) {
- LOG.info("Init " + config);
- init(config);
- }
-
- public void init(String config) {
- try {
- String[] parts = config.split(";");
- int startPort = 9800;
- int endPort = 9810;
- String key = "";
-
- // for debugging, this will send everything to localhost:
- // String[] sendTo = {"sendTo", "localhost"};
-
- // by default, only the entries in the clusterNodes
- // collection are used:
- String[] sendTo = {"sendTo"};
-
- for (String p : parts) {
- if (p.startsWith("ports ")) {
- String[] ports = p.split(" ");
- startPort = Integer.parseInt(ports[1]);
- endPort = Integer.parseInt(ports[2]);
- } else if (p.startsWith("key ")) {
- key = p.split(" ")[1];
- } else if (p.startsWith("sendTo ")) {
- sendTo = p.split(" ");
- }
- }
- sendTo[0] = null;
- MessageDigest messageDigest = MessageDigest.getInstance("SHA-256");
- if (key.length() > 0) {
- ownKey = messageDigest.digest(key.getBytes(UTF8));
- }
- IOException lastException = null;
- ServerSocket server = null;
- for (int port = startPort; port <= endPort; port++) {
- if (server == null) {
- try {
- server = new ServerSocket(port);
- } catch (IOException e) {
- LOG.debug("Cannot open port " + port);
- lastException = e;
- // ignore
- }
- }
- for (String send : sendTo) {
- if (send != null && !send.isEmpty()) {
- try {
- Client c = new Client(send, port, ownKey);
- clients.put(send + ":" + port, c);
- } catch (IOException e) {
- LOG.debug("Cannot connect to " + send + " " +
port);
- // ignore
- }
- }
- }
- }
- if (server == null && lastException != null) {
- throw lastException;
- }
- server.setSoTimeout(TIMEOUT);
- serverSocket = server;
- LOG.info("Listening on port " + server.getLocalPort());
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- acceptThread = new Thread(new Runnable() {
- @Override
- public void run() {
- accept();
- }
- }, "Oak TCPBroadcaster: accept #" + id);
- acceptThread.setDaemon(true);
- acceptThread.start();
-
- discoverThread = new Thread(new Runnable() {
- @Override
- public void run() {
- discover();
- }
- }, "Oak TCPBroadcaster: discover #" + id);
- discoverThread.setDaemon(true);
- discoverThread.start();
-
- sendThread = new Thread(new Runnable() {
- @Override
- public void run() {
- send();
- }
- }, "Oak TCPBroadcaster: send #" + id);
- sendThread.setDaemon(true);
- sendThread.start();
-
- }
-
- @Override
- public void setBroadcastConfig(DynamicBroadcastConfig broadcastConfig) {
- this.broadcastConfig = broadcastConfig;
- HashMap<String, String> clientInfo = new HashMap<String, String>();
- clientInfo.put(DynamicBroadcastConfig.ID, ownKeyUUID);
- ServerSocket s = serverSocket;
- if (s != null) {
- String address = getLocalAddress();
- if (address != null) {
- ownListener = address + ":" + s.getLocalPort();
- clientInfo.put(DynamicBroadcastConfig.LISTENER, ownListener);
- }
- }
- broadcastConfig.connect(clientInfo);
- }
-
- static String getLocalAddress() {
- String bind = System.getProperty("oak.tcpBindAddress", null);
- try {
- InetAddress address;
- if (bind != null && !bind.isEmpty()) {
- address = InetAddress.getByName(bind);
- } else {
- address = InetAddress.getLocalHost();
- }
- return address.getHostAddress();
- } catch (UnknownHostException e) {
- return "";
- }
- }
-
- void accept() {
- while (isRunning()) {
- try {
- final Socket socket = serverSocket.accept();
- Runnable r = new Runnable() {
- @Override
- public void run() {
- try {
- final DataInputStream in = new
DataInputStream(socket.getInputStream());
- byte[] testKey = new byte[ownKey.length];
- in.readFully(testKey);
- if
(ByteBuffer.wrap(testKey).compareTo(ByteBuffer.wrap(ownKey)) != 0) {
- LOG.debug("Key mismatch");
- socket.close();
- return;
- }
- while (!socket.isClosed()) {
- int len = in.readInt();
- byte[] data = new byte[len];
- in.readFully(data);
- ByteBuffer buff = ByteBuffer.wrap(data);
- int start = buff.position();
- for (Listener l : listeners) {
- ((Buffer)buff).position(start);
- l.receive(buff);
- }
- }
- } catch (IOException e) {
- // ignore
- }
- }
- };
- Thread t = new Thread(r, "Oak TCPBroadcaster: listener");
- t.setDaemon(true);
- t.start();
- } catch (SocketTimeoutException e) {
- // ignore
- } catch (IOException e) {
- if (isRunning()) {
- LOG.warn("Receive failed", e);
- }
- // ignore
- }
- }
- try {
- serverSocket.close();
- } catch (IOException e) {
- LOG.debug("Closed");
- // ignore
- }
- }
-
- void discover() {
- while (isRunning()) {
- DynamicBroadcastConfig b = broadcastConfig;
- if (b != null) {
- readClients(b);
- }
- for (Client c : clients.values()) {
- c.tryConnect();
- if (!isRunning()) {
- break;
- }
- }
- synchronized (stop) {
- if (isRunning()) {
- try {
- stop.wait(2000);
- } catch (InterruptedException e) {
- // ignore
- }
- }
- }
- }
- }
-
- void readClients(DynamicBroadcastConfig b) {
- List<Map<String, String>> list = b.getClientInfo();
- for(Map<String, String> m : list) {
- String listener = m.get(DynamicBroadcastConfig.LISTENER);
- String id = m.get(DynamicBroadcastConfig.ID);
- if (listener.equals(ownListener)) {
- continue;
- }
- // the key is the combination of listener and id,
- // because the same ip address / port combination
- // could be there multiple times for some time
- // (in case there is a old, orphan entry for the same machine)
- String clientKey = listener + " " + id;
- Client c = clients.get(clientKey);
- if (c == null) {
- int index = listener.lastIndexOf(':');
- if (index >= 0) {
- String host = listener.substring(0, index);
- int port = Integer.parseInt(listener.substring(index + 1));
- try {
- byte[] key = id.getBytes(UTF8);
- c = new Client(host, port, key);
- clients.put(clientKey, c);
- } catch (UnknownHostException e) {
- // ignore
- }
- }
- }
- }
- }
-
- void send() {
- while (isRunning()) {
- try {
- ByteBuffer buff = sendBuffer.poll(10, TimeUnit.MILLISECONDS);
- if (buff != null && isRunning()) {
- sendBuffer(buff);
- }
- } catch (InterruptedException e) {
- // ignore
- }
- }
- }
-
- @Override
- public void send(ByteBuffer buff) {
- ByteBuffer b = ByteBuffer.allocate(buff.remaining());
- b.put(buff);
- ((Buffer)b).flip();
- while (sendBuffer.size() > MAX_BUFFER_SIZE) {
- sendBuffer.poll();
- }
- try {
- sendBuffer.add(b);
- } catch (IllegalStateException e) {
- // ignore - might happen once in a while,
- // if the buffer was not yet full just before, but now
- // many threads concurrently tried to add
- }
- }
-
- private void sendBuffer(ByteBuffer buff) {
- int len = buff.limit();
- byte[] data = new byte[len];
- buff.get(data);
- for (Client c : clients.values()) {
- c.send(data);
- if (!isRunning()) {
- break;
- }
- }
- }
-
- @Override
- public void addListener(Listener listener) {
- listeners.add(listener);
- }
-
- @Override
- public void removeListener(Listener listener) {
- listeners.remove(listener);
- }
-
- @Override
- public void close() {
- if (isRunning()) {
- LOG.debug("Stopping");
- synchronized (stop) {
- stop.set(true);
- stop.notifyAll();
- }
- try {
- serverSocket.close();
- } catch (IOException e) {
- // ignore
- }
- try {
- acceptThread.join();
- } catch (InterruptedException e) {
- // ignore
- }
- try {
- sendThread.join();
- } catch (InterruptedException e) {
- // ignore
- }
- try {
- discoverThread.join();
- } catch (InterruptedException e) {
- // ignore
- }
- }
- }
-
- public final boolean isRunning() {
- return !stop.get();
- }
-
- static class Client {
- final String host;
- final int port;
- final byte[] key;
- DataOutputStream out;
- Client(String host, int port, byte[] key) throws UnknownHostException {
- this.host = host;
- this.port = port;
- this.key = key;
- }
- void send(byte[] data) {
- DataOutputStream o = out;
- if (o != null) {
- synchronized (o) {
- try {
- o.writeInt(data.length);
- o.write(data);
- o.flush();
- } catch (IOException e) {
- LOG.debug("Writing failed, port " + port, e);
- try {
- o.close();
- } catch (IOException e1) {
- // ignore
- }
- out = null;
- }
- }
- }
- }
- void tryConnect() {
- DataOutputStream o = out;
- if (o != null || host == null) {
- return;
- }
- InetAddress address;
- try {
- address = InetAddress.getByName(host);
- } catch (UnknownHostException e1) {
- return;
- }
- Socket socket = new Socket();
- try {
- socket.connect(new InetSocketAddress(address, port), TIMEOUT);
- o = new DataOutputStream(new
BufferedOutputStream(socket.getOutputStream()));
- o.write(key);
- o.flush();
- out = o;
- LOG.info("Connected to " + address + " port " + port + " k " +
key[0]);
- } catch (IOException e) {
- // ok, done
- }
- }
- }
-
-}
diff --git
a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/UDPBroadcaster.java
b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/UDPBroadcaster.java
deleted file mode 100644
index 1ef843df18..0000000000
---
a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/UDPBroadcaster.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.InetAddress;
-import java.net.MulticastSocket;
-import java.nio.Buffer;
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-import java.util.ArrayList;
-
-import javax.crypto.Cipher;
-import javax.crypto.CipherInputStream;
-import javax.crypto.CipherOutputStream;
-import javax.crypto.KeyGenerator;
-import javax.crypto.SecretKey;
-import javax.crypto.spec.IvParameterSpec;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A broadcast mechanism that uses UDP. It is mainly used for testing.
- */
-public class UDPBroadcaster implements Broadcaster, Runnable {
-
- static final Logger LOG = LoggerFactory.getLogger(UDPBroadcaster.class);
-
- private final byte[] key;
- private final ArrayList<Listener> listeners = new ArrayList<Listener>();
- private final MulticastSocket socket;
- private final int port;
- private final InetAddress group;
- private final InetAddress sendTo;
- private final Thread thread;
- private final int messageLength = 32 * 1024;
- private final Cipher encryptCipher;
- private final Cipher decryptCipher;
- private volatile boolean stop;
-
- public UDPBroadcaster(String config) {
- LOG.info("init " + config);
- MessageDigest messageDigest;
- try {
- String[] parts = config.split(";");
- String group = "FF78:230::1234";
- int port = 9876;
- String key = "";
- boolean aes = false;
- String sendTo = null;
- for (String p : parts) {
- if (p.startsWith("port ")) {
- port = Integer.parseInt(p.split(" ")[1]);
- } else if (p.startsWith("group ")) {
- group = p.split(" ")[1];
- } else if (p.startsWith("key ")) {
- key = p.split(" ")[1];
- } else if (p.equals("aes")) {
- aes = true;
- } else if (p.startsWith("sendTo ")) {
- sendTo = p.split(" ")[1];
- }
- }
- messageDigest = MessageDigest.getInstance("SHA-256");
- this.key = messageDigest.digest(key.getBytes());
- if (aes) {
- KeyGenerator kgen = KeyGenerator.getInstance("AES");
- kgen.init(128);
- SecretKey aesKey = kgen.generateKey();
- encryptCipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
- encryptCipher.init(Cipher.ENCRYPT_MODE, aesKey);
- decryptCipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
- IvParameterSpec ivParameterSpec = new
IvParameterSpec(aesKey.getEncoded());
- decryptCipher.init(Cipher.DECRYPT_MODE, aesKey,
ivParameterSpec);
- } else {
- encryptCipher = null;
- decryptCipher = null;
- }
- socket = new MulticastSocket(port);
- this.group = InetAddress.getByName(group);
- this.sendTo = sendTo == null ? this.group :
InetAddress.getByName(sendTo);
- this.port = port;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- thread = new Thread(this, "Oak UDPBroadcaster listener");
- thread.setDaemon(true);
- thread.start();
- }
-
- @Override
- public void run() {
- byte[] receiveData = new byte[messageLength];
- try {
- socket.joinGroup(group);
- } catch (IOException e) {
- if (!stop) {
- LOG.warn("join group failed", e);
- }
- stop = true;
- return;
- }
- while (!stop) {
- try {
- DatagramPacket receivePacket = new DatagramPacket(
- receiveData, receiveData.length);
- socket.receive(receivePacket);
- int len = receivePacket.getLength();
- if (len < key.length) {
- LOG.debug("too short");
- continue;
- }
- if (!checkKey(receiveData)) {
- LOG.debug("key mismatch");
- continue;
- }
- ByteBuffer buff = ByteBuffer.wrap(receiveData);
- ((Buffer)buff).limit(len);
- int start = key.length;
- for (Listener l : listeners) {
- ((Buffer)buff).position(start);
- l.receive(buff);
- }
- } catch (IOException e) {
- if (!stop) {
- LOG.warn("receive failed", e);
- }
- // ignore
- }
- }
- try {
- socket.leaveGroup(group);
- socket.close();
- } catch (IOException e) {
- if (!stop) {
- LOG.warn("leave group failed", e);
- }
- }
- }
-
- private boolean checkKey(byte[] data) {
- for (int i = 0; i < key.length; i++) {
- if (key[i] != data[i]) {
- return false;
- }
- }
- return true;
- }
-
- @Override
- public void send(ByteBuffer buff) {
- int len = key.length + buff.limit();
- if (len >= messageLength) {
- // message too long: ignore
- return;
- }
- try {
- byte[] sendData = new byte[len];
- System.arraycopy(key, 0, sendData, 0, key.length);
- buff.get(sendData, key.length, buff.limit());
- DatagramPacket sendPacket = new DatagramPacket(sendData,
- sendData.length, sendTo, port);
- socket.send(sendPacket);
- } catch (IOException e) {
- if (!stop) {
- LOG.debug("send failed", e);
- }
- // ignore
- }
- }
-
- @Override
- public void addListener(Listener listener) {
- listeners.add(listener);
- }
-
- @Override
- public void removeListener(Listener listener) {
- listeners.remove(listener);
- }
-
- byte[] encrypt(byte[] data) {
- if (encryptCipher == null) {
- return data;
- }
- try {
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- CipherOutputStream cipherOutputStream = new
CipherOutputStream(outputStream, encryptCipher);
- cipherOutputStream.write(data);
- cipherOutputStream.flush();
- cipherOutputStream.close();
- byte[] encryptedBytes = outputStream.toByteArray();
- return encryptedBytes;
- } catch (IOException e) {
- LOG.debug("encrypt failed", e);
- throw new RuntimeException(e);
- }
- }
-
- byte[] decrypt(byte[] data) {
- if (decryptCipher == null) {
- return data;
- }
- try {
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- ByteArrayInputStream inStream = new ByteArrayInputStream(data);
- CipherInputStream cipherInputStream = new
CipherInputStream(inStream, decryptCipher);
- byte[] buf = new byte[1024];
- int bytesRead;
- while ((bytesRead = cipherInputStream.read(buf)) >= 0) {
- outputStream.write(buf, 0, bytesRead);
- }
- return outputStream.toByteArray();
- } catch (IOException e) {
- LOG.debug("decrypt failed", e);
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void close() {
- if (!stop) {
- this.stop = true;
- socket.close();
- try {
- thread.join();
- } catch (InterruptedException e) {
- // ignore
- }
- }
- }
-
- public boolean isRunning() {
- return !stop;
- }
-
- @Override
- public void setBroadcastConfig(DynamicBroadcastConfig broadcastConfig) {
- // not yet implemented
- }
-
-}
diff --git
a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/BroadcastTest.java
b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/BroadcastTest.java
index 48df5e991c..15227d0cd7 100644
---
a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/BroadcastTest.java
+++
b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/BroadcastTest.java
@@ -22,10 +22,7 @@ import static org.junit.Assert.assertNull;
import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.sql.Timestamp;
import java.util.ArrayList;
-import java.util.Random;
import java.util.concurrent.Callable;
import org.apache.commons.io.FileUtils;
@@ -33,29 +30,15 @@ import org.apache.jackrabbit.oak.cache.CacheLIRS;
import org.apache.jackrabbit.oak.plugins.document.MemoryDiffCache.Key;
import org.apache.jackrabbit.oak.plugins.document.Path;
import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
-import
org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.Broadcaster;
-import
org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.TCPBroadcaster;
import org.apache.jackrabbit.oak.plugins.document.util.StringValue;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
-import org.slf4j.LoggerFactory;
-
-import ch.qos.logback.classic.Level;
-import ch.qos.logback.classic.LoggerContext;
-import ch.qos.logback.classic.PatternLayout;
-import ch.qos.logback.classic.spi.ILoggingEvent;
-import ch.qos.logback.core.ConsoleAppender;
import com.google.common.cache.Cache;
public class BroadcastTest {
-
- public static void main(String... args) throws Exception {
- listen();
- benchmark();
- }
-
+
private static void benchmark() throws IOException {
FileUtils.deleteDirectory(new File("target/broadcastTest"));
new File("target/broadcastTest").mkdirs();
@@ -82,106 +65,12 @@ public class BroadcastTest {
c.close();
}
}
-
- private static void listen() throws InterruptedException {
- String config = "key 123";
-
-
- ConsoleAppender<ILoggingEvent> ca = new
ConsoleAppender<ILoggingEvent>();
- LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
- ca.setContext(lc);
- PatternLayout pl = new PatternLayout();
- pl.setPattern("%msg%n");
- pl.setContext(lc);
- pl.start();
- ca.setLayout(pl);
- ca.start();
-
- ch.qos.logback.classic.Logger logger = (ch.qos.logback.classic.Logger)
LoggerFactory.getLogger(TCPBroadcaster.class);
- logger.addAppender(ca);
- logger.setLevel(Level.DEBUG);
-
- TCPBroadcaster receiver = new TCPBroadcaster(config);
- receiver.addListener(new Broadcaster.Listener() {
-
- @Override
- public void receive(ByteBuffer buff) {
- int end = buff.position();
- StringBuilder sb = new StringBuilder();
- while (buff.remaining() > 0) {
- char c = (char) buff.get();
- if (c >= ' ' && c < 128) {
- sb.append(c);
- } else if (c <= 9) {
- sb.append((char) ('0' + c));
- } else {
- sb.append('.');
- }
- }
- String dateTime = new
Timestamp(System.currentTimeMillis()).toString().substring(0, 19);
- System.out.println(dateTime + " Received " + sb);
- buff.position(end);
- }
-
- });
- Random r = new Random();
- int x = r.nextInt();
- System.out.println("Sending " + x);
- for (int i = 0; i < 10; i++) {
- Thread.sleep(10);
- ByteBuffer buff = ByteBuffer.allocate(1024);
- buff.putInt(0);
- buff.putInt(x);
- buff.put(new byte[100]);
- buff.flip();
- receiver.send(buff);
- if (!receiver.isRunning()) {
- System.out.println("Did not start or already stopped");
- break;
- }
- }
- Thread.sleep(Integer.MAX_VALUE);
- }
-
- @Test
- public void broadcastTCP() throws Exception {
- broadcast("tcp:sendTo localhost;key 123", 80);
- }
@Test
public void broadcastInMemory() throws Exception {
broadcast("inMemory", 100);
}
-
- @Test
- @Ignore("OAK-2843")
- public void broadcastUDP() throws Exception {
- try {
- broadcast("udp:sendTo localhost", 50);
- } catch (AssertionError e) {
- // IPv6 didn't work, so try with IPv4
- try {
- broadcast("udp:group 228.6.7.9", 50);
- } catch (AssertionError e2) {
- throwBoth(e, e2);
- }
- }
- }
-
- @Test
- @Ignore("OAK-2843")
- public void broadcastEncryptedUDP() throws Exception {
- try {
- broadcast("udp:group FF78:230::1234;key test;port 9876;sendTo
localhost;aes", 50);
- } catch (AssertionError e) {
- try {
- broadcast("udp:group 228.6.7.9;key test;port 9876;aes", 50);
- } catch (AssertionError e2) {
- throwBoth(e, e2);
- }
- }
- }
-
+
private static void throwBoth(AssertionError e, AssertionError e2) throws
AssertionError {
Throwable ex = e;
while (ex.getCause() != null) {
@@ -200,7 +89,7 @@ public class BroadcastTest {
}
broadcastTry(type, minPercentCorrect, false);
}
-
+
private static boolean broadcastTry(String type, int minPercentCorrect,
boolean tryOnly) throws Exception {
FileUtils.deleteDirectory(new File("target/broadcastTest"));
new File("target/broadcastTest").mkdirs();
@@ -239,7 +128,7 @@ public class BroadcastTest {
Assert.fail("min: " + minPercentCorrect + " got: " + correct);
return false;
}
-
+
private static boolean waitFor(Callable<Boolean> call, int
timeoutInMilliseconds) {
long start = System.currentTimeMillis();
while (true) {
@@ -261,7 +150,7 @@ public class BroadcastTest {
}
}
}
-
+
private static <K, V> boolean waitFor(final Cache<K, V> map, final K key,
final V value, int timeoutInMilliseconds) {
return waitFor(new Callable<Boolean>() {
@Override
@@ -274,7 +163,7 @@ public class BroadcastTest {
}
}, timeoutInMilliseconds);
}
-
+
private static <K, V> boolean waitFor(final Cache<K, V> map, final K key,
int timeoutInMilliseconds) {
return waitFor(new Callable<Boolean>() {
@Override
@@ -283,7 +172,7 @@ public class BroadcastTest {
}
}, timeoutInMilliseconds);
}
-
+
private static Cache<Key, StringValue> openCache(PersistentCache p) {
CacheLIRS<Key, StringValue> cache = new CacheLIRS.Builder<Key,
StringValue>().
maximumSize(1).build();