Hi,

I have a problem with my application using reentrant locks: they seem to
become corrupted after one node leaves my cluster. The threads holding locks
hang in GridCacheLockImpl.unlock forever:

">>JUnit-Test-Worker-0" #160 prio=5 os_prio=0 tid=0x0000000020d57800
nid=0x21f4 runnable [0x000000002e28e000]
   java.lang.Thread.State: RUNNABLE
        at java.lang.Thread.yield(Native Method)
        at
org.apache.ignite.internal.processors.datastructures.GridCacheLockImpl$Sync.tryRelease(GridCacheLockImpl.java:469)
        at
java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1261)
        at
org.apache.ignite.internal.processors.datastructures.GridCacheLockImpl.unlock(GridCacheLockImpl.java:1296)
        at
com.iisy.solvatio.core.cluster.ignite.IgniteTests$Worker.run(IgniteTests.java:156)
        at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
....

I have a JUnit test, IgniteTests.java
<http://apache-ignite-users.70518.x6.nabble.com/file/n9303/IgniteTests.java>,
which reproduces the problem.

Is this a bug in Ignite or my mistake?

Many thanks in advance,
Vladimir

Here is my JUnit test:
----------------------------------------------------
package ignite.cluster.tests;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteLock;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.Ignition;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.EventType;
import org.apache.ignite.logger.slf4j.Slf4jLogger;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import
org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class IgniteTests {

    private Ignite[] clusterNodes;

    @Before
    public void start() throws Exception {
        int numNodes = 2;
        this.clusterNodes = new Ignite[numNodes];
        for (int i = 0; i < numNodes; i++) {
            this.clusterNodes[i] = startClusterNode(i);
        }
        boolean ready = waitClusterHasNodes(this.clusterNodes[0], numNodes);
        if (!ready) {
            throw new IllegalStateException("Cluster not ready after 15
seconds");
        }
    }

    public boolean waitClusterHasNodes(final Ignite node, final int
numNodes) throws InterruptedException {
        boolean ready = false;
        for (int i = 0; i < 15; i++) {
            Collection<ClusterNode> nodes0 = node.cluster().nodes();
            if (nodes0.size() == numNodes) {
                ready = true;
                break;
            }
            Thread.sleep(1000);
        }
        return ready;
    }

    private Ignite startClusterNode(final int nodeIndex) throws Exception {
        String nodeName = "node" + nodeIndex;
        List<String> addresses = Arrays.asList("127.0.0.1:48500..48502");

        System.setProperty(IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER,
Boolean.toString(false));
        IgniteConfiguration config = new IgniteConfiguration();

        config.setClassLoader(this.getClass().getClassLoader());
        config.setPeerClassLoadingEnabled(false);
        config.setGridLogger(new Slf4jLogger());

        config.setGridName(nodeName);
        config.setConsistentId(nodeName);

        config.setIncludeEventTypes(EventType.EVT_CACHE_OBJECT_PUT);

        Map<String, Object> userAttributes = new HashMap<>();
        userAttributes.put("clusterName", "JUnit-Cluster");
        config.setUserAttributes(userAttributes);

        config.setMetricsLogFrequency(0);

        TcpDiscoverySpi spi = new TcpDiscoverySpi();
        TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
        ipFinder.setAddresses(addresses);
        config.setDiscoverySpi(spi);
        config.setClientMode(false);

        Ignite ignite = Ignition.start(config);
        return ignite;
    }

    @After
    public void stop() {
        if (this.clusterNodes != null) {
            for (int i = 0; i < this.clusterNodes.length; i++) {
                shutdown(i);
            }
        }
    }

    /**
     * @param casesCount
     * @return
     */
    protected ExecutorService createExecutor(final int casesCount) {
        ThreadFactory factory = new
ThreadFactoryBuilder().setNameFormat(">>JUnit-Test-Worker-%d").build();
        return Executors.newFixedThreadPool(casesCount * 2, factory);
    }

    private void shutdown(final int nodeNum) {
        Ignite node = this.clusterNodes[nodeNum];
        if (node != null) {
            boolean stoped =
Ignition.stop(this.clusterNodes[nodeNum].name(), false);
            Assert.assertTrue(stoped);

            this.clusterNodes[nodeNum] = null;
        }
    }

    private static class Worker implements Runnable {

        private final int nr;
        private final Ignite node;
        private volatile boolean stoped = false;
        private final CountDownLatch latch;

        public Worker(final int nr, final Ignite node, final CountDownLatch
latch) {
            this.nr = nr;
            this.node = node;
            this.latch = latch;
        }

        @Override
        public void run() {
            try {
                while (!this.stoped) {
                    IgniteLock lock = this.node.reentrantLock("Lock_" +
this.nr, true, false, true);

                    lock.lock();
                    try {
                        Thread.sleep(2);
                    } catch (InterruptedException e) {
                        // ignore
                    }
                    lock.unlock();
                }
            } catch (Throwable e) {
                System.err.println("Error occured in worker " + this.nr + "
on node " + this.node.name());
                e.printStackTrace();
            }
            this.latch.countDown();
        }

        public void stop() {
            this.stoped = true;
        }
    }

    @Test
    public void testEvents() throws Exception {

        final Ignite node0 = this.clusterNodes[0];
        final Ignite node1 = this.clusterNodes[1];

        final int workersPerNode = 100;
        ExecutorService service = createExecutor(workersPerNode * 2);
        try {
            CountDownLatch latch0 = new CountDownLatch(workersPerNode);
            List<Worker> workersNode0 = new ArrayList(workersPerNode);

            CountDownLatch latch1 = new CountDownLatch(workersPerNode);
            List<Worker> workersNode1 = new ArrayList(workersPerNode);
            for (int i = 0; i < workersPerNode; i++) {
                {
                    Worker workerNode0 = new Worker(i, node0, latch0);
                    workersNode0.add(workerNode0);
                    service.submit(workerNode0);
                }

                {
                    Worker workerNode1 = new Worker(i, node1, latch1);
                    workersNode1.add(workerNode1);
                    service.submit(workerNode1);
                }
            }
            Thread.sleep(3000);
            // stop worker on node1
            workersNode1.forEach(w -> w.stop());
            boolean downNormally = latch1.await(15, TimeUnit.SECONDS);
            if (!downNormally) {
                Assert.fail("Workers are not ready, missing:" +
latch1.getCount());
            }
            System.out.println("Stopping node 1");
            shutdown(1);
            System.out.println("Node 1 is down");

            Thread.sleep(10000);
            System.out.println("Stopping worker on node 0");
            workersNode0.forEach(w -> w.stop());

            boolean normally = latch0.await(15, TimeUnit.SECONDS);
            if (!normally) {
                System.err.println("Problem occured, make thead dump...");
                Thread.sleep(60 * 1000);
                Assert.fail("Workers are not ready, missing: " +
latch0.getCount());
            }
        } finally {
            service.shutdown();
        }
    }
}








--
View this message in context: 
http://apache-ignite-users.70518.x6.nabble.com/Problem-with-ReentranLocks-on-shutdown-of-one-node-in-cluster-tp9303.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.

Reply via email to