Hi,
I am trying to set up a system with some caches in a persistent and some in an
in-memory region.
However, it seems that the in-memory regions don't get synchronized between the
nodes. This would work with a pure setup of either only an in-memory region or
only a persistent one.
Below is a complete JUnit test showing my issue. The "testMem" will fail
because the update made on node "a" does not reach node "b". Any suggestions?
Ignite version: 2.9.1
package de.dicos.cpcfe.ignite;
import java.io.File;
import java.util.Arrays;
import java.util.List;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.ClientConnectorConfiguration;
import org.apache.ignite.configuration.ConnectorConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.logger.slf4j.Slf4jLogger;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* @author sth
*/
public class IgniteClusterTest
{
// /////////////////////////////////////////////////////////
// Class Fields
// /////////////////////////////////////////////////////////
/** */
private static Logger log =
LoggerFactory.getLogger(IgniteClusterTest.class);
/** */
private IgniteEx igA;
/** */
private IgniteEx igB;
/** */
public static final String PERSISTENT_REGION = "persistent";
/** */
public static final String IN_MEMORY_REGION = "inmemory";
// /////////////////////////////////////////////////////////
// Constructors
// /////////////////////////////////////////////////////////
/**
*/
public IgniteClusterTest()
{
}
// /////////////////////////////////////////////////////////
// Methods
// /////////////////////////////////////////////////////////
/**
* @throws java.lang.Exception
*/
@Before
public void setUp()
throws Exception
{
try {
log.info(">>>> starting node A");
File da = new
File("target/idd-a");
rmrf(da);
igA = startNode("a", da, 47500,
47100, 11211, 10800);
log.info(">>>> node A is
running");
Thread.sleep(1000);
log.info(">>>> starting node B");
File db = new
File("target/idd-b");
rmrf(db);
igB = startNode("b", db, 47501,
47101, 11212, 10801);
log.info(">>>> node B is
running");
} catch (Throwable x) {
log.error("unexpected
exception", x);
throw x;
}
}
/**
* @throws java.lang.Exception
*/
@After
public void tearDown()
throws Exception
{
log.info(">>>> stopping all nodes");
Ignition.stopAll(true);
}
@Test
public void testPerm() throws InterruptedException
{
log.info("#### "+new
Exception().getStackTrace()[0].getMethodName());
IgniteCache<String, String> kva =
getKeyValue(igA);
IgniteCache<String, String> kvb =
getKeyValue(igB);
kva.put("a", "aval");
Assert.assertEquals("aval", kvb.get("a"));
}
@Test
public void testMem() throws InterruptedException
{
log.info("#### "+new
Exception().getStackTrace()[0].getMethodName());
IgniteCache<String, String> kva =
getInMemoryKeyValue(igA);
@SuppressWarnings("unused")
IgniteCache<String, String> kvb =
getInMemoryKeyValue(igB);
kva.put("a", "aval");
Thread.sleep(1000);
Assert.assertEquals("aval", kvb.get("a"));
}
/**
*
*/
private IgniteEx startNode(String id, File datadir, int port,
int commPort, int connectorPort, int clientPort)
{
datadir.mkdirs();
String igniteDataDir = datadir.getAbsolutePath();
DataRegionConfiguration persistentRegion = new
DataRegionConfiguration()//
.setName(PERSISTENT_REGION)//
.setPersistenceEnabled(true);
DataRegionConfiguration inMemoryRegion = new
DataRegionConfiguration()//
.setName(IN_MEMORY_REGION)//
.setPersistenceEnabled(false)//
;
List<String> clusterAddresses =
Arrays.asList("127.0.0.1:47500..47501", "127.0.0.1:47505..47506");
IgniteConfiguration igniteCfg = new
IgniteConfiguration()//
.setIgniteInstanceName(id)//
.setWorkDirectory(igniteDataDir)//
.setConsistentId("testcluster-"+id)//
.setGridLogger(new
Slf4jLogger())//
.setMetricsLogFrequency(0)//
.setDataStorageConfiguration(new
DataStorageConfiguration()//
.setDefaultDataRegionConfiguration(persistentRegion)//
.setDataRegionConfigurations(inMemoryRegion))//
//.setDefaultDataRegionConfiguration(inMemoryRegion)//
//.setDataRegionConfigurations(persistentRegion))//
.setDiscoverySpi(new
TcpDiscoverySpi()//
.setIpFinder(new
TcpDiscoveryVmIpFinder()//
.setAddresses(clusterAddresses)
.setShared(true))//
.setLocalAddress("127.0.0.1")//
.setLocalPort(port)//
.setLocalPortRange(2))
.setCommunicationSpi(new
TcpCommunicationSpi()//
.setMessageQueueLimit(1024)
.setLocalAddress("127.0.0.1")
.setLocalPort(commPort)
.setLocalPortRange(2))//
.setConnectorConfiguration(new
ConnectorConfiguration()
.setHost("127.0.0.1")
.setPort(connectorPort)
.setPortRange(2))
.setClientConnectorConfiguration(new ClientConnectorConfiguration()
.setPort(clientPort)
.setPortRange(2));
IgniteEx ignite =
(IgniteEx)Ignition.start(igniteCfg);
ignite.cluster().state(ClusterState.ACTIVE);
return ignite;
}
/**
*
*/
private IgniteCache<String, String> getKeyValue(Ignite ignite)
{
return ignite.getOrCreateCache(new
CacheConfiguration<String, String>()
.setName("permkv")
.setDataRegionName(PERSISTENT_REGION)
.setCacheMode(CacheMode.REPLICATED)
.setBackups(2));
}
/**
*
*/
private IgniteCache<String, String> getInMemoryKeyValue(Ignite
ignite)
{
return ignite.getOrCreateCache(new
CacheConfiguration<String, String>()
.setName("memkv")
.setCacheMode(CacheMode.REPLICATED)
.setDataRegionName(IN_MEMORY_REGION)
.setBackups(2));
}
/**
* @param file
*/
public static void rmrf(File file)
{
if (file.isDirectory()) {
for (File sub: file.listFiles())
{
String n =
sub.getName();
if
(".".equals(n) || "..".equals(n)) {
continue;
}
rmrf(sub);
}
}
log.debug("deleting "+file.getAbsolutePath());
file.delete();
}
}
---
Mit freundlichen Grüßen
Stephan Hesse
Geschäftsführer
DICOS GmbH Kommunikationssysteme
Alsfelder Straße 11, 64289 Darmstadt
Telefon: +49 6151 82787 27, Mobil: +49 1761 82787 27
www.dicos.de<http://www.dicos.de/>
DICOS GmbH Kommunikationssysteme, Darmstadt, Amtsgericht Darmstadt HRB 7024,
Geschäftsführer: Dr. Winfried Geyer, Stephan Hesse, Waldemar Wiesner