Hi,
I have a problem with my application using reentrant locks: they seem to
become corrupted after one node leaves my cluster. The threads holding the
locks hang in GridCacheLockImpl.unlock forever:
">>JUnit-Test-Worker-0" #160 prio=5 os_prio=0 tid=0x0000000020d57800
nid=0x21f4 runnable [0x000000002e28e000]
java.lang.Thread.State: RUNNABLE
at java.lang.Thread.yield(Native Method)
at
org.apache.ignite.internal.processors.datastructures.GridCacheLockImpl$Sync.tryRelease(GridCacheLockImpl.java:469)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1261)
at
org.apache.ignite.internal.processors.datastructures.GridCacheLockImpl.unlock(GridCacheLockImpl.java:1296)
at
com.iisy.solvatio.core.cluster.ignite.IgniteTests$Worker.run(IgniteTests.java:156)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
....
I have a JUnit test, IgniteTests.java
<http://apache-ignite-users.70518.x6.nabble.com/file/n9303/IgniteTests.java>,
which shows the problem.
Is it a bug in Ignite or my mistake?
Many thanks in advance,
Vladimir
Here is my JUnit test:
----------------------------------------------------
package ignite.cluster.tests;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteLock;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.Ignition;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.EventType;
import org.apache.ignite.logger.slf4j.Slf4jLogger;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import
org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class IgniteTests {
private Ignite[] clusterNodes;
@Before
public void start() throws Exception {
int numNodes = 2;
this.clusterNodes = new Ignite[numNodes];
for (int i = 0; i < numNodes; i++) {
this.clusterNodes[i] = startClusterNode(i);
}
boolean ready = waitClusterHasNodes(this.clusterNodes[0], numNodes);
if (!ready) {
throw new IllegalStateException("Cluster not ready after 15
seconds");
}
}
public boolean waitClusterHasNodes(final Ignite node, final int
numNodes) throws InterruptedException {
boolean ready = false;
for (int i = 0; i < 15; i++) {
Collection<ClusterNode> nodes0 = node.cluster().nodes();
if (nodes0.size() == numNodes) {
ready = true;
break;
}
Thread.sleep(1000);
}
return ready;
}
private Ignite startClusterNode(final int nodeIndex) throws Exception {
String nodeName = "node" + nodeIndex;
List<String> addresses = Arrays.asList("127.0.0.1:48500..48502");
System.setProperty(IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER,
Boolean.toString(false));
IgniteConfiguration config = new IgniteConfiguration();
config.setClassLoader(this.getClass().getClassLoader());
config.setPeerClassLoadingEnabled(false);
config.setGridLogger(new Slf4jLogger());
config.setGridName(nodeName);
config.setConsistentId(nodeName);
config.setIncludeEventTypes(EventType.EVT_CACHE_OBJECT_PUT);
Map<String, Object> userAttributes = new HashMap<>();
userAttributes.put("clusterName", "JUnit-Cluster");
config.setUserAttributes(userAttributes);
config.setMetricsLogFrequency(0);
TcpDiscoverySpi spi = new TcpDiscoverySpi();
TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
ipFinder.setAddresses(addresses);
config.setDiscoverySpi(spi);
config.setClientMode(false);
Ignite ignite = Ignition.start(config);
return ignite;
}
@After
public void stop() {
if (this.clusterNodes != null) {
for (int i = 0; i < this.clusterNodes.length; i++) {
shutdown(i);
}
}
}
/**
* @param casesCount
* @return
*/
protected ExecutorService createExecutor(final int casesCount) {
ThreadFactory factory = new
ThreadFactoryBuilder().setNameFormat(">>JUnit-Test-Worker-%d").build();
return Executors.newFixedThreadPool(casesCount * 2, factory);
}
private void shutdown(final int nodeNum) {
Ignite node = this.clusterNodes[nodeNum];
if (node != null) {
boolean stoped =
Ignition.stop(this.clusterNodes[nodeNum].name(), false);
Assert.assertTrue(stoped);
this.clusterNodes[nodeNum] = null;
}
}
private static class Worker implements Runnable {
private final int nr;
private final Ignite node;
private volatile boolean stoped = false;
private final CountDownLatch latch;
public Worker(final int nr, final Ignite node, final CountDownLatch
latch) {
this.nr = nr;
this.node = node;
this.latch = latch;
}
@Override
public void run() {
try {
while (!this.stoped) {
IgniteLock lock = this.node.reentrantLock("Lock_" +
this.nr, true, false, true);
lock.lock();
try {
Thread.sleep(2);
} catch (InterruptedException e) {
// ignore
}
lock.unlock();
}
} catch (Throwable e) {
System.err.println("Error occured in worker " + this.nr + "
on node " + this.node.name());
e.printStackTrace();
}
this.latch.countDown();
}
public void stop() {
this.stoped = true;
}
}
@Test
public void testEvents() throws Exception {
final Ignite node0 = this.clusterNodes[0];
final Ignite node1 = this.clusterNodes[1];
final int workersPerNode = 100;
ExecutorService service = createExecutor(workersPerNode * 2);
try {
CountDownLatch latch0 = new CountDownLatch(workersPerNode);
List<Worker> workersNode0 = new ArrayList(workersPerNode);
CountDownLatch latch1 = new CountDownLatch(workersPerNode);
List<Worker> workersNode1 = new ArrayList(workersPerNode);
for (int i = 0; i < workersPerNode; i++) {
{
Worker workerNode0 = new Worker(i, node0, latch0);
workersNode0.add(workerNode0);
service.submit(workerNode0);
}
{
Worker workerNode1 = new Worker(i, node1, latch1);
workersNode1.add(workerNode1);
service.submit(workerNode1);
}
}
Thread.sleep(3000);
// stop worker on node1
workersNode1.forEach(w -> w.stop());
boolean downNormally = latch1.await(15, TimeUnit.SECONDS);
if (!downNormally) {
Assert.fail("Workers are not ready, missing:" +
latch1.getCount());
}
System.out.println("Stopping node 1");
shutdown(1);
System.out.println("Node 1 is down");
Thread.sleep(10000);
System.out.println("Stopping worker on node 0");
workersNode0.forEach(w -> w.stop());
boolean normally = latch0.await(15, TimeUnit.SECONDS);
if (!normally) {
System.err.println("Problem occured, make thead dump...");
Thread.sleep(60 * 1000);
Assert.fail("Workers are not ready, missing: " +
latch0.getCount());
}
} finally {
service.shutdown();
}
}
}
--
View this message in context:
http://apache-ignite-users.70518.x6.nabble.com/Problem-with-ReentranLocks-on-shutdown-of-one-node-in-cluster-tp9303.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.