2019-11-25 09:54:53 UTC - xue: Can Pulsar Schema be applied to Pulsar Functions? 
If I have a Student class and send it to a topic using an Avro schema, what should 
I use as the input type parameter of the Pulsar function? Like this?

Producer Code:

PulsarClient client = PulsarClient.builder().serviceUrl(localClusterUrl).build();

Producer<Cdr> producer = client.newProducer(Schema.AVRO(Cdr.class))
    .topic("persistent://public/default-input")
    .producerName("test")
    .create();

producer.send(cdr);



Pulsar function code:

public class FilterFunction1 implements Function<Student, Void> {

    @Override
    public Void process(Student student, Context context) throws Exception {
----
2019-11-25 09:57:48 UTC - Pedro Cardoso: When deploying your function you need 
to specify the outputschematype or outputserdeclassname I think. I have not 
been successful when I tried
----
2019-11-25 09:58:04 UTC - Pedro Cardoso: See this thread for more 
<https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1574247160235800>
----
2019-11-25 09:59:48 UTC - xue: After deploying the function, I had a problem 
converting the input type.
----
2019-11-25 10:00:29 UTC - Pedro Cardoso: How was the function deployed?
----
2019-11-25 10:00:58 UTC - xue: bin/pulsar-admin functions create --jar 
examples/filter-1.0-jar-with-dependencies.jar --classname 
com.newland.filter.FilterFunction1 --tenant billing --namespace filter 
--name filtercrd --inputs persistent://billing/filter/filter-input 
--output persistent://billing/rr/rr-input
----
2019-11-25 10:02:06 UTC - xue: How to fill in this type?
----
2019-11-25 10:03:51 UTC - xue: Cdr class
----
2019-11-25 10:07:30 UTC - Pedro Cardoso: The Cdr class must be bundled with 
the jar: the Pulsar function you want to deploy needs to have a dependency on 
the `Cdr` class.
Also, you should deploy the function with the additional parameter 
`--custom-schema-inputs`.
----
2019-11-25 10:08:18 UTC - Pedro Cardoso: Never tried it but something like: 
`--custom-schema-inputs {"persistent://billing/filter/filter-input": "avro"}`
----
2019-11-25 11:00:12 UTC - Shishir Pandey: Will all publish() be at the same 
time ?
----
2019-11-25 11:00:32 UTC - Shishir Pandey: i.e. within same "transaction" ?
----
2019-11-25 13:20:25 UTC - xue: When I submit the function, the type conversion 
fails: GenericRecord cannot be cast to Cdr. Before that, I registered the Cdr 
Avro schema on the topic.
----
2019-11-25 15:57:44 UTC - Christophe Bornet: Hi folks, is there a way to know 
which consumer will consume a message given its key in a key-shared 
subscription ?
----
2019-11-25 16:07:00 UTC - David Kjerrumgaard: @Christophe Bornet No, there 
isn't any way to know beforehand which consumer will consume the message. The 
only guarantee you have is that messages with the same key will be consumed by 
the same consumer in the group.
----
2019-11-25 16:08:02 UTC - Matteo Merli: In master (2.5) it's possible to 
specify the hash ranges to consume
+1 : David Kjerrumgaard, Christophe Bornet
----
2019-11-25 16:12:43 UTC - Christophe Bornet: In the topic stats 
(<http://pulsar.apache.org/admin-rest-api/?version=2.4.1#operation/getStats>) 
we can have the list of consumers. Can we assume that they are in the same 
order in the selector ?
----
2019-11-25 16:14:03 UTC - Matteo Merli: I believe so
----
2019-11-25 16:47:49 UTC - Christophe Bornet: Oh well, I just read the 
HashRangeStickyKeyConsumerSelector algorithm, and I can't determine the range 
just by knowing the order, as it also depends on the history of additions and 
removals of consumers... Would it be possible to expose the range info in the 
topic stats?
----
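The point about history can be illustrated with a simplified model of a hash-range selector. This is a sketch, not Pulsar's `HashRangeStickyKeyConsumerSelector` code: the class name `HashRangeSketch`, the hash space size of 65536, and the exact split and merge rules (a new consumer takes the lower half of the largest range; a removed range merges into a neighbour) are assumptions chosen for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

/** Simplified, hypothetical model of a hash-range consumer selector. */
public class HashRangeSketch {
    static final int RANGE_SIZE = 65536; // assumed size of the hash space

    // Maps the exclusive upper bound of each range to the consumer that owns it.
    private final TreeMap<Integer, String> owners = new TreeMap<>();

    /** Adds a consumer by splitting the largest existing range in half. */
    void add(String consumer) {
        if (owners.isEmpty()) {
            owners.put(RANGE_SIZE, consumer);
            return;
        }
        int lower = 0, bestLower = 0, bestUpper = 0;
        for (int upper : owners.keySet()) {
            if (upper - lower > bestUpper - bestLower) {
                bestLower = lower;
                bestUpper = upper;
            }
            lower = upper;
        }
        if (bestUpper - bestLower < 2) {
            throw new IllegalStateException("no range left to split");
        }
        // The new consumer takes the lower half of the largest range.
        owners.put(bestLower + (bestUpper - bestLower) / 2, consumer);
    }

    /** Removes a consumer; its range merges into a neighbouring range. */
    void remove(String consumer) {
        Integer key = null;
        for (Map.Entry<Integer, String> e : owners.entrySet()) {
            if (e.getValue().equals(consumer)) {
                key = e.getKey();
                break;
            }
        }
        if (key == null) {
            return;
        }
        owners.remove(key); // the next higher range now absorbs this one
        if (key == RANGE_SIZE && !owners.isEmpty()) {
            // The top range was removed: extend the previous range upward instead.
            Map.Entry<Integer, String> last = owners.pollLastEntry();
            owners.put(RANGE_SIZE, last.getValue());
        }
    }

    /** Returns the consumer that owns the given slot in [0, RANGE_SIZE). */
    String select(int slot) {
        return owners.higherEntry(slot).getValue();
    }

    /** Lists the ranges in ascending order, e.g. "A=[0,65536)". */
    String describe() {
        StringBuilder sb = new StringBuilder();
        int lower = 0;
        for (Map.Entry<Integer, String> e : owners.entrySet()) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(e.getValue()).append("=[").append(lower)
              .append(',').append(e.getKey()).append(')');
            lower = e.getKey();
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        HashRangeSketch first = new HashRangeSketch();
        first.add("A"); first.add("B"); first.add("C");

        HashRangeSketch second = new HashRangeSketch();
        second.add("A"); second.add("B"); second.add("C");
        second.remove("B"); second.add("B");

        System.out.println(first.describe());  // C=[0,16384) B=[16384,32768) A=[32768,65536)
        System.out.println(second.describe()); // C=[0,16384) B=[16384,40960) A=[40960,65536)
    }
}
```

Both selectors end up with the same consumers in the same order, yet slot 35000 goes to A in the first and to B in the second, which is why the order alone is not enough to derive the ranges.
----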
2019-11-25 17:36:38 UTC - Ryan Samo: Hey guys, any plans to add TLS support to 
Pulsar Manager? The UI looks awesome but unfortunately I have my clusters 
locked down via TLS auth 
----
2019-11-25 17:38:27 UTC - Ryan Samo: Like being able to utilize the Admin certs 
etc.
----
2019-11-25 18:04:45 UTC - Sijie Guo: I believe that has been added by @tuteng 
or @Yuvaraj Loganathan recently. if not, can you create a github issue at 
<https://github.com/apache/pulsar-manager>
----
2019-11-25 18:08:49 UTC - Sijie Guo: good suggestion. it should be pretty 
straightforward to expose range info in the topic stats. would you mind filing 
a github issue? so @Penghui Li can help with that.
----
2019-11-25 18:09:02 UTC - Christophe Bornet: To monitor the consumer lag, shall 
numberOfEntriesSinceFirstNotAckedMessage (found in /broker-stats/topics) be 
used ?
----
2019-11-25 18:10:42 UTC - Sijie Guo: @xue how did you register Cdr avro schema 
in the topic?
----
2019-11-25 18:12:30 UTC - Christophe Bornet: Sure I'll open the issue
----
2019-11-25 18:15:46 UTC - Ryan Samo: Awesome thanks @Sijie Guo !
----
2019-11-25 18:17:06 UTC - Sijie Guo: topic_back_log is the one for topic-level 
lag, subscription_back_log is the one for subscription-level lag
----
2019-11-25 18:22:53 UTC - Christophe Bornet: You mean msgBacklog in 
/broker-stats/topics ?
----
2019-11-25 19:24:25 UTC - Alexandre DUVAL: Oh funny I just asked to tuteng by 
DM :stuck_out_tongue:.
stuck_out_tongue_closed_eyes : Sijie Guo
----
2019-11-25 19:25:20 UTC - Ryan Samo: :+1:
----
2019-11-25 19:26:17 UTC - Alexandre DUVAL: I think it's in 
BrokerStatsServiceImpl.java
----
2019-11-25 19:26:20 UTC - Alexandre DUVAL:
```
for (EnvironmentEntity env : environmentEntities) {
    String serviceUrl = checkServiceUrl(null, env.getBroker());
    Map<String, Object> clusterObject =
        clustersService.getClustersList(0, 0, serviceUrl, (c) -> serviceUrl);
    List<HashMap<String, Object>> clusterLists =
        (List<HashMap<String, Object>>) clusterObject.get("data");
    clusterLists.forEach((clusterMap) -> {
        String cluster = (String) clusterMap.get("cluster");
        Pair<String, String> envCluster = Pair.of(env.getName(), cluster);
        collectStatsServiceUrls.put(envCluster, (String) clusterMap.get("serviceUrl"));
    });
}
```
----
2019-11-25 19:35:11 UTC - Endre Karlson: Is an overview of the transactions 
feature somewhere ?
----
2019-11-25 19:42:17 UTC - Luke Lu: We noticed that with offloading enabled, 
under a moderate message rate (8k/s total across ~100 topics), brokers put a 
large number of offloading-related entries in the zk txn logs, creating huge 
(> 20GB vs the normal 64MiB) zk log segments, which could easily exhaust the 
disk space. Is this a known issue (seeing it in 2.4.1)?
----
2019-11-25 19:53:51 UTC - Sijie Guo: 
<https://github.com/apache/pulsar/wiki/PIP-31%3A-Transaction-Support>
----
2019-11-25 19:56:07 UTC - Sijie Guo: What kind of entries do you see in the zk 
txn logs?

Offloaders don't create new zk nodes. They just update the existing zk nodes 
used for ledger metadata.
----
2019-11-25 20:20:29 UTC - Christophe Bornet: OK. It's indeed msgBacklog.
----
2019-11-25 21:22:13 UTC - Joshua Dunham: Hi Everyone,
----
2019-11-25 21:23:26 UTC - Joshua Dunham: Hoping to catch someone that uses 
pulsar with NiFi. The recent connector builds introduce a RecordReader and 
RecordWriter required property which is not behaving.
----
2019-11-25 21:33:51 UTC - David Kjerrumgaard: I wrote the Pulsar processors for 
NiFi, so hopefully I can help. What version of Pulsar and NiFi are you using?
----
2019-11-25 21:42:59 UTC - Joshua Dunham: 1.9.2 NIFI
----
2019-11-25 21:43:32 UTC - Joshua Dunham: 2.4.1 Pulsar
----
2019-11-25 21:47:11 UTC - Joshua Dunham: Apologies for the double post btw. I 
also created an issue on the nifi-pulsar github repo. If we can get it working 
I don't mind adding my knowledge to the issue in case it helps other folks.
----
2019-11-25 21:48:39 UTC - Joshua Dunham: It's the end of the day for me. So 
I'll pop back into the channel tomorrow to see if you are free. 
:slightly_smiling_face:
----
2019-11-25 22:08:18 UTC - Sijie Guo: yes msgBacklog
----
2019-11-25 22:08:49 UTC - Sijie Guo: 
<https://github.com/apache/pulsar/issues/5560> this is the issue for supporting 
key/value schema in presto sql
----
2019-11-25 23:47:45 UTC - Luke Lu: /managed-ledgers/<topic-path> with 
data size up to 800KiB with many duplicated offloading endpoint info (url, 
bucket, region).
----
2019-11-26 00:06:43 UTC - Penghui Li: Thanks :+1:
----
2019-11-26 00:07:20 UTC - Sijie Guo: Can you show some more details about this?
----
2019-11-26 00:07:44 UTC - Yuri Mizushima: @Yuri Mizushima has joined the channel
----
2019-11-26 00:36:04 UTC - xue: bin/pulsar-admin schemas upload 
persistent://billing/filter/filter-input -f connectors/cdr_avro_schema
----
2019-11-26 00:47:02 UTC - xue: 
----
2019-11-26 00:48:45 UTC - xue: 
----
2019-11-26 00:51:34 UTC - xue: @Sijie Guo I tested that even if I don't 
register the Avro schema on the topic and just send the Cdr class, the same 
conversion error occurs
----
2019-11-26 00:55:02 UTC - Sijie Guo: Did you precreate the input topic?
----
2019-11-26 00:55:17 UTC - xue: yes
----
2019-11-26 00:56:31 UTC - xue: 
----
2019-11-26 02:15:00 UTC - xue: I just tested it with the JSON schema. It works 
normally, but the Avro schema doesn't.
----
2019-11-26 02:15:50 UTC - xue: producer code:
----
2019-11-26 03:05:02 UTC - Sijie Guo: how did you precreate the topic? Are you 
using a client to produce messages using `Schema.AVRO(Cdr.class)`?
----
2019-11-26 03:06:30 UTC - xue: I created the topic with Pulsar Manager, then 
used a client to produce messages using `Schema.AVRO(Cdr.class)`
----
2019-11-26 06:11:59 UTC - Linmi: @Linmi has joined the channel
----
2019-11-26 09:03:07 UTC - Christophe Bornet: I need to know the current 
consumer lag for a message published some time ago. The idea is to estimate 
how much time remains before a given message, whose publish time I know, 
will be consumed. Do you know how I could achieve this?
----
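One rough way to approach such an estimate, sketched here under stated assumptions rather than taken from the thread, is to divide the subscription's `msgBacklog` by its current dispatch rate (`msgRateOut` in the topic stats). The class and method names below are hypothetical. Because the backlog also counts messages published after the one of interest, the result is an upper bound, and it assumes in-order consumption at a steady rate.

```java
/** Hypothetical helper: rough upper bound on the wait before a backlogged message is consumed. */
public class LagEstimate {

    /**
     * @param msgBacklog number of unacknowledged messages on the subscription
     * @param msgRateOut current dispatch rate of the subscription, in messages per second
     * @return estimated seconds until the whole backlog is drained
     */
    static double secondsUntilConsumed(long msgBacklog, double msgRateOut) {
        if (msgBacklog < 0) {
            throw new IllegalArgumentException("msgBacklog must not be negative");
        }
        if (msgBacklog == 0) {
            return 0.0; // nothing is waiting
        }
        if (msgRateOut <= 0.0) {
            return Double.POSITIVE_INFINITY; // no consumption is happening right now
        }
        return msgBacklog / msgRateOut;
    }

    public static void main(String[] args) {
        // 12000 messages waiting, drained at 400 messages per second.
        System.out.println(secondsUntilConsumed(12000, 400.0)); // prints 30.0
    }
}
```

A tighter estimate would need the number of messages ahead of the given one, which the stats discussed above do not expose directly.
----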
