I'm an idiot.
Turns out the issue was due to a defect in my application logic.
The ActiveMQ component retries a message if the first attempt to process
the message fails with an exception. The transaction created for the first
attempt rolled back correctly. It was the second attempt that succeeded.
The retry succeeded because a database sequence number was incremented by
the application logic during the first attempt, and the second attempt did
not result in a duplicate key violation. After correcting the application
logic defect, since in my application no message is retry-able anyway, I
turned off retry, too.
I apologize for wasting the time of those who read my post.
Along the way, I did make some changes to the implementation. The changes
make certain default values explicit choices. I left those changes in
because I believe they will make it easier for other developers on my team
to understand what's happening more quickly. I also left the JOOQ exception
translation code in place because it would be needed in other circumstances
and appears to be best practice anyway.
I've included the modified code in this post, in case others might find it
useful.
------------------------------------------------------------
package com.sm.gis.gdm;
import javax.transaction.SystemException;
import javax.transaction.UserTransaction;
import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.jooq.impl.DefaultConfiguration;
import org.jooq.impl.DefaultDSLContext;
import org.jooq.impl.DefaultExecuteListenerProvider;
import org.postgresql.xa.PGXADataSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.DependsOn;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.transaction.PlatformTransactionManager;
import
org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.jta.JtaTransactionManager;
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import com.atomikos.jms.AtomikosConnectionFactoryBean;
import com.sm.gis.config.GisConfig;
@SpringBootApplication
@EnableJms
@EnableTransactionManagement(proxyTargetClass=true)
public class GdmServer {
@Autowired
ConfigurableApplicationContext context;
@Autowired
GisConfig gisConfig;
/**
* Starts the GDM Server
*/
public static void main(String[] args) {
SpringApplication.run(GdmServer.class, args);
}
//
-------------------------------------------------------------------------
// Spring bean configurations
//
-------------------------------------------------------------------------
@Bean
GisConfig gisConfig() {
return new GisConfig();
}
@Bean
@DependsOn({ "atomikosUserTransactionManager",
"atomikosUserTransaction", "atomikosJdbcConnectionFactory",
"atomikosJmsConnectionFactory" })
PlatformTransactionManager transactionManager() throws SystemException {
JtaTransactionManager manager = new JtaTransactionManager();
manager.setTransactionManager( atomikosUserTransactionManager() );
manager.setUserTransaction( atomikosUserTransaction() );
manager.setAllowCustomIsolationLevels(true);
manager.afterPropertiesSet();
return manager;
}
@Bean(initMethod = "init", destroyMethod = "close")
UserTransactionManager atomikosUserTransactionManager() throws
SystemException {
UserTransactionManager manager = new UserTransactionManager();
manager.setStartupTransactionService(true);
manager.setForceShutdown(false);
manager.setTransactionTimeout( gisConfig.getTxnTimeout() );
return manager;
}
@Bean
UserTransaction atomikosUserTransaction() {
return new UserTransactionImp();
}
@Bean(initMethod = "init", destroyMethod = "close")
AtomikosDataSourceBean atomikosJdbcConnectionFactory() throws Exception
{
PGXADataSource pgXADataSource = new PGXADataSource();
pgXADataSource.setUrl( gisConfig.getGdbUrl() );
pgXADataSource.setUser( gisConfig.getGdbUser() );
pgXADataSource.setPassword( gisConfig.getGdbPassword() );
AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
xaDataSource.setXaDataSource(pgXADataSource);
xaDataSource.setUniqueResourceName("gdb");
xaDataSource.setPoolSize( gisConfig.getGdbPoolSize() );
xaDataSource.setTestQuery("SELECT 1");
xaDataSource.afterPropertiesSet();
return xaDataSource;
}
@Bean
@DependsOn({ "atomikosJdbcConnectionFactory" })
DSLContext dslContext() throws Exception {
DefaultConfiguration jooqConfiguration = new DefaultConfiguration();
jooqConfiguration.set( SQLDialect.POSTGRES_9_4 );
jooqConfiguration.set( atomikosJdbcConnectionFactory() );
jooqConfiguration.set( new DefaultExecuteListenerProvider(new
JooqToSpringExceptionTransformer()) );
DSLContext dslContext = new DefaultDSLContext(jooqConfiguration);
return dslContext;
}
@Bean(initMethod = "init", destroyMethod = "close")
AtomikosConnectionFactoryBean atomikosJmsConnectionFactory() {
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(0);
redeliveryPolicy.setRedeliveryDelay(0);
redeliveryPolicy.setUseExponentialBackOff(false);
redeliveryPolicy.setMaximumRedeliveries(0);
ActiveMQXAConnectionFactory activeMQXAConnectionFactory = new
ActiveMQXAConnectionFactory();
activeMQXAConnectionFactory.setBrokerURL(
gisConfig.getMomBrokerUrl() );
activeMQXAConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
AtomikosConnectionFactoryBean atomikosConnectionFactoryBean = new
AtomikosConnectionFactoryBean();
atomikosConnectionFactoryBean.setUniqueResourceName("activeMQBroker");
atomikosConnectionFactoryBean.setXaConnectionFactory(activeMQXAConnectionFactory);
atomikosConnectionFactoryBean.setLocalTransactionMode(false);
return atomikosConnectionFactoryBean;
}
@Bean
@DependsOn({ "transactionManager" })
DefaultMessageListenerContainer queueWrapperGDM() throws
SystemException {
DefaultMessageListenerContainer messageSource = new
DefaultMessageListenerContainer();
messageSource.setTransactionManager( transactionManager() );
messageSource.setConnectionFactory( atomikosJmsConnectionFactory()
);
messageSource.setSessionTransacted(true);
messageSource.setSessionAcknowledgeMode(0);
messageSource.setConcurrentConsumers(1);
messageSource.setReceiveTimeout(
gisConfig.getMomQueueGdmTimeoutReceive() );
messageSource.setDestinationName( gisConfig.getMomQueueGdmName() );
messageSource.setMessageListener( context.getBean("portSIQueue") );
messageSource.afterPropertiesSet();
return messageSource;
}
@Bean
@DependsOn({ "transactionManager" })
JmsTemplate queueWrapperLIMS() {
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory( atomikosJmsConnectionFactory() );
jmsTemplate.setDefaultDestinationName(
gisConfig.getMomQueueLimsName() );
jmsTemplate.setSessionTransacted(true);
jmsTemplate.setSessionAcknowledgeMode(0);
return jmsTemplate;
}
}
------------------------------------------------------------
package com.sm.gis.gdm.ports;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import com.sm.gis.gdm.kernel.Kernel;
import com.sm.gis.sdo.xml.marshaler.GisMessageMarshaler;
import com.sm.gis.sdo.xml.service.message.CreateGenomicTestOrderInGIS;
@Component
public class PortSIQueue implements MessageListener {
@Autowired
ConfigurableApplicationContext context;
@Autowired
GisMessageMarshaler queueMessageMashaler;
@Autowired
Kernel kernel;
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor =
{Throwable.class})
public void onMessage(Message jmsMessage) {
TextMessage jmsTextMessage = (TextMessage) jmsMessage;
// Extract JMS message body...
String jmsPayload = "";
try {
jmsPayload = jmsTextMessage.getText();
} catch (JMSException e) {
throw new RuntimeException(e);
}
// Marshal XML text to object...
Object gisMessage = queueMessageMashaler.toObject(jmsPayload);
kernel.receiveCreateGenomicTestOrderInGIS(
(CreateGenomicTestOrderInGIS) gisMessage );
}
------------------------------------------------------------
package com.sm.gis.gdm.kernel;
import org.jooq.DSLContext;
@Component
public class Kernel {
@Autowired
ConfigurableApplicationContext context;
@Autowired
DSLContext dslContext;
<snip>
public void
receiveCreateGenomicTestOrderInGIS(CreateGenomicTestOrderInGIS message) {
dslContext.insertInto(table1)
.set(...)
.set(...)
.execute();
dslContext.insertInto(table2)
.set(...)
.set(...)
.execute();
}
<snip>
}
------------------------------------------------------------
On Tuesday, September 29, 2015 at 11:33:18 AM UTC-4, Lukas Eder wrote:
>
> Thank you for the update.
>
> I'm afraid that I don't know what the issue could be right now. Have you
> debugged through the depths of Spring yet, to see if there is perhaps a
> commit issued after your first insert at some point?
>
> And just to rule out things, are there perhaps any triggers or other
> database objects that might create such a commit? And also, to be sure: You
> are sure that your connection pool doesn't provide you with auto-commit
> connections?
>
> Cheers,
> Lukas
>
>
--
You received this message because you are subscribed to the Google Groups "jOOQ
User Group" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
For more options, visit https://groups.google.com/d/optout.