Database writes are not rolling back as I expected.
I've spent many hours reading software documentation and web postings.
I have not been able to resolve the issue.
I'm hoping you folks can help me.
Scenario
. My application pulls a message from a queue, extracts data from the
message, and writes it to a database.
. The method that writes to the database does 2 SQL inserts.
. The second insert gets an exception:
org.postgresql.util.PSQLException: ERROR: duplicate key value violates
unique constraint "table2_PK"
. However, the first insert is still getting committed to the database.
Relevant Software
. spring-boot 1.2.5.RELEASE
. atomikos-util 3.9.3 (from spring-boot-starter-jta-atomikos 1.2.5.RELEASE)
. activemq-client 5.1.2
. jooq 3.6.2
. postgresql 9.4-1201-jdbc41
Application Code
I've pasted the relevant parts of my code below.
1. GdmServer - my "server" class, which also declares Spring bean
configurations
2. PortSIQueue - my JMS MessageListener class
3. Kernel - my worker class, a Spring bean invoked by my MessageListener,
i.e. the code that writes to the database
I'd appreciate any help anyone can offer.
Thanks
------------------------------------------------------------
package com.sm.gis.gdm;
import javax.transaction.SystemException;
import javax.transaction.UserTransaction;
import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.jooq.impl.DSL;
import org.postgresql.xa.PGXADataSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.transaction.PlatformTransactionManager;
import
org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.jta.JtaTransactionManager;
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import com.atomikos.jms.AtomikosConnectionFactoryBean;
import com.sm.gis.config.GisConfig;
@SpringBootApplication
@EnableJms
@EnableTransactionManagement
public class GdmServer {
@Autowired
ConfigurableApplicationContext context;
@Autowired
GisConfig gisConfig;
/**
* Starts the GDM Server
*/
public static void main(String[] args) {
SpringApplication.run(GdmServer.class, args);
}
//
-------------------------------------------------------------------------
// Spring bean configurations
//
-------------------------------------------------------------------------
@Bean
GisConfig gisConfig() {
return new GisConfig();
}
@Bean
PlatformTransactionManager transactionManager() throws SystemException {
JtaTransactionManager manager = new JtaTransactionManager();
manager.setTransactionManager( atomikosUserTransactionManager() );
manager.setUserTransaction ( atomikosUserTransaction() );
manager.setAllowCustomIsolationLevels(true);
return manager;
}
@Bean(initMethod = "init", destroyMethod = "close")
UserTransactionManager atomikosUserTransactionManager() throws
SystemException {
UserTransactionManager manager = new UserTransactionManager();
manager.setStartupTransactionService(true);
manager.setForceShutdown(false);
manager.setTransactionTimeout( gisConfig.getTxnTimeout() );
return manager;
}
@Bean
UserTransaction atomikosUserTransaction() {
return new UserTransactionImp();
}
@Bean(initMethod = "init", destroyMethod = "close")
AtomikosDataSourceBean atomikosJdbcConnectionFactory() {
PGXADataSource pgXADataSource = new PGXADataSource();
pgXADataSource.setUrl( gisConfig.getGdbUrl() );
pgXADataSource.setUser( gisConfig.getGdbUser() );
pgXADataSource.setPassword( gisConfig.getGdbPassword() );
AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
xaDataSource.setXaDataSource(pgXADataSource);
xaDataSource.setUniqueResourceName("gdb");
xaDataSource.setPoolSize( gisConfig.getGdbPoolSize() );
return xaDataSource;
}
@Bean
DSLContext dslContext() {
DSLContext dslContext = DSL.using(atomikosJdbcConnectionFactory(),
SQLDialect.POSTGRES);
return dslContext;
}
@Bean(initMethod = "init", destroyMethod = "close")
AtomikosConnectionFactoryBean atomikosJmsConnectionFactory() {
ActiveMQXAConnectionFactory activeMQXAConnectionFactory = new
ActiveMQXAConnectionFactory();
activeMQXAConnectionFactory.setBrokerURL(
gisConfig.getMomBrokerUrl() );
AtomikosConnectionFactoryBean atomikosConnectionFactoryBean = new
AtomikosConnectionFactoryBean();
atomikosConnectionFactoryBean.setUniqueResourceName("activeMQBroker");
atomikosConnectionFactoryBean.setXaConnectionFactory(activeMQXAConnectionFactory);
atomikosConnectionFactoryBean.setLocalTransactionMode(false);
return atomikosConnectionFactoryBean;
}
@Bean
DefaultMessageListenerContainer queueWrapperGDM() throws
SystemException {
DefaultMessageListenerContainer messageSource = new
DefaultMessageListenerContainer();
messageSource.setTransactionManager( transactionManager() );
messageSource.setConnectionFactory( atomikosJmsConnectionFactory()
);
messageSource.setSessionTransacted(true);
messageSource.setConcurrentConsumers(1);
messageSource.setReceiveTimeout(
gisConfig.getMomQueueGdmTimeoutReceive() );
messageSource.setDestinationName( gisConfig.getMomQueueGdmName() );
messageSource.setMessageListener( context.getBean("portSIQueue") );
return messageSource;
}
@Bean
JmsTemplate queueWrapperLIMS() {
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory( atomikosJmsConnectionFactory() );
jmsTemplate.setDefaultDestinationName(
gisConfig.getMomQueueLimsName() );
jmsTemplate.setSessionTransacted(true);
return jmsTemplate;
}
}
------------------------------------------------------------
package com.sm.gis.gdm.ports;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import com.sm.gis.gdm.kernel.Kernel;
import com.sm.gis.sdo.xml.marshaler.GisMessageMarshaler;
import com.sm.gis.sdo.xml.service.message.CreateGenomicTestOrderInGIS;
/**
 * JMS {@link MessageListener} that reads the text body of an incoming
 * message, converts it to an object with the injected
 * {@link GisMessageMarshaler} and hands the result to the {@link Kernel}.
 */
@Component
public class PortSIQueue implements MessageListener {

    @Autowired
    ConfigurableApplicationContext context;

    @Autowired
    GisMessageMarshaler queueMessageMashaler;

    @Autowired
    Kernel kernel;

    /**
     * Handles one JMS message: casts it to {@link TextMessage}, reads its
     * text, converts the text to an object and passes that object, cast to
     * {@link CreateGenomicTestOrderInGIS}, to the kernel.
     *
     * @param jmsMessage the incoming message; cast to {@link TextMessage}
     *                   without a prior type check
     * @throws RuntimeException wrapping any {@link JMSException} raised
     *                          while reading the message text
     */
    @Override
    @Transactional(rollbackFor = {Throwable.class})
    public void onMessage(Message jmsMessage) {
        // Extract the JMS message body.
        final String xmlText = readText((TextMessage) jmsMessage);

        // Convert the XML text to an object.
        final Object unmarshalled = queueMessageMashaler.toObject(xmlText);

        kernel.receiveCreateGenomicTestOrderInGIS((CreateGenomicTestOrderInGIS) unmarshalled);
    }

    /**
     * Returns the text of the given message, rethrowing a checked
     * {@link JMSException} as an unchecked {@link RuntimeException} with the
     * original exception as its cause.
     */
    private static String readText(TextMessage textMessage) {
        try {
            return textMessage.getText();
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }
}
------------------------------------------------------------
package com.sm.gis.gdm.kernel;
import org.jooq.DSLContext;
import org.jooq.impl.DSL;
/**
 * Worker bean that writes to the database through the injected jOOQ
 * {@link DSLContext}. Parts of the class are elided in this listing
 * (see the snip markers).
 */
@Component
public class Kernel {
@Autowired
ConfigurableApplicationContext context;
// jOOQ entry point used for all inserts below.
@Autowired
DSLContext dslContext;
<snip>
/**
 * Executes two inserts in sequence through {@code dslContext}: first into
 * {@code table1}, then into {@code table2}. The column assignments are
 * elided in this listing.
 *
 * NOTE(review): no transaction annotation is visible on this method or class,
 * so whether both inserts share one transaction depends on the caller --
 * confirm.
 *
 * @param message the order message whose data is written
 */
public void
receiveCreateGenomicTestOrderInGIS(CreateGenomicTestOrderInGIS message) {
// First insert: table1.
dslContext.insertInto(table1)
.set(...)
.set(...)
.execute();
// Second insert: table2.
dslContext.insertInto(table2)
.set(...)
.set(...)
.execute();
}
<snip>
}
------------------------------------------------------------
--
You received this message because you are subscribed to the Google Groups "jOOQ
User Group" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to jooq-user+unsubscribe@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.