[GitHub] merlimat commented on a change in pull request #2400: PIP-22: Dead Letter Topic
merlimat commented on a change in pull request #2400: PIP-22: Dead Letter Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r212524152
## File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java ##
@@ -75,6 +78,16 @@
 */
 Map getProperties();
+/**
+ * Return entry at the position.
+ */
+Entry readEntry(PositionImpl position) throws InterruptedException, ExecutionException;
Review comment:
This can already be done through the `replayEntries()` and `asyncReplayEntries()` methods.
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
With regards, Apache Git Services
[GitHub] merlimat commented on a change in pull request #2400: PIP-22: Dead Letter Topic
merlimat commented on a change in pull request #2400: PIP-22: Dead Letter Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r212524486
## File path: pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java ##
@@ -330,4 +330,24 @@
 * Set subscriptionInitialPosition for the consumer
 */
 ConsumerBuilder subscriptionInitialPosition(SubscriptionInitialPosition subscriptionInitialPosition);
+
+/**
+ * Set maximum number of redelivery.
+ * Message exceeding the maximum number of redelivery will send to Dead Letter Topic and acknowledged automatic.
+ * @param maxRedeliveryCount maximum number of redelivery
+ */
+ConsumerBuilder maxRedeliveryCount(int maxRedeliveryCount);
+
+/**
+ * Set name of Dead Letter Topic.
+ * Before set name of Dead Letter Topic, ensure that maxRedeliveryCount > 0
+ * @param deadLetterTopic name of dead letter topic
+ */
+ConsumerBuilder deadLetterTopic(String deadLetterTopic);
+
+/**
+ * Set max un-acked messages per consumer.
+ * This config should less than broker config, if not, config will not enable. 0 is not limit, default is 0
+ */
+ConsumerBuilder maxUnackedMessagesPerConsumer(int maxUnackedMessagesPerConsumer);
Review comment:
From an API perspective, it might be good to encapsulate all the DLQ-related settings into a single `DeadLetterQueuePolicy`.
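To make the review suggestion concrete, here is a minimal sketch of what a single policy object grouping the two DLQ settings could look like. Only the name `DeadLetterQueuePolicy` and the two settings (`maxRedeliveryCount`, `deadLetterTopic`) come from the thread; the builder, the getters, and the validation rule are assumptions for illustration and are not the API that the pull request defines.

```java
// Hypothetical sketch: only the class name and the two settings appear in the thread.
final class DeadLetterQueuePolicy {
    private final int maxRedeliveryCount;
    private final String deadLetterTopic; // null means "no explicit name was given"

    private DeadLetterQueuePolicy(int maxRedeliveryCount, String deadLetterTopic) {
        this.maxRedeliveryCount = maxRedeliveryCount;
        this.deadLetterTopic = deadLetterTopic;
    }

    static Builder builder() {
        return new Builder();
    }

    int getMaxRedeliveryCount() {
        return maxRedeliveryCount;
    }

    String getDeadLetterTopic() {
        return deadLetterTopic;
    }

    static final class Builder {
        private int maxRedeliveryCount;
        private String deadLetterTopic;

        Builder maxRedeliveryCount(int count) {
            this.maxRedeliveryCount = count;
            return this;
        }

        Builder deadLetterTopic(String topic) {
            this.deadLetterTopic = topic;
            return this;
        }

        DeadLetterQueuePolicy build() {
            // Assumed rule, mirroring the javadoc in the diff:
            // a topic name only makes sense when maxRedeliveryCount > 0.
            if (deadLetterTopic != null && maxRedeliveryCount <= 0) {
                throw new IllegalArgumentException("deadLetterTopic requires maxRedeliveryCount > 0");
            }
            return new DeadLetterQueuePolicy(maxRedeliveryCount, deadLetterTopic);
        }
    }
}
```

With a shape like this, the consumer builder would need a single method taking the policy instead of one method per setting, and the cross-field check lives in one place.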
[GitHub] merlimat commented on a change in pull request #2400: PIP-22: Dead Letter Topic
merlimat commented on a change in pull request #2400: PIP-22: Dead Letter Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r212524714
## File path: pulsar-common/src/main/proto/PulsarApi.proto ##
@@ -229,6 +229,16 @@ message CommandSubscribe {
 // Signal wthether the subscription will initialize on latest
 // or not -- earliest
 optional InitialPosition initialPosition = 13 [default = Latest];
+ // Maximum number of redeliveries.
+ // Message exceeding the maximum number of redeliveries should send to Dead Letter Topic and acknowledged automatic.
+ // Enable this feature by set maxRedeliveryCount > 0
+ optional int32 maxRedeliveryCount = 14;
+ // Name of Dead Letter Topic.
+ // If not set, pulsar broker will generate with topic name and subscription name and suffix with -DLQ
+ optional string deadLetterTopic = 15;
Review comment:
I think it's a bit dangerous to leave the option to have a large number of possible topics for which we need to create producer objects inside the broker.
[GitHub] merlimat commented on issue #2431: Unable to consume messages, need to re-subscribe client
merlimat commented on issue #2431: Unable to consume messages, need to re-subscribe client
URL: https://github.com/apache/incubator-pulsar/issues/2431#issuecomment-415654048
@bardock From the stats, it seems all partitions have rate 0, and no backlog accumulated. I couldn't see any stuck consumer/subscription in there.
[GitHub] jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic
jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r212514520
## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java ##
@@ -132,11 +156,44 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce
 throw new ConsumerBusyException("Subscription reached max consumers limit");
 }
+deadLetterTopicProducer = newDeadLetterProducer();
+
 consumerList.add(consumer);
 consumerList.sort((c1, c2) -> c1.getPriorityLevel() - c2.getPriorityLevel());
 consumerSet.add(consumer);
 }
+private ProducerImpl newDeadLetterProducer() throws BrokerServiceException {
+if (maxRedeliveryCount > 0 && deadLetterTopicProducer == null) {
+try {
+if (maxRedeliveryCount > 0 && StringUtils.isBlank(deadLetterTopic)) {
+deadLetterTopic = String.format("%s-%s-DLQ", topic.getName(), Codec.decode(cursor.getName()));
Review comment:
It seems to be up to the application to decide. In this change, the application can choose its own name, and the “-DLQ” suffix is only used when the application doesn’t specify one.
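The naming fallback discussed in this comment can be sketched in isolation. The format string `"%s-%s-DLQ"` is taken from the diff; the class and method names below are hypothetical, and the blank check is a plain-Java approximation of `StringUtils.isBlank` so that the example has no external dependency.

```java
// Hypothetical helper: only the "%s-%s-DLQ" format string comes from the diff.
final class DeadLetterTopicNames {
    private DeadLetterTopicNames() {
    }

    /**
     * Returns the name chosen by the application, or a generated default
     * built from the topic and subscription names when none was given.
     */
    static String resolve(String configuredName, String topicName, String subscriptionName) {
        // Approximation of StringUtils.isBlank: null, empty, or whitespace only.
        boolean blank = configuredName == null || configuredName.trim().isEmpty();
        if (blank) {
            return String.format("%s-%s-DLQ", topicName, subscriptionName);
        }
        return configuredName;
    }
}
```

For example, `resolve(null, "orders", "billing")` yields `orders-billing-DLQ`, while `resolve("my-dlq", "orders", "billing")` yields `my-dlq`. The sketch does not guard against a generated name colliding with an existing topic, which is the concern raised elsewhere in this thread.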
[GitHub] srkukarni closed pull request #2439: leak assign schema in MessageImpl constructor
srkukarni closed pull request #2439: leak assign schema in MessageImpl constructor
URL: https://github.com/apache/incubator-pulsar/pull/2439
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 5ea8bfef5e..96b68c1af4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -155,6 +155,7 @@ public MessageImpl(String msgId, Map properties, ByteBuf payload
 this.cnx = null;
 this.payload = payload;
 this.properties = Collections.unmodifiableMap(properties);
+this.schema = schema;
 }
 public static MessageImpl deserialize(ByteBuf headersAndPayload) throws IOException {
[incubator-pulsar] branch master updated: leak assign schema in MessageImpl constructor (#2439)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 024d2e0 leak assign schema in MessageImpl constructor (#2439)
024d2e0 is described below
commit 024d2e0ea3b8e7e3b26f5e658b449a0861f3112d
Author: Jia Zhai
AuthorDate: Fri Aug 24 11:42:55 2018 +0800
leak assign schema in MessageImpl constructor (#2439)
---
.../src/main/java/org/apache/pulsar/client/impl/MessageImpl.java | 1 +
1 file changed, 1 insertion(+)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 5ea8bfe..96b68c1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -155,6 +155,7 @@ public class MessageImpl implements Message {
 this.cnx = null;
 this.payload = payload;
 this.properties = Collections.unmodifiableMap(properties);
+this.schema = schema;
 }
 public static MessageImpl deserialize(ByteBuf headersAndPayload) throws IOException {
[GitHub] jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic
jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r212514520
## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java ##
@@ -132,11 +156,44 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce
 throw new ConsumerBusyException("Subscription reached max consumers limit");
 }
+deadLetterTopicProducer = newDeadLetterProducer();
+
 consumerList.add(consumer);
 consumerList.sort((c1, c2) -> c1.getPriorityLevel() - c2.getPriorityLevel());
 consumerSet.add(consumer);
 }
+private ProducerImpl newDeadLetterProducer() throws BrokerServiceException {
+if (maxRedeliveryCount > 0 && deadLetterTopicProducer == null) {
+try {
+if (maxRedeliveryCount > 0 && StringUtils.isBlank(deadLetterTopic)) {
+deadLetterTopic = String.format("%s-%s-DLQ", topic.getName(), Codec.decode(cursor.getName()));
Review comment:
It seems to be up to the application to decide. The application can choose its own name, and the “-DLQ” suffix is only used when the application doesn’t specify one.
[GitHub] codelipenghui commented on a change in pull request #2400: PIP-22: Dead Letter Topic
codelipenghui commented on a change in pull request #2400: PIP-22: Dead Letter Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r212514391
## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java ##
@@ -132,11 +156,44 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce
 throw new ConsumerBusyException("Subscription reached max consumers limit");
 }
+deadLetterTopicProducer = newDeadLetterProducer();
+
 consumerList.add(consumer);
 consumerList.sort((c1, c2) -> c1.getPriorityLevel() - c2.getPriorityLevel());
 consumerSet.add(consumer);
 }
+private ProducerImpl newDeadLetterProducer() throws BrokerServiceException {
+if (maxRedeliveryCount > 0 && deadLetterTopicProducer == null) {
+try {
+if (maxRedeliveryCount > 0 && StringUtils.isBlank(deadLetterTopic)) {
+deadLetterTopic = String.format("%s-%s-DLQ", topic.getName(), Codec.decode(cursor.getName()));
Review comment:
Any better suggestion?
[GitHub] codelipenghui commented on a change in pull request #2400: PIP-22: Dead Letter Topic
codelipenghui commented on a change in pull request #2400: PIP-22: Dead Letter Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r212514262
## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java ##
@@ -88,20 +100,32 @@
 private final ServiceConfiguration serviceConfig;
 private DispatchRateLimiter dispatchRateLimiter;
+protected volatile int maxRedeliveryCount;
+protected volatile String deadLetterTopic;
+protected RedeliveryTracker redeliveryTracker;
+private volatile ProducerImpl deadLetterTopicProducer;
+
 enum ReadType {
 Normal, Replay
 }
-public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor) {
+public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, int maxRedeliveryCount,
+ String deadLetterTopic) {
 this.cursor = cursor;
 this.name = topic.getName() + " / " + Codec.decode(cursor.getName());
 this.topic = topic;
 this.messagesToReplay = new ConcurrentLongPairSet(512, 2);
+this.messagesToDeadLetter = new HashSet<>(8);
Review comment:
Ok!
[GitHub] jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic
jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r212513063
## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java ##
@@ -88,20 +100,32 @@
 private final ServiceConfiguration serviceConfig;
 private DispatchRateLimiter dispatchRateLimiter;
+protected volatile int maxRedeliveryCount;
+protected volatile String deadLetterTopic;
+protected RedeliveryTracker redeliveryTracker;
+private volatile ProducerImpl deadLetterTopicProducer;
+
 enum ReadType {
 Normal, Replay
 }
-public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor) {
+public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, int maxRedeliveryCount,
+ String deadLetterTopic) {
 this.cursor = cursor;
 this.name = topic.getName() + " / " + Codec.decode(cursor.getName());
 this.topic = topic;
 this.messagesToReplay = new ConcurrentLongPairSet(512, 2);
+this.messagesToDeadLetter = new HashSet<>(8);
Review comment:
+1. @codelipenghui, we may need to follow the approach of `messagesToReplay`, which stores ledgerId and entryId as primitive values and then composes a Position from (ledgerId, entryId) when it is used.
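The idea in this comment, keeping (ledgerId, entryId) pairs as primitives and only building a position object when one is needed, can be illustrated with a small sketch. This is not Pulsar's `ConcurrentLongPairSet`: it is a hypothetical, single-threaded, append-only buffer with no de-duplication, and the nested `Position` class is a stand-in for the real position type.

```java
import java.util.Arrays;

// Hypothetical illustration only; not thread-safe and not a set.
final class PositionBuffer {
    /** Stand-in for a position type; instances are created only on read. */
    static final class Position {
        final long ledgerId;
        final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }
    }

    // Pairs are stored flat: ledgerId at even indexes, entryId at odd indexes.
    private long[] values = new long[16];
    private int size; // number of longs in use, always even

    void add(long ledgerId, long entryId) {
        if (size + 2 > values.length) {
            values = Arrays.copyOf(values, values.length * 2);
        }
        values[size++] = ledgerId;
        values[size++] = entryId;
    }

    int count() {
        return size / 2;
    }

    /** Composes a short-lived Position from the stored primitives. */
    Position get(int index) {
        if (index < 0 || index >= count()) {
            throw new IndexOutOfBoundsException("index " + index);
        }
        return new Position(values[2 * index], values[2 * index + 1]);
    }
}
```

The long-lived state here is a single `long[]`, so the garbage collector has one array to track instead of one object per pending message; the `Position` objects returned by `get` are short-lived and cheap to collect.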
[GitHub] massakam commented on issue #2401: Consumer often discards received messages
massakam commented on issue #2401: Consumer often discards received messages
URL: https://github.com/apache/incubator-pulsar/issues/2401#issuecomment-415634865
Sorry, the OpenMessaging benchmark tool does not support TLS yet... We are using a version of the tool modified to enable TLS.
[GitHub] jiazhai opened a new pull request #2439: leak assign schema in MessageImpl constructor
jiazhai opened a new pull request #2439: leak assign schema in MessageImpl constructor
URL: https://github.com/apache/incubator-pulsar/pull/2439
The schema was not assigned in the MessageImpl constructor; this change tries to fix it.
[GitHub] rdhabalia closed pull request #2437: Race condition function-runtime-manager read old assignments
rdhabalia closed pull request #2437: Race condition function-runtime-manager read old assignments URL: https://github.com/apache/incubator-pulsar/pull/2437 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 016324e481..a6fda6f8b6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -335,6 +335,10 @@ public String getSubscription() { return subscription; } +public String getConsumerName() { +return this.consumerName; +} + /** * Redelivers the given unacknowledged messages. In Failover mode, the request is ignored if the consumer is not * active for the given topic. 
In Shared mode, the consumers messages to be redelivered are distributed across all diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java index 7994ef33d4..b18fd12881 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java @@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.common.policies.data.ConsumerStats; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.functions.proto.Function; @@ -53,7 +54,7 @@ public class MembershipManager implements AutoCloseable, ConsumerEventListener { private final String consumerName; -private final Consumer consumer; +private final ConsumerImpl consumer; private final WorkerConfig workerConfig; private PulsarAdmin pulsarAdminClient; private final CompletableFuture firstConsumerEventFuture; @@ -68,9 +69,9 @@ @VisibleForTesting Map unsignedFunctionDurations = new HashMap<>(); -MembershipManager(WorkerConfig workerConfig, PulsarClient client) +MembershipManager(WorkerService service, PulsarClient client) throws PulsarClientException { -this.workerConfig = workerConfig; +this.workerConfig = service.getWorkerConfig(); consumerName = String.format( "%s:%s:%d", workerConfig.getWorkerId(), @@ -82,13 +83,15 @@ // we don't produce any messages into this topic, we only use the `failover` subscription // to elect an active consumer as the leader worker. The leader worker will be responsible // for scheduling snapshots for FMT and doing task assignment. 
-consumer = client.newConsumer() +consumer = (ConsumerImpl) client.newConsumer() .topic(workerConfig.getClusterCoordinationTopic()) .subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION) .subscriptionType(SubscriptionType.Failover) .consumerEventListener(this) .property(WORKER_IDENTIFIER, consumerName) .subscribe(); + +isLeader.set(checkLeader(service, consumer.getConsumerName())); } @Override @@ -282,4 +285,19 @@ private PulsarAdmin getPulsarAdminClient() { return this.pulsarAdminClient; } +private boolean checkLeader(WorkerService service, String consumerName) { +try { +TopicStats stats = service.getBrokerAdmin().topics() + .getStats(service.getWorkerConfig().getClusterCoordinationTopic()); +String activeConsumerName = stats != null +&& stats.subscriptions.get(COORDINATION_TOPIC_SUBSCRIPTION) != null +? stats.subscriptions.get(COORDINATION_TOPIC_SUBSCRIPTION).activeConsumerName +: null; +return consumerName != null && consumerName.equalsIgnoreCase(activeConsumerName); +} catch (Exception e) { +log.warn("Failed to check leader {}", e.getMessage()); +} +return false; +} + } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java index 0850766e8d..7fc0cc94d7 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/wo
[incubator-pulsar] branch master updated: Race condition function-runtime-manager read old assignments (#2437)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 672a167 Race condition function-runtime-manager read old assignments (#2437) 672a167 is described below commit 672a167a31c719dab7623770d39666e8d7e7a0fd Author: Rajan Dhabalia AuthorDate: Thu Aug 23 18:34:44 2018 -0700 Race condition function-runtime-manager read old assignments (#2437) --- .../apache/pulsar/client/impl/ConsumerBase.java| 4 .../pulsar/functions/worker/MembershipManager.java | 26 ++ .../pulsar/functions/worker/WorkerService.java | 2 +- .../functions/worker/MembershipManagerTest.java| 24 4 files changed, 42 insertions(+), 14 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 016324e..a6fda6f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -335,6 +335,10 @@ public abstract class ConsumerBase extends HandlerState implements Consumer consumer; +private final ConsumerImpl consumer; private final WorkerConfig workerConfig; private PulsarAdmin pulsarAdminClient; private final CompletableFuture firstConsumerEventFuture; @@ -68,9 +69,9 @@ public class MembershipManager implements AutoCloseable, ConsumerEventListener { @VisibleForTesting Map unsignedFunctionDurations = new HashMap<>(); -MembershipManager(WorkerConfig workerConfig, PulsarClient client) +MembershipManager(WorkerService service, PulsarClient client) throws PulsarClientException { -this.workerConfig = workerConfig; +this.workerConfig = service.getWorkerConfig(); consumerName = String.format( "%s:%s:%d", workerConfig.getWorkerId(), @@ -82,13 +83,15 @@ public class MembershipManager 
implements AutoCloseable, ConsumerEventListener { // we don't produce any messages into this topic, we only use the `failover` subscription // to elect an active consumer as the leader worker. The leader worker will be responsible // for scheduling snapshots for FMT and doing task assignment. -consumer = client.newConsumer() +consumer = (ConsumerImpl) client.newConsumer() .topic(workerConfig.getClusterCoordinationTopic()) .subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION) .subscriptionType(SubscriptionType.Failover) .consumerEventListener(this) .property(WORKER_IDENTIFIER, consumerName) .subscribe(); + +isLeader.set(checkLeader(service, consumer.getConsumerName())); } @Override @@ -282,4 +285,19 @@ public class MembershipManager implements AutoCloseable, ConsumerEventListener { return this.pulsarAdminClient; } +private boolean checkLeader(WorkerService service, String consumerName) { +try { +TopicStats stats = service.getBrokerAdmin().topics() + .getStats(service.getWorkerConfig().getClusterCoordinationTopic()); +String activeConsumerName = stats != null +&& stats.subscriptions.get(COORDINATION_TOPIC_SUBSCRIPTION) != null +? 
stats.subscriptions.get(COORDINATION_TOPIC_SUBSCRIPTION).activeConsumerName +: null; +return consumerName != null && consumerName.equalsIgnoreCase(activeConsumerName); +} catch (Exception e) { +log.warn("Failed to check leader {}", e.getMessage()); +} +return false; +} + } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java index 0850766..7fc0cc9 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java @@ -137,7 +137,7 @@ public class WorkerService { this.connectorsManager = new ConnectorsManager(workerConfig); //create membership manager -this.membershipManager = new MembershipManager(this.workerConfig, this.client); +this.membershipManager = new MembershipManager(this, this.client); // create function runtime manager this.functionRuntimeManager = new FunctionRuntimeManager( diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsa
[GitHub] jerrypeng commented on issue #2438: Fix: Function assignment can support large number of topics
jerrypeng commented on issue #2438: Fix: Function assignment can support large number of topics
URL: https://github.com/apache/incubator-pulsar/pull/2438#issuecomment-415621190
@rdhabalia It would be nice to keep all the functions-related topics in one namespace, but I guess it's not that big of a deal.
[GitHub] rdhabalia closed pull request #2436: Fix: failover sub keep sending invalid ack
rdhabalia closed pull request #2436: Fix: failover sub keep sending invalid ack
URL: https://github.com/apache/incubator-pulsar/pull/2436
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 0fefe9c182..5240bfe8ec 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -164,6 +164,8 @@ public void flush() {
 ByteBuf cmd = Commands.newAck(consumer.consumerId, lastCumulativeAck.ledgerId, lastCumulativeAck.entryId, AckType.Cumulative, null, Collections.emptyMap());
 cnx.ctx().write(cmd, cnx.ctx().voidPromise());
+lastCumulativeAck = (MessageIdImpl) MessageId.earliest;
+return;
 }
 // Flush all individual acks
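As a rough illustration of the pattern in this diff (resetting the pending cumulative ack to a sentinel once it has been written, so that later flushes do not send it again), here is a self-contained sketch. All names are hypothetical, the `NONE` sentinel plays the role that `MessageId.earliest` plays in the diff, and the guard around the write is an assumption because the diff does not show the enclosing condition. Individual acks are not modeled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of "flush the cumulative ack once, then reset it".
final class CumulativeAckTracker {
    static final long NONE = -1L; // sentinel meaning "nothing pending"

    private long pendingEntryId = NONE;
    private final List<Long> sent = new ArrayList<>(); // stands in for writes to the connection

    void ackCumulative(long entryId) {
        // A cumulative ack covers everything up to entryId, so keep only the highest.
        if (entryId > pendingEntryId) {
            pendingEntryId = entryId;
        }
    }

    void flush() {
        if (pendingEntryId != NONE) {
            sent.add(pendingEntryId);
            // Without this reset, every later flush would resend the same ack.
            pendingEntryId = NONE;
        }
    }

    List<Long> sent() {
        return sent;
    }
}
```

Calling `flush()` twice after a single `ackCumulative(5)` records one send; a new `ackCumulative(7)` followed by `flush()` records a second one.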
[GitHub] rdhabalia commented on a change in pull request #2400: PIP-22: Dead Letter Topic
rdhabalia commented on a change in pull request #2400: PIP-22: Dead Letter Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r212497198
## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java ##
@@ -88,20 +100,32 @@
 private final ServiceConfiguration serviceConfig;
 private DispatchRateLimiter dispatchRateLimiter;
+protected volatile int maxRedeliveryCount;
+protected volatile String deadLetterTopic;
+protected RedeliveryTracker redeliveryTracker;
+private volatile ProducerImpl deadLetterTopicProducer;
+
 enum ReadType {
 Normal, Replay
 }
-public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor) {
+public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, int maxRedeliveryCount,
+ String deadLetterTopic) {
 this.cursor = cursor;
 this.name = topic.getName() + " / " + Codec.decode(cursor.getName());
 this.topic = topic;
 this.messagesToReplay = new ConcurrentLongPairSet(512, 2);
+this.messagesToDeadLetter = new HashSet<>(8);
Review comment:
Umm... this can cause high GC pressure in the broker. Essentially, we would like to avoid storing objects with a relatively long life cycle on the heap. For that reason, a past release included an effort to remove stored PositionImpl objects from the code base. A broker serving high throughput with low-latency requirements might also want to use this feature.
[GitHub] rdhabalia commented on a change in pull request #2400: PIP-22: Dead Letter Topic
rdhabalia commented on a change in pull request #2400: PIP-22: Dead Letter Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r212497283
## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java ##
@@ -132,11 +156,44 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce
 throw new ConsumerBusyException("Subscription reached max consumers limit");
 }
+deadLetterTopicProducer = newDeadLetterProducer();
+
 consumerList.add(consumer);
 consumerList.sort((c1, c2) -> c1.getPriorityLevel() - c2.getPriorityLevel());
 consumerSet.add(consumer);
 }
+private ProducerImpl newDeadLetterProducer() throws BrokerServiceException {
+if (maxRedeliveryCount > 0 && deadLetterTopicProducer == null) {
+try {
+if (maxRedeliveryCount > 0 && StringUtils.isBlank(deadLetterTopic)) {
+deadLetterTopic = String.format("%s-%s-DLQ", topic.getName(), Codec.decode(cursor.getName()));
Review comment:
What if the tenant already has a topic with the postfix `DLQ`?
[GitHub] rdhabalia commented on issue #2438: Fix: Function assignment can support large number of topics
rdhabalia commented on issue #2438: Fix: Function assignment can support large number of topics
URL: https://github.com/apache/incubator-pulsar/pull/2438#issuecomment-415615536
> we can also use topic compaction to delete old assignment messages
Yes, that would address the performance issue, but it would not address the scalability issue, because we can't publish a >5MB assignment message with 2500 functions, as in the example above. This PR takes a simple and straightforward approach, acking the previous message, and avoids an additional hook for compaction.
[GitHub] jerrypeng commented on issue #2438: Fix: Function assignment can support large number of topics
jerrypeng commented on issue #2438: Fix: Function assignment can support large number of topics
URL: https://github.com/apache/incubator-pulsar/pull/2438#issuecomment-415613914
@rdhabalia we can also use topic compaction to delete old assignment messages
[GitHub] jerrypeng commented on issue #2437: Race condition function-runtime-manager read old assignments
jerrypeng commented on issue #2437: Race condition function-runtime-manager read old assignments URL: https://github.com/apache/incubator-pulsar/pull/2437#issuecomment-415613741 @rdhabalia gotcha thanks for explanation
[GitHub] rdhabalia commented on issue #2437: Race condition function-runtime-manager read old assignments
rdhabalia commented on issue #2437: Race condition function-runtime-manager read old assignments URL: https://github.com/apache/incubator-pulsar/pull/2437#issuecomment-415612533
> In the scheduleManager, we have a check before invoking actual scheduling:
Yes, exactly. [SchedulerManager](https://github.com/apache/incubator-pulsar/blob/master/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java#L93) schedules only if the worker is the leader, and at that point MembershipManager has not yet received the leader notification. So SchedulerManager skips scheduling and RuntimeManager reads the last available assignment. I can reproduce this in a standalone function worker, where the standalone SchedulerManager skips scheduling because `isLeader()` returns false.
[GitHub] srkukarni commented on issue #2426: Added cli commands to get function cluster related information
srkukarni commented on issue #2426: Added cli commands to get function cluster related information URL: https://github.com/apache/incubator-pulsar/pull/2426#issuecomment-415611753 @rdhabalia Currently /workers doesn't redirect either. /functions might.
[GitHub] jerrypeng commented on issue #2437: Race condition function-runtime-manager read old assignments
jerrypeng commented on issue #2437: Race condition function-runtime-manager read old assignments URL: https://github.com/apache/incubator-pulsar/pull/2437#issuecomment-415611324 @rdhabalia I am a little confused about how the SchedulerManager can schedule something before the MembershipManager receives the notification that it is the leader. In the SchedulerManager, we have a check before invoking the actual scheduling: https://github.com/apache/incubator-pulsar/blob/master/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java#L93 Am I missing something?
[GitHub] rdhabalia commented on issue #2426: Added cli commands to get function cluster related information
rdhabalia commented on issue #2426: Added cli commands to get function cluster related information URL: https://github.com/apache/incubator-pulsar/pull/2426#issuecomment-415610192
> wrt having a seperate endpoint, I'm not sure if its really needs to go in /worker-stats?
Actually, the motivation for having separate endpoints `workers` and `worker-stats` is that `worker-stats` always serves localhost stats, and this resource can't redirect or serve on behalf of other workers. That is also why we have broker-stats to serve the broker's local resources, so I thought we could keep that consistency. @merlimat any thoughts?
[GitHub] rdhabalia opened a new pull request #2438: Fix: Function assignment can support large number of topics
rdhabalia opened a new pull request #2438: Fix: Function assignment can support large number of topics URL: https://github.com/apache/incubator-pulsar/pull/2438

### Motivation

Pulsar function assignment has scalability and performance issues. Right now, [SchedulerManager](https://github.com/apache/incubator-pulsar/blob/master/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java#L154) publishes all assignments in a single Pulsar message. Each assignment generates about 700 bytes of payload, and Pulsar limits message size to around 5MB. If each function runs with 3 instances, it needs about 2KB of payload, so the cluster can support only around 2500 functions.

Assignment events are also frequent: any assignment change or worker restart can trigger one. Over time, we can expect a large number of assignment messages stored across many ledgers, and every time a worker restarts it has to read all those very old ledgers from BK, which we would also like to avoid.

**Note:** This is easy to reproduce by registering a function with parallelism=12000, which fails to publish the assignment message.

### Modification

1. Publish multiple messages, each with a limited number of assignments, so that all assignments are included and any number of function assignments is supported.
2. Ack the messages for old versions of assignments (this requires a separate namespace for assignments that does not have infinite retention configured).
3. The broker deletes ledgers for old assignments, so the assignment reader doesn't have to read them.

### Result

1. Pulsar Functions can support any number of functions in the system.
2. The assignment manager doesn't read old assignments, so the broker and bookies avoid unnecessary reads and dispatching.
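The sizing argument and the first modification in PR #2438 can be sketched roughly as follows. This is only an illustration, not the code from the PR: the class `AssignmentBatcher`, its method names, and the generic assignment type are hypothetical, and the 700-byte and 5MB figures are the approximate values quoted in the description.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Pulsar: illustrates the sizing argument
// and the "publish multiple bounded messages" idea from the PR description.
class AssignmentBatcher {

    // Approximate figures quoted in the PR description.
    static final long BYTES_PER_ASSIGNMENT = 700;
    static final long MAX_MESSAGE_BYTES = 5L * 1024 * 1024;

    // Number of functions that fit in one message when each function
    // contributes one assignment per instance.
    static long maxFunctionsPerMessage(int instancesPerFunction) {
        return MAX_MESSAGE_BYTES / (BYTES_PER_ASSIGNMENT * instancesPerFunction);
    }

    // Splits the full assignment list into consecutive batches of at most
    // maxPerMessage entries; each batch would be published as its own message.
    static <T> List<List<T>> partition(List<T> assignments, int maxPerMessage) {
        if (maxPerMessage <= 0) {
            throw new IllegalArgumentException("maxPerMessage must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < assignments.size(); start += maxPerMessage) {
            int end = Math.min(start + maxPerMessage, assignments.size());
            batches.add(new ArrayList<>(assignments.subList(start, end)));
        }
        return batches;
    }
}
```

With 3 instances per function, `maxFunctionsPerMessage(3)` evaluates to 2496, in line with the "around 2500" estimate. How the real change bounds each message (by count or by bytes) is not stated in the description, so the count-based split here is an assumption.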
[GitHub] srkukarni commented on issue #2435: improve error message handling
srkukarni commented on issue #2435: improve error message handling URL: https://github.com/apache/incubator-pulsar/pull/2435#issuecomment-415607320 run integration tests
[GitHub] srkukarni closed pull request #2432: Make all cli arguments follow consistent - notation instead of camelcase
srkukarni closed pull request #2432: Make all cli arguments follow consistent - notation instead of camelcase URL: https://github.com/apache/incubator-pulsar/pull/2432 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 1019853c9a..f31fd82468 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -277,7 +277,9 @@ void processArguments() throws Exception { protected String DEPRECATED_userConfigString; @Parameter(names = "--user-config", description = "User-defined config key/values") protected String userConfigString; -@Parameter(names = "--retainOrdering", description = "Function consumes and processes messages in order") +@Parameter(names = "--retainOrdering", description = "Function consumes and processes messages in order", hidden = true) +protected Boolean DEPRECATED_retainOrdering; +@Parameter(names = "--retain-ordering", description = "Function consumes and processes messages in order") protected boolean retainOrdering; @Parameter(names = "--parallelism", description = "The function's parallelism factor (i.e. 
the number of function instances to run)") protected Integer parallelism; @@ -329,8 +331,9 @@ private void mergeArgs() { if (!StringUtils.isBlank(DEPRECATED_customSerdeInputString)) customSerdeInputString = DEPRECATED_customSerdeInputString; if (!StringUtils.isBlank(DEPRECATED_fnConfigFile)) fnConfigFile = DEPRECATED_fnConfigFile; -if (DEPRECATED_processingGuarantees != FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE) processingGuarantees = DEPRECATED_processingGuarantees; +if (DEPRECATED_processingGuarantees != null) processingGuarantees = DEPRECATED_processingGuarantees; if (!StringUtils.isBlank(DEPRECATED_userConfigString)) userConfigString = DEPRECATED_userConfigString; +if (DEPRECATED_retainOrdering != null) retainOrdering = DEPRECATED_retainOrdering; if (DEPRECATED_windowLengthCount != null) windowLengthCount = DEPRECATED_windowLengthCount; if (DEPRECATED_windowLengthDurationMs != null) windowLengthDurationMs = DEPRECATED_windowLengthDurationMs; if (DEPRECATED_slidingIntervalCount != null) slidingIntervalCount = DEPRECATED_slidingIntervalCount; @@ -825,6 +828,7 @@ private void mergeArgs() { @Override void runCmd() throws Exception { +// merge deprecated args with new args mergeArgs(); CmdFunctions.startLocalRun(convertProto2(functionConfig), functionConfig.getParallelism(), instanceIdOffset, brokerServiceUrl, stateStorageServiceUrl, @@ -1023,6 +1027,7 @@ public void mergeArgs() { @Override void runCmd() throws Exception { +// merge deprecated args with new args mergeArgs(); if (triggerFile == null && triggerValue == null) { throw new ParameterException("Either a trigger value or a trigger filepath needs to be specified"); @@ -1057,6 +1062,7 @@ private void mergeArgs() { @Override void runCmd() throws Exception { +// merge deprecated args with new args mergeArgs(); if (StringUtils.isBlank(sourceFile)) { throw new ParameterException("--source-file needs to be specified"); @@ -1091,6 +1097,7 @@ private void mergeArgs() { @Override void runCmd() throws Exception 
{ +// merge deprecated args with new args mergeArgs(); if (StringUtils.isBlank(destinationFile)) { throw new ParameterException("--destination-file needs to be specified"); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index 52f30e64d5..5259996ca7 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -118,29 +118,56 @@ void processArguments() throws Exception { @Parameters(commandDescription = "Run a Pulsar IO sink connector locally (rather than deploying it to the Pulsar cluster)") protected class LocalSinkRunner extends CreateSink { -@Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker") +
[incubator-pulsar] branch master updated: Make all cli arguments follow consistent - notation instead of camelcase (#2432)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 225eeb7 Make all cli arguments follow consistent - notation instead of camelcase (#2432) 225eeb7 is described below commit 225eeb70ed6e2f88783997a891fec642d1b7baac Author: Sanjeev Kulkarni AuthorDate: Thu Aug 23 16:28:21 2018 -0700 Make all cli arguments follow consistent - notation instead of camelcase (#2432) --- .../org/apache/pulsar/admin/cli/CmdFunctions.java | 11 ++- .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 87 ++ .../org/apache/pulsar/admin/cli/CmdSources.java| 77 +++ 3 files changed, 145 insertions(+), 30 deletions(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 1019853..f31fd82 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -277,7 +277,9 @@ public class CmdFunctions extends CmdBase { protected String DEPRECATED_userConfigString; @Parameter(names = "--user-config", description = "User-defined config key/values") protected String userConfigString; -@Parameter(names = "--retainOrdering", description = "Function consumes and processes messages in order") +@Parameter(names = "--retainOrdering", description = "Function consumes and processes messages in order", hidden = true) +protected Boolean DEPRECATED_retainOrdering; +@Parameter(names = "--retain-ordering", description = "Function consumes and processes messages in order") protected boolean retainOrdering; @Parameter(names = "--parallelism", description = "The function's parallelism factor (i.e. 
the number of function instances to run)") protected Integer parallelism; @@ -329,8 +331,9 @@ public class CmdFunctions extends CmdBase { if (!StringUtils.isBlank(DEPRECATED_customSerdeInputString)) customSerdeInputString = DEPRECATED_customSerdeInputString; if (!StringUtils.isBlank(DEPRECATED_fnConfigFile)) fnConfigFile = DEPRECATED_fnConfigFile; -if (DEPRECATED_processingGuarantees != FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE) processingGuarantees = DEPRECATED_processingGuarantees; +if (DEPRECATED_processingGuarantees != null) processingGuarantees = DEPRECATED_processingGuarantees; if (!StringUtils.isBlank(DEPRECATED_userConfigString)) userConfigString = DEPRECATED_userConfigString; +if (DEPRECATED_retainOrdering != null) retainOrdering = DEPRECATED_retainOrdering; if (DEPRECATED_windowLengthCount != null) windowLengthCount = DEPRECATED_windowLengthCount; if (DEPRECATED_windowLengthDurationMs != null) windowLengthDurationMs = DEPRECATED_windowLengthDurationMs; if (DEPRECATED_slidingIntervalCount != null) slidingIntervalCount = DEPRECATED_slidingIntervalCount; @@ -825,6 +828,7 @@ public class CmdFunctions extends CmdBase { @Override void runCmd() throws Exception { +// merge deprecated args with new args mergeArgs(); CmdFunctions.startLocalRun(convertProto2(functionConfig), functionConfig.getParallelism(), instanceIdOffset, brokerServiceUrl, stateStorageServiceUrl, @@ -1023,6 +1027,7 @@ public class CmdFunctions extends CmdBase { @Override void runCmd() throws Exception { +// merge deprecated args with new args mergeArgs(); if (triggerFile == null && triggerValue == null) { throw new ParameterException("Either a trigger value or a trigger filepath needs to be specified"); @@ -1057,6 +1062,7 @@ public class CmdFunctions extends CmdBase { @Override void runCmd() throws Exception { +// merge deprecated args with new args mergeArgs(); if (StringUtils.isBlank(sourceFile)) { throw new ParameterException("--source-file needs to be specified"); @@ -1091,6 +1097,7 
@@ public class CmdFunctions extends CmdBase { @Override void runCmd() throws Exception { +// merge deprecated args with new args mergeArgs(); if (StringUtils.isBlank(destinationFile)) { throw new ParameterException("--destination-file needs to be specified"); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index 52f30e6..5259996 100644 --- a/pulsar-client-tools/src/mai
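The `mergeArgs()` change in this diff relies on the deprecated field being a boxed type, so that "not supplied" (null) can be told apart from an explicit value. A minimal sketch of that pattern follows, without JCommander; the class `RetainOrderingArgs` and its field names are hypothetical stand-ins for the fields in `CmdFunctions`.

```java
// Hypothetical stand-in for the argument fields in CmdFunctions; the
// JCommander @Parameter annotations are omitted to keep the sketch standalone.
class RetainOrderingArgs {
    // Deprecated camelCase flag: boxed so that null means "not supplied".
    Boolean deprecatedRetainOrdering;
    // New dash-separated flag: primitive, defaults to false.
    boolean retainOrdering;

    // Mirrors the diff: copy the deprecated value only if the user set it.
    void mergeArgs() {
        if (deprecatedRetainOrdering != null) {
            retainOrdering = deprecatedRetainOrdering;
        }
    }
}
```

In this sketch, as in the diff, a supplied deprecated flag overrides the new one; leaving the deprecated flag unset keeps whatever the new flag parsed to.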
[GitHub] rdhabalia opened a new pull request #2437: Race condition function-runtime-manager read old assignments
rdhabalia opened a new pull request #2437: Race condition function-runtime-manager read old assignments URL: https://github.com/apache/incubator-pulsar/pull/2437

### Motivation

Right now, in the function worker, SchedulerManager tries to schedule assignments before MembershipManager receives the leader notification from the broker. As a result, the leader worker's SchedulerManager fails to publish the new assignment, and RuntimeManager reads the old assignment message and applies an old assignment that might no longer be valid.

### Modifications

Fix the race condition by setting the correct leader in `MembershipManager` before the function worker tries to schedule assignments.

### Result

The function worker will not apply old assignments.
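The ordering problem described in PR #2437 can be modelled with a small single-threaded sketch. The class and method names below are hypothetical and are not the real Pulsar classes; the sketch only shows why scheduling must happen after the leader flag is set, and it does not attempt to reproduce the actual notification path from the broker.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the ordering problem; these are not the
// real Pulsar classes, only stand-ins with the same roles.
class LeaderOrderingSketch {
    // Stand-in for the membership manager's view of leadership.
    private boolean leader = false;
    // Records assignments that were actually scheduled.
    private final List<String> scheduled = new ArrayList<>();

    // Stand-in for the scheduler: a no-op unless this worker is the leader.
    boolean schedule(String assignment) {
        if (!leader) {
            return false; // skipped, so readers fall back to old assignments
        }
        scheduled.add(assignment);
        return true;
    }

    // Problematic ordering: scheduling is attempted before leadership is recorded.
    boolean becomeLeaderScheduleFirst(String assignment) {
        boolean ok = schedule(assignment);
        leader = true;
        return ok;
    }

    // Ordering after the fix: record leadership first, then schedule.
    boolean becomeLeaderFlagFirst(String assignment) {
        leader = true;
        return schedule(assignment);
    }

    List<String> scheduled() {
        return scheduled;
    }
}
```

Calling `becomeLeaderScheduleFirst` leaves the scheduled list empty, which corresponds to the skipped scheduling reported in the PR, while `becomeLeaderFlagFirst` schedules the assignment.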
[incubator-pulsar] branch master updated: Made Twitter FireHose to emit records containing schema information (#2433)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new d6a0ce0 Made Twitter FireHose to emit records containing schema information (#2433) d6a0ce0 is described below commit d6a0ce092fd3f3d09ecfbe81d26b71d7b94b03e2 Author: Sanjeev Kulkarni AuthorDate: Thu Aug 23 16:13:10 2018 -0700 Made Twitter FireHose to emit records containing schema information (#2433) --- .../org/apache/pulsar/io/twitter/TweetData.java| 121 + .../apache/pulsar/io/twitter/TwitterFireHose.java | 21 ++-- 2 files changed, 134 insertions(+), 8 deletions(-) diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java new file mode 100644 index 000..3e5503d --- /dev/null +++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pulsar.io.twitter; + +import java.util.List; +import lombok.Data; + +@Data +public class TweetData { +private String createdAt; +private Long id; +private String idStr; +private String text; +private String source; +private Boolean truncated; +private User user; +private RetweetedStatus retweetedStatus; +private Boolean isQuoteStatus; +private Long quoteCount; +private Long replyCount; +private Long retweetCount; +private Long favoriteCount; +private Boolean favorited; +private Boolean retweeted; +private String filterLevel; +private String lang; +private String timestampMs; +private Delete delete; + +@Data +public static class User { +private Long id; +private String idStr; +private String name; +private String screenName; +private String location; +private String description; +private String translatorType; +private Boolean _protected; +private Boolean verified; +private Long followersCount; +private Long friendsCount; +private Long listedCount; +private Long favouritesCount; +private Long statusesCount; +private String createdAt; +private Boolean geoEnabled; +private String lang; +private Boolean contributorsEnabled; +private Boolean isTranslator; +private String profileBackgroundColor; +private String profileBackgroundImageUrl; +private String profileBackgroundImageUrlHttps; +private Boolean profileBackgroundTile; +private String profileLinkColor; +private String profileSidebarBorderColor; +private String profileSidebarFillColor; +private String profileTextColor; +private Boolean profileUseBackgroundImage; +private String profileImageUrl; +private String profileImageUrlHttps; +private String profileBannerUrl; +private Boolean defaultProfile; +private Boolean defaultProfileImage; +} +@Data +public static class Url { +private String url; +private String expandedUrl; +private String displayUrl; +private List indices = null; +} +@Data +public static class RetweetedStatus { +private String createdAt; +private Long id; +private String 
idStr; +private String text; +private String source; +private Boolean truncated; +private User user; +private Boolean isQuoteStatus; +private Long quoteCount; +private Long replyCount; +private Long retweetCount; +private Long favoriteCount; +private Boolean favorited; +private Boolean retweeted; +private String filterLevel; +private String lang; +} +@Data +public static class Status { +private Long id; +private String idStr; +private Long userId; +private String userIdStr; +} +@Data +p
[GitHub] srkukarni closed pull request #2433: Made Twitter FireHose to emit records containing schema information
srkukarni closed pull request #2433: Made Twitter FireHose to emit records containing schema information URL: https://github.com/apache/incubator-pulsar/pull/2433 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java new file mode 100644 index 00..3e5503d8cc --- /dev/null +++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pulsar.io.twitter; + +import java.util.List; +import lombok.Data; + +@Data +public class TweetData { +private String createdAt; +private Long id; +private String idStr; +private String text; +private String source; +private Boolean truncated; +private User user; +private RetweetedStatus retweetedStatus; +private Boolean isQuoteStatus; +private Long quoteCount; +private Long replyCount; +private Long retweetCount; +private Long favoriteCount; +private Boolean favorited; +private Boolean retweeted; +private String filterLevel; +private String lang; +private String timestampMs; +private Delete delete; + +@Data +public static class User { +private Long id; +private String idStr; +private String name; +private String screenName; +private String location; +private String description; +private String translatorType; +private Boolean _protected; +private Boolean verified; +private Long followersCount; +private Long friendsCount; +private Long listedCount; +private Long favouritesCount; +private Long statusesCount; +private String createdAt; +private Boolean geoEnabled; +private String lang; +private Boolean contributorsEnabled; +private Boolean isTranslator; +private String profileBackgroundColor; +private String profileBackgroundImageUrl; +private String profileBackgroundImageUrlHttps; +private Boolean profileBackgroundTile; +private String profileLinkColor; +private String profileSidebarBorderColor; +private String profileSidebarFillColor; +private String profileTextColor; +private Boolean profileUseBackgroundImage; +private String profileImageUrl; +private String profileImageUrlHttps; +private String profileBannerUrl; +private Boolean defaultProfile; +private Boolean defaultProfileImage; +} +@Data +public static class Url { +private String url; +private String expandedUrl; +private String displayUrl; +private List indices = null; +} +@Data +public static class RetweetedStatus { +private String createdAt; +private Long id; +private String 
idStr; +private String text; +private String source; +private Boolean truncated; +private User user; +private Boolean isQuoteStatus; +private Long quoteCount; +private Long replyCount; +private Long retweetCount; +private Long favoriteCount; +private Boolean favorited; +private Boolean retweeted; +private String filterLevel; +private String lang; +} +@Data +public static class Status { +private Long id; +private String idStr; +private Long userId; +private String userIdStr; +} +@Data +public static class Delete { +private Status status; +private String timestampMs; +} +} diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java index 76ea4ac250..2631db1796 100644 ---
[GitHub] rdhabalia opened a new pull request #2436: Fix: failover sub keep sending invalid ack
rdhabalia opened a new pull request #2436: Fix: failover sub keep sending invalid ack URL: https://github.com/apache/incubator-pulsar/pull/2436 ### Motivation A consumer with a Failover subscription does not reset `lastCumulativeAck`, so the ack-tracker task keeps sending invalid acks to the broker. ### Modifications - Reset `lastCumulativeAck` and return, to avoid sending a duplicate ack.
[GitHub] jerrypeng opened a new pull request #2435: improve error message handling
jerrypeng opened a new pull request #2435: improve error message handling URL: https://github.com/apache/incubator-pulsar/pull/2435 ### Motivation Improve error message handling in the Presto Pulsar connector
[GitHub] srkukarni commented on issue #2432: Make all cli arguments follow consistent - notation instead of camelcase
srkukarni commented on issue #2432: Make all cli arguments follow consistent - notation instead of camelcase URL: https://github.com/apache/incubator-pulsar/pull/2432#issuecomment-415585124 run java8 tests
[GitHub] aahmed-se opened a new pull request #2434: Add pulsar flink connectors
aahmed-se opened a new pull request #2434: Add pulsar flink connectors URL: https://github.com/apache/incubator-pulsar/pull/2434 We introduce a module for simple Pulsar connectors for Flink.
[GitHub] srkukarni closed pull request #2317: Make twitter source to emit records with schema info
srkukarni closed pull request #2317: Make twitter source to emit records with schema info URL: https://github.com/apache/incubator-pulsar/pull/2317 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java new file mode 100644 index 00..fab936e3a2 --- /dev/null +++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.io.twitter; + +import java.util.List; + +import lombok.Data; + +@Data +public class TweetData { +private String createdAt; +private Long id; +private String idStr; +private String text; +private String source; +private Boolean truncated; +private User user; +private RetweetedStatus retweetedStatus; +private Boolean isQuoteStatus; +private Long quoteCount; +private Long replyCount; +private Long retweetCount; +private Long favoriteCount; +private Boolean favorited; +private Boolean retweeted; +private String filterLevel; +private String lang; +private String timestampMs; + +private Delete delete; + +@Data +public static class User { +private Long id; +private String idStr; +private String name; +private String screenName; +private String location; +private String description; +private String translatorType; +private Boolean _protected; +private Boolean verified; +private Long followersCount; +private Long friendsCount; +private Long listedCount; +private Long favouritesCount; +private Long statusesCount; +private String createdAt; +private Boolean geoEnabled; +private String lang; +private Boolean contributorsEnabled; +private Boolean isTranslator; +private String profileBackgroundColor; +private String profileBackgroundImageUrl; +private String profileBackgroundImageUrlHttps; +private Boolean profileBackgroundTile; +private String profileLinkColor; +private String profileSidebarBorderColor; +private String profileSidebarFillColor; +private String profileTextColor; +private Boolean profileUseBackgroundImage; +private String profileImageUrl; +private String profileImageUrlHttps; +private String profileBannerUrl; +private Boolean defaultProfile; +private Boolean defaultProfileImage; +} + +@Data +public static class Url { +private String url; +private String expandedUrl; +private String displayUrl; +private List indices = null; +} + +@Data +public static class RetweetedStatus { +private String createdAt; +private Long id; +private String 
idStr; +private String text; +private String source; +private Boolean truncated; +private User user; +private Boolean isQuoteStatus; +private Long quoteCount; +private Long replyCount; +private Long retweetCount; +private Long favoriteCount; +private Boolean favorited; +private Boolean retweeted; +private String filterLevel; +private String lang; +} + +@Data +public static class Status { +private Long id; +private String idStr; +private Long userId; +private String userIdStr; +} + +@Data +public static class Delete { +private Status status; +private String timestampMs; +} +} diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java index 76ea4ac250..eef9077427 100644 --- a/pu
[GitHub] srkukarni commented on issue #2317: Make twitter source to emit records with schema info
srkukarni commented on issue #2317: Make twitter source to emit records with schema info URL: https://github.com/apache/incubator-pulsar/pull/2317#issuecomment-415577626 closing in favor of https://github.com/apache/incubator-pulsar/pull/2433 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] srkukarni opened a new pull request #2433: Made Twitter FireHose to emit records containing schema information
srkukarni opened a new pull request #2433: Made Twitter FireHose to emit records containing schema information URL: https://github.com/apache/incubator-pulsar/pull/2433 ### Motivation Explain here the context, and why you're making that change. What is the problem you're trying to solve. ### Modifications Describe the modifications you've done. ### Result After your change, what will change.
[GitHub] david-streamlio commented on a change in pull request #2409: Added HDFS Sink
david-streamlio commented on a change in pull request #2409: Added HDFS Sink URL: https://github.com/apache/incubator-pulsar/pull/2409#discussion_r212455850 ## File path: tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/HdfsContainer.java ## @@ -0,0 +1,35 @@ +package org.apache.pulsar.tests.integration.containers; Review comment: Fixed
[GitHub] david-streamlio commented on a change in pull request #2409: Added HDFS Sink
david-streamlio commented on a change in pull request #2409: Added HDFS Sink URL: https://github.com/apache/incubator-pulsar/pull/2409#discussion_r212455767 ## File path: pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfigTests.java ## @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.hdfs.sink; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class HdfsSinkConfigTests { + + @Rule +public ExpectedException thrown = ExpectedException.none(); + + @Test + public final void loadFromYamlFileTest() throws IOException { + File yamlFile = getFile("sinkConfig.yaml"); Review comment: Fixed and added "contrib-check" profile to main pom.xml file to validate check style compliance
[GitHub] david-streamlio commented on a change in pull request #2409: Added HDFS Sink
david-streamlio commented on a change in pull request #2409: Added HDFS Sink URL: https://github.com/apache/incubator-pulsar/pull/2409#discussion_r212455521 ## File path: pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java ## @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.io.hdfs; + +import java.io.Serializable; +import java.util.stream.Stream; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.io.compress.BZip2Codec; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.DeflateCodec; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.io.compress.Lz4Codec; +import org.apache.hadoop.io.compress.SnappyCodec; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import lombok.experimental.Accessors; + +@Data +@Setter +@Getter +@EqualsAndHashCode +@ToString +@Accessors(chain = true) +public abstract class AbstractHdfsConfig implements Serializable { + + public static final String BZIP2 = "BZip2"; + public static final String DEFLATE = "Deflate"; + public static final String GZIP = "Gzip"; + public static final String LZ4 = "Lz4"; + public static final String SNAPPY = "Snappy"; + + private static final long serialVersionUID = 1L; + + /** +* A file or comma separated list of files which contains the Hadoop file system configuration. Without this, Hadoop + * will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file or will revert to a default configuration. +*/ + protected String hdfsConfigResources; + + /** +* The HDFS directory from which files should be read from or written to +*/ + protected String directory; + + /** +* The character encoding for the files, e.g. UTF-8, ASCII, etc. +*/ + protected String encoding; + + /** +* The compression codec used to compress/de-compress the files on HDFS. +*/ + protected String compression; + + /** +* The Kerberos user principal account to use for authentication +*/ + protected String kerberosUserPrincipal; + + /** +* The full pathname to the Kerberos keytab file to use for authentication. 
+*/ + protected String keytab; + + public void validate() { + if (StringUtils.isEmpty(hdfsConfigResources) || StringUtils.isEmpty(directory) ) + throw new IllegalArgumentException("Required property not set."); + + if (StringUtils.isNotEmpty(compression)) { + if (!Stream.of(BZIP2, DEFLATE, GZIP, LZ4, SNAPPY).anyMatch(compression::equalsIgnoreCase)) { + throw new IllegalArgumentException("Invalid Compression code specified. Valid values are 'BZip2', 'Deflate', 'Gzip', 'Lz4', or 'Snappy'"); + } + } + + if ( (StringUtils.isNotEmpty(kerberosUserPrincipal) && StringUtils.isEmpty(keytab)) || +(StringUtils.isEmpty(kerberosUserPrincipal) && StringUtils.isNotEmpty(keytab)) ) { + throw new IllegalArgumentException("Values for both kerberosUserPrincipal & keytab are required."); + } + } + +public CompressionCodec getCompressionCodec() { + if (StringUtils.isBlank(compression)) + return null; + + if (compression.equalsIgnoreCase(BZIP2)) + return new BZip2Codec(); + + if (compression.equalsIgnoreCase(DEFLATE)) + return new DeflateCodec(); + + if (compression.equalsIgnoreCase(GZIP)) + return new GzipCodec(); + + if (compression.equalsIgnoreCase(LZ4)) + return new Lz4Codec(); + + if (compression.equalsIgnoreCase(SNAPPY)) + return new SnappyCodec(); + + return null; +} + +
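The quoted `validate()` and `getCompressionCodec()` methods each walk the same five codec names. A minimal sketch of how both checks could share one lookup table is given here, under stated assumptions: `HdfsCompression` and `HdfsConfigChecks` are hypothetical names that do not appear in the pull request, and the codec classes are referenced only by name so the example runs without Hadoop on the classpath.

```java
import java.util.Arrays;
import java.util.Optional;

// Hypothetical helper, not part of the pull request: one table of supported codecs.
enum HdfsCompression {
    BZIP2("BZip2", "org.apache.hadoop.io.compress.BZip2Codec"),
    DEFLATE("Deflate", "org.apache.hadoop.io.compress.DeflateCodec"),
    GZIP("Gzip", "org.apache.hadoop.io.compress.GzipCodec"),
    LZ4("Lz4", "org.apache.hadoop.io.compress.Lz4Codec"),
    SNAPPY("Snappy", "org.apache.hadoop.io.compress.SnappyCodec");

    final String label;          // name accepted in the config, matched case-insensitively
    final String codecClassName; // codec class the quoted code instantiates for this name

    HdfsCompression(String label, String codecClassName) {
        this.label = label;
        this.codecClassName = codecClassName;
    }

    // Returns empty when the name is null, blank, or not one of the five labels.
    static Optional<HdfsCompression> fromName(String name) {
        if (name == null || name.trim().isEmpty()) {
            return Optional.empty();
        }
        return Arrays.stream(values())
                .filter(c -> c.label.equalsIgnoreCase(name))
                .findFirst();
    }
}

// Hypothetical holder for the two checks performed by the quoted validate() method.
final class HdfsConfigChecks {
    private HdfsConfigChecks() { }

    private static boolean isEmpty(String s) {
        return s == null || s.isEmpty();
    }

    // An unset compression value is allowed; a non-empty unknown one is rejected.
    static void validateCompression(String compression) {
        if (!isEmpty(compression) && !HdfsCompression.fromName(compression).isPresent()) {
            throw new IllegalArgumentException("Invalid compression codec specified: " + compression);
        }
    }

    // Principal and keytab must be supplied together or not at all.
    static void validateKerberos(String principal, String keytab) {
        if (isEmpty(principal) != isEmpty(keytab)) {
            throw new IllegalArgumentException(
                    "Values for both kerberosUserPrincipal & keytab are required.");
        }
    }
}
```

With a table like this, adding a codec would touch one enum constant instead of a constant, a `Stream.of(...)` list, an error message, and an `if` branch. Whether that trade-off suits the actual sink is a design choice for the PR authors.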
[GitHub] david-streamlio commented on a change in pull request #2409: Added HDFS Sink
david-streamlio commented on a change in pull request #2409: Added HDFS Sink URL: https://github.com/apache/incubator-pulsar/pull/2409#discussion_r212455382 ## File path: pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java ## @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.io.hdfs; + +import java.io.Serializable; +import java.util.stream.Stream; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.io.compress.BZip2Codec; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.DeflateCodec; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.io.compress.Lz4Codec; +import org.apache.hadoop.io.compress.SnappyCodec; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import lombok.experimental.Accessors; + +@Data +@Setter +@Getter +@EqualsAndHashCode +@ToString +@Accessors(chain = true) +public abstract class AbstractHdfsConfig implements Serializable { + + public static final String BZIP2 = "BZip2"; + public static final String DEFLATE = "Deflate"; + public static final String GZIP = "Gzip"; + public static final String LZ4 = "Lz4"; + public static final String SNAPPY = "Snappy"; Review comment: Fixed
[GitHub] david-streamlio commented on a change in pull request #2409: Added HDFS Sink
david-streamlio commented on a change in pull request #2409: Added HDFS Sink URL: https://github.com/apache/incubator-pulsar/pull/2409#discussion_r212455416 ## File path: pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java ## @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.io.hdfs; + +import java.io.Serializable; +import java.util.stream.Stream; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.io.compress.BZip2Codec; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.DeflateCodec; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.io.compress.Lz4Codec; +import org.apache.hadoop.io.compress.SnappyCodec; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import lombok.experimental.Accessors; + +@Data +@Setter +@Getter +@EqualsAndHashCode +@ToString +@Accessors(chain = true) +public abstract class AbstractHdfsConfig implements Serializable { + + public static final String BZIP2 = "BZip2"; + public static final String DEFLATE = "Deflate"; + public static final String GZIP = "Gzip"; + public static final String LZ4 = "Lz4"; + public static final String SNAPPY = "Snappy"; + + private static final long serialVersionUID = 1L; Review comment: Fixed
[GitHub] david-streamlio commented on a change in pull request #2409: Added HDFS Sink
david-streamlio commented on a change in pull request #2409: Added HDFS Sink URL: https://github.com/apache/incubator-pulsar/pull/2409#discussion_r212455487 ## File path: pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java ## @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.io.hdfs; + +import java.io.Serializable; +import java.util.stream.Stream; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.io.compress.BZip2Codec; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.DeflateCodec; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.io.compress.Lz4Codec; +import org.apache.hadoop.io.compress.SnappyCodec; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import lombok.experimental.Accessors; + +@Data +@Setter +@Getter +@EqualsAndHashCode +@ToString +@Accessors(chain = true) +public abstract class AbstractHdfsConfig implements Serializable { + + public static final String BZIP2 = "BZip2"; + public static final String DEFLATE = "Deflate"; + public static final String GZIP = "Gzip"; + public static final String LZ4 = "Lz4"; + public static final String SNAPPY = "Snappy"; + + private static final long serialVersionUID = 1L; + + /** +* A file or comma separated list of files which contains the Hadoop file system configuration. Without this, Hadoop + * will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file or will revert to a default configuration. +*/ + protected String hdfsConfigResources; Review comment: Fixed
[GitHub] srkukarni commented on issue #2426: Added cli commands to get function cluster related information
srkukarni commented on issue #2426: Added cli commands to get function cluster related information URL: https://github.com/apache/incubator-pulsar/pull/2426#issuecomment-415564305 run integration tests
[GitHub] srkukarni commented on issue #2432: Make all cli arguments follow consistent - notation instead of camelcase
srkukarni commented on issue #2432: Make all cli arguments follow consistent - notation instead of camelcase URL: https://github.com/apache/incubator-pulsar/pull/2432#issuecomment-415564129 run java8 tests
[GitHub] srkukarni commented on issue #2426: Added cli commands to get function cluster related information
srkukarni commented on issue #2426: Added cli commands to get function cluster related information URL: https://github.com/apache/incubator-pulsar/pull/2426#issuecomment-415541657 run integration tests
[GitHub] srkukarni commented on issue #2381: Make source and sink cli args format consistent
srkukarni commented on issue #2381: Make source and sink cli args format consistent URL: https://github.com/apache/incubator-pulsar/pull/2381#issuecomment-415539137 This is superseded by https://github.com/apache/incubator-pulsar/pull/2432
[GitHub] srkukarni closed pull request #2381: Make source and sink cli args format consistent
srkukarni closed pull request #2381: Make source and sink cli args format consistent URL: https://github.com/apache/incubator-pulsar/pull/2381 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 5f52313d57..1a7d1e96e1 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -268,7 +268,10 @@ void processArguments() throws Exception { protected String DEPRECATED_userConfigString; @Parameter(names = "--user-config", description = "User-defined config key/values") protected String userConfigString; -@Parameter(names = "--retainOrdering", description = "Function consumes and processes messages in order") +// for backwards compatibility purposes +@Parameter(names = "--retainOrdering", description = "Function consumes and processes messages in order", hidden = true) +protected Boolean DEPRECATED_retainOrdering; +@Parameter(names = "--retain-ordering", description = "Function consumes and processes messages in order") protected boolean retainOrdering; @Parameter(names = "--parallelism", description = "The function's parallelism factor (i.e. 
the number of function instances to run)") protected Integer parallelism; @@ -319,8 +322,9 @@ private void mergeArgs() { if (!StringUtils.isBlank(DEPRECATED_outputSerdeClassName)) outputSerdeClassName = DEPRECATED_outputSerdeClassName; if (!StringUtils.isBlank(DEPRECATED_customSerdeInputString)) customSerdeInputString = DEPRECATED_customSerdeInputString; if (!StringUtils.isBlank(DEPRECATED_fnConfigFile)) fnConfigFile = DEPRECATED_fnConfigFile; -if (DEPRECATED_processingGuarantees != FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE) processingGuarantees = DEPRECATED_processingGuarantees; +if (DEPRECATED_processingGuarantees != null) processingGuarantees = DEPRECATED_processingGuarantees; if (!StringUtils.isBlank(DEPRECATED_userConfigString)) userConfigString = DEPRECATED_userConfigString; +if (DEPRECATED_retainOrdering != null) retainOrdering = DEPRECATED_retainOrdering; if (DEPRECATED_windowLengthCount != null) windowLengthCount = DEPRECATED_windowLengthCount; if (DEPRECATED_windowLengthDurationMs != null) windowLengthDurationMs = DEPRECATED_windowLengthDurationMs; if (DEPRECATED_slidingIntervalCount != null) slidingIntervalCount = DEPRECATED_slidingIntervalCount; @@ -787,6 +791,7 @@ private void mergeArgs() { @Override void runCmd() throws Exception { +// merge deprecated args with new args mergeArgs(); CmdFunctions.startLocalRun(convertProto2(functionConfig), functionConfig.getParallelism(), instanceIdOffset, brokerServiceUrl, stateStorageServiceUrl, @@ -964,6 +969,7 @@ public void mergeArgs() { @Override void runCmd() throws Exception { +// merge deprecated args with new args mergeArgs(); if (triggerFile == null && triggerValue == null) { throw new ParameterException("Either a trigger value or a trigger filepath needs to be specified"); @@ -998,6 +1004,7 @@ private void mergeArgs() { @Override void runCmd() throws Exception { +// merge deprecated args with new args mergeArgs(); if (StringUtils.isBlank(sourceFile)) { throw new ParameterException("--source-file 
needs to be specified"); @@ -1032,6 +1039,7 @@ private void mergeArgs() { @Override void runCmd() throws Exception { +// merge deprecated args with new args mergeArgs(); if (StringUtils.isBlank(destinationFile)) { throw new ParameterException("--destination-file needs to be specified"); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index 8417874f38..5ccd9b0853 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -24,23 +24,9 @@ import com.beust.jcommander.converters.StringConverter; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; - -import java.io.File; -import java.io.FileOutputStream; -imp
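The `mergeArgs()` changes in the diff above copy a deprecated camelCase flag onto its hyphenated replacement only when the deprecated field is non-null. A minimal sketch of that pattern follows, assuming a hypothetical `FunctionArgsSketch` class with a local stand-in enum and without the JCommander annotations. It also illustrates one plausible reason the diff swaps the comparison against `ATLEAST_ONCE` for a null check: a sentinel default cannot tell "flag not passed" apart from "flag passed with the default value".

```java
// Hypothetical stand-in for a CLI command object; field names loosely follow the diff.
final class FunctionArgsSketch {
    // Local stand-in for the real FunctionConfig.ProcessingGuarantees enum.
    enum ProcessingGuarantees { ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE }

    // Deprecated camelCase flags: wrapper types with no default, so null means "not passed".
    Boolean deprecatedRetainOrdering;
    ProcessingGuarantees deprecatedProcessingGuarantees;

    // New hyphenated flags; the defaults chosen here are assumptions for the sketch.
    boolean retainOrdering = false;
    ProcessingGuarantees processingGuarantees = ProcessingGuarantees.ATLEAST_ONCE;

    // Merge deprecated args with new args: a deprecated value wins only if it was supplied.
    void mergeArgs() {
        if (deprecatedRetainOrdering != null) {
            retainOrdering = deprecatedRetainOrdering;
        }
        if (deprecatedProcessingGuarantees != null) {
            processingGuarantees = deprecatedProcessingGuarantees;
        }
    }
}
```

As in the diff, the deprecated value overrides the new one when both are given. A stricter CLI might reject that combination instead, which is outside what the quoted change does.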
[GitHub] srkukarni opened a new pull request #2432: Make all cli arguments follow consistent - notation instead of camelcase
srkukarni opened a new pull request #2432: Make all cli arguments follow consistent - notation instead of camelcase URL: https://github.com/apache/incubator-pulsar/pull/2432 ### Motivation Explain here the context, and why you're making that change. What is the problem you're trying to solve. ### Modifications Describe the modifications you've done. ### Result After your change, what will change.
[GitHub] jerrypeng commented on a change in pull request #2428: Package Presto with Pulsar for SQL integration
jerrypeng commented on a change in pull request #2428: Package Presto with Pulsar for SQL integration URL: https://github.com/apache/incubator-pulsar/pull/2428#discussion_r212421574 ## File path: pulsar-sql/presto-server/src/resources/etc/catalog/pulsar.properties ## @@ -0,0 +1,5 @@ +connector.name=pulsar Review comment: good idea! I was just trying to keep the original presto project structure
[incubator-pulsar] branch master updated: Support heartbeat function for worker (#2424)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 31edf5a Support heartbeat function for worker (#2424) 31edf5a is described below commit 31edf5aa74f3c4005a228ef463df05ecb2cee287 Author: Rajan Dhabalia AuthorDate: Thu Aug 23 11:20:15 2018 -0700 Support heartbeat function for worker (#2424) --- .../pulsar/functions/worker/WorkerConfig.java | 2 +- .../worker/scheduler/RoundRobinScheduler.java | 17 ++- .../functions/worker/SchedulerManagerTest.java | 57 ++ 3 files changed, 74 insertions(+), 2 deletions(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index 38ef5d3..0f695a9 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -143,7 +143,7 @@ public class WorkerConfig implements Serializable, PulsarConfiguration { public String getWorkerId() { if (StringUtils.isBlank(this.workerId)) { -this.workerId = getWorkerHostname(); +this.workerId = String.format("%s-%s", this.getWorkerHostname(), this.getWorkerPort()); } return this.workerId; } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java index 4f9ad62..58c1a9a 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java @@ -19,6 +19,7 @@ package 
org.apache.pulsar.functions.worker.scheduler; import org.apache.pulsar.functions.proto.Function.Assignment; +import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.Instance; import java.util.HashMap; @@ -29,6 +30,9 @@ import java.util.stream.Collectors; public class RoundRobinScheduler implements IScheduler { +public static final String HEARTBEAT_TENANT = "pulsar-function"; +public static final String HEARTBEAT_NAMESPACE = "heartbeat"; + @Override public List schedule(List unassignedFunctionInstances, List currentAssignments, List workers) { @@ -44,7 +48,8 @@ public class RoundRobinScheduler implements IScheduler { } for (Instance unassignedFunctionInstance : unassignedFunctionInstances) { -String workerId = findNextWorker(workerIdToAssignment); +String heartBeatWorkerId = checkHeartBeatFunction(unassignedFunctionInstance); +String workerId = heartBeatWorkerId != null ? heartBeatWorkerId : findNextWorker(workerIdToAssignment); Assignment newAssignment = Assignment.newBuilder().setInstance(unassignedFunctionInstance) .setWorkerId(workerId).build(); workerIdToAssignment.get(workerId).add(newAssignment); @@ -57,6 +62,16 @@ public class RoundRobinScheduler implements IScheduler { return assignments; } +private static String checkHeartBeatFunction(Instance funInstance) { +if (funInstance.getFunctionMetaData() != null +&& funInstance.getFunctionMetaData().getFunctionDetails() != null) { +FunctionDetails funDetails = funInstance.getFunctionMetaData().getFunctionDetails(); +return HEARTBEAT_TENANT.equals(funDetails.getTenant()) +&& HEARTBEAT_NAMESPACE.equals(funDetails.getNamespace()) ? 
funDetails.getName() : null; +} +return null; +} + private String findNextWorker(Map<String, List<Assignment>> workerIdToAssignment) { String targetWorkerId = null; int least = Integer.MAX_VALUE; diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java index 97e9c36..19977bd 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java @@ -47,6 +47,7 @@ import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.proto.Function.Assignment; import org
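The scheduler change in this commit appears to pin any function in tenant `pulsar-function` and namespace `heartbeat` to the worker whose id equals the function name, and to fall back to least-loaded placement for everything else. The `WorkerConfig` change makes the default worker id `hostname-port`. A minimal, self-contained sketch of that placement rule follows. `HeartbeatSchedulingSketch` and `FunctionRef` are hypothetical simplifications of the protobuf `Instance` and `FunctionDetails` types, and the use of `computeIfAbsent` for an unknown worker id is an assumption that differs from the quoted code, which calls `get(workerId)` directly.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of heartbeat-aware placement; not the Pulsar implementation.
final class HeartbeatSchedulingSketch {
    static final String HEARTBEAT_TENANT = "pulsar-function";
    static final String HEARTBEAT_NAMESPACE = "heartbeat";

    // Simplified stand-in for the tenant/namespace/name carried by FunctionDetails.
    static final class FunctionRef {
        final String tenant;
        final String namespace;
        final String name;

        FunctionRef(String tenant, String namespace, String name) {
            this.tenant = tenant;
            this.namespace = namespace;
            this.name = name;
        }
    }

    private HeartbeatSchedulingSketch() { }

    // Returns the pinned worker id for a heartbeat function, or null for ordinary functions.
    static String heartbeatWorker(FunctionRef f) {
        boolean isHeartbeat = HEARTBEAT_TENANT.equals(f.tenant)
                && HEARTBEAT_NAMESPACE.equals(f.namespace);
        return isHeartbeat ? f.name : null;
    }

    // Picks the worker with the fewest assignments so far; the first such worker wins ties.
    static String leastLoaded(Map<String, List<FunctionRef>> byWorker) {
        String target = null;
        int least = Integer.MAX_VALUE;
        for (Map.Entry<String, List<FunctionRef>> e : byWorker.entrySet()) {
            if (e.getValue().size() < least) {
                least = e.getValue().size();
                target = e.getKey();
            }
        }
        return target;
    }

    // Assigns each function to its pinned worker if it is a heartbeat, else to the least loaded.
    static Map<String, List<FunctionRef>> schedule(List<FunctionRef> unassigned, List<String> workers) {
        Map<String, List<FunctionRef>> byWorker = new LinkedHashMap<>();
        for (String w : workers) {
            byWorker.put(w, new ArrayList<>());
        }
        for (FunctionRef f : unassigned) {
            String pinned = heartbeatWorker(f);
            String workerId = pinned != null ? pinned : leastLoaded(byWorker);
            // Assumption: a heartbeat naming an unknown worker still gets a bucket here.
            byWorker.computeIfAbsent(workerId, k -> new ArrayList<>()).add(f);
        }
        return byWorker;
    }
}
```

Because the heartbeat function's name doubles as the target worker id, the naming scheme only works if worker ids are unique per process, which is presumably why the same commit appends the port to the hostname.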
[GitHub] rdhabalia closed pull request #2424: Support heartbeat function for worker
rdhabalia closed pull request #2424: Support heartbeat function for worker URL: https://github.com/apache/incubator-pulsar/pull/2424 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index 38ef5d3a6c..0f695a974f 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -143,7 +143,7 @@ public static WorkerConfig load(String yamlFile) throws IOException { public String getWorkerId() { if (StringUtils.isBlank(this.workerId)) { -this.workerId = getWorkerHostname(); +this.workerId = String.format("%s-%s", this.getWorkerHostname(), this.getWorkerPort()); } return this.workerId; } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java index 4f9ad62b30..58c1a9a513 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java @@ -19,6 +19,7 @@ package org.apache.pulsar.functions.worker.scheduler; import org.apache.pulsar.functions.proto.Function.Assignment; +import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.Instance; import java.util.HashMap; @@ -29,6 +30,9 @@ public class RoundRobinScheduler implements IScheduler { 
+public static final String HEARTBEAT_TENANT = "pulsar-function"; +public static final String HEARTBEAT_NAMESPACE = "heartbeat"; + @Override public List schedule(List unassignedFunctionInstances, List currentAssignments, List workers) { @@ -44,7 +48,8 @@ } for (Instance unassignedFunctionInstance : unassignedFunctionInstances) { -String workerId = findNextWorker(workerIdToAssignment); +String heartBeatWorkerId = checkHeartBeatFunction(unassignedFunctionInstance); +String workerId = heartBeatWorkerId != null ? heartBeatWorkerId : findNextWorker(workerIdToAssignment); Assignment newAssignment = Assignment.newBuilder().setInstance(unassignedFunctionInstance) .setWorkerId(workerId).build(); workerIdToAssignment.get(workerId).add(newAssignment); @@ -57,6 +62,16 @@ return assignments; } +private static String checkHeartBeatFunction(Instance funInstance) { +if (funInstance.getFunctionMetaData() != null +&& funInstance.getFunctionMetaData().getFunctionDetails() != null) { +FunctionDetails funDetails = funInstance.getFunctionMetaData().getFunctionDetails(); +return HEARTBEAT_TENANT.equals(funDetails.getTenant()) +&& HEARTBEAT_NAMESPACE.equals(funDetails.getNamespace()) ? 
funDetails.getName() : null; +} +return null; +} + private String findNextWorker(Map> workerIdToAssignment) { String targetWorkerId = null; int least = Integer.MAX_VALUE; diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java index 97e9c365a5..19977bd812 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java @@ -47,6 +47,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.proto.Function.Assignment; import org.apache.pulsar.functions.proto.Request; import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler; import org.mockito.Mockito; @@ -554,6 +555,62 @@ public void testScalingDown() throws Exception { ); } +@Test +public void testHeartbeatFunction() throws Exception { +List functionMetaDataList = new LinkedList<>(); +final long version = 5; +final String workerId1 = "host-workerId-1"; +final String workerId2 = "host-workerId-2"; +Function.FunctionMetaData function1 = Fu
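The scheduling change in the diff above can be sketched without the protobuf types. This is a simplified illustration, not the code from the pull request: `HeartbeatSchedulerSketch` and `Fn` are hypothetical stand-ins for `RoundRobinScheduler` and `Function.Instance`, and only the two `HEARTBEAT_*` constants and the "function name is the target worker ID" rule are taken from the diff. The sketch assumes a non-empty worker list and, unlike the diff, creates an entry for a pinned worker that is not in the list.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the scheduler in the diff.
final class HeartbeatSchedulerSketch {
    static final String HEARTBEAT_TENANT = "pulsar-function";
    static final String HEARTBEAT_NAMESPACE = "heartbeat";

    // Minimal replacement for the protobuf FunctionDetails (assumption).
    static final class Fn {
        final String tenant;
        final String namespace;
        final String name;

        Fn(String tenant, String namespace, String name) {
            this.tenant = tenant;
            this.namespace = namespace;
            this.name = name;
        }
    }

    // A heartbeat function names the worker it must run on; others return null.
    static String heartbeatWorkerId(Fn fn) {
        return HEARTBEAT_TENANT.equals(fn.tenant) && HEARTBEAT_NAMESPACE.equals(fn.namespace)
                ? fn.name
                : null;
    }

    // Pins heartbeat functions to their worker; everything else goes to the least-loaded worker.
    static Map<String, List<String>> schedule(List<Fn> functions, List<String> workers) {
        Map<String, List<String>> assignments = new LinkedHashMap<>();
        for (String worker : workers) {
            assignments.put(worker, new ArrayList<>());
        }
        for (Fn fn : functions) {
            String pinned = heartbeatWorkerId(fn);
            String workerId = pinned != null ? pinned : leastLoaded(assignments);
            // Assumption: tolerate a pinned worker that is missing from the worker list.
            assignments.computeIfAbsent(workerId, k -> new ArrayList<>()).add(fn.name);
        }
        return assignments;
    }

    // Returns the first worker with the fewest assignments.
    static String leastLoaded(Map<String, List<String>> assignments) {
        String target = null;
        int least = Integer.MAX_VALUE;
        for (Map.Entry<String, List<String>> e : assignments.entrySet()) {
            if (e.getValue().size() < least) {
                least = e.getValue().size();
                target = e.getKey();
            }
        }
        return target;
    }
}
```

In this sketch a heartbeat function still counts toward its worker's load, which matches the diff, where the new assignment is added to the same per-worker list.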
[GitHub] rdhabalia commented on a change in pull request #2424: Support heartbeat function for worker
rdhabalia commented on a change in pull request #2424: Support heartbeat function for worker URL: https://github.com/apache/incubator-pulsar/pull/2424#discussion_r212406508 ## File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java ## @@ -143,7 +143,7 @@ public static WorkerConfig load(String yamlFile) throws IOException { public String getWorkerId() { if (StringUtils.isBlank(this.workerId)) { -this.workerId = getWorkerHostname(); +this.workerId = String.format("%s-%s", this.getWorkerHostname(), this.getWorkerPort()); Review comment: This was actually a bug: if multiple worker instances start on the same host but on different ports, the default workerId is not unique, which resulted in incorrect assignments. This change is therefore also part of the bug fix.
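The collision described in this comment can be shown with a small, self-contained sketch. `WorkerIdSketch` and its method names are hypothetical and are not part of `WorkerConfig`; only the `"%s-%s"` format string is taken from the diff, and the port numbers in the usage note are made up.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper for illustration; not part of Pulsar's WorkerConfig.
final class WorkerIdSketch {
    // Old default: the hostname alone.
    static String hostnameOnlyId(String hostname) {
        return hostname;
    }

    // New default from the diff: hostname and port joined with a dash.
    static String hostnameAndPortId(String hostname, int port) {
        return String.format("%s-%s", hostname, port);
    }

    // Counts how many distinct IDs a set of workers on one host would get.
    static int distinctIds(String hostname, int[] ports, boolean includePort) {
        Set<String> ids = new HashSet<>();
        for (int port : ports) {
            ids.add(includePort ? hostnameAndPortId(hostname, port) : hostnameOnlyId(hostname));
        }
        return ids.size();
    }
}
```

With two workers on the same host, `distinctIds("worker-host", new int[] {6750, 6751}, false)` yields a single ID, while passing `true` yields two, which is the uniqueness the scheduler needs to keep assignments apart.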
[GitHub] bardock commented on issue #2431: Unable to consume messages, need to re-subscribe client
bardock commented on issue #2431: Unable to consume messages, need to re-subscribe client URL: https://github.com/apache/incubator-pulsar/issues/2431#issuecomment-415509377 Here they are: [stats.zip](https://github.com/apache/incubator-pulsar/files/2315609/stats.zip) Thanks!
[GitHub] srkukarni commented on issue #2426: Added cli commands to get function cluster related information
srkukarni commented on issue #2426: Added cli commands to get function cluster related information URL: https://github.com/apache/incubator-pulsar/pull/2426#issuecomment-415505950 run integration tests
[GitHub] srkukarni commented on a change in pull request #2424: Support heartbeat function for worker
srkukarni commented on a change in pull request #2424: Support heartbeat function for worker URL: https://github.com/apache/incubator-pulsar/pull/2424#discussion_r212389886 ## File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java ## @@ -143,7 +143,7 @@ public static WorkerConfig load(String yamlFile) throws IOException { public String getWorkerId() { if (StringUtils.isBlank(this.workerId)) { -this.workerId = getWorkerHostname(); +this.workerId = String.format("%s-%s", this.getWorkerHostname(), this.getWorkerPort()); Review comment: This change has implications for other places where both workerId and workerPort are used. Maybe add a separate function?
[GitHub] merlimat commented on issue #2431: Unable to consume messages, need to re-subscribe client
merlimat commented on issue #2431: Unable to consume messages, need to re-subscribe client URL: https://github.com/apache/incubator-pulsar/issues/2431#issuecomment-415497842 @bardock Can you share the topic stats (`pulsar-admin persistent stats $TOPIC`) and the internal stats (`pulsar-admin persistent stats-internal $TOPIC`)?
[GitHub] sijie commented on a change in pull request #2428: Package Presto with Pulsar for SQL integration
sijie commented on a change in pull request #2428: Package Presto with Pulsar for SQL integration URL: https://github.com/apache/incubator-pulsar/pull/2428#discussion_r212386472 ## File path: bin/pulsar ## @@ -99,6 +103,24 @@ if [ ! -f "${PY_INSTANCE_FILE}" ]; then PY_INSTANCE_FILE=${BUILT_PY_INSTANCE_FILE} fi +# find pulsar sql presto distribution location +if [ ! -d "${PRESTO_HOME}" ]; then + +# BUILT_PRESTO_HOME="${SQL_HOME}/presto-server/target/presto-server-0.206" Review comment: remove this line
[GitHub] sijie commented on a change in pull request #2428: Package Presto with Pulsar for SQL integration
sijie commented on a change in pull request #2428: Package Presto with Pulsar for SQL integration URL: https://github.com/apache/incubator-pulsar/pull/2428#discussion_r212387727 ## File path: pulsar-sql/presto-server/src/resources/etc/catalog/pulsar.properties ## @@ -0,0 +1,5 @@ +connector.name=pulsar Review comment: Is there any reason why we can't move /etc/catalog/pulsar.properties to conf/presto/catalog/pulsar.properties? Then we would have a central place for managing all configuration files for Pulsar components.
[GitHub] sijie commented on a change in pull request #2428: Package Presto with Pulsar for SQL integration
sijie commented on a change in pull request #2428: Package Presto with Pulsar for SQL integration URL: https://github.com/apache/incubator-pulsar/pull/2428#discussion_r212387884 ## File path: pulsar-sql/presto-server/src/resources/etc/catalog/pulsar.properties ## @@ -0,0 +1,5 @@ +connector.name=pulsar Review comment: basically `/etc` -> `conf/presto`
[GitHub] bardock opened a new issue #2431: Unable to consume messages, need to re-subscribe client
bardock opened a new issue #2431: Unable to consume messages, need to re-subscribe client URL: https://github.com/apache/incubator-pulsar/issues/2431 This is similar to issue [2013](https://github.com/apache/incubator-pulsar/issues/2013). We have several shared subscriptions on partitioned topics, and we found some cases where a subscription's client is not receiving messages, but the problem is not tied to a specific topic partition. In this case, if we re-subscribe the client (`PulsarClient.subscribeAsync`), messages start to be delivered as expected. The client logs this warning: ``` [log_time:16:17:42.377] [thread:pulsar-timer-6-1] [level:WARN ] [logger:UnAckedMessageTracker] - [ConsumerBase{subscription='picking-consumer-prod.fbm-wms-picking', consumerName='i-08e97d991d75e9d07-10.73.125.156', topic='persistent://fury/global/wms-wave-feed-prod.fbm-wms-picking/wms-wave-feed-prod.fbm-wms-picking'}] 1 messages have timed-out ``` We don't see any errors from the brokers. Any idea? Is there any log or metric that may be useful to debug this problem? System configuration **Pulsar version (brokers)**: 1.20.0 **Pulsar clients version**: 1.22.0 **Bookkeeper version**: 4.7.1
[GitHub] ivankelly commented on issue #2401: Consumer often discards received messages
ivankelly commented on issue #2401: Consumer often discards received messages URL: https://github.com/apache/incubator-pulsar/issues/2401#issuecomment-415495334 I have a local repro, will look into it more in the morning.
[GitHub] bardock commented on issue #2013: Unable to consume messages from a partition
bardock commented on issue #2013: Unable to consume messages from a partition URL: https://github.com/apache/incubator-pulsar/issues/2013#issuecomment-415487704 @sijie any thoughts? We are now using version 4.7.1 and we are tracking almost every bookkeeper metric. What metrics do you think will be useful to debug this issue?
[GitHub] srkukarni commented on issue #2426: Added cli commands to get function cluster related information
srkukarni commented on issue #2426: Added cli commands to get function cluster related information URL: https://github.com/apache/incubator-pulsar/pull/2426#issuecomment-415484255 @rdhabalia I've corrected the mistake of taking out functionsmetrics. With regard to having a separate endpoint, I'm not sure it really needs to go in /worker-stats. /worker/metrics and /worker/functionsmetrics should do the same job?
[GitHub] srkukarni commented on a change in pull request #2426: Added cli commands to get function cluster related information
srkukarni commented on a change in pull request #2426: Added cli commands to get function cluster related information URL: https://github.com/apache/incubator-pulsar/pull/2426#discussion_r212377386 ## File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java ## @@ -37,7 +36,7 @@ private Resources() { return new HashSet<>( Arrays.asList( FunctionApiV2Resource.class, -WorkerStats.class, +WorkerApiV2Resource.class, Review comment: /worker is strictly v2. I see no point in adding a v1.
[GitHub] srkukarni commented on a change in pull request #2426: Added cli commands to get function cluster related information
srkukarni commented on a change in pull request #2426: Added cli commands to get function cluster related information URL: https://github.com/apache/incubator-pulsar/pull/2426#discussion_r212377268 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java ## @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.broker.admin.v2; - -import java.io.IOException; -import java.util.Collection; - -import javax.ws.rs.GET; -import javax.ws.rs.Path; -import javax.ws.rs.core.Response; - -import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics; -import org.apache.pulsar.functions.worker.rest.FunctionApiResource; - -import io.swagger.annotations.ApiOperation; -import io.swagger.annotations.ApiResponse; -import io.swagger.annotations.ApiResponses; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -@Path("/worker-stats") -public class WorkerStats extends FunctionApiResource { - -@GET -@Path("/functions") Review comment: An oversight on my part while moving. I've added it back, but moved it to /worker/functionsmetrics to better reflect what the call does.
[GitHub] massakam commented on issue #2401: Consumer often discards received messages
massakam commented on issue #2401: Consumer often discards received messages URL: https://github.com/apache/incubator-pulsar/issues/2401#issuecomment-415382210 @ivankelly > Do you have a reliable repro, or do you just see it happening regularly in production? In our environment, it can be reproduced by using the [OpenMessaging benchmark tool](https://github.com/openmessaging/openmessaging-benchmark) and applying the following load: ``` topics: 1 partitionsPerTopic: 1 messageSize: 1024 payloadFile: "payload/payload-1Kb.data" subscriptionsPerTopic: 100 producersPerTopic: 1 producerRate: 1000 consumerBacklogSizeGB: 0 testDurationMinutes: 2 ``` It seems that it tends to occur when there are many subscriptions. It also occurred several times in production. > Is TLS auth enabled, or just TLS transport? We are not using TLS auth.
[GitHub] ivankelly commented on issue #2401: Consumer often discards received messages
ivankelly commented on issue #2401: Consumer often discards received messages URL: https://github.com/apache/incubator-pulsar/issues/2401#issuecomment-415368815 What rate are messages arriving at the topic in question? Is the client processing messages for just this topic, or other topics also? I'm trying to get a feel for how we could repro this locally.
[GitHub] ivankelly commented on issue #2401: Consumer often discards received messages
ivankelly commented on issue #2401: Consumer often discards received messages URL: https://github.com/apache/incubator-pulsar/issues/2401#issuecomment-415368282 Looks like a ByteBuf issue. When reading a ByteBuf, we often do things like reading the size from the packet and setting the writeIndex to the current writeIndex + size, so that it can be passed into the protobuf parser. This is why the write index seems so messed up. Do you have a reliable repro, or do you just see it happening regularly in production? Is TLS auth enabled, or just TLS transport? This is going to be a ByteBuf pooling issue of some sort with TLS.
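The index manipulation mentioned in this comment follows a common pattern for length-prefixed frames: read the size, temporarily narrow the readable window to exactly that many bytes, hand the window to a parser, then restore the index. The sketch below illustrates the pattern only. It uses the JDK's `java.nio.ByteBuffer` and its `limit` as a stand-in for Netty's `ByteBuf` and its writer index, it is not Pulsar's code, and `FrameWindowSketch` with its helper methods is hypothetical. Malformed input (a size larger than the remaining bytes) is not handled.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of "narrow the window, parse, restore" on a length-prefixed stream.
final class FrameWindowSketch {
    // Reads consecutive [4-byte length][payload] frames and decodes each payload as UTF-8.
    static List<String> readFrames(ByteBuffer buf) {
        List<String> frames = new ArrayList<>();
        while (buf.remaining() >= Integer.BYTES) {
            int size = buf.getInt();
            int originalLimit = buf.limit();
            // Narrow the readable window so a parser would see exactly one frame.
            buf.limit(buf.position() + size);
            byte[] payload = new byte[size];
            buf.get(payload);
            frames.add(new String(payload, StandardCharsets.UTF_8));
            // Restore the original end of the buffer before reading the next frame.
            buf.limit(originalLimit);
        }
        return frames;
    }

    // Builds a buffer of length-prefixed frames for the messages given.
    static ByteBuffer encode(String... messages) {
        byte[][] encoded = new byte[messages.length][];
        int total = 0;
        for (int i = 0; i < messages.length; i++) {
            encoded[i] = messages[i].getBytes(StandardCharsets.UTF_8);
            total += Integer.BYTES + encoded[i].length;
        }
        ByteBuffer buf = ByteBuffer.allocate(total);
        for (byte[] bytes : encoded) {
            buf.putInt(bytes.length).put(bytes);
        }
        buf.flip();
        return buf;
    }
}
```

If the restore step is skipped, or if the same buffer is recycled while another reader still holds it, the end-of-data index is left pointing at a frame boundary rather than at the real end of the data, which is one way such an index can end up looking wrong.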
[GitHub] sijie commented on issue #2430: Namespaces admin commands should be able to distinguish v1 and v2 namespace format
sijie commented on issue #2430: Namespaces admin commands should be able to distinguish v1 and v2 namespace format URL: https://github.com/apache/incubator-pulsar/issues/2430#issuecomment-415353663 The problem is that the namespace admin commands don't distinguish between the v1 and v2 formats, so set-backlog-quotas is setting the quota on the wrong namespace. Since the v2 topic name is limited to (tenant)/(namespace)/(topic), we should be able to distinguish v1 and v2 namespace names in admin commands: if the name has only 2 components, `(tenant)/(namespace)`, it is a v2 namespace. We can simply disallow v1 namespace names in namespace admin commands.
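The component-count rule proposed here can be sketched as follows. This is a hypothetical helper, not Pulsar's `NamespaceName` class: it assumes a v2 name is `tenant/namespace` (two components) and a v1 name is `tenant/cluster/namespace` (three components), as in the `test-tenant/test-cluster/ns1` example from the issue, and it treats anything else as invalid.

```java
// Hypothetical classifier for illustration; not part of the Pulsar admin API.
final class NamespaceFormatSketch {
    enum Format { V1, V2, INVALID }

    // Classifies a namespace name by counting its slash-separated components.
    static Format classify(String namespace) {
        if (namespace == null) {
            return Format.INVALID;
        }
        // A limit of -1 keeps trailing empty strings so "a/b/" is rejected.
        String[] parts = namespace.split("/", -1);
        for (String part : parts) {
            if (part.isEmpty()) {
                return Format.INVALID;
            }
        }
        if (parts.length == 2) {
            return Format.V2; // tenant/namespace
        }
        if (parts.length == 3) {
            return Format.V1; // tenant/cluster/namespace
        }
        return Format.INVALID;
    }
}
```

An admin command that only supports v2 could reject any input for which `classify` does not return `Format.V2`, instead of silently applying a policy to a different namespace than the one the topic lives in.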
[GitHub] sijie opened a new issue #2430: Namespaces admin commands should be able to distinguish v1 and v2 namespace format
sijie opened a new issue #2430: Namespaces admin commands should be able to distinguish v1 and v2 namespace format URL: https://github.com/apache/incubator-pulsar/issues/2430
Expected behavior
- setup a 2.0.0 cluster
- enable backlog : set-backlog-quotas test-tenant/test-cluster/ns1 --limit 500G --policy producer_exception
- run perf: pulsar-perf produce persistent://test-tenant/test-cluster/ns1/test-topic
- pulsar perf should be able to produce 500G data
Actual behavior
- pulsar perf encounters backlog exception when producing around ~10GB data
Steps to reproduce
as described in "expected behavior"
System configuration
**Pulsar version**: 2.0.0
[GitHub] sijie closed issue #2429: delete inactive topics should not delete topics that have retention set
sijie closed issue #2429: delete inactive topics should not delete topics that have retention set URL: https://github.com/apache/incubator-pulsar/issues/2429
[GitHub] sijie commented on issue #2429: delete inactive topics should not delete topics that have retention set
sijie commented on issue #2429: delete inactive topics should not delete topics that have retention set URL: https://github.com/apache/incubator-pulsar/issues/2429#issuecomment-415351488 sorry I think this is a false alarm. closing the issue.