[GitHub] rdhabalia commented on a change in pull request #2605: implement topic routing on a per record basis
rdhabalia commented on a change in pull request #2605: implement topic routing on a per record basis URL: https://github.com/apache/incubator-pulsar/pull/2605#discussion_r218635418 ## File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java ## @@ -60,140 +62,153 @@ private final String fqfn; private interface PulsarSinkProcessor { -void initializeOutputProducer(String outputTopic, Schema schema, String fqfn) throws Exception; TypedMessageBuilder newMessage(Record record) throws Exception; void sendOutputMessage(TypedMessageBuilder msg, Record record) throws Exception; -abstract void close() throws Exception; +void close() throws Exception; } -private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor { -private Producer producer; +private abstract class PulsarSinkProcessorBase implements PulsarSinkProcessor { +protected Map> publishProducers = new ConcurrentHashMap<>(); +protected Schema schema; -@Override -public void initializeOutputProducer(String outputTopic, Schema schema, String fqfn) throws Exception { -this.producer = AbstractOneOuputTopicProducers.createProducer( -client, pulsarSinkConfig.getTopic(), null, schema, fqfn); +protected PulsarSinkProcessorBase(Schema schema) { +this.schema = schema; } -@Override -public TypedMessageBuilder newMessage(Record record) { -return producer.newMessage(); +public Producer createProducer(PulsarClient client, String topic, String producerName, Schema schema, String fqfn) +throws PulsarClientException { +ProducerBuilder builder = client.newProducer(schema) +.blockIfQueueFull(true) +.enableBatching(true) +.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS) +.compressionType(CompressionType.LZ4) +.hashingScheme(HashingScheme.Murmur3_32Hash) // +.messageRoutingMode(MessageRoutingMode.CustomPartition) +.messageRouter(FunctionResultRouter.of()) +.topic(topic); +if (producerName != null) { +builder.producerName(producerName); +} + +return builder 
+.property("application", "pulsarfunction") +.property("fqfn", fqfn).create(); } -@Override -public void sendOutputMessage(TypedMessageBuilder msg, Record record) throws Exception { -msg.sendAsync(); +protected Producer getProducer(String destinationTopic) { +return getProducer(destinationTopic, null, destinationTopic); } -@Override -public void close() throws Exception { -if (null != producer) { +protected Producer getProducer(String producerId, String producerName, String topicName) { + +Producer producer = publishProducers.get(producerId); + +if (producer == null) { try { -producer.close(); +Producer newProducer = createProducer( +client, +topicName, +producerName, +schema, +fqfn); + +Producer existingProducer = publishProducers.putIfAbsent(producerId, newProducer); Review comment: I don't understand this logic. Here it creates a producer only if one is not already present in `publishProducers`. So is the `putIfAbsent` check really needed here? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #2605: implement topic routing on a per record basis
rdhabalia commented on a change in pull request #2605: implement topic routing on a per record basis URL: https://github.com/apache/incubator-pulsar/pull/2605#discussion_r218606849 ## File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java ## @@ -60,140 +62,166 @@ private final String fqfn; private interface PulsarSinkProcessor { -void initializeOutputProducer(String outputTopic, Schema schema, String fqfn) throws Exception; TypedMessageBuilder newMessage(Record record) throws Exception; void sendOutputMessage(TypedMessageBuilder msg, Record record) throws Exception; -abstract void close() throws Exception; +void close() throws Exception; } -private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor { -private Producer producer; +private abstract class PulsarSinkProcessorBase implements PulsarSinkProcessor { +protected Map> publishProducers = new ConcurrentHashMap<>(); +protected Schema schema; -@Override -public void initializeOutputProducer(String outputTopic, Schema schema, String fqfn) throws Exception { -this.producer = AbstractOneOuputTopicProducers.createProducer( -client, pulsarSinkConfig.getTopic(), null, schema, fqfn); +protected PulsarSinkProcessorBase(Schema schema) { +this.schema = schema; } -@Override -public TypedMessageBuilder newMessage(Record record) { -return producer.newMessage(); +public Producer createProducer(PulsarClient client, String topic, String producerName, Schema schema, String fqfn) +throws PulsarClientException { +ProducerBuilder builder = client.newProducer(schema) +.blockIfQueueFull(true) +.enableBatching(true) +.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS) +.compressionType(CompressionType.LZ4) +.hashingScheme(HashingScheme.Murmur3_32Hash) // +.messageRoutingMode(MessageRoutingMode.CustomPartition) +.messageRouter(FunctionResultRouter.of()) +.topic(topic); +if (producerName != null) { +builder.producerName(producerName); +} + +return builder 
+.property("application", "pulsarfunction") +.property("fqfn", fqfn).create(); } -@Override -public void sendOutputMessage(TypedMessageBuilder msg, Record record) throws Exception { -msg.sendAsync(); +protected Producer getProducer(String destinationTopic) { +return getProducer(destinationTopic, null, destinationTopic); } -@Override -public void close() throws Exception { -if (null != producer) { +protected Producer getProducer(String producerId, String producerName, String topicName) { + +Producer producer = publishProducers.get(producerId); + +if (producer == null) { try { -producer.close(); +Producer newProducer = createProducer( +client, +topicName, +producerName, +schema, +fqfn); + +Producer existingProducer = publishProducers.putIfAbsent(producerId, newProducer); + +if (existingProducer != null) { +// The value in the map was not updated after the concurrent put +newProducer.close(); +producer = existingProducer; +} else { +producer = newProducer; +} + } catch (PulsarClientException e) { -log.warn("Fail to close producer for processor {}", pulsarSinkConfig.getTopic(), e); +log.error("Failed to create Producer while doing user publish", e); +throw new RuntimeException(e); } } +return producer; } -} - -private class PulsarSinkAtLeastOnceProcessor implements PulsarSinkProcessor { -private Producer producer; @Override -public void initializeOutputProducer(String outputTopic, Schema schema, String fqfn) throws Exception { -this.producer = AbstractOneOuputTopicProducers.createProducer( -client, pulsarSinkConfig.getTopic(), null, schema, fqfn); +public void close() throws Exception { +List> closeFutures = new ArrayList<>(publishProducers.size()); +for (Map.Entry> entry: publishProducers.entrySet()) { +String
[GitHub] rdhabalia commented on a change in pull request #2605: implement topic routing on a per record basis
rdhabalia commented on a change in pull request #2605: implement topic routing on a per record basis URL: https://github.com/apache/incubator-pulsar/pull/2605#discussion_r218603818 ## File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java ## @@ -60,140 +62,166 @@ private final String fqfn; private interface PulsarSinkProcessor { -void initializeOutputProducer(String outputTopic, Schema schema, String fqfn) throws Exception; TypedMessageBuilder newMessage(Record record) throws Exception; void sendOutputMessage(TypedMessageBuilder msg, Record record) throws Exception; -abstract void close() throws Exception; +void close() throws Exception; } -private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor { -private Producer producer; +private abstract class PulsarSinkProcessorBase implements PulsarSinkProcessor { +protected Map> publishProducers = new ConcurrentHashMap<>(); +protected Schema schema; -@Override -public void initializeOutputProducer(String outputTopic, Schema schema, String fqfn) throws Exception { -this.producer = AbstractOneOuputTopicProducers.createProducer( -client, pulsarSinkConfig.getTopic(), null, schema, fqfn); +protected PulsarSinkProcessorBase(Schema schema) { +this.schema = schema; } -@Override -public TypedMessageBuilder newMessage(Record record) { -return producer.newMessage(); +public Producer createProducer(PulsarClient client, String topic, String producerName, Schema schema, String fqfn) +throws PulsarClientException { +ProducerBuilder builder = client.newProducer(schema) +.blockIfQueueFull(true) +.enableBatching(true) +.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS) +.compressionType(CompressionType.LZ4) +.hashingScheme(HashingScheme.Murmur3_32Hash) // +.messageRoutingMode(MessageRoutingMode.CustomPartition) +.messageRouter(FunctionResultRouter.of()) +.topic(topic); +if (producerName != null) { +builder.producerName(producerName); +} + +return builder 
+.property("application", "pulsarfunction") +.property("fqfn", fqfn).create(); } -@Override -public void sendOutputMessage(TypedMessageBuilder msg, Record record) throws Exception { -msg.sendAsync(); +protected Producer getProducer(String destinationTopic) { +return getProducer(destinationTopic, null, destinationTopic); } -@Override -public void close() throws Exception { -if (null != producer) { +protected Producer getProducer(String producerId, String producerName, String topicName) { + +Producer producer = publishProducers.get(producerId); + +if (producer == null) { try { -producer.close(); +Producer newProducer = createProducer( +client, +topicName, +producerName, +schema, +fqfn); + +Producer existingProducer = publishProducers.putIfAbsent(producerId, newProducer); + +if (existingProducer != null) { +// The value in the map was not updated after the concurrent put +newProducer.close(); +producer = existingProducer; +} else { +producer = newProducer; +} + } catch (PulsarClientException e) { -log.warn("Fail to close producer for processor {}", pulsarSinkConfig.getTopic(), e); +log.error("Failed to create Producer while doing user publish", e); +throw new RuntimeException(e); } } +return producer; } -} - -private class PulsarSinkAtLeastOnceProcessor implements PulsarSinkProcessor { -private Producer producer; @Override -public void initializeOutputProducer(String outputTopic, Schema schema, String fqfn) throws Exception { -this.producer = AbstractOneOuputTopicProducers.createProducer( -client, pulsarSinkConfig.getTopic(), null, schema, fqfn); +public void close() throws Exception { +List> closeFutures = new ArrayList<>(publishProducers.size()); +for (Map.Entry> entry: publishProducers.entrySet()) { +String
[GitHub] rdhabalia commented on a change in pull request #2605: implement topic routing on a per record basis
rdhabalia commented on a change in pull request #2605: implement topic routing on a per record basis URL: https://github.com/apache/incubator-pulsar/pull/2605#discussion_r218603562 ## File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java ## @@ -60,140 +62,166 @@ private final String fqfn; private interface PulsarSinkProcessor { -void initializeOutputProducer(String outputTopic, Schema schema, String fqfn) throws Exception; TypedMessageBuilder newMessage(Record record) throws Exception; void sendOutputMessage(TypedMessageBuilder msg, Record record) throws Exception; -abstract void close() throws Exception; +void close() throws Exception; } -private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor { -private Producer producer; +private abstract class PulsarSinkProcessorBase implements PulsarSinkProcessor { +protected Map> publishProducers = new ConcurrentHashMap<>(); +protected Schema schema; -@Override -public void initializeOutputProducer(String outputTopic, Schema schema, String fqfn) throws Exception { -this.producer = AbstractOneOuputTopicProducers.createProducer( -client, pulsarSinkConfig.getTopic(), null, schema, fqfn); +protected PulsarSinkProcessorBase(Schema schema) { +this.schema = schema; } -@Override -public TypedMessageBuilder newMessage(Record record) { -return producer.newMessage(); +public Producer createProducer(PulsarClient client, String topic, String producerName, Schema schema, String fqfn) +throws PulsarClientException { +ProducerBuilder builder = client.newProducer(schema) +.blockIfQueueFull(true) +.enableBatching(true) +.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS) +.compressionType(CompressionType.LZ4) +.hashingScheme(HashingScheme.Murmur3_32Hash) // +.messageRoutingMode(MessageRoutingMode.CustomPartition) +.messageRouter(FunctionResultRouter.of()) +.topic(topic); +if (producerName != null) { +builder.producerName(producerName); +} + +return builder 
+.property("application", "pulsarfunction") +.property("fqfn", fqfn).create(); } -@Override -public void sendOutputMessage(TypedMessageBuilder msg, Record record) throws Exception { -msg.sendAsync(); +protected Producer getProducer(String destinationTopic) { +return getProducer(destinationTopic, null, destinationTopic); } -@Override -public void close() throws Exception { -if (null != producer) { +protected Producer getProducer(String producerId, String producerName, String topicName) { + +Producer producer = publishProducers.get(producerId); + +if (producer == null) { try { -producer.close(); +Producer newProducer = createProducer( +client, +topicName, +producerName, +schema, +fqfn); + +Producer existingProducer = publishProducers.putIfAbsent(producerId, newProducer); + +if (existingProducer != null) { +// The value in the map was not updated after the concurrent put +newProducer.close(); +producer = existingProducer; +} else { +producer = newProducer; +} + } catch (PulsarClientException e) { -log.warn("Fail to close producer for processor {}", pulsarSinkConfig.getTopic(), e); +log.error("Failed to create Producer while doing user publish", e); +throw new RuntimeException(e); } } +return producer; } -} - -private class PulsarSinkAtLeastOnceProcessor implements PulsarSinkProcessor { -private Producer producer; @Override -public void initializeOutputProducer(String outputTopic, Schema schema, String fqfn) throws Exception { -this.producer = AbstractOneOuputTopicProducers.createProducer( -client, pulsarSinkConfig.getTopic(), null, schema, fqfn); +public void close() throws Exception { +List> closeFutures = new ArrayList<>(publishProducers.size()); +for (Map.Entry> entry: publishProducers.entrySet()) { +String
[GitHub] rdhabalia commented on a change in pull request #2605: implement topic routing on a per record basis
rdhabalia commented on a change in pull request #2605: implement topic routing on a per record basis URL: https://github.com/apache/incubator-pulsar/pull/2605#discussion_r218605442 ## File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java ## @@ -60,140 +62,166 @@ private final String fqfn; private interface PulsarSinkProcessor { -void initializeOutputProducer(String outputTopic, Schema schema, String fqfn) throws Exception; TypedMessageBuilder newMessage(Record record) throws Exception; void sendOutputMessage(TypedMessageBuilder msg, Record record) throws Exception; -abstract void close() throws Exception; +void close() throws Exception; } -private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor { -private Producer producer; +private abstract class PulsarSinkProcessorBase implements PulsarSinkProcessor { +protected Map> publishProducers = new ConcurrentHashMap<>(); +protected Schema schema; -@Override -public void initializeOutputProducer(String outputTopic, Schema schema, String fqfn) throws Exception { -this.producer = AbstractOneOuputTopicProducers.createProducer( -client, pulsarSinkConfig.getTopic(), null, schema, fqfn); +protected PulsarSinkProcessorBase(Schema schema) { +this.schema = schema; } -@Override -public TypedMessageBuilder newMessage(Record record) { -return producer.newMessage(); +public Producer createProducer(PulsarClient client, String topic, String producerName, Schema schema, String fqfn) +throws PulsarClientException { +ProducerBuilder builder = client.newProducer(schema) +.blockIfQueueFull(true) +.enableBatching(true) +.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS) +.compressionType(CompressionType.LZ4) +.hashingScheme(HashingScheme.Murmur3_32Hash) // +.messageRoutingMode(MessageRoutingMode.CustomPartition) +.messageRouter(FunctionResultRouter.of()) +.topic(topic); +if (producerName != null) { +builder.producerName(producerName); +} + +return builder 
+.property("application", "pulsarfunction") +.property("fqfn", fqfn).create(); } -@Override -public void sendOutputMessage(TypedMessageBuilder msg, Record record) throws Exception { -msg.sendAsync(); +protected Producer getProducer(String destinationTopic) { +return getProducer(destinationTopic, null, destinationTopic); } -@Override -public void close() throws Exception { -if (null != producer) { +protected Producer getProducer(String producerId, String producerName, String topicName) { + +Producer producer = publishProducers.get(producerId); + +if (producer == null) { try { -producer.close(); +Producer newProducer = createProducer( +client, +topicName, +producerName, +schema, +fqfn); + +Producer existingProducer = publishProducers.putIfAbsent(producerId, newProducer); + +if (existingProducer != null) { +// The value in the map was not updated after the concurrent put +newProducer.close(); +producer = existingProducer; +} else { +producer = newProducer; +} + } catch (PulsarClientException e) { -log.warn("Fail to close producer for processor {}", pulsarSinkConfig.getTopic(), e); +log.error("Failed to create Producer while doing user publish", e); +throw new RuntimeException(e); } } +return producer; } -} - -private class PulsarSinkAtLeastOnceProcessor implements PulsarSinkProcessor { -private Producer producer; @Override -public void initializeOutputProducer(String outputTopic, Schema schema, String fqfn) throws Exception { -this.producer = AbstractOneOuputTopicProducers.createProducer( -client, pulsarSinkConfig.getTopic(), null, schema, fqfn); +public void close() throws Exception { +List> closeFutures = new ArrayList<>(publishProducers.size()); +for (Map.Entry> entry: publishProducers.entrySet()) { +String
[GitHub] rdhabalia commented on a change in pull request #2605: implement topic routing on a per record basis
rdhabalia commented on a change in pull request #2605: implement topic routing on a per record basis URL: https://github.com/apache/incubator-pulsar/pull/2605#discussion_r218604865 ## File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java ## @@ -60,140 +62,166 @@ private final String fqfn; private interface PulsarSinkProcessor { -void initializeOutputProducer(String outputTopic, Schema schema, String fqfn) throws Exception; TypedMessageBuilder newMessage(Record record) throws Exception; void sendOutputMessage(TypedMessageBuilder msg, Record record) throws Exception; -abstract void close() throws Exception; +void close() throws Exception; } -private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor { -private Producer producer; +private abstract class PulsarSinkProcessorBase implements PulsarSinkProcessor { +protected Map> publishProducers = new ConcurrentHashMap<>(); +protected Schema schema; -@Override -public void initializeOutputProducer(String outputTopic, Schema schema, String fqfn) throws Exception { -this.producer = AbstractOneOuputTopicProducers.createProducer( -client, pulsarSinkConfig.getTopic(), null, schema, fqfn); +protected PulsarSinkProcessorBase(Schema schema) { +this.schema = schema; } -@Override -public TypedMessageBuilder newMessage(Record record) { -return producer.newMessage(); +public Producer createProducer(PulsarClient client, String topic, String producerName, Schema schema, String fqfn) +throws PulsarClientException { +ProducerBuilder builder = client.newProducer(schema) +.blockIfQueueFull(true) +.enableBatching(true) +.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS) +.compressionType(CompressionType.LZ4) +.hashingScheme(HashingScheme.Murmur3_32Hash) // +.messageRoutingMode(MessageRoutingMode.CustomPartition) +.messageRouter(FunctionResultRouter.of()) +.topic(topic); +if (producerName != null) { +builder.producerName(producerName); +} + +return builder 
+.property("application", "pulsarfunction") +.property("fqfn", fqfn).create(); } -@Override -public void sendOutputMessage(TypedMessageBuilder msg, Record record) throws Exception { -msg.sendAsync(); +protected Producer getProducer(String destinationTopic) { +return getProducer(destinationTopic, null, destinationTopic); } -@Override -public void close() throws Exception { -if (null != producer) { +protected Producer getProducer(String producerId, String producerName, String topicName) { + +Producer producer = publishProducers.get(producerId); + +if (producer == null) { try { -producer.close(); +Producer newProducer = createProducer( +client, +topicName, +producerName, +schema, +fqfn); + +Producer existingProducer = publishProducers.putIfAbsent(producerId, newProducer); + +if (existingProducer != null) { +// The value in the map was not updated after the concurrent put +newProducer.close(); +producer = existingProducer; +} else { +producer = newProducer; +} + } catch (PulsarClientException e) { -log.warn("Fail to close producer for processor {}", pulsarSinkConfig.getTopic(), e); +log.error("Failed to create Producer while doing user publish", e); +throw new RuntimeException(e); } } +return producer; } -} - -private class PulsarSinkAtLeastOnceProcessor implements PulsarSinkProcessor { -private Producer producer; @Override -public void initializeOutputProducer(String outputTopic, Schema schema, String fqfn) throws Exception { -this.producer = AbstractOneOuputTopicProducers.createProducer( -client, pulsarSinkConfig.getTopic(), null, schema, fqfn); +public void close() throws Exception { +List> closeFutures = new ArrayList<>(publishProducers.size()); +for (Map.Entry> entry: publishProducers.entrySet()) { +String