2020-01-17 09:56:52 UTC - transnano: @transnano has joined the channel
----
2020-01-17 15:37:55 UTC - Naby: I was looking for a way to configure 
`pattern_auto_discovery_period` or something similar for my Python function 
that uses `topics_pattern`. Then I saw that in the Python client 
(<https://github.com/apache/pulsar/blob/master/pulsar-client-cpp/python/pulsar/__init__.py>)
 the value of `pattern_auto_discovery_period` passed to the `subscribe` method 
is not used. Is that correct?
----
2020-01-17 15:43:02 UTC - Naby: Also, any suggestion on how to configure a 
python function to discover new topics that match its topics pattern?
@Matteo Merli, this was my original problem which led to the previous one on 
the Pulsar Python client. Since a function is run by the broker itself, is 
there a different mechanism to notify the function about new topics that 
match the pattern? Or should I configure the function to automatically discover 
the new topics? If so, how?
In my case, when a new topic is generated, it takes some time for its messages 
to be processed by my python function. And any messages in between get lost.
----
2020-01-17 16:20:01 UTC - Fernando: When creating a python producer with a 
schema definition I’m getting
```Exception: Pulsar error: TimeOut```
when trying to create the producer. Any clues?
----
2020-01-17 16:22:21 UTC - Sijie Guo: Did the topic exist before?
----
2020-01-17 16:22:45 UTC - Fernando: no
----
2020-01-17 16:23:28 UTC - Sijie Guo: ok. then you might have to look into the 
producer log or broker log to see if there are any errors.
----
2020-01-17 16:23:41 UTC - Fernando: will do
----
2020-01-17 16:32:45 UTC - Fernando: there are no errors, just a couple of 
warnings
----
2020-01-17 16:34:05 UTC - Sijie Guo: are you able to produce with other tools 
or clients?
----
2020-01-17 16:34:35 UTC - Fernando: If I set the schema to ByteSchema it works 
just fine
----
2020-01-17 16:34:53 UTC - Fernando: in fact a couple of days ago I was testing 
with the same AvroSchema and it worked
----
2020-01-17 16:35:30 UTC - Fernando: the topic is 
`topic=persistent://packhunt/billing/billing-records`
----
2020-01-17 16:35:30 UTC - Sijie Guo: okay. I asked if you are producing a 
message to an existing topic.
----
2020-01-17 16:36:07 UTC - Sijie Guo: are you trying to publish records with 
schema to a bytes topic?
----
2020-01-17 16:36:25 UTC - Fernando: no, I tested on different topics before. 
This is a new topic
----
2020-01-17 16:36:58 UTC - Fernando: `pulsar-admin topics list packhunt/billing` 
doesn’t return anything
----
2020-01-17 16:37:34 UTC - Fernando: ```pulsar-admin schemas get 
persistent://packhunt/billing/billing-records
HTTP 404 Not Found

Reason: HTTP 404 Not Found
command terminated with exit code 1```
----
2020-01-17 16:37:45 UTC - Sijie Guo: ok
----
2020-01-17 16:37:51 UTC - Sijie Guo: what version are you running?
----
2020-01-17 16:38:05 UTC - Fernando: master branch
----
2020-01-17 16:38:48 UTC - Sijie Guo: okay why not try a release?
----
2020-01-17 16:39:02 UTC - Fernando: I need a feature that is in 2.5.0
----
2020-01-17 16:39:06 UTC - Sijie Guo: master branch is under development and not 
released yet.
----
2020-01-17 16:39:14 UTC - Sijie Guo: 2.5.0 is released.
----
2020-01-17 16:39:35 UTC - Sijie Guo: I mean the release is not completed yet
----
2020-01-17 16:39:47 UTC - Sijie Guo: I am still completing the last phase for 
2.5.0 release.
----
2020-01-17 16:40:14 UTC - Fernando: ok
----
2020-01-17 16:40:19 UTC - Sijie Guo: in that case, you can build from tag v2.5.0
----
2020-01-17 16:40:25 UTC - Sijie Guo: rather than from master
----
2020-01-17 16:40:34 UTC - Fernando: I will, thank you
----
2020-01-17 16:41:14 UTC - Sijie Guo: regardless, I see “WARN  
org.apache.pulsar.broker.service.AbstractTopic - 
[persistent://packhunt/billing/billing-records] Error getting policies 
java.util.concurrent.CompletableFuture cannot be cast to 
org.apache.pulsar.common.policies.data.Policies and publish throttling will be 
disabled” in your error log
----
2020-01-17 16:41:29 UTC - Sijie Guo: that is causing the timeout
----
2020-01-17 16:41:43 UTC - Sijie Guo: let me check if it is a regression in 
master or not
----
2020-01-17 16:42:00 UTC - Fernando: thank you!
----
2020-01-17 16:44:45 UTC - Sijie Guo: what is the gitsha of your cloned master?
----
2020-01-17 16:45:15 UTC - Fernando: let me check
----
2020-01-17 16:46:35 UTC - Fernando: oh, I think I forgot to fetch from upstream 
in my fork so it’s around a month old…
----
2020-01-17 16:46:37 UTC - Fernando: `8a20d9a221997b5ade0c3a006b3c01be57ec0d4b`
----
2020-01-17 16:46:52 UTC - Fernando: Date:   Mon Dec 9 17:57:01 2019 +0800
----
2020-01-17 16:49:48 UTC - Fernando: but is this happening because the topic 
doesn’t exist? If I create it with `ByteSchema` and then create another 
producer with `AvroSchema`, would it work? It’s weird that a couple of days ago 
I was able to test this with no problem, but when I created this new namespace 
it suddenly failed
----
2020-01-17 16:52:05 UTC - Sijie Guo: I am not sure. the error message looks 
like a bug to me.
----
2020-01-17 16:52:57 UTC - Fernando: ok I’m going to build 2.5.0 tag to see what 
happens
----
2020-01-17 16:55:28 UTC - Sijie Guo: actually I don’t think that is a problem 
although the error message is confusing.
----
2020-01-17 16:55:55 UTC - Fernando: do you have any suggestion to debug it 
further?
----
2020-01-17 16:55:57 UTC - Sijie Guo: Can you try the v2.5.0 tag and let me know 
what you see?
----
2020-01-17 16:56:18 UTC - Sijie Guo: I would suggest using a release tag
----
2020-01-17 16:56:33 UTC - Fernando: is this the command to build the docker 
images? `mvn clean install -Pdocker -DskipTests`
----
2020-01-17 16:56:38 UTC - Sijie Guo: It is hard to debug things on a gitsha 
that was fetched almost 2 months ago.
----
2020-01-17 16:56:51 UTC - Sijie Guo: yes.
----
2020-01-17 16:57:08 UTC - Fernando: thank you! I’ll try the release and let you 
know
----
2020-01-17 17:02:55 UTC - Sijie Guo: thanks
----
2020-01-17 17:11:24 UTC - shao: @shao has joined the channel
----
2020-01-17 18:50:09 UTC - Fernando: ok it works with 2.5.0 tag
----
2020-01-17 19:01:42 UTC - Sagar Shah: Hi, does anyone know if PublishAsync 
guarantees message ordering?
----
2020-01-17 19:02:29 UTC - Matteo Merli: Yes, it does
----
2020-01-17 19:03:54 UTC - Sagar Shah: Thanks!
----
2020-01-17 19:13:39 UTC - Sagar Shah: Just to understand: if we send msgs A, B, 
C, and B fails and the callback function throws an exception, will msg C reach 
the topic?
----
2020-01-17 19:24:45 UTC - Naby: @Matteo Merli, do I understand this correctly? 
Or am I missing something here?
----
2020-01-17 19:26:08 UTC - Matteo Merli: that's correct. it should be passed to 
the C++ config object in the subscribe method
----
2020-01-17 19:26:31 UTC - Matteo Merli: C will also receive an exception
----
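Note: a toy model of the behaviour described in the two answers above (ordering is kept, and C also gets an exception when B fails). This is not the Pulsar client's code. The names `publish_in_order` and `broker_accepts` are made up for illustration, and the only assumption is that once one message in the ordered pipeline fails, the messages queued behind it fail too.

```python
def publish_in_order(messages, broker_accepts):
    """Toy model of ordered publishing (hypothetical helper, not a Pulsar API).

    Messages are attempted in order. After the first failure, every later
    pending message is failed as well, so a later message can never
    overtake an earlier one.
    """
    delivered, failed = [], []
    pipeline_broken = False
    for msg in messages:
        if not pipeline_broken and broker_accepts(msg):
            delivered.append(msg)
        else:
            pipeline_broken = True
            failed.append(msg)
    return delivered, failed


# B is rejected, so C is failed too instead of being written after A.
delivered, failed = publish_in_order(["A", "B", "C"], lambda m: m != "B")
print(delivered, failed)
```

In this model the application decides in its callback whether to resend B and C, which keeps the original order if it resends them in sequence.
----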
2020-01-17 19:26:36 UTC - Naby: I changed the value of 
`pattern_auto_discovery_period` to 1 sec. and still not getting messages from 
new topics for a while.
----
2020-01-17 19:27:10 UTC - Naby: How can I pass the new value to C++ config 
object?
----
2020-01-17 19:27:35 UTC - Matteo Merli: like: 
<https://github.com/apache/pulsar/blob/master/pulsar-client-cpp/python/pulsar/__init__.py#L635>
----
2020-01-17 19:28:34 UTC - Matteo Merli: In any case the auto discovery period 
is capped at 1min to keep clients from sending requests to the broker too 
frequently
----
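Note: a minimal sketch of pattern-based topic discovery, reading "capped at 1min" above as a lower bound on the period. It is not the client's implementation. `MIN_DISCOVERY_PERIOD_S`, `effective_period`, `newly_matching` and the example topic names are all assumptions made for illustration.

```python
import re

# Assumption: the 1-minute floor mentioned in the message above.
MIN_DISCOVERY_PERIOD_S = 60


def effective_period(requested_s):
    """Period that would actually be used once the floor is applied."""
    return max(requested_s, MIN_DISCOVERY_PERIOD_S)


def newly_matching(pattern, subscribed, all_topics):
    """Topics that match the pattern and are not subscribed yet, sorted."""
    regex = re.compile(pattern)
    return sorted(
        t for t in all_topics if regex.fullmatch(t) and t not in subscribed
    )


subscribed = {"persistent://public/default/in-1"}
topics = [
    "persistent://public/default/in-1",
    "persistent://public/default/in-2",
    "persistent://public/default/other",
]
new_topics = newly_matching(
    r"persistent://public/default/in-.*", subscribed, topics
)
print(effective_period(1), new_topics)
```

Under this reading, asking for a 1 second period still means a new topic can go unnoticed for up to a minute, which would match the delay reported above.
----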
2020-01-17 19:32:08 UTC - Naby: I understand. But, in some cases, it might be 
desirable. How can I change the value, since I can’t pass it as a parameter to 
subscribe method?
----
2020-01-17 19:34:01 UTC - Abraham: the docs for Python still need to be updated
----
2020-01-17 19:34:47 UTC - Matteo Merli: the Python client needs to be fixed
----
2020-01-17 19:35:44 UTC - Naby: Okay
----
2020-01-17 19:36:46 UTC - Naby: By the way, how about python functions? Is 
there a way to configure it to auto discover new topics? Assuming python client 
got fixed.
----
2020-01-17 20:08:46 UTC - Abraham: Hello, I’m trying to create a python Pulsar 
Function, but I’m getting an error. This is how I’m attempting to update the 
function:
`pulsar-admin functions update --py /scripts/publish_function.py --className 
PublishFunction --inputs persistent://public/default/sentences --output 
persistent://public/default/wordcount`

but the `functions` log shows me this error:
```[2020-01-17 19:59:13 +0000] [INFO] python_instance_main.py: Starting Python 
instance with Namespace(auto_ack='true', client_auth_params=None, 
client_auth_plugin=None, function_classname='PublishFunction', 
function_id='8a6bb9fe-6b5d-40bb-b2e9-a9eca63628a1', 
function_version='4358d38b-57a6-43fb-ba06-8946b3df0d44', 
hostname_verification_enabled='false', instance_id='0', log_topic=None, 
logging_directory='/pulsar/logs/functions', logging_file='PublishFunction', 
max_buffered_tuples='1024', name='PublishFunction', namespace='default', 
port=36707, processing_guarantees='ATLEAST_ONCE', 
pulsar_serviceurl='pulsar://127.0.0.1:6650', 
py='/tmp/pulsar_functions/public/default/PublishFunction/0/PublishFunction.py', 
sink_serde_classname=None, sink_topic='wordcount', 
source_subscription_type='SHARED', source_timeout_ms=None, 
source_topics_serde_classname='{"sentences":""}', tenant='public', 
tls_allow_insecure_connection='false', tls_trust_cert_path=None, 
topics_pattern=None, use_tls='false', user_config=None)
[2020-01-17 19:59:13 +0000] [INFO] util.py: Trying to import 
serde.IdentitySerDe from path 
/tmp/pulsar_functions/public/default/PublishFunction/0
[2020-01-17 19:59:13 +0000] [INFO] util.py: Add a new dependency to the path: 
/tmp/pulsar_functions/public/default/PublishFunction/0
[2020-01-17 19:59:13 +0000] [INFO] util.py: Import failed class_name 
IdentitySerDe from path /tmp/pulsar_functions/public/default/PublishFunction/0
[2020-01-17 19:59:13 +0000] [INFO] util.py: No module named serde
Traceback (most recent call last):
  File "/pulsar/instances/python-instance/util.py", line 61, in 
import_class_from_path
    mod = __import__(classname_path, fromlist=[class_name], level=-1)
ImportError: No module named serde
[2020-01-17 19:59:13 +0000] [INFO] util.py: Trying to import 
serde.IdentitySerDe from path /pulsar/instances/python-instance/pulsar/functions
[2020-01-17 19:59:13 +0000] [INFO] util.py: Add a new dependency to the path: 
/pulsar/instances/python-instance/pulsar/functions
[2020-01-17 19:59:13 +0000] [INFO] python_instance.py: Setting up consumer for 
topic sentences with subname public/default/PublishFunction ```
If I don’t specify a schema, is the expected behavior that the IdentitySchema 
would be used? Does my Pulsar Function code need to include anything? I did try 
`from pulsar import Function, SerDe` in my function code.
----
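Note: a minimal sketch of what a class-based Python function can look like, assuming the `Function` base class from the `pulsar` package with a `process(self, input, context)` method. The word-count body and the output format are illustrative assumptions, not the contents of the `publish_function.py` discussed above. The `try/except` stand-in exists only so the sketch runs where Pulsar is not installed.

```python
from collections import Counter

try:
    from pulsar import Function  # base class shipped with the Pulsar Python client
except ImportError:
    class Function(object):  # stand-in so the sketch runs without Pulsar installed
        pass


class PublishFunction(Function):
    """Counts the words of each incoming sentence (illustrative logic only)."""

    def process(self, input, context):
        counts = Counter(input.split())
        # Return a plain string so no custom SerDe class is needed.
        return ", ".join(
            "{}={}".format(word, n) for word, n in sorted(counts.items())
        )


result = PublishFunction().process("to be or not to be", None)
print(result)
```

No SerDe import is used here, on the assumption that the default identity SerDe handles plain strings.
----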
2020-01-17 20:10:06 UTC - Naby: More on:
<https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1579275782012800>
----
2020-01-17 21:23:26 UTC - Mathieu Druart: @Addison Higham @Sijie Guo Hi ! We 
tried to use state API in Pulsar functions with the 2.5.0 version, but still no 
luck, only "State is not enabled." errors ...  (deploying on Kubernetes with 
default Helm with 
`extraServerComponents=org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent`
 in Bookkeeper conf file). Any Idea ? Thanks !
----
2020-01-17 23:48:52 UTC - tuteng: Please try checking the source code for 
`serde` or `Serde`
----
2020-01-17 23:59:53 UTC - Abraham: I deleted the Function and now I’m unable to 
create a new one
----
2020-01-18 00:00:21 UTC - Abraham: This is the cli command I’m using:
```pulsar-admin functions create --py /scripts/publish_function.py --className 
WordCountFunction --inputs persistent://public/default/sentences --output 
persistent://public/default/wordcount```

----
2020-01-18 00:01:11 UTC - Abraham: and the pulsar logs give me this:
----
2020-01-18 00:01:36 UTC - Abraham: ```23:55:23.922 [pulsar-web-55-25] INFO  
org.apache.pulsar.functions.worker.Utils - Uploading function package to 
'public/default/WordCountFunction/032eaa49-4040-49b9-9e96-7c37e3aeb182-publish_function.py'
23:55:23.975 [pulsar-web-55-25] ERROR 
org.apache.pulsar.functions.worker.rest.api.FunctionsImpl - Error uploading 
file packagePath: 
"public/default/WordCountFunction/032eaa49-4040-49b9-9e96-7c37e3aeb182-publish_function.py"

org.apache.distributedlog.exceptions.UnexpectedException: unexpected exception 
in AppendOnlyStreamWriter.force
        at 
org.apache.distributedlog.AppendOnlyStreamWriter.force(AppendOnlyStreamWriter.java:62)
 ~[org.apache.distributedlog-distributedlog-core-4.7.1.jar:4.7.1]
        at 
org.apache.pulsar.functions.worker.dlog.DLOutputStream.flush(DLOutputStream.java:64)
 
~[org.apache.pulsar-pulsar-functions-worker-2.1.0-incubating.jar:2.1.0-incubating]
        at 
org.apache.pulsar.functions.worker.Utils.uploadToBookeeper(Utils.java:129) 
~[org.apache.pulsar-pulsar-functions-worker-2.1.0-incubating.jar:2.1.0-incubating]
        at 
org.apache.pulsar.functions.worker.rest.api.FunctionsImpl.updateRequest(FunctionsImpl.java:404)
 
[org.apache.pulsar-pulsar-functions-worker-2.1.0-incubating.jar:2.1.0-incubating]
        at 
org.apache.pulsar.functions.worker.rest.api.FunctionsImpl.registerFunction(FunctionsImpl.java:166)
 
[org.apache.pulsar-pulsar-functions-worker-2.1.0-incubating.jar:2.1.0-incubating]
        at 
org.apache.pulsar.broker.admin.impl.FunctionsBase.registerFunction(FunctionsBase.java:85)
 [org.apache.pulsar-pulsar-broker-2.1.0-incubating.jar:2.1.0-incubating]
 ...```
----
2020-01-18 00:24:33 UTC - Kirill Merkushev: is it fine to keep several million 
keys in the state (via the context) and use that to filter in subsequent 
functions?
----
2020-01-18 01:30:50 UTC - Sijie Guo: @Mathieu Druart there is not much progress 
regarding state in Pulsar Functions in 2.5.0. We might put the focus back on 
this area for the next major release (2.6.x releases).
----
2020-01-18 01:32:10 UTC - Mathieu Druart: @Sijie Guo ok, thanks for the answer
----
2020-01-18 01:34:29 UTC - Sijie Guo: @Addison Higham: do you have any ideas 
on how we can improve the k8s runtime documentation here 
<http://pulsar.apache.org/docs/en/functions-runtime/#configure-kubernetes-runtime>? 
Can you suggest a few? @Anonymitaet @Jennifer Huang can incorporate your 
comments into improving the documentation.
----
2020-01-18 05:40:26 UTC - Sandeep Kotagiri: @Addison Higham @Sijie Guo I am 
extending this thread with some more discussion and with some failures I observe 
in my environment when running functions with the Kubernetes runtime. I have 
configured the Kubernetes runtime in the functions_worker.yml file, and I am 
able to launch a statefulset/pod to run the function. However, the function 
fails to run in my environment. In my case I am using TLS for Pulsar, and I am 
also using TLS authentication. I have figured out how this is failing. The pod 
starts with the following command as its startup script. 
`/pulsar/bin/pulsar-admin --admin-url https://172.16.77.84:8443 functions 
download --tenant public --namespace default --name firstfunction 
--destination-file /pulsar/api-examples.jar && SHARD_ID=${POD_NAME##*-} 
&& echo shardId=${SHARD_ID} && exec java -cp 
/pulsar/instances/java-instance.jar:/pulsar/instances/deps/* 
-Dpulsar.functions.extra.dependencies.dir=/pulsar/instances/deps 
-Dpulsar.functions.instance.classpath=/pulsar/conf:::/pulsar/lib/*: 
-Dlog4j.configurationFile=kubernetes_instance_log4j2.xml 
-Dpulsar.function.log.dir=logs/functions/public/default/firstfunction 
-Dpulsar.function.log.file=firstfunction-$SHARD_ID -Xmx1073741824 
org.apache.pulsar.functions.instance.JavaInstanceMain --jar 
/pulsar/api-examples.jar --instance_id $SHARD_ID --function_id 
1f74c09f-e96d-4348-b35d-62bbd0d96fce --function_version 
74abbe36-cd42-4e22-a404-ed27ee9602a6 --function_details 
'{"tenant":"public","namespace":"default","name":"firstfunction","className":"org.apache.pulsar.functions.api.examples.ExclamationFunction","autoAck":true,"parallelism":1,"source":{"typeClassName":"java.lang.String","inputSpecs":{"topicA":{}},"cleanupSubscription":true},"sink":{"topic":"persistent://public/default/topicAOut","typeClassName":"java.lang.String"},"resources":{"cpu":1.0,"ram":"1073741824","disk":"10737418240"},"componentType":"FUNCTION"}'
 --pulsar_serviceurl pulsar+ssl://172.16.77.84:6651 --use_tls true 
--tls_allow_insecure false --hostname_verification_enabled false 
--tls_trust_cert_path /pulsar/ssl/some_ca.crt --max_buffered_tuples 1024 --port 
9093 --metrics_port 9094 --expected_healthcheck_interval -1 --secrets_provider 
org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider 
--cluster_name pulsar-itomdipulsar`. However, this is missing the 
`--client_auth_plugin` and `--client_auth_parameters` parameters. When I 
intervene manually and set these parameters, the function seems to run well.
----
2020-01-18 05:44:12 UTC - Sandeep Kotagiri: I am adding my functions_worker.yml 
settings here.
----
2020-01-18 05:44:49 UTC - Sandeep Kotagiri: ```assignmentWriteMaxRetries: 60
authenticationEnabled: false
authenticationProviders: null
authorizationEnabled: false
authorizationProvider: 
org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider
clientAuthenticationParameters: 
tlsCertFile:/pulsar/server.crt,tlsKeyFile:/pulsar/server.key
clientAuthenticationPlugin: org.apache.pulsar.client.impl.auth.AuthenticationTls
clusterCoordinationTopicName: coordinate
configurationStoreServers: localhost:2181
connectorsDirectory: ./connectors
downloadDirectory: /tmp/pulsar_functions
failureCheckFreqMs: 30000
functionAssignmentTopicName: assignments
functionMetadataTopicName: metadata
initialBrokerReconnectMaxRetries: 60
instanceLivenessCheckFreqMs: 30000
kubernetesContainerFactory:
  customLabels: null
  extraFunctionDependenciesDir: null
  imagePullPolicy: Always
  jobNamespace: sandeep
  k8Uri: null
  percentMemoryPadding: 10
  pulsarAdminUrl: null
  pulsarDockerImageName: pulsar-image:latest
  pulsarRootDir: null
  pulsarServiceUrl: null
  submittingInsidePod: true
numFunctionPackageReplicas: 1
numHttpServerThreads: 8
pulsarFunctionsCluster: pulsar-itomdipulsar
pulsarFunctionsNamespace: public/functions
pulsarServiceUrl: pulsar+ssl://localhost:6651
pulsarWebServiceUrl: https://localhost:8443
rescheduleTimeoutMs: 60000
schedulerClassName: 
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler
secretsProviderConfiguratorClassName: 
org.apache.pulsar.functions.secretsproviderconfigurator.KubernetesSecretsProviderConfigurator
superUserRoles: null
tlsAllowInsecureConnection: false
tlsCertRefreshCheckDurationSec: 300
tlsCertificateFilePath: /var/run/secrets/boostport.com/server.crt
tlsEnabled: true
tlsKeyFilePath: /var/run/secrets/boostport.com/server.key
tlsTrustCertsFilePath: /var/run/secrets/boostport.com/trustedCAs/RIC_ca.crt
topicCompactionFrequencySec: 1800
useTls: 'true'
workerHostname: localhost
workerId: standalone
workerPort: 6750
workerPortTls: 6751
zooKeeperOperationTimeoutSeconds: 30
zooKeeperSessionTimeoutMillis: 30000```

----
2020-01-18 05:53:45 UTC - Sandeep Kotagiri: This is Pulsar 2.4.2.
----
2020-01-18 05:54:51 UTC - Sandeep Kotagiri: I see that the 
org.apache.pulsar.functions.runtime.RuntimeUtils class is missing code that 
sets the client_auth_plugin and client_auth_parameters parameters.
----
2020-01-18 05:56:53 UTC - Sandeep Kotagiri: So is this a bug? Or am I supposed 
to use the secrets functionality via secretsProviderConfiguratorClassName 
in some appropriate manner? At least, looking at the JavaInstanceStarter class 
seems to tell me otherwise: it accepts these parameters, while the RuntimeUtils 
class is missing them.
----
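Note: a rough sketch of the manual workaround described above, in which the two auth flags are appended to the instance command when client authentication is configured. This is not the `RuntimeUtils` code, which is Java. The helper `with_client_auth` is hypothetical, the flag names are copied from the message above, and the values come from the posted functions_worker.yml.

```python
def with_client_auth(args, plugin=None, params=None):
    """Return a copy of args with auth flags appended (hypothetical helper).

    The flags are added only when both the plugin class and its
    parameters are configured; otherwise args is returned unchanged.
    """
    out = list(args)
    if plugin and params:
        out += [
            "--client_auth_plugin", plugin,
            "--client_auth_parameters", params,
        ]
    return out


base_args = ["--use_tls", "true", "--tls_allow_insecure", "false"]
full_args = with_client_auth(
    base_args,
    plugin="org.apache.pulsar.client.impl.auth.AuthenticationTls",
    params="tlsCertFile:/pulsar/server.crt,tlsKeyFile:/pulsar/server.key",
)
print(" ".join(full_args))
```
----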
