Guys,

read-through is working fine for me but write-through is not working. I'm
not getting error/exception either. Following is my configuration file and
relevant code blocks,

*Sever node configuration file:*

$ cat test-tool-server.xml
<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans";
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
       xmlns:util="http://www.springframework.org/schema/util";
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util.xsd";>

    <bean id="grid.cfg"
class="org.apache.ignite.configuration.IgniteConfiguration">
        
        <property name="peerClassLoadingEnabled" value="true"/>

<property name="binaryConfiguration">
<bean class="org.apache.ignite.configuration.BinaryConfiguration">
<property name="compactFooter" value="false" />
<property name="idMapper">
<bean class="org.apache.ignite.binary.BinaryBasicIdMapper">
<constructor-arg name="isLowerCase" value="true" />
</bean>
</property>
<property name="nameMapper">
<bean class="org.apache.ignite.binary.BinaryBasicNameMapper">
<constructor-arg name="isSimpleName" value="true" />
</bean>
</property>
</bean>
</property>

        <property name="cacheConfiguration">
            <list>
                <bean
class="org.apache.ignite.configuration.CacheConfiguration">
                
                <property name="readThrough" value="true"></property>
                <property name="writeThrough" value="true"></property>
                <property name="writeBehindEnabled" value="true"></property>
                <property name="storeKeepBinary" value="true"></property>
                
                    <property name="cacheStoreFactory">
                      <bean
class="javax.cache.configuration.FactoryBuilder.ClassFactory">
                        <constructor-arg value="TestTableStore"/>
                      </bean>
                    </property>

                    <property name="name" value="TestTable"/>
                    <property name="cacheMode" value="PARTITIONED"/>
                    <property name="atomicityMode" value="TRANSACTIONAL"/>
                    <property name="writeSynchronizationMode"
value="FULL_SYNC"/>

                    
                    <property name="queryEntities">
                        <list>
                            <bean
class="org.apache.ignite.cache.QueryEntity">
                                <property name="keyType"
value="java.lang.Long"/>
                                <property name="valueType"
value="TestTable"/>

                                <property name="fields">
                                    <map>
                                        <entry key="tid"
value="java.lang.Short"/>
                                        <entry key="idint"
value="java.lang.Int"/>
                                        <entry key="idbigint"
value="java.lang.Long"/>
                                        <entry key="idchar"
value="java.lang.String"/>
                                        <entry key="idbinary"
value="java.lang.byte[]"/>
                                        <entry key="idvarbinary"
value="java.lang.byte[]"/>
                                        <entry key="idvarchar"
value="java.lang.String"/>
                                        <entry key="idts"
value="java.lang.Timestamp"/>
                                    </map>
                                </property>
                                <property name="indexes">
                                    <list>
                                        <bean 
class="org.apache.ignite.cache.QueryIndex">
                                            <constructor-arg value="tid"/>
                                            <constructor-arg value="SORTED"/>
                                        </bean>
                                        
                                        <bean 
class="org.apache.ignite.cache.QueryIndex">
                                            <constructor-arg>
                                                <list>
                                                    <value>tid</value>
                                                    <value>idint</value>
                                                </list>
                                            </constructor-arg>
                                            <constructor-arg value="SORTED"/>
                                        </bean>
                                    </list>
                                </property>
                            </bean>
                        </list>
                    </property>
                </bean>

                <bean
class="org.apache.ignite.configuration.CacheConfiguration">
                <property name="readThrough" value="true"></property>
                <property name="writeThrough" value="true"></property>
                <property name="writeBehindEnabled" value="true"></property>
                <property name="storeKeepBinary" value="true"></property>
                    <property name="cacheStoreFactory">
                      <bean
class="javax.cache.configuration.FactoryBuilder.ClassFactory">
                        <constructor-arg value="TestTableStore"/>
                      </bean>
                    </property>

                    <property name="name" value="TestTable1"/>
                    <property name="cacheMode" value="PARTITIONED"/>
                    <property name="atomicityMode" value="TRANSACTIONAL"/>
                    <property name="writeSynchronizationMode"
value="FULL_SYNC"/>

                    
                    <property name="queryEntities">
                        <list>
                            <bean
class="org.apache.ignite.cache.QueryEntity">
                                <property name="keyType"
value="java.lang.Long"/>
                                <property name="valueType"
value="TestTable"/>

                                <property name="fields">
                                    <map>
                                        <entry key="tid"
value="java.lang.Short"/>
                                        <entry key="idint"
value="java.lang.Int"/>
                                        <entry key="idbigint"
value="java.lang.Long"/>
                                        <entry key="idchar"
value="java.lang.String"/>
                                        <entry key="idbinary"
value="java.lang.byte[]"/>
                                        <entry key="idvarbinary"
value="java.lang.byte[]"/>
                                        <entry key="idvarchar"
value="java.lang.String"/>
                                        <entry key="idts"
value="java.lang.Timestamp"/>
                                    </map>
                                </property>
                                <property name="indexes">
                                    <list>
                                        <bean
class="org.apache.ignite.cache.QueryIndex">
                                            <constructor-arg value="tid"/>
                                            <constructor-arg value="SORTED"/>
                                        </bean>
                                        
                                        <bean
class="org.apache.ignite.cache.QueryIndex">
                                            <constructor-arg>
                                                <list>
                                                    <value>tid</value>
                                                    <value>idint</value>
                                                </list>
                                            </constructor-arg>
                                            <constructor-arg
value="SORTED"/>
                                        </bean>
                                    </list>
                                </property>
                            </bean>
                        </list>
                    </property>
                </bean>
            </list>
        </property>

        
        <property name="discoverySpi">
            <bean
class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    
                    
                    
                    <bean
class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
                        <property name="addresses">
                            <list>
                                
                                <value>127.0.0.1:47550..47551</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>

*C++ test program which talks to Apache Ignite*

$ cat main.cc
...
void* Insert_i(void* arg, const std::string& cname) {
        std::clog << __func__ << " called." << std::endl;
        Cache<int64_t, TestTable> ttcache = Ignition::Get().GetCache<
                                int64_t, TestTable>(cname.c_str());
        // Clear cache before running the example.
        //ttcache.Clear();

        GenTestTableRec recgen;
        std::map<int64_t, TestTable> ttrecords;
        //TestTable rec = recgen();
        std::string tmpstr("Getting Started contains a list of tasks you might 
want
to perform when you set up your computer. Tasks in Getting Started
include:Transferring files from another computer. Adding new users to your
computer. Adding new users to your computer");
        std::vector<int>* targ = static_cast<std::vector&lt;int>* >(arg);
        int16_t tid = targ->operator[](1);
        TestTable rec(tid, tid, idbigint, std::string("Name_") +
std::to_string(tid), tmpstr + "_" + std::to_string(tid), time(0));
        sprintf((char*)rec.idbinary, "binary data for testing %d", tid);
        sprintf((char*)rec.idvarbinary, "%s_%d", tmpstr.c_str(), tid);
        rec.idbinlen = strlen((char*)rec.idbinary);
        rec.idvarbinlen = strlen((char*)rec.idvarbinary);
        int nentries = targ->operator[](0);
        for(int i = 0; i < nentries; i++) {
                rec.idts = ignite::Timestamp(time(0)*1000);
                rec.idint = idint++;
                rec.idbigint = idbigint++;
                ttrecords[key++] = rec;
        }
        std::clog << "Inserting " << ttrecords.size() << " records to " <<
                                cname << ":" << std::endl;
        //for(auto itr = ttrecords.begin(); itr != ttrecords.end(); ++itr)
        //      std::clog << itr->first << ": " << itr->second.ToString()
        //                      << std::endl;
        struct timeval t0;
        gettimeofday(&t0, 0);
        ttcache.PutAll(ttrecords);
        struct timeval t1;
        gettimeofday(&t1, 0);
        ofs << time(0) << "," << ttrecords.size() << "," << t1.tv_sec * 1000000 
+
t1.tv_usec
                                - t0.tv_sec * 1000000 - t0.tv_usec << ",+,";
        std::clog << "last inserted record key: " << key - 1 << "." << 
std::endl;

        return (void*)0;
}


*CacheStoreAdapter implementation*

$ cat TestTableStore.java 
...
public class TestTableStore extends CacheStoreAdapter<Long, TestTable>
implements Serializable {

        /** Auto-injected store session. */
        @CacheStoreSessionResource
        private CacheStoreSession ses;
        private Cluster cluster;

        public TestTableStore create() {
                return new TestTableStore();
        }
        // Complete transaction or simply close connection if there is no
transaction.
        @Override public void sessionEnd(boolean commit) {
                Bucket conn = ses.attachment();
                if (conn != null && ses.isWithinTransaction()) {
                        //if (commit)
                        //      conn.commit();
                        //else
                        //      conn.rollback();
                }
        }

        // This mehtod is called whenever "get(...)" methods are called on
IgniteCache.
        @Override public TestTable load(Long key) {
                Bucket conn = connection();
                JsonDocument jd = conn.get(key.toString());
                if(jd != null) {
                        JsonObject jo = jd.content();
                        TestTable tt = new 
TestTable(((Integer)jo.get("tid")).shortValue(),
(Integer)jo.get("idint"), (Long)jo.get("idbigint"),
(String)jo.get("idchar"), (String)jo.get("idvarchar"), new
Timestamp((Integer)jo.get("idts")));
                        
tt.setIdbinary(SerializationUtils.serialize((String)jo.get("idbinary")));
                
tt.setIdvarbinary(SerializationUtils.serialize((String)jo.get("idvarbinary")));
                        return tt;
                }
                return null;
        }

        // This mehtod is called whenever "put(...)" methods are called on
IgniteCache.
        @Override public void write(Cache.Entry<? extends Long, ? extends
TestTable> entry) throws CacheWriterException {
                Bucket conn = connection();
                TestTable val = entry.getValue();
                conn.insert(JsonDocument.create(entry.getKey().toString(),
JsonObject.create().put("tid", val.getTid()).put("idint",
val.getIdint()).put("idbigint", val.getIdbigint()).put("idchar",
val.getIdchar()).put("idbinary", val.getIdbinary()).put("idvarbinary",
val.getIdvarbinary()).put("idvarchar", val.getIdvarchar()).put("idts",
val.getIdts())));
        }

        // This mehtod is called whenever "remove(...)" methods are called on
IgniteCache.
        @Override public void delete(Object key) {
                Bucket conn = connection();
                conn.remove((String)key);
        }

        // This mehtod is called whenever "loadCache()" and "localLoadCache()"
        // methods are called on IgniteCache. It is used for bulk-loading the
cache.
        // If you don't need to bulk-load the cache, skip this method.
        @Override public void loadCache(IgniteBiInClosure<Long, TestTable> clo,
Object... args) {
                if (args == null || args.length == 0 || args[0] == null)
                        throw new CacheLoaderException("Expected entry count 
parameter is not
provided.");

                final int entryCnt = (Integer)args[0];

                Bucket conn = connection();
                /*try (PreparedStatement st = conn.prepareStatement(
                        "select tid, idint, idbigint, idchar, idvarchar"
                        + ", idbinary, idvarbinary, idts from TestTable"
                        + " where tid = ?")) {
                        try (ResultSet rs = st.executeQuery()) {
                                int cnt = 0;

                                while (cnt < entryCnt && rs.next()) {
                                        TestTable tt = new 
TestTable(rs.getShort(1), rs.getInt(2),
rs.getLong(3), rs.getString(4), rs.getString(5), rs.getTimestamp(8));
                                        tt.setIdbinary(rs.getObject(6));
                                        tt.setIdvarbinary(rs.getObject(7));

                                        //clo.apply(tt.getTid(), tt);
                                        clo.apply(tt.getIdbigint(), tt);

                                        cnt++;
                                }
                        }
                }*/
        }

        // Opens JDBC connection and attaches it to the ongoing
        // session if within a transaction.
        //private Bucket connection() throws SQLException {
        private Bucket connection() {
                if (ses.isWithinTransaction()) {
                        Bucket conn = ses.attachment();

                        if (conn == null) {
                                conn = openConnection(false);

                                // Store connection in the session, so it can 
be accessed
                                // for other operations within the same 
transaction.
                                ses.attach(conn);
                        }

                        return conn;
                }
                // Transaction can be null in case of simple load or put 
operation.
                else
                        return openConnection(true);
        }

        // Opens JDBC connection.
        //private Bucket openConnection(boolean autocommit) throws SQLException 
{
        private Bucket openConnection(boolean autocommit) {
                // Open connection to your RDBMS systems (Oracle, MySQL, 
Postgres, DB2,
Microsoft SQL, etc.)
                // In this example we use H2 Database for simplification.
                //Bucket conn =
DriverManager.getConnection("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1");
                if (this.cluster == null)
                        this.cluster = CouchbaseCluster.create();
                Bucket conn = cluster.openBucket("TestTable");

                //conn.setAutoCommit(autocommit);

                return conn;
        }

        // This mehtod is called whenever "getAll(...)" methods are called on
IgniteCache.
        @Override public Map<Long, TestTable> loadAll(Iterable<? extends Long>
keys) throws CacheLoaderException {
                Bucket conn = connection();
                Map<Long, TestTable> loaded = new HashMap<Long, TestTable>();

                for (Long key : keys) {
                        JsonDocument jd = conn.get(key.toString());
                        if(jd != null) {
                                JsonObject jo = jd.content();
                                TestTable tt = new 
TestTable(((Integer)jo.get("tid")).shortValue(),
(Integer)jo.get("idint"), (Long)jo.get("idbigint"),
(String)jo.get("idchar"), (String)jo.get("idvarchar"), new
Timestamp((Integer)jo.get("idts")));
                        
tt.setIdbinary(SerializationUtils.serialize((String)jo.get("idbinary")));
                        
tt.setIdvarbinary(SerializationUtils.serialize((String)jo.get("idvarbinary")));
                                loaded.put(key, tt);
                        }
                }
                return loaded;
        }

        // This mehtod is called whenever "putAll(...)" methods are called on
IgniteCache.
        @Override public void writeAll(Collection<Cache.Entry&lt;? extends 
Long, ?
extends TestTable>> entries) throws CacheWriterException {
                Bucket conn = connection();
                // Syntax of MERGE statement is database specific and should be 
adopted
for your database.
                // If your database does not support MERGE statement then use 
sequentially
update, insert statements.
                for (Cache.Entry<? extends Long, ? extends TestTable> entry : 
entries) {
                        TestTable val = entry.getValue();
                        
conn.upsert(JsonDocument.create(entry.getKey().toString(),
JsonObject.create().put("tid", val.getTid()).put("idint",
val.getIdint()).put("idbigint", val.getIdbigint()).put("idchar",
val.getIdchar()).put("idbinary", val.getIdbinary()).put("idvarbinary",
val.getIdvarbinary()).put("idvarchar", val.getIdvarchar()).put("idts",
val.getIdts())));
                        System.out.println(conn.get(entry.getKey().toString()));
                }
        }
 
        // This mehtod is called whenever "removeAll(...)" methods are called on
IgniteCache.
        @Override public void deleteAll(Collection<?> keys) throws
CacheWriterException {
                Bucket conn = connection();
                for (Object key : keys)
                        conn.remove((String)key);
        }

        @Override public String toString() {
                return "TestTableStore [ses=" + ses + ", cluster=" + cluster + 
']';
        }
}


What can be the possible issue?

thanks & regards,



--
View this message in context: 
http://apache-ignite-users.70518.x6.nabble.com/Couchbase-as-persistent-store-tp7476p8033.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.

Reply via email to