Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-07-12 Thread Dong Lin
Thanks for the update, Lucas.

I think the motivation section is intuitive. It will be good to hear
comments from the other reviewers as well.

On Thu, Jul 12, 2018 at 9:48 PM, Lucas Wang  wrote:

> Hi Dong,
>
> I've updated the motivation section of the KIP by explaining the cases that
> would have user impacts.
> Please take a look and let me know your comments.
>
> Thanks,
> Lucas
>
> On Mon, Jul 9, 2018 at 5:53 PM, Lucas Wang  wrote:
>
> > Hi Dong,
> >
> > The simulation of disk being slow is merely for me to easily construct a
> > testing scenario
> > with a backlog of produce requests. In production, other than the disk
> > being slow, a backlog of
> > produce requests may also be caused by high produce QPS.
> > In that case, we may not want to kill the broker and that's when this KIP
> > can be useful, both for JBOD
> > and non-JBOD setup.
> >
> > Going back to your previous question about each ProduceRequest covering
> 20
> > partitions that are randomly
> > distributed, let's say a LeaderAndIsr request is enqueued that tries to
> > switch the current broker, say broker0, from leader to follower
> > *for one of the partitions*, say *test-0*. For the sake of argument,
> > let's also assume the other brokers, say broker1, have *stopped* fetching
> > from
> > the current broker, i.e. broker0.
> > 1. If the enqueued produce requests have acks =  -1 (ALL)
> >   1.1 without this KIP, the ProduceRequests ahead of LeaderAndISR will be
> > put into the purgatory,
> > and since they'll never be replicated to other brokers (because
> of
> > the assumption made above), they will
> > be completed either when the LeaderAndISR request is processed or
> > when the timeout happens.
> >   1.2 With this KIP, broker0 will immediately transition the partition
> > test-0 to become a follower; after the current broker sees the replication
> > of the remaining 19 partitions, it can send a response indicating that
> > it's no longer the leader for "test-0".
> >   To see the latency difference between 1.1 and 1.2, let's say there are
> > 24K produce requests ahead of the LeaderAndISR, and there are 8 io
> threads,
> >   so each io thread will process approximately 3000 produce requests. Now
> > let's investigate the io thread that finally processed the LeaderAndISR.
> >   For the 3000 produce requests, let's model the times when their remaining
> > 19 partitions catch up as t0, t1, ... t2999, and say the LeaderAndISR
> > request is processed at time t3000.
> >   Without this KIP, the 1st produce request would have waited an extra
> > t3000 - t0 time in the purgatory, the 2nd an extra time of t3000 - t1,
> etc.
> >   Roughly speaking, the latency difference is bigger for the earlier
> > produce requests than for the later ones. For the same reason, the more
> > ProduceRequests queued
> >   before the LeaderAndISR, the bigger benefit we get (capped by the
> > produce timeout).
> > 2. If the enqueued produce requests have acks=0 or acks=1
> >   There will be no latency differences in this case, but
> >   2.1 without this KIP, the records of partition test-0 in the
> > ProduceRequests ahead of the LeaderAndISR will be appended to the local
> log,
> > and eventually be truncated after processing the LeaderAndISR.
> > This is what's referred to as
> > "some unofficial definition of data loss in terms of messages
> > beyond the high watermark".
> >   2.2 with this KIP, we can mitigate the effect since if the LeaderAndISR
> > is immediately processed, the response to producers will have
> > the NotLeaderForPartition error, causing producers to retry
> >
> > The explanation above covers the benefit of reducing the latency for a
> > broker becoming a follower; closely related is reducing the latency of a
> > broker becoming the leader. In that case, the benefit is even more obvious:
> > if other brokers have resigned leadership and the current broker should
> > take over, any delay in processing the LeaderAndISR will be perceived by
> > clients as unavailability. In extreme cases, this can cause failed produce
> > requests if the retries are exhausted.
> >
> > Two other types of controller requests are UpdateMetadata and
> > StopReplica, which I'll briefly discuss as follows:
> > For UpdateMetadata requests, delayed processing means clients receiving
> > stale metadata, e.g. with the wrong leadership info
> > for certain partitions, and the effect is more retries or even fatal
> > failure if the retries are exhausted.
> >
> > For StopReplica requests, a long queuing time may degrade the performance
> > of topic deletion.
> >
> > Regarding your last question of the delay for DescribeLogDirsRequest, you
> > are right
> > that this KIP cannot help with the latency in getting the log dirs info,
> > and it's only relevant
> > when controller requests are involved.
> >
> > Regards,
> > Lucas
> >
> >
> > On Tue, Jul 3, 2018 at 5:11 PM, Dong Lin  wrote:
> 

Re: [VOTE] KIP-291: Have separate queues for control requests and data requests

2018-07-12 Thread Lucas Wang
Hi Jun,

About 3, thanks for the clarification. I like your proposal in that it
avoids the delay for controller requests when the data request queue is
empty. In comparison, the approach I described earlier is simpler to
understand and implement.

Between the two, I actually like your suggested approach better, because the
data request queue becoming empty can be a common scenario when the request
handler average idle percentage is high. Therefore the extra complexity is
worth the effort.

Whichever approach we choose, it does not affect the public interfaces or the
write-up in the KIP, so we can discuss the details further in the PR.
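
To make the coordination concrete for the PR discussion, here is a rough
sketch of the lock/condition idea (plain Java, illustrative only; this is not
the actual RequestChannel code, and the class/field names are made up):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Two queues guarded by one lock; a single condition lets an enqueue on
// either queue wake up handler threads blocked in receiveRequest().
public class TwoQueueChannel<T> {
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition nonEmpty = lock.newCondition();
  private final Deque<T> controllerQueue = new ArrayDeque<>();
  private final Deque<T> dataQueue = new ArrayDeque<>();

  public void sendRequest(T request, boolean fromController) {
    lock.lock();
    try {
      (fromController ? controllerQueue : dataQueue).addLast(request);
      nonEmpty.signal(); // wake one blocked handler thread immediately
    } finally {
      lock.unlock();
    }
  }

  public T receiveRequest(long timeoutMs) throws InterruptedException {
    lock.lock();
    try {
      long remainingNs = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
      while (controllerQueue.isEmpty() && dataQueue.isEmpty() && remainingNs > 0) {
        remainingNs = nonEmpty.awaitNanos(remainingNs);
      }
      if (!controllerQueue.isEmpty()) {
        return controllerQueue.pollFirst(); // controller requests take priority
      }
      return dataQueue.pollFirst(); // may be null if we timed out
    } finally {
      lock.unlock();
    }
  }
}

With a shared condition like this, sendRequest() wakes a blocked handler as
soon as either queue receives a request, so a controller request never has to
wait out the poll timeout even when the data request queue is empty.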

Thanks,
Lucas



On Wed, Jul 11, 2018 at 8:46 AM, Jun Rao  wrote:

> Hi, Lucas,
>
> 2. Good point about not knowing the request type in the memory pool. Looking
> at the implementation, it seems that queued.max.request.bytes is orthogonal
> to queued.max.requests. So, this seems fine.
>
> 3. The implementation that you suggested sounds good. It would be useful
> not to unnecessarily delay the processing of a request up to 300ms. I was
> thinking that we could have RequestChannel manage a Lock and a couple of
> Conditions and have sendRequest()/receiveRequest() coordinate on the lock
> and the conditions (similar to how ArrayBlockingQueue is implemented). This
> way, any new request can wake up the blocked request handling threads
> immediately.
>
> Thanks,
>
> Jun
>
>
> On Fri, Jun 29, 2018 at 4:53 PM, Lucas Wang  wrote:
>
> > Hi Jun,
> >
> > Thanks for your comments.
> > 1. I just replied in the discussion thread about the positive change this
> > KIP can still bring
> > if implemented on the latest trunk, which includes the async ZK
> operations
> > for KAFKA-5642.
> > The evaluation is done using an integration test.
> > In production, we have not upgraded to Kafka 1.1 yet, and the code we are
> > currently running does
> > not include async ZK operations, therefore I don't have any real usage
> > result.
> >
> > 2. Thanks for bringing this up. I haven't considered this setting, and the
> > existing proposal in this KIP would make data requests and controller
> > requests share a memory pool of size specified by the config
> > queued.max.request.bytes. The downside is that if there is memory pressure,
> > controller requests may be blocked from being read from a socket and do not
> > get prioritized at the socket layer.
> >
> > If we have a separate bytes limit for the controller requests, I imagine
> > there would be a separate memory pool
> > dedicated to controller requests. Also it requires the processors to tell
> > connections from a controller apart
> > from connections from other brokers or clients, which would probably
> > require a dedicated port for the controller?
> > IMO, this change is mainly driven by the memory pressure, kind of an
> > orthogonal issue, and we can address it with a separate KIP
> > if desired. Please let me know what you think.
> >
> > 3. I plan to change the implementation of the method
> > receiveRequest(timeout: Long) in the RequestChannel class as follows:
> >
> > val controllerRequest = controllerRequestQueue.poll()
> > if (controllerRequest != null) {
> >   controllerRequest
> > } else {
> >   dataRequestQueue.poll(timeout, TimeUnit.MILLISECONDS)
> > }
> >
> > With this implementation, there is no need to explicitly choose a request
> > handler thread to wake up depending on the type of request enqueued, and if
> > a controller request arrives while some request handler threads are blocked
> > on an empty data request queue, they will simply time out and call the
> > receiveRequest method again.
> >
> > In terms of performance, it means that in the worst case, for a
> controller
> > request that just missed the receiveRequest call, it can be delayed for
> as
> > long as
> > the timeout parameter, which is hard coded to be 300 milliseconds. If
> there
> > is just one request handler thread, the average delay is
> > 150 milliseconds assuming the chance of a controller request arriving at
> > any particular time is the same. With N request handler threads,
> > the average delay is 150/N milliseconds, which does not seem to be a
> > problem.
> >
> > We have considered waking up request handler threads based on which queue
> > they are blocked on, and that design was turned down because of its
> > complexity. The design can be found here
> >  > oller+request+queue+design>
> > .
> >
> > If you mean a general purpose priority queue such as the
> > java.util.PriorityQueue, we also have considered it and turned down the
> > design because
> > - The readily available class java.util.PriorityQueue is unbounded and
> > we'll need to implement a bounded version
> > - We would still like to have the FIFO semantics on both the controller
> > request queue and data request queue, which conceptually does not fit
> very
> > well
> > with a general purpose 

Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-07-12 Thread Lucas Wang
Hi Dong,

I've updated the motivation section of the KIP by explaining the cases that
would have user impacts.
Please take a look and let me know your comments.

Thanks,
Lucas

On Mon, Jul 9, 2018 at 5:53 PM, Lucas Wang  wrote:

> Hi Dong,
>
> The simulation of disk being slow is merely for me to easily construct a
> testing scenario
> with a backlog of produce requests. In production, other than the disk
> being slow, a backlog of
> produce requests may also be caused by high produce QPS.
> In that case, we may not want to kill the broker and that's when this KIP
> can be useful, both for JBOD
> and non-JBOD setup.
>
> Going back to your previous question about each ProduceRequest covering 20
> partitions that are randomly
> distributed, let's say a LeaderAndIsr request is enqueued that tries to
> switch the current broker, say broker0, from leader to follower
> *for one of the partitions*, say *test-0*. For the sake of argument,
> let's also assume the other brokers, say broker1, have *stopped* fetching
> from
> the current broker, i.e. broker0.
> 1. If the enqueued produce requests have acks =  -1 (ALL)
>   1.1 without this KIP, the ProduceRequests ahead of LeaderAndISR will be
> put into the purgatory,
> and since they'll never be replicated to other brokers (because of
> the assumption made above), they will
> be completed either when the LeaderAndISR request is processed or
> when the timeout happens.
>   1.2 With this KIP, broker0 will immediately transition the partition
> test-0 to become a follower; after the current broker sees the replication
> of the remaining 19 partitions, it can send a response indicating that
> it's no longer the leader for "test-0".
>   To see the latency difference between 1.1 and 1.2, let's say there are
> 24K produce requests ahead of the LeaderAndISR, and there are 8 io threads,
>   so each io thread will process approximately 3000 produce requests. Now
> let's investigate the io thread that finally processed the LeaderAndISR.
>   For the 3000 produce requests, let's model the times when their remaining
> 19 partitions catch up as t0, t1, ... t2999, and say the LeaderAndISR request
> is processed at time t3000.
>   Without this KIP, the 1st produce request would have waited an extra
> t3000 - t0 time in the purgatory, the 2nd an extra time of t3000 - t1, etc.
>   Roughly speaking, the latency difference is bigger for the earlier
> produce requests than for the later ones. For the same reason, the more
> ProduceRequests queued
>   before the LeaderAndISR, the bigger benefit we get (capped by the
> produce timeout).
> 2. If the enqueued produce requests have acks=0 or acks=1
>   There will be no latency differences in this case, but
>   2.1 without this KIP, the records of partition test-0 in the
> ProduceRequests ahead of the LeaderAndISR will be appended to the local log,
> and eventually be truncated after processing the LeaderAndISR.
> This is what's referred to as
> "some unofficial definition of data loss in terms of messages
> beyond the high watermark".
>   2.2 with this KIP, we can mitigate the effect since if the LeaderAndISR
> is immediately processed, the response to producers will have
> the NotLeaderForPartition error, causing producers to retry
>
> The explanation above covers the benefit of reducing the latency for a broker
> becoming a follower; closely related is reducing the latency of a broker
> becoming the leader. In that case, the benefit is even more obvious: if other
> brokers have resigned leadership and the current broker should take over, any
> delay in processing the LeaderAndISR will be perceived by clients as
> unavailability. In extreme cases, this can cause failed produce requests if
> the retries are exhausted.
>
> Two other types of controller requests are UpdateMetadata and
> StopReplica, which I'll briefly discuss as follows:
> For UpdateMetadata requests, delayed processing means clients receiving
> stale metadata, e.g. with the wrong leadership info
> for certain partitions, and the effect is more retries or even fatal
> failure if the retries are exhausted.
>
> For StopReplica requests, a long queuing time may degrade the performance
> of topic deletion.
>
> Regarding your last question of the delay for DescribeLogDirsRequest, you
> are right
> that this KIP cannot help with the latency in getting the log dirs info,
> and it's only relevant
> when controller requests are involved.
>
> Regards,
> Lucas
>
>
> On Tue, Jul 3, 2018 at 5:11 PM, Dong Lin  wrote:
>
>> Hey Jun,
>>
>> Thanks much for the comments. It is good point. So the feature may be
>> useful for JBOD use-case. I have one question below.
>>
>> Hey Lucas,
>>
>> Do you think this feature is also useful for non-JBOD setup or it is only
>> useful for the JBOD setup? It may be useful to understand this.
>>
>> When the broker is setup using JBOD, in order to move leaders on the
>> failed
>> disk to other 

Build failed in Jenkins: kafka-trunk-jdk8 #2799

2018-07-12 Thread Apache Jenkins Server
See 


Changes:

[jason] MINOR: Additional detail in description for zookeeper.connect (#5358)

--
[...truncated 2.48 MB...]
org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
testRegisterPersistentStore STARTED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
testRegisterPersistentStore PASSED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
shouldThrowIllegalArgumentExceptionOnRegisterWhenStoreHasAlreadyBeenRegistered 
STARTED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
shouldThrowIllegalArgumentExceptionOnRegisterWhenStoreHasAlreadyBeenRegistered 
PASSED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
shouldRestoreStoreWithSinglePutRestoreSpecification STARTED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
shouldRestoreStoreWithSinglePutRestoreSpecification PASSED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
shouldNotChangeOffsetsIfAckedOffsetsIsNull STARTED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
shouldNotChangeOffsetsIfAckedOffsetsIsNull PASSED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
shouldThrowIllegalArgumentExceptionIfStoreNameIsSameAsCheckpointFileName STARTED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
shouldThrowIllegalArgumentExceptionIfStoreNameIsSameAsCheckpointFileName PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldCloseStateManagerWithOffsets STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldCloseStateManagerWithOffsets PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldProcessRecordsForTopic STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldProcessRecordsForTopic PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldInitializeProcessorTopology STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldInitializeProcessorTopology PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldInitializeContext STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldInitializeContext PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldCheckpointOffsetsWhenStateIsFlushed STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldCheckpointOffsetsWhenStateIsFlushed PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldThrowStreamsExceptionWhenKeyDeserializationFails STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldThrowStreamsExceptionWhenKeyDeserializationFails PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldInitializeStateManager STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldInitializeStateManager PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldProcessRecordsForOtherTopic STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldProcessRecordsForOtherTopic PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldNotThrowStreamsExceptionWhenValueDeserializationFails STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldNotThrowStreamsExceptionWhenValueDeserializationFails PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldThrowStreamsExceptionWhenValueDeserializationFails STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldThrowStreamsExceptionWhenValueDeserializationFails PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHandler STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHandler PASSED

org.apache.kafka.streams.processor.internals.StreamsMetricsImplTest > 
testThroughputMetrics STARTED

org.apache.kafka.streams.processor.internals.StreamsMetricsImplTest > 
testThroughputMetrics PASSED

org.apache.kafka.streams.processor.internals.StreamsMetricsImplTest > 
testLatencyMetrics STARTED

org.apache.kafka.streams.processor.internals.StreamsMetricsImplTest > 
testLatencyMetrics PASSED

org.apache.kafka.streams.processor.internals.StreamsMetricsImplTest > 
testRemoveSensor STARTED

org.apache.kafka.streams.processor.internals.StreamsMetricsImplTest > 
testRemoveSensor PASSED

org.apache.kafka.streams.processor.internals.StreamsMetricsImplTest > 
testRemoveNullSensor STARTED


Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-12 Thread Dong Lin
Hey Jason,

It is a great summary. The solution sounds good. I might have minor
comments regarding the method name, but we can discuss those minor points
later after we reach consensus on the high-level API.

Thanks,
Dong


On Thu, Jul 12, 2018 at 11:41 AM, Jason Gustafson 
wrote:

> Hey Anna and Dong,
>
> Thanks a lot for the great discussion. I've been hanging back a bit because
> honestly the best option hasn't seemed clear. I agree with Anna's general
> observation that there is a distinction between the position of the
> consumer and its fetch state up to that position. If you think about it, a
> committed offset actually represents both of these. The metadata is used to
> initialize the state of the consumer application and the offset initializes
> the position. Additionally, we are extending the offset commit in this KIP
> to also include the last epoch fetched by the consumer, which is used to
> initialize the internal fetch state. Of course if you do an arbitrary
> `seek` and immediately commit offsets, then there won't be a last epoch to
> commit. This seems intuitive since there is no fetch state in this case. We
> only commit fetch state when we have it.
>
> So if we think about a committed offset as initializing both the consumer's
> position and its fetch state, then the gap in the API is evidently that we
> don't have a way to initialize the consumer to a committed offset. We do it
> implicitly of course for offsets stored in Kafka, but since external
> storage is a use case we support, then we should have an explicit API as
> well. Perhaps something like this:
>
> seekToCommitted(TopicPartition, OffsetAndMetadata)
>
> In this KIP, we are proposing to allow the `OffsetAndMetadata` object to
> include the leader epoch, so I think this would have the same effect as
> Anna's suggested `seekToRecord`. But perhaps it is a more natural fit given
> the current API? Furthermore, if we find a need for additional metadata in
> the offset commit API in the future, then we will just need to modify the
> `OffsetAndMetadata` object and we will not need a new `seek` API.
>
> With this approach, I think then we can leave the `position` API as it is.
> The position of the consumer is still just the next expected fetch offset.
> If a user needs to record additional state based on previous fetch
> progress, then they would use the result of the previous fetch to obtain
> it. This makes the dependence on fetch progress explicit. I think we could
> make this a little more convenient with a helper in the `ConsumerRecords`
> object, but I think that's more of a nice-to-have.
>
> Thoughts?
>
> By the way, I have been iterating a little bit on the replica side of this
> KIP. My initial proposal in fact did not have strong enough fencing to
> protect all of the edge cases. I believe the current proposal fixes the
> problems, but I am still verifying the model.
>
> Thanks,
> Jason
>
>
> On Wed, Jul 11, 2018 at 10:45 AM, Dong Lin  wrote:
>
> > Hey Anna,
> >
> > Thanks much for the explanation. Approach 1 also sounds good to me. I
> think
> > findOffsets() is useful for users who don't use automatic offset reset
> > policy.
> >
> > Just one more question. Since users who store offsets externally need to
> > provide leaderEpoch to findOffsets(...), do we need an extra API for user
> > to get both offset and leaderEpoch, e.g. recordPosition()?
> >
> > Thanks,
> > Dong
> >
> > On Wed, Jul 11, 2018 at 10:12 AM, Anna Povzner 
> wrote:
> >
> > > Hi Dong,
> > >
> > >
> > > What I called “not covering all use cases” is what you call best-effort
> > > (not guaranteeing some corner cases). I think we are on the same page
> > here.
> > >
> > >
> > > I wanted to be clear in the API whether the consumer seeks to a
> position
> > > (offset) or to a record (offset, leader epoch). The only use-case of
> > > seeking to a record is seeking to a committed offset for a user who
> > stores
> > > committed offsets externally. (Unless users find some other reason to
> > seek
> > > to a record.) I thought it was possible to provide this functionality
> > with
> > > findOffset(offset, leader epoch) followed by a seek(offset). However,
> you
> > > are right that this will not handle the race condition where
> > non-divergent
> > > offset found by findOffset() could change again before the consumer
> does
> > > the first fetch.
> > >
> > >
> > > Regarding position() — if we add position that returns (offset, leader
> > > epoch), this is specifically a position after a record that was
> actually
> > > consumed or position of a committed record. In which case, I still
> think
> > > it’s cleaner to get a record position of consumed message from a new
> > helper
> > > method in ConsumerRecords() or from committed offsets.
> > >
> > >
> > > I think all the use-cases could be then covered with:
> > >
> > > (Approach 1)
> > >
> > > seekToRecord(offset, leaderEpoch) — this will just initialize/set the
> > > consumer state;
> > >
> > > 

Re: KIP-327: Add describe all topics API to AdminClient

2018-07-12 Thread Jason Gustafson
Hey Manikumar,

As Kafka begins to scale to larger and larger numbers of topics/partitions,
I'm a little concerned about the scalability of APIs such as this. The API
looks benign, but imagine you have a few million partitions. We
already expose similar APIs in the producer and consumer, so probably not
much additional harm to expose it in the AdminClient, but it would be nice
to put a little thought into some longer term options. We should be giving
users an efficient way to select a smaller set of the topics they are
interested in. We have always discussed adding some filtering support to
the Metadata API. Perhaps now is a good time to reconsider this? We now
have a convention for wildcard ACLs, so perhaps we can do something
similar. Full regex support might be ideal given the consumer's
subscription API, but that is more challenging. What do you think?
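
Just to illustrate the kind of thing I have in mind (a purely hypothetical
API, nothing like this exists in AdminClient today), a prefix-style filter
could look roughly like:

// Hypothetical sketch: select topics by prefix instead of fetching metadata
// for every topic in the cluster. TopicFilter and this describeTopics
// overload are made-up names, not real Kafka APIs.
DescribeTopicsResult result = adminClient.describeTopics(TopicFilter.prefixed("payments-"));
Map<String, TopicDescription> topics = result.all().get();

That would keep the response size proportional to what the caller actually
cares about rather than to the total number of topics in the cluster.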

Thanks,
Jason

On Thu, Jul 12, 2018 at 2:35 PM, Harsha  wrote:

> Very useful. LGTM.
>
> Thanks,
> Harsha
>
> On Thu, Jul 12, 2018, at 9:56 AM, Manikumar wrote:
> > Hi all,
> >
> > I have created a KIP to add a describe all topics API to AdminClient.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 327%3A+Add+describe+all+topics+API+to+AdminClient
> >
> > Please take a look.
> >
> > Thanks,
>


[jira] [Resolved] (KAFKA-6488) Prevent log corruption in case of OOM

2018-07-12 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-6488.
-
Resolution: Won't Do

> Prevent log corruption in case of OOM
> -
>
> Key: KAFKA-6488
> URL: https://issues.apache.org/jira/browse/KAFKA-6488
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Currently we will append the message to the log before updating the LEO. 
> However, if there is an OOM in between these two steps, the KafkaRequestHandler 
> thread can append a message to the log without updating the LEO. The next 
> message may be appended with the same offset as the first message. This can 
> prevent the broker from being started because two messages have the same offset 
> in the log.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-3089) VerifiableProducer should do a clean shutdown in stop_node()

2018-07-12 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-3089.
-
Resolution: Fixed

> VerifiableProducer should do a clean shutdown in stop_node()
> 
>
> Key: KAFKA-3089
> URL: https://issues.apache.org/jira/browse/KAFKA-3089
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> VerifiableProducer is closed by SIGKILL when stop_node() is called. For this 
> reason, when stop_producer_and_consumer() is invoked in 
> ProduceConsumeValidateTest, VerifiableProducer is killed immediately without 
> allowing it to wait for acknowledgement. The reported number of messages 
> produced by VerifiableProducer will thus be much smaller than the reported 
> number of messages consumed by consumer, causing confusion to developers.
> For almost all other services, such as VerifiableConsumer and 
> ConsoleConsumer, we send SIGINT when stop_node() is called. It is not clear 
> why VerifiableProducer is different from them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-4820) ConsumerNetworkClient.send() should not require global lock

2018-07-12 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-4820.
-
Resolution: Fixed

> ConsumerNetworkClient.send() should not require global lock
> ---
>
> Key: KAFKA-4820
> URL: https://issues.apache.org/jira/browse/KAFKA-4820
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Currently `ConsumerNetworkClient.send()` needs to acquire the global lock of 
> `ConsumerNetworkClient` in order to enqueue requests. If another thread has 
> called `ConsumerNetworkClient.poll(..)`, that thread may be holding the lock 
> while blocked on `nioSelector.select(ms)`. This causes a problem because the 
> user thread which calls `ConsumerNetworkClient.send()` will also block 
> waiting for that `nioSelector.select(ms)` to finish.
> One way to address this problem is to use `synchronized (unsent)` to protect 
> access to `ConsumerNetworkClient.unsent` instead of protecting it using a 
> global lock. That way the user thread should be able to enqueue requests 
> immediately while another thread is blocked on `nioSelector.select(ms)`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-3145) CPU Usage Spike to 100% when network connection is to error port

2018-07-12 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-3145.
-
Resolution: Cannot Reproduce

> CPU Usage Spike to 100% when network connection is to error port
> 
>
> Key: KAFKA-3145
> URL: https://issues.apache.org/jira/browse/KAFKA-3145
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.2.1, 0.9.0.0
>Reporter: zhiwei
>Assignee: Dong Lin
>Priority: Major
>
> CPU spike to 100% when network connection is to error port.
> It seems network IO thread are very busy logging following error message. 
> [2016-01-25 14:09:12,476] ERROR Uncaught error in kafka producer I/O thread:  
> (org.apache.kafka.clients.producer.internals.Sender)
> java.lang.IllegalStateException: Invalid request (size = -1241382912)
>   at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:68)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
>   at java.lang.Thread.run(Thread.java:745)
> [2016-01-25 14:09:12,479] ERROR Uncaught error in kafka producer I/O thread:  
> (org.apache.kafka.clients.producer.internals.Sender)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.common.network.NetworkReceive.complete(NetworkReceive.java:48)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:249)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
>   at java.lang.Thread.run(Thread.java:745)
> [2016-01-25 14:09:12,480] ERROR Uncaught error in kafka producer I/O thread:  
> (org.apache.kafka.clients.producer.internals.Sender)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.common.network.NetworkReceive.complete(NetworkReceive.java:48)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:249)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
>   at java.lang.Thread.run(Thread.java:745)
> [2016-01-25 14:09:12,480] ERROR Uncaught error in kafka producer I/O thread:  
> (org.apache.kafka.clients.producer.internals.Sender)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.common.network.NetworkReceive.complete(NetworkReceive.java:48)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:249)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
>   at java.lang.Thread.run(Thread.java:745)
> [2016-01-25 14:09:12,481] ERROR Uncaught error in kafka producer I/O thread:  
> (org.apache.kafka.clients.producer.internals.Sender)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.common.network.NetworkReceive.complete(NetworkReceive.java:48)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:249)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
>   at java.lang.Thread.run(Thread.java:745)
> Thanks,



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6617) Improve controller performance by batching reassignment znode write operation

2018-07-12 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-6617.
-
Resolution: Fixed

> Improve controller performance by batching reassignment znode write operation
> -
>
> Key: KAFKA-6617
> URL: https://issues.apache.org/jira/browse/KAFKA-6617
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-5812) Represent logDir with case class where absolute path is required

2018-07-12 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-5812.
-
Resolution: Won't Do

> Represent logDir with case class where absolute path is required
> 
>
> Key: KAFKA-5812
> URL: https://issues.apache.org/jira/browse/KAFKA-5812
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6618) Prevent two controllers from updating znodes concurrently

2018-07-12 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-6618.
-
Resolution: Duplicate

Duplicate of https://issues.apache.org/jira/browse/KAFKA-6082

> Prevent two controllers from updating znodes concurrently
> -
>
> Key: KAFKA-6618
> URL: https://issues.apache.org/jira/browse/KAFKA-6618
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Kafka controller may fail to function properly (even after repeated 
> controller movement) due to the following sequence of events:
> - User requests topic deletion
> - Controller A deletes the partition znode
> - Controller B becomes controller and reads the topic znode
> - Controller A deletes the topic znode and remove the topic from the topic 
> deletion znode
> - Controller B reads the partition znode and topic deletion znode
> - According to controller B's context, the topic znode exists, the topic is 
> not listed for deletion, and some partition is not found for the given topic. 
> Then controller B will create topic znode with empty data (i.e. partition 
> assignment) and create the partition znodes.
> - All controllers after controller B will fail because there is no data in 
> the topic znode.
> The long term solution is to have a way to prevent an old controller from 
> writing to zookeeper if it is not the active controller. One idea is to use 
> the zookeeper multi API (See 
> [https://zookeeper.apache.org/doc/r3.4.3/api/org/apache/zookeeper/ZooKeeper.html#multi(java.lang.Iterable))]
>  such that controller only writes to zookeeper if the zk version of the 
> controller znode has not been changed.
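> For illustration only (a sketch of the idea, not code from Kafka), the
> conditional write could be expressed with the multi API roughly like:
> {code:java}
> import java.util.Arrays;
> import org.apache.zookeeper.KeeperException;
> import org.apache.zookeeper.Op;
> import org.apache.zookeeper.ZooKeeper;
>
> // Sketch: only apply the data update if the controller epoch znode still has
> // the version this controller saw when it became active; if another
> // controller has bumped it, the whole multi op fails atomically.
> void writeIfStillController(ZooKeeper zk, int expectedEpochZkVersion,
>                             String path, byte[] data)
>         throws KeeperException, InterruptedException {
>     zk.multi(Arrays.asList(
>         Op.check("/controller_epoch", expectedEpochZkVersion),
>         Op.setData(path, data, -1)));
> }
> {code}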
> The short term solution is to let controller reads the topic deletion znode 
> first. If the topic is still listed in the topic deletion znode, then the new 
> controller will properly handle partition states of this topic without 
> creating partition znodes for this topic. And if the topic is not listed in 
> the topic deletion znode, then both the topic znode and the partition znodes 
> of this topic should have been deleted by the time the new controller tries 
> to read them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6636) ReplicaFetcherThread should not die if hw < 0

2018-07-12 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-6636.
-
Resolution: Fixed

This is no longer an issue after KAFKA-3978; Ensure high watermark is always 
positive.

> ReplicaFetcherThread should not die if hw < 0
> -
>
> Key: KAFKA-6636
> URL: https://issues.apache.org/jira/browse/KAFKA-6636
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> ReplicaFetcherThread can die in the following scenario:
>  
> 1) Partition P1 has replica set size 1. Broker A is the leader. The segment 
> is empty and log start offset is 100
> 2) User executes partition reassignment to change replica set from \{A} to 
> \{B, C}
> 3) Broker B starts ReplicaFetcherThread, which triggers 
> handleOffsetOutOfRange(), truncates the log fully and start at offset 100. At 
> this moment its high watermark is still 0 (or -1). Same for broker C.
> 4) Broker B sends FetchRequest to A at offset 100, broker A immediately adds 
> broker B to ISR set, and controller moves leadership to broker B.
> 5) Broker B handles LeaderAndIsrRequest to become leader. It calls 
> `leaderReplica.convertHWToLocalOffsetMetadata()` to initialize its HW. Since 
> its HW was smaller than logStartOffset=100, now its HW will be overridden to 
> LogOffsetMetadata.UnknownOffsetMetadata, i.e. -1.
> 6) Broker C handles LeaderAndIsrRequest to fetch from broker B. Broker C 
> updates its HW to the FetchRequest's HW, i.e. -1. Then broker C calls 
> replica.maybeIncrementLogStartOffset(leaderLogStartOffset) where 
> leaderLogStartOffset=100. This causes an exception because leaderLogStartOffset > 
> HW. This is an unhandled exception and thus the ReplicaFetcherThread will exit.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] 1.1.1 RC3

2018-07-12 Thread Harsha
+1.
1. Ran unit tests
2. Ran 3 node cluster to run few tests.

Thanks,
Harsha

On Thu, Jul 12, 2018, at 7:29 AM, Manikumar wrote:
> +1 (non-binding)  Ran tests,  Verified quick start,  producer/consumer perf
> tests
> 
> 
> 
> On Thu, Jul 12, 2018 at 11:06 AM Brett Rann 
> wrote:
> 
> > +1 (non binding)
> > rolling upgrade of a shared staging multitenancy (200+ consumer groups)
> > cluster from 1.1.0 to 1.1.1-rc3 using the kafka_2.11-1.1.1.tgz artifact.
> > The cluster looks healthy after the upgrade. Lack of Burrow lag suggests
> > consumers are still happy, and incoming messages remain the same.
> >
> > On Mon, Jul 9, 2018 at 8:36 AM Dong Lin  wrote:
> >
> > > Hello Kafka users, developers and client-developers,
> > >
> > > This is the fourth candidate for release of Apache Kafka 1.1.1.
> > >
> > > Apache Kafka 1.1.1 is a bug-fix release for the 1.1 branch that was first
> > > released with 1.1.0 about 3 months ago. We have fixed about 25 issues
> > since
> > > that release. A few of the more significant fixes include:
> > >
> > > KAFKA-6925  > > > - Fix memory
> > > leak in StreamsMetricsThreadImpl
> > > KAFKA-6937  > > > - In-sync
> > > replica delayed during fetch if replica throttle is exceeded
> > > KAFKA-6917  > > > - Process txn
> > > completion asynchronously to avoid deadlock
> > > KAFKA-6893  > > > - Create
> > > processors before starting acceptor to avoid ArithmeticException
> > > KAFKA-6870  > > > -
> > > Fix ConcurrentModificationException in SampledStat
> > > KAFKA-6878  > > > - Fix
> > > NullPointerException when querying global state store
> > > KAFKA-6879  > > > - Invoke
> > > session init callbacks outside lock to avoid Controller deadlock
> > > KAFKA-6857  > > > - Prevent
> > > follower from truncating to the wrong offset if undefined leader epoch is
> > > requested
> > > KAFKA-6854  > > > - Log cleaner
> > > fails with transaction markers that are deleted during clean
> > > KAFKA-6747  > > > - Check
> > > whether there is in-flight transaction before aborting transaction
> > > KAFKA-6748  > > > - Double
> > > check before scheduling a new task after the punctuate call
> > > KAFKA-6739  > > > -
> > > Fix IllegalArgumentException when down-converting from V2 to V0/V1
> > > KAFKA-6728  > > > -
> > > Fix NullPointerException when instantiating the HeaderConverter
> > >
> > > Kafka 1.1.1 release plan:
> > > https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+1.1.1
> > > 
> > >
> > > Release notes for the 1.1.1 release:
> > > http://home.apache.org/~lindong/kafka-1.1.1-rc3/RELEASE_NOTES.html
> > > 
> > >
> > > *** Please download, test and vote by Thursday, July 12, 12pm PT ***
> > >
> > > Kafka's KEYS file containing PGP keys we use to sign the release:
> > > http://kafka.apache.org/KEYS
> > > 
> > >
> > > * Release artifacts to be voted upon (source and binary):
> > > http://home.apache.org/~lindong/kafka-1.1.1-rc3/
> > > 
> > >
> > > * Maven artifacts to be voted upon:
> > > https://repository.apache.org/content/groups/staging/
> > > 
> > >
> > > * Javadoc:
> > > http://home.apache.org/~lindong/kafka-1.1.1-rc3/javadoc/
> > > 
> > >
> > > * Tag to be voted upon (off 1.1 branch) is the 1.1.1-rc3 tag:
> > > https://github.com/apache/kafka/tree/1.1.1-rc3
> > > 
> > >
> > > * Documentation:
> > > 

Re: KIP-327: Add describe all topics API to AdminClient

2018-07-12 Thread Harsha
Very useful. LGTM.

Thanks,
Harsha

On Thu, Jul 12, 2018, at 9:56 AM, Manikumar wrote:
> Hi all,
> 
> I have created a KIP to add a describe all topics API to AdminClient.
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-327%3A+Add+describe+all+topics+API+to+AdminClient
> 
> Please take a look.
> 
> Thanks,


Re: [VOTE] 2.0.0 RC2

2018-07-12 Thread Harsha
+1
1. Ran unit tests
2. Tested few use cases through 3-node cluster.

Thanks,
Harsha

On Thu, Jul 12, 2018, at 9:33 AM, Mickael Maison wrote:
> +1 non-binding
> Built from source, ran tests, ran quickstart and check signatures
> 
> Thanks!
> 
> 
> On Wed, Jul 11, 2018 at 10:48 PM, Jakub Scholz  wrote:
> > +1 (non-binding) ... I built the RC2 from source, ran tests and used it
> > with several of my applications without any problems.
> >
> > Thanks & Regards
> > Jakub
> >
> > On Tue, Jul 10, 2018 at 7:17 PM Rajini Sivaram 
> > wrote:
> >
> >> Hello Kafka users, developers and client-developers,
> >>
> >>
> >> This is the third candidate for release of Apache Kafka 2.0.0.
> >>
> >>
> >> This is a major version release of Apache Kafka. It includes 40 new  KIPs
> >> and
> >>
> >> several critical bug fixes. Please see the 2.0.0 release plan for more
> >> details:
> >>
> >> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=80448820
> >>
> >>
> >> A few notable highlights:
> >>
> >>- Prefixed wildcard ACLs (KIP-290), Fine grained ACLs for CreateTopics
> >>(KIP-277)
> >>- SASL/OAUTHBEARER implementation (KIP-255)
> >>- Improved quota communication and customization of quotas (KIP-219,
> >>KIP-257)
> >>- Efficient memory usage for down conversion (KIP-283)
> >>- Fix log divergence between leader and follower during fast leader
> >>failover (KIP-279)
> >>- Drop support for Java 7 and remove deprecated code including old scala
> >>clients
> >>- Connect REST extension plugin, support for externalizing secrets and
> >>improved error handling (KIP-285, KIP-297, KIP-298 etc.)
> >>- Scala API for Kafka Streams and other Streams API improvements
> >>(KIP-270, KIP-150, KIP-245, KIP-251 etc.)
> >>
> >>
> >> Release notes for the 2.0.0 release:
> >>
> >> http://home.apache.org/~rsivaram/kafka-2.0.0-rc2/RELEASE_NOTES.html
> >>
> >>
> >> *** Please download, test and vote by Friday, July 13, 4pm PT
> >>
> >>
> >> Kafka's KEYS file containing PGP keys we use to sign the release:
> >>
> >> http://kafka.apache.org/KEYS
> >>
> >>
> >> * Release artifacts to be voted upon (source and binary):
> >>
> >> http://home.apache.org/~rsivaram/kafka-2.0.0-rc2/
> >>
> >>
> >> * Maven artifacts to be voted upon:
> >>
> >> https://repository.apache.org/content/groups/staging/
> >>
> >>
> >> * Javadoc:
> >>
> >> http://home.apache.org/~rsivaram/kafka-2.0.0-rc2/javadoc/
> >>
> >>
> >> * Tag to be voted upon (off 2.0 branch) is the 2.0.0 tag:
> >>
> >> https://github.com/apache/kafka/tree/2.0.0-rc2
> >>
> >>
> >>
> >> * Documentation:
> >>
> >> http://kafka.apache.org/20/documentation.html
> >>
> >>
> >> * Protocol:
> >>
> >> http://kafka.apache.org/20/protocol.html
> >>
> >>
> >> * Successful Jenkins builds for the 2.0 branch:
> >>
> >> Unit/integration tests: https://builds.apache.org/job/kafka-2.0-jdk8/72/
> >>
> >> System tests:
> >> https://jenkins.confluent.io/job/system-test-kafka/job/2.0/27/
> >>
> >>
> >> /**
> >>
> >>
> >> Thanks,
> >>
> >>
> >> Rajini
> >>


[jira] [Resolved] (KAFKA-6746) Allow ZK Znode configurable for Kafka broker

2018-07-12 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-6746.
--
Resolution: Fixed

Fixed via https://github.com/apache/kafka/pull/5358

> Allow ZK Znode configurable for Kafka broker 
> -
>
> Key: KAFKA-6746
> URL: https://issues.apache.org/jira/browse/KAFKA-6746
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Biju Nair
>Priority: Major
>
> By allowing users to specify the {{Znode}} to be used along with the {{ZK 
> Quorum}}, users will be able to reuse a {{ZK}} cluster for many {{Kafka}} 
> clusters. This will help in reducing the {{ZK}} cluster footprint especially 
> in non production environment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-12 Thread Jason Gustafson
Hey Anna and Dong,

Thanks a lot for the great discussion. I've been hanging back a bit because
honestly the best option hasn't seemed clear. I agree with Anna's general
observation that there is a distinction between the position of the
consumer and its fetch state up to that position. If you think about it, a
committed offset actually represents both of these. The metadata is used to
initialize the state of the consumer application and the offset initializes
the position. Additionally, we are extending the offset commit in this KIP
to also include the last epoch fetched by the consumer, which is used to
initialize the internal fetch state. Of course if you do an arbitrary
`seek` and immediately commit offsets, then there won't be a last epoch to
commit. This seems intuitive since there is no fetch state in this case. We
only commit fetch state when we have it.

So if we think about a committed offset as initializing both the consumer's
position and its fetch state, then the gap in the API is evidently that we
don't have a way to initialize the consumer to a committed offset. We do it
implicitly of course for offsets stored in Kafka, but since external
storage is a use case we support, then we should have an explicit API as
well. Perhaps something like this:

seekToCommitted(TopicPartition, OffsetAndMetadata)

In this KIP, we are proposing to allow the `OffsetAndMetadata` object to
include the leader epoch, so I think this would have the same effect as
Anna's suggested `seekToRecord`. But perhaps it is a more natural fit given
the current API? Furthermore, if we find a need for additional metadata in
the offset commit API in the future, then we will just need to modify the
`OffsetAndMetadata` object and we will not need a new `seek` API.
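
To sketch how this might look for a user who stores offsets externally
(hypothetical code: seekToCommitted is only the proposal above, and the
epoch-carrying OffsetAndMetadata constructor is assumed from this KIP):

import java.time.Duration;
import java.util.Collections;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Restore a consumer from externally stored state: the committed offset plus
// the last fetched leader epoch (both loaded elsewhere by the application).
void restore(KafkaConsumer<byte[], byte[]> consumer, TopicPartition tp,
             long savedOffset, int savedLeaderEpoch) {
  consumer.assign(Collections.singleton(tp));
  // Proposed API: initializes the position *and* the internal fetch state.
  consumer.seekToCommitted(tp,
      new OffsetAndMetadata(savedOffset, Optional.of(savedLeaderEpoch), ""));
  // The first poll then resumes from the restored state.
  ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
}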

With this approach, I think then we can leave the `position` API as it is.
The position of the consumer is still just the next expected fetch offset.
If a user needs to record additional state based on previous fetch
progress, then they would use the result of the previous fetch to obtain
it. This makes the dependence on fetch progress explicit. I think we could
make this a little more convenient with a helper in the `ConsumerRecords`
object, but I think that's more of a nice-to-have.
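
And on the saving side, "using the result of the previous fetch" could look
roughly like this (again just a sketch; leaderEpoch() on ConsumerRecord is
part of what this KIP would expose, saveExternally is an assumed application
helper, and the imports are as above plus java.util.List and ConsumerRecord):

// Derive the externally stored "record position" from the last record of
// each partition in the previous fetch result.
void savePositions(ConsumerRecords<byte[], byte[]> records) {
  for (TopicPartition tp : records.partitions()) {
    List<ConsumerRecord<byte[], byte[]>> recs = records.records(tp);
    ConsumerRecord<byte[], byte[]> last = recs.get(recs.size() - 1);
    // Next offset to fetch plus the epoch of the last consumed record.
    saveExternally(tp, last.offset() + 1, last.leaderEpoch());
  }
}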

Thoughts?

By the way, I have been iterating a little bit on the replica side of this
KIP. My initial proposal in fact did not have strong enough fencing to
protect all of the edge cases. I believe the current proposal fixes the
problems, but I am still verifying the model.

Thanks,
Jason


On Wed, Jul 11, 2018 at 10:45 AM, Dong Lin  wrote:

> Hey Anna,
>
> Thanks much for the explanation. Approach 1 also sounds good to me. I think
> findOffsets() is useful for users who don't use automatic offset reset
> policy.
>
> Just one more question. Since users who store offsets externally need to
> provide leaderEpoch to findOffsets(...), do we need an extra API for user
> to get both offset and leaderEpoch, e.g. recordPosition()?
>
> Thanks,
> Dong
>
> On Wed, Jul 11, 2018 at 10:12 AM, Anna Povzner  wrote:
>
> > Hi Dong,
> >
> >
> > What I called “not covering all use cases” is what you call best-effort
> > (not guaranteeing some corner cases). I think we are on the same page
> here.
> >
> >
> > I wanted to be clear in the API whether the consumer seeks to a position
> > (offset) or to a record (offset, leader epoch). The only use-case of
> > seeking to a record is seeking to a committed offset for a user who
> stores
> > committed offsets externally. (Unless users find some other reason to
> seek
> > to a record.) I thought it was possible to provide this functionality
> with
> > findOffset(offset, leader epoch) followed by a seek(offset). However, you
> > are right that this will not handle the race condition where
> non-divergent
> > offset found by findOffset() could change again before the consumer does
> > the first fetch.
> >
> >
> > Regarding position() — if we add position that returns (offset, leader
> > epoch), this is specifically a position after a record that was actually
> > consumed or position of a committed record. In which case, I still think
> > it’s cleaner to get a record position of consumed message from a new
> helper
> > method in ConsumerRecords() or from committed offsets.
> >
> >
> > I think all the use-cases could be then covered with:
> >
> > (Approach 1)
> >
> > seekToRecord(offset, leaderEpoch) — this will just initialize/set the
> > consumer state;
> >
> > findOffsets(offset, leaderEpoch) returns {offset, leaderEpoch}
> >
> >
> > If we agree that the race condition is also a corner case, then I think
> we
> > can cover use-cases with:
> >
> > (Approach 2)
> >
> > findOffsets(offset, leaderEpoch) returns offset — we still want leader
> > epoch as a parameter for the users who store their committed offsets
> > externally.
> >
> >
> > I am actually now leaning more to approach 1, since it is more explicit,
> > and maybe there are more use cases for it.
> >
> >
> > 

[jira] [Created] (KAFKA-7158) Duplicates when searching kafka stream state store with caching

2018-07-12 Thread Christian Henry (JIRA)
Christian Henry created KAFKA-7158:
--

 Summary: Duplicates when searching kafka stream state store with 
caching
 Key: KAFKA-7158
 URL: https://issues.apache.org/jira/browse/KAFKA-7158
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.1.0
Reporter: Christian Henry


See mailing list email with same name for initial discussion, reposting my 
initial email here for convenience:
{noformat}
We have a kafka stream application, and one of our transform steps keeps a 
state store to filter out messages with a previously seen GUID. That is, our 
transform looks like:
public KeyValue transform(byte[] key, String guid) {
    try (WindowStoreIterator iterator =
            duplicateStore.fetch(correlationId, start, now)) {
        if (iterator.hasNext()) {
            return null;
        } else {
            duplicateStore.put(correlationId, some metadata);
            return new KeyValue<>(key, message);
        }
    }
}
where the duplicateStore is a persistent windowed store with caching enabled. 

I was debugging some tests and found that sometimes when calling all() or 
fetchAll() on the duplicate store and stepping through the iterator, it would 
return the same guid more than once, even if it was only inserted into the 
store once. More specifically, if I had the following guids sent to the stream: 
[1, 2, ... 9] (for 9 values total), sometimes it would return 10 
values, with one (or more) of the values being returned twice by the iterator. 
However, this would not show up with a fetch(guid) on that specific guid. For 
instance, if 1 was being returned twice by fetchAll(), calling 
duplicateStore.fetch("1", start, end) will still return an iterator with 
size of 1. 

I dug into this a bit more by setting a breakpoint in 
SegmentedCacheFunction#compareSegmentedKeys(cacheKey, storeKey) and watching 
the two input values as I looped through the iterator using 
"while(iterator.hasNext()) { print(iterator.next()) }". In one test, the 
duplicate value was 6, and saw the following behavior (trimming off the 
segment values from the byte input):
-- compareSegmentedKeys(cacheKey = 6, storeKey = 2)
-- next() returns 6
and 
-- compareSegmentedKeys(cacheKey = 7, storeKey = 6)
-- next() returns 6
Besides those, the input values are the same and the output is as expected. 
Additionally, a coworker noted that the number of duplicates always matches the 
number of times Long.compare(cacheSegmentId, storeSegmentId) returns a non-zero 
value, indicating that duplicates are likely arising due to the segment 
comparison. {noformat}
 

Basically, what we're seeing is that if you have a persistent store with 
caching enabled, you will sometimes get duplicate keys when querying for all 
keys (using all() or fetchAll()) even though fetch(key) will only return one 
result. That is, if you had a fresh store with nothing in it and did something 
like:
{code:java}
IntStream.rangeClosed(1, 100).forEach(i -> store.put("key" + i, "value" + i));
{code}
then calling
{code:java}
store.fetchAll(start, end)
{code}
would return an iterator with MORE than 100 items, whereas if you explicitly did
{code:java}
store.fetch("key" + i)
{code}
for i = 1 to 100, each fetch would only return a single item in the iterator. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


KIP-327: Add describe all topics API to AdminClient

2018-07-12 Thread Manikumar
Hi all,

I have created a KIP to add a describe all topics API to AdminClient.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-327%3A+Add+describe+all+topics+API+to+AdminClient

Please take a look.

Thanks,


Re: [VOTE] 2.0.0 RC2

2018-07-12 Thread Mickael Maison
+1 non-binding
Built from source, ran tests, ran quickstart and check signatures

Thanks!


On Wed, Jul 11, 2018 at 10:48 PM, Jakub Scholz  wrote:
> +1 (non-binding) ... I built the RC2 from source, ran tests and used it
> with several of my applications without any problems.
>
> Thanks & Regards
> Jakub
>
> On Tue, Jul 10, 2018 at 7:17 PM Rajini Sivaram 
> wrote:
>
>> Hello Kafka users, developers and client-developers,
>>
>>
>> This is the third candidate for release of Apache Kafka 2.0.0.
>>
>>
>> This is a major version release of Apache Kafka. It includes 40 new  KIPs
>> and
>>
>> several critical bug fixes. Please see the 2.0.0 release plan for more
>> details:
>>
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=80448820
>>
>>
>> A few notable highlights:
>>
>>- Prefixed wildcard ACLs (KIP-290), Fine grained ACLs for CreateTopics
>>(KIP-277)
>>- SASL/OAUTHBEARER implementation (KIP-255)
>>- Improved quota communication and customization of quotas (KIP-219,
>>KIP-257)
>>- Efficient memory usage for down conversion (KIP-283)
>>- Fix log divergence between leader and follower during fast leader
>>failover (KIP-279)
>>- Drop support for Java 7 and remove deprecated code including old scala
>>clients
>>- Connect REST extension plugin, support for externalizing secrets and
>>improved error handling (KIP-285, KIP-297, KIP-298 etc.)
>>- Scala API for Kafka Streams and other Streams API improvements
>>(KIP-270, KIP-150, KIP-245, KIP-251 etc.)
>>
>>
>> Release notes for the 2.0.0 release:
>>
>> http://home.apache.org/~rsivaram/kafka-2.0.0-rc2/RELEASE_NOTES.html
>>
>>
>> *** Please download, test and vote by Friday, July 13, 4pm PT
>>
>>
>> Kafka's KEYS file containing PGP keys we use to sign the release:
>>
>> http://kafka.apache.org/KEYS
>>
>>
>> * Release artifacts to be voted upon (source and binary):
>>
>> http://home.apache.org/~rsivaram/kafka-2.0.0-rc2/
>>
>>
>> * Maven artifacts to be voted upon:
>>
>> https://repository.apache.org/content/groups/staging/
>>
>>
>> * Javadoc:
>>
>> http://home.apache.org/~rsivaram/kafka-2.0.0-rc2/javadoc/
>>
>>
>> * Tag to be voted upon (off 2.0 branch) is the 2.0.0 tag:
>>
>> https://github.com/apache/kafka/tree/2.0.0-rc2
>>
>>
>>
>> * Documentation:
>>
>> http://kafka.apache.org/20/documentation.html
>>
>>
>> * Protocol:
>>
>> http://kafka.apache.org/20/protocol.html
>>
>>
>> * Successful Jenkins builds for the 2.0 branch:
>>
>> Unit/integration tests: https://builds.apache.org/job/kafka-2.0-jdk8/72/
>>
>> System tests:
>> https://jenkins.confluent.io/job/system-test-kafka/job/2.0/27/
>>
>>
>> /**
>>
>>
>> Thanks,
>>
>>
>> Rajini
>>


[jira] [Created] (KAFKA-7157) Connect TimestampConverter SMT doesn't handle null values

2018-07-12 Thread Randall Hauch (JIRA)
Randall Hauch created KAFKA-7157:


 Summary: Connect TimestampConverter SMT doesn't handle null values
 Key: KAFKA-7157
 URL: https://issues.apache.org/jira/browse/KAFKA-7157
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 0.10.2.0
Reporter: Randall Hauch


The TimestampConverter SMT is not able to handle null values (in any version), so 
it always tries to apply the transformation to the value. Instead, it needs 
to check for null and use the default value for the new schema's field.
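
A minimal sketch of such a null guard (illustrative only; operatingValue, operatingSchema, 
applySchemaless and applyWithSchema follow the usual shape of a Connect SMT rather than 
TimestampConverter's exact internals):
{code:java}
// Sketch: short-circuit on a null value instead of converting it.
@Override
public R apply(R record) {
    Object value = operatingValue(record);
    if (value == null) {
        // Nothing to convert; pass the record through (or supply the new
        // schema's default value) instead of throwing a NullPointerException.
        return record;
    }
    return operatingSchema(record) == null ? applySchemaless(record) : applyWithSchema(record);
}
{code}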

{noformat}
[2018-07-03 02:31:52,490] ERROR Task MySourceConnector-2 threw an uncaught and 
unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask) 
java.lang.NullPointerException 
at 
org.apache.kafka.connect.transforms.TimestampConverter$2.toRaw(TimestampConverter.java:137)
 
at 
org.apache.kafka.connect.transforms.TimestampConverter.convertTimestamp(TimestampConverter.java:440)
 
at 
org.apache.kafka.connect.transforms.TimestampConverter.applyValueWithSchema(TimestampConverter.java:368)
 
at 
org.apache.kafka.connect.transforms.TimestampConverter.applyWithSchema(TimestampConverter.java:358)
 
at 
org.apache.kafka.connect.transforms.TimestampConverter.apply(TimestampConverter.java:275)
 
at 
org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
 
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:435)
 
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:264) 
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
 
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
 
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146) 
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190) 
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748) 
[2018-07-03 02:31:52,491] ERROR Task is being killed and will not recover until 
manually restarted (org.apache.kafka.connect.runtime.WorkerTask) 
{noformat}




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] 1.1.1 RC3

2018-07-12 Thread Manikumar
+1 (non-binding)  Ran tests,  Verified quick start,  producer/consumer perf
tests



On Thu, Jul 12, 2018 at 11:06 AM Brett Rann 
wrote:

> +1 (non-binding)
> Rolling upgrade of a shared staging multitenancy (200+ consumer groups)
> cluster from 1.1.0 to 1.1.1-rc3 using the kafka_2.11-1.1.1.tgz artifact.
> The cluster looks healthy after the upgrade. Lack of Burrow lag suggests consumers
> are still happy, and incoming message rates remain the same.
>
> On Mon, Jul 9, 2018 at 8:36 AM Dong Lin  wrote:
>
> > Hello Kafka users, developers and client-developers,
> >
> > This is the fourth candidate for release of Apache Kafka 1.1.1.
> >
> > Apache Kafka 1.1.1 is a bug-fix release for the 1.1 branch that was first
> > released with 1.1.0 about 3 months ago. We have fixed about 25 issues
> since
> > that release. A few of the more significant fixes include:
> >
> > KAFKA-6925 - Fix memory leak in StreamsMetricsThreadImpl
> > KAFKA-6937 - In-sync replica delayed during fetch if replica throttle is exceeded
> > KAFKA-6917 - Process txn completion asynchronously to avoid deadlock
> > KAFKA-6893 - Create processors before starting acceptor to avoid ArithmeticException
> > KAFKA-6870 - Fix ConcurrentModificationException in SampledStat
> > KAFKA-6878 - Fix NullPointerException when querying global state store
> > KAFKA-6879 - Invoke session init callbacks outside lock to avoid Controller deadlock
> > KAFKA-6857 - Prevent follower from truncating to the wrong offset if undefined leader epoch is requested
> > KAFKA-6854 - Log cleaner fails with transaction markers that are deleted during clean
> > KAFKA-6747 - Check whether there is in-flight transaction before aborting transaction
> > KAFKA-6748 - Double check before scheduling a new task after the punctuate call
> > KAFKA-6739 - Fix IllegalArgumentException when down-converting from V2 to V0/V1
> > KAFKA-6728 - Fix NullPointerException when instantiating the HeaderConverter
> >
> > Kafka 1.1.1 release plan:
> > https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+1.1.1
> > 
> >
> > Release notes for the 1.1.1 release:
> > http://home.apache.org/~lindong/kafka-1.1.1-rc3/RELEASE_NOTES.html
> > 
> >
> > *** Please download, test and vote by Thursday, July 12, 12pm PT ***
> >
> > Kafka's KEYS file containing PGP keys we use to sign the release:
> > http://kafka.apache.org/KEYS
> > 
> >
> > * Release artifacts to be voted upon (source and binary):
> > http://home.apache.org/~lindong/kafka-1.1.1-rc3/
> > 
> >
> > * Maven artifacts to be voted upon:
> > https://repository.apache.org/content/groups/staging/
> > 
> >
> > * Javadoc:
> > http://home.apache.org/~lindong/kafka-1.1.1-rc3/javadoc/
> > 
> >
> > * Tag to be voted upon (off 1.1 branch) is the 1.1.1-rc3 tag:
> > https://github.com/apache/kafka/tree/1.1.1-rc3
> > 
> >
> > * Documentation:
> > http://kafka.apache.org/11/documentation.html
> > 
> >
> > * Protocol:
> > http://kafka.apache.org/11/protocol.html
> > 
> >
> > * Successful Jenkins builds for the 1.1 branch:
> > Unit/integration tests: https://builds.apache.org/job/kafka-1.1-jdk7/162

Build failed in Jenkins: kafka-trunk-jdk8 #2798

2018-07-12 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] MINOR: Print exception stack traces in ConsumerGroupCommand. (#5286)

--
[...truncated 552.94 KB...]
kafka.zk.KafkaZkClientTest > testBrokerSequenceIdMethods STARTED

kafka.zk.KafkaZkClientTest > testBrokerSequenceIdMethods PASSED

kafka.zk.KafkaZkClientTest > testCreateSequentialPersistentPath STARTED

kafka.zk.KafkaZkClientTest > testCreateSequentialPersistentPath PASSED

kafka.zk.KafkaZkClientTest > testConditionalUpdatePath STARTED

kafka.zk.KafkaZkClientTest > testConditionalUpdatePath PASSED

kafka.zk.KafkaZkClientTest > testDeleteTopicZNode STARTED

kafka.zk.KafkaZkClientTest > testDeleteTopicZNode PASSED

kafka.zk.KafkaZkClientTest > testDeletePath STARTED

kafka.zk.KafkaZkClientTest > testDeletePath PASSED

kafka.zk.KafkaZkClientTest > testGetBrokerMethods STARTED

kafka.zk.KafkaZkClientTest > testGetBrokerMethods PASSED

kafka.zk.KafkaZkClientTest > testCreateTokenChangeNotification STARTED

kafka.zk.KafkaZkClientTest > testCreateTokenChangeNotification PASSED

kafka.zk.KafkaZkClientTest > testGetTopicsAndPartitions STARTED

kafka.zk.KafkaZkClientTest > testGetTopicsAndPartitions PASSED

kafka.zk.KafkaZkClientTest > testRegisterBrokerInfo STARTED

kafka.zk.KafkaZkClientTest > testRegisterBrokerInfo PASSED

kafka.zk.KafkaZkClientTest > testConsumerOffsetPath STARTED

kafka.zk.KafkaZkClientTest > testConsumerOffsetPath PASSED

kafka.zk.KafkaZkClientTest > testControllerManagementMethods STARTED

kafka.zk.KafkaZkClientTest > testControllerManagementMethods PASSED

kafka.zk.KafkaZkClientTest > testTopicAssignmentMethods STARTED

kafka.zk.KafkaZkClientTest > testTopicAssignmentMethods PASSED

kafka.zk.KafkaZkClientTest > testPropagateIsrChanges STARTED

kafka.zk.KafkaZkClientTest > testPropagateIsrChanges PASSED

kafka.zk.KafkaZkClientTest > testControllerEpochMethods STARTED

kafka.zk.KafkaZkClientTest > testControllerEpochMethods PASSED

kafka.zk.KafkaZkClientTest > testDeleteRecursive STARTED

kafka.zk.KafkaZkClientTest > testDeleteRecursive PASSED

kafka.zk.KafkaZkClientTest > testGetTopicPartitionStates STARTED

kafka.zk.KafkaZkClientTest > testGetTopicPartitionStates PASSED

kafka.zk.KafkaZkClientTest > testCreateConfigChangeNotification STARTED

kafka.zk.KafkaZkClientTest > testCreateConfigChangeNotification PASSED

kafka.zk.KafkaZkClientTest > testDelegationTokenMethods STARTED

kafka.zk.KafkaZkClientTest > testDelegationTokenMethods PASSED

kafka.zk.LiteralAclStoreTest > shouldHaveCorrectPaths STARTED

kafka.zk.LiteralAclStoreTest > shouldHaveCorrectPaths PASSED

kafka.zk.LiteralAclStoreTest > shouldRoundTripChangeNode STARTED

kafka.zk.LiteralAclStoreTest > shouldRoundTripChangeNode PASSED

kafka.zk.LiteralAclStoreTest > shouldThrowFromEncodeOnNoneLiteral STARTED

kafka.zk.LiteralAclStoreTest > shouldThrowFromEncodeOnNoneLiteral PASSED

kafka.zk.LiteralAclStoreTest > shouldWriteChangesToTheWritePath STARTED

kafka.zk.LiteralAclStoreTest > shouldWriteChangesToTheWritePath PASSED

kafka.zk.LiteralAclStoreTest > shouldHaveCorrectPatternType STARTED

kafka.zk.LiteralAclStoreTest > shouldHaveCorrectPatternType PASSED

kafka.zk.LiteralAclStoreTest > shouldDecodeResourceUsingTwoPartLogic STARTED

kafka.zk.LiteralAclStoreTest > shouldDecodeResourceUsingTwoPartLogic PASSED

kafka.zk.ExtendedAclStoreTest > shouldHaveCorrectPaths STARTED

kafka.zk.ExtendedAclStoreTest > shouldHaveCorrectPaths PASSED

kafka.zk.ExtendedAclStoreTest > shouldRoundTripChangeNode STARTED

kafka.zk.ExtendedAclStoreTest > shouldRoundTripChangeNode PASSED

kafka.zk.ExtendedAclStoreTest > shouldThrowFromEncodeOnLiteral STARTED

kafka.zk.ExtendedAclStoreTest > shouldThrowFromEncodeOnLiteral PASSED

kafka.zk.ExtendedAclStoreTest > shouldThrowIfConstructedWithLiteral STARTED

kafka.zk.ExtendedAclStoreTest > shouldThrowIfConstructedWithLiteral PASSED

kafka.zk.ExtendedAclStoreTest > shouldWriteChangesToTheWritePath STARTED

kafka.zk.ExtendedAclStoreTest > shouldWriteChangesToTheWritePath PASSED

kafka.zk.ExtendedAclStoreTest > shouldHaveCorrectPatternType STARTED

kafka.zk.ExtendedAclStoreTest > shouldHaveCorrectPatternType PASSED

kafka.server.LogOffsetTest > testFetchOffsetsBeforeWithChangingSegmentSize 
STARTED

kafka.server.LogOffsetTest > testFetchOffsetsBeforeWithChangingSegmentSize 
PASSED

kafka.server.LogOffsetTest > testGetOffsetsBeforeEarliestTime STARTED

kafka.server.LogOffsetTest > testGetOffsetsBeforeEarliestTime PASSED

kafka.server.LogOffsetTest > testGetOffsetsForUnknownTopic STARTED

kafka.server.LogOffsetTest > testGetOffsetsForUnknownTopic PASSED

kafka.server.LogOffsetTest > testEmptyLogsGetOffsets STARTED

kafka.server.LogOffsetTest > testEmptyLogsGetOffsets PASSED

kafka.server.LogOffsetTest > testFetchOffsetsBeforeWithChangingSegments STARTED
ERROR: Could not install GRADLE_4_8_1_HOME
java.lang.NullPointerException
at 

[jira] [Created] (KAFKA-7156) Deleting topics with long names can bring all brokers to unrecoverable state

2018-07-12 Thread Petr Pchelko (JIRA)
Petr Pchelko created KAFKA-7156:
---

 Summary: Deleting topics with long names can bring all brokers to 
unrecoverable state
 Key: KAFKA-7156
 URL: https://issues.apache.org/jira/browse/KAFKA-7156
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 1.1.0
Reporter: Petr Pchelko


Kafka's limit for a topic name is 249 characters, so creating a topic with a name 
248 characters long is possible. However, when deleting the topic, Kafka renames 
the topic's data directory to append a hash and a `-delete` suffix, so the 
resulting directory name exceeds the 255 character file name limit of most Unix 
file systems. This provokes a java.nio.file.FileSystemException which in turn 
immediately shuts down all the brokers. Further attempts to restart the broker 
fail with the same exception. The only way to resurrect the cluster is to manually 
delete the affected topic from ZooKeeper and from the disk on all the broker machines.
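
A quick back-of-the-envelope check of the lengths involved (a sketch; the 32-character hex 
suffix is taken from the log below and the 255-byte limit is ext4's):
{code:java}
public class TopicDirNameLength {
    public static void main(String[] args) {
        String topic = String.join("", java.util.Collections.nCopies(248, "a")); // 248-char topic name
        String liveDir = topic + "-0";                                            // 250 chars: still under 255
        String deleteDir = liveDir + "." + "093fd1e1728f438ea990cbad8a514b9f" + "-delete";
        System.out.println(liveDir.length() + " -> " + deleteDir.length());       // 250 -> 290: "File name too long"
    }
}
{code}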

 

Steps to reproduce:

(Note: delete.topic.enable=true must be set in the config)

 
{code:java}
> kafka-topics.sh --zookeeper localhost:2181 --create --topic <248-character name of repeated 'a's> --partitions 1 --replication-factor 1
> kafka-topics.sh --zookeeper localhost:2181 --delete --topic <248-character name of repeated 'a's>
{code}
 

 

After these 2 commands are executed, all the brokers where this topic is replicated 
immediately shut down with the following logs:

 
{code:java}
ERROR Error while renaming dir for 
-0
 in log dir /tmp/kafka-logs (kafka.server.LogDirFailureChannel)

java.nio.file.FileSystemException: 
/tmp/kafka-logs/-0
 -> 
/tmp/kafka-logs/-0.093fd1e1728f438ea990cbad8a514b9f-delete:
 File name too long

at sun.nio.fs.UnixException.translateToIOException(UnixException.java:91)

at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)

at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:457)

at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)

at java.nio.file.Files.move(Files.java:1395)

...

Suppressed: java.nio.file.FileSystemException: 
/tmp/kafka-logs/-0
 -> 
/tmp/kafka-logs/-0.093fd1e1728f438ea990cbad8a514b9f-delete:
 File name too long

at sun.nio.fs.UnixException.translateToIOException(UnixException.java:91)

at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)

at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:396)

at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)

at java.nio.file.Files.move(Files.java:1395)

at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:694)

... 23 more

[2018-07-12 13:34:45,847] INFO [ReplicaManager broker=0] Stopping serving 
replicas in dir /tmp/kafka-logs (kafka.server.ReplicaManager)

[2018-07-12 13:34:45,848] INFO [ReplicaFetcherManager on broker 0] Removed 
fetcher for partitions  (kafka.server.ReplicaFetcherManager)

[2018-07-12 13:34:45,849] INFO [ReplicaAlterLogDirsManager on broker 0] Removed 
fetcher for partitions  (kafka.server.ReplicaAlterLogDirsManager)

[2018-07-12 13:34:45,851] INFO [ReplicaManager broker=0] Broker 0 stopped 
fetcher for partitions  and stopped moving logs for partitions  because they 
are in the 

[jira] [Created] (KAFKA-7155) the property CONFIG of the ProducerConfig maybe null.

2018-07-12 Thread zhangjk (JIRA)
zhangjk created KAFKA-7155:
--

 Summary: the property CONFIG of the ProducerConfig maybe null.
 Key: KAFKA-7155
 URL: https://issues.apache.org/jira/browse/KAFKA-7155
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 1.1.0
 Environment: scala  2.12.6
kafka  1.1.0
log4j2  2.11.0
Reporter: zhangjk


hi:

When I use Log4j2's KafkaAppender, I hit a bug: the static CONFIG property of 
ProducerConfig can still be null when it is read.

here's my code:
{noformat}
log4j2.xml: a KafkaAppender configuration pointing at localhost:9092
(the XML markup was stripped by the mail archive)
{noformat}
{code:java}
object KafkaTest extends App {
  val p = new Producer
}

{code}
{code:java}
class Producer {
  val props = {
    val p = new Properties()

    p
  }

  val producer = {
    val p = new KafkaProducer[Integer, String](props)
    p
  }
}
{code}
The reason for this bug:

When `new Producer` runs for the first time, the `ProducerConfig` class starts executing its 
static initializer block, which references the `CommonClientConfigs` class. `CommonClientConfigs` 
declares a logger field, so loading it triggers Log4j2's configuration, which in turn calls the 
appender's start method, in this case the KafkaAppender's. That method creates a new 
`KafkaProducer`, which creates a new `ProducerConfig` instance, but the original static 
initializer block has not completed yet, so `CONFIG` is still null. This causes the 
`NullPointerException`.
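
A minimal, self-contained illustration of this re-entrant static-initialization problem (the 
class names are invented for the illustration and are not Kafka's or Log4j2's):
{code:java}
// Config stands in for ProducerConfig, LoggingFramework for Log4j2/CommonClientConfigs.
// Reading Config.CONFIG while Config's static initializer is still running on the
// same thread observes the field's default value, null.
class Config {
    static final Config CONFIG;
    static {
        LoggingFramework.initialize(); // runs before CONFIG is assigned below
        CONFIG = new Config();
    }
}

class LoggingFramework {
    static void initialize() {
        // An "appender" started here sees the half-initialized class:
        System.out.println("CONFIG during initialization: " + Config.CONFIG); // prints null
    }
}

public class ReentrantStaticInitDemo {
    public static void main(String[] args) {
        System.out.println("CONFIG after initialization: " + Config.CONFIG); // non-null
    }
}
{code}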

 

My English is not great, so I hope this explains what I mean.

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7154) Apache kafka getting shut down everytime it is started

2018-07-12 Thread Pushkar Kumar (JIRA)
Pushkar Kumar created KAFKA-7154:


 Summary: Apache kafka getting shut down everytime it is started
 Key: KAFKA-7154
 URL: https://issues.apache.org/jira/browse/KAFKA-7154
 Project: Kafka
  Issue Type: Bug
Affects Versions: 1.1.0
Reporter: Pushkar Kumar


Hi Team,

We performed the steps below to set up Apache Kafka on a Linux CentOS box:

1. Set up jdk-8u172-linux-x64

2. Set up zookeeper-3.5.4-beta; it is running fine.

3. Set up kafka_2.12-1.1.0 (binary download from the Apache Kafka site)

Whenever I try to start the Kafka service, it shows the following behavior:

INFO Terminating process due to signal SIGHUP (kafka.Kafka$)
 INFO [KafkaServer id=0] shutting down (kafka.server.KafkaServer)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7153) Kafka commits transaction twice due to LeaveGroup

2018-07-12 Thread Kristian Kolding Foged-Ladefoged (JIRA)
Kristian Kolding Foged-Ladefoged created KAFKA-7153:
---

 Summary: Kafka commits transaction twice due to LeaveGroup
 Key: KAFKA-7153
 URL: https://issues.apache.org/jira/browse/KAFKA-7153
 Project: Kafka
  Issue Type: Bug
Reporter: Kristian Kolding Foged-Ladefoged


I am using transactions in Kafka. I have provided my consumer container with a 
`ChainedKafkaTransactionManager` which consists of a `JpaTransactionManager` and a 
`KafkaTransactionManager`.

I am trying to learn how transactions are affected when a consumer is stuck and 
therefore sends `LeaveGroup` and disables its heartbeat thread.

I have set `max.poll.interval.ms` to 10 seconds.

I have not changed `session.timeout.ms`. It is 10 seconds by default.

I have two applications with one consumer each. Both consumers are 
transactional. Consumer A is rigged to process for 30 seconds and Consumer B 
processes it within 1 second. Both consumers read from the same topic, which has 3 
partitions.

1. Send a record to Kafka
2. Consumer A receives the record.
3. Consumer A starts to process the record.
4. Consumer A processing time exceed `max.poll.interval.ms`
5. Consumer A send LeaveGroup and heartbeat stops.
6. Kafka rebalances. All partitions are now assigned to Consumer B.
7. Consumer B receives the same record and process it.
8. Consumer B commits the transaction.
9. Consumer A has now finished processing(30 seconds).
10. Consumer A commits the transaction.
11. Kafka rebalances. Partitions are reassigned to both consumers.

The record is processed and the transaction committed twice. Both consumers should be 
idempotent to ensure that processing the same record twice has no consequences.
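
A minimal sketch of such an idempotency guard (illustrative only; it assumes each record 
carries a unique business key, and a real deployment would keep the processed-key set in the 
same datastore the JPA transaction covers rather than in memory):
{code:java}
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class IdempotentHandler {

    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();

    public void onRecord(ConsumerRecord<String, String> record) {
        // Skip records whose business key has already been handled,
        // e.g. because the other consumer committed them first.
        if (!processedKeys.add(record.key())) {
            return;
        }
        handle(record.value());
    }

    private void handle(String value) {
        System.out.println("processing " + value); // placeholder business logic
    }
}
{code}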

My hypothesis was that Consumer A would throw an exception due to LeaveGroup 
and stopping the heartbeat. This is however not the case. I have tested this 
with unique transaction IDs and transaction ID being identical in both 
applications - same result.

Why does Consumer A commit the transaction after LeaveGroup has been sent?

*Log for Consumer A*

 

*2018-07-12 11:09:00.054 DEBUG [kafka-transaction-microservice-example,,,] 
55757 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : 
Received: 1 records
2018-07-12 11:09:00.055 DEBUG [kafka-transaction-microservice-example,,,] 55757 
--- [ntainer#0-0-C-1] o.s.k.t.KafkaTransactionManager : Creating new 
transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2018-07-12 11:09:00.056 DEBUG [kafka-transaction-microservice-example,,,] 55757 
--- [ntainer#0-0-C-1] o.a.k.c.p.internals.TransactionManager : [Producer 
clientId=producer-1, transactionalId=transactionId420] Transition from state 
READY to IN_TRANSACTION
2018-07-12 11:09:00.056 DEBUG [kafka-transaction-microservice-example,,,] 55757 
--- [ntainer#0-0-C-1] o.s.k.t.KafkaTransactionManager : Created Kafka 
transaction on producer [brave.kafka.clients.TracingProducer@77b619a8]
2018-07-12 11:09:00.057 DEBUG [kafka-transaction-microservice-example,,,] 55757 
--- [ntainer#0-0-C-1] l.a.BatchMessagingMessageListenerAdapter : Processing 
[GenericMessage [payload=[data], headers=\{kafka_offset=[50], 
kafka_consumer=brave.kafka.clients.TracingConsumer@1a57f4f9, 
kafka_timestampType=[CREATE_TIME], kafka_receivedMessageKey=[null], 
kafka_receivedPartitionId=[2], kafka_receivedTopic=[trans-topic], 
kafka_receivedTimestamp=[1531382939970], 
kafka_batchConvertedHeaders=[{X-B3-SpanId=[B@556a1239, 
X-B3-ParentSpanId=[B@11ce7b87, X-B3-Sampled=[B@18d829c3, 
X-B3-TraceId=[B@3d0df61d}]}]]
2018-07-12 11:09:00.080 DEBUG [kafka-transaction-microservice-example,,,] 55757 
--- [hread | mygrp42] o.a.k.c.consumer.internals.Fetcher : [Consumer 
clientId=consumer-1, groupId=mygrp42] Fetch READ_UNCOMMITTED at offset 51 for 
partition trans-topic-2 returned fetch data (error=NONE, highWaterMark=52, 
lastStableOffset = -1, logStartOffset = 48, abortedTransactions = null, 
recordsSizeInBytes=78)
2018-07-12 11:09:01.623 DEBUG [kafka-transaction-microservice-example,,,] 55757 
--- [hread | mygrp42] o.a.k.c.c.internals.AbstractCoordinator : [Consumer 
clientId=consumer-1, groupId=mygrp42] Sending Heartbeat request to coordinator 
localhost:9092 (id: 2147483647 rack: null)
2018-07-12 11:09:01.724 DEBUG [kafka-transaction-microservice-example,,,] 55757 
--- [hread | mygrp42] o.a.k.c.c.internals.AbstractCoordinator : [Consumer 
clientId=consumer-1, groupId=mygrp42] Received successful Heartbeat response
2018-07-12 11:09:04.709 DEBUG [kafka-transaction-microservice-example,,,] 55757 
--- [hread | mygrp42] o.a.k.c.c.internals.AbstractCoordinator : [Consumer 
clientId=consumer-1, groupId=mygrp42] Sending Heartbeat request to coordinator 
localhost:9092 (id: 2147483647 rack: null)
2018-07-12 11:09:04.810 DEBUG [kafka-transaction-microservice-example,,,] 55757 
--- [hread | mygrp42] o.a.k.c.c.internals.AbstractCoordinator : [Consumer 
clientId=consumer-1, groupId=mygrp42]