Hi, Thanks a lot for the reproducible scenario and for the test. I have created an issue to track it: https://issues.apache.org/jira/browse/IGNITE-4369.
On Wed, Nov 30, 2016 at 7:35 PM, vladiisy <[email protected]> wrote: > Hi, > > I have a problem with my application using reentrant locks; they seem to > become corrupted after one node leaves my cluster. The threads holding locks > hang in GridCacheLockImpl.unlock forever: > > ">>JUnit-Test-Worker-0" #160 prio=5 os_prio=0 tid=0x0000000020d57800 > nid=0x21f4 runnable [0x000000002e28e000] > java.lang.Thread.State: RUNNABLE > at java.lang.Thread.yield(Native Method) > at > org.apache.ignite.internal.processors.datastructures. > GridCacheLockImpl$Sync.tryRelease(GridCacheLockImpl.java:469) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.release( > AbstractQueuedSynchronizer.java:1261) > at > org.apache.ignite.internal.processors.datastructures. > GridCacheLockImpl.unlock(GridCacheLockImpl.java:1296) > at > com.iisy.solvatio.core.cluster.ignite.IgniteTests$ > Worker.run(IgniteTests.java:156) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > .... > > I have a JUnit test IgniteTests.java > <http://apache-ignite-users.70518.x6.nabble.com/file/ > n9303/IgniteTests.java> > , which shows the problem. > > Is it a bug in Ignite or my mistake? 
> > Great thanks in advance, > Vladimir > > Here my JUnit > ---------------------------------------------------- > package ignite.cluster.tests; > > import java.util.ArrayList; > import java.util.Arrays; > import java.util.Collection; > import java.util.HashMap; > import java.util.List; > import java.util.Map; > import java.util.concurrent.CountDownLatch; > import java.util.concurrent.ExecutorService; > import java.util.concurrent.Executors; > import java.util.concurrent.ThreadFactory; > import java.util.concurrent.TimeUnit; > > import org.apache.ignite.Ignite; > import org.apache.ignite.IgniteLock; > import org.apache.ignite.IgniteSystemProperties; > import org.apache.ignite.Ignition; > import org.apache.ignite.cluster.ClusterNode; > import org.apache.ignite.configuration.IgniteConfiguration; > import org.apache.ignite.events.EventType; > import org.apache.ignite.logger.slf4j.Slf4jLogger; > import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; > import > org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; > import org.junit.After; > import org.junit.Assert; > import org.junit.Before; > import org.junit.Test; > > import com.google.common.util.concurrent.ThreadFactoryBuilder; > > public class IgniteTests { > > private Ignite[] clusterNodes; > > @Before > public void start() throws Exception { > int numNodes = 2; > this.clusterNodes = new Ignite[numNodes]; > for (int i = 0; i < numNodes; i++) { > this.clusterNodes[i] = startClusterNode(i); > } > boolean ready = waitClusterHasNodes(this.clusterNodes[0], > numNodes); > if (!ready) { > throw new IllegalStateException("Cluster not ready after 15 > seconds"); > } > } > > public boolean waitClusterHasNodes(final Ignite node, final int > numNodes) throws InterruptedException { > boolean ready = false; > for (int i = 0; i < 15; i++) { > Collection<ClusterNode> nodes0 = node.cluster().nodes(); > if (nodes0.size() == numNodes) { > ready = true; > break; > } > Thread.sleep(1000); > } > return ready; > 
} > > private Ignite startClusterNode(final int nodeIndex) throws Exception { > String nodeName = "node" + nodeIndex; > List<String> addresses = Arrays.asList("127.0.0.1:48500..48502"); > > System.setProperty(IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER, > Boolean.toString(false)); > IgniteConfiguration config = new IgniteConfiguration(); > > config.setClassLoader(this.getClass().getClassLoader()); > config.setPeerClassLoadingEnabled(false); > config.setGridLogger(new Slf4jLogger()); > > config.setGridName(nodeName); > config.setConsistentId(nodeName); > > config.setIncludeEventTypes(EventType.EVT_CACHE_OBJECT_PUT); > > Map<String, Object> userAttributes = new HashMap<>(); > userAttributes.put("clusterName", "JUnit-Cluster"); > config.setUserAttributes(userAttributes); > > config.setMetricsLogFrequency(0); > > TcpDiscoverySpi spi = new TcpDiscoverySpi(); > TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(); > ipFinder.setAddresses(addresses); > config.setDiscoverySpi(spi); > config.setClientMode(false); > > Ignite ignite = Ignition.start(config); > return ignite; > } > > @After > public void stop() { > if (this.clusterNodes != null) { > for (int i = 0; i < this.clusterNodes.length; i++) { > shutdown(i); > } > } > } > > /** > * @param casesCount > * @return > */ > protected ExecutorService createExecutor(final int casesCount) { > ThreadFactory factory = new > ThreadFactoryBuilder().setNameFormat(">>JUnit-Test-Worker-%d").build(); > return Executors.newFixedThreadPool(casesCount * 2, factory); > } > > private void shutdown(final int nodeNum) { > Ignite node = this.clusterNodes[nodeNum]; > if (node != null) { > boolean stoped = > Ignition.stop(this.clusterNodes[nodeNum].name(), false); > Assert.assertTrue(stoped); > > this.clusterNodes[nodeNum] = null; > } > } > > private static class Worker implements Runnable { > > private final int nr; > private final Ignite node; > private volatile boolean stoped = false; > private final CountDownLatch latch; > > 
public Worker(final int nr, final Ignite node, final CountDownLatch > latch) { > this.nr = nr; > this.node = node; > this.latch = latch; > } > > @Override > public void run() { > try { > while (!this.stoped) { > IgniteLock lock = this.node.reentrantLock("Lock_" + > this.nr, true, false, true); > > lock.lock(); > try { > Thread.sleep(2); > } catch (InterruptedException e) { > // ignore > } > lock.unlock(); > } > } catch (Throwable e) { > System.err.println("Error occured in worker " + this.nr + > " > on node " + this.node.name()); > e.printStackTrace(); > } > this.latch.countDown(); > } > > public void stop() { > this.stoped = true; > } > } > > @Test > public void testEvents() throws Exception { > > final Ignite node0 = this.clusterNodes[0]; > final Ignite node1 = this.clusterNodes[1]; > > final int workersPerNode = 100; > ExecutorService service = createExecutor(workersPerNode * 2); > try { > CountDownLatch latch0 = new CountDownLatch(workersPerNode); > List<Worker> workersNode0 = new ArrayList(workersPerNode); > > CountDownLatch latch1 = new CountDownLatch(workersPerNode); > List<Worker> workersNode1 = new ArrayList(workersPerNode); > for (int i = 0; i < workersPerNode; i++) { > { > Worker workerNode0 = new Worker(i, node0, latch0); > workersNode0.add(workerNode0); > service.submit(workerNode0); > } > > { > Worker workerNode1 = new Worker(i, node1, latch1); > workersNode1.add(workerNode1); > service.submit(workerNode1); > } > } > Thread.sleep(3000); > // stop worker on node1 > workersNode1.forEach(w -> w.stop()); > boolean downNormally = latch1.await(15, TimeUnit.SECONDS); > if (!downNormally) { > Assert.fail("Workers are not ready, missing:" + > latch1.getCount()); > } > System.out.println("Stopping node 1"); > shutdown(1); > System.out.println("Node 1 is down"); > > Thread.sleep(10000); > System.out.println("Stopping worker on node 0"); > workersNode0.forEach(w -> w.stop()); > > boolean normally = latch0.await(15, TimeUnit.SECONDS); > if (!normally) { > 
System.err.println("Problem occured, make thead dump..."); > Thread.sleep(60 * 1000); > Assert.fail("Workers are not ready, missing: " + > latch0.getCount()); > } > } finally { > service.shutdown(); > } > } > } > > > > > > > > > -- > View this message in context: http://apache-ignite-users. > 70518.x6.nabble.com/Problem-with-ReentranLocks-on-shutdown-of-one-node-in- > cluster-tp9303.html > Sent from the Apache Ignite Users mailing list archive at Nabble.com. >
