Good luck on your test!

As for the batching within Avro and by Kafka Producer, here are my thoughts 
without any empirical proof.
There is a certain amount of overhead in terms of execution AND bytes in 
converting a request record into Avro and producing (generating) a Kafka 
message out of it.
For requests of 100-200 bytes, that overhead can be substantial, especially 
because you will be bundling the Avro schema with each request in its Kafka 
message.

By batching the requests, you are significantly amortizing that overhead across 
many rows.
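
To put rough numbers on that, here is a back-of-the-envelope sketch. The 
150-byte payload is the middle of the 100-200 byte range mentioned in this 
thread; the 600 bytes of per-message overhead (schema plus message framing) is 
a made-up figure for illustration, not a measurement.

```python
def bytes_per_request(payload_bytes: int, overhead_bytes: int, batch_size: int) -> float:
    """Average bytes per request when one fixed per-message overhead
    is shared by `batch_size` requests packed into the same message."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    return payload_bytes + overhead_bytes / batch_size


# Assumed, illustrative numbers (not measured).
PAYLOAD = 150    # bytes per request
OVERHEAD = 600   # hypothetical schema + framing bytes per Kafka message

unbatched = bytes_per_request(PAYLOAD, OVERHEAD, 1)    # 750.0 bytes per request
batched = bytes_per_request(PAYLOAD, OVERHEAD, 100)    # 156.0 bytes per request
```

Under those assumptions, one request per message spends most of its bytes on 
overhead, while a batch of 100 brings the overhead down to a few bytes per 
request. The same reasoning applies to the CPU cost of building each message.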

From: Matt Daum <m...@setfive.com>
Date: Monday, March 5, 2018 at 5:54 AM
To: "Thakrar, Jayesh" <jthak...@conversantmedia.com>
Cc: "users@kafka.apache.org" <users@kafka.apache.org>
Subject: Re: Kafka Setup for Daily counts on wide array of keys

Thanks for the suggestions!  It does look like it's using local RocksDB stores 
for the state info by default.  Will look into using an external one.

As for the "millions of different values per grouped attribute", an example 
would be: assume each request has a parameter "X", and at the end of each day 
I want to know the counts per unique value of it. It could have 100's of 
millions of possible values.

I'll start to hopefully work this week on an initial test of everything and 
will report back.  A few last questions if you have the time:
- For the batching of the Avro files, would this be different from the Producer 
batching?
- Any other things you'd suggest looking out for as gotchas, or configurations 
that will probably be good to tweak further?

Thanks!
Matt

On Sun, Mar 4, 2018 at 11:23 PM, Thakrar, Jayesh 
<jthak...@conversantmedia.com> wrote:
BTW - I did not mean to rule-out Aerospike as a possible datastore.
It's just that I am not familiar with it, but it surely looks like a good candidate 
to store the raw and/or aggregated data, given that it also has a Kafka Connect 
module.

From: "Thakrar, Jayesh" <jthak...@conversantmedia.com>
Date: Sunday, March 4, 2018 at 9:25 PM
To: Matt Daum <m...@setfive.com>
Cc: "users@kafka.apache.org" <users@kafka.apache.org>
Subject: Re: Kafka Setup for Daily counts on wide array of keys

I don’t have any experience/knowledge of the Kafka inbuilt datastore, but I 
believe that for some portions of streaming Kafka uses (used?) RocksDB to 
locally store some state info in the brokers.

Personally, I would use an external datastore.
There's a wide choice out there, from regular key-value stores like Cassandra, 
ScyllaDB and RocksDB, to time-series stores like InfluxDB, to regular RDBMSes.
If you have Hadoop in the picture, it's even possible to bypass a datastore 
completely (if appropriate) and store the raw data on HDFS organized by (say) 
date+hour, using periodic (minute to hourly) extract jobs that store the data 
in a Hive-compatible directory structure using ORC or Parquet.
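
As a sketch of what "organized by date+hour" could look like, the helper below 
builds a Hive-style partition directory from an event timestamp. The base path 
and the partition column names `dt` and `hour` are hypothetical choices for 
illustration; use whatever your Hive tables actually declare.

```python
from datetime import datetime, timezone


def partition_dir(base: str, event_time: datetime) -> str:
    """Return a Hive-style key=value partition directory for a
    timezone-aware timestamp, normalized to UTC."""
    utc = event_time.astimezone(timezone.utc)
    return f"{base}/dt={utc:%Y-%m-%d}/hour={utc:%H}"


# Hypothetical base path; the timestamp is just an example.
example = partition_dir(
    "/data/impressions",
    datetime(2018, 3, 4, 21, 25, tzinfo=timezone.utc),
)
# example == "/data/impressions/dt=2018-03-04/hour=21"
```

Each periodic extract job would then write its ORC or Parquet files into the 
directory for the hour it covers, so the data is written once and can be 
queried by partition.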

The reason for shying away from NoSQL datastores is their tendency to do 
compaction on data which leads to unnecessary reads and writes (referred to as 
write-amplification).
With periodic jobs in Hadoop, you (usually) write your data only once. Of course, 
with that approach you lose the "random/keyed access" to the data,
but if you are only interested in the aggregations across various dimensions, 
those can be stored in a SQL/NoSQL datastore.

As for "having millions of different values per grouped attribute" - not sure 
what you mean by that.
Is it that each record has some fields that represent different kinds of 
attributes and that their domain can have millions to hundreds of millions of 
values?
I don't think that should matter.

From: Matt Daum <m...@setfive.com>
Date: Sunday, March 4, 2018 at 2:39 PM
To: "Thakrar, Jayesh" <jthak...@conversantmedia.com>
Cc: "users@kafka.apache.org" <users@kafka.apache.org>
Subject: Re: Kafka Setup for Daily counts on wide array of keys

Thanks! For the counts I'd need to use a global table to make sure it's across 
all the data, right?  Also, will having millions of different values per grouped 
attribute scale OK?

On Mar 4, 2018 8:45 AM, "Thakrar, Jayesh" 
<jthak...@conversantmedia.com> wrote:
Yes, that's the general design pattern. Another thing to look into is 
compressing the data. The Kafka consumer/producer can already do it for you, but 
we chose to compress in the applications due to a historic issue that degraded 
performance, although it has been resolved now.
Also, just keep in mind that while you do your batching, the Kafka producer also 
tries to batch msgs to Kafka, and you will need to ensure you have enough 
buffer memory. However, that's all configurable.
Finally, ensure you have the latest Java updates and Kafka 0.10.2 or higher.
Jayesh
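
For reference, the producer settings that control this batching and buffering 
are `batch.size`, `linger.ms`, `buffer.memory` and `compression.type`. The 
sketch below only collects them in a plain dict and estimates how long the 
buffer could absorb traffic; the values and the load figures are illustrative 
assumptions, and the estimate ignores compression and per-record overhead.

```python
# Real Kafka producer setting names; the values are illustrative only.
producer_config = {
    "batch.size": 64 * 1024,            # max bytes per per-partition batch
    "linger.ms": 20,                    # wait up to 20 ms to fill a batch
    "buffer.memory": 32 * 1024 * 1024,  # total bytes the producer may buffer
    "compression.type": "lz4",          # producer-side compression
}


def buffer_headroom_seconds(config: dict, requests_per_sec: float,
                            bytes_per_request: float) -> float:
    """Seconds of incoming traffic that fit in buffer.memory if
    nothing is being sent out (a rough upper bound)."""
    return config["buffer.memory"] / (requests_per_sec * bytes_per_request)


# Assumed load: 1 million requests/sec spread over 8 servers, 150 bytes each.
headroom = buffer_headroom_seconds(producer_config, 1_000_000 / 8, 150)
```

With these assumed numbers the buffer holds a bit under two seconds of one 
server's traffic, which gives a feel for how quickly a slow broker or network 
hiccup would fill it.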

________________________________
From: Matt Daum <m...@setfive.com>
Sent: Sunday, March 4, 2018 7:06:19 AM
To: Thakrar, Jayesh
Cc: users@kafka.apache.org
Subject: Re: Kafka Setup for Daily counts on wide array of keys

We actually don't have a Kafka cluster set up yet at all.  Right now we just 
have 8 of our application servers.  We currently sample some impressions and 
then dedupe/count outside at a different DC, but are looking to try to analyze 
all impressions for some overall analytics.

Our requests are around 100-200 bytes each.  If we lost some of them due to 
network jitter etc. it would be fine; we're just trying to get a rough overall 
count of each attribute.  Creating batched messages definitely makes sense and 
will also cut down on the network IO.

We're trying to determine the required setup for Kafka to do what we're looking 
to do, as these are physical servers so we'll most likely need to buy new 
hardware.  For the first run I think we'll try it out on one of our application 
clusters that gets a smaller amount of traffic (300-400k req/sec) and run the 
Kafka cluster on the same machines as the applications.

So would the best route here be something like this: each application server 
batches requests and sends them to Kafka, a stream consumer then tallies up the 
totals per attribute that we want to track and outputs that to a new topic, 
which then goes to a sink, either a DB or something like S3, which we then read 
into our external DBs?
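
The tallying step in that flow could be sketched as below. This is an 
in-memory illustration only, not Kafka Streams code: the `day` field and the 
attribute names are hypothetical, and with hundreds of millions of distinct 
values the real state would have to be partitioned by key across consumers or 
kept in an external store.

```python
from collections import Counter
from typing import Dict, Iterable


def tally(requests: Iterable[Dict[str, str]], attributes: Iterable[str]) -> Counter:
    """Count occurrences of each (day, attribute, value) combination."""
    attrs = tuple(attributes)
    counts: Counter = Counter()
    for req in requests:
        day = req["day"]  # hypothetical field holding the request's date
        for attr in attrs:
            if attr in req:
                counts[(day, attr, req[attr])] += 1
    return counts


# Tiny made-up sample, tracking the parameter "X".
sample = [
    {"day": "2018-03-04", "X": "a"},
    {"day": "2018-03-04", "X": "b"},
    {"day": "2018-03-04", "X": "a"},
    {"day": "2018-03-05", "X": "a"},
]
daily_counts = tally(sample, ["X"])
# daily_counts[("2018-03-04", "X", "a")] == 2
```

Keying by (day, attribute, value) means the end-of-day export is just a scan 
over the entries for that day.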

Thanks!

On Sun, Mar 4, 2018 at 12:31 AM, Thakrar, Jayesh 
<jthak...@conversantmedia.com> wrote:
Matt,

If I understand correctly, you have an 8-node Kafka cluster and need to support 
about 1 million requests/sec into the cluster from source servers and expect 
to consume that for aggregation.

How big are your msgs?

I would suggest looking into batching multiple requests per single Kafka msg to 
achieve desired throughput.

So e.g. on the request receiving systems, I would suggest creating a logical 
avro file (byte buffer) of say N requests and then making that into one Kafka 
msg payload.
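
A minimal sketch of that idea follows. To keep it runnable with only the 
standard library it uses newline-delimited JSON compressed with zlib as a 
stand-in for an Avro byte buffer, so it shows the batching pattern rather than 
the Avro encoding itself. The class and function names are hypothetical.

```python
import json
import zlib
from typing import Dict, List, Optional


class RequestBatcher:
    """Collects requests and emits one encoded payload per N requests."""

    def __init__(self, max_requests: int) -> None:
        if max_requests < 1:
            raise ValueError("max_requests must be at least 1")
        self.max_requests = max_requests
        self._pending: List[Dict] = []

    def add(self, request: Dict) -> Optional[bytes]:
        """Buffer one request; return a payload once the batch is full."""
        self._pending.append(request)
        if len(self._pending) >= self.max_requests:
            return self.flush()
        return None

    def flush(self) -> Optional[bytes]:
        """Encode and clear whatever is buffered; None if nothing is pending."""
        if not self._pending:
            return None
        lines = "\n".join(json.dumps(r, sort_keys=True) for r in self._pending)
        self._pending = []
        return zlib.compress(lines.encode("utf-8"))


def decode_batch(payload: bytes) -> List[Dict]:
    """Consumer side: turn one payload back into the list of requests."""
    text = zlib.decompress(payload).decode("utf-8")
    return [json.loads(line) for line in text.split("\n")]
```

Each payload returned by `add` or `flush` would become the value of a single 
Kafka msg. A real producer would also want to call `flush` on a timer so that 
a partially filled batch does not sit around during quiet periods.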

We have a similar situation 
(https://www.slideshare.net/JayeshThakrar/apacheconflumekafka2016) and found 
anything from 4x to 10x better throughput with batching as compared to one 
request per msg.
We have different kinds of msgs/topics and the individual "request" size varies 
from about 100 bytes to 1+ KB.

On 3/2/18, 8:24 AM, "Matt Daum" <m...@setfive.com> wrote:

    I am new to Kafka but I think I have a good use case for it.  I am trying
    to build daily counts of requests based on a number of different attributes
    in a high throughput system (~1 million requests/sec. across all 8
    servers).  The different attributes are unbounded in terms of values, and
    some will spread across 100's of millions of values.  This is my current
    thought process; let me know where I could be more efficient or if there is
    a better way to do it.

    I'll create an Avro object "Impression" which has all the attributes of the
    inbound request.  On each request, my application servers will then create
    one and send it to a single Kafka topic.

    I'll then have a consumer which creates a stream from the topic.  From
    there I'll use the windowed timeframes and groupBy to group by the
    attributes on each given day.  At the end of the day I'd need to read out
    the data store to an external system for storage.  Since I won't know all
    the values I'd need something similar to the KVStore.all() but for
    WindowedKV Stores.  It appears that this will be possible in 1.1 with this
    commit:
    https://github.com/apache/kafka/commit/1d1c8575961bf6bce7decb049be7f10ca76bd0c5

    Is this the best approach to doing this?  Or would I be better off using the
    stream to listen and then an external DB like Aerospike to store the counts
    and read out of it directly at the end of the day?

    Thanks for the help!
    Daum


