2020-03-12 09:14:14 UTC - Sijie Guo: :+1:

---- 2020-03-12 09:28:32 UTC - Vladimir Shchur: Hi! Has anyone used the input-topics pattern for functions? It worked for me two days ago, but something changed (reinstalling the cluster doesn't help), and now a Java function (for example ExclamationFunction) can't subscribe to the topic. It fails with `subscribe error: org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: Topic does not have schema to check`. Single-topic-input functions work. With Python, the input pattern doesn't seem to work at all. The function runs as a thread inside a standalone worker.

---- 2020-03-12 10:07:44 UTC - Vladimir Shchur: I was finally able to make it work by specifying the STRING schema by hand. But this is very inconvenient with dynamic topics, because the whole point of using a topic pattern is to keep the set of topics flexible.

---- 2020-03-12 11:24:14 UTC - Gilles Barbier: Hi, is there any way for a function deployed with n parallel instances to ensure that messages sharing the same attribute are not processed in parallel? For example, could I provide a key whose hash routes the message to a specific function instance? I want to avoid a race condition between 2 parallel instances that update the function's state under the same key by appending a value to an array. I'm afraid that such a race condition would make me lose a value in this array. Hoping I'm clear!

---- 2020-03-12 11:55:43 UTC - Vladimir Shchur: Looks like you are requesting Key_Shared subscription mode for functions :slightly_smiling_face: I'd recommend creating a GitHub issue for that.

---- 2020-03-12 11:57:01 UTC - Gilles Barbier: I'll do that, thx

---- 2020-03-12 13:50:50 UTC - Gilles Barbier: <https://github.com/apache/pulsar/issues/6527>
+1 : Vladimir Shchur, Konstantinos Papalias

---- 2020-03-12 15:29:05 UTC - Ming: If your sink connector already has a subscription and then crashes, Pulsar will redeliver the unacknowledged messages after your connector recovers.
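Ming's point depends on the subscription being durable on the broker side. The broker, not the connector, remembers how far the subscription has been acknowledged, so a restarted consumer resumes from the first unacknowledged message. This also answers why a sink needs a subscription at all.

The sketch below is a toy in-memory model of that behavior and is not the Pulsar client API. `DurableSubscription` and `Session` are hypothetical classes, and only cumulative acknowledgement is modelled.

```python
from typing import List


class DurableSubscription:
    """Toy broker-side subscription cursor that survives consumer crashes.

    Simplification: only cumulative acknowledgement is modelled here.
    """

    def __init__(self, log: List[str]) -> None:
        self.log = log
        self.mark_delete = 0  # every message before this index is acknowledged

    def connect(self) -> "Session":
        # A (re)connecting consumer starts at the first unacknowledged message.
        return Session(self)


class Session:
    """One consumer connection; its in-memory read position dies with it."""

    def __init__(self, subscription: DurableSubscription) -> None:
        self.subscription = subscription
        self.read_pos = subscription.mark_delete

    def receive(self) -> str:
        message = self.subscription.log[self.read_pos]
        self.read_pos += 1
        return message

    def ack_cumulative(self) -> None:
        # Acknowledge everything received so far; the "broker" persists this.
        self.subscription.mark_delete = self.read_pos
```

In this model, a message that was received but not acknowledged before a crash is delivered again to the next session. That mirrors the redelivery Ming describes.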
---- 2020-03-12 16:37:31 UTC - David Kjerrumgaard: @Aravindhan While it is possible to modify the Kafka Source connector to do that, it wouldn't be a good idea. That class would then have multiple responsibilities, which violates OO best practices. If you are concerned about the extra I/O cost of an additional intermediate topic, you could use a non-persistent topic for that purpose, which avoids the disk I/O altogether.

---- 2020-03-12 16:43:07 UTC - David Kjerrumgaard: Yeah, I was going to note that a topic now needs a schema associated with it before a consumer can subscribe to it.

---- 2020-03-12 17:03:41 UTC - Sijie Guo: A failover subscription can already achieve this for you, no?

---- 2020-03-12 17:04:05 UTC - Sijie Guo: A `key_shared` subscription only helps you scale the number of instances beyond the number of partitions.

---- 2020-03-12 17:05:49 UTC - Sijie Guo: It sounds like you are creating the topic outside the function without the right schema type.

---- 2020-03-12 17:10:19 UTC - Gilles Barbier: @Sijie Guo sorry, I'm trying, but I don't understand. How can failover help avoid a race condition on the function's state between multiple function instances?

---- 2020-03-12 17:12:14 UTC - Gilles Barbier: Of course, with a single function instance I don't have any issue, but with multiple function instances I should. Am I missing something?

---- 2020-03-12 17:12:26 UTC - Sijie Guo: In failover mode, each partition is assigned to only one consumer.
+1 : Konstantinos Papalias

---- 2020-03-12 17:16:47 UTC - Vil: Does key_shared subscription mode guarantee ordering of message consumption/processing (per key)?

---- 2020-03-12 17:18:41 UTC - Sijie Guo: In Pulsar Functions, each function instance is essentially a "consumer". So you can use the subscription mode to control how messages are dispatched to different consumers (function instances).
• shared mode: you don't have any control over how messages are dispatched to a function instance.
• failover mode: a partition is assigned to one consumer.
• key_shared mode: a key_range is assigned to one consumer.
Both "failover" and "key_shared" guarantee that messages with a given "key" are dispatched to one consumer. However, the dispatch can change when you scale up the parallelism. If you want a key_range to be tied to one given function instance, you need to use sticky_range readers and make sure the state is associated with the key_range you are reading. When you scale up the parallelism, you have to decide whether to split the state, so that messages in a given key_range still map to their state afterwards. So the question is: do you need to ensure that a key_range is tied to a given function instance?

---- 2020-03-12 17:19:27 UTC - Sijie Guo: It does

---- 2020-03-12 17:23:00 UTC - Vil: thank you!

---- 2020-03-12 17:28:56 UTC - Gilles Barbier: thx @Sijie Guo. To be sure, can you point me to more documentation on key_range / sticky_range readers? I only need to be sure that no more than one instance at a time can process a message with a given key (the same key used for the state). So as far as I understand, key_shared mode does the job and is already usable for functions?

---- 2020-03-12 17:33:57 UTC - Sijie Guo: The sticky key_range reader is not released yet. It is in master and will be released in 2.6.0. <https://github.com/apache/pulsar/pull/5928> If the requirement is just that messages with the same key are processed by only one function instance at a time, either a `failover` or a `key_shared` subscription works for you.

---- 2020-03-12 17:34:29 UTC - Gilles Barbier: great, thank you so much

---- 2020-03-12 17:48:00 UTC - Gilles Barbier: Last question: how do you set up key_shared subscription mode for a function? As far as I can see, there is no associated parameter in the "create function" API.
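Sijie's description of key-based dispatch can be illustrated with a simplified model. The sketch makes three assumptions:

- There are 65536 hash slots, which we believe matches Pulsar's default key_shared hash range size.
- The slots are split into equal contiguous ranges, one per function instance. This is not Pulsar's actual auto-split policy.
- `zlib.crc32` stands in for the Murmur3 hash Pulsar uses.

`key_slot`, `assign_ranges`, and `owner` are hypothetical helpers, not Pulsar APIs.

```python
import zlib
from typing import List, Tuple

# Assumption: 65536 slots, believed to match Pulsar's default key_shared range size.
HASH_RANGE_SIZE = 65536

Range = Tuple[int, int, str]  # (start inclusive, end exclusive, instance name)


def key_slot(key: str) -> int:
    """Map a message key to a slot in [0, HASH_RANGE_SIZE).

    zlib.crc32 is only a stand-in; Pulsar hashes keys with Murmur3.
    """
    return zlib.crc32(key.encode("utf-8")) % HASH_RANGE_SIZE


def assign_ranges(instances: List[str]) -> List[Range]:
    """Split the slot space into equal contiguous ranges, one per instance.

    Simplification: Pulsar's auto-split selector assigns ranges differently.
    """
    if not instances:
        raise ValueError("need at least one instance")
    n = len(instances)
    return [
        (i * HASH_RANGE_SIZE // n, (i + 1) * HASH_RANGE_SIZE // n, name)
        for i, name in enumerate(instances)
    ]


def owner(key: str, ranges: List[Range]) -> str:
    """Return the one instance whose range contains the key's slot."""
    slot = key_slot(key)
    for start, end, name in ranges:
        if start <= slot < end:
            return name
    raise ValueError(f"slot {slot} is not covered by any range")


if __name__ == "__main__":
    two = assign_ranges(["instance-0", "instance-1"])
    three = assign_ranges(["instance-0", "instance-1", "instance-2"])
    for key in ["order-1", "order-2", "order-3"]:
        # Stable while the instance set is fixed; may move when it changes.
        print(key, owner(key, two), "->", owner(key, three))
```

While the set of instances is fixed, every key lands on exactly one instance. That is what keeps two instances from updating the same key's state concurrently. Changing the number of instances moves some keys to a different owner, which is the scaling caveat Sijie raises about tying state to a key_range.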
---- 2020-03-12 17:52:57 UTC - Vladimir Shchur: Right, but I thought that Pulsar functions could work with any topic, not only ones with a schema specified.

---- 2020-03-12 18:07:05 UTC - David Kjerrumgaard: All consumers, including functions, perform a <https://pulsar.apache.org/docs/en/schema-evolution-compatibility/#schema-compatibility-check-strategy|schema compatibility check> to ensure that messages are deserialized properly. This prevents runtime exceptions on the consumer side caused by being unable to parse the message content, etc. You can override this compatibility check by using the admin CLI to set the policy to `ALWAYS_COMPATIBLE`, e.g. `bin/pulsar-admin namespaces set-schema-compatibility-strategy --compatibility <compatibility-level> tenant/namespace`

---- 2020-03-12 18:07:47 UTC - David Kjerrumgaard: See <https://pulsar.apache.org/docs/en/schema-manage/#manage-autoupdate-strategy> for details

---- 2020-03-12 18:17:08 UTC - Gilles Barbier: there is no "SubscriptionType"

---- 2020-03-12 18:40:18 UTC - Ian: What's the recommended way to handle producers when an application writes a thousand messages per second, and each message could go to any one of a few thousand topics (depending on the message)?

---- 2020-03-12 18:42:08 UTC - Ian: I'm wondering about the overhead of creating a producer, and the cost of keeping a producer around while waiting to publish messages with it (memory, network connections?)

---- 2020-03-12 19:01:55 UTC - Adam H: @Adam H has joined the channel

---- 2020-03-12 19:13:09 UTC - Addison Higham: @Ian producers are fairly cheap. They don't each need their own connection; producers are multiplexed over a single connection. The broker keeps a bit of state per producer, but it should be quite minimal, and producers get spread across the brokers.
If you have an unbounded number of topics, you may want to consider some sort of LRU cache in your code that purges and closes producers. That keeps at least your app bounded, since producers have some memory and CPU overhead, and unbounded topic growth would otherwise mean an unbounded number of producers.

---- 2020-03-12 19:47:39 UTC - Vladimir Shchur: The issue is that the topic I'm using intentionally has no schema. I'm just looking for a way to tell the function "this topic doesn't have a schema, that's OK".

---- 2020-03-12 19:52:00 UTC - Sijie Guo: In that case, you have to either use `byte[]` as the input type or use a SerDe. By default, a function uses its input and output types as the schema. If you submit a function that processes strings, the input topic has to be associated with a string schema.

---- 2020-03-12 19:55:02 UTC - David Kjerrumgaard: @Vladimir Shchur That is exactly what this command does, but at the namespace level: `bin/pulsar-admin namespaces set-schema-compatibility-strategy --compatibility ALWAYS_COMPATIBLE tenant/namespace`

---- 2020-03-12 19:56:35 UTC - Vladimir Shchur: Thank you, will try!

---- 2020-03-12 19:58:36 UTC - Ian: @Addison Higham thank you, that's helpful. Topics would be bounded at a few thousand, but an LRU cache may still be a good idea.

---- 2020-03-12 21:47:42 UTC - Vladimir Shchur: byte[] worked fine for me, thank you

---- 2020-03-12 22:07:13 UTC - Vladimir Shchur: Looks like there is an issue with pattern-topic functions. The first message, which creates the topic, is lost even if the topic has retention, because there is no way to set `subscriptionPosition` to earliest for a function. There is a workaround for a single topic using the `create-subscription` command, but that command doesn't support regexes for topics. Am I correct that this is a bug?

---- 2020-03-12 22:09:14 UTC - David Kjerrumgaard: Did you create the consuming function first or the message producer first? If you create the function first, it should receive all messages published after it subscribes.
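Addison's LRU suggestion can be sketched with `collections.OrderedDict`. `ProducerCache` is a hypothetical helper. It only assumes that whatever `create_producer` returns has a `close()` method. With the Pulsar Python client, that factory might wrap `client.create_producer(topic)`, but treat the wiring as an assumption to adapt. `DemoProducer` is a stand-in so the sketch runs without a broker.

```python
from collections import OrderedDict
from typing import Any, Callable


class ProducerCache:
    """Bounded LRU cache of per-topic producers (not thread-safe).

    `create_producer` is any callable returning an object with `close()`.
    With the Python client it might be `lambda t: client.create_producer(t)`
    (an assumption; adapt to your client and producer options).
    """

    def __init__(self, create_producer: Callable[[str], Any], max_size: int) -> None:
        if max_size < 1:
            raise ValueError("max_size must be at least 1")
        self._create = create_producer
        self._max_size = max_size
        self._producers: "OrderedDict[str, Any]" = OrderedDict()

    def get(self, topic: str) -> Any:
        """Return the producer for `topic`, evicting the LRU one if over capacity."""
        if topic in self._producers:
            self._producers.move_to_end(topic)  # mark as most recently used
            return self._producers[topic]
        producer = self._create(topic)
        self._producers[topic] = producer
        if len(self._producers) > self._max_size:
            _, evicted = self._producers.popitem(last=False)  # least recently used
            evicted.close()  # release client memory and the broker-side producer
        return producer

    def close_all(self) -> None:
        """Close every cached producer, oldest first."""
        while self._producers:
            _, producer = self._producers.popitem(last=False)
            producer.close()

    def __len__(self) -> int:
        return len(self._producers)


class DemoProducer:
    """Stand-in producer so the sketch runs without a broker."""

    def __init__(self, topic: str) -> None:
        self.topic = topic
        self.closed = False

    def close(self) -> None:
        self.closed = True
```

With a few thousand topics, as in Ian's case, `max_size` could simply be set above the expected topic count, so eviction only kicks in if that bound is exceeded. If several threads publish, guard `get` with a lock. Also make sure an evicted producer is not still in use by another thread.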
---- 2020-03-12 22:10:53 UTC - Vladimir Shchur: I created the consuming function first. But since it's a regex-based function, it doesn't create the topic. The topic is created on the producer side together with the first message, so once the function detects the new topic, it subscribes but can't get the first message. By the way, I've just realized that not only the first message but all messages published before the function subscribes to the new topic will be lost.

---- 2020-03-12 22:14:05 UTC - David Kjerrumgaard: That is the expected behavior based on what you have told me. However, you can create the topic first (manually) and then the function to get the desired behavior.

---- 2020-03-12 22:18:03 UTC - Vladimir Shchur: It's expected but not desired :slightly_smiling_face: A very common case in RabbitMQ-based apps is to create topics dynamically from the application to implement the Request-Response pattern, and have some function pick them up. Looks like subscriptionPosition is an important setting for functions to have to handle this.

---- 2020-03-12 22:19:45 UTC - David Kjerrumgaard: Sounds like a feature request. In Pulsar, topics have to exist first. Subscriptions (which track your read position in the topic) are then created and associated with the topic.

---- 2020-03-12 22:20:58 UTC - David Kjerrumgaard: Functions use the Consumer interface under the covers, which supports subscription-based consumption. What you need is Reader-based consumption, which allows you to specify the starting point of your reads. :smiley:

---- 2020-03-12 22:21:43 UTC - David Kjerrumgaard: So maybe a PIP for "Support Reader interface based Functions" would be the way to go?
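Vladimir's lost messages follow from where a newly created subscription starts reading. The toy simulation below models a regex subscriber that discovers matching topics on each poll:

- With a "latest" start, anything published before discovery is skipped.
- With an "earliest" start, retained messages are read from the beginning.

`Broker` and `PatternConsumer` are hypothetical classes, and no Pulsar client is involved. For regular consumers, the Java client exposes this choice as `subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)`. Whether functions expose an equivalent setting depends on the Pulsar version, which this sketch does not address.

```python
import re
from typing import Dict, List


class Broker:
    """Toy in-memory broker: each topic is an append-only, fully retained log."""

    def __init__(self) -> None:
        self.topics: Dict[str, List[str]] = {}

    def publish(self, topic: str, message: str) -> None:
        # The first publish auto-creates the topic, as in the scenario above.
        self.topics.setdefault(topic, []).append(message)


class PatternConsumer:
    """Toy regex subscriber that discovers matching topics on every poll."""

    def __init__(self, broker: Broker, pattern: str, initial_position: str) -> None:
        if initial_position not in ("latest", "earliest"):
            raise ValueError(f"unknown initial position: {initial_position}")
        self.broker = broker
        self.pattern = re.compile(pattern)
        self.initial_position = initial_position
        self.cursors: Dict[str, int] = {}  # topic -> index of next message to read

    def poll(self) -> List[str]:
        received: List[str] = []
        for topic, log in self.broker.topics.items():
            if not self.pattern.fullmatch(topic):
                continue
            if topic not in self.cursors:
                # Newly discovered topic: the initial position only matters here,
                # at the moment the subscription is first created.
                start = 0 if self.initial_position == "earliest" else len(log)
                self.cursors[topic] = start
            received.extend(log[self.cursors[topic]:])
            self.cursors[topic] = len(log)
        return received
```

Once a subscription exists, both consumers behave identically. The difference is confined to messages published between topic creation and discovery, which matches the gap Vladimir describes.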
---- 2020-03-12 22:25:52 UTC - Vladimir Shchur: I don't think I need a reader; a pattern-based function is already the solution. I plan to dynamically create a request-like input/output topic for each new application node. It works well except for the first message, which is lost due to the lack of subscriptionPosition=earliest.

---- 2020-03-12 23:33:52 UTC - Antti Kaikkonen: You could write your own source connector to do that. <https://pulsar.apache.org/docs/en/io-develop/> If you need state storage, you can access it from the SourceContext object, but keep in mind that it's currently in developer preview. I've personally had some stability issues when using state in Pulsar standalone. <https://pulsar.apache.org/docs/en/functions-state/>

---- 2020-03-12 23:56:53 UTC - Jin Feng: @Jin Feng has joined the channel

---- 2020-03-13 00:52:02 UTC - Aravindhan: Thanks

---- 2020-03-13 00:52:42 UTC - Aravindhan: Will try that. Thanks.

---- 2020-03-13 03:55:08 UTC - Abhilash Mandaliya: Why would the sink connector have any subscription? It is registered against the topic, and every time a new message arrives at the broker on that topic, it automatically gets delivered to the sink connector. We don't need a subscription here, right?

---- 2020-03-13 06:37:57 UTC - jay yang: @jay yang has joined the channel

---- 2020-03-13 07:56:05 UTC - Gilles Barbier: wdyt @Sijie Guo?

---- 2020-03-13 08:12:43 UTC - Sijie Guo: I see. I think we only used `--processing-guarantees` to determine the subscription type.

---- 2020-03-13 08:13:39 UTC - Sijie Guo: Alternatively, you can create a subscription before starting the function. The subscription name is <tenant>/<namespace>/<function-name>.

---- 2020-03-13 09:01:42 UTC - xue: pulsar producer run in karaf ----
