2020-07-28 11:29:47 UTC - Allen ONeill: How can I get the number of unprocessed/ack'd messages waiting in a topic?
I think it is something like: `admin.persistentTopics().getStats(topicName).numberOfEntries` minus `admin.persistentTopics().getStats(topicName).messagesConsumedCounter`. Is this correct, or is there a simpler way in the client lib that I am missing? +1 : Ryan, Nikita Muravyov ---- 2020-07-28 13:21:52 UTC - Thomas O'Neill: Does anyone know the minimum permissions needed to use Tiered Storage using the aws-s3 driver? ---- 2020-07-28 14:25:14 UTC - Matt Mitchell: Question about the java client and multiple producer/consumer threads… should each thread use its own instance of a Pulsar producer/consumer? ---- 2020-07-28 15:12:18 UTC - Daniel Ciocirlan: Is there a way to delete the local backlog via the admin CLI for a partitioned topic? ---- 2020-07-28 15:34:50 UTC - Addison Higham: @Takahiro Hozumi do you have logs for your broker? Perhaps it is a JVM memory issue? ---- 2020-07-28 15:49:46 UTC - tomscut: @tomscut has joined the channel ---- 2020-07-28 16:06:17 UTC - Addison Higham: Did you try `pulsar-admin topics clear-backlog`? I am not totally sure that works for partitioned topics; if it doesn't, you could write a small bash script to reset it for each of the underlying topic partitions ---- 2020-07-28 16:07:49 UTC - Daniel Ciocirlan: thanks @Addison Higham, very helpful as always! :slightly_smiling_face: Much appreciated! ---- 2020-07-28 16:08:18 UTC - Addison Higham: if it doesn't work for partitioned topics, would you mind filing an issue? ---- 2020-07-28 16:08:44 UTC - Daniel Ciocirlan: I was looking for delete-backlog in the documentation and that's why I did not see it :slightly_smiling_face: ---- 2020-07-28 16:08:54 UTC - Daniel Ciocirlan: sure, will give it a try now ---- 2020-07-28 16:13:31 UTC - Takahiro Hozumi: Hi, here are the broker log from just before shutdown, broker.conf, and the JVM boot options for the broker, bookkeeper and zk. As the broker is still restarting constantly, I may increase `-Xmx` for the broker tomorrow.
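For the fallback Addison mentions (resetting each underlying partition separately), a script would need the per-partition topic names. Below is a minimal sketch of just that name generation, assuming partitions are exposed as `<topic>-partition-<index>` with indexes starting at 0. The class name, method name, example topic, and partition count are hypothetical, and the actual `pulsar-admin` call and its arguments are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionNames {

    /**
     * Builds the per-partition topic names for a partitioned topic.
     * Assumption: partition N of a topic is named "<topic>-partition-N".
     */
    static List<String> partitionTopics(String topic, int numPartitions) {
        if (numPartitions < 1) {
            throw new IllegalArgumentException("numPartitions must be at least 1");
        }
        List<String> names = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            names.add(topic + "-partition-" + i);
        }
        return names;
    }

    public static void main(String[] args) {
        // Hypothetical topic and partition count, for illustration only.
        for (String name : partitionTopics("persistent://mytenant/mynamespace/mytopic", 3)) {
            System.out.println(name);
        }
    }
}
```

Each printed name could then be passed to the admin command one at a time, which is the loop a small bash script would perform.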
---- 2020-07-28 16:19:45 UTC - Daniel Ciocirlan: so it worked perfectly :wink: ---- 2020-07-28 16:19:48 UTC - Daniel Ciocirlan: thanks again! ---- 2020-07-28 16:41:06 UTC - Addison Higham: I thought the docs had an example policy somewhere, but I can't find it. I do know it is pretty minimal though: `GetObject*` and `PutObject*` and, just to be safe, `ListBucket` should be sufficient. ---- 2020-07-28 16:51:28 UTC - Addison Higham: Quick reminder, in 10 minutes a webinar is starting where @Matteo Merli, @Joe Francis and I will be talking about experiences of running Pulsar in production, link here: <https://us02web.zoom.us/webinar/register/WN_xMt6QBJ9TWiyeVdifqKITg> ok_hand : Konstantinos Papalias, Matteo Merli, Frank Kelly +1 : Mathieu Druart, Thomas O'Neill ---- 2020-07-28 18:36:38 UTC - Sijie Guo: If you missed the webinar this morning, you can watch it now on YouTube <https://www.youtube.com/watch?v=mncXc_T6JkU&list=PLqRma1oIkcWhfmUuJrMM5YIG8hjju62Ev&index=2&t=2826s>. If you have any more questions, feel free to raise them here or reach out to @Joe Francis @Matteo Merli @Addison Higham. +1 : Kristi ---- 2020-07-28 18:57:22 UTC - Thomas O'Neill: Get and Put worked. I also added delete, but I'm not sure if it deletes old files. Very new to Pulsar. ---- 2020-07-28 20:28:19 UTC - Shivam Arora: Hello All, We are running a Pulsar cluster on k8s with a 3 ZK - 3 broker - 4 bookie setup. To simulate one of many disaster scenarios, we restarted one k8s worker node. After the cluster came back to its original state, we started getting the error below. We were able to solve it only by restarting the broker pods.
Any help will be appreciated ```17:03:58.227 [pulsar-io-25-2] ERROR org.apache.pulsar.broker.service.ServerCnx - [/10.233.127.228:39826] Failed to create topic <persistent://public/default/companyagent.action.add-batch-consent> java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException: Failed to load topic within timeout at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_252] at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_252] at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607) ~[?:1.8.0_252] at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) ~[?:1.8.0_252] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_252] at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_252] at org.apache.pulsar.broker.service.BrokerService.lambda$futureWithDeadline$18(BrokerService.java:836) ~[org.apache.pulsar-pulsar-broker-2.6.0.jar:2.6.0] at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final] at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final] at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final] at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384) [io.netty-netty-transport-native-epoll-4.1.48.Final.jar:4.1.48.Final] at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final] at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252] Caused by: java.util.concurrent.TimeoutException: Failed to load topic within timeout at org.apache.pulsar.broker.service.BrokerService.loadOrCreatePersistentTopic(BrokerService.java:955) ~[org.apache.pulsar-pulsar-broker-2.6.0.jar:2.6.0] at org.apache.pulsar.broker.service.BrokerService.lambda$getTopic$13(BrokerService.java:731) ~[org.apache.pulsar-pulsar-broker-2.6.0.jar:2.6.0] at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:277) ~[org.apache.pulsar-pulsar-common-2.6.0.jar:2.6.0] at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:130) ~[org.apache.pulsar-pulsar-common-2.6.0.jar:2.6.0] at org.apache.pulsar.broker.service.BrokerService.getTopic(BrokerService.java:730) ~[org.apache.pulsar-pulsar-broker-2.6.0.jar:2.6.0] at org.apache.pulsar.broker.service.ServerCnx.lambda$null$14(ServerCnx.java:841) ~[org.apache.pulsar-pulsar-broker-2.6.0.jar:2.6.0]``` ---- 2020-07-28 20:52:26 UTC - Joshua Decosta: I’ve been working on a custom Authorization class and I’m confused by what I’m seeing when trying to produce or consume. When a consumer has the correct permissions, the messages are consumed no problem. When the consumer doesn’t have the correct permission the connection seems to never end. This is currently being tested in standalone. My question is what is the expected behavior when a client isn’t allowed to consume. Is it enough to return the completedFuture as false when the client doesn’t have permission? 
I should also note I’m using the pulsar-client CLI for issuing these produce and consume commands ---- 2020-07-28 22:24:21 UTC - Addison Higham: @Shivam Arora what version of Pulsar is that? There are some known issues with bookkeeper/pulsar on k8s where connections don't get properly cleaned up. You can tune your TCP keepalive timeout to help. There are also improvements in 2.6.0 for a number of smaller issues when running on Kubernetes ---- 2020-07-28 23:21:00 UTC - Pratim SC: @Pratim SC has joined the channel ---- 2020-07-29 01:09:32 UTC - Addison Higham: @Joshua Decosta you should get back an error if not authorized ---- 2020-07-29 01:10:57 UTC - Joshua Decosta: @Addison Higham So that doesn’t seem to be happening. Are you saying I should throw an error in my AuthorizationProvider, or that returning false from my AuthorizationProvider methods should do it? ---- 2020-07-29 01:12:22 UTC - Addison Higham: let me look at the code real quick ---- 2020-07-29 01:22:06 UTC - Addison Higham: @Joshua Decosta that is what I would expect if your future wasn't properly returning, but if you return a realized future of "false" it should terminate the connection ---- 2020-07-29 01:22:33 UTC - Addison Higham: @Joshua Decosta it is possible that the CLI is trying to hide that from you by trying to reconnect ---- 2020-07-29 01:23:25 UTC - Joshua Decosta: Is there a better way to test it? When I switched the auth provider back to the default I got the client acknowledgement that I wasn’t authorized ---- 2020-07-29 01:23:28 UTC - Addison Higham: you could look at the standalone logs, and if you see something like `Client is not authorized to subscribe` then that would be hitting the branch that terminates the connection ---- 2020-07-29 01:24:14 UTC - Addison Higham: so you did see an error with the default provider using the CLI? ---- 2020-07-29 01:24:40 UTC - Addison Higham: then yes... I wonder if for some reason you aren't getting a future to resolve...
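The distinction Addison draws here (a realized future of "false" versus a future that never resolves) can be reproduced with plain `java.util.concurrent`, without any Pulsar classes. The sketch below is only an illustration under stated assumptions: `awaitDecision` is a hypothetical stand-in for whatever code waits on an authorization result, not the broker's actual logic, and the 200 ms timeout is arbitrary.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public class AuthFutureDemo {

    /** Hypothetical caller that waits (with a deadline) for an authorization decision. */
    static String awaitDecision(Supplier<CompletableFuture<Boolean>> check) {
        CompletableFuture<Boolean> future;
        try {
            future = check.get();
        } catch (RuntimeException e) {
            // The check threw before handing back any future at all.
            return "THREW " + e.getClass().getSimpleName();
        }
        try {
            return future.get(200, TimeUnit.MILLISECONDS) ? "ALLOWED" : "DENIED";
        } catch (TimeoutException e) {
            // The future was returned but nobody ever completed it.
            return "TIMED_OUT";
        } catch (ExecutionException e) {
            return "FAILED " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "INTERRUPTED";
        }
    }

    public static void main(String[] args) {
        // A realized future of false: the caller gets a clear "no".
        System.out.println(awaitDecision(() -> CompletableFuture.completedFuture(false)));
        // A realized future of true.
        System.out.println(awaitDecision(() -> CompletableFuture.completedFuture(true)));
        // A future that is never completed: the caller only sees a timeout.
        System.out.println(awaitDecision(() -> new CompletableFuture<Boolean>()));
        // A check that throws synchronously instead of returning a future.
        System.out.println(awaitDecision(() -> { throw new IllegalStateException("bad token"); }));
    }
}
```

Only the first two cases give the caller a definite answer. In the last two, what the client sees depends entirely on how the caller handles a timeout or an unexpected exception.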
---- 2020-07-29 01:26:50 UTC - Joshua Decosta: I did see the error with the default ---- 2020-07-29 01:28:02 UTC - Addison Higham: and just to double check, your client can properly authenticate, but just isn't authorized (i.e. its policy isn't allowed to produce, for example)? If you have a snippet to show the code, I would be happy to take a look ---- 2020-07-29 01:33:19 UTC - Joshua Decosta: If I can show it I for sure will. The issue keeps coming up in the AuthorizationService’s canLookUp method. Seems to be a timeout error. ---- 2020-07-29 01:33:34 UTC - Joshua Decosta: Mind you, all I’m doing is checking a token for specific claims ---- 2020-07-29 01:33:54 UTC - Joshua Decosta: And when found returning completedFuture(true) ---- 2020-07-29 01:34:01 UTC - Joshua Decosta: Or returning false ---- 2020-07-29 01:37:22 UTC - Addison Higham: is it possible you are throwing an exception somewhere? Exceptions bubbling up and not being properly handled could result in this ---- 2020-07-29 02:25:25 UTC - Joshua Decosta:
```
public CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String role,
        AuthenticationDataSource authenticationData) {
    try {
        String token = getToken(authenticationData);
        if (isBrokerToken(token)) {
            return CompletableFuture.completedFuture(role != null && superUserRoles.contains(role));
        } else {
            return hasTopicPermission(token, topicName, PRODUCE_PERMISSION);
        }
    } catch (AuthenticationException | ParseException e) {
        return CompletableFuture.completedFuture(false);
    }
}
```
This is an example of what I'm doing. My `hasTopicPermission` returns a `CompletableFuture.completedFuture` with either true or false contained in it. ---- 2020-07-29 02:27:09 UTC - Joshua Decosta:
```
private CompletableFuture<Boolean> hasTopicPermission(String token, TopicName topicName, String permissionScope)
        throws ParseException {
    // a bunch of logic here to manipulate the token and grab this permission object out of it.
    for (PermissionRepresentation permission : permissions) {
        if (permission.getScopes() != null && permission.getScopes().contains(permissionScope)) {
            if (permission.getRsname() != null && permission.getRsname().equals(topicName.toString())) {
                return CompletableFuture.completedFuture(true);
            }
        }
    }
    return CompletableFuture.completedFuture(false);
}
```
---- 2020-07-29 02:27:43 UTC - Joshua Decosta: This seems to work fine for true situations, but doesn't seem to work correctly for false situations ---- 2020-07-29 02:30:13 UTC - Joshua Decosta: and this is the error that gets spit out. ---- 2020-07-29 02:31:33 UTC - Joshua Decosta: @Addison Higham I think in general you might be right about the futures not completing, but I can't understand why they would work in some situations but not others in the way I've implemented them. ---- 2020-07-29 02:46:53 UTC - Addison Higham: @Joshua Decosta perhaps try changing your `catch` to catch `Throwable` just to ensure it isn't an uncaught runtime exception. That said, it is still a Pulsar issue, as it should handle bubbled-up exceptions, but I wouldn't be shocked if Pulsar assumes the auth providers won't throw runtime exceptions ---- 2020-07-29 03:01:31 UTC - Kristi: I've set up a test Pulsar cluster in AWS (3 bookies, 2 brokers, 3 zookeepers, 1 proxy host). I've been running a bin/pulsar-perf consumer and producer just to get some data flowing. I've got the producer producing 10k messages/second. It works great for a while, then something seems to happen and the throughput drops to 7.6k messages/second. I don't see CPU, memory, or network bottlenecks. If I restart the pulsar-perf producer it goes back to 10k messages/second for some time, then drops back down to 7.6k. Wondering if anyone has suggestions for what to check or tune?
I don't see anything obvious in the logs ---- 2020-07-29 03:30:41 UTC - tuhaihe: @tuhaihe has joined the channel ---- 2020-07-29 05:42:51 UTC - Takahiro Hozumi: I found that the broker restarts occur through the following steps:
```
Processing compaction inside a broker process increases memory usage,
-> which causes a JVM GC pause,
-> which causes the ZooKeeper connection to expire,
-> which causes the broker to shut down.
(-> and the container is restarted by the docker restart policy)
```
To avoid triggering compaction at broker startup, I increased the compaction threshold for the namespace and restarted all brokers. I then ran compaction for one partition as a separate process: `bin/pulsar compact-topic --topic persistent://mytenant/mynamespace/mytopic-partition-0` It seems that this separate compaction process also failed after exhausting JVM memory. The process repeatedly disconnects from and reconnects to zk (I attached the log). Does compaction require more memory than the sum of the key sizes in the target topic? I hope compaction can be done with limited memory, even if it takes time. ---- 2020-07-29 06:01:00 UTC - Addison Higham: how much memory do you have in your JVMs? ---- 2020-07-29 06:05:18 UTC - Takahiro Hozumi: 20GB.
```
$ ps aux | grep java
root 16185 829 42.5 40412340 27999344 pts/0 Ssl+ 04:04 974:16 /usr/local/openjdk-8/bin/java -cp /pulsar/conf:::/pulsar/lib/*: -Dlog4j.configurationFile=log4j2.yaml -Djute.maxbuffer=10485760 -Djava.net.preferIPv4Stack=true -Dpulsar.allocator.exit_on_oom=true -Dio.netty.recycler.maxCapacity.default=1000 -Dio.netty.recycler.linkCapacity=1024 -Xms20g -Xmx20g -XX:MaxDirectMemorySize=20g -XX:+UseG1GC -Dpulsar.log.appender=RoutingAppender -Dpulsar.log.dir=/pulsar/logs -Dpulsar.log.level=info -Dpulsar.routing.appender.default=Console -Dpulsar.functions.process.container.log.dir=/pulsar/logs -Dpulsar.functions.java.instance.jar=/pulsar/instances/java-instance.jar -Dpulsar.functions.python.instance.file=/pulsar/instances/python-instance/python_instance_main.py -Dpulsar.functions.extra.dependencies.dir=/pulsar/instances/deps -Dpulsar.functions.instance.classpath=/pulsar/conf:::/pulsar/lib/*: org.apache.pulsar.compaction.CompactorTool --broker-conf /pulsar/conf/broker.conf --topic persistent://mytenant/mynamespace/mytopic-partition-0
```
The storageSize for one partition of the target topic is 24834932716. ---- 2020-07-29 07:17:35 UTC - Walter: @Addison Higham How can I check whether ledgers were only available on a single bookie? ---- 2020-07-29 07:43:41 UTC - Walter: @Addison Higham the steps you gave helped ---- 2020-07-29 07:53:03 UTC - Shivam Arora: We are running these tests on 2.6.0. Should we tune just the broker's TCP keepalive property, or both the broker and BookKeeper? ----
