This is an automated email from the ASF dual-hosted git repository.

reschke pushed a commit to branch 1.22
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/1.22 by this push:
     new e21c928927 OAK-11419: oak-store-document - persistent cache: remove 
unused UDP/TCP broadcasters
e21c928927 is described below

commit e21c9289279bd02f642c3fcd77d911ff6ed5b3ce
Author: Julian Reschke <[email protected]>
AuthorDate: Thu Mar 20 13:27:47 2025 +0100

    OAK-11419: oak-store-document - persistent cache: remove unused UDP/TCP 
broadcasters
---
 .../document/persistentCache/PersistentCache.java  |   8 -
 .../persistentCache/broadcast/TCPBroadcaster.java  | 453 ---------------------
 .../persistentCache/broadcast/UDPBroadcaster.java  | 260 ------------
 .../document/persistentCache/BroadcastTest.java    | 125 +-----
 4 files changed, 7 insertions(+), 839 deletions(-)

diff --git 
a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
 
b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
index a6227debb1..95620c183c 100644
--- 
a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
+++ 
b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
@@ -33,8 +33,6 @@ import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.Dyna
 import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher;
 import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.Broadcaster;
 import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.InMemoryBroadcaster;
-import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.TCPBroadcaster;
-import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.UDPBroadcaster;
 import org.apache.jackrabbit.oak.stats.StatisticsProvider;
 import org.h2.mvstore.FileStore;
 import org.h2.mvstore.MVMap;
@@ -229,12 +227,6 @@ public class PersistentCache implements 
Broadcaster.Listener {
             return;
         } else if (broadcast.equals("inMemory")) {
             broadcaster = InMemoryBroadcaster.INSTANCE;
-        } else if (broadcast.startsWith("udp:")) {
-            String config = broadcast.substring("udp:".length(), 
broadcast.length());
-            broadcaster = new UDPBroadcaster(config);
-        } else if (broadcast.startsWith("tcp:")) {
-            String config = broadcast.substring("tcp:".length(), 
broadcast.length());
-            broadcaster = new TCPBroadcaster(config);
         } else {
             throw new IllegalArgumentException("Unknown broadcaster type " + 
broadcast);
         }
diff --git 
a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/TCPBroadcaster.java
 
b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/TCPBroadcaster.java
deleted file mode 100644
index 283f0e7817..0000000000
--- 
a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/TCPBroadcaster.java
+++ /dev/null
@@ -1,453 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast;
-
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.SocketTimeoutException;
-import java.net.UnknownHostException;
-import java.nio.Buffer;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.security.MessageDigest;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A broadcast mechanism that uses TCP. It is mainly used for testing.
- */
-public class TCPBroadcaster implements Broadcaster {
-
-    static final Logger LOG = LoggerFactory.getLogger(TCPBroadcaster.class);
-    private static final int TIMEOUT = 100;
-    private static final int MAX_BUFFER_SIZE = 64;
-    private static final AtomicInteger NEXT_ID = new AtomicInteger();
-    private static final Charset UTF8 = Charset.forName("UTF-8");
-
-    private final int id = NEXT_ID.incrementAndGet();
-
-    private final CopyOnWriteArrayList<Listener> listeners = new 
CopyOnWriteArrayList<Listener>();
-    private final ConcurrentHashMap<String, Client> clients = new 
ConcurrentHashMap<String, Client>();
-    private final ArrayBlockingQueue<ByteBuffer> sendBuffer = new 
ArrayBlockingQueue<ByteBuffer>(MAX_BUFFER_SIZE * 2);
-
-    private volatile DynamicBroadcastConfig broadcastConfig;
-    private ServerSocket serverSocket;
-    private Thread acceptThread;
-    private Thread discoverThread;
-    private Thread sendThread;
-    private String ownListener;
-    private String ownKeyUUID = UUID.randomUUID().toString();
-    private byte[] ownKey = ownKeyUUID.getBytes(UTF8);
-    
-    private final AtomicBoolean stop = new AtomicBoolean(false);
-    
-    public TCPBroadcaster(String config) {
-        LOG.info("Init " + config);
-        init(config);
-    }
-    
-    public void init(String config) {
-        try {
-            String[] parts = config.split(";");
-            int startPort = 9800;
-            int endPort = 9810;
-            String key = "";
-            
-            // for debugging, this will send everything to localhost:
-            // String[] sendTo = {"sendTo", "localhost"};
-            
-            // by default, only the entries in the clusterNodes 
-            // collection are used:
-            String[] sendTo = {"sendTo"};
-            
-            for (String p : parts) {
-                if (p.startsWith("ports ")) {
-                    String[] ports = p.split(" ");
-                    startPort = Integer.parseInt(ports[1]);
-                    endPort = Integer.parseInt(ports[2]);
-                } else if (p.startsWith("key ")) {
-                    key = p.split(" ")[1];
-                } else if (p.startsWith("sendTo ")) {
-                    sendTo = p.split(" ");
-                }
-            }                    
-            sendTo[0] = null;
-            MessageDigest messageDigest = MessageDigest.getInstance("SHA-256");
-            if (key.length() > 0) {
-                ownKey = messageDigest.digest(key.getBytes(UTF8));
-            }
-            IOException lastException = null;
-            ServerSocket server = null;
-            for (int port = startPort; port <= endPort; port++) {
-                if (server == null) {
-                    try {
-                        server = new ServerSocket(port);
-                    } catch (IOException e) {
-                        LOG.debug("Cannot open port " + port);
-                        lastException = e;
-                        // ignore
-                    }
-                }
-                for (String send : sendTo) {
-                    if (send != null && !send.isEmpty()) {
-                        try {
-                            Client c = new Client(send, port, ownKey);
-                            clients.put(send + ":" + port, c);
-                        } catch (IOException e) {
-                            LOG.debug("Cannot connect to " + send + " " + 
port);
-                            // ignore
-                        }                        
-                    }
-                }
-            }
-            if (server == null && lastException != null) {
-                throw lastException;
-            }
-            server.setSoTimeout(TIMEOUT);
-            serverSocket = server;
-            LOG.info("Listening on port " + server.getLocalPort());
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-        
-        acceptThread = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                accept();
-            }
-        }, "Oak TCPBroadcaster: accept #" + id);
-        acceptThread.setDaemon(true);
-        acceptThread.start();
-        
-        discoverThread = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                discover();
-            }
-        }, "Oak TCPBroadcaster: discover #" + id);
-        discoverThread.setDaemon(true);
-        discoverThread.start();
-        
-        sendThread = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                send();
-            }
-        }, "Oak TCPBroadcaster: send #" + id);
-        sendThread.setDaemon(true);
-        sendThread.start();
-        
-    }
-    
-    @Override
-    public void setBroadcastConfig(DynamicBroadcastConfig broadcastConfig) {
-        this.broadcastConfig = broadcastConfig;
-        HashMap<String, String> clientInfo = new HashMap<String, String>();
-        clientInfo.put(DynamicBroadcastConfig.ID, ownKeyUUID);
-        ServerSocket s = serverSocket;
-        if (s != null) {
-            String address = getLocalAddress();
-            if (address != null) {
-                ownListener = address + ":" + s.getLocalPort();
-                clientInfo.put(DynamicBroadcastConfig.LISTENER, ownListener);
-            }
-        }
-        broadcastConfig.connect(clientInfo);
-    }
-    
-    static String getLocalAddress() {
-        String bind = System.getProperty("oak.tcpBindAddress", null);
-        try {
-            InetAddress address;
-            if (bind != null && !bind.isEmpty()) {
-                address = InetAddress.getByName(bind);
-            } else {
-                address = InetAddress.getLocalHost();
-            }
-            return address.getHostAddress();
-        } catch (UnknownHostException e) {
-            return "";
-        }
-    }
-    
-    void accept() {
-        while (isRunning()) {
-            try {
-                final Socket socket = serverSocket.accept();
-                Runnable r = new Runnable() {
-                    @Override
-                    public void run() {
-                        try {
-                            final DataInputStream in = new 
DataInputStream(socket.getInputStream());
-                            byte[] testKey = new byte[ownKey.length];
-                            in.readFully(testKey);
-                            if 
(ByteBuffer.wrap(testKey).compareTo(ByteBuffer.wrap(ownKey)) != 0) {
-                                LOG.debug("Key mismatch");
-                                socket.close();
-                                return;
-                            }
-                            while (!socket.isClosed()) {
-                                int len = in.readInt();
-                                byte[] data = new byte[len];
-                                in.readFully(data);
-                                ByteBuffer buff = ByteBuffer.wrap(data);
-                                int start = buff.position();
-                                for (Listener l : listeners) {
-                                    ((Buffer)buff).position(start);
-                                    l.receive(buff);
-                                }
-                            }
-                        } catch (IOException e) {
-                            // ignore
-                        }
-                    }
-                };
-                Thread t = new Thread(r, "Oak TCPBroadcaster: listener");
-                t.setDaemon(true);
-                t.start();
-            } catch (SocketTimeoutException e) {
-                // ignore
-            } catch (IOException e) {
-                if (isRunning()) {
-                    LOG.warn("Receive failed", e);
-                }
-                // ignore
-            }                   
-        }
-        try {        
-            serverSocket.close();
-        } catch (IOException e) {
-            LOG.debug("Closed");
-            // ignore
-        }
-    }
-    
-    void discover() {
-        while (isRunning()) {
-            DynamicBroadcastConfig b = broadcastConfig;
-            if (b != null) {
-                readClients(b);
-            }
-            for (Client c : clients.values()) {
-                c.tryConnect();
-                if (!isRunning()) {
-                    break;
-                }
-            }
-            synchronized (stop) {
-                if (isRunning()) {
-                    try {
-                        stop.wait(2000);
-                    } catch (InterruptedException e) {
-                        // ignore
-                    }
-                }
-            }
-        }
-    }
-    
-    void readClients(DynamicBroadcastConfig b) {
-        List<Map<String, String>> list = b.getClientInfo();
-        for(Map<String, String> m : list) {
-            String listener = m.get(DynamicBroadcastConfig.LISTENER);
-            String id = m.get(DynamicBroadcastConfig.ID);
-            if (listener.equals(ownListener)) {
-                continue;
-            }
-            // the key is the combination of listener and id,
-            // because the same ip address / port combination
-            // could be there multiple times for some time
-            // (in case there is a old, orphan entry for the same machine)
-            String clientKey = listener + " " + id;
-            Client c = clients.get(clientKey);
-            if (c == null) {
-                int index = listener.lastIndexOf(':');
-                if (index >= 0) {
-                    String host = listener.substring(0, index);
-                    int port = Integer.parseInt(listener.substring(index + 1));
-                    try {
-                        byte[] key = id.getBytes(UTF8);
-                        c = new Client(host, port, key);
-                        clients.put(clientKey, c);
-                    } catch (UnknownHostException e) {
-                        // ignore
-                    }
-                }
-            }
-        }
-    }
-    
-    void send() {
-        while (isRunning()) {
-            try {
-                ByteBuffer buff = sendBuffer.poll(10, TimeUnit.MILLISECONDS);
-                if (buff != null && isRunning()) {
-                    sendBuffer(buff);
-                }
-            } catch (InterruptedException e) {
-                // ignore
-            }
-        }
-    }
-    
-    @Override
-    public void send(ByteBuffer buff) {
-        ByteBuffer b = ByteBuffer.allocate(buff.remaining());
-        b.put(buff);
-        ((Buffer)b).flip();
-        while (sendBuffer.size() > MAX_BUFFER_SIZE) {
-            sendBuffer.poll();
-        }
-        try {
-            sendBuffer.add(b);
-        } catch (IllegalStateException e) {
-            // ignore - might happen once in a while,
-            // if the buffer was not yet full just before, but now
-            // many threads concurrently tried to add
-        }
-    }
-    
-    private void sendBuffer(ByteBuffer buff) {
-        int len = buff.limit();
-        byte[] data = new byte[len];
-        buff.get(data);
-        for (Client c : clients.values()) {
-            c.send(data);
-            if (!isRunning()) {
-                break;
-            }
-        }
-    }
-
-    @Override
-    public void addListener(Listener listener) {
-        listeners.add(listener);
-    }
-
-    @Override
-    public void removeListener(Listener listener) {
-        listeners.remove(listener);
-    }
-
-    @Override
-    public void close() {
-        if (isRunning()) {
-            LOG.debug("Stopping");
-            synchronized (stop) {
-                stop.set(true);
-                stop.notifyAll();
-            }
-            try {
-                serverSocket.close();
-            } catch (IOException e) {
-                // ignore
-            }
-            try {
-                acceptThread.join();
-            } catch (InterruptedException e) {
-                // ignore
-            }
-            try {
-                sendThread.join();
-            } catch (InterruptedException e) {
-                // ignore
-            }
-            try {
-                discoverThread.join();
-            } catch (InterruptedException e) {
-                // ignore
-            }
-        }
-    }
-
-    public final boolean isRunning() {
-        return !stop.get();
-    }
-    
-    static class Client {
-        final String host;
-        final int port;
-        final byte[] key;
-        DataOutputStream out;
-        Client(String host, int port, byte[] key) throws UnknownHostException {
-            this.host = host;
-            this.port = port;
-            this.key = key;
-        }
-        void send(byte[] data) {
-            DataOutputStream o = out;
-            if (o != null) {
-                synchronized (o) {
-                    try {
-                        o.writeInt(data.length);
-                        o.write(data);
-                        o.flush();
-                    } catch (IOException e) {
-                        LOG.debug("Writing failed, port " + port, e);
-                        try {
-                            o.close();
-                        } catch (IOException e1) {
-                            // ignore
-                        }
-                        out = null;
-                    }
-                }
-            }
-        }
-        void tryConnect() {
-            DataOutputStream o = out;
-            if (o != null || host == null) {
-                return;
-            }
-            InetAddress address;
-            try {
-                address = InetAddress.getByName(host);
-            } catch (UnknownHostException e1) {
-                return;
-            }
-            Socket socket = new Socket();
-            try {
-                socket.connect(new InetSocketAddress(address, port), TIMEOUT);
-                o = new DataOutputStream(new 
BufferedOutputStream(socket.getOutputStream()));
-                o.write(key);
-                o.flush();
-                out = o;
-                LOG.info("Connected to " + address + " port " + port + " k " + 
key[0]);
-            } catch (IOException e) {
-                // ok, done
-            }
-        }
-    }
-
-}
diff --git 
a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/UDPBroadcaster.java
 
b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/UDPBroadcaster.java
deleted file mode 100644
index 1ef843df18..0000000000
--- 
a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/UDPBroadcaster.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.InetAddress;
-import java.net.MulticastSocket;
-import java.nio.Buffer;
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-import java.util.ArrayList;
-
-import javax.crypto.Cipher;
-import javax.crypto.CipherInputStream;
-import javax.crypto.CipherOutputStream;
-import javax.crypto.KeyGenerator;
-import javax.crypto.SecretKey;
-import javax.crypto.spec.IvParameterSpec;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A broadcast mechanism that uses UDP. It is mainly used for testing.
- */
-public class UDPBroadcaster implements Broadcaster, Runnable {
-
-    static final Logger LOG = LoggerFactory.getLogger(UDPBroadcaster.class);
-
-    private final byte[] key;
-    private final ArrayList<Listener> listeners = new ArrayList<Listener>();
-    private final MulticastSocket socket;
-    private final int port;
-    private final InetAddress group;
-    private final InetAddress sendTo;
-    private final Thread thread;
-    private final int messageLength = 32 * 1024;
-    private final Cipher encryptCipher;
-    private final Cipher decryptCipher;
-    private volatile boolean stop;
-    
-    public UDPBroadcaster(String config) {
-        LOG.info("init " + config);
-        MessageDigest messageDigest;
-        try {
-            String[] parts = config.split(";");
-            String group = "FF78:230::1234";
-            int port = 9876;
-            String key = "";
-            boolean aes = false;
-            String sendTo = null;
-            for (String p : parts) {
-                if (p.startsWith("port ")) {
-                    port = Integer.parseInt(p.split(" ")[1]);
-                } else if (p.startsWith("group ")) {
-                    group = p.split(" ")[1];
-                } else if (p.startsWith("key ")) {
-                    key = p.split(" ")[1];
-                } else if (p.equals("aes")) {
-                    aes = true;
-                } else if (p.startsWith("sendTo ")) {
-                    sendTo = p.split(" ")[1];
-                }
-            }                    
-            messageDigest = MessageDigest.getInstance("SHA-256");
-            this.key = messageDigest.digest(key.getBytes());
-            if (aes) {
-                KeyGenerator kgen = KeyGenerator.getInstance("AES");
-                kgen.init(128);
-                SecretKey aesKey = kgen.generateKey();  
-                encryptCipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
-                encryptCipher.init(Cipher.ENCRYPT_MODE, aesKey);
-                decryptCipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
-                IvParameterSpec ivParameterSpec = new 
IvParameterSpec(aesKey.getEncoded());
-                decryptCipher.init(Cipher.DECRYPT_MODE, aesKey, 
ivParameterSpec);
-            } else {
-                encryptCipher = null;
-                decryptCipher = null;
-            }
-            socket = new MulticastSocket(port);
-            this.group = InetAddress.getByName(group);
-            this.sendTo = sendTo == null ? this.group : 
InetAddress.getByName(sendTo);
-            this.port = port;
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-        thread = new Thread(this, "Oak UDPBroadcaster listener");
-        thread.setDaemon(true);
-        thread.start();
-    }
-    
-    @Override
-    public void run() {
-        byte[] receiveData = new byte[messageLength];             
-        try {
-            socket.joinGroup(group);
-        } catch (IOException e) {
-            if (!stop) {
-                LOG.warn("join group failed", e);
-            }
-            stop = true;
-            return;
-        }              
-        while (!stop) {
-            try {
-                DatagramPacket receivePacket = new DatagramPacket(
-                        receiveData, receiveData.length);                   
-                socket.receive(receivePacket);
-                int len = receivePacket.getLength();
-                if (len < key.length) {
-                    LOG.debug("too short");
-                    continue;
-                }
-                if (!checkKey(receiveData)) {
-                    LOG.debug("key mismatch");
-                    continue;
-                }
-                ByteBuffer buff = ByteBuffer.wrap(receiveData);
-                ((Buffer)buff).limit(len);
-                int start = key.length;
-                for (Listener l : listeners) {
-                    ((Buffer)buff).position(start);
-                    l.receive(buff);
-                }
-            } catch (IOException e) {
-                if (!stop) {
-                    LOG.warn("receive failed", e);
-                }
-                // ignore
-            }                   
-        }
-        try {        
-            socket.leaveGroup(group);
-            socket.close();
-        } catch (IOException e) {
-            if (!stop) {
-                LOG.warn("leave group failed", e);
-            }
-        }
-    }
-    
-    private boolean checkKey(byte[] data) {
-        for (int i = 0; i < key.length; i++) {
-            if (key[i] != data[i]) {
-                return false;
-            }
-        }
-        return true;
-    }
-    
-    @Override
-    public void send(ByteBuffer buff) {
-        int len = key.length + buff.limit();
-        if (len >= messageLength) {
-            // message too long: ignore
-            return;
-        }
-        try {
-            byte[] sendData = new byte[len];
-            System.arraycopy(key, 0, sendData, 0, key.length);
-            buff.get(sendData, key.length, buff.limit());
-            DatagramPacket sendPacket = new DatagramPacket(sendData,
-                sendData.length, sendTo, port);
-            socket.send(sendPacket);
-        } catch (IOException e) {
-            if (!stop) {
-                LOG.debug("send failed", e);
-            }
-            // ignore
-        }
-    }
-
-    @Override
-    public void addListener(Listener listener) {
-        listeners.add(listener);
-    }
-
-    @Override
-    public void removeListener(Listener listener) {
-        listeners.remove(listener);
-    }
-    
-    byte[] encrypt(byte[] data) {
-        if (encryptCipher == null) {
-            return data;
-        }
-        try {
-            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-            CipherOutputStream cipherOutputStream = new 
CipherOutputStream(outputStream, encryptCipher);
-            cipherOutputStream.write(data);
-            cipherOutputStream.flush();
-            cipherOutputStream.close();
-            byte[] encryptedBytes = outputStream.toByteArray();
-            return encryptedBytes;
-        } catch (IOException e) {
-            LOG.debug("encrypt failed", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    byte[] decrypt(byte[] data) {
-        if (decryptCipher == null) {
-            return data;
-        }
-        try {        
-            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-            ByteArrayInputStream inStream = new ByteArrayInputStream(data);
-            CipherInputStream cipherInputStream = new 
CipherInputStream(inStream, decryptCipher);
-            byte[] buf = new byte[1024];
-            int bytesRead;
-            while ((bytesRead = cipherInputStream.read(buf)) >= 0) {
-                outputStream.write(buf, 0, bytesRead);
-            }
-            return outputStream.toByteArray();
-        } catch (IOException e) {
-            LOG.debug("decrypt failed", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public void close() {
-        if (!stop) {
-            this.stop = true;
-            socket.close();
-            try {
-                thread.join();
-            } catch (InterruptedException e) {
-                // ignore
-            }
-        }
-    }
-
-    public boolean isRunning() {
-        return !stop;
-    }
-
-    @Override
-    public void setBroadcastConfig(DynamicBroadcastConfig broadcastConfig) {
-        // not yet implemented
-    }
-
-}
diff --git 
a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/BroadcastTest.java
 
b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/BroadcastTest.java
index 48df5e991c..15227d0cd7 100644
--- 
a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/BroadcastTest.java
+++ 
b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/BroadcastTest.java
@@ -22,10 +22,7 @@ import static org.junit.Assert.assertNull;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.sql.Timestamp;
 import java.util.ArrayList;
-import java.util.Random;
 import java.util.concurrent.Callable;
 
 import org.apache.commons.io.FileUtils;
@@ -33,29 +30,15 @@ import org.apache.jackrabbit.oak.cache.CacheLIRS;
 import org.apache.jackrabbit.oak.plugins.document.MemoryDiffCache.Key;
 import org.apache.jackrabbit.oak.plugins.document.Path;
 import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
-import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.Broadcaster;
-import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.TCPBroadcaster;
 import org.apache.jackrabbit.oak.plugins.document.util.StringValue;
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
-import org.slf4j.LoggerFactory;
-
-import ch.qos.logback.classic.Level;
-import ch.qos.logback.classic.LoggerContext;
-import ch.qos.logback.classic.PatternLayout;
-import ch.qos.logback.classic.spi.ILoggingEvent;
-import ch.qos.logback.core.ConsoleAppender;
 
 import com.google.common.cache.Cache;
 
 public class BroadcastTest {
-    
-    public static void main(String... args) throws Exception {
-        listen();
-        benchmark();
-    }
-    
+
     private static void benchmark() throws IOException {
         FileUtils.deleteDirectory(new File("target/broadcastTest"));
         new File("target/broadcastTest").mkdirs();     
@@ -82,106 +65,12 @@ public class BroadcastTest {
             c.close();
         }
     }
-    
-    private static void listen() throws InterruptedException {
-        String config = "key 123";
-
-        
-        ConsoleAppender<ILoggingEvent> ca = new 
ConsoleAppender<ILoggingEvent>();
-        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
-        ca.setContext(lc);
-        PatternLayout pl = new PatternLayout();
-        pl.setPattern("%msg%n");
-        pl.setContext(lc);
-        pl.start();
-        ca.setLayout(pl);
-        ca.start();
-        
-        ch.qos.logback.classic.Logger logger = (ch.qos.logback.classic.Logger) 
LoggerFactory.getLogger(TCPBroadcaster.class);
-        logger.addAppender(ca);
-        logger.setLevel(Level.DEBUG);
-        
-        TCPBroadcaster receiver = new TCPBroadcaster(config);
-        receiver.addListener(new Broadcaster.Listener() {
-
-            @Override
-            public void receive(ByteBuffer buff) {
-                int end = buff.position();
-                StringBuilder sb = new StringBuilder();
-                while (buff.remaining() > 0) {
-                    char c = (char) buff.get();
-                    if (c >= ' ' && c < 128) {
-                        sb.append(c);
-                    } else if (c <= 9) {
-                        sb.append((char) ('0' + c));
-                    } else {
-                        sb.append('.');
-                    }
-                }
-                String dateTime = new 
Timestamp(System.currentTimeMillis()).toString().substring(0, 19);
-                System.out.println(dateTime + " Received " + sb);
-                buff.position(end);
-            }
-            
-        });
-        Random r = new Random();
-        int x = r.nextInt();
-        System.out.println("Sending " + x);
-        for (int i = 0; i < 10; i++) {
-            Thread.sleep(10);
-            ByteBuffer buff = ByteBuffer.allocate(1024);
-            buff.putInt(0);
-            buff.putInt(x);
-            buff.put(new byte[100]);
-            buff.flip();
-            receiver.send(buff);
-            if (!receiver.isRunning()) {
-                System.out.println("Did not start or already stopped");
-                break;
-            }
-        }
-        Thread.sleep(Integer.MAX_VALUE);
-    }
-    
-    @Test
-    public void broadcastTCP() throws Exception {
-        broadcast("tcp:sendTo localhost;key 123", 80);
-    }
 
     @Test
     public void broadcastInMemory() throws Exception {
         broadcast("inMemory", 100);
     }
-    
-    @Test
-    @Ignore("OAK-2843")
-    public void broadcastUDP() throws Exception {
-        try {
-            broadcast("udp:sendTo localhost", 50);
-        } catch (AssertionError e) {
-            // IPv6 didn't work, so try with IPv4
-            try {
-                broadcast("udp:group 228.6.7.9", 50);                
-            } catch (AssertionError e2) {
-                throwBoth(e, e2);
-            }                
-        }
-    }
-    
-    @Test
-    @Ignore("OAK-2843")
-    public void broadcastEncryptedUDP() throws Exception {
-        try {
-            broadcast("udp:group FF78:230::1234;key test;port 9876;sendTo 
localhost;aes", 50);
-        } catch (AssertionError e) {
-            try {
-                broadcast("udp:group 228.6.7.9;key test;port 9876;aes", 50);   
             
-            } catch (AssertionError e2) {
-                throwBoth(e, e2);
-            }                
-        }
-    }
-    
+
     private static void throwBoth(AssertionError e, AssertionError e2) throws 
AssertionError {
         Throwable ex = e;
         while (ex.getCause() != null) {
@@ -200,7 +89,7 @@ public class BroadcastTest {
         }
         broadcastTry(type, minPercentCorrect, false);
     }
-    
+
     private static boolean broadcastTry(String type, int minPercentCorrect, 
boolean tryOnly) throws Exception {
         FileUtils.deleteDirectory(new File("target/broadcastTest"));
         new File("target/broadcastTest").mkdirs();        
@@ -239,7 +128,7 @@ public class BroadcastTest {
         Assert.fail("min: " + minPercentCorrect + " got: " + correct);
         return false;
     }
-    
+
     private static boolean waitFor(Callable<Boolean> call, int 
timeoutInMilliseconds) {
         long start = System.currentTimeMillis();
         while (true) {
@@ -261,7 +150,7 @@ public class BroadcastTest {
             }
         }
     }
-    
+
     private static <K, V> boolean waitFor(final Cache<K, V> map, final K key, 
final V value, int timeoutInMilliseconds) {
         return waitFor(new Callable<Boolean>() {
             @Override
@@ -274,7 +163,7 @@ public class BroadcastTest {
             }
         }, timeoutInMilliseconds);
     }
-    
+
     private static <K, V> boolean waitFor(final Cache<K, V> map, final K key, 
int timeoutInMilliseconds) {
         return waitFor(new Callable<Boolean>() {
             @Override
@@ -283,7 +172,7 @@ public class BroadcastTest {
             }
         }, timeoutInMilliseconds);
     }
-    
+
     private static Cache<Key, StringValue> openCache(PersistentCache p) {
         CacheLIRS<Key, StringValue> cache = new CacheLIRS.Builder<Key, 
StringValue>().
                 maximumSize(1).build();


Reply via email to