Re: Converting a Stream to a Table - groupBy/reduce vs. stream.to/builder.table

2018-10-26 Thread Matthias J. Sax
I don't know your overall application setup. However, a KStream
semantically models immutable facts and has no update semantics.
Thus, it seems semantically questionable to allow changing the
semantics from facts to updates (the other way around is easier IMHO, and
thus supported via KTable#toStream()).

Does this make sense?

Having said this: you _can_ write a KStream into a topic and read it back
as a KTable. But it's semantically questionable to do so, IMHO. Maybe it
makes sense for your specific application, but in general I don't think
it does.
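
A minimal sketch of this two-step approach (topic names are hypothetical;
default serdes from the application config are assumed):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class StreamToTableSketch {
    public static KTable<String, String> build(StreamsBuilder builder) {
        KStream<String, String> stream = builder.stream("input-topic");

        // Step 1: write the KStream to an intermediate topic.
        stream.to("intermediate-topic");

        // Step 2: read the same topic back with table semantics; records with
        // null values are now interpreted as deletes.
        return builder.table("intermediate-topic");
    }
}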


-Matthias

On 10/26/18 9:30 AM, John Roesler wrote:
> Hi Patrik,
> 
> Just to drop one observation in... Streaming to a topic and then consuming
> it as a table does create overhead, but so does reducing a stream to a
> table, and I think it's actually the same in either case.
> 
> They both require a store to collect the table state, and in both cases,
> the stores need to have a changelog topic. For the "reduce" version, it's
> an internal changelog topic, and for the "topic-to-table" version, the
> store can use the intermediate topic as its changelog.
> 
> This doesn't address your ergonomic concern, but it seemed worth pointing
> out that (as far as I can tell), there doesn't seem to be a difference in
> overhead.
> 
> Hope this helps!
> -John
> 
> On Fri, Oct 26, 2018 at 3:27 AM Patrik Kleindl  wrote:
> 
>> Hello Matthias,
>> thank you for the explanation.
>> Streaming back to a topic and consuming this as a KTable does respect the
>> null values as deletes, correct? But at the price of some overhead.
>> Is there any (historical, technical or emotional;-)) reason that no simple
>> one-step stream-to-table operation exists?
>> Best regards
>> Patrik
>>
>>> On 26.10.2018 at 00:07, Matthias J. Sax  wrote:
>>>
>>> Patrik,
>>>
>>> `null` values in a KStream don't have delete semantics (it's not a
>>> changelog stream). That's why we drop them in the KStream#reduce
>>> implementation.
>>>
>>> If you want to explicitly remove results for a key from the result
>>> KTable, your `Reducer#apply()` implementation must return `null` -- the
>>> result of #apply() has changelog/KTable semantics and `null` is
>>> interpreted as delete for this case.
>>>
>>> If you want to use `null` from your KStream to trigger reduce() to
>>> delete, you will need to use a surrogate value for this, i.e., do a
>>> mapValues() before the groupByKey() call and replace `null` values with
>>> the surrogate delete marker that you can evaluate in `Reducer#apply()`
>>> to return `null` for this case.
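
A rough sketch of the surrogate-value approach just described (the marker
value and topic name are hypothetical):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class SurrogateDeleteSketch {
    // Any value that cannot occur in the real data works as the marker.
    private static final String DELETE_MARKER = "__delete__";

    public static KTable<String, String> build(StreamsBuilder builder) {
        return builder.<String, String>stream("input-topic")
            // Replace null values with the surrogate so reduce() does not drop them.
            .mapValues(value -> value == null ? DELETE_MARKER : value)
            .groupByKey()
            // Returning null from the Reducer has changelog semantics and deletes the key.
            .reduce((oldValue, newValue) ->
                DELETE_MARKER.equals(newValue) ? null : newValue);
    }
}

(One caveat of this workaround: if the very first record seen for a key carries
the marker, the marker itself is stored, since the Reducer is only called once a
previous value exists for that key.)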
>>>
>>> Hope this helps.
>>>
>>> -Matthias
>>>
 On 10/25/18 10:36 AM, Patrik Kleindl wrote:
 Hello

 Recently we noticed a lot of warning messages in the logs which pointed
>> to
 this method (we are running 2.0):

 KStreamReduce
 public void process(final K key, final V value) {
     // If the key or value is null we don't need to proceed
     if (key == null || value == null) {
         LOG.warn(
             "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
             key, value, context().topic(), context().partition(), context().offset()
         );
         metrics.skippedRecordsSensor().record();
         return;
     }

 This was triggered for every record from a stream with an existing key
>> but
 a null value which we put through groupBy/reduce to get a KTable.
 My assumption was that this was the correct way inside a streams
 application to get a KTable but this prevents deletion of records from
 working.

 Our alternative is to send the stream back to a named topic and build a
>> new
 table from it, but this is rather cumbersome and requires a separate
>> topic
 which also can't be cleaned up by the streams reset tool.

 Did I miss anything relevant here?
 Would it be possible to create a separate method for KStream to achieve
 this directly?

 best regards

 Patrik

>>>
>>
> 





Re: Consumer Pause & Scheduled Resume

2018-10-26 Thread Manoj Khangaonkar
Hi Pradeep

The poll, pause and resume need to happen in the same thread -- in the
same while loop.

If a scheduler is the trigger for pause or resume, do not call
pause/resume from the scheduler thread. Instead, set a
variable in the class that has the poll loop. The poll loop can check the
variable and pause/resume as necessary.

For the rebalance scenario, you should implement the
ConsumerRebalanceListener interface and register it with the consumer.
It will get called when partitions are assigned or revoked. There you can
call pause or resume again; a rough sketch follows below.
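
A rough sketch of this structure (class, topic, and schedule details are
hypothetical; only the poll-loop thread ever touches the consumer):

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PausableConsumerLoop implements Runnable {

    private final KafkaConsumer<String, String> consumer;
    // Flag flipped by the scheduler thread; the poll loop reacts to it.
    private final AtomicBoolean resumeRequested = new AtomicBoolean(false);
    private volatile boolean paused = false;

    public PausableConsumerLoop(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    // Called from the scheduler thread: only sets a flag, never touches the consumer.
    public void requestResume() {
        resumeRequested.set(true);
    }

    @Override
    public void run() {
        consumer.subscribe(Collections.singletonList("some-topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Newly assigned partitions start unpaused; re-apply the pause if we
                // are currently outside the processing window.
                if (paused) {
                    consumer.pause(partitions);
                }
            }
        });

        while (true) {
            if (resumeRequested.compareAndSet(true, false)) {
                consumer.resume(consumer.paused());
                paused = false;
            }
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            // ... process records; after a few empty fetches call
            // consumer.pause(consumer.assignment()) and set paused = true ...
        }
    }
}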

Hope this helps

regards


On Thu, Oct 25, 2018 at 6:11 PM pradeep s 
wrote:

> Hi Manoj/Matthias,
> My requirement is to run the consumer once daily, stream the messages
> and pause when I encounter a few empty fetches.
> I am planning to run two consumers and pause the consumption based on
> the empty fetches for a topic with 4 partitions.
> To avoid the consumer multi-thread access issue, I run the consumer,
> exit the poll loop, and call pause on the same thread. In this case, I
> will not be continuously polling.
> When the next schedule kicks in, I will resume the polling.
> Will the consumer resume call cause issues, since the schedule loop is
> triggered a long time after the polling stopped? (Or is the old approach of
> continuous polling the correct one?)
> Also, Manoj, can you please explain, for the rebalance scenario, if the
> consumer is paused for two partitions and gets the assignment for another
> two partitions (because of a pod termination), how can I pause the
> consumption if it's not the scheduled time to process the records?
> Thanks
> Pradeep
>
> On Thu, Oct 25, 2018 at 5:48 PM Manoj Khangaonkar 
> wrote:
>
> > One item to be aware of with pause and resume is that they apply to
> > partitions currently assigned to the consumer.
> >
> > But partitions can get revoked or additional partitions can get assigned
> > to the consumer.
> >
> > With a reassignment, you might expect the consumer to be paused but
> > suddenly start getting messages because a new partition got assigned.
> >
> > Use the RebalanceListener to pause or resume any new partitions
> >
> > regards
> >
> > On Thu, Oct 25, 2018 at 3:15 PM Matthias J. Sax 
> > wrote:
> >
> > > That is correct: clients are not thread safe.
> > >
> > > You can use an `AtomicBoolean needToResume` that you share over both
> > > threads and that is initially false.
> > >
> > > In your scheduled method, you set the variable to true.
> > >
> > > In your main consumer, each time before you call poll(), you check if
> > > the variable is set to true. If yes, you resume() and reset the
> > > variable to false.
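
For the scheduler side, a minimal sketch assuming a ScheduledExecutorService
and the shared flag described above (names and schedule are hypothetical):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ResumeScheduler {
    // Shared with the consumer loop, which checks it before every poll().
    private final AtomicBoolean needToResume = new AtomicBoolean(false);
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    public void start() {
        // The scheduled task only flips the flag; all pause()/resume() calls
        // stay on the consumer thread.
        scheduler.scheduleAtFixedRate(() -> needToResume.set(true), 0, 24, TimeUnit.HOURS);
    }

    public AtomicBoolean flag() {
        return needToResume;
    }
}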
> > >
> > > Hope this helps.
> > >
> > > -Matthias
> > >
> > >
> > > On 10/25/18 2:09 PM, pradeep s wrote:
> > > > Thanks Matthias. I am facing the issue when I am trying to call
> > > > resume from the scheduled method.
> > > > I was getting an exception that the Kafka Consumer is not safe for
> > > > multi-threaded access. I am trying to see how I can call pause and
> > > > resume on the same thread. There will be only one thread running for
> > > > consumption.
> > > >
> > > >
> > > > On Wed, Oct 24, 2018 at 3:43 PM Matthias J. Sax <
> matth...@confluent.io
> > >
> > > > wrote:
> > > >
> > > >> There is no issue if you call `poll()` when all partitions are paused.
> > > >> In fact, if you want to make sure that the consumer does not fall out
> > > >> of the consumer group, you must call `poll()` at regular intervals to
> > > >> not hit the `max.poll.interval.ms` timeout.
> > > >>
> > > >>
> > > >> -Matthias
> > > >>
> > > >> On 10/24/18 10:25 AM, pradeep s wrote:
> > > >>> Pause and resume are required since I am running a pod in Kubernetes
> > > >>> and I am not shutting down the app.
> > > >>>
> > > >>> On Tue, Oct 23, 2018 at 10:33 PM pradeep s <
> > > sreekumar.prad...@gmail.com>
> > > >>> wrote:
> > > >>>
> > >  Hi,
> > >  I have a requirement to have Kafka streaming start at a scheduled time
> > >  and then pause the stream when the consumer poll returns empty fetches
> > >  for 3 or more polls.
> > > 
> > >  I am starting a consumer poll loop during application startup using a
> > >  single-threaded executor and then pausing the consumer when the poll is
> > >  returning empty for 3 polls.
> > > 
> > >  When the schedule kicks in, I am calling *consumer.resume.*
> > > 
> > >  Is this approach correct?
> > >  Will it cause any issue if the consumer calls poll on a paused
> > >  consumer?
> > > 
> > >  Skeleton Code
> > >  
> > > 
> > >  public class *OfferItemImageConsumer* implements Runnable {
> > > 
> > >  @Override
> > >  public void run() {
> > >  try {
> > >  do {
> > >  ConsumerRecords records =
> > > >> kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs());
> > >  

Add company name on the Powered by page

2018-10-26 Thread Rohan Rasane
Hi,
What email address should I contact if I want to add ServiceNow to the
Powered By page?

-Rohan


Re: Converting a Stream to a Table - groupBy/reduce vs. stream.to/builder.table

2018-10-26 Thread John Roesler
Hi Patrik,

Just to drop one observation in... Streaming to a topic and then consuming
it as a table does create overhead, but so does reducing a stream to a
table, and I think it's actually the same in either case.

They both require a store to collect the table state, and in both cases,
the stores need to have a changelog topic. For the "reduce" version, it's
an internal changelog topic, and for the "topic-to-table" version, the
store can use the intermediate topic as its changelog.

This doesn't address your ergonomic concern, but it seemed worth pointing
out that (as far as I can tell), there doesn't seem to be a difference in
overhead.
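
A minimal sketch of the "reduce" version for comparison (topic and store
names are hypothetical); the named store is backed by an internal changelog
topic that Streams creates:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

public class ReduceToTableSketch {
    public static KTable<String, String> build(StreamsBuilder builder) {
        return builder.<String, String>stream("input-topic")
            .groupByKey()
            // Keep the latest value per key; the store "stream-as-table" gets an
            // internal changelog topic, analogous to the intermediate topic above.
            .reduce((oldValue, newValue) -> newValue,
                    Materialized.as("stream-as-table"));
    }
}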

Hope this helps!
-John

On Fri, Oct 26, 2018 at 3:27 AM Patrik Kleindl  wrote:

> Hello Matthias,
> thank you for the explanation.
> Streaming back to a topic and consuming this as a KTable does respect the
> null values as deletes, correct? But at the price of some overhead.
> Is there any (historical, technical or emotional;-)) reason that no simple
> one-step stream-to-table operation exists?
> Best regards
> Patrik
>
> > On 26.10.2018 at 00:07, Matthias J. Sax  wrote:
> >
> > Patrik,
> >
> > `null` values in a KStream don't have delete semantics (it's not a
> > changelog stream). That's why we drop them in the KStream#reduce
> > implementation.
> >
> > If you want to explicitly remove results for a key from the result
> > KTable, your `Reducer#apply()` implementation must return `null` -- the
> > result of #apply() has changelog/KTable semantics and `null` is
> > interpreted as delete for this case.
> >
> > If you want to use `null` from your KStream to trigger reduce() to
> > delete, you will need to use a surrogate value for this, i.e., do a
> > mapValues() before the groupByKey() call and replace `null` values with
> > the surrogate delete marker that you can evaluate in `Reducer#apply()`
> > to return `null` for this case.
> >
> > Hope this helps.
> >
> > -Matthias
> >
> >> On 10/25/18 10:36 AM, Patrik Kleindl wrote:
> >> Hello
> >>
> >> Recently we noticed a lot of warning messages in the logs which pointed
> to
> >> this method (we are running 2.0):
> >>
> >> KStreamReduce
> >> public void process(final K key, final V value) {
> >>// If the key or value is null we don't need to proceed
> >>if (key == null || value == null) {
> >>LOG.warn(
> >>"Skipping record due to null key or value. key=[{}]
> >> value=[{}] topic=[{}] partition=[{}] offset=[{}]",
> >>key, value, context().topic(), context().partition(),
> >> context().offset()
> >>);
> >>metrics.skippedRecordsSensor().record();
> >>return;
> >>}
> >>
> >> This was triggered for every record from a stream with an existing key
> but
> >> a null value which we put through groupBy/reduce to get a KTable.
> >> My assumption was that this was the correct way inside a streams
> >> application to get a KTable but this prevents deletion of records from
> >> working.
> >>
> >> Our alternative is to send the stream back to a named topic and build a
> new
> >> table from it, but this is rather cumbersome and requires a separate
> topic
> >> which also can't be cleaned up by the streams reset tool.
> >>
> >> Did I miss anything relevant here?
> >> Would it be possible to create a separate method for KStream to achieve
> >> this directly?
> >>
> >> best regards
> >>
> >> Patrik
> >>
> >
>


Re: How to turn-off Time-Based index files in Kafka?

2018-10-26 Thread Manikumar
We can't disable time-based indexing. What is the issue you are facing?

On Thu, Oct 25, 2018 at 6:04 PM Ashwin Sinha 
wrote:

> Hi Users,
>
> How do I turn off time-based index files in Kafka (.timeindex files)? I tried
> searching for it in the topic and broker configs documentation but could not
> find it.
>
> --
> *Ashwin Sinha *| Data Engineer
> ashwin.si...@go-mmt.com  | 9452075361
>  
> 
>


Re: Converting a Stream to a Table - groupBy/reduce vs. stream.to/builder.table

2018-10-26 Thread Patrik Kleindl
Hello Matthias,
thank you for the explanation.
Streaming back to a topic and consuming this as a KTable does respect the null 
values as deletes, correct? But at the price of some overhead.
Is there any (historical, technical or emotional;-)) reason that no simple 
one-step stream-to-table operation exists?
Best regards
Patrik

> On 26.10.2018 at 00:07, Matthias J. Sax  wrote:
> 
> Patrik,
> 
> `null` values in a KStream don't have delete semantics (it's not a
> changelog stream). That's why we drop them in the KStream#reduce
> implementation.
> 
> If you want to explicitly remove results for a key from the result
> KTable, your `Reducer#apply()` implementation must return `null` -- the
> result of #apply() has changelog/KTable semantics and `null` is
> interpreted as delete for this case.
> 
> If you want to use `null` from your KStream to trigger reduce() to
> delete, you will need to use a surrogate value for this, i.e., do a
> mapValues() before the groupByKey() call and replace `null` values with
> the surrogate delete marker that you can evaluate in `Reducer#apply()`
> to return `null` for this case.
> 
> Hope this helps.
> 
> -Matthias
> 
>> On 10/25/18 10:36 AM, Patrik Kleindl wrote:
>> Hello
>> 
>> Recently we noticed a lot of warning messages in the logs which pointed to
>> this method (we are running 2.0):
>> 
>> KStreamReduce
>> public void process(final K key, final V value) {
>>// If the key or value is null we don't need to proceed
>>if (key == null || value == null) {
>>LOG.warn(
>>"Skipping record due to null key or value. key=[{}]
>> value=[{}] topic=[{}] partition=[{}] offset=[{}]",
>>key, value, context().topic(), context().partition(),
>> context().offset()
>>);
>>metrics.skippedRecordsSensor().record();
>>return;
>>}
>> 
>> This was triggered for every record from a stream with an existing key but
>> a null value which we put through groupBy/reduce to get a KTable.
>> My assumption was that this was the correct way inside a streams
>> application to get a KTable but this prevents deletion of records from
>> working.
>> 
>> Our alternative is to send the stream back to a named topic and build a new
>> table from it, but this is rather cumbersome and requires a separate topic
>> which also can't be cleaned up by the streams reset tool.
>> 
>> Did I miss anything relevant here?
>> Would it be possible to create a separate method for KStream to achieve
>> this directly?
>> 
>> best regards
>> 
>> Patrik
>> 
> 


Kafka Consumer Join-Rate Broker Metrics

2018-10-26 Thread Sayat Satybaldiyev
Dear all,

The Kafka consumer exposes a join-rate metric in
consumer-coordinator-metrics. I'm wondering if there is a possibility to
expose it on the broker side? As the description says, frequent group joins are a
signal of unhealthy consumption, and it's hard to collect this from the client
side.
From https://docs.confluent.io/current/kafka/monitoring.html:
join-rate: The number of group joins per second. Group joining is the first
phase of the rebalance protocol. A large value indicates that the consumer
group is unstable and will likely be coupled with increased lag.
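
Not broker-side, but as a stopgap the client-side metric can be read
programmatically and shipped to a central system; a sketch, assuming the
metric group and name quoted above:

import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public class JoinRateReporter {
    // Logs the consumer's join-rate so it can be forwarded to central monitoring.
    public static void report(KafkaConsumer<?, ?> consumer) {
        for (Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
            MetricName name = entry.getKey();
            if ("consumer-coordinator-metrics".equals(name.group())
                    && "join-rate".equals(name.name())) {
                System.out.println("join-rate = " + entry.getValue().metricValue());
            }
        }
    }
}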