Hi
I'm working on an Ignite project that uses Cassandra as persistent storage.
Initially an Ignite client connects to an Ignite server. If the Ignite server
is restarted after the client has connected, the client reconnects to the
server. However, after reconnecting, I randomly run into a
NullPointerException on some runs/attempts while committing data into
Cassandra:
caused by: java.lang.NullPointerException
at
org.apache.ignite.cache.store.cassandra.persistence.PojoField.getValueFromObject(PojoField.java:167)
at
org.apache.ignite.cache.store.cassandra.persistence.PersistenceController.bindValues(PersistenceController.java:450)
at
org.apache.ignite.cache.store.cassandra.persistence.PersistenceController.bindKeyValue(PersistenceController.java:202)
at
org.apache.ignite.cache.store.cassandra.session.transaction.WriteMutation.bindStatement(WriteMutation.java:58)
at
org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl.execute(CassandraSessionImpl.java:499)
I observe the issue randomly. After going through the source code, I suspect
that it is a Java serialization issue in the Ignite Cassandra module.
In org.apache.ignite.cache.store.cassandra.persistence.PojoField.java there
is a PojoFieldAccessor instance variable that is declared transient, so it is
not part of serialization; if a PojoField object is serialized and then
deserialized, its PojoFieldAccessor will be null. This matches the exception
we are seeing: a NullPointerException when getValue(..) is called on the
null PojoFieldAccessor in the PojoField.getValueFromObject() method. So
whenever a PojoField object is serialized and then deserialized, we may
observe the issue.
Below are steps for reproducing the issue
1. start the ignite server.
2. Start the reproducer client and, immediately after it starts to sleep,
kill and restart the Ignite server.
The client can be seen reconnecting to the server, but during commit we
sometimes observe a NullPointerException. Repeat steps 1 and 2 until the
issue occurs.
Following is a sample reproducer:
****************** DemoProgram *************
package com.IgniteDemo;
import java.util.*;
import javax.cache.Cache.Entry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
public class DemoMain {
public static void main(String[] args){
String ConfigPath = "default-config.xml";
Ignite ignite = Ignition.start(ConfigPath);
//sleep for 30 seconds after ignite client is started and connected to
one
ignite server.
//During this time quickly restart the ignite server to which the
client is
connecting to.
System.out.println("sleeping..");
Thread.sleep(30000);
IgniteCache<PersonPK, Person> cache = ignite.cache("Person");
Transaction tx =
ignite.transactions().txStart(TransactionConcurrency.PESSIMISTIC,
TransactionIsolation.REPEATABLE_READ, 60000, 0);
long dob = 771465600000L;
Person p1 = new Person(1,"yash",22,"124345","addr",new Date(dob));
cache.put(p1.getPK(),p1);
System.out.println("cache put successful");
System.out.println("committing..");
tx.commit();
System.out.println("cache size "+ cache.size());
}
*** OUTPUT ***
[18:13:00] Topology snapshot [ver=2, servers=1, clients=1, CPUs=4,
heap=2.8GB]
sleeping..
[18:13:04] Failed to connect to any address from IP finder (will retry to
join topology every 2 secs): [/127.0.0.1:47500]
[18:13:05,619][SEVERE][tcp-client-disco-sock-writer-#2][TcpDiscoverySpi]
Failed to send message: TcpDiscoveryClientMetricsUpdateMessage
[super=TcpDiscoveryAbstractMessage [sndNodeId=null,
id=9374504f361-228d774d-1de9-47ff-9ad7-cb69400bd890, verifierNodeId=null,
topVer=0, pendingIdx=0, failedNodes=null, isClient=true]]
java.net.SocketException: Socket is closed
at java.net.Socket.getSendBufferSize(Unknown Source)
at
org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.socketStream(TcpDiscoverySpi.java:1358)
at
org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.writeToSocket(TcpDiscoverySpi.java:1470)
at
org.apache.ignite.spi.discovery.tcp.ClientImpl$SocketWriter.body(ClientImpl.java:1240)
at org.apache.ignite.spi.IgniteSpiThread.run(IgniteSpiThread.java:62)
[18:13:09] New version is available at ignite.apache.org: 2.4.0
[18:13:10,727][SEVERE][tcp-client-disco-reconnector-#5][TcpDiscoverySpi]
Failed to reconnect to cluster (consider increasing 'networkTimeout'
configuration property) [networkTimeout=5000]
[18:13:19] Client node was reconnected after it was already considered
failed by the server topology (this could happen after all servers restarted
or due to a long network outage between the client and servers). All
continuous queries and remote event listeners created by this client will be
unsubscribed, consider listening to EVT_CLIENT_NODE_RECONNECTED event to
restore them.
[18:13:20] Topology snapshot [ver=2, servers=1, clients=1, CPUs=4,
heap=2.8GB]
cache put successful
committing..
[18:37:55,656][SEVERE][sys-stripe-1-#2][CassandraCacheStore] Failed to apply
1 mutations performed withing Ignite transaction into Cassandra
class org.apache.ignite.IgniteException: Failed to apply 1 mutations
performed withing Ignite transaction into Cassandra
at
org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl.execute(CassandraSessionImpl.java:534)
at
org.apache.ignite.cache.store.cassandra.CassandraCacheStore.sessionEnd(CassandraCacheStore.java:172)
at
org.apache.ignite.internal.processors.cache.store.GridCacheStoreManagerAdapter.sessionEnd(GridCacheStoreManagerAdapter.java:790)
at
org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter.sessionEnd(IgniteTxAdapter.java:1228)
at
org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter.batchStoreCommit(IgniteTxAdapter.java:1410)
at
org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter.userCommit(IgniteTxLocalAdapter.java:502)
at
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal.localFinish(GridNearTxLocal.java:3078)
at
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishFuture.finish(GridNearTxFinishFuture.java:418)
at
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal$16.apply(GridNearTxLocal.java:3229)
at
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal$16.apply(GridNearTxLocal.java:3223)
at
org.apache.ignite.internal.util.future.GridFutureAdapter.notifyListener(GridFutureAdapter.java:383)
at
org.apache.ignite.internal.util.future.GridFutureAdapter.unblock(GridFutureAdapter.java:347)
at
org.apache.ignite.internal.util.future.GridFutureAdapter.unblockAll(GridFutureAdapter.java:335)
at
org.apache.ignite.internal.util.future.GridFutureAdapter.onDone(GridFutureAdapter.java:495)
at
org.apache.ignite.internal.processors.cache.GridCacheCompoundFuture.onDone(GridCacheCompoundFuture.java:56)
at
org.apache.ignite.internal.util.future.GridFutureAdapter.onDone(GridFutureAdapter.java:474)
at
org.apache.ignite.internal.processors.cache.distributed.near.GridNearPessimisticTxPrepareFuture.onDone(GridNearPessimisticTxPrepareFuture.java:410)
at
org.apache.ignite.internal.processors.cache.distributed.near.GridNearPessimisticTxPrepareFuture.onDone(GridNearPessimisticTxPrepareFuture.java:58)
at
org.apache.ignite.internal.util.future.GridFutureAdapter.onDone(GridFutureAdapter.java:451)
at
org.apache.ignite.internal.util.future.GridCompoundFuture.checkComplete(GridCompoundFuture.java:285)
at
org.apache.ignite.internal.util.future.GridCompoundFuture.apply(GridCompoundFuture.java:144)
at
org.apache.ignite.internal.util.future.GridCompoundFuture.apply(GridCompoundFuture.java:45)
at
org.apache.ignite.internal.util.future.GridFutureAdapter.notifyListener(GridFutureAdapter.java:383)
at
org.apache.ignite.internal.util.future.GridFutureAdapter.unblock(GridFutureAdapter.java:347)
at
org.apache.ignite.internal.util.future.GridFutureAdapter.unblockAll(GridFutureAdapter.java:335)
at
org.apache.ignite.internal.util.future.GridFutureAdapter.onDone(GridFutureAdapter.java:495)
at
org.apache.ignite.internal.util.future.GridFutureAdapter.onDone(GridFutureAdapter.java:474)
at
org.apache.ignite.internal.util.future.GridFutureAdapter.onDone(GridFutureAdapter.java:451)
at
org.apache.ignite.internal.processors.cache.distributed.near.GridNearPessimisticTxPrepareFuture$MiniFuture.onResult(GridNearPessimisticTxPrepareFuture.java:477)
at
org.apache.ignite.internal.processors.cache.distributed.near.GridNearPessimisticTxPrepareFuture.onResult(GridNearPessimisticTxPrepareFuture.java:106)
at
org.apache.ignite.internal.processors.cache.transactions.IgniteTxHandler.processNearTxPrepareResponse(IgniteTxHandler.java:657)
at
org.apache.ignite.internal.processors.cache.transactions.IgniteTxHandler.access$100(IgniteTxHandler.java:97)
at
org.apache.ignite.internal.processors.cache.transactions.IgniteTxHandler$2.apply(IgniteTxHandler.java:183)
at
org.apache.ignite.internal.processors.cache.transactions.IgniteTxHandler$2.apply(IgniteTxHandler.java:181)
at
org.apache.ignite.internal.processors.cache.GridCacheIoManager.processMessage(GridCacheIoManager.java:1060)
at
org.apache.ignite.internal.processors.cache.GridCacheIoManager.onMessage0(GridCacheIoManager.java:579)
at
org.apache.ignite.internal.processors.cache.GridCacheIoManager.handleMessage(GridCacheIoManager.java:378)
at
org.apache.ignite.internal.processors.cache.GridCacheIoManager.handleMessage(GridCacheIoManager.java:304)
at
org.apache.ignite.internal.processors.cache.GridCacheIoManager.access$100(GridCacheIoManager.java:99)
at
org.apache.ignite.internal.processors.cache.GridCacheIoManager$1.onMessage(GridCacheIoManager.java:293)
at
org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1555)
at
org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:1183)
at
org.apache.ignite.internal.managers.communication.GridIoManager.access$4200(GridIoManager.java:126)
at
org.apache.ignite.internal.managers.communication.GridIoManager$9.run(GridIoManager.java:1090)
at
org.apache.ignite.internal.util.StripedExecutor$Stripe.run(StripedExecutor.java:505)
at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NullPointerException
at
org.apache.ignite.cache.store.cassandra.persistence.PojoField.getValueFromObject(PojoField.java:167)
at
org.apache.ignite.cache.store.cassandra.persistence.PersistenceController.bindValues(PersistenceController.java:450)
at
org.apache.ignite.cache.store.cassandra.persistence.PersistenceController.bindKeyValue(PersistenceController.java:202)
at
org.apache.ignite.cache.store.cassandra.session.transaction.WriteMutation.bindStatement(WriteMutation.java:58)
at
org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl.execute(CassandraSessionImpl.java:499)
... 45 more
*********** Person.java *********************
package com.IgniteDemo;
import java.util.Date;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
/**
 * Cache value type for the "Person" cache. Every field carries
 * {@code @QuerySqlField(index = false)}.
 */
public class Person {
    @QuerySqlField(index = false)
    private int person_no;

    @QuerySqlField(index = false)
    private String name;

    @QuerySqlField(index = false)
    private Integer age;

    @QuerySqlField(index = false)
    private String phno;

    @QuerySqlField(index = false)
    private String address;

    /** Public field with a fixed initial value; no accessor is defined for it. */
    @QuerySqlField(index = false)
    public String tid = "t1";

    /** Date of birth; public field that also has a getter and setter. */
    @QuerySqlField(index = false)
    public Date dob;

    /** Creates an empty person; fields keep their default values. */
    public Person() {
    }

    /**
     * Creates a fully populated person ({@code tid} keeps its initial value).
     *
     * @param person_no person number
     * @param name      person name
     * @param age       age, may be {@code null}
     * @param phno      phone number
     * @param address   address
     * @param dob       date of birth
     */
    public Person(int person_no, String name, Integer age, String phno, String address, Date dob) {
        this.person_no = person_no;
        this.name = name;
        this.age = age;
        this.phno = phno;
        this.address = address;
        this.dob = dob;
    }

    /**
     * Builds the cache key for this person.
     *
     * @return a new {@code PersonPK} made from the person number and phone number
     */
    public PersonPK getPK() {
        return new PersonPK(person_no, phno);
    }

    /** @return the person number */
    public int getPerson_no() {
        return person_no;
    }

    /** @param person_no the person number to set */
    public void setPerson_no(int person_no) {
        this.person_no = person_no;
    }

    /** @return the name */
    public String getName() {
        return name;
    }

    /** @param name the name to set */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the age, possibly {@code null} */
    public Integer getAge() {
        return age;
    }

    /** @param age the age to set */
    public void setAge(Integer age) {
        this.age = age;
    }

    /** @return the phone number */
    public String getPhno() {
        return phno;
    }

    /** @param phno the phone number to set */
    public void setPhno(String phno) {
        this.phno = phno;
    }

    /** @return the address */
    public String getAddress() {
        return address;
    }

    /** @param address the address to set */
    public void setAddress(String address) {
        this.address = address;
    }

    /** @return the date of birth */
    public Date getDob() {
        return dob;
    }

    /** @param dob the date of birth to set */
    public void setDob(Date dob) {
        this.dob = dob;
    }
}
*******************************************************************************
************* PersonPK.java ***************
package com.IgniteDemo;
import java.util.Arrays;
public class PersonPK {
private String phno;
private int person_no;
public PersonPK(){
}
public PersonPK(int person_no,String phno){
this.person_no = person_no;
this.phno = phno;
}
public int getPerson_no() {
return person_no;
}
public void setPerson_no(int person_no) {
this.person_no = person_no;
}
public String getPhno() {
return phno;
}
public void setPhno(String phno) {
this.phno = phno;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + person_no;
result = prime * result + ((phno == null) ? 0 :
phno.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
PersonPK other = (PersonPK) obj;
if (person_no != other.person_no)
return false;
if (phno == null) {
if (other.phno != null)
return false;
} else if (!phno.equals(other.phno))
return false;
return true;
}
@Override
public String toString() {
return "PersonPK [phno=" + phno + ", person_no=" + person_no
+ "]";
}
}
***********************************************************************************
**************** default-config.xml ****************************
<?xml version="1.0" encoding="UTF-8"?>
<!--
Spring bean definitions for the reproducer's Ignite node: a client-mode node with a
single transactional "Person" cache whose cache store factory is
CassandraCacheStoreFactory.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- Imports the bean definitions from connection-settings.xml, resolved relative to this file. -->
<import resource="./connection-settings.xml" />
<!-- Persistence settings bean built from an XML resource at an absolute Windows file path. -->
<bean id="cache_persistence_settings"
class="org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings">
<constructor-arg type="org.springframework.core.io.Resource"
value="file:C:\Users\persistence-settings.xml" />
</bean>
<!-- Ignite node configuration. -->
<bean id="grid.cfg"
class="org.apache.ignite.configuration.IgniteConfiguration">
<!-- This node is configured in client mode. -->
<property name="clientMode" value="true"/>
<property name="cacheConfiguration">
<list>
<!-- "Person" cache: PARTITIONED, read-through and write-through enabled. -->
<bean
class="org.apache.ignite.configuration.CacheConfiguration">
<property name="name"
value="Person"/>
<property name="readThrough" value="true"/>
<property name="writeThrough" value="true"/>
<property name="cacheMode" value="PARTITIONED"/>
<!-- Key/value class pair registered as indexed types. -->
<property name="indexedTypes">
<list>
<value>com.IgniteDemo.PersonPK</value>
<value>com.IgniteDemo.Person</value>
</list>
</property>
<!--
Cache store factory. Both properties are bean names rather than bean references:
"cache_persistence_settings" is defined in this file; "cassandraRegularDataSource"
is not, and presumably comes from the imported connection-settings.xml.
-->
<property name="cacheStoreFactory">
<bean
class="org.apache.ignite.cache.store.cassandra.CassandraCacheStoreFactory">
<property name="dataSourceBean"
value="cassandraRegularDataSource"/>
<property name="persistenceSettingsBean"
value="cache_persistence_settings"/>
</bean>
</property>
<!-- Query entity naming the same key/value types as indexedTypes above. -->
<property name="queryEntities">
<list>
<bean
class="org.apache.ignite.cache.QueryEntity">
<property name="keyType"
value="com.IgniteDemo.PersonPK"/>
<property name="valueType"
value="com.IgniteDemo.Person"/>
</bean>
</list>
</property>
<!-- TRANSACTIONAL atomicity mode; the reproducer writes to this cache inside a transaction. -->
<property name="atomicityMode"
value="TRANSACTIONAL"/>
</bean>
</list>
</property>
<property name="peerClassLoadingEnabled" value="true"/>
<!-- TCP discovery with a static IP finder: localhost, port range 47500..47509. -->
<property name="discoverySpi">
<bean
class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<bean
class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
<property name="addresses">
<list>
<value>127.0.0.1:47500..47509</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
</bean>
</beans>
*************************************************************************************
*********** persistence-settings.xml ***********
<!--
Persistence settings: keyspace "testkeyspace", table "person".
Both the key class (PersonPK) and the value class (Person) use the POJO strategy.
-->
<persistence keyspace="testkeyspace" table="person">
<keyPersistence class="com.IgniteDemo.PersonPK" strategy="POJO" />
<valuePersistence class="com.IgniteDemo.Person" strategy="POJO" />
</persistence>
***************************************************
********** connection-settings.xml *************************************
<!-- Spring bean definitions for the Cassandra connection: a load balancing policy and a data source. -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- TokenAwarePolicy constructed with a RoundRobinPolicy as its LoadBalancingPolicy argument. -->
<bean id="loadBalancingPolicy"
class="com.datastax.driver.core.policies.TokenAwarePolicy">
<constructor-arg
type="com.datastax.driver.core.policies.LoadBalancingPolicy">
<bean
class="com.datastax.driver.core.policies.RoundRobinPolicy"/>
</constructor-arg>
</bean>
<!-- Data source bean: QUORUM read and write consistency, using the load balancing policy above. -->
<bean id="cassandraRegularDataSource"
class="org.apache.ignite.cache.store.cassandra.datasource.DataSource">
<property name="readConsistency" value="QUORUM"/>
<property name="writeConsistency" value="QUORUM"/>
<property name="loadBalancingPolicy" ref="loadBalancingPolicy"/>
<!-- NOTE(review): credentials are stored in plain text here; confirm this is acceptable outside a sample setup. -->
<property name="user" value="cassandra"/>
<property name="password" value="cassandra"/>
<!-- Single Cassandra contact point. -->
<property name="contactPoints">
<list>
<value>10.1.2.3</value>
</list>
</property>
</bean>
</beans>
*********************************************************
--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/