2019-11-01 09:38:12 UTC - xiaolong.ran: Hello, <https://pulsar.apache.org/docs/en/functions-overview/> ---- 2019-11-01 09:39:30 UTC - xiaolong.ran: Here are some examples of Pulsar Functions in Java:
<https://github.com/apache/pulsar/tree/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples> ---- 2019-11-01 10:07:39 UTC - Gopi Krishna L: Yeah, already found those, thanks anyway. Right now I am writing Producer and Consumer Java code, but I am not sure how to run them. Should we run them the same way, i.e. deploy a jar and run it using "pulsar client functions", or is there another way to run them? ---- 2019-11-01 13:15:17 UTC - Alexandre DUVAL: I can't find the option to keep the subscription when a function is deleted (an update can't change the input topic, so I have to delete and recreate the function, but I'm losing messages :/). I read something about cleanupSubscription=true|false, but it's not a parameter of `functions create`. ---- 2019-11-01 13:19:01 UTC - Jasper Li: Thanks again!!! But I have run into another issue:
```
Reason: Error offloading: org.apache.bookkeeper.mledger.ManagedLedgerException: java.util.concurrent.CompletionException: java.lang.UnsupportedOperationException
...
org.apache.bookkeeper.mledger.impl.NullLedgerOffloader.offload
```
which is similar to <https://github.com/apache/pulsar/issues/2697>. I have changed the yaml file of the deployment in this way:
```
sed -i 's/managedLedgerOffloadDrive=/managedLedgerOffloadDrive=google-cloud-storage/' /pulsar/conf/broker.conf &&
sed -i 's/gcsManagedLedgerOffloadRegion=/gcsManagedLedgerOffloadRegion=us-central1/' /pulsar/conf/broker.conf &&
sed -i 's/gcsManagedLedgerOffloadBucket=/gcsManagedLedgerOffloadBucket=my-bucket/' /pulsar/conf/broker.conf &&
sed -i 's|gcsManagedLedgerOffloadServiceAccountKeyFile=|gcsManagedLedgerOffloadServiceAccountKeyFile=/var/secrets/google/key.json|' /pulsar/conf/broker.conf &&
bin/apply-config-from-env.py conf/broker.conf &&
bin/apply-config-from-env.py conf/pulsar_env.sh &&
bin/gen-yml-from-env.py conf/functions_worker.yml &&
bin/pulsar broker
```
and I have checked that they are all in conf/broker.conf after the brokers start. Is my configuration wrong? :persevere: ---- 2019-11-01 13:59:03 UTC - SWC: @SWC has joined the channel ---- 2019-11-01 14:02:34 UTC - SWC: Trying to evaluate whether Pulsar is a good fit for us, and I have a couple of questions. Can anyone clarify for me? When doing pub-sub, the subscription types I see (Exclusive, Shared, Key_Shared, Failover) don't seem to support fan-out delivery (one message sent to multiple listeners). Am I misunderstanding something? ---- 2019-11-01 14:12:25 UTC - Vladimir Shchur: For fan-out you need to use several subscriptions (several consumers with different subscription names). ---- 2019-11-01 14:14:31 UTC - SWC: So I'm thinking of it at the wrong level? I see. Instead of thinking of a subscription fanning out to multiple clients, I need to think of a given topic fanning out to multiple subscriptions. ---- 2019-11-01 14:20:37 UTC - SWC: Second question: it looks like messages can be configured to be retained after ack, with a TTL. Presumably, if there is a topic that I want to retain forever, or at least long term, that could be configured. Is there an example somewhere of how to attach to that topic and replay the messages to a new consumer? ---- 2019-11-01 14:30:00 UTC - Vladimir Shchur: Correct :slightly_smiling_face: ---- 2019-11-01 14:30:07 UTC - Karthik Ramasamy: Producers and Consumers are run as normal programs ---- 2019-11-01 14:30:44 UTC - SWC: Oh, I think I see now. I would just connect to the topic with a new subscription and set the initial position to earliest. Should have seen that one initially, sorry!
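For reference, a minimal Java sketch of the fan-out and replay pattern discussed above: two consumers with different subscription names each receive every message published to the topic, and `SubscriptionInitialPosition.Earliest` makes a newly created subscription start from the oldest retained message. The service URL, topic, and subscription names here are placeholders, not taken from the thread.
```
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;

public class FanOutReplayExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")   // placeholder broker URL
                .build();

        // Fan-out: each distinct subscription name gets its own copy of the stream.
        Consumer<byte[]> audit = client.newConsumer()
                .topic("persistent://public/default/events")   // placeholder topic
                .subscriptionName("audit")
                .subscribe();

        Consumer<byte[]> billing = client.newConsumer()
                .topic("persistent://public/default/events")
                .subscriptionName("billing")
                // Replay: only applies when this subscription is first created;
                // it then starts from the earliest retained message.
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        Message<byte[]> msg = billing.receive();
        billing.acknowledge(msg);

        audit.close();
        billing.close();
        client.close();
    }
}
```
A `Reader`, mentioned just below, instead takes an explicit start position, e.g. `client.newReader().topic(...).startMessageId(MessageId.earliest).create()`.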
---- 2019-11-01 14:30:54 UTC - Karthik Ramasamy: whereas Pulsar Functions are orchestrated and run by Pulsar ---- 2019-11-01 14:33:03 UTC - Vladimir Shchur: Right :slightly_smiling_face: One more option is to use the Reader API; it uses a special non-durable subscription inside ---- 2019-11-01 14:40:34 UTC - SWC: Oh wait... I think I was mixing reader and subscription together. So is what I said right? Can a subscription actually set the initial position, or is that just a reader? ---- 2019-11-01 14:44:40 UTC - Vladimir Shchur: They both can. ---- 2019-11-01 15:33:50 UTC - Raman Gupta: Apparently there is a way to run Pulsar Functions outside of a Pulsar broker as well, though it's not documented. ---- 2019-11-01 16:31:20 UTC - Matteo Merli: The `localrun` mode for functions is documented here: <https://pulsar.apache.org/docs/en/functions-cli/#localrun> You can also embed the local runner into a Java app: <https://pulsar.apache.org/docs/en/functions-debug/#debug-with-localrun-mode> ---- 2019-11-01 16:33:52 UTC - Matteo Merli: With a `Consumer` you can only set the initial position at which the subscription is created, and only if it doesn't already exist. If the subscription exists, delivery will restart from the first unacked message. With a `Reader` you always specify the message ID you want to start reading from. ---- 2019-11-01 16:43:48 UTC - Kabeer Ahmed: Folks, does anyone know how to deal with this when ingesting data from Spark? The producer send queue is full. There are a few options to set blocking etc., but it would be great to see an example if someone has one handy.
```
Caused by: org.apache.pulsar.client.api.PulsarClientException$ProducerQueueIsFullError: Producer send queue is full
    at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:507)
    at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:301)
    at org.apache.pulsar.client.impl.ProducerImpl.internalSendAsync(ProducerImpl.java:226)
    at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.sendAsync(TypedMessageBuilderImpl.java:66)
    at org.apache.spark.sql.pulsar.PulsarRowWriter.sendRow(PulsarWriteTask.scala:200)
    at org.apache.spark.sql.pulsar.PulsarWriteTask.execute(PulsarWriteTask.scala:42)
    at org.apache.spark.sql.pulsar.PulsarSinks$$anonfun$write$1$$anonfun$apply$1.apply$mcV$sp(PulsarSinks.scala:160)
    at org.apache.spark.sql.pulsar.PulsarSinks$$anonfun$write$1$$anonfun$apply$1.apply(PulsarSinks.scala:160)
    at org.apache.spark.sql.pulsar.PulsarSinks$$anonfun$write$1$$anonfun$apply$1.apply(PulsarSinks.scala:160)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.sql.pulsar.PulsarSinks$$anonfun$write$1.apply(PulsarSinks.scala:160)
    at org.apache.spark.sql.pulsar.PulsarSinks$$anonfun$write$1.apply(PulsarSinks.scala:157)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```
---- 2019-11-01 16:45:00 UTC - Matteo Merli: It should set `ProducerBuilder.blockIfQueueFull(true)` when creating the producer ---- 2019-11-01 16:45:33 UTC - Kabeer Ahmed: @Matteo Merli - any example of how the option can be set on a DataFrame when Spark is the producer? ---- 2019-11-01 16:46:31 UTC - Matteo Merli: Which connector are you using? ---- 2019-11-01 16:47:16 UTC - Matteo Merli: I mean: is `PulsarWriteTask.scala` part of your code? ---- 2019-11-01 16:48:44 UTC - Kabeer Ahmed: The connector I am trying to use is: <https://github.com/streamnative/pulsar-spark> ---- 2019-11-01 16:49:06 UTC - Kabeer Ahmed: The command I picked up is from the example in their README.md, which works fine for reading from Pulsar, but writing is failing. ---- 2019-11-01 16:49:30 UTC - Kabeer Ahmed:
```
// Write key-value data from a DataFrame to a specific Pulsar topic specified in an option
val ds = df
  .selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .option("topic", "topic1")
  .start()
```
---- 2019-11-01 16:50:36 UTC - Kabeer Ahmed: My guess is that it must be possible to pass the option as `.option("producerBlockIfQueueFull", "true")` ---- 2019-11-01 16:50:55 UTC - Matteo Merli: Then you should check with them. It looks like a bug in the connector, in that when using sendAsync() the default behavior is always non-blocking, and the application is responsible for handling that ---- 2019-11-01 16:50:58 UTC - Kabeer Ahmed: Maybe I should have looked into the code, but I thought there are so many folks here that I could ask ---- 2019-11-01 16:51:15 UTC - Kabeer Ahmed: @Matteo Merli - do you know of any connector that I can use from Spark to produce? ---- 2019-11-01 16:51:27 UTC - Kabeer Ahmed: If you know any such connector and the options that I need to use, I am more than happy to use it ---- 2019-11-01 16:52:28 UTC - Kabeer Ahmed: I need to do a join in Spark from multiple sources and then write into Pulsar ---- 2019-11-01 16:53:59 UTC - Kabeer Ahmed: On Wednesday I posted an issue related to the StreamNative GitHub repo, and the devs here were quick to respond. ---- 2019-11-01 16:54:20 UTC - Kabeer Ahmed: I am hoping that someone will know, and in the meantime I can dig into the code too, to see if they have such an option there. ---- 2019-11-01 16:54:28 UTC - Kabeer Ahmed: Thanks for any help. ---- 2019-11-01 17:12:24 UTC - Matt Mitchell: @Sijie Guo If I wanted to experiment with this idea, what would be the best way to create the client? Does Pulsar support DI such that I could have the client instance injected into my PushSource constructor? If not, how can I know what settings (Pulsar host) to use? Would I need to just hard-code that for now? ---- 2019-11-01 17:26:59 UTC - Matt Mitchell: Regarding `org.apache.pulsar.io.core.Source` - what controls the lifecycle? I'm trying to understand when `open()` is called, and what determines when processing is complete so that `close()` is called? ---- 2019-11-01 18:33:54 UTC - Simba Khadder: Is there a Pulsar sink to S3? ---- 2019-11-01 19:46:54 UTC - Kendall Magesh-Davis: Has anyone seen this before?
New 2.4.1 cluster, and the bookkeepers are flapping with this error:
```
19:46:09.154 [main] ERROR org.apache.bookkeeper.bookie.Bookie - There are directories without a cookie, and this is neither a new environment, nor is storage expansion enabled. Empty directories are [data/bookkeeper/journal/current, data/bookkeeper/ledgers/current]
```
---- 2019-11-01 19:47:22 UTC - Kendall Magesh-Davis: tried the usual manual “delete the ledger for the bad bookkeeper from ZK” strategy, to no avail ---- 2019-11-01 19:53:38 UTC - Matt Mitchell: I have a PushSource connected via LocalRunner. I see Pulsar logging info about the “producer” (interestingly, not “source”) when it connects. While it is running, I do not see it in the output from `./bin/pulsar-admin sources list`. Is this expected? ---- 2019-11-01 20:09:51 UTC - David Kjerrumgaard: There isn't an open-source one at the moment, but I have written one for a client in the past. Can you create an issue requesting one, along with the specifics of how you want it to behave, e.g. how the bucket objects would be named, how often you would roll over the contents of the topic into a new S3 object, etc.? +1 : Simba Khadder ---- 2019-11-01 21:33:02 UTC - Simba Khadder: Will do, thanks! ---- 2019-11-01 21:40:01 UTC - Alexandre DUVAL: @Sijie Guo do you know? ---- 2019-11-01 22:05:24 UTC - Jerry Peng: @Matt Mitchell that is expected if you are running the source via local run mode. If you submit the source to run in a cluster, then it will be returned by that list command ---- 2019-11-01 22:07:30 UTC - Jerry Peng: `open()` is the first method called on the source, and `close()` is called when the source is stopped ---- 2019-11-01 22:20:32 UTC - Matt Mitchell: Got it, thanks @Jerry Peng. Is there any way to have a connector (or even a function) registered but running outside the cluster, and to also have the Pulsar API provide information on those external sources? ---- 2019-11-01 22:26:26 UTC - Matt Mitchell: The use case is that some data is only accessible in certain contexts/networks, and not directly accessible from inside of the cluster. So the connector would push to Pulsar from the outside, but we’d still want to “see” those connectors/functions via the admin API, or at least be aware of their connections somehow. ---- 2019-11-01 22:28:08 UTC - Jerry Peng: @Matt Mitchell If you are running via local run mode, the metadata of a source/function/sink will not be registered in the cluster. There are deployment modes in which you can still deploy functions/sinks/sources to a cluster without them running on broker nodes. For example, you can deploy a function worker cluster somewhere else just to run sources/sinks/functions ---- 2019-11-01 22:29:07 UTC - Jerry Peng: Local run mode is really designed to run as a completely separate/isolated application ---- 2019-11-01 22:29:24 UTC - Jerry Peng: where management of the application is completely up to the user ---- 2019-11-01 22:32:16 UTC - Matt Mitchell: Got it, that makes sense now. So in local run mode, data can still flow back and forth between local and the cluster; does that go for function state as well? ---- 2019-11-01 22:33:07 UTC - Jerry Peng: Yes, given the network is configured correctly ---- 2019-11-01 22:33:35 UTC - Jerry Peng: so that the functions/sources/sinks can communicate with the Pulsar cluster ---- 2019-11-01 22:33:45 UTC - Matt Mitchell: Ok great. Is there documentation on network requirements / recommended settings?
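A minimal sketch of the `Source` lifecycle Jerry describes above: the framework calls `open()` once at startup, then `read()` in a loop, then `close()` when the source is stopped. The class name and the emitted values are hypothetical.
```
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;

public class CountingSource implements Source<String> {
    private long counter;

    @Override
    public void open(Map<String, Object> config, SourceContext context) throws Exception {
        // Called once, before anything else: read config, open connections.
        counter = 0;
    }

    @Override
    public Record<String> read() throws Exception {
        // Called repeatedly by the framework; block until a record is available.
        TimeUnit.SECONDS.sleep(1);
        final String value = "message-" + counter++;
        return new Record<String>() {
            @Override
            public String getValue() {
                return value;
            }
        };
    }

    @Override
    public void close() throws Exception {
        // Called when the source is stopped: release resources here.
    }
}
```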
---- 2019-11-01 22:40:57 UTC - Jerry Peng: @Matt Mitchell There is some documentation here: <http://pulsar.apache.org/docs/en/next/functions-deploy/#local-run-mode> but it's pretty basic. There is a list of all the arguments you can pass: <https://github.com/apache/pulsar/blob/master/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java#L116> Make sure to configure `--brokerServiceUrl` and `--stateStorageServiceUrl` appropriately ---- 2019-11-01 22:41:41 UTC - Jerry Peng: Feel free to submit a PR to improve the documentation! We need the help! ---- 2019-11-01 22:45:07 UTC - Matt Mitchell: Will do, thanks! ---- 2019-11-01 23:26:52 UTC - MarinaM: @MarinaM has joined the channel ---- 2019-11-02 00:30:36 UTC - tuteng: @yijie ---- 2019-11-02 03:08:24 UTC - Ali Ahmed: <https://www.youtube.com/watch?v=5fqhT82wghY> ----
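Putting Jerry's pointers together, a minimal sketch of embedding `LocalRunner` in a Java app. The function name, topics, and URLs are placeholders; `ExclamationFunction` is the class from the java-examples module linked at the top of this log, assumed to be on the classpath.
```
import java.util.Collections;

import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.functions.LocalRunner;

public class LocalRunnerExample {
    public static void main(String[] args) throws Exception {
        FunctionConfig functionConfig = new FunctionConfig();
        functionConfig.setName("exclamation");   // placeholder name
        functionConfig.setInputs(Collections.singleton("persistent://public/default/in"));
        functionConfig.setOutput("persistent://public/default/out");
        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
        functionConfig.setClassName(
                "org.apache.pulsar.functions.api.examples.ExclamationFunction");

        LocalRunner runner = LocalRunner.builder()
                .functionConfig(functionConfig)
                .brokerServiceUrl("pulsar://localhost:6650")    // point at your cluster
                .stateStorageServiceUrl("bk://localhost:4181")  // only needed for stateful functions
                .build();

        runner.start(false);            // false = return immediately instead of blocking
        Thread.sleep(2 * 60 * 1000);    // let the function run for a while
        runner.stop();
    }
}
```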

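A footnote to Kabeer's producer-queue thread above: when you create the producer yourself with the Java client (rather than through the Spark connector), the setting Matteo mentioned looks like the sketch below. The broker URL and topic are placeholders.
```
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

public class BlockingProducerExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")   // placeholder broker URL
                .build();

        // With blockIfQueueFull(true), sendAsync() blocks when the pending-send
        // queue is full instead of failing with ProducerQueueIsFullError.
        Producer<byte[]> producer = client.newProducer()
                .topic("persistent://public/default/topic1")   // placeholder topic
                .blockIfQueueFull(true)
                .maxPendingMessages(1000)   // size of the pending-send queue
                .create();

        producer.sendAsync("hello".getBytes());
        producer.flush();   // wait for buffered messages to be persisted

        producer.close();
        client.close();
    }
}
```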