Hi all, read-through is working fine for me, but write-through is not, and I am not getting any error or exception either. Below are my configuration file and the relevant code blocks.
*Server node configuration file:*

$ cat test-tool-server.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/util
           http://www.springframework.org/schema/util/spring-util.xsd">
    <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <property name="peerClassLoadingEnabled" value="true"/>

        <property name="binaryConfiguration">
            <bean class="org.apache.ignite.configuration.BinaryConfiguration">
                <property name="compactFooter" value="false"/>
                <property name="idMapper">
                    <bean class="org.apache.ignite.binary.BinaryBasicIdMapper">
                        <constructor-arg name="isLowerCase" value="true"/>
                    </bean>
                </property>
                <property name="nameMapper">
                    <bean class="org.apache.ignite.binary.BinaryBasicNameMapper">
                        <constructor-arg name="isSimpleName" value="true"/>
                    </bean>
                </property>
            </bean>
        </property>

        <property name="cacheConfiguration">
            <list>
                <bean class="org.apache.ignite.configuration.CacheConfiguration">
                    <property name="readThrough" value="true"/>
                    <property name="writeThrough" value="true"/>
                    <property name="writeBehindEnabled" value="true"/>
                    <property name="storeKeepBinary" value="true"/>
                    <property name="cacheStoreFactory">
                        <bean class="javax.cache.configuration.FactoryBuilder.ClassFactory">
                            <constructor-arg value="TestTableStore"/>
                        </bean>
                    </property>
                    <property name="name" value="TestTable"/>
                    <property name="cacheMode" value="PARTITIONED"/>
                    <property name="atomicityMode" value="TRANSACTIONAL"/>
                    <property name="writeSynchronizationMode" value="FULL_SYNC"/>
                    <property name="queryEntities">
                        <list>
                            <bean class="org.apache.ignite.cache.QueryEntity">
                                <property name="keyType" value="java.lang.Long"/>
                                <property name="valueType" value="TestTable"/>
                                <property name="fields">
                                    <map>
                                        <entry key="tid" value="java.lang.Short"/>
                                        <entry key="idint" value="java.lang.Integer"/>
                                        <entry key="idbigint" value="java.lang.Long"/>
                                        <entry key="idchar" value="java.lang.String"/>
                                        <entry key="idbinary" value="java.lang.byte[]"/>
                                        <entry key="idvarbinary" value="java.lang.byte[]"/>
                                        <entry key="idvarchar" value="java.lang.String"/>
                                        <entry key="idts" value="java.sql.Timestamp"/>
                                    </map>
                                </property>
                                <property name="indexes">
                                    <list>
                                        <bean class="org.apache.ignite.cache.QueryIndex">
                                            <constructor-arg value="tid"/>
                                            <constructor-arg value="SORTED"/>
                                        </bean>
                                        <bean class="org.apache.ignite.cache.QueryIndex">
                                            <constructor-arg>
                                                <list>
                                                    <value>tid</value>
                                                    <value>idint</value>
                                                </list>
                                            </constructor-arg>
                                            <constructor-arg value="SORTED"/>
                                        </bean>
                                    </list>
                                </property>
                            </bean>
                        </list>
                    </property>
                </bean>

                <!-- Second cache, identical to the one above except for its name. -->
                <bean class="org.apache.ignite.configuration.CacheConfiguration">
                    <property name="readThrough" value="true"/>
                    <property name="writeThrough" value="true"/>
                    <property name="writeBehindEnabled" value="true"/>
                    <property name="storeKeepBinary" value="true"/>
                    <property name="cacheStoreFactory">
                        <bean class="javax.cache.configuration.FactoryBuilder.ClassFactory">
                            <constructor-arg value="TestTableStore"/>
                        </bean>
                    </property>
                    <property name="name" value="TestTable1"/>
                    <property name="cacheMode" value="PARTITIONED"/>
                    <property name="atomicityMode" value="TRANSACTIONAL"/>
                    <property name="writeSynchronizationMode" value="FULL_SYNC"/>
                    <property name="queryEntities">
                        <list>
                            <bean class="org.apache.ignite.cache.QueryEntity">
                                <property name="keyType" value="java.lang.Long"/>
                                <property name="valueType" value="TestTable"/>
                                <property name="fields">
                                    <map>
                                        <entry key="tid" value="java.lang.Short"/>
                                        <entry key="idint" value="java.lang.Integer"/>
                                        <entry key="idbigint" value="java.lang.Long"/>
                                        <entry key="idchar" value="java.lang.String"/>
                                        <entry key="idbinary" value="java.lang.byte[]"/>
                                        <entry key="idvarbinary" value="java.lang.byte[]"/>
                                        <entry key="idvarchar" value="java.lang.String"/>
                                        <entry key="idts" value="java.sql.Timestamp"/>
                                    </map>
                                </property>
                                <property name="indexes">
                                    <list>
                                        <bean class="org.apache.ignite.cache.QueryIndex">
                                            <constructor-arg value="tid"/>
                                            <constructor-arg value="SORTED"/>
                                        </bean>
                                        <bean class="org.apache.ignite.cache.QueryIndex">
                                            <constructor-arg>
                                                <list>
                                                    <value>tid</value>
                                                    <value>idint</value>
                                                </list>
                                            </constructor-arg>
                                            <constructor-arg value="SORTED"/>
                                        </bean>
                                    </list>
                                </property>
                            </bean>
                        </list>
                    </property>
                </bean>
            </list>
        </property>

        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
                        <property name="addresses">
                            <list>
                                <value>127.0.0.1:47550..47551</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>
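A side note on the store factory above: javax.cache.configuration.FactoryBuilder.ClassFactory instantiates the store reflectively from the class name, so as far as I know TestTableStore (and the Couchbase client jars) must be on the classpath of every server node; peer class loading does not cover cache stores. For reference, a minimal programmatic sketch of that part of the XML, assuming the same class and cache names:

import javax.cache.configuration.FactoryBuilder;
import org.apache.ignite.configuration.CacheConfiguration;

// Sketch: programmatic equivalent of the cacheStoreFactory wiring above.
// Assumes TestTableStore has a public no-arg constructor and is deployed
// on every server node's classpath.
CacheConfiguration<Long, TestTable> ccfg = new CacheConfiguration<>("TestTable");
ccfg.setReadThrough(true);
ccfg.setWriteThrough(true);
ccfg.setWriteBehindEnabled(true);
ccfg.setStoreKeepBinary(true);
ccfg.setCacheStoreFactory(FactoryBuilder.factoryOf(TestTableStore.class));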
name="queryEntities"> <list> <bean class="org.apache.ignite.cache.QueryEntity"> <property name="keyType" value="java.lang.Long"/> <property name="valueType" value="TestTable"/> <property name="fields"> <map> <entry key="tid" value="java.lang.Short"/> <entry key="idint" value="java.lang.Int"/> <entry key="idbigint" value="java.lang.Long"/> <entry key="idchar" value="java.lang.String"/> <entry key="idbinary" value="java.lang.byte[]"/> <entry key="idvarbinary" value="java.lang.byte[]"/> <entry key="idvarchar" value="java.lang.String"/> <entry key="idts" value="java.lang.Timestamp"/> </map> </property> <property name="indexes"> <list> <bean class="org.apache.ignite.cache.QueryIndex"> <constructor-arg value="tid"/> <constructor-arg value="SORTED"/> </bean> <bean class="org.apache.ignite.cache.QueryIndex"> <constructor-arg> <list> <value>tid</value> <value>idint</value> </list> </constructor-arg> <constructor-arg value="SORTED"/> </bean> </list> </property> </bean> </list> </property> </bean> </list> </property> <property name="discoverySpi"> <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"> <property name="ipFinder"> <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"> <property name="addresses"> <list> <value>127.0.0.1:47550..47551</value> </list> </property> </bean> </property> </bean> </property> </bean> </beans> *C++ test program which talks to Apache Ignite* $ cat main.cc ... void* Insert_i(void* arg, const std::string& cname) { std::clog << __func__ << " called." << std::endl; Cache<int64_t, TestTable> ttcache = Ignition::Get().GetCache< int64_t, TestTable>(cname.c_str()); // Clear cache before running the example. //ttcache.Clear(); GenTestTableRec recgen; std::map<int64_t, TestTable> ttrecords; //TestTable rec = recgen(); std::string tmpstr("Getting Started contains a list of tasks you might want to perform when you set up your computer. Tasks in Getting Started include:Transferring files from another computer. Adding new users to your computer. Adding new users to your computer"); std::vector<int>* targ = static_cast<std::vector<int>* >(arg); int16_t tid = targ->operator[](1); TestTable rec(tid, tid, idbigint, std::string("Name_") + std::to_string(tid), tmpstr + "_" + std::to_string(tid), time(0)); sprintf((char*)rec.idbinary, "binary data for testing %d", tid); sprintf((char*)rec.idvarbinary, "%s_%d", tmpstr.c_str(), tid); rec.idbinlen = strlen((char*)rec.idbinary); rec.idvarbinlen = strlen((char*)rec.idvarbinary); int nentries = targ->operator[](0); for(int i = 0; i < nentries; i++) { rec.idts = ignite::Timestamp(time(0)*1000); rec.idint = idint++; rec.idbigint = idbigint++; ttrecords[key++] = rec; } std::clog << "Inserting " << ttrecords.size() << " records to " << cname << ":" << std::endl; //for(auto itr = ttrecords.begin(); itr != ttrecords.end(); ++itr) // std::clog << itr->first << ": " << itr->second.ToString() // << std::endl; struct timeval t0; gettimeofday(&t0, 0); ttcache.PutAll(ttrecords); struct timeval t1; gettimeofday(&t1, 0); ofs << time(0) << "," << ttrecords.size() << "," << t1.tv_sec * 1000000 + t1.tv_usec - t0.tv_sec * 1000000 - t0.tv_usec << ",+,"; std::clog << "last inserted record key: " << key - 1 << "." << std::endl; return (void*)0; } *CacheStoreAdapter implementation* $ cat TestTableStore.java ... public class TestTableStore extends CacheStoreAdapter<Long, TestTable> implements Serializable { /** Auto-injected store session. 
*CacheStoreAdapter implementation*

$ cat TestTableStore.java
...
public class TestTableStore extends CacheStoreAdapter<Long, TestTable> implements Serializable {
    /** Auto-injected store session. */
    @CacheStoreSessionResource
    private CacheStoreSession ses;

    private Cluster cluster;

    public TestTableStore create() {
        return new TestTableStore();
    }

    // Complete the transaction, or simply close the connection if there is no transaction.
    @Override public void sessionEnd(boolean commit) {
        Bucket conn = ses.attachment();

        if (conn != null && ses.isWithinTransaction()) {
            //if (commit)
            //    conn.commit();
            //else
            //    conn.rollback();
        }
    }

    // This method is called whenever "get(...)" methods are called on IgniteCache.
    @Override public TestTable load(Long key) {
        Bucket conn = connection();

        JsonDocument jd = conn.get(key.toString());

        if (jd != null) {
            JsonObject jo = jd.content();

            TestTable tt = new TestTable(
                ((Integer)jo.get("tid")).shortValue(),
                (Integer)jo.get("idint"),
                (Long)jo.get("idbigint"),
                (String)jo.get("idchar"),
                (String)jo.get("idvarchar"),
                new Timestamp((Integer)jo.get("idts")));

            tt.setIdbinary(SerializationUtils.serialize((String)jo.get("idbinary")));
            tt.setIdvarbinary(SerializationUtils.serialize((String)jo.get("idvarbinary")));

            return tt;
        }

        return null;
    }

    // This method is called whenever "put(...)" methods are called on IgniteCache.
    @Override public void write(Cache.Entry<? extends Long, ? extends TestTable> entry)
        throws CacheWriterException {
        Bucket conn = connection();

        TestTable val = entry.getValue();

        conn.insert(JsonDocument.create(entry.getKey().toString(),
            JsonObject.create()
                .put("tid", val.getTid())
                .put("idint", val.getIdint())
                .put("idbigint", val.getIdbigint())
                .put("idchar", val.getIdchar())
                .put("idbinary", val.getIdbinary())
                .put("idvarbinary", val.getIdvarbinary())
                .put("idvarchar", val.getIdvarchar())
                .put("idts", val.getIdts())));
    }

    // This method is called whenever "remove(...)" methods are called on IgniteCache.
    @Override public void delete(Object key) {
        Bucket conn = connection();

        conn.remove((String)key);
    }

    // This method is called whenever "loadCache()" and "localLoadCache()"
    // methods are called on IgniteCache. It is used for bulk-loading the cache.
    // If you don't need to bulk-load the cache, skip this method.
    @Override public void loadCache(IgniteBiInClosure<Long, TestTable> clo, Object... args) {
        if (args == null || args.length == 0 || args[0] == null)
            throw new CacheLoaderException("Expected entry count parameter is not provided.");

        final int entryCnt = (Integer)args[0];

        Bucket conn = connection();

        /*try (PreparedStatement st = conn.prepareStatement(
            "select tid, idint, idbigint, idchar, idvarchar" +
            ", idbinary, idvarbinary, idts from TestTable" +
            " where tid = ?")) {
            try (ResultSet rs = st.executeQuery()) {
                int cnt = 0;

                while (cnt < entryCnt && rs.next()) {
                    TestTable tt = new TestTable(rs.getShort(1), rs.getInt(2), rs.getLong(3),
                        rs.getString(4), rs.getString(5), rs.getTimestamp(8));

                    tt.setIdbinary(rs.getObject(6));
                    tt.setIdvarbinary(rs.getObject(7));

                    //clo.apply(tt.getTid(), tt);
                    clo.apply(tt.getIdbigint(), tt);

                    cnt++;
                }
            }
        }*/
    }

    // Opens a bucket connection and attaches it to the ongoing
    // session if within a transaction.
    //private Bucket connection() throws SQLException {
    private Bucket connection() {
        if (ses.isWithinTransaction()) {
            Bucket conn = ses.attachment();

            if (conn == null) {
                conn = openConnection(false);

                // Store connection in the session, so it can be accessed
                // for other operations within the same transaction.
                ses.attach(conn);
            }

            return conn;
        }
        // Transaction can be null in case of simple load or put operation.
        else
            return openConnection(true);
    }

    // Opens a connection to the Couchbase bucket.
    //private Bucket openConnection(boolean autocommit) throws SQLException {
    private Bucket openConnection(boolean autocommit) {
        // This store was adapted from the Ignite JDBC example; here we
        // connect to Couchbase instead of an RDBMS.
        //Bucket conn = DriverManager.getConnection("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1");
        if (this.cluster == null)
            this.cluster = CouchbaseCluster.create();

        Bucket conn = cluster.openBucket("TestTable");

        //conn.setAutoCommit(autocommit);

        return conn;
    }

    // This method is called whenever "getAll(...)" methods are called on IgniteCache.
    @Override public Map<Long, TestTable> loadAll(Iterable<? extends Long> keys)
        throws CacheLoaderException {
        Bucket conn = connection();

        Map<Long, TestTable> loaded = new HashMap<Long, TestTable>();

        for (Long key : keys) {
            JsonDocument jd = conn.get(key.toString());

            if (jd != null) {
                JsonObject jo = jd.content();

                TestTable tt = new TestTable(
                    ((Integer)jo.get("tid")).shortValue(),
                    (Integer)jo.get("idint"),
                    (Long)jo.get("idbigint"),
                    (String)jo.get("idchar"),
                    (String)jo.get("idvarchar"),
                    new Timestamp((Integer)jo.get("idts")));

                tt.setIdbinary(SerializationUtils.serialize((String)jo.get("idbinary")));
                tt.setIdvarbinary(SerializationUtils.serialize((String)jo.get("idvarbinary")));

                loaded.put(key, tt);
            }
        }

        return loaded;
    }

    // This method is called whenever "putAll(...)" methods are called on IgniteCache.
    @Override public void writeAll(Collection<Cache.Entry<? extends Long, ? extends TestTable>> entries)
        throws CacheWriterException {
        Bucket conn = connection();

        // Couchbase upsert() inserts the document or replaces the existing
        // one, so no database-specific MERGE statement is needed here.
        for (Cache.Entry<? extends Long, ? extends TestTable> entry : entries) {
            TestTable val = entry.getValue();

            conn.upsert(JsonDocument.create(entry.getKey().toString(),
                JsonObject.create()
                    .put("tid", val.getTid())
                    .put("idint", val.getIdint())
                    .put("idbigint", val.getIdbigint())
                    .put("idchar", val.getIdchar())
                    .put("idbinary", val.getIdbinary())
                    .put("idvarbinary", val.getIdvarbinary())
                    .put("idvarchar", val.getIdvarchar())
                    .put("idts", val.getIdts())));

            System.out.println(conn.get(entry.getKey().toString()));
        }
    }

    // This method is called whenever "removeAll(...)" methods are called on IgniteCache.
    @Override public void deleteAll(Collection<?> keys) throws CacheWriterException {
        Bucket conn = connection();

        for (Object key : keys)
            conn.remove((String)key);
    }

    @Override public String toString() {
        return "TestTableStore [ses=" + ses + ", cluster=" + cluster + ']';
    }
}
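One thing I am not sure about: storeKeepBinary is true in the cache configuration, and my understanding is that Ignite then hands the store BinaryObject instances instead of deserialized TestTable values, so the implicit TestTable casts inside write(...)/writeAll(...) would fail; with write-behind enabled, such a failure would only show up in the server log, not as an exception at the caller. A minimal sketch of a binary-aware store, assuming the field names declared in the QueryEntity (TestTableBinaryStore is just an illustrative name):

import java.io.Serializable;
import javax.cache.Cache;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.store.CacheStoreAdapter;

// Sketch only: a store typed for BinaryObject values, matching
// storeKeepBinary=true; the Couchbase calls are elided.
public class TestTableBinaryStore extends CacheStoreAdapter<Long, BinaryObject> implements Serializable {
    @Override public void write(Cache.Entry<? extends Long, ? extends BinaryObject> entry) {
        BinaryObject val = entry.getValue();

        // Read fields through the binary API instead of casting to TestTable.
        short tid = val.field("tid");
        int idint = val.field("idint");
        long idbigint = val.field("idbigint");
        String idvarchar = val.field("idvarchar");

        // ... build the JsonDocument from these fields and upsert it ...
    }

    @Override public BinaryObject load(Long key) {
        return null; // read path elided in this sketch
    }

    @Override public void delete(Object key) {
        // delete path elided in this sketch
    }
}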
What could the possible issue be?

thanks & regards,
