2020-05-18 10:08:18 UTC - Pierre-Yves Lebecq: Hello :wave:
Quick question related to Pulsar Functions state, and more specifically the
incrCounter method of the Context object. Does this method offer some kind of
atomicity? For example, if I run many instances of the same function, is there
a possibility of race conditions causing the counter to be wrong at some point?
The API makes me think it does ensure atomicity, however I’m having a hard time
confirming it from the Pulsar or BookKeeper documentation.
eyes : Gilles Barbier
+1 : Frank Kelly
----
2020-05-18 10:12:08 UTC - JG: Yes, it doesn't work. So how is it possible to
have JDK 11 functions working on Pulsar? Is there a way to use Pulsar with JDK 11?
----
2020-05-18 10:26:58 UTC - Kirill Merkushev: You could try
<https://github.com/bsideup/jabel>
----
2020-05-18 11:19:25 UTC - crtomir: @crtomir has joined the channel
----
2020-05-18 12:54:33 UTC - Andreas Müller: @Andreas Müller has joined the channel
----
2020-05-18 12:58:02 UTC - Andreas Müller: Hi, I'm looking for a fat jar of the
Pulsar Java client. There is one on Maven Central (pulsar-client-all), but it
lacks classes like `org.apache.pulsar.client.api.PulsarClient`. Any hints? I
need the fat jar. Currently the only way to use the client is to add all jar
files from the `lib` directory, which is 100+ MB... Thanks, Andreas
----
2020-05-18 14:53:40 UTC - VanderChen: Hello, everyone! I have a question about
*Key Shared Mode*. In my system, Pulsar is running in *standalone* mode with
*Docker*.
```
docker run -it -p 6650:6650 -p 8080:8080 \
  --mount source=pulsardata,target=/pulsar/data \
  --mount source=pulsarconf,target=/pulsar/conf \
  apachepulsar/pulsar:2.5.1 bin/pulsar standalone
```
When I use *keySharedPolicy* to specify a key for the consumer, the consumer
still receives all the messages.
For example, when sending messages with keys "key-1" and "key-2", the consumer
for key-1 receives both the key-1 messages and the key-2 messages.
I don't know whether the problem is in my code or in my system settings.
The demo code is as follows:
*Producer code*
```java
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

public class PulsarProducer {
    private static PulsarClient client;
    private static Producer<byte[]> producer;

    public static void main(String[] args) throws Exception {
        client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        producer = client.newProducer()
                .topic("my-topic")
                .create();
        startProducer();
    }

    // Every second, publish one round of messages for key-1 .. key-4
    private static void startProducer() throws Exception {
        while (true) {
            System.out.println("Key Shared Message round!");
            producer.newMessage().key("key-1").value("message-1-1\n".getBytes()).send();
            producer.newMessage().key("key-1").value("message-1-2\n".getBytes()).send();
            producer.newMessage().key("key-1").value("message-1-3\n".getBytes()).send();
            producer.newMessage().key("key-2").value("message-2-1\n".getBytes()).send();
            producer.newMessage().key("key-2").value("message-2-2\n".getBytes()).send();
            producer.newMessage().key("key-2").value("message-2-3\n".getBytes()).send();
            producer.newMessage().key("key-3").value("message-3-1\n".getBytes()).send();
            producer.newMessage().key("key-3").value("message-3-2\n".getBytes()).send();
            producer.newMessage().key("key-4").value("message-4-1\n".getBytes()).send();
            producer.newMessage().key("key-4").value("message-4-2\n".getBytes()).send();
            Thread.sleep(1000);
        }
    }
}
```
*The Consumer code*
```java
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.util.Murmur3_32Hash;

public class PulsarConsumers {
    private static PulsarClient client;
    private static Consumer<byte[]> consumer;

    public static void main(String[] args) throws Exception {
        client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        int hashcode = Murmur3_32Hash.getInstance().makeHash("key") % 65536;
        consumer = client.newConsumer()
                .topic("my-topic")
                .ackTimeout(30, TimeUnit.SECONDS)
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Key_Shared)
                .keySharedPolicy(KeySharedPolicy.stickyHashRange()
                        .ranges(Range.of(hashcode, hashcode)))
                .subscribe();
        startConsumer();
    }

    private static void startConsumer() throws PulsarClientException {
        while (true) {
            // Wait for a message
            Message<byte[]> msg = consumer.receive();
            try {
                System.out.printf("Message received: %s", new String(msg.getData()));
                consumer.acknowledge(msg);
            } catch (Exception e) {
                System.err.printf("Unable to consume message: %s", e.getMessage());
                consumer.negativeAcknowledge(msg);
            }
        }
    }
}
```
----
2020-05-18 14:54:50 UTC - Matteo Merli: It also depends on `pulsar-client-api`.
This was done so that you get the Javadoc and can explore the source code in
IDEs.
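A quick way to check which classes a given jar actually bundles (a minimal sketch; the class name `JarContains` is hypothetical) is to look up the `.class` entry for a fully-qualified class name, for example `org.apache.pulsar.client.api.PulsarClient` in `pulsar-client-all` versus `pulsar-client-api`:

```java
import java.io.IOException;
import java.util.jar.JarFile;

// Hypothetical helper: does a jar contain the compiled class for a given name?
public class JarContains {
    static boolean containsClass(String jarPath, String className) throws IOException {
        // "a.b.C" is stored in the jar as the entry "a/b/C.class"
        String entry = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entry) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Usage: java JarContains <path-to-jar> [fully.qualified.ClassName]
        String cls = args.length > 1 ? args[1] : "org.apache.pulsar.client.api.PulsarClient";
        System.out.println(cls + " in " + args[0] + ": " + containsClass(args[0], cls));
    }
}
```

Running it against each jar on the classpath shows which artifact really provides a class, which is the question behind the thread above.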
----
2020-05-18 16:05:29 UTC - Andreas Müller: Unfortunately it has more
dependencies like protobuf. I’m using client-all and pulsar-api and still
getting this:
----
2020-05-18 16:14:08 UTC - Andreas Müller: I had to add `org.lz4-lz4-java-1.5.0`
and `org.apache.pulsar-protobuf-shaded-2.1.0-incubating` plus the 2 above.
Actually not what I would call a shaded jar. Everything should be included in
`client-all`.
----
2020-05-18 17:04:09 UTC - Sijie Guo: Currently the state is a “global” state,
hosted on the BookKeeper side. All updates are sequenced into a log before the
state is mutated, so this prevents race conditions.
----
2020-05-18 17:06:06 UTC - Sijie Guo: You are computing a different hashcode?
```int hashcode = Murmur3_32Hash.getInstance().makeHash("key") % 65536;```
----
2020-05-18 17:08:47 UTC - Sijie Guo: @JG we have to build and release binaries
using JDK11.
----
2020-05-18 17:09:32 UTC - Pierre-Yves Lebecq: Thanks for the reply. I
understand that from Pulsar's point of view it does; however, I assume multiple
instances of the same function can still create race conditions. For example, if
I implement counters with putState / getState instead of incrCounter, and two
instances do the following at the same time (pseudo code):
```
counter = getState("my.counter.key")     // we assume the current state is 42
putState("my.counter.key", counter + 1)  // both instances write 43, so the result is 43 instead of 44
```
Am I correct?
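A minimal, self-contained simulation of the difference (plain JDK, no Pulsar dependency; `SequencedStateDemo` and its methods are hypothetical): even when every update is sequenced into an ordered log, a putState-style write of a value computed from a stale read overwrites the other instance's update, while increment-style deltas both survive. This assumes incrCounter sends the increment amount to the store rather than a precomputed total, which is how the API reads but is not confirmed in this thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation (not Pulsar/BookKeeper code): every update is appended
// to an ordered log and applied to the state strictly in log order.
public class SequencedStateDemo {
    static final class Update {
        final String key;
        final long value;
        final boolean increment; // true: add value to current; false: overwrite with value

        Update(String key, long value, boolean increment) {
            this.key = key;
            this.value = value;
            this.increment = increment;
        }
    }

    private final List<Update> log = new ArrayList<>();
    private final Map<String, Long> state = new HashMap<>();

    long get(String key) {
        return state.getOrDefault(key, 0L);
    }

    // Sequence the update into the log first, then mutate the state.
    void append(Update u) {
        log.add(u);
        long current = get(u.key);
        state.put(u.key, u.increment ? current + u.value : u.value);
    }

    // putState-style: both instances read 42 before either writes, then each writes read + 1.
    static long readModifyWrite() {
        SequencedStateDemo store = new SequencedStateDemo();
        store.append(new Update("my.counter.key", 42, false));
        long readByA = store.get("my.counter.key");
        long readByB = store.get("my.counter.key");
        store.append(new Update("my.counter.key", readByA + 1, false));
        store.append(new Update("my.counter.key", readByB + 1, false));
        return store.get("my.counter.key");
    }

    // incrCounter-style: each instance sends only the delta; the store does the addition.
    static long deltaIncrements() {
        SequencedStateDemo store = new SequencedStateDemo();
        store.append(new Update("my.counter.key", 42, false));
        store.append(new Update("my.counter.key", 1, true));
        store.append(new Update("my.counter.key", 1, true));
        return store.get("my.counter.key");
    }

    public static void main(String[] args) {
        System.out.println("get/put: " + readModifyWrite()); // 43
        System.out.println("incr:    " + deltaIncrements()); // 44
    }
}
```

Sequencing makes each individual write atomic; it cannot fix a read-modify-write that spans two separate calls.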
----
2020-05-18 17:18:27 UTC - JG: @Sijie Guo do you think it's possible to make a
JDK 11 release of Pulsar? JDK 11 is gaining popularity, and there is also
GraalVM to improve performance...
----
2020-05-18 17:19:56 UTC - Sijie Guo: Yes. Will include JDK11 release for the
upcoming 2.6.0 release.
+1 : David Kjerrumgaard
----
2020-05-18 18:34:09 UTC - Rounak Jaggi: How can we enable TLS certificates in
ZooKeeper?
----
2020-05-18 18:35:44 UTC - Matteo Merli: For LZ4 there are problems with including
and shading it (e.g. the relocated C libraries are no longer found).
The protobuf classes should be included.
----
2020-05-18 19:37:13 UTC - Alan Broddle: ZooKeeper TLS question: So far, we
have used Java command-line options to pass the TLS-related information to
ZooKeeper. We are concerned about the current password parameter:
> `-Dzookeeper.ssl.trustStore.password=<passwordhere>`
Given that it is bad practice to have passwords visible in running processes
on a server, which variable can we use to give the security settings a password
file location instead?
Is there something like:
> `-Dzookeeper.ssl.trustStore.password<FilePath>=<Path>`
----
2020-05-18 20:46:30 UTC - Sijie Guo:
<https://cwiki.apache.org/confluence/display/ZOOKEEPER/ZooKeeper+SSL+User+Guide>
----
2020-05-18 20:47:04 UTC - Sijie Guo: @Enrico Olivelli is a ZooKeeper PMC
member. He can probably help with this question.
----
2020-05-18 20:56:28 UTC - Ryan Van Antwerp: @Ryan Van Antwerp has joined the
channel
----
2020-05-18 21:33:37 UTC - JG: ok nice !!!
----
2020-05-18 23:36:26 UTC - VanderChen: I want to specify a key for the
consumer. For example, there are messages with key-1 and key-2, and I want to
create a consumer that only receives messages with key-1, but I don't know how
to achieve this.
I use the line below to map a String key to its hash code, and specify that
hash code when creating the consumer. But it now looks like this is the wrong
way.
```int hashcode = Murmur3_32Hash.getInstance().makeHash("key") % 65536;```
----
2020-05-19 00:44:27 UTC - Rounak Jaggi: This is using keystore and truststore.
Is there a way to use PEM for ZK TLS?
----
2020-05-19 01:11:56 UTC - Tymm: Hi, I am using Pulsar 2.4.2 / 2.5.1 standalone
and have a problem where Pulsar functions that use state get stuck / stop
working after 10k+ successful operations (sometimes more, but fewer than 100k).
I have to restart Pulsar standalone, or delete and re-add the functions, to make
them run for the next 10k+ times. Any thoughts?
----
2020-05-19 01:42:30 UTC - Penghui Li: Can you try to use
`Murmur3_32Hash.getInstance().makeHash("key-1") % 65536` for consumer-1 and use
`Murmur3_32Hash.getInstance().makeHash("key-2") % 65536` for consumer-2?
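Penghui's suggestion boils down to computing the slot from the exact message key rather than from `"key"`. A self-contained sketch of that computation, assuming the hash is MurmurHash3 x86_32 with seed 0 over the UTF-8 key bytes, masked to a non-negative int, then `% 65536` as in the snippets above. These details have not been checked against Pulsar's `Murmur3_32Hash`, so treat the exact slot numbers as unverified; `KeySlot` and its methods are hypothetical. Note also that a single-slot range would match any other key that happens to land in the same slot.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: map a message key to a hash slot in [0, 65535].
public class KeySlot {
    // Standard MurmurHash3 x86_32 over the given bytes.
    static int murmur3x86_32(byte[] data, int seed) {
        final int c1 = 0xcc9e2d51;
        final int c2 = 0x1b873593;
        int h1 = seed;
        int roundedEnd = data.length & ~3; // length rounded down to whole 4-byte blocks
        for (int i = 0; i < roundedEnd; i += 4) {
            int k1 = (data[i] & 0xff) | ((data[i + 1] & 0xff) << 8)
                    | ((data[i + 2] & 0xff) << 16) | (data[i + 3] << 24);
            k1 *= c1;
            k1 = Integer.rotateLeft(k1, 15);
            k1 *= c2;
            h1 ^= k1;
            h1 = Integer.rotateLeft(h1, 13);
            h1 = h1 * 5 + 0xe6546b64;
        }
        // Mix in the 1-3 trailing bytes, if any
        int k1 = 0;
        switch (data.length & 3) {
            case 3:
                k1 = (data[roundedEnd + 2] & 0xff) << 16;
                // fall through
            case 2:
                k1 |= (data[roundedEnd + 1] & 0xff) << 8;
                // fall through
            case 1:
                k1 |= data[roundedEnd] & 0xff;
                k1 *= c1;
                k1 = Integer.rotateLeft(k1, 15);
                k1 *= c2;
                h1 ^= k1;
                break;
            default:
                break;
        }
        // Finalization mix
        h1 ^= data.length;
        h1 ^= h1 >>> 16;
        h1 *= 0x85ebca6b;
        h1 ^= h1 >>> 13;
        h1 *= 0xc2b2ae35;
        h1 ^= h1 >>> 16;
        return h1;
    }

    // Assumed slot formula: non-negative hash modulo 65536.
    static int slot(String key) {
        int hash = murmur3x86_32(key.getBytes(StandardCharsets.UTF_8), 0) & Integer.MAX_VALUE;
        return hash % 65536;
    }

    // A range [start, end] is treated as inclusive on both ends.
    static boolean inRange(int slot, int start, int end) {
        return slot >= start && slot <= end;
    }

    public static void main(String[] args) {
        int consumerSlot = slot("key-1"); // range for a consumer meant to get only key-1
        for (String key : new String[] {"key", "key-1", "key-2", "key-3", "key-4"}) {
            int s = slot(key);
            System.out.printf("%s -> slot %d, in key-1 consumer range: %b%n",
                    key, s, inRange(s, consumerSlot, consumerSlot));
        }
    }
}
```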
----
2020-05-19 01:48:19 UTC - Penghui Li: Can this be reproduced consistently? Could
you please create an issue on GitHub, ideally with steps to reproduce? Thanks.
----
2020-05-19 02:35:27 UTC - Alexandre DUVAL: What is the best way to get metrics
per namespace?
----
2020-05-19 02:47:38 UTC - Luke Stephenson: @Luke Stephenson has joined the
channel
----
2020-05-19 02:55:04 UTC - Luke Stephenson: Hello. I've recently spun up a
Pulsar cluster on k8s using the Helm templates. After creating a producer
which published messages to a persistent topic, the brokers no longer start;
they fail with the following error:
`ERROR org.apache.pulsar.PulsarBrokerStarter - -- Shutting down - Received OOM
exception: failed to allocate 16777216 byte(s) of direct memory (used:
268435456, max: 268435456)`
I doubled the memory allocated to the brokers but now it just fails with a
similar message reflecting more memory used:
`ERROR org.apache.pulsar.PulsarBrokerStarter - -- Shutting down - Received OOM
exception: failed to allocate 16777216 byte(s) of direct memory (used:
536870912, max: 536870912)`
I've seen a pull request which may fix this issue
<https://github.com/apache/pulsar/pull/6634>, but it is not in a release yet.
I'm assuming the brokers should work with the default amount of memory
suggested in the helm templates (perhaps not optimal caching, but should be
stable).
Any suggestions?
Thanks.
----
2020-05-19 02:56:54 UTC - Sam Xie: @Sam Xie has joined the channel
----
2020-05-19 03:54:12 UTC - VanderChen: I have tried this, but it does not work.
Consumer 1 still receives all messages.
----
2020-05-19 04:22:39 UTC - ckdarby: Isn't this exposed in Prometheus? I see
metrics in Grafana for cluster, namespace and topics.
----
2020-05-19 04:24:06 UTC - Ruian: I would recommend replacing all values of the
`BOOKIE_MEM` and `PULSAR_MEM` env variables with
`" -XX:+UseContainerSupport -XX:InitialRAMPercentage=40.0 -XX:MinRAMPercentage=20.0 -XX:MaxRAMPercentage=80.0 "`
and setting the memory resource limit to 500M in the k8s container spec.
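To confirm the percentage flags actually take effect inside the container, a small sanity check (a sketch; `HeapCheck` is hypothetical) can be run with the same JVM options. The expected value is only the rough arithmetic limit × MaxRAMPercentage / 100; the JVM may align it slightly. As far as I know, when `-XX:MaxDirectMemorySize` is not set, the JDK's default direct-memory cap equals this max-heap value, which matters here because the OOM in the logs above is a direct-memory one.

```java
// Hypothetical sanity check for container memory flags.
public class HeapCheck {
    // Rough expectation: container limit * MaxRAMPercentage / 100.
    static long expectedMaxHeap(long containerLimitBytes, double maxRamPercentage) {
        return (long) (containerLimitBytes * maxRamPercentage / 100.0);
    }

    public static void main(String[] args) {
        long max = Runtime.getRuntime().maxMemory(); // max heap the JVM actually chose
        System.out.printf("max heap: %d MiB%n", max / (1024 * 1024));
        // k8s "500M" means 500 * 10^6 bytes
        System.out.printf("expected for a 500M limit at 80%%: %d MiB%n",
                expectedMaxHeap(500_000_000L, 80.0) / (1024 * 1024));
    }
}
```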
----
2020-05-19 05:39:23 UTC - Ken Huang: Hi, I use configurationStore to deploy two
clusters. I found that if I enable the functions worker, only the broker of the
first cluster works, and the broker of the second cluster gets this error:
----
2020-05-19 05:39:23 UTC - Ken Huang:
```
02:15:38.662 [ForkJoinPool.commonPool-worker-0] WARN org.apache.pulsar.broker.web.PulsarWebResource - Namespace missing local cluster name in clusters list: local_cluster=pulsar-14 ns=public/functions clusters=[pulsar-145]
02:15:38.675 [pulsar-web-41-1] INFO org.eclipse.jetty.server.RequestLog - 10.244.102.110 - - [18/May/2020:02:15:38 +0000] "PUT /admin/v2/persistent/public/functions/assignments HTTP/1.1" 412 60 "-" "Pulsar-Java-v2.5.1" 118
02:15:38.689 [AsyncHttpClient-52-1] WARN org.apache.pulsar.client.admin.internal.BaseResource - [http://pulsar-14-broker-0.pulsar-14-broker.pulsar.svc.cluster.local:8080/admin/v2/persistent/public/functions/assignments] Failed to perform http put request: javax.ws.rs.ClientErrorException: HTTP 412 Precondition Failed
02:15:38.698 [main] ERROR org.apache.pulsar.functions.worker.WorkerService - Error Starting up in worker
org.apache.pulsar.client.admin.PulsarAdminException$PreconditionFailedException: Namespace does not have any clusters configured
    at org.apache.pulsar.client.admin.internal.BaseResource.getApiException(BaseResource.java:220) ~[org.apache.pulsar-pulsar-client-admin-original-2.5.1.jar:2.5.1]
    at org.apache.pulsar.client.admin.internal.BaseResource$1.failed(BaseResource.java:130) ~[org.apache.pulsar-pulsar-client-admin-original-2.5.1.jar:2.5.1]
    at org.glassfish.jersey.client.JerseyInvocation$4.failed(JerseyInvocation.java:1030) ~[org.glassfish.jersey.core-jersey-client-2.27.jar:?]
    at org.glassfish.jersey.client.JerseyInvocation$4.completed(JerseyInvocation.java:1017) ~
    ... 47 more
```
----
2020-05-19 05:39:23 UTC - Ken Huang:
P.S. pulsar-14 and pulsar-145 are my cluster names.
----
2020-05-19 06:31:51 UTC - Patrik Kleindl: Hi, some general questions: since both
Pulsar and BookKeeper require ZooKeeper, is it common practice to use the
same ZK cluster for both, or is it better to keep them separate?
I was just wondering how many ZK instances would be needed, as other systems
usually require 3-5 ZK instances.
How resource-intensive is ZK for Pulsar and BookKeeper?
----
2020-05-19 06:31:54 UTC - Andreas Müller: Filed a bug:
<https://github.com/apache/pulsar/issues/6982>
----
2020-05-19 06:32:35 UTC - Rattanjot Singh: What configurations are available to
keep a connection alive when it is idle for a long time? The connection gets
closed after 60 seconds, and the producer automatically uses a new connection.
Setting `keepAliveInterval`
(<http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientBuilder.html#keepAliveInterval-int-java.util.concurrent.TimeUnit->)
on the client builder does not help keep the connection alive.
----
2020-05-19 06:50:01 UTC - Luke Stephenson: We will give this a go @Ruian
----
2020-05-19 06:50:52 UTC - Luke Stephenson: But regardless of the memory
allocated, it shouldn't OOM, right? It should just work less efficiently. Do
you know anything about this?
----
2020-05-19 06:52:34 UTC - Luke Stephenson: The pulsar docs mention the
following about compacted topics
(<https://pulsar.apache.org/docs/en/cookbooks-compaction/#when-should-i-use-compacted-topics>):
• They can read from the "original," non-compacted topic in case they need
access to "historical" values, i.e. the entirety of the topic's messages.
• They can read from the compacted topic if they only want to see the most
up-to-date messages.
What if I'm not interested in the entirety of the original topic's values and
just want the compacted version? Is it possible to have a retention policy
which only cleans up the "original" topic to prevent it from growing? We will
only have consumers of the compacted topic, so it would be very wasteful to
keep storing the complete history of messages on the original topic.
Also, when we publish messages that can be compacted, they typically need to
have the full copy of the latest state (as we are saying it's valid to skip
over intermediate states). So the intention of the producers is to publish
messages which will be compacted, not consumed individually from the beginning
of time.
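To make the "latest value per key" semantics from the quoted docs concrete, a minimal in-memory sketch (not Pulsar's compactor; `CompactionSketch` is hypothetical) of what a reader of the compacted view sees, given messages in publish order:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: keep only the newest value for each key.
public class CompactionSketch {
    // Input: {key, value} pairs in publish order. Output: latest value per key,
    // ordered by the position of each key's latest message.
    static Map<String, String> compact(String[][] messages) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (String[] m : messages) {
            latest.remove(m[0]); // drop the older entry so ordering follows the newest message
            latest.put(m[0], m[1]);
        }
        return latest;
    }

    public static void main(String[] args) {
        String[][] topic = {
            {"a", "1"}, {"b", "1"}, {"a", "2"}, {"c", "1"}, {"b", "2"}
        };
        System.out.println(compact(topic)); // {a=2, c=1, b=2}
    }
}
```

This only works as intended when each message carries the full latest state for its key, which is exactly the producer contract described above.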
----
2020-05-19 07:42:39 UTC - Sijie Guo: @VanderChen at the producer side, you need
to use keyBasedBatcher.
----
2020-05-19 07:42:53 UTC - Sijie Guo: otherwise, the messages are batched into
one large batch.
----
2020-05-19 07:43:02 UTC - Sijie Guo: so the consumer 1 will receive all the
messages.
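In the real client, the producer-side setting referred to here is, as far as I know, `batcherBuilder(BatcherBuilder.KEY_BASED)` on the producer builder (disabling batching with `enableBatching(false)` is the blunter alternative). The effect can be illustrated with a self-contained simulation (hypothetical `BatchingDemo`, not the Pulsar client's code), assuming, as described above, that pending messages land in one batch and a batch is delivered to a single consumer as a unit:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical simulation of how batching groups pending messages.
public class BatchingDemo {
    static final class Msg {
        final String key;
        final String value;

        Msg(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Default-style batching: all pending messages share one batch, whatever their keys.
    static List<List<Msg>> defaultBatches(List<Msg> pending) {
        List<List<Msg>> batches = new ArrayList<>();
        if (!pending.isEmpty()) {
            batches.add(new ArrayList<>(pending));
        }
        return batches;
    }

    // Key-based batching: one batch per key, keeping per-key order.
    static List<List<Msg>> keyBasedBatches(List<Msg> pending) {
        Map<String, List<Msg>> byKey = new LinkedHashMap<>();
        for (Msg m : pending) {
            byKey.computeIfAbsent(m.key, k -> new ArrayList<>()).add(m);
        }
        return new ArrayList<>(byKey.values());
    }

    // Distinct keys inside one batch; under the assumption above, they all reach one consumer.
    static Set<String> keysIn(List<Msg> batch) {
        Set<String> keys = new LinkedHashSet<>();
        for (Msg m : batch) {
            keys.add(m.key);
        }
        return keys;
    }

    public static void main(String[] args) {
        List<Msg> pending = new ArrayList<>();
        pending.add(new Msg("key-1", "message-1-1"));
        pending.add(new Msg("key-1", "message-1-2"));
        pending.add(new Msg("key-2", "message-2-1"));
        pending.add(new Msg("key-3", "message-3-1"));
        pending.add(new Msg("key-4", "message-4-1"));
        for (List<Msg> b : defaultBatches(pending)) {
            System.out.println("default batch keys: " + keysIn(b));
        }
        for (List<Msg> b : keyBasedBatches(pending)) {
            System.out.println("key-based batch keys: " + keysIn(b));
        }
    }
}
```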
----
2020-05-19 07:45:10 UTC - Sijie Guo: @Luke Stephenson Currently if you are
producing much faster than the bookies can support, you might be experiencing
OOM. In this case, you can set maxPendingRequests at the bookie side or enable
rate throttling at the broker side.
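To illustrate why capping pending requests helps (a generic sketch, not the bookie's or broker's implementation; `BoundedPendingRequests` is hypothetical): once the cap is reached, new work is rejected or delayed instead of piling up in memory, so a producer that outpaces storage gets backpressure rather than an OOM.

```java
import java.util.concurrent.Semaphore;

// Hypothetical illustration: cap in-flight requests so memory held by pending work stays bounded.
public class BoundedPendingRequests {
    private final Semaphore permits;

    BoundedPendingRequests(int maxPending) {
        this.permits = new Semaphore(maxPending);
    }

    // Returns false when the cap is reached; the caller must back off or retry
    // instead of queueing unboundedly.
    boolean trySubmit() {
        return permits.tryAcquire();
    }

    // Called once the storage layer acknowledges a request (must pair with a successful trySubmit).
    void complete() {
        permits.release();
    }

    int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        BoundedPendingRequests limiter = new BoundedPendingRequests(2);
        System.out.println(limiter.trySubmit()); // true
        System.out.println(limiter.trySubmit()); // true
        System.out.println(limiter.trySubmit()); // false: cap reached
        limiter.complete();
        System.out.println(limiter.trySubmit()); // true again
    }
}
```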
----
2020-05-19 07:47:10 UTC - Sijie Guo: You need to manually set the replication
clusters for the functions namespace; otherwise one cluster is not able to
start up. There is a GitHub issue tracking an improvement for this.
----
2020-05-19 07:47:58 UTC - Sijie Guo: You can use the same zookeeper instance.
3~5 nodes is a very typical setup.
----
2020-05-19 07:49:44 UTC - Sijie Guo: In Pulsar’s wire protocol, there are also
ping/pong messages to keep the connection alive. Your connection shouldn’t be
closed every 60 seconds. Do you have more information about this issue?
----
2020-05-19 07:51:45 UTC - Sijie Guo: > Is it possible to have a retention
policy which only clears up the “original” topic to prevent it from growing
The original data will be deleted/reclaimed based on its retention policy.
----
2020-05-19 07:59:43 UTC - Deepa: @Deepa has joined the channel
----
2020-05-19 08:07:41 UTC - Deepa: I ran the code below in debug mode, paused at
a breakpoint on `System.out.println("wait!!!!");` for more than 60 seconds, and
then continued.
----
2020-05-19 08:07:58 UTC - Deepa:
```java
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ProducerImpl;

public class KeepAliveTest {
    public static void main(String[] args) throws PulsarClientException, InterruptedException {
        PulsarClient pclient = PulsarClient.builder()
                .keepAliveInterval(3000, TimeUnit.SECONDS)
                .serviceUrl("pulsar://localhost:6650")
                .build();
        Producer<byte[]> producer = pclient.newProducer()
                .topic("my-keepalive-topic")
                .create();
        for (int i = 0; i < 2; i++) {
            String msg = "asdfasdfad";
            producer.newMessage().value(msg.getBytes()).send();
            System.out.println("Produced Message: " + msg);
        }
        // Print the connection currently used by the producer
        ProducerImpl<byte[]> test = (ProducerImpl<byte[]>) producer;
        System.out.println(test.getConnectionId());
        System.out.println("wait!!!!"); // breakpoint here for more than 60 seconds
        for (int i = 0; i < 2; i++) {
            String msg = "qerqewrq";
            producer.newMessage().value(msg.getBytes()).send();
            System.out.println("Produced Message: " + msg);
        }
        ProducerImpl<byte[]> test2 = (ProducerImpl<byte[]>) producer;
        System.out.println(test2.getConnectionId());
        producer.close();
        pclient.close();
    }
}
```
----
2020-05-19 08:09:02 UTC - Deepa: Output:
```Produced Message: asdfasdfad
Produced Message: asdfasdfad
[id: 0x2f333067, L:/127.0.0.1:50045 - R:localhost/127.0.0.1:6650]
wait!!!!
Produced Message: qerqewrq
Produced Message: qerqewrq
[id: 0x5588b1e0, L:/127.0.0.1:50053 - R:localhost/127.0.0.1:6650]```
The connection "127.0.0.1:50045" is closed after 60 seconds, and the second
round of produced messages used a different connection, "127.0.0.1:50053".
----
2020-05-19 08:11:53 UTC - Deepa: Few log lines from pulsar standalone console
which correlate with above output
```
13:35:04.903 [pulsar-io-50-12] INFO org.apache.pulsar.broker.service.ServerCnx - New connection from /127.0.0.1:50045
13:35:05.272 [ForkJoinPool.commonPool-worker-6] INFO org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:50045] Created new producer: Producer{topic=PersistentTopic{topic=persistent://public/default/my-keepalive-topic}, client=/127.0.0.1:50045, producerName=standalone-5-3, producerId=0}
13:36:04.904 [pulsar-io-50-12] WARN org.apache.pulsar.common.protocol.PulsarHandler - [[id: 0xf0a6deef, L:/127.0.0.1:6650 - R:/127.0.0.1:50045]] Forcing connection to close after keep-alive timeout
13:36:04.904 [pulsar-io-50-12] INFO org.apache.pulsar.broker.service.ServerCnx - Closed connection from /127.0.0.1:50045
13:36:16.038 [pulsar-io-50-15] INFO org.apache.pulsar.broker.service.ServerCnx - New connection from /127.0.0.1:50053
13:36:16.043 [ForkJoinPool.commonPool-worker-2] INFO org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:50053] Created new producer: Producer{topic=PersistentTopic{topic=persistent://public/default/my-keepalive-topic}, client=/127.0.0.1:50053, producerName=standalone-5-3, producerId=0}
```
----
2020-05-19 08:21:54 UTC - Deepa: Is it because in debug mode the ping/pong
keep-alive messages are also paused, so the connections get closed?
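The broker log above shows the connection opened at 13:35:04.903 and force-closed "after keep-alive timeout" at 13:36:04.904, exactly 60 seconds later. A simplified model of a ping/pong keep-alive check reproduces that number, assuming (as I understand the broker default `keepAliveIntervalSeconds=30`) that the broker pings every 30 seconds and closes the connection if the previous ping is still unanswered at the next tick. If the debugger suspends the client's I/O threads, no pong is sent. `KeepAliveModel` and its methods are hypothetical, not Pulsar's code.

```java
// Hypothetical, simplified model of a broker-side ping/pong keep-alive check.
public class KeepAliveModel {
    private boolean waitingForPong = false;
    private boolean closed = false;

    // Runs once per keep-alive interval.
    void onTimer() {
        if (closed) {
            return;
        }
        if (waitingForPong) {
            closed = true;         // previous ping was never answered
        } else {
            waitingForPong = true; // send a ping and wait for the pong
        }
    }

    // A pong from the client resets the check.
    void onPong() {
        waitingForPong = false;
    }

    // Seconds until the connection is closed, or -1 if it survives maxSeconds.
    static long simulate(long intervalSeconds, boolean clientResponds, long maxSeconds) {
        KeepAliveModel conn = new KeepAliveModel();
        for (long t = intervalSeconds; t <= maxSeconds; t += intervalSeconds) {
            conn.onTimer();
            if (conn.closed) {
                return t;
            }
            if (clientResponds) {
                conn.onPong();
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(simulate(30, false, 300)); // 60: client paused in the debugger
        System.out.println(simulate(30, true, 300));  // -1: client answers every ping
    }
}
```

In this model a silent client is dropped after two intervals, which matches the 60-second gap in the log.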
----
2020-05-19 08:27:50 UTC - Deepa: Is there an explicit option to keep a
connection alive for a given amount of time?
----
2020-05-19 08:45:44 UTC - Olivier Chicha: Hi, I am having trouble running my
integration tests inside Eclipse. The tests work fine with Maven, but I can't
get them to run as JUnit tests inside Eclipse.
I am getting this exception:
```
Caused by: java.lang.RuntimeException: java.lang.NoSuchMethodError: 'org.apache.pulsar.common.schema.SchemaInfo org.apache.pulsar.common.schema.SchemaInfo.setName(java.lang.String)'
    at org.apache.pulsar.client.internal.ReflectionUtils.catchExceptions(ReflectionUtils.java:46)
    at org.apache.pulsar.client.internal.DefaultImplementation.newBytesSchema(DefaultImplementation.java:136)
    at org.apache.pulsar.client.api.Schema.<clinit>(Schema.java:149)
    ... 38 more
Caused by: java.lang.NoSuchMethodError: 'org.apache.pulsar.common.schema.SchemaInfo org.apache.pulsar.common.schema.SchemaInfo.setName(java.lang.String)'
    at org.apache.pulsar.client.impl.schema.BytesSchema.<clinit>(BytesSchema.java:35)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
    at java.base/java.lang.Class.newInstance(Class.java:584)
    at org.apache.pulsar.client.internal.DefaultImplementation.lambda$10(DefaultImplementation.java:138)
    at org.apache.pulsar.client.internal.ReflectionUtils.catchExceptions(ReflectionUtils.java:35)
    ... 40 more
```
It looks like it is related to the fact that setName in SchemaInfo is generated
by a Lombok annotation on the name attribute.
I have installed the Lombok plugin in Eclipse, but it still does not work.
Any idea how to solve this?
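The `NoSuchMethodError` text shows the calling code was compiled against a `setName(String)` that returns `SchemaInfo` (a chained setter), so the `SchemaInfo` class Eclipse actually loads does not expose that exact signature. A hedged diagnostic sketch (plain JDK reflection; `DescribeMethod` is a hypothetical helper) to run from the same Eclipse launch configuration: it prints which jar or folder `SchemaInfo` came from and every public `setName` it exposes, which should show whether an older or differently built copy of the class wins on the classpath.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic: where does a class come from, and what do its methods look like?
public class DescribeMethod {
    // Returns "returnType name(paramTypes)" for every public method with the given name.
    static List<String> describe(String className, String methodName) throws ClassNotFoundException {
        Class<?> cls = Class.forName(className);
        List<String> result = new ArrayList<>();
        for (Method m : cls.getMethods()) {
            if (m.getName().equals(methodName)) {
                StringBuilder sb = new StringBuilder(m.getReturnType().getName())
                        .append(' ').append(m.getName()).append('(');
                Class<?>[] params = m.getParameterTypes();
                for (int i = 0; i < params.length; i++) {
                    if (i > 0) {
                        sb.append(", ");
                    }
                    sb.append(params[i].getName());
                }
                result.add(sb.append(')').toString());
            }
        }
        return result;
    }

    // Jar or directory the class was loaded from; JDK classes have no code source.
    static String location(String className) throws ClassNotFoundException {
        CodeSource src = Class.forName(className).getProtectionDomain().getCodeSource();
        return (src == null || src.getLocation() == null) ? "<bootstrap/JDK>" : src.getLocation().toString();
    }

    public static void main(String[] args) throws Exception {
        String cls = args.length > 0 ? args[0] : "org.apache.pulsar.common.schema.SchemaInfo";
        try {
            System.out.println("Loaded from: " + location(cls));
            describe(cls, "setName").forEach(System.out::println);
        } catch (ClassNotFoundException e) {
            System.out.println("Not on classpath: " + cls);
        }
    }
}
```

If the printed `setName` returns `void`, or the location is an unexpected jar or project folder, that copy of `SchemaInfo` is the one to remove or rebuild.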
----