2020-08-15 10:38:03 UTC - Christophe Bornet: For the API, I'm thinking of a way
to register something like a gRPC CallStreamObserver
(<https://grpc.github.io/grpc-java/javadoc/io/grpc/stub/CallStreamObserver.html>)
or a reactive streams Subscription
(<https://www.reactive-streams.org/reactive-streams-1.0.0-javadoc/org/reactivestreams/Subscription.html#request-long->),
both of which have a request(n) method to signal how many items the producer can push.
At startup, request would be called with the size of the queue. After that,
request(1) is called each time an item is removed from the queue.
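A minimal sketch of that credit scheme, assuming a hypothetical `DemandSignal` interface that stands in for the request(n) part of either API, and a hypothetical `CreditedQueue` on the receiving side. Neither name comes from Pulsar, gRPC, or Reactive Streams:

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical stand-in for the request(n) method of Subscription / CallStreamObserver. */
interface DemandSignal {
    void request(long n);
}

/** Hypothetical receiver queue that hands credits to the producer via request(n). */
final class CreditedQueue<T> {
    private final Queue<T> items = new ArrayDeque<>();
    private final DemandSignal signal;

    CreditedQueue(int capacity, DemandSignal signal) {
        this.signal = signal;
        // At startup, grant as many credits as the queue can hold.
        signal.request(capacity);
    }

    /** Called by the producer side, which must only push while it holds credits. */
    synchronized void push(T item) {
        items.add(item);
    }

    /** Removes one item (or returns null if empty) and gives one credit back. */
    T poll() {
        T item;
        synchronized (this) {
            item = items.poll();
        }
        if (item != null) {
            // One slot was freed, so the producer may push one more item.
            signal.request(1);
        }
        return item;
    }
}
```

The producer would keep its own credit counter, add n on every request(n), subtract one per pushed item, and stop pushing at zero. The sketch does not enforce that rule itself; it only shows when the credits are granted.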
----
2020-08-15 12:29:58 UTC - Fernando: That article confirms my understanding that
messages shouldn’t be removed from the subscription’s backlog if
unacknowledged. The problem is that if I look at the topic stats, the
subscription is just gone :s
+1 : Julius S
----
2020-08-15 12:39:09 UTC - Fernando: It looks similar to this bug:
<https://github.com/apache/pulsar/issues/5579>.
However, my namespace doesn’t have a TTL. Instead it has infinite retention,
which makes it weirder
----
2020-08-15 15:43:52 UTC - Joshua Decosta:
```
public static ByteBuf newConnect(String authMethodName, AuthData authData, int protocolVersion,
        String libVersion, String targetBroker, String originalPrincipal,
        AuthData originalAuthData, String originalAuthMethod) {
    CommandConnect.Builder connectBuilder = CommandConnect.newBuilder();
    connectBuilder.setClientVersion(libVersion != null ? libVersion : "Pulsar Client");
    connectBuilder.setAuthMethodName(authMethodName);

    if (targetBroker != null) {
        // When connecting through a proxy, we need to specify which broker
        // do we want to be proxied through
        connectBuilder.setProxyToBrokerUrl(targetBroker);
    }

    if (authData != null) {
        connectBuilder.setAuthData(ByteString.copyFrom(authData.getBytes()));
    }

    if (originalPrincipal != null) {
        connectBuilder.setOriginalPrincipal(originalPrincipal);
    }

    if (originalAuthData != null) {
        connectBuilder.setOriginalAuthData(new String(originalAuthData.getBytes(), UTF_8));
    }

    if (originalAuthMethod != null) {
        connectBuilder.setOriginalAuthMethod(originalAuthMethod);
    }

    connectBuilder.setProtocolVersion(protocolVersion);
    connectBuilder.setFeatureFlags(getFeatureFlags());
    CommandConnect connect = connectBuilder.build();
    ByteBuf res = serializeWithSize(
            BaseCommand.newBuilder().setType(Type.CONNECT).setConnect(connect));
    connect.recycle();
    connectBuilder.recycle();
    return res;
}
```
After debugging for a bit I keep hitting this method in `Commands.java`, and I
see it's setting the original principal along with the original principal's auth data.
Why is this happening? Am I supposed to be looking for the original principal's
auth data in my `AuthorizationProvider` methods? I still can't figure out why the
data isn't being passed.
----
2020-08-15 15:56:59 UTC - Joshua Decosta: I'm not sure where I should be
looking at this point.
----
2020-08-15 16:03:34 UTC - Joshua Decosta: This method in `ProxyConnection` gets
hit often:
```
private void completeConnect() {
    LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}",
            remoteAddress, authMethod, clientAuthRole, hasProxyToBrokerUrl);
    if (hasProxyToBrokerUrl) {
        // Client already knows which broker to connect. Let's open a
        // connection there and just pass bytes in both directions
        state = State.ProxyConnectionToBroker;
        directProxyHandler = new DirectProxyHandler(service, this, proxyToBrokerUrl,
                protocolVersionToAdvertise, sslHandlerSupplier);
        cancelKeepAliveTask();
    } else {
        // Client is doing a lookup, we can consider the handshake complete
        // and we'll take care of just topics and
        // partitions metadata lookups
        state = State.ProxyLookupRequests;
        lookupProxyHandler = new LookupProxyHandler(service, this);
        ctx.writeAndFlush(Commands.newConnected(protocolVersionToAdvertise));
    }
}
```
That else branch gets hit almost every time.
----