Re: Migrating from 07.1 .100

2016-06-13 Thread Gerard Klijs
There have been a lot of changes, and it can also be quite a challenge to get
SSL working. And with only a null pointer, there is little to go on. I
would first focus on getting it to work with 0.10 (if possible also on a
clustered test setup), and once that all works, try to configure SSL.
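If it helps, this is a minimal sketch of embedding a 0.10 broker (plaintext only,
before adding SSL); it does not explain the NullPointerException, and the ZooKeeper
address, port and log directory below are placeholders:

import java.util.Properties;
import kafka.server.KafkaServerStartable;

// Minimal embedded 0.10 broker; all values below are illustrative.
public class EmbeddedBroker {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("broker.id", "0");
        props.put("zookeeper.connect", "localhost:2181"); // required in 0.9/0.10
        props.put("listeners", "PLAINTEXT://localhost:9092");
        props.put("log.dirs", "/tmp/embedded-kafka-logs");

        KafkaServerStartable broker = KafkaServerStartable.fromProps(props);
        broker.startup();
        Runtime.getRuntime().addShutdownHook(new Thread(broker::shutdown));
        broker.awaitShutdown();
    }
}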

On Mon, Jun 13, 2016 at 9:08 PM Subhash Agrawal 
wrote:

> Hi,
> I currently embed kafka 0.7.1 in my java process. To support SSL, we have
> decided to upgrade Kafka to 0.10.0.
> After upgrade, I am seeing following error during kafka startup.
>
> java.lang.NullPointerException
> at kafka.utils.Throttler.<init>(Throttler.scala:45)
> at kafka.log.LogCleaner.<init>(LogCleaner.scala:75)
> at kafka.log.LogManager.<init>(LogManager.scala:66)
> at
> kafka.server.KafkaServer.createLogManager(KafkaServer.scala:609)
> at kafka.server.KafkaServer.startup(KafkaServer.scala:183)
>
> Has anybody seen this error? If I run kafka externally, then I don't see
> this error.
>
> Thanks
> Subhash A.
>
>
> From: Subhash Agrawal
> Sent: Wednesday, June 08, 2016 12:40 PM
> To: 'us...@kafka.apache.org'
> Subject: Migrating from 07.1 .100
>
> Hi,
> I am currently using Kafka 0.7.1 without zookeeper. We have single node
> kafka server.
> To enhance security, we have decided to support SSL. As 0.7.1 version does
> not support SSL,
> we are upgrading to latest version 0.10.0.0. We noticed that with the
> latest version, it is
> mandatory to use zookeeper.
>
> Is there any way I can use Kafka 0.10 or 0.9 version without zookeeper?
>
> Thanks
> Subhash A.
>
>


[jira] [Commented] (KAFKA-2857) ConsumerGroupCommand throws GroupCoordinatorNotAvailableException when describing a non-existent group before the offset topic is created

2016-06-13 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15328866#comment-15328866
 ] 

Jason Gustafson commented on KAFKA-2857:


[~vahid] That's a good question. Maybe we can get clarification from [~ijuma] 
or [~junrao], but the previous sentence in the description suggests maybe the 
intent is to distinguish the case of a group not existing? I think it would be 
fine here to give the user a message which tells them that the offsets topic is 
unavailable and points out that it takes a little time on a new cluster to 
create it. But if we can't find the coordinator, then there's really no way to 
know whether the group "exists" or not.
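For example, a minimal sketch of the kind of message I mean (describeGroup below is
just a stand-in for whatever the command actually calls, not a real API):

{code}
import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException;

public class DescribeGroupExample {
    // Placeholder for the real describe call in ConsumerGroupCommand.
    static void describeGroup(String groupId) {
        throw new GroupCoordinatorNotAvailableException("The group coordinator is not available.");
    }

    public static void main(String[] args) {
        try {
            describeGroup("my-group"); // hypothetical group name
        } catch (GroupCoordinatorNotAvailableException e) {
            System.err.println("Could not describe group: the group coordinator is not available. "
                + "On a newly started cluster the offsets topic may still be getting created; "
                + "please retry in a few seconds.");
        }
    }
}
{code}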

> ConsumerGroupCommand throws GroupCoordinatorNotAvailableException when 
> describing a non-existent group before the offset topic is created
> -
>
> Key: KAFKA-2857
> URL: https://issues.apache.org/jira/browse/KAFKA-2857
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Ismael Juma
>Assignee: Ishita Mandhan
>Priority: Minor
>
> If we describe a non-existing group before the offset topic is created, like 
> the following:
> {code}
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --new-consumer 
> --describe --group 
> {code}
> We get the following error:
> {code}
> Error while executing consumer group command The group coordinator is not 
> available.
> org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException: The 
> group coordinator is not available.
> {code}
> The exception is thrown in the `adminClient.describeConsumerGroup` call. We 
> can't interpret this exception as meaning that the group doesn't exist 
> because it could also be thrown if all replicas for an offset topic partition 
> are down (as explained by Jun).
> Jun also suggested that we should distinguish if a coordinator is not 
> available from the case where a coordinator doesn't exist.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3686) Kafka producer is not fault tolerant

2016-06-13 Thread radha (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15328781#comment-15328781
 ] 

radha commented on KAFKA-3686:
--

We have a similar issue: even with a very large retry count, the producer 
decides to skip messages when there is a network issue. In addition to what the 
OP asked, at a minimum, shouldn't there be a way to get a handle on the message 
after the last retry, or after everything fails?

The only option with an async send is a callback on completion, where even the 
RecordMetadata is empty (as expected, because there was no server communication), 
but what about the record itself and the number of retries that happened?
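For the "record itself" part, one workaround with the current client is to close over
the record in the completion callback; this is only a sketch (bootstrap server, topic
and payload are placeholders), and the number of retries is still not exposed:

{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SendWithFailureHandler {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            final ProducerRecord<String, String> record =
                    new ProducerRecord<>("topic", "key", "payload");
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        // all retries exhausted or the batch expired; the record is still in
                        // scope here, so it can be parked in a local retry / dead-letter store
                        System.err.println("Giving up on " + record + ": " + exception);
                    }
                }
            });
        }
    }
}
{code}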

Error producing to topic 
org.apache.kafka.common.errors.TimeoutException: Batch Expired

reconnect.backoff.ms = 100
retry.backoff.ms = 100
buffer.memory = 33554432
timeout.ms = 3
connections.max.idle.ms = 54
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
request.timeout.ms = 5000
acks = 1
batch.size = 16384
receive.buffer.bytes = 32768
retries = 1000
max.request.size = 1048576
metrics.sample.window.ms = 3
send.buffer.bytes = 131072
linger.ms = 10

> Kafka producer is not fault tolerant
> 
>
> Key: KAFKA-3686
> URL: https://issues.apache.org/jira/browse/KAFKA-3686
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
>Reporter: Luca Bruno
>
> *Setup*
> I have a cluster of 3 kafka servers, a topic with 12 partitions with replica 
> 2, and a zookeeper cluster of 3 nodes.
> Producer config:
> {code}
>  props.put("bootstrap.servers", "k1:9092,k2:9092,k3:9092");
>  props.put("acks", "1");
>  props.put("batch.size", 16384);
>  props.put("retries", 3);
>  props.put("buffer.memory", 33554432);
>  props.put("key.serializer", 
> "org.apache.kafka.common.serialization.StringSerializer");
>  props.put("value.serializer", 
> "org.apache.kafka.common.serialization.StringSerializer");
> {code}
> Producer code:
> {code}
>  Producer<String, String> producer = new KafkaProducer<>(props);
>  for(int i = 0; i < 10; i++) {
>  Future<RecordMetadata> f = producer.send(new ProducerRecord<String, String>("topic", null, Integer.toString(i)));
>  f.get();
>  }
> {code}
> *Problem*
> Cut the network between the producer (p1) and one of the kafka servers (say 
> k1).
> The cluster is healthy, hence the kafka bootstrap tells the producer that 
> there are 3 kafka servers (as I understood it), and the leaders of the 
> partitions of the topic.
> So the producer will send messages to all of the 3 leaders for each 
> partition. If the leader happens to be k1 for a message, the producer raises 
> the following exception after request.timeout.ms:
> {code}
> Exception in thread "main" java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Batch Expired
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56)
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43)
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
>   at Test.main(Test.java:25)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Batch Expired
> {code}
> In theory, the application should handle the failure. In practice, messages 
> are getting lost, even though there are other 2 leaders available for writing.
> I tried with values of acks both 1 and -1.
> *What I expected*
> Given the client library is automatically deciding the hashing / round robin 
> scheme for the partition, I would say it's not very important which partition 
> the message is being sent to.
> I expect the client library to handle the failure by sending the message to a 
> partition of a different leader.
> Neither kafka-clients nor rdkafka handle this failure. Given those are the 
> main client libraries being used for kafka as far as I know, I find it a 
> serious problem in terms of fault tolerance.
> EDIT: I cannot add comments to this issue, don't understand why. To answer 
> [~fpj] yes, I want the first. In the case of network partitions I want to 
> ensure my messages are stored. If the libraries don't do that, it means I 
> have to reimplement them. Or otherwise, postpone sending such messages until 
> the network partition resolves (which means implementing some kind of backlog 
> on disk of the producer, which should instead be the kafka purpose after 
> all). In both cases, it's something that is not documented and it's very 
> inconvenient.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-4 Create Topic Schema

2016-06-13 Thread Grant Henke
Thanks for the review Jun.

You probably want to make it clearer if timeout > 0, what waiting for topic
> metadata is "complete" means. In the first implementation, it really means
> that the topic metadata is propagated to the controller's metadata cache.


I updated the wiki to be more descriptive. Below is the updated text:

Setting a timeout > 0 will allow the request to block until the topic
> metadata is "complete" on the controller node.
>
>    - Complete means the local topic metadata cache has been completely
>populated and all partitions have leaders
>   - The topic metadata is updated when the controller sends out
>   update metadata requests to the brokers
>- If a timeout error occurs, the topic could still be created
>    successfully at a later time. It's up to the client to query for the state
>at that point.
>
>
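As a rough, hedged illustration of that last point (not part of the KIP itself): with
the existing Java client a caller could poll the topic metadata until every partition
reports a leader, along these lines (topic name, attempt count and backoff are made up):

import java.util.List;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.PartitionInfo;

// Sketch: poll partitionsFor() until all partitions of the topic report a leader.
// partitionsFor() may block up to max.block.ms and can throw; error handling omitted.
public class TopicReadyCheck {
    public static boolean waitForTopicReady(Producer<?, ?> producer, String topic,
                                            int maxAttempts, long backoffMs)
            throws InterruptedException {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            List<PartitionInfo> partitions = producer.partitionsFor(topic);
            boolean ready = partitions != null && !partitions.isEmpty();
            if (ready) {
                for (PartitionInfo p : partitions) {
                    if (p.leader() == null) { // no leader elected yet
                        ready = false;
                        break;
                    }
                }
            }
            if (ready) {
                return true;
            }
            Thread.sleep(backoffMs);
        }
        return false;
    }
}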
Thanks,
Grant


On Sun, Jun 12, 2016 at 4:14 PM, Jun Rao  wrote:

> Grant,
>
> Thanks for the proposal. It looks good to me.
>
> You probably want to make it clearer if timeout > 0, what waiting for topic
> metadata is "complete" means. In the first implementation, it really means
> that the topic metadata is propagated to the controller's metadata cache.
>
> Jun
>
> On Fri, Jun 10, 2016 at 9:21 AM, Grant Henke  wrote:
>
> > Now that Kafka 0.10 has been released I would like to start work on the
> new
> > protocol messages and client implementation for KIP-4. In order to break
> up
> > the discussion and feedback I would like to continue breaking up the
> > content in to smaller pieces.
> >
> > This discussion thread is for the CreateTopic request/response and server
> > side implementation. Details for this implementation can be read here:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-CreateTopicRequest
> >
> > I have included the exact content below for clarity:
> >
> > > Create Topic Request (KAFKA-2945)
> > >
> > >
> > > CreateTopic Request (Version: 0) => [create_topic_requests] timeout
> > >   create_topic_requests => topic partitions replication_factor
> > [replica_assignment] [configs]
> > > topic => STRING
> > > partitions => INT32
> > > replication_factor => INT32
> > > replica_assignment => partition_id [replicas]
> > >   partition_id => INT32
> > >   replicas => INT32
> > > configs => config_key config_value
> > >   config_key => STRING
> > >   config_value => STRING
> > >   timeout => INT32
> > >
> > > CreateTopicRequest is a batch request to initiate topic creation with
> > > either predefined or automatic replica assignment and optionally topic
> > > configuration.
> > >
> > > Request semantics:
> > >
> > >1. Must be sent to the controller broker
> > >2. Multiple instructions for the same topic in one request will be
> > >silently ignored, only the last from the list will be executed.
> > >   - This is because the list of topics is modeled server side as a
> > >   map with TopicName as the key
> > >    3. The principal must be authorized to the "Create" Operation on the
> > >"Cluster" resource to create topics.
> > >   - Unauthorized requests will receive a
> > ClusterAuthorizationException
> > >4.
> > >
> > >Only one from ReplicaAssignment or (Partitions + ReplicationFactor),
> > can
> > >be defined in one instruction. If both parameters are specified -
> > >ReplicaAssignment takes precedence.
> > >    - In the case ReplicaAssignment is defined, the number of partitions and
> > >   replicas will be calculated from the supplied ReplicaAssignment.
> > >   - In the case of defined (Partitions + ReplicationFactor) replica
> > >   assignment will be automatically generated by the server.
> > >   - One or the other must be defined. The existing broker side auto
> > >   create defaults will not be used
> > >   (default.replication.factor, num.partitions). The client
> > implementation can
> > >   have defaults for these options when generating the messages.
> > >5. Setting a timeout > 0 will allow the request to block until the
> > >topic metadata is "complete" on the controller node.
> > >   - Complete means the topic metadata has been completely populated
> > >   (leaders, replicas, ISRs)
> > >   - If a timeout error occurs, the topic could still be created
> > >   successfully at a later time. It's up to the client to query for
> > the state
> > >   at that point.
> > >6. The request is not transactional.
> > >   1. If an error occurs on one topic, the other could still be
> > >   created.
> > >   2. Errors are reported independently.
> > >
> > > QA:
> > >
> > >- Why is CreateTopicRequest a batch request?
> > >   - Scenarios where tools or admins want to create many topics
> 

Re: [DISCUSS] KIP-4 Create Topic Schema

2016-06-13 Thread Grant Henke
Thanks for the review Gwen.

1. The replica assignment protocol takes [replicas], there is the
> implicit assumption that the first replica is the leader. This matches
> current behavior elsewhere, but let's document it explicitly.


I added this to the wiki and will update the protocol doc string in the
patch.

2. I like the timeout, but want to clarify why, since it may not be
> obvious to everyone:


I tried to describe why a timeout, even if not global, is useful in the
"Cluster Consistent Blocking" section. I have a Q&A entry that links to that
section in the Create Topic section (I fixed the broken link). Below is the
relevant text from that section:

The intermediate changes in KIP-4 should allow an easy transition to
> "complete blocking" when the work can be done. This is supported by
> providing optional local blocking in the meantime. This local blocking
> only blocks until the local state on the controller is correct. We will
> still provide a polling mechanism for users that do not want to block at
> all. A polling mechanism is required in the optimal implementation too
> because users still need a way to check state after a timeout occurs
> because operations like "create topic" are not transactional. Local
> blocking has the added benefit of avoiding wasted poll requests to other
> brokers when it's impossible for the request to be completed. If the
> controller's state is not correct, then the other brokers can't be either.
> Clients who don't want to validate that the entire cluster state is correct can
> block on the controller and avoid polling altogether with reasonable
> confidence that though they may get a retriable error on follow up
> requests, the requested change was successful and the cluster will be
> accurate eventually.

Because we already add a timeout field to the requests' wire protocols,
> changing the behavior to block until the cluster is consistent in the
> future would not require a protocol change. Though the version could be
> bumped to indicate a behavior change.


Thanks,
Grant


On Fri, Jun 10, 2016 at 12:34 PM, Gwen Shapira  wrote:

> Thank you for the clear proposal, Grant!
>
> I like the request/response objects and the timeout semantics. Two
> comments:
>
> 1. The replica assignment protocol takes [replicas], there is the
> implicit assumption that the first replica is the leader. This matches
> current behavior elsewhere, but let's document it explicitly.
>
> 2. I like the timeout, but want to clarify why, since it may not be
> obvious to everyone:
> Currently, the response is sent when the controller has sent the
> "update metadata" request to the brokers involved with the new topic.
> It is a rather weak guarantee, but if clients decide to poll the
> brokers for updates, it does reduce the time spent polling.
> More importantly, this behavior is a net improvement on the current state
> (completely async and ZK dependent) and when we do have a way to get
> "ack" from replicas, we will be able to add the new behavior without
> changing the protocol (just the semantics of waiting).
>
> Gwen
>
> On Fri, Jun 10, 2016 at 7:21 PM, Grant Henke  wrote:
> > Now that Kafka 0.10 has been released I would like to start work on the
> new
> > protocol messages and client implementation for KIP-4. In order to break
> up
> > the discussion and feedback I would like to continue breaking up the
> > content in to smaller pieces.
> >
> > This discussion thread is for the CreateTopic request/response and server
> > side implementation. Details for this implementation can be read here:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-CreateTopicRequest
> >
> > I have included the exact content below for clarity:
> >
> >> Create Topic Request (KAFKA-2945)
> >>
> >>
> >> CreateTopic Request (Version: 0) => [create_topic_requests] timeout
> >>   create_topic_requests => topic partitions replication_factor
> [replica_assignment] [configs]
> >> topic => STRING
> >> partitions => INT32
> >> replication_factor => INT32
> >> replica_assignment => partition_id [replicas]
> >>   partition_id => INT32
> >>   replicas => INT32
> >> configs => config_key config_value
> >>   config_key => STRING
> >>   config_value => STRING
> >>   timeout => INT32
> >>
> >> CreateTopicRequest is a batch request to initiate topic creation with
> >> either predefined or automatic replica assignment and optionally topic
> >> configuration.
> >>
> >> Request semantics:
> >>
> >>1. Must be sent to the controller broker
> >>2. Multiple instructions for the same topic in one request 

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-06-13 Thread Guozhang Wang
Although this KIP is not mainly about memory management in Kafka Streams,
since it touches on quite a few parts of it I think it is good to first think
of what we would REALLY want as an end goal for memory usage, in order to
make sure that whatever we propose in this KIP aligns with that long-term
plan. So I wrote up this discussion page that summarizes my current
thoughts:

https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Memory+Management+in+Kafka+Streams

As for its implication on this KIP, my personal take is that:

1. we should use a global config in terms of bytes, which will then be
evenly divided among the threads within the Kafka Streams instance; within a
thread, that share can be used to control the total size of all caches
instead of being further divided among the individual caches.

2. instead of caching in terms of deserialized objects, we may need to
consider just caching in terms of serialized bytes. Admittedly it will
incur the cost of doing serdes for caching, but without doing so I honestly
have no concrete clue how we can measure the current memory usage
accurately AND efficiently (after reading the links Ismael sent me, I feel
that accurate estimates for collection / composite types like String will do
some serialization with sun.misc.Unsafe anyway when crawling the object
graph via reflection). We may need to do some benchmarking with
https://github.com/jbellis/jamm, for example, to validate this claim, or
someone can tell me that there is actually a better way that I'm not aware
of. A rough sketch of what byte-based caching could look like follows below.
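The sketch below is purely hypothetical (not part of the KIP): a per-thread cache
that stores serialized values against its share of a global byte budget (e.g. the
global byte config divided by the number of stream threads) and evicts in LRU order;
sizes are measured simply as the serialized value length.

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

public class BytesCache {
    private final long maxBytes; // this thread's share of the global budget
    private long currentBytes = 0;
    private final LinkedHashMap<String, byte[]> entries =
            new LinkedHashMap<>(16, 0.75f, true); // access order => LRU iteration

    public BytesCache(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    public void put(String key, byte[] serializedValue) {
        byte[] old = entries.put(key, serializedValue);
        if (old != null) {
            currentBytes -= old.length;
        }
        currentBytes += serializedValue.length;
        evictIfNeeded();
    }

    public byte[] get(String key) {
        return entries.get(key);
    }

    private void evictIfNeeded() {
        Iterator<Map.Entry<String, byte[]>> it = entries.entrySet().iterator();
        while (currentBytes > maxBytes && it.hasNext()) {
            Map.Entry<String, byte[]> eldest = it.next();
            currentBytes -= eldest.getValue().length;
            it.remove();
            // a real cache would flush / forward the evicted record downstream here
        }
    }
}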


Guozhang


On Mon, Jun 13, 2016 at 3:17 PM, Matthias J. Sax 
wrote:

> I am just catching up on this thread.
>
> From my point of view, easy tuning for the user is the most important
> thing, because Kafka Streams is a library. Thus, a global cache size
> parameter should be the best.
>
> Regarding dividing the memory vs. a single global cache: I would argue that
> in the first place dividing the memory would be good, as synchronization
> might kill the performance. As for the cache sizes, I was thinking about
> starting with an even distribution and adjusting the individual cache sizes
> during runtime.
>
> The dynamic adjustment can also be added later on. We need to figure out
> a good internal monitoring and "cost function" to determine which task
> needs more memory and which less. Some metrics to do this might be
> number-of-assigned-keys, size-of-key-value-pairs, update-frequency etc.
>
> I have to confess that I have no idea right now how to design the
> "cost function" to compute the memory size for each task. But if we want
> to add dynamic memory management later on, it might be a good idea to
> keep it in mind and align this KIP already for future improvements.
>
> -Matthias
>
>
> On 06/09/2016 05:24 AM, Henry Cai wrote:
> > One more thing for this KIP:
> >
> > Currently RocksDBWindowStore serializes the key/value before putting them into
> > the in-memory cache. I think we should delay this
> > serialization/deserialization unless it needs to flush to the db.  For a simple
> > countByKey over 100 records, this would trigger 100
> > serializations/deserializations even if everything is in-memory.
> >
> > If we move this internal cache from RocksDBStore to a global place, I
> hope
> > we can reduce the time it needs to do the serialization.
> >
> >
> > On Mon, Jun 6, 2016 at 11:07 AM, Ismael Juma  wrote:
> >
> >> On Mon, Jun 6, 2016 at 6:48 PM, Guozhang Wang 
> wrote:
> >>>
> >>> About using Instrumentation.getObjectSize, yeah we were worried a lot
> >> about
> >>> its efficiency as well as accuracy when discussing internally, but not
> a
> >>> better solution was proposed. So if people have better ideas, please
> >> throw
> >>> them here, as it is also the purpose for us to call out such KIP
> >> discussion
> >>> threads.
> >>>
> >>
> >> Note that this requires a Java agent to be configured. A few links:
> >>
> >>
> >>
> https://github.com/apache/spark/blob/b0ce0d13127431fa7cd4c11064762eb0b12e3436/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
> >>
> >>
> https://github.com/apache/cassandra/blob/3dcbe90e02440e6ee534f643c7603d50ca08482b/src/java/org/apache/cassandra/utils/ObjectSizes.java
> >> https://github.com/jbellis/jamm
> >> http://openjdk.java.net/projects/code-tools/jol/
> >> https://github.com/DimitrisAndreou/memory-measurer
> >>
> >> OK, maybe that's more than what you wanted. :)
> >>
> >> Ismael
> >>
> >
>
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-4 Create Topic Schema

2016-06-13 Thread Grant Henke
Hi Jay,

Good point: one of the main benefits of the create topic api is removing the
server side auto create. The work is noted in the Follow Up Changes section
of the KIP-4 wiki and tracked by KAFKA-2410.

You are pretty much spot on with the plan. But there are some things that
would need to be discussed during that change that likely deserve their own
KIP. I will lay out some of my thoughts below. However, do you mind if we
defer the rest of the discussion until after the create topics patch is
done? (I am happy to drive that KIP as soon as this patch is in)

High level plan:

   1. We set auto.create.topics.enable=false by default on the server and
   deprecate it
   2. We add a few new producer configs
  1. auto.create.topics.enable (should document that privileges are
  required if using ACLs)
  2. auto.create.topics.partitions
  3. auto.create.topics.replicas
   3. The producer tracks the location of the controller which it now gets
   in the metadata response and includes this in its internal cluster metadata
   representation
   4. The producer uses the create topic api to make a request to
   the controller when it gets an error about a non-existent topic

I mocked up a quick implementation based off of my create topics PR to vet
the basics. But there are a still some open questions and things to test.

The strawman implementation does the following:

   - Updates the metadata with the controller information
   - Set "topicsToBeCreated" in the metadata in NetworkClient.handleResponse
  - topics are added when receiving an UNKNOWN_TOPIC_OR_PARTITION error
   - Sends CreateTopicRequests to the controller
   in NetworkClient.maybeUpdate
  - This effectively makes create topic part of the metadata updates

Some of the things that need to be thought through include:

   - Should auto.create.topics.replicas scale down to the number of known
   live servers at create time?
   - Should the consumer be able to auto create topics too?
   - What happens when both client and broker side auto create are enabled?
  - I think the broker wins in this case since metadata request happens
  first
   - What happens when the user is unauthorized to create topics?
  - Either throw exception or wait for metadata update timeout

Thanks,
Grant

On Sun, Jun 12, 2016 at 2:08 PM, Jay Kreps  wrote:

> Hey Grant,
>
> Great to see this progressing. That API looks good to me. Thanks for the
> thoughtful write-up.
>
> One thing that would be great to add to this KIP would be a quick sketch of
> how the create topic api can be used to get rid of the thing where we
> create topics when you ask for their metadata. This doesn't need to be in
> any great depth, just enough to make sure this new api will work for that
> use case.
>
> I think the plan is something like
>
>1. We set auto.create.topics.enable=false by default on the server and
>deprecate it
>2. We add a new producer config auto.create.topics.enable
>3. The producer tracks the location of the controller which it now gets
>in the metadata response and includes this in its internal cluster
> metadata
>representation
>4. The producer uses the create topic api to make a request to the
>controller when it gets an error about a non-existent topic
>
> I think the semantics of this at first will be the same as they are now--if
> retries are disabled the first produce request will potentially fail if the
> topic creation hasn't quite completed. This isn't great but it isn't worse
> than the current state and I think would be fixed either by future
> improvements to make the requests fully blocking or by idempotence for the
> producer (which would mean retries were always enabled).
>
> One thing I'm not sure of is whether the admin java api, which would
> maintain its own connection pool etc, would be used internally by the
> producer (and potentially consumer) or if they would just reuse the request
> objects.
>
> Just trying to write this down to sanity check that it will work.
>
> -Jay
>
> On Fri, Jun 10, 2016 at 9:21 AM, Grant Henke  wrote:
>
> > Now that Kafka 0.10 has been released I would like to start work on the
> new
> > protocol messages and client implementation for KIP-4. In order to break
> up
> > the discussion and feedback I would like to continue breaking up the
> > content in to smaller pieces.
> >
> > This discussion thread is for the CreateTopic request/response and server
> > side implementation. Details for this implementation can be read here:
> >
> >
> 

Re: [DISCUSS] KIP-62: Allow consumer to send heartbeats from a background thread

2016-06-13 Thread Jason Gustafson
Hey Becket,

Sorry for the late response. I agree there's a little more to think through
on the implementation. The offset commit is the tricky one since we could
execute a user-provided callback. I'm thinking if there is an inflight
request to the coordinator, we may simply skip the heartbeat and try again
after a short backoff. Probably the only other request that could compete
with the heartbeat is an offset commit, and this would actually be fine
since the coordinator treats offset commits as effective heartbeats.
Anyway, unless you think this problem is serious enough for more
discussion, I'm going to go ahead and start a vote in the next day or two.
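For illustration, the skip-and-retry idea reduces to something like this toy model
(hypothetical names, not the real consumer internals):

import java.util.concurrent.atomic.AtomicBoolean;

// Toy model: the background thread sends heartbeats, but skips a round whenever
// another request to the coordinator is in flight, retrying after a short backoff.
public class HeartbeatLoop implements Runnable {
    private final AtomicBoolean requestInFlight; // set by whoever talks to the coordinator
    private final Runnable sendHeartbeat;        // whatever actually sends the HeartbeatRequest
    private final long retryBackoffMs;
    private volatile boolean running = true;

    public HeartbeatLoop(AtomicBoolean requestInFlight, Runnable sendHeartbeat, long retryBackoffMs) {
        this.requestInFlight = requestInFlight;
        this.sendHeartbeat = sendHeartbeat;
        this.retryBackoffMs = retryBackoffMs;
    }

    @Override
    public void run() {
        while (running) {
            if (!requestInFlight.get()) {
                sendHeartbeat.run();
            }
            // else: skip this round; an in-flight offset commit already counts as a heartbeat
            try {
                Thread.sleep(retryBackoffMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public void stop() {
        running = false;
    }
}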

Thanks,
Jason

On Mon, Jun 6, 2016 at 2:15 PM, Becket Qin  wrote:

> Guozhang and Jason,
>
> I think we are on the same page that having rebalances done in the
> background thread has a much bigger impact on the users. So I agree that it
> is probably better to start with having 1) and 2). We can add 3) later if
> necessary.
>
> Another implementation detail I am not quite sure is about making the
> NetworkClient work with two threads. The KIP implies that this will be done
> by synchronizing on ConsumerNetworkClient. I am not sure if that is enough,
> what if a poll() from ConsumerNetworkClient receives a FetchResponse or
> OffsetFetchResponse which are supposed to be handled by user thread? This
> is implementation detail but may be worth thinking about a bit more.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Mon, Jun 6, 2016 at 11:27 AM, Guozhang Wang  wrote:
>
> > Jiangjie:
> >
> > About doing the rebalance in the background thread, I'm a bit concerned
> as
> > it will change a lot of the concurrency guarantees that consumer
> currently
> > provides (think of a consumer caller thread committing externally while
> the
> > rebalance is happening in the background thread), and hence if we are
> > considering changing that now or in the future, we need to think through
> > all the corner cases.
> >
> > So in general, I'd still prefer we reserve a third config for rebalance
> > timeout in this KIP.
> >
> > Guozhang
> >
> >
> > On Mon, Jun 6, 2016 at 11:25 AM, Guozhang Wang 
> wrote:
> >
> > > (+ Matthias)
> > >
> > > Hello Henry,
> > >
> > > Specifically to your question regarding Kafka Streams:
> > >
> > > 1. Currently restoreActiveState() is triggered in the
> onPartitionAssigned
> > > callback, which is after the rebalance is completed from the
> > coordinator's
> > > point of view, and hence is covered in the process timeout value in
> this
> > > new KIP.
> > >
> > > 2. That is a good question, and I think it is a general root cause we
> saw
> > > failures of directory locking reported by more than one use case
> already.
> > > Currently I believe the main reason that a second rebalance is
> triggered
> > > while the processors are still completing restoreActiveState() of the
> > > previous rebalance is due to session timeout (default 30 seconds),
> which
> > > will be largely reduced with a larger processor timeout; however with
> > > complex topologies we restoreActiveState() for all states may still be
> > > taking long time with tens / hundreds of state stores, and other cases
> > > that also can cause consumers to re-join the groups right after a
> > previous
> > > rebalance, for example 1) regex subscription where the topic metadata
> has
> > > changed, 2) consecutive consumer failures, or new consumers (i.e. new
> > > KStream instances / threads) added.
> > >
> > > For such cases we can do a better job to "fail fast" if the consumer
> > > detects another join is needed. I think in one of your local commits you
> > > are already doing something similar, which we can merge back to trunk.
> > >
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Sun, Jun 5, 2016 at 11:24 PM, Henry Cai  >
> > > wrote:
> > >
> > >> I have a question on the KIP on long stall during
> > >> ProcessorStateManager.restoreActiveState(), this can be a long stall
> > when
> > >> we need to rebuild the RocksDB state on a new node.
> > >>
> > >> 1. Is restoreActiveState() considered as post rebalance since this is
> > >> invoked on application rebalance listener?
> > >> 2. When the node A was spending long time rebuilding the state in
> > >> restoreActiveState() from the previous rebalance, a new node (node B)
> > send
> > >> a new JoinGroup request to the co-ordinator, how long should the
> > >> coordinator wait for node A to finish the restoreActiveState from the
> > >> previous rebalance, the restoreActiveState can take more than 10
> minutes
> > >> for a big state.
> > >>
> > >>
> > >> On Sun, Jun 5, 2016 at 10:46 PM, Becket Qin 
> > wrote:
> > >>
> > >> > Hi Jason,
> > >> >
> > >> > Thanks for this very useful KIP.  In general I am with Guozhang on
> the
> > >> > purpose of of the three timeouts.
> > >> > 1) session timeout for consumer liveness,
> > >> > 2) process 

[jira] [Commented] (KAFKA-2857) ConsumerGroupCommand throws GroupCoordinatorNotAvailableException when describing a non-existent group before the offset topic is created

2016-06-13 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15328708#comment-15328708
 ] 

Vahid Hashemian commented on KAFKA-2857:


[~hachikuji] Could you also clarify what constitutes the "coordinator doesn't 
exist" case? The JIRA asks that it is distinguished from when "coordinator is 
not available". Thanks.

> ConsumerGroupCommand throws GroupCoordinatorNotAvailableException when 
> describing a non-existent group before the offset topic is created
> -
>
> Key: KAFKA-2857
> URL: https://issues.apache.org/jira/browse/KAFKA-2857
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Ismael Juma
>Assignee: Ishita Mandhan
>Priority: Minor
>
> If we describe a non-existing group before the offset topic is created, like 
> the following:
> {code}
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --new-consumer 
> --describe --group 
> {code}
> We get the following error:
> {code}
> Error while executing consumer group command The group coordinator is not 
> available.
> org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException: The 
> group coordinator is not available.
> {code}
> The exception is thrown in the `adminClient.describeConsumerGroup` call. We 
> can't interpret this exception as meaning that the group doesn't exist 
> because it could also be thrown if all replicas for an offset topic partition 
> are down (as explained by Jun).
> Jun also suggested that we should distinguish if a coordinator is not 
> available from the case where a coordinator doesn't exist.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1502: MINOR: Clarify the at least once delivery with aut...

2016-06-13 Thread Ishiihara
GitHub user Ishiihara opened a pull request:

https://github.com/apache/kafka/pull/1502

MINOR: Clarify the at least once delivery with auto commit enabled.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Ishiihara/kafka fetcher-comments

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1502.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1502








[jira] [Commented] (KAFKA-3209) Support single message transforms in Kafka Connect

2016-06-13 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15328647#comment-15328647
 ] 

Ewen Cheslack-Postava commented on KAFKA-3209:
--

[~pablocasares] I don't think anybody is actively working on this -- Nisarg had 
started but I'm not sure if they have dropped work on it for now since we 
haven't seen anything for a while. The first step would be to sketch out a design 
and put together a KIP. We'll want to make sure the use cases, and how the 
proposed interfaces work for them, are clearly documented. Especially important 
will be discussion around configuration -- how will transforms be configured in 
the Connector configs, possibly allowing for multiple transformations while 
keeping configuration easy? Additionally, if we are going to ship anything with 
Kafka to provide very commonly needed transformations (dropping & renaming 
fields, flattening hierarchical structures, filters), we'll probably want to 
include a full description of that set in the KIP as well. (Alternatively, we 
might not ship any, but then they obviously aren't as useful out of the box -- 
like connectors and converters, you'll need to pull in separate jars for 
transforms). I imagine the implementation will be pretty straightforward once 
we've got it specced out.
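To make the configuration question a bit more concrete, here is a purely hypothetical
sketch (not an existing Connect API, and not a proposal) of what a transform interface
and a chained-transform config might look like:

{code}
import java.util.Map;
import org.apache.kafka.connect.connector.ConnectRecord;

// Purely hypothetical, for illustration only: a transform is configured once and then
// applied to each record passing between a connector and Kafka.
public interface Transformation {
    void configure(Map<String, String> props);

    // Return the (possibly modified) record, or null to drop it (filtering).
    ConnectRecord apply(ConnectRecord record);
}

// A connector config might then chain transforms by name, e.g. (hypothetical keys):
//   transforms=maskPii,flatten
//   transforms.maskPii.type=com.example.MaskPiiFields
//   transforms.flatten.type=com.example.FlattenStructs
{code}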

> Support single message transforms in Kafka Connect
> --
>
> Key: KAFKA-3209
> URL: https://issues.apache.org/jira/browse/KAFKA-3209
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Neha Narkhede
>
> Users should be able to perform light transformations on messages between a 
> connector and Kafka. This is needed because some transformations must be 
> performed before the data hits Kafka (e.g. filtering certain types of events 
> or PII filtering). It's also useful for very light, single-message 
> modifications that are easier to perform inline with the data import/export.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Jenkins build is back to normal : kafka-trunk-jdk8 #693

2016-06-13 Thread Apache Jenkins Server
See 



[GitHub] kafka pull request #1501: MINOR: Expose window store sequence number

2016-06-13 Thread theduderog
GitHub user theduderog opened a pull request:

https://github.com/apache/kafka/pull/1501

MINOR: Expose window store sequence number

@guozhangwang @mjsax @enothereska 

Currently, Kafka Streams does not have a util to get access to the sequence 
number added to the key of window state store changelogs.  I'm interested in 
exposing it so that the contents of a changelog topic can be 1) inspected for 
debugging purposes and 2) saved to a text file and loaded from a text file.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/theduderog/kafka expose-seq-num

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1501.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1501


commit 2dc4badeae5aa8220b469f40c418afa1455f05ee
Author: Roger Hoover 
Date:   2016-06-13T22:53:28Z

Exposed window store sequence number






Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-06-13 Thread Matthias J. Sax
I am just catching up on this thread.

From my point of view, easy tuning for the user is the most important
thing, because Kafka Streams is a library. Thus, a global cache size
parameter should be the best.

Regarding dividing the memory vs. a single global cache: I would argue that
in the first place dividing the memory would be good, as synchronization
might kill the performance. As for the cache sizes, I was thinking about
starting with an even distribution and adjusting the individual cache sizes
during runtime.

The dynamic adjustment can also be added later on. We need to figure out
a good internal monitoring and "cost function" to determine which task
needs more memory and which less. Some metrics to do this might be
number-of-assigned-keys, size-of-key-value-pairs, update-frequency etc.

I have to confess that I have no idea right now how to design the
"cost function" to compute the memory size for each task. But if we want
to add dynamic memory management later on, it might be a good idea to
keep it in mind and align this KIP already for future improvements.

-Matthias


On 06/09/2016 05:24 AM, Henry Cai wrote:
> One more thing for this KIP:
> 
> Currently RocksDBWindowStore serializes the key/value before putting them into
> the in-memory cache. I think we should delay this
> serialization/deserialization unless it needs to flush to the db.  For a simple
> countByKey over 100 records, this would trigger 100
> serializations/deserializations even if everything is in-memory.
> 
> If we move this internal cache from RocksDBStore to a global place, I hope
> we can reduce the time it needs to do the serialization.
> 
> 
> On Mon, Jun 6, 2016 at 11:07 AM, Ismael Juma  wrote:
> 
>> On Mon, Jun 6, 2016 at 6:48 PM, Guozhang Wang  wrote:
>>>
>>> About using Instrumentation.getObjectSize, yeah we were worried a lot
>> about
>>> its efficiency as well as accuracy when discussing internally, but not a
>>> better solution was proposed. So if people have better ideas, please
>> throw
>>> them here, as it is also the purpose for us to call out such KIP
>> discussion
>>> threads.
>>>
>>
>> Note that this requires a Java agent to be configured. A few links:
>>
>>
>> https://github.com/apache/spark/blob/b0ce0d13127431fa7cd4c11064762eb0b12e3436/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
>>
>> https://github.com/apache/cassandra/blob/3dcbe90e02440e6ee534f643c7603d50ca08482b/src/java/org/apache/cassandra/utils/ObjectSizes.java
>> https://github.com/jbellis/jamm
>> http://openjdk.java.net/projects/code-tools/jol/
>> https://github.com/DimitrisAndreou/memory-measurer
>>
>> OK, maybe that's more than what you wanted. :)
>>
>> Ismael
>>
> 





Re: [VOTE] KIP-33 - Add a time based log index

2016-06-13 Thread Guozhang Wang
Thanks Jiangjie,

I see the need for sensitive data purging; the proposed change above LGTM.
One minor concern is that a wrongly marked timestamp on the first record
could cause the segment to roll much later / earlier, though that may be rare.
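For readers following along, the revised rule quoted below boils down to roughly the
following (assumed logic, not the actual LogSegment code):

public final class TimeBasedRollRule {
    private TimeBasedRollRule() {}

    // Roll based on the first timestamped message in the segment, falling back to the
    // segment create time when no message in the segment carries a timestamp.
    public static boolean shouldRoll(long firstMessageTimestampMs, // -1 if none recorded
                                     long segmentCreateTimeMs,
                                     long nowMs,
                                     long rollMs) {
        long base = firstMessageTimestampMs >= 0 ? firstMessageTimestampMs : segmentCreateTimeMs;
        return nowMs - base > rollMs;
    }
}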

Guozhang

On Fri, Jun 10, 2016 at 10:07 AM, Becket Qin  wrote:

> Hi,
>
> During the implementation of KIP-33, we found it might be useful to have a
> more deterministic time based log rolling than what proposed in the KIP.
>
> The current KIP proposal uses the largest timestamp in the segment for time
> based rolling. The active log segment only rolls when there is no message
> appended in max.roll.ms since the largest timestamp in the segment. i.e.
> the rolling time may change if the user keeps appending messages into the
> segment. This may not be a desirable behavior for people who have sensitive
> data and want to make sure they are removed after some time.
>
> To solve the above issue, we want to modify the KIP proposal regarding the
> time based rolling to the following behavior. The time based log rolling
> will be based on the first message with a timestamp in the log segment if
> there is such a message. If no message in the segment has a timestamp, the
> time based log rolling will still be based on log segment create time,
> which is the same as we are doing now. The reasons we don't want to always
> roll based on file create time are: 1) the message timestamp may be
> assigned by clients, which can be different from the create time of the log
> segment file, and 2) on some Linux systems, the file create time is not available, so
> using segment file create time may not always work.
>
> Do people have any concern for this change? I will update the KIP if people
> think the change is OK.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Apr 19, 2016 at 6:27 PM, Becket Qin  wrote:
>
> > Thanks Joel and Ismael. I just updated the KIP based on your feedback.
> >
> > KIP-33 has passed with +4 (binding) and +2 (non-binding)
> >
> > Thanks everyone for the reading, feedback and voting!
> >
> > Jiangjie (Becket) Qin
> >
> > On Tue, Apr 19, 2016 at 5:25 PM, Ismael Juma  wrote:
> >
> >> Thanks Becket. I think it would be nice to update the KIP with regards
> to
> >> point 3 and 4.
> >>
> >> In any case, +1 (non-binding)
> >>
> >> Ismael
> >>
> >> On Tue, Apr 19, 2016 at 2:03 AM, Becket Qin 
> wrote:
> >>
> >> > Thanks for the comments Ismael. Please see the replies inline.
> >> >
> >> > On Mon, Apr 18, 2016 at 6:50 AM, Ismael Juma 
> wrote:
> >> >
> >> > > Hi Jiangjie,
> >> > >
> >> > > Thanks for the KIP, it's a nice improvement. Since it seems like we
> >> have
> >> > > been using the voting thread for discussion, I'll do the same.
> >> > >
> >> > > A few minor comments/questions:
> >> > >
> >> > > 1. The proposed name for the time index file
> >> > `SegmentBaseOffset.timeindex`.
> >> > > Would `SegmentBaseOffset.time-index` be a little better? It would
> >> clearly
> >> > > separate the type of index in case we add additional index types in
> >> the
> >> > > future.
> >> > >
> >> > I have no strong opinion on this; I am not adding any separator
> >> > because it is more regex friendly.
> >> > I am not sure about the other indexes, time and space seems to be two
> >> most
> >> > common dimensions.
> >> >
> >> > 2. When describing the time index entry, we say "Offset - the next
> >> offset
> >> > > when the time index entry is inserted". I found the mention of
> `next`
> >> a
> >> > bit
> >> > > confusing as it looks to me like the time index entry has the first
> >> > offset
> >> > > in the message set.
> >> >
> >> > This semantic meaning is a little different from the offset index. The
> >> > offset index information is self-contained by nature. i.e. all the
> >> offsets
> >> > before is smaller than the offset of this message set. So we only need
> >> to
> >> > say "the offset of this message set is OFFSET". This does not quite
> >> apply
> >> > to the time index because the max timestamp may or may not be in the
> >> > message set being appended. So we have to either say, "the max
> timestamp
> >> > before I append this message set is T", or "the max timestamp after I
> >> > appended this message set is T". The former case means that we can
> skip
> >> all
> >> > the previous messages if we are looking for a timestamp > T and start
> >> from
> >> > this offset. The latter one means if we are searching for timestamp >
> >> T, we
> >> > should start after this message set, which is essentially the same as
> >> the
> >> > former case but require an additional interpretation.
> >> >
> >> > 3. We say "The default initial / max size of the time index files is
> the
> >> > > same as the offset index files. (time index entry is 1.5x of the
> size
> >> of
> >> > > offset index entry, user should set the configuration accordingly)".
> >> It
> >> > may
> >> > > be 

[jira] [Commented] (KAFKA-3786) Avoid unused property from parent configs causing WARN entries

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15328287#comment-15328287
 ] 

ASF GitHub Bot commented on KAFKA-3786:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1465


> Avoid unused property from parent configs causing WARN entries
> --
>
> Key: KAFKA-3786
> URL: https://issues.apache.org/jira/browse/KAFKA-3786
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
> Fix For: 0.10.1.0
>
>
> Currently the {{AbstractConfig}}'s constructor accepts the passed property 
> map as well as the {{ConfigDef}}, and maintains the original map as well as 
> the parsed values together. Because of it, with hierarchical config passing 
> like {{StreamsConfig}}, the underlying configs will take all the key-value 
> pairs when constructed and hence cause WARNING log output.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1465: KAFKA-3786: Let ConfigDef filter property key valu...

2016-06-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1465




[jira] [Resolved] (KAFKA-3786) Avoid unused property from parent configs causing WARN entries

2016-06-13 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-3786.
--
   Resolution: Fixed
Fix Version/s: 0.10.1.0

Issue resolved by pull request 1465
[https://github.com/apache/kafka/pull/1465]

> Avoid unused property from parent configs causing WARN entries
> --
>
> Key: KAFKA-3786
> URL: https://issues.apache.org/jira/browse/KAFKA-3786
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
> Fix For: 0.10.1.0
>
>
> Currently the {{AbstractConfig}}'s constructor accepts the passed property 
> map as well as the {{ConfigDef}}, and maintains the original map as well as 
> the parsed values together. Because of it, with hierarchical config passing 
> like {{StreamsConfig}}, the underlying configs will take all the key-value 
> pairs when constructed and hence cause WARNING log output.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3576) Unify KStream and KTable API

2016-06-13 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15328242#comment-15328242
 ] 

Matthias J. Sax commented on KAFKA-3576:


I see two points:
(1) this is similar to SQL / Pig or Spark DSL (this was also the motivation for 
[KAFKA-3337])
(2) users often forget the {{through()}} in 
{{stream.selectKey(...).through(...).aggregateByKey(...)}}, which is a 
non-intuitive operation
(2a) even if [KAFKA-3561] tackles the {{through}} problem, an explicit 
{{groupBy}} makes the re-distribution overhead explicit (see the sketch below)
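As a loose illustration of the two patterns (a fragment only: serdes omitted, topic and
store names made up, and the 0.10.0 method signatures only approximate; assumes a
KStreamBuilder named "builder"):

{code}
KTable<String, String> table = builder.table("users-by-region");
KStream<String, String> stream = builder.stream("users-by-region");

// KTable aggregation: groupBy() makes the repartitioning implicit.
KTable<String, Long> tableCounts =
    table.groupBy((user, region) -> KeyValue.pair(region, region))
         .count("RegionCountsByTable");

// KStream aggregation: the user must remember through() to repartition on the new key.
KTable<String, Long> streamCounts =
    stream.selectKey((user, region) -> region)
          .through("users-repartitioned-by-region")
          .countByKey("RegionCountsByStream");
{code}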

> Unify KStream and KTable API
> 
>
> Key: KAFKA-3576
> URL: https://issues.apache.org/jira/browse/KAFKA-3576
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>  Labels: api
> Fix For: 0.10.1.0
>
>
> For KTable aggregations, it has a pattern of 
> {{table.groupBy(...).aggregate(...)}}, and the data is repartitioned in an 
> inner topic based on the selected key in {{groupBy(...)}}.
> For KStream aggregations, though, it has a pattern of 
> {{stream.selectKey(...).through(...).aggregateByKey(...)}}. In other words, 
> users need to manually use a topic to repartition data, and the syntax is a 
> bit different from KTable as well.
> h2. Goal
> To have similar APIs for aggregations of KStream and KTable



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1500: MINOR: Fix javadoc typos in ConsumerRebalanceListe...

2016-06-13 Thread vahidhashemian
GitHub user vahidhashemian opened a pull request:

https://github.com/apache/kafka/pull/1500

MINOR: Fix javadoc typos in ConsumerRebalanceListener



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vahidhashemian/kafka 
typo07/fix_javadoc_typos_consumerrebalancelistener

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1500.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1500


commit b1548d2dc0bafe4513922790756b7d60dc101201
Author: Vahid Hashemian 
Date:   2016-06-13T20:02:46Z

MINOR: Fix Javadoc Typos in ConsumerRebalanceListener






RE: Migrating from 07.1 .100

2016-06-13 Thread Subhash Agrawal
Hi,
I currently embed kafka 0.7.1 in my java process. To support SSL, we have 
decided to upgrade Kafka to 0.10.0.
After upgrade, I am seeing following error during kafka startup.

java.lang.NullPointerException
at kafka.utils.Throttler.<init>(Throttler.scala:45)
at kafka.log.LogCleaner.<init>(LogCleaner.scala:75)
at kafka.log.LogManager.<init>(LogManager.scala:66)
at 
kafka.server.KafkaServer.createLogManager(KafkaServer.scala:609)
at kafka.server.KafkaServer.startup(KafkaServer.scala:183)

Has anybody seen this error? If I run kafka externally, then I don't see this 
error.

Thanks
Subhash A.


From: Subhash Agrawal
Sent: Wednesday, June 08, 2016 12:40 PM
To: 'us...@kafka.apache.org'
Subject: Migrating from 07.1 .100

Hi,
I am currently using Kafka 0.7.1 without zookeeper. We have single node kafka 
server.
To enhance security, we have decided to support SSL. As 0.7.1 version does not 
support SSL,
we are upgrading to latest version 0.10.0.0. We noticed that with the latest 
version, it is
mandatory to use zookeeper.

Is there any way I can use Kafka 0.10 or 0.9 version without zookeeper?

Thanks
Subhash A.



[jira] [Updated] (KAFKA-3831) Preparation for updating the default partition assignment strategy of Mirror Maker to round robin

2016-06-13 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3831?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian updated KAFKA-3831:
---
Status: Patch Available  (was: Open)

> Preparation for updating the default partition assignment strategy of Mirror 
> Maker to round robin
> -
>
> Key: KAFKA-3831
> URL: https://issues.apache.org/jira/browse/KAFKA-3831
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>Priority: Minor
>
> Add proper warnings and make necessary doc changes for updating the default 
> partition assignment strategy of Mirror Maker from range to round robin. The 
> actual switch would occur as part of a major release cycle (to be scheduled).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3831) Preparation for updating the default partition assignment strategy of Mirror Maker to round robin

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15327770#comment-15327770
 ] 

ASF GitHub Bot commented on KAFKA-3831:
---

GitHub user vahidhashemian opened a pull request:

https://github.com/apache/kafka/pull/1499

KAFKA-3831: Prepare for updating Mirror Maker's default partition 
assignment strategy to round robin

This patch adds proper warning message and necessary doc updates for 
updating the default partition assignment strategy of Mirror Maker from range 
to round robin. The actual switch would occur as part of a major release cycle 
(to be scheduled).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vahidhashemian/kafka KAFKA-3831

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1499.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1499


commit 4ee0fd612f0e34a8a218de237fb869904b49af7e
Author: Vahid Hashemian 
Date:   2016-06-13T17:08:49Z

KAFKA-3831: Prepare for updating Mirror Maker's default partition 
assignment strategy to round robin

This patch adds proper warning message and necessary doc updates for 
updating the default partition assignment strategy of Mirror Maker from range 
to round robin. The actual switch would occur as part of a major release cycle 
(to be scheduled).




> Preparation for updating the default partition assignment strategy of Mirror 
> Maker to round robin
> -
>
> Key: KAFKA-3831
> URL: https://issues.apache.org/jira/browse/KAFKA-3831
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>Priority: Minor
>
> Add proper warnings and make necessary doc changes for updating the default 
> partition assignment strategy of Mirror Maker from range to round robin. The 
> actual switch would occur as part of a major release cycle (to be scheduled).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1499: KAFKA-3831: Prepare for updating Mirror Maker's de...

2016-06-13 Thread vahidhashemian
GitHub user vahidhashemian opened a pull request:

https://github.com/apache/kafka/pull/1499

KAFKA-3831: Prepare for updating Mirror Maker's default partition 
assignment strategy to round robin

This patch adds proper warning message and necessary doc updates for 
updating the default partition assignment strategy of Mirror Maker from range 
to round robin. The actual switch would occur as part of a major release cycle 
(to be scheduled).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vahidhashemian/kafka KAFKA-3831

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1499.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1499


commit 4ee0fd612f0e34a8a218de237fb869904b49af7e
Author: Vahid Hashemian 
Date:   2016-06-13T17:08:49Z

KAFKA-3831: Prepare for updating Mirror Maker's default partition 
assignment strategy to round robin

This patch adds proper warning message and necessary doc updates for 
updating the default partition assignment strategy of Mirror Maker from range 
to round robin. The actual switch would occur as part of a major release cycle 
(to be scheduled).




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (KAFKA-3834) Consumer should not block in poll on coordinator discovery

2016-06-13 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-3834:
--

 Summary: Consumer should not block in poll on coordinator discovery
 Key: KAFKA-3834
 URL: https://issues.apache.org/jira/browse/KAFKA-3834
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: Jason Gustafson
Assignee: Jason Gustafson


Currently we block indefinitely in poll() when discovering the coordinator for 
the group. Instead, we can return an empty record set when the passed timeout 
expires. The downside is that it may obscure the underlying problem (which is 
usually misconfiguration), but users typically have to look at the logs to 
figure out the problem anyway. 
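For illustration, a minimal sketch of the caller-visible behavior being proposed (class name, timeout value, and the printed output are illustrative assumptions, not the internal change): poll() would simply return an empty record set when the timeout expires before the coordinator is found, so the application keeps looping instead of blocking.

{code:java}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class NonBlockingPollSketch {
    // 'consumer' is assumed to be an already-configured, subscribed KafkaConsumer.
    static void pollOnce(KafkaConsumer<String, String> consumer) {
        ConsumerRecords<String, String> records = consumer.poll(1000); // return after at most ~1s
        if (records.isEmpty()) {
            // Either no data arrived, or the coordinator is still unknown (often a
            // misconfiguration); the logs remain the place to diagnose the latter.
        } else {
            for (ConsumerRecord<String, String> record : records)
                System.out.println(record.value());
        }
    }
}
{code}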



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3833) Feature Request: allow Kafka clients to retrieve Kafka broker versions

2016-06-13 Thread Hari Sekhon (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hari Sekhon updated KAFKA-3833:
---
Description: 
Feature request for the ability of Kafka clients to query the Kafka brokers for 
their versions.

This is an important check if you're writing code to support multiple Kafka versions: if your test suite is supposed to check against the different Kafka versions, then you want to be able to query them to ensure you're testing the right versions, rather than some leftover Docker container of the wrong version or a Docker image with a version typo in the source Dockerfile.

This is related to but a simpler ask than KAFKA-3304.

  was:
Feature request for the ability of Kafka clients to query the Kafka brokers for 
their versions.

This is an important check if you're writing code to support multiple Kafka versions: if your test suite is supposed to check against the different Kafka versions, then you want to be able to query them to ensure you're testing the right versions, rather than some leftover Docker container of the wrong version or a Docker image with a version typo in the source Dockerfile.


> Feature Request: allow Kafka clients to retrieve Kafka broker versions
> --
>
> Key: KAFKA-3833
> URL: https://issues.apache.org/jira/browse/KAFKA-3833
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, producer 
>Reporter: Hari Sekhon
>Assignee: Neha Narkhede
>Priority: Minor
>
> Feature request for the ability of Kafka clients to query the Kafka brokers 
> for their versions.
> This is an important check if you're writing code to support multiple Kafka 
> versions: if your test suite is supposed to check against the different Kafka 
> versions, then you want to be able to query them to ensure you're testing the 
> right versions, rather than some leftover Docker container of the wrong 
> version or a Docker image with a version typo in the source Dockerfile.
> This is related to but a simpler ask than KAFKA-3304.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3833) Feature Request: allow Kafka clients to retrieve Kafka broker versions

2016-06-13 Thread Hari Sekhon (JIRA)
Hari Sekhon created KAFKA-3833:
--

 Summary: Feature Request: allow Kafka clients to retrieve Kafka 
broker versions
 Key: KAFKA-3833
 URL: https://issues.apache.org/jira/browse/KAFKA-3833
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer, producer 
Reporter: Hari Sekhon
Assignee: Neha Narkhede
Priority: Minor


Feature request for the ability of Kafka clients to query the Kafka brokers for 
their versions.

This is an important check if you're writing code to support multiple Kafka versions: if your test suite is supposed to check against the different Kafka versions, then you want to be able to query them to ensure you're testing the right versions, rather than some leftover Docker container of the wrong version or a Docker image with a version typo in the source Dockerfile.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3832) Kafka Connect's JSON Converter never outputs a null value

2016-06-13 Thread Randall Hauch (JIRA)
Randall Hauch created KAFKA-3832:


 Summary: Kafka Connect's JSON Converter never outputs a null value
 Key: KAFKA-3832
 URL: https://issues.apache.org/jira/browse/KAFKA-3832
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 0.9.0.1
Reporter: Randall Hauch
Assignee: Ewen Cheslack-Postava


Kafka Connect's JSON Converter will never output a null value when 
{{enableSchemas=true}}. This means that when a connector outputs a 
{{SourceRecord}} with a null value, the JSON Converter will always produce a 
message value with:

{code:json}
{
  "schema": null,
  "payload": null
}
{code}

And, this means that while Kafka log compaction will always be able to remove 
earlier messages with the same key, log compaction will never remove _all_ of 
the messages with the same key. 

The JSON Converter's {{fromConnectData(...)}} should always return null when it 
is supplied a null value.
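For illustration, a minimal sketch of the intended behavior (a hypothetical wrapper, not the actual patch): a null value maps to a null byte array so the topic gets a proper tombstone.

{code:java}
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.json.JsonConverter;

// Hypothetical wrapper, not the actual fix: pass null values through as null bytes,
// which Kafka log compaction can then treat as tombstones for the key.
public class NullAwareJsonConverter extends JsonConverter {
    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        if (value == null)
            return null; // instead of serializing {"schema": null, "payload": null}
        return super.fromConnectData(topic, schema, value);
    }
}
{code}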



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3831) Preparation for updating the default partition assignment strategy of Mirror Maker to round robin

2016-06-13 Thread Vahid Hashemian (JIRA)
Vahid Hashemian created KAFKA-3831:
--

 Summary: Preparation for updating the default partition assignment 
strategy of Mirror Maker to round robin
 Key: KAFKA-3831
 URL: https://issues.apache.org/jira/browse/KAFKA-3831
 Project: Kafka
  Issue Type: Improvement
Reporter: Vahid Hashemian
Assignee: Vahid Hashemian
Priority: Minor


Add proper warnings and make necessary doc changes for updating the default 
partition assignment strategy of Mirror Maker from range to round robin. The 
actual switch would occur as part of a major release cycle (to be scheduled).
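For reference, a minimal sketch of the consumer-side property involved (class name and values are illustrative assumptions; with Mirror Maker this would normally be set in the file passed via --consumer.config rather than in code):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class RoundRobinConsumerConfigSketch {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "source-cluster:9092"); // illustrative
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "mirror-maker-group");            // illustrative
        // The setting this ticket is about: range is the current default,
        // round robin is the proposed future default.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        return props;
    }
}
{code}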



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1464) Add a throttling option to the Kafka replication tool

2016-06-13 Thread Ralph Weires (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327561#comment-15327561
 ] 

Ralph Weires commented on KAFKA-1464:
-

We have similar problems to those described by Jason above, usually when 
taking a broker offline due to hardware failure (a broken HD, each broker in our 
case being equipped with 2 HDs / log directories). If the broker gets 
back online with one fresh disk and the corresponding data missing (i.e. half of 
the partitions of that broker missing), its network link is saturated for some 
time by inbound traffic while it catches up with replication.

While the broker is re-streaming all the missing data, we additionally 
experience problems with consumers. After the broker has caught up 
with its missing data, the situation normalizes again quickly.

To me it seems as if the partitions which the broker already catches up on 
soon after restart (especially the ones from the non-broken HD which had only a 
little data missing) cause issues if the broker becomes leader for them, while it 
is otherwise still clogging its incoming link with replication of the remaining 
data.

In this scenario, I would actually prefer to just let the broker catch up with 
any replication it still needs to do, without it becoming leader for any 
partition it has. Isn't there actually a way to achieve this? I.e. keeping 
a broker online with replication and all, but not having it take over any 
partition leadership (at least as long as there are other candidates available 
for leadership). Being able to toggle that behavior at run-time would be ideal, 
so that we would just explicitly activate it again after the maintenance 
interval, once the node has caught up on the bulk of the necessary replication. 
That could IMO be an alternative to any throttling approach.

> Add a throttling option to the Kafka replication tool
> -
>
> Key: KAFKA-1464
> URL: https://issues.apache.org/jira/browse/KAFKA-1464
> Project: Kafka
>  Issue Type: New Feature
>  Components: replication
>Affects Versions: 0.8.0
>Reporter: mjuarez
>Assignee: Ismael Juma
>Priority: Minor
>  Labels: replication, replication-tools
> Fix For: 0.10.1.0
>
>
> When performing replication on new nodes of a Kafka cluster, the replication 
> process will use all available resources to replicate as fast as possible.  
> This causes performance issues (mostly disk IO and sometimes network 
> bandwidth) when doing this in a production environment, in which you're 
> trying to serve downstream applications, at the same time you're performing 
> maintenance on the Kafka cluster.
> An option to throttle the replication to a specific rate (in either MB/s or 
> activities/second) would help production systems to better handle maintenance 
> tasks while still serving downstream applications.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3818) Change Mirror Maker default assignment strategy to round robin

2016-06-13 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian updated KAFKA-3818:
---
Status: Open  (was: Patch Available)

> Change Mirror Maker default assignment strategy to round robin
> --
>
> Key: KAFKA-3818
> URL: https://issues.apache.org/jira/browse/KAFKA-3818
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Vahid Hashemian
>
> It might make more sense to use round robin assignment by default for MM 
> since it gives a better balance between the instances, in particular when the 
> number of MM instances exceeds the typical number of partitions per topic. 
> There doesn't seem to be any need to keep range assignment since 
> copartitioning is not an issue.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3828) Consumer thread stalls after consumer rebalance for some partition

2016-06-13 Thread Joseph Aliase (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327513#comment-15327513
 ] 

Joseph Aliase commented on KAFKA-3828:
--

Sure, will test and let you know.

> Consumer thread stalls after consumer rebalance for some partition 
> 
>
> Key: KAFKA-3828
> URL: https://issues.apache.org/jira/browse/KAFKA-3828
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
> Environment: Operating System : CentOS release 6.4
> Kafka Cluster: Stand alone cluster with one broker and one zookeeper.
>Reporter: Joseph Aliase
>Assignee: Neha Narkhede
>
> In the process of testing the new Kafka Consumer API we came across this issue. 
> We started a single-broker Kafka cluster with the broker listening on port 9092 and 
> ZooKeeper on 2181.
> We created a topic 'test' with 6 partitions. We started a consumer with the below 
> configuration:
> bootstrap.servers= host-name:9092
> group.id=consumer-group
> key.deserializer=StringDeserializer.class.getName()
> value.deserializer=StringDeserializer.class.getName()
> session.timeout.ms=3
> heartbeat.interval.ms=1
> We started producing data into topic test:
> sh kafka-producer-perf-test.sh --topic test --num-records 100 
> --record-size 10 --throughput 500 --producer-props 
> bootstrap.servers=localhost:9092
> A consumer instance is started with 6 threads to consume data from the 6 partitions. 
> We then start another consumer instance with 6 threads. A consumer rebalance 
> occurs and the 6 partitions are divided equally among the two instances.
> Then we start another consumer instance with 6 threads; again we see a rebalance 
> occurring with the partitions getting divided among the three consumer 
> instances. Everything works well.
> Then, if we stop one consumer instance, the partitions get rebalanced between the 
> two remaining instances. 
> If we stop and restart another running instance and repeat these steps a few 
> times, we see the issue occur: the consumer is holding the partitions but not 
> consuming any data from them. 
> Partition data remains unconsumed until we stop the consumer instance which is 
> holding the partitions. 
> We were not able to reproduce this issue when we publish data to the topic at a 
> very low rate; however, the issue could be easily reproduced when data is being 
> published at a high rate.
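For illustration, a minimal sketch of this kind of setup (class name and values are assumptions, not the reporter's actual code; since KafkaConsumer is not thread-safe, "6 threads" here means six consumer instances in the same group, one per thread):

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SixThreadConsumerSketch {
    public static void main(String[] args) {
        for (int i = 0; i < 6; i++) {
            new Thread(() -> {
                Properties props = new Properties();
                props.put("bootstrap.servers", "host-name:9092");   // illustrative
                props.put("group.id", "consumer-group");
                props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                // One KafkaConsumer per thread; all instances join the same group.
                try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                    consumer.subscribe(Collections.singletonList("test"));
                    while (true) {
                        ConsumerRecords<String, String> records = consumer.poll(1000);
                        for (ConsumerRecord<String, String> record : records)
                            System.out.printf("partition=%d offset=%d%n",
                                              record.partition(), record.offset());
                    }
                }
            }, "consumer-thread-" + i).start();
        }
    }
}
{code}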



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3828) Consumer thread stalls after consumer rebalance for some partition

2016-06-13 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327509#comment-15327509
 ] 

Ismael Juma commented on KAFKA-3828:


Yes, upgrading the consumer to 0.9.0.1 or 0.10.0.0. The latter is preferred if 
all your brokers are 0.10.0.0 as well. Otherwise, 0.9.0.1 will have to do.

> Consumer thread stalls after consumer rebalance for some partition 
> 
>
> Key: KAFKA-3828
> URL: https://issues.apache.org/jira/browse/KAFKA-3828
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
> Environment: Operating System : CentOS release 6.4
> Kafka Cluster: Stand alone cluster with one broker and one zookeeper.
>Reporter: Joseph Aliase
>Assignee: Neha Narkhede
>
> In the process of testing the new Kafka Consumer API we came across this issue. 
> We started a single-broker Kafka cluster with the broker listening on port 9092 and 
> ZooKeeper on 2181.
> We created a topic 'test' with 6 partitions. We started a consumer with the below 
> configuration:
> bootstrap.servers= host-name:9092
> group.id=consumer-group
> key.deserializer=StringDeserializer.class.getName()
> value.deserializer=StringDeserializer.class.getName()
> session.timeout.ms=3
> heartbeat.interval.ms=1
> We started producing data into topic test:
> sh kafka-producer-perf-test.sh --topic test --num-records 100 
> --record-size 10 --throughput 500 --producer-props 
> bootstrap.servers=localhost:9092
> A consumer instance is started with 6 threads to consume data from the 6 partitions. 
> We then start another consumer instance with 6 threads. A consumer rebalance 
> occurs and the 6 partitions are divided equally among the two instances.
> Then we start another consumer instance with 6 threads; again we see a rebalance 
> occurring with the partitions getting divided among the three consumer 
> instances. Everything works well.
> Then, if we stop one consumer instance, the partitions get rebalanced between the 
> two remaining instances. 
> If we stop and restart another running instance and repeat these steps a few 
> times, we see the issue occur: the consumer is holding the partitions but not 
> consuming any data from them. 
> Partition data remains unconsumed until we stop the consumer instance which is 
> holding the partitions. 
> We were not able to reproduce this issue when we publish data to the topic at a 
> very low rate; however, the issue could be easily reproduced when data is being 
> published at a high rate.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3828) Consumer thread stalls after consumer rebalance for some partition

2016-06-13 Thread Joseph Aliase (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327488#comment-15327488
 ] 

Joseph Aliase commented on KAFKA-3828:
--

I assume you are asking us to upgrade to consumer version 0.9.0.1 or 0.10.0.1.

> Consumer thread stalls after consumer rebalance for some partition 
> 
>
> Key: KAFKA-3828
> URL: https://issues.apache.org/jira/browse/KAFKA-3828
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
> Environment: Operating System : CentOS release 6.4
> Kafka Cluster: Stand alone cluster with one broker and one zookeeper.
>Reporter: Joseph Aliase
>Assignee: Neha Narkhede
>
> In the process of testing the new Kafka Consumer API we came across this issue. 
> We started a single-broker Kafka cluster with the broker listening on port 9092 and 
> ZooKeeper on 2181.
> We created a topic 'test' with 6 partitions. We started a consumer with the below 
> configuration:
> bootstrap.servers= host-name:9092
> group.id=consumer-group
> key.deserializer=StringDeserializer.class.getName()
> value.deserializer=StringDeserializer.class.getName()
> session.timeout.ms=3
> heartbeat.interval.ms=1
> We started producing data into topic test:
> sh kafka-producer-perf-test.sh --topic test --num-records 100 
> --record-size 10 --throughput 500 --producer-props 
> bootstrap.servers=localhost:9092
> A consumer instance is started with 6 threads to consume data from the 6 partitions. 
> We then start another consumer instance with 6 threads. A consumer rebalance 
> occurs and the 6 partitions are divided equally among the two instances.
> Then we start another consumer instance with 6 threads; again we see a rebalance 
> occurring with the partitions getting divided among the three consumer 
> instances. Everything works well.
> Then, if we stop one consumer instance, the partitions get rebalanced between the 
> two remaining instances. 
> If we stop and restart another running instance and repeat these steps a few 
> times, we see the issue occur: the consumer is holding the partitions but not 
> consuming any data from them. 
> Partition data remains unconsumed until we stop the consumer instance which is 
> holding the partitions. 
> We were not able to reproduce this issue when we publish data to the topic at a 
> very low rate; however, the issue could be easily reproduced when data is being 
> published at a high rate.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3828) Consumer thread stalls after consumer rebalance for some partition

2016-06-13 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327472#comment-15327472
 ] 

Ismael Juma commented on KAFKA-3828:


Thanks. There are known consumer bugs in 0.9.0.0 with similar symptoms. Would 
you be able to upgrade to 0.9.0.1 or 0.10.0.0?

> Consumer thread stalls after consumer rebalance for some partition 
> 
>
> Key: KAFKA-3828
> URL: https://issues.apache.org/jira/browse/KAFKA-3828
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
> Environment: Operating System : CentOS release 6.4
> Kafka Cluster: Stand alone cluster with one broker and one zookeeper.
>Reporter: Joseph Aliase
>Assignee: Neha Narkhede
>
> In the process of testing the new Kafka Consumer API we came across this issue. 
> We started a single-broker Kafka cluster with the broker listening on port 9092 and 
> ZooKeeper on 2181.
> We created a topic 'test' with 6 partitions. We started a consumer with the below 
> configuration:
> bootstrap.servers= host-name:9092
> group.id=consumer-group
> key.deserializer=StringDeserializer.class.getName()
> value.deserializer=StringDeserializer.class.getName()
> session.timeout.ms=3
> heartbeat.interval.ms=1
> We started producing data into topic test:
> sh kafka-producer-perf-test.sh --topic test --num-records 100 
> --record-size 10 --throughput 500 --producer-props 
> bootstrap.servers=localhost:9092
> A consumer instance is started with 6 threads to consume data from the 6 partitions. 
> We then start another consumer instance with 6 threads. A consumer rebalance 
> occurs and the 6 partitions are divided equally among the two instances.
> Then we start another consumer instance with 6 threads; again we see a rebalance 
> occurring with the partitions getting divided among the three consumer 
> instances. Everything works well.
> Then, if we stop one consumer instance, the partitions get rebalanced between the 
> two remaining instances. 
> If we stop and restart another running instance and repeat these steps a few 
> times, we see the issue occur: the consumer is holding the partitions but not 
> consuming any data from them. 
> Partition data remains unconsumed until we stop the consumer instance which is 
> holding the partitions. 
> We were not able to reproduce this issue when we publish data to the topic at a 
> very low rate; however, the issue could be easily reproduced when data is being 
> published at a high rate.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3828) Consumer thread stalls after consumer rebalance for some partition

2016-06-13 Thread Joseph Aliase (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327462#comment-15327462
 ] 

Joseph Aliase commented on KAFKA-3828:
--

Broker version: 0.10.0.0 and 0.9.0.1

Consumer Version: 0.9.0.0

> Consumer thread stalls after consumer rebalance for some partition 
> 
>
> Key: KAFKA-3828
> URL: https://issues.apache.org/jira/browse/KAFKA-3828
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
> Environment: Operating System : CentOS release 6.4
> Kafka Cluster: Stand alone cluster with one broker and one zookeeper.
>Reporter: Joseph Aliase
>Assignee: Neha Narkhede
>
> In the process of testing the new Kafka Consumer API we came across this issue. 
> We started a single-broker Kafka cluster with the broker listening on port 9092 and 
> ZooKeeper on 2181.
> We created a topic 'test' with 6 partitions. We started a consumer with the below 
> configuration:
> bootstrap.servers= host-name:9092
> group.id=consumer-group
> key.deserializer=StringDeserializer.class.getName()
> value.deserializer=StringDeserializer.class.getName()
> session.timeout.ms=3
> heartbeat.interval.ms=1
> We started producing data into topic test:
> sh kafka-producer-perf-test.sh --topic test --num-records 100 
> --record-size 10 --throughput 500 --producer-props 
> bootstrap.servers=localhost:9092
> A consumer instance is started with 6 threads to consume data from the 6 partitions. 
> We then start another consumer instance with 6 threads. A consumer rebalance 
> occurs and the 6 partitions are divided equally among the two instances.
> Then we start another consumer instance with 6 threads; again we see a rebalance 
> occurring with the partitions getting divided among the three consumer 
> instances. Everything works well.
> Then, if we stop one consumer instance, the partitions get rebalanced between the 
> two remaining instances. 
> If we stop and restart another running instance and repeat these steps a few 
> times, we see the issue occur: the consumer is holding the partitions but not 
> consuming any data from them. 
> Partition data remains unconsumed until we stop the consumer instance which is 
> holding the partitions. 
> We were not able to reproduce this issue when we publish data to the topic at a 
> very low rate; however, the issue could be easily reproduced when data is being 
> published at a high rate.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2394) Use RollingFileAppender by default in log4j.properties

2016-06-13 Thread Dustin Cote (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327390#comment-15327390
 ] 

Dustin Cote commented on KAFKA-2394:


[~ewencp] Yeah, I think that's for the best. I wouldn't want to surprise anyone 
expecting the log naming scheme to stay the same with a change in a minor version. 
If you think that's too conservative an outlook, I'll defer to your judgement :)

> Use RollingFileAppender by default in log4j.properties
> --
>
> Key: KAFKA-2394
> URL: https://issues.apache.org/jira/browse/KAFKA-2394
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Dustin Cote
>Priority: Minor
>  Labels: newbie
> Fix For: 0.11.0.0
>
> Attachments: log4j.properties.patch
>
>
> The default log4j.properties bundled with Kafka uses ConsoleAppender and 
> DailyRollingFileAppender, which offer no protection to users from spammy 
> logging. In extreme cases (such as when issues like KAFKA-1461 are 
> encountered), the logs can exhaust the local disk space. This could be a 
> problem for Kafka adoption since new users are less likely to adjust the 
> logging properties themselves, and are more likely to have configuration 
> problems which result in log spam. 
> To fix this, we can use RollingFileAppender, which offers two settings for 
> controlling the maximum space that log files will use.
> maxBackupIndex: how many backup files to retain
> maxFileSize: the max size of each log file
> One question is whether this change is a compatibility concern? The backup 
> strategy and filenames used by RollingFileAppender are different from those 
> used by DailyRollingFileAppender, so any tools which depend on the old format 
> will break. If we think this is a serious problem, one solution would be to 
> provide two versions of log4j.properties and add a flag to enable the new 
> one. Another solution would be to include the RollingFileAppender 
> configuration in the default log4j.properties, but commented out.
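For illustration only, a programmatic log4j 1.x sketch of those two settings (file path, pattern, and values are assumptions; the actual change would be to the bundled log4j.properties):

{code:java}
import java.io.IOException;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import org.apache.log4j.RollingFileAppender;

public class RollingAppenderSketch {
    public static void main(String[] args) throws IOException {
        RollingFileAppender appender = new RollingFileAppender(
                new PatternLayout("[%d] %p %m (%c)%n"), "logs/server.log"); // illustrative path/pattern
        appender.setMaxFileSize("10MB");  // maxFileSize: cap the size of each log file
        appender.setMaxBackupIndex(10);   // maxBackupIndex: how many rolled files to retain
        Logger.getRootLogger().addAppender(appender);
        Logger.getRootLogger().info("RollingFileAppender configured");
    }
}
{code}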



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3830) getTGT() debug logging exposes confidential information

2016-06-13 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3830:
---
Status: Patch Available  (was: Open)

> getTGT() debug logging exposes confidential information
> ---
>
> Key: KAFKA-3830
> URL: https://issues.apache.org/jira/browse/KAFKA-3830
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Reporter: Ismael Juma
>Assignee: Ismael Juma
> Fix For: 0.10.0.1
>
>
> We have the same issue as the one described in ZOOKEEPER-2405.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3830) getTGT() debug logging exposes confidential information

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327353#comment-15327353
 ] 

ASF GitHub Bot commented on KAFKA-3830:
---

GitHub user ijuma opened a pull request:

https://github.com/apache/kafka/pull/1498

KAFKA-3830; getTGT() debug logging exposes confidential information

Only log the client and server principals, which is what ZooKeeper does 
after ZOOKEEPER-2405.
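For illustration, a minimal sketch of the safer logging shape (an assumed form, not the actual patch): log only the client and server principals rather than the whole ticket, which carries session keys and other confidential fields.

{code:java}
import javax.security.auth.kerberos.KerberosTicket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TgtLoggingSketch {
    private static final Logger log = LoggerFactory.getLogger(TgtLoggingSketch.class);

    // Log only the principals; the KerberosTicket's toString() would expose
    // confidential fields such as the session key.
    static void logTgt(KerberosTicket tgt) {
        log.debug("Found TGT with client principal '{}' and server principal '{}'.",
                  tgt.getClient().getName(), tgt.getServer().getName());
    }
}
{code}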

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ijuma/kafka 
kafka-3830-get-tgt-debug-confidential

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1498.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1498


commit aac856f5a3a64123f798788b31707521e9157519
Author: Ismael Juma 
Date:   2016-06-13T13:23:04Z

getTGT() debug logging exposes confidential information




> getTGT() debug logging exposes confidential information
> ---
>
> Key: KAFKA-3830
> URL: https://issues.apache.org/jira/browse/KAFKA-3830
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Reporter: Ismael Juma
>Assignee: Ismael Juma
> Fix For: 0.10.0.1
>
>
> We have the same issue as the one described in ZOOKEEPER-2405.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1498: KAFKA-3830; getTGT() debug logging exposes confide...

2016-06-13 Thread ijuma
GitHub user ijuma opened a pull request:

https://github.com/apache/kafka/pull/1498

KAFKA-3830; getTGT() debug logging exposes confidential information

Only log the client and server principals, which is what ZooKeeper does 
after ZOOKEEPER-2405.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ijuma/kafka 
kafka-3830-get-tgt-debug-confidential

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1498.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1498


commit aac856f5a3a64123f798788b31707521e9157519
Author: Ismael Juma 
Date:   2016-06-13T13:23:04Z

getTGT() debug logging exposes confidential information




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3576) Unify KStream and KTable API

2016-06-13 Thread Eno Thereska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327335#comment-15327335
 ] 

Eno Thereska commented on KAFKA-3576:
-

I understand the goal, but why is the first option (from KTable) necessarily 
better than the second one?

> Unify KStream and KTable API
> 
>
> Key: KAFKA-3576
> URL: https://issues.apache.org/jira/browse/KAFKA-3576
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>  Labels: api
> Fix For: 0.10.1.0
>
>
> For KTable aggregations, it has a pattern of 
> {{table.groupBy(...).aggregate(...)}}, and the data is repartitioned in an 
> inner topic based on the selected key in {{groupBy(...)}}.
> For KStream aggregations, though, it has a pattern of 
> {{stream.selectKey(...).through(...).aggregateByKey(...)}}. In other words, 
> users need to manually use a topic to repartition the data, and the syntax is a 
> bit different from KTable as well.
> h2. Goal
> To have similar APIs for aggregations of KStream and KTable



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

2016-06-13 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327326#comment-15327326
 ] 

Matthias J. Sax commented on KAFKA-3775:


[~tenggyut] [~wushujames] Kafka Streams allows you to specify consumer and producer 
configurations the same way as for the regular Java consumer/producer via 
{{StreamConfig}} (some restrictions apply; for example, Kafka Streams does not 
allow enabling "auto.commit"). But using the quota feature should work.
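For illustration, a minimal sketch of passing such configurations (class name and property values are assumptions; the broker-side quota itself would be configured against the client.id per the quota documentation):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsConfigSketch {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");   // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");  // illustrative
        // A client.id the brokers can apply a network quota to.
        props.put(StreamsConfig.CLIENT_ID_CONFIG, "my-streams-app-client");
        // Consumer-level settings can be passed through the same Properties object.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        return props;
    }
}
{code}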

> Throttle maximum number of tasks assigned to a single KafkaStreams
> --
>
> Key: KAFKA-3775
> URL: https://issues.apache.org/jira/browse/KAFKA-3775
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Yuto Kawamura
>Assignee: Yuto Kawamura
> Fix For: 0.10.1.0
>
>
> As of today, if I start a Kafka Streams app on a single machine which 
> consists of a single KafkaStreams instance, that instance gets all partitions 
> of the target topic assigned.
> As we're using it to process topics which have a huge number of partitions and 
> a lot of message traffic, it is a problem that we don't have a way of throttling 
> the maximum number of partitions assigned to a single instance.
> In fact, when we started a Kafka Streams app which consumes a topic which has 
> more than 10MB/sec of traffic on each partition, we saw that all partitions were 
> assigned to the first instance and soon the app died with an OOM.
> I know that there are some conceivable workarounds here, for example:
> - Start multiple instances at once so the partitions are distributed evenly.
>   => Maybe works, but as Kafka Streams is a library and not an execution 
> framework, there's no predefined procedure for starting Kafka Streams apps, so 
> some users might want the option to start a first single instance and 
> check if it works as expected with a smaller number of partitions (I want :p)
> - Adjust config parameters such as {{buffered.records.per.partition}}, 
> {{max.partition.fetch.bytes}} and {{max.poll.records}} to reduce the heap 
> pressure.
>   => Maybe works, but this still has two problems IMO:
>   - It still leads to a traffic explosion with high-throughput processing, as it 
> accepts all incoming messages from hundreds of partitions.
>   - In the first place, by distributed-system principles, it's weird that 
> users don't have a way to control the maximum "partitions" assigned to a single 
> shard (an instance of KafkaStreams here). Users should be allowed to provide 
> the maximum number of partitions that is considered feasible to 
> process with a single instance (or host).
> Here, I'd like to introduce a new configuration parameter, 
> {{max.tasks.assigned}}, which limits the number of tasks (a notion of 
> partition) assigned to the processId (which is the notion of a single 
> KafkaStreams instance).
> At the same time we need to change StreamPartitionAssignor (TaskAssignor) to 
> tolerate incomplete assignment. That is, Kafka Streams should continue 
> working for the assigned subset of partitions even when some partitions are left 
> unassigned, in order to satisfy this: "user may want to take an option to 
> start a first single instance and check if it works as expected with 
> a smaller number of partitions (I want :p)".
> I've implemented a rough POC for this. PTAL, and if it makes sense I will 
> continue refining it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3830) getTGT() debug logging exposes confidential information

2016-06-13 Thread Ismael Juma (JIRA)
Ismael Juma created KAFKA-3830:
--

 Summary: getTGT() debug logging exposes confidential information
 Key: KAFKA-3830
 URL: https://issues.apache.org/jira/browse/KAFKA-3830
 Project: Kafka
  Issue Type: Bug
  Components: security
Reporter: Ismael Juma
Assignee: Ismael Juma
 Fix For: 0.10.0.1


We have the same issue as the one described in ZOOKEEPER-2405.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-55: Secure quotas for authenticated users

2016-06-13 Thread Rajini Sivaram
I have updated KIP-55 to reflect the changes from the discussions in the
voting thread (
https://www.mail-archive.com/dev@kafka.apache.org/msg51610.html).

Jun/Gwen,

Existing client-id quotas will be used as default client-id quotas for
users when no user quotas are configured - i.e., default user quota is
unlimited and no user-specific quota override is specified. This enables
user rate limits to be configured for ANONYMOUS if required in a cluster
that has both PLAINTEXT and SSL/SASL. By default, without any user rate
limits set, rate limits for client-ids will apply, retaining the current
client-id quota configuration for single-user clusters.

ZooKeeper will have two paths: /clients, with client-id quotas that apply
only when the user quota is unlimited (similar to now), and /users, which
persists user quotas for any user, including ANONYMOUS.

Comments and feedback are appreciated.

Regards,

Rajini


On Wed, Jun 8, 2016 at 9:00 PM, Rajini Sivaram  wrote:

> Jun,
>
> Oops, sorry, I hadn't realized that the last note was on the discuss
> thread. Thank you for pointing it out. I have sent another note for voting.
>
>
> On Wed, Jun 8, 2016 at 4:30 PM, Jun Rao  wrote:
>
>> Rajini,
>>
>> Perhaps it will be clearer if you start the voting in a new thread (with
>> VOTE in the subject).
>>
>> Thanks,
>>
>> Jun
>>
>> On Tue, Jun 7, 2016 at 1:55 PM, Rajini Sivaram <
>> rajinisiva...@googlemail.com
>> > wrote:
>>
>> > I would like to initiate the vote for KIP-55.
>> >
> > > The KIP details are here: KIP-55: Secure quotas for authenticated users
> > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-55%3A+Secure+Quotas+for+Authenticated+Users>.
> > >
> > > The JIRA KAFKA-3492 has a draft PR here: https://github.com/apache/kafka/pull/1256.
>> >
>> > Thank you...
>> >
>> >
>> > Regards,
>> >
>> > Rajini
>> >
>>
>
>
>
> --
> Regards,
>
> Rajini
>



-- 
Regards,

Rajini


[jira] [Commented] (KAFKA-3209) Support single message transforms in Kafka Connect

2016-06-13 Thread Pablo Casares (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327225#comment-15327225
 ] 

Pablo Casares commented on KAFKA-3209:
--

Hey, I'm interested in producing keys based on JSON fields. I was reading 
https://groups.google.com/forum/#!topic/confluent-platform/aVaqBtMiKkY and they 
suggest this improvement. I think it can help me. I can contribute to 
this improvement if you want. What is the current status of this task?

Regards! 

Pablo

> Support single message transforms in Kafka Connect
> --
>
> Key: KAFKA-3209
> URL: https://issues.apache.org/jira/browse/KAFKA-3209
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Neha Narkhede
>
> Users should be able to perform light transformations on messages between a 
> connector and Kafka. This is needed because some transformations must be 
> performed before the data hits Kafka (e.g. filtering certain types of events 
> or PII filtering). It's also useful for very light, single-message 
> modifications that are easier to perform inline with the data import/export.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-55: Secure quotas for authenticated users

2016-06-13 Thread Rajini Sivaram
Thank you all for the feedback. Closing this voting thread to continue
discussions on the updated KIP in the discuss thread.

On Sat, Jun 11, 2016 at 7:45 AM, Gwen Shapira  wrote:

> I'd also like to see clarification regarding the ZK structures.
> Currently they appear as if user-quotas and client-quotas are
> equivalent, but this will obviously need to change.
>
>
> On Sat, Jun 11, 2016 at 1:28 AM, Jun Rao  wrote:
> > Rajini,
> >
> > The new proposal sounds good to me too. My only question is what happens
> to
> > those existing quotas on client-id. Do we just treat them as the quota
> for
> > that client-id under ANONYMOUS user name?
> >
> > Thanks,
> >
> > Jun
> >
> > On Fri, Jun 10, 2016 at 2:43 PM, Rajini Sivaram <
> > rajinisiva...@googlemail.com> wrote:
> >
> >> Jay,
> >>
> >> Thank you for the quick feedback. It shouldn't be too hard since I had
> a PR
> >> earlier along these lines anyway.
> >>
> >> Jun, are you ok with this approach? If everyone agrees, I will close
> this
> >> vote, update the KIP and give some more time for discussions.
> >>
> >>
> >> On Fri, Jun 10, 2016 at 10:27 PM, Jay Kreps  wrote:
> >>
> >> > This sounds a lot better to me--hopefully it isn't too much harder! I
> do
> >> > think if it is possible to do this directly that will be better for
> users
> >> > than having an intermediate step since we always have to work through
> >> > migrating people who have setup quotas already from the old way to the
> >> new
> >> > way.
> >> >
> >> > -Jay
> >> >
> >> > On Fri, Jun 10, 2016 at 2:12 PM, Rajini Sivaram <
> >> > rajinisiva...@googlemail.com> wrote:
> >> >
> >> > > I do think client-id is a valid and useful grouping for quotas even
> in
> >> > > secure clusters - but only if clientA of user1 is treated as a
> >> different
> >> > > client-id from clientA of user2. Grouping of clients of a user
> enables
> >> > > users to allocate their quota effectively to their clients (eg.
> >> guarantee
> >> > > that critical event processing clients are not throttled by auditing
> >> > > clients). When the KIP was down-sized to support only user-based
> >> quotas,
> >> > I
> >> > > was hoping that we could extend it at a later time to enable
> >> hierarchical
> >> > > quotas. But I understand that it can be confusing to switch the
> >> semantics
> >> > > of quotas based on modes set in the brokers. So let me try once
> again
> >> to
> >> > > promote the original KIP-55. At the time, I did have a flag to
> revert
> >> to
> >> > > the existing client-id behavior to maintain compatibility. But
> perhaps
> >> > that
> >> > > is not necessary.
> >> > >
> >> > > How does this sound?
> >> > >
> >> > >- Quotas may be configured for users. Sub-quotas may be
> configured
> >> for
> >> > >client-ids of a user. Quotas may also be configured for
> client-ids
> >> of
> >> > > users
> >> > >with unlimited quota (Long.MaxValue).
> >> > >- Users who don't have a quota override are allocated a
> configurable
> >> > >default quota.
> >> > >- Client-ids without a sub-quota override share the remainder of
> the
> >> > >user quota if the user has a quota limit. Default quotas may be
> >> > defined
> >> > > for
> >> > >clients of users with unlimited quota.
> >> > >- For an insecure or single-user secure cluster, the existing
> >> > client-id
> >> > >based quota semantics can be achieved by configuring unlimited
> quota
> >> > for
> >> > >the user and sub-quota configuration for client-id that matches
> the
> >> > > current
> >> > >client-id quota configuration.
> >> > >- For a cluster that mixes both secure and insecure access, client-id
> >> > quotas
> >> > >can be set for unauthenticated clients (unlimited quota for
> >> ANONYMOUS,
> >> > >quotas for client-ids) and user quotas can be set for
> authenticated
> >> > > users.
> >> > >- In a multi-user cluster, it is currently possible to define
> quotas
> >> > for
> >> > >client-ids that span multiple users. This will no longer be
> >> supported.
> >> > >
> >> > >
> >> > >
> >> > >
> >> > > On Fri, Jun 10, 2016 at 6:43 PM, Gwen Shapira 
> >> wrote:
> >> > >
> >> > > > I am not crazy about modes either. An earlier proposal supported
> both
> >> > > > client-ids and users at the same time, and it made more sense to
> me.
> >> I
> >> > > > believe it was dropped without proper discussion (Basically, Jun
> >> > > > mentioned it is complex and Rajini agreed to drop it). We should
> >> > > > probably rethink the complexity of supporting both vs the
> limitations
> >> > > > of "modes".
> >> > > >
> >> > > > As you said, we will have secure clients authenticating with users
> >> and
> >> > > > insecure clients authenticating with client-ids at the same time.
> >> > > >
> >> > > > On Fri, Jun 10, 2016 at 7:19 PM, Jay Kreps 
> wrote:
> >> > > > > Hey Rajini,
> >> > > > >
> >> > > > > 1. That makes sense to me. Is 

Re: Kafka Streaming - Window expiration

2016-06-13 Thread Pariksheet Barapatre
Many thanks, Eno. I will try the .until method.

Cheers
Pari

On 13 June 2016 at 14:10, Eno Thereska  wrote:

> Hi Pari,
>
> Try the .until method like this:
>
> > (TimeWindows) TimeWindows.of("tumbling-window-example",
> windowSizeMs).until(60 * 1000L)
>
> Thanks
> Eno
>
>
> > On 13 Jun 2016, at 08:31, Pariksheet Barapatre 
> wrote:
> >
> > Hello Experts,
> >
> > As per documentation in kafka docs -
> > *Windowing* is a common prerequisite for stateful transformations which
> > group records in a stream, for example, by their timestamps. A local state
> > store is usually needed for a windowing operation to store recently
> > received records based on the window interval, while old records in the
> > store are purged after the specified window retention period
> > <http://docs.confluent.io/3.0.0/streams/concepts.html#streams-concepts-windowing>.
> >
> > But I am not finding any example of how to set the window retention period.
> >
> > Could somebody help me with an example?
> >
> > long windowSizeMs = 60 * 1000L;
> > TimeWindows.of("tumbling-window-example", windowSizeMs);
> >
> > How do I set the expiration/purge time for the above window example?
> >
> > Many Thanks
> >
> > Pari
>
>


[jira] [Commented] (KAFKA-3828) Consumer thread stalls after consumer rebalance for some partition

2016-06-13 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327066#comment-15327066
 ] 

Ismael Juma commented on KAFKA-3828:


Can you please include the consumer and broker version?

> Consumer thread stalls after consumer rebalance for some partition 
> 
>
> Key: KAFKA-3828
> URL: https://issues.apache.org/jira/browse/KAFKA-3828
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
> Environment: Operating System : CentOS release 6.4
> Kafka Cluster: Stand alone cluster with one broker and one zookeeper.
>Reporter: Joseph Aliase
>Assignee: Neha Narkhede
>
> In the process of testing the new Kafka Consumer API we came across this issue. 
> We started a single-broker Kafka cluster with the broker listening on port 9092 and 
> ZooKeeper on 2181.
> We created a topic 'test' with 6 partitions. We started a consumer with the below 
> configuration:
> bootstrap.servers= host-name:9092
> group.id=consumer-group
> key.deserializer=StringDeserializer.class.getName()
> value.deserializer=StringDeserializer.class.getName()
> session.timeout.ms=3
> heartbeat.interval.ms=1
> We started producing data into topic test:
> sh kafka-producer-perf-test.sh --topic test --num-records 100 
> --record-size 10 --throughput 500 --producer-props 
> bootstrap.servers=localhost:9092
> A consumer instance is started with 6 threads to consume data from the 6 partitions. 
> We then start another consumer instance with 6 threads. A consumer rebalance 
> occurs and the 6 partitions are divided equally among the two instances.
> Then we start another consumer instance with 6 threads; again we see a rebalance 
> occurring with the partitions getting divided among the three consumer 
> instances. Everything works well.
> Then, if we stop one consumer instance, the partitions get rebalanced between the 
> two remaining instances. 
> If we stop and restart another running instance and repeat these steps a few 
> times, we see the issue occur: the consumer is holding the partitions but not 
> consuming any data from them. 
> Partition data remains unconsumed until we stop the consumer instance which is 
> holding the partitions. 
> We were not able to reproduce this issue when we publish data to the topic at a 
> very low rate; however, the issue could be easily reproduced when data is being 
> published at a high rate.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3829) Warn that kafka-connect group.id must not conflict with connector names

2016-06-13 Thread Barry Kaplan (JIRA)
Barry Kaplan created KAFKA-3829:
---

 Summary: Warn that kafka-connect group.id must not conflict with 
connector names
 Key: KAFKA-3829
 URL: https://issues.apache.org/jira/browse/KAFKA-3829
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 0.9.0.1
Reporter: Barry Kaplan
Assignee: Ewen Cheslack-Postava
Priority: Minor


If the group.id value happens to be the same as a connector name, the following 
error will be issued:

{quote}
Attempt to join group connect-elasticsearch-indexer failed due to: The group 
member's supported protocols are incompatible with those of existing members.
{quote}

Maybe the documentation for Distributed Worker Configuration group.id could be 
worded:

{quote}
A unique string that identifies the Connect cluster group this worker belongs 
to. This value must be different than all connector configuration 'name' 
properties.
{quote}




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Kafka Streaming - Window expiration

2016-06-13 Thread Eno Thereska
Hi Pari,

Try the .until method like this:

> (TimeWindows) TimeWindows.of("tumbling-window-example", 
> windowSizeMs).until(60 * 1000L)

Thanks
Eno
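For completeness, a self-contained sketch along these lines (the class name is hypothetical; the window name and values are just the ones from this thread, and the cast mirrors the snippet above):

{code:java}
import org.apache.kafka.streams.kstream.TimeWindows;

public class TumblingWindowRetentionSketch {
    public static void main(String[] args) {
        long windowSizeMs = 60 * 1000L;  // 1-minute tumbling windows
        long retentionMs = 60 * 1000L;   // keep (and then purge) old windows after 1 minute
        TimeWindows windows = (TimeWindows) TimeWindows
                .of("tumbling-window-example", windowSizeMs)
                .until(retentionMs);
        System.out.println("Configured windows: " + windows);
    }
}
{code}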


> On 13 Jun 2016, at 08:31, Pariksheet Barapatre  wrote:
> 
> Hello Experts,
> 
> As per documentation in kafka docs -
> *Windowing* is a common prerequisite for stateful transformations which
> group records in a stream, for example, by their timestamps. A local state
> store is usually needed for a windowing operation to store recently
> received records based on the window interval, while old records in the
> store are purged after the specified window retention period.
> 
> But I am not finding any example of how to set the window retention period.
> 
> Could somebody help me with an example?
> 
> long windowSizeMs = 60 * 1000L;
> TimeWindows.of("tumbling-window-example", windowSizeMs);
> 
> How do I set the expiration/purge time for the above window example?
> 
> Many Thanks
> 
> Pari



[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

2016-06-13 Thread James Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15326937#comment-15326937
 ] 

James Cheng commented on KAFKA-3775:


[~kawamuray], for the network traffic, could you use Kafka's quota feature? 
http://kafka.apache.org/documentation.html#design_quotas Kafka lets you specify 
how much network bandwidth a given client-id is allowed to use. If Kafka 
Streams lets you specify a client id, then this would let you limit how much 
network traffic your application uses.

> Throttle maximum number of tasks assigned to a single KafkaStreams
> --
>
> Key: KAFKA-3775
> URL: https://issues.apache.org/jira/browse/KAFKA-3775
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Yuto Kawamura
>Assignee: Yuto Kawamura
> Fix For: 0.10.1.0
>
>
> As of today, if I start a Kafka Streams app on a single machine which 
> consists of a single KafkaStreams instance, that instance gets all partitions 
> of the target topic assigned.
> As we're using it to process topics which have a huge number of partitions and 
> a lot of message traffic, it is a problem that we don't have a way of throttling 
> the maximum number of partitions assigned to a single instance.
> In fact, when we started a Kafka Streams app which consumes a topic which has 
> more than 10MB/sec of traffic on each partition, we saw that all partitions were 
> assigned to the first instance and soon the app died with an OOM.
> I know that there are some conceivable workarounds here, for example:
> - Start multiple instances at once so the partitions are distributed evenly.
>   => Maybe works, but as Kafka Streams is a library and not an execution 
> framework, there's no predefined procedure for starting Kafka Streams apps, so 
> some users might want the option to start a first single instance and 
> check if it works as expected with a smaller number of partitions (I want :p)
> - Adjust config parameters such as {{buffered.records.per.partition}}, 
> {{max.partition.fetch.bytes}} and {{max.poll.records}} to reduce the heap 
> pressure.
>   => Maybe works, but this still has two problems IMO:
>   - It still leads to a traffic explosion with high-throughput processing, as it 
> accepts all incoming messages from hundreds of partitions.
>   - In the first place, by distributed-system principles, it's weird that 
> users don't have a way to control the maximum "partitions" assigned to a single 
> shard (an instance of KafkaStreams here). Users should be allowed to provide 
> the maximum number of partitions that is considered feasible to 
> process with a single instance (or host).
> Here, I'd like to introduce a new configuration parameter, 
> {{max.tasks.assigned}}, which limits the number of tasks (a notion of 
> partition) assigned to the processId (which is the notion of a single 
> KafkaStreams instance).
> At the same time we need to change StreamPartitionAssignor (TaskAssignor) to 
> tolerate incomplete assignment. That is, Kafka Streams should continue 
> working for the assigned subset of partitions even when some partitions are left 
> unassigned, in order to satisfy this: "user may want to take an option to 
> start a first single instance and check if it works as expected with 
> a smaller number of partitions (I want :p)".
> I've implemented a rough POC for this. PTAL, and if it makes sense I will 
> continue refining it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Kafka Streaming - Window expiration

2016-06-13 Thread Pariksheet Barapatre
Hello Experts,

As per documentation in kafka docs -
*Windowing* is a common prerequisite for stateful transformations which
group records in a stream, for example, by their timestamps. A local state
store is usually needed for a windowing operation to store recently
received records based on the window interval, while old records in the
store are purged after the specified window retention period.

But I am not finding any example of how to set the window retention period.

Could somebody help me with an example?

long windowSizeMs = 60 * 1000L;
TimeWindows.of("tumbling-window-example", windowSizeMs);

How do I set the expiration/purge time for the above window example?

Many Thanks

Pari