Re: Kafka Streams : TimeoutException Expiring Records

2018-02-22 Thread Tony John
Thanks a lot, Bill, for looking into this. I will definitely attempt the
suggestions and let you know the outcome. I have gone through KIP-91 but am
struggling to understand the behavior. Does it mean that these errors happen
due to a failure in the broker? If so, why would it kill all the threads,
which makes the consumers unavailable? Also, is there any way to handle
these errors gracefully so that they do not kill the threads, and hence the
consumers?

Thanks,
Tony
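
[Editorial note: a sketch of one graceful-handling option, assuming the
0.11.x API. A KafkaStreams uncaught exception handler cannot restart a dead
thread on this version, but it lets the application detect the failure and
react (alert, shut down cleanly, ...). All topic names and configs below are
placeholders, not taken from this thread.]

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class ThreadDeathWatch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

            KStreamBuilder builder = new KStreamBuilder();    // 0.11.x DSL entry point
            builder.stream("input-topic").to("output-topic"); // trivial placeholder topology

            KafkaStreams streams = new KafkaStreams(builder, props);
            // Invoked when a stream thread dies; on 0.11.x this is observe-only.
            streams.setUncaughtExceptionHandler((thread, throwable) ->
                    System.err.println("Stream thread " + thread.getName() + " died: " + throwable));
            streams.start();
        }
    }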

On Fri, Feb 23, 2018 at 1:15 AM, Bill Bejeck  wrote:

> Hi Tony,
>
> Looks like you are hitting a known issue that KIP-91
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer)
> will address.
>
> In the meantime, as a workaround, you could try setting
> REQUEST_TIMEOUT_MS_CONFIG to a large value (Integer.MAX_VALUE?). Other
> secondary configurations to consider changing would be increasing
> "max.block.ms" and "retries".
>
> Thanks,
> Bill
>
> On Thu, Feb 22, 2018 at 8:14 AM, Tony John  wrote:
>
> > Hi All,
> >
> > I am running into an issue with my Kafka Streams application. The
> > application was running fine for almost 2 weeks, then it started throwing
> > the below exception, which caused the threads to die. Now when I restart
> > the application, it dies quickly (1-2 hrs) while trying to catch up on
> > the lag.
> >
> > The application is running on an AWS EC2 instance with an 8-core
> > processor and 16GB of memory. The streams config is given below and more
> > logs are available below (I have stripped off some logs which I thought
> > may not be relevant). Towards the end of this thread you will be able to
> > see a lot of exceptions similar to the one below, plus RocksDBExceptions
> > (org.apache.kafka.streams.errors.ProcessorStateException: Error opening
> > store vstore at location
> > /mnt/store/kafka-streams/pe-v1/1_10/rocksdb/vstore). Could you please
> > take a look and let me know what could be wrong here?
> >
> > INFO  2018-02-21 08:37:20.758 [Engine2-StreamThread-2] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-2] Committed all active tasks [7_12, 6_3, 1_27] and standby tasks [4_0, 1_30] in 0ms
> > ERROR 2018-02-21 08:37:24.853 [kafka-producer-network-thread | Engine2-StreamThread-6-producer] org.apache.kafka.streams.processor.internals.RecordCollectorImpl - task [1_34] Error sending record to topic cv-v1-cv. No more offsets will be recorded for this task and the exception will eventually be thrown
> > org.apache.kafka.common.errors.TimeoutException: Expiring 74 record(s) for cv-v1-cv-2: 31439 ms has passed since last append
> > DEBUG 2018-02-21 08:37:24.859 [Engine2-StreamThread-3] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-3] processing latency 46282 > commit time 3 for 1 records. Adjusting down recordsProcessedBeforeCommit=6482
> > ERROR 2018-02-21 08:37:24.865 [Engine2-StreamThread-6] org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread [Engine2-StreamThread-6] Failed to commit stream task 1_34 due to the following error:
> > org.apache.kafka.streams.errors.StreamsException: task [1_34] exception caught when producing
> > at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:137) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:145) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:296) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:275) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:270) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:264) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.AssignedTasks$3.apply(AssignedTasks.java:374) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:420) [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.AssignedTasks.punctuateAndCommit(AssignedTasks.java:357) [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:662) [kafka-streams-0.11.0.2.jar:?]
> > at 

ProcessorTopology

2018-02-22 Thread pravin kumar
Can we give the output of one ProcessorTopology as the input to another
ProcessorTopology?

If it is possible, how can we do it? Can anyone provide an example?
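
[Editorial note: a sketch of one common approach, assuming the 0.11.x DSL:
write the first topology's output to an intermediate topic and make that
topic the second topology's source. All topic names, application ids, and
configs are placeholders.]

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class ChainedTopologies {
        public static void main(String[] args) {
            // Topology 1: consumes "source" and writes its result to "intermediate".
            KStreamBuilder first = new KStreamBuilder();
            first.stream("source").to("intermediate");

            // Topology 2: treats "intermediate" as its input.
            KStreamBuilder second = new KStreamBuilder();
            second.stream("intermediate").to("sink");

            new KafkaStreams(first, configFor("app-one")).start();
            new KafkaStreams(second, configFor("app-two")).start();
        }

        private static Properties configFor(String appId) {
            Properties p = new Properties();
            p.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
            p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            return p;
        }
    }

[Within a single application, KStream#through("intermediate") achieves the
same hop inside one topology.]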


Re: Doubts about multiple instance in kafka

2018-02-22 Thread naresh Goud
Hi Pravin,

You're correct.
You can run the application multiple times so the instances start in
multiple JVMs (run 1: java YourClass, which runs in one JVM; run 2: java
YourClass, which runs in another JVM).

Or else,

you can run the application on multiple machines, i.e. multiple application
instances running in multiple JVMs (run 1: java YourClass, which runs in one
JVM on machine 1; run 2: java YourClass, which runs in another JVM on
machine 2).



Thank you,
Naresh
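
[Editorial note: to make the above concrete, a sketch assuming the standard
Streams config API; the application id, servers, and topics are
placeholders. The key point is that every JVM runs the same code with the
same application.id, and the library balances the tasks across whichever
instances are alive.]

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class InstanceExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Must be identical on every instance: it is what makes the JVMs
            // join one group and share the input partitions (tasks).
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            KStreamBuilder builder = new KStreamBuilder();
            builder.stream("input-topic").to("output-topic"); // placeholder topology

            // Run "java InstanceExample" once per JVM, locally or on other
            // machines; Kafka Streams rebalances tasks among live instances.
            new KafkaStreams(builder, props).start();
        }
    }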



On Thu, Feb 22, 2018 at 12:15 AM, pravin kumar  wrote:

> I have the Kafka Confluent documentation, but I can't understand the
> following line:
>
> "It is important to understand that Kafka Streams is not a resource
> manager, but a library that “runs” anywhere its stream processing
> application runs. Multiple instances of the application are executed
> either on the same machine, or spread across multiple machines, and
> tasks can be distributed automatically by the library to those running
> application instances."
>
> I have tried to run it on the same machine, in multiple JVMs, with
> multiple consumers.
>
> Is that the correct way to run on the same machine using multiple
> consumers, or is there another way? I have attached the code below.
>


Review of patch JIRA 1194 (pull req 3838)

2018-02-22 Thread M. Manna
Hello,

Could someone please review the patch provided? We are going live in prod
with 9 brokers on Windows VMs, and this is a blocker.

To summarise, the patch allows unmapping log index files when they are
locked; without it, cleanup fails, and this is specific to Windows OS.

Regards,


Re: Kafka Streams : TimeoutException Expiring Records

2018-02-22 Thread Bill Bejeck
Hi Tony,

Looks like you are hitting a known issue that KIP-91
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer)
will address.

In the meantime, as a workaround, you could try setting
REQUEST_TIMEOUT_MS_CONFIG to a large value (Integer.MAX_VALUE?). Other
secondary configurations to consider changing would be increasing
"max.block.ms" and "retries".

Thanks,
Bill
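
[Editorial note: a sketch of how those overrides might be wired into a
Streams configuration. The property names come from the standard clients
API; the values and the producerPrefix() scoping are illustrative, not
prescribed in this thread.]

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class WorkaroundConfig {
        static Properties streamsProps() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app"); // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder

            // Give in-flight batches effectively unlimited time before they expire.
            props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG),
                    Integer.MAX_VALUE);
            // Retry transient broker hiccups instead of surfacing them as fatal.
            props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 10);
            // Allow send() and metadata fetches to block longer.
            props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG),
                    Long.MAX_VALUE);
            return props;
        }
    }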

On Thu, Feb 22, 2018 at 8:14 AM, Tony John  wrote:

> Hi All,
>
> I am running into an issue with my Kafka Streams application. The
> application was running fine for almost 2 weeks, then it started throwing
> the below exception, which caused the threads to die. Now when I restart
> the application, it dies quickly (1-2 hrs) while trying to catch up on the
> lag.
>
> The application is running on an AWS EC2 instance with an 8-core processor
> and 16GB of memory. The streams config is given below and more logs are
> available below (I have stripped off some logs which I thought may not be
> relevant). Towards the end of this thread you will be able to see a lot of
> exceptions similar to the one below, plus RocksDBExceptions
> (org.apache.kafka.streams.errors.ProcessorStateException: Error opening
> store vstore at location
> /mnt/store/kafka-streams/pe-v1/1_10/rocksdb/vstore). Could you please take
> a look and let me know what could be wrong here?
>
> INFO  2018-02-21 08:37:20.758 [Engine2-StreamThread-2] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-2] Committed all active tasks [7_12, 6_3, 1_27] and standby tasks [4_0, 1_30] in 0ms
> ERROR 2018-02-21 08:37:24.853 [kafka-producer-network-thread | Engine2-StreamThread-6-producer] org.apache.kafka.streams.processor.internals.RecordCollectorImpl - task [1_34] Error sending record to topic cv-v1-cv. No more offsets will be recorded for this task and the exception will eventually be thrown
> org.apache.kafka.common.errors.TimeoutException: Expiring 74 record(s) for cv-v1-cv-2: 31439 ms has passed since last append
> DEBUG 2018-02-21 08:37:24.859 [Engine2-StreamThread-3] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-3] processing latency 46282 > commit time 3 for 1 records. Adjusting down recordsProcessedBeforeCommit=6482
> ERROR 2018-02-21 08:37:24.865 [Engine2-StreamThread-6] org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread [Engine2-StreamThread-6] Failed to commit stream task 1_34 due to the following error:
> org.apache.kafka.streams.errors.StreamsException: task [1_34] exception caught when producing
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:137) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:145) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:296) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:275) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:270) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:264) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AssignedTasks$3.apply(AssignedTasks.java:374) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:420) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AssignedTasks.punctuateAndCommit(AssignedTasks.java:357) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:662) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:482) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:459) [kafka-streams-0.11.0.2.jar:?]
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 74 record(s) for cv-v1-cv-2: 31439 ms has passed since last append
> INFO  2018-02-21 08:37:24.865 [Engine2-StreamThread-6] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-6] Shutting down
> INFO  2018-02-21 08:37:24.865 [Engine2-StreamThread-6]
> 

Producer blocks on send to topic that doesn't exist if auto create is disabled

2018-02-22 Thread Josef Ludvíček
Hello, 

Is anyone aware of plans for resolving [KAFKA-3450] (producer blocks on send
to a topic that doesn't exist if auto-create is disabled)?
Or any good workarounds / hints for a fix?

Thanks for any advice.


Josef

https://issues.apache.org/jira/browse/KAFKA-3450 
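
[Editorial note: until the ticket is resolved, two commonly suggested
mitigations, sketched below with placeholder names; the AdminClient check
assumes a 0.11+ client.]

    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class TopicGuard {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder

            // Mitigation 1: cap how long the producer may block waiting for
            // metadata, so a missing topic fails fast with a TimeoutException.
            props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);

            // Mitigation 2: check that the topic exists before producing to it.
            try (AdminClient admin = AdminClient.create(props)) {
                if (!admin.listTopics().names().get().contains("my-topic")) {
                    throw new IllegalStateException("Topic my-topic does not exist");
                }
            }
        }
    }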


Re: Consumer group describe issue

2018-02-22 Thread Bill Bejeck
Can someone add Sahil to the contributor list?

Thanks,
Bill

On Thu, Feb 22, 2018 at 3:39 AM, sahil aggarwal  wrote:

> Bill,
>
> I have created the JIRA: https://issues.apache.org/jira/browse/KAFKA-6581
> but I am not able to assign it to myself.
>
> Can you please help?
>
>
> Thanks,
> Sahil
>
> On 28 December 2017 at 11:50, sahil aggarwal  wrote:
>
> > @TedYu
> > From 0.10.0 
> >
> > @Bill
> > Thanks for the pointer. Will follow the steps mentioned in the doc.
> >
> > On 28 December 2017 at 07:39, Ted Yu  wrote:
> >
> >> Which branch was the patch generated from ?
> >> When I tried to apply the patch:
> >>
> >> 6 out of 7 hunks FAILED -- saving rejects to file
> >> core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala.rej
> >>
> >> FYI
> >>
> >> On Mon, Dec 25, 2017 at 9:45 PM, sahil aggarwal  wrote:
> >>
> >> > Attached the patch. If someone can review it, that will be very helpful.
> >> >
> >> > Thanks,
> >> > sahil
> >> >
> >> > On 23 December 2017 at 13:11, sahil aggarwal  wrote:
> >> >
> >> >> My bad, it's consumer.endOffsets in ConsumerGroupCommand.scala.
> >> >>
> >> >> https://github.com/apache/kafka/blob/ef97ed7ee5cb883a30245ea4f77ef99d4db15373/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala#L467
> >> >>
> >> >> On 23 December 2017 at 13:07, Ted Yu  wrote:
> >> >>
> >> >>> Sahil:
> >> >>> I did a quick search in 0.11.0 branch and trunk for getEndOffsets but didn't find any occurrence.
> >> >>>
> >> >>> Mind giving us the location (and class) where getEndOffsets is called?
> >> >>>
> >> >>> Thanks
> >> >>>
> >> >>> On Fri, Dec 22, 2017 at 11:29 PM, sahil aggarwal <sahil.ag...@gmail.com> wrote:
> >> >>>
> >> >>> > Fixed it by some code change in ConsumerGroupCommand.scala. Possible to push it upstream for 0.10.*?
> >> >>> >
> >> >>> > It seems to be fixed in 0.11.*, where it uses getEndOffsets(), which has requestTimeoutMs instead of Long.MAX_VALUE.
> >> >>> >
> >> >>> > On 23 December 2017 at 02:46, Matthias J. Sax <matth...@confluent.io> wrote:
> >> >>> >
> >> >>> > > Your observation is correct. KafkaConsumer.position() is a blocking call. It's a known issue that there is no configurable timeout value.
> >> >>> > >
> >> >>> > > I am not aware of any workaround.
> >> >>> > >
> >> >>> > > -Matthias
> >> >>> > >
> >> >>> > > On 12/21/17 6:05 AM, sahil aggarwal wrote:
> >> >>> > > > Hi,
> >> >>> > > >
> >> >>> > > > Facing an issue where *kafka-consumer-groups.sh --describe* gets stuck if one of the partitions is unavailable, i.e., has no leader. Going through some code, I found that it does the following to get the log end offset:
> >> >>> > > >
> >> >>> > > > * Create consumer
> >> >>> > > > * For each partition
> >> >>> > > >    * assign partition
> >> >>> > > >    * seek to end
> >> >>> > > >    * get position
> >> >>> > > >
> >> >>> > > > The issue is that KafkaConsumer.position() uses Fetcher.retrieveOffsetsByTimes() internally, which is called with timeout Long.MAX_VALUE, and it gets stuck in a loop there.
> >> >>> > > >
> >> >>> > > > Any pointers?
> >> >>> > > >
> >> >>> > > > *Version*: 0.10.0.1
> >> >>> > > >
> >> >>> > > > Thanks,
> >> >>> > > > Sahil
> >> >>> > > >
> >> >>> > >
> >> >>> > >
> >> >>> >
> >> >>>
> >> >>
> >> >>
> >> >
> >>
> >
> >
>
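
[Editorial note: for readers following along, a sketch of the blocking
end-offset lookup described in the quoted thread, assuming a 0.10.1+
consumer API; the servers, group id, and topic are placeholders.]

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class EndOffsetLookup {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder
            props.put("group.id", "describe-helper");         // placeholder
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            TopicPartition tp = new TopicPartition("my-topic", 0); // placeholder
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.assign(Collections.singletonList(tp));
                consumer.seekToEnd(Collections.singletonList(tp));
                // This position() call is where the 0.10.x tool can block forever
                // when the partition has no leader; per the thread, 0.11 uses
                // endOffsets(), which honours request.timeout.ms instead.
                System.out.println(tp + " end offset = " + consumer.position(tp));
            }
        }
    }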


Kafka Streams : TimeoutException Expiring Records

2018-02-22 Thread Tony John
Hi All,

I am running into an issue with my Kafka Streams application. The
application was running fine for almost 2 weeks, then it started throwing
the below exception, which caused the threads to die. Now when I restart the
application, it dies quickly (1-2 hrs) while trying to catch up on the lag.

The application is running on an AWS EC2 instance with an 8-core processor
and 16GB of memory. The streams config is given below and more logs are
available below (I have stripped off some logs which I thought may not be
relevant). Towards the end of this thread you will be able to see a lot of
exceptions similar to the one below, plus RocksDBExceptions
(org.apache.kafka.streams.errors.ProcessorStateException: Error opening
store vstore at location
/mnt/store/kafka-streams/pe-v1/1_10/rocksdb/vstore). Could you please take
a look and let me know what could be wrong here?

INFO  2018-02-21 08:37:20.758 [Engine2-StreamThread-2] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-2] Committed all active tasks [7_12, 6_3, 1_27] and standby tasks [4_0, 1_30] in 0ms
ERROR 2018-02-21 08:37:24.853 [kafka-producer-network-thread | Engine2-StreamThread-6-producer] org.apache.kafka.streams.processor.internals.RecordCollectorImpl - task [1_34] Error sending record to topic cv-v1-cv. No more offsets will be recorded for this task and the exception will eventually be thrown
org.apache.kafka.common.errors.TimeoutException: Expiring 74 record(s) for cv-v1-cv-2: 31439 ms has passed since last append
DEBUG 2018-02-21 08:37:24.859 [Engine2-StreamThread-3] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-3] processing latency 46282 > commit time 3 for 1 records. Adjusting down recordsProcessedBeforeCommit=6482
ERROR 2018-02-21 08:37:24.865 [Engine2-StreamThread-6] org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread [Engine2-StreamThread-6] Failed to commit stream task 1_34 due to the following error:
org.apache.kafka.streams.errors.StreamsException: task [1_34] exception caught when producing
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:137) ~[kafka-streams-0.11.0.2.jar:?]
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:145) ~[kafka-streams-0.11.0.2.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:296) ~[kafka-streams-0.11.0.2.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:275) ~[kafka-streams-0.11.0.2.jar:?]
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) ~[kafka-streams-0.11.0.2.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:270) ~[kafka-streams-0.11.0.2.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:264) ~[kafka-streams-0.11.0.2.jar:?]
at org.apache.kafka.streams.processor.internals.AssignedTasks$3.apply(AssignedTasks.java:374) ~[kafka-streams-0.11.0.2.jar:?]
at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:420) [kafka-streams-0.11.0.2.jar:?]
at org.apache.kafka.streams.processor.internals.AssignedTasks.punctuateAndCommit(AssignedTasks.java:357) [kafka-streams-0.11.0.2.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:662) [kafka-streams-0.11.0.2.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513) [kafka-streams-0.11.0.2.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:482) [kafka-streams-0.11.0.2.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:459) [kafka-streams-0.11.0.2.jar:?]
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 74 record(s) for cv-v1-cv-2: 31439 ms has passed since last append
INFO  2018-02-21 08:37:24.865 [Engine2-StreamThread-6] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-6] Shutting down
INFO  2018-02-21 08:37:24.865 [Engine2-StreamThread-6] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-6] State transition from PARTITIONS_ASSIGNED to PENDING_SHUTDOWN.
DEBUG 2018-02-21 08:37:24.865 [Engine2-StreamThread-6] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-6] Shutting down all active tasks [6_12, 1_34, 7_15], standby tasks [1_2, 1_7], suspended tasks [1_18, 7_15, 6_3, 1_27], and suspended standby tasks [1_3, 1_30]
INFO  2018-02-21 08:37:24.879 [Engine2-StreamThread-3] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-3] Committed all active tasks [1_31, 1_24, 6_7] and standby tasks [1_0, 1_6, 

Re: Consumer group describe issue

2018-02-22 Thread sahil aggarwal
Bill,

I have created the JIRA: https://issues.apache.org/jira/browse/KAFKA-6581
but I am not able to assign it to myself.

Can you please help?


Thanks,
Sahil

On 28 December 2017 at 11:50, sahil aggarwal  wrote:

> @TedYu
> From 0.10.0 
>
> @Bill
> Thanks for the pointer. Will follow the steps mentioned in the doc.
>
> On 28 December 2017 at 07:39, Ted Yu  wrote:
>
>> Which branch was the patch generated from ?
>> When I tried to apply the patch:
>>
>> 6 out of 7 hunks FAILED -- saving rejects to file
>> core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala.rej
>>
>> FYI
>>
>> On Mon, Dec 25, 2017 at 9:45 PM, sahil aggarwal  wrote:
>>
>> > Attached the patch. If someone can review it, that will be very helpful.
>> >
>> > Thanks,
>> > sahil
>> >
>> > On 23 December 2017 at 13:11, sahil aggarwal  wrote:
>> >
>> >> My bad, it's consumer.endOffsets in ConsumerGroupCommand.scala.
>> >>
>> >> https://github.com/apache/kafka/blob/ef97ed7ee5cb883a30245ea4f77ef99d4db15373/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala#L467
>> >>
>> >> On 23 December 2017 at 13:07, Ted Yu  wrote:
>> >>
>> >>> Sahil:
>> >>> I did a quick search in 0.11.0 branch and trunk for getEndOffsets but didn't find any occurrence.
>> >>>
>> >>> Mind giving us the location (and class) where getEndOffsets is called?
>> >>>
>> >>> Thanks
>> >>>
>> >>> On Fri, Dec 22, 2017 at 11:29 PM, sahil aggarwal <sahil.ag...@gmail.com> wrote:
>> >>>
>> >>> > Fixed it by some code change in ConsumerGroupCommand.scala. Possible to push it upstream for 0.10.*?
>> >>> >
>> >>> > It seems to be fixed in 0.11.*, where it uses getEndOffsets(), which has requestTimeoutMs instead of Long.MAX_VALUE.
>> >>> >
>> >>> > On 23 December 2017 at 02:46, Matthias J. Sax <matth...@confluent.io> wrote:
>> >>> >
>> >>> > > Your observation is correct. KafkaConsumer.position() is a blocking call. It's a known issue that there is no configurable timeout value.
>> >>> > >
>> >>> > > I am not aware of any workaround.
>> >>> > >
>> >>> > > -Matthias
>> >>> > >
>> >>> > > On 12/21/17 6:05 AM, sahil aggarwal wrote:
>> >>> > > > Hi,
>> >>> > > >
>> >>> > > > Facing an issue where *kafka-consumer-groups.sh --describe* gets stuck if one of the partitions is unavailable, i.e., has no leader. Going through some code, I found that it does the following to get the log end offset:
>> >>> > > >
>> >>> > > > * Create consumer
>> >>> > > > * For each partition
>> >>> > > >    * assign partition
>> >>> > > >    * seek to end
>> >>> > > >    * get position
>> >>> > > >
>> >>> > > > The issue is that KafkaConsumer.position() uses Fetcher.retrieveOffsetsByTimes() internally, which is called with timeout Long.MAX_VALUE, and it gets stuck in a loop there.
>> >>> > > >
>> >>> > > > Any pointers?
>> >>> > > >
>> >>> > > > *Version*: 0.10.0.1
>> >>> > > >
>> >>> > > > Thanks,
>> >>> > > > Sahil
>> >>> > > >
>> >>> > >
>> >>> > >
>> >>> >
>> >>>
>> >>
>> >>
>> >
>>
>
>


Re: usage of deprecated method in kafka 2_12.1.0.0

2018-02-22 Thread Matthias J. Sax
As the JavaDocs point out
(https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/kstream/KGroupedStream.html#count-java.lang.String-),
the new method is

#count(Materialized)

for example:

stream.groupByKey().count(Materialized.as("store-name"));


-Matthias
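
[Editorial note: a slightly fuller sketch of the 1.0.0 API described above;
the topic names are placeholders, and the explicit generics on Materialized
are what the compiler typically needs when only a store name is supplied.]

    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class CountExample {
        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, String> feed = builder.stream("wikifeed-input"); // placeholder

            // 0.10.2.x style (now deprecated): groupByKey().count("store-name").
            // In 1.0.0 the store name moves into a Materialized descriptor.
            KTable<String, Long> counts = feed
                    .groupByKey()
                    .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("store-name"));

            counts.toStream().to("wikifeed-output"); // placeholder
        }
    }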

On 2/21/18 11:08 PM, pravin kumar wrote:
> I have tried the wikifeed example with Kafka 2_12.1.0.0. The count method
> is now deprecated.
> 
> Previously, in kafka_2.11-0.10.2.1, I gave count(localStateStoreName).
> 
> How do I give the state store name in Kafka 2_12.1.0.0?
> 
> I have attached the code below.
> 


