2019-11-25 09:54:53 UTC - xue: Can Pulsar Schema be applied to Pulsar Functions?
If I have a Student class and send it to a topic with an Avro schema, what should
I put as the input generic type of the Pulsar function? Like this?
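Producer code:
```
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

PulsarClient client = PulsarClient.builder().serviceUrl(localClusterUrl).build();
Producer<Cdr> producer = client.newProducer(Schema.AVRO(Cdr.class))
        .topic("persistent://public/default-input")
        .producerName("test")
        .create();
producer.send(cdr);
```
Pulsar function code:
```
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class FilterFunction1 implements Function<Student, Void> {
    @Override
    public Void process(Student student, Context context) throws Exception {
        // ... the rest of the body was not included in the original message
        return null;
    }
}
```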
----
2019-11-25 09:57:48 UTC - Pedro Cardoso: When deploying your function, I think you
need to specify the `outputSchemaType` or `outputSerdeClassName`, though I have
not been successful when I tried.
----
2019-11-25 09:58:04 UTC - Pedro Cardoso: See this thread for more
<https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1574247160235800>
----
2019-11-25 09:59:48 UTC - xue: After deploying the function, I had a problem
converting input types.
----
2019-11-25 10:00:29 UTC - Pedro Cardoso: How was the function deployed?
----
2019-11-25 10:00:58 UTC - xue:
```
bin/pulsar-admin functions create \
  --jar examples/filter-1.0-jar-with-dependencies.jar \
  --classname com.newland.filter.FilterFunction1 \
  --tenant billing --namespace filter --name filtercrd \
  --inputs persistent://billing/filter/filter-input \
  --output persistent://billing/rr/rr-input
```
----
2019-11-25 10:02:06 UTC - xue: How should I fill in this type?
----
2019-11-25 10:03:51 UTC - xue: Cdr class
----
2019-11-25 10:07:30 UTC - Pedro Cardoso: The Cdr class must be bundled with the
jar: the Pulsar function you want to deploy needs a dependency on the `Cdr`
class.
You should also deploy the function with the additional parameter
`--custom-schema-inputs`.
----
2019-11-25 10:08:18 UTC - Pedro Cardoso: I have never tried it, but something like:
`--custom-schema-inputs '{"persistent://billing/filter/filter-input": "avro"}'`
----
2019-11-25 11:00:12 UTC - Shishir Pandey: Will all the publish() calls happen at
the same time?
----
2019-11-25 11:00:32 UTC - Shishir Pandey: i.e. within the same "transaction"?
----
2019-11-25 13:20:25 UTC - xue: When I submit the function, the type conversion
fails: GenericRecord cannot be cast to Cdr. Before that, I had registered the Cdr
Avro schema on the topic.
----
2019-11-25 15:57:44 UTC - Christophe Bornet: Hi folks, is there a way to know
which consumer will consume a message, given its key, in a Key_Shared
subscription?
----
2019-11-25 16:07:00 UTC - David Kjerrumgaard: @Christophe Bornet No, there
isn't any way to know beforehand which consumer will consume the message. The
only guarantee you have is that messages with the same key will be consumed by
the same consumer in the group.
----
2019-11-25 16:08:02 UTC - Matteo Merli: In master (2.5) it's possible to
specify the hash ranges to consume
+1 : David Kjerrumgaard, Christophe Bornet
----
2019-11-25 16:12:43 UTC - Christophe Bornet: In the topic stats
(<http://pulsar.apache.org/admin-rest-api/?version=2.4.1#operation/getStats>)
we can get the list of consumers. Can we assume that they are in the same
order as in the selector?
----
2019-11-25 16:14:03 UTC - Matteo Merli: I believe so
----
2019-11-25 16:47:49 UTC - Christophe Bornet: Oh well, I just read the
HashRangeStickyKeyConsumerSelector algorithm, and I can't know the range just
from the order, as it also depends on the history of consumer additions and
removals... Would it be possible to expose the range info in the topic stats?
----
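To illustrate why the consumer order alone doesn't reveal a consumer's range, here is a simplified, hypothetical model of an auto-splitting hash-range selector. It is not Pulsar's `HashRangeStickyKeyConsumerSelector`: the class name `RangeSelectorSketch`, the 65,536-slot hash space, the use of `String.hashCode()` instead of Pulsar's own key hash, and the split/merge rules are all assumptions made for illustration. In this sketch a joining consumer takes the lower half of the widest range, and a leaving consumer's slots go to a neighbouring range, so two selectors that end up with the same consumers can still assign different ranges.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model; NOT Pulsar's actual selector implementation.
public class RangeSelectorSketch {
    // Assumed hash space size; each consumer owns one contiguous slot range.
    static final int RANGE_SIZE = 1 << 16;

    // Key = inclusive upper bound of a range, value = owning consumer.
    // A range covers (previous key + 1) .. (its own key).
    private final TreeMap<Integer, String> ranges = new TreeMap<>();

    public void addConsumer(String consumer) {
        if (ranges.isEmpty()) {
            ranges.put(RANGE_SIZE - 1, consumer);
            return;
        }
        // Find the widest range; on ties the lowest one wins.
        int widestUpper = -1;
        int widestWidth = -1;
        int prevUpper = -1;
        for (int upper : ranges.keySet()) {
            int width = upper - prevUpper;
            if (width > widestWidth) {
                widestWidth = width;
                widestUpper = upper;
            }
            prevUpper = upper;
        }
        if (widestWidth < 2) {
            throw new IllegalStateException("no range left to split");
        }
        // Split it: the newcomer takes the lower half, the old owner keeps the upper half.
        int lower = widestUpper - widestWidth + 1;
        ranges.put(lower + widestWidth / 2 - 1, consumer);
    }

    public void removeConsumer(String consumer) {
        Integer upper = null;
        for (Map.Entry<Integer, String> e : ranges.entrySet()) {
            if (e.getValue().equals(consumer)) {
                upper = e.getKey();
                break;
            }
        }
        if (upper == null) {
            return; // unknown consumer: nothing to do
        }
        ranges.remove(upper);
        // The next-higher range absorbs the freed slots automatically (ranges are keyed
        // by upper bound). If the removed range was the top one, extend the previous range.
        if (!ranges.isEmpty() && ranges.higherKey(upper) == null) {
            Map.Entry<Integer, String> prev = ranges.pollLastEntry();
            ranges.put(upper, prev.getValue());
        }
    }

    public String select(String key) {
        if (ranges.isEmpty()) {
            return null;
        }
        int slot = Math.floorMod(key.hashCode(), RANGE_SIZE); // assumed hash, not Pulsar's
        return ranges.ceilingEntry(slot).getValue();
    }

    public Map<Integer, String> upperBounds() {
        return new TreeMap<>(ranges);
    }

    public static void main(String[] args) {
        RangeSelectorSketch a = new RangeSelectorSketch();
        a.addConsumer("c1"); a.addConsumer("c2"); a.addConsumer("c3");
        RangeSelectorSketch b = new RangeSelectorSketch();
        b.addConsumer("c1"); b.addConsumer("c2"); b.addConsumer("c0"); b.addConsumer("c3");
        b.removeConsumer("c0");
        System.out.println(a.upperBounds()); // {16383=c3, 32767=c2, 65535=c1}
        System.out.println(b.upperBounds()); // {32767=c2, 49151=c3, 65535=c1}
    }
}
```

Both selectors end up with consumers c1, c2 and c3, yet in this model the key "a" lands on c3 in `a` and on c2 in `b`. Within one selector, a given key keeps mapping to the same consumer, which matches the only guarantee mentioned earlier in the thread.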
2019-11-25 17:36:38 UTC - Ryan Samo: Hey guys, any plans to add TLS support to
Pulsar Manager? The UI looks awesome but unfortunately I have my clusters
locked down via TLS auth
----
2019-11-25 17:38:27 UTC - Ryan Samo: Like being able to utilize the Admin certs
etc.
----
2019-11-25 18:04:45 UTC - Sijie Guo: I believe that was added recently by @tuteng
or @Yuvaraj Loganathan. If not, can you create a GitHub issue at
<https://github.com/apache/pulsar-manager>?
----
2019-11-25 18:08:49 UTC - Sijie Guo: Good suggestion. It should be pretty
straightforward to expose range info in the topic stats. Would you mind filing
a GitHub issue so @Penghui Li can help with that?
----
2019-11-25 18:09:02 UTC - Christophe Bornet: To monitor the consumer lag, should
`numberOfEntriesSinceFirstNotAckedMessage` (found in `/broker-stats/topics`) be
used?
----
2019-11-25 18:10:42 UTC - Sijie Guo: @xue how did you register the Cdr Avro
schema on the topic?
----
2019-11-25 18:12:30 UTC - Christophe Bornet: Sure I'll open the issue
----
2019-11-25 18:15:46 UTC - Ryan Samo: Awesome thanks @Sijie Guo !
----
2019-11-25 18:17:06 UTC - Sijie Guo: `topic_back_log` is the one for topic-level
lag, `subscription_back_log` is the one for subscription-level lag.
----
2019-11-25 18:22:53 UTC - Christophe Bornet: You mean `msgBacklog` in
`/broker-stats/topics`?
----
2019-11-25 19:24:25 UTC - Alexandre DUVAL: Oh funny, I just asked tuteng about it
by DM :stuck_out_tongue:.
stuck_out_tongue_closed_eyes : Sijie Guo
----
2019-11-25 19:25:20 UTC - Ryan Samo: :+1:
----
2019-11-25 19:26:17 UTC - Alexandre DUVAL: I think it's in
`BrokerStatsServiceImpl.java`.
----
2019-11-25 19:26:20 UTC - Alexandre DUVAL:
```
for (EnvironmentEntity env : environmentEntities) {
    String serviceUrl = checkServiceUrl(null, env.getBroker());
    Map<String, Object> clusterObject =
            clustersService.getClustersList(0, 0, serviceUrl, (c) -> serviceUrl);
    List<HashMap<String, Object>> clusterLists =
            (List<HashMap<String, Object>>) clusterObject.get("data");
    clusterLists.forEach((clusterMap) -> {
        String cluster = (String) clusterMap.get("cluster");
        Pair<String, String> envCluster = Pair.of(env.getName(), cluster);
        collectStatsServiceUrls.put(envCluster, (String) clusterMap.get("serviceUrl"));
    });
}
```
----
2019-11-25 19:35:11 UTC - Endre Karlson: Is there an overview of the transactions
feature somewhere?
----
2019-11-25 19:42:17 UTC - Luke Lu: We noticed that with offloading enabled,
under a moderate message rate (8k/s total across ~100 topics), brokers write a
large number of offloading-related entries to the ZK txn logs, creating huge ZK
log segments (> 20GB vs the normal 64MiB) that can easily run the disk out of
space. Is this a known issue (we're seeing it in 2.4.1)?
----
2019-11-25 19:53:51 UTC - Sijie Guo:
<https://github.com/apache/pulsar/wiki/PIP-31%3A-Transaction-Support>
----
2019-11-25 19:56:07 UTC - Sijie Guo: What kind of entries do you see in the ZK
txn logs?
Offloaders don’t create new ZK nodes; they just update the existing ZK nodes
used for ledger metadata.
----
2019-11-25 20:20:29 UTC - Christophe Bornet: OK. It's indeed msgBacklog.
----
2019-11-25 21:22:13 UTC - Joshua Dunham: Hi Everyone,
----
2019-11-25 21:23:26 UTC - Joshua Dunham: Hoping to catch someone who uses
Pulsar with NiFi. The recent connector builds introduce required RecordReader
and RecordWriter properties that are not behaving correctly.
----
2019-11-25 21:33:51 UTC - David Kjerrumgaard: I wrote the Pulsar processors for
NiFi, so hopefully I can help. What versions of Pulsar and NiFi are you using?
----
2019-11-25 21:42:59 UTC - Joshua Dunham: NiFi 1.9.2
----
2019-11-25 21:43:32 UTC - Joshua Dunham: 2.4.1 Pulsar
----
2019-11-25 21:47:11 UTC - Joshua Dunham: Apologies for the double post btw. I
also created an issue on the nifi-pulsar github repo. If we can get it working
I don't mind adding my knowledge to the issue in case it helps other folks.
----
2019-11-25 21:48:39 UTC - Joshua Dunham: It's the end of the day for me. So
I'll pop back into the channel tomorrow to see if you are free.
:slightly_smiling_face:
----
2019-11-25 22:08:18 UTC - Sijie Guo: yes msgBacklog
----
2019-11-25 22:08:49 UTC - Sijie Guo:
<https://github.com/apache/pulsar/issues/5560> is the issue for supporting
key/value schemas in Presto SQL.
----
2019-11-25 23:47:45 UTC - Luke Lu: `/managed-ledgers/<topic-path>`, with data
size up to 800KiB and many duplicated offloading endpoint entries (URL, bucket,
region).
----
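A back-of-the-envelope check of how an ~800 KiB znode can inflate ZooKeeper's transaction log, assuming each update of `/managed-ledgers/<topic-path>` appends roughly the full znode payload to the log (ZooKeeper records the new data of every `setData`; per-record overhead is ignored here). My understanding is that ZooKeeper starts a new log file after a number of transactions (`snapCount`, default 100,000, with some randomization) rather than after a fixed size, and that 64 MiB is the default preallocation size; treat both as assumptions to verify against your ZooKeeper version. The class `ZkLogGrowth` and its methods are hypothetical helpers.

```java
// Hypothetical arithmetic helper; assumes each znode update logs the full payload.
public class ZkLogGrowth {
    static final long KIB = 1024L;
    static final long MIB = 1024L * KIB;
    static final long GIB = 1024L * MIB;

    /** Bytes appended to the txn log if a znode of znodeBytes is rewritten `updates` times. */
    static long logBytes(long updates, long znodeBytes) {
        return Math.multiplyExact(updates, znodeBytes);
    }

    /** Smallest number of full rewrites of the znode that appends at least targetBytes. */
    static long updatesToReach(long targetBytes, long znodeBytes) {
        return (targetBytes + znodeBytes - 1) / znodeBytes; // ceiling division
    }

    public static void main(String[] args) {
        long znode = 800 * KIB;
        System.out.println(updatesToReach(64 * MIB, znode)); // 82 rewrites fill 64 MiB
        System.out.println(updatesToReach(20 * GIB, znode)); // 26215 rewrites reach 20 GiB
        System.out.println(logBytes(100_000, znode) / GIB);  // 76 (GiB, rounded down)
    }
}
```

Under these assumptions, fewer than a hundred rewrites of such a znode already exceed a 64 MiB segment, so a log file that only rolls after tens of thousands of transactions can plausibly grow to tens of GiB.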
2019-11-26 00:06:43 UTC - Penghui Li: Thanks :+1:
----
2019-11-26 00:07:20 UTC - Sijie Guo: Can you show some more details about this?
----
2019-11-26 00:07:44 UTC - Yuri Mizushima: @Yuri Mizushima has joined the channel
----
2019-11-26 00:36:04 UTC - xue: `bin/pulsar-admin schemas upload
persistent://billing/filter/filter-input -f connectors/cdr_avro_schema`
----
2019-11-26 00:51:34 UTC - xue: @Sijie Guo I tested it: even if I don't register
the Avro schema on the topic and just send the Cdr class, the same conversion
error occurs.
----
2019-11-26 00:55:02 UTC - Sijie Guo: Did you precreate the input topic?
----
2019-11-26 00:55:17 UTC - xue: yes
----
2019-11-26 02:15:00 UTC - xue: I just tested it with JSON schema, and it works
normally, but Avro schema doesn't.
----
2019-11-26 02:15:50 UTC - xue: producer code:
----
2019-11-26 03:05:02 UTC - Sijie Guo: How did you precreate the topic? Are you
using a client to produce messages using `Schema.AVRO(Cdr.class)`?
----
2019-11-26 03:06:30 UTC - xue: I created the topic with Pulsar Manager, then used
a client to produce messages with `Schema.AVRO(Cdr.class)`.
----
2019-11-26 06:11:59 UTC - Linmi: @Linmi has joined the channel
----
2019-11-26 09:03:07 UTC - Christophe Bornet: I need to know the current
consumer lag for a message published some time ago. The idea is to estimate how
long it will be before a given message, whose publish time I know, is consumed.
Do you know how I could achieve this?
----
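One possible way to approach this, sketched under loud assumptions: if you can estimate how many messages sit between the subscription's current read position and the target message, and the consume rate stays roughly steady, the wait is about `messagesAhead / consumeRate`. The consume rate could come from the subscription's `msgRateOut` in the topic stats; `messagesAhead` is approximated here from the publish time of the oldest unacknowledged message, the target's publish time, and an assumed steady publish rate. `BacklogEta` and its methods are hypothetical, and the estimate ignores redeliveries, rate changes and consumer restarts.

```java
// Hypothetical helper for a rough "time until consumed" estimate; not a Pulsar API.
public class BacklogEta {
    /**
     * Seconds until the target message is dispatched, assuming a steady consume rate.
     * Returns +Infinity when nothing is being consumed.
     */
    static double etaSeconds(long messagesAhead, double consumeRatePerSec) {
        if (consumeRatePerSec <= 0) {
            return Double.POSITIVE_INFINITY;
        }
        return messagesAhead / consumeRatePerSec;
    }

    /**
     * Approximates how many messages precede the target, assuming messages were published
     * at a steady rate between the oldest unacked message and the target message.
     */
    static long estimateMessagesAhead(long oldestUnackedPublishMs, long targetPublishMs,
                                      double publishRatePerSec) {
        long deltaMs = Math.max(0L, targetPublishMs - oldestUnackedPublishMs);
        return Math.round(deltaMs / 1000.0 * publishRatePerSec);
    }

    public static void main(String[] args) {
        // Illustrative inputs: target published 60 s after the oldest unacked message,
        // ~100 msg/s published in that window, subscription consuming ~200 msg/s.
        long ahead = estimateMessagesAhead(0L, 60_000L, 100.0);
        System.out.println(ahead);                    // 6000
        System.out.println(etaSeconds(ahead, 200.0)); // 30.0
    }
}
```

A simpler upper bound under the same steady-rate assumption is `msgBacklog / msgRateOut`, which estimates how long draining the whole subscription backlog would take.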