Re: Embedding zookeeper and kafka in java process.

2016-06-17 Thread Achanta Vamsi Subhash
If you are using it for tests, this works with Kafka 0.10 (tune the broker
configs to your requirements):

import java.io.IOException;
import java.util.Properties;

import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.common.protocol.SecurityProtocol;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import kafka.utils.Time;
import kafka.utils.ZKStringSerializer$;
import scala.Option;

public class TestKafkaCluster {

    private final KafkaServer kafkaServer;
    private final ZkClient zkClient;
    private final String zkConnectionString;

    public TestKafkaCluster(String zkConnectionString, int kafkaPort) throws Exception {
        this.zkConnectionString = zkConnectionString;
        // ZkClient(servers, sessionTimeout ms, connectionTimeout ms, serializer)
        zkClient = new ZkClient(zkConnectionString, 3, 3, ZKStringSerializer$.MODULE$);
        final KafkaConfig config = getKafkaConfig(zkConnectionString, kafkaPort);
        final Time mockTime = new MockTime();
        kafkaServer = TestUtils.createServer(config, mockTime);
    }

    private static KafkaConfig getKafkaConfig(final String zkConnectString, final int port) {
        // Build a single-broker test config; the flags below disable controlled
        // shutdown, delete-topic and the SASL/SSL listeners.
        final Properties brokerConfig = TestUtils.createBrokerConfig(
                1,                                        // broker id
                zkConnectString,
                false,                                    // enableControlledShutdown
                false,                                    // enableDeleteTopic
                port,
                Option.apply(SecurityProtocol.PLAINTEXT), // inter-broker protocol
                Option.empty(),                           // trustStoreFile
                Option.empty(),                           // saslProperties
                false, false, 0,                          // plaintext / SASL_PLAINTEXT listener flags and port
                false, 0,                                 // SSL listener flag and port
                false, 0,                                 // SASL_SSL listener flag and port
                Option.empty());                          // rack
        brokerConfig.put("default.replication.factor", String.valueOf(1));
        return new KafkaConfig(brokerConfig);
    }

    public KafkaServer getKafkaServer() {
        return kafkaServer;
    }

    public String getZkConnectionString() {
        return zkConnectionString;
    }

    public void stop() throws IOException {
        kafkaServer.shutdown();
        zkClient.close();
    }
}
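
A minimal usage sketch, assuming a ZooKeeper instance is already listening on
the given connect string (the address and port below are illustrative):

public class TestKafkaClusterExample {
    public static void main(String[] args) throws Exception {
        // "localhost:2181" and 9092 are placeholders for whatever the test
        // environment provides (e.g. a Curator TestingServer for ZooKeeper).
        TestKafkaCluster cluster = new TestKafkaCluster("localhost:2181", 9092);
        try {
            // ... produce/consume against cluster.getKafkaServer() ...
        } finally {
            cluster.stop();
        }
    }
}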


On Fri, Jun 17, 2016 at 2:18 AM, Ismael Juma  wrote:

> Try using kafka.server.KafkaServerStartable instead. It should do the right
> thing.
>
> Ismael
>
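(For reference, a minimal sketch of the suggested approach, assuming Kafka
0.10's kafka.server.KafkaServerStartable, which wraps KafkaServer and fills
in the Scala default arguments itself; the property values below are
illustrative only:)

import java.util.Properties;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;

public class EmbeddedKafkaMain {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");  // illustrative
        props.put("broker.id", "0");
        props.put("log.dirs", "/tmp/embedded-kafka-logs"); // illustrative

        // KafkaServerStartable supplies KafkaServer's default time and
        // threadNamePrefix arguments, so no Scala Options are needed here.
        KafkaServerStartable kafka = new KafkaServerStartable(new KafkaConfig(props));
        kafka.startup();
        Runtime.getRuntime().addShutdownHook(new Thread(kafka::shutdown));
    }
}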
> On Thu, Jun 16, 2016 at 7:18 PM, Subhash Agrawal 
> wrote:
>
> > Thanks Ismael.
> > I am instantiating the KafkaServer instance like this:
> > new KafkaServer(kafkaConfig, null, null);
> >
> > I tried to use
> > new KafkaServer(kafkaConfig); but it does not compile with kafka 0.10.0.
> >
> > All the examples I see use
> > new KafkaServer(kafkaConfig);
> >
> > Do we support new KafkaServer(kafkaConfig); with kafka 0.10.0? If not,
> > how can I pass these parameters? It used to work with kafka 0.7.1.
> >
> > Thanks
> > Subhash Agrawal
> >
> >
> > -Original Message-
> > From: isma...@gmail.com [mailto:isma...@gmail.com] On Behalf Of Ismael
> > Juma
> > Sent: Thursday, June 16, 2016 1:38 AM
> > To: dev@kafka.apache.org
> > Subject: Re: Embedding zookeeper and kafka in java process.
> >
> > Hi Subhash,
> >
> > This would happen if `null` is passed as the `threadNamePrefix` argument
> > when instantiating `KafkaServer`:
> >
> > class KafkaServer(val config: KafkaConfig, time: Time = SystemTime,
> > threadNamePrefix: Option[String] = None) extends Logging with
> > KafkaMetricsGroup
> >
> > How are you starting Kafka in your Java process?
> >
> > Ismael
> >
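(A sketch of supplying those defaults explicitly from Java instead of null,
assuming the 0.10 constructor quoted above; SystemTime$.MODULE$ is the Scala
singleton behind the SystemTime default:)

import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.SystemTime$;
import scala.Option;

public class ServerFactory {
    public static KafkaServer newServer(KafkaConfig config) {
        // threadNamePrefix must be Option.empty() (i.e. None), never null:
        // KafkaServer pattern-matches on it, and null raises scala.MatchError.
        return new KafkaServer(config, SystemTime$.MODULE$, Option.<String>empty());
    }
}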
> > On Thu, Jun 16, 2016 at 1:26 AM, Subhash Agrawal 
> > wrote:
> >
> > > Thanks for the quick response.
> > > I started zookeeper via zookeeper-server-start.bat and started kafka via
> > > my java process, and I saw the same error.
> > > But if I start zookeeper via the java process and start kafka via
> > > kafka-server-start.bat, it works fine.
> > > This means the error is not caused by both services starting in the same
> > > process; it must be some kafka-specific issue.
> > >
> > > Subhash Agrawal
> > >
> > > -Original Message-
> > > From: Guozhang Wang [mailto:wangg...@gmail.com]
> > > Sent: Wednesday, June 15, 2016 3:42 PM
> > > To: dev@kafka.apache.org
> > > Subject: Re: Embedding zookeeper and kafka in java process.
> > >
> > > It seems "scala.MatchError: null" are not related to the settings that
> ZK
> > > and Kafka is embedded in the same process, and the only case that I can
> > > think of related is this:
> > https://issues.apache.org/jira/browse/KAFKA-940.
> > >
> > > Could you clarify if you start these two services on two processes, the
> > > issue goes away?
> > >
> > > Guozhang
> > >
> > > On Wed, Jun 15, 2016 at 1:56 PM, Subhash Agrawal <
> agraw...@opentext.com>
> > > wrote:
> > >
> > > > Hi All,
> > > > I am embedding Kafka 0.10.0 and the corresponding zookeeper in a java
> > > > process. In this process, I start zookeeper first, wait for 10 seconds,
> > > > and then start kafka; they all run in the same process. Toward the end
> > > > of kafka startup, I see the following exception. It seems zookeeper is
> > > > not able to add the newly created kafka instance. Have you seen this
> > > > error before? I have only a single-node kafka.
> > > >
> > > > Let me know if you have any suggestions. I would really appreciate any
> > > > help on this.
> > > >
> > > > Thanks
> > > > Subhash Agrawal.
> > > >

Re: [ANNOUNCE] New committer: Ismael Juma

2016-04-26 Thread Achanta Vamsi Subhash
Congrats Ismael

On Tue, Apr 26, 2016 at 12:58 PM, Ismael Juma  wrote:

> Thank you Neha and the PMC for the opportunity. I am both grateful and
> excited. :)
>
> And thanks to everyone else in the Kafka community too.
>
> Ismael
>
> On Mon, Apr 25, 2016 at 10:52 PM, Neha Narkhede  wrote:
>
> > The PMC for Apache Kafka has invited Ismael Juma to join as a committer
> and
> > we are pleased to announce that he has accepted!
> >
> > Ismael has contributed 121 commits to a wide range of
> > areas, notably within the security and the network layer. His involvement
> > has been phenomenal across the board from mailing lists, JIRA, code
> reviews
> > and helping us move to GitHub pull requests to contributing features, bug
> > fixes and code and documentation improvements.
> >
> > Thank you for your contribution and welcome to Apache Kafka, Ismael!
> >
> > --
> > Thanks,
> > Neha
> >
>



-- 
Regards
Vamsi Subhash


Re: [GitHub] kafka pull request: Parallel log-recovery of un-flushed segments o...

2016-03-19 Thread Achanta Vamsi Subhash
Can someone please review this?

On Fri, Mar 11, 2016 at 12:09 AM, Achanta Vamsi Subhash <
achanta.va...@flipkart.com> wrote:

> Hi,
> I would like to get this into 0.10.0, so could someone look into this and
> review it?
>
> On Wed, Mar 9, 2016 at 10:29 PM, Achanta Vamsi Subhash <
> achanta.va...@flipkart.com> wrote:
>
>> Hi all,
>>
>> https://github.com/apache/kafka/pull/1035
>> This pull request makes the log-segment load parallel, with two
>> configurable properties: "log.recovery.threads" and
>> "log.recovery.max.interval.ms".
>>
>> Currently on startup, the log segments within a logDir are loaded
>> sequentially after an unclean shutdown. Loading the segments takes a long
>> time because logSegment.recover(..) is called for every segment, and for
>> brokers with many partitions the total time is very high (we have noticed
>> ~40 minutes for 2k partitions).
>>
>> Logic:
>> 1. Have a thread pool of fixed size (log.recovery.threads).
>> 2. Submit each logSegment recovery as a job to the thread pool and add
>> the returned future to a job list.
>> 3. Wait until all the jobs finish within the required time
>> (log.recovery.max.interval.ms - default Long.MAX_VALUE).
>> 4. If they finish and the futures all return null results (meaning the
>> jobs completed successfully), recovery is considered done.
>> 5. If any recovery job fails, it is logged and
>> LogRecoveryFailedException is thrown.
>> 6. If the timeout is reached, LogRecoveryFailedException is thrown.
>> The logic is backward compatible with the current sequential
>> implementation, as the default thread count is 1.
>>
>> JIRA link is here:
>> https://issues.apache.org/jira/browse/KAFKA-3359
>>
>> Please review and give me suggestions. Will make them and contribute.
>> Thanks.
>>
>>
>> On Wed, Mar 9, 2016 at 7:57 PM, vamsi-subhash <g...@git.apache.org> wrote:
>>
>>> GitHub user vamsi-subhash opened a pull request:
>>>
>>> https://github.com/apache/kafka/pull/1035
>>>
>>> Parallel log-recovery of un-flushed segments on startup
>>>
>>> Did not find any tests for the method. Will be adding them
>>>
>>> You can merge this pull request into a Git repository by running:
>>>
>>> $ git pull https://github.com/vamsi-subhash/kafka trunk
>>>
>>> Alternatively you can review and apply these changes as the patch at:
>>>
>>> https://github.com/apache/kafka/pull/1035.patch
>>>
>>> To close this pull request, make a commit to your master/trunk branch
>>> with (at least) the following in the commit message:
>>>
>>> This closes #1035
>>>
>>> 
>>> commit ecab815203a2b6396703660d5a2f9d9bb00efcf3
>>> Author: Vamsi Subhash Achanta <vamsi...@gmail.com>
>>> Date:   2016-03-09T14:24:37Z
>>>
>>> Made log-recovery parallel
>>>
>>> 
>>>
>>>
>>>
>>
>>
>>
>> --
>> Regards
>> Vamsi Subhash
>>
>
>
>
> --
> Regards
> Vamsi Subhash
>



-- 
Regards
Vamsi Subhash


Re: [GitHub] kafka pull request: Parallel log-recovery of un-flushed segments o...

2016-03-10 Thread Achanta Vamsi Subhash
Hi,
I would like to get this into 0.10.0, so could someone look into this and
review it?

On Wed, Mar 9, 2016 at 10:29 PM, Achanta Vamsi Subhash <
achanta.va...@flipkart.com> wrote:

> Hi all,
>
> https://github.com/apache/kafka/pull/1035
> This pull request makes the log-segment load parallel, with two
> configurable properties: "log.recovery.threads" and
> "log.recovery.max.interval.ms".
>
> Currently on startup, the log segments within a logDir are loaded
> sequentially after an unclean shutdown. Loading the segments takes a long
> time because logSegment.recover(..) is called for every segment, and for
> brokers with many partitions the total time is very high (we have noticed
> ~40 minutes for 2k partitions).
>
> Logic:
> 1. Have a thread pool of fixed size (log.recovery.threads).
> 2. Submit each logSegment recovery as a job to the thread pool and add
> the returned future to a job list.
> 3. Wait until all the jobs finish within the required time
> (log.recovery.max.interval.ms - default Long.MAX_VALUE).
> 4. If they finish and the futures all return null results (meaning the
> jobs completed successfully), recovery is considered done.
> 5. If any recovery job fails, it is logged and LogRecoveryFailedException
> is thrown.
> 6. If the timeout is reached, LogRecoveryFailedException is thrown.
> The logic is backward compatible with the current sequential
> implementation, as the default thread count is 1.
>
> JIRA link is here:
> https://issues.apache.org/jira/browse/KAFKA-3359
>
> Please review and give me suggestions. Will make them and contribute.
> Thanks.
>
>
> On Wed, Mar 9, 2016 at 7:57 PM, vamsi-subhash <g...@git.apache.org> wrote:
>
>> GitHub user vamsi-subhash opened a pull request:
>>
>> https://github.com/apache/kafka/pull/1035
>>
>> Parallel log-recovery of un-flushed segments on startup
>>
>> Did not find any tests for the method. Will be adding them
>>
>> You can merge this pull request into a Git repository by running:
>>
>> $ git pull https://github.com/vamsi-subhash/kafka trunk
>>
>> Alternatively you can review and apply these changes as the patch at:
>>
>> https://github.com/apache/kafka/pull/1035.patch
>>
>> To close this pull request, make a commit to your master/trunk branch
>> with (at least) the following in the commit message:
>>
>> This closes #1035
>>
>> 
>> commit ecab815203a2b6396703660d5a2f9d9bb00efcf3
>> Author: Vamsi Subhash Achanta <vamsi...@gmail.com>
>> Date:   2016-03-09T14:24:37Z
>>
>> Made log-recovery parallel
>>
>> 
>>
>>
>>
>
>
>
> --
> Regards
> Vamsi Subhash
>



-- 
Regards
Vamsi Subhash


Re: [GitHub] kafka pull request: Parallel log-recovery of un-flushed segments o...

2016-03-09 Thread Achanta Vamsi Subhash
Hi all,

https://github.com/apache/kafka/pull/1035
This pull request makes the log-segment load parallel, with two
configurable properties: "log.recovery.threads" and
"log.recovery.max.interval.ms".

Currently on startup, the log segments within a logDir are loaded
sequentially after an unclean shutdown. Loading the segments takes a long
time because logSegment.recover(..) is called for every segment, and for
brokers with many partitions the total time is very high (we have noticed
~40 minutes for 2k partitions).

Logic (sketched in code below):
1. Have a thread pool of fixed size (log.recovery.threads).
2. Submit each logSegment recovery as a job to the thread pool and add the
returned future to a job list.
3. Wait until all the jobs finish within the required time
(log.recovery.max.interval.ms - default Long.MAX_VALUE).
4. If they finish and the futures all return null results (meaning the
jobs completed successfully), recovery is considered done.
5. If any recovery job fails, it is logged and LogRecoveryFailedException
is thrown.
6. If the timeout is reached, LogRecoveryFailedException is thrown.
The logic is backward compatible with the current sequential
implementation, as the default thread count is 1.
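
In outline, the recovery loop looks roughly like the sketch below. This is
illustrative only: LogSegment and LogRecoveryFailedException here are
stand-ins for the PR's types, not existing Kafka APIs.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class ParallelLogRecovery {

    // Stand-in for kafka.log.LogSegment's recover() method.
    interface LogSegment { void recover(); }

    // Stand-in for the LogRecoveryFailedException proposed in the PR.
    static class LogRecoveryFailedException extends RuntimeException {
        LogRecoveryFailedException(String msg) { super(msg); }
        LogRecoveryFailedException(Throwable cause) { super(cause); }
    }

    static void recover(List<LogSegment> segments, int threads, long maxIntervalMs)
            throws InterruptedException {
        // Step 1: fixed-size pool (log.recovery.threads).
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        // Step 2: one recovery job per segment; collect the futures.
        List<Future<?>> jobs = new ArrayList<>();
        for (LogSegment segment : segments) {
            jobs.add(pool.submit(segment::recover));
        }
        pool.shutdown();
        // Steps 3 and 6: wait up to log.recovery.max.interval.ms.
        if (!pool.awaitTermination(maxIntervalMs, TimeUnit.MILLISECONDS)) {
            throw new LogRecoveryFailedException("log recovery timed out");
        }
        // Steps 4 and 5: Future.get() returns null on success and rethrows
        // the per-segment failure otherwise.
        for (Future<?> job : jobs) {
            try {
                job.get();
            } catch (ExecutionException e) {
                throw new LogRecoveryFailedException(e.getCause());
            }
        }
    }
}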

JIRA link is here:
https://issues.apache.org/jira/browse/KAFKA-3359

Please review and give me suggestions. Will make them and contribute.
Thanks.


On Wed, Mar 9, 2016 at 7:57 PM, vamsi-subhash  wrote:

> GitHub user vamsi-subhash opened a pull request:
>
> https://github.com/apache/kafka/pull/1035
>
> Parallel log-recovery of un-flushed segments on startup
>
> Did not find any tests for the method. Will be adding them
>
> You can merge this pull request into a Git repository by running:
>
> $ git pull https://github.com/vamsi-subhash/kafka trunk
>
> Alternatively you can review and apply these changes as the patch at:
>
> https://github.com/apache/kafka/pull/1035.patch
>
> To close this pull request, make a commit to your master/trunk branch
> with (at least) the following in the commit message:
>
> This closes #1035
>
> 
> commit ecab815203a2b6396703660d5a2f9d9bb00efcf3
> Author: Vamsi Subhash Achanta 
> Date:   2016-03-09T14:24:37Z
>
> Made log-recovery parallel
>
> 
>
>
>



-- 
Regards
Vamsi Subhash


Re: Consumer Assignment documentation - StreamPartitionAssignment.java

2016-02-05 Thread Achanta Vamsi Subhash
Thanks Guozhang.

Is the below true?

1. There is no central controller for the consumers which assigns the
partitions to the active consumers.
2. Hence, the consumers each contend for the allocation by registering
watches on the active consumers (the /ids path in zk).

But how do the consumers finally arrive at consensus? Who ensures that the
consumer that took ownership is actually consuming data? We are noticing
scenarios where the rebalances take a long time, and we have
(backoff-time * rebalance-retries) > zk-session-timeout.

Would it be better for the consumers to follow a controller pattern?
1. Have a leader-elected consumer controller.
2. The consumers then register their active state in zk.
3. The controller allocates partitions to the consumers depending on the
strategy - e.g. ceil(no-of-partitions/active-consumers).
4. A background thread at the controller monitors whether each allocated
consumer is actually consuming messages from its partitions and, if not,
re-allocates them to others.



On Tue, Feb 2, 2016 at 10:46 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Vamsi,
>
> Consumer partition assignment is always within a single group. You can find
> the java docs for the PartitionAssignor interface here:
>
>
> https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/RangeAssignor.html
>
> StreamPartitionAssignor is one implementation of the interface which is
> used exclusively in Kafka Streams.
>
> Guozhang
>
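(For reference, a consumer selects among these assignor implementations via
configuration; a minimal sketch with illustrative broker address, group and
topic names:)

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;

public class AssignorConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative
        props.put("group.id", "example-group");           // illustrative
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // RangeAssignor is the default; RoundRobinAssignor is the other
        // built-in strategy.
        props.put("partition.assignment.strategy", RoundRobinAssignor.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("example-topic")); // illustrative
    }
}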
> On Tue, Feb 2, 2016 at 1:37 AM, Achanta Vamsi Subhash <
> achanta.va...@flipkart.com> wrote:
>
> > Hi all,
> >
> > I am going through the partition assignment logic in the new Kafka
> > consumer. Is there any documentation for the implemented logic? It would
> > be great if someone could write up the algorithm for consumer partition
> > assignment (within a group and across groups).
> >
> > --
> > Regards
> > Vamsi Subhash
> >
>
>
>
> --
> -- Guozhang
>



-- 
Regards
Vamsi Subhash


Consumer Assignment documentation - StreamPartitionAssignment.java

2016-02-02 Thread Achanta Vamsi Subhash
Hi all,

I am going through the partition assignment logic in the new Kafka
consumer. Is there any documentation for the implemented logic? It would be
great if someone could write up the algorithm for consumer partition
assignment (within a group and across groups).

-- 
Regards
Vamsi Subhash