Jenkins build is back to normal : kafka-trunk-jdk8 #2776

2018-06-27 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-7113) AdminClient.create has an infinite retry loop bug

2018-06-27 Thread 02tmqrmje9 (JIRA)
02tmqrmje9 created KAFKA-7113:
-

 Summary: AdminClient.create has an infinite retry loop bug
 Key: KAFKA-7113
 URL: https://issues.apache.org/jira/browse/KAFKA-7113
 Project: Kafka
  Issue Type: Bug
Reporter: 02tmqrmje9


When you try to instantiate a KafkaAdminClient using the factory method 
AdminClient.create, it returns a client while, as a side effect, starting a 
background thread that is stuck in a retry loop forever, attempting to get 
broker metadata when brokers are not accessible.

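Running the code below:

{code:java}
// Scala snippet; the imports are the ones the three lines below rely on
import java.util.Properties
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8080")
AdminClient.create(props)
{code}
results in an endless loop of the message below:

{code:java}
Connection to node -1 could not be established. Broker may not be available
{code}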
As create returns an AdminClient value straight away and forks to a background 
thread, there is no way to know whether the client instantiated successfully, 
and no way to handle the error case.
 * Calling create ideally should not have side effects, as it's equivalent to a 
side-effecting constructor
 * At the least, calling create should fail fatally if it can't talk to the 
brokers so the error case can be handled, i.e., so a user can fail fast and 
propagate errors.
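
The fail-fast behaviour requested above can be sketched without any Kafka classes. Everything in this block is hypothetical: FailFastCreate, awaitOrFail and the idea of a "first metadata response" future are illustrative names, not part of AdminClient. The sketch only shows the shape of the idea: the caller blocks on a future for a bounded time and turns a timeout into an exception, instead of leaving a background thread to retry forever.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class FailFastCreate {
    /**
     * Waits for the first metadata response (modelled here as a plain Future)
     * and fails loudly if it does not arrive within timeoutMs.
     */
    static <T> T awaitOrFail(Future<T> firstMetadata, long timeoutMs) {
        try {
            return firstMetadata.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Stop waiting and surface the problem to the caller.
            firstMetadata.cancel(true);
            throw new IllegalStateException(
                "no broker reachable within " + timeoutMs + " ms", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for brokers", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("metadata request failed", e.getCause());
        }
    }
}
```

With the real client, a similar effect may be achievable today by blocking on a result future with a timeout, for example describeCluster().nodes().get(timeout, unit), and closing the client if that call times out; whether that is acceptable depends on the application.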

 

 

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7104) ReplicaFetcher thread may die because of inconsistent log start offset in fetch response

2018-06-27 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-7104.

Resolution: Fixed

> ReplicaFetcher thread may die because of inconsistent log start offset in 
> fetch response
> 
>
> Key: KAFKA-7104
> URL: https://issues.apache.org/jira/browse/KAFKA-7104
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0, 1.1.0
>Reporter: Anna Povzner
>Assignee: Anna Povzner
>Priority: Major
> Fix For: 2.0.0, 1.0.3, 1.1.2
>
>
> What we saw:
> The follower fetches offset 116617, which it was able to successfully append. 
> However, leader's log start offset in fetch response was 116753, which was 
> higher than fetched offset 116617. When replica fetcher thread tried to 
> increment log start offset to leader's log start offset, it failed with 
> OffsetOutOfRangeException: 
> [2018-06-23 00:45:37,409] ERROR  Error due to 
> (kafka.server.ReplicaFetcherThread) 
>  kafka.common.KafkaException: Error processing data for partition X-N offset 
> 116617 
> Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot 
> increment the log start offset to 116753 of partition X-N since it is larger 
> than the high watermark 116619
>  
> In leader's log, we see that log start offset was incremented almost at the 
> same time (within 100 ms or so). 
> [2018-06-23 00:45:37,339] INFO Incrementing log start offset of partition X-N 
> to 116753
>  
> In leader's logic: ReplicaManager#ReplicaManager first calls 
> readFromLocalLog() that reads from local log and returns LogReadResult that 
> contains fetched data and leader's log start offset and HW. However, it then 
> calls ReplicaManager#updateFollowerLogReadResults() which may move leader's 
> log start offset and update leader's log start offset and HW in fetch 
> response. If deleteRecords() happens in between, it is possible that log 
> start offset may move beyond fetched offset. Or possibly, the leader moves 
> log start offset because of deleting old log segments. Basically, the issue 
> is that log start offset can move between the time records are read from the 
> log and the time LogReadResult is updated with the new log start offset. As a 
> result, fetch response may contain fetched data but leader's log start offset 
> in the response could be set beyond fetched offset (indicating that the 
> fetched data does not actually exist anymore on leader). 
> When a follower receives such a fetch response, it will first append, then 
> move its HW no further than its LEO, which may be less than leader's log 
> start offset in fetch response, and then call 
> `replica.maybeIncrementLogStartOffset(leaderLogStartOffset)`, which will throw 
> an OffsetOutOfRangeException, causing the fetcher thread to stop. 
> Note that this can happen only if the follower is not in ISR; otherwise the 
> leader will not move its log start offset beyond follower's HW. 
>  
> *Suggested fix:*
> 1) Since ReplicaFetcher bounds follower's HW to follower's LEO, we should 
> also bound follower's log start offset to its LEO. In this situation, the 
> follower's log start offset will be updated to LEO.
> 2) In addition to #1, we could try to make sure that leader builds fetch 
> response based on the state of the log as of time of reading data from 
> replica (but including moving leader's HW based on the follower's fetch). 
> That could be another JIRA potentially, since the fix could be more involved.
>  
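
Suggested fix 1) above amounts to clamping one value against another. The block below is a minimal sketch of that clamp, not the actual patch: BoundedLogStartOffset and boundedLogStartOffset are made-up names, and the follower LEO of 116619 is an assumption (the report only states the high watermark, which is bounded by the LEO).

```java
final class BoundedLogStartOffset {
    /** Clamp the leader's log start offset so it never exceeds the follower's LEO. */
    static long boundedLogStartOffset(long leaderLogStartOffset, long followerLogEndOffset) {
        return Math.min(leaderLogStartOffset, followerLogEndOffset);
    }

    public static void main(String[] args) {
        long leaderLogStartOffset = 116753L; // from the fetch response in the report
        long followerLeo = 116619L;          // assumption: LEO equals the reported HW
        // The follower would advance its log start offset only as far as its own LEO.
        System.out.println(boundedLogStartOffset(leaderLogStartOffset, followerLeo));
    }
}
```

With the clamp in place, the follower never asks to move its log start offset past data it actually holds, so the increment cannot exceed the high watermark in the scenario described.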





Re: [DISCUSS] KIP-325: Extend Consumer Group Command to Show Beginning Offsets

2018-06-27 Thread James Cheng
The “Motivation” section of the KIP says that the starting offset will be 
useful but doesn’t say why. Can you add a use-case or two to describe how it 
will be useful?

In our case (see 
https://github.com/wushujames/kafka-utilities/blob/master/ConsumerGroupLag/README.md),
we found the starting offset useful because it let us calculate partition size 
and so identify empty partitions (partitions where all the data had expired). 
In particular, we needed that info so that we could calculate “lag”. Consider 
the case where we are asked to mirror an abandoned topic where 
startOffset==endOffset==100. We would have no committed offset on it, and 
the topic has no data in it, so we won’t soon get any committed offset, and so 
“lag” is kind of undefined. We used the additional startOffset to allow us to 
detect this case.
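
A minimal sketch of how the start offset can make "lag" well defined in the case described above. This is not necessarily how the linked tool does it: the class and method names are hypothetical, and treating a partition with no committed offset as "everything retained is unread" is an assumption made for illustration.

```java
import java.util.OptionalLong;

final class ConsumerGroupLag {
    /** Number of offsets currently retained in the partition. */
    static long partitionSize(long startOffset, long endOffset) {
        return endOffset - startOffset;
    }

    /** True when every record has expired (or none was ever written). */
    static boolean isEmpty(long startOffset, long endOffset) {
        return startOffset == endOffset;
    }

    /**
     * Lag for one partition. With a committed offset, lag is the distance to the
     * log end. Without one, this sketch counts every retained record as unread,
     * so an empty partition reports 0 instead of an undefined value.
     */
    static long lag(long startOffset, long endOffset, OptionalLong committedOffset) {
        if (committedOffset.isPresent()) {
            return Math.max(0L, endOffset - committedOffset.getAsLong());
        }
        return partitionSize(startOffset, endOffset);
    }
}
```

For the abandoned topic in the example (startOffset == endOffset == 100, no committed offset), isEmpty is true and lag comes out as 0. Note that for compacted topics the offset difference overstates the number of records actually present.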

-James

Sent from my iPhone

> On Jun 26, 2018, at 11:23 AM, Vahid S Hashemian  
> wrote:
> 
> Hi everyone,
> 
> I have created a trivial KIP to improve the offset reporting of the 
> consumer group command: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-325%3A+Extend+Consumer+Group+Command+to+Show+Beginning+Offsets
> Looking forward to your feedback!
> 
> Thanks.
> --Vahid
> 
> 


Re: [DISCUSS] KIP-308: Support dynamic update of max.connections.per.ip/max.connections.per.ip.overrides configs

2018-06-27 Thread Manikumar
Thanks for the responses. If there are no concerns, I will start the voting
process soon.


On Tue, Jun 26, 2018 at 10:14 PM Harsha  wrote:

> This is very useful. LGTM.
>
> Thanks,
> Harsha
>
> On Mon, Jun 25th, 2018 at 10:20 AM, Dong Lin  wrote:
>
> >
> >
> >
> > Hey Manikumar,
> >
> > Thanks much for the KIP. It looks pretty good.
> >
> > Thanks,
> > Dong
> >
> > On Thu, Jun 21, 2018 at 11:38 PM, Manikumar <manikumar.re...@gmail.com>
> > wrote:
> >
> > > Hi all,
> > >
> > > I have created a KIP to add support for dynamic update of
> > > max.connections.per.ip/max.connections.per.ip.overrides configs
> > >
> > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=85474993
> > >
> > > Any feedback is appreciated.
> > >
> > > Thanks
> > >
> >
> >
> >
> >


Build failed in Jenkins: kafka-trunk-jdk8 #2775

2018-06-27 Thread Apache Jenkins Server
See 


Changes:

[jason] KAFKA-7076; Skip rebuilding producer state when using old message format

[jason] KAFKA-7091; AdminClient should handle FindCoordinatorResponse errors

--
[...truncated 867.99 KB...]

kafka.zookeeper.ZooKeeperClientTest > testConnectionLossRequestTermination 
STARTED

kafka.zookeeper.ZooKeeperClientTest > testConnectionLossRequestTermination 
PASSED

kafka.zookeeper.ZooKeeperClientTest > testExistsNonExistentZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testExistsNonExistentZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testGetDataNonExistentZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testGetDataNonExistentZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testConnectionTimeout STARTED

kafka.zookeeper.ZooKeeperClientTest > testConnectionTimeout PASSED

kafka.zookeeper.ZooKeeperClientTest > 
testBlockOnRequestCompletionFromStateChangeHandler STARTED

kafka.zookeeper.ZooKeeperClientTest > 
testBlockOnRequestCompletionFromStateChangeHandler PASSED

kafka.zookeeper.ZooKeeperClientTest > testUnresolvableConnectString STARTED

kafka.zookeeper.ZooKeeperClientTest > testUnresolvableConnectString PASSED

kafka.zookeeper.ZooKeeperClientTest > testGetChildrenNonExistentZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testGetChildrenNonExistentZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testPipelinedGetData STARTED

kafka.zookeeper.ZooKeeperClientTest > testPipelinedGetData PASSED

kafka.zookeeper.ZooKeeperClientTest > testZNodeChildChangeHandlerForChildChange 
STARTED

kafka.zookeeper.ZooKeeperClientTest > testZNodeChildChangeHandlerForChildChange 
PASSED

kafka.zookeeper.ZooKeeperClientTest > testGetChildrenExistingZNodeWithChildren 
STARTED

kafka.zookeeper.ZooKeeperClientTest > testGetChildrenExistingZNodeWithChildren 
PASSED

kafka.zookeeper.ZooKeeperClientTest > testSetDataExistingZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testSetDataExistingZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testMixedPipeline STARTED

kafka.zookeeper.ZooKeeperClientTest > testMixedPipeline PASSED

kafka.zookeeper.ZooKeeperClientTest > testGetDataExistingZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testGetDataExistingZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testDeleteExistingZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testDeleteExistingZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testSessionExpiry STARTED

kafka.zookeeper.ZooKeeperClientTest > testSessionExpiry PASSED

kafka.zookeeper.ZooKeeperClientTest > testSetDataNonExistentZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testSetDataNonExistentZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testDeleteNonExistentZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testDeleteNonExistentZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testExistsExistingZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testExistsExistingZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testZooKeeperStateChangeRateMetrics 
STARTED

kafka.zookeeper.ZooKeeperClientTest > testZooKeeperStateChangeRateMetrics PASSED

kafka.zookeeper.ZooKeeperClientTest > testZNodeChangeHandlerForDeletion STARTED

kafka.zookeeper.ZooKeeperClientTest > testZNodeChangeHandlerForDeletion PASSED

kafka.zookeeper.ZooKeeperClientTest > testGetAclNonExistentZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testGetAclNonExistentZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testStateChangeHandlerForAuthFailure 
STARTED

kafka.zookeeper.ZooKeeperClientTest > testStateChangeHandlerForAuthFailure 
PASSED

kafka.network.SocketServerTest > testGracefulClose STARTED

kafka.network.SocketServerTest > testGracefulClose PASSED

kafka.network.SocketServerTest > 
testSendActionResponseWithThrottledChannelWhereThrottlingAlreadyDone STARTED

kafka.network.SocketServerTest > 
testSendActionResponseWithThrottledChannelWhereThrottlingAlreadyDone PASSED

kafka.network.SocketServerTest > controlThrowable STARTED

kafka.network.SocketServerTest > controlThrowable PASSED

kafka.network.SocketServerTest > testRequestMetricsAfterStop STARTED

kafka.network.SocketServerTest > testRequestMetricsAfterStop PASSED

kafka.network.SocketServerTest > testConnectionIdReuse STARTED

kafka.network.SocketServerTest > testConnectionIdReuse PASSED

kafka.network.SocketServerTest > testClientDisconnectionUpdatesRequestMetrics 
STARTED

kafka.network.SocketServerTest > testClientDisconnectionUpdatesRequestMetrics 
PASSED

kafka.network.SocketServerTest > testProcessorMetricsTags STARTED

kafka.network.SocketServerTest > testProcessorMetricsTags PASSED

kafka.network.SocketServerTest > testMaxConnectionsPerIp STARTED

kafka.network.SocketServerTest > testMaxConnectionsPerIp PASSED

kafka.network.SocketServerTest > testConnectionId STARTED

kafka.network.SocketServerTest > testConnectionId PASSED


Jenkins build is back to normal : kafka-2.0-jdk8 #59

2018-06-27 Thread Apache Jenkins Server
See 




Re: [VOTE] KIP-330: Add retentionPeriod in SessionBytesStoreSupplier

2018-06-27 Thread Ted Yu
+1

On Wed, Jun 27, 2018 at 4:40 PM, Bill Bejeck  wrote:

> +1
>
> -Bill
>
> On Wed, Jun 27, 2018 at 7:39 PM Guozhang Wang  wrote:
>
> > Hello folks,
> >
> > I'd like to start a voting thread on KIP-330. I've intentionally skipped
> > the discuss phase since it is a pretty straightforward public API change
> > and should actually have been there since day one. The bug fix of KAFKA-7071
> > helped us to discover this oversight.
> >
> >
> > --
> > -- Guozhang
> >
>


Re: [DISCUSS] KIP-328: Ability to suppress updates for KTables

2018-06-27 Thread John Roesler
Hi Bill,

Thanks for the review!

Your question is very much applicable to the KIP and not at all an
implementation detail. Thanks for bringing it up.

I'm proposing not to change the existing caches and configurations at all
(for now).

Imagine you have a topology like this:
commit.interval.ms = 100

(ktable1 (cached)) -> (suppress emitAfter 200)

The first ktable (ktable1) will respect the commit interval and buffer
events for 100ms before logging, storing, or forwarding them (IIRC).
Therefore, the second ktable (suppress) will only see the events at a rate
of once per 100ms. It will apply its own buffering, and emit once per 200ms.
This case is pretty trivial because the suppress time is a multiple of the
commit interval.

When it's not an integer multiple, you'll get behavior like in this marble
diagram:


<-(k:1)--(k:2)--(k:3)--(k:4)--(k:5)--(k:6)->

[ KTable caching with commit interval = 2 ]

<--------(k:2)---------(k:4)---------(k:6)->

      [ suppress with emitAfter = 3 ]

<---------------(k:2)----------------(k:6)->


If this behavior isn't desired (for example, if you wanted to emit (k:3) at
time 3), I'd recommend setting "cache.max.bytes.buffering" to 0 or
modifying the topology to disable caching. Then, the behavior is
determined simply by the suppress operator.

Does that seem right to you?
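
The interaction in the marble diagram can be reproduced with a toy simulation. This is only a sketch under stated assumptions, not Kafka Streams code: SuppressMarble and simulate are made-up names, one update (k:t) arrives at each integer time t, the cache forwards its latest value at multiples of the commit interval, and the suppress stage emits its latest buffered value at multiples of emitAfter. The real operator's timing rules may differ.

```java
import java.util.ArrayList;
import java.util.List;

final class SuppressMarble {
    /** Returns entries like "t=3:(k:2)" for every record the suppress stage emits. */
    static List<String> simulate(int lastTime, int commitInterval, int emitAfter) {
        List<String> emitted = new ArrayList<>();
        Integer cached = null;   // latest value held by the KTable cache
        Integer buffered = null; // latest value held by the suppress buffer
        for (int t = 1; t <= lastTime; t++) {
            cached = t; // upstream update (k:t) arrives at time t
            if (t % commitInterval == 0) {
                // Cache flush: only the latest value reaches the suppress stage.
                buffered = cached;
                cached = null;
            }
            if (t % emitAfter == 0 && buffered != null) {
                emitted.add("t=" + t + ":(k:" + buffered + ")");
                buffered = null;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(simulate(6, 2, 3)); // caching with commit interval 2
        System.out.println(simulate(6, 1, 3)); // every update forwarded immediately
    }
}
```

With commit interval 2 the suppress stage sees (k:2) by time 3, matching the diagram; with every update forwarded immediately (the stand-in here for disabling the cache) it sees (k:3) at time 3 instead.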


Regarding the changelogs, because the suppression operator hangs onto
events for a while, it will need its own changelog. The changelog
should represent the current state of the buffer at all times. So when the
suppress operator sees (k:2), for example, it will log (k:2). When it
later gets to time 3, it's time to emit (k:2) downstream. Because k is no
longer buffered, the suppress operator will log (k:null). Thus, when
recovering, it can rebuild the buffer by reading its changelog.

What do you think about this?
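
The changelog scheme described above (log the value when it is buffered, log a null when it is emitted) can be sketched as a replay loop. The names below are hypothetical and the changelog is modelled as an in-memory list of key/value entries, where a null value plays the role of the (k:null) record; this is an illustration, not the proposed implementation.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SuppressChangelog {
    /**
     * Rebuilds the suppress buffer by replaying the changelog in order.
     * A record with a null value means "this key is no longer buffered".
     */
    static Map<String, Integer> rebuild(List<? extends Map.Entry<String, Integer>> changelog) {
        Map<String, Integer> buffer = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> record : changelog) {
            if (record.getValue() == null) {
                buffer.remove(record.getKey()); // value was emitted downstream
            } else {
                buffer.put(record.getKey(), record.getValue()); // value is (still) buffered
            }
        }
        return buffer;
    }
}
```

Replaying (k:2), (k:null) yields an empty buffer, while replaying (k:2), (k:null), (k:4) yields a buffer holding only k=4, which is the state the operator had when it stopped.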

Thanks,
-John



On Wed, Jun 27, 2018 at 4:16 PM Bill Bejeck  wrote:

> Hi John,  thanks for the KIP.
>
> Early on in the KIP, you mention the current approaches for controlling the
> rate of downstream records from a KTable, cache size configuration and
> commit time.
>
> Will these configuration parameters still be in effect for tables that
> don't use suppression?  For tables taking advantage of suppression, will
> these configurations have no impact?
> This last question may be too implementation-specific, but if the requested
> suppression time is longer than the specified commit time, will the latest
> record in the suppression buffer get stored in a changelog?
>
> Thanks,
> Bill
>
> On Wed, Jun 27, 2018 at 3:04 PM John Roesler  wrote:
>
> > Thanks for the feedback, Matthias,
> >
> > It seems like in straightforward relational processing cases, it would not
> > make sense to bound the lateness of KTables. In general, it seems better to
> > have "guard rails" in place that make it easier to write sensible programs
> > than insensible ones.
> >
> > But I'm still going to argue in favor of keeping it for all KTables ;)
> >
> > 1. I believe it is simpler to understand the operator if it has one uniform
> > definition, regardless of context. It's well defined and intuitive what
> > will happen when you use late-event suppression on a KTable, so I think
> > nothing surprising or dangerous will happen in that case. From my
> > perspective, having two sets of allowed operations is actually an increase
> > in cognitive complexity.
> >
> > 2. To me, it's not crazy to use the operator this way. For example, in lieu
> > of full-featured timestamp semantics, I can implement MVCC behavior when
> > building a KTable by "suppressLateEvents(Duration.ZERO)". I suspect that
> > there are other, non-obvious applications of suppressing late events on
> > KTables.
> >
> > 3. Not to get too much into implementation details in a KIP discussion, but
> > if we did want to make late-event suppression available only on windowed
> > KTables, we have two enforcement options:
> >   a. check when we build the topology - this would be simple to implement,
> > but would be a runtime check. Hopefully, people write tests for their
> > topology before deploying them, so the feedback loop isn't instantaneous,
> > but it's not too long either.
> >   b. add a new WindowedKTable type - this would be a compile time check,
> > but would also be substantial increase of both interface and code
> > complexity.
> >
> > We should definitely strive to have guard rails protecting against
> > surprising or dangerous behavior. Protecting against programs that we don't
> > currently predict is a lesser benefit, and I think we can put up guard
> > rails on a case-by-case basis for that. It seems like the increase in
> > cognitive (and potentially code and interface) complexity makes me think we
> > should skip this case.
> >
> > What do you think?
> >
> > Thanks,
> > -John
> >
> > On Wed, Jun 27, 2018 at 11:59 AM Matthias J. Sax 
> > wrote:
> >
> > > Thanks for the KIP John.
> > >
> > > One initial comments 

Re: [DISCUSS] KIP-325: Extend Consumer Group Command to Show Beginning Offsets

2018-06-27 Thread Gwen Shapira
It is incredibly weird, but it is also a repeating pattern. I figured we
may as well address this. I definitely explained how Kafka is different
from traditional queues and what lag means vs partition size.

To the best of my understanding, they see size as a measure of a topic's
importance in terms that are easier for their business management to
understand. If I store many events in Kafka, clearly I'm making good use of
Kafka.
"My manager comes every day and asks me, how many events do we keep in
Kafka" is a quote I heard multiple times.

Those are companies who are new to the whole streams concept... I guess
they are not used to thinking in terms of throughput? And management seems to
learn slower than engineers?

Anyway, this is just a suggestion. I'll +1 the KIP either way. But since I
know there's demand, I figured I'd ask.

Gwen

On Wed, Jun 27, 2018 at 4:53 PM, Jason Gustafson  wrote:

> Hey Gwen,
>
> Why do users want partition size? It seems like a weird thing to be
> concerned about. Perhaps they are trying to get a sense of the lag as a
> percentage of the total size of the partition or something like that?
>
> -Jason
>
> On Tue, Jun 26, 2018 at 9:12 PM, Gwen Shapira  wrote:
>
> > Thank you!
> >
> > On Tue, Jun 26, 2018 at 8:47 PM, Vahid S Hashemian <
> > vahidhashem...@us.ibm.com> wrote:
> >
> > > > Thanks for the feedback. The KIP is updated to also include a "partition
> > > > size" column.
> > > >
> > > > --Vahid
> > > >
> > > >
> > > >
> > > >
> > > > From: Ted Yu
> > > > To: dev@kafka.apache.org
> > > > Date: 06/26/2018 06:21 PM
> > > > Subject: Re: [DISCUSS] KIP-325: Extend Consumer Group Command to
> > > > Show Beginning Offsets
> > > >
> > > >
> > > >
> > > > nit:
> > > >
> > > > bq. leaving this empty for compacted topics
> > > >
> > > > Some user(s) may be confused by empty partition size. How about emitting
> > > > 'compacted' for compacted topics?
> > > >
> > > > Cheers
> > > >
> > > > On Tue, Jun 26, 2018 at 4:42 PM, Gwen Shapira wrote:
> > > >
> > > > > It will be. In my experience most topics aren't compacted, so it will still
> > > > > be valuable. If not difficult, leaving this empty for compacted topics to
> > > > > avoid confusion will also be nice.
> > > > >
> > > > > On Tue, Jun 26, 2018 at 4:29 PM, Vahid S Hashemian <
> > > > > vahidhashem...@us.ibm.com> wrote:
> > > > >
> > > > > > Hi Gwen,
> > > > > >
> > > > > > Thanks for the feedback.
> > > > > > Regarding the partition size, couldn't "end offset - start offset" be
> > > > > > misleading for compacted topics?
> > > > > >
> > > > > > --Vahid
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > From: Gwen Shapira
> > > > > > To: dev
> > > > > > Date: 06/26/2018 02:36 PM
> > > > > > Subject: Re: [DISCUSS] KIP-325: Extend Consumer Group Command to
> > > > > > Show Beginning Offsets
> > > > > >
> > > > > >
> > > > > >
> > > > > > Small suggestion: you can also add a "partition size" column - difference
> > > > > > between log-end and log-start. We've had users ask for this.
> > > > > >
> > > > > > On Tue, Jun 26, 2018 at 2:34 PM, Gwen Shapira wrote:
> > > > > >
> > > > > > > This will be useful! Thank you :)
> > > > > > >
> > > > > > > On Tue, Jun 26, 2018 at 11:23 AM, Vahid S Hashemian <
> > > > > > > vahidhashem...@us.ibm.com> wrote:
> > > > > > >
> > > > > > >> Hi everyone,
> > > > > > >>
> > > > > > >> I have created a trivial KIP to improve the offset reporting of the
> > > > > > >> consumer group command:
> > > > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-325%3A+Extend+Consumer+Group+Command+to+Show+Beginning+Offsets
> > > > > >> Looking forward to your feedback!
> > > > > >>
> > > > > >> Thanks.
> > > > > >> --Vahid
> > > > > >>
> > > > > >>
> > > > > >>
>



-- 
*Gwen Shapira*
Product Manager | Confluent
650.450.2760 | 

[jira] [Created] (KAFKA-7112) StreamThread does not check for state again after pollRequests()

2018-06-27 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-7112:


 Summary: StreamThread does not check for state again after 
pollRequests()
 Key: KAFKA-7112
 URL: https://issues.apache.org/jira/browse/KAFKA-7112
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Guozhang Wang


In StreamThread's main loop, we have:

{code}
if (state == State.PARTITIONS_ASSIGNED) {
    // try to fetch some records with zero poll millis
    // to unblock the restoration as soon as possible
    records = pollRequests(Duration.ZERO);

    if (taskManager.updateNewAndRestoringTasks()) {
        setState(State.RUNNING);
    }
}
{code}

in which we first check for state, and if it is {{PARTITIONS_ASSIGNED}} then 
call `consumer.poll()` and then call `taskManager.updateNewAndRestoringTasks()`. 
There is a race condition though: if another rebalance gets triggered, 
then `onPartitionsRevoked` will be called, in which we will 
{{restoreConsumer.unsubscribe();}}, and then if we call 
{{taskManager.updateNewAndRestoringTasks()}} right away we will see this:

{code}
Encountered the following error during processing: (org.apache.kafka.streams.processor.internals.StreamThread)
java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1150)
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83)
    at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:317)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
{code}
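
One possible shape for the missing check, as the issue title suggests, is to look at the state again after polling. The block below is a self-contained model of that idea and not the actual StreamThread code or the eventual patch: StateRecheck is a made-up class, poll stands in for pollRequests() (which may run rebalance callbacks), and updateTasks stands in for taskManager.updateNewAndRestoringTasks().

```java
import java.util.function.BooleanSupplier;

final class StateRecheck {
    enum State { PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, RUNNING }

    volatile State state = State.PARTITIONS_ASSIGNED;

    /**
     * One loop iteration. Returns true only if updateTasks was actually invoked.
     */
    boolean runOnce(Runnable poll, BooleanSupplier updateTasks) {
        if (state != State.PARTITIONS_ASSIGNED) {
            return false;
        }
        poll.run();
        // Re-check: a rebalance triggered inside poll may have revoked the
        // partitions, in which case restoring tasks must not be touched.
        if (state != State.PARTITIONS_ASSIGNED) {
            return false;
        }
        if (updateTasks.getAsBoolean()) {
            state = State.RUNNING;
        }
        return true;
    }
}
```

If the poll step flips the state to PARTITIONS_REVOKED, the second check returns early and the restore path that threw the IllegalStateException above is never reached in this model.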






Re: [DISCUSS] KIP-325: Extend Consumer Group Command to Show Beginning Offsets

2018-06-27 Thread Jason Gustafson
Hey Gwen,

Why do users want partition size? It seems like a weird thing to be
concerned about. Perhaps they are trying to get a sense of the lag as a
percentage of the total size of the partition or something like that?

-Jason

On Tue, Jun 26, 2018 at 9:12 PM, Gwen Shapira  wrote:

> Thank you!
>
> On Tue, Jun 26, 2018 at 8:47 PM, Vahid S Hashemian <
> vahidhashem...@us.ibm.com> wrote:
>
> > Thanks for the feedback. The KIP is updated to also include a "partition
> > size" column.
> >
> > --Vahid
> >
> >
> >
> >
> > From: Ted Yu
> > To: dev@kafka.apache.org
> > Date: 06/26/2018 06:21 PM
> > Subject: Re: [DISCUSS] KIP-325: Extend Consumer Group Command to
> > Show Beginning Offsets
> >
> >
> >
> > nit:
> >
> > bq. leaving this empty for compacted topics
> >
> > Some user(s) may be confused by empty partition size. How about emitting
> > 'compacted' for compacted topics?
> >
> > Cheers
> >
> > On Tue, Jun 26, 2018 at 4:42 PM, Gwen Shapira wrote:
> >
> > > It will be. In my experience most topics aren't compacted, so it will still
> > > be valuable. If not difficult, leaving this empty for compacted topics to
> > > avoid confusion will also be nice.
> > >
> > > On Tue, Jun 26, 2018 at 4:29 PM, Vahid S Hashemian <
> > > vahidhashem...@us.ibm.com> wrote:
> > >
> > > > Hi Gwen,
> > > >
> > > > Thanks for the feedback.
> > > > Regarding the partition size, couldn't "end offset - start offset" be
> > > > misleading for compacted topics?
> > > >
> > > > --Vahid
> > > >
> > > >
> > > >
> > > >
> > > > From: Gwen Shapira
> > > > To: dev
> > > > Date: 06/26/2018 02:36 PM
> > > > Subject: Re: [DISCUSS] KIP-325: Extend Consumer Group Command to
> > > > Show Beginning Offsets
> > > >
> > > >
> > > >
> > > > Small suggestion: you can also add a "partition size" column - difference
> > > > between log-end and log-start. We've had users ask for this.
> > > >
> > > > On Tue, Jun 26, 2018 at 2:34 PM, Gwen Shapira wrote:
> > > >
> > > > > This will be useful! Thank you :)
> > > > >
> > > > > On Tue, Jun 26, 2018 at 11:23 AM, Vahid S Hashemian <
> > > > > vahidhashem...@us.ibm.com> wrote:
> > > > >
> > > > >> Hi everyone,
> > > > >>
> > > > >> I have created a trivial KIP to improve the offset reporting of the
> > > > >> consumer group command:
> > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-325%3A+Extend+Consumer+Group+Command+to+Show+Beginning+Offsets
> > > > >> Looking forward to your feedback!
> > > > >>
> > > > >> Thanks.
> > > > >> --Vahid
> > > > >>
> > > > >>
> > > > >>
>


Re: [VOTE] KIP-330: Add retentionPeriod in SessionBytesStoreSupplier

2018-06-27 Thread Bill Bejeck
+1

-Bill

On Wed, Jun 27, 2018 at 7:39 PM Guozhang Wang  wrote:

> Hello folks,
>
> I'd like to start a voting thread on KIP-330. I've intentionally skipped
> the discuss phase since it is a pretty straightforward public API change
> and should actually have been there since day one. The bug fix of KAFKA-7071
> helped us to discover this oversight.
>
>
> --
> -- Guozhang
>


[VOTE] KIP-330: Add retentionPeriod in SessionBytesStoreSupplier

2018-06-27 Thread Guozhang Wang
Hello folks,

I'd like to start a voting thread on KIP-330. I've intentionally skipped
the discuss phase since it is a pretty straightforward public API change
and should actually have been there since day one. The bug fix of KAFKA-7071
helped us to discover this oversight.


-- 
-- Guozhang


Re: [VOTE] KIP-324: Add method to get metrics() in AdminClient

2018-06-27 Thread Yishun Guan
I see! Thanks. -Yishun

On Wed, Jun 27, 2018, 4:10 PM Guozhang Wang  wrote:

> Hi Yishun,
>
> We need to wait at least 72 business hours with three binding votes.
> Although you already have enough votes (me, Matthias, Damian), we still need
> to wait enough hours for people to take a look and see if they have any
> different opinions.
>
> After 72 hours have passed since you started the vote thread, we can close
> it as accepted.
>
> Guozhang
>
> On Wed, Jun 27, 2018 at 3:37 PM, Yishun Guan  wrote:
>
> > Added! Thank you Colin. Do we now have enough votes? I read the bylaws and
> > am still a little bit confused. Thanks. - Yishun
> >
> > On Wed, Jun 27, 2018 at 3:24 PM, Colin McCabe wrote:
> >
> > > P.S.  +1 (non-binding) once you add the info about it being thread-safe.
> > >
> > > best,
> > >
> > >
> > > On Wed, Jun 27, 2018, at 15:23, Colin McCabe wrote:
> > > > On Tue, Jun 26, 2018, at 13:24, Yishun Guan wrote:
> > > > > Hi Colin,
> > > > >
> > > > > I agree with Guozhang's opinion: all the other clients have it
> > > > > (producer, consumer, ...), and this will give more visibility to
> > > > > applications that use the admin client. (I have now added this
> > > > > sentence to the KIP.)
> > > >
> > > > I agree.  Thanks.
> > > >
> > > > > Since this returns an unmodifiableMap (like all the other clients'
> > > > > metrics() return), I assume this will be thread-safe, what do you think?
> > > >
> > > > Please document that it is thread-safe.
> > > >
> > > > thanks,
> > > > Colin
> > > >
> > > > >
> > > > > Thanks,
> > > > > Yishun
> > > > >
> > > > >
> > > > > On Tue, Jun 26, 2018 at 11:51 AM, Colin McCabe wrote:
> > > > >
> > > > > > Can you add a little more explanation to the KIP for why you are adding
> > > > > > this method?  Is it something streams needs, for example?  Will it help
> > > > > > other applications that use admin client and want to expose metrics?
> > > > > >
> > > > > > What are the thread-safety guarantees for the map which is returned?
> > > > > >
> > > > > > best,
> > > > > > Colin
> > > > > >
> > > > > >
> > > > > > On Tue, Jun 26, 2018, at 11:29, Yishun Guan wrote:
> > > > > > > Hi All,
> > > > > > >
> > > > > > > I am starting a vote on this KIP:
> > > > > > >
> > > > > > > https://cwiki.apache.org/confluence/x/lQg0BQ
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Yishun
> > > > > >
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: [VOTE] KIP-324: Add method to get metrics() in AdminClient

2018-06-27 Thread Guozhang Wang
Hi Yishun,

We need to wait at least 72 business hours with three binding votes.
Although you already have enough votes (me, Matthias, Damian), we still need
to wait enough hours for people to take a look and see if they have any
different opinions.

After 72 hours have passed since you started the vote thread, we can close
it as accepted.

Guozhang
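
A minimal sketch of the closing rule described above, assuming the 72 hours are plain elapsed time since the vote thread started and that only binding +1 votes are counted. The class and method names are hypothetical, and the project bylaws remain the authority on edge cases such as -1 votes.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper for illustration only; not part of any Kafka tooling.
class VoteCheckSketch {
    static final Duration MIN_OPEN = Duration.ofHours(72); // minimum time the vote stays open
    static final int MIN_BINDING = 3;                      // binding +1 votes required

    // True only when both conditions from the message above hold.
    static boolean canClose(Instant started, Instant now, int bindingPlusOnes) {
        boolean openLongEnough = Duration.between(started, now).compareTo(MIN_OPEN) >= 0;
        return openLongEnough && bindingPlusOnes >= MIN_BINDING;
    }
}
```

With three binding votes collected about a day after the thread started, canClose still returns false until the 72 hours have elapsed.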

On Wed, Jun 27, 2018 at 3:37 PM, Yishun Guan  wrote:

> Added! Thank you Colin. Do we now have enough votes? I read the bylaws and
> am still a little bit confused. Thanks. - Yishun
>
> On Wed, Jun 27, 2018 at 3:24 PM, Colin McCabe  wrote:
>
> > P.S.  +1 (non-binding) once you add the info about it being thread-safe.
> >
> > best,
> >
> >
> > On Wed, Jun 27, 2018, at 15:23, Colin McCabe wrote:
> > > On Tue, Jun 26, 2018, at 13:24, Yishun Guan wrote:
> > > > Hi Colin,
> > > >
> > > > > I agree with Guozhang's opinion: all the other clients have it
> > > > > (producer, consumer, ...), and this will give more visibility to
> > > > > applications that use the admin client. (I have now added this
> > > > > sentence to the KIP.)
> > >
> > > I agree.  Thanks.
> > >
> > > > Since this returns an unmodifiableMap(like all the other client's
> > metrics()
> > > > return), I assume this will be thread-safe, what do you think?
> > >
> > > Please document that it is thread-safe.
> > >
> > > thanks,
> > > Colin
> > >
> > > >
> > > > Thanks,
> > > > Yishun
> > > >
> > > >
> > > > On Tue, Jun 26, 2018 at 11:51 AM, Colin McCabe 
> > wrote:
> > > >
> > > > > Can you add a little more explanation to the KIP for why you are
> > adding
> > > > > this method?  Is it something streams needs, for example?  Will it
> > help
> > > > > other applications that use admin client and want to expose
> metrics?
> > > > >
> > > > > What are the thread-safety guarantees for the map which is
> returned?
> > > > >
> > > > > best,
> > > > > Colin
> > > > >
> > > > >
> > > > > On Tue, Jun 26, 2018, at 11:29, Yishun Guan wrote:
> > > > > > Hi All,
> > > > > >
> > > > > > I am starting a vote on this KIP:
> > > > > >
> > > > > > https://cwiki.apache.org/confluence/x/lQg0BQ
> > > > > >
> > > > > > Thanks,
> > > > > > Yishun
> > > > >
> >
>



-- 
-- Guozhang


Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-27 Thread Guozhang Wang
Sounds good to me.

On Wed, Jun 27, 2018 at 3:57 PM, Jason Gustafson  wrote:

> Hey Guozhang,
>
> That's fair. In fact, perhaps we do not need this API at all. We already
> have the new seek() in this KIP which can do the lookup based on epoch for
> this use case. I guess we should probably call it seekToNearest() though to
> make it clear that the final position may be different from what was
> requested.
>
> Thanks,
> Jason
>
> On Wed, Jun 27, 2018 at 3:20 PM, Guozhang Wang  wrote:
>
> > Hi Jason,
> >
> > I think it is less worthwhile to add KafkaConsumer#offsetsForLeaderEpochs,
> > since probably only very advanced users are aware of the leaderEpoch, and
> > hence ever care to use it anyways. It is more like an admin client
> > operation than a consumer client operation: if the motivation is to
> > facilitate a customized reset policy, maybe adding it as
> > AdminClient#offsetsForLeaderEpochs
> > is better, as it is not an aggressive assumption that such advanced
> > users are willing to use an admin client to get further information?
> >
> >
> > Guozhang
> >
> >
> > On Wed, Jun 27, 2018 at 2:07 PM, Jason Gustafson 
> > wrote:
> >
> > > Thanks for the feedback. I've updated the KIP. Specifically I removed
> the
> > > "closest" reset option and the proposal to reset by timestamp when the
> > > precise truncation point cannot be determined. Instead, I proposed that
> > we
> > > always reset using the nearest epoch when a reset policy is defined
> > (either
> > > "earliest" or "latest"). Does that sound reasonable?
> > >
> > > One thing I am still debating is whether it would be better to have a
> > > separate API to find the closest offset using the leader epoch. In the
> > > current KIP, I suggested to piggyback this information on an exception,
> > but
> > > I'm beginning to think it would be better not to hide the lookup. It is
> > > awkward to implement since it means delaying the exception and the API
> > may
> > > actually be useful when customizing reset logic if no auto reset policy
> > is
> > > defined. I was thinking we can add an API like the following:
> > >
> > > Map
> > > offsetsForLeaderEpochs(Map epochsToSearch)
> > >
> > > Thoughts?
> > >
> > > -Jason
> > >
> > >
> > >
> > >
> > > On Wed, Jun 27, 2018 at 11:12 AM, Jason Gustafson 
> > > wrote:
> > >
> > > > @Dong
> > > >
> > > > Those are fair points. Both approaches require some fuzziness to
> reset
> > > the
> > > > offset in these pathological scenarios and we cannot guarantee
> > > > at-least-once delivery either way unless we have the full history of
> > > leader
> > > > epochs that were consumed. The KIP-101 logic may actually be more
> > > accurate
> > > > than using timestamps because it does not depend on the messages
> which
> > > are
> > > > written after the unclean leader election. The case we're talking
> about
> > > > should be extremely rare in practice anyway. I also agree that we may
> > not
> > > > want to add new machinery if it only helps the old message format.
> Ok,
> > > > let's go ahead and drop the timestamp.
> > > >
> > > > @Guozhang
> > > >
> > > > * My current understanding is that, with unclean leader election
> turned
> > > on,
> > > >> exactly-once is out of the window since we cannot guarantee that all
> > > >> committed message markers will not be lost. And hence there is no
> need
> > > to
> > > >> have special handling logic for LOG_TRUNCATED or OOR error codes
> with
> > > >> read.committed turned on. Is that right?
> > > >
> > > >
> > > > Yes, that's right. EoS and unclean leader election don't mix well. It
> > may
> > > > be worth considering separately whether we should try to reconcile
> the
> > > > transaction log following an unclean leader election. At least we may
> > be
> > > > able to prevent dangling transactions from blocking consumers. This
> KIP
> > > > does not address this problem.
> > > >
> > > > * MINOR: "if the epoch is greater than the minimum expected epoch,
> that
> > > the
> > > >> new epoch does not begin at an earlier offset than the fetch offset.
> > In
> > > >> the latter case, the leader can respond with a new LOG_TRUNCATION
> > error
> > > >> code" should it be "does not begin at a later offset than the fetch
> > > >> offset"?
> > > >
> > > >
> > > > I think the comment is correct, though the phrasing may be confusing.
> > We
> > > > know truncation has occurred if there exists a larger epoch with a
> > > starting
> > > > offset that is lower than the fetch offset. Let me try to rephrase
> > this.
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > > On Wed, Jun 27, 2018 at 9:17 AM, Guozhang Wang 
> > > wrote:
> > > >
> > > >> Jason, thanks for the KIP. A few comments:
> > > >>
> > > >> * I think Dong's question about whether to use timestamp-based
> > approach
> > > >> v.s. start-offset-of-first-larger-epoch is valid; more
> specifically,
> > > with
> > > >> timestamp-based approach we may still be resetting to an offset
> falling
> > > >> into
> > > >> the 

[jira] [Resolved] (KAFKA-7091) AdminClient should handle FindCoordinatorResponse errors

2018-06-27 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-7091.

Resolution: Fixed

> AdminClient should handle FindCoordinatorResponse errors
> 
>
> Key: KAFKA-7091
> URL: https://issues.apache.org/jira/browse/KAFKA-7091
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Manikumar
>Assignee: Manikumar
>Priority: Major
> Fix For: 2.0.1
>
>
> Currently the KafkaAdminClient.deleteConsumerGroups and listConsumerGroupOffsets
> method implementations ignore FindCoordinatorResponse errors. This causes
> admin client request timeouts in case of authorization errors. We should
> handle these errors.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-27 Thread Jason Gustafson
Hey Guozhang,

That's fair. In fact, perhaps we do not need this API at all. We already
have the new seek() in this KIP which can do the lookup based on epoch for
this use case. I guess we should probably call it seekToNearest() though to
make it clear that the final position may be different from what was
requested.

Thanks,
Jason
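
A rough sketch of how the truncation rule quoted below ("a larger epoch with a starting offset that is lower than the fetch offset") and the seekToNearest() idea could fit together. The epoch-to-start-offset map, the class name, and both method names are assumptions for illustration, not the KIP's API. The sketch also assumes start offsets never decrease as epochs grow.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical helper; the map goes from leader epoch to the first offset written in that epoch.
class TruncationCheckSketch {
    // Truncation is detected if some epoch larger than fetchEpoch starts before fetchOffset.
    static boolean isTruncated(NavigableMap<Integer, Long> epochStartOffsets,
                               int fetchEpoch, long fetchOffset) {
        for (long start : epochStartOffsets.tailMap(fetchEpoch, false).values()) {
            if (start < fetchOffset) {
                return true;
            }
        }
        return false;
    }

    // Assumed "nearest" semantics: never move past the point where the next larger epoch begins.
    static long nearestPosition(NavigableMap<Integer, Long> epochStartOffsets,
                                int fetchEpoch, long fetchOffset) {
        Map.Entry<Integer, Long> next = epochStartOffsets.higherEntry(fetchEpoch);
        if (next == null) {
            return fetchOffset; // no later epoch is known, so keep the requested position
        }
        return Math.min(fetchOffset, next.getValue());
    }

    // Small fixture: epoch 0 starts at offset 0, epoch 1 at 100, epoch 3 at 250.
    static NavigableMap<Integer, Long> sampleEpochs() {
        NavigableMap<Integer, Long> epochs = new TreeMap<>();
        epochs.put(0, 0L);
        epochs.put(1, 100L);
        epochs.put(3, 250L);
        return epochs;
    }
}
```

With the sample data, a consumer that believes it is at offset 300 in epoch 1 is flagged as truncated and would land on offset 250, which is why the final position may differ from what was requested.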

On Wed, Jun 27, 2018 at 3:20 PM, Guozhang Wang  wrote:

> Hi Jason,
>
> I think it is less worthwhile to add KafkaConsumer#offsetsForLeaderEpochs,
> since probably only very advanced users are aware of the leaderEpoch, and
> hence ever care to use it anyways. It is more like an admin client
> operation than a consumer client operation: if the motivation is to
> facilitate a customized reset policy, maybe adding it as
> AdminClient#offsetsForLeaderEpochs
> is better, as it is not an aggressive assumption that such advanced
> users are willing to use an admin client to get further information?
>
>
> Guozhang
>
>
> On Wed, Jun 27, 2018 at 2:07 PM, Jason Gustafson 
> wrote:
>
> > Thanks for the feedback. I've updated the KIP. Specifically I removed the
> > "closest" reset option and the proposal to reset by timestamp when the
> > precise truncation point cannot be determined. Instead, I proposed that
> we
> > always reset using the nearest epoch when a reset policy is defined
> (either
> > "earliest" or "latest"). Does that sound reasonable?
> >
> > One thing I am still debating is whether it would be better to have a
> > separate API to find the closest offset using the leader epoch. In the
> > current KIP, I suggested to piggyback this information on an exception,
> but
> > I'm beginning to think it would be better not to hide the lookup. It is
> > awkward to implement since it means delaying the exception and the API
> may
> > actually be useful when customizing reset logic if no auto reset policy
> is
> > defined. I was thinking we can add an API like the following:
> >
> > Map
> > offsetsForLeaderEpochs(Map epochsToSearch)
> >
> > Thoughts?
> >
> > -Jason
> >
> >
> >
> >
> > On Wed, Jun 27, 2018 at 11:12 AM, Jason Gustafson 
> > wrote:
> >
> > > @Dong
> > >
> > > Those are fair points. Both approaches require some fuzziness to reset
> > the
> > > offset in these pathological scenarios and we cannot guarantee
> > > at-least-once delivery either way unless we have the full history of
> > leader
> > > epochs that were consumed. The KIP-101 logic may actually be more
> > accurate
> > > than using timestamps because it does not depend on the messages which
> > are
> > > written after the unclean leader election. The case we're talking about
> > > should be extremely rare in practice anyway. I also agree that we may
> not
> > > want to add new machinery if it only helps the old message format. Ok,
> > > let's go ahead and drop the timestamp.
> > >
> > > @Guozhang
> > >
> > > * My current understanding is that, with unclean leader election turned
> > on,
> > >> exactly-once is out of the window since we cannot guarantee that all
> > >> committed message markers will not be lost. And hence there is no need
> > to
> > >> have special handling logic for LOG_TRUNCATED or OOR error codes with
> > >> read.committed turned on. Is that right?
> > >
> > >
> > > Yes, that's right. EoS and unclean leader election don't mix well. It
> may
> > > be worth considering separately whether we should try to reconcile the
> > > transaction log following an unclean leader election. At least we may
> be
> > > able to prevent dangling transactions from blocking consumers. This KIP
> > > does not address this problem.
> > >
> > > * MINOR: "if the epoch is greater than the minimum expected epoch, that
> > the
> > >> new epoch does not begin at an earlier offset than the fetch offset.
> In
> > >> the latter case, the leader can respond with a new LOG_TRUNCATION
> error
> > >> code" should it be "does not begin at a later offset than the fetch
> > >> offset"?
> > >
> > >
> > > I think the comment is correct, though the phrasing may be confusing.
> We
> > > know truncation has occurred if there exists a larger epoch with a
> > starting
> > > offset that is lower than the fetch offset. Let me try to rephrase
> this.
> > >
> > > Thanks,
> > > Jason
> > >
> > > On Wed, Jun 27, 2018 at 9:17 AM, Guozhang Wang 
> > wrote:
> > >
> > >> Jason, thanks for the KIP. A few comments:
> > >>
> > >> * I think Dong's question about whether to use timestamp-based
> approach
> > >> v.s. start-offset-of-first-larger-epoch is valid; more specifically,
> > with
> > >> timestamp-based approach we may still be resetting to an offset falling
> > >> into
> > >> the truncated interval, and hence we may still miss some data, i.e.
> not
> > >> guaranteeing at-least-once still. With the
> > >> start-offset-of-first-larger-epoch, I'm not sure if it will guarantee
> > no
> > >> valid data is missed when we have consecutive log truncations (maybe
> we
> > >> need to look back into details of KIP-101 to figure it out). If the
> > latter

[jira] [Created] (KAFKA-7111) Review the NetworkClient log level used

2018-06-27 Thread Luan Cestari (JIRA)
Luan Cestari created KAFKA-7111:
---

 Summary: Review the NetworkClient log level used
 Key: KAFKA-7111
 URL: https://issues.apache.org/jira/browse/KAFKA-7111
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Reporter: Luan Cestari


Hi,

 

I was using Kafka on some projects and unfortunately I had to use Debug (and
sometimes even Trace) log level to see some issues. One of the most recent
cases was:

[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L872]

 

If I got the name of the broker and it is unreachable, the errors should be
more severe than "DEBUG" level IMHO. I would at least use INFO level for this
case, or ERROR level (which seems to fit better, but I don't know the practices
used in the project).

 

Thank you in advance

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] KIP-324: Add method to get metrics() in AdminClient

2018-06-27 Thread Yishun Guan
Added! Thank you Colin. Do we now have enough votes? I read the bylaws and
am still a little bit confused. Thanks. - Yishun

On Wed, Jun 27, 2018 at 3:24 PM, Colin McCabe  wrote:

> P.S.  +1 (non-binding) once you add the info about it being thread-safe.
>
> best,
>
>
> On Wed, Jun 27, 2018, at 15:23, Colin McCabe wrote:
> > On Tue, Jun 26, 2018, at 13:24, Yishun Guan wrote:
> > > Hi Colin,
> > >
> > > I agree with Guozhang's opinion: all the other clients have it
> > > (producer, consumer, ...), and this will give more visibility to
> > > applications that use the admin client. (I have now added this
> > > sentence to the KIP.)
> >
> > I agree.  Thanks.
> >
> > > Since this returns an unmodifiableMap(like all the other client's
> metrics()
> > > return), I assume this will be thread-safe, what do you think?
> >
> > Please document that it is thread-safe.
> >
> > thanks,
> > Colin
> >
> > >
> > > Thanks,
> > > Yishun
> > >
> > >
> > > On Tue, Jun 26, 2018 at 11:51 AM, Colin McCabe 
> wrote:
> > >
> > > > Can you add a little more explanation to the KIP for why you are
> adding
> > > > this method?  Is it something streams needs, for example?  Will it
> help
> > > > other applications that use admin client and want to expose metrics?
> > > >
> > > > What are the thread-safety guarantees for the map which is returned?
> > > >
> > > > best,
> > > > Colin
> > > >
> > > >
> > > > On Tue, Jun 26, 2018, at 11:29, Yishun Guan wrote:
> > > > > Hi All,
> > > > >
> > > > > I am starting a vote on this KIP:
> > > > >
> > > > > https://cwiki.apache.org/confluence/x/lQg0BQ
> > > > >
> > > > > Thanks,
> > > > > Yishun
> > > >
>


Re: [VOTE] KIP-324: Add method to get metrics() in AdminClient

2018-06-27 Thread Colin McCabe
P.S.  +1 (non-binding) once you add the info about it being thread-safe.

best,


On Wed, Jun 27, 2018, at 15:23, Colin McCabe wrote:
> On Tue, Jun 26, 2018, at 13:24, Yishun Guan wrote:
> > Hi Colin,
> > 
> > I agree with Guozhang's opinion: all the other clients have it
> > (producer, consumer, ...), and this will give more visibility to
> > applications that use the admin client. (I have now added this
> > sentence to the KIP.)
> 
> I agree.  Thanks.
> 
> > Since this returns an unmodifiableMap(like all the other client's metrics()
> > return), I assume this will be thread-safe, what do you think?
> 
> Please document that it is thread-safe.
> 
> thanks,
> Colin
> 
> > 
> > Thanks,
> > Yishun
> > 
> > 
> > On Tue, Jun 26, 2018 at 11:51 AM, Colin McCabe  wrote:
> > 
> > > Can you add a little more explanation to the KIP for why you are adding
> > > this method?  Is it something streams needs, for example?  Will it help
> > > other applications that use admin client and want to expose metrics?
> > >
> > > What are the thread-safety guarantees for the map which is returned?
> > >
> > > best,
> > > Colin
> > >
> > >
> > > On Tue, Jun 26, 2018, at 11:29, Yishun Guan wrote:
> > > > Hi All,
> > > >
> > > > I am starting a vote on this KIP:
> > > >
> > > > https://cwiki.apache.org/confluence/x/lQg0BQ
> > > >
> > > > Thanks,
> > > > Yishun
> > >


Re: [VOTE] KIP-324: Add method to get metrics() in AdminClient

2018-06-27 Thread Colin McCabe
On Tue, Jun 26, 2018, at 13:24, Yishun Guan wrote:
> Hi Colin,
> 
> I agree with Guozhang's opinion: all the other clients have it
> (producer, consumer, ...), and this will give more visibility to
> applications that use the admin client. (I have now added this
> sentence to the KIP.)

I agree.  Thanks.

> Since this returns an unmodifiableMap(like all the other client's metrics()
> return), I assume this will be thread-safe, what do you think?

Please document that it is thread-safe.

thanks,
Colin
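
On the thread-safety point: Collections.unmodifiableMap only returns a read-only view, so whether concurrent use is safe depends on the map behind it. A minimal sketch, assuming a hypothetical holder class (not Kafka's actual implementation) backed by a ConcurrentHashMap:

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a client that exposes its metrics; not Kafka's actual class.
class MetricsHolderSketch {
    // Backing map that tolerates concurrent readers and writers.
    private final Map<String, Double> metrics = new ConcurrentHashMap<>();

    // Called by the client's own threads when a metric value changes.
    void record(String name, double value) {
        metrics.put(name, value);
    }

    // Read-only view: callers cannot modify it, but it reflects later updates.
    Map<String, Double> metrics() {
        return Collections.unmodifiableMap(metrics);
    }
}
```

Callers that try to put or remove entries through the returned view get an UnsupportedOperationException, while the client itself can keep updating the backing map.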

> 
> Thanks,
> Yishun
> 
> 
> On Tue, Jun 26, 2018 at 11:51 AM, Colin McCabe  wrote:
> 
> > Can you add a little more explanation to the KIP for why you are adding
> > this method?  Is it something streams needs, for example?  Will it help
> > other applications that use admin client and want to expose metrics?
> >
> > What are the thread-safety guarantees for the map which is returned?
> >
> > best,
> > Colin
> >
> >
> > On Tue, Jun 26, 2018, at 11:29, Yishun Guan wrote:
> > > Hi All,
> > >
> > > I am starting a vote on this KIP:
> > >
> > > https://cwiki.apache.org/confluence/x/lQg0BQ
> > >
> > > Thanks,
> > > Yishun
> >


Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-27 Thread Guozhang Wang
Hi Jason,

I think it is less worthwhile to add KafkaConsumer#offsetsForLeaderEpochs,
since probably only very advanced users are aware of the leaderEpoch, and
hence ever care to use it anyways. It is more like an admin client
operation than a consumer client operation: if the motivation is to
facilitate a customized reset policy, maybe adding it as
AdminClient#offsetsForLeaderEpochs
is better, as it is not an aggressive assumption that such advanced
users are willing to use an admin client to get further information?


Guozhang


On Wed, Jun 27, 2018 at 2:07 PM, Jason Gustafson  wrote:

> Thanks for the feedback. I've updated the KIP. Specifically I removed the
> "closest" reset option and the proposal to reset by timestamp when the
> precise truncation point cannot be determined. Instead, I proposed that we
> always reset using the nearest epoch when a reset policy is defined (either
> "earliest" or "latest"). Does that sound reasonable?
>
> One thing I am still debating is whether it would be better to have a
> separate API to find the closest offset using the leader epoch. In the
> current KIP, I suggested to piggyback this information on an exception, but
> I'm beginning to think it would be better not to hide the lookup. It is
> awkward to implement since it means delaying the exception and the API may
> actually be useful when customizing reset logic if no auto reset policy is
> defined. I was thinking we can add an API like the following:
>
> Map
> offsetsForLeaderEpochs(Map epochsToSearch)
>
> Thoughts?
>
> -Jason
>
>
>
>
> On Wed, Jun 27, 2018 at 11:12 AM, Jason Gustafson 
> wrote:
>
> > @Dong
> >
> > Those are fair points. Both approaches require some fuzziness to reset
> the
> > offset in these pathological scenarios and we cannot guarantee
> > at-least-once delivery either way unless we have the full history of
> leader
> > epochs that were consumed. The KIP-101 logic may actually be more
> accurate
> > than using timestamps because it does not depend on the messages which
> are
> > written after the unclean leader election. The case we're talking about
> > should be extremely rare in practice anyway. I also agree that we may not
> > want to add new machinery if it only helps the old message format. Ok,
> > let's go ahead and drop the timestamp.
> >
> > @Guozhang
> >
> > * My current understanding is that, with unclean leader election turned
> on,
> >> exactly-once is out of the window since we cannot guarantee that all
> >> committed message markers will not be lost. And hence there is no need
> to
> >> have special handling logic for LOG_TRUNCATED or OOR error codes with
> >> read.committed turned on. Is that right?
> >
> >
> > Yes, that's right. EoS and unclean leader election don't mix well. It may
> > be worth considering separately whether we should try to reconcile the
> > transaction log following an unclean leader election. At least we may be
> > able to prevent dangling transactions from blocking consumers. This KIP
> > does not address this problem.
> >
> > * MINOR: "if the epoch is greater than the minimum expected epoch, that
> the
> >> new epoch does not begin at an earlier offset than the fetch offset.  In
> >> the latter case, the leader can respond with a new LOG_TRUNCATION error
> >> code" should it be "does not begin at a later offset than the fetch
> >> offset"?
> >
> >
> > I think the comment is correct, though the phrasing may be confusing. We
> > know truncation has occurred if there exists a larger epoch with a
> starting
> > offset that is lower than the fetch offset. Let me try to rephrase this.
> >
> > Thanks,
> > Jason
> >
> > On Wed, Jun 27, 2018 at 9:17 AM, Guozhang Wang 
> wrote:
> >
> >> Jason, thanks for the KIP. A few comments:
> >>
> >> * I think Dong's question about whether to use timestamp-based approach
> >> v.s. start-offset-of-first-larger-epoch is valid; more specifically,
> with
> >> timestamp-based approach we may still be resetting to an offset falling
> >> into
> >> the truncated interval, and hence we may still miss some data, i.e. not
> >> guaranteeing at-least-once still. With the
> >> start-offset-of-first-larger-epoch, I'm not sure if it will guarantee
> no
> >> valid data is missed when we have consecutive log truncations (maybe we
> >> need to look back into details of KIP-101 to figure it out). If the
> latter
> >> can indeed guarantee at least once, we could consider using that
> approach.
> >>
> >> * My current understanding is that, with unclean leader election turned
> >> on,
> >> exactly-once is out of the window since we cannot guarantee that all
> >> committed message markers will not be lost. And hence there is no need
> to
> >> have special handling logic for LOG_TRUNCATED or OOR error codes with
> >> read.committed turned on. Is that right?
> >>
> >> * MINOR: "if the epoch is greater than the minimum expected epoch, that
> >> the
> >> new epoch does not begin at an earlier offset than the fetch offset.  In
> >> the latter 

Re: [DISCUSS] KIP-328: Ability to suppress updates for KTables

2018-06-27 Thread Bill Bejeck
Hi John,  thanks for the KIP.

Early on in the KIP, you mention the current approaches for controlling the
rate of downstream records from a KTable, cache size configuration and
commit time.

Will these configuration parameters still be in effect for tables that
don't use suppression?  For tables taking advantage of suppression, will
these configurations have no impact?
This last question may be too implementation-specific, but if the requested
suppression time is longer than the specified commit time, will the latest
record in the suppression buffer get stored in a changelog?

Thanks,
Bill
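
For reference, a minimal sketch of the static-factory idea John describes in the quoted message below. The class is hypothetical and simplified (both settings are plain Durations); the KIP's real Suppression class may differ. One detail worth noting: Java does not allow a static and an instance method with the same signature in one class, so the chained instance methods here use "with..." names, which is an assumption of this sketch and not part of the KIP.

```java
import java.time.Duration;

// Hypothetical config object for illustration; not the KIP's actual class.
final class SuppressionSketch {
    private final Duration lateEventBound; // null means "not configured"
    private final Duration emitAfter;      // null means "not configured"

    private SuppressionSketch(Duration lateEventBound, Duration emitAfter) {
        this.lateEventBound = lateEventBound;
        this.emitAfter = emitAfter;
    }

    // Static entry points: any configuration method can start the chain,
    // so no "core" parameter has to be singled out for an of(...) factory.
    static SuppressionSketch suppressLateEvents(Duration bound) {
        return new SuppressionSketch(bound, null);
    }

    static SuppressionSketch suppressIntermediateEvents(Duration emitAfter) {
        return new SuppressionSketch(null, emitAfter);
    }

    // Instance counterparts for chaining; each returns a new immutable copy.
    SuppressionSketch withLateEvents(Duration bound) {
        return new SuppressionSketch(bound, this.emitAfter);
    }

    SuppressionSketch withIntermediateEvents(Duration emitAfter) {
        return new SuppressionSketch(this.lateEventBound, emitAfter);
    }

    Duration lateEventBound() {
        return lateEventBound;
    }

    Duration emitAfter() {
        return emitAfter;
    }
}
```

A chain then starts from whichever setting the caller cares about, for example SuppressionSketch.suppressLateEvents(Duration.ofMinutes(10)).withIntermediateEvents(Duration.ofMinutes(10)).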

On Wed, Jun 27, 2018 at 3:04 PM John Roesler  wrote:

> Thanks for the feedback, Matthias,
>
> It seems like in straightforward relational processing cases, it would not
> make sense to bound the lateness of KTables. In general, it seems better to
> have "guard rails" in place that make it easier to write sensible programs
> than insensible ones.
>
> But I'm still going to argue in favor of keeping it for all KTables ;)
>
> 1. I believe it is simpler to understand the operator if it has one uniform
> definition, regardless of context. It's well defined and intuitive what
> will happen when you use late-event suppression on a KTable, so I think
> nothing surprising or dangerous will happen in that case. From my
> perspective, having two sets of allowed operations is actually an increase
> in cognitive complexity.
>
> 2. To me, it's not crazy to use the operator this way. For example, in lieu
> of full-featured timestamp semantics, I can implement MVCC behavior when
> building a KTable by "suppressLateEvents(Duration.ZERO)". I suspect that
> there are other, non-obvious applications of suppressing late events on
> KTables.
>
> 3. Not to get too much into implementation details in a KIP discussion, but
> if we did want to make late-event suppression available only on windowed
> KTables, we have two enforcement options:
>   a. check when we build the topology - this would be simple to implement,
> but would be a runtime check. Hopefully, people write tests for their
> topology before deploying them, so the feedback loop isn't instantaneous,
> but it's not too long either.
>   b. add a new WindowedKTable type - this would be a compile time check,
> but would also be substantial increase of both interface and code
> complexity.
>
> We should definitely strive to have guard rails protecting against
> surprising or dangerous behavior. Protecting against programs that we don't
> currently predict is a lesser benefit, and I think we can put up guard
> rails on a case-by-case basis for that. The increase in cognitive (and
> potentially code and interface) complexity makes me think we should skip
> this case.
>
> What do you think?
>
> Thanks,
> -John
>
> On Wed, Jun 27, 2018 at 11:59 AM Matthias J. Sax 
> wrote:
>
> > Thanks for the KIP John.
> >
> > One initial comment about the last example "Bounded lateness": For a
> > non-windowed KTable bounding the lateness does not really make sense,
> > does it?
> >
> > Thus, I am wondering if we should allow `suppressLateEvents()` for this
> > case? It seems to be better to only allow it for windowed-KTables.
> >
> >
> > -Matthias
> >
> >
> > On 6/27/18 8:53 AM, Ted Yu wrote:
> > > I noticed this (lack of primary parameter) as well.
> > >
> > > What you gave as new example is semantically the same as what I
> > suggested.
> > > So it is good by me.
> > >
> > > Thanks
> > >
> > > On Wed, Jun 27, 2018 at 7:31 AM, John Roesler 
> wrote:
> > >
> > >> Thanks for taking a look, Ted,
> > >>
> > >> I agree this is a departure from the conventions of Streams DSL.
> > >>
> > >> Most of our config objects have one or two "required" parameters,
> which
> > fit
> > >> naturally with the static factory method approach. TimeWindow, for
> > example,
> > >> requires a size parameter, so we can naturally say
> TimeWindows.of(size).
> > >>
> > >> I think in the case of a suppression, there's really no "core"
> > parameter,
> > >> and "Suppression.of()" seems sillier than "new Suppression()". I think
> > that
> > >> Suppression.of(duration) would be ambiguous, since there are many
> > durations
> > >> that we can configure.
> > >>
> > >> However, thinking about it again, I suppose that I can give each
> > >> configuration method a static version, which would let you replace
> "new
> > >> Suppression()." with "Suppression." in all the examples. Basically,
> > instead
> > >> of "of()", we'd support any of the methods I listed.
> > >>
> > >> For example:
> > >>
> > >> windowCounts
> > >>     .suppress(
> > >>         Suppression
> > >>             .suppressLateEvents(Duration.ofMinutes(10))
> > >>             .suppressIntermediateEvents(
> > >>                 IntermediateSuppression.emitAfter(Duration.ofMinutes(10))
> > >>             )
> > >>     );
> > >>
> > >>
> > >> Does that seem better?
> > >>
> > >> Thanks,
> > >> -John
> > >>
> > >>
> > >> On Wed, Jun 27, 2018 at 12:44 AM Ted Yu  wrote:
> > >>
> > 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-27 Thread Jason Gustafson
Thanks for the feedback. I've updated the KIP. Specifically I removed the
"closest" reset option and the proposal to reset by timestamp when the
precise truncation point cannot be determined. Instead, I proposed that we
always reset using the nearest epoch when a reset policy is defined (either
"earliest" or "latest"). Does that sound reasonable?

One thing I am still debating is whether it would be better to have a
separate API to find the closest offset using the leader epoch. In the
current KIP, I suggested piggybacking this information on an exception, but
I'm beginning to think it would be better not to hide the lookup. It is
awkward to implement since it means delaying the exception, and the API may
actually be useful when customizing reset logic if no auto reset policy is
defined. I was thinking we can add an API like the following:

Map
offsetsForLeaderEpochs(Map epochsToSearch)

Thoughts?

-Jason
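
To illustrate the kind of lookup such an API would perform, here is a minimal sketch, assuming the leader keeps a sorted map from leader epoch to the first offset written in that epoch (in the spirit of KIP-101). The class name, method name, and the -1 sentinel are hypothetical.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical lookup helper; not the KIP's API.
class LeaderEpochLookupSketch {
    /**
     * Returns the end offset (exclusive) of the largest known epoch that is
     * less than or equal to requestedEpoch: the start offset of the next larger
     * epoch, or logEndOffset if no larger epoch exists. Returns -1 if no known
     * epoch is less than or equal to requestedEpoch.
     */
    static long endOffsetForEpoch(NavigableMap<Integer, Long> epochStartOffsets,
                                  long logEndOffset, int requestedEpoch) {
        if (epochStartOffsets.floorKey(requestedEpoch) == null) {
            return -1L; // nothing at or below the requested epoch is known
        }
        Map.Entry<Integer, Long> next = epochStartOffsets.higherEntry(requestedEpoch);
        return next == null ? logEndOffset : next.getValue();
    }

    // Small fixture: epoch 0 starts at offset 0, epoch 1 at 100, epoch 3 at 250.
    static NavigableMap<Integer, Long> sampleEpochs() {
        NavigableMap<Integer, Long> epochs = new TreeMap<>();
        epochs.put(0, 0L);
        epochs.put(1, 100L);
        epochs.put(3, 250L);
        return epochs;
    }
}
```

A caller customizing its reset logic could compare the returned end offset with its own position: if the end offset is smaller, the log was truncated after that epoch and the position needs to move back.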




On Wed, Jun 27, 2018 at 11:12 AM, Jason Gustafson 
wrote:

> @Dong
>
> Those are fair points. Both approaches require some fuzziness to reset the
> offset in these pathological scenarios and we cannot guarantee
> at-least-once delivery either way unless we have the full history of leader
> epochs that were consumed. The KIP-101 logic may actually be more accurate
> than using timestamps because it does not depend on the messages which are
> written after the unclean leader election. The case we're talking about
> should be extremely rare in practice anyway. I also agree that we may not
> want to add new machinery if it only helps the old message format. Ok,
> let's go ahead and drop the timestamp.
>
> @Guozhang
>
> * My current understanding is that, with unclean leader election turned on,
>> exactly-once is out of the window since we cannot guarantee that all
>> committed message markers will not be lost. And hence there is no need to
>> have special handling logic for LOG_TRUNCATED or OOR error codes with
>> read.committed turned on. Is that right?
>
>
> Yes, that's right. EoS and unclean leader election don't mix well. It may
> be worth considering separately whether we should try to reconcile the
> transaction log following an unclean leader election. At least we may be
> able to prevent dangling transactions from blocking consumers. This KIP
> does not address this problem.
>
> * MINOR: "if the epoch is greater than the minimum expected epoch, that the
>> new epoch does not begin at an earlier offset than the fetch offset.  In
>> the latter case, the leader can respond with a new LOG_TRUNCATION error
>> code" should it be "does not begin at a later offset than the fetch
>> offset"?
>
>
> I think the comment is correct, though the phrasing may be confusing. We
> know truncation has occurred if there exists a larger epoch with a starting
> offset that is lower than the fetch offset. Let me try to rephrase this.
>
> Thanks,
> Jason
>
> On Wed, Jun 27, 2018 at 9:17 AM, Guozhang Wang  wrote:
>
>> Jason, thanks for the KIP. A few comments:
>>
>> * I think Dong's question about whether to use timestamp-based approach
>> v.s. start-offset-of-first-larger-epoch is valid; more specifically, with
>> timestamp-based approach we may still be resetting to an offset falling
>> into
>> the truncated interval, and hence we may still miss some data, i.e. not
>> guaranteeing at-least-once still. With the
>> start-offset-of-first-larger-epoch, I'm not sure if it will guarantee no
>> valid data is missed when we have consecutive log truncations (maybe we
>> need to look back into details of KIP-101 to figure it out). If the latter
>> can indeed guarantee at least once, we could consider using that approach.
>>
>> * My current understanding is that, with unclean leader election turned
>> on,
>> exactly-once is out of the window since we cannot guarantee that all
>> committed message markers will not be lost. And hence there is no need to
>> have special handling logic for LOG_TRUNCATED or OOR error codes with
>> read.committed turned on. Is that right?
>>
>> * MINOR: "if the epoch is greater than the minimum expected epoch, that
>> the
>> new epoch does not begin at an earlier offset than the fetch offset.  In
>> the latter case, the leader can respond with a new LOG_TRUNCATION error
>> code" should it be "does not begin at a later offset than the fetch
>> offset"?
>>
>>
>>
>> Guozhang
>>
>>
>> On Tue, Jun 26, 2018 at 6:51 PM, Dong Lin  wrote:
>>
>> > Hey Jason,
>> >
>> > Thanks for the explanation.
>> >
>> > Please correct me if this is wrong. The "unknown truncation offset"
>> > scenario happens when consumer does not have the full leaderEpoch ->
>> offset
>> > mapping. In this case we can still use the KIP-101-based approach to
>> > truncate offset to "start offset of the first Leader Epoch larger than
>> last
>> > epoch of the consumer" but it may be inaccurate. So the KIP chooses to
>> use
>> > the timestamp-based approach which is also best-effort.
>> >
>> > If this understanding is correct, for "closest" offset 

Re: [VOTE] KIP-324: Add method to get metrics() in AdminClient

2018-06-27 Thread Harsha
+1 (binding)

Thanks,
Harsha

On Wed, Jun 27th, 2018 at 10:56 AM, Damian Guy  wrote:

> 
> 
> 
> +1 (binding)
> 
> Thanks
> 
> 
> 
> On Wed, 27 Jun 2018 at 18:50 Bill Bejeck < bbej...@gmail.com > wrote:
> 
> > +1
> >
> > -Bill
> >
> > On Wed, Jun 27, 2018 at 12:47 PM Manikumar < manikumar.re...@gmail.com >
> 
> > wrote:
> >
> > > +1 (non-binding)
> > >
> > > Thanks.
> > >
> > > On Wed, Jun 27, 2018 at 10:15 PM Matthias J. Sax < matth...@confluent.io
> >
> > > wrote:
> > >
> > > > +1 (binding)
> > > >
> > > > On 6/26/18 2:33 PM, Guozhang Wang wrote:
> > > > > +1. Thanks.
> > > > >
> > > > > On Tue, Jun 26, 2018 at 2:31 PM, Yishun Guan < gyis...@gmail.com >
> 
> > > wrote:
> > > > >
> > > > >> Hi All,
> > > > >>
> > > > >> I am starting a vote on this KIP:
> > > > >>
> > > > >> https://cwiki.apache.org/confluence/x/lQg0BQ
> > > > >>
> > > > >> Thanks,
> > > > >> Yishun
> > > > >>
> > > > >
> > > > >
> > > > >
> > > >
> > > >
> > >
> >
> 
> 
> 
> 
> 
>

[jira] [Created] (KAFKA-7110) Windowed changelog keys not deserialized properly by TimeWindowedSerde

2018-06-27 Thread Shawn Nguyen (JIRA)
Shawn Nguyen created KAFKA-7110:
---

 Summary: Windowed changelog keys not deserialized properly by 
TimeWindowedSerde
 Key: KAFKA-7110
 URL: https://issues.apache.org/jira/browse/KAFKA-7110
 Project: Kafka
  Issue Type: Bug
Reporter: Shawn Nguyen
 Fix For: 1.1.0


Currently the TimeWindowedSerde does not deserialize the windowed keys from a 
changelog topic properly. There are a few assumptions made in the 
TimeWindowedDeserializer that prevent the changelog windowed keys from being 
correctly deserialized.

1) In the from method of WindowKeySchema (called in deserialize in 
TimeWindowedDeserializer), we extract the window from the binary key, but we 
call getLong(binaryKey.length -TIMESTAMP_SIZE). However, the changelog for 
ChangeLoggingWindowBytesStore will log the windowed key as:

 
{noformat}
changeLogger.logChange(WindowKeySchema.toStoreKeyBinary(key, timestamp, 
maybeUpdateSeqnumForDups()), value);
{noformat}
 

In toStoreKeyBinary, we store the key in 
{noformat}
final ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + 
TIMESTAMP_SIZE + SEQNUM_SIZE);
{noformat}
with the seqnum (used for de-duping). So the eventual result is that when we 
deserialize, we do not assume the windowed changelog key has a seq_num, and the 
window extracted will be gibberish values since the bytes extracted won't be 
aligned.

The fix here is to introduce a new Serde in WindowSerdes that explicitly 
handles a windowed changelog input topic.
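
To illustrate the misalignment described in 1), here is a minimal stand-alone sketch in Java. It only reproduces the byte layout stated above (serialized key, then an 8-byte timestamp, then a 4-byte seqnum); the class and method names are hypothetical and are not the Kafka Streams classes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of the byte layout; not the actual WindowKeySchema code.
final class WindowedChangelogKeyLayout {
    static final int TIMESTAMP_SIZE = 8;
    static final int SEQNUM_SIZE = 4;

    // Layout from the report: [serialized key][8-byte timestamp][4-byte seqnum]
    static byte[] toStoreKeyBinary(byte[] serializedKey, long timestamp, int seqnum) {
        ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE + SEQNUM_SIZE);
        buf.put(serializedKey);
        buf.putLong(timestamp);
        buf.putInt(seqnum);
        return buf.array();
    }

    // Reads the last 8 bytes, i.e. assumes there is no trailing seqnum.
    static long timestampIgnoringSeqnum(byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE);
    }

    // Skips the trailing seqnum before reading the 8-byte timestamp.
    static long timestampWithSeqnum(byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey)
                .getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
    }

    public static void main(String[] args) {
        byte[] key = "k".getBytes(StandardCharsets.UTF_8);
        byte[] binaryKey = toStoreKeyBinary(key, 1000L, 0);
        // The first read mixes timestamp bytes with seqnum bytes; the second is aligned.
        System.out.println("ignoring seqnum: " + timestampIgnoringSeqnum(binaryKey));
        System.out.println("with seqnum:     " + timestampWithSeqnum(binaryKey));
    }
}
```

A serde aimed at changelog topics would need to read at the second position; one aimed at keys without a seqnum would keep the first.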

 

2) In the constructor of TimeWindowedDeserializer, the windowSize is fixed to 
Long.MAX_VALUE:

 
{noformat}
// TODO: fix this part as last bits of KAFKA-4468
public TimeWindowedDeserializer(final Deserializer inner) {
    this(inner, Long.MAX_VALUE);
}

public TimeWindowedDeserializer(final Deserializer inner, final long windowSize) {
    this.inner = inner;
    this.windowSize = windowSize;
}
{noformat}
This will cause the end times to be gibberish when we extract the window, since 
the window end is derived from the start time and the windowSize in:

 
{noformat}
public static <K> Windowed<K> from(final byte[] binaryKey, final long windowSize,
                                   final Deserializer<K> deserializer, final String topic) {
    final byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE];
    System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
    final K key = deserializer.deserialize(topic, bytes);
    final Window window = extractWindow(binaryKey, windowSize);
    return new Windowed<>(key, window);
}

private static Window extractWindow(final byte[] binaryKey, final long windowSize) {
    final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
    final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE);
    return timeWindowForSize(start, windowSize);
}
{noformat}
So in the new serde, we will make windowSize a constructor param that can be 
supplied.
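
As a rough sketch of why a fixed Long.MAX_VALUE window size cannot produce a meaningful end time, and what a constructor-supplied size would look like: the code below assumes the window end is computed as start plus size and clamped on overflow. That computation and all names here are assumptions made for illustration, not the actual Kafka Streams implementation.

```java
// Hypothetical illustration; the end-time formula (start + size, clamped) is an assumption.
final class SizedWindowExtractor {
    private final long windowSizeMs;

    // The window size is supplied by the caller instead of being fixed.
    SizedWindowExtractor(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    long windowEnd(long startMs) {
        long end = startMs + windowSizeMs;
        // A long overflow wraps to a negative value; clamp it instead.
        return end < 0 ? Long.MAX_VALUE : end;
    }

    public static void main(String[] args) {
        // With a real window size the end time is informative.
        System.out.println(new SizedWindowExtractor(500L).windowEnd(1_000L));
        // With Long.MAX_VALUE every window ends at the same clamped value.
        System.out.println(new SizedWindowExtractor(Long.MAX_VALUE).windowEnd(1_000L));
    }
}
```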

 

 





Re: [DISCUSS] KIP-328: Ability to suppress updates for KTables

2018-06-27 Thread John Roesler
Thanks for the feedback, Matthias,

It seems like in straightforward relational processing cases, it would not
make sense to bound the lateness of KTables. In general, it seems better to
have "guard rails" in place that make it easier to write sensible programs
than insensible ones.

But I'm still going to argue in favor of keeping it for all KTables ;)

1. I believe it is simpler to understand the operator if it has one uniform
definition, regardless of context. It's well defined and intuitive what
will happen when you use late-event suppression on a KTable, so I think
nothing surprising or dangerous will happen in that case. From my
perspective, having two sets of allowed operations is actually an increase
in cognitive complexity.

2. To me, it's not crazy to use the operator this way. For example, in lieu
of full-featured timestamp semantics, I can implement MVCC behavior when
building a KTable by "suppressLateEvents(Duration.ZERO)". I suspect that
there are other, non-obvious applications of suppressing late events on
KTables.
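
To make point 2 concrete, here is a minimal sketch in Java of how a zero lateness bound can behave like a last-writer-wins version check. It assumes lateness is measured per key against the newest timestamp already seen for that key; that assumption, the class, and its methods are hypothetical and are not the API proposed in the KIP.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; not the KIP-328 operator.
final class LateEventFilter<K, V> {
    private final long allowedLatenessMs;
    private final Map<K, Long> newestTimestamp = new HashMap<>();
    private final Map<K, V> table = new HashMap<>();

    LateEventFilter(Duration allowedLateness) {
        this.allowedLatenessMs = allowedLateness.toMillis();
    }

    // Applies the update unless it is older than the newest timestamp seen
    // for this key by more than the allowed lateness. Returns true if applied.
    boolean update(K key, V value, long timestampMs) {
        Long newest = newestTimestamp.get(key);
        if (newest != null && timestampMs < newest - allowedLatenessMs) {
            return false; // late event: suppressed
        }
        table.put(key, value);
        if (newest == null || timestampMs > newest) {
            newestTimestamp.put(key, timestampMs);
        }
        return true;
    }

    V get(K key) {
        return table.get(key);
    }

    public static void main(String[] args) {
        LateEventFilter<String, String> filter = new LateEventFilter<>(Duration.ZERO);
        filter.update("a", "v1", 10L);
        filter.update("a", "v0", 5L); // older than what was already seen, so dropped
        System.out.println(filter.get("a"));
    }
}
```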

3. Not to get too much into implementation details in a KIP discussion, but
if we did want to make late-event suppression available only on windowed
KTables, we have two enforcement options:
  a. check when we build the topology - this would be simple to implement,
but would be a runtime check. Hopefully, people write tests for their
topology before deploying them, so the feedback loop isn't instantaneous,
but it's not too long either.
  b. add a new WindowedKTable type - this would be a compile time check,
but would also be substantial increase of both interface and code
complexity.
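
The two enforcement options in point 3 can be sketched roughly as follows. All types and method names below are hypothetical stand-ins, not Kafka Streams interfaces; the sketch only contrasts a check that fails when the topology is built with one that the compiler enforces.

```java
// Hypothetical illustration of options (a) and (b); not Kafka Streams code.
final class TopologyCheckSketch {
    interface Table {
        boolean isWindowed();
    }

    // Option (b): a distinct type, so misuse is rejected at compile time.
    interface WindowedTable extends Table {
        @Override
        default boolean isWindowed() {
            return true;
        }
    }

    // Option (a): any table is accepted by the compiler, and the check
    // only fails when the topology is built.
    static void requireWindowed(Table table) {
        if (!table.isWindowed()) {
            throw new IllegalArgumentException(
                "late-event suppression is only supported on windowed tables");
        }
    }

    // Option (b): no runtime check is needed, but a second type must exist.
    static void suppressLateEventsTyped(WindowedTable table) {
        // nothing to verify here; the parameter type already guarantees it
    }

    public static void main(String[] args) {
        requireWindowed(new WindowedTable() {});
        suppressLateEventsTyped(new WindowedTable() {});
        try {
            requireWindowed(() -> false);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected at build time: " + e.getMessage());
        }
    }
}
```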

We should definitely strive to have guard rails protecting against
surprising or dangerous behavior. Protecting against programs that we don't
currently predict is a lesser benefit, and I think we can put up guard
rails on a case-by-case basis for that. It seems like the increase in
cognitive (and potentially code and interface) complexity makes me think we
should skip this case.

What do you think?

Thanks,
-John

On Wed, Jun 27, 2018 at 11:59 AM Matthias J. Sax 
wrote:

> Thanks for the KIP John.
>
> One initial comment about the last example "Bounded lateness": For a
> non-windowed KTable bounding the lateness does not really make sense,
> does it?
>
> Thus, I am wondering if we should allow `suppressLateEvents()` for this
> case? It seems to be better to only allow it for windowed-KTables.
>
>
> -Matthias
>
>
> On 6/27/18 8:53 AM, Ted Yu wrote:
> > I noticed this (lack of primary parameter) as well.
> >
> > What you gave as new example is semantically the same as what I
> suggested.
> > So it is good by me.
> >
> > Thanks
> >
> > On Wed, Jun 27, 2018 at 7:31 AM, John Roesler  wrote:
> >
> >> Thanks for taking a look, Ted,
> >>
> >> I agree this is a departure from the conventions of Streams DSL.
> >>
> >> Most of our config objects have one or two "required" parameters, which
> fit
> >> naturally with the static factory method approach. TimeWindow, for
> example,
> >> requires a size parameter, so we can naturally say TimeWindows.of(size).
> >>
> >> I think in the case of a suppression, there's really no "core"
> parameter,
> >> and "Suppression.of()" seems sillier than "new Suppression()". I think
> that
> >> Suppression.of(duration) would be ambiguous, since there are many
> durations
> >> that we can configure.
> >>
> >> However, thinking about it again, I suppose that I can give each
> >> configuration method a static version, which would let you replace "new
> >> Suppression()." with "Suppression." in all the examples. Basically,
> instead
> >> of "of()", we'd support any of the methods I listed.
> >>
> >> For example:
> >>
> >> windowCounts
> >> .suppress(
> >> Suppression
> >> .suppressLateEvents(Duration.ofMinutes(10))
> >> .suppressIntermediateEvents(
> >>
>  IntermediateSuppression.emitAfter(Duration.ofMinutes(10))
> >> )
> >> );
> >>
> >>
> >> Does that seem better?
> >>
> >> Thanks,
> >> -John
> >>
> >>
> >> On Wed, Jun 27, 2018 at 12:44 AM Ted Yu  wrote:
> >>
> >>> I started to read this KIP which contains a lot of materials.
> >>>
> >>> One suggestion:
> >>>
> >>> .suppress(
> >>> new Suppression()
> >>>
> >>>
> >>> Do you think it would be more consistent with the rest of Streams data
> >>> structures by supporting `of` ?
> >>>
> >>> Suppression.of(Duration.ofMinutes(10))
> >>>
> >>>
> >>> Cheers
> >>>
> >>>
> >>>
> >>> On Tue, Jun 26, 2018 at 1:11 PM, John Roesler 
> wrote:
> >>>
> >>>> Hello devs and users,
> >>>>
> >>>> Please take some time to consider this proposal for Kafka Streams:
> >>>>
> >>>> KIP-328: Ability to suppress updates for KTables
> >>>>
> >>>> link: https://cwiki.apache.org/confluence/x/sQU0BQ
> >>>>
> >>>> The basic idea is to provide:
> >>>> * more usable control over update rate (vs the current state store
> >>> caches)
> >>>> * the final-result-for-windowed-computations feature which several

Re: [VOTE] KIP-277 - Fine Grained ACL for CreateTopics API

2018-06-27 Thread Guozhang Wang
Hello guys,

Sorry for being late on this KIP, but while incorporating the docs of 277
and 290 I'm wondering if we should be extending the authorization with
create topics on other operations with these two KIPs:

Previously, in SimpleAclAuthorizer, "read, write, delete, or alter implies
allowing describe", but not "create" as it can only be applied on
"CLUSTER". It means that users need to specify additional rules for those
topics even if they are created by themselves.

One example of this is Kafka Streams' internal topics, before 2.0, users
need to add "create CLUSTER" plus "read / write TOPIC_NAME_LITERAL" with a
secured cluster, and I've seen some common scenarios where they forgot to
add the latter and were thinking that the created topics will be
auto-granted with read/write permissions.

Would it be natural to allow:

1. prefix wildcard "create" to imply prefix wildcard "read / write /
describe" (debatable whether we want to add "delete" and "alter" as well).
2. cluster "create" to imply "read / write / describe" on topics created by
the same user.
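
A small sketch in Java of the implication rules under discussion. The enum and methods are hypothetical and do not reflect SimpleAclAuthorizer's actual code; the first method encodes the existing rule quoted above, and the second adds the proposed "create implies read / write / describe" rule, leaving out the debatable "delete" and "alter".

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical illustration of the implication rules; not the authorizer's code.
final class AclImplicationSketch {
    enum Op { READ, WRITE, DELETE, ALTER, DESCRIBE, CREATE }

    // Existing rule: read, write, delete, or alter implies describe.
    static Set<Op> impliedToday(Op granted) {
        switch (granted) {
            case READ:
            case WRITE:
            case DELETE:
            case ALTER:
                return EnumSet.of(granted, Op.DESCRIBE);
            default:
                return EnumSet.of(granted);
        }
    }

    // Proposed rule: create additionally implies read, write, and describe.
    static Set<Op> impliedWithProposal(Op granted) {
        if (granted == Op.CREATE) {
            return EnumSet.of(Op.CREATE, Op.READ, Op.WRITE, Op.DESCRIBE);
        }
        return impliedToday(granted);
    }

    public static void main(String[] args) {
        System.out.println(impliedToday(Op.CREATE));
        System.out.println(impliedWithProposal(Op.CREATE));
    }
}
```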



Guozhang




On Fri, May 25, 2018 at 5:55 AM, Edoardo Comar  wrote:

> Thanks Ismael, noted on the KIP
>
> On 21 May 2018 at 18:29, Ismael Juma  wrote:
> > Thanks for the KIP, +1 (binding). Can you also please describe the
> > compatibility impact of changing the error code from
> > CLUSTER_AUTHORIZATION_FAILED to TOPIC_AUTHORIZATION_FAILED?
> >
> > Ismael
> >
> > On Wed, Apr 25, 2018 at 2:45 AM Edoardo Comar  wrote:
> >
> >> Hi,
> >>
> >> The discuss thread on KIP-277 (
> >> https://www.mail-archive.com/dev@kafka.apache.org/msg86540.html )
> >> seems to have been fruitful and concerns have been addressed, please
> allow
> >> me start a vote on it:
> >>
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 277+-+Fine+Grained+ACL+for+CreateTopics+API
> >>
> >> I will update the small PR to the latest KIP semantics if the vote
> passes
> >> (as I hope :-).
> >>
> >> cheers
> >> Edo
> >> --
> >>
> >> Edoardo Comar
> >>
> >> IBM Message Hub
> >>
> >> IBM UK Ltd, Hursley Park, SO21 2JN
> >> Unless stated otherwise above:
> >> IBM United Kingdom Limited - Registered in England and Wales with number
> >> 741598.
> >> Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6
> 3AU
> >>
>
>
>
> --
> "When the people fear their government, there is tyranny; when the
> government fears the people, there is liberty." [Thomas Jefferson]
>



-- 
-- Guozhang


Re: [DISCUSS] KIP-328: Ability to suppress updates for KTables

2018-06-27 Thread John Roesler
Hello again all,

I realized today that I neglected to include metrics in the proposal. I
have added them just now.

Thanks,
-John

On Tue, Jun 26, 2018 at 3:11 PM John Roesler  wrote:

> Hello devs and users,
>
> Please take some time to consider this proposal for Kafka Streams:
>
> KIP-328: Ability to suppress updates for KTables
>
> link: https://cwiki.apache.org/confluence/x/sQU0BQ
>
> The basic idea is to provide:
> * more usable control over update rate (vs the current state store caches)
> * the final-result-for-windowed-computations feature which several people
> have requested
>
> I look forward to your feedback!
>
> Thanks,
> -John
>


Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-27 Thread Jason Gustafson
@Dong

Those are fair points. Both approaches require some fuzziness to reset the
offset in these pathological scenarios and we cannot guarantee
at-least-once delivery either way unless we have the full history of leader
epochs that were consumed. The KIP-101 logic may actually be more accurate
than using timestamps because it does not depend on the messages which are
written after the unclean leader election. The case we're talking about
should be extremely rare in practice anyway. I also agree that we may not
want to add new machinery if it only helps the old message format. Ok,
let's go ahead and drop the timestamp.

@Guozhang

* My current understanding is that, with unclean leader election turned on,
> exactly-once is out of the window since we cannot guarantee that all
> committed message markers will not be lost. And hence there is no need to
> have special handling logic for LOG_TRUNCATED or OOR error codes with
> read.committed turned on. Is that right?


Yes, that's right. EoS and unclean leader election don't mix well. It may
be worth considering separately whether we should try to reconcile the
transaction log following an unclean leader election. At least we may be
able to prevent dangling transactions from blocking consumers. This KIP
does not address this problem.

* MINOR: "if the epoch is greater than the minimum expected epoch, that the
> new epoch does not begin at an earlier offset than the fetch offset.  In
> the latter case, the leader can respond with a new LOG_TRUNCATION error
> code" should it be "does not begin at a later offset than the fetch
> offset"?


I think the comment is correct, though the phrasing may be confusing. We
know truncation has occurred if there exists a larger epoch with a starting
offset that is lower than the fetch offset. Let me try to rephrase this.
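
For illustration, the condition can be written as a short Java sketch. It assumes the leader holds a sorted map from each leader epoch to that epoch's start offset; the class and method names are hypothetical and are neither the broker code nor an API from the KIP.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of the truncation condition; not actual broker code.
final class EpochTruncationCheck {

    // epochStartOffsets maps each leader epoch to the first offset written in it.
    // Truncation is detected if any epoch larger than the fetcher's last known
    // epoch starts at an offset lower than the requested fetch offset.
    static boolean isTruncated(TreeMap<Integer, Long> epochStartOffsets,
                               int lastFetchedEpoch,
                               long fetchOffset) {
        for (Map.Entry<Integer, Long> entry
                : epochStartOffsets.tailMap(lastFetchedEpoch, false).entrySet()) {
            if (entry.getValue() < fetchOffset) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        TreeMap<Integer, Long> epochs = new TreeMap<>();
        epochs.put(0, 0L);
        epochs.put(1, 100L);
        epochs.put(2, 150L);
        // The fetcher last saw epoch 1 and asks for offset 180, but epoch 2 began at 150.
        System.out.println(isTruncated(epochs, 1, 180L));
        // Asking for offset 150 is consistent with epoch 2 starting there.
        System.out.println(isTruncated(epochs, 1, 150L));
    }
}
```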

Thanks,
Jason

On Wed, Jun 27, 2018 at 9:17 AM, Guozhang Wang  wrote:

> Jason, thanks for the KIP. A few comments:
>
> * I think Dong's question about whether to use timestamp-based approach
> v.s. start-offset-of-first-larger-epoch is valid; more specifically, with
> timestamp-based approach we may still be resetting to an offset falling into
> the truncated interval, and hence we may still miss some data, i.e. not
> guaranteeing at-least-once still. With the
> start-offset-of-first-larger-epoch, I'm not sure if it will guarantee no
> valid data is missed when we have consecutive log truncations (maybe we
> need to look back into details of KIP-101 to figure it out). If the latter
> can indeed guarantee at least once, we could consider using that approach.
>
> * My current understanding is that, with unclean leader election turned on,
> exactly-once is out of the window since we cannot guarantee that all
> committed message markers will not be lost. And hence there is no need to
> have special handling logic for LOG_TRUNCATED or OOR error codes with
> read.committed turned on. Is that right?
>
> * MINOR: "if the epoch is greater than the minimum expected epoch, that the
> new epoch does not begin at an earlier offset than the fetch offset.  In
> the latter case, the leader can respond with a new LOG_TRUNCATION error
> code" should it be "does not begin at a later offset than the fetch
> offset"?
>
>
>
> Guozhang
>
>
> On Tue, Jun 26, 2018 at 6:51 PM, Dong Lin  wrote:
>
> > Hey Jason,
> >
> > Thanks for the explanation.
> >
> > Please correct me if this is wrong. The "unknown truncation offset"
> > scenario happens when consumer does not have the full leaderEpoch ->
> offset
> > mapping. In this case we can still use the KIP-101-based approach to
> > truncate offset to "start offset of the first Leader Epoch larger than
> last
> > epoch of the consumer" but it may be inaccurate. So the KIP chooses to
> use
> > the timestamp-based approach which is also best-effort.
> >
> > If this understanding is correct, for "closest" offset reset policy and
> > "unknown truncation offset" scenario, I am wondering whether it may be
> > better to replace timestamp-based approach with KIP-101 based approach.
> In
> > comparison to timestamp-based approach, the KIP-101-based approach seems
> to
> > simplify the API a bit since user does not need to understand timestamp.
> > Similar to the timestamp-based approach, both approaches are best-effort
> > and do not guarantee that consumer can consume all messages. It is not
> like
> > KIP-279 which guarantees that follower broker can consume all messages
> from
> > the leader.
> >
> > Then it seems that the remaining difference is mostly about accuracy,
> i.e.
> > how much message will be duplicated or missed in the "unknown truncation
> > offset" scenario. Not sure either one is clearly better than the other.
> > Note that there are two scenarios mentioned in KIP-279 which are not
> > addressed by KIP-101. Both scenarios require quick leadership change
> > between brokers, which seems to suggest that the offset based obtained
> > by "start
> > offset of the first Leader Epoch 

Re: [DISCUSS] KIP-263: Allow broker to skip sanity check of inactive segments on broker startup

2018-06-27 Thread Dong Lin
Thanks for the reply Jason and Dhruvil.

Yeah we don't need config for the sanity check and thus we don't need a
KIP. I think we are on the same page of just skipping the sanity check of
segments before the recovery offset. I will close the KIP and submit a
patch for this.
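
A rough sketch in Java of the selection logic agreed on here, namely checking only the segments that are not entirely before the recovery offset. It assumes segments are identified by sorted base offsets and that a segment covers offsets up to the next segment's base offset; the class and method are hypothetical and are not the broker's log-loading code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration; not the broker's actual log-loading code.
final class SegmentSanityCheckSketch {

    // Returns the base offsets of segments that still need a sanity check.
    // A segment is skipped when the next segment starts at or before the
    // recovery offset, i.e. the segment lies entirely before that offset.
    static List<Long> segmentsToCheck(List<Long> sortedBaseOffsets, long recoveryOffset) {
        List<Long> toCheck = new ArrayList<>();
        for (int i = 0; i < sortedBaseOffsets.size(); i++) {
            boolean isLast = i == sortedBaseOffsets.size() - 1;
            if (!isLast && sortedBaseOffsets.get(i + 1) <= recoveryOffset) {
                continue; // fully flushed segment: skip the sanity check
            }
            toCheck.add(sortedBaseOffsets.get(i));
        }
        return toCheck;
    }

    public static void main(String[] args) {
        List<Long> baseOffsets = Arrays.asList(0L, 100L, 200L, 300L);
        System.out.println(segmentsToCheck(baseOffsets, 250L));
    }
}
```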

On Wed, Jun 27, 2018 at 10:09 AM, Dhruvil Shah  wrote:

> +1 to what Jason said. We need a better long-term strategy for dealing with
> corrupted log and index data, but the sanity checks we have do not
> guarantee much in this regard.
>
> For now, we could do away with these index sanity checks in my opinion. We
> could handle the missing index case at startup. I think we could have
> missing index files only when users are upgrading from a version that did
> not have a particular type of index to a version that does, or if the
> operator physically deleted these files. Because these are rare scenarios,
> having to recreate a missing index should typically not affect normal
> startup time.
>
> - Dhruvil
>
> On Wed, Jun 27, 2018 at 8:47 AM Jason Gustafson 
> wrote:
>
> > Hey Dong,
> >
> >
> > So the main concern with the above approach is that, if for any reason
> the
> > > index files of inactive segment is deleted or corrupted, the broker
> will
> > > halt if there is only one log directory. This is different from the
> > > existing behavior where the broker will rebuild the index for this
> > inactive
> > > segment before it can accept any request from consumer. Though we don't
> > > have provide guarantee for segments already flushed to disk, this still
> > > seems like a change in behavior for user. Maybe we don't have to worry
> > > about this if we decide it is very rare, e.g. it happens only when
> there
> > is
> > > disk error or when there is human error.
> >
> >
> > I think we should probably still handle the case when an index file is
> > missing during startup? But considering how weak the sanity check is, it
> > seems fine to skip it.  Also, could we just make this change without a
> KIP?
> > Adding a config to enable a wimpy sanity check seems unnecessary.
> >
> > One scenario that does come up with users is actual segment corruption,
> > which is only detected by consumers that are validating CRCs. To fix it,
> we
> > have to manually delete the segments and force re-replication. It would
> be
> > helpful to have a config to enable deep checking on startup for
> particular
> > topics or partitions. This could also just be a separate tool though
> > ("kafka-fsck" or something).
> >
> > Thinking longer term, I think we need a more systematic approach to
> dealing
> > with corruption, not just in index files, but in the segments as well. It
> > might be nice, for example, if the consumer had a way to hint the broker
> > that a particular offset is corrupt. The leader might then demote itself,
> > for example, and try to repair. Lots to think through though.
> >
> > -Jason
> >
> >
> >
> >
> > On Wed, Jun 27, 2018 at 12:29 AM, Dong Lin  wrote:
> >
> > > So the main concern with the above approach is that, if for any reason
> > the
> > > index files of inactive segment is deleted or corrupted, the broker
> will
> > > halt if there is only one log directory. This is different from the
> > > existing behavior where the broker will rebuild the index for this
> > inactive
> > > segment before it can accept any request from consumer. Though we don't
> > > have provide guarantee for segments already flushed to disk, this still
> > > seems like a change in behavior for user. Maybe we don't have to worry
> > > about this if we decide it is very rare, e.g. it happens only when
> there
> > is
> > > disk error or when there is human error.
> > >
> > >
> > >
> > > On Wed, Jun 27, 2018 at 12:04 AM, Dong Lin 
> wrote:
> > >
> > > > Hey Jason,
> > > >
> > > > Thanks for the comment!
> > > >
> > > > Your comment reminded me to read through Jay's comments and my reply
> > > > again. It seems that I probably have not captured idea of Jay's
> comment
> > > > that says sanity check is not part of any formal guarantee we
> provide.
> > I
> > > > probably should have thought about this comment more. Let me reply to
> > > both
> > > > yours and Jay's comment and see if I can understand you better.
> > > >
> > > > Here are some clarifications:
> > > > - KIP does not intend to optimize recovery. It aims to optimize the
> > > > sanity check when there is clean shutdown.
> > > > - Sanity check only read the last entry of the index rather than the
> > full
> > > > index
> > > > - We have already done data driven investigation though it is not
> done
> > > > using hprof or strace. The resulting rolling bounce time is
> acceptable
> > > now.
> > > > If it appears to be an issue e.g. after more data then we may need to
> > > > revisit this with more data driven investigation
> > > >
> > > > I agree with the following comments:
> > > > - We should optimize the default behavior instead of adding a new
> > config.
> > > > - sanity check of the segments 

Re: [VOTE] KIP-324: Add method to get metrics() in AdminClient

2018-06-27 Thread Damian Guy
+1 (binding)

Thanks

On Wed, 27 Jun 2018 at 18:50 Bill Bejeck  wrote:

> +1
>
> -Bill
>
> On Wed, Jun 27, 2018 at 12:47 PM Manikumar 
> wrote:
>
> > +1 (non-binding)
> >
> > Thanks.
> >
> > On Wed, Jun 27, 2018 at 10:15 PM Matthias J. Sax 
> > wrote:
> >
> > > +1 (binding)
> > >
> > > On 6/26/18 2:33 PM, Guozhang Wang wrote:
> > > > +1. Thanks.
> > > >
> > > > On Tue, Jun 26, 2018 at 2:31 PM, Yishun Guan 
> > wrote:
> > > >
> > > >> Hi All,
> > > >>
> > > >> I am starting a vote on this KIP:
> > > >>
> > > >> https://cwiki.apache.org/confluence/x/lQg0BQ
> > > >>
> > > >> Thanks,
> > > >> Yishun
> > > >>
> > > >
> > > >
> > > >
> > >
> > >
> >
>


Re: [DISCUSS] KIP-263: Allow broker to skip sanity check of inactive segments on broker startup

2018-06-27 Thread Dhruvil Shah
+1 to what Jason said. We need a better long-term strategy for dealing with
corrupted log and index data, but the sanity checks we have do not
guarantee much in this regard.

For now, we could do away with these index sanity checks in my opinion. We
could handle the missing index case at startup. I think we could have
missing index files only when users are upgrading from a version that did
not have a particular type of index to a version that does, or if the
operator physically deleted these files. Because these are rare scenarios,
having to recreate a missing index should typically not affect normal
startup time.

- Dhruvil

On Wed, Jun 27, 2018 at 8:47 AM Jason Gustafson  wrote:

> Hey Dong,
>
>
> So the main concern with the above approach is that, if for any reason the
> > index files of inactive segment is deleted or corrupted, the broker will
> > halt if there is only one log directory. This is different from the
> > existing behavior where the broker will rebuild the index for this
> inactive
> > segment before it can accept any request from consumer. Though we don't
> > have provide guarantee for segments already flushed to disk, this still
> > seems like a change in behavior for user. Maybe we don't have to worry
> > about this if we decide it is very rare, e.g. it happens only when there
> is
> > disk error or when there is human error.
>
>
> I think we should probably still handle the case when an index file is
> missing during startup? But considering how weak the sanity check is, it
> seems fine to skip it.  Also, could we just make this change without a KIP?
> Adding a config to enable a wimpy sanity check seems unnecessary.
>
> One scenario that does come up with users is actual segment corruption,
> which is only detected by consumers that are validating CRCs. To fix it, we
> have to manually delete the segments and force re-replication. It would be
> helpful to have a config to enable deep checking on startup for particular
> topics or partitions. This could also just be a separate tool though
> ("kafka-fsck" or something).
>
> Thinking longer term, I think we need a more systematic approach to dealing
> with corruption, not just in index files, but in the segments as well. It
> might be nice, for example, if the consumer had a way to hint the broker
> that a particular offset is corrupt. The leader might then demote itself,
> for example, and try to repair. Lots to think through though.
>
> -Jason
>
>
>
>
> On Wed, Jun 27, 2018 at 12:29 AM, Dong Lin  wrote:
>
> > So the main concern with the above approach is that, if for any reason
> the
> > index files of inactive segment is deleted or corrupted, the broker will
> > halt if there is only one log directory. This is different from the
> > existing behavior where the broker will rebuild the index for this
> inactive
> > segment before it can accept any request from consumer. Though we don't
> > have provide guarantee for segments already flushed to disk, this still
> > seems like a change in behavior for user. Maybe we don't have to worry
> > about this if we decide it is very rare, e.g. it happens only when there
> is
> > disk error or when there is human error.
> >
> >
> >
> > On Wed, Jun 27, 2018 at 12:04 AM, Dong Lin  wrote:
> >
> > > Hey Jason,
> > >
> > > Thanks for the comment!
> > >
> > > Your comment reminded me to read through Jay's comments and my reply
> > > again. It seems that I probably have not captured idea of Jay's comment
> > > that says sanity check is not part of any formal guarantee we provide.
> I
> > > probably should have thought about this comment more. Let me reply to
> > both
> > > yours and Jay's comment and see if I can understand you better.
> > >
> > > Here are some clarifications:
> > > - KIP does not intend to optimize recovery. It aims to optimize the
> > > sanity check when there is clean shutdown.
> > > - Sanity check only read the last entry of the index rather than the
> full
> > > index
> > > - We have already done data driven investigation though it is not done
> > > using hprof or strace. The resulting rolling bounce time is acceptable
> > now.
> > > If it appears to be an issue e.g. after more data then we may need to
> > > revisit this with more data driven investigation
> > >
> > > I agree with the following comments:
> > > - We should optimize the default behavior instead of adding a new
> config.
> > > - sanity check of the segments before recovery offset is not part of
> any
> > > formal guarantee and thus we probably can just skip it.
> > >
> > > So we are all leaning towards skipping the sanity check of all segments
> > > before the recovery offset. This solution would be pretty
> straightforward
> > > to understand and implement. And I am sure it will give us all the
> > benefits
> > > that this KIP intends to achieve. Here is only one question to double
> > check:
> > >
> > > If consumer fetches from an inactive segment, broker will just use the
> > > index of that inactive segment. 

Re: [DISCUSS] KIP-328: Ability to suppress updates for KTables

2018-06-27 Thread Matthias J. Sax
Thanks for the KIP John.

One initial comment about the last example "Bounded lateness": For a
non-windowed KTable bounding the lateness does not really make sense,
does it?

Thus, I am wondering if we should allow `suppressLateEvents()` for this
case? It seems to be better to only allow it for windowed-KTables.


-Matthias


On 6/27/18 8:53 AM, Ted Yu wrote:
> I noticed this (lack of primary parameter) as well.
> 
> What you gave as new example is semantically the same as what I suggested.
> So it is good by me.
> 
> Thanks
> 
> On Wed, Jun 27, 2018 at 7:31 AM, John Roesler  wrote:
> 
>> Thanks for taking a look, Ted,
>>
>> I agree this is a departure from the conventions of Streams DSL.
>>
>> Most of our config objects have one or two "required" parameters, which fit
>> naturally with the static factory method approach. TimeWindow, for example,
>> requires a size parameter, so we can naturally say TimeWindows.of(size).
>>
>> I think in the case of a suppression, there's really no "core" parameter,
>> and "Suppression.of()" seems sillier than "new Suppression()". I think that
>> Suppression.of(duration) would be ambiguous, since there are many durations
>> that we can configure.
>>
>> However, thinking about it again, I suppose that I can give each
>> configuration method a static version, which would let you replace "new
>> Suppression()." with "Suppression." in all the examples. Basically, instead
>> of "of()", we'd support any of the methods I listed.
>>
>> For example:
>>
>> windowCounts
>>     .suppress(
>>         Suppression
>>             .suppressLateEvents(Duration.ofMinutes(10))
>>             .suppressIntermediateEvents(
>>                 IntermediateSuppression.emitAfter(Duration.ofMinutes(10))
>>             )
>>     );
>>
>>
>> Does that seem better?
>>
>> Thanks,
>> -John
>>
>>
>> On Wed, Jun 27, 2018 at 12:44 AM Ted Yu  wrote:
>>
>>> I started to read this KIP which contains a lot of materials.
>>>
>>> One suggestion:
>>>
>>> .suppress(
>>> new Suppression()
>>>
>>>
>>> Do you think it would be more consistent with the rest of Streams data
>>> structures by supporting `of` ?
>>>
>>> Suppression.of(Duration.ofMinutes(10))
>>>
>>>
>>> Cheers
>>>
>>>
>>>
>>> On Tue, Jun 26, 2018 at 1:11 PM, John Roesler  wrote:
>>>
>>>> Hello devs and users,
>>>>
>>>> Please take some time to consider this proposal for Kafka Streams:
>>>>
>>>> KIP-328: Ability to suppress updates for KTables
>>>>
>>>> link: https://cwiki.apache.org/confluence/x/sQU0BQ
>>>>
>>>> The basic idea is to provide:
>>>> * more usable control over update rate (vs the current state store caches)
>>>> * the final-result-for-windowed-computations feature which several people
>>>> have requested
>>>>
>>>> I look forward to your feedback!
>>>>
>>>> Thanks,
>>>> -John
>>>>
>>>
>>
> 





RE: [VOTE] KIP-235 Add DNS alias support for secured connection

2018-06-27 Thread Skrzypek, Jonathan
Hi,

I modified the PR last week following the comments on unit tests. Could it be
reviewed?

https://github.com/apache/kafka/pull/4485

Jonathan Skrzypek


-Original Message-
From: Ismael Juma [mailto:ism...@juma.me.uk]
Sent: 23 May 2018 01:29
To: dev
Subject: Re: [VOTE] KIP-235 Add DNS alias support for secured connection

Thanks for the KIP. I think this is a good and low risk change. It would be
good to ensure that it works well with KIP-302 if we think that makes sense
too. In any case, +1 (binding).

Ismael

On Fri, Mar 23, 2018 at 12:05 PM Skrzypek, Jonathan <
jonathan.skrzy...@gs.com> wrote:

> Hi,
>
> I would like to start a vote for KIP-235
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-235%3A+Add+DNS+alias+support+for+secured+connection
>
> This is a proposition to add an option for reverse dns lookup of
> bootstrap.servers hosts, allowing the use of dns aliases on clusters using
> SASL authentication.
>
>
>
>





Re: [VOTE] KIP-324: Add method to get metrics() in AdminClient

2018-06-27 Thread Bill Bejeck
+1

-Bill

On Wed, Jun 27, 2018 at 12:47 PM Manikumar 
wrote:

> +1 (non-binding)
>
> Thanks.
>
> On Wed, Jun 27, 2018 at 10:15 PM Matthias J. Sax 
> wrote:
>
> > +1 (binding)
> >
> > On 6/26/18 2:33 PM, Guozhang Wang wrote:
> > > +1. Thanks.
> > >
> > > On Tue, Jun 26, 2018 at 2:31 PM, Yishun Guan 
> wrote:
> > >
> > >> Hi All,
> > >>
> > >> I am starting a vote on this KIP:
> > >>
> > >> https://cwiki.apache.org/confluence/x/lQg0BQ
> > >>
> > >> Thanks,
> > >> Yishun
> > >>
> > >
> > >
> > >
> >
> >
>


Re: [VOTE] KIP-324: Add method to get metrics() in AdminClient

2018-06-27 Thread Manikumar
+1 (non-binding)

Thanks.

On Wed, Jun 27, 2018 at 10:15 PM Matthias J. Sax 
wrote:

> +1 (binding)
>
> On 6/26/18 2:33 PM, Guozhang Wang wrote:
> > +1. Thanks.
> >
> > On Tue, Jun 26, 2018 at 2:31 PM, Yishun Guan  wrote:
> >
> >> Hi All,
> >>
> >> I am starting a vote on this KIP:
> >>
> >> https://cwiki.apache.org/confluence/x/lQg0BQ
> >>
> >> Thanks,
> >> Yishun
> >>
> >
> >
> >
>
>


Re: [VOTE] KIP-324: Add method to get metrics() in AdminClient

2018-06-27 Thread Matthias J. Sax
+1 (binding)

On 6/26/18 2:33 PM, Guozhang Wang wrote:
> +1. Thanks.
> 
> On Tue, Jun 26, 2018 at 2:31 PM, Yishun Guan  wrote:
> 
>> Hi All,
>>
>> I am starting a vote on this KIP:
>>
>> https://cwiki.apache.org/confluence/x/lQg0BQ
>>
>> Thanks,
>> Yishun
>>
> 
> 
> 





Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-27 Thread Guozhang Wang
Jason, thanks for the KIP. A few comments:

* I think Dong's question about whether to use the timestamp-based approach
vs. start-offset-of-first-larger-epoch is valid. More specifically, with the
timestamp-based approach we may still reset to an offset that falls into
the truncated interval, and hence we may still miss some data, i.e. we would
still not guarantee at-least-once. With
start-offset-of-first-larger-epoch, I'm not sure whether it guarantees that no
valid data is missed when we have consecutive log truncations (maybe we
need to look back into the details of KIP-101 to figure it out). If the latter
can indeed guarantee at-least-once, we could consider using that approach.

* My current understanding is that, with unclean leader election turned on,
exactly-once is out of the window since we cannot guarantee that all
committed message markers will not be lost. And hence there is no need to
have special handling logic for LOG_TRUNCATED or OOR error codes with
read.committed turned on. Is that right?

* MINOR: "if the epoch is greater than the minimum expected epoch, that the
new epoch does not begin at an earlier offset than the fetch offset.  In
the latter case, the leader can respond with a new LOG_TRUNCATION error
code" should it be "does not begin at a later offset than the fetch offset"?



Guozhang


On Tue, Jun 26, 2018 at 6:51 PM, Dong Lin  wrote:

> Hey Jason,
>
> Thanks for the explanation.
>
> Please correct me if this is wrong. The "unknown truncation offset"
> scenario happens when consumer does not have the full leaderEpoch -> offset
> mapping. In this case we can still use the KIP-101-based approach to
> truncate offset to "start offset of the first Leader Epoch larger than last
> epoch of the consumer" but it may be inaccurate. So the KIP chooses to use
> the timestamp-based approach which is also best-effort.
>
> If this understanding is correct, for "closest" offset reset policy and
> "unknown truncation offset" scenario, I am wondering whether it may be
> better to replace timestamp-based approach with KIP-101 based approach. In
> comparison to timestamp-based approach, the KIP-101-based approach seems to
> simplify the API a bit since user does not need to understand timestamp.
> Similar to the timestamp-based approach, both approaches are best-effort
> and do not guarantee that consumer can consume all messages. It is not like
> KIP-279 which guarantees that follower broker can consume all messages from
> the leader.
>
> Then it seems that the remaining difference is mostly about accuracy, i.e.
> how many messages will be duplicated or missed in the "unknown truncation
> offset" scenario. Not sure either one is clearly better than the other.
> Note that there are two scenarios mentioned in KIP-279 which are not
> addressed by KIP-101. Both scenarios require quick leadership change
> between brokers, which seems to suggest that the offset obtained by
> "start offset of the first Leader Epoch larger than last epoch of the
> consumer" under these two scenarios may be very close to the offset
> obtained by the message timestamp. Does this sound reasonable?
>
> Good point that users on v1 format can get benefit with timestamp based
> approach. On the other hand it seems like a short term benefit for users
> who have not migrated. I am just not sure whether it is more important than
> designing a better API.
>
> Also, for both "latest" and "earliest" reset policy, do you think it would
> make sense to also use the KIP-101 based approach to truncate offset for
> the "unknown truncation offset" scenario?
>
>
> Thanks,
> Dong
>



-- 
-- Guozhang
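
For readers following the thread, the "start offset of the first Leader Epoch larger than last epoch of the consumer" lookup discussed above can be pictured with a sorted map. This is only a rough sketch under stated assumptions: EpochOffsetIndex and its methods are hypothetical names (not Kafka classes), and it assumes an epoch -> start-offset mapping is available to whoever performs the lookup. It does not model consecutive truncations, which is exactly the open question raised above.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

/** Hypothetical helper: maps each leader epoch to the first offset written in it. */
final class EpochOffsetIndex {
    private final TreeMap<Integer, Long> startOffsetByEpoch = new TreeMap<>();

    /** Records that the given epoch begins at startOffset. */
    void assign(int epoch, long startOffset) {
        startOffsetByEpoch.put(epoch, startOffset);
    }

    /**
     * Returns the start offset of the first known epoch strictly larger than
     * lastConsumerEpoch, or empty if no larger epoch is known.
     */
    OptionalLong startOffsetOfFirstLargerEpoch(int lastConsumerEpoch) {
        Map.Entry<Integer, Long> next = startOffsetByEpoch.higherEntry(lastConsumerEpoch);
        return next == null ? OptionalLong.empty() : OptionalLong.of(next.getValue());
    }
}
```

A consumer whose last fetched epoch is 2 would, in this sketch, be reset to the start offset of the next recorded epoch, even if epochs 3 and 4 never appear in the map.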


[jira] [Created] (KAFKA-7109) KafkaConsumer should close its incremental fetch sessions on close

2018-06-27 Thread Colin P. McCabe (JIRA)
Colin P. McCabe created KAFKA-7109:
--

 Summary: KafkaConsumer should close its incremental fetch sessions 
on close
 Key: KAFKA-7109
 URL: https://issues.apache.org/jira/browse/KAFKA-7109
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin P. McCabe
Assignee: Colin P. McCabe


KafkaConsumer should close its incremental fetch sessions on close.  Currently, 
the sessions are not closed, but simply time out once the consumer is gone.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] 2.0.0 RC0

2018-06-27 Thread Ted Yu
+1

Checked signatures
Ran test suite

On Mon, Jun 25, 2018 at 11:12 AM, Thomas Crayford 
wrote:

> +1 (non-binding) Heroku has run our usual set of upgrade and performance
> tests, and we haven't found any notable issues through that.
>
> On Sat, Jun 23, 2018 at 12:30 AM, Vahid S Hashemian <
> vahidhashem...@us.ibm.com> wrote:
>
> > +1 (non-binding)
> >
> > Built from source and ran quickstart successfully on Ubuntu (with Java 8
> > and Java 9).
> >
> > Thanks Rajini!
> > --Vahid
> >
> >
>


Re: [DISCUSS] KIP-328: Ability to suppress updates for KTables

2018-06-27 Thread Ted Yu
I noticed this (lack of primary parameter) as well.

What you gave as a new example is semantically the same as what I suggested.
So it looks good to me.

Thanks

On Wed, Jun 27, 2018 at 7:31 AM, John Roesler  wrote:

> Thanks for taking a look, Ted,
>
> I agree this is a departure from the conventions of Streams DSL.
>
> Most of our config objects have one or two "required" parameters, which fit
> naturally with the static factory method approach. TimeWindow, for example,
> requires a size parameter, so we can naturally say TimeWindows.of(size).
>
> I think in the case of a suppression, there's really no "core" parameter,
> and "Suppression.of()" seems sillier than "new Suppression()". I think that
> Suppression.of(duration) would be ambiguous, since there are many durations
> that we can configure.
>
> However, thinking about it again, I suppose that I can give each
> configuration method a static version, which would let you replace "new
> Suppression()." with "Suppression." in all the examples. Basically, instead
> of "of()", we'd support any of the methods I listed.
>
> For example:
>
> windowCounts
>     .suppress(
>         Suppression
>             .suppressLateEvents(Duration.ofMinutes(10))
>             .suppressIntermediateEvents(
>                 IntermediateSuppression.emitAfter(Duration.ofMinutes(10))
>             )
>     );
>
>
> Does that seem better?
>
> Thanks,
> -John
>
>
> On Wed, Jun 27, 2018 at 12:44 AM Ted Yu  wrote:
>
> > I started to read this KIP which contains a lot of materials.
> >
> > One suggestion:
> >
> > .suppress(
> > new Suppression()
> >
> >
> > Do you think it would be more consistent with the rest of Streams data
> > structures by supporting `of` ?
> >
> > Suppression.of(Duration.ofMinutes(10))
> >
> >
> > Cheers
> >
> >
> >
> > On Tue, Jun 26, 2018 at 1:11 PM, John Roesler  wrote:
> >
> > > Hello devs and users,
> > >
> > > Please take some time to consider this proposal for Kafka Streams:
> > >
> > > KIP-328: Ability to suppress updates for KTables
> > >
> > > link: https://cwiki.apache.org/confluence/x/sQU0BQ
> > >
> > > The basic idea is to provide:
> > > * more usable control over update rate (vs the current state store
> > caches)
> > > * the final-result-for-windowed-computations feature which several
> people
> > > have requested
> > >
> > > I look forward to your feedback!
> > >
> > > Thanks,
> > > -John
> > >
> >
>


Re: [kafka-clients] Re: [VOTE] 1.1.1 RC1

2018-06-27 Thread Israel Ekpo
+1 (non-binding).

Built from source and ran integration tests successfully.

Gradle 4.8.1, Java 1.8.0_171 (Oracle Corporation 25.171-b11), on Ubuntu
16.04 LTS

On Tue, Jun 26, 2018 at 4:55 AM, Jakub Scholz  wrote:

> +1 (non-binding) ... I ran my tests and verified the RC1 with my
> applications.
>
> On Mon, Jun 25, 2018 at 7:31 PM Manikumar 
> wrote:
>
> > +1 (non-binding)  Ran tests,  Verified quick start,  producer/consumer
> perf
> > tests
> >
> >
> > On Sat, Jun 23, 2018 at 8:11 AM Dong Lin  wrote:
> >
> > > Thank you for testing and voting the release!
> > >
> > > I noticed that the date for 1.1.1-rc1 is wrong. Please kindly test and
> > > vote by Tuesday, June 26, 12 pm PT.
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Fri, Jun 22, 2018 at 10:09 AM, Dong Lin 
> wrote:
> > >
> > >> Hello Kafka users, developers and client-developers,
> > >>
> > >> This is the second candidate for release of Apache Kafka 1.1.1.
> > >>
> > >> Apache Kafka 1.1.1 is a bug-fix release for the 1.1 branch that was
> > >> first released with 1.1.0 about 3 months ago. We have fixed about 25
> > >> issues since that release. A few of the more significant fixes include:
> > >>
> > >> KAFKA-6925 - Fix memory leak in StreamsMetricsThreadImpl
> > >> KAFKA-6937 - In-sync replica delayed during fetch if replica throttle is exceeded
> > >> KAFKA-6917 - Process txn completion asynchronously to avoid deadlock
> > >> KAFKA-6893 - Create processors before starting acceptor to avoid ArithmeticException
> > >> KAFKA-6870 - Fix ConcurrentModificationException in SampledStat
> > >> KAFKA-6878 - Fix NullPointerException when querying global state store
> > >> KAFKA-6879 - Invoke session init callbacks outside lock to avoid Controller deadlock
> > >> KAFKA-6857 - Prevent follower from truncating to the wrong offset if undefined leader epoch is requested
> > >> KAFKA-6854 - Log cleaner fails with transaction markers that are deleted during clean
> > >> KAFKA-6747 - Check whether there is in-flight transaction before aborting transaction
> > >> KAFKA-6748 - Double check before scheduling a new task after the punctuate call
> > >> KAFKA-6739 - Fix IllegalArgumentException when down-converting from V2 to V0/V1
> > >> KAFKA-6728 - Fix NullPointerException when instantiating the HeaderConverter
> > >>
> > >> Kafka 1.1.1 release plan:
> > >> https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+1.1.1
> > >>
> > >> Release notes for the 1.1.1 release:
> > >> http://home.apache.org/~lindong/kafka-1.1.1-rc1/RELEASE_NOTES.html
> > >>
> > >> *** Please download, test and vote by Thursday, Jun 22, 12pm PT ***
> > >>
> > >> Kafka's KEYS file containing PGP keys we use to sign the release:
> > >> http://kafka.apache.org/KEYS
> > >>
> > >> * Release artifacts to be voted upon (source and binary):
> > >> http://home.apache.org/~lindong/kafka-1.1.1-rc1/
> > >>
> > >> * Maven artifacts to be voted upon:
> > >> https://repository.apache.org/content/groups/staging/
> > >>
> > >> * Javadoc:
> > >> http://home.apache.org/~lindong/kafka-1.1.1-rc1/javadoc/
> > >>
> > >> * Tag to be voted upon (off 1.1 branch) is the 1.1.1-rc1 tag:
> > >> https://github.com/apache/kafka/tree/1.1.1-rc1
> > >>
> > >> * Documentation:
> > >> http://kafka.apache.org/11/documentation.html
> > >>
> > >> * Protocol:
> > >> http://kafka.apache.org/11/protocol.html
> > >>
> > >> * Successful Jenkins builds for the 1.1 branch:
> > >> Unit/integration tests:
> > >> https://builds.apache.org/job/kafka-1.1-jdk7/152/
> > >> System tests:
> > >> https://jenkins.confluent.io/job/system-test-kafka-branch-builder/1817
> > >>
> > >>
> > >> Please test and verify the release artifacts and submit a vote for this
> > >> RC, or report any issues so we can fix them and get a new RC out ASAP.
> > >> Although this release vote requires PMC votes to pass, testing, votes,
> > >> and bug reports are valuable and appreciated from everyone.
> > >>
> > >> Cheers,
> > >> Dong
> > >>
> > >>
> > >>
> > > --
> > > You received this message because you are subscribed to the Google
> Groups
> > > "kafka-clients" group.
> > > To unsubscribe from this group and stop 

Re: [VOTE] 2.0.0 RC0

2018-06-27 Thread Israel Ekpo
+1 (non-binding).

Built from source and ran integration tests successfully.

On Tue, Jun 26, 2018 at 4:57 AM, Jakub Scholz  wrote:

> +1 (non-binding) ... I ran my tests and verified the RC0 against my
> applications.
>
> On Mon, Jun 25, 2018 at 8:12 PM Thomas Crayford 
> wrote:
>
> > +1 (non-binding) Heroku has run our usual set of upgrade and performance
> > tests, and we haven't found any notable issues through that.
> >
> > On Sat, Jun 23, 2018 at 12:30 AM, Vahid S Hashemian <
> > vahidhashem...@us.ibm.com> wrote:
> >
> > > +1 (non-binding)
> > >
> > > Built from source and ran quickstart successfully on Ubuntu (with Java
> 8
> > > and Java 9).
> > >
> > > Thanks Rajini!
> > > --Vahid
> > >
> > >
> >
>


Re: [DISCUSS] KIP-263: Allow broker to skip sanity check of inactive segments on broker startup

2018-06-27 Thread Jason Gustafson
Hey Dong,


> So the main concern with the above approach is that, if for any reason the
> index files of inactive segment is deleted or corrupted, the broker will
> halt if there is only one log directory. This is different from the
> existing behavior where the broker will rebuild the index for this inactive
> segment before it can accept any request from consumer. Though we don't
> have provide guarantee for segments already flushed to disk, this still
> seems like a change in behavior for user. Maybe we don't have to worry
> about this if we decide it is very rare, e.g. it happens only when there is
> disk error or when there is human error.


I think we should probably still handle the case when an index file is
missing during startup? But considering how weak the sanity check is, it
seems fine to skip it.  Also, could we just make this change without a KIP?
Adding a config to enable a wimpy sanity check seems unnecessary.

One scenario that does come up with users is actual segment corruption,
which is only detected by consumers that are validating CRCs. To fix it, we
have to manually delete the segments and force re-replication. It would be
helpful to have a config to enable deep checking on startup for particular
topics or partitions. This could also just be a separate tool though
("kafka-fsck" or something).

Thinking longer term, I think we need a more systematic approach to dealing
with corruption, not just in index files, but in the segments as well. It
might be nice, for example, if the consumer had a way to hint the broker
that a particular offset is corrupt. The leader might then demote itself,
for example, and try to repair. Lots to think through though.

-Jason
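
As an aside for readers, the idea of skipping the startup sanity check and only validating an inactive segment's index when it is first used could look roughly like the sketch below. Everything here is an assumption made for illustration: LazyCheckedIndex and IndexLoader are hypothetical names, not Kafka classes, and the loader is assumed to throw IOException when the index file is missing or corrupt and never to return null.

```java
import java.io.IOException;

/** Hypothetical loader: reads and validates an index, failing if it is missing or corrupt. */
interface IndexLoader<T> {
    T load() throws IOException;
}

/** Hypothetical wrapper that defers loading and checking an index until first access. */
final class LazyCheckedIndex<T> {
    private final IndexLoader<T> loader;
    private T index; // null until the first successful load

    LazyCheckedIndex(IndexLoader<T> loader) {
        this.loader = loader;
    }

    /** Nothing is read at construction time; the load and check run on the first call. */
    synchronized T get() throws IOException {
        if (index == null) {
            index = loader.load();
        }
        return index;
    }
}
```

In this sketch a failed load surfaces as an IOException on the first fetch rather than at broker startup; what the caller does with it (rebuild the index, or mark the log directory offline) is the design question debated in this thread.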




On Wed, Jun 27, 2018 at 12:29 AM, Dong Lin  wrote:

> So the main concern with the above approach is that, if for any reason the
> index files of inactive segment is deleted or corrupted, the broker will
> halt if there is only one log directory. This is different from the
> existing behavior where the broker will rebuild the index for this inactive
> segment before it can accept any request from consumer. Though we don't
> have provide guarantee for segments already flushed to disk, this still
> seems like a change in behavior for user. Maybe we don't have to worry
> about this if we decide it is very rare, e.g. it happens only when there is
> disk error or when there is human error.
>
>
>
> On Wed, Jun 27, 2018 at 12:04 AM, Dong Lin  wrote:
>
> > Hey Jason,
> >
> > Thanks for the comment!
> >
> > Your comment reminded me to read through Jay's comments and my reply
> > again. It seems that I probably have not captured idea of Jay's comment
> > that says sanity check is not part of any formal guarantee we provide. I
> > probably should have thought about this comment more. Let me reply to
> both
> > yours and Jay's comment and see if I can understand you better.
> >
> > Here are some clarifications:
> > - KIP does not intend to optimize recovery. It aims to optimize the the
> > sanity check when there is clean shutdown.
> > - Sanity check only read the last entry of the index rather than the full
> > index
> > - We have already done data driven investigation though it is not done
> > using hprof or strace. The resulting rolling bounce time is acceptable
> now.
> > If it appears to be an issue e.g. after more data then we may need to
> > revisit this with more data driven investigation
> >
> > I agree with the following comments:
> > - We should optimize the default behavior instead of adding a new config.
> > - sanity check of the segments before recovery offset is not part of any
> > formal guarantee and thus we probably can just skip it.
> >
> > So we are all leaning towards skipping the sanity check of all segments
> > before the recovery offset. This solution would be pretty straightforward
> > to understand and implement. And I am sure it will give us all the
> benefits
> > that this KIP intends to achieve. Here is only one question to double
> check:
> >
> > If consumer fetches from an inactive segment, broker will just use the
> > index of that inactive segment. If anything goes wrong, e.g. the index
> file
> > is corrupted or the index file does not exist, then the broker will just
> > consider it as IOException, mark the disk and the partitions on the disk
> > offline and respond KafkaStorageException to consumer. Does this sound
> OK?
> > One alternative solution is to let broker rebuild index. But this
> > alternative solution is inconsistent with the idea that "sanity check is
> not
> > part of any formal guarantee" and it may tie up all request handler
> > thread for rebuilding the indexed.
> >
> >
> > If this solution sounds right, I will update the KIP accordingly.
> >
> > Thanks,
> > Dong
> >
> > On Tue, Jun 26, 2018 at 3:23 PM, Jason Gustafson 
> > wrote:
> >
> >> Hey Dong,
> >>
> >> Sorry for being slow to catch up to this.
> >>
> >> I think the benefit of the sanity check seems a little dubious in the
> >> 

Re: Requesting Permission To Create KIP And Assign JIRAs

2018-06-27 Thread Guozhang Wang
Hello Kevin,

It's done.


Guozhang

On Tue, Jun 26, 2018 at 10:37 PM, Kevin Lu  wrote:

> Hi All,
>
> I would like to start contributing to Kafka but I do not have access to
> create KIPs or assign JIRA to myself.
>
> Can someone set it up for me?
>
> Confluence id: lu.kevin
> Jira username: lu.kevin
>
> Email: lu.ke...@berkeley.edu
>
> Thanks!
>
> Regards,
> Kevin
>



-- 
-- Guozhang


Re: [DISCUSS] KIP-328: Ability to suppress updates for KTables

2018-06-27 Thread John Roesler
Thanks for taking a look, Ted,

I agree this is a departure from the conventions of Streams DSL.

Most of our config objects have one or two "required" parameters, which fit
naturally with the static factory method approach. TimeWindows, for example,
requires a size parameter, so we can naturally say TimeWindows.of(size).

I think in the case of a suppression, there's really no "core" parameter,
and "Suppression.of()" seems sillier than "new Suppression()". I think that
Suppression.of(duration) would be ambiguous, since there are many durations
that we can configure.

However, thinking about it again, I suppose that I can give each
configuration method a static version, which would let you replace "new
Suppression()." with "Suppression." in all the examples. Basically, instead
of "of()", we'd support any of the methods I listed.

For example:

windowCounts
    .suppress(
        Suppression
            .suppressLateEvents(Duration.ofMinutes(10))
            .suppressIntermediateEvents(
                IntermediateSuppression.emitAfter(Duration.ofMinutes(10))
            )
    );


Does that seem better?

Thanks,
-John


On Wed, Jun 27, 2018 at 12:44 AM Ted Yu  wrote:

> I started to read this KIP which contains a lot of materials.
>
> One suggestion:
>
> .suppress(
> new Suppression()
>
>
> Do you think it would be more consistent with the rest of Streams data
> structures by supporting `of` ?
>
> Suppression.of(Duration.ofMinutes(10))
>
>
> Cheers
>
>
>
> On Tue, Jun 26, 2018 at 1:11 PM, John Roesler  wrote:
>
> > Hello devs and users,
> >
> > Please take some time to consider this proposal for Kafka Streams:
> >
> > KIP-328: Ability to suppress updates for KTables
> >
> > link: https://cwiki.apache.org/confluence/x/sQU0BQ
> >
> > The basic idea is to provide:
> > * more usable control over update rate (vs the current state store
> caches)
> > * the final-result-for-windowed-computations feature which several people
> > have requested
> >
> > I look forward to your feedback!
> >
> > Thanks,
> > -John
> >
>


Re: [kafka-clients] Re: [VOTE] 1.1.1 RC1

2018-06-27 Thread Mickael Maison
+1 non-binding
Ran quick start and checked signatures

On Wed, Jun 27, 2018 at 8:07 AM, Dong Lin  wrote:
> Thank you all for your test and votes!
>
> It will be great to have more votes from PMC so that we can conclude
> kafka-1.1.1 release :)


Jenkins build is back to normal : kafka-trunk-jdk10 #256

2018-06-27 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-322: Return new error code for DeleteTopics API when topic deletion disabled.

2018-06-27 Thread Ted Yu
bq. set the serialVersionUID = 1L in Kafka exception classes

Indeed. Should have checked earlier.

On Wed, Jun 27, 2018 at 1:38 AM, Manikumar 
wrote:

>  Hi,
>
> Thanks for the review.
>
>
> > nit: you are going to fill in an actual value below in your PR, right ?
> > private static final long serialVersionUID = 1L;
> >
> >
>  We normally set the serialVersionUID = 1L in Kafka exception classes.
>
> In Motivation, please mention the IllegalStateException scenario.
> >
>
>  The IllegalStateException mentioned in the JIRA is specific to the user's
> implementation.
>  The main issue is that we won't get a response from the server. In this
> case, the admin client times out the requests.
>


Re: [DISCUSS] KIP-322: Return new error code for DeleteTopics API when topic deletion disabled.

2018-06-27 Thread Manikumar
 Hi,

Thanks for the review.


> nit: you are going to fill in an actual value below in your PR, right ?
> private static final long serialVersionUID = 1L;
>
>
 We normally set the serialVersionUID = 1L in Kafka exception classes.

> In Motivation, please mention the IllegalStateException scenario.
>

 The IllegalStateException mentioned in the JIRA is specific to the user's
implementation.
 The main issue is that we won't get a response from the server. In this
case, the admin client times out the requests.
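
For reference, the serialVersionUID convention mentioned above looks like this in a plain Java exception class. The class name is hypothetical and the class extends RuntimeException only to keep the sketch self-contained; it is not the exception added by the KIP.

```java
/** Hypothetical exception, for illustration only. */
class TopicDeletionDisabledExampleException extends RuntimeException {
    // Fixed value, following the convention described in this thread.
    private static final long serialVersionUID = 1L;

    TopicDeletionDisabledExampleException(String message) {
        super(message);
    }
}
```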


Re: [DISCUSS] KIP-263: Allow broker to skip sanity check of inactive segments on broker startup

2018-06-27 Thread Dong Lin
So the main concern with the above approach is that, if for any reason the
index files of an inactive segment are deleted or corrupted, the broker will
halt if there is only one log directory. This is different from the
existing behavior, where the broker rebuilds the index for this inactive
segment before it can accept any request from a consumer. Though we don't
provide a guarantee for segments already flushed to disk, this still
seems like a change in behavior for users. Maybe we don't have to worry
about this if we decide it is very rare, e.g. it happens only when there is
a disk error or a human error.



On Wed, Jun 27, 2018 at 12:04 AM, Dong Lin  wrote:

> Hey Jason,
>
> Thanks for the comment!
>
> Your comment reminded me to read through Jay's comments and my reply
> again. It seems that I probably have not captured idea of Jay's comment
> that says sanity check is not part of any formal guarantee we provide. I
> probably should have thought about this comment more. Let me reply to both
> yours and Jay's comment and see if I can understand you better.
>
> Here are some clarifications:
> - KIP does not intend to optimize recovery. It aims to optimize the the
> sanity check when there is clean shutdown.
> - Sanity check only read the last entry of the index rather than the full
> index
> - We have already done data driven investigation though it is not done
> using hprof or strace. The resulting rolling bounce time is acceptable now.
> If it appears to be an issue e.g. after more data then we may need to
> revisit this with more data driven investigation
>
> I agree with the following comments:
> - We should optimize the default behavior instead of adding a new config.
> - sanity check of the segments before recovery offset is not part of any
> formal guarantee and thus we probably can just skip it.
>
> So we are all leaning towards skipping the sanity check of all segments
> before the recovery offset. This solution would be pretty straightforward
> to understand and implement. And I am sure it will give us all the benefits
> that this KIP intends to achieve. Here is only one question to double check:
>
> If consumer fetches from an inactive segment, broker will just use the
> index of that inactive segment. If anything goes wrong, e.g. the index file
> is corrupted or the index file does not exist, then the broker will just
> consider it as IOException, mark the disk and the partitions on the disk
> offline and respond KafkaStorageException to consumer. Does this sound OK?
> One alternative solution is to let broker rebuild index. But this
> alternative solution is inconsistent with the idea that "sanity check is not
> part of any formal guarantee" and it may tie up all request handler
> thread for rebuilding the indexed.
>
>
> If this solution sounds right, I will update the KIP accordingly.
>
> Thanks,
> Dong
>
> On Tue, Jun 26, 2018 at 3:23 PM, Jason Gustafson 
> wrote:
>
>> Hey Dong,
>>
>> Sorry for being slow to catch up to this.
>>
>> I think the benefit of the sanity check seems a little dubious in the
>> first
>> place. We detect garbage at the end of the index file, but that's about
>> it.
>> Is there any reason to think that corruption is more likely to occur there
>> or any other reason to think this check is still beneficial for flushed
>> data? I assume we did the check because we presumed it was cheap, but
>> perhaps the cost is adding up as the number of partitions grows. How much
>> does startup time improve if we skip the sanity check for data earlier
>> than
>> the recovery point? Does the lazy loading itself give some additional
>> benefit beyond skipping the sanity check? As Jay mentions above, the
>> sanity
>> checks seem strictly speaking optional. We don't bother checking the
>> segments themselves for example.
>>
>> Thanks,
>> Jason
>>
>>
>>
>>
>> It probably still makes sense for segments beyond the recovery point
>>
>> On Wed, Mar 21, 2018 at 9:59 PM, Dong Lin  wrote:
>>
>> > Hey Jay,
>> >
>> > Yeah our existing sanity check only read the last entry in the index
>> files.
>> > I must have miscommunicated if I previously said it was reading the full
>> > index. Broker appears to be spending a lot of time just to read the last
>> > entry of index files for every log segment. This is probably because OS
>> > will load a chunk of data that is much larger than the entry itself from
>> > disk to page cache. This KIP tries to make this part of operation lazy.
>> I
>> > guess you are suggesting that we should just make the lazy loading the
>> > default behavior?
>> >
>> > Yes we currently require manual intervention if the log file is
>> corrupted,
>> > i.e. if two messages with the same offset are appended to the disk
>> > (KAFKA-6488). The sanity check on broker startup is a bit different
>> since
>> > it deals with the corruption of index files (e.g. offset index, time
>> index
>> > and snapshot files) instead of the log data. In this case if index files
>> > are 

Re: [kafka-clients] Re: [VOTE] 1.1.1 RC1

2018-06-27 Thread Dong Lin
Thank you all for your tests and votes!

It would be great to have more votes from the PMC so that we can conclude
the kafka-1.1.1 release :)


Re: [DISCUSS] KIP-263: Allow broker to skip sanity check of inactive segments on broker startup

2018-06-27 Thread Dong Lin
Hey Jason,

Thanks for the comment!

Your comment reminded me to read through Jay's comments and my reply again.
It seems that I probably did not capture the idea of Jay's comment that the
sanity check is not part of any formal guarantee we provide. I probably
should have thought about this comment more. Let me reply to both your and
Jay's comments and see if I can understand you better.

Here are some clarifications:
- The KIP does not intend to optimize recovery. It aims to optimize the
sanity check when there is a clean shutdown.
- The sanity check only reads the last entry of the index rather than the
full index.
- We have already done a data-driven investigation, though it was not done
using hprof or strace. The resulting rolling bounce time is acceptable now.
If it appears to be an issue, e.g. after more data, then we may need to
revisit this with more data-driven investigation.

I agree with the following comments:
- We should optimize the default behavior instead of adding a new config.
- The sanity check of the segments before the recovery offset is not part of
any formal guarantee, and thus we can probably just skip it.

So we are all leaning towards skipping the sanity check of all segments
before the recovery offset. This solution would be pretty straightforward
to understand and implement, and I am sure it will give us all the benefits
that this KIP intends to achieve. There is only one question to double-check:

If a consumer fetches from an inactive segment, the broker will just use the
index of that inactive segment. If anything goes wrong, e.g. the index file
is corrupted or the index file does not exist, then the broker will just
treat it as an IOException, mark the disk and the partitions on the disk
offline, and respond with KafkaStorageException to the consumer. Does this
sound OK? One alternative solution is to let the broker rebuild the index.
But this alternative is inconsistent with the idea that "sanity check is not
part of any formal guarantee", and it may tie up all request handler threads
while rebuilding the indexes.
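
To make the proposed behavior concrete, here is a rough sketch. It is a toy
model under stated assumptions, not Kafka's code: the class and method names
(SegmentIndexSketch, StartupSketch, checkOnStartup) are hypothetical, the
"corrupted" flag stands in for a bad or missing index file, and comparing a
segment's base offset with the recovery point is a simplification. Segments
before the recovery point are not checked at startup; the check runs on first
use, and a failure surfaces as an IOException that the caller would treat as
a storage failure.

```java
import java.io.IOException;
import java.util.List;

// Toy model only: these classes are hypothetical and do not mirror Kafka's internals.
class SegmentIndexSketch {
    final long baseOffset;
    final boolean corrupted; // stands in for a corrupted or missing index file
    boolean checked = false; // true once the sanity check has passed

    SegmentIndexSketch(long baseOffset, boolean corrupted) {
        this.baseOffset = baseOffset;
        this.corrupted = corrupted;
    }

    // The sanity check itself; throws if the index looks bad.
    void sanityCheck() throws IOException {
        if (corrupted) {
            throw new IOException("index of segment " + baseOffset + " failed sanity check");
        }
        checked = true;
    }

    // Read path: the check is deferred until the index is first used.
    long lookup(long targetOffset) throws IOException {
        if (!checked) {
            sanityCheck();
        }
        return baseOffset; // placeholder for a real index lookup
    }
}

class StartupSketch {
    // Eagerly check only segments at or beyond the recovery point (simplified
    // here to a comparison on the base offset). Returns how many were checked.
    static int checkOnStartup(List<SegmentIndexSketch> segments, long recoveryPoint)
            throws IOException {
        int eagerlyChecked = 0;
        for (SegmentIndexSketch segment : segments) {
            if (segment.baseOffset >= recoveryPoint) {
                segment.sanityCheck();
                eagerlyChecked++;
            }
        }
        return eagerlyChecked;
    }
}
```

With three segments and a recovery point at the last one, only that segment
is checked at startup; a corrupted older segment goes unnoticed until a fetch
touches it, at which point lookup() throws.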


If this solution sounds right, I will update the KIP accordingly.

Thanks,
Dong

On Tue, Jun 26, 2018 at 3:23 PM, Jason Gustafson  wrote:

> Hey Dong,
>
> Sorry for being slow to catch up to this.
>
> I think the benefit of the sanity check seems a little dubious in the first
> place. We detect garbage at the end of the index file, but that's about it.
> Is there any reason to think that corruption is more likely to occur there
> or any other reason to think this check is still beneficial for flushed
> data? I assume we did the check because we presumed it was cheap, but
> perhaps the cost is adding up as the number of partitions grows. How much
> does startup time improve if we skip the sanity check for data earlier than
> the recovery point? Does the lazy loading itself give some additional
> benefit beyond skipping the sanity check? As Jay mentions above, the sanity
> checks seem strictly speaking optional. We don't bother checking the
> segments themselves for example.
>
> Thanks,
> Jason
>
>
>
>
> It probably still makes sense for segments beyond the recovery point
>
> On Wed, Mar 21, 2018 at 9:59 PM, Dong Lin  wrote:
>
> > Hey Jay,
> >
> > Yeah our existing sanity check only read the last entry in the index
> files.
> > I must have miscommunicated if I previously said it was reading the full
> > index. Broker appears to be spending a lot of time just to read the last
> > entry of index files for every log segment. This is probably because OS
> > will load a chunk of data that is much larger than the entry itself from
> > disk to page cache. This KIP tries to make this part of operation lazy. I
> > guess you are suggesting that we should just make the lazy loading the
> > default behavior?
> >
> > Yes we currently require manual intervention if the log file is
> corrupted,
> > i.e. if two messages with the same offset are appended to the disk
> > (KAFKA-6488). The sanity check on broker startup is a bit different since
> > it deals with the corruption of index files (e.g. offset index, time
> index
> > and snapshot files) instead of the log data. In this case if index files
> > are corrupted broker will automatically recover it by rebuilding the
> index
> > files using data in the log files, without requiring manual intervention.
> > Thus the design question is whether this should be done before broker can
> > become leader for any partitions -- there is tradeoff between broker
> > startup time and risk of delaying user requests if broker need to rebuild
> > index files when it is already leader. I prefer lazy loading to reduce
> > broker startup time. Not sure what are the feedback from the community on
> > this issue.
> >
> > Thanks,
> > Dong
> >
> >
> > On Wed, Mar 21, 2018 at 7:36 AM, Jay Kreps  wrote:
> >
> > > Hey Dong,
> > >
> > > Makes total sense. What I'm saying is I don't think that the sanity
> check
> > > is part of any formal guarantee we provide. It is true that corruption
> of
> > > 

[jira] [Created] (KAFKA-7108) "Exactly-once" stream breaks production exception handler contract

2018-06-27 Thread Anna O (JIRA)
Anna O created KAFKA-7108:
-

 Summary: "Exactly-once" stream breaks production exception handler 
contract
 Key: KAFKA-7108
 URL: https://issues.apache.org/jira/browse/KAFKA-7108
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.1.0
Reporter: Anna O


I have a stream configured with "default.production.exception.handler" that is 
supposed to log the error and continue. When I set "processing.guarantee" to 
"exactly_once", it appeared that a retryable NotEnoughReplicasException that 
had passed through the production exception handler was rethrown by the 
TransactionManager, wrapped in a KafkaException, and terminated the stream 
thread:

org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
    at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784) ~[kafka-clients-1.1.0.jar:?]
    at org.apache.kafka.clients.producer.internals.TransactionManager.sendOffsetsToTransaction(TransactionManager.java:250) ~[kafka-clients-1.1.0.jar:?]
    at org.apache.kafka.clients.producer.KafkaProducer.sendOffsetsToTransaction(KafkaProducer.java:617) ~[kafka-clients-1.1.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:357) ~[kafka-streams-1.1.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.access$000(StreamTask.java:53) ~[kafka-streams-1.1.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:316) ~[kafka-streams-1.1.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) ~[kafka-streams-1.1.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307) ~[kafka-streams-1.1.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:297) ~[kafka-streams-1.1.0.jar:?]
    at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67) ~[kafka-streams-1.1.0.jar:?]
    at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:357) [kafka-streams-1.1.0.jar:?]
    at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:347) [kafka-streams-1.1.0.jar:?]
    at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:403) [kafka-streams-1.1.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:994) [kafka-streams-1.1.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:811) [kafka-streams-1.1.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750) [kafka-streams-1.1.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720) [kafka-streams-1.1.0.jar:?]
Caused by: org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
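
For reference, the configuration described in this report might look roughly 
like the sketch below. Only the two property keys and the "exactly_once" value 
come from the report; the handler class name com.example.LogAndContinueHandler 
and the wrapper class ExactlyOnceConfigSketch are hypothetical placeholders, 
and a real application would also need the usual application id and bootstrap 
server settings.

```java
import java.util.Properties;

// Sketch of the two settings named in the report; not a complete Streams config.
class ExactlyOnceConfigSketch {
    static Properties streamsProps() {
        Properties props = new Properties();
        // Hypothetical handler class that logs the error and continues.
        props.put("default.production.exception.handler",
                  "com.example.LogAndContinueHandler");
        // Enabling this is what triggered the behavior described in the report.
        props.put("processing.guarantee", "exactly_once");
        return props;
    }

    public static void main(String[] args) {
        // Print the settings so they can be compared with a running application.
        streamsProps().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```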

 





Re: [VOTE] KIP-312: Add Overloaded StreamsBuilder Build Method to Accept java.util.Properties

2018-06-27 Thread Dongjin Lee
+1. (binding)

On Tue, Jun 26, 2018, 2:34 AM Damian Guy  wrote:

> Thanks Bill! +1
>
> On Mon, 25 Jun 2018 at 18:57 Ted Yu  wrote:
>
> > +1
> >
> > On Mon, Jun 25, 2018 at 9:45 AM, Guozhang Wang 
> wrote:
> >
> > > +1.
> > >
> > > On Mon, Jun 25, 2018 at 8:12 AM, Matthias J. Sax <
> matth...@confluent.io>
> > > wrote:
> > >
> > > > +1 (binding)
> > > >
> > > > On 6/25/18 6:11 AM, Bill Bejeck wrote:
> > > > > All,
> > > > > I'd like to start a vote for this KIP now.
> > > > >
> > > > > Thanks,
> > > > > Bill
> > > > >
> > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>