2020-08-15 10:38:03 UTC - Christophe Bornet: For the API, I'm thinking of a way 
to register something like a gRPC CallStreamObserver ( 
<https://grpc.github.io/grpc-java/javadoc/io/grpc/stub/CallStreamObserver.html|https://grpc.github.io/grpc-java/javadoc/io/grpc/stub/CallStreamObserver.html>)
 or a reactive streams Subscription 
(<https://www.reactive-streams.org/reactive-streams-1.0.0-javadoc/org/reactivestreams/Subscription.html#request-long-|https://www.reactive-streams.org/reactive-streams-1.0.0-javadoc/org/reactivestreams/Subscription.html#request-long->)
 which have a request(n) method to signal how much items the producer can push. 
At startup the request would be called with the size of the queue. And then 
request(1) is called each time an item is removed from the queue.
----
2020-08-15 12:29:58 UTC - Fernando: That article confirms my understanding that 
 messages shouldn’t be removed from the subscription’s backlog if 
unacknowledged. The problem is that if I look at the topic stats, the 
subscription is just gone :s
+1 : Julius S
----
2020-08-15 12:39:09 UTC - Fernando: It looks similar to this bug 
<https://github.com/apache/pulsar/issues/5579|https://github.com/apache/pulsar/issues/5579>
 however my namespace doesn’t have a TTL. Instead it has infinite retention, 
which makes it weirder
----
2020-08-15 15:43:52 UTC - Joshua Decosta: ```    public static ByteBuf 
newConnect(String authMethodName, AuthData authData, int protocolVersion, 
String libVersion,
                                     String targetBroker, String 
originalPrincipal, AuthData originalAuthData,
                                     String originalAuthMethod) {
        CommandConnect.Builder connectBuilder = CommandConnect.newBuilder();
        connectBuilder.setClientVersion(libVersion != null ? libVersion : 
"Pulsar Client");
        connectBuilder.setAuthMethodName(authMethodName);

        if (targetBroker != null) {
            // When connecting through a proxy, we need to specify which broker 
do we want to be proxied through
            connectBuilder.setProxyToBrokerUrl(targetBroker);
        }

        if (authData != null) {
            
connectBuilder.setAuthData(ByteString.copyFrom(authData.getBytes()));
        }

        if (originalPrincipal != null) {
            connectBuilder.setOriginalPrincipal(originalPrincipal);
        }

        if (originalAuthData != null) {
            connectBuilder.setOriginalAuthData(new 
String(originalAuthData.getBytes(), UTF_8));
        }

        if (originalAuthMethod != null) {
            connectBuilder.setOriginalAuthMethod(originalAuthMethod);
        }
        connectBuilder.setProtocolVersion(protocolVersion);
        connectBuilder.setFeatureFlags(getFeatureFlags());
        CommandConnect connect = connectBuilder.build();
        ByteBuf res = 
serializeWithSize(BaseCommand.newBuilder().setType(Type.CONNECT).setConnect(connect));
        connect.recycle();
        connectBuilder.recycle();
        return res;
    }```
After debugging for a bit I keep hitting this method in `Commands.java`  and I 
see it's setting the original prinicple with the originalprinciple auth data. 
Why is this happening? Am I supposed to be looking for the original principle 
auth data my `AuthorizationProvider` methods? I still can't figure out why the 
data isn't being passed.
----
2020-08-15 15:56:59 UTC - Joshua Decosta: I'm not sure where I should be 
looking at this point.
----
2020-08-15 16:03:34 UTC - Joshua Decosta: This method in `ProxyConnection` gets 
hit often:

```    private void completeConnect() {
        <http://LOG.info|LOG.info>("[{}] complete connection, init proxy 
handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}",
            remoteAddress, authMethod, clientAuthRole, hasProxyToBrokerUrl);
        if (hasProxyToBrokerUrl) {
            // Client already knows which broker to connect. Let's open a
            // connection there and just pass bytes in both directions
            state = State.ProxyConnectionToBroker;
            directProxyHandler = new DirectProxyHandler(service, this, 
proxyToBrokerUrl,
                protocolVersionToAdvertise, sslHandlerSupplier);
            cancelKeepAliveTask();
        } else {
            // Client is doing a lookup, we can consider the handshake complete
            // and we'll take care of just topics and
            // partitions metadata lookups
            state = State.ProxyLookupRequests;
            lookupProxyHandler = new LookupProxyHandler(service, this);
            
ctx.writeAndFlush(Commands.newConnected(protocolVersionToAdvertise));
        }
    }```
That else statement gets hit almost everytime.
----

Reply via email to