[GitHub] [pulsar] startjava edited a comment on the discussion: has MessageListener interface and Transactions DEMO code??
GitHub user startjava edited a comment on the discussion: has MessageListener interface and Transactions DEMO code??

Here is my consumer-side transaction code. I do not have a demo that uses the MessageListener interface:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.TimeUnit;

    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.SubscriptionType;
    import org.apache.pulsar.client.api.transaction.Transaction;
    import org.apache.pulsar.client.impl.schema.AvroSchema;

    public class PulsarConsumerTransaction {
        public static void main(String[] args) throws Exception {
            String serviceUrl = "pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650";
            PulsarClient pulsarClient = PulsarClient.builder()
                    .serviceUrl(serviceUrl)
                    .enableTransaction(true)
                    .build();

            String txnTopic1 = "persistent://my-tenant/my-ns/my-txn-topic1";
            String txnTopic2 = "persistent://my-tenant/my-ns/my-txn-topic2";
            List<String> topicList = new ArrayList<>();
            topicList.add(txnTopic1);
            topicList.add(txnTopic2);

            Transaction transaction = pulsarClient.newTransaction()
                    .withTransactionTimeout(1, TimeUnit.SECONDS)
                    .build().get();

            // User is my Avro POJO (not shown here)
            Consumer<User> consumer = pulsarClient.newConsumer(AvroSchema.of(User.class))
                    .topics(topicList)
                    .subscriptionName("consume-txn")
                    .subscriptionType(SubscriptionType.Shared)
                    .subscribe();

            //Messages m = consumer.batchReceive();
            int i = 0;
            while (true) {
                Message<User> message = consumer.receive();
                try {
                    // Do something with the message
                    System.out.println("Message received: " + message.getValue());
                    // Acknowledge the message so that it can be deleted by the message broker.
                    // Individual ack: cumulative acks are not allowed on a Shared subscription.
                    consumer.acknowledgeAsync(message.getMessageId(), transaction);
                } catch (Exception e) {
                    // Message failed to process, redeliver later
                    consumer.negativeAcknowledge(message);
                }
                i++;
                // Commit every 5 messages, then open a new transaction
                if (i >= 5) {
                    transaction.commit();
                    transaction = pulsarClient.newTransaction()
                            .withTransactionTimeout(1, TimeUnit.SECONDS)
                            .build().get();
                    i = 0;
                }
            }
        }
    }

How do I convert this code to use the MessageListener interface?

GitHub link: https://github.com/apache/pulsar/discussions/20184#discussioncomment-5726483

This is an automatically sent email for commits@pulsar.apache.org. To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org
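One possible shape for the listener version, offered as a rough sketch rather than a confirmed answer: with a listener, the client calls your handler once per message, so the `while (true)` loop and the counter `i` have to move into state held by the handler. In the Pulsar Java client the handler is registered on the consumer builder with `messageListener(...)` and receives `(Consumer<T> consumer, Message<T> msg)`. The code below does not use Pulsar at all; `FakeTxn` and `BatchingListener` are hypothetical stand-ins, and plain strings stand in for messages, so that the commit-every-5 logic can run without a broker. In real code, `current.ack(msg)` would correspond to `consumer.acknowledgeAsync(msg.getMessageId(), transaction)`, and creating the next transaction would go through `pulsarClient.newTransaction()`.

```java
import java.util.ArrayList;
import java.util.List;

public class ListenerTxnSketch {

    /** Hypothetical stand-in for a transaction: collects acks until commit. */
    static final class FakeTxn {
        final List<String> acked = new ArrayList<>();
        boolean committed = false;

        void ack(String msg) { acked.add(msg); }
        void commit() { committed = true; }
    }

    /** Hypothetical listener-style handler: invoked once per message, no polling loop. */
    static final class BatchingListener {
        private final int batchSize;
        private final List<FakeTxn> committedTxns = new ArrayList<>();
        private FakeTxn current = new FakeTxn();
        private int count = 0;

        BatchingListener(int batchSize) { this.batchSize = batchSize; }

        // synchronized: the counter and the current transaction are shared state,
        // and a callback may run on a thread other than the one that created it.
        synchronized void received(String msg) {
            current.ack(msg);
            count++;
            if (count >= batchSize) {
                // Commit the full batch, then start a fresh transaction.
                current.commit();
                committedTxns.add(current);
                current = new FakeTxn();
                count = 0;
            }
        }

        synchronized List<FakeTxn> committed() { return new ArrayList<>(committedTxns); }
        synchronized int pending() { return count; }
    }

    public static void main(String[] args) {
        BatchingListener listener = new BatchingListener(5);
        for (int i = 1; i <= 12; i++) {
            listener.received("msg-" + i);
        }
        System.out.println("committed transactions: " + listener.committed().size()); // 2
        System.out.println("acks waiting in open transaction: " + listener.pending()); // 2
    }
}
```

Acks that are still waiting when messages stop arriving stay in the open transaction, so a real listener would probably also need a timer or shutdown hook that commits or aborts it before the transaction timeout expires.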