2018-06-07 13:50:26 UTC - Chris Miller: @Chris Miller has joined the channel
2018-06-07 23:28:28 UTC - Ali Ahmed: intellij error
Error:(11153, 56) java: package 
org.apache.pulsar.shaded.com.google.protobuf.v241 does not exist
2018-06-07 23:38:02 UTC - Matteo Merli: have you tried running `mvn install` 
from the top-level dir?
2018-06-07 23:42:43 UTC - Ali Ahmed: it’s in intellij only
2018-06-07 23:53:22 UTC - Igor Zubchenok: Guys, just a comment about all your 
I'm looking forward to a fix for my memory leaks and BookKeeper issues. 
By the way, I'm very impressed. You're great, and I love the Pulsar solution!
sunglasses : Matteo Merli, Ali Ahmed, Sijie Guo
+1 : jia zhai, Ali Ahmed, Sijie Guo, Sanjeev Kulkarni
2018-06-07 23:54:23 UTC - Matteo Merli: Thanks @Igor Zubchenok!
star-struck : Igor Zubchenok, Ali Ahmed, Sijie Guo
2018-06-08 00:49:04 UTC - Byron: Good episode @Karthik Ramasamy and @Matteo 
+1 : Sijie Guo, jia zhai
2018-06-08 06:18:18 UTC - Idan: Hi guys, I was wondering if you have an 
‘embedded’ version of Pulsar for Java apps that we could use in our unit tests
2018-06-08 06:19:52 UTC - Ali Ahmed: hi Idan, there is an issue open for it and 
I plan to spend some time on it. You can also use this instead
2018-06-08 06:20:16 UTC - Ali Ahmed: it’s an alternative to do the same thing 
with containers
2018-06-08 06:20:45 UTC - Ali Ahmed: the issue request is here
2018-06-08 06:21:00 UTC - Ali Ahmed: I plan to have something ready by next week
2018-06-08 06:22:33 UTC - Idan: unfortunately we are working on EC2 Classic with 1 
Java process.. I’m afraid that if our developers import the project they’ll need 
to install Pulsar to get the tests to pass
2018-06-08 06:24:17 UTC - Ali Ahmed: an embedded JVM process will be better then, 
give me a day or two to prototype it
2018-06-08 06:25:31 UTC - Idan: great. as soon as you have something I can be 
ready to test it
2018-06-08 06:25:34 UTC - Ali Ahmed: ok
2018-06-08 06:26:11 UTC - Idan: another reason to have it embedded is to 
avoid having the build server (e.g. Jenkins) install anything or need 
pre-configuration during the build phase
2018-06-08 06:31:59 UTC - Matteo Merli: @Idan It is already possible to do 
that, though it’s not a 1 liner (as it should be). 

We use it in context of unit tests as well and there are different options 
 * Broker with mocked BK and ZK — This just stores the messages in memory 
 * Broker with real BK and ZK — This one is very similar to Pulsar standalone 

We plan to make this easier for people to use in their tests.
2018-06-08 06:36:02 UTC - Idan: @Matteo Merli cool, thanks. I am in the middle 
of replacing SQS with Pulsar (POC’ing)
2018-06-08 06:36:23 UTC - Idan: still need to figure out how to impl the 
retry-counter and the DLQ forwarder
2018-06-08 06:38:08 UTC - Idan: btw: we also used our own dedup mechanism using 
redis. I assume now I can use Pulsar’s mechanism
2018-06-08 06:39:15 UTC - Idan: actually it depends. With SQS, every time their 
client sent a message twice (network issues, etc.) the ID was regenerated (which 
wasn’t good enough for us, as we treated it as a new message). I was wondering 
if with Pulsar’s dedup mechanism I can decide the dedup key myself.
2018-06-08 06:43:43 UTC - Sijie Guo: @Matteo Merli do we publish a test-jar for 
pulsar-broker? If we haven’t, we can publish the test-jar to make testing 
easier for users
2018-06-08 06:44:40 UTC - Matteo Merli: Pulsar deduping is based on sequence 
ids. each producer has its own sequence of messages and Pulsar keeps track of 
last message published by each producer (in a guaranteed way). 

I have a slide deck with details on the implementation: 
(By the way, I’ll be presenting this at next ApacheCon in September)
2018-06-08 06:44:52 UTC - Matteo Merli: (there was also a blog post on the 
subject: <https://streaml.io/blog/pulsar-effectively-once/>)
2018-06-08 06:45:34 UTC - Idan: thanks for the info, I’ll take a look at it. but 
it still doesn’t really help with dedup at the application level
2018-06-08 06:45:37 UTC - Idan: ill explain why
2018-06-08 06:47:50 UTC - Idan: assume we have two services, a producer and a 
consumer, both using the Pulsar Java client (as an example). Suppose the 
producer sends a message to the broker but never gets the ack back (the broker 
did send an ack, but the response never reached the producer because of a 
network issue, etc.). The broker goes on and delivers the message, but since 
the producer never got the ack it will retry, right? On the consumer side you 
would then get 2 different messages from the broker: the first one (whose ack 
never reached the producer because of the network issue) and the second one, 
which the producer re-sent
2018-06-08 06:48:01 UTC - Idan: so we have 2 different messages which should 
actually be 1
2018-06-08 06:48:04 UTC - Matteo Merli: Yes, we do have those: 

Problem is the current classes are not very friendly when used outside Pulsar 

Ideally, we should have something like
PulsarStandalone x = new PulsarStandalone();

That people can easily integrate with.
2018-06-08 06:49:10 UTC - Matteo Merli: That’s correct. When enabling dedup, 
Pulsar will be able to discard the duplicates in this case
2018-06-08 06:49:33 UTC - Idan: That’s correct. When enabling dedup, Pulsar 
will be able to discard the duplicates in this case -> not sure I get that
2018-06-08 06:49:43 UTC - Idan: will it be able to dedup this case or not?
2018-06-08 06:49:46 UTC - Matteo Merli: yes
2018-06-08 06:50:00 UTC - Idan: but how.. if the producer re-sends the message 
it will use a new sequenceId, wouldn’t it?
2018-06-08 06:50:22 UTC - Idan: the producer “thinks” the broker never got the 
message while the broker really did get the message
2018-06-08 06:50:26 UTC - Matteo Merli: no, client library will use the same 
sequence id when resending the message
2018-06-08 06:50:42 UTC - Idan: so on the broker level it will be dedup?
2018-06-08 06:51:06 UTC - Matteo Merli: yes, if dedup is enabled on the 
namespace (default is off)
2018-06-08 06:51:54 UTC - Idan: ok so in this case we are good. but we are 
still not 100% covered for idempotence, though I guess that’s less of an issue 
on your side: someone can still send the same message twice (assume everything 
is fine on the Pulsar side)
2018-06-08 06:52:00 UTC - Idan: we are using our own eventId for idempotence
2018-06-08 06:52:10 UTC - Idan: if we could use it in Pulsar as our dedup key
2018-06-08 06:52:13 UTC - Idan: that would be perfect
2018-06-08 06:52:45 UTC - Idan: but again.. I didn’t expect Pulsar to support it
2018-06-08 06:53:21 UTC - Idan: it just means we still need to keep our redis 
for dedup based on our eventIds to avoid application errors (like a request 
being sent twice with the same internal eventId)
2018-06-08 06:53:25 UTC - Matteo Merli: yes, by default, each producer in 
Pulsar gets assigned a generated “producer-name” which is globally unique. 

each message gets published with a sequenceId that starts from 0 and keeps 
2018-06-08 06:54:06 UTC - Sijie Guo: I see. let’s create a task and improve it
2018-06-08 06:54:23 UTC - Matteo Merli: Applications can override the producer 
name. For example you might want to use the same identifier for a producer when 
it comes back after a crash, so that dedup can work across multiple sessions.
2018-06-08 06:55:42 UTC - Matteo Merli: At the same time, you can use an 
application defined “sequence-id” for each message. The only requirements are: 
 * sequence-id is a long 
 * it needs to be monotonically increasing
 * there can be “holes” in the sequence
2018-06-08 06:56:19 UTC - Matteo Merli: This is key to tie the messages to some 
property of the application.
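The dedup rule described above can be pictured with a small toy model. This is only a sketch under stated assumptions, not Pulsar's broker code: the class name `DedupSketch` and its `publish` method are hypothetical, and the only behavior modeled is "remember the last sequence id per producer name and drop anything that is not higher".

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of broker-side dedup keyed by producer name (hypothetical, not Pulsar code). */
class DedupSketch {
    // Highest sequence id accepted so far for each producer name.
    private final Map<String, Long> lastSequenceId = new HashMap<>();

    /** Returns true if the message is accepted, false if it is dropped as a duplicate. */
    boolean publish(String producerName, long sequenceId) {
        Long last = lastSequenceId.get(producerName);
        if (last != null && sequenceId <= last) {
            return false; // a resend reuses the same sequence id, so it is dropped here
        }
        lastSequenceId.put(producerName, sequenceId);
        return true;
    }

    public static void main(String[] args) {
        DedupSketch broker = new DedupSketch();
        System.out.println(broker.publish("orders-service", 0)); // true: first send
        System.out.println(broker.publish("orders-service", 0)); // false: retry after a lost ack
        System.out.println(broker.publish("orders-service", 5)); // true: holes are allowed
    }
}
```

Because the state is keyed by producer name, a producer that restarts with the same name and keeps its sequence ids increasing would still be deduplicated in this model, which is the point of overriding the name.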
2018-06-08 06:56:40 UTC - Idan: I understand
2018-06-08 06:56:42 UTC - Idan: If our app is producing messages to different 
queues, do we need to create different producers or can we use the same one? 
            producer = client.newProducer(new StringSchema())
                    .batchingMaxPublishDelay(3, TimeUnit.MILLISECONDS)
                    .sendTimeout(10, TimeUnit.SECONDS)
2018-06-08 06:57:10 UTC - Idan: what would be recommended
2018-06-08 06:57:22 UTC - Idan: app=same service/same java process
2018-06-08 06:57:50 UTC - Matteo Merli: A producer can only publish on a single 
topic, so you would need 1 producer per queue
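One way an application could maintain "one producer per topic" is a small registry that creates each producer lazily and reuses it afterwards. The sketch below is an assumption-laden illustration: `PerTopicRegistry` is a hypothetical helper, and the factory function stands in for whatever code actually builds a producer for a topic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical helper: keeps exactly one instance (e.g. a producer) per topic name. */
class PerTopicRegistry<P> {
    private final Map<String, P> byTopic = new ConcurrentHashMap<>();
    private final Function<String, P> factory;

    PerTopicRegistry(Function<String, P> factory) {
        this.factory = factory;
    }

    /** Returns the instance for the topic, creating it on first use. */
    P forTopic(String topic) {
        return byTopic.computeIfAbsent(topic, factory);
    }

    int size() {
        return byTopic.size();
    }

    public static void main(String[] args) {
        // A String stands in for a real producer object in this sketch.
        PerTopicRegistry<String> producers = new PerTopicRegistry<>(t -> "producer-for-" + t);
        System.out.println(producers.forTopic("orders"));
        System.out.println(producers.forTopic("orders") == producers.forTopic("orders"));
        System.out.println(producers.size());
    }
}
```

The same shape would work for looking up consumers by topic when acking from another part of the app.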
2018-06-08 06:58:08 UTC - Idan: ok.. so our app needs to maintain this
2018-06-08 06:59:23 UTC - Idan: About consumers: your examples show we need to 
use the infinite-loop style, meaning we need to wrap it in a separate thread. Do 
you support a thread pool, or shall we implement the consumer thread pool 
ourselves (e.g. using executors etc.)?
2018-06-08 07:00:58 UTC - Matteo Merli: When consuming from multiple topics, 
the easiest approach is to use the `MessageListener`
2018-06-08 07:01:27 UTC - Idan: @Idan uploaded a file: 
2018-06-08 07:01:30 UTC - Matteo Merli: client library already has a thread 
pool for the message listener
2018-06-08 07:02:22 UTC - Idan: ahh.. because in your example I saw only the 
while (true), which led me to think that I should wrap it in a thread in case 
I want multiple consumer threads (to increase throughput)
2018-06-08 07:03:05 UTC - Idan: * wasn’t talking about multiple topics but 
multiple consumer threads to increase throughput (read more messages from 
different threads from the same topic)
2018-06-08 07:03:06 UTC - Matteo Merli: One example is here: 
2018-06-08 07:03:52 UTC - Matteo Merli: You can increase the number of threads 
for the listener executor with 
2018-06-08 07:04:07 UTC - Idan: ok cool. exactly what I was looking for
2018-06-08 07:04:09 UTC - Idan: great stuff
2018-06-08 07:04:21 UTC - Idan: with the bloody SQS i had to invent the whole 
thing myself
2018-06-08 07:05:04 UTC - Matteo Merli: > * wasnt talking about multiple 
topics but multiple thread consumers to increase throughput (read more messages 
from diff threads from the same topic)

Oh, I think that’s a different issue then. If you want to have multiple threads 
receiving from same topic, you can just have all of them calling 
`consumer.receive()` on the same consumer instance
2018-06-08 07:05:47 UTC - Idan: so I do need to implement thread pool and 
2018-06-08 07:05:49 UTC - Idan: right?
2018-06-08 07:06:00 UTC - Idan: while each thread will consumer.receive();
2018-06-08 07:06:25 UTC - Matteo Merli: yes, the listener is guaranteeing the 
order, so messages from same topic are routed to same thread
2018-06-08 07:06:59 UTC - Idan: @Idan uploaded a file: 
2018-06-08 07:07:47 UTC - Matteo Merli: correct, even if you increase the 
listener threads, from a single topic all messages will be dispatched in a 
single thread
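The ordering guarantee can be illustrated with a toy dispatcher in which every topic is pinned to one single-threaded worker. This is a sketch of the idea only: `OrderedDispatchSketch` is hypothetical, and picking the worker by hashing the topic name is an assumption made for the example, not a description of how the client library assigns threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Toy model of ordered dispatch: each topic always lands on the same worker thread. */
class OrderedDispatchSketch {
    private final List<ExecutorService> workers = new ArrayList<>();

    OrderedDispatchSketch(int threads) {
        for (int i = 0; i < threads; i++) {
            workers.add(Executors.newSingleThreadExecutor());
        }
    }

    /** Every message of a given topic maps to the same worker index. */
    int workerFor(String topic) {
        return Math.floorMod(topic.hashCode(), workers.size());
    }

    /** Runs the handler on the topic's worker, so per-topic order is preserved. */
    void dispatch(String topic, Runnable handler) {
        workers.get(workerFor(topic)).execute(handler);
    }

    /** Stops accepting work and waits for queued handlers to finish. */
    void shutdownAndWait() {
        for (ExecutorService w : workers) {
            w.shutdown();
        }
        try {
            for (ExecutorService w : workers) {
                w.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        OrderedDispatchSketch dispatcher = new OrderedDispatchSketch(4);
        for (int i = 0; i < 5; i++) {
            final int n = i;
            dispatcher.dispatch("topic-a", () -> System.out.println("topic-a #" + n));
        }
        dispatcher.shutdownAndWait(); // prints #0 through #4 in order
    }
}
```

Adding workers in this model raises throughput across topics but never for a single topic, which matches the behavior described above.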
2018-06-08 07:08:05 UTC - Matteo Merli: you can jump through an executor at 
that point though..
2018-06-08 07:08:10 UTC - Idan: ok ok got it. I’ll just use my own 
executor thread pool
2018-06-08 07:08:39 UTC - Idan: perhaps after I finish I’ll submit an example so 
others can use it, as it’s pretty generic. I can’t think of an app that consumes 
only from one thread for the same topic
2018-06-08 07:09:09 UTC - Idan: (when we are talking about concurrent processing)
2018-06-08 07:09:45 UTC - Matteo Merli: yes, unless you want to retain order, 
then you would have to use 1 thread  :slightly_smiling_face:
2018-06-08 07:10:08 UTC - Idan: the only thing left for me in this case is 
to make the Pulsar consumer a singleton so our app can ack the message when it 
finishes
2018-06-08 07:10:12 UTC - Idan: processing it
2018-06-08 07:10:31 UTC - Idan: but if we have multiple consumers from diff 
topics we will need to put it in a map or something so we use the right 
consumer for acking
2018-06-08 07:11:14 UTC - Idan: would be cool if I could ack using HTTP.. it 
would be pretty dynamic within the app and I wouldn’t need to pass that consumer 
instance around (or use it as a singleton service)
2018-06-08 07:11:18 UTC - Matteo Merli: in the `MessageListener` example you 
always have the message and the consumer it came from
2018-06-08 07:11:53 UTC - Idan: yes I understand.. but again I cant use it as 
we need multiple consumers for the same topic:(
2018-06-08 07:12:50 UTC - Idan: @Matteo Merli thanks for your kind support. ill 
go impl some stuff now
+1 : Matteo Merli
2018-06-08 07:13:59 UTC - Matteo Merli: > yes I understand.. but again I 
cant use it as we need multiple consumers for the same topic:(

I would still use that and pass the pair (consumer, message) to your processing 
thread pool
2018-06-08 07:16:21 UTC - Idan: ok, not sure I get (yet) how to implement that 
message listener with a thread pool to enable multiple consumers for the same 
topic while also using the ack with it
2018-06-08 07:17:08 UTC - Idan: as our ack is not executed right away.. we 
consume the message using the thread pool, move it to another thread, do 
some processing, and at some point we ack it
2018-06-08 07:17:26 UTC - Idan: (from a different thread.. not the same thread 
the message was consumed on)
2018-06-08 07:18:04 UTC - Idan: since we use Vert.x we actually have a 
dedicated thread pool which consumes messages, and each thread offloads the 
message to the event loop thread (freeing up that consumer thread to get new 
messages). At some point on the event loop thread we need to ack the message
2018-06-08 07:18:30 UTC - Idan: it’s getting a bit complicated but we did the 
same with SQS. The only difference is that with SQS we used HTTP to ack 
messages so it was a bit easier
2018-06-08 07:18:52 UTC - Matteo Merli: @Matteo Merli uploaded a file: 
2018-06-08 07:19:04 UTC - Matteo Merli: (absolutely untested snippet)
2018-06-08 07:19:54 UTC - Idan: ok I get it. but what if I am offloading this 
message to another thread
2018-06-08 07:20:09 UTC - Idan: so the executor thread can quickly get 
back to consuming more messages
2018-06-08 07:21:09 UTC - Matteo Merli: the listener will keep pushing until 
the `submit()` calls block
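The remark about `submit()` blocking can be sketched with a hand-off that caps the number of outstanding tasks. This is a minimal illustration under assumptions: `BlockingSubmitSketch` is a hypothetical class, and using a `Semaphore` in front of a fixed pool is just one of several ways to get a blocking submit.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical hand-off: the caller blocks once maxInFlight tasks are outstanding. */
class BlockingSubmitSketch {
    private final ExecutorService pool;
    private final Semaphore permits;
    private final int maxInFlight;

    BlockingSubmitSketch(int threads, int maxInFlight) {
        this.pool = Executors.newFixedThreadPool(threads);
        this.permits = new Semaphore(maxInFlight);
        this.maxInFlight = maxInFlight;
    }

    /** Blocks the calling (listener) thread until a slot is free, then queues the task. */
    void submit(Runnable task) {
        permits.acquireUninterruptibly();
        try {
            pool.execute(() -> {
                try {
                    task.run();
                } finally {
                    permits.release(); // free the slot once processing is done
                }
            });
        } catch (RuntimeException e) {
            permits.release(); // the pool rejected the task, so give the slot back
            throw e;
        }
    }

    /** Number of tasks submitted but not yet finished. */
    int inFlight() {
        return maxInFlight - permits.availablePermits();
    }

    /** Stops accepting work and waits for queued tasks to finish. */
    void shutdownAndWait() {
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        BlockingSubmitSketch handoff = new BlockingSubmitSketch(2, 4);
        AtomicInteger processed = new AtomicInteger();
        for (int i = 0; i < 20; i++) {
            handoff.submit(processed::incrementAndGet);
        }
        handoff.shutdownAndWait();
        System.out.println("processed " + processed.get()); // processed 20
    }
}
```

With this shape the listener thread slows down naturally when the processing pool falls behind, instead of piling up an unbounded backlog.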
2018-06-08 07:21:39 UTC - Idan: my idea was to keep a hashmap with all 
consumers that is accessible everywhere in my app, so I can ack using the 
same consumer everywhere (from a different thread)
2018-06-08 07:22:34 UTC - Idan: I’ll be able to get the consumer easily by 
topicAddress or something
2018-06-08 07:22:49 UTC - Idan: an uglier way is to pass the consumer instance 
all over the place.. but thats just cosmetics
2018-06-08 07:23:27 UTC - Matteo Merli: yes, pass it along with the message to 
the processing thread
2018-06-08 07:24:35 UTC - Idan: wouldn’t it be uglier? think about methods 
needing the pulsarConsumer type in their signatures all over the place just to 
pass it along
2018-06-08 07:24:36 UTC - Idan: :stuck_out_tongue_winking_eye:
2018-06-08 07:25:40 UTC - Idan: but I got you anyway. I can use that Listener 
as well. I’ll give it a shot
2018-06-08 07:26:04 UTC - Matteo Merli: you can always wrap it in an `Acker` 
interface or similar
2018-06-08 07:26:14 UTC - Matteo Merli: :slightly_smiling_face:
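The `Acker` idea could look roughly like the sketch below. Everything here is hypothetical: `AckingConsumer` is a stand-in for the client's consumer type, and `Acker` is the application-defined wrapper suggested above, so downstream code only sees a single `ack()` method.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the client's consumer type. */
interface AckingConsumer<M> {
    void acknowledge(M message);
}

/** Application-defined wrapper: the only thing processing code needs to hold. */
@FunctionalInterface
interface Acker {
    void ack();
}

class AckerSketch {
    /** Binds a consumer and a message so later stages do not need either type. */
    static <M> Acker ackerFor(AckingConsumer<M> consumer, M message) {
        return () -> consumer.acknowledge(message);
    }

    public static void main(String[] args) {
        List<String> acked = new ArrayList<>();
        AckingConsumer<String> consumer = acked::add; // records acks for the demo
        Acker acker = ackerFor(consumer, "event-42");

        // ... hand `acker` to another thread or an event loop, then later:
        acker.ack();
        System.out.println(acked); // prints [event-42]
    }
}
```

Since the `Acker` captures both the consumer and the message, it can be passed across threads without a global consumer map or Pulsar types in method signatures.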
2018-06-08 07:26:29 UTC - Idan: exactly..
2018-06-08 07:26:53 UTC - Idan: passing topicAddress and the messageId/Message
2018-06-08 07:27:30 UTC - Idan: I’d also better use a different consumer 
instance for each topic so I can fine-tune
2018-06-08 07:27:37 UTC - Idan: the number of consumerThreads
2018-06-08 07:27:40 UTC - Idan: for each topic
2018-06-08 07:27:46 UTC - Idan: some of them require more resources than others
2018-06-08 07:32:00 UTC - Matteo Merli: > i also better use diff consumer 
instance for each topic so I can fine tune

On consumer side, you have the option to create 1 single logical consumer that 
consumes from multiple topics
2018-06-08 07:33:21 UTC - Idan: true. but if I want to allocate a different 
thread pool for each topic I’ll need multiple consumer instances
2018-06-08 07:33:42 UTC - Idan: for example we have one topic in our real-time 
flow that needs 400 threads to let it consume at high throughput
2018-06-08 07:34:02 UTC - Idan: and we have another topic outside the 
real-time flow that gets messages once in a while and should be configured with 
10 consumer threads
2018-06-08 07:38:39 UTC - Idan: make sense eh?
2018-06-08 07:47:08 UTC - Ali Ahmed: I am getting this error

        at org.apache.pulsar.broker.PulsarService.start(PulsarService.java:326)
2018-06-08 07:47:40 UTC - Ali Ahmed: I have provided google guava ver 21.0
2018-06-08 07:48:10 UTC - Sijie Guo: @Idan  - makes sense to me. (I think Matteo 
is probably offline now)
2018-06-08 07:48:35 UTC - Idan: @Sijie Guo thanks.
2018-06-08 07:49:10 UTC - Sijie Guo: @Ali Ahmed what dependency are you using? 
I bet you have different versions of guava.
2018-06-08 07:50:05 UTC - Ali Ahmed: I am only depending on pulsar broker jar
2018-06-08 07:50:56 UTC - Sijie Guo: if you are only using pulsar broker, you 
don’t need to provide additional dependency.
2018-06-08 07:51:33 UTC - Sijie Guo: run dependency:tree if you are using maven
2018-06-08 07:51:34 UTC - Ali Ahmed: it’s needed apparently
2018-06-08 07:51:44 UTC - Ali Ahmed: I am not using maven
2018-06-08 07:52:26 UTC - Sijie Guo: I am not sure why you need to specify 
that. guava is a transitive dependency of the broker module.
2018-06-08 07:52:42 UTC - Sijie Guo: or run gradle’s command to analyze your 
2018-06-08 07:54:09 UTC - Idan: @Sijie Guo if iam using the same producer-name 
on multiple producers instances (sending to diff topic names). that can cause 
2018-06-08 07:54:34 UTC - Idan: @Idan uploaded a file: 
2018-06-08 07:54:48 UTC - Ali Ahmed: @Ali Ahmed uploaded a file: 
 and commented: @Sijie Guo
2018-06-08 07:58:52 UTC - Sijie Guo: @Idan producerName is used for identifying 
the publisher and achieving exactly-once publishing, so you can’t use the same 
producer name for different producer instances sending messages to the same 
topic. That is to say, the “producer name” should be unique per topic. 

you don’t have to set it; you can let Pulsar manage the producer names for you 
if you don’t have any specific naming requirements.
2018-06-08 08:00:08 UTC - Sijie Guo: @Ali Ahmed: com.google.guava:guava:20.0 
-> 21.0
2018-06-08 08:00:16 UTC - Sijie Guo: you are forcing upgrade to 21?
2018-06-08 08:00:18 UTC - Idan: @Sijie Guo great
2018-06-08 08:00:40 UTC - Ali Ahmed: I am not forcing anything
2018-06-08 08:01:34 UTC - Sijie Guo: yes you are including 21, no? pulsar 2.0.0 
uses 20
2018-06-08 08:02:09 UTC - Ali Ahmed: I removed 21.0 from the build file
2018-06-08 08:02:49 UTC - Sijie Guo: if you are seeing “x -> y” on each 
dependency, this is upgrading a dependency from original version to a new 
2018-06-08 08:03:15 UTC - Sijie Guo: @Ali Ahmed are you using any framework 
which will depend on 21?
2018-06-08 08:03:33 UTC - Ali Ahmed: no this is causing 21
|    |    +--- org.apache.bookkeeper:circe-checksum:4.7.0
|    |    |    +--- com.google.guava:guava:21.0
2018-06-08 08:04:05 UTC - Ali Ahmed: this causes all other references to update 
2018-06-08 08:04:34 UTC - Sijie Guo: okay.. I am not sure how gradle handles 
this now.
2018-06-08 08:05:01 UTC - Sijie Guo: I believe in pulsar, we set the root 
dependency to 20, so it will not include transitive dependency from other 
2018-06-08 08:05:10 UTC - Sijie Guo: I am not sure how gradle can do that
2018-06-08 08:05:48 UTC - Sijie Guo: you can try to set guava to 20 in your 
gradle project, or use pulsar-broker-shaded (which already shades the 
dependencies), or use maven
2018-06-08 08:06:59 UTC - Ali Ahmed: okay let me try
2018-06-08 08:45:54 UTC - Idan: @Sijie Guo is it possible to configure the java 
producer client’s thread pool via your sdk?
2018-06-08 08:46:14 UTC - Idan: what if I want to increase producing throughput
2018-06-08 08:46:56 UTC - Idan: batching won’t help me as I must have very low 
latency, so I must produce the messages as soon as they arrive
2018-06-08 08:49:54 UTC - Idan: another question: if I am not using 
batching, then the blockIfQueueFull param is not relevant, as I assume that 
blockIfQueueFull only relates to the batching queues?
2018-06-08 08:53:38 UTC - Sijie Guo: @Idan I think there is a setting for the 
IO threads to configure. But I don’t think you really need to configure them, 
since the Pulsar client is using netty, which is asynchronous and 
non-blocking. So you don’t need to configure the threads for throughput.
2018-06-08 08:54:27 UTC - Idan: sorry for those questions.. we are just trying 
to think about scaling from day 0
2018-06-08 08:55:47 UTC - Sijie Guo: BlockIfQueueFull applies to both cases. It 
really means: block if the maximum number of messages is outstanding
2018-06-08 08:56:05 UTC - Sijie Guo: You don’t need to say sorry :)
2018-06-08 08:56:38 UTC - Idan: but if no batching queue is defined why would I 
set max messages (BlockIfQueueFull)? I mean the producer is sending them right 
away.. where would they queue up? in Netty’s async thread?
2018-06-08 08:59:26 UTC - Sijie Guo: Yes it is sending right away, but the 
client is waiting for acknowledgments from the broker, so the messages are put 
in a *queue* while waiting for acknowledgments. If a publish fails, the 
messages might be retried. So it is the queue of messages pending acknowledgment
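The pending-acknowledgment queue can be modeled as a fixed number of slots. The sketch below is a toy model, not the client's implementation: `PendingQueueSketch`, `onSend` and `onAck` are hypothetical names, and returning `false` stands in for whatever error the real client reports when the queue is full and blocking is disabled.

```java
import java.util.concurrent.Semaphore;

/** Toy model of the producer's queue of messages awaiting broker acks. */
class PendingQueueSketch {
    private final Semaphore slots;
    private final int maxPendingMessages;
    private final boolean blockIfQueueFull;

    PendingQueueSketch(int maxPendingMessages, boolean blockIfQueueFull) {
        this.slots = new Semaphore(maxPendingMessages);
        this.maxPendingMessages = maxPendingMessages;
        this.blockIfQueueFull = blockIfQueueFull;
    }

    /** Called per send: waits for a slot if configured to block, otherwise fails fast. */
    boolean onSend() {
        if (blockIfQueueFull) {
            slots.acquireUninterruptibly(); // caller waits until an ack frees a slot
            return true;
        }
        return slots.tryAcquire(); // false models "queue is full"
    }

    /** Called when the broker acknowledges a message. */
    void onAck() {
        slots.release();
    }

    /** Messages sent but not yet acknowledged. */
    int pending() {
        return maxPendingMessages - slots.availablePermits();
    }

    public static void main(String[] args) {
        PendingQueueSketch queue = new PendingQueueSketch(2, false);
        System.out.println(queue.onSend()); // true
        System.out.println(queue.onSend()); // true
        System.out.println(queue.onSend()); // false: two messages still unacked
        queue.onAck();
        System.out.println(queue.onSend()); // true: the ack freed a slot
    }
}
```

Note that batching does not appear anywhere in this model: the limit applies to unacknowledged messages whether or not they were batched.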
2018-06-08 08:59:41 UTC - Sijie Guo: Does that make sense to you?
2018-06-08 09:00:48 UTC - Idan: yes now it does. is that queue size also 
configurable?
2018-06-08 09:01:30 UTC - Idan: My purpose is to send messages async right 
away, so I created my producer with .enableBatching mode with a queue size of 1
2018-06-08 09:01:38 UTC - Idan: this will allow me to release the producer 
thread as soon as possible
2018-06-08 09:01:42 UTC - Idan: hope I did OK with that
2018-06-08 09:02:07 UTC - Idan: producer = client.newProducer(new 
                    .sendTimeout(producerSentTimeout, TimeUnit.SECONDS)
2018-06-08 09:02:13 UTC - Idan: something like that
2018-06-08 09:02:36 UTC - Idan: the reason for .batchingMaxMessages(1) is that 
I must keep latency very low
2018-06-08 09:05:04 UTC - Sijie Guo: Oh, you can just disable batching then. 
2018-06-08 09:06:49 UTC - Sijie Guo: maxPendingMessages
2018-06-08 09:07:28 UTC - Sijie Guo: That is the setting for configuring the 
queue size.
2018-06-08 09:07:47 UTC - Idan: but if I disable batching I won’t be able to 
make my call async.. I must not block my producer thread
2018-06-08 09:07:55 UTC - Idan: as I am working on a non-blocking event loop
2018-06-08 09:08:33 UTC - Idan: ah I got ya.. so if I set maxPendingMessages 
to 1 that will do the trick with the queue?
2018-06-08 09:08:39 UTC - Sijie Guo: When you disable batching, you can still 
produce messages asynchronously 
2018-06-08 09:10:48 UTC - Sijie Guo: Batching doesn’t mean non-blocking. It 
only affects how Pulsar transfers the data to brokers. Pulsar provides both 
sync and async produce methods, with or without batching 
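The difference between "async" and "batched" can be shown with plain `CompletableFuture` code. This is a sketch with a hypothetical stand-in producer, not the Pulsar client: `AsyncSendSketch.sendAsync` sends every message individually (no batching) yet still returns immediately, completing the future when the simulated ack arrives.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-in for a producer with an unbatched, non-blocking send. */
class AsyncSendSketch {
    // Single IO thread that plays the role of the network layer.
    private final ExecutorService io = Executors.newSingleThreadExecutor();
    private long nextId = 0; // touched only from the io thread

    /** Returns at once; the future completes with a message id when the "broker" acks. */
    CompletableFuture<Long> sendAsync(String payload) {
        return CompletableFuture.supplyAsync(() -> nextId++, io);
    }

    void close() {
        io.shutdown();
    }

    public static void main(String[] args) {
        AsyncSendSketch producer = new AsyncSendSketch();
        // The calling thread (e.g. an event loop) is not blocked by the send itself.
        CompletableFuture<Void> done = producer.sendAsync("hello")
                .thenAccept(id -> System.out.println("acked message id " + id));
        done.join(); // only the demo waits, so the JVM does not exit before the callback
        producer.close();
    }
}
```

In an event-loop application the callback would typically hop back onto the loop rather than call `join()`, which is used here only to keep the demo deterministic.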
