2018-02-07 06:51:10 UTC - Jaebin Yoon: I've been trying to hit the maximum throughput of brokers and bookies, so I set up 5 bookies (d2.4xlarge) and 20 brokers (m4.2xlarge) and pushed up to 800 MBps of throughput (up to 200 MBps for some bookies), and it started to break. I think it hit the bookie write-throughput limit, so the brokers slowed down and eventually the producers got hit with the out-of-direct-memory error (I'm using the Kafka producer API).
It seems that the producer configurations for queue limits (maxPendingMessages, maxPendingMessagesAcrossPartitions) are not exposed through the Kafka compat API.

```Caused by: org.apache.pulsar.shade.io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 1073741824, max: 1073741824)
        at org.apache.pulsar.shade.io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:640)
        at org.apache.pulsar.shade.io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:594)
        at org.apache.pulsar.shade.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:764)
        at org.apache.pulsar.shade.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:740)
        at org.apache.pulsar.shade.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244)
        at org.apache.pulsar.shade.io.netty.buffer.PoolArena.allocate(PoolArena.java:226)
        at org.apache.pulsar.shade.io.netty.buffer.PoolArena.allocate(PoolArena.java:146)
        at org.apache.pulsar.shade.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:324)
        at org.apache.pulsar.shade.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185)
        at org.apache.pulsar.shade.io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:121)
        at org.apache.pulsar.client.impl.BatchMessageContainer.add(BatchMessageContainer.java:91)
        at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:305)
        ... 12 more```
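For reference, those queue-limit knobs do exist on the native Pulsar client; a rough sketch against the 1.x `ProducerConfiguration` API (the values, URL and topic here are just placeholders):
```java
// Sketch (classes from org.apache.pulsar.client.api): per-producer queue limits on the native client.
PulsarClient client = PulsarClient.create("pulsar://broker.example.com:6650");

ProducerConfiguration producerConf = new ProducerConfiguration();
producerConf.setMaxPendingMessages(1000);                   // per-partition pending-message queue
producerConf.setMaxPendingMessagesAcrossPartitions(50000);  // global cap across all partitions
producerConf.setBlockIfQueueFull(true);                     // apply back-pressure instead of failing sends

Producer producer = client.createProducer("persistent://prop/cluster/ns/topic", producerConf);
```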
----
2018-02-07 06:54:57 UTC - Jaebin Yoon: So given those numbers, I would plan bookie capacity at ~180 MBps per bookie. Is that reasonable? (The current bookie setup: journal on a ramdisk, ledgers on RAID0 over 12 HDDs.)
----
2018-02-07 07:46:00 UTC - Jaebin Yoon: Regarding the Grafana Pulsar dashboard, I was really confused by the numbers for "Local publish rate", "Local delivery rate", "Local publish throughput", and "Local delivery throughput" with 1.22, since the Prometheus metrics now include per-topic metrics. I had to modify the Prometheus query for those graphs (adding topic=""). For example, for "Local publish rate":
```sum(pulsar_rate_in{cluster=~"$cluster", namespace=~"$namespace",topic=""}) by (cluster, namespace)```
----
2018-02-07 07:48:38 UTC - Matteo Merli: you can still get the per-namespace aggregation by setting `exposeTopicLevelMetricsInPrometheus=false`
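For example, in `broker.conf` (roughly):
```
# conf/broker.conf
exposeTopicLevelMetricsInPrometheus=false
```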
+1 : Jaebin Yoon
----
2018-02-07 07:49:30 UTC - Matteo Merli: Also, the confusion might be that it was reporting the rate twice: once per topic and once per namespace
----
2018-02-07 07:49:43 UTC - Matteo Merli: That got fixed today in master
----
2018-02-07 07:50:03 UTC - Matteo Merli: (with the latencies missing on the 
per-topic stats)
----
2018-02-07 07:50:04 UTC - Jaebin Yoon: Yes, that was the case. Good to know it got fixed.
----
2018-02-07 07:50:57 UTC - Matteo Merli: I changed it to report either per-namespace or per-topic, so even if you have per-topic metrics you can still get the aggregation done in Prometheus
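For example, for the publish rate, something like this (a sketch, assuming only per-topic series are exported):
```sum(pulsar_rate_in{cluster=~"$cluster", namespace=~"$namespace"}) by (cluster, namespace)```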
----
2018-02-07 07:51:40 UTC - Jaebin Yoon: To calculate capacity, it seems we need to use the max write throughput for bookies and the network bandwidth for brokers in my setup. 180 MBps per bookie... is this reasonable?
----
2018-02-07 07:51:43 UTC - Matteo Merli: In case the amount of stats becomes too big, one can switch to per-namespace aggregation directly in Pulsar
----
2018-02-07 07:52:19 UTC - Matteo Merli: It seems low, considering the journal 
on RAM disk
----
2018-02-07 07:52:51 UTC - Jaebin Yoon: I think it can go up to 200 MBps, but that seems to be the max
----
2018-02-07 07:53:07 UTC - Matteo Merli: are these Megabits or MegaBytes ?
----
2018-02-07 07:53:16 UTC - Jaebin Yoon: MegaBytes
----
2018-02-07 07:53:21 UTC - Matteo Merli: ah ok
----
2018-02-07 07:53:22 UTC - Matteo Merli: :slightly_smiling_face:
----
2018-02-07 07:53:44 UTC - Matteo Merli: I’m always confused
----
2018-02-07 07:53:49 UTC - Jaebin Yoon: hehe yeah..
----
2018-02-07 07:53:57 UTC - Matteo Merli: Yes, ~200 MB/s is really the ballpark
----
2018-02-07 07:54:20 UTC - Jaebin Yoon: Yup. it seems to be the case for my 
setup.
----
2018-02-07 07:54:51 UTC - Matteo Merli: (in the past we also played with using 
multiple “journals” in the bookie)
----
2018-02-07 07:55:45 UTC - Matteo Merli: that might give more throughput (or might not). Essentially it means having multiple threads (e.g. 4) for the journal and hashing each ledger to a journal
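In BookKeeper config terms that would look roughly like the sketch below; this assumes a BookKeeper release with multi-journal support, and the paths are placeholders:
```
# conf/bookkeeper.conf (sketch)
journalDirectories=/mnt/journal-0,/mnt/journal-1,/mnt/journal-2,/mnt/journal-3
ledgerDirectories=/mnt/ledgers
```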
----
2018-02-07 07:55:49 UTC - Jaebin Yoon: I see.. yeah I can see that could multiply the throughput, if they go to separate disks.
----
2018-02-07 07:56:28 UTC - Matteo Merli: Yes, and even on a single SSD, it can 
take advantage of parallel queues in the SSD controllers
----
2018-02-07 07:56:34 UTC - Jaebin Yoon: with the cost of complexity..
----
2018-02-07 07:56:40 UTC - Jaebin Yoon: i see.
----
2018-02-07 07:56:55 UTC - Jaebin Yoon: ok. i just want to make sure what i'm 
getting is reasonable.
----
2018-02-07 07:57:01 UTC - Matteo Merli: Yes
+1 : Jaebin Yoon, Ali Ahmed
----
2018-02-07 10:21:45 UTC - bsideup: @bsideup has joined the channel
----
2018-02-07 10:25:10 UTC - bsideup: Hi! Anyone with Event Sourcing + Pulsar 
experience? :slightly_smiling_face:
----
2018-02-07 10:27:00 UTC - bsideup: Parameters:
1) partition by key (i.e. userId) => preserve the order
2) infinite retention period => make it possible to re-process the events 
from the very beginning
3) persistent ACKs
----
2018-02-07 10:30:58 UTC - bsideup: Also, does Pulsar solve Kafka's issue with connectivity and NAT? i.e. can I put Pulsar behind a load balancer without having consumers connect to each and every broker in some internal network? :slightly_smiling_face:
----
2018-02-07 10:34:56 UTC - Ivan Kelly: Yes to all. The final one is solved by the Pulsar proxy
----
2018-02-07 10:38:06 UTC - bsideup: nice :slightly_smiling_face:  Does Pulsar Proxy have some benchmarks already? No surprise that the performance will not be the same as with the native protocol, but it would be nice to know the penalty
----
2018-02-07 10:38:40 UTC - Ivan Kelly: i don't think we have one I'm afraid
----
2018-02-07 10:40:26 UTC - bsideup: no problem! Are there any examples / 
tutorials of pulsar-proxy?
----
2018-02-07 11:57:10 UTC - Sijie Guo: @bsideup: there should be no code difference when talking to the proxy vs talking to a broker directly. When you connect to the proxy, the response tells the client it is a proxy, and the client then acts in proxy mode. For how to run the proxy, there is a simple page: <https://pulsar.incubator.apache.org/docs/latest/admin/Proxy/>. I think @Luc Perkins is working on a tutorial or instructions for running proxies and setting up a load balancer in a cloud environment (but I am not sure). @Luc Perkins can probably chime in more.
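Starting a proxy looks roughly like this (a sketch; the ZooKeeper addresses are placeholders):
```
bin/pulsar proxy \
  --zookeeper-servers zk-1:2181,zk-2:2181,zk-3:2181 \
  --global-zookeeper-servers zk-1:2181,zk-2:2181,zk-3:2181
```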
----
2018-02-07 11:57:55 UTC - bsideup: cool, thanks :slightly_smiling_face:
----
2018-02-07 16:42:56 UTC - Matteo Merli: > 2) infinite retention period => make it possible to re-process the events from the very beginning

Retention time can be set to “infinite” starting from next (imminent) release 
<http://pulsar.apache.org/docs/latest/advanced/RetentionExpiry/>
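e.g. roughly (a sketch; `-1` meaning unlimited, and the namespace name is a placeholder):
```
bin/pulsar-admin namespaces set-retention prop/cluster/ns --size -1 --time -1
```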
----
2018-02-07 16:48:10 UTC - Matteo Merli: @bsideup Regarding the proxy, there is a bit more info here: 
<https://pulsar.incubator.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Pulsarproxy-2ud51l>

I did some perf testing a few months back with the proxy and I didn't see any noticeable perf difference in normal conditions (when no CPU/network bottlenecks were present). 

The Pulsar proxy uses the same binary protocol as the brokers. After an initial handshake between client and broker, it degrades itself to a mere TCP proxy, copying buffers from one socket to the other, without parsing or deserializing anything. It's very light in both CPU and memory usage.
----
2018-02-07 16:49:35 UTC - bsideup: > I didn't see any noticeable perf difference 

Whoa! That's super cool actually :slightly_smiling_face:  Gives hope that one day the proxy might become the recommended way to connect to Pulsar, unless you need the low-level broker-aware connectivity :slightly_smiling_face:
----
2018-02-07 16:50:22 UTC - Matteo Merli: Yes, the only disadvantage is the extra 
network hop, and increase in network bandwidth :slightly_smiling_face:
----
2018-02-07 16:51:57 UTC - bsideup: if you look at what people do with Kafka because broker awareness is super hard to deal with (e.g. the HTTP-based Nakadi from Zalando, Hermes from Allegro, etc), the proxy is a huge win for Pulsar :smile:
----
2018-02-07 16:52:20 UTC - bsideup: Am I right that it can be easily scaled 
horizontally?
----
2018-02-07 16:52:46 UTC - Matteo Merli: yes, the whole point of the proxy is that it is truly stateless
----
2018-02-07 16:52:53 UTC - bsideup: :+1:
----
2018-02-07 16:53:35 UTC - bsideup: We recently introduced Kafka support in <https://www.testcontainers.org> and it was super painful; really looking forward to adding official support for Pulsar in TC with pulsar-proxy :+1:
+1 : Matteo Merli
----
2018-02-07 16:54:07 UTC - Matteo Merli: brokers have no "durable" state, but they still "own" a particular topic at a given point in time, so (without the proxy) a client needs to connect directly to the right broker.
----
2018-02-07 16:54:48 UTC - bsideup: BTW if Pulsar is started in "standalone" mode, will it redirect consumers to some internal IP, or will it reuse the same connection the consumer opened to reach it?
----
2018-02-07 16:56:12 UTC - bsideup: Simple scenario:

If I start an instance in “standalone” mode with Docker:
```
docker run -p 12345:6650 pulsar
```

And connect from my machine via `<pulsar://localhost:12345>` - will it work?
----
2018-02-07 16:56:41 UTC - bsideup: Kafka fails at this: the consumer re-connects to the advertised host, even if the cluster has a single node
----
2018-02-07 16:57:10 UTC - Matteo Merli: We had some debate on whether the "standalone" mode should act in the same way as the proxy. That's something we should do to simplify using the standalone from inside a container. 

Currently, it works in the same way as a regular broker, redirecting the client to the advertised address. 

You can specify the advertised address though:
----
2018-02-07 16:57:40 UTC - Matteo Merli: ```
docker run -it \
  -p 6650:6650 \
  -p 8080:8080 \
  -v $PWD/data:/pulsar/data \
  apachepulsar/pulsar:1.21.0-incubating \
  bin/pulsar standalone --advertised-address 127.0.0.1
```
----
2018-02-07 16:57:48 UTC - bsideup: Out of curiosity, why does it redirect?
----
2018-02-07 16:57:54 UTC - bsideup: I mean, the consumer already connected to it
----
2018-02-07 16:58:13 UTC - Matteo Merli: It’s just the same code used in the 
multi-node deployment
----
2018-02-07 16:58:24 UTC - bsideup: and if the partition's owner == the currently connected node, isn't it possible to continue without doing the handshake?
----
2018-02-07 16:58:27 UTC - bsideup: ah, ok
----
2018-02-07 16:59:04 UTC - bsideup: But anyway, pulsar-proxy will make it super 
easy to deal with, especially for testing where sub-ms performance is not 
critical
----
2018-02-07 17:00:49 UTC - Matteo Merli: Yes, so for standalone, it doesn't really need to use the proxy; it would just need to set the "proxy-flag" in the initial handshake to force the client to reconnect (well, get the connection from the pool) to the same "service-url" it used for the initial connection.
----
2018-02-07 17:01:51 UTC - bsideup: yeah
----
2018-02-07 17:02:17 UTC - bsideup: it might be useful for small deployments 
behind NAT as well
----
2018-02-07 17:05:51 UTC - bsideup: super happy so far with the design decisions 
and implementation of Pulsar, it looks way more mature compared to Kafka :+1:
slightly_smiling_face : Matteo Merli
----
2018-02-07 17:07:47 UTC - bsideup: I wish somebody would implement a kafka-to-pulsar proxy, so that we could point our fleet of microservices to Pulsar without changing their code (yet)
----
2018-02-07 17:09:34 UTC - Matteo Merli: I know it's not the same, but there's a Kafka API wrapper: 
<http://pulsar.apache.org/docs/latest/adaptors/KafkaWrapper/> Also, the Landoop folks have a Pulsar <-> Kafka connector: 
<http://www.landoop.com/blog/2018/01/stream-reactor-kafka-connectors-04/>
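Roughly, the wrapper lets existing Kafka-API code talk to Pulsar by swapping the client dependency for `pulsar-client-kafka` (a sketch based on the wrapper docs; the URL and topic are placeholders):
```java
// Sketch: Kafka producer API, backed by Pulsar through the compatibility wrapper.
Properties props = new Properties();
props.put("bootstrap.servers", "pulsar://broker.example.com:6650"); // Pulsar service URL instead of Kafka brokers
props.put("key.serializer", IntegerSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());

Producer<Integer, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("persistent://prop/cluster/ns/my-topic", 1, "hello"));
producer.close();
```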
----
2018-02-07 17:13:38 UTC - bsideup: yeah, already checked the wrapper 
:slightly_smiling_face:
----
2018-02-07 20:19:44 UTC - bsideup: P.S. how performant is the WebSocket client? I know about the Base64-encoded payload, which obviously adds to the latency, but is there anything else besides that?
----
2018-02-07 20:26:50 UTC - bsideup: P.S.S. I plan to create an AWS CloudFormation template for Pulsar; is there anyone who can guide me about the nuances of Pulsar in AWS?
----
2018-02-07 20:34:22 UTC - bsideup: P.S.S.S. how does Pulsar connect to other brokers? Does it use the same advertised host it sends to the clients (hello Kafka >_< ) or not?
----
2018-02-07 20:37:13 UTC - bsideup: sorry for so many questions at once :smile: 
Probably Slack topics will be a good fit for them
----
2018-02-07 20:37:25 UTC - Matteo Merli: > P.S. how performant is the WebSocket client? I know about the Base64-encoded payload, which obviously adds to the latency, but is there anything else besides that?

I think we did some perf tests back in the day, and yes, the overhead is between the JSON framing and the base64. It still allows for pipelining between the client and the WebSocket handler (and the brokers underneath). The ballpark, if I remember correctly, was around 20K msg/s (that was with small messages)
----
2018-02-07 20:39:24 UTC - Matteo Merli: > P.S.S. I plan to create an AWS CloudFormation template for Pulsar; is there anyone who can guide me about the nuances of Pulsar in AWS? 

Nice! The deploy scripts with Terraform+Ansible are documented at 
<http://pulsar.apache.org/docs/latest/deployment/aws-cluster/> 

That should be a start. Otherwise ask here and someone will surely be able to help out.
----
2018-02-07 20:43:04 UTC - Matteo Merli: > P.S.S.S. how does Pulsar connect to other brokers? Does it use the same advertised host it sends to the clients (hello Kafka >_< ) or not?

Yes, brokers acquire ownership of topics, and the client library internally discovers the right broker by first connecting to the serviceUrl for the cluster, the nominal endpoint, which can be anything like a VIP, DNS, or K8S discovery. After the initial lookup, clients connect directly to the right broker, at its own advertised address. That address is `hostname` by default, and can be overridden in `broker.conf`
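For example, roughly:
```
# conf/broker.conf
advertisedAddress=broker-1.example.com
```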
----
2018-02-07 20:49:34 UTC - bsideup: But that’s about the clients, right? I was 
more curious about broker-to-broker communication, how they discover each other
----
2018-02-07 20:49:59 UTC - Matteo Merli: Oh, within the same cluster they share the same ZK
----
2018-02-07 20:50:32 UTC - bsideup: the problem with Kafka is that if you set 
“advertised host” to something like EC2's public IP (because you want to 
connect to your cluster from the outside) then the brokers will use it to 
communicate with each other
----
2018-02-07 20:50:53 UTC - bsideup: So I was wondering if it’s the same in 
Pulsar or not
----
2018-02-07 20:51:07 UTC - Matteo Merli: I see, so the in-cluster replication 
works a bit differently
----
2018-02-07 20:52:07 UTC - Matteo Merli: brokers just pass the data to the "bookies" (the storage nodes, Apache BookKeeper). A broker doesn't have to talk to other brokers when publishing/delivering a message
----
2018-02-07 20:52:47 UTC - Matteo Merli: it will just append it to the BookKeeper log, which consists of sending the message in parallel to N bookies and waiting for the acks
----
2018-02-07 20:53:39 UTC - bsideup: ah, ok, cool :slightly_smiling_face:  So "advertised host" will never break cluster communication (like it does in Kafka)? Sorry for so many refs to Kafka, but it's our current solution with a ton of PITA and I thought it would serve as a good example :slightly_smiling_face:
----
2018-02-07 20:53:49 UTC - Matteo Merli: (finally, bookies also use the "advertisedAddress" trick, but they don't need to be exposed outside the boundaries of the cluster)
----
2018-02-07 20:54:16 UTC - Matteo Merli: no problem! all great questions
----
2018-02-07 20:56:08 UTC - bsideup: And even greater answers :wink:  Really looking forward to trying Pulsar :slightly_smiling_face:
----
2018-02-07 21:01:50 UTC - bsideup: is there an official / semi-official Docker 
image for Pulsar?
----
2018-02-07 21:05:33 UTC - Matteo Merli: yes, 
<https://hub.docker.com/r/apachepulsar/pulsar/>
----
2018-02-07 21:06:23 UTC - Matteo Merli: built from this Dockerfile: 
<https://github.com/apache/incubator-pulsar/blob/master/docker/Dockerfile>
----
2018-02-07 21:07:08 UTC - bsideup: :heart_eyes:
----
2018-02-07 21:07:44 UTC - bsideup: Super, thanks! :slightly_smiling_face:  Will 
try it tomorrow :+1:
----
2018-02-07 21:08:36 UTC - Matteo Merli: It contains the standard bin 
distribution plus some scripts to update the config file based on env variables
----
2018-02-07 23:19:27 UTC - Matteo Merli: @Jaebin Yoon I'm still tracking down yesterday's issue. I think the biggest issue starts when using "batching" with the 10K messages over multiple partitions… and when the network becomes the bottleneck. At that point the producer starts using a disproportionate amount of memory. I think I'm getting closer to a fix :slightly_smiling_face: . Otherwise, a quick workaround is to disable batching in the producer. I've seen that behaving as expected in the above conditions.
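With the native client that workaround is roughly the following (a sketch against the 1.x `ProducerConfiguration` API; the topic name is a placeholder):
```java
// Sketch: disable batching on the producer as a temporary workaround.
ProducerConfiguration producerConf = new ProducerConfiguration();
producerConf.setBatchingEnabled(false);
Producer producer = client.createProducer("persistent://prop/cluster/ns/topic", producerConf);
```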
----
2018-02-07 23:21:27 UTC - Matteo Merli: In any case, the gain from batching is 
not big when spraying across many partitions. Even in your capture, most 
batches were 1-2 messages already.
----
2018-02-07 23:43:57 UTC - Kevin Pierce: @Kevin Pierce has joined the channel
----
2018-02-08 00:59:06 UTC - Matteo Merli: I'm really having a lot of fun with 1 Gbps broker NICs
----
2018-02-08 01:00:01 UTC - Matteo Merli: :slightly_smiling_face:
----
2018-02-08 01:15:59 UTC - Jaebin Yoon: @Matteo Merli Yeah, tell me about it. Load balancing becomes very interesting with it. I'm doing load balancing with the script to push up to the limit. Thanks for the update on the issue.
----
2018-02-08 01:22:52 UTC - Matteo Merli: So on the memory side, the reason is that we default to 128 KB buffers to accumulate batches before sending (128 KB is the max batch size, so the intention is to avoid reallocations on that buffer as it grows). 

The problem starts with batching + "many" partitions + slow responses from the broker (due to network limits or reloads). At that point, the batches are still being prepared "on time", e.g. every 1 ms. With many partitions, the per-partition rate is comparatively low, so we end up cutting a batch for each message and queuing them for sending. But each message is now taking 128 KB of direct memory, instead of 10 KB or 1 KB. 

The obvious fix here is to make the 128 KB adjust to the max observed batch size (or the 99th percentile of it).
2018-02-08 01:24:30 UTC - Matteo Merli: For the consumer error, I've been able to reproduce a very similar one when doing lots of unloads while publishing 128 KB messages to saturate the NIC. Again, I verified it only happens when batching is enabled.
----
