[jira] [Created] (KAFKA-9142) topic __transaction_state disk space grows too big

2019-11-04 Thread panpan.liu (Jira)
panpan.liu created KAFKA-9142:
-

 Summary: topic __transaction_state disk space grows too big
 Key: KAFKA-9142
 URL: https://issues.apache.org/jira/browse/KAFKA-9142
 Project: Kafka
  Issue Type: Improvement
  Components: log cleaner
Affects Versions: 2.0.0
Reporter: panpan.liu
 Attachments: image-2019-11-05-15-31-46-859.png, 
image-2019-11-05-15-33-55-632.png

Kafka broker: 2.0.0

Kafka Streams client: 2.1.0

 

```

./kafka_2.11-2.0.0/bin/kafka-topics.sh --zookeeper localhost:2181 --topic __transaction_state --describe

Topic:__transaction_state  PartitionCount:50  ReplicationFactor:2  Configs:segment.bytes=104857600,unclean.leader.election.enable=false,compression.type=uncompressed,cleanup.policy=compact,min.insync.replicas=1
    Topic: __transaction_state  Partition: 0   Leader: 1  Replicas: 1,2  Isr: 1,2
    Topic: __transaction_state  Partition: 1   Leader: 2  Replicas: 2,0  Isr: 2,0
    Topic: __transaction_state  Partition: 2   Leader: 0  Replicas: 0,1  Isr: 0,1
    Topic: __transaction_state  Partition: 3   Leader: 1  Replicas: 1,0  Isr: 1,0
    Topic: __transaction_state  Partition: 4   Leader: 2  Replicas: 2,1  Isr: 2,1
    Topic: __transaction_state  Partition: 5   Leader: 0  Replicas: 0,2  Isr: 0,2
    Topic: __transaction_state  Partition: 6   Leader: 1  Replicas: 1,2  Isr: 1,2
    Topic: __transaction_state  Partition: 7   Leader: 2  Replicas: 2,0  Isr: 2,0
    Topic: __transaction_state  Partition: 8   Leader: 0  Replicas: 0,1  Isr: 0,1
    Topic: __transaction_state  Partition: 9   Leader: 1  Replicas: 1,0  Isr: 1,0
    Topic: __transaction_state  Partition: 10  Leader: 2  Replicas: 2,1  Isr: 2,1
    Topic: __transaction_state  Partition: 11  Leader: 0  Replicas: 0,2  Isr: 0,2
    Topic: __transaction_state  Partition: 12  Leader: 1  Replicas: 1,2  Isr: 1,2

```
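The describe output above shows only the topic-level overrides. As a hedged diagnostic sketch (the bootstrap address is a placeholder, and this snippet is illustrative rather than part of the original report), AdminClient.describeConfigs returns the full effective configuration for the topic, including broker-default-derived values such as min.cleanable.dirty.ratio that control when the cleaner actually compacts it:

```
// Hedged diagnostic sketch: confirm the effective cleaner-related configs
// on __transaction_state. "localhost:9092" is a placeholder address.
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class DescribeTxnStateConfigs {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic =
                new ConfigResource(ConfigResource.Type.TOPIC, "__transaction_state");
            Config config = admin.describeConfigs(Collections.singleton(topic))
                                 .all().get().get(topic);
            // cleanup.policy should be "compact"; if segments never shrink,
            // the log cleaner may not be keeping up on this topic.
            System.out.println(config.get("cleanup.policy"));
            System.out.println(config.get("segment.bytes"));
            System.out.println(config.get("min.cleanable.dirty.ratio"));
        }
    }
}
```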



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9141) Global state update error: missing recovery or wrong log message

2019-11-04 Thread Chris Toomey (Jira)
Chris Toomey created KAFKA-9141:
---

 Summary: Global state update error: missing recovery or wrong log 
message
 Key: KAFKA-9141
 URL: https://issues.apache.org/jira/browse/KAFKA-9141
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.3.0
Reporter: Chris Toomey


I'm getting an {{OffsetOutOfRangeException}} accompanied by the log message 
"Updating global state failed. You can restart KafkaStreams to recover from 
this error." But I've restarted the app several times and it's not recovering, 
it keeps failing the same way.
 
I see there's a {{cleanUp()}} method on {{KafkaStreams}} that looks like it's 
what's needed, but it's not called anywhere in the streams source code. So 
either that's a bug and the call should be added to do the recovery, or the log 
message is wrong and should be fixed.
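
For anyone hitting this in the meantime, here is a hedged workaround sketch ({{topology}} and {{props}} are placeholders for the application's own values; this is illustrative, not a recommended fix from the project): {{cleanUp()}} deletes the instance's local state directory so that state, including global state, is rebuilt from the underlying topics, and it may only be called before {{start()}} or after {{close()}}.

{code:java}
// Hedged workaround sketch: wipe local state so it is rebuilt on restart.
// "topology" and "props" are placeholders.
KafkaStreams streams = new KafkaStreams(topology, props);
streams.cleanUp();   // only legal before start() or after close()
streams.start();
{code}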



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Build failed in Jenkins: kafka-2.2-jdk8-old #184

2019-11-04 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] HOTFIX: fix bug in VP test where it greps for the wrong log message


--
Started by an SCM change
Running as SYSTEM
[EnvInject] - Loading node environment variables.
Building remotely on H40 (ubuntu xenial) in workspace 

[WS-CLEANUP] Deleting project workspace...
[WS-CLEANUP] Deferred wipeout is used...
No credentials specified
Cloning the remote Git repository
Cloning repository https://github.com/apache/kafka.git
 > git init  # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git --version # timeout=10
 > git fetch --tags --progress https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
 > git config --add remote.origin.fetch +refs/heads/*:refs/remotes/origin/* # 
 > timeout=10
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git fetch --tags --progress https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git rev-parse refs/remotes/origin/2.2^{commit} # timeout=10
 > git rev-parse refs/remotes/origin/origin/2.2^{commit} # timeout=10
Checking out Revision faa46545045b1254d83561c072f7eb6d220f8e99 
(refs/remotes/origin/2.2)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f faa46545045b1254d83561c072f7eb6d220f8e99
Commit message: "HOTFIX: fix bug in VP test where it greps for the wrong log 
message (#7643)"
 > git rev-list --no-walk 75d1c0b131d567d24dfae0b9dcf08872377985ce # timeout=10
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
[kafka-2.2-jdk8-old] $ /bin/bash -xe /tmp/jenkins2484416287619320099.sh
+ rm -rf 
+ /bin/gradle
/tmp/jenkins2484416287619320099.sh: line 4: /bin/gradle: No such file or 
directory
Build step 'Execute shell' marked build as failure
[FINDBUGS] Collecting findbugs analysis files...
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
[FINDBUGS] Searching for all files in 
 that match the pattern 
**/build/reports/findbugs/*.xml
[FINDBUGS] No files found. Configuration error?
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
No credentials specified
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
 Using GitBlamer to create author and commit information for all 
warnings.
 GIT_COMMIT=faa46545045b1254d83561c072f7eb6d220f8e99, 
workspace=
[FINDBUGS] Computing warning deltas based on reference build #175
Recording test results
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
ERROR: Step 'Publish JUnit test result report' failed: No test report files 
were found. Configuration error?
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
Not sending mail to unregistered user b...@confluent.io
Not sending mail to unregistered user wangg...@gmail.com


Build failed in Jenkins: kafka-2.4-jdk8 #51

2019-11-04 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] HOTFIX: fix bug in VP test where it greps for the wrong log message


--
[...truncated 2.70 MB...]
org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldForwardClose 
PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldForwardFlush 
STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldForwardFlush 
PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldForwardInit 
STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldForwardInit 
PASSED

org.apache.kafka.streams.TestTopicsTest > testNonUsedOutputTopic STARTED

org.apache.kafka.streams.TestTopicsTest > testNonUsedOutputTopic PASSED

org.apache.kafka.streams.TestTopicsTest > testEmptyTopic STARTED

org.apache.kafka.streams.TestTopicsTest > testEmptyTopic PASSED

org.apache.kafka.streams.TestTopicsTest > testStartTimestamp STARTED

org.apache.kafka.streams.TestTopicsTest > testStartTimestamp PASSED

org.apache.kafka.streams.TestTopicsTest > testNegativeAdvance STARTED

org.apache.kafka.streams.TestTopicsTest > testNegativeAdvance PASSED

org.apache.kafka.streams.TestTopicsTest > shouldNotAllowToCreateWithNullDriver 
STARTED

org.apache.kafka.streams.TestTopicsTest > shouldNotAllowToCreateWithNullDriver 
PASSED

org.apache.kafka.streams.TestTopicsTest > testDuration STARTED

org.apache.kafka.streams.TestTopicsTest > testDuration PASSED

org.apache.kafka.streams.TestTopicsTest > testOutputToString STARTED

org.apache.kafka.streams.TestTopicsTest > testOutputToString PASSED

org.apache.kafka.streams.TestTopicsTest > testValue STARTED

org.apache.kafka.streams.TestTopicsTest > testValue PASSED

org.apache.kafka.streams.TestTopicsTest > testTimestampAutoAdvance STARTED

org.apache.kafka.streams.TestTopicsTest > testTimestampAutoAdvance PASSED

org.apache.kafka.streams.TestTopicsTest > testOutputWrongSerde STARTED

org.apache.kafka.streams.TestTopicsTest > testOutputWrongSerde PASSED

org.apache.kafka.streams.TestTopicsTest > 
shouldNotAllowToCreateOutputTopicWithNullTopicName STARTED

org.apache.kafka.streams.TestTopicsTest > 
shouldNotAllowToCreateOutputTopicWithNullTopicName PASSED

org.apache.kafka.streams.TestTopicsTest > testWrongSerde STARTED

org.apache.kafka.streams.TestTopicsTest > testWrongSerde PASSED

org.apache.kafka.streams.TestTopicsTest > testKeyValuesToMapWithNull STARTED

org.apache.kafka.streams.TestTopicsTest > testKeyValuesToMapWithNull PASSED

org.apache.kafka.streams.TestTopicsTest > testNonExistingOutputTopic STARTED

org.apache.kafka.streams.TestTopicsTest > testNonExistingOutputTopic PASSED

org.apache.kafka.streams.TestTopicsTest > testMultipleTopics STARTED

org.apache.kafka.streams.TestTopicsTest > testMultipleTopics PASSED

org.apache.kafka.streams.TestTopicsTest > testKeyValueList STARTED

org.apache.kafka.streams.TestTopicsTest > testKeyValueList PASSED

org.apache.kafka.streams.TestTopicsTest > 
shouldNotAllowToCreateOutputWithNullDriver STARTED

org.apache.kafka.streams.TestTopicsTest > 
shouldNotAllowToCreateOutputWithNullDriver PASSED

org.apache.kafka.streams.TestTopicsTest > testValueList STARTED

org.apache.kafka.streams.TestTopicsTest > testValueList PASSED

org.apache.kafka.streams.TestTopicsTest > testRecordList STARTED

org.apache.kafka.streams.TestTopicsTest > testRecordList PASSED

org.apache.kafka.streams.TestTopicsTest > testNonExistingInputTopic STARTED

org.apache.kafka.streams.TestTopicsTest > testNonExistingInputTopic PASSED

org.apache.kafka.streams.TestTopicsTest > testKeyValuesToMap STARTED

org.apache.kafka.streams.TestTopicsTest > testKeyValuesToMap PASSED

org.apache.kafka.streams.TestTopicsTest > testRecordsToList STARTED

org.apache.kafka.streams.TestTopicsTest > testRecordsToList PASSED

org.apache.kafka.streams.TestTopicsTest > testKeyValueListDuration STARTED

org.apache.kafka.streams.TestTopicsTest > testKeyValueListDuration PASSED

org.apache.kafka.streams.TestTopicsTest > testInputToString STARTED

org.apache.kafka.streams.TestTopicsTest > testInputToString PASSED

org.apache.kafka.streams.TestTopicsTest > testTimestamp STARTED

org.apache.kafka.streams.TestTopicsTest > testTimestamp PASSED

org.apache.kafka.streams.TestTopicsTest > testWithHeaders STARTED

org.apache.kafka.streams.TestTopicsTest > testWithHeaders PASSED

org.apache.kafka.streams.TestTopicsTest > testKeyValue STARTED

org.apache.kafka.streams.TestTopicsTest > testKeyValue PASSED

org.apache.kafka.streams.TestTopicsTest > 
shouldNotAllowToCreateTopicWithNullTopicName STARTED

org.apache.kafka.streams.TestTopicsTest > 
shouldNotAllowToCreateTopicWithNullTopicName PASSED

> Task :streams:upgrade-system-tests-0100:spotbugsMain NO-SOURCE
> Task :streams:upgrade-system-tests-0100:test
> Task :streams:upgrade-system-tests-0101:compileJava NO-SOURCE
> Task 

Build failed in Jenkins: kafka-2.1-jdk8 #239

2019-11-04 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] HOTFIX: fix bug in VP test where it greps for the wrong log message


--
Started by an SCM change
Running as SYSTEM
[EnvInject] - Loading node environment variables.
Building remotely on H36 (ubuntu) in workspace 

[WS-CLEANUP] Deleting project workspace...
[WS-CLEANUP] Deferred wipeout is used...
No credentials specified
Cloning the remote Git repository
Cloning repository https://github.com/apache/kafka.git
 > git init  # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git --version # timeout=10
 > git fetch --tags --progress -- https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
 > git config --add remote.origin.fetch +refs/heads/*:refs/remotes/origin/* # 
 > timeout=10
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git fetch --tags --progress -- https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git rev-parse refs/remotes/origin/2.1^{commit} # timeout=10
 > git rev-parse refs/remotes/origin/origin/2.1^{commit} # timeout=10
Checking out Revision 59e25cf48e182461235cc60c1d647b94fa011659 
(refs/remotes/origin/2.1)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f 59e25cf48e182461235cc60c1d647b94fa011659
Commit message: "HOTFIX: fix bug in VP test where it greps for the wrong log 
message (#7643)"
 > git rev-list --no-walk 16e3c52f1d8b12106add2db87c6c622bbc932ac0 # timeout=10
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
[kafka-2.1-jdk8] $ /bin/bash -xe /tmp/jenkins1720094430323659038.sh
+ rm -rf 
+ /home/jenkins/tools/gradle/4.8.1/bin/gradle
/tmp/jenkins1720094430323659038.sh: line 4: 
/home/jenkins/tools/gradle/4.8.1/bin/gradle: No such file or directory
Build step 'Execute shell' marked build as failure
[FINDBUGS] Collecting findbugs analysis files...
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
[FINDBUGS] Searching for all files in 
 that match the pattern 
**/build/reports/findbugs/*.xml
[FINDBUGS] No files found. Configuration error?
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
No credentials specified
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
 Using GitBlamer to create author and commit information for all 
warnings.
 GIT_COMMIT=59e25cf48e182461235cc60c1d647b94fa011659, 
workspace=
[FINDBUGS] Computing warning deltas based on reference build #230
Recording test results
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
ERROR: Step 'Publish JUnit test result report' failed: No test report files 
were found. Configuration error?
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
Not sending mail to unregistered user b...@confluent.io
Not sending mail to unregistered user wangg...@gmail.com


[jira] [Created] (KAFKA-9140) Consumer gets stuck rejoining the group indefinitely

2019-11-04 Thread Sophie Blee-Goldman (Jira)
Sophie Blee-Goldman created KAFKA-9140:
--

 Summary: Consumer gets stuck rejoining the group indefinitely
 Key: KAFKA-9140
 URL: https://issues.apache.org/jira/browse/KAFKA-9140
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer
Affects Versions: 2.4.0
Reporter: Sophie Blee-Goldman


There seems to be a race condition that can now cause a rejoining member to 
get stuck initiating a rejoin indefinitely. The relevant logs are attached, 
but basically it repeats this message (and nothing else) continuously until 
killed/shut down:

 
{code:java}
[2019-11-05 01:53:54,699] INFO [Consumer 
clientId=StreamsUpgradeTest-a4c1cff8-7883-49cd-82da-d2cdfc33a2f0-StreamThread-1-consumer,
 groupId=StreamsUpgradeTest] Generation data was cleared by heartbeat thread. 
Initiating rejoin. 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
{code}
 

The message that appears was added as part of the bugfix (PR #7460) for this 
related race condition: KAFKA-8104.

This issue was uncovered by the Streams version probing upgrade test, which 
fails with a varying frequency. Here is the rate of failures for different 
system test runs so far:

trunk (cooperative): 1/1 and 2/10 failures

2.4 (cooperative) : 0/10 and 1/15 failures

trunk (eager): 0/10 failures

I've kicked off some high-repeat runs to complete overnight, which will 
hopefully shed more light.

Note that I have also kicked off runs of both 2.4 and trunk with the PR for 
KAFKA-8104 reverted. Both of them saw 2/10 failures, due to hitting the bug 
that was fixed by PR #7460. It is therefore unclear whether PR #7460 introduced 
a new race condition/bug, or merely uncovered an existing one that would 
previously have failed first due to KAFKA-8104.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-04 Thread Vinoth Chandar
>>  I'm having some trouble wrapping my head around what race conditions
might occur, other than the fundamentally broken state in which different
instances are running totally different topologies.
3. @both Without the topic partitions that the tasks can map back to, we
have to rely on topology/cluster metadata in each Streams instance to map
the task back. If the source topics are wildcarded, for example, then each
instance could have different source topics in its topology until the next
rebalance happens. You can also read my comments from here
https://issues.apache.org/jira/browse/KAFKA-7149?focusedCommentId=16904106=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16904106


>> seems hard to imagine how encoding arbitrarily long topic names plus an
integer for the partition number could be as efficient as task ids, which
are just two integers.
3. If you still have concerns about the efficiency of dictionary encoding,
happy to engage. The link above also has some benchmark code I used.
Theoretically, we would send each topic name at least once, so yes, if you
compare a 10-20 character topic name + an integer to two integers, it will
be more bytes. But it's a constant overhead proportional to the size of the
topic name, and with 4, 8, or 12 partitions the size difference between the
baseline (version 4, where we just repeated topic names for each topic
partition) and the two approaches becomes narrow.
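
To make that trade-off concrete, here is an illustrative sketch (my own, not
the KIP's wire format; "partitions" is assumed to hold the assigned topic
partitions) of the dictionary encoding being discussed, where each topic name
is sent once and every partition afterwards costs two ints, like a task id:

```
// Illustrative sketch only: build a topic-name dictionary so each name is
// sent once and partitions reference it by index.
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

Map<String, Integer> topicDict = new LinkedHashMap<>();
List<int[]> encoded = new ArrayList<>();          // {topicIndex, partition}
for (TopicPartition tp : partitions) {            // "partitions" is assumed
    int idx = topicDict.computeIfAbsent(tp.topic(), t -> topicDict.size());
    encoded.add(new int[] {idx, tp.partition()});
}
// Wire cost per partition: two ints; each topic name appears exactly once.
```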

>>Plus, Navinder is going to implement a bunch of protocol code that we
might just want to change when the discussion actually does take place, if
ever.
>>it'll just be a mental burden for everyone to remember that we want to
have this follow-up discussion.
3. Isn't people changing the same parts of code and tracking follow-ups a
common thing we need to deal with anyway? For this KIP, isn't it enough
to reason about whether the additional map on top of the topic dictionary
would incur more overhead than sending task_ids? I don't think that's the
case; both of them send two integers. As I see it, we can do a separate
follow-up to (re)pursue the task_id conversion and get it working for both
maps within the next release.

>>Can you elaborate on "breaking up the API"? It looks like there are
already separate API calls in the proposal, one for time-lag, and another
for offset-lag, so are they not already broken up?
The current lag APIs (e.g. lagInfoForStore) return StoreLagInfo objects
which have both time and offset lags. If we had separate APIs, say
offsetLagForStore() and timeLagForStore(), we could implement the offset
version using the offset lag that the streams instance already tracks, i.e.
no need for external calls. The time-based lag API would incur the Kafka
read for the timestamp. Makes sense?
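
Purely as an illustration (hypothetical names and signatures, not the final
KIP-535 surface), the split could look like:

```
// Hypothetical sketch of the split lag APIs; nothing here is the real API.
public interface StoreLagQueries {
    // Cheap: derived from offsets the Streams instance already tracks.
    long offsetLagForStore(String storeName);

    // Expensive: performs a synchronous Kafka read for the log-end record
    // timestamp, so the caller decides how often to pay that cost.
    java.time.Duration timeLagForStore(String storeName);
}
```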

Based on the discussions so far, I only see these two pending issues to be
aligned on. Is there any other open item people want to bring up?

On Mon, Nov 4, 2019 at 11:24 AM Sophie Blee-Goldman 
wrote:

> Regarding 3) I'm wondering, does your concern still apply even now
> that the pluggable PartitionGrouper interface has been deprecated?
> Now that we can be sure that the DefaultPartitionGrouper is used to
> generate
> the taskId -> partitions mapping, we should be able to convert any taskId
> to its partitions.
>
> On Mon, Nov 4, 2019 at 11:17 AM John Roesler  wrote:
>
> > Hey Vinoth, thanks for the reply!
> >
> > 3.
> > I get that it's not the main focus of this KIP, but if it's ok, it
> > would be nice to hash out this point right now. It only came up
> > because this KIP-535 is substantially extending the pattern in
> > question. If we push it off until later, then the reviewers are going
> > to have to suspend their concerns not just while voting for the KIP,
> > but also while reviewing the code. Plus, Navinder is going to
> > implement a bunch of protocol code that we might just want to change
> > when the discussion actually does take place, if ever. Finally, it'll
> > just be a mental burden for everyone to remember that we want to have
> > this follow-up discussion.
> >
> > It makes sense what you say... the specific assignment is already
> > encoded in the "main" portion of the assignment, not in the "userdata"
> > part. It also makes sense that it's simpler to reason about races if
> > you simply get all the information about the topics and partitions
> > directly from the assignor, rather than get the partition number from
> > the assignor and the topic name from your own a priori knowledge of
> > the topology. On the other hand, I'm having some trouble wrapping my
> > head around what race conditions might occur, other than the
> > fundamentally broken state in which different instances are running
> > totally different topologies. Sorry, but can you remind us of the
> > specific condition?
> >
> > To the efficiency counterargument, it seems hard to imagine how
> > encoding arbitrarily long topic names plus an integer for the
> > partition number could be as efficient as task ids, which are just two

[jira] [Created] (KAFKA-9139) Dynamic broker config types aren't being discovered

2019-11-04 Thread Brian Byrne (Jira)
Brian Byrne created KAFKA-9139:
--

 Summary: Dynamic broker config types aren't being discovered
 Key: KAFKA-9139
 URL: https://issues.apache.org/jira/browse/KAFKA-9139
 Project: Kafka
  Issue Type: Bug
Reporter: Brian Byrne
Assignee: Brian Byrne


The broker's dynamic config definition types aren't being properly discovered, 
and therefore they're being considered "sensitive" when returned to the client. 
This needs to be resolved. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2019-11-04 Thread Jun Rao
Hi, Satish,

Thanks for the response.

21. Could you elaborate a bit on why the positions in the remote segment are
different from the local ones? I thought that they are identical copies.

Jun


On Fri, Nov 1, 2019 at 4:26 AM Satish Duggana 
wrote:

> Hi Jun,
> Thanks for looking into the updated KIP and clarifying our earlier queries.
>
> >20. It's fine to keep the HDFS binding temporarily in the PR. We just need
> to remove it before it's merged to trunk. As Victor mentioned, we can
> provide a reference implementation based on a mocked version of remote
> storage.
>
> Sure, sounds good.
>
> >21. I am not sure that I understood the need for RemoteLogIndexEntry and
> its relationship with RemoteLogSegmentInfo. It seems
> that RemoteLogIndexEntry are offset index entries pointing to record
> batches inside a segment. That seems to be the same as the .index file?
>
> That is a good point. `RemoteLogManager` does not put a restriction on
> `RemoteStorageManager(RSM)` to maintain positions in the remote segment
> the same as in the local segments, or to keep a correlation between the
> local segment's positions and the remote segment's positions. RSM gives
> back the respective index entries for a given log segment, and the caller
> then calls RSM to fetch the data by passing in the respective entry. This
> allows RSM to have better control in managing the given log segments.
>
> Thanks,
> Satish.
>
> On Fri, Nov 1, 2019 at 2:28 AM Jun Rao  wrote:
> >
> > Hi, Harsha,
> >
> > I am still looking at the KIP and the PR. A couple of quick
> > comments/questions.
> >
> > 20. It's fine to keep the HDFS binding temporarily in the PR. We just
> need
> > to remove it before it's merged to trunk. As Victor mentioned, we can
> > provide a reference implementation based on a mocked version of remote
> > storage.
> >
> > 21. I am not sure that I understood the need for RemoteLogIndexEntry and
> > its relationship with RemoteLogSegmentInfo. It seems
> > that RemoteLogIndexEntry are offset index entries pointing to record
> > batches inside a segment. That seems to be the same as the .index file?
> >
> > Thanks,
> >
> > Jun
> >
> > On Mon, Oct 28, 2019 at 9:11 PM Satish Duggana  >
> > wrote:
> >
> > > Hi Viktor,
> > > >1. Can we allow RLM Followers to serve read requests? After all
> segments
> > > on
> > > the cold storage are closed ones, no modification is allowed. Besides
> > > KIP-392 (
> > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica
> > > )
> > > would introduce follower fetching too, so I think it would be nice to
> > > prepare RLM for this as well.
> > >
> > > That is a good point. We plan to support fetching remote storage from
> > > followers too. Current code in the PR works fine for this scenario
> > > though there may be some edge cases to be handled. We have not yet
> > > tested this scenario.
> > >
> > > >2. I think the remote.log.storage.enable config is redundant. By
> > > specifying
> > > remote.log.storage.manager.class.name one already declares that they
> want
> > > to use remote storage. Would it make sense to remove
> > > the remote.log.storage.enable config?
> > >
> > > I do not think it is really needed. `remote.log.storage.enable`
> > > property can be removed.
> > >
> > > Thanks,
> > > Satish.
> > >
> > >
> > > On Thu, Oct 24, 2019 at 2:46 PM Viktor Somogyi-Vass
> > >  wrote:
> > > >
> > > > Hi Harsha,
> > > >
> > > > A couple more questions:
> > > > 1. Can we allow RLM Followers to serve read requests? After all
> segments
> > > on
> > > > the cold storage are closed ones, no modification is allowed. Besides
> > > > KIP-392 (
> > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica
> > > )
> > > > would introduce follower fetching too, so I think it would be nice to
> > > > prepare RLM for this as well.
> > > > 2. I think the remote.log.storage.enable config is redundant. By
> > > specifying
> > > > remote.log.storage.manager.class.name one already declares that they
> > > want
> > > > to use remote storage. Would it make sense to remove
> > > > the remote.log.storage.enable config?
> > > >
> > > > Thanks,
> > > > Viktor
> > > >
> > > >
> > > > On Thu, Oct 24, 2019 at 10:37 AM Viktor Somogyi-Vass <
> > > > viktorsomo...@gmail.com> wrote:
> > > >
> > > > > Hi Jun & Harsha,
> > > > >
> > > > > I think it would be beneficial to at least provide one simple
> reference
> > > > > implementation (file system based?) as we do with connect too.
> > > > > That would serve as a simple example and would help plugin developers to
> > > better
> > > > > understand the concept and the interfaces.
> > > > >
> > > > > Best,
> > > > > Viktor
> > > > >
> > > > > On Wed, Oct 23, 2019 at 8:49 PM Jun Rao  wrote:
> > > > >
> > > > >> Hi, Harsha,
> > > > >>
> > > > >> Regarding feature branch, if the goal is faster collaboration, it
> > > seems
> > > > >> that doing the development on your own fork is better since
> > > non-committers
> > 

Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2019-11-04 Thread Harsha Chintalapani
Hi Jun,
  Can you please take a look at Satish's reply and let us know if that
answers your question.
I would like to get your and the rest of the community's thoughts on the
general direction we are going as we continue to make progress.

Thanks,
Harsha

On Fri, Nov 1, 2019 at 3:06 AM Satish Duggana 
wrote:

> Hi Jun,
> Thanks for looking into the updated KIP and clarifying our earlier queries.
>
> >20. It's fine to keep the HDFS binding temporarily in the PR. We just need
> to remove it before it's merged to trunk. As Victor mentioned, we can
> provide a reference implementation based on a mocked version of remote
> storage.
>
> Sure, sounds good.
>
> >21. I am not sure that I understood the need for RemoteLogIndexEntry and
> its relationship with RemoteLogSegmentInfo. It seems
> that RemoteLogIndexEntry are offset index entries pointing to record
> batches inside a segment. That seems to be the same as the .index file?
>
> That is a good point. `RemoteLogManager` does not put a restriction on
> `RemoteStorageManager(RSM)` to maintain positions in the remote segment
> the same as in the local segments, or to keep a correlation between the
> local segment's positions and the remote segment's positions. RSM gives
> back the respective index entries for a given log segment, and the caller
> then calls RSM to fetch the data by passing in the respective entry. This
> allows RSM to have better control in managing the given log segments.
>
> Thanks,
> Satish.
>
> On Fri, Nov 1, 2019 at 2:28 AM Jun Rao  wrote:
> >
> > Hi, Harsha,
> >
> > I am still looking at the KIP and the PR. A couple of quick
> > comments/questions.
> >
> > 20. It's fine to keep the HDFS binding temporarily in the PR. We just
> need
> > to remove it before it's merged to trunk. As Victor mentioned, we can
> > provide a reference implementation based on a mocked version of remote
> > storage.
> >
> > 21. I am not sure that I understood the need for RemoteLogIndexEntry and
> > its relationship with RemoteLogSegmentInfo. It seems
> > that RemoteLogIndexEntry are offset index entries pointing to record
> > batches inside a segment. That seems to be the same as the .index file?
> >
> > Thanks,
> >
> > Jun
> >
> > On Mon, Oct 28, 2019 at 9:11 PM Satish Duggana  >
> > wrote:
> >
> > > Hi Viktor,
> > > >1. Can we allow RLM Followers to serve read requests? After all
> segments
> > > on
> > > the cold storage are closed ones, no modification is allowed. Besides
> > > KIP-392 (
> > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica
> > > )
> > > would introduce follower fetching too, so I think it would be nice to
> > > prepare RLM for this as well.
> > >
> > > That is a good point. We plan to support fetching remote storage from
> > > followers too. Current code in the PR works fine for this scenario
> > > though there may be some edge cases to be handled. We have not yet
> > > tested this scenario.
> > >
> > > >2. I think the remote.log.storage.enable config is redundant. By
> > > specifying
> > > remote.log.storage.manager.class.name one already declares that they
> want
> > > to use remote storage. Would it make sense to remove
> > > the remote.log.storage.enable config?
> > >
> > > I do not think it is really needed. `remote.log.storage.enable`
> > > property can be removed.
> > >
> > > Thanks,
> > > Satish.
> > >
> > >
> > > On Thu, Oct 24, 2019 at 2:46 PM Viktor Somogyi-Vass
> > >  wrote:
> > > >
> > > > Hi Harsha,
> > > >
> > > > A couple more questions:
> > > > 1. Can we allow RLM Followers to serve read requests? After all
> segments
> > > on
> > > > the cold storage are closed ones, no modification is allowed. Besides
> > > > KIP-392 (
> > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica
> > > )
> > > > would introduce follower fetching too, so I think it would be nice to
> > > > prepare RLM for this as well.
> > > > 2. I think the remote.log.storage.enable config is redundant. By
> > > specifying
> > > > remote.log.storage.manager.class.name one already declares that they
> > > want
> > > > to use remote storage. Would it make sense to remove
> > > > the remote.log.storage.enable config?
> > > >
> > > > Thanks,
> > > > Viktor
> > > >
> > > >
> > > > On Thu, Oct 24, 2019 at 10:37 AM Viktor Somogyi-Vass <
> > > > viktorsomo...@gmail.com> wrote:
> > > >
> > > > > Hi Jun & Harsha,
> > > > >
> > > > > I think it would be beneficial to at least provide one simple
> reference
> > > > > implementation (file system based?) as we do with connect too.
> > > > > That would serve as a simple example and would help plugin developers to
> > > better
> > > > > understand the concept and the interfaces.
> > > > >
> > > > > Best,
> > > > > Viktor
> > > > >
> > > > > On Wed, Oct 23, 2019 at 8:49 PM Jun Rao  wrote:
> > > > >
> > > > >> Hi, Harsha,
> > > > >>
> > > > >> Regarding feature branch, if the goal is faster collaboration, it
> > > seems
> > > > >> that doing 

[jira] [Created] (KAFKA-9138) Add system test covering Foreign Key joins (KIP-213)

2019-11-04 Thread John Roesler (Jira)
John Roesler created KAFKA-9138:
---

 Summary: Add system test covering Foreign Key joins (KIP-213)
 Key: KAFKA-9138
 URL: https://issues.apache.org/jira/browse/KAFKA-9138
 Project: Kafka
  Issue Type: Test
  Components: streams
Reporter: John Roesler
Assignee: John Roesler


There are unit and integration tests, but we should really have a system test 
as well.

I plan to create a new test, since this feature is pretty different from the 
existing topology/data set of the smoke test. Although it might be possible for 
the new test to subsume the smoke test, I'd give the new test a few releases to 
burn in before considering a merge.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-280: Enhanced log compaction

2019-11-04 Thread Guozhang Wang
Eric,

I think that's a good point; in `Headers.java` we also designed the API to
only have `Header lastHeader(String key);`. I think picking the last header
for that key would make more sense, since internally it is organized as a
list such that newer headers can be considered as "overwriting" the older
headers.
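
For concreteness, a small sketch against the existing public Headers API
(illustrative only; the header key is arbitrary):

```
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;

Headers headers = new RecordHeaders();
headers.add("version", "1".getBytes(StandardCharsets.UTF_8));
headers.add("version", "2".getBytes(StandardCharsets.UTF_8));
Header latest = headers.lastHeader("version");
// latest.value() is "2": the newest header for a key wins, which matches
// the "overwriting" semantics described above.
```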


Guozhang

On Mon, Nov 4, 2019 at 11:31 AM Eric Azama  wrote:

> Hi Senthilnathan,
>
> Regarding Matthias's point 6, what is the reasoning for choosing the first
> occurrence of the configured header? I believe this corresponds to the
> oldest value for a given key. If there are multiple values for a key, it
> seems more intuitive that the newest value is the one that should be used
> for compaction.
>
> Thanks,
> Eric
>
> On Mon, Nov 4, 2019 at 11:00 AM Guozhang Wang  wrote:
>
> > Hello Senthilnathan,
> >
> > Thanks for revamping on the KIP. I have only one comment about the wiki
> > otherwise LGTM.
> >
> > 1. We should emphasize that the newly introduced config yields to the
> > existing "log.cleanup.policy", i.e. if the latter's value is `delete` not
> > `compact`, then the previous config would be ignored.
> >
> >
> > Guozhang
> >
> > On Mon, Nov 4, 2019 at 9:52 AM Senthilnathan Muthusamy
> >  wrote:
> >
> > > Hi all,
> > >
> > > I will start the vote thread shortly for this updated KIP. If there are
> > > any more thoughts I would love to hear them.
> > >
> > > Thanks,
> > > Senthil
> > >
> > > -Original Message-
> > > From: Senthilnathan Muthusamy 
> > > Sent: Thursday, October 31, 2019 3:51 AM
> > > To: dev@kafka.apache.org
> > > Subject: RE: [DISCUSS] KIP-280: Enhanced log compaction
> > >
> > > Hi Matthias
> > >
> > > Thanks for the response.
> > >
> > > (1) Yes
> > >
> > > (2) Yes, and the config name will be the same (i.e.
> > > `log.cleaner.compaction.strategy` &
> > > `log.cleaner.compaction.strategy.header`) at broker level and topic
> level
> > > (to override broker level default compact strategy). Please let me know
> > if
> > > we need to keep it in different naming convention. Note: Broker level
> > > (which will be in the server.properties) configuration is optional and
> > > defaults to offset. Topic level configuration will default to the
> broker
> > > level config...
> > >
> > > (3) By this new way, it avoids another config parameter and also in
> > > future if any new strategy like header needs additional info, no
> additional
> > > config required. As this got discussed already and agreed to have
> > separate
> > > config, I will revert it. KIP updated...
> > >
> > > (4) Done
> > >
> > > (5) Updated
> > >
> > > (6) Updated to pick the first header in the list
> > >
> > > Please let me know if you have any other questions.
> > >
> > > Thanks,
> > > Senthil
> > >
> > > -Original Message-
> > > From: Matthias J. Sax 
> > > Sent: Thursday, October 31, 2019 12:13 AM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [DISCUSS] KIP-280: Enhanced log compaction
> > >
> > > Thanks for picking up this KIP, Senthil.
> > >
> > > (1) As far as I remember, the main issue of the original proposal was a
> > > missing topic level configuration for the compaction strategy. With
> this
> > > being addressed, I am in favor of this KIP.
> > >
> > > (2) With regard to (1), it seems we would need a new topic level config
> > > `compaction.strategy`, and `log.cleaner.compaction.strategy` would be
> the
> > > default strategy (ie, broker level config) if a topic does not
> overwrite
> > it?
> > >
> > > (3) Why did you remove `log.cleaner.compaction.strategy.header`
> > > parameter and change the accepted values of
> > > `log.cleaner.compaction.strategy` to "header." instead of keeping
> > > "header"? The original approach seems to be cleaner, and I think this
> was
> > > discussed on the original discuss thread already.
> > >
> > > (4) Nit: For the "timestamp" compaction strategy you changed the KIP to
> > >
> > > -> `The record [create] timestamp`
> > >
> > > This is misleading IMHO, because it depends on the broker/log
> > > configuration `(log.)message.timestamp.type` that can either be
> > > `CreateTime` or `LogAppendTime` what the actual record timestamp is. I
> > > would just remove "create" to keep it unspecified.
> > >
> > > (5) Nit: the section "Public Interfaces" should list the newly
> introduced
> > > configs -- configuration parameters are a public interface.
> > >
> > > (6) What do you mean by "first level header lookup"? The term "first
> > > level" indicates some hierarchy, but headers don't have any hierarchy
> --
> > > it's just a list of key-value pairs? If you mean the _order_ of the
> > > headers, ie, pick the first header in the list that matches the key,
> > please
> > > rephrase it to make it clearer.
> > >
> > >
> > >
> > > @Tom: I agree with all you are saying, however, I still think that this
> > > KIP will improve the overall situation, because everything you pointed
> > out
> > > is actually true with offset based compaction, too.
> > >
> > > The KIP is not a 

Re: [DISCUSS] KIP-280: Enhanced log compaction

2019-11-04 Thread Eric Azama
Hi Senthilnathan,

Regarding Matthias's point 6, what is the reasoning for choosing the first
occurrence of the configured header? I believe this corresponds to the
oldest value for a given key. If there are multiple values for a key, it
seems more intuitive that the newest value is the one that should be used
for compaction.

Thanks,
Eric

On Mon, Nov 4, 2019 at 11:00 AM Guozhang Wang  wrote:

> Hello Senthilnathan,
>
> Thanks for revamping on the KIP. I have only one comment about the wiki
> otherwise LGTM.
>
> 1. We should emphasize that the newly introduced config yields to the
> existing "log.cleanup.policy", i.e. if the latter's value is `delete` not
> `compact`, then the previous config would be ignored.
>
>
> Guozhang
>
> On Mon, Nov 4, 2019 at 9:52 AM Senthilnathan Muthusamy
>  wrote:
>
> > Hi all,
> >
> > I will start the vote thread shortly for this updated KIP. If there are
> > any more thoughts I would love to hear them.
> >
> > Thanks,
> > Senthil
> >
> > -Original Message-
> > From: Senthilnathan Muthusamy 
> > Sent: Thursday, October 31, 2019 3:51 AM
> > To: dev@kafka.apache.org
> > Subject: RE: [DISCUSS] KIP-280: Enhanced log compaction
> >
> > Hi Matthias
> >
> > Thanks for the response.
> >
> > (1) Yes
> >
> > (2) Yes, and the config name will be the same (i.e.
> > `log.cleaner.compaction.strategy` &
> > `log.cleaner.compaction.strategy.header`) at broker level and topic level
> > (to override broker level default compact strategy). Please let me know
> if
> > we need to keep it in different naming convention. Note: Broker level
> > (which will be in the server.properties) configuration is optional and
> > defaults to offset. Topic level configuration will default to the broker
> > level config...
> >
> > (3) By this new way, it avoids another config parameter and also in
> > future if any new strategy like header needs additional info, no additional
> > config required. As this got discussed already and agreed to have
> separate
> > config, I will revert it. KIP updated...
> >
> > (4) Done
> >
> > (5) Updated
> >
> > (6) Updated to pick the first header in the list
> >
> > Please let me know if you have any other questions.
> >
> > Thanks,
> > Senthil
> >
> > -Original Message-
> > From: Matthias J. Sax 
> > Sent: Thursday, October 31, 2019 12:13 AM
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-280: Enhanced log compaction
> >
> > Thanks for picking up this KIP, Senthil.
> >
> > (1) As far as I remember, the main issue of the original proposal was a
> > missing topic level configuration for the compaction strategy. With this
> > being addressed, I am in favor of this KIP.
> >
> > (2) With regard to (1), it seems we would need a new topic level config
> > `compaction.strategy`, and `log.cleaner.compaction.strategy` would be the
> > default strategy (ie, broker level config) if a topic does not overwrite
> it?
> >
> > (3) Why did you remove `log.cleaner.compaction.strategy.header`
> > parameter and change the accepted values of
> > `log.cleaner.compaction.strategy` to "header." instead of keeping
> > "header"? The original approach seems to be cleaner, and I think this was
> > discussed on the original discuss thread already.
> >
> > (4) Nit: For the "timestamp" compaction strategy you changed the KIP to
> >
> > -> `The record [create] timestamp`
> >
> > This is misleading IMHO, because it depends on the broker/log
> > configuration `(log.)message.timestamp.type` that can either be
> > `CreateTime` or `LogAppendTime` what the actual record timestamp is. I
> > would just remove "create" to keep it unspecified.
> >
> > (5) Nit: the section "Public Interfaces" should list the newly introduced
> > configs -- configuration parameters are a public interface.
> >
> > (6) What do you mean by "first level header lookup"? The term "first
> > level" indicates some hierarchy, but headers don't have any hierarchy --
> > it's just a list of key-value pairs? If you mean the _order_ of the
> > headers, ie, pick the first header in the list that matches the key,
> please
> > rephrase it to make it clearer.
> >
> >
> >
> > @Tom: I agree with all you are saying, however, I still think that this
> > KIP will improve the overall situation, because everything you pointed
> out
> > is actually true with offset based compaction, too.
> >
> > The KIP is not a silver bullet that solves all issue for interleaved
> > writes, but I personally believe, it's a good improvement.
> >
> >
> >
> > -Matthias
> >
> >
> > On 10/30/19 9:45 AM, Senthilnathan Muthusamy wrote:
> > > Hi,
> > >
> > > Please let me know if anyone has any questions on this updated
> KIP-280...
> > >
> > > Thanks,
> > >
> > > Senthil
> > >
> > > -Original Message-
> > > From: Senthilnathan Muthusamy 
> > > Sent: Monday, October 28, 2019 11:36 PM
> > > To: dev@kafka.apache.org
> > > Subject: RE: [DISCUSS] KIP-280: Enhanced log compaction
> > >
> > > Hi Tom,
> > >
> > > Sorry for the delayed response.
> > >
> > > 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-04 Thread Sophie Blee-Goldman
Regarding 3) I'm wondering, does your concern still apply even now
that the pluggable PartitionGrouper interface has been deprecated?
Now that we can be sure that the DefaultPartitionGrouper is used to generate
the taskId -> partitions mapping, we should be able to convert any taskId
to its partitions.

On Mon, Nov 4, 2019 at 11:17 AM John Roesler  wrote:

> Hey Vinoth, thanks for the reply!
>
> 3.
> I get that it's not the main focus of this KIP, but if it's ok, it
> would be nice to hash out this point right now. It only came up
> because this KIP-535 is substantially extending the pattern in
> question. If we push it off until later, then the reviewers are going
> to have to suspend their concerns not just while voting for the KIP,
> but also while reviewing the code. Plus, Navinder is going to
> implement a bunch of protocol code that we might just want to change
> when the discussion actually does take place, if ever. Finally, it'll
> just be a mental burden for everyone to remember that we want to have
> this follow-up discussion.
>
> It makes sense what you say... the specific assignment is already
> encoded in the "main" portion of the assignment, not in the "userdata"
> part. It also makes sense that it's simpler to reason about races if
> you simply get all the information about the topics and partitions
> directly from the assignor, rather than get the partition number from
> the assignor and the topic name from your own a priori knowledge of
> the topology. On the other hand, I'm having some trouble wrapping my
> head around what race conditions might occur, other than the
> fundamentally broken state in which different instances are running
> totally different topologies. Sorry, but can you remind us of the
> specific condition?
>
> To the efficiency counterargument, it seems hard to imagine how
> encoding arbitrarily long topic names plus an integer for the
> partition number could be as efficient as task ids, which are just two
> integers. It seems like this would only be true if topic names were 4
> characters or less.
>
> 4.
> Yeah, clearly, it would not be a good idea to query the metadata
> before every single IQ query. I think there are plenty of established
> patterns for distributed database clients to follow. Can you elaborate
> on "breaking up the API"? It looks like there are already separate API
> calls in the proposal, one for time-lag, and another for offset-lag,
> so are they not already broken up? FWIW, yes, I agree, the offset lag
> is already locally known, so we don't need to build in an extra
> synchronous broker API call, just one for the time-lag.
>
> Thanks again for the discussion,
> -John
>
> On Mon, Nov 4, 2019 at 11:17 AM Vinoth Chandar 
> wrote:
> >
> > 3. Right now, we still get the topic partitions assigned as a part of the
> > top level Assignment object (the one that wraps AssignmentInfo) and use
> > that to convert taskIds back. This list only contains assignments for
> > that particular instance. Attempting to also reverse map for "all" the
> > taskIds in the streams cluster, i.e. all the topic partitions in these
> > global assignment maps was what was problematic. By explicitly sending
> the
> > global assignment maps as actual topic partitions,  group coordinator
> (i.e
> > the leader that computes the assignments) is able to consistently
> enforce
> > its view of the topic metadata. I still don't think such a change,
> which
> > forces you to reconsider semantics, is needed just to save bits on the wire.
> Maybe
> > we can discuss this separately from this KIP?
> >
> > 4. There needs to be some caching/interval somewhere though since we
> don't
> > want to make 1 Kafka read per 1 IQ potentially. But I think it's a valid
> > suggestion, to make this call just synchronous and leave the caching or
> how
> > often you want to call to the application. Would it be good to then break
> > up the APIs for time and offset based lag?  We can obtain offset based
> lag
> > for free? Only incur the overhead of reading kafka if we want time
> > based lags?
> >
> > On Fri, Nov 1, 2019 at 2:49 PM Sophie Blee-Goldman 
> > wrote:
> >
> > > Adding on to John's response to 3), can you clarify when and why
> exactly we
> > > cannot
> > > convert between taskIds and partitions? If that's really the case I
> don't
> > > feel confident
> > > that the StreamsPartitionAssignor is not full of bugs...
> > >
> > > It seems like it currently just encodes a list of all partitions (the
> > > assignment) and also
> > > a list of the corresponding task ids, duplicated to ensure each
> partition
> > > has the corresponding
> > > taskId at the same offset into the list. Why is that problematic?
> > >
> > >
> > > On Fri, Nov 1, 2019 at 12:39 PM John Roesler 
> wrote:
> > >
> > > > Thanks, all, for considering the points!
> > > >
> > > > 3. Interesting. I have a vague recollection of that... Still, though,
> > > > it seems a little fishy. After all, we return the assignments
> > > > 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-04 Thread John Roesler
Hey Vinoth, thanks for the reply!

3.
I get that it's not the main focus of this KIP, but if it's ok, it
would be nice to hash out this point right now. It only came up
because this KIP-535 is substantially extending the pattern in
question. If we push it off until later, then the reviewers are going
to have to suspend their concerns not just while voting for the KIP,
but also while reviewing the code. Plus, Navinder is going to
implement a bunch of protocol code that we might just want to change
when the discussion actually does take place, if ever. Finally, it'll
just be a mental burden for everyone to remember that we want to have
this follow-up discussion.

It makes sense what you say... the specific assignment is already
encoded in the "main" portion of the assignment, not in the "userdata"
part. It also makes sense that it's simpler to reason about races if
you simply get all the information about the topics and partitions
directly from the assignor, rather than get the partition number from
the assignor and the topic name from your own a priori knowledge of
the topology. On the other hand, I'm having some trouble wrapping my
head around what race conditions might occur, other than the
fundamentally broken state in which different instances are running
totally different topologies. Sorry, but can you remind us of the
specific condition?

To the efficiency counterargument, it seems hard to imagine how
encoding arbitrarily long topic names plus an integer for the
partition number could be as efficient as task ids, which are just two
integers. It seems like this would only be true if topic names were 4
characters or less.

4.
Yeah, clearly, it would not be a good idea to query the metadata
before every single IQ query. I think there are plenty of established
patterns for distributed database clients to follow. Can you elaborate
on "breaking up the API"? It looks like there are already separate API
calls in the proposal, one for time-lag, and another for offset-lag,
so are they not already broken up? FWIW, yes, I agree, the offset lag
is already locally known, so we don't need to build in an extra
synchronous broker API call, just one for the time-lag.
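
For what it's worth, here is an assumption sketch (not KIP code; "consumer"
is a byte-array KafkaConsumer and "tp" the changelog TopicPartition, both
placeholders) of how that synchronous time-lag check could be done with only
public consumer APIs:

```
// Assumption sketch: read the last record of the changelog partition and
// diff its timestamp against the local clock to get a time lag.
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

long endOffset = consumer.endOffsets(Collections.singleton(tp)).get(tp);
long timeLagMs = 0L;                        // empty partition => no lag signal
if (endOffset > 0) {
    consumer.assign(Collections.singleton(tp));
    consumer.seek(tp, endOffset - 1);       // position at the last record
    ConsumerRecords<byte[], byte[]> recs = consumer.poll(Duration.ofMillis(500));
    List<ConsumerRecord<byte[], byte[]>> part = recs.records(tp);
    if (!part.isEmpty()) {
        long logEndTs = part.get(part.size() - 1).timestamp();
        timeLagMs = Math.max(0L, System.currentTimeMillis() - logEndTs);
    }
}
```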

Thanks again for the discussion,
-John

On Mon, Nov 4, 2019 at 11:17 AM Vinoth Chandar  wrote:
>
> 3. Right now, we still get the topic partitions assigned as a part of the
> top level Assignment object (the one that wraps AssignmentInfo) and use
> that to convert taskIds back. This list only contains assignments for
> that particular instance. Attempting to also reverse map for "all" the
> taskIds in the streams cluster, i.e. all the topic partitions in these
> global assignment maps was what was problematic. By explicitly sending the
> global assignment maps as actual topic partitions, the group coordinator (i.e.
> the leader that computes the assignments) is able to consistently enforce
> its view of the topic metadata. I still don't think such a change, which
> forces you to reconsider semantics, is needed just to save bits on the wire.
> Maybe we can discuss this separately from this KIP?
>
> 4. There needs to be some caching/interval somewhere though since we don't
> want to make 1 Kafka read per 1 IQ potentially. But I think it's a valid
> suggestion, to make this call just synchronous and leave the caching or how
> often you want to call to the application. Would it be good to then break
> up the APIs for time and offset based lag?  We can obtain offset based lag
> for free? Only incur the overhead of reading kafka if we want time
> based lags?
>
> On Fri, Nov 1, 2019 at 2:49 PM Sophie Blee-Goldman 
> wrote:
>
> > Adding on to John's response to 3), can you clarify when and why exactly we
> > cannot
> > convert between taskIds and partitions? If that's really the case I don't
> > feel confident
> > that the StreamsPartitionAssignor is not full of bugs...
> >
> > It seems like it currently just encodes a list of all partitions (the
> > assignment) and also
> > a list of the corresponding task ids, duplicated to ensure each partition
> > has the corresponding
> > taskId at the same offset into the list. Why is that problematic?
> >
> >
> > On Fri, Nov 1, 2019 at 12:39 PM John Roesler  wrote:
> >
> > > Thanks, all, for considering the points!
> > >
> > > 3. Interesting. I have a vague recollection of that... Still, though,
> > > it seems a little fishy. After all, we return the assignments
> > > themselves as task ids, and the members have to map these to topic
> > > partitions in order to configure themselves properly. If it's too
> > > complicated to get this right, then how do we know that Streams is
> > > computing the correct partitions at all?
> > >
> > > 4. How about just checking the log-end timestamp when you call the
> > > method? Then, when you get an answer, it's as fresh as it could
> > > possibly be. And as a user you have just one, obvious, "knob" to
> > > configure how much overhead you want to devote to checking... If you
> > > want to 

Re: [VOTE] KIP-280: Enhanced log compaction

2019-11-04 Thread Guozhang Wang
I only have one minor comment on the DISCUSS thread, otherwise I'm +1
(binding).

On Mon, Nov 4, 2019 at 9:53 AM Senthilnathan Muthusamy
 wrote:

> Hi all,
>
> I would like to start the vote on the updated KIP-280: Enhanced log
> compaction. Thanks to Guozhang, Matthias & Tom for the valuable feedback on
> the discussion thread...
>
> KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-280%3A+Enhanced+log+compaction
>
> JIRA: https://issues.apache.org/jira/browse/KAFKA-7061
>
> Thanks,
> Senthil
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-280: Enhanced log compaction

2019-11-04 Thread Guozhang Wang
Hello Senthilnathan,

Thanks for revamping on the KIP. I have only one comment about the wiki
otherwise LGTM.

1. We should emphasize that the newly introduced config yields to the
existing "log.cleanup.policy", i.e. if the latter's value is `delete` not
`compact`, then the previous config would be ignored.


Guozhang

On Mon, Nov 4, 2019 at 9:52 AM Senthilnathan Muthusamy
 wrote:

> Hi all,
>
> I will start the vote thread shortly for this updated KIP. If there are
> any more thoughts I would love to hear them.
>
> Thanks,
> Senthil
>
> -Original Message-
> From: Senthilnathan Muthusamy 
> Sent: Thursday, October 31, 2019 3:51 AM
> To: dev@kafka.apache.org
> Subject: RE: [DISCUSS] KIP-280: Enhanced log compaction
>
> Hi Matthias
>
> Thanks for the response.
>
> (1) Yes
>
> (2) Yes, and the config name will be the same (i.e.
> `log.cleaner.compaction.strategy` &
> `log.cleaner.compaction.strategy.header`) at broker level and topic level
> (to override broker level default compact strategy). Please let me know if
> we need to keep it in different naming convention. Note: Broker level
> (which will be in the server.properties) configuration is optional and
> defaults to offset. Topic level configuration will default to the broker
> level config...
>
> (3) By this new way, it avoids another config parameter and also in
> future if any new strategy like header needs additional info, no additional
> config required. As this got discussed already and agreed to have separate
> config, I will revert it. KIP updated...
>
> (4) Done
>
> (5) Updated
>
> (6) Updated to pick the first header in the list
>
> Please let me know if you have any other questions.
>
> Thanks,
> Senthil
>
> -Original Message-
> From: Matthias J. Sax 
> Sent: Thursday, October 31, 2019 12:13 AM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-280: Enhanced log compaction
>
> Thanks for picking up this KIP, Senthil.
>
> (1) As far as I remember, the main issue of the original proposal was a
> missing topic level configuration for the compaction strategy. With this
> being addressed, I am in favor of this KIP.
>
> (2) With regard to (1), it seems we would need a new topic level config
> `compaction.strategy`, and `log.cleaner.compaction.strategy` would be the
> default strategy (ie, broker level config) if a topic does not overwrite it?
>
> (3) Why did you remove `log.cleaner.compaction.strategy.header`
> parameter and change the accepted values of
> `log.cleaner.compaction.strategy` to "header." instead of keeping
> "header"? The original approach seems to be cleaner, and I think this was
> discussed on the original discuss thread already.
>
> (4) Nit: For the "timestamp" compaction strategy you changed the KIP to
>
> -> `The record [create] timestamp`
>
> This is misleading IMHO, because it depends on the broker/log
> configuration `(log.)message.timestamp.type` that can either be
> `CreateTime` or `LogAppendTime` what the actual record timestamp is. I
> would just remove "create" to keep it unspecified.
>
> (5) Nit: the section "Public Interfaces" should list the newly introduced
> configs -- configuration parameters are a public interface.
>
> (6) What do you mean by "first level header lookup"? The term "first
> level" indicates some hierarchy, but headers don't have any hierarchy --
> it's just a list of key-value pairs? If you mean the _order_ of the
> headers, ie, pick the first header in the list that matches the key, please
> rephrase it to make it clearer.
>
>
>
> @Tom: I agree with all you are saying, however, I still think that this
> KIP will improve the overall situation, because everything you pointed out
> is actually true with offset based compaction, too.
>
> The KIP is not a silver bullet that solves all issues for interleaved
> writes, but I personally believe it's a good improvement.
>
>
>
> -Matthias
>
>
> On 10/30/19 9:45 AM, Senthilnathan Muthusamy wrote:
> > Hi,
> >
> > Please let me know if anyone has any questions on this updated KIP-280...
> >
> > Thanks,
> >
> > Senthil
> >
> > -Original Message-
> > From: Senthilnathan Muthusamy 
> > Sent: Monday, October 28, 2019 11:36 PM
> > To: dev@kafka.apache.org
> > Subject: RE: [DISCUSS] KIP-280: Enhanced log compaction
> >
> > Hi Tom,
> >
> > Sorry for the delayed response.
> >
> > Regarding the fall back to offset: the decision for both timestamp &
> > header value is based on the previous author's discussion
> > https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Flists.apache.org%2Fthread.html%2Ff44317eb6cd34f91966654c80509d4a457dbbccdd02b86645782be67%40%253Cdev.kafka.apache.org%253Edata=02%7C01%7Csenthilm%40microsoft.com%7Cb5c596140be1436e9fb708d75df04714%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637081159484181661sdata=%2Fap4F2CdPQe02wNDGkzjzIrxOQRTa2KraQE75dpjTzE%3Dreserved=0
> > and as per that discussion, it is really required to avoid duplicates.
> >
> > And the timestamp strategy is from the original KIP author and we are
> keeping 

[VOTE] KIP-280: Enhanced log compaction

2019-11-04 Thread Senthilnathan Muthusamy
Hi all,

I would like to start the vote on the updated KIP-280: Enhanced log compaction. 
Thanks to Guozhang, Matthias & Tom for the valuable feedback on the discussion 
thread...

KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-280%3A+Enhanced+log+compaction

JIRA: https://issues.apache.org/jira/browse/KAFKA-7061

Thanks,
Senthil


RE: [DISCUSS] KIP-280: Enhanced log compaction

2019-11-04 Thread Senthilnathan Muthusamy
Hi all,

I will start the vote thread shortly for this updated KIP. If there are any 
more thoughts I would love to hear them.

Thanks,
Senthil

-Original Message-
From: Senthilnathan Muthusamy  
Sent: Thursday, October 31, 2019 3:51 AM
To: dev@kafka.apache.org
Subject: RE: [DISCUSS] KIP-280: Enhanced log compaction

Hi Matthias

Thanks for the response.

(1) Yes

(2) Yes, and the config name will be the same (i.e. 
`log.cleaner.compaction.strategy` & `log.cleaner.compaction.strategy.header`) 
at broker level and topic level (to override the broker-level default 
compaction strategy). Please let me know if we need to keep it in a different 
naming convention. Note: the broker-level configuration (which lives in 
server.properties) is optional and defaults to offset. The topic-level 
configuration will default to the broker-level config...

(3) This new approach avoids another config parameter, and in future, if any 
new strategy like header needs additional info, no additional config is 
required. As this was already discussed and a separate config was agreed upon, 
I will revert it. KIP updated...

(4) Done

(5) Updated

(6) Updated to pick the first header in the list

Please let me know if you have any other questions.

Thanks,
Senthil

-Original Message-
From: Matthias J. Sax 
Sent: Thursday, October 31, 2019 12:13 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-280: Enhanced log compaction

Thanks for picking up this KIP, Senthil.

(1) As far as I remember, the main issue of the original proposal was a missing 
topic level configuration for the compaction strategy. With this being 
addressed, I am in favor of this KIP.

(2) With regard to (1), it seems we would need a new topic level config 
`compaction.strategy`, and `log.cleaner.compaction.strategy` would be the 
default strategy (ie, broker level config) if a topic does not overwrite it?

(3) Why did you remove `log.cleaner.compaction.strategy.header`
parameter and change the accepted values of `log.cleaner.compaction.strategy` 
to "header." instead of keeping "header"? The original approach seems to 
be cleaner, and I think this was discussed on the original discuss thread 
already.

(4) Nit: For the "timestamp" compaction strategy you changed the KIP to

-> `The record [create] timestamp`

This is misleading IMHO, because the actual record timestamp depends on the 
broker/log configuration `(log.)message.timestamp.type`, which can be either 
`CreateTime` or `LogAppendTime`. I would just remove "create" to keep it 
unspecified.

(5) Nit: the section "Public Interfaces" should list the newly introduced 
configs -- configuration parameters are a public interface.

(6) What do you mean by "first level header lookup"? The term "first level" 
indicates some hierarchy, but headers don't have any hierarchy -- it's just a 
list of key-value pairs? If you mean the _order_ of the headers, ie, pick the 
first header in the list that matches the key, please rephrase it to make it 
clearer.
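
For reference, a minimal sketch of the "pick the first header in the list
that matches the key" semantics being asked about, using the public `Headers`
API (the lookup helper itself is illustrative, not part of the KIP):

```
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

final class HeaderLookup {
    // Returns the value of the first header whose key matches, or null if absent.
    // Headers preserve insertion order, so "first" means first in that order.
    static byte[] firstHeaderValue(Headers headers, String key) {
        for (Header header : headers) {
            if (header.key().equals(key)) {
                return header.value();
            }
        }
        return null;
    }
}
```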



@Tom: I agree with all you are saying, however, I still think that this KIP 
will improve the overall situation, because everything you pointed out is 
actually true with offset based compaction, too.

The KIP is not a silver bullet that solves all issues for interleaved writes, 
but I personally believe it's a good improvement.



-Matthias


On 10/30/19 9:45 AM, Senthilnathan Muthusamy wrote:
> Hi,
> 
> Please let me know if anyone has any questions on this updated KIP-280...
> 
> Thanks,
> 
> Senthil
> 
> -Original Message-
> From: Senthilnathan Muthusamy 
> Sent: Monday, October 28, 2019 11:36 PM
> To: dev@kafka.apache.org
> Subject: RE: [DISCUSS] KIP-280: Enhanced log compaction
> 
> Hi Tom,
> 
> Sorry for the delayed response.
> 
> Regarding the fall back to offset: the decision for both timestamp & header 
> value is based on the previous author's discussion 
> https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Flists.apache.org%2Fthread.html%2Ff44317eb6cd34f91966654c80509d4a457dbbccdd02b86645782be67%40%253Cdev.kafka.apache.org%253Edata=02%7C01%7Csenthilm%40microsoft.com%7Cb5c596140be1436e9fb708d75df04714%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637081159484181661sdata=%2Fap4F2CdPQe02wNDGkzjzIrxOQRTa2KraQE75dpjTzE%3Dreserved=0
> and as per that discussion, it is really required to avoid duplicates.
> 
> And the timestamp strategy is from the original KIP author and we are keeping 
> it as is.
> 
> Finally, on the sequence order guarantee by the producer: it is not feasible 
> to wait for acks in async / multi-threaded / multi-process scenarios, hence 
> the header-sequence-based compaction strategy, where the producer is 
> responsible for generating a unique sequence at the topic-partition-key level.
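
A minimal sketch of that producer-side responsibility; the `sequence` header
name and the process-local counter are illustrative only, and a real
deployment would need a scheme that is unique across producers:

```
import java.nio.ByteBuffer;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SequencedProducer {
    // Process-local counter; real uniqueness across producers needs more,
    // e.g. a producer epoch combined with a counter.
    private static final AtomicLong SEQUENCE = new AtomicLong();

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("user-events", "user-42", "payload");
            // Under the proposed header strategy, the cleaner would keep the
            // record with the highest sequence for this key.
            record.headers().add("sequence",
                    ByteBuffer.allocate(Long.BYTES).putLong(SEQUENCE.incrementAndGet()).array());
            producer.send(record);
        }
    }
}
```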
> 
> Hoping this clarifies all your questions. Please let us know if you have any 
> further questions.
> 
> > @Guozhang Wang / @Matthias J. Sax, I see you both had a detailed
> > discussion on the original KIP with the previous author and it 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-04 Thread Vinoth Chandar
3. Right now, we still get the topic partitions assigned as a part of the
top level Assignment object (the one that wraps AssignmentInfo) and use
that to convert taskIds back. This list only contains assignments for
that particular instance. Attempting to also reverse map "all" the
taskIds in the streams cluster, i.e. all the topic partitions in these
global assignment maps, was what was problematic. By explicitly sending the
global assignment maps as actual topic partitions, the group coordinator
(i.e. the leader that computes the assignments) is able to consistently
enforce its view of the topic metadata. I still don't think such a change,
which forces you to reconsider semantics, is warranted just to save bits on
the wire. Maybe we can discuss this separately from this KIP?

4. There needs to be some caching/interval somewhere though, since we don't
want to potentially make one Kafka read per IQ call. But I think it's a valid
suggestion to make this call just synchronous and leave the caching, or how
often you want to call it, to the application. Would it be good then to break
up the APIs for time- and offset-based lag? We can obtain offset-based lag
for free, and only incur the overhead of reading Kafka if we want time-based
lags?
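
To illustrate the cost difference, a rough sketch outside of Streams,
assuming a plain KafkaConsumer pointed at a known changelog partition: offset
lag needs only an end-offset lookup, while time lag requires actually
fetching the tail record to learn its timestamp.

```
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

final class LagEstimates {
    // Offset-based lag: a single end-offset lookup, no record fetch required.
    static long offsetLag(KafkaConsumer<byte[], byte[]> consumer,
                          TopicPartition tp, long restoredOffset) {
        long endOffset = consumer.endOffsets(Collections.singleton(tp)).get(tp);
        return Math.max(0L, endOffset - restoredOffset);
    }

    // Time-based lag: we must read the tail record to learn its timestamp.
    static long timeLagMs(KafkaConsumer<byte[], byte[]> consumer, TopicPartition tp) {
        long endOffset = consumer.endOffsets(Collections.singleton(tp)).get(tp);
        if (endOffset == 0) {
            return 0L; // empty changelog, nothing to lag behind
        }
        consumer.assign(Collections.singleton(tp));
        consumer.seek(tp, endOffset - 1);
        for (ConsumerRecord<byte[], byte[]> rec : consumer.poll(Duration.ofSeconds(1))) {
            return System.currentTimeMillis() - rec.timestamp();
        }
        return -1L; // nothing returned within the poll timeout
    }
}
```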

On Fri, Nov 1, 2019 at 2:49 PM Sophie Blee-Goldman 
wrote:

> Adding on to John's response to 3), can you clarify when and why exactly we
> cannot
> convert between taskIds and partitions? If that's really the case I don't
> feel confident
> that the StreamsPartitionAssignor is not full of bugs...
>
> It seems like it currently just encodes a list of all partitions (the
> assignment) and also
> a list of the corresponding task ids, duplicated to ensure each partition
> has the corresponding
> taskId at the same offset into the list. Why is that problematic?
>
>
> On Fri, Nov 1, 2019 at 12:39 PM John Roesler  wrote:
>
> > Thanks, all, for considering the points!
> >
> > 3. Interesting. I have a vague recollection of that... Still, though,
> > it seems a little fishy. After all, we return the assignments
> > themselves as task ids, and the members have to map these to topic
> > partitions in order to configure themselves properly. If it's too
> > complicated to get this right, then how do we know that Streams is
> > computing the correct partitions at all?
> >
> > 4. How about just checking the log-end timestamp when you call the
> > method? Then, when you get an answer, it's as fresh as it could
> > possibly be. And as a user you have just one, obvious, "knob" to
> > configure how much overhead you want to devote to checking... If you
> > want to call the broker API less frequently, you just call the Streams
> > API less frequently. And you don't have to worry about the
> > relationship between your invocations of that method and the config
> > setting (e.g., you'll never get a negative number, which you could if
> > you check the log-end timestamp less frequently than you check the
> > lag).
> >
> > Thanks,
> > -John
> >
> > On Thu, Oct 31, 2019 at 11:52 PM Navinder Brar
> >  wrote:
> > >
> > > Thanks John for going through this.
> > >
> > >- +1, makes sense
> > >- +1, no issues there
> > >- Yeah, the initial patch I had submitted for K-7149 (
> > > https://github.com/apache/kafka/pull/6935) to reduce the assignmentInfo
> > > object had taskIds, but the merged PR had a similar size according to
> > > Vinoth and was simpler, so if the end result is of the same size, it
> > > would not make sense to pivot from the dictionary and move back to
> > > taskIds.
> > >- Not sure about what a good default would be if we don't have a
> > > configurable setting. This gives users the flexibility to serve their
> > > requirements, as at the end of the day it would take CPU cycles. I am
> > > ok with starting with a default and seeing how it goes based upon
> > > feedback.
> > >
> > > Thanks,
> > > Navinder
> > > On Friday, 1 November, 2019, 03:46:42 am IST, Vinoth Chandar <
> > vchan...@confluent.io> wrote:
> > >
> > >  1. Was trying to spell them out separately. but makes sense for
> > > readability. done
> > >
> > > 2. No I immediately agree :) .. makes sense. @navinder?
> > >
> > > 3. I actually attempted only sending taskIds while working on
> KAFKA-7149.
> > > It's non-trivial to handle edge cases resulting from newly added topic
> > > partitions and wildcarded topic entries. I ended up simplifying it to
> > just
> > > dictionary encoding the topic names to reduce size. We can apply the
> same
> > > technique here for this map. Additionally, we could also dictionary
> > encode
> > > HostInfo, given its now repeated twice. I think this would save more
> > space
> > > than having a flag per topic partition entry. Lmk if you are okay with
> > > this.
> > >
> > > 4. This opens up a good discussion. Given we support time lag estimates
> > > also, we need to read the tail record of the changelog periodically
> > (unlike
> > > offset lag, which we can potentially piggyback on metadata in
> > > 

Re: [VOTE] KIP-523 Add KStream#toTable to the Streams DSL

2019-11-04 Thread Guozhang Wang
+1 (binding), thanks Aishwarya!

On Sun, Nov 3, 2019 at 11:46 AM aishwarya kumar  wrote:

> This thread has been open for more than 72 hours. So far there are 2
> binding and 1 non-binding votes, looking to conclude this quickly!!
>
> Best,
> Aishwarya
>
> On Mon, Oct 28, 2019 at 5:00 PM John Roesler  wrote:
>
> > Thanks, Aishwarya!
> >
> > I'm +1 (non-binding)
> >
> > -John
> >
> > On Mon, Oct 28, 2019 at 11:58 AM aishwarya kumar 
> > wrote:
> > >
> > > Thank you,
> > >
> > > Two binding votes so far.
> > >
> > > I'll keep this thread open for a couple of days.
> > >
> > > Best,
> > > Aishwarya
> > >
> > > On Thu, Oct 24, 2019, 3:05 PM Bill Bejeck  wrote:
> > >
> > > > Thanks for the KIP, this is something that will be appreciated by the
> > > > community.
> > > >
> > > > +1(binding)
> > > >
> > > > -Bill
> > > >
> > > > On Thu, Oct 24, 2019 at 12:54 PM Matthias J. Sax <
> > matth...@confluent.io>
> > > > wrote:
> > > >
> > > > > Thanks for the KIP!
> > > > >
> > > > > +1 (binding)
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
> > > > > On 10/24/19 6:19 AM, aishwarya kumar wrote:
> > > > > > Hello All,
> > > > > >
> > > > > > After concluding discussions for this KIP, I would like to go
> > forward
> > > > > with
> > > > > > the voting process.
> > > > > >
> > > > > > Jira Ticket: https://issues.apache.org/jira/browse/KAFKA-7658
> > > > > > KIP :
> > > > > >
> > > > >
> > > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL
> > > > > >
> > > > > > Thank you,
> > > > > > Aishwarya
> > > > > >
> > > > >
> > > > >
> > > >
> >
>


-- 
-- Guozhang
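
For context, a minimal sketch of the API being voted on, assuming `toTable()`
lands as described in KIP-523:

```
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class ToTableExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream =
                builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));
        // Proposed by KIP-523: reinterpret the event stream as a changelog,
        // so each record becomes an upsert for its key.
        KTable<String, String> table = stream.toTable();
        table.toStream().to("output-topic");
    }
}
```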


[jira] [Created] (KAFKA-9137) Maintenance of FetchSession cache causing FETCH_SESSION_ID_NOT_FOUND in live sessions

2019-11-04 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-9137:
---

 Summary: Maintenance of FetchSession cache causing 
FETCH_SESSION_ID_NOT_FOUND in live sessions
 Key: KAFKA-9137
 URL: https://issues.apache.org/jira/browse/KAFKA-9137
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Lucas Bradstreet


We have recently seen cases where brokers end up in a bad state where fetch 
session evictions occur at a high rate (> 16 per second) after a roll. This 
increase in eviction rate included the following pattern in our logs:

 
{noformat}
broker 6: October 31st 2019, 17:52:45.496 Created a new incremental 
FetchContext for session id 2046264334, epoch 9790: added (), updated (), 
removed ()

broker 6: October 31st 2019, 17:52:45.496 Created a new incremental 
FetchContext for session id 2046264334, epoch 9791: added (), updated (), 
removed ()

broker 6: October 31st 2019, 17:52:45.500 Created a new incremental 
FetchContext for session id 2046264334, epoch 9792: added (), updated 
(lkc-7nv6o_tenant_soak_topic_144p-67), removed () 

broker 6: October 31st 2019, 17:52:45.501 Created a new incremental 
FetchContext for session id 2046264334, epoch 9793: added (), updated 
(lkc-7nv6o_tenant_soak_topic_144p-59, lkc-7nv6o_tenant_soak_topic_144p-123, 
lkc-7nv6o_tenant_soak_topic_144p-11, lkc-7nv6o_tenant_soak_topic_144p-3, 
lkc-7nv6o_tenant_soak_topic_144p-67, lkc-7nv6o_tenant_soak_topic_144p-115), 
removed () 

broker 6: October 31st 2019, 17:52:45.501 Evicting stale FetchSession 
2046264334. 

broker 6: October 31st 2019, 17:52:45.502 Session error for 2046264334: no such 
session ID found. 

broker 4: October 31st 2019, 17:52:45.813 [ReplicaFetcher replicaId=4, 
leaderId=6, fetcherId=0] Node 6 was unable to process the fetch request with 
(sessionId=2046264334, epoch=9793): FETCH_SESSION_ID_NOT_FOUND.  
{noformat}
This pattern appears to be problematic for two reasons. Firstly, the replica 
fetcher for broker 4 was clearly able to send multiple incremental fetch 
requests to broker 6, and receive replies, and did so right up to the point 
where broker 6 evicted its fetch session within milliseconds of multiple fetch 
requests. The second problem is that replica fetchers are considered privileged 
for the fetch session cache, and should not be evicted by consumer fetch 
sessions. This cluster only has 12 brokers and 1000 fetch session cache slots 
(the default for max.incremental.fetch.session.cache.slots), and it is thus 
very unlikely that this session should have been evicted by another replica 
fetcher session.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Apache Kafka 2.4.0 release

2019-11-04 Thread Manikumar
Hi all,

The underlying issue of KAFKA-8677 can cause data loss in consumers. I have
included KAFKA-8677 as a blocker for the 2.4 release.
Thanks to Guozhang for identifying the issue.

https://issues.apache.org/jira/browse/KAFKA-8677
PR: https://github.com/apache/kafka/pull/7613

Thanks

On Fri, Nov 1, 2019 at 9:20 PM Manikumar  wrote:

> Hi All,
>
> We still have a couple of blockers to close. PRs are available for both
> blockers.
>
> https://issues.apache.org/jira/browse/KAFKA-8972
> https://issues.apache.org/jira/browse/KAFKA-9080
>
>
> Thanks,
>
>
> On Fri, Oct 25, 2019 at 10:48 PM Manikumar 
> wrote:
>
>> Hi all,
>>
>> Quick update on the 2.4 release. We still have one blocker to close.
>> I will create the first RC after closing the blocker.
>>
>> https://issues.apache.org/jira/browse/KAFKA-8972
>>
>> Thank you!
>>
>> On Fri, Oct 18, 2019 at 12:51 AM Matthias J. Sax 
>> wrote:
>>
>>> Just FYI:
>>>
>>> There was also https://issues.apache.org/jira/browse/KAFKA-9058 that I
>>> just merged.
>>>
>>>
>>> -Matthias
>>>
>>> On 10/17/19 7:59 AM, Manikumar wrote:
>>> > Hi all,
>>> >
>>> > The code freeze deadline has now passed and at this point only blockers
>>> > will be allowed.
>>> > We have three blockers for 2.4.0. I will move out most of the JIRAs
>>> that
>>> > aren't currently
>>> > being worked on. If you think any of the other JIRAs are critical to
>>> > include in 2.4.0,
>>> > please update the fix version, mark as blocker and ensure a PR is
>>> ready to
>>> > merge.
>>> > I will create the first RC as soon as we close the blockers.
>>> > Please help to close out the 2.4.0 JIRAs.
>>> >
>>> > current blockers:
>>> > https://issues.apache.org/jira/browse/KAFKA-8943
>>> > https://issues.apache.org/jira/browse/KAFKA-8992
>>> > https://issues.apache.org/jira/browse/KAFKA-8972
>>> >
>>> > Thank you!
>>> >
>>> > On Tue, Oct 8, 2019 at 8:27 PM Manikumar 
>>> wrote:
>>> >
>>> >> Thanks Bruno. We will mark KIP-471 as complete.
>>> >>
>>> >> On Tue, Oct 8, 2019 at 2:39 PM Bruno Cadonna 
>>> wrote:
>>> >>
>>> >>> Hi Manikumar,
>>> >>>
>>> >>> It is technically true that KIP-471 is not completed, but the only
>>> >>> aspects missing are merely two metrics that I could not add due to
>>> >>> the RocksDB version currently used in Streams. Adding those two
>>> >>> metrics once the RocksDB version has been increased will be a minor
>>> >>> effort. So, I would consider KIP-471 as complete with those two
>>> >>> metrics blocked.
>>> >>>
>>> >>> Best,
>>> >>> Bruno
>>> >>>
>>> >>> On Mon, Oct 7, 2019 at 8:44 PM Manikumar 
>>> >>> wrote:
>>> 
>>>  Hi all,
>>> 
>>>  I have moved a couple of accepted KIPs without a PR to the next release.
>>>  We still have quite a few KIPs with PRs that are being reviewed, but
>>>  haven't yet been merged. I have left all of these in, assuming these PRs
>>>  are ready and not risky to merge. Please update your assigned KIPs/JIRAs
>>>  if they are not ready and if you know they cannot make it to 2.4.0.
>>> 
>>>  Please ensure that all KIPs for 2.4.0 have been merged by Oct 16th. Any
>>>  remaining KIPs will be moved to the next release.
>>> 
>>>  The KIPs still in progress are:
>>> 
>>>  - KIP-517: Add consumer metrics to observe user poll behavior
>>>    <https://cwiki.apache.org/confluence/display/KAFKA/KIP-517%3A+Add+consumer+metrics+to+observe+user+poll+behavior>
>>> 
>>>  - KIP-511: Collect and Expose Client's Name and Version in the Brokers
>>>    <https://cwiki.apache.org/confluence/display/KAFKA/KIP-511%3A+Collect+and+Expose+Client%27s+Name+and+Version+in+the+Brokers>
>>> 
>>>  - KIP-474: To deprecate WindowStore#put(key, value)
>>> 
>>>  - KIP-471: Expose RocksDB Metrics in Kafka Streams
>>>    <https://cwiki.apache.org/confluence/display/KAFKA/KIP-471%3A+Expose+RocksDB+Metrics+in+Kafka+Streams>
>>> 
>>>  - KIP-466: Add support for List serialization and deserialization
>>>    <https://cwiki.apache.org/confluence/display/KAFKA/KIP-466%3A+Add+support+for+List+serialization+and+deserialization>
>>> 
>>>  - KIP-455: Create an Administrative API for Replica Reassignment
>>>    <https://cwiki.apache.org/confluence/display/KAFKA/KIP-455%3A+Create+an+Administrative+API+for+Replica+Reassignment>
>>> 
>>>  - KIP-446: Add changelog topic configuration to KTable suppress
>>>    <https://cwiki.apache.org/confluence/display/KAFKA/KIP-446%3A+Add+changelog+topic+configuration+to+KTable+suppress>
>>> 
>>>  - KIP-444: Augment metrics for Kafka Streams
>>> 

Re: [VOTE] KIP-543: Expand ConfigCommand's non-ZK functionality

2019-11-04 Thread Viktor Somogyi-Vass
No problem :)
I also agree that it should be out of scope right now.

On Thu, Oct 31, 2019 at 10:10 PM Colin McCabe  wrote:

> Sorry, that should read "Hi Viktor",-C.
>
> On Thu, Oct 31, 2019, at 14:08, Colin McCabe wrote:
> > Hi Vikto,
> >
> > That's an interesting idea.  However, I think it's better to consider
> > it outside the context of this KIP.  I think one-letter abbreviations
> > would be controversial, and aren't really related to fixing the ZK
> > dependency here.
> >
> > +1 (binding).  Thanks, Brian.
> >
> > best,
> > Colin
> >
> >
> > On Thu, Oct 31, 2019, at 07:20, Viktor Somogyi-Vass wrote:
> > > Hi Brian,
> > >
> > > I was just asking out of curiosity, although I do think it would be
> > > useful, but it should be done on all tools to keep them consistent.
> > > Perhaps it could be a future consideration.
> > >
> > > I'm +1 on the KIP (non-binding).
> > >
> > > Viktor
> > >
> > > On Wed, Oct 30, 2019 at 5:50 PM Brian Byrne 
> wrote:
> > >
> > > > Hi Viktor,
> > > >
> > > > That's a possibility, however there doesn't appear to be precedent
> for
> > > > one-letter abbreviations with the Kafka commands, so going with the
> current
> > > > flags seemed less controversial. I'm not against it if there's a
> consensus
> > > > that adding {-b, -bl, -u, -c, -t} is better, but as of now, you're
> the
> > > > first to voice it.
> > > >
> > > > Thanks,
> > > > Brian
> > > >
> > > > On Wed, Oct 30, 2019 at 5:01 AM Viktor Somogyi-Vass <
> > > > viktorsomo...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Brian,
> > > > >
> > > > > Have you thought about having one letter abbreviations like '-b'
> for
> > > > > --brokers etc.?
> > > > >
> > > > > Thanks,
> > > > > Viktor
> > > > >
> > > > > On Tue, Oct 29, 2019 at 5:39 PM Brian Byrne 
> wrote:
> > > > >
> > > > > > Hello all,
> > > > > >
> > > > > > I'd like to call a vote on KIP-543: Expand ConfigCommand's non-ZK
> > > > > > functionality, linked here:
> > > > https://cwiki.apache.org/confluence/x/ww-3Bw
> > > > > >
> > > > > > Thanks,
> > > > > > Brian
> > > > > >
> > > > >
> > > >
> > >
>


Re: [DISCUSS] KIP-542: Partition Reassignment Throttling

2019-11-04 Thread Viktor Somogyi-Vass
Exactly. I also can't envision scenarios where we would like to throttle
the reassignment traffic to only a subset of the reassigning replicas.

The other day I was wondering whether with specialized quotas we could solve
incremental partition reassignment too. Basically the controller would
throttle most of the partitions to 0 and let only some of them reassign, but I
discarded the idea because it is more intuitive to actually break up a big
reassignment into smaller steps (and more traceable too). But perhaps there is
a need for throttling the reassigning replicas differently depending on the
produce rate on those partitions. However, in my mind I was planning with
incremental partition reassignment, so perhaps it'd be best if the controller
were able to decide how many partitions can fit into the given bandwidth and
we'd just expose simple configs.

If we always take the lowest value, this means that the reassignment
throttle must always be equal to or lower than the replication throttle.
Doesn't that mean that the reassigning partitions may never catch up? I
guess not, since we expect to always be moving less than the total number
of partitions at one time.
I have mixed feelings about this - I like the flexibility of being able to
configure whatever value we please, yet I struggle to come up with a
scenario where we would want a higher reassignment throttle than
replication. Perhaps your suggestion is better.

Yes, it could mean that; however, the concern with preferring reassignment
quotas is that it could cause the "bootstrapping broker problem", where the
sum of follower reassignment + replication quotas would eat away the bandwidth
from the leaders. In this case I think a reassignment that you can't finish is
a better problem to have than leaders unable to answer fetch requests fast
enough. The reassignment problem can be mitigated by carefully increasing the
replication & reassignment quotas for the given partition. I'll set up a test
environment for this though and get back if something doesn't add up.

This begs another question - since we're separating the replication
throttle from the reassignment throttle, the maximum traffic a broker may
replicate now becomes `replication.throttled.rate` +
`reassignment.throttled.rate`.
Seems like we would benefit from having a total cap to ensure users don't
shoot themselves in the foot.

We could have a new config that denotes the total possible throttle rate
and we then divide that by reassignment and replication. But that assumes
that we would set the replication.throttled.rate much lower than what the
broker could handle.

Perhaps the best approach would be to denote how much the broker can handle
(total.replication.throttle.rate) and then allow only up to N% of that go
towards reassignments (reassignment.throttled.rate) in a best-effort way
(preferring replication traffic). That sounds tricky to implement, though.
Interested to hear what others think.

Good catch. I'm also leaning towards having simpler configs and improving the
broker/controller code to make more intelligent decisions. I also agree with
having a total.replication.throttle.rate, but I think we should stay with the
byte-based notation as that is more conventional in the quota world and easier
to handle. That way you can say that your total replication quota is 10, your
leader and follower replication quotas are 3 each, the reassignment ones are 2
each, and then you've maxed out your limit. We can print warnings/errors if
the overall value doesn't add up to the max.
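
A worked version of that budget check in Java; the names marked "proposed" in
the comments are proposals from this thread, not existing broker configs:

```
final class ThrottleBudget {
    public static void main(String[] args) {
        // Rates in MB/s.
        long total = 10;               // total.replication.throttle.rate (proposed)
        long leaderReplication = 3;    // leader.replication.throttled.rate
        long followerReplication = 3;  // follower.replication.throttled.rate
        long leaderReassignment = 2;   // leader.reassignment.throttled.rate (proposed)
        long followerReassignment = 2; // follower.reassignment.throttled.rate (proposed)

        long sum = leaderReplication + followerReplication
                + leaderReassignment + followerReassignment;
        if (sum > total) {
            System.err.println("Warning: per-type throttles exceed the total cap");
        }
        // "Whichever is lower wins": a replica that is both replication-throttled
        // and reassigning is limited by the smaller of the two rates.
        long effectiveFollowerRate = Math.min(followerReplication, followerReassignment);
        System.out.println("Effective follower reassignment rate: " + effectiveFollowerRate);
    }
}
```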

Viktor

On Mon, Nov 4, 2019 at 12:27 PM Stanislav Kozlovski 
wrote:

> Hi Viktor,
>
> > As for the first question, I think there is no need for
> > *.throttled.replicas in case of reassignment because the
> > LeaderAndIsrRequest exactly specifies the replicas needed to be throttled.
>
> Exactly. I also can't envision scenarios where we would like to throttle
> the reassignment traffic to only a subset of the reassigning replicas.
>
> > For instance, on a bootstrapping server where all replicas are throttled,
> > there are reassigning replicas, and the reassignment throttle is set
> > higher, I think we should still apply the replication throttle to ensure
> > the broker won't have problems. What do you think?
>
> If we always take the lowest value, this means that the reassignment
> throttle must always be equal to or lower than the replication throttle.
> Doesn't that mean that the reassigning partitions may never catch up? I
> guess not, since we expect to always be moving less than the total number
> of partitions at one time.
> I have mixed feelings about this - I like the flexibility of being able to
> configure whatever value we please, yet I struggle to come up with a
> scenario where we would want a higher reassignment throttle than
> replication. Perhaps your suggestion is better.
>
> This begs another question - since we're separating the replication
> throttle from the reassignment throttle, the 

Re: [DISCUSS] KIP-536: Propagate broker timestamp to Admin API

2019-11-04 Thread Gwen Shapira
Cluster metadata is sent to clients on a very regular basis. Adding
start-time there seems quite repetitive, especially considering that
this information is only useful in very specific cases.

Can we add this capability to the Admin API in a way that won't impact
normal client workflow?
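
For context, a minimal sketch of where the proposed field would surface;
`describeCluster()` below is the existing Admin API, while the start-time
accessor on `Node` is only a KIP-536 proposal and is therefore left as a
comment:

```
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.common.Node;

public class ClusterStartTimes {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            DescribeClusterResult cluster = admin.describeCluster();
            for (Node node : cluster.nodes().get()) {
                // KIP-536 proposes a broker start timestamp on Node, e.g. a
                // node.startTimeMs() accessor; no such field exists today.
                System.out.printf("broker %d at %s:%d%n",
                        node.id(), node.host(), node.port());
            }
        }
    }
}
```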

On Mon, Nov 4, 2019 at 4:05 AM Noa Resare  wrote:
>
> Thank you for the feedback, Stanislav!
>
> I agree that it would be good to harmonise the naming, and start-time-ms is 
> definitely more descriptive.
>
> I have updated the proposal to reflect this, and also added the updated json 
> RPC changes. Please have a look.
>
> /noa
>
> > On 1 Nov 2019, at 09:13, Stanislav Kozlovski  wrote:
> >
> > Hey Noa,
> >
> > KIP-436 added a JMX metric in Kafka for this exact use-case, called
> > `start-time-ms`. Perhaps it would be useful to name this public interface
> > in the same way for consistency.
> >
> > Could you update the KIP to include the specific RPC changes regarding the
> > metadata request/responses? Here is a recent example of how to portray the
> > changes -
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-525+-+Return+topic+metadata+and+configs+in+CreateTopics+response
> >
> > Thanks,
> > Stanislav!
> >
> > On Mon, Oct 14, 2019 at 2:46 PM Noa Resare  wrote:
> >
> >> We are in the process of migrating the pieces of automation that currently
> >> reads and modifies zookeeper state to use the Admin API.
> >>
> >> One of the things that we miss doing this is access to the start time of
> >> brokers in a cluster which is used by our automation doing rolling
> >> restarts. We currently read this from the timestamp field from the
> >> ephemeral broker znodes in zookeeper. To address this limitation, I have
> >> authored KIP-536, that proposes adding a timestamp field to the Node class
> >> that the AdminClient.describeCluster() method returns.
> >>
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-536%3A+Propagate+broker+timestamp+to+Admin+API
> >> <
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-536:+Propagate+broker+timestamp+to+Admin+API
> >>>
> >>
> >> Any and all feedback is most welcome
> >>
> >> cheers
> >> noa
> >
> >
> >
> > --
> > Best,
> > Stanislav
>


Why standby tasks read from the StandbyTasks::checkpointedOffsets in assignStandbyPartitions()

2019-11-04 Thread Navinder Brar
Hi,
Please let me know if this is not the correct forum to ask this, but I have a 
question I was hoping someone could clear up for me.

In TaskManager::updateNewAndRestoringTasks(), the function 
assignStandbyPartitions() gets called for all the running standby tasks, where 
it populates the map checkpointedOffsets from 
standbyTask.checkpointedOffsets(), which is only updated at the time of 
initialization of a StandbyTask (i.e. in its constructor). I have checked, and 
this goes back to version 1.1, when the rebalance protocol was old and standby 
tasks were suspended during rebalance and then resumed on assignment.

I want to know why, post resumption, we were/are reading 
standbyTask.checkpointedOffsets() to learn the offset from which the standby 
task should start running, and not stateMgr.checkpointed(), which gets updated 
on every commit to the checkpoint file. In the former case it always reads 
from the same offset, even offsets it had already read earlier, and in cases 
where the changelog topic has a retention time, this gives an 
OffsetOutOfRangeException.
Regards,
Navinder

[jira] [Created] (KAFKA-9136) get consumer latest committed timestamp

2019-11-04 Thread zhangzhisheng (Jira)
zhangzhisheng created KAFKA-9136:


 Summary:  get consumer latest committed timestamp
 Key: KAFKA-9136
 URL: https://issues.apache.org/jira/browse/KAFKA-9136
 Project: Kafka
  Issue Type: Wish
  Components: consumer
Affects Versions: 2.3.0
Reporter: zhangzhisheng
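
There is no direct API for this today; a hedged sketch of one way to
approximate it, combining the group's committed offset from the Admin client
with a read of the last committed record to obtain its timestamp (the group
name is illustrative):

```
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class LastCommittedTimestamp {
    public static void main(String[] args) throws Exception {
        Properties adminProps = new Properties();
        adminProps.put("bootstrap.servers", "localhost:9092");
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("key.deserializer", ByteArrayDeserializer.class.getName());
        consumerProps.put("value.deserializer", ByteArrayDeserializer.class.getName());

        try (AdminClient admin = AdminClient.create(adminProps);
             KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
            // Committed offsets per partition for the group ("my-group" is illustrative).
            Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets("my-group")
                         .partitionsToOffsetAndMetadata().get();
            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : committed.entrySet()) {
                long offset = entry.getValue().offset();
                if (offset == 0) continue; // nothing consumed yet
                // The committed offset is the *next* offset to read, so the last
                // consumed record sits at offset - 1; read it for its timestamp.
                consumer.assign(Collections.singleton(entry.getKey()));
                consumer.seek(entry.getKey(), offset - 1);
                for (ConsumerRecord<byte[], byte[]> rec : consumer.poll(Duration.ofSeconds(2))) {
                    System.out.printf("%s last committed record timestamp=%d%n",
                            entry.getKey(), rec.timestamp());
                    break;
                }
            }
        }
    }
}
```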






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-536: Propagate broker timestamp to Admin API

2019-11-04 Thread Noa Resare
Thank you for the feedback, Stanislav!

I agree that it would be good to harmonise the naming, and start-time-ms is 
definitely more descriptive.

I have updated the proposal to reflect this, and also added the updated json 
RPC changes. Please have a look.

/noa

> On 1 Nov 2019, at 09:13, Stanislav Kozlovski  wrote:
> 
> Hey Noa,
> 
> KIP-436 added a JMX metric in Kafka for this exact use-case, called
> `start-time-ms`. Perhaps it would be useful to name this public interface
> in the same way for consistency.
> 
> Could you update the KIP to include the specific RPC changes regarding the
> metadata request/responses? Here is a recent example of how to portray the
> changes -
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-525+-+Return+topic+metadata+and+configs+in+CreateTopics+response
> 
> Thanks,
> Stanislav!
> 
> On Mon, Oct 14, 2019 at 2:46 PM Noa Resare  wrote:
> 
>> We are in the process of migrating the pieces of automation that currently
>> reads and modifies zookeeper state to use the Admin API.
>> 
>> One of the things that we miss doing this is access to the start time of
>> brokers in a cluster which is used by our automation doing rolling
>> restarts. We currently read this from the timestamp field from the
>> ephemeral broker znodes in zookeeper. To address this limitation, I have
>> authored KIP-536, that proposes adding a timestamp field to the Node class
>> that the AdminClient.describeCluster() method returns.
>> 
>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-536%3A+Propagate+broker+timestamp+to+Admin+API
>> <
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-536:+Propagate+broker+timestamp+to+Admin+API
>>> 
>> 
>> Any and all feedback is most welcome
>> 
>> cheers
>> noa
> 
> 
> 
> -- 
> Best,
> Stanislav



Re: [DISCUSS] KIP-542: Partition Reassignment Throttling

2019-11-04 Thread Stanislav Kozlovski
Hi Viktor,

> As for the first question, I think there is no need for *.throttled.replicas
> in case of reassignment because the LeaderAndIsrRequest exactly specifies
> the replicas needed to be throttled.

Exactly. I also can't envision scenarios where we would like to throttle
the reassignment traffic to only a subset of the reassigning replicas.

> For instance, on a bootstrapping server where all replicas are throttled,
> there are reassigning replicas, and the reassignment throttle is set higher,
> I think we should still apply the replication throttle to ensure the broker
> won't have problems. What do you think?

If we always take the lowest value, this means that the reassignment
throttle must always be equal to or lower than the replication throttle.
Doesn't that mean that the reassigning partitions may never catch up? I
guess not, since we expect to always be moving less than the total number
of partitions at one time.
I have mixed feelings about this - I like the flexibility of being able to
configure whatever value we please, yet I struggle to come up with a
scenario where we would want a higher reassignment throttle than
replication. Perhaps your suggestion is better.

This begs another question - since we're separating the replication
throttle from the reassignment throttle, the maximum traffic a broker may
replicate now becomes `replication.throttled.rate` +
`reassignment.throttled.rate`.
Seems like we would benefit from having a total cap to ensure users don't
shoot themselves in the foot.

We could have a new config that denotes the total possible throttle rate
and we then divide that by reassignment and replication. But that assumes
that we would set the replication.throttled.rate much lower than what the
broker could handle.

Perhaps the best approach would be to denote how much the broker can handle
(total.replication.throttle.rate) and then allow only up to N% of that go
towards reassignments (reassignment.throttled.rate) in a best-effort way
(preferring replication traffic). That sounds tricky to implement, though.
Interested to hear what others think.

Best,
Stanislav


On Mon, Nov 4, 2019 at 11:08 AM Viktor Somogyi-Vass 
wrote:

> Hey Stan,
>
> > We will introduce two new configs in order to eventually replace
> *.replication.throttled.rate.
> Just to clarify, you mean to replace said config in the context of
> reassignment throttling, right? We are not planning to remove that config
>
> Yes, I don't want to remove that config either. Removed that sentence.
>
> And also to clarify, *.throttled.replicas will not apply to the new
> *reassignment* configs, correct? We will throttle all reassigning replicas.
> (I am +1 on this, I believe it is easier to reason about. We could always
> add a new config later)
>
> Are you asking whether there is a need for a
> leader.reassignment.throttled.replicas and
> follower.reassignment.throttled.replicas config or are you interested in
> the behavior between the old and the new configs?
> As for the first question, I think there is no need for *.throttled.replicas in
> case of reassignment because the LeaderAndIsrRequest exactly specifies the
> replicas needed to be throttled.
> As for the second, see below.
>
> I have one comment about backwards-compatibility - should we ensure that
> the old `*.replication.throttled.rate` and `*.throttled.replicas` still
> apply to reassigning traffic if set? We could have the new config take
> precedence, but still preserve backwards compatibility.
>
> Sure, we should apply replication throttling to reassignment too if set.
> But instead of the new taking precedence, I'd apply whichever has the lower
> value.
> For instance, on a bootstrapping server where all replicas are throttled,
> there are reassigning replicas, and the reassignment throttle is set higher,
> I think we should still apply the replication throttle to ensure the broker
> won't have problems. What do you think?
>
> Thanks,
> Viktor
>
>
> On Fri, Nov 1, 2019 at 9:57 AM Stanislav Kozlovski  >
> wrote:
>
> > Hey Viktor. Thanks for the KIP!
> >
> > > We will introduce two new configs in order to eventually replace
> > *.replication.throttled.rate.
> > Just to clarify, you mean to replace said config in the context of
> > reassignment throttling, right? We are not planning to remove that config
> >
> > And also to clarify, *.throttled.replicas will not apply to the new
> > *reassignment* configs, correct? We will throttle all reassigning
> replicas.
> > (I am +1 on this, I believe it is easier to reason about. We could always
> > add a new config later)
> >
> > I have one comment about backwards-compatibility - should we ensure that
> > the old `*.replication.throttled.rate` and `*.throttled.replicas` still
> > apply to reassigning traffic if set? We could have the new config take
> > precedence, but still preserve backwards compatibility.
> >
> > Thanks,
> > Stanislav
> >
> > On Thu, Oct 24, 2019 at 1:38 PM Viktor Somogyi-Vass <
> > viktorsomo...@gmail.com>
> > wrote:
> >
> > > Hi People,
> 

Re: [DISCUSS] KIP-542: Partition Reassignment Throttling

2019-11-04 Thread Viktor Somogyi-Vass
Hey Stan,

> We will introduce two new configs in order to eventually replace
*.replication.throttled.rate.
Just to clarify, you mean to replace said config in the context of
reassignment throttling, right? We are not planning to remove that config

Yes, I don't want to remove that config either. Removed that sentence.

And also to clarify, *.throttled.replicas will not apply to the new
*reassignment* configs, correct? We will throttle all reassigning replicas.
(I am +1 on this, I believe it is easier to reason about. We could always
add a new config later)

Are you asking whether there is a need for a
leader.reassignment.throttled.replicas and
follower.reassignment.throttled.replicas config or are you interested in
the behavior between the old and the new configs?
As for the first question, I think there is no need for *.throttled.replicas in
case of reassignment because the LeaderAndIsrRequest exactly specifies the
replicas needed to be throttled.
As for the second, see below.

I have one comment about backwards-compatibility - should we ensure that
the old `*.replication.throttled.rate` and `*.throttled.replicas` still
apply to reassigning traffic if set? We could have the new config take
precedence, but still preserve backwards compatibility.

Sure, we should apply replication throttling to reassignment too if set.
But instead of the new taking precedence, I'd apply whichever has the lower
value.
For instance, on a bootstrapping server where all replicas are throttled,
there are reassigning replicas, and the reassignment throttle is set higher, I
think we should still apply the replication throttle to ensure the broker
won't have problems. What do you think?

Thanks,
Viktor


On Fri, Nov 1, 2019 at 9:57 AM Stanislav Kozlovski 
wrote:

> Hey Viktor. Thanks for the KIP!
>
> > We will introduce two new configs in order to eventually replace
> *.replication.throttled.rate.
> Just to clarify, you mean to replace said config in the context of
> reassignment throttling, right? We are not planning to remove that config
>
> And also to clarify, *.throttled.replicas will not apply to the new
> *reassignment* configs, correct? We will throttle all reassigning replicas.
> (I am +1 on this, I believe it is easier to reason about. We could always
> add a new config later)
>
> I have one comment about backwards-compatibility - should we ensure that
> the old `*.replication.throttled.rate` and `*.throttled.replicas` still
> apply to reassigning traffic if set? We could have the new config take
> precedence, but still preserve backwards compatibility.
>
> Thanks,
> Stanislav
>
> On Thu, Oct 24, 2019 at 1:38 PM Viktor Somogyi-Vass <
> viktorsomo...@gmail.com>
> wrote:
>
> > Hi People,
> >
> > I've created a KIP to improve replication quotas by handling reassignment
> > related throttling as a separate case with its own configurable limits
> and
> > change the kafka-reassign-partitions tool to use these new configs going
> > forward.
> > Please have a look, I'd be happy to receive any feedback and answer
> > all your questions.
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-542%3A+Partition+Reassignment+Throttling
> >
> > Thanks,
> > Viktor
> >
>
>
> --
> Best,
> Stanislav
>


[jira] [Created] (KAFKA-9135) Kafka producer/consumer are creating too many open files

2019-11-04 Thread Dhirendra Singh (Jira)
Dhirendra Singh created KAFKA-9135:
--

 Summary: Kafka producer/consumer are creating too many open files
 Key: KAFKA-9135
 URL: https://issues.apache.org/jira/browse/KAFKA-9135
 Project: Kafka
  Issue Type: Bug
  Components: admin, consumer, producer 
Affects Versions: 1.0.1
 Environment: apache kafka client :- 1.0.1
Kafka version :- 1.0.1
Open JDK :- java-1.8.0-openjdk-1.8.0.222.b10-1
CentOS version :- CentOS Linux release 7.6.1810
Reporter: Dhirendra Singh


We have a 3-node Kafka cluster deployment with 5 topics and 6 partitions per 
topic. We have configured replication factor = 3. We are seeing a very strange 
problem: the number of file descriptors has crossed the ulimit (which is 50K 
for our application).

As per the lsof command and our analysis:

1. There are 15K established connections from Kafka producers/consumers 
towards the brokers, and at the same time in the thread dump we have observed 
thousands of entries for the Kafka 'admin-client-network-thread':

admin-client-network-thread" #224398 daemon prio=5 os_prio=0 
tid=0x7f12ca119800 nid=0x5363 runnable [0x7f12c4db8000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x0005e0603238> (a sun.nio.ch.Util$3)
- locked <0x0005e0603228> (a java.util.Collections$UnmodifiableSet)
- locked <0x0005e0602f08> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.kafka.common.network.Selector.select(Selector.java:672)
at org.apache.kafka.common.network.Selector.poll(Selector.java:396)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:238)
- locked <0x0005e0602dc0> (a 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:205)
at kafka.admin.AdminClient$$anon$1.run(AdminClient.scala:61)
at java.lang.Thread.run(Thread.java:748)


2. As per the lsof output, we have observed 35K entries for pipe and eventpoll:

java 5441 notifs 374r FIFO 0,9 0t0 22415240 pipe
java 5441 notifs 375w FIFO 0,9 0t0 22415240 pipe
java 5441 notifs 376u a_inode 0,10 0 6379 [eventpoll]
java 5441 notifs 377r FIFO 0,9 0t0 2247 pipe
java 5441 notifs 378r FIFO 0,9 0t0 28054726 pipe
java 5441 notifs 379r FIFO 0,9 0t0 22415241 pipe
java 5441 notifs 380w FIFO 0,9 0t0 22415241 pipe
java 5441 notifs 381u a_inode 0,10 0 6379 [eventpoll]
java 5441 notifs 382w FIFO 0,9 0t0 2247 pipe
java 5441 notifs 383u a_inode 0,10 0 6379 [eventpoll]
java 5441 notifs 384u a_inode 0,10 0 6379 [eventpoll]
java 5441 notifs 385r FIFO 0,9 0t0 40216087 pipe
java 5441 notifs 386r FIFO 0,9 0t0 22483470 pipe


Setup details :- 
apache kafka client :- 1.0.1
Kafka version :- 1.0.1
Open JDK :- java-1.8.0-openjdk-1.8.0.222.b10-1
CentOS version :- CentOS Linux release 7.6.1810

Note: After restarting the VM, the file descriptor count cleared and returned 
to a normal count of ~1000; then after a few seconds the file descriptor count 
started to increase again, reaching 50K (the ulimit) after about 1 week in an 
idle scenario.
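
One common cause of this pattern is creating Admin clients (each of which
starts its own network thread and opens sockets, pipes and epoll fds) without
ever closing them. A minimal sketch of the safe lifecycle with the Java
AdminClient, assuming that is what the application uses:

```
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;

public class AdminClientLifecycle {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Each create() starts a network thread and opens sockets, pipes and
        // epoll fds; creating clients repeatedly without close() leaks all of
        // them. try-with-resources guarantees the descriptors are released.
        try (AdminClient admin = AdminClient.create(props)) {
            System.out.println("topics: " + admin.listTopics().names().get());
        }
    }
}
```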



--
This message was sent by Atlassian Jira
(v8.3.4#803005)