[GitHub] [pulsar] Technoboy- commented on a diff in pull request #20804: [improve][offload] Extend the offload policies to allow specifying more conf
Technoboy- commented on code in PR #20804: URL: https://github.com/apache/pulsar/pull/20804#discussion_r1263325619
## pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java: ##
@@ -432,4 +433,16 @@ private byte[] loadClassData(String name) throws IOException {
         }
     }
+    @Test
+    public void testCreateOffloadPoliciesWithExtraConfiguration() {
+        Properties properties = new Properties();
+        properties.put("managedLedgerOffloadExtraConfigKey1", "value1");
+        properties.put("managedLedgerOffloadExtraConfigKey2", "value2");
+        OffloadPoliciesImpl policies = OffloadPoliciesImpl.create(properties);
Review Comment:
```
Properties properties = new Properties();
Properties extraProps = new Properties();
extraProps.put("configKey1", "value1");
extraProps.put("configKey2", "value2");
properties.put("managedLedgerOffloadExtraConfig", extraProps);
OffloadPoliciesImpl policies = OffloadPoliciesImpl.create(properties);
```
Seems better than `prefix` keys.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
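The two shapes discussed in this review comment (prefixed keys versus a nested `Properties` object under a single key) can be compared with a minimal sketch that uses only `java.util`. The class `ExtraConfigSketch` and its two methods are hypothetical names made up for illustration; this is not how `OffloadPoliciesImpl.create` is implemented, and the prefix string is simply the one that appears in the test under review.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper, for illustration only; not part of Pulsar.
public class ExtraConfigSketch {
    // Prefix taken from the keys used in the test under review.
    static final String PREFIX = "managedLedgerOffloadExtraConfig";

    // Option A: collect every string key that starts with PREFIX and strip the prefix.
    static Map<String, String> fromPrefixedKeys(Properties props) {
        Map<String, String> extra = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith(PREFIX) && name.length() > PREFIX.length()) {
                extra.put(name.substring(PREFIX.length()), props.getProperty(name));
            }
        }
        return extra;
    }

    // Option B: read a nested Properties object stored under the PREFIX key itself.
    static Map<String, String> fromNestedProperties(Properties props) {
        Map<String, String> extra = new HashMap<>();
        Object nested = props.get(PREFIX);
        if (nested instanceof Properties) {
            Properties inner = (Properties) nested;
            for (String name : inner.stringPropertyNames()) {
                extra.put(name, inner.getProperty(name));
            }
        }
        return extra;
    }

    public static void main(String[] args) {
        Properties prefixed = new Properties();
        prefixed.put(PREFIX + "Key1", "value1");
        System.out.println(fromPrefixedKeys(prefixed).get("Key1"));

        Properties inner = new Properties();
        inner.put("configKey1", "value1");
        Properties nested = new Properties();
        nested.put(PREFIX, inner);
        System.out.println(fromNestedProperties(nested).get("configKey1"));
    }
}
```

One trade-off worth noting: the prefixed form stays representable in a flat `.properties` file, while the nested form keeps the extra keys free of the long prefix but only works when the `Properties` object is built in code.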
[GitHub] [pulsar] lifepuzzlefun commented on pull request #20800: [improve][broker] Add the MessageExpirer interface to make code clear
lifepuzzlefun commented on PR #20800: URL: https://github.com/apache/pulsar/pull/20800#issuecomment-1635278642 @BewareMyPower Hello, let me clarify the code issue you mentioned. The PR https://github.com/apache/pulsar/pull/20597 is hard to read because there was a bug here: before that patch, the code did not allow users to call the admin API to reset the offset, so the change is not one of "the unrelated changes". Also, the code style here was carried over from the first init commit. https://github.com/apache/pulsar/assets/13600283/54890ae6-2858-45ac-bab9-691ed8d700b7 This PR is great, and I think it improves the readability of the code, but I think it could easily lead code reviewers to misunderstand my later patches. Maybe you can just tell people this is a refactoring PR, but without insulting words, OK?
[GitHub] [pulsar] poorbarcode closed pull request #20805: [improve] [pip] PIP-288: Add a internal API waitForAllTopicsCreated under persistent
poorbarcode closed pull request #20805: [improve] [pip] PIP-288: Add a internal API waitForAllTopicsCreated under persistent URL: https://github.com/apache/pulsar/pull/20805
[pulsar] branch master updated: [fix][doc]Correcting spelling mistakes in the pulsar-broker module (#20798)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 2152870702b [fix][doc]Correcting spelling mistakes in the pulsar-broker module (#20798) 2152870702b is described below commit 2152870702bec5b91a3d7125aa539c4f8e71c943 Author: wangda <38549158+da...@users.noreply.github.com> AuthorDate: Fri Jul 14 11:47:54 2023 +0800 [fix][doc]Correcting spelling mistakes in the pulsar-broker module (#20798) Signed-off-by: zhangwd3 --- .../broker/authentication/AuthenticationProviderAthenzTest.java | 2 +- .../broker/loadbalance/extensions/scheduler/TransferShedder.java| 2 +- .../java/org/apache/pulsar/broker/service/plugin/EntryFilter.java | 2 +- .../apache/pulsar/broker/transaction/buffer/TransactionBuffer.java | 2 +- .../buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java | 2 +- .../src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java | 2 +- .../java/org/apache/pulsar/broker/admin/v1/V1_AdminApi2Test.java| 2 +- .../java/org/apache/pulsar/broker/service/BrokerServiceTest.java| 2 +- .../org/apache/pulsar/broker/service/PersistentTopicE2ETest.java| 2 +- .../java/org/apache/pulsar/broker/service/ResendRequestTest.java| 2 +- .../pulsar/broker/service/persistent/DelayedDeliveryTest.java | 2 +- .../org/apache/pulsar/broker/testcontext/PulsarTestContext.java | 2 +- .../org/apache/pulsar/client/api/SimpleProducerConsumerTest.java| 4 ++-- .../org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java| 6 +++--- .../org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java | 2 +- .../apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java| 2 +- .../test/java/org/apache/pulsar/client/impl/PulsarTestClient.java | 2 +- .../pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java | 2 +- .../org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java | 2 +- 
.../test/java/org/apache/pulsar/zookeeper/ZookeeperServerTest.java | 2 +- 20 files changed, 23 insertions(+), 23 deletions(-) diff --git a/pulsar-broker-auth-athenz/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenzTest.java b/pulsar-broker-auth-athenz/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenzTest.java index 89ee5ca0830..a5211c2f814 100644 --- a/pulsar-broker-auth-athenz/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenzTest.java +++ b/pulsar-broker-auth-athenz/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenzTest.java @@ -81,7 +81,7 @@ public class AuthenticationProviderAthenzTest { sysPropProvider2.initialize(config); assertEquals(sysPropProvider2.getAllowedOffset(), 0); } catch (Exception e) { -fail("Failed to get allowd offset from system property"); +fail("Failed to get allowed offset from system property"); } System.setProperty("pulsar.athenz.role.token_allowed_offset", "invalid"); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java index 7bb16bac124..cec052b65ff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java @@ -68,7 +68,7 @@ import org.slf4j.LoggerFactory; /** * Load shedding strategy that unloads bundles from the highest loaded brokers. - * This strategy is only configurable in the broker load balancer extenstions introduced by + * This strategy is only configurable in the broker load balancer extensions introduced by * PIP-192[https://github.com/apache/pulsar/issues/16691]. 
* * This load shedding strategy has the following goals: diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilter.java index 2e5a590fa19..686c72df8c2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilter.java @@ -49,7 +49,7 @@ public interface EntryFilter { */ REJECT, /** - * postpone message, it should not go to this conmumer. + * postpone message, it should not go to this consumer. */ RESCHEDULE } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
[GitHub] [pulsar] Technoboy- merged pull request #20798: [fix][doc]Correcting spelling mistakes in the pulsar-broker module
Technoboy- merged PR #20798: URL: https://github.com/apache/pulsar/pull/20798
[GitHub] [pulsar] codecov-commenter commented on pull request #20798: [fix][doc]Correcting spelling mistakes in the pulsar-broker module
codecov-commenter commented on PR #20798: URL: https://github.com/apache/pulsar/pull/20798#issuecomment-1635220217
## [Codecov](https://app.codecov.io/gh/apache/pulsar/pull/20798?src=pr=h1_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) Report
> Merging [#20798](https://app.codecov.io/gh/apache/pulsar/pull/20798?src=pr=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) (ff51d5e) into [master](https://app.codecov.io/gh/apache/pulsar/commit/4e2ba4b9d2aff24add3016c48fe72457518444aa?el=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) (4e2ba4b) will **increase** coverage by `0.01%`.
> The diff coverage is `50.00%`.

[![Impacted file tree graph](https://app.codecov.io/gh/apache/pulsar/pull/20798/graphs/tree.svg?width=650=150=pr=acYqCpsK9J_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)](https://app.codecov.io/gh/apache/pulsar/pull/20798?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)

```diff
@@            Coverage Diff            @@
##           master   #20798     +/-   ##
+ Coverage   73.10%   73.12%   +0.01%
- Complexity  32108    32131     +23
  Files        1866     1867      +1
  Lines      139028   139058     +30
  Branches    15294    15299      +5
+ Hits       101634   101683     +49
+ Misses      29333    29306     -27
- Partials     8061     8069      +8
```

| Flag | Coverage Δ | |
|---|---|---|
| inttests | `24.08% <30.00%> (-0.10%)` | :arrow_down: |
| systests | `25.05% <30.00%> (+0.03%)` | :arrow_up: |
| unittests | `72.41% <50.00%> (+0.03%)` | :arrow_up: |

Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.

| [Impacted Files](https://app.codecov.io/gh/apache/pulsar/pull/20798?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | Coverage Δ | | |---|---|---| | [...dbalance/extensions/scheduler/TransferShedder.java](https://app.codecov.io/gh/apache/pulsar/pull/20798?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9sb2FkYmFsYW5jZS9leHRlbnNpb25zL3NjaGVkdWxlci9UcmFuc2ZlclNoZWRkZXIuamF2YQ==) | `82.92% <ø> (ø)` | | | [...ache/pulsar/broker/service/plugin/EntryFilter.java](https://app.codecov.io/gh/apache/pulsar/pull/20798?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL3BsdWdpbi9FbnRyeUZpbHRlci5qYXZh) | `100.00% <ø> (ø)` | | | [...r/impl/SnapshotSegmentAbortedTxnProcessorImpl.java](https://app.codecov.io/gh/apache/pulsar/pull/20798?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci90cmFuc2FjdGlvbi9idWZmZXIvaW1wbC9TbmFwc2hvdFNlZ21lbnRBYm9ydGVkVHhuUHJvY2Vzc29ySW1wbC5qYXZh) | `77.20% <ø> (ø)` | | | [...lsar/common/policies/data/OffloadPoliciesImpl.java](https://app.codecov.io/gh/apache/pulsar/pull/20798?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-cHVsc2FyLWNvbW1vbi9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NvbW1vbi9wb2xpY2llcy9kYXRhL09mZmxvYWRQb2xpY2llc0ltcGwuamF2YQ==) | `83.43% <37.50%> (-1.28%)` | :arrow_down: | | [...va/org/apache/pulsar/broker/service/ServerCnx.java](https://app.codecov.io/gh/apache/pulsar/pull/20798?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL1NlcnZlckNueC5qYXZh) | `71.27% 
<100.00%> (-0.12%)` | :arrow_down: | | [.../persistent/ReplicatedSubscriptionsController.java](https://app.codecov.io/gh/apache/pulsar/pull/20798?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL3BlcnNpc3RlbnQvUmVwbGljYXRlZFN1YnNjcmlwdGlvbnNDb250cm9sbGVyLmphdmE=) | `69.62% <100.00%> (-1.91%)` | :arrow_down: | ... and [86 files with indirect coverage changes](https://app.codecov.io/gh/apache/pulsar/pull/20798/indirect-changes?src=pr=tree-more_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) -- This is an automated message from the Apache Git
[GitHub] [pulsar] yyj8 commented on pull request #20048: [improve][broker]PIP-255 Add topic metrics for the number of production data requests to add a topic and the average number of messages per req
yyj8 commented on PR #20048: URL: https://github.com/apache/pulsar/pull/20048#issuecomment-1635207236
> > > Did this PIP vote pass? Where is the issue?
> > > PIP-255 is `Make the partition assignment strategy pluggable` #19806
> > >
> > > PIP must have an issue, right?
> > pip needs to be discussed in the dev mail list, and finally initiate a vote, at least 2 PMC +1 can pass

Do I still need to send an email to the development mailing list?
[GitHub] [pulsar-client-go] Gleiphir2769 commented on pull request #1055: [fix] [issue 1051]: Fix inaccurate producer mem limit in chunking and schema
Gleiphir2769 commented on PR #1055: URL: https://github.com/apache/pulsar-client-go/pull/1055#issuecomment-1635174466 Please rerun the workflow for this PR. cc @RobertIndie
[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #1055: [fix] [issue 1051]: Fix inaccurate producer mem limit in chunking and schema
Gleiphir2769 commented on code in PR #1055: URL: https://github.com/apache/pulsar-client-go/pull/1055#discussion_r1263219202
## pulsar/producer_partition.go: ##
@@ -1407,31 +1410,39 @@ func (p *partitionProducer) releaseSemaphoreAndMem(size int64) {
 	p.client.memLimit.ReleaseMemory(size)
 }

-func (p *partitionProducer) canAddToQueue(sr *sendRequest, uncompressedPayloadSize int64) bool {
+func (p *partitionProducer) canAddToQueue(sr *sendRequest) bool {
 	if p.options.DisableBlockIfQueueFull {
 		if !p.publishSemaphore.TryAcquire() {
 			runCallback(sr.callback, nil, sr.msg, errSendQueueIsFull)
 			return false
 		}
-		if !p.client.memLimit.TryReserveMemory(uncompressedPayloadSize) {
+	} else {
+		if !p.publishSemaphore.Acquire(sr.ctx) {
+			runCallback(sr.callback, nil, sr.msg, errContextExpired)
+			return false
+		}
+	}
+	p.metrics.MessagesPending.Inc()
+	p.metrics.BytesPending.Add(float64(len(sr.msg.Payload)))
Review Comment:
Thanks for your careful review. L1425-L1426 is the original code and +1 to move it to `canReserveMem`. https://github.com/apache/pulsar-client-go/blob/e45122c2defc5efd4efc493d0acef278a7ccfc01/pulsar/producer_partition.go#L1436-L1441
[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #1055: [fix] [issue 1051]: Fix inaccurate producer mem limit in chunking and schema
Gleiphir2769 commented on code in PR #1055: URL: https://github.com/apache/pulsar-client-go/pull/1055#discussion_r1263219202
## pulsar/producer_partition.go: ##
@@ -1407,31 +1410,39 @@ func (p *partitionProducer) releaseSemaphoreAndMem(size int64) {
 	p.client.memLimit.ReleaseMemory(size)
 }

-func (p *partitionProducer) canAddToQueue(sr *sendRequest, uncompressedPayloadSize int64) bool {
+func (p *partitionProducer) canAddToQueue(sr *sendRequest) bool {
 	if p.options.DisableBlockIfQueueFull {
 		if !p.publishSemaphore.TryAcquire() {
 			runCallback(sr.callback, nil, sr.msg, errSendQueueIsFull)
 			return false
 		}
-		if !p.client.memLimit.TryReserveMemory(uncompressedPayloadSize) {
+	} else {
+		if !p.publishSemaphore.Acquire(sr.ctx) {
+			runCallback(sr.callback, nil, sr.msg, errContextExpired)
+			return false
+		}
+	}
+	p.metrics.MessagesPending.Inc()
+	p.metrics.BytesPending.Add(float64(len(sr.msg.Payload)))
Review Comment:
Thanks for your careful review. L1425-L1426 is the original code and +1 from me to move it to `canReserveMem`. https://github.com/apache/pulsar-client-go/blob/e45122c2defc5efd4efc493d0acef278a7ccfc01/pulsar/producer_partition.go#L1436-L1441
[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #1055: [fix] [issue 1051]: Fix inaccurate producer mem limit in chunking and schema
Gleiphir2769 commented on code in PR #1055: URL: https://github.com/apache/pulsar-client-go/pull/1055#discussion_r1263219202
## pulsar/producer_partition.go: ##
@@ -1407,31 +1410,39 @@ func (p *partitionProducer) releaseSemaphoreAndMem(size int64) {
 	p.client.memLimit.ReleaseMemory(size)
 }

-func (p *partitionProducer) canAddToQueue(sr *sendRequest, uncompressedPayloadSize int64) bool {
+func (p *partitionProducer) canAddToQueue(sr *sendRequest) bool {
 	if p.options.DisableBlockIfQueueFull {
 		if !p.publishSemaphore.TryAcquire() {
 			runCallback(sr.callback, nil, sr.msg, errSendQueueIsFull)
 			return false
 		}
-		if !p.client.memLimit.TryReserveMemory(uncompressedPayloadSize) {
+	} else {
+		if !p.publishSemaphore.Acquire(sr.ctx) {
+			runCallback(sr.callback, nil, sr.msg, errContextExpired)
+			return false
+		}
+	}
+	p.metrics.MessagesPending.Inc()
+	p.metrics.BytesPending.Add(float64(len(sr.msg.Payload)))
Review Comment:
Thanks for your careful review. L1425-L1426 is the original code and I think we can move it to `canReserveMem` to fix this metric. https://github.com/apache/pulsar-client-go/blob/e45122c2defc5efd4efc493d0acef278a7ccfc01/pulsar/producer_partition.go#L1436-L1441
[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #1055: [fix] [issue 1051]: Fix inaccurate producer mem limit in chunking and schema
Gleiphir2769 commented on code in PR #1055: URL: https://github.com/apache/pulsar-client-go/pull/1055#discussion_r1263217545
## pulsar/producer_test.go: ##
@@ -1924,6 +1924,159 @@ func TestMemLimitRejectProducerMessages(t *testing.T) {
 	assert.NoError(t, err)
 }

+func TestMemLimitRejectProducerMessagesWithSchema(t *testing.T) {
+
+	c, err := NewClient(ClientOptions{
+		URL:              serviceURL,
+		MemoryLimitBytes: 100 * 6,
+	})
+	assert.NoError(t, err)
+	defer c.Close()
+
+	schema := NewAvroSchema(`{"fields":
+		[
+			{"name":"id","type":"int"},{"default":null,"name":"name","type":["null","string"]}
+		],
+		"name":"MyAvro","namespace":"schemaNotFoundTestCase","type":"record"}`, nil)
+
+	topicName := newTopicName()
+	producer1, _ := c.CreateProducer(ProducerOptions{
+		Topic:                   topicName,
+		DisableBlockIfQueueFull: true,
+		DisableBatching:         false,
+		BatchingMaxPublishDelay: 100 * time.Second,
+		SendTimeout:             2 * time.Second,
+	})
+
+	producer2, _ := c.CreateProducer(ProducerOptions{
+		Topic:                   topicName,
+		DisableBlockIfQueueFull: true,
+		DisableBatching:         false,
+		BatchingMaxPublishDelay: 100 * time.Second,
+		SendTimeout:             2 * time.Second,
+	})
+
+	// the size of encoded value is 6 bytes
+	value := map[string]interface{}{
+		"id": 0,
+		"name": map[string]interface{}{
+			"string": "abc",
+		},
+	}
+
+	n := 101
+	for i := 0; i < n/2; i++ {
+		producer1.SendAsync(context.Background(), &ProducerMessage{
+			Value:  value,
+			Schema: schema,
+		}, func(id MessageID, message *ProducerMessage, e error) {})
+
+		producer2.SendAsync(context.Background(), &ProducerMessage{
+			Value:  value,
+			Schema: schema,
+		}, func(id MessageID, message *ProducerMessage, e error) {})
+	}
+	// Last message in order to reach the limit
+	producer1.SendAsync(context.Background(), &ProducerMessage{
+		Value:  value,
+		Schema: schema,
+	}, func(id MessageID, message *ProducerMessage, e error) {})
+	time.Sleep(100 * time.Millisecond)
+	assert.Equal(t, int64(n*6), c.(*client).memLimit.CurrentUsage())
+
+	_, err = producer1.Send(context.Background(), &ProducerMessage{
+		Value:  value,
+		Schema: schema,
+	})
+	assert.Error(t, err)
+	assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull))
+
+	_, err = producer2.Send(context.Background(), &ProducerMessage{
+		Value:  value,
+		Schema: schema,
+	})
+	assert.Error(t, err)
+	assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull))
+
+	// flush pending msg
+	err = producer1.Flush()
+	assert.NoError(t, err)
+	err = producer2.Flush()
+	assert.NoError(t, err)
+	assert.Equal(t, int64(0), c.(*client).memLimit.CurrentUsage())
+
+	_, err = producer1.Send(context.Background(), &ProducerMessage{
+		Value:  value,
+		Schema: schema,
+	})
+	assert.NoError(t, err)
+	_, err = producer2.Send(context.Background(), &ProducerMessage{
+		Value:  value,
+		Schema: schema,
+	})
+	assert.NoError(t, err)
+}
+
+func TestMemLimitRejectProducerMessagesWithChunking(t *testing.T) {
+
+	c, err := NewClient(ClientOptions{
+		URL:              serviceURL,
+		MemoryLimitBytes: 10 * 1024,
+	})
+	assert.NoError(t, err)
+	defer c.Close()
+
+	topicName := newTopicName()
+	producer1, _ := c.CreateProducer(ProducerOptions{
+		Topic:                   topicName,
+		DisableBlockIfQueueFull: true,
+		DisableBatching:         true,
+		EnableChunking:          true,
+		ChunkMaxMessageSize:     1024,
+		SendTimeout:             2 * time.Second,
+	})
+
+	producer1.SendAsync(context.Background(), &ProducerMessage{
+		Payload: make([]byte, 10*1024+1),
+	}, func(id MessageID, message *ProducerMessage, e error) {
+		if e != nil {
+			t.Fatal(e)
+		}
+	})
+
+	_, err = producer1.Send(context.Background(), &ProducerMessage{
+		Payload: make([]byte, 1),
+	})
+	assert.Error(t, err)
+	assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull))
+
+	// wait all the chunks have been released
Review Comment:
Because `DisableBatching=true`, `producer.flush` here is useless and causes a panic.
[GitHub] [pulsar] github-actions[bot] commented on issue #19239: PIP-242: Topic name restrictions
github-actions[bot] commented on issue #19239: URL: https://github.com/apache/pulsar/issues/19239#issuecomment-1635164603 The issue had no activity for 30 days, mark with Stale label.
[GitHub] [pulsar] github-actions[bot] commented on issue #19403: Add a new admin API getAllLoadReport()
github-actions[bot] commented on issue #19403: URL: https://github.com/apache/pulsar/issues/19403#issuecomment-1635164423 The issue had no activity for 30 days, mark with Stale label.
[GitHub] [pulsar] github-actions[bot] commented on issue #19504: [Doc] Clarification of documentation regarding multi-tenancy
github-actions[bot] commented on issue #19504: URL: https://github.com/apache/pulsar/issues/19504#issuecomment-1635164273 The issue had no activity for 30 days, mark with Stale label.
[GitHub] [pulsar] github-actions[bot] commented on pull request #19515: [feat][sql] Support querying compacted data in Pulsar SQL
github-actions[bot] commented on PR #19515: URL: https://github.com/apache/pulsar/pull/19515#issuecomment-1635164239 The pr had no activity for 30 days, mark with Stale label.
[GitHub] [pulsar] github-actions[bot] commented on issue #19542: [Bug] Swagger definitions for creating sinks/sources are incorrect
github-actions[bot] commented on issue #19542: URL: https://github.com/apache/pulsar/issues/19542#issuecomment-1635164194 The issue had no activity for 30 days, mark with Stale label.
[GitHub] [pulsar] github-actions[bot] commented on issue #19554: Add pulsar_tenant and pulsar_namespace labels to prometheus metrics
github-actions[bot] commented on issue #19554: URL: https://github.com/apache/pulsar/issues/19554#issuecomment-1635164136 The issue had no activity for 30 days, mark with Stale label.
[GitHub] [pulsar] github-actions[bot] commented on pull request #19821: [wip][feat][io] Debezium DB2 source connector for Pulsar
github-actions[bot] commented on PR #19821: URL: https://github.com/apache/pulsar/pull/19821#issuecomment-1635163767 The pr had no activity for 30 days, mark with Stale label.
svn commit: r62967 [1/3] - in /dev/pulsar/pulsar-2.10.5-candidate-1: ./ connectors/ connectors/apache-pulsar-io-connectors-2.10.5-bin/
Author: xiangying Date: Fri Jul 14 02:01:04 2023 New Revision: 62967 Log: Staging artifacts and signature for Pulsar release 2.10.5 Added: dev/pulsar/pulsar-2.10.5-candidate-1/ dev/pulsar/pulsar-2.10.5-candidate-1/apache-pulsar-2.10.5-bin.tar.gz (with props) dev/pulsar/pulsar-2.10.5-candidate-1/apache-pulsar-2.10.5-bin.tar.gz.asc dev/pulsar/pulsar-2.10.5-candidate-1/apache-pulsar-2.10.5-bin.tar.gz.sha512 dev/pulsar/pulsar-2.10.5-candidate-1/apache-pulsar-2.10.5-src.tar.gz (with props) dev/pulsar/pulsar-2.10.5-candidate-1/apache-pulsar-2.10.5-src.tar.gz.asc dev/pulsar/pulsar-2.10.5-candidate-1/apache-pulsar-2.10.5-src.tar.gz.sha512 dev/pulsar/pulsar-2.10.5-candidate-1/apache-pulsar-offloaders-2.10.5-bin.tar.gz (with props) dev/pulsar/pulsar-2.10.5-candidate-1/apache-pulsar-offloaders-2.10.5-bin.tar.gz.asc dev/pulsar/pulsar-2.10.5-candidate-1/apache-pulsar-offloaders-2.10.5-bin.tar.gz.sha512 dev/pulsar/pulsar-2.10.5-candidate-1/connectors/ dev/pulsar/pulsar-2.10.5-candidate-1/connectors/LICENSE dev/pulsar/pulsar-2.10.5-candidate-1/connectors/README dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/ dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/LICENSE dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/README dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-aerospike-2.10.5.nar (with props) dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-aerospike-2.10.5.nar.asc dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-aerospike-2.10.5.nar.sha512 dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-batch-data-generator-2.10.5.nar (with props) dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-batch-data-generator-2.10.5.nar.asc 
dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-batch-data-generator-2.10.5.nar.sha512 dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-canal-2.10.5.nar (with props) dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-canal-2.10.5.nar.asc dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-canal-2.10.5.nar.sha512 dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-cassandra-2.10.5.nar (with props) dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-cassandra-2.10.5.nar.asc dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-cassandra-2.10.5.nar.sha512 dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-data-generator-2.10.5.nar (with props) dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-data-generator-2.10.5.nar.asc dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-data-generator-2.10.5.nar.sha512 dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-debezium-mongodb-2.10.5.nar (with props) dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-debezium-mongodb-2.10.5.nar.asc dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-debezium-mongodb-2.10.5.nar.sha512 dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-debezium-mssql-2.10.5.nar (with props) dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-debezium-mssql-2.10.5.nar.asc 
dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-debezium-mssql-2.10.5.nar.sha512 dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-debezium-mysql-2.10.5.nar (with props) dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-debezium-mysql-2.10.5.nar.asc dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-debezium-mysql-2.10.5.nar.sha512 dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-debezium-oracle-2.10.5.nar (with props) dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-debezium-oracle-2.10.5.nar.asc
svn commit: r62967 [2/3] - in /dev/pulsar/pulsar-2.10.5-candidate-1: ./ connectors/ connectors/apache-pulsar-io-connectors-2.10.5-bin/
Added: dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-debezium-postgres-2.10.5.nar.sha512 == --- dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-debezium-postgres-2.10.5.nar.sha512 (added) +++ dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-debezium-postgres-2.10.5.nar.sha512 Fri Jul 14 02:01:04 2023 @@ -0,0 +1 @@ +5647705ad1d9db834f1348fe911615f320e47e78228eba2de7d7f6cc6bd452b6b5782633a3680d689f1ab207f52aa6c5f8fe9b76f393905e1879156035da6725 ./connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-debezium-postgres-2.10.5.nar Added: dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-dynamodb-2.10.5.nar == Binary file - no diff available. Propchange: dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-dynamodb-2.10.5.nar -- svn:mime-type = application/octet-stream Added: dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-dynamodb-2.10.5.nar.asc == --- dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-dynamodb-2.10.5.nar.asc (added) +++ dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-dynamodb-2.10.5.nar.asc Fri Jul 14 02:01:04 2023 @@ -0,0 +1,16 @@ +-BEGIN PGP SIGNATURE- + +iQIzBAABCgAdFiEE+P0S7h/2fz2fR3D9AvkjT8MxZvcFAmSwo7oACgkQAvkjT8Mx +ZvcmKhAAiNiX1N9FmxECKmByJg7bmrYWszIoLQWYp9x2aTO8LPTYXhISjEYdPIVg +ZVuibKk3fEUT9ewHTN1oH8zZq+kHJHftLxbct4GvX1rcKcJnTnk0XABsVlSbYZUW +Q6cdez0WEP82bNgxkA0kBdIX5s55JG6+kwq44n+++z9m6IDz6Oixp2R3MVuW95LN +q+fy6uTf6PQhXZQe3StV4A0t3Sormabfyd3xDgoTv348CptVooFtGMtfOYmNZoC9 +AiCDZEjoNgzeXPEx7kS6t9Bt4FgYJyXbtfV2gIkgn+jzODDK/uGld+8at6stueuR +fgkHTXl605HgeL4+ET/S4fw1IA0VUDVtYgrdDkE3nq2ExdTOPhuSRntm4JLfxToi 
+lkI0rTICsYzxs6j4v1nsQ8+9EJKyTqKmQ5Sr0R74fb85EMvlzbqzIc8MYlK50pFk +coC6ERL1aosBMwIR6jnEXPRC+ekBARqnox4fyF2HDuRKUlyp8fWUSXR+G6oJRa5T +kx6qt1ae5nD55jBNjzTic8IcRCmhJ2cdHYtwQcOxWUuYdqQCroU3wTiVMGybe8ml +GUMNnlG51MKXbN+NFtsQsUz9CwrTVnfB8qYe33A6MTbt6UI7+Yw1Q0fEN98fhIV4 +PvVuPFhFzHu+u+A57OnLTtkN+fAhSHFCIZY9iy3YFxgpWpU4RIg= +=s6oI +-END PGP SIGNATURE- Added: dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-dynamodb-2.10.5.nar.sha512 == --- dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-dynamodb-2.10.5.nar.sha512 (added) +++ dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-dynamodb-2.10.5.nar.sha512 Fri Jul 14 02:01:04 2023 @@ -0,0 +1 @@ +5f2f8a1105be12a1cdef76fcd7532ab43d39d38808c51d40fc4beaa7074d91b4054b2d4b211d53594d9eb242d83083288ba7b1061e35a05a2367cad1a4f5b581 ./connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-dynamodb-2.10.5.nar Added: dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-elastic-search-2.10.5.nar == Binary file - no diff available. 
Propchange: dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-elastic-search-2.10.5.nar -- svn:mime-type = application/octet-stream Added: dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-elastic-search-2.10.5.nar.asc == --- dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-elastic-search-2.10.5.nar.asc (added) +++ dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-elastic-search-2.10.5.nar.asc Fri Jul 14 02:01:04 2023 @@ -0,0 +1,16 @@ +-BEGIN PGP SIGNATURE- + +iQIzBAABCgAdFiEE+P0S7h/2fz2fR3D9AvkjT8MxZvcFAmSwo70ACgkQAvkjT8Mx +ZvdFpw//fpHiNH7swkjpKVOwIN9JQwJzP+792m9nExHkTPYKf6N3y3KFaXRBVqRg +oIaxu4GnKPPJzr6Vl/oIjiZ0Lv2NhseMkTZ9vph4jQlkMvypzkDCtC8Ktyz+paQ6 +lAKvsMnldQdm0eHXYv9zrxFBUMXCb09lnXufR0KG+NJtVJEKZHq99/TZTTyznHhO +PodVXL+gNUXufoVZ2whkIILHynBNoaWlzMD58vllhzc9goJYm92n44N5HoEiG9m0 +xbbB1ElINXiSBDmYLPNlNqY44PESZ6dKqMvXjgVO5hw6kxE8TlqhrqWKJrH2ipUh +yboomiXFTs7kDgrNccXELx6E2226UFYV89IDgpkm8AAEbqgrQ0sgTgK5Tyqt2RFb +8vxwgpdV2AyoYGCjh7OEJytJIBvmeRb+UVjstCsOPAUrUMxNTEpttahRMLyW6/uY
svn commit: r62967 [3/3] - in /dev/pulsar/pulsar-2.10.5-candidate-1: ./ connectors/ connectors/apache-pulsar-io-connectors-2.10.5-bin/
Added: dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hbase-2.10.5.nar.asc == --- dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hbase-2.10.5.nar.asc (added) +++ dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hbase-2.10.5.nar.asc Fri Jul 14 02:01:04 2023 @@ -0,0 +1,16 @@ +-BEGIN PGP SIGNATURE- + +iQIzBAABCgAdFiEE+P0S7h/2fz2fR3D9AvkjT8MxZvcFAmSwo9MACgkQAvkjT8Mx +ZveqyBAAoIlkpdDJbizCLNZccrHTM+2t1O2vkirDMmdZ0eTEUrUMeFoWU5q3AokG +G+YllJmtBJGkI7e/lGUvq82pBF+r4qWa0j4nk46MmkmZNTGOLv1EySuDe9s8h2Ak +AznK53mPuFCUSB9XOZmxYOQ/0e6/h0QP7NIZdIB8Wd0h4ACUZL1hAzzvji8pFDhD ++BQmgDAmjWK/lwYSZk4hQ0jRRfSlb64IP6lRmWSINvrCBBO9x90EyK3jrwcnPsxu +5bTlNtgiyDgLPi6NqXec8L4yj5pGsYoQ+zPZXV8NPkwYXuu4y5nbhIHIcP71w7rz +zpO1vEUFN0mQgIhg/IOh6rqfV376q4lPkscWRaKbxShbRMAWRO54j0vCCUBpirVo +O9rGUNDEOJHXjLyMLa1gDHio0t0DtGO9Zp1qBVAbjaClOpbIgaBg+M0RN4dM+JEB +7VOuiTsh00ZpDfQEuaqjfvr4Li4A2E0bcNX8G85gaiWoFA14x886cESbtPgti3lp +wU1TIq3AWmgd/q/3hoOdRTKCdPberaqAEePwQIXhXAjf6phPesoszngqgVvVNKKS +qGj+IAMB4PJOsmzZvBcOgLwHnxE2szEGNIhRah7LMFnSYOZVl3iO2uHZIy6ttrAB +5+TqWWKcU2bYAGA6p+aRDj8ZuyB1Tej7hal80TGKZ50czrZ63SI= +=aBrJ +-END PGP SIGNATURE- Added: dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hbase-2.10.5.nar.sha512 == --- dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hbase-2.10.5.nar.sha512 (added) +++ dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hbase-2.10.5.nar.sha512 Fri Jul 14 02:01:04 2023 @@ -0,0 +1 @@ +2ea871f6ac5e12ebee09cee88ea157b1be3584144f5b4c784bf41e88acbc1094f839f34265328a3813cd31b341cdaff3bc5d529ace41af0482ec96661a599e7e ./connectors/pulsar-io-hbase-2.10.5.nar Added: dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hdfs2-2.10.5.nar == Binary file - no diff available. 
Propchange: dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hdfs2-2.10.5.nar -- svn:mime-type = application/octet-stream Added: dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hdfs2-2.10.5.nar.asc == --- dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hdfs2-2.10.5.nar.asc (added) +++ dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hdfs2-2.10.5.nar.asc Fri Jul 14 02:01:04 2023 @@ -0,0 +1,16 @@ +-BEGIN PGP SIGNATURE- + +iQIzBAABCgAdFiEE+P0S7h/2fz2fR3D9AvkjT8MxZvcFAmSwo9cACgkQAvkjT8Mx +ZvcoYg/+LlcicvhiwivNqOtMeWb9ltXibi3nJUVRhL5BM5tzPIrp5RrlMCmlw1Mv +a2tEu+rWLaZcqT4sRVQBKn2AzMgtpTKZUduON8sdVy/5CHMedk97Z/yAV8HLgAFr +xEanAVRlpkRDbMqIf/cgtQsMqnAcqocvai5KcXmJB99mqLdXUlu/t7UDytRcmr4E +oBHUfcTYTZ1NQ8+ILPDYFHdpnqarLkfshkQxdkLS0Iz0CoWXo44a8QHo7V5wuKWf +3SfaYpGw1BXADvXuJHnxPIMbbOYCzgPecZFP4o5Vq+P0ULjbHz58vP9QyX/wMA0l +AqcVlFOTkX8abshk9mLY1bsl85wMa0cFqKjaTSfiTI6YuTs5hzRNhoOITxiv0yFq +N+aBRmbtAe1nelAV2DukpEHAoaP2oJvsubrU+KjuMVQAbWJh+rEXaLcBy71uNaPF +rceYhVMlT38iu99+/TYd6zqmSomxxhs7wtfRsE7nf7LjBjF88NexFN5GVwbmnIdP +SdeFJRyT8/FvWgAUpQlqkwXkNWMBVtyn4YyKdpAp3aroIhF6EJr9HOxClddDjJss +xW/R/Kw7qileIUx0zU3NWi7o9ix3nn25mjgl61kaKbPYTIxhgV6pKUu+CzxyJDc5 +BYsI4LO5VWhf20kd+gr/xQj4DcCmeyIpI+9QWhBP4IH1Lnc79UY= +=LRUA +-END PGP SIGNATURE- Added: dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hdfs2-2.10.5.nar.sha512 == --- dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hdfs2-2.10.5.nar.sha512 (added) +++ dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hdfs2-2.10.5.nar.sha512 Fri Jul 14 02:01:04 2023 @@ -0,0 +1 @@ +3b3f4cfaf14433fc3004f091ff3c772739c80e1499d4b8c2f40eccd0fe05073bc63a1fbe006e95df33d9180d84da1dec865e88303100b1018093693b222eb7d5 ./connectors/pulsar-io-hdfs2-2.10.5.nar Added: dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hdfs3-2.10.5.nar == Binary file - no diff available. 
Propchange: dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hdfs3-2.10.5.nar -- svn:mime-type = application/octet-stream Added: dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hdfs3-2.10.5.nar.asc == --- dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hdfs3-2.10.5.nar.asc (added) +++ dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hdfs3-2.10.5.nar.asc Fri Jul 14 02:01:04 2023 @@ -0,0 +1,16 @@ +-BEGIN PGP SIGNATURE- + +iQIzBAABCgAdFiEE+P0S7h/2fz2fR3D9AvkjT8MxZvcFAmSwo84ACgkQAvkjT8Mx +ZveyJhAAhYSMMYcczrxhh2PGtYXMi42WEMUA671Xge7+t6uKxyRuYDxMXUWJnORU
[GitHub] [pulsar-site] shy-share opened a new pull request, #640: fix setReplicationClusters to replicationClusters
shy-share opened a new pull request, #640: URL: https://github.com/apache/pulsar-site/pull/640 This PR fixes apache/pulsar#20742 - [ ] `doc` - [ ] `doc-required` - [ ] `doc-not-needed` - [ ] `doc-complete` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] poorbarcode opened a new pull request, #20805: [improve][pip] Add a internal API waitForAllTopicsCreated under persistent
poorbarcode opened a new pull request, #20805: URL: https://github.com/apache/pulsar/pull/20805 ### Documentation - [ ] `doc` - [ ] `doc-required` - [x] `doc-not-needed` - [ ] `doc-complete` ### Matching PR in forked repository PR in forked repository: x
[GitHub] [pulsar] zymap opened a new pull request, #20804: [improve][offload] Extend the offload police to allow specifying more conf
zymap opened a new pull request, #20804: URL: https://github.com/apache/pulsar/pull/20804

Fixes #xyz

Main Issue: #xyz

PIP: #xyz

### Motivation

The offload policies expose only a fixed set of configuration fields for the offloaders. If an offloader needs additional settings, more fields have to be added to `OffloadPoliciesImpl`, which does not scale. The policies should be easy to extend, so this change adds support for a configuration map that can carry additional settings.

### Modifications

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
- *Added integration tests for end-to-end deployment with large payloads (10MB)*
- *Extended integration test for recovery after broker failure*

### Does this pull request potentially affect one of the following parts:

*If the box was checked, please highlight the changes*

- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment

### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [x] `doc-not-needed`
- [ ] `doc-complete`

### Matching PR in forked repository

PR in forked repository:
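To make the motivation concrete, here is a minimal sketch of one way extra offloader settings could be gathered into a map, assuming they arrive as prefixed keys in a `java.util.Properties` object. The class `ExtraConfigSketch`, the method `extractExtraConfig`, and the choice of prefix are hypothetical and are not taken from the PR; the actual `OffloadPoliciesImpl` change may be structured differently.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper for illustration; it is not part of Pulsar.
final class ExtraConfigSketch {
    // Assumed prefix, chosen only for this example.
    static final String PREFIX = "managedLedgerOffloadExtraConfig";

    // Copies every property whose name starts with PREFIX into a sorted map,
    // using the remainder of the name as the map key.
    static Map<String, String> extractExtraConfig(Properties properties) {
        Map<String, String> extra = new TreeMap<>();
        for (String name : properties.stringPropertyNames()) {
            if (name.startsWith(PREFIX) && name.length() > PREFIX.length()) {
                extra.put(name.substring(PREFIX.length()), properties.getProperty(name));
            }
        }
        return extra;
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("managedLedgerOffloadExtraConfigKey1", "value1");
        properties.setProperty("managedLedgerOffloadExtraConfigKey2", "value2");
        // A key without the assumed prefix is left out of the map.
        properties.setProperty("managedLedgerOffloadDriver", "aws-s3");
        System.out.println(extractExtraConfig(properties));
    }
}
```

With a map like this, a new offloader setting would not require a new field on the policies class; whether the map is filled from prefixed keys or from a nested object is a design choice for the PR itself.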
[pulsar] 02/02: [fix][test] Fix resource leak in PulsarTestContext (#20799)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit fbc92a281c36b88d97a82cd7a6a4a57a71a040f6 Author: Lari Hotari AuthorDate: Fri Jul 14 00:01:22 2023 +0300 [fix][test] Fix resource leak in PulsarTestContext (#20799) (cherry picked from commit ae0fc5bdcae0220da4936d0fdaff71eead219cdd) --- .../org/apache/pulsar/broker/PulsarService.java| 6 +- .../pulsar/broker/service/PersistentTopicTest.java | 5 +- .../testcontext/AbstractTestPulsarService.java | 53 --- .../testcontext/NonStartableTestPulsarService.java | 79 -- 4 files changed, 49 insertions(+), 94 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index cf8ab33fd7e..6bb477cf037 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -787,7 +787,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { exposeTopicMetrics, offloaderScheduler, interval); this.defaultOffloader = createManagedLedgerOffloader(defaultOffloadPolicies); -this.brokerInterceptor = BrokerInterceptors.load(config); +setBrokerInterceptor(newBrokerInterceptor()); // use getter to support mocking getBrokerInterceptor method in tests BrokerInterceptor interceptor = getBrokerInterceptor(); if (interceptor != null) { @@ -930,6 +930,10 @@ public class PulsarService implements AutoCloseable, ShutdownService { } } +protected BrokerInterceptor newBrokerInterceptor() throws IOException { +return BrokerInterceptors.load(config); +} + @VisibleForTesting protected OrderedExecutor newOrderedExecutor() { return OrderedExecutor.newBuilder() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 45ef58bb703..02b10dd09a2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -131,6 +131,7 @@ import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.compaction.CompactedTopic; import org.apache.pulsar.compaction.CompactedTopicContext; import org.apache.pulsar.compaction.Compactor; +import org.apache.pulsar.compaction.CompactorMXBean; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore; import org.awaitility.Awaitility; @@ -172,11 +173,13 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { svcConfig.setClusterName("pulsar-cluster"); svcConfig.setTopicLevelPoliciesEnabled(false); svcConfig.setSystemTopicEnabled(false); +Compactor compactor = mock(Compactor.class); +when(compactor.getStats()).thenReturn(mock(CompactorMXBean.class)); pulsarTestContext = PulsarTestContext.builderForNonStartableContext() .config(svcConfig) .spyByDefault() .useTestPulsarResources(metadataStore) -.compactor(mock(Compactor.class)) +.compactor(compactor) .build(); brokerService = pulsarTestContext.getBrokerService(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java index a6861268b94..517d57d0042 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.testcontext; +import java.io.IOException; import org.apache.pulsar.broker.BookKeeperClientFactory; import 
org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -37,11 +38,7 @@ import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; */ abstract class AbstractTestPulsarService extends PulsarService { protected final SpyConfig spyConfig; -protected final MetadataStoreExtended localMetadataStore; -protected final MetadataStoreExtended configurationMetadataStore; -protected final Compactor compactor; -protected final BrokerInterceptor brokerInterceptor; -protected final BookKeeperClientFactory bookKeeperClientFactory; +private boolean compactorExists; public
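The `PulsarService` part of this diff routes interceptor creation through a protected `newBrokerInterceptor()` method, presumably so that a test subclass can supply its own instance. A minimal, self-contained sketch of that overridable factory-method pattern follows; `SketchService`, `SketchInterceptor`, and `SketchTestService` are hypothetical stand-ins, not Pulsar classes.

```java
// Hypothetical stand-ins for illustration; none of these are Pulsar types.
interface SketchInterceptor {
    String name();
}

class SketchService {
    private SketchInterceptor interceptor;

    void start() {
        // Creation goes through an overridable factory method
        // instead of a hard-wired static call.
        interceptor = newInterceptor();
    }

    // Subclasses may override this to provide a different instance.
    protected SketchInterceptor newInterceptor() {
        return () -> "default";
    }

    SketchInterceptor getInterceptor() {
        return interceptor;
    }
}

class SketchTestService extends SketchService {
    @Override
    protected SketchInterceptor newInterceptor() {
        return () -> "test-double";
    }
}

class FactoryMethodSketch {
    public static void main(String[] args) {
        SketchService service = new SketchTestService();
        service.start();
        System.out.println(service.getInterceptor().name());
    }
}
```

The benefit of this shape is that the subclass no longer needs its own field shadowing the parent's state; it only decides which instance the parent creates.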
[pulsar] 01/02: [fix][test] Fix flaky PersistentSubscriptionTest (#20434)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 7b259131c3ed41a6b1a28f85171d908f4120535a Author: Lari Hotari AuthorDate: Wed May 31 22:57:16 2023 +0300 [fix][test] Fix flaky PersistentSubscriptionTest (#20434) (cherry picked from commit 242758d5770de46e506855ff881472cbc274cedb) --- .../persistent/PersistentSubscriptionTest.java | 106 +++-- .../testcontext/NonStartableTestPulsarService.java | 17 2 files changed, 72 insertions(+), 51 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java index 401f52daa62..87408598889 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java @@ -92,7 +92,12 @@ public class PersistentSubscriptionTest { public void setup() throws Exception { pulsarTestContext = PulsarTestContext.builderForNonStartableContext() .spyByDefault() -.configCustomizer(config -> config.setTransactionCoordinatorEnabled(true)) +.configCustomizer(config -> { +config.setTransactionCoordinatorEnabled(true); +config.setTransactionPendingAckStoreProviderClassName( + CustomTransactionPendingAckStoreProvider.class.getName()); + config.setTransactionBufferProviderClassName(InMemTransactionBufferProvider.class.getName()); +}) .useTestPulsarResources() .build(); @@ -100,56 +105,6 @@ public class PersistentSubscriptionTest { doReturn(Optional.of(new Policies())).when(namespaceResources) .getPoliciesIfCached(any()); -doReturn(new InMemTransactionBufferProvider()).when(pulsarTestContext.getPulsarService()) -.getTransactionBufferProvider(); -doReturn(new TransactionPendingAckStoreProvider() { 
-@Override -public CompletableFuture newPendingAckStore(PersistentSubscription subscription) { -return CompletableFuture.completedFuture(new PendingAckStore() { -@Override -public void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService executorService) { -try { -Field field = PendingAckHandleState.class.getDeclaredField("state"); -field.setAccessible(true); -field.set(pendingAckHandle, PendingAckHandleState.State.Ready); -} catch (NoSuchFieldException | IllegalAccessException e) { -fail(); -} -} - -@Override -public CompletableFuture closeAsync() { -return CompletableFuture.completedFuture(null); -} - -@Override -public CompletableFuture appendIndividualAck(TxnID txnID, List> positions) { -return CompletableFuture.completedFuture(null); -} - -@Override -public CompletableFuture appendCumulativeAck(TxnID txnID, PositionImpl position) { -return CompletableFuture.completedFuture(null); -} - -@Override -public CompletableFuture appendCommitMark(TxnID txnID, AckType ackType) { -return CompletableFuture.completedFuture(null); -} - -@Override -public CompletableFuture appendAbortMark(TxnID txnID, AckType ackType) { -return CompletableFuture.completedFuture(null); -} -}); -} - -@Override -public CompletableFuture checkInitializedBefore(PersistentSubscription subscription) { -return CompletableFuture.completedFuture(true); -} - }).when(pulsarTestContext.getPulsarService()).getTransactionPendingAckStoreProvider(); - ledgerMock = mock(ManagedLedgerImpl.class); cursorMock = mock(ManagedCursorImpl.class); managedLedgerConfigMock = mock(ManagedLedgerConfig.class); @@ -279,4 +234,53 @@ public class PersistentSubscriptionTest { // `acknowledgeMessage` should update cursor last active assertTrue(persistentSubscription.cursor.getLastActive() > beforeAcknowledgeTimestamp); } + +public static class
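The diff above replaces Mockito `doReturn(...)` stubs with provider class names set on the configuration. As a rough illustration of why that can work, the sketch below instantiates a provider from a configured class name via reflection. How Pulsar actually loads these providers is not shown in this diff, so the loading code is an assumption, and `SketchStoreProvider`, `SketchInMemoryStoreProvider`, and `SketchProviderLoader` are hypothetical names.

```java
// Hypothetical stand-ins for illustration; these are not Pulsar types.
interface SketchStoreProvider {
    String describe();
}

class SketchInMemoryStoreProvider implements SketchStoreProvider {
    @Override
    public String describe() {
        return "in-memory";
    }
}

class SketchProviderLoader {
    // Instantiates the configured class through its no-argument constructor.
    static SketchStoreProvider load(String className) throws ReflectiveOperationException {
        return Class.forName(className)
                .asSubclass(SketchStoreProvider.class)
                .getDeclaredConstructor()
                .newInstance();
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        // The class name plays the role of a configuration value.
        String configured = SketchInMemoryStoreProvider.class.getName();
        System.out.println(load(configured).describe());
    }
}
```

Selecting the implementation through configuration means the object is created by the normal startup path, so a test does not depend on the order in which stubs are installed.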
[pulsar] branch branch-3.0 updated (d7e863748ab -> fbc92a281c3)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a change to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git

from d7e863748ab Release 3.0.1
new 7b259131c3e [fix][test] Fix flaky PersistentSubscriptionTest (#20434)
new fbc92a281c3 [fix][test] Fix resource leak in PulsarTestContext (#20799)

The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
.../org/apache/pulsar/broker/PulsarService.java | 6 +-
.../pulsar/broker/service/PersistentTopicTest.java | 5 +-
.../persistent/PersistentSubscriptionTest.java | 106 +++--
.../testcontext/AbstractTestPulsarService.java | 53 ++-
.../testcontext/NonStartableTestPulsarService.java | 96 ++-
5 files changed, 121 insertions(+), 145 deletions(-)
[pulsar] branch master updated: [fix][test] Fix resource leak in PulsarTestContext (#20799)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new ae0fc5bdcae [fix][test] Fix resource leak in PulsarTestContext (#20799) ae0fc5bdcae is described below commit ae0fc5bdcae0220da4936d0fdaff71eead219cdd Author: Lari Hotari AuthorDate: Fri Jul 14 00:01:22 2023 +0300 [fix][test] Fix resource leak in PulsarTestContext (#20799) --- .../org/apache/pulsar/broker/PulsarService.java| 6 +- .../pulsar/broker/service/PersistentTopicTest.java | 5 +- .../testcontext/AbstractTestPulsarService.java | 53 --- .../testcontext/NonStartableTestPulsarService.java | 79 -- 4 files changed, 49 insertions(+), 94 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 5cf3c47bcb0..40c5a2d6528 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -784,7 +784,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { exposeTopicMetrics, offloaderScheduler, interval); this.defaultOffloader = createManagedLedgerOffloader(defaultOffloadPolicies); -this.brokerInterceptor = BrokerInterceptors.load(config); +setBrokerInterceptor(newBrokerInterceptor()); // use getter to support mocking getBrokerInterceptor method in tests BrokerInterceptor interceptor = getBrokerInterceptor(); if (interceptor != null) { @@ -927,6 +927,10 @@ public class PulsarService implements AutoCloseable, ShutdownService { } } +protected BrokerInterceptor newBrokerInterceptor() throws IOException { +return BrokerInterceptors.load(config); +} + @VisibleForTesting protected OrderedExecutor newOrderedExecutor() { return OrderedExecutor.newBuilder() diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index fefed1aaa0a..c49df3e85ce 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -131,6 +131,7 @@ import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.compaction.CompactedTopic; import org.apache.pulsar.compaction.CompactedTopicContext; import org.apache.pulsar.compaction.Compactor; +import org.apache.pulsar.compaction.CompactorMXBean; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore; import org.awaitility.Awaitility; @@ -172,11 +173,13 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { svcConfig.setClusterName("pulsar-cluster"); svcConfig.setTopicLevelPoliciesEnabled(false); svcConfig.setSystemTopicEnabled(false); +Compactor compactor = mock(Compactor.class); +when(compactor.getStats()).thenReturn(mock(CompactorMXBean.class)); pulsarTestContext = PulsarTestContext.builderForNonStartableContext() .config(svcConfig) .spyByDefault() .useTestPulsarResources(metadataStore) -.compactor(mock(Compactor.class)) +.compactor(compactor) .build(); brokerService = pulsarTestContext.getBrokerService(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java index a6861268b94..517d57d0042 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.testcontext; +import 
java.io.IOException; import org.apache.pulsar.broker.BookKeeperClientFactory; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -37,11 +38,7 @@ import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; */ abstract class AbstractTestPulsarService extends PulsarService { protected final SpyConfig spyConfig; -protected final MetadataStoreExtended localMetadataStore; -protected final MetadataStoreExtended configurationMetadataStore; -protected final Compactor compactor; -protected final BrokerInterceptor brokerInterceptor; -protected final
[GitHub] [pulsar] lhotari closed issue #20797: [Tests] Thread resource leaks in unit tests
lhotari closed issue #20797: [Tests] Thread resource leaks in unit tests URL: https://github.com/apache/pulsar/issues/20797
[GitHub] [pulsar] lhotari merged pull request #20799: [fix][test] Fix resource leak in PulsarTestContext
lhotari merged PR #20799: URL: https://github.com/apache/pulsar/pull/20799
[GitHub] [pulsar-helm-chart] vfauth commented on pull request #341: Respect rbac.limit_to_namespace for Functions RBAC
vfauth commented on PR #341: URL: https://github.com/apache/pulsar-helm-chart/pull/341#issuecomment-1634612612 @lhotari They fix the same issue, so you can close mine, thank you.
[GitHub] [pulsar] github-actions[bot] commented on pull request #20803: Bump okio from 1.17.2 to 3.4.0
github-actions[bot] commented on PR #20803: URL: https://github.com/apache/pulsar/pull/20803#issuecomment-1634602123 @dependabot[bot] Please add the following content to your PR description and select a checkbox: ``` - [ ] `doc` - [ ] `doc-required` - [ ] `doc-not-needed` - [ ] `doc-complete` ```
[pulsar] branch dependabot/maven/com.squareup.okio-okio-3.4.0 created (now bf8bd627df5)
This is an automated email from the ASF dual-hosted git repository. github-bot pushed a change to branch dependabot/maven/com.squareup.okio-okio-3.4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git at bf8bd627df5 Bump okio from 1.17.2 to 3.4.0 No new revisions were added by this update.
[GitHub] [pulsar] dependabot[bot] opened a new pull request, #20803: Bump okio from 1.17.2 to 3.4.0
dependabot[bot] opened a new pull request, #20803: URL: https://github.com/apache/pulsar/pull/20803

Bumps [okio](https://github.com/square/okio) from 1.17.2 to 3.4.0.

Changelog

Sourced from [okio's changelog](https://github.com/square/okio/blob/master/CHANGELOG.md).

Version 3.4.0
2023-07-07

- New: Adapt a Java NIO FileSystem (java.nio.file.FileSystem) as an Okio FileSystem using fileSystem.asOkioFileSystem().
- New: Adapt Android’s AssetManager as an Okio FileSystem using AssetFileSystem. This is in the new okio-assetfilesystem module. Android applications should prefer this over FileSystem.RESOURCES as it’s faster to load.
- Fix: Don't crash decoding GZIP files when the optional extra data (XLEN) is 32 KiB or larger.
- Fix: Resolve symlinks in FakeFileSystem.canonicalize().
- Fix: Report the correct createdAtMillis in NodeJsFileSystem file metadata. We were incorrectly using ctimeMs, where c means changed, not created.
- Fix: UnsafeCursor is now Closeable.

Version 3.3.0
2023-01-07

- Fix: Don't leak resources when use {} is used with a non-local return. We introduced this performance and stability bug by not considering that non-local returns execute neither the return nor catch control flows.
- Fix: Use a sealed interface for BufferedSink and BufferedSource. These were never intended for end-users to implement, and we're happy that Kotlin now allows us to express that in our API.
- New: Change internal locks from synchronized to ReentrantLock and Condition. We expect this to improve help when using Okio with Java virtual threads ([Project Loom][loom]).
- Upgrade: [Kotlin 1.8.0][kotlin_1_8_0].

Version 3.2.0
2022-06-26

- Fix: Configure the multiplatform artifact (com.squareup.okio:okio:3.x.x) to depend on the JVM artifact (com.squareup.okio:okio-jvm:3.x.x) for Maven builds. This should work-around an issue where Maven doesn't interpret Gradle metadata.
- Fix: Change CipherSource and CipherSink to recover if the cipher doesn't support streaming. This should work around a crash with AES/GCM ciphers on Android.
- New: Enable compatibility with non-hierarchical projects.

Version 3.1.0
2022-04-19

- Upgrade: [Kotlin 1.6.20][kotlin_1_6_20].
- New: Support [Hierarchical project structure][hierarchical_projects]. If you're using Okio in a multiplatform project please upgrade your project to Kotlin 1.6.20 (or newer) to take advantage of this. With hierarchical projects it's easier to use properties like FileSystem.SYSTEM that

... (truncated)

Commits

- [a161b07](https://github.com/square/okio/commit/a161b07fb1b459371458ae6d9508ec31df280428) Prepare for release 3.4.0.
- [c5f462b](https://github.com/square/okio/commit/c5f462b0b51979f0b23b08bff123011bb01045ea) Copyright to files in build-support ([#1285](https://redirect.github.com/square/okio/issues/1285))
- [f21714d](https://github.com/square/okio/commit/f21714d492f054ae689b455284816721498775eb) Upgrade Gradle and JMH ([#1283](https://redirect.github.com/square/okio/issues/1283))
- [5f5db4a](https://github.com/square/okio/commit/5f5db4a0d2b1a3a0147c6bc18aeaba5a4ffa4037) Merge pull request [#1284](https://redirect.github.com/square/okio/issues/1284) from square/renovate/com.google.jimfs
- [8af8d2a](https://github.com/square/okio/commit/8af8d2a87b0c71ced5d16c44daef20ab0c5d48c8) Update dependency com.google.jimfs:jimfs to v1.3.0
- [b64c198](https://github.com/square/okio/commit/b64c198b790804eea26a05f5409bffb1a4a2d8eb) Update dependency com.vanniktech:gradle-maven-publish-plugin to v0.25.3 ([#1282](https://redirect.github.com/square/okio/issues/1282))
- [ea82713](https://github.com/square/okio/commit/ea827139afef064ddd0078607719d32d7c154c0f) Merge pull request [#1281](https://redirect.github.com/square/okio/issues/1281) from square/renovate/gradle-7.x
- [3569daa](https://github.com/square/okio/commit/3569daa8b8d039a8989440abcc970b7f35171d49) Update dependency gradle to v7.6.2
- [e937a50](https://github.com/square/okio/commit/e937a50ffc482f9777b639d3399ba331b167107a) Merge pull request [#1277](https://redirect.github.com/square/okio/issues/1277) from sifmelcara/fix-int-sign-conversion
- [81bce1a](https://github.com/square/okio/commit/81bce1a30af244550b0324597720e4799281da7b) Fix a bug where xlen larger than 0x7fff was rejected ([#1280](https://redirect.github.com/square/okio/issues/1280))
- Additional commits viewable in [compare view](https://github.com/square/okio/compare/okio-parent-1.17.2...parent-3.4.0)

[![Dependabot compatibility score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=com.squareup.okio:okio=maven=1.17.2=3.4.0)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)

Dependabot will resolve any conflicts with this PR as long as you don't alter it
[GitHub] [pulsar] github-actions[bot] commented on pull request #20802: Bump okio from 1.17.2 to 3.4.0 in /pulsar-sql
github-actions[bot] commented on PR #20802: URL: https://github.com/apache/pulsar/pull/20802#issuecomment-1634598867

@dependabot[bot] Please add the following content to your PR description and select a checkbox:

```
- [ ] `doc`
- [ ] `doc-required`
- [ ] `doc-not-needed`
- [ ] `doc-complete`
```
[GitHub] [pulsar] dependabot[bot] opened a new pull request, #20802: Bump okio from 1.17.2 to 3.4.0 in /pulsar-sql
dependabot[bot] opened a new pull request, #20802: URL: https://github.com/apache/pulsar/pull/20802

Bumps [okio](https://github.com/square/okio) from 1.17.2 to 3.4.0.

Changelog

Sourced from [okio's changelog](https://github.com/square/okio/blob/master/CHANGELOG.md).

Version 3.4.0 (2023-07-07)

- New: Adapt a Java NIO FileSystem (java.nio.file.FileSystem) as an Okio FileSystem using fileSystem.asOkioFileSystem().
- New: Adapt Android’s AssetManager as an Okio FileSystem using AssetFileSystem. This is in the new okio-assetfilesystem module. Android applications should prefer this over FileSystem.RESOURCES as it’s faster to load.
- Fix: Don't crash decoding GZIP files when the optional extra data (XLEN) is 32 KiB or larger.
- Fix: Resolve symlinks in FakeFileSystem.canonicalize().
- Fix: Report the correct createdAtMillis in NodeJsFileSystem file metadata. We were incorrectly using ctimeMs, where c means changed, not created.
- Fix: UnsafeCursor is now Closeable.

Version 3.3.0 (2023-01-07)

- Fix: Don't leak resources when use {} is used with a non-local return. We introduced this performance and stability bug by not considering that non-local returns execute neither the return nor catch control flows.
- Fix: Use a sealed interface for BufferedSink and BufferedSource. These were never intended for end-users to implement, and we're happy that Kotlin now allows us to express that in our API.
- New: Change internal locks from synchronized to ReentrantLock and Condition. We expect this to improve help when using Okio with Java virtual threads ([Project Loom][loom]).
- Upgrade: [Kotlin 1.8.0][kotlin_1_8_0].

Version 3.2.0 (2022-06-26)

- Fix: Configure the multiplatform artifact (com.squareup.okio:okio:3.x.x) to depend on the JVM artifact (com.squareup.okio:okio-jvm:3.x.x) for Maven builds. This should work-around an issue where Maven doesn't interpret Gradle metadata.
- Fix: Change CipherSource and CipherSink to recover if the cipher doesn't support streaming. This should work around a crash with AES/GCM ciphers on Android.
- New: Enable compatibility with non-hierarchical projects.

Version 3.1.0 (2022-04-19)

- Upgrade: [Kotlin 1.6.20][kotlin_1_6_20].
- New: Support [Hierarchical project structure][hierarchical_projects]. If you're using Okio in a multiplatform project please upgrade your project to Kotlin 1.6.20 (or newer) to take advantage of this. With hierarchical projects it's easier to use properties like FileSystem.SYSTEM that ... (truncated)

Commits

- [a161b07](https://github.com/square/okio/commit/a161b07fb1b459371458ae6d9508ec31df280428) Prepare for release 3.4.0.
- [c5f462b](https://github.com/square/okio/commit/c5f462b0b51979f0b23b08bff123011bb01045ea) Copyright to files in build-support ([#1285](https://redirect.github.com/square/okio/issues/1285))
- [f21714d](https://github.com/square/okio/commit/f21714d492f054ae689b455284816721498775eb) Upgrade Gradle and JMH ([#1283](https://redirect.github.com/square/okio/issues/1283))
- [5f5db4a](https://github.com/square/okio/commit/5f5db4a0d2b1a3a0147c6bc18aeaba5a4ffa4037) Merge pull request [#1284](https://redirect.github.com/square/okio/issues/1284) from square/renovate/com.google.jimfs
- [8af8d2a](https://github.com/square/okio/commit/8af8d2a87b0c71ced5d16c44daef20ab0c5d48c8) Update dependency com.google.jimfs:jimfs to v1.3.0
- [b64c198](https://github.com/square/okio/commit/b64c198b790804eea26a05f5409bffb1a4a2d8eb) Update dependency com.vanniktech:gradle-maven-publish-plugin to v0.25.3 ([#1282](https://redirect.github.com/square/okio/issues/1282))
- [ea82713](https://github.com/square/okio/commit/ea827139afef064ddd0078607719d32d7c154c0f) Merge pull request [#1281](https://redirect.github.com/square/okio/issues/1281) from square/renovate/gradle-7.x
- [3569daa](https://github.com/square/okio/commit/3569daa8b8d039a8989440abcc970b7f35171d49) Update dependency gradle to v7.6.2
- [e937a50](https://github.com/square/okio/commit/e937a50ffc482f9777b639d3399ba331b167107a) Merge pull request [#1277](https://redirect.github.com/square/okio/issues/1277) from sifmelcara/fix-int-sign-conversion
- [81bce1a](https://github.com/square/okio/commit/81bce1a30af244550b0324597720e4799281da7b) Fix a bug where xlen larger than 0x7fff was rejected ([#1280](https://redirect.github.com/square/okio/issues/1280))
- Additional commits viewable in [compare view](https://github.com/square/okio/compare/okio-parent-1.17.2...parent-3.4.0)

[![Dependabot compatibility score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=com.squareup.okio:okio&package-manager=maven&previous-version=1.17.2&new-version=3.4.0)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)

Dependabot will resolve any conflicts with this PR as long as you don't alter it
[pulsar] branch dependabot/maven/pulsar-sql/com.squareup.okio-okio-3.4.0 created (now b38d3c4b2fc)
This is an automated email from the ASF dual-hosted git repository. github-bot pushed a change to branch dependabot/maven/pulsar-sql/com.squareup.okio-okio-3.4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git at b38d3c4b2fc Bump okio from 1.17.2 to 3.4.0 in /pulsar-sql No new revisions were added by this update.
[GitHub] [pulsar-client-cpp] BewareMyPower opened a new pull request, #302: Fix the build failure with C++20 standard
BewareMyPower opened a new pull request, #302: URL: https://github.com/apache/pulsar-client-cpp/pull/302

### Motivation

Building the project with the `-DCMAKE_CXX_STANDARD=20` option and GCC 11.3 fails, for two main reasons.

The first is in `ObjectPool.h`; see http://eel.is/c++draft/diff.cpp17.class#2. In short, see the code below:

```c++
template <typename T>
struct A {
    // A<T>() {}  // error: simple-template-id not allowed for constructor
    A() {}        // OK, injected-class-name used
};
```

The other reason is deeply hidden and OS-specific. When building the target for the unit test, the `lib/` directory is added to the include directories. So for `#include "Semaphore.h"`, the `Semaphore.h` header is looked up first in the `lib/` directory. However, C++20 introduced a `<semaphore>` header, which looks for the POSIX semaphore header `semaphore.h` in the system path. For example, the include order in the `ubuntu:22.04` arm64 container is:

- `$PROJECT_DIR/lib/` (where `Semaphore.h` is)
- ...
- `/usr/lib/gcc/aarch64-linux-gnu/11/include` (where `semaphore.h` is)

The C++ header lookup is case insensitive, so `lib/Semaphore.h` ends up being included by the `<semaphore>` header, which is itself implicitly included by another standard header. Our own `Semaphore.h` does not have the POSIX semaphore struct definitions, so the build fails.

### Modifications

- Fix the semantic error in `ObjectPool.h`.
- Remove the `lib/` directory from the include directories of the unit test and include `lib/xxx.h` for headers in the `lib/` directory.
- Add a workflow to verify that the project can now be built with C++20.
[GitHub] [pulsar] JeffBolle commented on pull request #20623: [improve][io] Elastic Search Sink can now handle raw Record
JeffBolle commented on PR #20623: URL: https://github.com/apache/pulsar/pull/20623#issuecomment-1634484762

> @JeffBolle Please rebase (or merge origin/master) to your PR branch and resolve the conflict.

Roger, taking care of it now.
[GitHub] [pulsar] lhotari commented on pull request #20623: [improve][io] Elastic Search Sink can now handle raw Record
lhotari commented on PR #20623: URL: https://github.com/apache/pulsar/pull/20623#issuecomment-1634397306 @JeffBolle Please rebase (or merge origin/master) to your PR branch and resolve the conflict.
[GitHub] [pulsar] shibd commented on pull request #20791: [improve][io] Add notifyError method on PushSource.
shibd commented on PR #20791: URL: https://github.com/apache/pulsar/pull/20791#issuecomment-1634373968

> @shibd Would this PR change the interface for the ecosystem component? If yes, I think we shouldn't add it to branch-3.0. I remove the label `release-3.0.2` first. But feel free to confirm it and add back the label if it's suitable to cherry-pick to the LTS branch.

It doesn't change any behavior of existing connectors; it only adds a new interface method, `notifyError`. I think this change is more like an interface enhancement or bug fix, and we should release it in 3.0.2.
[pulsar] branch master updated: [fix] [io] elastic-search sink connector not support JSON.String schema. (#20741)
This is an automated email from the ASF dual-hosted git repository. baodi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new fa72ab4d1fc [fix] [io] elastic-search sink connector not support JSON.String schema. (#20741) fa72ab4d1fc is described below commit fa72ab4d1fc9991c20025ef86bb8a0b8d65e625b Author: Baodi Shi AuthorDate: Thu Jul 13 22:40:23 2023 +0800 [fix] [io] elastic-search sink connector not support JSON.String schema. (#20741) --- .../pulsar/io/elasticsearch/ElasticSearchSink.java | 13 +++- .../io/elasticsearch/ElasticSearchSinkTests.java | 80 ++ 2 files changed, 91 insertions(+), 2 deletions(-) diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java index e2566d20638..db2a96624bc 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java @@ -373,13 +373,22 @@ public class ElasticSearchSink implements Sink { return node; } -public static JsonNode extractJsonNode(Schema schema, Object val) { +public JsonNode extractJsonNode(Schema schema, Object val) throws JsonProcessingException { if (val == null) { return null; } switch (schema.getSchemaInfo().getType()) { case JSON: -return (JsonNode) ((GenericRecord) val).getNativeObject(); +Object nativeObject = ((GenericRecord) val).getNativeObject(); +if (nativeObject instanceof String) { +try { +return objectMapper.readTree((String) nativeObject); +} catch (JsonProcessingException e) { +log.error("Failed to read JSON string: {}", nativeObject, e); +throw e; +} +} +return (JsonNode) nativeObject; case AVRO: org.apache.avro.generic.GenericRecord node = (org.apache.avro.generic.GenericRecord) 
((GenericRecord) val).getNativeObject(); diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java index 9fad03c3579..62592f5f09b 100644 --- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java +++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java @@ -19,6 +19,7 @@ package org.apache.pulsar.io.elasticsearch; import co.elastic.clients.transport.ElasticsearchTransport; +import com.fasterxml.jackson.core.JsonParseException; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; @@ -243,6 +244,85 @@ public abstract class ElasticSearchSinkTests extends ElasticSearchTestBase { verify(mockRecord, times(1)).ack(); } +@Test +public final void sendJsonStringSchemaTest() throws Exception { + +when(mockRecord.getMessage()).thenAnswer(new Answer>>() { +@Override +public Optional> answer(InvocationOnMock invocation) throws Throwable { +final MessageImpl mock = mock(MessageImpl.class); + when(mock.getData()).thenReturn("{\"a\":1}".getBytes(StandardCharsets.UTF_8)); +return Optional.of(mock); +} +}); + +when(mockRecord.getKey()).thenAnswer(new Answer>() { +public Optional answer(InvocationOnMock invocation) throws Throwable { +return Optional.empty(); +} +}); + +GenericRecord genericRecord = mock(GenericRecord.class); +when(genericRecord.getNativeObject()).thenReturn("{\"a\":1}"); +when(genericRecord.getSchemaType()).thenReturn(SchemaType.JSON); +when(mockRecord.getValue()).thenAnswer(new Answer() { +public GenericRecord answer(InvocationOnMock invocation) throws Throwable { +return genericRecord; +} +}); + +when(mockRecord.getSchema()).thenAnswer(new Answer() { +public Schema answer(InvocationOnMock invocation) throws Throwable 
{ +return Schema.JSON(String.class); +} +}); + +map.put("indexName", "test-index"); +map.put("schemaEnable", "true"); +sink.open(map, mockSinkContext); +sink.write(mockRecord); +verify(mockRecord, times(1)).ack(); +} + +
[GitHub] [pulsar] shibd merged pull request #20741: [fix] [io] elastic-search sink connector not support JSON.String schema.
shibd merged PR #20741: URL: https://github.com/apache/pulsar/pull/20741
[GitHub] [pulsar] alexesom opened a new issue, #20801: [Doc] Lack of parameters for admin.namespaces().splitNamespaceBundle in docs
alexesom opened a new issue, #20801: URL: https://github.com/apache/pulsar/issues/20801

### Search before asking

- [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### What issue do you find in Pulsar docs?

In [Split namespace bundles](https://pulsar.apache.org/docs/next/admin-api-namespaces/#split-namespace-bundles) the number of parameters is incorrect. In the Namespaces.java:

    /**
     * Split namespace bundle.
     *
     * @param namespace
     * @param bundle range of bundle to split
     * @param unloadSplitBundles
     * @param splitAlgorithmName
     * @throws PulsarAdminException
     */
    void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles, String splitAlgorithmName)
            throws PulsarAdminException;

### What is your suggestion?

Correct the signature of the method and provide explanations for the new parameters.

### Any reference?

_No response_

### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!
[pulsar] branch master updated: [fix][io] Not restart instance when kafka source poll exception. (#20795)
This is an automated email from the ASF dual-hosted git repository. baodi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 911fbf5fa2b [fix][io] Not restart instance when kafka source poll exception. (#20795) 911fbf5fa2b is described below commit 911fbf5fa2b49825d7dcbf2270f0329a5267a2fa Author: Baodi Shi AuthorDate: Thu Jul 13 21:11:06 2023 +0800 [fix][io] Not restart instance when kafka source poll exception. (#20795) --- .../pulsar/io/kafka/KafkaAbstractSource.java | 53 -- .../io/kafka/source/KafkaAbstractSourceTest.java | 43 +- 2 files changed, 52 insertions(+), 44 deletions(-) diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java index 012e4143744..3d4612c039f 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java @@ -27,7 +27,6 @@ import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -133,7 +132,6 @@ public abstract class KafkaAbstractSource extends PushSource { throw new IllegalArgumentException("Unable to instantiate Kafka consumer", ex); } this.start(); -running = true; } protected Properties beforeCreateConsumer(Properties props) { @@ -158,47 +156,36 @@ public abstract class KafkaAbstractSource extends PushSource { @SuppressWarnings("unchecked") public void start() { +LOG.info("Starting subscribe kafka source on {}", kafkaSourceConfig.getTopic()); + consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic())); runnerThread = new Thread(() -> { 
-LOG.info("Starting kafka source on {}", kafkaSourceConfig.getTopic()); - consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic())); LOG.info("Kafka source started."); while (running) { -ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1L)); -CompletableFuture[] futures = new CompletableFuture[consumerRecords.count()]; -int index = 0; -for (ConsumerRecord consumerRecord : consumerRecords) { -KafkaRecord record = buildRecord(consumerRecord); -if (LOG.isDebugEnabled()) { -LOG.debug("Write record {} {} {}", record.getKey(), record.getValue(), record.getSchema()); +try { +ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1L)); +CompletableFuture[] futures = new CompletableFuture[consumerRecords.count()]; +int index = 0; +for (ConsumerRecord consumerRecord : consumerRecords) { +KafkaRecord record = buildRecord(consumerRecord); +if (LOG.isDebugEnabled()) { +LOG.debug("Write record {} {} {}", record.getKey(), record.getValue(), record.getSchema()); +} +consume(record); +futures[index] = record.getCompletableFuture(); +index++; } -consume(record); -futures[index] = record.getCompletableFuture(); -index++; -} -if (!kafkaSourceConfig.isAutoCommitEnabled()) { -try { +if (!kafkaSourceConfig.isAutoCommitEnabled()) { CompletableFuture.allOf(futures).get(); consumer.commitSync(); -} catch (InterruptedException ex) { -break; -} catch (ExecutionException ex) { -LOG.error("Error while processing records", ex); -break; } +} catch (Exception e) { +LOG.error("Error while processing records", e); +notifyError(e); +break; } } }); -runnerThread.setUncaughtExceptionHandler( -(t, e) -> { -new Thread(() -> { -LOG.error("[{}] Error while consuming records", t.getName(), e); -try { -this.close(); -} catch (Exception ex) { -
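The reworked loop in the diff above wraps each poll iteration in a `try`/`catch` and reports a failure through `notifyError` before leaving the loop, rather than relying on an uncaught-exception handler on the runner thread. A minimal Java sketch of that control flow follows. It is an illustration under assumptions, not the connector's code: the `Poller` interface is a hypothetical stand-in for the Kafka poll-and-consume step, and `notifyError` is modeled as a plain `Consumer<Exception>`.

```java
import java.util.function.Consumer;

final class PollLoopSketch {
    // Hypothetical stand-in for one poll-and-consume iteration of the source.
    interface Poller {
        void pollAndConsume() throws Exception;
    }

    // Runs up to maxIterations polls. The first failure is handed to notifyError
    // and ends the loop, so the exception never escapes the runner thread.
    static int runLoop(Poller poller, int maxIterations, Consumer<Exception> notifyError) {
        int completed = 0;
        while (completed < maxIterations) {
            try {
                poller.pollAndConsume();
                completed++;
            } catch (Exception e) {
                notifyError.accept(e);
                break;
            }
        }
        return completed;
    }
}
```

The point of the design is that whoever owns the source decides what to do with the reported error; the loop itself only stops.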
[GitHub] [pulsar] shibd merged pull request #20795: [fix][io] Not restart instance when kafka source poll exception.
shibd merged PR #20795: URL: https://github.com/apache/pulsar/pull/20795
[GitHub] [pulsar] github-actions[bot] commented on pull request #20800: [improve][broker] Add the MessageExpirer interface to make code clear
github-actions[bot] commented on PR #20800: URL: https://github.com/apache/pulsar/pull/20800#issuecomment-1634165256

@BewareMyPower Please add the following content to your PR description and select a checkbox:

```
- [ ] `doc`
- [ ] `doc-required`
- [ ] `doc-not-needed`
- [ ] `doc-complete`
```
[GitHub] [pulsar] BewareMyPower opened a new pull request, #20800: [improve][broker] Add the MessageExpirer interface to make code clear
BewareMyPower opened a new pull request, #20800: URL: https://github.com/apache/pulsar/pull/20800

### Motivation

When I reviewed https://github.com/apache/pulsar/pull/20597, the unrelated changes in `PersistentTopicsBase` were hard to read. The logic could be simplified to:

```java
PersistentSubscription sub = null;
PersistentReplicator repl = null;
if (metSomeCondition()) {
    repl = /* ... */;
    if (repl == null) {
        /* ... */
        return;
    }
} else {
    sub = /* ... */;
    if (sub == null) {
        /* ... */
        return;
    }
}
// The lambda below can only capture final (or effectively final) variables.
final PersistentSubscription finalSub = sub;
final PersistentReplicator finalRepl = repl;
future.thenAccept(__ -> {
    if (metSomeCondition()) {
        finalRepl.expireMessages(/* ... */);
    } else {
        finalSub.expireMessages(/* ... */);
    }
});
```

The code above is a mess. It adds two final variables because a lambda can only capture final (or effectively final) variables. The `metSomeCondition` check is performed twice unnecessarily. The original code is even harder to read because the logic in `/* ... */` takes a few lines, so the two calls of `metSomeCondition()` are far apart.

From the code search I see that all these classes implement two `expireMessages` methods that accept an integer or a position:

- PersistentMessageExpiryMonitor
- PersistentSubscription
- PersistentReplicator
- NonPersistentSubscription

The code can be simplified by introducing a new interface.

### Modifications

Introduce a `MessageExpirer` interface and change the class hierarchy to:

```
// [I] is interface, [C] is class
[I] MessageExpirer
  [I] Subscription
    [C] PersistentSubscription
    [C] NonPersistentSubscription
  [C] PersistentReplicator
  [C] PersistentMessageExpiryMonitor
```

The method invocation can be simplified considerably, as shown in this patch.

P.S. Inserting such an interface in the type hierarchy does not even break ABI compatibility; see https://docs.oracle.com/javase/specs/jls/se7/html/jls-13.html

### Does this pull request potentially affect one of the following parts:

*If the box was checked, please highlight the changes*

- [ ] Dependencies (add or upgrade a dependency)
- [x] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment

### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [] `doc-not-needed`
- [ ] `doc-complete`

### Matching PR in forked repository

PR in forked repository:
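To illustrate why a shared interface removes the duplicated branch, here is a minimal Java sketch. It is not the patch itself: `SketchPosition`, `SketchSubscription`, `SketchReplicator`, and `ExpirerSketch` are hypothetical stand-ins, and the `boolean` return type of `expireMessages` is an assumption made for the example. Only the interface name and the two overloads (an integer or a position) come from the description above.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a ledger position; Pulsar's real Position type is not used here.
final class SketchPosition {
    final long ledgerId;
    final long entryId;

    SketchPosition(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }
}

// One interface for both overloads named in the PR description.
// The boolean return type is an assumption of this sketch.
interface MessageExpirer {
    boolean expireMessages(int messageTtlInSeconds);

    boolean expireMessages(SketchPosition position);
}

// Stand-in for a subscription; it records the last call so the behavior is observable.
final class SketchSubscription implements MessageExpirer {
    String lastCall = "none";

    @Override
    public boolean expireMessages(int messageTtlInSeconds) {
        lastCall = "subscription ttl=" + messageTtlInSeconds;
        return true;
    }

    @Override
    public boolean expireMessages(SketchPosition position) {
        lastCall = "subscription position=" + position.ledgerId + ":" + position.entryId;
        return true;
    }
}

// Stand-in for a replicator with the same two overloads.
final class SketchReplicator implements MessageExpirer {
    String lastCall = "none";

    @Override
    public boolean expireMessages(int messageTtlInSeconds) {
        lastCall = "replicator ttl=" + messageTtlInSeconds;
        return true;
    }

    @Override
    public boolean expireMessages(SketchPosition position) {
        lastCall = "replicator position=" + position.ledgerId + ":" + position.entryId;
        return true;
    }
}

final class ExpirerSketch {
    // The branch runs once and yields a single final reference that the lambda can capture.
    static boolean expireAfter(CompletableFuture<Void> ready, boolean useReplicator,
                               SketchSubscription sub, SketchReplicator repl, int ttlSeconds) {
        final MessageExpirer expirer = useReplicator ? repl : sub;
        return ready.thenApply(ignored -> expirer.expireMessages(ttlSeconds)).join();
    }
}
```

With a single `MessageExpirer` reference there is no need for a second condition check inside the callback, and no pair of `final` copies.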
[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #20748: [pip][design] PIP-281: Optimize Bundle Unload(Transfer) Protocol for ExtensibleLoadManager
Demogorgon314 commented on code in PR #20748: URL: https://github.com/apache/pulsar/pull/20748#discussion_r1262482139 ## pip/pip-281.md: ## @@ -0,0 +1,233 @@ + + +# Background knowledge + +- Pulsar broker load balancer periodically unloads bundles from overloaded brokers. During this unload process, previous owner brokers close topic sessions(e.g. producers, subscriptions(consumers), managed ledgers). When re-assigned, new owner brokers recreate the topic sessions. + +- Pulsar clients request `CommandLookupTopic` to lookup or assign owner brokers for topics and connect to them. + +- PIP-192, the extensible load balancer introduced the bundle state channel that event-sources this unloading process in a state machine manner, from `releasing,` `assigned`, to `owned` state order. At `releasing,` the owner broker "releases" the bundle ownership(close topic sessions). + +- PIP-192, the extensible load balancer introduced TransferShedder, a new shedding strategy, which pre-assigns new owner brokers beforehand. + + +# Motivation + +- When unloading closes many topic sessions, then many clients need to request CommandLookupTopic at the same time, which could cause many lookup requests on brokers. This unloading process can be further optimized if we can let the client directly connect to the new owner broker without following `CommandLookupTopic` requests. +- In the new load balancer(pip-192), since the owner broker is already known, we can modify the close command protocol to pass the new destination broker URL and skip the lookup requests. +- Also, when unloading, we can gracefully shutdown ledgers -- we always close old managed ledgers first and then recreate it on the new owner without conflicts. + +# Goals +- Remove clients' lookup requests in the unload protocol +- Gracefully shutdown managed ledgers before new owners create them. + +## In Scope + + + +- This change will be added in the extensible load balancer. 
+ +## Out of Scope + + + +- This won't change the existing load balancer behavior(modular load manager). + + + +# High Level Design + + + +Current Unload and Lookup Sequence in Extensible Load Balancer +```mermaid +sequenceDiagram +participant Clients +participant Owner Broker +participant New Owner Broker +participant Leader Broker +Leader Broker ->> Owner Broker: "state:Releasing:" close topic +Owner Broker ->> Owner Broker: close broker topic sessions +Owner Broker ->> Clients: close producers and consumers +Clients ->> Clients: reconnecting (initial delay 100ms) +Owner Broker ->> New Owner Broker: "state:Assign:" assign new ownership +New Owner Broker ->> Owner Broker: "state:Owned:" ack new ownership +Clients ->> Owner Broker: lookup +Owner Broker ->> Clients: redirect +Clients ->> New Owner Broker: lookup +New Owner Broker ->> Clients: return(connected) +``` + +Proposed Unload Sequence in Extensible Load Balancer without Lookup +```mermaid +sequenceDiagram +participant Clients +participant Owner Broker +participant New Owner Broker +participant Leader Broker +Leader Broker ->> Owner Broker: "state:Releasing:" close topic +Owner Broker ->> Owner Broker: close broker topic sessions(e.g. ledgers) without disconnecting producers/consumers(fenced) +Clients -->> Owner Broker: message pubs are ignored +Owner Broker ->> New Owner Broker: "state:Assign:" assign new ownership +New Owner Broker ->> Owner Broker: "state:Owned:" ack new ownership +Owner Broker ->> Owner Broker: close the fenced broker topic sessions +Owner Broker ->> Clients: close producers and consumers (with newOwnerBrokerUrl) +Clients ->> New Owner Broker: immediately connect +``` + + +# Detailed Design + +## Design & Implementation Details + + + +- Modify CommandCloseProducer, CommandCloseConsumer to pass optional brokerServiceUrls +``` +message CommandCloseProducer { Review Comment: We also need to consider the Pulsar proxy, right?
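The proposed sequence quoted above hinges on one client-side decision: if the close command carries the new owner's URL, connect there immediately; otherwise fall back to a lookup. A minimal Java sketch of that decision follows. It is hypothetical throughout: `CloseProducer`, `TopicLookup`, and `reconnectTarget` are illustrative names rather than Pulsar client APIs, and the URLs used with it are placeholders. Only the idea of an optional broker URL on the close command comes from the proposal.

```java
import java.util.Optional;

final class CloseCommandSketch {
    // Hypothetical model of a close command; the optional URL mirrors the
    // proposal's idea of passing the new owner's service URL on close.
    static final class CloseProducer {
        final long producerId;
        final Optional<String> newOwnerBrokerUrl;

        CloseProducer(long producerId, Optional<String> newOwnerBrokerUrl) {
            this.producerId = producerId;
            this.newOwnerBrokerUrl = newOwnerBrokerUrl;
        }
    }

    // Stand-in for the topic lookup round trip that the proposal wants to avoid.
    interface TopicLookup {
        String findOwner(String topic);
    }

    // Use the URL carried by the close command when present; otherwise fall back to a lookup.
    static String reconnectTarget(CloseProducer cmd, String topic, TopicLookup lookup) {
        return cmd.newOwnerBrokerUrl.orElseGet(() -> lookup.findOwner(topic));
    }
}
```

The reviewer's question about the Pulsar proxy still applies to a sketch like this: a URL that is reachable from the broker's point of view is not necessarily reachable by a client that connects through a proxy.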
[GitHub] [pulsar] rob-kvietkauskas-ign commented on issue #20722: [Bug] Unable to create Connectors on Pulsar 2.9.1 (bare metal cluster)
rob-kvietkauskas-ign commented on issue #20722: URL: https://github.com/apache/pulsar/issues/20722#issuecomment-1634149488 Yesterday I spent some time analyzing the Pulsar 2.11.1 source code, trying to figure out what may cause the ZooKeeper connection to `localhost` (even though neither the broker, the bookie, nor the functions worker configuration contains any entries related to `localhost`). After digging through the code mentioned in the stack traces I've provided and in the stack traces I get using Pulsar 2.11.1, I have a strong feeling that the problem is located in the `uploadToBookKeeper(Namespace, InputStream, String)` method call in the [WorkerUtils class](https://github.com/apache/pulsar/blob/7233f0e6616ea54d841be7d17bf2abef4d3827c7/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java#L98). After this method gets called, the BookKeeper client tries to access BookKeeper via ZooKeeper, but somehow (not sure how exactly) the ZooKeeperClient gets initialized with a localhost value.
[GitHub] [pulsar] codecov-commenter commented on pull request #20623: [improve][io] Elastic Search Sink can now handle raw Record
codecov-commenter commented on PR #20623: URL: https://github.com/apache/pulsar/pull/20623#issuecomment-1634147460

## [Codecov](https://app.codecov.io/gh/apache/pulsar/pull/20623) Report

> Merging [#20623](https://app.codecov.io/gh/apache/pulsar/pull/20623) (3556617) into [master](https://app.codecov.io/gh/apache/pulsar/commit/677d160148afc935c34a072fb62b765a8a65045c) (677d160) will **increase** coverage by `41.18%`.
> The diff coverage is `74.07%`.

```diff
@@              Coverage Diff               @@
##             master    #20623       +/-   ##
==============================================
+ Coverage     31.93%    73.11%   +41.18%
- Complexity    11779     32039    +20260
==============================================
  Files          1498      1866      +368
  Lines        114595    139072    +24477
  Branches      12428     15302     +2874
==============================================
+ Hits          36591    101689    +65098
+ Misses        73158     29323    -43835
- Partials       4846      8060     +3214
```

| Flag | Coverage Δ | |
|---|---|---|
| inttests | `24.16% <ø> (?)` | |
| systests | `24.92% <0.00%> (?)` | |
| unittests | `72.41% <74.07%> (+40.47%)` | :arrow_up: |

Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.

| [Impacted Files](https://app.codecov.io/gh/apache/pulsar/pull/20623) | Coverage Δ | |
|---|---|---|
| [...che/pulsar/io/elasticsearch/ElasticSearchSink.java](https://app.codecov.io/gh/apache/pulsar/pull/20623#diff-cHVsc2FyLWlvL2VsYXN0aWMtc2VhcmNoL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9wdWxzYXIvaW8vZWxhc3RpY3NlYXJjaC9FbGFzdGljU2VhcmNoU2luay5qYXZh) | `74.61% <74.07%> (ø)` | |

... and [1566 files with indirect coverage changes](https://app.codecov.io/gh/apache/pulsar/pull/20623/indirect-changes)
[GitHub] [pulsar] daziz opened a new pull request, #20798: Correcting spelling mistakes in the pulsar-broker module
daziz opened a new pull request, #20798: URL: https://github.com/apache/pulsar/pull/20798

Fixes #xyz

Main Issue: #xyz

PIP: #xyz

### Motivation

### Modifications

Corrected all spelling mistakes I found in the pulsar-broker module.

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
- *Added integration tests for end-to-end deployment with large payloads (10MB)*
- *Extended integration test for recovery after broker failure*

### Does this pull request potentially affect one of the following parts:

*If the box was checked, please highlight the changes*

- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment

### Documentation

- [X] `doc`
- [ ] `doc-required`
- [ ] `doc-not-needed`
- [ ] `doc-complete`

### Matching PR in forked repository

PR in forked repository:
[GitHub] [pulsar] lhotari commented on issue #20797: [Tests] Thread resource leaks in unit tests
lhotari commented on issue #20797: URL: https://github.com/apache/pulsar/issues/20797#issuecomment-1634081930

A simple way to reproduce the thread leak in ServerCnxTest is to execute a single test method:

```
mvn -DredirectTestOutputToFile=false -DtestRetryCount=0 test -pl pulsar-broker -Dtest=ServerCnxTest#testConnectCommandWithEnum
```
[GitHub] [pulsar] lhotari opened a new issue, #20797: [Tests] Thread resource leaks in unit tests
lhotari opened a new issue, #20797: URL: https://github.com/apache/pulsar/issues/20797

Thread leaks:

```
❯ grep -shEr "created [0-9]+ new threads" pulsar-broker | awk -F "Summary: " '{ print $2 }' | awk '{ print $(NF-2), $0}' |sort -rn | cut -f2- -d' '
Tests in class org.apache.pulsar.broker.service.ServerCnxTest created 849 new threads
Tests in class org.apache.pulsar.broker.service.ClusterMigrationTest created 743 new threads
Tests in class org.apache.pulsar.broker.transaction.AuthenticatedTransactionProducerConsumerTest created 100 new threads
Tests in class org.apache.pulsar.broker.loadbalance.LoadBalancerTest created 92 new threads
Tests in class org.apache.pulsar.broker.loadbalance.MultiBrokerLeaderElectionTest created 89 new threads
Tests in class org.apache.pulsar.broker.loadbalance.MultiBrokerLeaderElectionExpirationTest created 81 new threads
Tests in class org.apache.pulsar.broker.transaction.pendingack.PendingAckInMemoryDeleteTest created 78 new threads
Tests in class org.apache.pulsar.broker.transaction.buffer.TransactionStablePositionTest created 78 new threads
Tests in class org.apache.pulsar.broker.service.MessageCumulativeAckTest created 62 new threads
Tests in class org.apache.pulsar.broker.service.ReplicatorTest created 61 new threads
Tests in class org.apache.pulsar.broker.service.BrokerBookieIsolationTest created 51 new threads
Tests in class org.apache.pulsar.broker.loadbalance.SimpleLoadManagerImplTest created 51 new threads
Tests in class org.apache.pulsar.broker.loadbalance.ModularLoadManagerImplTest created 47 new threads
Tests in class org.apache.pulsar.broker.service.TopicOwnerTest created 40 new threads
Tests in class org.apache.pulsar.broker.service.BrokerServiceTest created 37 new threads
Tests in class org.apache.pulsar.broker.service.BrokerBkEnsemblesTests created 32 new threads
Tests in class org.apache.pulsar.broker.SLAMonitoringTest created 31 new threads
Tests in class org.apache.pulsar.broker.service.ReplicatorAdminTlsTest created 26 new threads
Tests in class org.apache.pulsar.broker.service.ReplicatorAdminTlsWithKeyStoreTest created 25 new threads
Tests in class org.apache.pulsar.broker.service.persistent.PersistentSubscriptionTest created 25 new threads
Tests in class org.apache.pulsar.broker.service.PeerReplicatorTest created 24 new threads
Tests in class org.apache.pulsar.broker.service.ReplicatorSubscriptionTest created 23 new threads
Tests in class org.apache.pulsar.broker.service.MaxMessageSizeTest created 23 new threads
Tests in class org.apache.pulsar.broker.service.ReplicatorTopicPoliciesTest created 22 new threads
Tests in class org.apache.pulsar.broker.service.OneWayReplicatorTest created 20 new threads
Tests in class org.apache.pulsar.broker.service.schema.SchemaServiceTest created 18 new threads
Tests in class org.apache.pulsar.broker.service.ReplicatorTlsTest created 18 new threads
Tests in class org.apache.pulsar.broker.service.ReplicatorRemoveClusterTest created 18 new threads
Tests in class org.apache.pulsar.broker.service.BacklogQuotaManagerTest created 17 new threads
Tests in class org.apache.pulsar.broker.service.schema.PartitionedTopicsSchemaTest created 16 new threads
Tests in class org.apache.pulsar.broker.loadbalance.SimpleBrokerStartTest created 13 new threads
Tests in class org.apache.pulsar.broker.loadbalance.LeaderElectionServiceTest created 13 new threads
Tests in class org.apache.pulsar.broker.loadbalance.impl.BundleSplitterTaskTest created 13 new threads
Tests in class org.apache.pulsar.broker.auth.AuthorizationTest created 12 new threads
Tests in class org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumersTest created 11 new threads
Tests in class org.apache.pulsar.broker.loadbalance.extensions.scheduler.UnloadSchedulerTest created 10 new threads
Tests in class org.apache.pulsar.broker.intercept.MangedLedgerInterceptorImplTest created 10 new threads
Tests in class org.apache.pulsar.broker.service.OpportunisticStripingTest created 9 new threads
Tests in class org.apache.pulsar.broker.service.ManagedLedgerCompressionTest created 9 new threads
Tests in class org.apache.pulsar.broker.loadbalance.extensions.AntiAffinityNamespaceGroupExtensionTest created 8 new threads
Tests in class org.apache.pulsar.broker.stats.ManagedCursorMetricsTest created 7 new threads
Tests in class org.apache.pulsar.broker.service.BusyWaitServiceTest created 7 new threads
Tests in class org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistryTest created 7 new threads
Tests in class org.apache.pulsar.broker.web.WebServiceTest created 6 new threads
Tests in class org.apache.pulsar.broker.service.PersistentMessageFinderTest created 6 new threads
Tests in class
```
[GitHub] [pulsar] lhotari commented on issue #20673: Flaky-test: TransactionBufferCloseTest.deleteTopicCloseTransactionBufferTest
lhotari commented on issue #20673: URL: https://github.com/apache/pulsar/issues/20673#issuecomment-1634054904

Actually, setting `svcConfig.setBrokerShutdownTimeoutMs(5000L);` in ServerCnxTest didn't make the thread leak go away. There must be some other problem in the shutdown sequence. I'll file another bug report for it.
[GitHub] [pulsar] lhotari commented on issue #20673: Flaky-test: TransactionBufferCloseTest.deleteTopicCloseTransactionBufferTest
lhotari commented on issue #20673: URL: https://github.com/apache/pulsar/issues/20673#issuecomment-1634051197

UPDATE: the thread leak detection raises some false alarms, since broker shutdown is asynchronous in tests for performance reasons. For example, in ServerCnxTest, `svcConfig.setBrokerShutdownTimeoutMs(5000L);` makes the shutdown wait for completion, but that makes the test very slow. /cc @gaoran10
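The false-alarm effect described in the comment above can be illustrated outside Pulsar. The following is a minimal Java sketch under stated assumptions, not the actual `ThreadLeakDetectorListener`: the class `ThreadLeakSketch`, the thread name prefix, and the `demo()` helper are all hypothetical. It counts worker threads right after an asynchronous `ExecutorService.shutdown()` and again after waiting for termination.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; none of these names exist in Pulsar.
final class ThreadLeakSketch {
    static final String PREFIX = "sketch-worker";

    /** Returns the live threads whose name starts with PREFIX. */
    static List<Thread> liveWorkers() {
        List<Thread> result = new ArrayList<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.isAlive() && t.getName().startsWith(PREFIX)) {
                result.add(t);
            }
        }
        return result;
    }

    /** Returns {workers alive right after shutdown(), workers alive after waiting}. */
    static int[] demo() {
        try {
            CountDownLatch started = new CountDownLatch(1);
            CountDownLatch release = new CountDownLatch(1);
            ExecutorService pool =
                    Executors.newSingleThreadExecutor(r -> new Thread(r, PREFIX + "-0"));
            // Keep the worker busy so it is still running when shutdown() returns.
            pool.execute(() -> {
                started.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            started.await();

            // Asynchronous shutdown: returns immediately, the running task continues.
            pool.shutdown();
            List<Thread> early = liveWorkers(); // a detector checking now sees a "leak"

            // Synchronous variant: let the task finish and wait for the worker to exit.
            release.countDown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            for (Thread t : early) {
                t.join(5000);
            }
            return new int[] {early.size(), liveWorkers().size()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] counts = demo();
        System.out.println("alive right after shutdown(): " + counts[0]);
        System.out.println("alive after waiting for termination: " + counts[1]);
    }
}
```

In this sketch the first count is taken while the worker is still finishing its task, so a check at that point flags a thread that would have exited on its own. Waiting removes the false alarm at the cost of extra time per test, which is the trade-off the comment mentions.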
[GitHub] [pulsar-client-go] RobertIndie commented on pull request #1055: [fix] [issue 1051]: Fix inaccurate producer mem limit in chunking and schema
RobertIndie commented on PR #1055: URL: https://github.com/apache/pulsar-client-go/pull/1055#issuecomment-1634027811

> Calculate the required resource (semaphore/memory; when chunking, more than 1 semaphore; we cache the compressedPayload/meta in the sendRequest when calculating) before we put a request into the dataChan. If there is not enough resource, fail fast. In this way, we can delete the sendRequest.blockCh field and there is no need to block;

When chunking is enabled, we cannot get the total number of chunks before pushing the request to the dataChan. And there may be a deadlock issue similar to https://github.com/apache/pulsar/issues/17446

> Add a sendRequest.done() method. When a request is done (succeeded or failed), call it to release the resources the request holds, run the callback, report metrics, and write debug logs. In this way, we manage the resources and logic together and don't have to do these things across the whole file.

+1 for this. It's good practice to manage the resources in one place.
[GitHub] [pulsar-client-go] RobertIndie commented on a diff in pull request #1055: [fix] [issue 1051]: Fix inaccurate producer mem limit in chunking and schema
RobertIndie commented on code in PR #1055: URL: https://github.com/apache/pulsar-client-go/pull/1055#discussion_r1262383421

## pulsar/producer_partition.go: ##

@@ -542,6 +538,11 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
     uncompressedSize := len(uncompressedPayload)

+    // try to reserve memory for uncompressedPayload
+    if !p.canReserveMem(request, int64(uncompressedSize)) {
+        return

Review Comment: I'm +1 for moving the semaphore release out of `canReserveMem`. Until we find a good solution for it, it's better to release it here than inside `canReserveMem`.
[GitHub] [pulsar-client-go] RobertIndie commented on a diff in pull request #1055: [fix] [issue 1051]: Fix inaccurate producer mem limit in chunking and schema
RobertIndie commented on code in PR #1055: URL: https://github.com/apache/pulsar-client-go/pull/1055#discussion_r1262354724

## pulsar/producer_partition.go: ##

@@ -1407,31 +1410,39 @@ func (p *partitionProducer) releaseSemaphoreAndMem(size int64) {
     p.client.memLimit.ReleaseMemory(size)
 }

-func (p *partitionProducer) canAddToQueue(sr *sendRequest, uncompressedPayloadSize int64) bool {
+func (p *partitionProducer) canAddToQueue(sr *sendRequest) bool {
     if p.options.DisableBlockIfQueueFull {
         if !p.publishSemaphore.TryAcquire() {
             runCallback(sr.callback, nil, sr.msg, errSendQueueIsFull)
             return false
         }
-        if !p.client.memLimit.TryReserveMemory(uncompressedPayloadSize) {
+    } else {
+        if !p.publishSemaphore.Acquire(sr.ctx) {
+            runCallback(sr.callback, nil, sr.msg, errContextExpired)
+            return false
+        }
+    }
+    p.metrics.MessagesPending.Inc()
+    p.metrics.BytesPending.Add(float64(len(sr.msg.Payload)))

Review Comment: I think we should consider the `msg.Schema` here when a schema is used. And why do we add the pending bytes here instead of in `canReserveMem`?
## pulsar/producer_test.go: ##

@@ -1924,6 +1924,159 @@ func TestMemLimitRejectProducerMessages(t *testing.T) {
     assert.NoError(t, err)
 }

+func TestMemLimitRejectProducerMessagesWithSchema(t *testing.T) {
+
+    c, err := NewClient(ClientOptions{
+        URL:              serviceURL,
+        MemoryLimitBytes: 100 * 6,
+    })
+    assert.NoError(t, err)
+    defer c.Close()
+
+    schema := NewAvroSchema(`{"fields":
+        [
+            {"name":"id","type":"int"},{"default":null,"name":"name","type":["null","string"]}
+        ],
+        "name":"MyAvro","namespace":"schemaNotFoundTestCase","type":"record"}`, nil)
+
+    topicName := newTopicName()
+    producer1, _ := c.CreateProducer(ProducerOptions{
+        Topic:                   topicName,
+        DisableBlockIfQueueFull: true,
+        DisableBatching:         false,
+        BatchingMaxPublishDelay: 100 * time.Second,
+        SendTimeout:             2 * time.Second,
+    })
+
+    producer2, _ := c.CreateProducer(ProducerOptions{
+        Topic:                   topicName,
+        DisableBlockIfQueueFull: true,
+        DisableBatching:         false,
+        BatchingMaxPublishDelay: 100 * time.Second,
+        SendTimeout:             2 * time.Second,
+    })
+
+    // the size of encoded value is 6 bytes
+    value := map[string]interface{}{
+        "id": 0,
+        "name": map[string]interface{}{
+            "string": "abc",
+        },
+    }
+
+    n := 101
+    for i := 0; i < n/2; i++ {
+        producer1.SendAsync(context.Background(), &ProducerMessage{
+            Value:  value,
+            Schema: schema,
+        }, func(id MessageID, message *ProducerMessage, e error) {})
+
+        producer2.SendAsync(context.Background(), &ProducerMessage{
+            Value:  value,
+            Schema: schema,
+        }, func(id MessageID, message *ProducerMessage, e error) {})
+    }
+    // Last message in order to reach the limit
+    producer1.SendAsync(context.Background(), &ProducerMessage{
+        Value:  value,
+        Schema: schema,
+    }, func(id MessageID, message *ProducerMessage, e error) {})
+    time.Sleep(100 * time.Millisecond)
+    assert.Equal(t, int64(n*6), c.(*client).memLimit.CurrentUsage())
+
+    _, err = producer1.Send(context.Background(), &ProducerMessage{
+        Value:  value,
+        Schema: schema,
+    })
+    assert.Error(t, err)
+    assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull))
+
+    _, err = producer2.Send(context.Background(), &ProducerMessage{
+        Value:  value,
+        Schema: schema,
+    })
+    assert.Error(t, err)
+    assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull))
+
+    // flush pending msg
+    err = producer1.Flush()
+    assert.NoError(t, err)
+    err = producer2.Flush()
+    assert.NoError(t, err)
+    assert.Equal(t, int64(0), c.(*client).memLimit.CurrentUsage())
+
+    _, err = producer1.Send(context.Background(), &ProducerMessage{
+        Value:  value,
+        Schema: schema,
+    })
+    assert.NoError(t, err)
+    _, err = producer2.Send(context.Background(), &ProducerMessage{
+        Value:  value,
+        Schema: schema,
+    })
+    assert.NoError(t, err)
+}
+
+func TestMemLimitRejectProducerMessagesWithChunking(t *testing.T) {
+
+    c, err := NewClient(ClientOptions{
+        URL: serviceURL,
+
[GitHub] [pulsar] gaoran10 commented on issue #20673: Flaky-test: TransactionBufferCloseTest.deleteTopicCloseTransactionBufferTest
gaoran10 commented on issue #20673: URL: https://github.com/apache/pulsar/issues/20673#issuecomment-1633998198 @lhotari Thanks for this discovery, I'll check the dump and try to fix this problem. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] gaoran10 commented on a diff in pull request #20330: [optimize][admin]Enhancing Transaction Buffer Stats and Introducing TransactionBufferInternalStats API
gaoran10 commented on code in PR #20330: URL: https://github.com/apache/pulsar/pull/20330#discussion_r1262322512 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java: ## @@ -431,6 +437,70 @@ protected CompletableFuture internalGetPendi ); } +protected CompletableFuture internalGetTransactionBufferInternalStats( +boolean authoritative, boolean metadata) { +TransactionBufferInternalStats transactionBufferInternalStats = new TransactionBufferInternalStats(); +return getExistingPersistentTopicAsync(authoritative) +.thenCompose(topic -> { +TransactionBuffer.SnapshotType snapshotType = topic.getTransactionBuffer().getSnapshotType(); +if (snapshotType == null) { +return FutureUtil.failedFuture(new RestException(NOT_FOUND, +"Transaction buffer Snapshot for the topic does not exist")); +} else if (snapshotType == TransactionBuffer.SnapshotType.Segment) { +transactionBufferInternalStats.snapshotType = snapshotType.toString(); +TopicName segmentTopic = TopicName.get(TopicDomain.persistent.toString(), namespaceName, + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS); +return getTxnSnapshotInternalStats(segmentTopic, metadata) +.thenApply(snapshotSystemTopicInternalStats -> { Review Comment: Maybe we can use the method `thenAccept` here. ## pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/AbortedTxnProcessor.java: ## @@ -66,9 +67,10 @@ public interface AbortedTxnProcessor { /** * Get the lastSnapshotTimestamps. - * @return the lastSnapshotTimestamps. + * + * @return a transactionBufferStats with the stats in the abortedTxnProcessor. */ -long getLastSnapshotTimestamps(); +TransactionBufferStats generateSnapshotStats(boolean segmentStats); Review Comment: Do we need to add a new method for compatibility? 
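On the compatibility question raised in the review comment above, one common Java option is to keep the old accessor as a default method that delegates to the new one. The sketch below only illustrates that idea under stated assumptions: `StatsSketch` and `ProcessorSketch` are hypothetical stand-ins, not the real Pulsar `TransactionBufferStats` or `AbortedTxnProcessor` types, and the PR may have resolved the question differently.

```java
// Hypothetical illustration; these types are stand-ins, not Pulsar classes.
final class CompatSketch {

    /** Stand-in for a stats holder that carries the last snapshot timestamp. */
    static final class StatsSketch {
        long lastSnapshotTimestamps;
    }

    /** Stand-in for a processor interface whose accessor was replaced. */
    interface ProcessorSketch {
        /** New method: returns a stats object instead of a single value. */
        StatsSketch generateSnapshotStats(boolean segmentStats);

        /** Old accessor, kept as a default method so existing callers still compile. */
        default long getLastSnapshotTimestamps() {
            return generateSnapshotStats(false).lastSnapshotTimestamps;
        }
    }

    /** Calls the old accessor on an implementation that only defines the new method. */
    static long demo() {
        ProcessorSketch processor = segmentStats -> {
            StatsSketch stats = new StatsSketch();
            stats.lastSnapshotTimestamps = 42L;
            return stats;
        };
        return processor.getLastSnapshotTimestamps();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With this shape, implementations only need to provide the new method, while code written against the old accessor keeps working. Whether that is worth the extra API surface is the judgment call the reviewer is asking about.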
## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java: ## @@ -3547,8 +3547,8 @@ public boolean checkSubscriptionTypesEnable(SubType subType) { return subTypesEnabled != null && subTypesEnabled.contains(subType); } -public TransactionBufferStats getTransactionBufferStats(boolean lowWaterMarks) { -return this.transactionBuffer.getStats(lowWaterMarks); +public TransactionBufferStats getTransactionBufferStats(boolean lowWaterMarks, boolean segmentStats) { Review Comment: Do we need to add a new method for compatibility? /cc @BewareMyPower @codelipenghui ## pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java: ## @@ -431,6 +437,70 @@ protected CompletableFuture internalGetPendi ); } +protected CompletableFuture internalGetTransactionBufferInternalStats( +boolean authoritative, boolean metadata) { +TransactionBufferInternalStats transactionBufferInternalStats = new TransactionBufferInternalStats(); +return getExistingPersistentTopicAsync(authoritative) +.thenCompose(topic -> { +TransactionBuffer.SnapshotType snapshotType = topic.getTransactionBuffer().getSnapshotType(); +if (snapshotType == null) { +return FutureUtil.failedFuture(new RestException(NOT_FOUND, +"Transaction buffer Snapshot for the topic does not exist")); +} else if (snapshotType == TransactionBuffer.SnapshotType.Segment) { +transactionBufferInternalStats.snapshotType = snapshotType.toString(); +TopicName segmentTopic = TopicName.get(TopicDomain.persistent.toString(), namespaceName, + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS); +return getTxnSnapshotInternalStats(segmentTopic, metadata) +.thenApply(snapshotSystemTopicInternalStats -> { + transactionBufferInternalStats.segmentInternalStats = +snapshotSystemTopicInternalStats; +return transactionBufferInternalStats; +}).thenCompose(ignore -> { Review Comment: Maybe we can use the `thenCombine`, such as this. 
``` var segmentStatsFuture = ...; var indexStatsFuture = ...; segmentStatsFuture.thenCombine(indexStatsFuture, (segmentStats, indexStats) -> { transactionBufferInternalStats.segmentInternalStats = segmentStats;
[GitHub] [pulsar] lhotari commented on issue #20673: Flaky-test: TransactionBufferCloseTest.deleteTopicCloseTransactionBufferTest
lhotari commented on issue #20673: URL: https://github.com/apache/pulsar/issues/20673#issuecomment-1633958559

Thread leaks:

```
❯ grep -shEr "created [0-9][0-9]+ new threads" pulsar-broker
2023-07-13T06:18:12,115 - WARN - [main:ThreadLeakDetectorListener@60] - Summary: Tests in class org.apache.pulsar.broker.service.ReplicatorAdminTlsTest created 26 new threads
2023-07-13T06:09:10,589 - WARN - [main:ThreadLeakDetectorListener@60] - Summary: Tests in class org.apache.pulsar.broker.service.TopicOwnerTest created 40 new threads
2023-07-13T06:18:27,708 - WARN - [main:ThreadLeakDetectorListener@60] - Summary: Tests in class org.apache.pulsar.broker.service.BrokerBkEnsemblesTests created 32 new threads
2023-07-13T05:59:04,195 - WARN - [main:ThreadLeakDetectorListener@60] - Summary: Tests in class org.apache.pulsar.broker.loadbalance.SimpleBrokerStartTest created 13 new threads
2023-07-13T06:02:54,883 - WARN - [main:ThreadLeakDetectorListener@60] - Summary: Tests in class org.apache.pulsar.broker.loadbalance.extensions.scheduler.UnloadSchedulerTest created 10 new threads
2023-07-13T06:27:27,595 - WARN - [main:ThreadLeakDetectorListener@60] - Summary: Tests in class org.apache.pulsar.broker.service.BacklogQuotaManagerTest created 17 new threads
2023-07-13T06:24:54,267 - WARN - [main:ThreadLeakDetectorListener@60] - Summary: Tests in class org.apache.pulsar.broker.service.BrokerServiceTest created 37 new threads
2023-07-13T06:31:26,617 - WARN - [main:ThreadLeakDetectorListener@60] - Summary: Tests in class org.apache.pulsar.broker.service.ReplicatorSubscriptionTest created 23 new threads
2023-07-13T06:14:34,465 - WARN - [main:ThreadLeakDetectorListener@60] - Summary: Tests in class org.apache.pulsar.broker.service.ReplicatorTlsTest created 18 new threads
2023-07-13T06:34:40,969 - WARN - [main:ThreadLeakDetectorListener@60] - Summary: Tests in class org.apache.pulsar.broker.transaction.buffer.TransactionStablePositionTest created 78 new threads
2023-07-13T06:32:03,632 - WARN - [main:ThreadLeakDetectorListener@60] - Summary: Tests in class org.apache.pulsar.broker.service.BrokerBookieIsolationTest created 51 new threads
2023-07-13T06:00:16,670 - WARN - [main:ThreadLeakDetectorListener@60] - Summary: Tests in class org.apache.pulsar.broker.loadbalance.impl.BundleSplitterTaskTest created 13 new threads
2023-07-13T06:07:32,859 - WARN - [main:ThreadLeakDetectorListener@60] - Summary: Tests in class org.apache.pulsar.broker.service.PeerReplicatorTest created 24 new threads
2023-07-13T06:05:18,690 - WARN - [main:ThreadLeakDetectorListener@60] - Summary: Tests in class org.apache.pulsar.broker.auth.AuthorizationTest created 12 new threads
2023-07-13T06:05:50,722 - WARN - [main:ThreadLeakDetectorListener@60] - Summary: Tests in class org.apache.pulsar.broker.loadbalance.SimpleLoadManagerImplTest created 51 new threads
2023-07-13T06:04:49,333 - WARN - [main:ThreadLeakDetectorListener@60] - Summary: Tests in class org.apache.pulsar.broker.loadbalance.MultiBrokerLeaderElectionTest created 89 new threads
2023-07-13T06:10:48,796 - WARN - [main:ThreadLeakDetectorListener@60] - Summary: Tests in class org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumersTest created 11 new threads
2023-07-13T06:30:07,831 - WARN - [main:ThreadLeakDetectorListener@60] - Summary: Tests in class org.apache.pulsar.broker.service.ReplicatorTopicPoliciesTest created 22 new threads
2023-07-13T06:31:28,204 - WARN - [main:ThreadLeakDetectorListener@60] - Summary: Tests in class org.apache.pulsar.broker.intercept.MangedLedgerInterceptorImplTest created 10 new threads
2023-07-13T06:20:06,450 - WARN - [main:ThreadLeakDetectorListener@60] - Summary: Tests in class org.apache.pulsar.broker.service.ClusterMigrationTest created 743 new threads
2023-07-13T06:20:30,680 - WARN - [main:ThreadLeakDetectorListener@60] - Summary: Tests in class org.apache.pulsar.broker.service.schema.PartitionedTopicsSchemaTest created 16 new threads
2023-07-13T06:08:03,645 - WARN - [main:ThreadLeakDetectorListener@60] - Summary: Tests in class org.apache.pulsar.broker.service.ReplicatorRemoveClusterTest created 18 new threads
2023-07-13T06:06:43,539 - WARN - [main:ThreadLeakDetectorListener@60] - Summary: Tests in class org.apache.pulsar.broker.service.ReplicatorAdminTlsWithKeyStoreTest created 25 new threads
2023-07-13T06:27:39,557 - WARN - [main:ThreadLeakDetectorListener@60] - Summary: Tests in class org.apache.pulsar.broker.service.MessageCumulativeAckTest created 62 new threads
2023-07-13T06:29:35,866 - WARN - [main:ThreadLeakDetectorListener@60] - Summary: Tests in class org.apache.pulsar.broker.service.ServerCnxTest created 849 new threads
2023-07-13T06:05:18,567 - WARN - [main:ThreadLeakDetectorListener@60] - Summary: Tests in class
```
[GitHub] [pulsar] oneforalone added a comment to the discussion: Broker restarted due to `ZooKeeper client is disconnected`
GitHub user oneforalone added a comment to the discussion: Broker restarted due to `ZooKeeper client is disconnected`

@lhotari The problem remains the same. Is that related to the slow fsync?

GitHub link: https://github.com/apache/pulsar/discussions/20773#discussioncomment-6436778

This is an automatically sent email for commits@pulsar.apache.org. To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org
[GitHub] [pulsar] rob-kvietkauskas-ign edited a discussion: Functions worker configuration for bare-metal cluster
GitHub user rob-kvietkauskas-ign edited a discussion: Functions worker configuration for bare-metal cluster

Hello! :wave: Could someone assist me with advice or tips on functions worker configuration in a bare-metal cluster?

I have a cluster of 6 machines (3 for brokers and bookies, 3 for ZooKeeper). The ZooKeeper cluster is functioning fine, and so are the bookies (I successfully ran the sanity check and _simpletest_). The brokers also work fine.

I am trying to run functions workers with the brokers, but after I perform the setup steps described in the documentation, every time I try to create a (source or sink) connector I end up with BookKeeper (ledger allocation) errors, which in turn seem to be caused by incorrect configuration. I have created an [issue](https://github.com/apache/pulsar/issues/20722), but haven't received any replies yet, so I am trying my luck in the Discussions section.

I encounter the same errors in different Pulsar versions. I have deployed four separate clusters: two running version 2.9.1 and the other two running version 2.11.1. The result is the same in both cases.

GitHub link: https://github.com/apache/pulsar/discussions/20796
[GitHub] [pulsar] lhotari commented on issue #20673: Flaky-test: TransactionBufferCloseTest.deleteTopicCloseTransactionBufferTest
lhotari commented on issue #20673:
URL: https://github.com/apache/pulsar/issues/20673#issuecomment-1633924891

@gaoran10 It looks like there's a resource leak somewhere. When the test times out there are 718 threads:
https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_dump

Lots of threads with the same name, for example:
[pulsar-backlog-quota-checker-OrderedScheduler-0-0](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_threaddetails_0x7fe47c3c3a20) awaiting notification on [ [0x0800337ab3f8](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_sync_0x0800337ab3f8) ]
[pulsar-backlog-quota-checker-OrderedScheduler-0-0](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_threaddetails_0x7fe47ca98f80) awaiting notification on [ [0x08cab0d0](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_sync_0x08cab0d0) ]
[pulsar-backlog-quota-checker-OrderedScheduler-0-0](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_threaddetails_0x7fe47caa29b0) awaiting notification on [ [0x0800336f0e18](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_sync_0x0800336f0e18) ]
[pulsar-backlog-quota-checker-OrderedScheduler-0-0](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_threaddetails_0x7fe47cc88f60) awaiting notification on [ [0x08001c456320](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_sync_0x08001c456320) ]
[pulsar-backlog-quota-checker-OrderedScheduler-0-0](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_threaddetails_0x7fe47cd2fd00) awaiting notification on [ [0x08001c4453c8](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_sync_0x08001c4453c8) ]
[pulsar-backlog-quota-checker-OrderedScheduler-0-0](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_threaddetails_0x7fe47cd34020) awaiting notification on [ [0x0803f5e314f0](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_sync_0x0803f5e314f0) ]
[pulsar-backlog-quota-checker-OrderedScheduler-0-0](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_threaddetails_0x7fe47cd38a20) awaiting notification on [ [0x0803f5e1e848](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_sync_0x0803f5e1e848) ]
[pulsar-backlog-quota-checker-OrderedScheduler-0-0](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_threaddetails_0x7fe47cd64200) awaiting notification on [ [0x0800167b9f60](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_sync_0x0800167b9f60) ]
[pulsar-backlog-quota-checker-OrderedScheduler-0-0](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_threaddetails_0x7fe47cd6cce0) awaiting notification on [ [0x0803f5e29468](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_sync_0x0803f5e29468) ]
[pulsar-backlog-quota-checker-OrderedScheduler-0-0](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_threaddetails_0x7fe47d6d9810) awaiting notification on [ [0x0803f5e35928](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_sync_0x0803f5e35928) ]
[pulsar-backlog-quota-checker-OrderedScheduler-0-0](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_threaddetails_0x7fe47d7382f0) awaiting notification on [ [0x0803f49f6c58](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_sync_0x0803f49f6c58) ]
[pulsar-backlog-quota-checker-OrderedScheduler-0-0](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_threaddetails_0x7fe47d9dd150) awaiting notification on [ [0x0803f5e1a4c8](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_sync_0x0803f5e1a4c8) ]
[pulsar-backlog-quota-checker-OrderedScheduler-0-0](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_threaddetails_0x7fe47da15b80) awaiting notification on [
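The "many threads with the same name" symptom in the comment above can also be checked from inside a running JVM. The following is only a minimal sketch, not anything from the Pulsar test suite: the class name `ThreadNameHistogram` and the helper `countByName` are hypothetical, and the only JDK call relied on is `Thread.getAllStackTraces()`.

```java
import java.util.Map;
import java.util.TreeMap;

public class ThreadNameHistogram {
    /** Counts the given threads per thread name (hypothetical helper for illustration). */
    public static Map<String, Integer> countByName(Iterable<Thread> threads) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Thread t : threads) {
            counts.merge(t.getName(), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Thread.getAllStackTraces() returns a snapshot keyed by every live thread.
        Map<String, Integer> counts = countByName(Thread.getAllStackTraces().keySet());
        // Names that occur more than once are candidates for a leaked executor or scheduler.
        counts.forEach((name, n) -> {
            if (n > 1) {
                System.out.println(n + " x " + name);
            }
        });
    }
}
```

Calling `countByName` at the end of a test and comparing it with a snapshot taken at the start is one way to see which thread names keep accumulating.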
[pulsar-client-go] branch master updated: [Fix][Producer] Stop block request even if Value and Payload are both set (#1052)
This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

The following commit(s) were added to refs/heads/master by this push:
     new e45122c [Fix][Producer] Stop block request even if Value and Payload are both set (#1052)
e45122c is described below

commit e45122c2defc5efd4efc493d0acef278a7ccfc01
Author: gunli <24350...@qq.com>
AuthorDate: Thu Jul 13 17:15:37 2023 +0800

[Fix][Producer] Stop block request even if Value and Payload are both set (#1052)

### Motivation

Currently, if `!p.options.DisableBlockIfQueueFull` and `msg.Value != nil && msg.Payload != nil`, request will be blocked forever 'cause `defer request.stopBlock()` is set up after the verify logic.

```go
if msg.Value != nil && msg.Payload != nil {
	p.log.Error("Can not set Value and Payload both")
	runCallback(request.callback, nil, request.msg, errors.New("can not set Value and Payload both"))
	return
}

// The block chan must be closed when returned with exception
defer request.stopBlock()
```

Here is the PR to stop block request even if Value and Payload are both set

### Modifications

- pulsar/producer_partition.go

Co-authored-by: gunli
---
 pulsar/producer_partition.go | 11 ++-
 pulsar/producer_test.go      | 15 +++
 2 files changed, 21 insertions(+), 5 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index dd45ff2..48411b4 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -481,11 +481,6 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 	var schemaPayload []byte
 	var err error
 
-	if msg.Value != nil && msg.Payload != nil {
-		p.log.Error("Can not set Value and Payload both")
-		runCallback(request.callback, nil, request.msg, errors.New("can not set Value and Payload both"))
-		return
-	}
 	// The block chan must be closed when returned with exception
 	defer request.stopBlock()
 
@@ -1117,6 +1112,12 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer
 		return
 	}
 
+	if msg.Value != nil && msg.Payload != nil {
+		p.log.Error("Can not set Value and Payload both")
+		runCallback(callback, nil, msg, newError(InvalidMessage, "Can not set Value and Payload both"))
+		return
+	}
+
 	// Register transaction operation to transaction and the transaction coordinator.
 	var newCallback func(MessageID, *ProducerMessage, error)
 	var txn *transaction
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 11ff089..adbdc71 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -111,6 +111,12 @@ func TestSimpleProducer(t *testing.T) {
 
 	_, err = producer.Send(context.Background(), nil)
 	assert.NotNil(t, err)
+
+	_, err = producer.Send(context.Background(), &ProducerMessage{
+		Payload: []byte("hello"),
+		Value:   []byte("hello"),
+	})
+	assert.NotNil(t, err)
 }
 
 func TestProducerAsyncSend(t *testing.T) {
@@ -163,6 +169,15 @@ func TestProducerAsyncSend(t *testing.T) {
 			wg.Done()
 		})
 	wg.Wait()
+
+	wg.Add(1)
+	producer.SendAsync(context.Background(), &ProducerMessage{Payload: []byte("hello"), Value: []byte("hello")},
+		func(id MessageID, m *ProducerMessage, e error) {
+			assert.NotNil(t, e)
+			assert.Nil(t, id)
+			wg.Done()
+		})
+	wg.Wait()
 }
 
 func TestProducerCompression(t *testing.T) {
[GitHub] [pulsar-client-go] RobertIndie merged pull request #1052: [Fix][Producer] Stop block request even if Value and Payload are both set
RobertIndie merged PR #1052: URL: https://github.com/apache/pulsar-client-go/pull/1052
[GitHub] [pulsar] lhotari commented on issue #20673: Flaky-test: TransactionBufferCloseTest.deleteTopicCloseTransactionBufferTest
lhotari commented on issue #20673: URL: https://github.com/apache/pulsar/issues/20673#issuecomment-1633812413 again: https://github.com/apache/pulsar/actions/runs/5536635370/jobs/10110676512?pr=20623#step:10:1201
[GitHub] [pulsar] fabien-dlb added a comment to the discussion: Client-Server compatibility matrix
GitHub user fabien-dlb added a comment to the discussion: Client-Server compatibility matrix

An update after reading the [announcement of Pulsar 3.0.0](https://pulsar.apache.org/blog/2023/05/02/announcing-apache-pulsar-3-0/#compatibility-between-releases):

> Before Pulsar 3.0, upgrades should be performed linearly through each feature version. For example, when upgrading from 2.8 to 2.10, it is important to upgrade to 2.9 before going to 2.10.

So this confirms that brokers need to go through every feature version. As I understand it, starting from 2.7.x we need to upgrade to 2.8.x on the broker side and then on the client side, then to 2.9.x on the broker side and then on the client side, and so on up to 2.11.x, which is the only version from which we can update to 3.0.0 (note: I am still looking for confirmation that we can update from 2.x to 3.x, and from which 2.x version(s) this is possible).

GitHub link: https://github.com/apache/pulsar/discussions/20109#discussioncomment-6435673

This is an automatically sent email for commits@pulsar.apache.org. To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org
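The linear upgrade order described in the comment above can be written down as a small planning helper. This is only a sketch of the commenter's reading, not official Pulsar guidance: `UpgradePath` and `stepsBetween` are hypothetical names, the version list contains only the 2.x feature versions named in the comment, and the brokers-before-clients order is the commenter's assumption. The 2.x to 3.0 step is deliberately left out because the comment itself marks it as unconfirmed.

```java
import java.util.ArrayList;
import java.util.List;

public class UpgradePath {
    // Feature versions taken from the comment; 3.0 is omitted because that step is unconfirmed there.
    static final List<String> FEATURE_VERSIONS = List.of("2.7", "2.8", "2.9", "2.10", "2.11");

    /** Lists the upgrade steps from one feature version to a later one, one feature version at a time. */
    public static List<String> stepsBetween(String from, String to) {
        int start = FEATURE_VERSIONS.indexOf(from);
        int end = FEATURE_VERSIONS.indexOf(to);
        if (start < 0 || end < 0 || start > end) {
            throw new IllegalArgumentException("unsupported upgrade: " + from + " -> " + to);
        }
        List<String> steps = new ArrayList<>();
        for (int i = start + 1; i <= end; i++) {
            String version = FEATURE_VERSIONS.get(i);
            // Assumed order per the comment: brokers first, then clients, for every feature version.
            steps.add("brokers -> " + version);
            steps.add("clients -> " + version);
        }
        return steps;
    }
}
```

For the example quoted from the announcement, `stepsBetween("2.8", "2.10")` yields the 2.9 steps before the 2.10 steps, so no feature version is skipped.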
[GitHub] [pulsar] shibd commented on pull request #20795: [fix][io] Not restart instance when kafka source poll exception.
shibd commented on PR #20795:
URL: https://github.com/apache/pulsar/pull/20795#issuecomment-1633790526

@Technoboy- @poorbarcode Thanks for the feedback.

About this comment: https://github.com/apache/pulsar/pull/20795#discussion_r1261959072

I refactored the start logic:
1. `consumer.subscribe` now runs on the start thread, so a subscribe exception makes startup fail fast.
2. `InterruptedException` is no longer swallowed: when one is encountered, it is rethrown.

PTAL.
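The two points in the comment above can be illustrated in isolation. The sketch below is not the code from the PR: `FailFastSource` and the `RecordClient` interface are hypothetical stand-ins for the Kafka source and its consumer, used only to show subscribing on the calling thread and propagating `InterruptedException` out of the poll loop.

```java
public class FailFastSource {
    /** Hypothetical stand-in for a Kafka consumer; not the real Kafka or Pulsar IO API. */
    public interface RecordClient {
        void subscribe();
        String poll() throws InterruptedException;
    }

    private final RecordClient client;
    private volatile Thread runner;

    public FailFastSource(RecordClient client) {
        this.client = client;
    }

    /** Subscribes on the calling thread, so a subscribe failure propagates out of start(). */
    public void start() {
        client.subscribe(); // throws here, before any background thread exists
        runner = new Thread(() -> {
            try {
                pollLoop();
            } catch (InterruptedException e) {
                // An interrupt means "stop": restore the flag and let the thread end.
                Thread.currentThread().interrupt();
            }
        }, "source-runner");
        runner.setDaemon(true);
        runner.start();
    }

    /** Polls until interrupted; InterruptedException is propagated, not swallowed. */
    void pollLoop() throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
            client.poll();
        }
    }

    public boolean isRunning() {
        Thread t = runner;
        return t != null && t.isAlive();
    }
}
```

With this shape, a broken subscription is reported to whoever calls `start()` instead of surfacing later on a background thread.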
[pulsar] annotated tag v2.10.5-candidate-1 updated (82c589fe97a -> 8aacaf9d16f)
This is an automated email from the ASF dual-hosted git repository. xiangying pushed a change to annotated tag v2.10.5-candidate-1 in repository https://gitbox.apache.org/repos/asf/pulsar.git *** WARNING: tag v2.10.5-candidate-1 was modified! *** from 82c589fe97a (commit) to 8aacaf9d16f (tag) tagging 82c589fe97a1a76d4a580845eb07e205a20a0356 (commit) replaces v2.10.4 by xiangying on Thu Jul 13 14:57:11 2023 +0800 - Log - Release v2.10.5-candidate-1 -BEGIN PGP SIGNATURE- iQJJBAABCgAzFiEE+P0S7h/2fz2fR3D9AvkjT8MxZvcFAmSvoEcVHHhpYW5neWlu Z0BhcGFjaGUub3JnAAoJEAL5I0/DMWb3WcgQAKD4vwyHwxfsQI5v6MbN0x+QkXAn +FqJTbPttCOwmWoxfhFeErON29HIEe6Rxx6UrCQKKwl83sQw5IyyawbsRnVKP/tP eMQBsaQ5nM1Nv4KhoRXh+j7P863pJag5J8WeDbYi6YrhVXjgma14ZjqXhL9mqv1z tkS1IAq0AhH0/w61tebcASVOuSWYpkyHCx7EqWepQxDUEvsRb2rIEo+oUUa+cpAE r3faKK8OABwVG8DKZtmz4qC5VZv70JMWaoVEdLquzLVRVNsIicPK3HwNK7UKnKEH hEgAd4xEQLF1JpA5eqqEyuewSu8hgXXPyZoA40Z9Xo8V9qWUSoj8zjElQ8wIh3Ls ZIbVDfnLG5mkomcIV3+epIQRTnX7/Gq+GXr03Cgdj5IdI5sEOyXLZLMuknfbNP9L FdoMEsgeSf3bJWz2J9399VPBG6EJhxArmJMYXUIdDlu5roPnKfAUHOAuapsSeq2b tZ2uZHLk7I1cGRIaBBx9nggjOMParFddmBbsfhwIwabgPv2cRrWaaAyof5FABRw5 xBq8iCbmozdia6Zow8YRzC5NlNs8IaOaPB5+VT7segeCD8/Sxo7sjXiuH+ixlvdN ttOJLvQoT7bmMozfNbcx7SFlCiU+6Z5PCM+0wxDGAtBuybkAMdmL2NF0lAOUUZuj Pd9iVSt1QrsgTrzo =s+UB -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes:
[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #20718: [feat][broker][PIP-278] Support pluggable topic compaction service - part2
BewareMyPower commented on code in PR #20718:
URL: https://github.com/apache/pulsar/pull/20718#discussion_r1262144922

##
pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java:
##
@@ -400,6 +400,16 @@ private static Pair extractKeyAndSize(RawMessage m) {
         }
     }

+    protected List> extractIdsAndKeysAndSizeFromBatch(RawMessage msg)
+            throws IOException {
+        return RawBatchConverter.extractIdsAndKeysAndSize(msg);
+    }
+
+    protected Optional rebatchMessage(RawMessage msg, BiPredicate filter)
+            throws IOException {
+        return RawBatchConverter.rebatchMessage(msg, filter);
+    }

Review Comment:
Let's keep it `protected` for now.
[GitHub] [pulsar-client-cpp] BewareMyPower commented on pull request #300: [fix] Fix the assertion of `ConsumerWrapper`.receiveAtMost
BewareMyPower commented on PR #300:
URL: https://github.com/apache/pulsar-client-cpp/pull/300#issuecomment-1633722258

Oh, that's right. But `receiveAtLeast` would also be wrong, because this method cannot receive more than N messages. The proper implementation of `receiveAtMost` might be:

```c++
Result receiveAtMost(int numMessages) {
    Message msg;
    for (int i = 0; i < numMessages; i++) {
        auto result = consumer_.receive(msg, 3000);
        if (result != ResultOk) {
            return result;
        }
        messageIdList_.emplace_back(msg.getMessageId());
    }
    return ResultOk;
}
```
[GitHub] [pulsar] oneforalone added a comment to the discussion: Broker restarted due to `ZooKeeper client is disconnected`
GitHub user oneforalone added a comment to the discussion: Broker restarted due to `ZooKeeper client is disconnected`

Okay, I appreciate the suggestion. Does this slow fsync matter? Would it affect the starting/deploying procedure, or would it just slow down message syncing?

GitHub link: https://github.com/apache/pulsar/discussions/20773#discussioncomment-6435113
[GitHub] [pulsar] oneforalone added a comment to the discussion: Broker restarted due to `ZooKeeper client is disconnected`
GitHub user oneforalone added a comment to the discussion: Broker restarted due to `ZooKeeper client is disconnected`

> The pulsar-all 3.0.0 image is invalid. It's actually Pulsar 2.11

So, which exact version should I use? Is 2.9.5 OK?

GitHub link: https://github.com/apache/pulsar/discussions/20773#discussioncomment-6435092
[GitHub] [pulsar-client-cpp] RobertIndie commented on pull request #300: [fix] Fix the assertion of `ConsumerWrapper`.receiveAtMost
RobertIndie commented on PR #300:
URL: https://github.com/apache/pulsar-client-cpp/pull/300#issuecomment-1633696870

> It's designed to be the current behavior. We can receive some messages first, then try to receive the rest messages.

> You'd better add the assertion explicitly after calling receiveAtMost.

From the references to `receiveAtMost` (in AcknowledgeTest), I think we don't need to assert that there are no more messages.

Currently, `receiveAtMost` doesn't implement the "at most" guarantee, which would confuse developers. I think we have two options:
1. Implement the "at most" guarantee for `receiveAtMost`
2. Rename it to `receiveAtLeast`
[pulsar] branch branch-2.10 updated: Release 2.10.5
This is an automated email from the ASF dual-hosted git repository. xiangying pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.10 by this push: new 82c589fe97a Release 2.10.5 new c185ae4c0cf Merge remote-tracking branch 'origin/branch-2.10' into branch-2.10 82c589fe97a is described below commit 82c589fe97a1a76d4a580845eb07e205a20a0356 Author: xiangying <1984997...@qq.com> AuthorDate: Thu Jul 13 14:54:19 2023 +0800 Release 2.10.5 --- bouncy-castle/bc/pom.xml | 2 +- bouncy-castle/bcfips-include-test/pom.xml | 2 +- bouncy-castle/bcfips/pom.xml | 2 +- bouncy-castle/pom.xml | 2 +- buildtools/pom.xml| 2 +- distribution/io/pom.xml | 2 +- distribution/offloaders/pom.xml | 2 +- distribution/pom.xml | 2 +- distribution/server/pom.xml | 2 +- docker/pom.xml| 2 +- docker/pulsar-all/pom.xml | 2 +- docker/pulsar/pom.xml | 2 +- jclouds-shaded/pom.xml| 2 +- kafka-connect-avro-converter-shaded/pom.xml | 2 +- managed-ledger/pom.xml| 2 +- pom.xml | 2 +- pulsar-broker-auth-athenz/pom.xml | 2 +- pulsar-broker-auth-sasl/pom.xml | 2 +- pulsar-broker-common/pom.xml | 2 +- pulsar-broker-shaded/pom.xml | 2 +- pulsar-broker/pom.xml | 2 +- pulsar-client-1x-base/pom.xml | 2 +- pulsar-client-1x-base/pulsar-client-1x/pom.xml| 2 +- pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml | 2 +- pulsar-client-admin-api/pom.xml | 2 +- pulsar-client-admin-shaded/pom.xml| 2 +- pulsar-client-admin/pom.xml | 2 +- pulsar-client-all/pom.xml | 2 +- pulsar-client-api/pom.xml | 2 +- pulsar-client-auth-athenz/pom.xml | 2 +- pulsar-client-auth-sasl/pom.xml | 2 +- pulsar-client-messagecrypto-bc/pom.xml| 2 +- pulsar-client-shaded/pom.xml | 2 +- pulsar-client-tools-test/pom.xml | 2 +- pulsar-client-tools/pom.xml | 2 +- pulsar-client/pom.xml | 2 +- pulsar-common/pom.xml | 2 +- pulsar-config-validation/pom.xml | 2 +- pulsar-functions/api-java/pom.xml | 2 +- pulsar-functions/instance/pom.xml | 2 +- 
pulsar-functions/java-examples/pom.xml| 2 +- pulsar-functions/localrun-shaded/pom.xml | 2 +- pulsar-functions/localrun/pom.xml | 2 +- pulsar-functions/pom.xml | 2 +- pulsar-functions/proto/pom.xml| 2 +- pulsar-functions/runtime-all/pom.xml | 2 +- pulsar-functions/runtime/pom.xml | 2 +- pulsar-functions/secrets/pom.xml | 2 +- pulsar-functions/utils/pom.xml| 2 +- pulsar-functions/worker/pom.xml | 2 +- pulsar-io/aerospike/pom.xml | 2 +- pulsar-io/aws/pom.xml | 2 +- pulsar-io/batch-data-generator/pom.xml| 2 +- pulsar-io/batch-discovery-triggerers/pom.xml | 2 +- pulsar-io/canal/pom.xml | 2 +- pulsar-io/cassandra/pom.xml | 2 +- pulsar-io/common/pom.xml | 2 +- pulsar-io/core/pom.xml| 2 +- pulsar-io/data-generator/pom.xml | 2 +- pulsar-io/debezium/core/pom.xml | 2 +- pulsar-io/debezium/mongodb/pom.xml| 2 +- pulsar-io/debezium/mssql/pom.xml | 2 +- pulsar-io/debezium/mysql/pom.xml | 2 +- pulsar-io/debezium/oracle/pom.xml | 2 +- pulsar-io/debezium/pom.xml| 2 +- pulsar-io/debezium/postgres/pom.xml | 2 +- pulsar-io/docs/pom.xml| 2 +- pulsar-io/dynamodb/pom.xml| 2 +- pulsar-io/elastic-search/pom.xml | 2 +- pulsar-io/file/pom.xml| 2 +- pulsar-io/flume/pom.xml | 2 +- pulsar-io/hbase/pom.xml
[GitHub] [pulsar] aloyszhang commented on a diff in pull request #20750: [fix][client] fix negative message re-delivery twice issue
aloyszhang commented on code in PR #20750:
URL: https://github.com/apache/pulsar/pull/20750#discussion_r1262092557

##
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java:
##
@@ -146,6 +146,9 @@ public void testNegativeAcks(boolean batching, boolean usePartitions, Subscripti
             consumer.negativeAcknowledge(msg);
         }

+        assertTrue(consumer instanceof ConsumerBase);
+        assertEquals(((ConsumerBase) consumer).getUnAckedMessageTracker().size(), 0);

Review Comment:
`ConsumerImpl` with non-batched messages is sure to pass this test. Problems only happen in the two situations described in Motivation.
[GitHub] [pulsar] littlecatjianjiao closed issue #20754: [Bug] flink consume multi-partition topic error
littlecatjianjiao closed issue #20754: [Bug] flink consume multi-partition topic error URL: https://github.com/apache/pulsar/issues/20754
[GitHub] [pulsar] littlecatjianjiao commented on issue #20754: [Bug] flink consume multi-partition topic error
littlecatjianjiao commented on issue #20754:
URL: https://github.com/apache/pulsar/issues/20754#issuecomment-1633660913

> @cbornet Please confirm, but I think that this is a limitation in the flink job and it will only consume from pulsar in a single thread because input topic processing is in the job thread and not the task threads.

Thank you for your reply. We have resolved this issue by upgrading the Flink version. It appears to be an issue with the pulsarsource provided by Flink, and I will close this issue.
[pulsar] branch branch-2.10 updated: [fix] [broker] Can not receive any messages after switch to standby cluster (#20767)
This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.10 by this push: new 20adfe4854b [fix] [broker] Can not receive any messages after switch to standby cluster (#20767) 20adfe4854b is described below commit 20adfe4854b5d6c98b0ca5d03eda7a714854759a Author: fengyubiao AuthorDate: Thu Jul 13 09:51:45 2023 +0800 [fix] [broker] Can not receive any messages after switch to standby cluster (#20767) (cherry picked from commit 465fac523da946553b09298e13dc7dfcecfb6c78) --- .../ReplicatedSubscriptionsController.java | 10 ++- .../broker/service/ReplicatorSubscriptionTest.java | 91 ++ 2 files changed, 98 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java index 1e1245ed36b..cf1603788f1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java @@ -192,10 +192,14 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P sub.acknowledgeMessage(Collections.singletonList(pos), AckType.Cumulative, Collections.emptyMap()); } else { // Subscription doesn't exist. 
We need to force the creation of the subscription in this cluster, because -log.info("[{}][{}] Creating subscription at {}:{} after receiving update from replicated subcription", +log.info("[{}][{}] Creating subscription at {}:{} after receiving update from replicated subscription", topic, update.getSubscriptionName(), updatedMessageId.getLedgerId(), pos); -topic.createSubscription(update.getSubscriptionName(), -InitialPosition.Latest, true /* replicateSubscriptionState */, null); +topic.createSubscription(update.getSubscriptionName(), InitialPosition.Earliest, +true /* replicateSubscriptionState */, Collections.emptyMap()) +.thenAccept(subscriptionCreated -> { + subscriptionCreated.acknowledgeMessage(Collections.singletonList(pos), +AckType.Cumulative, Collections.emptyMap()); +}); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java index 046adaa5ec2..250d971b9fb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java @@ -24,10 +24,12 @@ import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import com.google.common.collect.Sets; import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashSet; import java.util.Map; @@ -41,6 +43,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; 
+import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; @@ -48,6 +51,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; import org.apache.pulsar.common.policies.data.TopicStats; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -154,6 +158,93 @@ public class ReplicatorSubscriptionTest extends ReplicatorTestBase { "messages don't match."); } +@Test +public void testReplicatedSubscribeAndSwitchToStandbyCluster() throws Exception { +final String namespace = BrokerTestUtil.newUniqueName("pulsar/ns_"); +final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp_"); +final String subscriptionName =
[pulsar] branch branch-2.10 updated: [improve] [broker] Add consumer-id into the log when doing subscribe. (#20568)
This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.10 by this push: new e25d764eaea [improve] [broker] Add consumer-id into the log when doing subscribe. (#20568) e25d764eaea is described below commit e25d764eaea13b7435a915e9aabe8b05929026b1 Author: fengyubiao AuthorDate: Wed Jul 12 22:46:38 2023 +0800 [improve] [broker] Add consumer-id into the log when doing subscribe. (#20568) - Since `cnx.address + consumerId` is the identifier of one consumer; add `consumer-id` into the log when doing subscribe. - add a test to confirm that even if the error occurs when sending messages to the client, the consumption is still OK. - print debug log if ack-command was discarded due to `ConsumerFuture is not complete.` - print debug log if sending a message to the client is failed. (cherry picked from commit a41ac49d9f30c415d87ce747393a16fa724cf4c9) --- .../org/apache/pulsar/broker/service/Consumer.java | 6 +++ .../apache/pulsar/broker/service/ServerCnx.java| 9 +++- .../client/api/SimpleProducerConsumerTest.java | 50 ++ 3 files changed, 64 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index f2cd77e485e..8924b750eb6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -318,6 +318,12 @@ public class Consumer { msgOutCounter.add(totalMessages); bytesOutCounter.add(totalBytes); chunkedMessageRate.recordMultipleEvents(totalChunkedMessages, 0); +} else { +if (log.isDebugEnabled()) { +log.debug("[{}-{}] Sent messages to client fail by IO exception[{}], close the connection" ++ " immediately. 
Consumer: {}", topicName, subscription, +status.cause() == null ? "" : status.cause().getMessage(), this.toString()); +} } }); return writeAndFlushPromise; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index c1045c3d14b..e369e7d7b68 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -994,7 +994,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { remoteAddress, getPrincipal()); } -log.info("[{}] Subscribing on topic {} / {}", remoteAddress, topicName, subscriptionName); +log.info("[{}] Subscribing on topic {} / {}. consumerId: {}", this.ctx().channel().toString(), +topicName, subscriptionName, consumerId); try { Metadata.validateMetadata(metadata, service.getPulsar().getConfiguration().getMaxConsumerMetadataSize()); @@ -1573,6 +1574,12 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { } return null; }); +} else { +if (log.isDebugEnabled()) { +log.debug("Consumer future is not complete(not complete or error), but received command ack. so discard" ++ " this command. 
consumerId: {}, cnx: {}, messageIdCount: {}", ack.getConsumerId(), +this.ctx().channel().toString(), ack.getMessageIdsCount()); +} } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 16a56af2039..577b9d5a648 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -39,6 +39,8 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; import io.netty.util.Timeout; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -79,6 +81,8 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.cache.EntryCache; import org.apache.commons.lang3.RandomUtils;
[GitHub] [pulsar] poorbarcode commented on a diff in pull request #20795: [fix][io] Not restart instance when kafka source poll exception.
poorbarcode commented on code in PR #20795:
URL: https://github.com/apache/pulsar/pull/20795#discussion_r1262054448

##
pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java:
##
@@ -190,14 +190,12 @@ public void start() {
         });
         runnerThread.setUncaughtExceptionHandler(
                 (t, e) -> {
-                    new Thread(() -> {
-                        LOG.error("[{}] Error while consuming records", t.getName(), e);
-                        try {
-                            this.close();
-                        } catch (Exception ex) {
-                            LOG.error("[{}] Close kafka source error", t.getName(), e);
-                        }
-                    }, "Kafka Source Close Task Thread").start();
+                    LOG.error("[{}] Error while consuming records", t.getName(), e);
+                    try {
+                        notifyError((Exception) e);

Review Comment:
The throwable might not be an instance of `Exception`.
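The concern in the review comment above is that an uncaught-exception handler receives a `Throwable`, so a blind `(Exception) e` cast would itself throw `ClassCastException` when the runner thread dies with an `Error`. One possible way to address it is sketched below; `ThrowableAdapter` and `asException` are hypothetical names, and this is not necessarily how the PR resolved the comment.

```java
public class ThrowableAdapter {
    /** Returns the throwable itself if it is an Exception, otherwise wraps it (for example an Error). */
    public static Exception asException(Throwable t) {
        if (t instanceof Exception) {
            return (Exception) t;
        }
        // Keep the original throwable as the cause so no diagnostic information is lost.
        return new RuntimeException("Non-Exception throwable from runner thread", t);
    }

    public static void main(String[] args) throws InterruptedException {
        // The runner dies with an Error, which a plain (Exception) cast could not handle.
        Thread runner = new Thread(() -> {
            throw new AssertionError("boom");
        }, "runner");
        runner.setUncaughtExceptionHandler((thread, e) -> {
            Exception reported = asException(e);
            System.out.println(thread.getName() + " failed: " + reported);
        });
        runner.start();
        runner.join();
    }
}
```

In the handler from the diff, the equivalent change would be to pass the converted value to the error callback rather than casting the throwable directly.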