2020-01-17 09:56:52 UTC - transnano: @transnano has joined the channel ---- 2020-01-17 15:37:55 UTC - Naby: I was looking for a way to configure `pattern_auto_discovery_period` or something similar for my Python function that uses `topics_pattern`. Then I saw that in the Python client (<https://github.com/apache/pulsar/blob/master/pulsar-client-cpp/python/pulsar/__init__.py>) the value of `pattern_auto_discovery_period` in the `subscribe` method is not used. Is that correct? ---- 2020-01-17 15:43:02 UTC - Naby: Also, any suggestions on how to configure a Python function to discover new topics that match its topics pattern? @Matteo Merli, this was my original problem, which led to the previous question about the Pulsar Python client. Since a function is run by the broker itself, is there a different mechanism to notify the function about new topics that match the pattern? Or should I configure the function to discover new topics automatically? If so, how? In my case, when a new topic is created, it takes some time before its messages are processed by my Python function, and any messages published in between get lost. ---- 2020-01-17 16:20:01 UTC - Fernando: When creating a Python producer with a schema definition, I’m getting ```Exception: Pulsar error: TimeOut```. Any clues? ---- 2020-01-17 16:22:21 UTC - Sijie Guo: Did the topic exist before? ---- 2020-01-17 16:22:45 UTC - Fernando: no ---- 2020-01-17 16:23:28 UTC - Sijie Guo: ok. Then you might have to look into the producer log or broker log to see if there are any errors. ---- 2020-01-17 16:23:41 UTC - Fernando: will do ---- 2020-01-17 16:32:45 UTC - Fernando: there are no errors, just a couple of warnings ---- 2020-01-17 16:34:05 UTC - Sijie Guo: are you able to produce with other tools or clients? 
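Naby's question hinges on how pattern auto-discovery behaves. With the Python client, a pattern subscription is made by passing a compiled regex as the `topic` argument of `Client.subscribe`, and the client is then expected to re-check the namespace every `pattern_auto_discovery_period` seconds. Below is a stdlib-only toy model of one discovery round, not the client's implementation: `new_matching_topics` is a hypothetical helper, and full-match semantics is an assumption of the sketch.

```python
import re


def new_matching_topics(pattern, subscribed, current_topics):
    """Toy model of one pattern auto-discovery round (hypothetical helper).

    Returns the topics in `current_topics` that fully match `pattern` and are
    not already in `subscribed`, sorted for deterministic output.
    """
    regex = re.compile(pattern)
    return sorted(
        topic
        for topic in current_topics
        if regex.fullmatch(topic) and topic not in subscribed
    )


# Usage: a new sensor topic appears between two discovery rounds.
pattern = r"persistent://public/default/sensor-.*"
subscribed = {"persistent://public/default/sensor-1"}
topics_now = [
    "persistent://public/default/sensor-1",
    "persistent://public/default/sensor-2",
    "persistent://public/default/billing",
]
print(new_matching_topics(pattern, subscribed, topics_now))
# → ['persistent://public/default/sensor-2']
```

In this model, anything published to a topic created between two rounds is not yet being consumed, which is the window Naby describes.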
---- 2020-01-17 16:34:35 UTC - Fernando: If I set the schema to ByteSchema it works just fine ---- 2020-01-17 16:34:53 UTC - Fernando: in fact, a couple of days ago I was testing with the same AvroSchema and it worked ---- 2020-01-17 16:35:30 UTC - Fernando: the topic is `topic=persistent://packhunt/billing/billing-records` ---- 2020-01-17 16:35:30 UTC - Sijie Guo: okay. I asked if you are producing a message to an existing topic. ---- 2020-01-17 16:36:07 UTC - Sijie Guo: are you trying to publish records with a schema to a bytes topic? ---- 2020-01-17 16:36:25 UTC - Fernando: no, I tested on different topics before. This is a new topic ---- 2020-01-17 16:36:58 UTC - Fernando: `pulsar-admin topics list packhunt/billing` doesn’t return anything ---- 2020-01-17 16:37:34 UTC - Fernando: ```pulsar-admin schemas get persistent://packhunt/billing/billing-records
HTTP 404 Not Found

Reason: HTTP 404 Not Found
command terminated with exit code 1``` ---- 2020-01-17 16:37:45 UTC - Sijie Guo: ok ---- 2020-01-17 16:37:51 UTC - Sijie Guo: what version are you running? ---- 2020-01-17 16:38:05 UTC - Fernando: master branch ---- 2020-01-17 16:38:48 UTC - Sijie Guo: okay, why not try a release? ---- 2020-01-17 16:39:02 UTC - Fernando: I need a feature that is in 2.5.0 ---- 2020-01-17 16:39:06 UTC - Sijie Guo: the master branch is under development and not released yet. ---- 2020-01-17 16:39:14 UTC - Sijie Guo: 2.5.0 is released. ---- 2020-01-17 16:39:35 UTC - Sijie Guo: I mean, the release is not completed yet ---- 2020-01-17 16:39:47 UTC - Sijie Guo: I am still completing the last phase of the 2.5.0 release. ---- 2020-01-17 16:40:14 UTC - Fernando: ok ---- 2020-01-17 16:40:19 UTC - Sijie Guo: in that case, you can build from tag v2.5.0 ---- 2020-01-17 16:40:25 UTC - Sijie Guo: rather than from master ---- 2020-01-17 16:40:34 UTC - Fernando: I will, thank you ---- 2020-01-17 16:41:14 UTC - Sijie Guo: regardless, I see “WARN org.apache.pulsar.broker.service.AbstractTopic - [persistent://packhunt/billing/billing-records] Error getting policies java.util.concurrent.CompletableFuture cannot be cast to org.apache.pulsar.common.policies.data.Policies and publish throttling will be disabled” in your error log ---- 2020-01-17 16:41:29 UTC - Sijie Guo: that is causing the timeout ---- 2020-01-17 16:41:43 UTC - Sijie Guo: let me check if it is a regression in master or not ---- 2020-01-17 16:42:00 UTC - Fernando: thank you! ---- 2020-01-17 16:44:45 UTC - Sijie Guo: what is the gitsha of your cloned master? 
---- 2020-01-17 16:45:15 UTC - Fernando: let me check ---- 2020-01-17 16:46:35 UTC - Fernando: oh, I think I forgot to fetch from upstream in my fork, so it’s around a month old… ---- 2020-01-17 16:46:37 UTC - Fernando: `8a20d9a221997b5ade0c3a006b3c01be57ec0d4b` ---- 2020-01-17 16:46:52 UTC - Fernando: Date: Mon Dec 9 17:57:01 2019 +0800 ---- 2020-01-17 16:49:48 UTC - Fernando: So is this happening because the topic doesn’t exist? If I create it as a `ByteSchema` topic and then create another producer with `AvroSchema`, would it work? It’s weird that a couple of days ago I was able to test this without problems, but now that I’ve created this new namespace it suddenly fails ---- 2020-01-17 16:52:05 UTC - Sijie Guo: I am not sure. The error message looks like a bug to me. ---- 2020-01-17 16:52:57 UTC - Fernando: ok, I’m going to build the 2.5.0 tag to see what happens ---- 2020-01-17 16:55:28 UTC - Sijie Guo: actually, I don’t think that is the problem, although the error message is confusing. ---- 2020-01-17 16:55:55 UTC - Fernando: do you have any suggestions to debug it further? ---- 2020-01-17 16:55:57 UTC - Sijie Guo: Can you try the v2.5.0 tag and let me know what you see? ---- 2020-01-17 16:56:18 UTC - Sijie Guo: I would suggest using a release tag ---- 2020-01-17 16:56:33 UTC - Fernando: is this the command to build the Docker images? `mvn clean install -Pdocker -DskipTests` ---- 2020-01-17 16:56:38 UTC - Sijie Guo: It is hard to debug things on a gitsha that was fetched almost 2 months ago. ---- 2020-01-17 16:56:51 UTC - Sijie Guo: yes. ---- 2020-01-17 16:57:08 UTC - Fernando: thank you! I’ll try the release and let you know ---- 2020-01-17 17:02:55 UTC - Sijie Guo: thanks ---- 2020-01-17 17:11:24 UTC - shao: @shao has joined the channel ---- 2020-01-17 18:50:09 UTC - Fernando: ok, it works with the 2.5.0 tag ---- 2020-01-17 19:01:42 UTC - Sagar Shah: Hi, does anyone know if PublishAsync guarantees message ordering? 
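Sagar's ordering question can be pictured with a toy model built on the two answers Matteo Merli gives in the replies that follow: async sends are delivered in order, and if B fails, C also receives an exception. In the real Python client, `Producer.send_async(content, callback)` invokes `callback(res, msg_id)`; the sketch below passes a string result and the message itself instead, and `ToyProducer` and `complete` are hypothetical names, not Pulsar's implementation.

```python
from collections import deque


class ToyProducer:
    """Toy model of in-order async publishing; not Pulsar's implementation."""

    def __init__(self):
        self._pending = deque()  # (message, callback) pairs in send order

    def send_async(self, msg, callback):
        # Enqueue only; callbacks fire later, strictly in send order.
        self._pending.append((msg, callback))

    def complete(self, fail_at=None):
        """Drain the queue; `fail_at` fails, and so does everything queued behind it.

        Returns the messages that were persisted, in order.
        """
        persisted = []
        failed = False
        while self._pending:
            msg, callback = self._pending.popleft()
            failed = failed or msg == fail_at
            if failed:
                callback("Error", msg)  # B fails, and C fails with it
            else:
                persisted.append(msg)
                callback("Ok", msg)
        return persisted


results = []
producer = ToyProducer()
for m in ["A", "B", "C"]:
    producer.send_async(m, lambda res, msg: results.append((msg, res)))
print(producer.complete(fail_at="B"))  # → ['A']
print(results)  # → [('A', 'Ok'), ('B', 'Error'), ('C', 'Error')]
```

The point the model captures is that a failure is never skipped over: in this model nothing queued after B is reported as persisted, so a caller never sees C succeed while B failed.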
---- 2020-01-17 19:02:29 UTC - Matteo Merli: Yes, it does ---- 2020-01-17 19:03:54 UTC - Sagar Shah: Thanks! ---- 2020-01-17 19:13:39 UTC - Sagar Shah: Just to understand: if we send messages A, B, and C, and B fails (its callback gets an exception), will message C reach the topic? ---- 2020-01-17 19:24:45 UTC - Naby: @Matteo Merli, do I understand this correctly? Or am I missing something here? ---- 2020-01-17 19:26:08 UTC - Matteo Merli: That's correct. It should be passed to the C++ config object in the subscribe method ---- 2020-01-17 19:26:31 UTC - Matteo Merli: C will also receive an exception ---- 2020-01-17 19:26:36 UTC - Naby: I changed the value of `pattern_auto_discovery_period` to 1 second and I'm still not getting messages from new topics for a while. ---- 2020-01-17 19:27:10 UTC - Naby: How can I pass the new value to the C++ config object? ---- 2020-01-17 19:27:35 UTC - Matteo Merli: like: <https://github.com/apache/pulsar/blob/master/pulsar-client-cpp/python/pulsar/__init__.py#L635> ---- 2020-01-17 19:28:34 UTC - Matteo Merli: In any case, the auto-discovery period is capped at 1 min to avoid clients sending requests to the broker too frequently ---- 2020-01-17 19:32:08 UTC - Naby: I understand. But in some cases, it might be desirable. How can I change the value, since I can’t pass it as a parameter to the subscribe method? ---- 2020-01-17 19:34:01 UTC - Abraham: the docs for Python still need to be updated ---- 2020-01-17 19:34:47 UTC - Matteo Merli: the Python client needs to be fixed ---- 2020-01-17 19:35:44 UTC - Naby: Okay ---- 2020-01-17 19:36:46 UTC - Naby: By the way, how about Python functions? Is there a way to configure them to auto-discover new topics, assuming the Python client gets fixed? ---- 2020-01-17 20:08:46 UTC - Abraham: Hello, I’m trying to create a Python Pulsar Function, but I’m getting an error. 
This is how I’m attempting to update the function: `pulsar-admin functions update --py /scripts/publish_function.py --className PublishFunction --inputs persistent://public/default/sentences --output persistent://public/default/wordcount`, but the `functions` log shows this error: ```[2020-01-17 19:59:13 +0000] [INFO] python_instance_main.py: Starting Python instance with Namespace(auto_ack='true', client_auth_params=None, client_auth_plugin=None, function_classname='PublishFunction', function_id='8a6bb9fe-6b5d-40bb-b2e9-a9eca63628a1', function_version='4358d38b-57a6-43fb-ba06-8946b3df0d44', hostname_verification_enabled='false', instance_id='0', log_topic=None, logging_directory='/pulsar/logs/functions', logging_file='PublishFunction', max_buffered_tuples='1024', name='PublishFunction', namespace='default', port=36707, processing_guarantees='ATLEAST_ONCE', pulsar_serviceurl='pulsar://127.0.0.1:6650', py='/tmp/pulsar_functions/public/default/PublishFunction/0/PublishFunction.py', sink_serde_classname=None, sink_topic='wordcount', source_subscription_type='SHARED', source_timeout_ms=None, source_topics_serde_classname='{"sentences":""}', tenant='public', tls_allow_insecure_connection='false', tls_trust_cert_path=None, topics_pattern=None, use_tls='false', user_config=None)
[2020-01-17 19:59:13 +0000] [INFO] util.py: Trying to import serde.IdentitySerDe from path /tmp/pulsar_functions/public/default/PublishFunction/0
[2020-01-17 19:59:13 +0000] [INFO] util.py: Add a new dependency to the path: /tmp/pulsar_functions/public/default/PublishFunction/0
[2020-01-17 19:59:13 +0000] [INFO] util.py: Import failed class_name IdentitySerDe from path /tmp/pulsar_functions/public/default/PublishFunction/0
[2020-01-17 19:59:13 +0000] [INFO] util.py: No module named serde
Traceback (most recent call last):
  File "/pulsar/instances/python-instance/util.py", line 61, in import_class_from_path
    mod = __import__(classname_path, fromlist=[class_name], level=-1)
ImportError: No module named serde
[2020-01-17 19:59:13 +0000] [INFO] util.py: Trying to import serde.IdentitySerDe from path /pulsar/instances/python-instance/pulsar/functions
[2020-01-17 19:59:13 +0000] [INFO] util.py: Add a new dependency to the path: /pulsar/instances/python-instance/pulsar/functions
[2020-01-17 19:59:13 +0000] [INFO] python_instance.py: Setting up consumer for topic sentences with subname public/default/PublishFunction
``` If I don’t specify a schema, is the expected behavior that the IdentitySchema would be used? Does my Pulsar Function code need to include anything? I did try `from pulsar import Function, SerDe` in my function code. ---- 2020-01-17 20:10:06 UTC - Naby: More on: <https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1579275782012800> ---- 2020-01-17 21:23:26 UTC - Mathieu Druart: @Addison Higham @Sijie Guo Hi! We tried to use the state API in Pulsar Functions with version 2.5.0, but still no luck: we only get "State is not enabled." errors (deploying on Kubernetes with the default Helm chart, with `extraServerComponents=org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent` in the BookKeeper conf file). Any idea? Thanks! 
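On Abraham's question about what the function code must include: besides the SDK form (a `pulsar.Function` subclass whose `process(self, input, context)` method handles each message), the Pulsar Functions documentation describes a language-native Python form, a plain `process(input)` function that needs no Pulsar imports and whose return value is sent to the output topic. Below is a minimal sketch of that form, with hypothetical word-count logic chosen to match the `sentences` and `wordcount` topics above. It is not offered as a fix for the serde messages in the log.

```python
def process(input):
    """Language-native Pulsar Function sketch: count the words in one sentence.

    The word-count logic is hypothetical; the returned string would be
    published to the function's output topic.
    """
    return str(len(input.split()))


print(process("the quick brown fox"))  # → 4
```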
---- 2020-01-17 23:48:52 UTC - tuteng: Please check whether the source code uses `serde` or `Serde` ---- 2020-01-17 23:59:53 UTC - Abraham: I deleted the Function and now I’m unable to create a new one ---- 2020-01-18 00:00:21 UTC - Abraham: This is the CLI command I’m using: ```pulsar-admin functions create --py /scripts/publish_function.py --className WordCountFunction --inputs persistent://public/default/sentences --output persistent://public/default/wordcount``` ---- 2020-01-18 00:01:11 UTC - Abraham: and the Pulsar logs show this: ---- 2020-01-18 00:01:36 UTC - Abraham: ```23:55:23.922 [pulsar-web-55-25] INFO org.apache.pulsar.functions.worker.Utils - Uploading function package to 'public/default/WordCountFunction/032eaa49-4040-49b9-9e96-7c37e3aeb182-publish_function.py'
23:55:23.975 [pulsar-web-55-25] ERROR org.apache.pulsar.functions.worker.rest.api.FunctionsImpl - Error uploading file packagePath: "public/default/WordCountFunction/032eaa49-4040-49b9-9e96-7c37e3aeb182-publish_function.py"
org.apache.distributedlog.exceptions.UnexpectedException: unexpected exception in AppendOnlyStreamWriter.force
    at org.apache.distributedlog.AppendOnlyStreamWriter.force(AppendOnlyStreamWriter.java:62) ~[org.apache.distributedlog-distributedlog-core-4.7.1.jar:4.7.1]
    at org.apache.pulsar.functions.worker.dlog.DLOutputStream.flush(DLOutputStream.java:64) ~[org.apache.pulsar-pulsar-functions-worker-2.1.0-incubating.jar:2.1.0-incubating]
    at org.apache.pulsar.functions.worker.Utils.uploadToBookeeper(Utils.java:129) ~[org.apache.pulsar-pulsar-functions-worker-2.1.0-incubating.jar:2.1.0-incubating]
    at org.apache.pulsar.functions.worker.rest.api.FunctionsImpl.updateRequest(FunctionsImpl.java:404) [org.apache.pulsar-pulsar-functions-worker-2.1.0-incubating.jar:2.1.0-incubating]
    at org.apache.pulsar.functions.worker.rest.api.FunctionsImpl.registerFunction(FunctionsImpl.java:166) [org.apache.pulsar-pulsar-functions-worker-2.1.0-incubating.jar:2.1.0-incubating]
    at org.apache.pulsar.broker.admin.impl.FunctionsBase.registerFunction(FunctionsBase.java:85) [org.apache.pulsar-pulsar-broker-2.1.0-incubating.jar:2.1.0-incubating]
    ...``` ---- 2020-01-18 00:24:33 UTC - Kirill Merkushev: is it fine to keep several million keys in the context state and use them for filtering in downstream functions? ---- 2020-01-18 01:30:50 UTC - Sijie Guo: @Mathieu Druart there hasn't been much progress on state in Pulsar Functions in 2.5.0. We might put the focus back on this area for the next major release (2.6.x). ---- 2020-01-18 01:32:10 UTC - Mathieu Druart: @Sijie Guo ok, thanks for the answer ---- 2020-01-18 01:34:29 UTC - Sijie Guo: @Addison Higham: do you have any ideas on how we can improve the k8s runtime documentation here <http://pulsar.apache.org/docs/en/functions-runtime/#configure-kubernetes-runtime>? Can you suggest a few? @Anonymitaet @Jennifer Huang can incorporate your comments into the documentation. ---- 2020-01-18 05:40:26 UTC - Sandeep Kotagiri: @Addison Higham @Sijie Guo I am extending this thread with some more discussion, including some failures I observe in my environment when running functions with the Kubernetes runtime. I have configured the Kubernetes runtime in the functions_worker.yml file, and I am able to launch a StatefulSet/pod to run the function. However, the function fails to run in my environment. In my case I am using TLS for Pulsar, and I am also using TLS authentication. I have figured out how this is failing. The pod starts with the following startup script:
`/pulsar/bin/pulsar-admin --admin-url https://172.16.77.84:8443 functions download --tenant public --namespace default --name firstfunction --destination-file /pulsar/api-examples.jar && SHARD_ID=${POD_NAME##*-} && echo shardId=${SHARD_ID} && exec java -cp /pulsar/instances/java-instance.jar:/pulsar/instances/deps/* -Dpulsar.functions.extra.dependencies.dir=/pulsar/instances/deps -Dpulsar.functions.instance.classpath=/pulsar/conf:::/pulsar/lib/*: -Dlog4j.configurationFile=kubernetes_instance_log4j2.xml -Dpulsar.function.log.dir=logs/functions/public/default/firstfunction -Dpulsar.function.log.file=firstfunction-$SHARD_ID -Xmx1073741824 org.apache.pulsar.functions.instance.JavaInstanceMain --jar /pulsar/api-examples.jar --instance_id $SHARD_ID --function_id 1f74c09f-e96d-4348-b35d-62bbd0d96fce --function_version 74abbe36-cd42-4e22-a404-ed27ee9602a6 --function_details '{"tenant":"public","namespace":"default","name":"firstfunction","className":"org.apache.pulsar.functions.api.examples.ExclamationFunction","autoAck":true,"parallelism":1,"source":{"typeClassName":"java.lang.String","inputSpecs":{"topicA":{}},"cleanupSubscription":true},"sink":{"topic":"persistent://public/default/topicAOut","typeClassName":"java.lang.String"},"resources":{"cpu":1.0,"ram":"1073741824","disk":"10737418240"},"componentType":"FUNCTION"}' --pulsar_serviceurl pulsar+ssl://172.16.77.84:6651 --use_tls true --tls_allow_insecure false --hostname_verification_enabled false --tls_trust_cert_path /pulsar/ssl/some_ca.crt --max_buffered_tuples 1024 --port 9093 --metrics_port 9094 --expected_healthcheck_interval -1 --secrets_provider org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider --cluster_name pulsar-itomdipulsar`. However, this is missing the `--client_auth_plugin` and `--client_auth_parameters` parameters. When I intervene manually and set these parameters, the function seems to run well. 
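The manual fix Sandeep describes amounts to appending two flags to the instance command. Below is a minimal Python sketch, assuming the values come from the worker's `clientAuthenticationPlugin` and `clientAuthenticationParameters` settings. `with_client_auth` is a hypothetical helper, not Pulsar code. Sandeep writes the second flag as `--client_auth_parameters`; the sketch assumes `--client_auth_params`, matching the `client_auth_params` argument in the Python instance log earlier in this channel, so check the spelling against `JavaInstanceStarter` before relying on it.

```python
import shlex


def with_client_auth(args, auth_plugin, auth_params):
    """Return a copy of an instance command line with client-auth flags appended.

    Hypothetical helper mirroring the manual fix; the flag spellings are an
    assumption. All other arguments are left untouched.
    """
    fixed = list(args)
    if not (auth_plugin and auth_params):
        return fixed  # auth not configured: nothing to add
    if "--client_auth_plugin" in fixed:
        return fixed  # already present: don't duplicate the flags
    return fixed + [
        "--client_auth_plugin", auth_plugin,
        "--client_auth_params", auth_params,
    ]


# Usage with a shortened version of the startup command above.
base = shlex.split(
    "java org.apache.pulsar.functions.instance.JavaInstanceMain"
    " --use_tls true --tls_trust_cert_path /pulsar/ssl/some_ca.crt"
)
cmd = with_client_auth(
    base,
    "org.apache.pulsar.client.impl.auth.AuthenticationTls",
    "tlsCertFile:/pulsar/server.crt,tlsKeyFile:/pulsar/server.key",
)
print(shlex.join(cmd))
```

Making the helper a no-op when the flags are already present keeps it safe to apply to a command that has been patched by hand.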
---- 2020-01-18 05:44:12 UTC - Sandeep Kotagiri: I am adding my functions_worker.yml settings here. ---- 2020-01-18 05:44:49 UTC - Sandeep Kotagiri: ```assignmentWriteMaxRetries: 60
authenticationEnabled: false
authenticationProviders: null
authorizationEnabled: false
authorizationProvider: org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider
clientAuthenticationParameters: tlsCertFile:/pulsar/server.crt,tlsKeyFile:/pulsar/server.key
clientAuthenticationPlugin: org.apache.pulsar.client.impl.auth.AuthenticationTls
clusterCoordinationTopicName: coordinate
configurationStoreServers: localhost:2181
connectorsDirectory: ./connectors
downloadDirectory: /tmp/pulsar_functions
failureCheckFreqMs: 30000
functionAssignmentTopicName: assignments
functionMetadataTopicName: metadata
initialBrokerReconnectMaxRetries: 60
instanceLivenessCheckFreqMs: 30000
kubernetesContainerFactory:
  customLabels: null
  extraFunctionDependenciesDir: null
  imagePullPolicy: Always
  jobNamespace: sandeep
  k8Uri: null
  percentMemoryPadding: 10
  pulsarAdminUrl: null
  pulsarDockerImageName: pulsar-image:latest
  pulsarRootDir: null
  pulsarServiceUrl: null
  submittingInsidePod: true
numFunctionPackageReplicas: 1
numHttpServerThreads: 8
pulsarFunctionsCluster: pulsar-itomdipulsar
pulsarFunctionsNamespace: public/functions
pulsarServiceUrl: pulsar+ssl://localhost:6651
pulsarWebServiceUrl: https://localhost:8443
rescheduleTimeoutMs: 60000
schedulerClassName: org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler
secretsProviderConfiguratorClassName: org.apache.pulsar.functions.secretsproviderconfigurator.KubernetesSecretsProviderConfigurator
superUserRoles: null
tlsAllowInsecureConnection: false
tlsCertRefreshCheckDurationSec: 300
tlsCertificateFilePath: /var/run/secrets/boostport.com/server.crt
tlsEnabled: true
tlsKeyFilePath: /var/run/secrets/boostport.com/server.key
tlsTrustCertsFilePath: /var/run/secrets/boostport.com/trustedCAs/RIC_ca.crt
topicCompactionFrequencySec: 1800
useTls: 'true'
workerHostname: localhost
workerId: standalone
workerPort: 6750
workerPortTls: 6751
zooKeeperOperationTimeoutSeconds: 30
zooKeeperSessionTimeoutMillis: 30000``` ---- 2020-01-18 05:53:45 UTC - Sandeep Kotagiri: This is Pulsar 2.4.2. ---- 2020-01-18 05:54:51 UTC - Sandeep Kotagiri: I see that the org.apache.pulsar.functions.runtime.RuntimeUtils class is missing the code that sets the client_auth_plugin and client_auth_parameters parameters. ---- 2020-01-18 05:56:53 UTC - Sandeep Kotagiri: So is this a bug? Or am I supposed to use the secrets functionality via secretsProviderConfiguratorClassName? At least, looking at the JavaInstanceStarter class suggests otherwise, since it is the RuntimeUtils class that is missing these parameters. ----
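Whichever way the flags end up being supplied, the TLS client credentials live in the `clientAuthenticationParameters` value of the config above, which packs the certificate and key paths into one string. Below is a minimal stdlib sketch for reading that value, assuming the comma-separated `key:value` form used in this config. `parse_auth_params` is a hypothetical helper, not Pulsar's own parser.

```python
def parse_auth_params(encoded):
    """Parse a 'key:value,key:value' auth-parameter string into a dict.

    Hypothetical helper. Each pair is split on its first colon only, so values
    that themselves contain colons are kept whole.
    """
    params = {}
    for pair in encoded.split(","):
        pair = pair.strip()
        if not pair:
            continue  # tolerate stray or trailing commas
        key, sep, value = pair.partition(":")
        if not sep:
            raise ValueError(f"malformed auth parameter: {pair!r}")
        params[key.strip()] = value.strip()
    return params


print(parse_auth_params(
    "tlsCertFile:/pulsar/server.crt,tlsKeyFile:/pulsar/server.key"
))
# → {'tlsCertFile': '/pulsar/server.crt', 'tlsKeyFile': '/pulsar/server.key'}
```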
