Sergey Antonov created IGNITE-12774:
---------------------------------------

             Summary: Transaction hangs after "Too many open files" NIO exception
                 Key: IGNITE-12774
                 URL: https://issues.apache.org/jira/browse/IGNITE-12774
             Project: Ignite
          Issue Type: Bug
            Reporter: Sergey Antonov
            Assignee: Sergey Antonov


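A transaction hangs and never finishes after the communication SPI fails to send a message with a “Too many open files” error. In the reproducer below, the hang occurs in {{tx.commit()}}.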


{code:java}
import java.net.SocketException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.StopNodeOrHaltFailureHandler;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;

import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;

public class TooManyOpenFilesTest extends GridCommonAbstractTest {
    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
        return super.getConfiguration(igniteInstanceName)
            .setFailureHandler(new StopNodeOrHaltFailureHandler())
            .setCommunicationSpi(new TooManyOpenFilesTcpCommunicationSpi())
            .setConsistentId(igniteInstanceName);
    }

    @Override protected void beforeTest() throws Exception {
        super.beforeTest();

        stopAllGrids();

        cleanPersistenceDir();
    }

    @Override protected void afterTest() throws Exception {
        stopAllGrids();

        cleanPersistenceDir();

        super.afterTest();
    }

    public void test() throws Exception {
        IgniteEx crd = startGrids(3);

        crd.cluster().active(true);

        crd.getOrCreateCache(new 
CacheConfiguration<>().setName(DEFAULT_CACHE_NAME).setAtomicityMode(TRANSACTIONAL).setBackups(1).setCacheMode(PARTITIONED));

        TooManyOpenFilesTcpCommunicationSpi spi = 
(TooManyOpenFilesTcpCommunicationSpi)grid(2).context().config().getCommunicationSpi();

        try (Transaction tx = 
grid(1).transactions().txStart(TransactionConcurrency.PESSIMISTIC, 
TransactionIsolation.REPEATABLE_READ)) {
            IgniteCache<Object, Object> cache = 
grid(1).cache(DEFAULT_CACHE_NAME);

            cache.put(1, 1);

            spi.throwException.set(true);

            cache.put(2, 2);
            cache.put(3, 2);
            cache.put(4, 2);

            // hungs here.
            tx.commit();
        }

        for (int i=0; i < 3 ; i++) {
            assertEquals(1, grid(i).cache(DEFAULT_CACHE_NAME).get(1));
            assertEquals(2, grid(i).cache(DEFAULT_CACHE_NAME).get(2));
        }
    }


    private static class TooManyOpenFilesTcpCommunicationSpi extends 
TcpCommunicationSpi {
        private final AtomicBoolean throwException = new AtomicBoolean();

        /** {@inheritDoc} */
        @Override public void sendMessage(ClusterNode node, Message msg) throws 
IgniteSpiException {
            if (throwException.get())
                throw getException(node);

            super.sendMessage(node, msg);
        }

        /** {@inheritDoc} */
        @Override public void sendMessage(
            ClusterNode node,
            Message msg,
            IgniteInClosure<IgniteException> ackC
        ) throws IgniteSpiException {
            if (throwException.get())
                throw getException(node);

            super.sendMessage(node, msg, ackC);
        }

        private IgniteSpiException getException(ClusterNode node) {
            String checkedExceptionMsg =  "Failed to connect to node (is node 
still alive?). " +
                "Make sure that each ComputeTask and cache Transaction has a 
timeout set " +
                "in order to prevent parties from waiting forever in case of 
network issues " +
                "[nodeId=" + node.id() + ", addrs=null]";

            return new IgniteSpiException("Failed to send message to remote 
node: " + node.id(), new IgniteCheckedException(checkedExceptionMsg, new 
SocketException("Too many open files")));
        }
    }
}
{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to