Database writes are not rolling back as I expected.
I've spent many hours reading software documentation and web postings.
I have not been able to resolve the issue.
I'm hoping you folks can help me.

Scenario
. My application pulls a message from a queue, extracts data from the 
message, and writes it to a database.
. The method that writes to the database does 2 SQL inserts.
. The second insert fails with an exception:
   org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "table2_PK"
. However, the first insert is still committed to the database instead of being rolled back.

Relevant Software
. spring-boot 1.2.5.RELEASE
. atomikos-util 3.9.3 (from spring-boot-starter-jta-atomikos 1.2.5.RELEASE)
. activemq-client 5.1.2
. jooq 3.6.2
. postgresql 9.4-1201-jdbc41

Application Code
I've pasted the relevant parts of my code below.
 1. GdmServer - my "server" class, which also declares the Spring bean configurations
 2. PortSIQueue - my JMS MessageListener class
 3. Kernel - my worker class, a Spring bean invoked by my MessageListener, i.e. the code that writes to the database

I'd appreciate any help anyone can offer.
Thanks

------------------------------------------------------------
package com.sm.gis.gdm;

import javax.transaction.SystemException;
import javax.transaction.UserTransaction;

import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.jooq.impl.DSL;
import org.postgresql.xa.PGXADataSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.transaction.PlatformTransactionManager;
import 
org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.jta.JtaTransactionManager;

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import com.atomikos.jms.AtomikosConnectionFactoryBean;
import com.sm.gis.config.GisConfig;

@SpringBootApplication
@EnableJms
@EnableTransactionManagement
public class GdmServer {
    
    @Autowired
    ConfigurableApplicationContext  context;
    @Autowired
    GisConfig                       gisConfig;
    
    /**
     * Starts the GDM Server
     */
    public static void main(String[] args) {
        SpringApplication.run(GdmServer.class, args);
    }
    
    // 
-------------------------------------------------------------------------
    // Spring bean configurations
    // 
-------------------------------------------------------------------------
    
    @Bean
    GisConfig gisConfig() {
        return new GisConfig();
    }
    
    @Bean
    PlatformTransactionManager transactionManager() throws SystemException {
        JtaTransactionManager manager = new JtaTransactionManager();
        manager.setTransactionManager( atomikosUserTransactionManager() );
        manager.setUserTransaction   ( atomikosUserTransaction() );
        manager.setAllowCustomIsolationLevels(true);
        return manager;
    }
    
    @Bean(initMethod = "init", destroyMethod = "close")
    UserTransactionManager atomikosUserTransactionManager() throws 
SystemException {
        UserTransactionManager manager = new UserTransactionManager();
        manager.setStartupTransactionService(true);
        manager.setForceShutdown(false);
        manager.setTransactionTimeout( gisConfig.getTxnTimeout() );
        return manager;
    }
    
    @Bean
    UserTransaction atomikosUserTransaction() {
        return new UserTransactionImp();
    }
    
    @Bean(initMethod = "init", destroyMethod = "close")
    AtomikosDataSourceBean atomikosJdbcConnectionFactory() {
        PGXADataSource pgXADataSource = new PGXADataSource();
        pgXADataSource.setUrl( gisConfig.getGdbUrl() );
        pgXADataSource.setUser( gisConfig.getGdbUser() );
        pgXADataSource.setPassword( gisConfig.getGdbPassword() );
    
        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        xaDataSource.setXaDataSource(pgXADataSource);
        xaDataSource.setUniqueResourceName("gdb");
        xaDataSource.setPoolSize( gisConfig.getGdbPoolSize() );
        return xaDataSource;
    }
    
    @Bean
    DSLContext dslContext() {
        DSLContext dslContext = DSL.using(atomikosJdbcConnectionFactory(), 
SQLDialect.POSTGRES);
        return dslContext;
    }

    @Bean(initMethod = "init", destroyMethod = "close")
    AtomikosConnectionFactoryBean atomikosJmsConnectionFactory() {
        ActiveMQXAConnectionFactory activeMQXAConnectionFactory = new 
ActiveMQXAConnectionFactory();
        activeMQXAConnectionFactory.setBrokerURL( 
gisConfig.getMomBrokerUrl() );
        
        AtomikosConnectionFactoryBean atomikosConnectionFactoryBean = new 
AtomikosConnectionFactoryBean();
        
atomikosConnectionFactoryBean.setUniqueResourceName("activeMQBroker");
        
atomikosConnectionFactoryBean.setXaConnectionFactory(activeMQXAConnectionFactory);
        atomikosConnectionFactoryBean.setLocalTransactionMode(false);
        return atomikosConnectionFactoryBean;
    }
    
    @Bean
    DefaultMessageListenerContainer queueWrapperGDM() throws 
SystemException {
        DefaultMessageListenerContainer messageSource = new 
DefaultMessageListenerContainer();
        messageSource.setTransactionManager( transactionManager() );
        messageSource.setConnectionFactory( atomikosJmsConnectionFactory() 
);
        messageSource.setSessionTransacted(true);
        messageSource.setConcurrentConsumers(1);
        messageSource.setReceiveTimeout( 
gisConfig.getMomQueueGdmTimeoutReceive() );
        messageSource.setDestinationName( gisConfig.getMomQueueGdmName() );
        messageSource.setMessageListener( context.getBean("portSIQueue") );
        return messageSource;
    }
    
    @Bean
    JmsTemplate queueWrapperLIMS() {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory( atomikosJmsConnectionFactory() );
        jmsTemplate.setDefaultDestinationName( 
gisConfig.getMomQueueLimsName() );
        jmsTemplate.setSessionTransacted(true);
        return jmsTemplate;
    }
 
}

------------------------------------------------------------
package com.sm.gis.gdm.ports;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import com.sm.gis.gdm.kernel.Kernel;
import com.sm.gis.sdo.xml.marshaler.GisMessageMarshaler;
import com.sm.gis.sdo.xml.service.message.CreateGenomicTestOrderInGIS;

/**
 * JMS {@link MessageListener} that reads the text body of an incoming message,
 * converts it to an object with the {@link GisMessageMarshaler}, and hands the
 * result to the {@link Kernel}.
 */
@Component
public class PortSIQueue implements MessageListener {

    @Autowired
    ConfigurableApplicationContext context;
    @Autowired
    GisMessageMarshaler queueMessageMashaler;
    @Autowired
    Kernel kernel;

    /**
     * Handles one JMS message. Declared transactional with rollback for any
     * {@link Throwable}.
     *
     * @param jmsMessage the received message; it is cast to {@link TextMessage}
     * @throws RuntimeException wrapping the {@link JMSException} if the text
     *         body cannot be read
     */
    @Override
    @Transactional(rollbackFor = {Throwable.class})
    public void onMessage(Message jmsMessage) {
        // Extract the JMS message body.
        final String xmlBody = readBody((TextMessage) jmsMessage);

        // Marshal the XML text to an object, then narrow it to the request type.
        final Object unmarshalled = queueMessageMashaler.toObject(xmlBody);
        final CreateGenomicTestOrderInGIS order = (CreateGenomicTestOrderInGIS) unmarshalled;

        kernel.receiveCreateGenomicTestOrderInGIS(order);
    }

    /** Returns the text of the message, rethrowing a JMS failure unchecked. */
    private static String readBody(TextMessage textMessage) {
        try {
            return textMessage.getText();
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }

}

------------------------------------------------------------
package com.sm.gis.gdm.kernel;

import org.jooq.DSLContext;
import org.jooq.impl.DSL;

/**
 * Worker bean that writes incoming requests to the database through the
 * injected jOOQ {@link DSLContext}.
 *
 * <p>Parts of this class are elided ({@code <snip>}) in this excerpt.
 */
@Component
public class Kernel {

    @Autowired
    ConfigurableApplicationContext  context;
    // jOOQ entry point used for both inserts below.
    @Autowired
    DSLContext                      dslContext;

<snip>
    /**
     * Stores a create-genomic-test-order request by executing two inserts in
     * sequence: first into {@code table1}, then into {@code table2}.
     *
     * <p>NOTE(review): this method carries no transaction annotation of its
     * own; the two inserts presumably rely on a transaction started by the
     * caller to be atomic - confirm.
     *
     * @param message the request to persist (the column values taken from it
     *        are elided in this excerpt)
     */
    public void 
receiveCreateGenomicTestOrderInGIS(CreateGenomicTestOrderInGIS message) {

            // First insert.
            dslContext.insertInto(table1)
                .set(...)
                .set(...)
            .execute();

            // Second insert; runs only if the first execute() returned normally.
            dslContext.insertInto(table2)
                .set(...)
                .set(...)
            .execute();
    }
<snip>
}

------------------------------------------------------------

-- 
You received this message because you are subscribed to the Google Groups "jOOQ 
User Group" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
For more options, visit https://groups.google.com/d/optout.

Reply via email to