I'm an idiot.
Turns out the issue was due to a defect in my application logic.
The ActiveMQ component retries a message if the first attempt to process 
the message fails with an exception. The transaction created for the first 
attempt rolled back correctly. It was the second attempt that succeeded. 
The retry succeeded because a database sequence number was incremented by 
the application logic during the first attempt, and the second attempt did 
not result in a duplicate key violation. After correcting the application 
logic defect, since in my application no message is retry-able anyway, I 
turned off retry, too.
I apologize for wasting the time of those who read my post.

Along the way, I did make some changes to the implementation. The changes 
make certain default values explicit choices. I left those changes in 
because I believe they will make it easier for other developers on my team 
to understand what's happening more quickly. I also left the JOOQ exception 
translation code in place because it would be needed in other circumstances 
and appears to be best practice anyway.

I've included the modified code in this post, in case others might find it 
useful.

------------------------------------------------------------
package com.sm.gis.gdm;

import javax.transaction.SystemException;
import javax.transaction.UserTransaction;

import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.jooq.impl.DefaultConfiguration;
import org.jooq.impl.DefaultDSLContext;
import org.jooq.impl.DefaultExecuteListenerProvider;
import org.postgresql.xa.PGXADataSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.DependsOn;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.transaction.PlatformTransactionManager;
import 
org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.jta.JtaTransactionManager;

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import com.atomikos.jms.AtomikosConnectionFactoryBean;
import com.sm.gis.config.GisConfig;

@SpringBootApplication
@EnableJms
@EnableTransactionManagement(proxyTargetClass=true)
public class GdmServer {
    
    @Autowired
    ConfigurableApplicationContext   context;
    @Autowired
    GisConfig                        gisConfig;
    
    /**
     * Starts the GDM Server
     */
    public static void main(String[] args) {
        SpringApplication.run(GdmServer.class, args);
    }
    
    // 
-------------------------------------------------------------------------
    // Spring bean configurations
    // 
-------------------------------------------------------------------------
    
    @Bean
    GisConfig gisConfig() {
        return new GisConfig();
    }
    
    @Bean
    @DependsOn({ "atomikosUserTransactionManager", 
"atomikosUserTransaction", "atomikosJdbcConnectionFactory", 
"atomikosJmsConnectionFactory" })
    PlatformTransactionManager transactionManager() throws SystemException {
        JtaTransactionManager manager = new JtaTransactionManager();
        manager.setTransactionManager( atomikosUserTransactionManager() );
        manager.setUserTransaction( atomikosUserTransaction() );
        manager.setAllowCustomIsolationLevels(true);
        manager.afterPropertiesSet();
        return manager;
    }
    
    @Bean(initMethod = "init", destroyMethod = "close")
    UserTransactionManager atomikosUserTransactionManager() throws 
SystemException {
        UserTransactionManager manager = new UserTransactionManager();
        manager.setStartupTransactionService(true);
        manager.setForceShutdown(false);
        manager.setTransactionTimeout( gisConfig.getTxnTimeout() );
        return manager;
    }
    
    @Bean
    UserTransaction atomikosUserTransaction() {
        return new UserTransactionImp();
    }
    
    @Bean(initMethod = "init", destroyMethod = "close")
    AtomikosDataSourceBean atomikosJdbcConnectionFactory() throws Exception 
{
        PGXADataSource pgXADataSource = new PGXADataSource();
        pgXADataSource.setUrl( gisConfig.getGdbUrl() );
        pgXADataSource.setUser( gisConfig.getGdbUser() );
        pgXADataSource.setPassword( gisConfig.getGdbPassword() );
    
        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        xaDataSource.setXaDataSource(pgXADataSource);
        xaDataSource.setUniqueResourceName("gdb");
        xaDataSource.setPoolSize( gisConfig.getGdbPoolSize() );
        xaDataSource.setTestQuery("SELECT 1");
        xaDataSource.afterPropertiesSet();
        return xaDataSource;
    }
    
    @Bean
    @DependsOn({ "atomikosJdbcConnectionFactory" })
    DSLContext dslContext() throws Exception {
        DefaultConfiguration jooqConfiguration = new DefaultConfiguration();
        jooqConfiguration.set( SQLDialect.POSTGRES_9_4 );
        jooqConfiguration.set( atomikosJdbcConnectionFactory() );
        jooqConfiguration.set( new DefaultExecuteListenerProvider(new 
JooqToSpringExceptionTransformer()) );
        DSLContext dslContext = new DefaultDSLContext(jooqConfiguration);
        return dslContext;
    }


    @Bean(initMethod = "init", destroyMethod = "close")
    AtomikosConnectionFactoryBean atomikosJmsConnectionFactory() {
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(0);
        redeliveryPolicy.setRedeliveryDelay(0);
        redeliveryPolicy.setUseExponentialBackOff(false);
        redeliveryPolicy.setMaximumRedeliveries(0);
        
        ActiveMQXAConnectionFactory activeMQXAConnectionFactory = new 
ActiveMQXAConnectionFactory();
        activeMQXAConnectionFactory.setBrokerURL( 
gisConfig.getMomBrokerUrl() );
        activeMQXAConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
        
        AtomikosConnectionFactoryBean atomikosConnectionFactoryBean = new 
AtomikosConnectionFactoryBean();
        
atomikosConnectionFactoryBean.setUniqueResourceName("activeMQBroker");
        
atomikosConnectionFactoryBean.setXaConnectionFactory(activeMQXAConnectionFactory);
        atomikosConnectionFactoryBean.setLocalTransactionMode(false);
        return atomikosConnectionFactoryBean;
    }
    
    @Bean
    @DependsOn({ "transactionManager" })
    DefaultMessageListenerContainer queueWrapperGDM() throws 
SystemException {
        DefaultMessageListenerContainer messageSource = new 
DefaultMessageListenerContainer();
        messageSource.setTransactionManager( transactionManager() );
        messageSource.setConnectionFactory( atomikosJmsConnectionFactory() 
);
        messageSource.setSessionTransacted(true);
        messageSource.setSessionAcknowledgeMode(0);
        messageSource.setConcurrentConsumers(1);
        messageSource.setReceiveTimeout( 
gisConfig.getMomQueueGdmTimeoutReceive() );
        messageSource.setDestinationName( gisConfig.getMomQueueGdmName() );
        messageSource.setMessageListener( context.getBean("portSIQueue") );
        messageSource.afterPropertiesSet();
        return messageSource;
    }
    
    @Bean
    @DependsOn({ "transactionManager" })
    JmsTemplate queueWrapperLIMS() {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory( atomikosJmsConnectionFactory() );
        jmsTemplate.setDefaultDestinationName( 
gisConfig.getMomQueueLimsName() );
        jmsTemplate.setSessionTransacted(true);
        jmsTemplate.setSessionAcknowledgeMode(0);
        return jmsTemplate;
    }
 
}

------------------------------------------------------------
package com.sm.gis.gdm.ports;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import com.sm.gis.gdm.kernel.Kernel;
import com.sm.gis.sdo.xml.marshaler.GisMessageMarshaler;
import com.sm.gis.sdo.xml.service.message.CreateGenomicTestOrderInGIS;

@Component
public class PortSIQueue implements MessageListener {
    
    @Autowired
    ConfigurableApplicationContext    context;
    @Autowired
    GisMessageMarshaler               queueMessageMashaler;
    @Autowired
    Kernel                            kernel;
    
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = 
{Throwable.class})
    public void onMessage(Message jmsMessage) {
        
        TextMessage jmsTextMessage = (TextMessage) jmsMessage;
        
        // Extract JMS message body...
        String jmsPayload = "";
        try {
            jmsPayload = jmsTextMessage.getText();
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
        
        // Marshal XML text to object...
        Object gisMessage = queueMessageMashaler.toObject(jmsPayload);

        kernel.receiveCreateGenomicTestOrderInGIS( 
(CreateGenomicTestOrderInGIS) gisMessage );

}

------------------------------------------------------------
package com.sm.gis.gdm.kernel;

import org.jooq.DSLContext;

@Component
public class Kernel {

    @Autowired
    ConfigurableApplicationContext  context;
    @Autowired
    DSLContext                      dslContext;

<snip>
    public void 
receiveCreateGenomicTestOrderInGIS(CreateGenomicTestOrderInGIS message) {

        dslContext.insertInto(table1)
            .set(...)
            .set(...)
        .execute();

        dslContext.insertInto(table2)
            .set(...)
            .set(...)
        .execute();
    }
<snip>
}

------------------------------------------------------------


On Tuesday, September 29, 2015 at 11:33:18 AM UTC-4, Lukas Eder wrote:
>
> Thank you for the update.
>
> I'm afraid that I don't know what the issue could be right now. Have you 
> debugged through the depths of Spring yet, to see if there is perhaps a 
> commit issued after your first insert at some point?
>
> And just to rule out things, are there perhaps any triggers or other 
> database objects that might create such a commit? And also, to be sure: You 
> are sure that your connection pool doesn't provide you with auto-commit 
> connections?
>
> Cheers,
> Lukas
>
>

-- 
You received this message because you are subscribed to the Google Groups "jOOQ 
User Group" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
For more options, visit https://groups.google.com/d/optout.

Reply via email to