2020-09-18 17:08:41 UTC - Piyush: Hey everyone, I was wondering if we could 
reset a topic cursor from a function. If so, how do we do that?
----
2020-09-18 17:15:45 UTC - Alexander Brown: @Alexander Brown has joined the 
channel
----
2020-09-18 17:25:37 UTC - Addison Higham: the right place to ask 
:slightly_smiling_face: Yes, by default a topic with no producers, no 
consumers, and no data is deleted after 60 seconds, but this is highly 
configurable
----
2020-09-18 17:27:20 UTC - Addison Higham: There are rate limits that can cap 
that, such as `pulsar-admin namespaces set-dispatch-rate`; you can see all the 
limits with `pulsar-admin namespaces policies`
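
A rough sketch of setting the same limit over the admin REST API instead of 
the CLI, using only the JDK HTTP client (Java 11+). The endpoint path and JSON 
field names are assumptions based on the v2 admin API and should be checked 
against the broker version in use; the class name, `public/default`, and the 
rate values are purely illustrative:

```java
import java.net.URI;
import java.net.http.HttpRequest;

class DispatchRateRest {
    // JSON body for a namespace dispatch rate (field names are assumed, see above).
    static String body(int msgRate, long byteRate, int periodSeconds) {
        return "{\"dispatchThrottlingRateInMsg\":" + msgRate
                + ",\"dispatchThrottlingRateInByte\":" + byteRate
                + ",\"ratePeriodInSecond\":" + periodSeconds + "}";
    }

    // Builds (but does not send) the POST request that sets the dispatch rate.
    static HttpRequest buildRequest(String adminUrl, String tenant,
                                    String namespace, String jsonBody) {
        URI uri = URI.create(adminUrl + "/admin/v2/namespaces/"
                + tenant + "/" + namespace + "/dispatchRate");
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    public static void main(String[] args) {
        String json = body(1000, 1048576L, 1);
        HttpRequest request =
                buildRequest("http://127.0.0.1:8080", "public", "default", json);
        // Only prints the request; send it with java.net.http.HttpClient when a broker is running.
        System.out.println(request.method() + " " + request.uri());
        System.out.println(json);
    }
}
```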
----
2020-09-18 17:28:00 UTC - Addison Higham: hrm... that does seem strange... 
@Sijie Guo have you ever seen anything like that?
----
2020-09-18 18:14:17 UTC - Sijie Guo: Interesting… I don’t think it will happen. 
@Sankararao Routhu how did you setup the clusters?
----
2020-09-18 18:15:27 UTC - Sijie Guo: You can call the Pulsar admin REST API 
to reset the cursor
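
A minimal sketch of such a call, assuming the timestamp-based reset endpoint of 
the v2 admin REST API and the JDK HTTP client (Java 11+). The class name, the 
`public/default` tenant and namespace, and the timestamp are hypothetical 
placeholders, and the path should be verified against the broker version in 
use:

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

class ResetCursorRest {
    // Builds the POST request that resets a subscription to a point in time.
    static HttpRequest buildRequest(String adminUrl, String tenant, String namespace,
                                    String topic, String subscription, long timestampMillis) {
        // Subscription names may contain characters that need URL encoding.
        String sub = URLEncoder.encode(subscription, StandardCharsets.UTF_8);
        URI uri = URI.create(adminUrl + "/admin/v2/persistent/" + tenant + "/" + namespace
                + "/" + topic + "/subscription/" + sub + "/resetcursor/" + timestampMillis);
        return HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    // Sends the request and returns the HTTP status code; needs a reachable broker.
    static int send(HttpRequest request) throws Exception {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }

    public static void main(String[] args) {
        HttpRequest request = buildRequest("http://127.0.0.1:8080", "public", "default",
                "my_topic", "my_subscription", 1600452000000L);
        // Only prints the request; call send(request) against a running broker.
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Building the request separately from sending it keeps the URL construction 
easy to check without a running cluster.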
----
2020-09-18 18:26:43 UTC - Yarden Arane: Hi @Sijie Guo,
I tried a setup using  pulsar standalone with a function that looks like 
follows:
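```
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class CursorManagementFunction implements Function<String, Void> {
    // Created lazily because build() throws a checked exception.
    private PulsarAdmin admin;
    // Not assigned anywhere in this snippet.
    MessageId prevMessageId;

    @Override
    public Void process(String input, Context context) throws Exception {
        if (admin == null) {
            admin = PulsarAdmin.builder()
                .connectionTimeout(120, TimeUnit.SECONDS)
                .readTimeout(120, TimeUnit.SECONDS)
                .serviceHttpUrl("http://127.0.0.1:8080")
                .build();
        }
        // getTopicName() returns an Optional<String>.
        String topic = context.getCurrentRecord().getTopicName()
            .orElseThrow(IllegalStateException::new);
        String subName = context.getFunctionName();
        admin.topics().resetCursor(topic, subName, prevMessageId);
        return null;
    }
}
```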
However, the call to resetCursor throws this exception:
```
java.lang.IllegalStateException: InjectionManagerFactory not found.
        at org.apache.pulsar.shade.org.glassfish.jersey.internal.inject.Injections.lambda$lookupInjectionManagerFactory$0(Injections.java:98) ~[?:?]
        at java.util.Optional.orElseThrow(Optional.java:290) ~[?:1.8.0_242]
        at org.apache.pulsar.shade.org.glassfish.jersey.internal.inject.Injections.lookupInjectionManagerFactory(Injections.java:98) ~[?:?]
        at org.apache.pulsar.shade.org.glassfish.jersey.internal.inject.Injections.createInjectionManager(Injections.java:68) ~[?:?]
        at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientConfig$State.initRuntime(ClientConfig.java:432) ~[?:?]
        at org.apache.pulsar.shade.org.glassfish.jersey.internal.util.collection.Values$LazyValueImpl.get(Values.java:341) ~[?:?]
        at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientConfig.getRuntime(ClientConfig.java:826) ~[?:?]
        at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientRequest.getConfiguration(ClientRequest.java:285) ~[?:?]
        at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation.validateHttpMethodAndEntity(JerseyInvocation.java:143) ~[?:?]
        at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation.<init>(JerseyInvocation.java:112) ~[?:?]
        at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation.<init>(JerseyInvocation.java:108) ~[?:?]
        at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation.<init>(JerseyInvocation.java:99) ~[?:?]
        at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation$AsyncInvoker.method(JerseyInvocation.java:706) ~[?:?]
        at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation$AsyncInvoker.get(JerseyInvocation.java:566) ~[?:?]
        at org.apache.pulsar.client.admin.internal.BaseResource.asyncGetRequest(BaseResource.java:168) ~[org.apache.pulsar-pulsar-client-admin-original-2.6.1.jar:2.6.1]
```

The same code not running in a Pulsar Function does work:
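```
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;

public class CursorManagementApplication {
    // Not assigned anywhere in this snippet.
    static MessageId prevMessageId;

    public static void main(String[] args) throws Exception {
        PulsarAdmin admin = PulsarAdmin.builder()
            .connectionTimeout(120, TimeUnit.SECONDS)
            .readTimeout(120, TimeUnit.SECONDS)
            .serviceHttpUrl("http://127.0.0.1:8080")
            .build();

        String topic = "my_topic";
        String subName = "my_subscription";
        admin.topics().resetCursor(topic, subName, prevMessageId);
        admin.close();
    }
}
```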

----
2020-09-18 18:31:20 UTC - Sijie Guo: It seems that there is a library conflict
----
2020-09-18 18:31:27 UTC - Sijie Guo: Can you create a github issue?
----
2020-09-18 18:31:57 UTC - Yarden Arane: Thanks, will do.
----
2020-09-18 18:42:37 UTC - Yarden Arane: 
<https://github.com/apache/pulsar/issues/8089>
----
2020-09-18 19:22:15 UTC - Sankararao Routhu: Hi @Sijie Guo, I set up two 
clusters and enabled geo-replication using a global ZooKeeper
----
2020-09-18 19:26:23 UTC - Linton: awesome, thanks @Addison Higham
----
2020-09-18 19:38:36 UTC - Addison Higham: correct, you should be able to just 
provide mysql configurations
----
2020-09-18 19:54:36 UTC - aaron wang: @aaron wang has joined the channel
----
2020-09-19 00:37:56 UTC - Sankararao Routhu: Looks like enabling dedup in the 
broker stops this issue. Is that the cause of the issue above, @Addison Higham?
----
2020-09-19 01:00:18 UTC - Addison Higham: It should not be necessary... It 
likely means some sort of misconfiguration. Are the different clusters both 
properly named? The way this works is that when a broker replicates a topic, 
it attaches additional metadata to the messages; with global config, the 
brokers know when a message is coming from a replicating cluster and do not 
re-replicate it. If the clusters are not properly configured somehow, I wonder 
if that could cause this
----
2020-09-19 06:32:51 UTC - Joe Francis: Every replicated message has the source 
cluster and a replicated marker set before it's sent out. Messages with the 
replication marker set are never replicated out, unless some protobuf 
dependency issues are messing with the marker bit offsets. Anyway, it is highly 
unlikely this is a broker issue: I have been doing full-mesh n-way Pulsar 
replication for many years and have never seen this issue
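
The rule described here can be sketched as a small model. This is not broker 
code: `ReplicationSketch`, `Msg`, and `targets` are hypothetical names, and the 
marker is modelled simply as a nullable source-cluster field:

```java
import java.util.ArrayList;
import java.util.List;

class ReplicationSketch {
    // Hypothetical message: replicatedFrom is null for locally produced messages.
    static final class Msg {
        final String payload;
        final String replicatedFrom;

        Msg(String payload, String replicatedFrom) {
            this.payload = payload;
            this.replicatedFrom = replicatedFrom;
        }
    }

    // Returns the remote clusters that localCluster should forward msg to.
    static List<String> targets(Msg msg, String localCluster, List<String> clusters) {
        List<String> out = new ArrayList<>();
        if (msg.replicatedFrom != null) {
            return out; // marker set: the message arrived via replication, never send it on
        }
        for (String cluster : clusters) {
            if (!cluster.equals(localCluster)) {
                out.add(cluster);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> clusters = List.of("A", "B");
        // Produced locally on A: forwarded to B.
        System.out.println(targets(new Msg("hello", null), "A", clusters));
        // The copy that lands on B carries the marker: not sent back to A.
        System.out.println(targets(new Msg("hello", "A"), "B", clusters));
    }
}
```

In this model a message can only bounce between clusters if the marker is lost 
or misread, which matches the point made above.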
----
2020-09-19 06:41:49 UTC - Joe Francis: I can theorize about a case where 
replication repeatedly times out and, e.g., cluster A repeatedly tries to push 
a message to cluster B, so just one message in A could get duplicated many 
times in B under such edge conditions of network issues. But bouncing back and 
forth? Can't think of a case.
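
A simplified model of why deduplication would hide that retry case, assuming 
the receiving side tracks the highest sequence ID seen per producer. The names 
`DedupSketch`, `accept`, and `replicator-A` are hypothetical, and this is a 
sketch of the idea rather than the broker's implementation:

```java
import java.util.HashMap;
import java.util.Map;

class DedupSketch {
    // Highest sequence ID accepted so far, keyed by producer name.
    private final Map<String, Long> highestSeq = new HashMap<>();

    // Accepts a message only if its sequence ID is newer than the last one from that producer.
    boolean accept(String producerName, long sequenceId) {
        Long last = highestSeq.get(producerName);
        if (last != null && sequenceId <= last) {
            return false; // same or older ID: treat as a duplicate and drop it
        }
        highestSeq.put(producerName, sequenceId);
        return true;
    }

    public static void main(String[] args) {
        DedupSketch topicOnB = new DedupSketch();
        System.out.println(topicOnB.accept("replicator-A", 7)); // first delivery: true
        System.out.println(topicOnB.accept("replicator-A", 7)); // retry after a timeout: false
        System.out.println(topicOnB.accept("replicator-A", 8)); // next message: true
    }
}
```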
----
