2019-05-03 10:46:52 UTC - Alexandre DUVAL: ```10:44:33.704
[pulsar-external-listener-17-1] INFO
org.apache.pulsar.functions.runtime.RuntimeSpawner -
clevercloud/functions/RouteApplicationsAddonsLogs-0 RuntimeSpawner starting
function
10:44:33.704 [pulsar-external-listener-17-1] INFO
org.apache.pulsar.functions.runtime.ProcessRuntime - Creating function log
directory
/pulsar/logs/functions/clevercloud/functions/RouteApplicationsAddonsLogs
10:44:33.704 [pulsar-external-listener-17-1] INFO
org.apache.pulsar.functions.runtime.ProcessRuntime - Created or found function
log directory
/pulsar/logs/functions/clevercloud/functions/RouteApplicationsAddonsLogs
10:44:33.704 [pulsar-external-listener-17-1] INFO
org.apache.pulsar.functions.runtime.ProcessRuntime - ProcessBuilder starting
the process with args java -cp
/pulsar/instances/java-instance.jar:/pulsar/instances/deps/*
-Dpulsar.functions.java.instance.jar=/pulsar/instances/java-instance.jar
-Dpulsar.functions.extra.dependencies.dir=/pulsar/instances/deps
-Dlog4j.configurationFile=java_instance_log4j2.yml
-Dpulsar.function.log.dir=/pulsar/logs/functions/clevercloud/functions/RouteApplicationsAddonsLogs
-Dpulsar.function.log.file=RouteApplicationsAddonsLogs-0 -Xmx1073741824
org.apache.pulsar.functions.runtime.JavaInstanceMain --jar
/tmp/pulsar_functions/clevercloud/functions/RouteApplicationsAddonsLogs/0/pulsar-functions-0.1.0-SNAPSHOT.jar
--instance_id 0 --function_id 70d6d305-7f08-41fe-9f6b-f1ce1a09bf76
--function_version c0baae5b-4631-44cb-83bb-7c0da28a626a --function_details
'{"tenant":"clevercloud","namespace":"functions","name":"RouteApplicationsAddonsLogs","className":"com.clevercloud.pulsar.function.RouteApplicationsAddonsLogs","logTopic":"log_topic","autoAck":true,"parallelism":1,"source":{"typeClassName":"java.lang.String","inputSpecs":{"clevercloud/logs/full":{}},"cleanupSubscription":true},"sink":{"typeClassName":"java.lang.Void"},"resources":{"cpu":1.0,"ram":"1073741824","disk":"10737418240"}}'
--pulsar_serviceurl
pulsar://c1-pulsar-clevercloud-customers.services.clever-cloud.com:2001
--client_auth_plugin org.apache.pulsar.client.impl.auth.AuthenticationToken
--client_auth_params token:TOKEN --use_tls false --tls_allow_insecure false
--hostname_verification_enabled false --tls_trust_cert_path
/etc/ssl/private/c1-pulsar-clevercloud-customers_services_clever-cloud_com/c1-pulsar-clevercloud-customers_services_clever-cloud_com.crt
--max_buffered_tuples 1024 --port 35331 --metrics_port 35175
--expected_healthcheck_interval 30 --secrets_provider
org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider
--cluster_name c1
10:44:33.706 [pulsar-external-listener-17-1] INFO
org.apache.pulsar.functions.runtime.ProcessRuntime - Started process
successfully
Exception in thread "main"
org.apache.pulsar.functions.runtime.shaded.com.beust.jcommander.ParameterException:
Unknown option: --metrics_port
at
org.apache.pulsar.functions.runtime.shaded.com.beust.jcommander.JCommander.parseValues(JCommander.java:742)
at
org.apache.pulsar.functions.runtime.shaded.com.beust.jcommander.JCommander.parse(JCommander.java:282)
at
org.apache.pulsar.functions.runtime.shaded.com.beust.jcommander.JCommander.parse(JCommander.java:265)
at
org.apache.pulsar.functions.runtime.JavaInstanceMain.main(JavaInstanceMain.java:191)
```
----
2019-05-03 10:47:01 UTC - Alexandre DUVAL: `ParameterException: Unknown option:
--metrics_port`
----
2019-05-03 10:47:53 UTC - Alexandre DUVAL: I'm running everything on v2.4.0 from
30th of April except BK and ZK (not yet), and I don't understand why I'm getting this issue.
----
2019-05-03 10:48:39 UTC - Alexandre DUVAL: The function instance arg
"--metrics_port" is injected.
----
2019-05-03 10:50:52 UTC - Alexandre DUVAL: From RuntimeUtils.java
----
2019-05-03 10:53:22 UTC - Alexandre DUVAL: But JavaInstanceMain rejects it
:confused:.
----
2019-05-03 11:19:50 UTC - Alexandre DUVAL: --metrics_port is a JavaInstanceMain
parameter in both versions, and the error about metrics_port is reported at
JavaInstanceMain.java:191, but that line is
```server = ServerBuilder.forPort(port)
        .addService(new InstanceControlImpl(runtimeSpawner))
        .build()
        .start();```
in both v2.3.1 and v2.4.0 :confused:
I'm a bit lost now.
----
2019-05-03 12:29:37 UTC - Laurent Chriqui: I made a fresh install of python3
and it cleared the problem.
----
2019-05-03 12:51:14 UTC - Sanjeev Kulkarni: @Alexandre DUVAL you are using
the process container factory, right?
----
2019-05-03 12:51:36 UTC - Alexandre DUVAL: I'm only using functions create from
pulsar-admin.
----
2019-05-03 12:51:41 UTC - Sanjeev Kulkarni: Are you sure you have the right
version of all the classes?
----
2019-05-03 12:52:22 UTC - Alexandre DUVAL: I just downloaded the last snapshot
from master.
----
2019-05-03 12:52:32 UTC - Alexandre DUVAL:
pulsar-server-distribution-2.4.0-20190502.134020-76-bin.tar.gz
----
2019-05-03 12:52:40 UTC - Sanjeev Kulkarni: create itself should say "created
successfully", but this error looks like it happens when actually trying to start the
process
----
2019-05-03 12:52:43 UTC - Alexandre DUVAL: I'm only running this functions
worker and this pulsar-admin.
----
2019-05-03 12:52:56 UTC - Alexandre DUVAL: No, I got "created successfully"
----
2019-05-03 12:53:01 UTC - Alexandre DUVAL: from functions create
----
2019-05-03 12:53:13 UTC - Alexandre DUVAL: it's the running step from the
functions_worker which is outputting this error
----
2019-05-03 12:56:30 UTC - Sanjeev Kulkarni: That’s very weird. Can you check
the JavaInstanceMain class and see what version it is?
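One way to check this is to ask the JVM where it actually loads `JavaInstanceMain` from. The sketch below is plain JDK code, not a Pulsar tool, and `ClassOrigin` is a hypothetical helper name. Run it with the same classpath the worker used (from the log above: `/pulsar/instances/java-instance.jar:/pulsar/instances/deps/*`); if the reported jar or `Implementation-Version` is not the 2.4.0 build you expect, an older `JavaInstanceMain` that does not know `--metrics_port` is a likely suspect.

```java
import java.net.URL;
import java.security.CodeSource;

// Hypothetical diagnostic helper (not part of Pulsar): reports which jar a class
// was loaded from and the Implementation-Version in that jar's manifest, if any.
final class ClassOrigin {

    static String describe(String className) {
        Class<?> clazz;
        try {
            // initialize=false: load the class without running its static initializers
            clazz = Class.forName(className, false, ClassOrigin.class.getClassLoader());
        } catch (ClassNotFoundException e) {
            return className + " is not on the classpath";
        }
        // Classes loaded by the JDK's bootstrap loader have no CodeSource.
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        URL location = (source == null) ? null : source.getLocation();
        Package pkg = clazz.getPackage();
        String version = (pkg == null) ? null : pkg.getImplementationVersion();
        return className
                + " loaded from " + (location == null ? "<bootstrap/JDK>" : location.toString())
                + " (Implementation-Version: " + (version == null ? "unknown" : version) + ")";
    }

    public static void main(String[] args) {
        String name = args.length > 0
                ? args[0]
                : "org.apache.pulsar.functions.runtime.JavaInstanceMain";
        System.out.println(describe(name));
    }
}
```

Usage, assuming the compiled class sits in the current directory: `java -cp "/pulsar/instances/java-instance.jar:/pulsar/instances/deps/*:." ClassOrigin`. The first jar on the classpath that contains the class wins, so a stale copy earlier in the path would show up here.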
----
2019-05-03 13:56:57 UTC - Chris DiGiovanni: I'm looking over the Pulsar TLS
configuration for Transport and Authentication. In my situation we have an
Internal CA which is fine for TLS Transport but the internal group at my
organization responsible for the CA will not generate the type of certs I need
for authentication. So I have two CA cert files I need Pulsar to Trust; my
internal CA, and the one I create for TLS Authentication. Looking through the
source, it seems that Pulsar only expects one path via tlsTrustCertsFilePath.
What I cannot figure out is whether Pulsar will accept multiple certs in one
PEM for tlsTrustCertsFilePath in this situation. If it does not, are my
options to just give the JVM the argument `-Djavax.net.ssl.trustStore` and pass
my own trustStore with my CA certs in this? Any guidance would be appreciated.
----
2019-05-03 14:04:17 UTC - Chris Bartholomew: @Chris DiGiovanni I set that to
the default OS cert bundle PEM (since I use Let's Encrypt certs) which
contains 150 certs: ```tlsTrustCertsFilePath=/etc/ssl/certs/ca-certificates.crt
``` Works for me, no problem.
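For the `-Djavax.net.ssl.trustStore` route, a concatenated PEM file (internal CA followed by the self-created authentication CA) can be turned into a trust store with the JDK alone. A minimal sketch, assuming both CA certificates sit in one PEM file; `PemTrustStore` and the `ca-N` alias scheme are hypothetical. The standard `CertificateFactory.generateCertificates` call reads every certificate in the stream, not just the first, which is consistent with a single multi-certificate bundle working as described above. Whether Pulsar falls back to the JVM trust store when `tlsTrustCertsFilePath` is unset is not confirmed in this thread.

```java
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import java.util.Collection;

// Hypothetical helper: converts a PEM bundle holding several CA certificates
// into a KeyStore that can be written out and used as a JVM trust store.
final class PemTrustStore {

    static KeyStore fromPem(InputStream pemBundle) {
        try {
            CertificateFactory factory = CertificateFactory.getInstance("X.509");
            // Parses every certificate in the stream, not just the first one.
            Collection<? extends Certificate> certs = factory.generateCertificates(pemBundle);
            KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
            trustStore.load(null, null); // start from an empty store
            int i = 0;
            for (Certificate cert : certs) {
                trustStore.setCertificateEntry("ca-" + i++, cert);
            }
            return trustStore;
        } catch (GeneralSecurityException | IOException e) {
            throw new IllegalStateException("could not build trust store from PEM bundle", e);
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.println("usage: PemTrustStore <bundle.pem> <out-truststore> <password>");
            return;
        }
        KeyStore trustStore;
        try (InputStream in = new FileInputStream(args[0])) {
            trustStore = fromPem(in);
        }
        try (OutputStream out = new FileOutputStream(args[1])) {
            trustStore.store(out, args[2].toCharArray());
        }
        System.out.println("wrote " + trustStore.size() + " CA certificate(s) to " + args[1]);
    }
}
```

The resulting file would then be passed with the standard JSSE properties `-Djavax.net.ssl.trustStore=<file>` and `-Djavax.net.ssl.trustStorePassword=<password>`.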
----
2019-05-03 14:20:16 UTC - Chris DiGiovanni: Thanks @Chris Bartholomew
----
2019-05-03 15:28:47 UTC - Thor Sigurjonsson: When I am setting up
`functions_worker.yml` to enable functions alongside the configs in
`broker.conf`, there are fields that look the same in both files, such as
`authenticationEnabled` in the section under the comment "security settings for
worker service". Do these need to mirror the settings in `broker.conf`, or are
they to be left blank if `functionsWorkerEnabled=true` (in `broker.conf`),
which enables the function worker to run in the broker itself?
----
2019-05-03 15:29:19 UTC - Thor Sigurjonsson: I'm specifically working to get the
functions worker to run with token auth enabled in the cluster.
----
2019-05-03 16:03:14 UTC - Alexandre DUVAL: How do I create a PulsarAdmin client in
a Pulsar function? ```Caused by: java.lang.NumberFormatException: null
at java.lang.Integer.parseInt(Integer.java:542) ~[?:1.8.0_192]
at java.lang.Integer.parseInt(Integer.java:615) ~[?:1.8.0_192]
at
org.apache.pulsar.shade.org.asynchttpclient.config.AsyncHttpClientConfigHelper$Config.getInt(AsyncHttpClientConfigHelper.java:85)
~[java-instance.jar:2.4.0-SNAPSHOT]
at
org.apache.pulsar.shade.org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultMaxRedirects(AsyncHttpClientConfigDefaults.java:134)
~[java-instance.jar:2.4.0-SNAPSHOT]
at
org.apache.pulsar.shade.org.asynchttpclient.DefaultAsyncHttpClientConfig$Builder.<init>(DefaultAsyncHttpClientConfig.java:670)
~[java-instance.jar:2.4.0-SNAPSHOT]
at
org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.<init>(AsyncHttpConnector.java:74)
~[pulsar-functions-0.1.0-SNAPSHOT.jar-unpacked/:?]
at
org.apache.pulsar.client.admin.internal.http.AsyncHttpConnectorProvider.getConnector(AsyncHttpConnectorProvider.java:43)
~[pulsar-functions-0.1.0-SNAPSHOT.jar-unpacked/:?]
at
org.apache.pulsar.client.admin.PulsarAdmin.<init>(PulsarAdmin.java:172)
~[pulsar-functions-0.1.0-SNAPSHOT.jar-unpacked/:?]
at
org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl.build(PulsarAdminBuilderImpl.java:42)
~[pulsar-functions-0.1.0-SNAPSHOT.jar-unpacked/:?]
at
com.yo.pulsar.function.RouteApplicationsAddonsLogs.<init>(RouteApplicationsAddonsLogs.java:49)
~[pulsar-functions-0.1.0-SNAPSHOT.jar-unpacked/:?]
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method) ~[?:1.8.0_192]
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
~[?:1.8.0_192]
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
~[?:1.8.0_192]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
~[?:1.8.0_192]
at
org.apache.pulsar.functions.utils.Reflections.createInstance(Reflections.java:118)
~[java-instance.jar:2.4.0-SNAPSHOT]
```
----
2019-05-03 16:03:24 UTC - Alexandre DUVAL: Does maxRedirects have no default value?
How do I set it?
----
2019-05-03 16:13:07 UTC - Sanjeev Kulkarni: @Alexandre DUVAL how are you
instantiating it?
----
2019-05-03 16:13:56 UTC - Alexandre DUVAL: ```public class RouteApplicationsAddonsLogs implements Function<String, Void> {
    String UNCLASSIFIED_TOPIC = "persistent://yo/logs/applications-addons-unclassified";
    PulsarAdmin pulsarAdmin;

    public RouteApplicationsAddonsLogs() {
        try {
            pulsarAdmin = PulsarAdmin.builder()
                    .allowTlsInsecureConnection(false)
                    .enableTlsHostnameVerification(false)
                    //.tlsTrustCertsFilePath(tlsTrustCertsFilePath)
                    .serviceHttpUrl("https://c1-pulsar-yo-customers.services.yo.com:2000")
                    .authentication("org.apache.pulsar.client.impl.auth.AuthenticationToken",
                            "token:TOKEN")
                    .build();
        } catch (PulsarClientException e) {
            e.printStackTrace();
        }
    }

    @Override
    public Void process(String input, Context context) {
        ...```
----
2019-05-03 16:13:59 UTC - Alexandre DUVAL: @Sanjeev Kulkarni
----
2019-05-03 16:18:32 UTC - Sanjeev Kulkarni: I have never done this before. Let
me work on this at my end and I will get back to you.
----
2019-05-03 16:19:42 UTC - Brian Doran: Hi all, just started using Pulsar as an
alternative to my Kafka implementation. So for the schema registry with Kafka
we currently use the Confluent Schema registry.
So we provide a `KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG` to the
producer when creating our KafkaProducer.
We build our schemas dynamically, as they can change according to the fields
in the record for any particular topic. I have an org.apache.avro.Schema and was
wondering what's the best way to plug this into the
`pulsarClient.newProducer(org.apache.pulsar.client.api.Schema)` method. Can
this be done easily, a util method for converting avro.Schema to pulsar.Schema
perhaps?
----
2019-05-03 16:19:46 UTC - David Kjerrumgaard: @Alexandre DUVAL What do you need
the PulsarAdmin for?
----
2019-05-03 16:19:49 UTC - Brian Doran: Thanks in advance for any pointers
----
2019-05-03 16:24:40 UTC - David Kjerrumgaard: @Brian Doran Pulsar supports Avro
schemas. <http://pulsar.apache.org/docs/en/concepts-schema-registry/>
----
2019-05-03 16:25:01 UTC - David Kjerrumgaard: So if you already have an
existing schema you can just upload it and start using it.
----
2019-05-03 16:25:10 UTC - David Kjerrumgaard:
<http://pulsar.apache.org/docs/en/admin-api-schemas/>
----
2019-05-03 16:25:21 UTC - David Kjerrumgaard: HTH
----
2019-05-03 16:28:34 UTC - Brian Doran: Hi @David Kjerrumgaard thanks for your
quick response. Sure I understand that it supports Avro which is great, however
we build up these schemas dynamically and they can change at runtime depending
on various factors. Previously we handled this in our code with Confluent SR, as
each GenericRecord has its own schema associated with it.
----
2019-05-03 16:28:57 UTC - Brian Doran: Was hoping not to have to pre-upload the
schemas.
----
2019-05-03 16:30:23 UTC - Alexandre DUVAL: create tenant and namespaces
depending on message properties
----
2019-05-03 16:30:28 UTC - Alexandre DUVAL: if they don't exist
----
2019-05-03 16:30:29 UTC - Brian Doran: I was just wondering if this can be done
with Pulsar too ... I was just going to recreate the producers with the 'new'
schema.
----
2019-05-03 16:30:43 UTC - Alexandre DUVAL: thanks, let me know your progress
:wink:
----
2019-05-03 16:30:51 UTC - David Kjerrumgaard: @Brian Doran Ok, thank you for
the additional background. Your goal is to be able to dynamically create and
update schemas inside the Pulsar schema registry?
----
2019-05-03 16:31:58 UTC - Brian Doran: yes..
----
2019-05-03 16:32:14 UTC - Alexandre DUVAL: the goal is to create tenants and
namespaces dynamically from message properties if they don't exist;
maybe you have another way to do it?
----
2019-05-03 16:33:26 UTC - Sijie Guo: We have GenericSchemaBuilder and
GenericRecordBuilder support in pulsar 2.4.0.
----
2019-05-03 16:33:35 UTC - Sanjeev Kulkarni: In general I would try to limit the
use of PulsarAdmin. It is kind of heavy for this purpose; a better way would be
to use a much simpler REST API library to make the appropriate REST call
----
2019-05-03 16:33:46 UTC - David Kjerrumgaard: That seems to complicate your
function unnecessarily. Can't you impose a requirement that the tenant and
namespace already exist?
----
2019-05-03 16:34:01 UTC - Brian Doran: Generally what we do when a new Schema
is generated is just start creating GenericRecords with this new schema.
However, with Pulsar I was going to create a new Producer for that topic with
the new Schema after closing the old producers.
----
2019-05-03 16:34:48 UTC - Sijie Guo: currently Schema is not well documented on the
Pulsar website. I have been working on adding the documentation for Pulsar. I
will have a PR out in a few days.
+1 : David Kjerrumgaard
----
2019-05-03 16:34:57 UTC - Sanjeev Kulkarni: or at least use an alternate
mechanism for creating the tenant/namespace other than PulsarAdmin. PulsarAdmin
is kind of heavyweight for this purpose
----
2019-05-03 16:35:38 UTC - David Kjerrumgaard: The Pulsar admin API I
referenced above is also supported in the Java library, so you could in theory
register new schemas as you "discover" and create them
----
2019-05-03 16:37:53 UTC - Sijie Guo: @Brian Doran :
The schema versioning, GenericSchemaBuilder and GenericRecord and
GenericRecordBuilder is going to be released in 2.4.0.
Documentation around that part will be out in a few days.
+1 : David Kjerrumgaard
----
2019-05-03 16:38:43 UTC - Alexandre DUVAL: Can't have this requirement :/.
----
2019-05-03 16:39:06 UTC - Alexandre DUVAL: So I should use the REST API and an HTTP client?
To create the tenant, then the namespace?
----
2019-05-03 16:39:06 UTC - David Kjerrumgaard: Thanks @Sijie Guo I was trying to
find a link to the docs to share with Brian :smiley:
----
2019-05-03 16:40:17 UTC - Alexandre DUVAL: In a Pulsar function, context.publish
can't create the namespace and tenant, right? Only the topic if it doesn't exist?
----
2019-05-03 16:40:31 UTC - Sanjeev Kulkarni: that is correct
----
2019-05-03 16:40:35 UTC - Alexandre DUVAL: Okay
----
2019-05-03 16:41:00 UTC - Sijie Guo: Schema is not well documented. Folks from
zhaopin have been contributing a lot of schema features recently. I have been
working with them on the documentation part. so stay tuned.
----
2019-05-03 16:41:07 UTC - Alexandre DUVAL: Okay, then I will do it with REST API
calls.
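A minimal sketch of that approach using only the JDK's `HttpURLConnection` (so it also runs on the Java 8 runtime seen in the stack trace, `1.8.0_192`) instead of `PulsarAdmin`. It assumes the admin REST v2 endpoints `PUT /admin/v2/tenants/{tenant}` and `PUT /admin/v2/namespaces/{tenant}/{namespace}`, token auth sent as an `Authorization: Bearer` header, the cluster name `c1` from the log, an empty `{}` policies body being accepted for the namespace, and `409 Conflict` meaning "already exists". `PulsarRestBootstrap` and its methods are hypothetical names.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: creates a tenant and namespace over the Pulsar admin
// REST API if they do not exist yet. Endpoints and 409 handling are assumptions.
// Names are assumed to be valid Pulsar names (nothing needing URL encoding).
final class PulsarRestBootstrap {
    private final String baseUrl; // broker/proxy HTTP(S) service URL
    private final String token;   // JWT, as used with AuthenticationToken

    PulsarRestBootstrap(String baseUrl, String token) {
        this.baseUrl = baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl;
        this.token = token;
    }

    String tenantUrl(String tenant) {
        return baseUrl + "/admin/v2/tenants/" + tenant;
    }

    String namespaceUrl(String tenant, String namespace) {
        return baseUrl + "/admin/v2/namespaces/" + tenant + "/" + namespace;
    }

    static String tenantBody(String cluster) {
        // Minimal tenant JSON: no admin roles, one allowed cluster.
        return "{\"adminRoles\":[],\"allowedClusters\":[\"" + cluster + "\"]}";
    }

    /** Returns true if the resource was created, false if it already existed (assumed 409). */
    boolean put(String url, String jsonBody) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) URI.create(url).toURL().openConnection();
        conn.setRequestMethod("PUT");
        conn.setRequestProperty("Authorization", "Bearer " + token);
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(jsonBody.getBytes(StandardCharsets.UTF_8));
        }
        int status = conn.getResponseCode();
        conn.disconnect();
        if (status == 409) {
            return false;
        }
        if (status / 100 == 2) {
            return true;
        }
        throw new IOException("PUT " + url + " failed with HTTP " + status);
    }

    void ensureNamespace(String tenant, String namespace, String cluster) throws IOException {
        put(tenantUrl(tenant), tenantBody(cluster));
        put(namespaceUrl(tenant, namespace), "{}"); // default namespace policies (assumption)
    }
}
```

In the function this would be built once, for example in the constructor where the code above built `PulsarAdmin`, with the same `serviceHttpUrl` and token, and `ensureNamespace(tenant, namespace, "c1")` would be called before publishing to a topic in that namespace. Remembering which namespaces were already ensured avoids one round trip per message.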
----
2019-05-03 16:41:23 UTC - Sanjeev Kulkarni: that would be better. let us know
how it goes
----
2019-05-03 16:43:48 UTC - Brian Doran: OK great .. thanks for the help lads, I
will wait for 2.4.0. When is that due to drop?
----
2019-05-03 16:45:33 UTC - David Kjerrumgaard: @Brian Doran 2.4.0 is the latest
on the master branch, so you can clone it and build it locally if you like
----
2019-05-03 16:46:25 UTC - Brian Doran: @David Kjerrumgaard I've just done it.
Thanks!
----
2019-05-03 16:46:51 UTC - Sijie Guo: @Brian Doran there were discussions on
wrapping up 2.4.0. so I would expect it will come out soon.
+1 : Brian Doran
----
2019-05-03 16:51:26 UTC - Brian Doran: Just a note, I don't see
GenericSchemaBuilder on master yet. Am I missing something?
----
2019-05-03 16:53:28 UTC - Sijie Guo: @Brian Doran
<https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaBuilder.java>
+1 : Brian Doran
----
2019-05-03 17:14:28 UTC - Brian Doran: @Sijie Guo One last question .. if I
already have an avro.Schema, is there an easy way to convert it to a pulsar.Schema?
----
2019-05-03 17:51:13 UTC - Matteo Merli: @Brian Doran You can use:
```
Schema.AVRO(
SchemaDefinition.builder()
.withJsonDef(avroJsonSchema)
.build()
);
```
----
2019-05-03 20:34:17 UTC - Patrick Lange: Does anyone know how to get the python
3.7 pulsar-client to work with python from anaconda on MacOS? The 2.0.1 client
under python 3.6 worked but `pulsar-client==2.3.1` segfaults on `import
_pulsar` with python3.7 (from conda). When I use homebrew python3.7 it works.
Homebrew python
> Python 3.7.3 (default, Mar 27 2019, 09:23:15)
> [Clang 10.0.1 (clang-1001.0.46.3)] on darwin
Anaconda python
> Python 3.7.3 (default, Mar 27 2019, 16:54:48)
> [Clang 4.0.1 (tags/RELEASE_401/final)] :: Anaconda, Inc. on darwin
----
2019-05-03 23:12:57 UTC - Poule: @Patrick Lange my python segfaults too
----
2019-05-03 23:13:19 UTC - Patrick Lange: it does work for me with the python3
installed from homebrew
----
2019-05-03 23:13:42 UTC - Vincent Ngan: On my way.
----
2019-05-03 23:33:41 UTC - Poule: @Patrick Lange thanks buddy it solved my
problem
----
2019-05-04 00:42:57 UTC - Poule: how can I trigger a function from a python
script?
----
2019-05-04 01:19:59 UTC - Jonathan Pearl: Hey all, newbie question coming from a
Kafka background. When using partitioned topics, is there any notification on
the consuming side which partitions you'll be receiving messages from?
I guess I'm looking for an analog to Kafka's rebalance callback to know when it
is safe to offload a cache or when I should start fetching.
----
2019-05-04 01:21:00 UTC - Jonathan Pearl: Similarly, is there a way to control
which consumers get which partitions? In Kafka you could set an "assigner" that
the group leader would use to determine the partition spread. Out-of-the-box options
were round-robin and range assignment.
----
2019-05-04 01:24:07 UTC - Matteo Merli: > When using partitioned topics, is
there any notification on the consuming side which partitions you’ll be
receiving messages from?
Yes, you can attach a consumer event listener for that.
<https://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#consumerEventListener-org.apache.pulsar.client.api.ConsumerEventListener->
----
2019-05-04 01:25:09 UTC - Jonathan Pearl: Ah, would that also apply to a shared
subscription? The description makes me think it's only for failover
----
2019-05-04 01:25:36 UTC - Matteo Merli: yes, only for failover. in shared mode
each consumer is getting messages from all the partitions
----
2019-05-04 01:25:56 UTC - Matteo Merli: > Similarly, is there a way to
control which consumers get which partitions? In Kafka you could set an
“assigner” that the group leader would determine the partition spread.
Out-of-the box options were round-robin and range assignment.
If you want to manually assign the partitions to consumers, you can directly
subscribe to the individual partitions, instead of the partitioned topic. eg:
`TOPIC-partition-5` and so on
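To recreate Kafka's range assignment on top of that, each consumer can compute its own slice of partitions and subscribe to those partition topics directly. A minimal sketch, assuming each consumer knows a static member index and member count (hypothetical inputs; nothing here coordinates membership) and the `<topic>-partition-<n>` naming shown above; `RangeAssignment` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: Kafka-style range assignment computed on the client.
// Member `memberIndex` of `memberCount` gets a contiguous block of partitions;
// the first (partitionCount % memberCount) members get one extra partition.
final class RangeAssignment {

    static List<String> partitionsFor(String topic, int partitionCount,
                                      int memberIndex, int memberCount) {
        if (memberCount <= 0 || memberIndex < 0 || memberIndex >= memberCount) {
            throw new IllegalArgumentException("bad member index/count");
        }
        int base = partitionCount / memberCount;
        int extra = partitionCount % memberCount;
        int start = memberIndex * base + Math.min(memberIndex, extra);
        int size = base + (memberIndex < extra ? 1 : 0);
        List<String> topics = new ArrayList<>();
        for (int p = start; p < start + size; p++) {
            topics.add(topic + "-partition-" + p); // per-partition topic name
        }
        return topics;
    }

    public static void main(String[] args) {
        // 8 partitions over 3 members -> 3, 3 and 2 partitions
        for (int m = 0; m < 3; m++) {
            System.out.println("member " + m + ": "
                    + partitionsFor("persistent://public/default/TOPIC", 8, m, 3));
        }
    }
}
```

Each returned name would then be passed to `ConsumerBuilder.topic(...)`, for example with an `Exclusive` subscription. Unlike a Kafka consumer group, nothing reassigns these partitions when a member joins or leaves, so the member count has to be managed outside Pulsar.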
----
2019-05-04 01:29:26 UTC - Jonathan Pearl: So if I wanted something like a
dynamically scaled subscription with exclusive partitions per member to get to
something similar to a Kafka consumer group, that would need to be built on top
of Pulsar?
----
2019-05-04 01:30:41 UTC - Matteo Merli: > dynamically scaled subscription
with exclusive partitions per member
What exactly do you mean by this?
----
2019-05-04 01:32:03 UTC - Matteo Merli: to rephrase, what kind of assignment
behavior (consumer -> partitions) are you looking at?
----
2019-05-04 01:35:46 UTC - Jonathan Pearl: Basically a partition would only ever
get served to one consumer at a time
----
2019-05-04 01:36:26 UTC - Jonathan Pearl: So if a consumer joined or left a
subscription, its partitions would be coordinated with the others who would
give up their own partitions (or would give them partitions when they left)
----
2019-05-04 01:37:53 UTC - Jonathan Pearl: Poor drawing ahead
----
2019-05-04 01:40:09 UTC - Jonathan Pearl: It's the main appeal of Kafka for me, as it
allows things to cache nicely if you are aware of your own partition strategy,
as well as some notion of exclusivity, knowing that other consumers in your
group/subscription wouldn't be operating on messages from the same partition
----
2019-05-04 01:42:29 UTC - Matteo Merli: So, in Pulsar with a failover
subscription over a partitioned topic, you’ll get that semantic: “One consumer
active on each partition”. The only thing is that the assignment is always
decided by broker.
----
2019-05-04 01:45:08 UTC - Jonathan Pearl: My assumption with failover is that
all of the partitions would go to 1 consumer, and the other consumers in the
subscription would get nothing.
From
<http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/SubscriptionType.html#Failover>:
`Multiple consumer will be able to use the same subscription name but only 1
consumer will receive the messages.`
----
2019-05-04 01:46:30 UTC - Matteo Merli: Yes, that’s the semantic within one
partition. With a partitioned topic, the brokers will choose a different
“active” consumer for each partition (in round-robin fashion)
----
2019-05-04 01:47:21 UTC - Jonathan Pearl: Oh! Gotcha, my bad. Thanks!
Unfortunate that the assignment is fixed.
----
2019-05-04 01:47:57 UTC - Jonathan Pearl: assignment strategy*
----
2019-05-04 01:48:14 UTC - Matteo Merli: Going back to the question, how would you
want to customize that?
----
2019-05-04 01:51:37 UTC - Jonathan Pearl: Ideally, I'd like to minimize the
number of partitions changing hands whenever a subscription membership changes.
With round-robin, you could see each consumer getting affected. This could get
expensive depending on the cleverness going on as far as caching on partition
de/activation.
----
2019-05-04 01:52:40 UTC - Jonathan Pearl:
<https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html>
tries to keep partitions to their previous owners, but if it was up to the
client, it might have more context on which partitions are more "expensive"
than others
----
2019-05-04 01:54:31 UTC - Matteo Merli: So, if that would help, the round-robin
assignment in Pulsar is deterministic. It’s based on a simple scheme that
doesn’t use any coordination.
eg: each consumer has a “consumerName” which can be set and by default is a
random string. Broker will sort all available consumers by their name and will
pick the 1st consumer for partition-0, the 2nd for partition-1 and so on.
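The scheme described above can be simulated in a few lines, which also makes the concern about reassignments easy to check. A minimal sketch, assuming the sorted list wraps around when there are more partitions than consumers (partition `p` goes to the consumer at sorted position `p % n`); this illustrates the description, not the broker's actual code, and `FailoverAssignment` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simulation of the deterministic failover assignment described above:
// sort consumers by name, then partition p -> sorted[p % n] (wrap-around assumed).
final class FailoverAssignment {

    static Map<Integer, String> assign(List<String> consumerNames, int partitions) {
        List<String> sorted = new ArrayList<>(consumerNames);
        Collections.sort(sorted); // lexicographic order of consumerName
        Map<Integer, String> owner = new LinkedHashMap<>();
        if (sorted.isEmpty()) {
            return owner; // no consumers, no active consumer for any partition
        }
        for (int p = 0; p < partitions; p++) {
            owner.put(p, sorted.get(p % sorted.size()));
        }
        return owner;
    }

    // Number of partitions whose active consumer differs between two assignments.
    static int moved(Map<Integer, String> before, Map<Integer, String> after) {
        int count = 0;
        for (Map.Entry<Integer, String> e : before.entrySet()) {
            if (!e.getValue().equals(after.get(e.getKey()))) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        Map<Integer, String> three = assign(java.util.Arrays.asList("c-a", "c-b", "c-c"), 6);
        Map<Integer, String> four = assign(java.util.Arrays.asList("c-a", "c-b", "c-c", "c-d"), 6);
        System.out.println(three);
        System.out.println(four);
        System.out.println("partitions moved when c-d joins: " + moved(three, four));
    }
}
```

With six partitions, going from three to four consumers changes the active consumer for 3 of the 6 partitions, and each of the three original consumers gains or loses a partition, which is the round-robin effect Jonathan describes. Because the order depends only on names, a restarted consumer that sets the same `consumerName` lands in the same sorted position.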
----
2019-05-04 01:55:45 UTC - Matteo Merli: for the re-assignment there is
currently a “grace-time” of 1sec by default (I think) within which the
partition is not re-assigned.
----
2019-05-04 01:56:19 UTC - Matteo Merli: so, in case of a quick reconnect, the
partition will not be handed over to another consumer
----
2019-05-04 01:57:19 UTC - Matteo Merli: In any case, it might be interesting to
have that grace-time be more configurable (right now it is per-broker) so that
each consumer could specify its own value
----
2019-05-04 01:57:34 UTC - Jonathan Pearl: Ah, that's cool. Would it be possible
for a consumer to assume another consumer's name and "inherit" its assignment
if it started up fast enough?
----
2019-05-04 01:57:57 UTC - Jonathan Pearl: Or does it need to be the same client?
----
2019-05-04 01:57:58 UTC - Matteo Merli: Yes, you can specify the name in
`ConsumerBuilder`
----
2019-05-04 01:58:09 UTC - Matteo Merli: or it will get a random one
----
2019-05-04 02:00:43 UTC - Jonathan Pearl: I think that might help a bit, but I
can still see round-robin being an issue in legitimate up-scaling and
down-scaling scenarios. But I suppose implementing a new assignment strategy in
Pulsar wouldn't be impossible! :stuck_out_tongue:
----
2019-05-04 02:01:00 UTC - Matteo Merli: definitely not impossible :smile:
----
2019-05-04 02:01:02 UTC - Jonathan Pearl: Thanks for all the great tips!
+1 : Matteo Merli
----
2019-05-04 02:05:44 UTC - Jonathan Pearl: Actually, while I have you here, how
heavy are reassignments? As in, how long do they take to resolve, and are
messages stopped during this time for all partitions?
----
2019-05-04 02:08:39 UTC - Matteo Merli: there’s only the artificial delay of
1sec (configurable) to avoid flickering in reconnections. the delivery is only
stopped in 1 partition
----
2019-05-04 02:09:09 UTC - Matteo Merli: since each broker decides on its own,
there’s not much overhead
----
2019-05-04 02:09:34 UTC - Matteo Merli: (decides on its own using the same
deterministic logic)
----
2019-05-04 02:10:07 UTC - Jonathan Pearl: Fantastic! As customizable as it was
in Kafka, one of my biggest pain points is what a showstopping sync point
rebalancing is across a consumer group. So that's great news.
----
2019-05-04 02:12:19 UTC - Matteo Merli: but yes, definitely we could add some
better way to guarantee the stickiness
----
2019-05-04 02:37:33 UTC - Sanjeev Kulkarni: What do you mean trigger a
function?
----