I can reproduce the bug,  above log is the server(first) node print when I
stop other nodes .



import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.logger.slf4j.Slf4jLogger;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import
org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
import org.junit.Test;

import com.google.common.collect.Lists;

/**
 * @author wangsan
 * @date 2018/08/23
 */
public class IgniteMultiThreadOnOffTest {
    private static final String CACHE_DATA_REGION_PERSISTENCE =
"PERSISTENCE_REGION";
    private static final String stringCacheOneName = "StringCacheOneName";

    @Test
    public void testFirstServer() throws Exception {
        IgniteConfiguration cfg = new IgniteConfiguration();
        cfg.setClientMode(false);
        cfg.setDiscoverySpi(tcpDiscoverySpi());
        cfg.setConsistentId("server_test_1");
        cfg.setIgniteInstanceName("server_test_1");
        cfg.setGridLogger(new Slf4jLogger());
        Ignite ignite = Ignition.start(cfg);

        CacheConfiguration<String, String> cacheOneConfiguration = new
CacheConfiguration<>(stringCacheOneName);
        cacheOneConfiguration.setCacheMode(CacheMode.REPLICATED);
        IgniteCache<String, String> cacheOne =
ignite.getOrCreateCache(cacheOneConfiguration);

        ignite.events().localListen(event -> {
            System.err.println("get event " + event.name() + " " + event);
            if (event instanceof DiscoveryEvent) {
                ClusterNode clusterNode = ((DiscoveryEvent)
event).eventNode();

                String item = "event_" + clusterNode.consistentId();
                System.err.println("------  oldest node process the message
: " + item);

                switch (event.type()) {
                    case EventType.EVT_NODE_JOINED:
                        cacheOne.put(item, item);
                        //                       
System.err.println("------- do add async" + item);
                        //                       
ForkJoinPool.commonPool().execute(() -> cacheOne.put(item, item));
                        break;
                    case EventType.EVT_NODE_FAILED:
                    case EventType.EVT_NODE_LEFT:
                        // will block
                        cacheOne.remove(item);
                        //                       
System.err.println("------- do remove async " + item);
                        //                       
ForkJoinPool.commonPool().execute(() -> cacheOne.remove(item));
                        break;
                    default:
                        System.err.println("ignore discovery event:" +
event);
                        break;
                }
                return true;
            } else {
                return false;
            }
        }, EventType.EVTS_DISCOVERY);

        while (true) {
            TimeUnit.SECONDS.sleep(5);

            System.err.println("-------- " + cacheOne.size());
           
System.err.println(ignite.cacheNames().stream().collect(Collectors.joining(",")));
        }
    }

    @Test
    public void testMultiServer() throws IOException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        long start = System.currentTimeMillis();
        CompletableFuture<ArrayList&lt;Ignite>> allIgnite =
IntStream.range(0, 5)
                .mapToObj(i -> CompletableFuture.supplyAsync(() ->
testServer(i), executorService))
                .map(f -> f.thenApply(i -> Lists.newArrayList(i)))
                .reduce((x, y) -> x.thenCombine(y, (a, b) -> {
                    a.addAll(b);
                    return a;
                })).get();

        allIgnite.thenAccept(list -> {
            System.err.println("start use time ms " +
(System.currentTimeMillis() - start));
            list.forEach(n -> System.err.println("n.id " +
n.cluster().localNode().consistentId()));
            try {
                TimeUnit.SECONDS.sleep(20);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.err.println("going to close ignite,see first server ");
            list.forEach(n -> n.close());
        }).join();
    }

    private Ignite testServer(int index) {
        String name = "test_multi_server_index_" + index;
        System.out.println(name);

        IgniteConfiguration cfg = new IgniteConfiguration();
        cfg.setClientMode(false);
        cfg.setDiscoverySpi(tcpDiscoverySpi());
        cfg.setGridLogger(new Slf4jLogger());
        cfg.setIgniteInstanceName(name);
        cfg.setConsistentId(name);

        Ignite ignite = Ignition.start(cfg);
        System.out.println("------- started " +
ignite.cluster().localNode().id());
        IgniteCache<String, String> cacheOne =
ignite.getOrCreateCache(stringCacheOneName);
        cacheOne.put(name, name);
        System.out.println("cache size " + cacheOne.size());
        return ignite;
    }

    private IgniteDiscoverySpi tcpDiscoverySpi() {
        TcpDiscoverySpi spi = new TcpDiscoverySpi();
        TcpDiscoveryMulticastIpFinder ipFinder = new
TcpDiscoveryMulticastIpFinder();
        ipFinder.setAddresses(Arrays.asList("127.0.0.1:47500..47509"));
        spi.setIpFinder(ipFinder);
        return spi;
    }

}





--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Reply via email to