I can reproduce the bug, above log is the server(first) node print when I
stop other nodes .
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.logger.slf4j.Slf4jLogger;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import
org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
import org.junit.Test;
import com.google.common.collect.Lists;
/**
* @author wangsan
* @date 2018/08/23
*/
public class IgniteMultiThreadOnOffTest {
private static final String CACHE_DATA_REGION_PERSISTENCE =
"PERSISTENCE_REGION";
private static final String stringCacheOneName = "StringCacheOneName";
@Test
public void testFirstServer() throws Exception {
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setClientMode(false);
cfg.setDiscoverySpi(tcpDiscoverySpi());
cfg.setConsistentId("server_test_1");
cfg.setIgniteInstanceName("server_test_1");
cfg.setGridLogger(new Slf4jLogger());
Ignite ignite = Ignition.start(cfg);
CacheConfiguration<String, String> cacheOneConfiguration = new
CacheConfiguration<>(stringCacheOneName);
cacheOneConfiguration.setCacheMode(CacheMode.REPLICATED);
IgniteCache<String, String> cacheOne =
ignite.getOrCreateCache(cacheOneConfiguration);
ignite.events().localListen(event -> {
System.err.println("get event " + event.name() + " " + event);
if (event instanceof DiscoveryEvent) {
ClusterNode clusterNode = ((DiscoveryEvent)
event).eventNode();
String item = "event_" + clusterNode.consistentId();
System.err.println("------ oldest node process the message
: " + item);
switch (event.type()) {
case EventType.EVT_NODE_JOINED:
cacheOne.put(item, item);
//
System.err.println("------- do add async" + item);
//
ForkJoinPool.commonPool().execute(() -> cacheOne.put(item, item));
break;
case EventType.EVT_NODE_FAILED:
case EventType.EVT_NODE_LEFT:
// will block
cacheOne.remove(item);
//
System.err.println("------- do remove async " + item);
//
ForkJoinPool.commonPool().execute(() -> cacheOne.remove(item));
break;
default:
System.err.println("ignore discovery event:" +
event);
break;
}
return true;
} else {
return false;
}
}, EventType.EVTS_DISCOVERY);
while (true) {
TimeUnit.SECONDS.sleep(5);
System.err.println("-------- " + cacheOne.size());
System.err.println(ignite.cacheNames().stream().collect(Collectors.joining(",")));
}
}
@Test
public void testMultiServer() throws IOException {
ExecutorService executorService = Executors.newCachedThreadPool();
long start = System.currentTimeMillis();
CompletableFuture<ArrayList<Ignite>> allIgnite =
IntStream.range(0, 5)
.mapToObj(i -> CompletableFuture.supplyAsync(() ->
testServer(i), executorService))
.map(f -> f.thenApply(i -> Lists.newArrayList(i)))
.reduce((x, y) -> x.thenCombine(y, (a, b) -> {
a.addAll(b);
return a;
})).get();
allIgnite.thenAccept(list -> {
System.err.println("start use time ms " +
(System.currentTimeMillis() - start));
list.forEach(n -> System.err.println("n.id " +
n.cluster().localNode().consistentId()));
try {
TimeUnit.SECONDS.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.err.println("going to close ignite,see first server ");
list.forEach(n -> n.close());
}).join();
}
private Ignite testServer(int index) {
String name = "test_multi_server_index_" + index;
System.out.println(name);
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setClientMode(false);
cfg.setDiscoverySpi(tcpDiscoverySpi());
cfg.setGridLogger(new Slf4jLogger());
cfg.setIgniteInstanceName(name);
cfg.setConsistentId(name);
Ignite ignite = Ignition.start(cfg);
System.out.println("------- started " +
ignite.cluster().localNode().id());
IgniteCache<String, String> cacheOne =
ignite.getOrCreateCache(stringCacheOneName);
cacheOne.put(name, name);
System.out.println("cache size " + cacheOne.size());
return ignite;
}
private IgniteDiscoverySpi tcpDiscoverySpi() {
TcpDiscoverySpi spi = new TcpDiscoverySpi();
TcpDiscoveryMulticastIpFinder ipFinder = new
TcpDiscoveryMulticastIpFinder();
ipFinder.setAddresses(Arrays.asList("127.0.0.1:47500..47509"));
spi.setIpFinder(ipFinder);
return spi;
}
}
--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/