Re: Kafka Setup for Daily counts on wide array of keys

2018-03-03 Thread Thakrar, Jayesh
Matt,

If I understand correctly, you have an 8-node Kafka cluster and need to support
about 1 million requests/sec into the cluster from source servers, and expect
to consume that for aggregation.

How big are your msgs?

I would suggest looking into batching multiple requests per single Kafka msg to 
achieve the desired throughput.

So, e.g., on the request-receiving systems, I would suggest creating a logical 
Avro file (byte buffer) of, say, N requests and then making that into one Kafka 
msg payload.
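
A rough sketch of what that batching could look like with the plain Avro and
Java producer APIs (the class and method names below are just illustrative, not
a prescribed implementation):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.List;

    import org.apache.avro.Schema;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class BatchingProducer {

        // Serialize a batch of Avro records into one in-memory Avro container
        // (the "logical avro file") and send it as a single Kafka msg payload.
        static void sendBatch(KafkaProducer<String, byte[]> producer,
                              String topic,
                              Schema schema,
                              List<GenericRecord> batch) throws IOException {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (DataFileWriter<GenericRecord> writer =
                     new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
                writer.create(schema, buffer);   // writes the Avro file header
                for (GenericRecord request : batch) {
                    writer.append(request);      // appends each request
                }
            }                                    // close() flushes the last block
            // One Kafka message now carries N requests; configure the producer
            // with the ByteArraySerializer as its value.serializer.
            producer.send(new ProducerRecord<>(topic, buffer.toByteArray()));
        }
    }

On the consumer side the received byte[] can be wrapped in a DataFileReader
(via SeekableByteArrayInput) to iterate over the individual requests again.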

We have a similar situation 
(https://www.slideshare.net/JayeshThakrar/apacheconflumekafka2016) and found 
anything from 4x to 10x better throughput with batching as compared to one 
request per msg.
We have different kinds of msgs/topics, and the individual "request" size varies 
from about 100 bytes to 1+ KB.

On 3/2/18, 8:24 AM, "Matt Daum"  wrote:

I am new to Kafka but I think I have a good use case for it.  I am trying
to build daily counts of requests based on a number of different attributes
in a high-throughput system (~1 million requests/sec across all 8
servers).  The different attributes are unbounded in terms of values, and
some will spread across hundreds of millions of values.  This is my current
thought process; let me know where I could be more efficient or if there is
a better way to do it.

I'll create an Avro object "Impression" which has all the attributes of the
inbound request.  My application servers will then, on each request, create
and send this to a single Kafka topic.
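
A minimal sketch of what such an "Impression" schema might look like (the field
names here are placeholders, not the actual attributes):

    {
      "type": "record",
      "name": "Impression",
      "fields": [
        {"name": "attribute1",  "type": "string"},
        {"name": "attribute2",  "type": "string"},
        {"name": "timestampMs", "type": "long"}
      ]
    }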

I'll then have a consumer which creates a stream from the topic.  From
there I'll use windowed timeframes and groupBy to group by the
attributes on each given day.  At the end of the day I'd need to read the
data store out to an external system for storage.  Since I won't know all
the values, I'd need something similar to KVStore.all() but for
windowed KV stores.  It appears this will be possible in 1.1 with this
commit:

https://github.com/apache/kafka/commit/1d1c8575961bf6bce7decb049be7f10ca76bd0c5
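
A rough sketch of that topology against the 1.1-era Streams API (the topic
name, store name, and String serdes below are placeholders standing in for the
real Avro "Impression" setup):

    import java.util.Properties;
    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Serialized;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.Windowed;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyWindowStore;

    public class DailyCounts {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "daily-counts");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                      Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                      Serdes.String().getClass());

            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, String> impressions = builder.stream("impressions");

            // Group by the attribute of interest and count per 1-day window.
            KTable<Windowed<String>, Long> dailyCounts = impressions
                .groupBy((key, value) -> value,
                         Serialized.with(Serdes.String(), Serdes.String()))
                .windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(1)))
                .count(Materialized.as("daily-counts-store"));

            KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();

            // At end of day, iterate the whole windowed store
            // (ReadOnlyWindowStore.all(), per the commit above) and export it.
            // In practice, wait until the instance is RUNNING before querying.
            ReadOnlyWindowStore<String, Long> store =
                streams.store("daily-counts-store", QueryableStoreTypes.windowStore());
            try (KeyValueIterator<Windowed<String>, Long> it = store.all()) {
                while (it.hasNext()) {
                    KeyValue<Windowed<String>, Long> entry = it.next();
                    System.out.println(entry.key.key() + " @ "
                        + entry.key.window().start() + " -> " + entry.value);
                }
            }
        }
    }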

Is this the best approach to doing this?  Or would I be better off using the
stream to listen and then an external DB like Aerospike to store the counts,
reading out of it directly at the end of the day?

Thanks for the help!
Daum




committing offset metadata in kafka streams

2018-03-03 Thread Stas Chizhov
Hi,

There seems to be no way to commit custom metadata along with offsets from
within Kafka Streams.
Are there any plans to expose this functionality or have I missed something?
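
For context, the plain consumer API does allow attaching a metadata string to a
committed offset (a minimal sketch; the topic, offset, and metadata values are
placeholders):

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class CommitWithMetadata {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "example-group");
            props.put("key.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // OffsetAndMetadata lets an arbitrary metadata string ride along
                // with the committed offset for a partition.
                consumer.commitSync(Collections.singletonMap(
                    new TopicPartition("my-topic", 0),
                    new OffsetAndMetadata(42L, "my custom metadata")));
            }
        }
    }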

Best regards,
Stanislav.


Re: difference between 2 options

2018-03-03 Thread Andras Beni
Hello Adrien,

I was wrong. There is only one such file per data dir and not one per
topicpartition dir. It is a text file containing (see the example below):
 - a format version number (0),
 - the number of following entries, and
 - one entry for each topicpartition: topic name, partition, and offset.
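
For example, a recovery-point-offset-checkpoint file could look like this
(topic names and offsets are made up):

    0
    3
    mytopic 0 1523
    mytopic 1 1498
    othertopic 0 87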

Yes, when the broker starts, it checks these entries. As you probably know,
one topicpartition is written to multiple log segments. If the broker finds
that there are messages after the recovery point, each log segment that
contains such messages will be iterated over, the messages will be checked,
and a new index will be built.

I hope this answers your questions.

Best regards,
Andras



On Thu, Mar 1, 2018 at 2:59 AM, adrien ruffie 
wrote:

> Sorry Andras for the delay in my response.
>
>
> OK, I correctly understood the deletion, thanks to your explanation.
>
>
> However, for the recovery point I wanted to ask you about the concept's logic:
>
>
> For example I have one recovery-point-offset-checkpoint in topic-0
>
>
> If the broker crashed and restarted:
>
>
> the fact that a recovery-point-offset-checkpoint is present avoids
> recovering the whole log during startup.
>
> But what does that mean exactly? Is only one offset number present in
> this recovery file?
>
> If that is the case, will the broker simply load into memory all messages
> in this log from this offset?
>
>
> I really want to correctly understand the concept.
>
>
> Best regards,
>
>
> Adrien
>
> 
> From: Andras Beni 
> Sent: Tuesday, February 27, 2018 15:41:04
> To: users@kafka.apache.org
> Subject: Re: difference between 2 options
>
> 1) We write out one recovery point per log directory, which practically
> means topicpartition. So if your topic is called mytopic, then you will
> have a file called recovery-point-offset-checkpoint in topic-0/, in
> topic-1/, and in topic-2/.
>
> 2) Data deletion in Kafka is not related to what was read by consumers.
> Data is deleted when there is either too much of it (log.retention.bytes
> property) or it is too old (log.retention.ms property). And consumers keep
> track of what they have consumed using the __consumer_offsets topic (or
> some custom logic they choose).
> What we are talking about is DeleteRecordsRequest. It is sent by a command
> line tool called kafka.admin.DeleteRecordsCommand. This does not actually
> delete any data but notes that the data before a given offset should not be
> served anymore. This, just like recovery checkpointing, works on a
> per-partition basis.
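>
> For illustration, that command-line tool is typically invoked like this
> (host, file name, topic, and offset are made up):
>
>     bin/kafka-delete-records.sh --bootstrap-server localhost:9092 \
>         --offset-json-file delete-records.json
>
> where delete-records.json names, per partition, the offset before which
> records should no longer be served:
>
>     {"version": 1,
>      "partitions": [{"topic": "mytopic", "partition": 0, "offset": 1000}]}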
>
> Does this answer your questions?
>
> Best regards,
> Andras
>
>
> On Mon, Feb 26, 2018 at 11:43 PM, adrien ruffie  >
> wrote:
>
> > Hi Andras,
> >
> >
> > Thanks for your response!
> >
> > For log.flush.offset.checkpoint.interval.ms, do we write out only one
> > recovery point for all logs?
> >
> > But if I have 3 partitions, and for each partition the offset is
> > different, what happens? Do we save 3 different offsets in the text file,
> > or just one for all three partitions?
> >
> >
> > When you say "to avoid exposing data that have been deleted by
> > DeleteRecordsRequest", does that mean the old consumed data? For example,
> > if I have offset 34700, is it to avoid re-exposing records 34000~34699 to
> > the consumer after a crash?
> >
> > 
> > From: Andras Beni 
> > Sent: Tuesday, February 27, 2018 06:16:41
> > To: users@kafka.apache.org
> > Subject: Re: difference between 2 options
> >
> > Hi Adrien,
> >
> > Every log.flush.offset.checkpoint.interval.ms we write out the current
> > recovery point for all logs to a text file in the log directory, to avoid
> > recovering the whole log on startup.
> >
> > And every log.flush.start.offset.checkpoint.interval.ms we write out the
> > current log start offset for all logs to a text file in the log directory,
> > to avoid exposing data that has been deleted by DeleteRecordsRequest.
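> >
> > For example, both intervals live in server.properties and default to one
> > minute (the values below are just those defaults):
> >
> >     log.flush.offset.checkpoint.interval.ms=60000
> >     log.flush.start.offset.checkpoint.interval.ms=60000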
> >
> > HTH,
> > Andras
> >
> >
> > On Mon, Feb 26, 2018 at 1:51 PM, adrien ruffie <
> adriennolar...@hotmail.fr>
> > wrote:
> >
> > > Hello all,
> > >
> > >
> > > I have read the linked properties' documentation, but I don't really
> > > understand the difference between:
> > >
> > > log.flush.offset.checkpoint.interval.ms
> > >
> > >
> > > and
> > >
> > >
> > > log.flush.start.offset.checkpoint.interval.ms
> > >
> > >
> > > Do you have a use case for each property's utilization? I can't figure
> > > out what the difference is...
> > >
> > >
> > > best regards,
> > >
> > >
> > > Adrien
> > >
> >
>