How about splitting the large message and then produce? On Fri, May 31, 2019 at 3:39 PM wenxuan <[email protected]> wrote:
> Hi Jonathan, > > Thanks for your reply. > > I get mass message to send beyond the limit of one JVM or physical > machine, so I need make more than one producer in the same transaction. > > Since multiple producer can’t share the same transaction id, Is there way > to achieve multiple producer transaction described above. > > Thanks, > Wenxuan > > On 2019/05/31 08:34:14, Jonathan Santilli <[email protected]> wrote: > > Hello Wenxuan, there reason of the Exception, by design the transaction > Id> > > must be unique per producer instance, this is from the Java docs:> > > > > > https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html> > > > > > "The purpose of the transactional.id is to enable transaction recovery> > > across multiple sessions of a single producer instance. It would > typically> > > be derived from the shard identifier in a partitioned, stateful,> > > application. As such, it should be unique to each producer instance > running> > > within a partitioned application."> > > > > You must have a reason to instantiate multiple producers, however, have > you> > > try just to instantiate one producer?> > > > > "The producer is *thread safe* and sharing a single producer instance> > > across threads will generally be faster than having multiple > instances."> > > > > Hope that helps.> > > > > > > Cheers,> > > --> > > Jonathan> > > > > > > > > On Fri, May 31, 2019 at 5:47 AM wenxuan <[email protected]> wrote:> > > > > > Hey guys,> > > >> > > > I have problem in the below scenario.> > > >> > > > I hope to run multiple producer instances that send message > concurrently> > > > in the same transaction, and the transaction is committed when all > the> > > > producer send message successfully. Otherwise, if one producer > failed, the> > > > transaction is aborted and no message will be consumed.> > > >> > > > However, when multiple producer share the same txn id, throw the > following> > > > exception:> > > >> > > > org.apache.kafka.common.KafkaException: Cannot execute transactional> > > > method because we are in an error state> > > >> > > > at> > > > > org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784)> > > > >> > > > at> > > > > org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:215)> > > > >> > > > at> > > > > org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:606)> > > > >> > > > at> > > > > com.matt.test.kafka.producer.ProducerTransactionExample.main(ProducerTransactionExample.java:68)> > > > >> > > > Caused by: org.apache.kafka.common.errors.ProducerFencedException:> > > > Producer attempted an operation with an old epoch. Either there is a > newer> > > > producer with the same transactionalId, or the producer's transaction > has> > > > been expired by the broker.> > > >> > > > Please help us how to solve this, Thanks.> > > >> > > > > > > -- > > > Santilli Jonathan> > > > -- * Regards* * Sandeep Nemuri*
