[GitHub] rdhabalia commented on a change in pull request #2605: implement topic routing on a per record basis

2018-09-18 Thread GitBox
rdhabalia commented on a change in pull request #2605: implement topic routing 
on a per record basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#discussion_r218635418
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 ##
 @@ -60,140 +62,153 @@
 private final String fqfn;
 
 private interface PulsarSinkProcessor {
-void initializeOutputProducer(String outputTopic, Schema schema, 
String fqfn) throws Exception;
 
 TypedMessageBuilder newMessage(Record record) throws Exception;
 
 void sendOutputMessage(TypedMessageBuilder msg, Record record) 
throws Exception;
 
-abstract void close() throws Exception;
+void close() throws Exception;
 }
 
-private class PulsarSinkAtMostOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
+private abstract class PulsarSinkProcessorBase implements 
PulsarSinkProcessor {
+protected Map> publishProducers = new 
ConcurrentHashMap<>();
+protected Schema schema;
 
-@Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+protected PulsarSinkProcessorBase(Schema schema) {
+this.schema = schema;
 }
 
-@Override
-public TypedMessageBuilder newMessage(Record record) {
-return producer.newMessage();
+public  Producer createProducer(PulsarClient client, String 
topic, String producerName, Schema schema, String fqfn)
+throws PulsarClientException {
+ProducerBuilder builder = client.newProducer(schema)
+.blockIfQueueFull(true)
+.enableBatching(true)
+.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
+.compressionType(CompressionType.LZ4)
+.hashingScheme(HashingScheme.Murmur3_32Hash) //
+.messageRoutingMode(MessageRoutingMode.CustomPartition)
+.messageRouter(FunctionResultRouter.of())
+.topic(topic);
+if (producerName != null) {
+builder.producerName(producerName);
+}
+
+return builder
+.property("application", "pulsarfunction")
+.property("fqfn", fqfn).create();
 }
 
-@Override
-public void sendOutputMessage(TypedMessageBuilder msg, Record 
record) throws Exception {
-msg.sendAsync();
+protected Producer getProducer(String destinationTopic) {
+return getProducer(destinationTopic, null, destinationTopic);
 }
 
-@Override
-public void close() throws Exception {
-if (null != producer) {
+protected Producer getProducer(String producerId, String 
producerName, String topicName) {
+
+Producer producer = publishProducers.get(producerId);
+
+if (producer == null) {
 try {
-producer.close();
+Producer newProducer = createProducer(
+client,
+topicName,
+producerName,
+schema,
+fqfn);
+
+Producer existingProducer = 
publishProducers.putIfAbsent(producerId, newProducer);
 
 Review comment:
   I don't understand this logic. here it creates producer only if producer is 
not present under `publishProducers`. So, does it really need it?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] rdhabalia commented on a change in pull request #2605: implement topic routing on a per record basis

2018-09-18 Thread GitBox
rdhabalia commented on a change in pull request #2605: implement topic routing 
on a per record basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#discussion_r218606849
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 ##
 @@ -60,140 +62,166 @@
 private final String fqfn;
 
 private interface PulsarSinkProcessor {
-void initializeOutputProducer(String outputTopic, Schema schema, 
String fqfn) throws Exception;
 
 TypedMessageBuilder newMessage(Record record) throws Exception;
 
 void sendOutputMessage(TypedMessageBuilder msg, Record record) 
throws Exception;
 
-abstract void close() throws Exception;
+void close() throws Exception;
 }
 
-private class PulsarSinkAtMostOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
+private abstract class PulsarSinkProcessorBase implements 
PulsarSinkProcessor {
+protected Map> publishProducers = new 
ConcurrentHashMap<>();
+protected Schema schema;
 
-@Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+protected PulsarSinkProcessorBase(Schema schema) {
+this.schema = schema;
 }
 
-@Override
-public TypedMessageBuilder newMessage(Record record) {
-return producer.newMessage();
+public  Producer createProducer(PulsarClient client, String 
topic, String producerName, Schema schema, String fqfn)
+throws PulsarClientException {
+ProducerBuilder builder = client.newProducer(schema)
+.blockIfQueueFull(true)
+.enableBatching(true)
+.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
+.compressionType(CompressionType.LZ4)
+.hashingScheme(HashingScheme.Murmur3_32Hash) //
+.messageRoutingMode(MessageRoutingMode.CustomPartition)
+.messageRouter(FunctionResultRouter.of())
+.topic(topic);
+if (producerName != null) {
+builder.producerName(producerName);
+}
+
+return builder
+.property("application", "pulsarfunction")
+.property("fqfn", fqfn).create();
 }
 
-@Override
-public void sendOutputMessage(TypedMessageBuilder msg, Record 
record) throws Exception {
-msg.sendAsync();
+protected Producer getProducer(String destinationTopic) {
+return getProducer(destinationTopic, null, destinationTopic);
 }
 
-@Override
-public void close() throws Exception {
-if (null != producer) {
+protected Producer getProducer(String producerId, String 
producerName, String topicName) {
+
+Producer producer = publishProducers.get(producerId);
+
+if (producer == null) {
 try {
-producer.close();
+Producer newProducer = createProducer(
+client,
+topicName,
+producerName,
+schema,
+fqfn);
+
+Producer existingProducer = 
publishProducers.putIfAbsent(producerId, newProducer);
+
+if (existingProducer != null) {
+// The value in the map was not updated after the 
concurrent put
+newProducer.close();
+producer = existingProducer;
+} else {
+producer = newProducer;
+}
+
 } catch (PulsarClientException e) {
-log.warn("Fail to close producer for processor {}", 
pulsarSinkConfig.getTopic(), e);
+log.error("Failed to create Producer while doing user 
publish", e);
+throw new RuntimeException(e);
 }
 }
+return producer;
 }
-}
-
-private class PulsarSinkAtLeastOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
 
 @Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+public void close() throws Exception {
+List> closeFutures = new 
ArrayList<>(publishProducers.size());
+for (Map.Entry> entry: 
publishProducers.entrySet()) {
+String 

[GitHub] rdhabalia commented on a change in pull request #2605: implement topic routing on a per record basis

2018-09-18 Thread GitBox
rdhabalia commented on a change in pull request #2605: implement topic routing 
on a per record basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#discussion_r218603818
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 ##
 @@ -60,140 +62,166 @@
 private final String fqfn;
 
 private interface PulsarSinkProcessor {
-void initializeOutputProducer(String outputTopic, Schema schema, 
String fqfn) throws Exception;
 
 TypedMessageBuilder newMessage(Record record) throws Exception;
 
 void sendOutputMessage(TypedMessageBuilder msg, Record record) 
throws Exception;
 
-abstract void close() throws Exception;
+void close() throws Exception;
 }
 
-private class PulsarSinkAtMostOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
+private abstract class PulsarSinkProcessorBase implements 
PulsarSinkProcessor {
+protected Map> publishProducers = new 
ConcurrentHashMap<>();
+protected Schema schema;
 
-@Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+protected PulsarSinkProcessorBase(Schema schema) {
+this.schema = schema;
 }
 
-@Override
-public TypedMessageBuilder newMessage(Record record) {
-return producer.newMessage();
+public  Producer createProducer(PulsarClient client, String 
topic, String producerName, Schema schema, String fqfn)
+throws PulsarClientException {
+ProducerBuilder builder = client.newProducer(schema)
+.blockIfQueueFull(true)
+.enableBatching(true)
+.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
+.compressionType(CompressionType.LZ4)
+.hashingScheme(HashingScheme.Murmur3_32Hash) //
+.messageRoutingMode(MessageRoutingMode.CustomPartition)
+.messageRouter(FunctionResultRouter.of())
+.topic(topic);
+if (producerName != null) {
+builder.producerName(producerName);
+}
+
+return builder
+.property("application", "pulsarfunction")
+.property("fqfn", fqfn).create();
 }
 
-@Override
-public void sendOutputMessage(TypedMessageBuilder msg, Record 
record) throws Exception {
-msg.sendAsync();
+protected Producer getProducer(String destinationTopic) {
+return getProducer(destinationTopic, null, destinationTopic);
 }
 
-@Override
-public void close() throws Exception {
-if (null != producer) {
+protected Producer getProducer(String producerId, String 
producerName, String topicName) {
+
+Producer producer = publishProducers.get(producerId);
+
+if (producer == null) {
 try {
-producer.close();
+Producer newProducer = createProducer(
+client,
+topicName,
+producerName,
+schema,
+fqfn);
+
+Producer existingProducer = 
publishProducers.putIfAbsent(producerId, newProducer);
+
+if (existingProducer != null) {
+// The value in the map was not updated after the 
concurrent put
+newProducer.close();
+producer = existingProducer;
+} else {
+producer = newProducer;
+}
+
 } catch (PulsarClientException e) {
-log.warn("Fail to close producer for processor {}", 
pulsarSinkConfig.getTopic(), e);
+log.error("Failed to create Producer while doing user 
publish", e);
+throw new RuntimeException(e);
 }
 }
+return producer;
 }
-}
-
-private class PulsarSinkAtLeastOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
 
 @Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+public void close() throws Exception {
+List> closeFutures = new 
ArrayList<>(publishProducers.size());
+for (Map.Entry> entry: 
publishProducers.entrySet()) {
+String 

[GitHub] rdhabalia commented on a change in pull request #2605: implement topic routing on a per record basis

2018-09-18 Thread GitBox
rdhabalia commented on a change in pull request #2605: implement topic routing 
on a per record basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#discussion_r218603562
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 ##
 @@ -60,140 +62,166 @@
 private final String fqfn;
 
 private interface PulsarSinkProcessor {
-void initializeOutputProducer(String outputTopic, Schema schema, 
String fqfn) throws Exception;
 
 TypedMessageBuilder newMessage(Record record) throws Exception;
 
 void sendOutputMessage(TypedMessageBuilder msg, Record record) 
throws Exception;
 
-abstract void close() throws Exception;
+void close() throws Exception;
 }
 
-private class PulsarSinkAtMostOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
+private abstract class PulsarSinkProcessorBase implements 
PulsarSinkProcessor {
+protected Map> publishProducers = new 
ConcurrentHashMap<>();
+protected Schema schema;
 
-@Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+protected PulsarSinkProcessorBase(Schema schema) {
+this.schema = schema;
 }
 
-@Override
-public TypedMessageBuilder newMessage(Record record) {
-return producer.newMessage();
+public  Producer createProducer(PulsarClient client, String 
topic, String producerName, Schema schema, String fqfn)
+throws PulsarClientException {
+ProducerBuilder builder = client.newProducer(schema)
+.blockIfQueueFull(true)
+.enableBatching(true)
+.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
+.compressionType(CompressionType.LZ4)
+.hashingScheme(HashingScheme.Murmur3_32Hash) //
+.messageRoutingMode(MessageRoutingMode.CustomPartition)
+.messageRouter(FunctionResultRouter.of())
+.topic(topic);
+if (producerName != null) {
+builder.producerName(producerName);
+}
+
+return builder
+.property("application", "pulsarfunction")
+.property("fqfn", fqfn).create();
 }
 
-@Override
-public void sendOutputMessage(TypedMessageBuilder msg, Record 
record) throws Exception {
-msg.sendAsync();
+protected Producer getProducer(String destinationTopic) {
+return getProducer(destinationTopic, null, destinationTopic);
 }
 
-@Override
-public void close() throws Exception {
-if (null != producer) {
+protected Producer getProducer(String producerId, String 
producerName, String topicName) {
+
+Producer producer = publishProducers.get(producerId);
+
+if (producer == null) {
 try {
-producer.close();
+Producer newProducer = createProducer(
+client,
+topicName,
+producerName,
+schema,
+fqfn);
+
+Producer existingProducer = 
publishProducers.putIfAbsent(producerId, newProducer);
+
+if (existingProducer != null) {
+// The value in the map was not updated after the 
concurrent put
+newProducer.close();
+producer = existingProducer;
+} else {
+producer = newProducer;
+}
+
 } catch (PulsarClientException e) {
-log.warn("Fail to close producer for processor {}", 
pulsarSinkConfig.getTopic(), e);
+log.error("Failed to create Producer while doing user 
publish", e);
+throw new RuntimeException(e);
 }
 }
+return producer;
 }
-}
-
-private class PulsarSinkAtLeastOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
 
 @Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+public void close() throws Exception {
+List> closeFutures = new 
ArrayList<>(publishProducers.size());
+for (Map.Entry> entry: 
publishProducers.entrySet()) {
+String 

[GitHub] rdhabalia commented on a change in pull request #2605: implement topic routing on a per record basis

2018-09-18 Thread GitBox
rdhabalia commented on a change in pull request #2605: implement topic routing 
on a per record basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#discussion_r218605442
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 ##
 @@ -60,140 +62,166 @@
 private final String fqfn;
 
 private interface PulsarSinkProcessor {
-void initializeOutputProducer(String outputTopic, Schema schema, 
String fqfn) throws Exception;
 
 TypedMessageBuilder newMessage(Record record) throws Exception;
 
 void sendOutputMessage(TypedMessageBuilder msg, Record record) 
throws Exception;
 
-abstract void close() throws Exception;
+void close() throws Exception;
 }
 
-private class PulsarSinkAtMostOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
+private abstract class PulsarSinkProcessorBase implements 
PulsarSinkProcessor {
+protected Map> publishProducers = new 
ConcurrentHashMap<>();
+protected Schema schema;
 
-@Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+protected PulsarSinkProcessorBase(Schema schema) {
+this.schema = schema;
 }
 
-@Override
-public TypedMessageBuilder newMessage(Record record) {
-return producer.newMessage();
+public  Producer createProducer(PulsarClient client, String 
topic, String producerName, Schema schema, String fqfn)
+throws PulsarClientException {
+ProducerBuilder builder = client.newProducer(schema)
+.blockIfQueueFull(true)
+.enableBatching(true)
+.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
+.compressionType(CompressionType.LZ4)
+.hashingScheme(HashingScheme.Murmur3_32Hash) //
+.messageRoutingMode(MessageRoutingMode.CustomPartition)
+.messageRouter(FunctionResultRouter.of())
+.topic(topic);
+if (producerName != null) {
+builder.producerName(producerName);
+}
+
+return builder
+.property("application", "pulsarfunction")
+.property("fqfn", fqfn).create();
 }
 
-@Override
-public void sendOutputMessage(TypedMessageBuilder msg, Record 
record) throws Exception {
-msg.sendAsync();
+protected Producer getProducer(String destinationTopic) {
+return getProducer(destinationTopic, null, destinationTopic);
 }
 
-@Override
-public void close() throws Exception {
-if (null != producer) {
+protected Producer getProducer(String producerId, String 
producerName, String topicName) {
+
+Producer producer = publishProducers.get(producerId);
+
+if (producer == null) {
 try {
-producer.close();
+Producer newProducer = createProducer(
+client,
+topicName,
+producerName,
+schema,
+fqfn);
+
+Producer existingProducer = 
publishProducers.putIfAbsent(producerId, newProducer);
+
+if (existingProducer != null) {
+// The value in the map was not updated after the 
concurrent put
+newProducer.close();
+producer = existingProducer;
+} else {
+producer = newProducer;
+}
+
 } catch (PulsarClientException e) {
-log.warn("Fail to close producer for processor {}", 
pulsarSinkConfig.getTopic(), e);
+log.error("Failed to create Producer while doing user 
publish", e);
+throw new RuntimeException(e);
 }
 }
+return producer;
 }
-}
-
-private class PulsarSinkAtLeastOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
 
 @Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+public void close() throws Exception {
+List> closeFutures = new 
ArrayList<>(publishProducers.size());
+for (Map.Entry> entry: 
publishProducers.entrySet()) {
+String 

[GitHub] rdhabalia commented on a change in pull request #2605: implement topic routing on a per record basis

2018-09-18 Thread GitBox
rdhabalia commented on a change in pull request #2605: implement topic routing 
on a per record basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#discussion_r218604865
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 ##
 @@ -60,140 +62,166 @@
 private final String fqfn;
 
 private interface PulsarSinkProcessor {
-void initializeOutputProducer(String outputTopic, Schema schema, 
String fqfn) throws Exception;
 
 TypedMessageBuilder newMessage(Record record) throws Exception;
 
 void sendOutputMessage(TypedMessageBuilder msg, Record record) 
throws Exception;
 
-abstract void close() throws Exception;
+void close() throws Exception;
 }
 
-private class PulsarSinkAtMostOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
+private abstract class PulsarSinkProcessorBase implements 
PulsarSinkProcessor {
+protected Map> publishProducers = new 
ConcurrentHashMap<>();
+protected Schema schema;
 
-@Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+protected PulsarSinkProcessorBase(Schema schema) {
+this.schema = schema;
 }
 
-@Override
-public TypedMessageBuilder newMessage(Record record) {
-return producer.newMessage();
+public  Producer createProducer(PulsarClient client, String 
topic, String producerName, Schema schema, String fqfn)
+throws PulsarClientException {
+ProducerBuilder builder = client.newProducer(schema)
+.blockIfQueueFull(true)
+.enableBatching(true)
+.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
+.compressionType(CompressionType.LZ4)
+.hashingScheme(HashingScheme.Murmur3_32Hash) //
+.messageRoutingMode(MessageRoutingMode.CustomPartition)
+.messageRouter(FunctionResultRouter.of())
+.topic(topic);
+if (producerName != null) {
+builder.producerName(producerName);
+}
+
+return builder
+.property("application", "pulsarfunction")
+.property("fqfn", fqfn).create();
 }
 
-@Override
-public void sendOutputMessage(TypedMessageBuilder msg, Record 
record) throws Exception {
-msg.sendAsync();
+protected Producer getProducer(String destinationTopic) {
+return getProducer(destinationTopic, null, destinationTopic);
 }
 
-@Override
-public void close() throws Exception {
-if (null != producer) {
+protected Producer getProducer(String producerId, String 
producerName, String topicName) {
+
+Producer producer = publishProducers.get(producerId);
+
+if (producer == null) {
 try {
-producer.close();
+Producer newProducer = createProducer(
+client,
+topicName,
+producerName,
+schema,
+fqfn);
+
+Producer existingProducer = 
publishProducers.putIfAbsent(producerId, newProducer);
+
+if (existingProducer != null) {
+// The value in the map was not updated after the 
concurrent put
+newProducer.close();
+producer = existingProducer;
+} else {
+producer = newProducer;
+}
+
 } catch (PulsarClientException e) {
-log.warn("Fail to close producer for processor {}", 
pulsarSinkConfig.getTopic(), e);
+log.error("Failed to create Producer while doing user 
publish", e);
+throw new RuntimeException(e);
 }
 }
+return producer;
 }
-}
-
-private class PulsarSinkAtLeastOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
 
 @Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+public void close() throws Exception {
+List> closeFutures = new 
ArrayList<>(publishProducers.size());
+for (Map.Entry> entry: 
publishProducers.entrySet()) {
+String