Hello!

I'm fairly confident that you should not attempt to do any cache operations
from discovery threads, e.g. from event listeners.

When the topology changes, you can't expect any cache operation to complete
before the partition map exchange is done.

Regards,
-- 
Ilya Kasnacheev


ср, 29 авг. 2018 г. в 6:24, wangsan <wqp1...@gmail.com>:

> I can reproduce the bug. The log above is what the first (server) node
> prints when I stop the other nodes.
>
>
>
> import java.io.IOException;
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.TimeUnit;
> import java.util.stream.Collectors;
> import java.util.stream.IntStream;
>
> import org.apache.ignite.Ignite;
> import org.apache.ignite.IgniteCache;
> import org.apache.ignite.Ignition;
> import org.apache.ignite.cache.CacheMode;
> import org.apache.ignite.cluster.ClusterNode;
> import org.apache.ignite.configuration.CacheConfiguration;
> import org.apache.ignite.configuration.IgniteConfiguration;
> import org.apache.ignite.events.DiscoveryEvent;
> import org.apache.ignite.events.EventType;
> import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
> import org.apache.ignite.logger.slf4j.Slf4jLogger;
> import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
> import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
> import org.junit.Test;
>
> import com.google.common.collect.Lists;
>
> /**
>  * @author wangsan
>  * @date 2018/08/23
>  */
> public class IgniteMultiThreadOnOffTest {
>     private static final String CACHE_DATA_REGION_PERSISTENCE =
> "PERSISTENCE_REGION";
>     private static final String stringCacheOneName = "StringCacheOneName";
>
>     @Test
>     public void testFirstServer() throws Exception {
>         IgniteConfiguration cfg = new IgniteConfiguration();
>         cfg.setClientMode(false);
>         cfg.setDiscoverySpi(tcpDiscoverySpi());
>         cfg.setConsistentId("server_test_1");
>         cfg.setIgniteInstanceName("server_test_1");
>         cfg.setGridLogger(new Slf4jLogger());
>         Ignite ignite = Ignition.start(cfg);
>
>         CacheConfiguration<String, String> cacheOneConfiguration = new
> CacheConfiguration<>(stringCacheOneName);
>         cacheOneConfiguration.setCacheMode(CacheMode.REPLICATED);
>         IgniteCache<String, String> cacheOne =
> ignite.getOrCreateCache(cacheOneConfiguration);
>
>         ignite.events().localListen(event -> {
>             System.err.println("get event " + event.name() + " " + event);
>             if (event instanceof DiscoveryEvent) {
>                 ClusterNode clusterNode = ((DiscoveryEvent)
> event).eventNode();
>
>                 String item = "event_" + clusterNode.consistentId();
>                 System.err.println("------  oldest node process the message
> : " + item);
>
>                 switch (event.type()) {
>                     case EventType.EVT_NODE_JOINED:
>                         cacheOne.put(item, item);
>                         //
> System.err.println("------- do add async" + item);
>                         //
> ForkJoinPool.commonPool().execute(() -> cacheOne.put(item, item));
>                         break;
>                     case EventType.EVT_NODE_FAILED:
>                     case EventType.EVT_NODE_LEFT:
>                         // will block
>                         cacheOne.remove(item);
>                         //
> System.err.println("------- do remove async " + item);
>                         //
> ForkJoinPool.commonPool().execute(() -> cacheOne.remove(item));
>                         break;
>                     default:
>                         System.err.println("ignore discovery event:" +
> event);
>                         break;
>                 }
>                 return true;
>             } else {
>                 return false;
>             }
>         }, EventType.EVTS_DISCOVERY);
>
>         while (true) {
>             TimeUnit.SECONDS.sleep(5);
>
>             System.err.println("-------- " + cacheOne.size());
>
>
> System.err.println(ignite.cacheNames().stream().collect(Collectors.joining(",")));
>         }
>     }
>
>     @Test
>     public void testMultiServer() throws IOException {
>         ExecutorService executorService = Executors.newCachedThreadPool();
>         long start = System.currentTimeMillis();
>         CompletableFuture<ArrayList<Ignite>> allIgnite =
> IntStream.range(0, 5)
>                 .mapToObj(i -> CompletableFuture.supplyAsync(() ->
> testServer(i), executorService))
>                 .map(f -> f.thenApply(i -> Lists.newArrayList(i)))
>                 .reduce((x, y) -> x.thenCombine(y, (a, b) -> {
>                     a.addAll(b);
>                     return a;
>                 })).get();
>
>         allIgnite.thenAccept(list -> {
>             System.err.println("start use time ms " +
> (System.currentTimeMillis() - start));
>             list.forEach(n -> System.err.println("n.id " +
> n.cluster().localNode().consistentId()));
>             try {
>                 TimeUnit.SECONDS.sleep(20);
>             } catch (InterruptedException e) {
>                 e.printStackTrace();
>             }
>             System.err.println("going to close ignite,see first server ");
>             list.forEach(n -> n.close());
>         }).join();
>     }
>
>     private Ignite testServer(int index) {
>         String name = "test_multi_server_index_" + index;
>         System.out.println(name);
>
>         IgniteConfiguration cfg = new IgniteConfiguration();
>         cfg.setClientMode(false);
>         cfg.setDiscoverySpi(tcpDiscoverySpi());
>         cfg.setGridLogger(new Slf4jLogger());
>         cfg.setIgniteInstanceName(name);
>         cfg.setConsistentId(name);
>
>         Ignite ignite = Ignition.start(cfg);
>         System.out.println("------- started " +
> ignite.cluster().localNode().id());
>         IgniteCache<String, String> cacheOne =
> ignite.getOrCreateCache(stringCacheOneName);
>         cacheOne.put(name, name);
>         System.out.println("cache size " + cacheOne.size());
>         return ignite;
>     }
>
>     private IgniteDiscoverySpi tcpDiscoverySpi() {
>         TcpDiscoverySpi spi = new TcpDiscoverySpi();
>         TcpDiscoveryMulticastIpFinder ipFinder = new
> TcpDiscoveryMulticastIpFinder();
>         ipFinder.setAddresses(Arrays.asList("127.0.0.1:47500..47509"));
>         spi.setIpFinder(ipFinder);
>         return spi;
>     }
>
> }
>
>
>
>
>
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/
>

Reply via email to