[GitHub] [pulsar-client-go] xujianhai666 commented on a change in pull request #61: [issue:60] Fix partition topic message router logic
xujianhai666 commented on a change in pull request #61: [issue:60] Fix partition topic message router logic URL: https://github.com/apache/pulsar-client-go/pull/61#discussion_r329890242 ## File path: pulsar/impl_partition_consumer.go ## @@ -650,27 +682,31 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header Consumer: pc, } + pc.log.Debugf("receive message form broker, payload is:%s", string(payload)) + + fmt.Printf("From broker receive msg: %s , consumerID:{%d}\n", string(payload), pc.consumerID) Review comment: remove fmt pls This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar-client-go] xujianhai666 commented on a change in pull request #61: [issue:60] Fix partition topic message router logic
xujianhai666 commented on a change in pull request #61: [issue:60] Fix partition topic message router logic URL: https://github.com/apache/pulsar-client-go/pull/61#discussion_r329890214 ## File path: pulsar/impl_partition_consumer.go ## @@ -610,12 +639,15 @@ func (pc *partitionConsumer) internalFlow(permits uint32) error { } requestID := pc.client.rpcClient.NewRequestID() + pc.log.Debugf("send flow cmd to consumer: [%d], permits size: [%d]", pc.consumerID, permits) _, err := pc.client.rpcClient.RequestOnCnxNoWait(pc.cnx, requestID, pb.BaseCommand_FLOW, &pb.CommandFlow{ ConsumerId: proto.Uint64(pc.consumerID), MessagePermits: proto.Uint32(permits), }) + fmt.Printf("Send flow cmd to broker, consumerID: %d, permits: %d \n", pc.consumerID, permits) Review comment: remove fmt pls This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar-client-go] xujianhai666 commented on a change in pull request #61: [issue:60] Fix partition topic message router logic
xujianhai666 commented on a change in pull request #61: [issue:60] Fix partition topic message router logic URL: https://github.com/apache/pulsar-client-go/pull/61#discussion_r329890374 ## File path: pulsar/impl_partition_consumer.go ## @@ -650,27 +682,31 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header Consumer: pc, } + pc.log.Debugf("receive message form broker, payload is:%s", string(payload)) + + fmt.Printf("From broker receive msg: %s , consumerID:{%d}\n", string(payload), pc.consumerID) select { case pc.subQueue <- consumerMsg: - //Add messageId to redeliverMessages buffer, avoiding duplicates. - newMid := response.GetMessageId() - var dup bool - - pc.omu.Lock() - for _, mid := range pc.redeliverMessages { - if proto.Equal(mid, newMid) { - dup = true - break - } - } - - if !dup { - pc.redeliverMessages = append(pc.redeliverMessages, newMid) - } - pc.omu.Unlock() - continue - default: - return fmt.Errorf("consumer message channel on topic %s is full (capacity = %d)", pc.Topic(), cap(pc.options.MessageChannel)) + fmt.Printf("sub queue size is: %d, consumerID: {%d}\n", len(pc.subQueue), pc.consumerID) + return nil + //default: Review comment: remove extra code This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar-client-go] xujianhai666 commented on a change in pull request #61: [issue:60] Fix partition topic message router logic
xujianhai666 commented on a change in pull request #61: [issue:60] Fix partition topic message router logic URL: https://github.com/apache/pulsar-client-go/pull/61#discussion_r329890187 ## File path: pulsar/impl_partition_consumer.go ## @@ -312,23 +311,53 @@ func (pc *partitionConsumer) Receive(ctx context.Context) (message Message, err } func (pc *partitionConsumer) ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error { - for { - select { - case tmpMsg, ok := <-pc.subQueue: - if ok { + // Send flow request after 1/2 of the queue + // has been consumed + highWater := uint32(cap(pc.subQueue)) / 2 + + drain := func() { + for { + select { + case tmpMsg := <-pc.subQueue: msgs <- tmpMsg + default: + return + } + } + } + +CONSUMER: + for { + // ensure that the message queue is empty + drain() + + // request half the buffer's capacity + if err := pc.internalFlow(highWater); err != nil { + continue CONSUMER + } - err := pc.messageProcessed(tmpMsg.ID()) - if err != nil { - return err + for { + select { + case tmpMsg, ok := <-pc.subQueue: + if ok { + msgs <- tmpMsg + + err := pc.messageProcessed(tmpMsg.ID()) + if err != nil { + fmt.Println("hello error") + continue CONSUMER + //return err + } + fmt.Printf("In receiveAsync, big chan size:%d\n", len(msgs)) Review comment: remove fmt pls This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar-client-go] xujianhai666 commented on a change in pull request #61: [issue:60] Fix partition topic message router logic
xujianhai666 commented on a change in pull request #61: [issue:60] Fix partition topic message router logic URL: https://github.com/apache/pulsar-client-go/pull/61#discussion_r329889635 ## File path: pulsar/impl_partition_consumer.go ## @@ -22,6 +22,7 @@ import ( "fmt" "math" "sync" + `sync/atomic` Review comment: different mark of '`' , maybe use "\"" is better This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar-client-go] xujianhai666 commented on a change in pull request #61: [issue:60] Fix partition topic message router logic
xujianhai666 commented on a change in pull request #61: [issue:60] Fix partition topic message router logic URL: https://github.com/apache/pulsar-client-go/pull/61#discussion_r329890290 ## File path: pulsar/impl_partition_consumer.go ## @@ -650,27 +682,31 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header Consumer: pc, } + pc.log.Debugf("receive message form broker, payload is:%s", string(payload)) + + fmt.Printf("From broker receive msg: %s , consumerID:{%d}\n", string(payload), pc.consumerID) select { case pc.subQueue <- consumerMsg: - //Add messageId to redeliverMessages buffer, avoiding duplicates. - newMid := response.GetMessageId() - var dup bool - - pc.omu.Lock() - for _, mid := range pc.redeliverMessages { - if proto.Equal(mid, newMid) { - dup = true - break - } - } - - if !dup { - pc.redeliverMessages = append(pc.redeliverMessages, newMid) - } - pc.omu.Unlock() - continue - default: - return fmt.Errorf("consumer message channel on topic %s is full (capacity = %d)", pc.Topic(), cap(pc.options.MessageChannel)) + fmt.Printf("sub queue size is: %d, consumerID: {%d}\n", len(pc.subQueue), pc.consumerID) Review comment: remove fmt pls This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar-client-go] xujianhai666 commented on a change in pull request #61: [issue:60] Fix partition topic message router logic
xujianhai666 commented on a change in pull request #61: [issue:60] Fix partition topic message router logic URL: https://github.com/apache/pulsar-client-go/pull/61#discussion_r329889857 ## File path: pulsar/impl_partition_consumer.go ## @@ -265,21 +264,21 @@ func (pc *partitionConsumer) trackMessage(msgID MessageID) error { } func (pc *partitionConsumer) increaseAvailablePermits() error { - pc.receivedSinceFlow++ - highWater := uint32(math.Max(float64(pc.options.ReceiverQueueSize/2), 1)) - + atomic.AddUint32(&pc.receivedSinceFlow, 1) + highWater := uint32(math.Max(float64(cap(pc.subQueue)/2), 1)) pc.log.Debugf("receivedSinceFlow size is: %d, highWater size is: %d", pc.receivedSinceFlow, highWater) - // send flow request after 1/2 of the queue has been consumed if pc.receivedSinceFlow >= highWater { pc.log.Debugf("send flow command to broker, permits size is: %d", pc.receivedSinceFlow) - err := pc.internalFlow(pc.receivedSinceFlow) + err := pc.internalFlow(highWater) if err != nil { pc.log.Errorf("Send flow cmd error:%s", err.Error()) - pc.receivedSinceFlow = 0 + atomic.SwapUint32(&pc.receivedSinceFlow, 0) + //pc.receivedSinceFlow = 0 Review comment: remove extra code better This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar-client-go] xujianhai666 commented on a change in pull request #61: [issue:60] Fix partition topic message router logic
xujianhai666 commented on a change in pull request #61: [issue:60] Fix partition topic message router logic URL: https://github.com/apache/pulsar-client-go/pull/61#discussion_r329890335 ## File path: pulsar/impl_partition_consumer.go ## @@ -650,27 +682,31 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header Consumer: pc, } + pc.log.Debugf("receive message form broker, payload is:%s", string(payload)) + + fmt.Printf("From broker receive msg: %s , consumerID:{%d}\n", string(payload), pc.consumerID) select { case pc.subQueue <- consumerMsg: - //Add messageId to redeliverMessages buffer, avoiding duplicates. - newMid := response.GetMessageId() - var dup bool - - pc.omu.Lock() - for _, mid := range pc.redeliverMessages { - if proto.Equal(mid, newMid) { - dup = true - break - } - } - - if !dup { - pc.redeliverMessages = append(pc.redeliverMessages, newMid) - } - pc.omu.Unlock() - continue - default: - return fmt.Errorf("consumer message channel on topic %s is full (capacity = %d)", pc.Topic(), cap(pc.options.MessageChannel)) + fmt.Printf("sub queue size is: %d, consumerID: {%d}\n", len(pc.subQueue), pc.consumerID) Review comment: remove fmt pls This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar-client-go] xujianhai666 commented on a change in pull request #61: [issue:60] Fix partition topic message router logic
xujianhai666 commented on a change in pull request #61: [issue:60] Fix partition topic message router logic URL: https://github.com/apache/pulsar-client-go/pull/61#discussion_r329890151 ## File path: pulsar/impl_partition_consumer.go ## @@ -312,23 +311,53 @@ func (pc *partitionConsumer) Receive(ctx context.Context) (message Message, err } func (pc *partitionConsumer) ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error { - for { - select { - case tmpMsg, ok := <-pc.subQueue: - if ok { + // Send flow request after 1/2 of the queue + // has been consumed + highWater := uint32(cap(pc.subQueue)) / 2 + + drain := func() { + for { + select { + case tmpMsg := <-pc.subQueue: msgs <- tmpMsg + default: + return + } + } + } + +CONSUMER: + for { + // ensure that the message queue is empty + drain() + + // request half the buffer's capacity + if err := pc.internalFlow(highWater); err != nil { + continue CONSUMER + } - err := pc.messageProcessed(tmpMsg.ID()) - if err != nil { - return err + for { + select { + case tmpMsg, ok := <-pc.subQueue: + if ok { + msgs <- tmpMsg + + err := pc.messageProcessed(tmpMsg.ID()) + if err != nil { + fmt.Println("hello error") Review comment: remove fmt pls This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar-client-go] xujianhai666 commented on a change in pull request #65: [Issue 64] fix bug: type assert cause panic
xujianhai666 commented on a change in pull request #65: [Issue 64] fix bug: type assert cause panic URL: https://github.com/apache/pulsar-client-go/pull/65#discussion_r329888718 ## File path: pulsar/impl_partition_producer.go ## @@ -293,14 +293,17 @@ func (p *partitionProducer) internalFlushCurrentBatch() { func (p *partitionProducer) internalFlush(fr *flushRequest) { p.internalFlushCurrentBatch() - pi := p.pendingQueue.PeekLast().(*pendingItem) Review comment: In point of my view, if type is specific, meaning the item from pendingQueue is *pendingItem, we can not type assert with ok. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar-client-go] xujianhai666 opened a new pull request #69: [ISSUE #68][feat]add Option for new producer (#68)
xujianhai666 opened a new pull request #69: [ISSUE #68][feat]add Option for new producer (#68) URL: https://github.com/apache/pulsar-client-go/pull/69 Change-Id: I85a9c9f20e61e126b617eab919d2405a3ebda087 Master Issue: #68 ### Motivation simple initialization of Producer ### Modifications add ProducerOption for newProducer of client ### Verifying this change - [ ] Make sure that the change passes the CI checks. ### Does this pull request potentially affect one of the following parts: *If `yes` was chosen, please highlight the changes* - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API: (**yes** / no) - The schema: (yes / no / don't know) - The default values of configurations: (yes / no) - The wire protocol: (yes / no) ### Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented) - If a feature is not applicable for documentation, explain why? - If a feature is not documented yet in this PR, please create a followup issue for adding the documentation This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] massakam commented on issue #5288: [pulsar-broker] Close previous dispatcher when subscription type changes
massakam commented on issue #5288: [pulsar-broker] Close previous dispatcher when subscription type changes URL: https://github.com/apache/pulsar/pull/5288#issuecomment-536830065 rerun cpp tests rerun integration tests This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] rdhabalia commented on issue #5298: [pulsar-broker] Fix: invalidate cache on zk-cache timeout
rdhabalia commented on issue #5298: [pulsar-broker] Fix: invalidate cache on zk-cache timeout URL: https://github.com/apache/pulsar/pull/5298#issuecomment-536821310 @merlimat i have added a test which simulates the exact scenario and fails without this change and it invalidates cache with the change. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar-client-go] xujianhai666 opened a new issue #68: Use ConfigOption withxxxx for simple usage
xujianhai666 opened a new issue #68: Use ConfigOption with for simple usage URL: https://github.com/apache/pulsar-client-go/issues/68 **Is your feature request related to a problem? Please describe.** now, when we use client 、producer、consumer, we must provide CleintOption ProducerOption ConsumerOption, but there are many params on Option, which puzzle users. according to Rob Pike: https://commandcenter.blogspot.com/2014/01/self-referential-functions-and-design.html and Dave cheney: https://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis , we should use WithXXX. for example: ``` type ClientOption func(opts ClientOptions) func WithURL(URL string) ClientOption { return func(opts ClientOptions) { . } } ``` **Describe the solution you'd like** A clear and concise description of what you want to happen. **Describe alternatives you've considered** A clear and concise description of any alternative solutions or features you've considered. **Additional context** Add any other context or screenshots about the feature request here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] tuteng commented on issue #5292: [Doc] Add *Solr sink connector guide*
tuteng commented on issue #5292: [Doc] Add *Solr sink connector guide* URL: https://github.com/apache/pulsar/pull/5292#issuecomment-536807186 run java8 tests This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar-client-go] xujianhai666 opened a new pull request #67: refactor: remove extra timeout param
xujianhai666 opened a new pull request #67: refactor: remove extra timeout param URL: https://github.com/apache/pulsar-client-go/pull/67 Change-Id: I9d5d544808a6f739d79776a4f154d7caceaa133c ### Motivation remove extra timeout param on UnackedMessageTracker#handlerCmd ### Modifications *Describe the modifications you've done.* ### Verifying this change ### Does this pull request potentially affect one of the following parts: *If `yes` was chosen, please highlight the changes* - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API: (yes / no) - The schema: (yes / no / don't know) - The default values of configurations: (yes / no) - The wire protocol: (yes / no) ### Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented) - If a feature is not applicable for documentation, explain why? - If a feature is not documented yet in this PR, please create a followup issue for adding the documentation This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] rdhabalia opened a new pull request #5298: [pulsar-broker] Fix: invalidate cache on zk-cache timeout
rdhabalia opened a new pull request #5298: [pulsar-broker] Fix: invalidate cache on zk-cache timeout URL: https://github.com/apache/pulsar/pull/5298 ### Motivation Right now, while creating producer/consumer, broker tries to authorize them by fetching namespace policies using zkCache. But sometimes, zkCache times-out while getting zk-node into cache and zk-cache keeps pending key for 5 mins. Because of that client can't create producer/consumer on that topic for 5 mins. Fix: broker should invalidate key if it receives timeout while accessing zk-cache so, subsequent requests can successfully fetch it ``` 01:07:14.847 [pulsar-ordered-OrderedExecutor-2-0] WARN org.apache.pulsar.broker.service.BrokerService - Got exception when reading persistence policy for persistent://prop/cluster/ns/topic: null java.util.concurrent.TimeoutException: null at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1886) ~[?:?] at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2021) ~[?:?] at org.apache.pulsar.zookeeper.ZooKeeperDataCache.get(ZooKeeperDataCache.java:95) ~[pulsar-zookeeper-utils-2.4.3-yahoo.jar:2.4.3-yahoo] at org.apache.pulsar.broker.service.BrokerService.lambda$getManagedLedgerConfig$119(BrokerService.java:723) ~[pulsar-broker-2.4.3-yahoo.jar:2.4.3-yahoo] at org.apache.bookkeeper.mledger.util.SafeRun$2.safeRun(SafeRun.java:49) [managed-ledger-original-2.4.3-yahoo.jar:2.4.3-yahoo] at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [bookkeeper-common-4.9.0.jar:4.9.0] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-all-4.1.32.Final.jar:4.1.32.Final] at java.lang.Thread.run(Thread.java:834) [?:?] 01:07 : : 01:07:14.848 [bookkeeper-ml-workers-OrderedExecutor-3-0] WARN org.apache.pulsar.broker.service.persistent.DispatchRateLimiter - Failed to get message-rate for persistent://prop/cluster/ns/topic java.util.concurrent.TimeoutException: null at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1886) ~[?:?] at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2021) ~[?:?] at org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.getPolicies(DispatchRateLimiter.java:254) ~[pulsar-broker-2.4.3-yahoo.jar:2.4.3-yahoo] at org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.isDispatchRateNeeded(DispatchRateLimiter.java:155) ~[pulsar-broker-2.4.3-yahoo.jar:2.4.3-yahoo] at org.apache.pulsar.broker.service.persistent.PersistentTopic.initializeDispatchRateLimiterIfNeeded(PersistentTopic.java:248) ~[pulsar-broker-2.4.3-yahoo.jar:2.4.3-yahoo] at org.apache.pulsar.broker.service.persistent.PersistentTopic.(PersistentTopic.java:199) ~[pulsar-broker-2.4.3-yahoo.jar:2.4.3-yahoo] at org.apache.pulsar.broker.service.BrokerService$3.openLedgerComplete(BrokerService.java:656) ~[pulsar-broker-2.4.3-yahoo.jar:2.4.3-yahoo] at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.lambda$asyncOpen$98(ManagedLedgerFactoryImpl.java:328) ~[managed-ledger-original-2.4.3-yahoo.jar:2.4.3-yahoo ] at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) ~[?:?] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?] at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?] at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$2.initializeComplete(ManagedLedgerFactoryImpl.java:316) ~[managed-ledger-original-2.4.3-yahoo.jar:2.4.3-yahoo] at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$3$1.operationComplete(ManagedLedgerImpl.java:467) ~[managed-ledger-original-2.4.3-yahoo.jar:2.4.3-yahoo] at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:273) ~[managed-ledger-original-2.4.3-yahoo.jar:2.4.3-yahoo] at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:246) ~[managed-ledger-original-2.4.3-yahoo.jar:2.4.3-yahoo] at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.lambda$null$120(MetaStoreImplZookeeper.java:241) ~[managed-ledger-original-2.4.3-yahoo.jar:2.4.3-yahoo] at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [managed-ledger-original-2.4.3-yahoo.jar:2.4.3-yahoo] at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [bookkeeper-common-4.9.0
[GitHub] [pulsar] merlimat commented on issue #5297: Don't attempt to append on read-only cursor ledger
merlimat commented on issue #5297: Don't attempt to append on read-only cursor ledger URL: https://github.com/apache/pulsar/pull/5297#issuecomment-536751955 run integration tests This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] jerrypeng commented on issue #5271: Improve error handling logic for effectively once
jerrypeng commented on issue #5271: Improve error handling logic for effectively once URL: https://github.com/apache/pulsar/pull/5271#issuecomment-536747819 rerun integration tests This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] rdhabalia commented on issue #5276: Fixed race condition while triggering message redelivery after an ack-timeout event
rdhabalia commented on issue #5276: Fixed race condition while triggering message redelivery after an ack-timeout event URL: https://github.com/apache/pulsar/pull/5276#issuecomment-536719899 rerun java8 tests This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] vzhikserg commented on issue #5205: [doc] Fix broken links to the info about schemas and Schema Registry
vzhikserg commented on issue #5205: [doc] Fix broken links to the info about schemas and Schema Registry URL: https://github.com/apache/pulsar/pull/5205#issuecomment-536692594 run integration tests This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] merlimat commented on a change in pull request #5276: Fixed race condition while triggering message redelivery after an ack-timeout event
merlimat commented on a change in pull request #5276: Fixed race condition while triggering message redelivery after an ack-timeout event URL: https://github.com/apache/pulsar/pull/5276#discussion_r329706992 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java ## @@ -546,21 +546,29 @@ public void redeliverUnacknowledgedMessages() { if (log.isDebugEnabled()) { log.debug("[{}-{}] consumer {} received redelivery", topicName, subscription, consumerId); } -// redeliver unacked-msgs -subscription.redeliverUnacknowledgedMessages(this); -flowConsumerBlockedPermits(this); + if (pendingAcks != null) { -AtomicInteger totalRedeliveryMessages = new AtomicInteger(0); -pendingAcks.forEach( -(ledgerId, entryId, batchSize, none) -> totalRedeliveryMessages.addAndGet((int) batchSize)); -msgRedeliver.recordMultipleEvents(totalRedeliveryMessages.get(), totalRedeliveryMessages.get()); -pendingAcks.clear(); +List pendingPositions = new ArrayList<>((int) pendingAcks.size()); +MutableInt totalRedeliveryMessages = new MutableInt(0); +pendingAcks.forEach((ledgerId, entryId, batchSize, none) -> { +totalRedeliveryMessages.add((int) batchSize); +pendingPositions.add(new PositionImpl(ledgerId, entryId)); +}); + +for (PositionImpl p : pendingPositions) { +pendingAcks.remove(p.getLedgerId(), p.getEntryId()); Review comment: It's to avoid missing any insertion that might be happening into pendingAcks from a different thread. (eg: if the dispatcher thread was actually sending messages at the same time) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] merlimat opened a new pull request #5297: Don't attempt to append on read-only cursor ledger
merlimat opened a new pull request #5297: Don't attempt to append on read-only cursor ledger URL: https://github.com/apache/pulsar/pull/5297 ### Motivation When the cursor is recovered from a ledger, the ledgerHandle is kept open so that we can delete that ledger after we update the cursor status. If we attempt to close the cursor before having had any updates on the cursor itself, we would end up having a harmless error, saying that we're trying to append to a read-only ledger. eg. ``` 00:29:32.964 [pulsar-ordered-OrderedExecutor-6-0-EventThread] ERROR org.apache.bookkeeper.client.ReadOnlyLedgerHandle - Tried to add entry on a Read-Only ledger handle, ledgerid=10 ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] rdhabalia commented on a change in pull request #5276: Fixed race condition while triggering message redelivery after an ack-timeout event
rdhabalia commented on a change in pull request #5276: Fixed race condition while triggering message redelivery after an ack-timeout event URL: https://github.com/apache/pulsar/pull/5276#discussion_r329693205 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java ## @@ -546,21 +546,29 @@ public void redeliverUnacknowledgedMessages() { if (log.isDebugEnabled()) { log.debug("[{}-{}] consumer {} received redelivery", topicName, subscription, consumerId); } -// redeliver unacked-msgs -subscription.redeliverUnacknowledgedMessages(this); -flowConsumerBlockedPermits(this); + if (pendingAcks != null) { -AtomicInteger totalRedeliveryMessages = new AtomicInteger(0); -pendingAcks.forEach( -(ledgerId, entryId, batchSize, none) -> totalRedeliveryMessages.addAndGet((int) batchSize)); -msgRedeliver.recordMultipleEvents(totalRedeliveryMessages.get(), totalRedeliveryMessages.get()); -pendingAcks.clear(); +List pendingPositions = new ArrayList<>((int) pendingAcks.size()); +MutableInt totalRedeliveryMessages = new MutableInt(0); +pendingAcks.forEach((ledgerId, entryId, batchSize, none) -> { +totalRedeliveryMessages.add((int) batchSize); +pendingPositions.add(new PositionImpl(ledgerId, entryId)); +}); + +for (PositionImpl p : pendingPositions) { +pendingAcks.remove(p.getLedgerId(), p.getEntryId()); Review comment: `pendingPositions` will have same number of positions as `pendingAcks`.. so, why can't we just clear `pendingAcks` here and anyway, `subscription:: redeliverUnacknowledgedMessages` is happening later. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] merlimat commented on a change in pull request #5293: [ISSUE #4675][docs] Add *Delayed Message Delivery*
merlimat commented on a change in pull request #5293: [ISSUE #4675][docs] Add *Delayed Message Delivery* URL: https://github.com/apache/pulsar/pull/5293#discussion_r329680805 ## File path: site2/docs/concepts-messaging.md ## @@ -411,4 +411,32 @@ Message deduplication makes Pulsar an ideal messaging system to be used in conju > More in-depth information can be found in [this > post](https://streaml.io/blog/pulsar-effectively-once/) on the [Streamlio > blog](https://streaml.io/blog) +## Delayed Message Delivery +Delayed Message Delivery enables you to consume new message later, instead of immediately as normal. In this mechanism, messages is stored in memoey after publish to broker, after specific delayed time passed, the message will be delivered to consumer. +The diagram below illustrates concept: + +![Delayed Message Delivery](assets/message_delay.png) + +broker save message without any check. when consumer consume message, if message is delayed type, message will add to DelayedDeliveryTracker. subscription will check and get timeout messages from DelayedDeliveryTracker. + +### broker +Delayed Message Delivery is enabled by default on broker conf as below +``` +# Whether to enable the delayed delivery for messages. +# If disabled, messages will be immediately delivered and there will +# be no tracking overhead. +delayedDeliveryEnabled=true + +# Control the tick time for when retrying on delayed delivery, +# affecting the accuracy of the delivery time compared to the scheduled time. +# Default is 1 second. +delayedDeliveryTickTimeMillis=1000 +``` + +### producer +Here's an example for Delayed message producer +```java +// message to be delivered at the configured delay interval +producer.newMessage().deliverAfter(3L, TimeUnit.Minute).value("Hello Pulsar!").send(); +``` Review comment: Yes, at this point Python consumers can receive delayed messages, though we still need to expose the "delay" parameter when publishing a message This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] merlimat commented on issue #5276: Fixed race condition while triggering message redelivery after an ack-timeout event
merlimat commented on issue #5276: Fixed race condition while triggering message redelivery after an ack-timeout event URL: https://github.com/apache/pulsar/pull/5276#issuecomment-536646537 rerun java8 tests This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] merlimat commented on issue #5295: [Doc]to fix syntax err in the sample code: close producer async
merlimat commented on issue #5295: [Doc]to fix syntax err in the sample code: close producer async URL: https://github.com/apache/pulsar/pull/5295#issuecomment-536646113 run integration tests run cpp tests This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on issue #5288: [pulsar-broker] Close previous dispatcher when subscription type changes
codelipenghui commented on issue #5288: [pulsar-broker] Close previous dispatcher when subscription type changes URL: https://github.com/apache/pulsar/pull/5288#issuecomment-536610573 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[pulsar] branch master updated: Update io-cdc-canal.md (#5266)
This is an automated email from the ASF dual-hosted git repository. zhaijia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new da04dbe Update io-cdc-canal.md (#5266) da04dbe is described below commit da04dbe3b28f7720b815c93130b8fbb420ca785c Author: tomix1337 AuthorDate: Mon Sep 30 16:31:20 2019 +0200 Update io-cdc-canal.md (#5266) * Update io-cdc-canal.md Fixed a couple of things: Pulsar 2.3.0 connectors gives a 404 error. pulsar-admin does not support a "sources" command, but a "source"/ Updated version 2.4.1 to proper version. --- site2/docs/io-cdc-canal.md | 4 ++-- site2/website/versioned_docs/version-2.4.1/io-cdc-canal.md | 8 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/site2/docs/io-cdc-canal.md b/site2/docs/io-cdc-canal.md index 9d371ae..97701c4 100644 --- a/site2/docs/io-cdc-canal.md +++ b/site2/docs/io-cdc-canal.md @@ -163,8 +163,8 @@ Here is an example of storing MySQL data using the configuration file as above. ```bash $ docker exec -it pulsar-standalone /bin/bash -$ wget http://apache.01link.hk/pulsar/pulsar-2.3.0/connectors/pulsar-io-canal-2.3.0.nar -P connectors -$ ./bin/pulsar-admin sources localrun \ +$ wget https://archive.apache.org/dist/pulsar/pulsar-2.3.0/connectors/pulsar-io-canal-2.3.0.nar -P connectors +$ ./bin/pulsar-admin source localrun \ --archive ./connectors/pulsar-io-canal-2.3.0.nar \ --classname org.apache.pulsar.io.canal.CanalStringSource \ --tenant public \ diff --git a/site2/website/versioned_docs/version-2.4.1/io-cdc-canal.md b/site2/website/versioned_docs/version-2.4.1/io-cdc-canal.md index f791f9a..221b325 100644 --- a/site2/website/versioned_docs/version-2.4.1/io-cdc-canal.md +++ b/site2/website/versioned_docs/version-2.4.1/io-cdc-canal.md @@ -99,8 +99,8 @@ docker run -d -it --link pulsar-mysql -e canal.auto.scan=false -e canal.destinat - Start pulsar standalone ```$bash -docker pull apachepulsar/pulsar:2.3.0 -docker run -d -it --link pulsar-canal-server -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-standalone apachepulsar/pulsar:2.3.0 bin/pulsar standalone +docker pull apachepulsar/pulsar:2.4.1 +docker run -d -it --link pulsar-canal-server -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-standalone apachepulsar/pulsar:2.4.1 bin/pulsar standalone ``` - Start pulsar-io in standalone @@ -144,8 +144,8 @@ docker cp pulsar-client.py pulsar-standalone:/pulsar/ - Download canal connector and start canal connector ```$bash docker exec -it pulsar-standalone /bin/bash -wget http://apache.01link.hk/pulsar/pulsar-2.3.0/connectors/pulsar-io-canal-2.3.0.nar -P connectors -./bin/pulsar-admin sources localrun --archive ./connectors/pulsar-io-canal-2.3.0.nar --classname org.apache.pulsar.io.canal.CanalStringSource --tenant public --namespace default --name canal --destination-topic-name my-topic --source-config-file /pulsar/conf/canal-mysql-source-config.yaml --parallelism 1 +wget http://apache.01link.hk/pulsar/pulsar-2.4.1/connectors/pulsar-io-canal-2.4.1.nar -P connectors +./bin/pulsar-admin sources localrun --archive ./connectors/pulsar-io-canal-2.4.1.nar --classname org.apache.pulsar.io.canal.CanalStringSource --tenant public --namespace default --name canal --destination-topic-name my-topic --source-config-file /pulsar/conf/canal-mysql-source-config.yaml --parallelism 1 ``` - Consumption data
[GitHub] [pulsar] jiazhai merged pull request #5266: Update io-cdc-canal.md
jiazhai merged pull request #5266: Update io-cdc-canal.md URL: https://github.com/apache/pulsar/pull/5266 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] tomix1337 commented on issue #5266: Update io-cdc-canal.md
tomix1337 commented on issue #5266: Update io-cdc-canal.md URL: https://github.com/apache/pulsar/pull/5266#issuecomment-536583001 @tuteng can we merge? Everything should be ready. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] Anonymitaet commented on issue #5291: [Doc] Add *Redis sink connector guide*
Anonymitaet commented on issue #5291: [Doc] Add *Redis sink connector guide* URL: https://github.com/apache/pulsar/pull/5291#issuecomment-536547443 run cpp tests run integration tests This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] Anonymitaet commented on issue #5290: [Doc] Add *RabbitMQ sink connector guide*
Anonymitaet commented on issue #5290: [Doc] Add *RabbitMQ sink connector guide* URL: https://github.com/apache/pulsar/pull/5290#issuecomment-536547125 run integration tests This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] Anonymitaet commented on issue #5291: [Doc] Add *Redis sink connector guide*
Anonymitaet commented on issue #5291: [Doc] Add *Redis sink connector guide* URL: https://github.com/apache/pulsar/pull/5291#issuecomment-536547076 run cpp tests run integration tests run java8 tests This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] Anonymitaet removed a comment on issue #5291: [Doc] Add *Redis sink connector guide*
Anonymitaet removed a comment on issue #5291: [Doc] Add *Redis sink connector guide* URL: https://github.com/apache/pulsar/pull/5291#issuecomment-536546908 retest This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] Anonymitaet commented on issue #5291: [Doc] Add *Redis sink connector guide*
Anonymitaet commented on issue #5291: [Doc] Add *Redis sink connector guide* URL: https://github.com/apache/pulsar/pull/5291#issuecomment-536546908 retest This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] yittg commented on issue #5165: [PIP-43] Support producer to send msg with different schema
yittg commented on issue #5165: [PIP-43] Support producer to send msg with different schema URL: https://github.com/apache/pulsar/pull/5165#issuecomment-536546873 @sijie @codelipenghui @congbobo184 May I take up some more of your time to PUSH this PR on. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] Anonymitaet commented on issue #5296: [Doc] Update connector guide overall
Anonymitaet commented on issue #5296: [Doc] Update connector guide overall URL: https://github.com/apache/pulsar/pull/5296#issuecomment-536545980 @tuteng could you please help review? Thank you This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] Anonymitaet edited a comment on issue #5015: [Doc] Update Pulsar Connector Guide
Anonymitaet edited a comment on issue #5015: [Doc] Update Pulsar Connector Guide URL: https://github.com/apache/pulsar/issues/5015#issuecomment-524149120 I've created the following PRs to resolve this issue: # Overview * https://github.com/apache/pulsar/pull/5183 * https://github.com/apache/pulsar/pull/5224 # Use * https://github.com/apache/pulsar/pull/5071 # Built-in connector Overall: https://github.com/apache/pulsar/issues/5015 Connectors come into two types: ## Source connector Source connector name|Configuration|Example |---|---|--- Kafka |https://github.com/apache/pulsar/pull/5203|https://github.com/apache/pulsar/pull/5203| * https://github.com/apache/pulsar/pull/5168 * https://github.com/apache/pulsar/pull/5173 * https://github.com/apache/pulsar/pull/5197 * https://github.com/apache/pulsar/pull/5199 * https://github.com/apache/pulsar/pull/5210 * https://github.com/apache/pulsar/pull/5211 * https://github.com/apache/pulsar/pull/5214 * https://github.com/apache/pulsar/pull/5226 ## Sink connector Sink connector name|Configuration|Example |---|---|--- Flume|https://github.com/apache/pulsar/pull/5238|| JDBC|https://github.com/apache/pulsar/pull/5275|| MongoDB|https://github.com/apache/pulsar/pull/5289|| RabbitMQ|https://github.com/apache/pulsar/pull/5290|| Redis|https://github.com/apache/pulsar/pull/5291|| Solr|https://github.com/apache/pulsar/pull/5292|| * Flume sink: * HDFS3 sink: https://github.com/apache/pulsar/pull/5239 * File source + **example**: https://github.com/apache/pulsar/pull/5240 * Twitter Firehose source: https://github.com/apache/pulsar/pull/5251 * Netty source: https://github.com/apache/pulsar/pull/5253 * Rabbit source: https://github.com/apache/pulsar/pull/5255 * Aerospike sink: https://github.com/apache/pulsar/pull/5256 * Cassandra sink: https://github.com/apache/pulsar/pull/5257 * ElasticSearch sink: https://github.com/apache/pulsar/pull/5258 * InfluxDB sink: https://github.com/apache/pulsar/pull/5273 # CDC connector * https://github.com/apache/pulsar/pull/5198 # Debug * https://github.com/apache/pulsar/pull/5041 # Connector Admin CLI * https://github.com/apache/pulsar/pull/5028 * https://github.com/apache/pulsar/pull/5184 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] Anonymitaet opened a new pull request #5296: [Doc] Update connector guide overall
Anonymitaet opened a new pull request #5296: [Doc] Update connector guide overall URL: https://github.com/apache/pulsar/pull/5296 Fix https://github.com/apache/pulsar/issues/5015 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] xujianhai666 commented on a change in pull request #5293: [ISSUE #4675][docs] Add *Delayed Message Delivery*
xujianhai666 commented on a change in pull request #5293: [ISSUE #4675][docs] Add *Delayed Message Delivery* URL: https://github.com/apache/pulsar/pull/5293#discussion_r329529524 ## File path: site2/docs/concepts-messaging.md ## @@ -411,4 +411,32 @@ Message deduplication makes Pulsar an ideal messaging system to be used in conju > More in-depth information can be found in [this > post](https://streaml.io/blog/pulsar-effectively-once/) on the [Streamlio > blog](https://streaml.io/blog) +## Delayed Message Delivery +Delayed Message Delivery enables you to consume new message later, instead of immediately as normal. In this mechanism, messages is stored in memoey after publish to broker, after specific delayed time passed, the message will be delivered to consumer. +The diagram below illustrates concept: + +![Delayed Message Delivery](assets/message_delay.png) + +broker save message without any check. when consumer consume message, if message is delayed type, message will add to DelayedDeliveryTracker. subscription will check and get timeout messages from DelayedDeliveryTracker. + +### broker +Delayed Message Delivery is enabled by default on broker conf as below +``` +# Whether to enable the delayed delivery for messages. +# If disabled, messages will be immediately delivered and there will +# be no tracking overhead. +delayedDeliveryEnabled=true + +# Control the tick time for when retrying on delayed delivery, +# affecting the accuracy of the delivery time compared to the scheduled time. +# Default is 1 second. +delayedDeliveryTickTimeMillis=1000 +``` + +### producer +Here's an example for Delayed message producer +```java +// message to be delivered at the configured delay interval +producer.newMessage().deliverAfter(3L, TimeUnit.Minute).value("Hello Pulsar!").send(); +``` Review comment: @pouledodue there is no implementation on python client. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] xujianhai666 commented on a change in pull request #5293: [ISSUE #4675][docs] Add *Delayed Message Delivery*
xujianhai666 commented on a change in pull request #5293: [ISSUE #4675][docs] Add *Delayed Message Delivery* URL: https://github.com/apache/pulsar/pull/5293#discussion_r329529414 ## File path: site2/docs/concepts-messaging.md ## @@ -411,4 +411,32 @@ Message deduplication makes Pulsar an ideal messaging system to be used in conju > More in-depth information can be found in [this > post](https://streaml.io/blog/pulsar-effectively-once/) on the [Streamlio > blog](https://streaml.io/blog) +## Delayed Message Delivery +Delayed Message Delivery enables you to consume new message later, instead of immediately as normal. In this mechanism, messages is stored in memoey after publish to broker, after specific delayed time passed, the message will be delivered to consumer. +The diagram below illustrates concept: + +![Delayed Message Delivery](assets/message_delay.png) + +broker save message without any check. when consumer consume message, if message is delayed type, message will add to DelayedDeliveryTracker. subscription will check and get timeout messages from DelayedDeliveryTracker. + +### broker +Delayed Message Delivery is enabled by default on broker conf as below +``` +# Whether to enable the delayed delivery for messages. +# If disabled, messages will be immediately delivered and there will +# be no tracking overhead. +delayedDeliveryEnabled=true + +# Control the tick time for when retrying on delayed delivery, +# affecting the accuracy of the delivery time compared to the scheduled time. +# Default is 1 second. +delayedDeliveryTickTimeMillis=1000 +``` + +### producer +Here's an example for Delayed message producer +```java +// message to be delivered at the configured delay interval +producer.newMessage().deliverAfter(3L, TimeUnit.Minute).value("Hello Pulsar!").send(); +``` Review comment: there is no implementation on python client. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] xujianhai666 commented on a change in pull request #5293: [ISSUE #4675][docs] Add *Delayed Message Delivery*
xujianhai666 commented on a change in pull request #5293: [ISSUE #4675][docs] Add *Delayed Message Delivery* URL: https://github.com/apache/pulsar/pull/5293#discussion_r329526225 ## File path: site2/docs/concepts-messaging.md ## @@ -411,4 +411,32 @@ Message deduplication makes Pulsar an ideal messaging system to be used in conju > More in-depth information can be found in [this > post](https://streaml.io/blog/pulsar-effectively-once/) on the [Streamlio > blog](https://streaml.io/blog) +## Delayed Message Delivery +Delayed Message Delivery enables you to consume new message later, instead of immediately as normal. In this mechanism, messages is stored in memoey after publish to broker, after specific delayed time passed, the message will be delivered to consumer. Review comment: @pouledodue done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] tuteng edited a comment on issue #5294: Getting errors when creating JWT
tuteng edited a comment on issue #5294: Getting errors when creating JWT URL: https://github.com/apache/pulsar/issues/5294#issuecomment-536501144 Can you write more detailed information, such as the version of the pulsar, whether there are exceptions thrown, and if so, what are the exceptions? @Loghijiaha Note: `path` needs to be replaced by your real path This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] tuteng commented on issue #5292: [Doc] Add *Solr sink connector guide*
tuteng commented on issue #5292: [Doc] Add *Solr sink connector guide* URL: https://github.com/apache/pulsar/pull/5292#issuecomment-536501819 run java8 tests This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] tuteng commented on issue #5291: [Doc] Add *Redis sink connector guide*
tuteng commented on issue #5291: [Doc] Add *Redis sink connector guide* URL: https://github.com/apache/pulsar/pull/5291#issuecomment-536501577 run cpp tests This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] tuteng commented on issue #5294: Getting errors when creating JWT
tuteng commented on issue #5294: Getting errors when creating JWT URL: https://github.com/apache/pulsar/issues/5294#issuecomment-536501144 Can you write more detailed information, such as the version of the pulsar, whether there are exceptions thrown, and if so, what are the exceptions? @Loghijiaha Note: path needs to be replaced by your real path This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] kevenYLi opened a new pull request #5295: [Doc]to fix syntax err in the sample code: close producer async
kevenYLi opened a new pull request #5295: [Doc]to fix syntax err in the sample code: close producer async URL: https://github.com/apache/pulsar/pull/5295 ### Motivation * to fix syntax err in the sample code: 'Close operations can also be asynchronous’, the error are as below: - Cannot resolve method 'exceptionally(\) - Bad return type in lambda expression: Throwable cannot be converted to Void ### Modifications - delete the unexpected semicolon - adjust return clause. ### Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API: (no) - The schema: (no) - The default values of configurations: (no) - The wire protocol: (no) - The rest endpoints: (no) - The admin cli options: (no) - Anything that affects deployment: (no) ### Documentation - Does this pull request introduce a new feature? (no) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] tuteng commented on issue #5266: Update io-cdc-canal.md
tuteng commented on issue #5266: Update io-cdc-canal.md URL: https://github.com/apache/pulsar/pull/5266#issuecomment-536497548 run integration tests This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] tuteng commented on a change in pull request #5292: [Doc] Add *Solr sink connector guide*
tuteng commented on a change in pull request #5292: [Doc] Add *Solr sink connector guide* URL: https://github.com/apache/pulsar/pull/5292#discussion_r329503738 ## File path: site2/docs/io-solr-sink.md ## @@ -0,0 +1,60 @@ +--- +id: io-solr-sink +title: Solr sink connector +sidebar_label: Solr sink connector +--- + +The Solr sink connector pulls messages from Pulsar topics +and persists the messages to Solr collections. + + + +## Configuration + +The configuration of the Solr sink connector has the following properties. + + + +### Property + +| Name | Type|Required | Default | Description +|--|--|--|-|-| +| `solrUrl` | String|true|" " (empty string) | Comma-separated zookeeper hosts with chroot used in the SolrCloud mode. **Example**`localhost:2181,localhost:2182/chroot` URL to connect to Solr used in standalone mode. **Example**`localhost:8983/solr` | +| `solrMode` | String|true|SolrCloud| The client mode when interacting with the Solr cluster. Below are the available options:Standalone SolrCloud| +| `solrCollection` |String|true| " " (empty string) | Solr collection name to which records need to be written. | +| `solrCommitWithinMs` |int| false|10 | The time within million seconds for Solr updating commits.| +| `username` |String|false| " " (empty string) | The username for basic authentication.**Note: `usename` is case-sensitive.** | +| `password` | String|false| " " (empty string) | The password for basic authentication. **Note: `password` is case-sensitive.** | + + + +### Example + +Before using the Solr sink connector, you need to create a configuration file through one of the following methods. + +* JSON + +```json +{ +"solrUrl": "localhost:2181,localhost:2182/chroot", +"solrMode": "SolrCloud", +"solrCollection": "techproducts", +"solrCommitWithinMs": "100", Review comment: This `solrCommitWithinMs` value seems to be an integer ``` "solrCommitWithinMs": 100, ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] Anonymitaet commented on a change in pull request #5291: [Doc] Add *Redis sink connector guide*
Anonymitaet commented on a change in pull request #5291: [Doc] Add *Redis sink connector guide* URL: https://github.com/apache/pulsar/pull/5291#discussion_r329503631 ## File path: site2/docs/io-redis-sink.md ## @@ -0,0 +1,69 @@ +--- +id: io-redis-sink +title: Redis sink connector +sidebar_label: Redis sink connector +--- + +The Redis sink connector pulls messages from Pulsar topics +and persists the messages to a Redis database. + + + +## Configuration + +The configuration of the Redis sink connector has the following properties. + + + +### Property + +| Name | Type|Required | Default | Description +|--|--|--|-|-| +| `redisHosts` |String|true|" " (empty string) | A comma-separated list of Redis hosts to connect to. | +| `redisPassword` |String|true|" " (empty string) | The password used to connect to Redis. | +| `redisDatabase` | int|true|0 | The Redis database to connect to. | +| `clientMode` |String| false|Standalone | The client mode when interacting with Redis cluster. Below are the available options: StandaloneCluster | +| `autoReconnect` | boolean|false|true | Whether the Redis client automatically reconnect or not. | +| `requestQueue` | int|false|2147483647 | The maximum number of queued requests to Redis. | +| `tcpNoDelay` |boolean| false| false | Whether to enable TCP with no delay or not. | +| `keepAlive` | boolean|false | false |Whether to enable a keepalive to Redis or not. | +| `connectTimeout` |long| false|1 | The time to wait before timing out when connecting in milliseconds. | +| `operationTimeout` | long|false|1 | The time before an operation is marked as timed out in milliseconds . | +| `batchTimeMs` | int|false|1000 | The Redis operation time in milliseconds. | +| `batchSize` | int|false|1000 | The batch size of writing to Redis database. | Review comment: OK, I've updated. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] Anonymitaet commented on a change in pull request #5291: [Doc] Add *Redis sink connector guide*
Anonymitaet commented on a change in pull request #5291: [Doc] Add *Redis sink connector guide* URL: https://github.com/apache/pulsar/pull/5291#discussion_r329503528 ## File path: site2/docs/io-redis-sink.md ## @@ -0,0 +1,69 @@ +--- +id: io-redis-sink +title: Redis sink connector +sidebar_label: Redis sink connector +--- + +The Redis sink connector pulls messages from Pulsar topics +and persists the messages to a Redis database. + + + +## Configuration + +The configuration of the Redis sink connector has the following properties. + + + +### Property + +| Name | Type|Required | Default | Description +|--|--|--|-|-| +| `redisHosts` |String|true|" " (empty string) | A comma-separated list of Redis hosts to connect to. | +| `redisPassword` |String|true|" " (empty string) | The password used to connect to Redis. | Review comment: OK, I've udpated. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] Anonymitaet commented on a change in pull request #5291: [Doc] Add *Redis sink connector guide*
Anonymitaet commented on a change in pull request #5291: [Doc] Add *Redis sink connector guide* URL: https://github.com/apache/pulsar/pull/5291#discussion_r329503528 ## File path: site2/docs/io-redis-sink.md ## @@ -0,0 +1,69 @@ +--- +id: io-redis-sink +title: Redis sink connector +sidebar_label: Redis sink connector +--- + +The Redis sink connector pulls messages from Pulsar topics +and persists the messages to a Redis database. + + + +## Configuration + +The configuration of the Redis sink connector has the following properties. + + + +### Property + +| Name | Type|Required | Default | Description +|--|--|--|-|-| +| `redisHosts` |String|true|" " (empty string) | A comma-separated list of Redis hosts to connect to. | +| `redisPassword` |String|true|" " (empty string) | The password used to connect to Redis. | Review comment: OK, I've updated. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] Anonymitaet commented on a change in pull request #5289: [Doc] Add *MongoDB sink connector guide*
Anonymitaet commented on a change in pull request #5289: [Doc] Add *MongoDB sink connector guide* URL: https://github.com/apache/pulsar/pull/5289#discussion_r329502787 ## File path: site2/docs/io-mongo-sink.md ## @@ -0,0 +1,51 @@ +--- +id: io-mongo-sink +title: MongoDB sink connector +sidebar_label: MongoDB sink connector +--- + +The MongoDB sink connector pulls messages from Pulsar topics +and persists the messages to collections. + +## Configuration + +The configuration of the MongoDB sink connector has the following properties. + +### Property + +| Name | Type|Required | Default | Description +|--|--|--|-|-| +| `mongoUri` | String| true| " " (empty string) | The MongoDB URI to which the connector connects. For more information, see [connection string URI format](https://docs.mongodb.com/manual/reference/connection-string/). | +| `database` | String| true| " " (empty string)| The database name to which the collection belongs. | +| `collection` | String| true| " " (empty string)| The collection name to which the connector writes messages. | +| `batchSize` | int|false|DEFAULT_BATCH_SIZE | The batch size of writing messages to collections. | +| `batchTimeMs` |long|false|DEFAULT_BATCH_TIME_MS| The batch operation interval in milliseconds. | Review comment: OK, I've updated This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] Anonymitaet commented on a change in pull request #5289: [Doc] Add *MongoDB sink connector guide*
Anonymitaet commented on a change in pull request #5289: [Doc] Add *MongoDB sink connector guide* URL: https://github.com/apache/pulsar/pull/5289#discussion_r329502771 ## File path: site2/docs/io-mongo-sink.md ## @@ -0,0 +1,51 @@ +--- +id: io-mongo-sink +title: MongoDB sink connector +sidebar_label: MongoDB sink connector +--- + +The MongoDB sink connector pulls messages from Pulsar topics +and persists the messages to collections. + +## Configuration + +The configuration of the MongoDB sink connector has the following properties. + +### Property + +| Name | Type|Required | Default | Description +|--|--|--|-|-| +| `mongoUri` | String| true| " " (empty string) | The MongoDB URI to which the connector connects. For more information, see [connection string URI format](https://docs.mongodb.com/manual/reference/connection-string/). | +| `database` | String| true| " " (empty string)| The database name to which the collection belongs. | +| `collection` | String| true| " " (empty string)| The collection name to which the connector writes messages. | +| `batchSize` | int|false|DEFAULT_BATCH_SIZE | The batch size of writing messages to collections. | +| `batchTimeMs` |long|false|DEFAULT_BATCH_TIME_MS| The batch operation interval in milliseconds. | + + +### Example + +Before using the Mongo sink connector, you need to create a configuration file through one of the following methods. + +* JSON + +```json +{ +"mongoUri": "mongodb://localhost", +"database": "pulsar", +"collection": "messages", +"batchSize": "2", +"batchTimeMs": "500" +} +``` Review comment: OK, I've updated This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] tuteng commented on a change in pull request #5291: [Doc] Add *Redis sink connector guide*
tuteng commented on a change in pull request #5291: [Doc] Add *Redis sink connector guide* URL: https://github.com/apache/pulsar/pull/5291#discussion_r329502037 ## File path: site2/docs/io-redis-sink.md ## @@ -0,0 +1,69 @@ +--- +id: io-redis-sink +title: Redis sink connector +sidebar_label: Redis sink connector +--- + +The Redis sink connector pulls messages from Pulsar topics +and persists the messages to a Redis database. + + + +## Configuration + +The configuration of the Redis sink connector has the following properties. + + + +### Property + +| Name | Type|Required | Default | Description +|--|--|--|-|-| +| `redisHosts` |String|true|" " (empty string) | A comma-separated list of Redis hosts to connect to. | +| `redisPassword` |String|true|" " (empty string) | The password used to connect to Redis. | +| `redisDatabase` | int|true|0 | The Redis database to connect to. | +| `clientMode` |String| false|Standalone | The client mode when interacting with Redis cluster. Below are the available options: StandaloneCluster | +| `autoReconnect` | boolean|false|true | Whether the Redis client automatically reconnect or not. | +| `requestQueue` | int|false|2147483647 | The maximum number of queued requests to Redis. | +| `tcpNoDelay` |boolean| false| false | Whether to enable TCP with no delay or not. | +| `keepAlive` | boolean|false | false |Whether to enable a keepalive to Redis or not. | +| `connectTimeout` |long| false|1 | The time to wait before timing out when connecting in milliseconds. | +| `operationTimeout` | long|false|1 | The time before an operation is marked as timed out in milliseconds . | +| `batchTimeMs` | int|false|1000 | The Redis operation time in milliseconds. | +| `batchSize` | int|false|1000 | The batch size of writing to Redis database. | Review comment: The default value for field `batchSize` is 200 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] tuteng commented on a change in pull request #5291: [Doc] Add *Redis sink connector guide*
tuteng commented on a change in pull request #5291: [Doc] Add *Redis sink connector guide* URL: https://github.com/apache/pulsar/pull/5291#discussion_r329502060 ## File path: site2/docs/io-redis-sink.md ## @@ -0,0 +1,69 @@ +--- +id: io-redis-sink +title: Redis sink connector +sidebar_label: Redis sink connector +--- + +The Redis sink connector pulls messages from Pulsar topics +and persists the messages to a Redis database. + + + +## Configuration + +The configuration of the Redis sink connector has the following properties. + + + +### Property + +| Name | Type|Required | Default | Description +|--|--|--|-|-| +| `redisHosts` |String|true|" " (empty string) | A comma-separated list of Redis hosts to connect to. | +| `redisPassword` |String|true|" " (empty string) | The password used to connect to Redis. | Review comment: Field `redisPassword` is optional. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] tuteng commented on a change in pull request #5289: [Doc] Add *MongoDB sink connector guide*
tuteng commented on a change in pull request #5289: [Doc] Add *MongoDB sink connector guide* URL: https://github.com/apache/pulsar/pull/5289#discussion_r329497951 ## File path: site2/docs/io-mongo-sink.md ## @@ -0,0 +1,51 @@ +--- +id: io-mongo-sink +title: MongoDB sink connector +sidebar_label: MongoDB sink connector +--- + +The MongoDB sink connector pulls messages from Pulsar topics +and persists the messages to collections. + +## Configuration + +The configuration of the MongoDB sink connector has the following properties. + +### Property + +| Name | Type|Required | Default | Description +|--|--|--|-|-| +| `mongoUri` | String| true| " " (empty string) | The MongoDB URI to which the connector connects. For more information, see [connection string URI format](https://docs.mongodb.com/manual/reference/connection-string/). | +| `database` | String| true| " " (empty string)| The database name to which the collection belongs. | +| `collection` | String| true| " " (empty string)| The collection name to which the connector writes messages. | +| `batchSize` | int|false|DEFAULT_BATCH_SIZE | The batch size of writing messages to collections. | +| `batchTimeMs` |long|false|DEFAULT_BATCH_TIME_MS| The batch operation interval in milliseconds. | Review comment: Using `100` instead of DEFAULT_BATCH_SIZE looks clearer Using `1000` instead of DEFAULT_BATCH_TIME_MS. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] tuteng commented on a change in pull request #5289: [Doc] Add *MongoDB sink connector guide*
tuteng commented on a change in pull request #5289: [Doc] Add *MongoDB sink connector guide* URL: https://github.com/apache/pulsar/pull/5289#discussion_r329497914 ## File path: site2/docs/io-mongo-sink.md ## @@ -0,0 +1,51 @@ +--- +id: io-mongo-sink +title: MongoDB sink connector +sidebar_label: MongoDB sink connector +--- + +The MongoDB sink connector pulls messages from Pulsar topics +and persists the messages to collections. + +## Configuration + +The configuration of the MongoDB sink connector has the following properties. + +### Property + +| Name | Type|Required | Default | Description +|--|--|--|-|-| +| `mongoUri` | String| true| " " (empty string) | The MongoDB URI to which the connector connects. For more information, see [connection string URI format](https://docs.mongodb.com/manual/reference/connection-string/). | +| `database` | String| true| " " (empty string)| The database name to which the collection belongs. | +| `collection` | String| true| " " (empty string)| The collection name to which the connector writes messages. | +| `batchSize` | int|false|DEFAULT_BATCH_SIZE | The batch size of writing messages to collections. | +| `batchTimeMs` |long|false|DEFAULT_BATCH_TIME_MS| The batch operation interval in milliseconds. | + + +### Example + +Before using the Mongo sink connector, you need to create a configuration file through one of the following methods. + +* JSON + +```json +{ +"mongoUri": "mongodb://localhost", +"database": "pulsar", +"collection": "messages", +"batchSize": "2", +"batchTimeMs": "500" +} +``` Review comment: It is best to add a port number to mongo's link address, as shown below: ``` "mongoUri": "mongodb://localhost:27017", ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] Loghijiaha opened a new issue #5294: Getting errors when creating JWT
Loghijiaha opened a new issue #5294: Getting errors when creating JWT URL: https://github.com/apache/pulsar/issues/5294 **Describe the bug** I created private/public key pairs using : `$ bin/pulsar tokens create-key-pair --output-private-key my-private.key --output-public-key my-public.key` And I couldn't create a token using the secretkey 👎 `bin/pulsar tokens create --private-key file:///path/to/my-private.key \ --subject test-user` **To Reproduce** 1. `$ bin/pulsar tokens create-key-pair --output-private-key my-private.key --output-public-key my-public.key` 2.`bin/pulsar tokens create --private-key file:///path/to/my-private.key \ --subject test-user` **Desktop (please complete the following information):** - OS: Ubuntu 18.04 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services