2020-09-18 17:08:41 UTC - Piyush: Hey everyone, I was wondering if we could reset a topic cursor from a function. If yes, how do we do that?
----
2020-09-18 17:15:45 UTC - Alexander Brown: @Alexander Brown has joined the channel
----
2020-09-18 17:25:37 UTC - Addison Higham: the right place to ask :slightly_smiling_face: Yes, by default, if a topic has no producers, no consumers, and no data, it will be deleted after 60 seconds, but this is highly configurable
----
2020-09-18 17:27:20 UTC - Addison Higham: There are rate limits that can limit that, like `pulsar-admin namespaces set-dispatch-rate`. You can see all the limits under `pulsar-admin namespaces policies`
----
2020-09-18 17:28:00 UTC - Addison Higham: hrm... that does seem strange... @Sijie Guo have you ever seen anything like that?
----
2020-09-18 18:14:17 UTC - Sijie Guo: Interesting… I don’t think it will happen. @Sankararao Routhu how did you set up the clusters?
----
2020-09-18 18:15:27 UTC - Sijie Guo: You can call the Pulsar RESTful admin API to reset the cursor
----
2020-09-18 18:26:43 UTC - Yarden Arane: Hi @Sijie Guo, I tried a setup using Pulsar standalone with a function that looks as follows:
```
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class CursorManagementFunction implements Function<String, Void> {
    private final PulsarAdmin admin;
    // Position to reset to; how it gets populated is not shown in this snippet.
    MessageId prevMessageId;

    // build() throws a checked exception, so the admin client is created in the constructor.
    public CursorManagementFunction() throws PulsarClientException {
        admin = PulsarAdmin.builder()
                .connectionTimeout(120, TimeUnit.SECONDS)
                .readTimeout(120, TimeUnit.SECONDS)
                .serviceHttpUrl("http://127.0.0.1:8080")
                .build();
    }

    @Override
    public Void process(String input, Context context) throws Exception {
        // getTopicName() returns an Optional<String>
        String topic = context.getCurrentRecord().getTopicName().get();
        String subName = context.getFunctionName();
        admin.topics().resetCursor(topic, subName, prevMessageId);
        return null;
    }
}
```
However, the call to resetCursor throws this exception:
```
java.lang.IllegalStateException: InjectionManagerFactory not found.
    at org.apache.pulsar.shade.org.glassfish.jersey.internal.inject.Injections.lambda$lookupInjectionManagerFactory$0(Injections.java:98) ~[?:?]
    at java.util.Optional.orElseThrow(Optional.java:290) ~[?:1.8.0_242]
    at org.apache.pulsar.shade.org.glassfish.jersey.internal.inject.Injections.lookupInjectionManagerFactory(Injections.java:98) ~[?:?]
    at org.apache.pulsar.shade.org.glassfish.jersey.internal.inject.Injections.createInjectionManager(Injections.java:68) ~[?:?]
    at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientConfig$State.initRuntime(ClientConfig.java:432) ~[?:?]
    at org.apache.pulsar.shade.org.glassfish.jersey.internal.util.collection.Values$LazyValueImpl.get(Values.java:341) ~[?:?]
    at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientConfig.getRuntime(ClientConfig.java:826) ~[?:?]
    at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientRequest.getConfiguration(ClientRequest.java:285) ~[?:?]
    at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation.validateHttpMethodAndEntity(JerseyInvocation.java:143) ~[?:?]
    at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation.<init>(JerseyInvocation.java:112) ~[?:?]
    at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation.<init>(JerseyInvocation.java:108) ~[?:?]
    at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation.<init>(JerseyInvocation.java:99) ~[?:?]
    at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation$AsyncInvoker.method(JerseyInvocation.java:706) ~[?:?]
    at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation$AsyncInvoker.get(JerseyInvocation.java:566) ~[?:?]
    at org.apache.pulsar.client.admin.internal.BaseResource.asyncGetRequest(BaseResource.java:168) ~[org.apache.pulsar-pulsar-client-admin-original-2.6.1.jar:2.6.1]
```
The same code, when not running in a Pulsar Function, does work:
```
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;

public class CursorManagementApplication {
    // Position to reset to; how it gets populated is not shown in this snippet.
    static MessageId prevMessageId;

    public static void main(String[] args) throws Exception {
        PulsarAdmin admin = PulsarAdmin.builder()
                .connectionTimeout(120, TimeUnit.SECONDS)
                .readTimeout(120, TimeUnit.SECONDS)
                .serviceHttpUrl("http://127.0.0.1:8080")
                .build();
        String topic = "my_topic";
        String subName = "my_subscription";
        admin.topics().resetCursor(topic, subName, prevMessageId);
    }
}
```
----
2020-09-18 18:31:20 UTC - Sijie Guo: It seems that there is a library conflict
----
2020-09-18 18:31:27 UTC - Sijie Guo: Can you create a GitHub issue?
----
2020-09-18 18:31:57 UTC - Yarden Arane: Thanks, will do.
----
2020-09-18 18:42:37 UTC - Yarden Arane: https://github.com/apache/pulsar/issues/8089
----
2020-09-18 19:22:15 UTC - Sankararao Routhu: Hi @Sijie Guo, I set up two clusters and enabled geo-replication using global ZooKeeper
----
2020-09-18 19:26:23 UTC - Linton: awesome, thanks @Addison Higham
----
2020-09-18 19:38:36 UTC - Addison Higham: correct, you should be able to just provide MySQL configurations
----
2020-09-18 19:54:36 UTC - aaron wang: @aaron wang has joined the channel
----
2020-09-19 00:37:56 UTC - Sankararao Routhu: Looks like enabling dedup in the broker stops this issue. Is that the reason for the above issue, @Addison Higham?
----
2020-09-19 01:00:18 UTC - Addison Higham: It should not be necessary... It likely would mean some sort of misconfiguration. Are the different clusters both properly named?
The way this works is that when a broker is replicating a topic, it attaches additional metadata to the messages. With global config, the brokers know when a message is coming from a replicating cluster and then do not re-replicate it. If the clusters are not properly configured somehow, I wonder if that could cause this
----
2020-09-19 06:32:51 UTC - Joe Francis: Every replicated message has the source cluster and a replicated marker set before it's sent out. Messages with the replication marker set are never replicated out, unless some protobuf dependency issues are messing with the marker bit offsets. Anyway, it is highly unlikely that this is a broker issue. I have been doing full mesh n-way Pulsar replication for many years and have never seen this issue
----
2020-09-19 06:41:49 UTC - Joe Francis: I can theorize about a case where replication repeatedly times out and, e.g., cluster A repeatedly tries to push a message to cluster B, so just one message in A could get duplicated many times to B under such edge conditions of network issues. But bouncing back and forth? Can't think of a case..
----
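The loop-prevention rule Addison and Joe describe above can be sketched roughly as follows. This is a toy model, not the broker's actual code: the `Message` class, its `replicatedFrom` field, and the `shouldReplicate` and `markForReplication` helpers are all hypothetical names chosen for illustration.

```java
public class ReplicationSketch {

    /** Hypothetical message; replicatedFrom is null for locally produced messages. */
    static final class Message {
        final String payload;
        final String replicatedFrom; // source cluster marker, set only on replicated copies

        Message(String payload, String replicatedFrom) {
            this.payload = payload;
            this.replicatedFrom = replicatedFrom;
        }
    }

    /** Only messages without the replication marker are eligible to be sent to other clusters. */
    static boolean shouldReplicate(Message msg) {
        return msg.replicatedFrom == null;
    }

    /** Before sending a message out, stamp the copy with the local cluster name. */
    static Message markForReplication(Message msg, String localCluster) {
        return new Message(msg.payload, localCluster);
    }

    public static void main(String[] args) {
        Message local = new Message("hello", null);
        Message copy = markForReplication(local, "cluster-a");

        System.out.println(shouldReplicate(local)); // true: produced locally on cluster-a
        System.out.println(shouldReplicate(copy));  // false: cluster-b must not send it back
    }
}
```

Under this rule a message crosses each replication link at most once, which is why a message bouncing back and forth would point at lost or misread marker metadata rather than at normal broker behavior.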