2020-06-26 09:37:28 UTC - Ali Ahmed: some discussion of Pulsar in the real
world; it’s the last part of the talk.
<https://www.youtube.com/watch?v=CbirFaTDpWE>
----
2020-06-26 10:57:23 UTC - Pierre-Yves Lebecq: Hello :wave: Sorry, this is going
to be a long post, but I have an issue with Pulsar Functions, more precisely
with the state storage of the functions, and I’ll try to give as much
information as possible about what is happening.
I’m basically running 3 functions on a Pulsar standalone instance, using the
Docker image apachepulsar/pulsar-all:2.6.0. There is only one instance of each
function for now:
• tasks-JobEnginePulsarFunction: This function keeps track of the state of some
jobs (basically, each job has a name and an ID). It saves an entry in the state
for every job to run (one key per job) and sends a message to a topic for a
consumer that will try to process the job and send various events back, like
job start, and then failed / completed. The function records in the state the
events that happened for that job, and when a job fails it sends a new message
to have the job retried. When it receives an event saying the job completed, it
deletes the associated state because it won’t be used anymore. When the status
of a job changes, the function publishes a message to a topic named
“tasks-monitoring-per-name” containing the old status and the new status for
the job.
• MonitoringPerName: This function uses counters and state to store how many
jobs are in a given state. Based on the status updates published by the
previous function, it decrements the counter corresponding to the old status of
the job and increments the counter corresponding to the new status. Because
counters are not easy to retrieve from the outside world, this function also
stores the values of the counters in function state. When a new job name is
received, the function publishes to a “tasks-monitoring-global” topic the fact
that it has received a job name which was previously not known, and initialises
counters and state for it.
• MonitoringGlobal: This function uses state to store a list of all job names
that are known, based on the messages published by the previous function.
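The counter bookkeeping described for MonitoringPerName can be sketched in plain Python. This is a minimal in-memory model under stated assumptions, not the Pulsar Functions API: `InMemoryState`, `on_status_change`, the `<job name>.<status>` key scheme and the status names are all hypothetical; the method names only loosely mirror the counter/state calls a function makes on its context.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional, Set


@dataclass
class InMemoryState:
    """Hypothetical stand-in for a function's counters and key/value state."""
    counters: Dict[str, int] = field(default_factory=dict)
    values: Dict[str, str] = field(default_factory=dict)

    def incr_counter(self, key: str, amount: int) -> None:
        self.counters[key] = self.counters.get(key, 0) + amount

    def get_counter(self, key: str) -> int:
        return self.counters.get(key, 0)

    def put_state(self, key: str, value: str) -> None:
        self.values[key] = value


def _bump(state: InMemoryState, key: str, amount: int) -> None:
    # Update the counter, then mirror its value into plain state so it can be
    # read from outside (the reason given for duplicating counters as state).
    state.incr_counter(key, amount)
    state.put_state(key, str(state.get_counter(key)))


def on_status_change(state: InMemoryState, known_names: Set[str], job_name: str,
                     old_status: Optional[str], new_status: str) -> bool:
    """Apply one status-change event; return True if job_name was not known before."""
    is_new_name = job_name not in known_names
    if is_new_name:
        # The real function would also publish to "tasks-monitoring-global" here.
        known_names.add(job_name)
    if old_status is not None:
        _bump(state, f"{job_name}.{old_status}", -1)
    _bump(state, f"{job_name}.{new_status}", +1)
    return is_new_name


if __name__ == "__main__":
    state, names = InMemoryState(), set()
    on_status_change(state, names, "report", None, "pending")
    on_status_change(state, names, "report", "pending", "running")
    on_status_change(state, names, "report", "running", "completed")
    print(state.counters)  # {'report.pending': 0, 'report.running': 0, 'report.completed': 1}
```

Each event costs up to two counter updates and two state writes. In a real function those calls go to the state storage rather than a dict, so a stalled state store could block the instance in the middle of a message, which would be consistent with the `functions stats` output and the `querystate` timeout reported in this thread.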
When I feed the first function with 10k messages, at some point it just gets
stuck and stops doing anything, usually somewhere between 2k and 5k messages.
Sometimes it got stuck after only a few hundred messages, and one time it went
through all 10k messages without issue. I ran some commands while it was stuck
to get more information, and here is what I observed:
Pulsar seems to consider the function instance running:
```# bin/pulsar-admin functions stats --name tasks-JobEnginePulsarFunction
{
"receivedTotal" : 315,
"processedSuccessfullyTotal" : 314,
"systemExceptionsTotal" : 0,
"userExceptionsTotal" : 0,
"avgProcessLatency" : 118.07828642993623,
"1min" : {
"receivedTotal" : 0,
"processedSuccessfullyTotal" : 0,
"systemExceptionsTotal" : 0,
"userExceptionsTotal" : 0,
"avgProcessLatency" : null
},
"lastInvocation" : 1593099037904,
"instances" : [ {
"instanceId" : 0,
"metrics" : {
"receivedTotal" : 315,
"processedSuccessfullyTotal" : 314,
"systemExceptionsTotal" : 0,
"userExceptionsTotal" : 0,
"avgProcessLatency" : 118.07828642993623,
"1min" : {
"receivedTotal" : 0,
"processedSuccessfullyTotal" : 0,
"systemExceptionsTotal" : 0,
"userExceptionsTotal" : 0,
"avgProcessLatency" : null
},
"lastInvocation" : 1593099037904,
"userMetrics" : { }
}
} ]
}```
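Two things can be read off the stats above: one message was received but never finished, and `lastInvocation` is an epoch-milliseconds value that can be compared with other timestamps. A small stdlib-only sketch; the field values are copied from the output above, and `in_flight` / `millis_to_utc` are hypothetical helper names.

```python
import json
from datetime import datetime, timedelta, timezone

# Fields copied from the `functions stats` output above.
FUNCTION_STATS = json.loads("""
{
  "receivedTotal": 315,
  "processedSuccessfullyTotal": 314,
  "systemExceptionsTotal": 0,
  "userExceptionsTotal": 0,
  "lastInvocation": 1593099037904
}
""")

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def in_flight(stats: dict) -> int:
    """Messages received but neither processed successfully nor failed with an exception."""
    return (stats["receivedTotal"]
            - stats["processedSuccessfullyTotal"]
            - stats["systemExceptionsTotal"]
            - stats["userExceptionsTotal"])


def millis_to_utc(epoch_millis: int) -> str:
    """Epoch milliseconds to ISO-8601 UTC, using integer timedelta arithmetic (no float rounding)."""
    return (EPOCH + timedelta(milliseconds=epoch_millis)).isoformat(timespec="milliseconds")


print(in_flight(FUNCTION_STATS))                        # 1
print(millis_to_utc(FUNCTION_STATS["lastInvocation"]))  # 2020-06-25T15:30:37.904+00:00
```

With no exceptions recorded and exactly one message in flight, the instance looks blocked inside a single invocation rather than idle.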
Getting stats for the topic shows the function still has messages to process:
----
2020-06-26 10:57:23 UTC - Pierre-Yves Lebecq: ```# bin/pulsar-admin topics stats tasks-engine
{
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesInCounter" : 301946,
"msgInCounter" : 1504,
"bytesOutCounter" : 301946,
"msgOutCounter" : 1504,
"averageMsgSize" : 0.0,
"msgChunkPublished" : false,
"storageSize" : 301946,
"backlogSize" : 239146,
"publishers" : [ {
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"averageMsgSize" : 0.0,
"chunkedMessageRate" : 0.0,
"producerId" : 0,
"metadata" : { },
"producerName" : "standalone-0-11",
"connectedSince" : "2020-06-25T15:32:32.654Z",
"clientVersion" : "2.5.0",
"address" : "/10.0.2.2:54818"
}, {
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"averageMsgSize" : 0.0,
"chunkedMessageRate" : 0.0,
"producerId" : 2,
"metadata" : {
"instance_id" : "0",
"application" : "pulsar-function",
"id" : "public/default/tasks-JobEnginePulsarFunction"
},
"producerName" : "standalone-0-6",
"connectedSince" : "2020-06-25T15:29:26.603Z",
"clientVersion" : "2.6.0",
"address" : "/127.0.0.1:56128"
} ],
"subscriptions" : {
"public/default/tasks-JobEnginePulsarFunction" : {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 301946,
"msgOutCounter" : 1504,
"msgRateRedeliver" : 0.0,
"chuckedMessageRate" : 0,
"msgBacklog" : 1190,
"msgBacklogNoDelayed" : 1190,
"blockedSubscriptionOnUnackedMsgs" : false,
"msgDelayed" : 0,
"unackedMessages" : 1190,
"type" : "Shared",
"msgRateExpired" : 0.0,
"lastExpireTimestamp" : 0,
"lastConsumedFlowTimestamp" : 1593099153207,
"lastConsumedTimestamp" : 1593099153689,
"lastAckedTimestamp" : 1593099037935,
"consumers" : [ {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 301946,
"msgOutCounter" : 1504,
"msgRateRedeliver" : 0.0,
"chuckedMessageRate" : 0.0,
"consumerName" : "75d3c",
"availablePermits" : 496,
"unackedMessages" : 1190,
"avgMessagesPerEntry" : 6,
"blockedConsumerOnUnackedMsgs" : false,
"lastAckedTimestamp" : 1593099037935,
"lastConsumedTimestamp" : 1593099153689,
"metadata" : {
"instance_id" : "0",
"application" : "pulsar-function",
"id" : "public/default/tasks-JobEnginePulsarFunction"
},
"connectedSince" : "2020-06-25T15:21:10.978Z",
"clientVersion" : "2.6.0",
"address" : "/127.0.0.1:56128"
} ],
"isDurable" : true,
"isReplicated" : false
}
},
"replication" : { },
"deduplicationStatus" : "Disabled"
}```
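The subscription numbers line up with the function stats: 1504 messages dispatched minus 1190 unacked leaves 314, which matches `processedSuccessfullyTotal`, and `msgBacklog` equals `unackedMessages`, so the broker has already pushed the whole backlog to the function's consumer. A stdlib-only sketch of that check; it assumes no redeliveries (they would inflate `msgOutCounter`), and the threshold in the hypothetical `looks_stuck` heuristic is arbitrary.

```python
import json

# Subscription fields copied from the `topics stats tasks-engine` output above.
SUB = json.loads("""
{
  "msgOutCounter": 1504,
  "msgBacklog": 1190,
  "unackedMessages": 1190,
  "lastConsumedTimestamp": 1593099153689,
  "lastAckedTimestamp": 1593099037935
}
""")


def acked_count(sub: dict) -> int:
    # Dispatched minus still-unacked; assumes nothing was redelivered.
    return sub["msgOutCounter"] - sub["unackedMessages"]


def ack_gap_ms(sub: dict) -> int:
    # How long the broker kept dispatching after the last acknowledgement.
    return sub["lastConsumedTimestamp"] - sub["lastAckedTimestamp"]


def looks_stuck(sub: dict, min_gap_ms: int = 60_000) -> bool:
    # Hypothetical heuristic: the entire backlog is sitting unacked in the
    # consumer, and no ack has arrived for at least min_gap_ms.
    return (sub["unackedMessages"] > 0
            and sub["msgBacklog"] == sub["unackedMessages"]
            and ack_gap_ms(sub) >= min_gap_ms)


print(acked_count(SUB), ack_gap_ms(SUB), looks_stuck(SUB))  # 314 115754 True
```

The gap is about 116 seconds, and `lastAckedTimestamp` is only 31 ms after the function's `lastInvocation`, which fits the picture of processing stopping at that point while the broker kept dispatching.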
----
2020-06-26 10:57:23 UTC - Pierre-Yves Lebecq: Restarting the function or
stopping/starting it does nothing: after being restarted, the function still
does not process messages. I did not mention it, but I don’t see anything
special in the container output. There is no error in the function log file,
and the pulsar.log and pulsar-standalone.log files are empty.
While this happens, I cannot even retrieve state using the pulsar CLI:
```# bin/pulsar-admin functions querystate --name tasks-MonitoringGlobalPulsarFunction --key monitoringGlobal.state
null
Reason: java.util.concurrent.TimeoutException```
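To tell whether the timeout comes from the CLI client or from the worker side, the same lookup could be made over the admin REST API with an explicit client-side timeout. A minimal sketch, assuming the v3 functions state endpoint (`/admin/v3/functions/{tenant}/{namespace}/{function}/state/{key}`), the standalone web service on `localhost:8080` and the `public/default` namespace; `state_url` and `query_state` are hypothetical helpers.

```python
import json
import socket
import urllib.error
import urllib.request
from urllib.parse import quote


def state_url(base: str, tenant: str, namespace: str, function: str, key: str) -> str:
    # Assumed endpoint layout for reading one key of a function's state.
    t, n, f, k = (quote(p, safe="") for p in (tenant, namespace, function, key))
    return f"{base.rstrip('/')}/admin/v3/functions/{t}/{n}/{f}/state/{k}"


def query_state(url: str, timeout_s: float = 10.0):
    """Return the decoded JSON body, or a short description of the failure."""
    try:
        with urllib.request.urlopen(url, timeout=timeout_s) as resp:
            return json.loads(resp.read().decode("utf-8"))
    except urllib.error.HTTPError as e:
        # The server answered, so the error was produced on the worker/broker side.
        return f"HTTP {e.code}: {e.read().decode('utf-8', 'replace')}"
    except (urllib.error.URLError, socket.timeout, TimeoutError) as e:
        # No answer within timeout_s, or no connection at all: client side.
        return f"no response: {e}"


if __name__ == "__main__":
    url = state_url("http://localhost:8080", "public", "default",
                    "tasks-MonitoringGlobalPulsarFunction", "monitoringGlobal.state")
    print(url)
    print(query_state(url))
```

An HTTP error body that mentions a timeout would point at the worker's call to the state storage rather than at the CLI.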
The only way to make it start processing messages again is to stop the
container and start it again. Doing this leads to other issues, but maybe
they’re not worth mentioning here, otherwise we might have too many things to
look at at the same time.
I found some GitHub issues that look very similar:
<https://github.com/apache/pulsar/issues/6813> ,
<https://github.com/apache/pulsar/issues/6427> and
<https://github.com/apache/pulsar/issues/7036> .
Unfortunately, they’re not getting much attention. Is there anything I can do,
or any additional details I can provide, to help push them forward?
Any help would be much appreciated. Thanks!
----
2020-06-26 11:14:15 UTC - Jonas Kint: @Jonas Kint has joined the channel
----
2020-06-26 12:03:57 UTC - Kirill Merkushev: Tried on 2.5.0: there’s no such
param, and no mention of it in the API docs
<http://pulsar.apache.org/staging/admin-rest-api/#operation/deleteSubscription>.
The docs also state that there should be no active consumers.
----
2020-06-26 13:56:43 UTC - rwaweber: I’m getting it directly from Prometheus,
but I can retrieve it directly from the brokers if that would help. (Not sure
if it’s a red herring, but only one of the brokers seems to report _that_ metric
at a given time; we also only have three brokers ATM.)
Metric with labels:
```pulsar_storage_size{cluster="pulsar-cluster-1",instance="brk-01q.local:8080",job="scrapes",namespace="public/dev",topic="persistent://public/dev/beats.replica"} 2786956094```
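Topic-level metrics such as `pulsar_storage_size` are, as far as I know, exported only by the broker that currently owns the topic, which would explain why only one of the three brokers reports the series at a time. A stdlib-only sketch that parses a sample in the Prometheus text format shown above and converts the value to GiB; `parse_sample` is a hypothetical helper, and its regex only handles simple label values without escaped quotes.

```python
import re
from typing import Dict, Tuple

SAMPLE_RE = re.compile(r'^(?P<name>[a-zA-Z_:][a-zA-Z0-9_:]*)\{(?P<labels>.*)\}\s+(?P<value>\S+)$')
LABEL_RE = re.compile(r'(\w+)="([^"]*)"')


def parse_sample(line: str) -> Tuple[str, Dict[str, str], float]:
    """Split one labelled exposition-format sample into name, labels and value."""
    m = SAMPLE_RE.match(line.strip())
    if m is None:
        raise ValueError(f"not a labelled sample: {line!r}")
    labels = dict(LABEL_RE.findall(m.group("labels")))
    return m.group("name"), labels, float(m.group("value"))


LINE = ('pulsar_storage_size{cluster="pulsar-cluster-1",instance="brk-01q.local:8080",'
        'job="scrapes",namespace="public/dev",topic="persistent://public/dev/beats.replica"} 2786956094')

name, labels, value = parse_sample(LINE)
print(name, labels["topic"], labels["instance"])
print(f"{value / 2**30:.2f} GiB")  # 2.60 GiB
```

If only the owning broker exports the series, aggregating per topic across brokers (e.g. `sum by (topic)` in PromQL) should give a stable value even when ownership moves.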
----
2020-06-26 14:07:00 UTC - jinggang: Thanks all. I have forked the code and
created my own generic authentication.
----
2020-06-26 15:00:17 UTC - Matteo Merli: The "force delete" for subscriptions
was added in 2.6
----
2020-06-26 15:01:37 UTC - Matteo Merli: Keep in mind that, if the consumer is
connected, it will recreate the subscription immediately on reconnection.
If you just want to get rid of the backlog, you can skip it all
----
2020-06-26 16:11:35 UTC - Kirill Merkushev: Thanks, I just have a stale
connection and want to get rid of it.
----
2020-06-26 17:06:05 UTC - Sijie Guo: Do you have the steps to reproduce this
issue?
Btw, <#C015BU8JWUW|aop> is a good place to ask questions related to aop.
----
2020-06-26 19:18:55 UTC - Jeff Schneller: What is the preferred
authentication/authorization mechanism - TLS or JWT?
----
2020-06-26 19:31:20 UTC - Matteo Merli: JWT is generally easier to set up, given
that the tooling around OpenSSL can be a bit cryptic.
Also, it can be easier to share tokens (as strings) compared to certificates.
In the end, it really boils down to what auth scheme you're already using for
other services.
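As an illustration of the "tokens are just strings" point: a symmetric-key token is a signed, base64url-encoded string whose `sub` claim Pulsar's token authentication uses as the role. This stdlib sketch only shows the shape of an HS256 token; in practice tokens would be generated with `bin/pulsar tokens create`, and the secret and role below are hypothetical.

```python
import base64
import hashlib
import hmac
import json
from typing import Optional


def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def make_token(subject: str, secret: bytes) -> str:
    """Build an HS256-signed JWT whose `sub` claim carries the role name."""
    header = _b64url(json.dumps({"alg": "HS256"}).encode("utf-8"))
    payload = _b64url(json.dumps({"sub": subject}).encode("utf-8"))
    signature = hmac.new(secret, f"{header}.{payload}".encode("ascii"), hashlib.sha256).digest()
    return f"{header}.{payload}.{_b64url(signature)}"


def verify_token(token: str, secret: bytes) -> Optional[str]:
    """Return the `sub` claim if the signature matches, else None."""
    try:
        header, payload, signature = token.split(".")
    except ValueError:
        return None
    expected = hmac.new(secret, f"{header}.{payload}".encode("ascii"), hashlib.sha256).digest()
    if not hmac.compare_digest(expected, _b64url_decode(signature)):
        return None
    return json.loads(_b64url_decode(payload))["sub"]


if __name__ == "__main__":
    token = make_token("test-user", b"hypothetical-secret")
    print(token)
    print(verify_token(token, b"hypothetical-secret"))  # test-user
```

The whole credential is one line of text that can go into an environment variable or a secret store, whereas TLS auth needs a key, a certificate and a CA chain per client.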
----
2020-06-26 20:07:12 UTC - Frank Kelly: Another Auth*n question - this time for
a Custom Auth*n Plugin. I have `authorizationEnabled=true` defined on my proxy
(so I'm expecting Authorization to occur on the proxy and not on the broker)
```root@pulsar-proxy-6f798754db-r9gbw:/pulsar/conf# grep -i authorization proxy.conf
### ---Authorization --- ###
# Whether authorization is enforced by the Pulsar proxy
authorizationEnabled=true
# Authorization provider as a fully qualified class name
authorizationProvider=com.cogito.platform.signal.stream.pulsar.authn.broker.CogitoAuthorizationProvider
# Whether client authorization credentials are forwared to the broker for re-authorization.
forwardAuthorizationCredentials=false```
and I can see that the Plugin has been loaded successfully and initialized
```[16:02:04] fkelly@Franks-Cogito-Work-Computer:[~/platform2-test]: (feature/sdlc-31257-minikube-integration) klf pulsar-proxy-6f798754db-r9gbw | grep -i authorization
[conf/proxy.conf] Applying config authorizationEnabled = true
[conf/proxy.conf] Applying config authorizationProvider = com.cogito.platform.signal.stream.pulsar.authn.broker.CogitoAuthorizationProvider
19:55:31.069 [main] INFO com.cogito.platform.signal.stream.pulsar.authn.broker.CogitoAuthorizationProvider - ==> Initialize()
19:55:31.074 [main] INFO org.apache.pulsar.broker.authorization.AuthorizationService - com.cogito.platform.signal.stream.pulsar.authn.broker.CogitoAuthorizationProvider has been loaded.```
From the Proxy logs I see my token was authenticated successfully but I see no
traces of the AuthorizationProvider being accessed/executed. Any thoughts?
----
2020-06-26 21:58:37 UTC - Jared Marolf: @Jared Marolf has joined the channel
----
2020-06-26 22:06:41 UTC - Jared Marolf: Has anyone ever used Pulsar with
OpenStack? I am trying to deploy a multi-broker setup, but the issue is that
the OpenStack instances can't communicate with each other via their floating
IPs, only via their private IPs. I am trying to figure out whether I need to
use advertisedListeners and internalListenerName for this to work, and what the
appropriate setup would be. Would I need to advertise both the floating IP and
the private IP on the service and HTTP ports, or some other combination? The
brokers need to be reachable via their floating IPs from external sources. Any
input would be appreciated.
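`advertisedListeners` (available, as far as I know, from 2.6.0) lets each broker advertise one address per named listener, and `internalListenerName` selects the one brokers use among themselves. A sketch that generates the two `broker.conf` lines per broker, assuming the private IP as the internal listener and the floating IP as the external one; the listener names and the documentation-range IPs are hypothetical, and as far as I know these listeners cover the binary protocol (`pulsar://`), not the HTTP port.

```python
from typing import Dict, List


def listener_lines(private_ip: str, floating_ip: str, port: int = 6650,
                   internal: str = "internal", external: str = "external") -> List[str]:
    """broker.conf lines advertising the private IP internally and the floating IP externally."""
    listeners = f"{internal}:pulsar://{private_ip}:{port},{external}:pulsar://{floating_ip}:{port}"
    return [f"advertisedListeners={listeners}", f"internalListenerName={internal}"]


# Hypothetical addresses for a three-broker setup (private IP -> floating IP).
BROKERS: Dict[str, str] = {
    "10.0.0.11": "203.0.113.11",
    "10.0.0.12": "203.0.113.12",
    "10.0.0.13": "203.0.113.13",
}

if __name__ == "__main__":
    for private_ip, floating_ip in BROKERS.items():
        print(f"# broker {private_ip}")
        print("\n".join(listener_lines(private_ip, floating_ip)))
```

OpenStack floating IPs are NAT-ed by the router rather than assigned to the instance's interface, so the brokers keep binding on their local interfaces (the default `bindAddress=0.0.0.0`) and the floating IP is only advertised. Clients would also need to select the external listener; whether your client version supports that is worth checking before relying on this layout.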
----