[jira] [Commented] (KAFKA-3160) Kafka LZ4 framing code miscalculates header checksum

2016-04-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15234514#comment-15234514
 ] 

ASF GitHub Bot commented on KAFKA-3160:
---

GitHub user dpkp opened a pull request:

https://github.com/apache/kafka/pull/1212

KAFKA-3160: Fix LZ4 Framing

This contribution is my original work and I license the work under Apache 
2.0.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dpkp/kafka KAFKA-3160

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1212.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1212


commit b64e5f9f054131ae7bf6b9a10be861f5fb0caeab
Author: Dana Powers 
Date:   2016-04-11T04:35:43Z

Update KafkaLZ4Block* implementation to 1.5.1 framing spec

 - update spec to 1.5.1; remove dictID
 - fix frame descriptor HC check (dont include magic bytes)
 - dont require HC validation on input by default
 - add useBrokenHC boolean for output compatibility
 - nominal support for contentChecksum / contentSize flags

commit f1380d0e5f6e1e9d7b48a9cff3fbcd13b7a5fe3f
Author: Dana Powers 
Date:   2016-04-11T05:35:31Z

KAFKA-3160: use LZ4 v1.5.1 framing for all v1 messages; keep old framing 
for v0 messages




> Kafka LZ4 framing code miscalculates header checksum
> 
>
> Key: KAFKA-3160
> URL: https://issues.apache.org/jira/browse/KAFKA-3160
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Affects Versions: 0.8.2.0, 0.8.2.1, 0.9.0.0, 0.8.2.2, 0.9.0.1
>Reporter: Dana Powers
>Assignee: Magnus Edenhill
>  Labels: compatibility, compression, lz4
>
> KAFKA-1493 partially implements the LZ4 framing specification, but it 
> incorrectly calculates the header checksum. This causes 
> KafkaLZ4BlockInputStream to raise an error 
> [IOException(DESCRIPTOR_HASH_MISMATCH)] if a client sends *correctly* framed 
> LZ4 data. It also causes KafkaLZ4BlockOutputStream to generate incorrectly 
> framed LZ4 data, which means clients decoding LZ4 messages from kafka will 
> always receive incorrectly framed data.
> Specifically, the current implementation includes the 4-byte MagicNumber in 
> the checksum, which is incorrect.
> http://cyan4973.github.io/lz4/lz4_Frame_format.html
> Third-party clients that attempt to use off-the-shelf lz4 framing find that 
> brokers reject messages as having a corrupt checksum. So currently non-java 
> clients must 'fixup' lz4 packets to deal with the broken checksum.
> Magnus first identified this issue in librdkafka; kafka-python has the same 
> problem.
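
For context, the header checksum (HC) in question is the single byte that closes the LZ4 frame descriptor: per the frame spec it is the second byte of xxHash32 computed over the descriptor fields only, while the pre-fix Kafka code also hashed the 4-byte magic number. A minimal sketch of the two calculations, assuming the lz4-java xxHash API that Kafka already depends on (illustrative only, not the actual patch):

{code}
import net.jpountz.xxhash.XXHash32;
import net.jpountz.xxhash.XXHashFactory;

public class Lz4HeaderChecksumSketch {
    private static final XXHash32 XXHASH = XXHashFactory.fastestInstance().hash32();

    // Spec-conformant HC: hash only the frame descriptor bytes (FLG, BD and any
    // optional fields), excluding the 4-byte magic number, then take the second
    // byte of the xxHash32 result.
    static int correctHeaderChecksum(byte[] frame, int descriptorOffset, int descriptorLength) {
        return (XXHASH.hash(frame, descriptorOffset, descriptorLength, 0) >> 8) & 0xFF;
    }

    // Pre-fix behavior (for illustration): hashing from offset 0 also covers the
    // magic number, which yields a different, non-conformant HC byte.
    static int brokenHeaderChecksum(byte[] frame, int lengthIncludingMagic) {
        return (XXHASH.hash(frame, 0, lengthIncludingMagic, 0) >> 8) & 0xFF;
    }
}
{code}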



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request: KAFKA-3160: Fix LZ4 Framing

2016-04-10 Thread dpkp
GitHub user dpkp opened a pull request:

https://github.com/apache/kafka/pull/1212

KAFKA-3160: Fix LZ4 Framing

This contribution is my original work and I license the work under Apache 
2.0.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dpkp/kafka KAFKA-3160

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1212.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1212


commit b64e5f9f054131ae7bf6b9a10be861f5fb0caeab
Author: Dana Powers 
Date:   2016-04-11T04:35:43Z

Update KafkaLZ4Block* implementation to 1.5.1 framing spec

 - update spec to 1.5.1; remove dictID
 - fix frame descriptor HC check (dont include magic bytes)
 - dont require HC validation on input by default
 - add useBrokenHC boolean for output compatibility
 - nominal support for contentChecksum / contentSize flags

commit f1380d0e5f6e1e9d7b48a9cff3fbcd13b7a5fe3f
Author: Dana Powers 
Date:   2016-04-11T05:35:31Z

KAFKA-3160: use LZ4 v1.5.1 framing for all v1 messages; keep old framing 
for v0 messages




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [VOTE] KIP-33 - Add a time based log index

2016-04-10 Thread Jun Rao
Hi, Jiangjie,

Thanks for the update. Looks good to me overall. Just a few minor comments
below.

10. On broker startup, it's not clear to me why we need to scan the log
segment to retrieve the largest timestamp since the time index always has
an entry for the largest timestamp. Is that only for restarting after a
hard failure?

11. On broker startup, if a log segment misses the time index, do we always
rebuild it? This can happen when the broker is upgraded.

12. Related to Guozhang's question #1. It seems simpler to add time
index entries independently of the offset index, since an index entry may not
be added to the offset index and the time index at the same time. Also, this
allows the time index to be rebuilt independently if needed.
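
For reference, a rough sketch of the kind of entry being discussed, appended to a per-segment time index independently of the offset index; the field widths (an 8-byte timestamp plus a 4-byte relative offset) follow the KIP draft and should be read as an assumption here, not the final on-disk format:

{code}
import java.nio.ByteBuffer;

// Sketch of a time index entry: the largest timestamp seen so far in the
// segment, paired with the relative offset of the corresponding message.
final class TimeIndexEntry {
    static final int ENTRY_SIZE = 12;  // 8-byte timestamp + 4-byte relative offset

    final long timestamp;
    final int relativeOffset;  // offset delta from the segment's base offset

    TimeIndexEntry(long timestamp, int relativeOffset) {
        this.timestamp = timestamp;
        this.relativeOffset = relativeOffset;
    }

    void writeTo(ByteBuffer buffer) {
        buffer.putLong(timestamp);
        buffer.putInt(relativeOffset);
    }

    static TimeIndexEntry readFrom(ByteBuffer buffer) {
        return new TimeIndexEntry(buffer.getLong(), buffer.getInt());
    }
}
{code}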

Thanks,

Jun


On Wed, Apr 6, 2016 at 5:44 PM, Becket Qin  wrote:

> Hi all,
>
> I updated KIP-33 based on the initial implementation. Per discussion on
> yesterday's KIP hangout, I would like to initiate the new vote thread for
> KIP-33.
>
> The KIP wiki:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index
>
> Here is a brief summary of the KIP:
> 1. We propose to add a time index for each log segment.
> 2. The time indices are going to be used for log retention, log rolling and
> message search by timestamp.
>
> There was an old voting thread which has some discussions on this KIP. The
> mail thread link is following:
>
> http://mail-archives.apache.org/mod_mbox/kafka-dev/201602.mbox/%3ccabtagwgoebukyapfpchmycjk2tepq3ngtuwnhtr2tjvsnc8...@mail.gmail.com%3E
>
> I have the following WIP patch for reference. It needs a few more unit
> tests and documentation. Other than that it should run fine.
>
> https://github.com/becketqin/kafka/commit/712357a3fbf1423e05f9eed7d2fed5b6fe6c37b7
>
> Thanks,
>
> Jiangjie (Becket) Qin
>


"nag" for Pull Request

2016-04-10 Thread Eric Wasserman
Following the advice on http://kafka.apache.org/contributing.html, this email is a "nag" for the PR
https://github.com/apache/kafka/pull/1168, "KAFKA-1981 Make log compaction point configurable".

I would love to get feedback/acceptance.

Thanks,

- Eric

Build failed in Jenkins: kafka-trunk-jdk7 #1187

2016-04-10 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] KAFKA-3338: Add print and writeAsText to KStream/KTable in Kafka 
Streams

--
[...truncated 1590 lines...]

kafka.log.LogTest > testTruncateTo PASSED

kafka.log.LogTest > testCleanShutdownFile PASSED

kafka.log.OffsetIndexTest > lookupExtremeCases PASSED

kafka.log.OffsetIndexTest > appendTooMany PASSED

kafka.log.OffsetIndexTest > randomLookupTest PASSED

kafka.log.OffsetIndexTest > testReopen PASSED

kafka.log.OffsetIndexTest > appendOutOfOrder PASSED

kafka.log.OffsetIndexTest > truncate PASSED

kafka.log.LogSegmentTest > testRecoveryWithCorruptMessage PASSED

kafka.log.LogSegmentTest > testRecoveryFixesCorruptIndex PASSED

kafka.log.LogSegmentTest > testReadFromGap PASSED

kafka.log.LogSegmentTest > testTruncate PASSED

kafka.log.LogSegmentTest > testReadBeforeFirstOffset PASSED

kafka.log.LogSegmentTest > testCreateWithInitFileSizeAppendMessage PASSED

kafka.log.LogSegmentTest > testChangeFileSuffixes PASSED

kafka.log.LogSegmentTest > testMaxOffset PASSED

kafka.log.LogSegmentTest > testNextOffsetCalculation PASSED

kafka.log.LogSegmentTest > testReadOnEmptySegment PASSED

kafka.log.LogSegmentTest > testReadAfterLast PASSED

kafka.log.LogSegmentTest > testCreateWithInitFileSizeClearShutdown PASSED

kafka.log.LogSegmentTest > testTruncateFull PASSED

kafka.log.CleanerTest > testBuildOffsetMap PASSED

kafka.log.CleanerTest > testSegmentGrouping PASSED

kafka.log.CleanerTest > testCleanSegmentsWithAbort PASSED

kafka.log.CleanerTest > testSegmentGroupingWithSparseOffsets PASSED

kafka.log.CleanerTest > testRecoveryAfterCrash PASSED

kafka.log.CleanerTest > testLogToClean PASSED

kafka.log.CleanerTest > testCleaningWithDeletes PASSED

kafka.log.CleanerTest > testCleanSegments PASSED

kafka.log.CleanerTest > testCleaningWithUnkeyedMessages PASSED

kafka.log.OffsetMapTest > testClear PASSED

kafka.log.OffsetMapTest > testBasicValidation PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testHeartbeatWrongCoordinator 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testDescribeGroupStable PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testHeartbeatIllegalGeneration 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testDescribeGroupWrongCoordinator PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testDescribeGroupRebalancing 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testLeaderFailureInSyncGroup 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testGenerationIdIncrementsOnRebalance PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testSyncGroupFromIllegalGeneration PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testInvalidGroupId PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testHeartbeatUnknownGroup 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testListGroupsIncludesStableGroups PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testHeartbeatDuringRebalanceCausesRebalanceInProgress PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testJoinGroupInconsistentGroupProtocol PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testJoinGroupSessionTimeoutTooLarge PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testJoinGroupSessionTimeoutTooSmall PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testSyncGroupEmptyAssignment 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testCommitOffsetWithDefaultGeneration PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testJoinGroupFromUnchangedLeaderShouldRebalance PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testHeartbeatRebalanceInProgress PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testLeaveGroupUnknownGroup 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testListGroupsIncludesRebalancingGroups PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testSyncGroupFollowerAfterLeader PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testCommitOffsetInAwaitingSync 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testJoinGroupWrongCoordinator 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testJoinGroupUnknownConsumerExistingGroup PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testSyncGroupFromUnknownGroup 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testJoinGroupInconsistentProtocolType PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testCommitOffsetFromUnknownGroup PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testLeaveGroupWrongCoordinator 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testLeaveGroupUnknownConsumerExistingGroup PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testJoinGroupUnknownConsumerNewGroup PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testJoinGroupFromUnchangedFollowerDoesNotRebalance PASSED


Build failed in Jenkins: kafka-trunk-jdk8 #516

2016-04-10 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] KAFKA-3338: Add print and writeAsText to KStream/KTable in Kafka 
Streams

--
[...truncated 1628 lines...]

kafka.log.LogTest > testTruncateTo PASSED

kafka.log.LogTest > testCleanShutdownFile PASSED

kafka.log.OffsetIndexTest > lookupExtremeCases PASSED

kafka.log.OffsetIndexTest > appendTooMany PASSED

kafka.log.OffsetIndexTest > randomLookupTest PASSED

kafka.log.OffsetIndexTest > testReopen PASSED

kafka.log.OffsetIndexTest > appendOutOfOrder PASSED

kafka.log.OffsetIndexTest > truncate PASSED

kafka.log.LogSegmentTest > testRecoveryWithCorruptMessage PASSED

kafka.log.LogSegmentTest > testRecoveryFixesCorruptIndex PASSED

kafka.log.LogSegmentTest > testReadFromGap PASSED

kafka.log.LogSegmentTest > testTruncate PASSED

kafka.log.LogSegmentTest > testReadBeforeFirstOffset PASSED

kafka.log.LogSegmentTest > testCreateWithInitFileSizeAppendMessage PASSED

kafka.log.LogSegmentTest > testChangeFileSuffixes PASSED

kafka.log.LogSegmentTest > testMaxOffset PASSED

kafka.log.LogSegmentTest > testNextOffsetCalculation PASSED

kafka.log.LogSegmentTest > testReadOnEmptySegment PASSED

kafka.log.LogSegmentTest > testReadAfterLast PASSED

kafka.log.LogSegmentTest > testCreateWithInitFileSizeClearShutdown PASSED

kafka.log.LogSegmentTest > testTruncateFull PASSED

kafka.log.CleanerTest > testBuildOffsetMap PASSED

kafka.log.CleanerTest > testSegmentGrouping PASSED

kafka.log.CleanerTest > testCleanSegmentsWithAbort PASSED

kafka.log.CleanerTest > testSegmentGroupingWithSparseOffsets PASSED

kafka.log.CleanerTest > testRecoveryAfterCrash PASSED

kafka.log.CleanerTest > testLogToClean PASSED

kafka.log.CleanerTest > testCleaningWithDeletes PASSED

kafka.log.CleanerTest > testCleanSegments PASSED

kafka.log.CleanerTest > testCleaningWithUnkeyedMessages PASSED

kafka.log.OffsetMapTest > testClear PASSED

kafka.log.OffsetMapTest > testBasicValidation PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testHeartbeatWrongCoordinator 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testDescribeGroupStable PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testHeartbeatIllegalGeneration 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testDescribeGroupWrongCoordinator PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testDescribeGroupRebalancing 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testLeaderFailureInSyncGroup 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testGenerationIdIncrementsOnRebalance PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testSyncGroupFromIllegalGeneration PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testInvalidGroupId PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testHeartbeatUnknownGroup 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testListGroupsIncludesStableGroups PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testHeartbeatDuringRebalanceCausesRebalanceInProgress PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testJoinGroupInconsistentGroupProtocol PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testJoinGroupSessionTimeoutTooLarge PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testJoinGroupSessionTimeoutTooSmall PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testSyncGroupEmptyAssignment 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testCommitOffsetWithDefaultGeneration PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testJoinGroupFromUnchangedLeaderShouldRebalance PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testHeartbeatRebalanceInProgress PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testLeaveGroupUnknownGroup 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testListGroupsIncludesRebalancingGroups PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testSyncGroupFollowerAfterLeader PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testCommitOffsetInAwaitingSync 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testJoinGroupWrongCoordinator 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testJoinGroupUnknownConsumerExistingGroup PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testSyncGroupFromUnknownGroup 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testJoinGroupInconsistentProtocolType PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testCommitOffsetFromUnknownGroup PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testLeaveGroupWrongCoordinator 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testLeaveGroupUnknownConsumerExistingGroup PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testJoinGroupUnknownConsumerNewGroup PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testJoinGroupFromUnchangedFollowerDoesNotRebalance PASSED


Re: [DISCUSS] KIP-35 - Retrieve protocol version

2016-04-10 Thread Jun Rao
Thinking about ApiVersionRequest a bit more. There are quite a few things
special about it. In the ideal case, (1) its version should never change;
(2) it needs to be done before authentication (either SSL/SASL); (3) it is
required to be issued at the beginning of each connection but never needs
to be issued again on the same connection. So, instead of modeling it as a
regular request, it seems a cleaner approach is to just bake that into the
initial connection handshake even before the authentication layer. So the
sequencing in a connection will be api discovery, authentication, followed
by regular requests. I am not sure we can still add this in a backward
compatible way now (e.g., not sure what the initial bytes from an SSL
connection will look like). Even if we can do this in a backward compatible
way, it's probably a non-trivial amount of work though.

We started KIP-35 with supporting a client to know if a version is
supported by the broker. It's now evolved into supporting a client to
implement multiple versions of the protocol and dynamically pick a version
supported by the broker. The former is likely solvable without
ApiVersionRequest. How important is the latter? What if the C client just
follows the Java client model by implementing one version of the protocol per C
client release (which seems easier to implement)?
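
(For the "dynamically pick a version" case, the client-side logic amounts to something like the sketch below: intersect the broker's advertised [min, max] range for an API with the range the client implements and use the highest common version. Names are illustrative, not actual client code.)

{code}
// Illustrative sketch of per-API version negotiation enabled by KIP-35.
final class ApiVersionPicker {
    static short pick(short brokerMin, short brokerMax, short clientMin, short clientMax) {
        short high = (short) Math.min(brokerMax, clientMax);
        short low = (short) Math.max(brokerMin, clientMin);
        if (high < low)
            throw new IllegalStateException("No mutually supported version for this API");
        return high;  // prefer the newest version both sides understand
    }
}
{code}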

Thanks,

Jun


On Fri, Apr 8, 2016 at 4:20 PM, Jun Rao  wrote:

> Magnus,
>
> A while back, we had another proposal for the broker to just send the
> correlation id and an empty payload if it receives an unsupported version
> of the request. I didn't see that in the rejected section. It seems simpler
> than the current proposal where the client has to issue an
> ApiVersionRequest on every connection. Could you document the reason why we
> rejected it?
>
> Thanks,
>
> Jun
>
> On Tue, Apr 5, 2016 at 1:47 PM, Ashish Singh  wrote:
>
>> On Fri, Apr 1, 2016 at 1:32 AM, Ismael Juma  wrote:
>>
>> > Two more things:
>> >
>> > 3. We talk about backporting of new request versions to stable branches
>> in
>> > the KIP. In practice, we can't do that until the Java client is changed
>> so
>> > that it doesn't blindly use the latest protocol version. Otherwise, if
>> new
>> > request versions were added to 0.9.0.2, the client would break when
>> talking
>> > to a 0.9.0.1 broker (given Jason's proposal, it would fail a bit more
>> > gracefully, but that's still not good enough for a stable branch). It
>> may
>> > be worth making this clear in the KIP (yes, it is a bit orthogonal and
>> > doesn't prevent the KIP from being adopted, but good to avoid
>> confusion).
>> >
>> Good point. Adding this note and also adding a note that Kafka has not
>> backported an api version so far.
>>
>> >
>> > 4. The paragraph below is a bit confusing. It starts talking about 0.9.0
>> > and trunk and then switches to 0.9.1. Is that intentional?
>> >
>> Yes.
>>
>> >
>> > "Deprecation of a protocol version will be done by marking a protocol
>> > version as deprecated in protocol documentation. Documentation shall
>> also
>> > be used to indicate a protocol version that must not be used, or for any
>> > such information.For instance, say 0.9.0 had protocol versions [0] for
>> api
>> > key 1. On trunk, version 1 of the api key was added. Users running off
>> > trunk started using version 1 of the api and found out a major bug. To
>> > rectify that version 2 of the api is added to trunk. For some reason,
>> it is
>> > now deemed important to have version 2 of the api in 0.9.1 as well. To
>> do
>> > so, version 1 and version 2 both of the api will be backported to the
>> 0.9.1
>> > branch. 0.9.1 broker will return 0 as min supported version for the api
>> and
>> > 2 for the max supported version for the api. However, the version 1
>> should
>> > be clearly marked as deprecated on its documentation. It will be
>> client's
>> > responsibility to make sure they are not using any such deprecated
>> version
>> > to the best knowledge of the client at the time of development (or
>> > alternatively by configuration)."
>> >
>> > Ismael
>> >
>> >
>> >
>> > On Fri, Apr 1, 2016 at 9:22 AM, Ismael Juma  wrote:
>> >
>> > > A couple of questions:
>> > >
>> > > 1. The KIP says "Specific version may be deprecated through protocol
>> > > documentation but must still be supported (although it is fair to
>> return
>> > an
>> > > error code if the specific API supports it).". It may be worth
>> expanding
>> > > this a little more. For example, what does it mean to support the
>> API? I
>> > > guess this means that the broker must not disconnect the client and
>> the
>> > > broker must return a valid protocol response. Given that it says that
>> it
>> > is
>> > > "fair" (I would probably replace "fair" with "valid") to return an
>> error
>> > > code if the specific API supports it, it sounds like we are saying
>> that
>> > we
>> > > don't have to maintain the semantic 

Re: [DISCUSS] KIP-43: Kafka SASL enhancements

2016-04-10 Thread Jun Rao
Ismael,

My responses are inlined below.

On Sun, Apr 10, 2016 at 12:25 PM, Ismael Juma  wrote:

> Hi Jun,
>
> A couple of points below.
>
> On Sat, Apr 9, 2016 at 12:19 AM, Jun Rao  wrote:
>
> > 5. Your main request is how can a client know that the broker is now
> > supporting new SASL mechanisms. One way to support that is to adjust
> KIP-43
> > slightly. We can model the SaslMechanismRequest as a regular request
> (with
> > standard request header) and add that to our protocol definition.
> Version 0
> >
>
> The current compatibility story for older clients in KIP-43 is that we send
> the mechanism first as that can be distinguished from the bytes sent by the
> GSSAPI in 0.9.0.0. If we use the standard request header for
> SaslMechanismRequest (which I agree would be a nice thing to do) then we
> would be sending the api key (INT16) first.
>

Yes, that should be fine, right? Since the new api key will start with a 0
byte, it actually guarantees that it's different from 0x60 (1st byte in the
old protocol) even if we change the request version id in the future.
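
(Roughly, the discrimination being described is the following, purely as an illustration: the payload of the new handshake request begins with an INT16 api key whose high byte is 0x00 for the api keys in use, while an initial GSSAPI token from a 0.9.0.0 client begins with 0x60 as noted above.)

{code}
final class SaslFirstByteCheck {
    // Sketch: decide whether an incoming payload is a legacy GSSAPI token
    // (0.9.0.0 client) or a new-style Kafka request starting with an api key.
    static boolean isLegacyGssapiToken(byte firstPayloadByte) {
        return (firstPayloadByte & 0xFF) == 0x60;
    }
}
{code}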


>
> of this request indicates that it supports GSSAPI and SASL Plain. If we
> > support any additional mechanism in the future, we will bump up the
> version
> > of SaslMechanismRequest. We also add in the protocol documentation that
> the
> > SASL authentication protocol is SaslMechanismRequest followed by token
> > exchange from SASL library. If we pick the current proposal in KIP-35,
> when
> > the client issues ApiRequest, we will return the supported versions
> > for SaslMechanismRequest as well. Does this work for you?
> >
>
> Currently, authentication would have to succeed before any application
> layer request can be sent. To make sure I understand correctly, are you
> suggesting that we would change it so that an ApiVersionRequest would be
> possible before authentication happens (so that the client would then know
> the supported versions of SaslMechanismRequest)?
>
>
No, I was thinking that you still need to be able to authenticate before
you can issue ApiVersionRequest. But you made me think a bit more on
ApiVersionRequest. Will reply directly to the KIP-35 thread.


> Thanks,
> Ismael
>

Thanks,

Jun


[jira] [Resolved] (KAFKA-3338) Add print and writeAsText functions to the Streams DSL

2016-04-10 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-3338.
--
       Resolution: Fixed
    Fix Version/s: 0.10.0.0 (was: 0.10.1.0)

Issue resolved by pull request 1209
[https://github.com/apache/kafka/pull/1209]

> Add print and writeAsText functions to the Streams DSL
> --
>
> Key: KAFKA-3338
> URL: https://issues.apache.org/jira/browse/KAFKA-3338
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Bill Bejeck
>  Labels: api, newbie++
> Fix For: 0.10.0.0
>
>
> We want to provide some REPL-like pattern for users for better debuggability. 
> More concretely, we want to allow users to easily inspect their intermediate 
> data streams in the topology while running the application. Theoretically 
> this can be done by using a break point, or by calling System.out.println() 
> inside the operator, or through a finer grained trace-level logging. But more 
> user-friendly API would be to add a print() function to the KStream / KTable 
> object like:
> {code}
> // Prints the elements in this stream to the stdout, i.e. "System.out" of the 
> JVM
> KStream#print(/* optional serde */);  
> KTable#print(/* optional serde */);  
> // Writes the stream as text file(s) to the specified location.
> KStream#writeAsText(String filePath, /* optional serde */);
> KTable#writeAsText(String filePath, /* optional serde */);
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request: KAFKA-3338 [Kafka Streams] : Add print and wri...

2016-04-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1209


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3338) Add print and writeAsText functions to the Streams DSL

2016-04-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15234375#comment-15234375
 ] 

ASF GitHub Bot commented on KAFKA-3338:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1209


> Add print and writeAsText functions to the Streams DSL
> --
>
> Key: KAFKA-3338
> URL: https://issues.apache.org/jira/browse/KAFKA-3338
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Bill Bejeck
>  Labels: api, newbie++
> Fix For: 0.10.0.0
>
>
> We want to provide some REPL-like pattern for users for better debuggability. 
> More concretely, we want to allow users to easily inspect their intermediate 
> data streams in the topology while running the application. Theoretically 
> this can be done by using a break point, or by calling System.out.println() 
> inside the operator, or through a finer grained trace-level logging. But more 
> user-friendly API would be to add a print() function to the KStream / KTable 
> object like:
> {code}
> // Prints the elements in this stream to the stdout, i.e. "System.out" of the 
> JVM
> KStream#print(/* optional serde */);  
> KTable#print(/* optional serde */);  
> // Writes the stream as text file(s) to the specified location.
> KStream#writeAsText(String filePath, /* optional serde */);
> KTable#writeAsText(String filePath, /* optional serde */);
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3511) Provide built-in aggregators sum() and avg() in Kafka Streams DSL

2016-04-10 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15234370#comment-15234370
 ] 

Guozhang Wang commented on KAFKA-3511:
--

I like [~jkreps]'s suggestion of letting users provide the aggregators instead of adding more 
and more built-in operators. The problem we need to solve is that there is 
more than one function in aggregate: an initializer and adder for KStream, and an 
initializer, adder and subtractor for KTable. So to achieve this simple 
interface we need to have an overloading

aggregate(aggregator, )... where the aggregator will have multiple APIs and 
hence cannot be re-written as a lambda expression; but as long as we make it 
clear that it is only for customizing common aggregations, that would be fine.
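
A hypothetical shape of such a combined aggregator, just to illustrate why it cannot be written as a single lambda (the names here are not part of the actual DSL):

{code}
// Hypothetical multi-method aggregator: bundling the initializer, adder and
// subtractor in one object means it is not a functional interface, so it
// cannot be expressed as a lambda, unlike a single-method Aggregator.
interface CombinedAggregator<K, V, T> {
    T initialValue();                       // initializer
    T add(K key, V value, T aggregate);     // adder (KStream and KTable)
    T remove(K key, V value, T aggregate);  // subtractor (KTable only)
}
{code}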

> Provide built-in aggregators sum() and avg() in Kafka Streams DSL
> -
>
> Key: KAFKA-3511
> URL: https://issues.apache.org/jira/browse/KAFKA-3511
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: api, newbie
> Fix For: 0.10.0.0
>
>
> Currently we only have one built-in aggregate function count() in the Kafka 
> Streams DSL, but we want to add more aggregation functions like sum() and 
> avg().



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: KStream Close Processor

2016-04-10 Thread Guozhang Wang
Re 1), Kafka Streams intentionally closes all underlying clients before
closing processors, since closing some of the processors requires shutting
down their processor state managers; for example, we need to make sure the
producer's message sends have all been acked before the state manager
records the changelog sent offsets. To complement that we trigger commitAll()
before closing the clients.
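
For readers following the thread, the buffering pattern under discussion looks roughly like the sketch below: records are merged in process() and whatever remains is flushed in close(). Whether the producer is still open when close() runs is exactly the ordering issue raised here, so this is an illustration of the problem rather than a recommended pattern (the Processor API usage is a sketch, not code from the thread):

{code}
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

class BufferingProcessor implements Processor<String, String> {
    private static final int BATCH_SIZE = 100;
    private final List<String> buffer = new ArrayList<>();
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        buffer.add(value);                 // accumulate small records
        if (buffer.size() >= BATCH_SIZE)
            flush(key);                    // forward one merged, larger record
    }

    private void flush(String key) {
        context.forward(key, String.join("\n", buffer));
        buffer.clear();
    }

    @Override
    public void punctuate(long timestamp) {}

    @Override
    public void close() {
        if (!buffer.isEmpty())
            flush("flush-on-close");       // may be too late if the producer is already closed
    }
}
{code}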


Guozhang

On Sun, Apr 10, 2016 at 9:17 AM, Jay Kreps  wrote:

> Also, I wonder if this issue is related:
> https://issues.apache.org/jira/browse/KAFKA-3135
>
> -Jay
>
> On Sun, Apr 10, 2016 at 8:58 AM, Jay Kreps  wrote:
>
> > Two things:
> > 1. Caching data in the processor is a bit dangerous since it will be lost
> > on failure. Nonetheless, I think you have a point that we should ideally
> > close the processors first, then commit in case they send any messages on
> > close.
> > 2. The issue you describe shouldn't happen for the reason you describe.
> > Both the broker and the consumer handle batches of messages so fetching a
> > single 1 MB message versus 1024 1KB messages should be the same. The
> > proposed max.poll.messages would just affect how many records are handed
> > out; they will have been fetched and be in memory in the consumer no
> > matter what. I wonder if you could help us trace down what's happening for
> > you--maybe provide a simple test case that reproduces the problem?
> >
> >
> > On Sun, Apr 10, 2016 at 6:13 AM, Michael D. Coon <
> > mdco...@yahoo.com.invalid> wrote:
> >
> >> Guozhang,
> >>Yes, I'm merging message contents into larger messages before sending
> >> to the producer. We have demonstrated that many tiny messages of < 1K
> >> cause a tremendous slowdown on the downstream consumers. Not because of
> >> memory contention but because of the broker filling up the max fetch
> >> request size by adding hundreds of thousands of tiny messages to the
> fetch
> >> response. The consumer then has to deal with those messages and it
> causes
> >> huge latency problems….the broker has to add those hundreds of
> thousands of
> >> messages to the response. It takes > 5 seconds per fetch to return from
> the
> >> broker in most cases. In contrast, when I merge messages into bundled
> >> single-messages with larger payloads, we get excellent throughput
> because
> >> there is less polling and the number of messages is reduced.
> >>I'm locked into a battle between fetch size constraints and max
> >> message size constraints…my max message size can actually spike over 5MB
> >> for a single message (non-merged) but most of the time it's < 1K. That's
> >> just the kind of data set we're dealing with. So I can't set fetch size
> too
> >> low or one of these larger messages will come in and break the consumer
> >> from being able to process anything.
> >>So we either need a way to tell the broker not to fill the max fetch
> >> size before returning (max.poll.messages) or I need a way to flush to
> the
> >> producer when it's about to close my producer. The latter offers the
> >> benefit of flushing data that may be the results of processing input
> data
> >> whose offsets were already committed asynchronously.
> >> Mike
> >>
> >> On Saturday, April 9, 2016 2:27 PM, Guozhang Wang <
> wangg...@gmail.com>
> >> wrote:
> >>
> >>
> >>  Mike,
> >>
> >> Not clear what you mean by "buffering up the contents". The producer itself
> >> already does some buffering and batching when sending to Kafka. Did you
> >> actually "merge" multiple small messages into one large message before
> >> giving it to the producer in the app code? In either case, I am not sure
> >> how it will help the downstream consumer memory pressure issue?
> >>
> >> About bounding the consumer memory usage, we already have some thoughts
> >> about that issue and plan to add the memory bounding feature like the
> >> producer does in the near future (
> >> https://issues.apache.org/jira/browse/KAFKA-2045), so it won't be a
> >> problem
> >> for long. And for the "max.poll.messages" config and 0.10.0, just FYI we
> >> are shooting to have it released end of this month.
> >>
> >> Guozhang
> >>
> >>
> >> On Sat, Apr 9, 2016 at 5:59 AM, Michael D. Coon
>  >> >
> >> wrote:
> >>
> >> > Guozhang,
> >> >In my processor, I'm buffering up contents of the final messages in
> >> > order to make them larger. This is to optimize throughput and avoid
> tiny
> >> > messages from being injected downstream. So nothing is being pushed to
> >> the
> >> > producer until my configured thresholds are met in the buffering
> >> mechanism.
> >> > So as it stands, these messages are left dangling after the producer
> >> closes
> >> > and, even worse, if periodic commits are happening behind the scenes,
> >> the
> >> > data is lost on restart.
> >> >What we need is a way to notify the processors that everything is
> >> > "about" to close so that I can properly flush what I have in memory
> 

[jira] [Commented] (KAFKA-3160) Kafka LZ4 framing code miscalculates header checksum

2016-04-10 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15234312#comment-15234312
 ] 

Guozhang Wang commented on KAFKA-3160:
--

One thing with serving old-format consume requests is that we now need to modify 
the bytes and hence cannot do zero-copy anymore, but since the 0.10.0 upgrade 
will involve a temporary performance degradation anyway, this may be just fine.

> Kafka LZ4 framing code miscalculates header checksum
> 
>
> Key: KAFKA-3160
> URL: https://issues.apache.org/jira/browse/KAFKA-3160
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Affects Versions: 0.8.2.0, 0.8.2.1, 0.9.0.0, 0.8.2.2, 0.9.0.1
>Reporter: Dana Powers
>Assignee: Magnus Edenhill
>  Labels: compatibility, compression, lz4
>
> KAFKA-1493 partially implements the LZ4 framing specification, but it 
> incorrectly calculates the header checksum. This causes 
> KafkaLZ4BlockInputStream to raise an error 
> [IOException(DESCRIPTOR_HASH_MISMATCH)] if a client sends *correctly* framed 
> LZ4 data. It also causes KafkaLZ4BlockOutputStream to generate incorrectly 
> framed LZ4 data, which means clients decoding LZ4 messages from kafka will 
> always receive incorrectly framed data.
> Specifically, the current implementation includes the 4-byte MagicNumber in 
> the checksum, which is incorrect.
> http://cyan4973.github.io/lz4/lz4_Frame_format.html
> Third-party clients that attempt to use off-the-shelf lz4 framing find that 
> brokers reject messages as having a corrupt checksum. So currently non-java 
> clients must 'fixup' lz4 packets to deal with the broken checksum.
> Magnus first identified this issue in librdkafka; kafka-python has the same 
> problem.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-43: Kafka SASL enhancements

2016-04-10 Thread Ismael Juma
Hi Jun,

A couple of points below.

On Sat, Apr 9, 2016 at 12:19 AM, Jun Rao  wrote:

> 5. Your main request is how can a client know that the broker is now
> supporting new SASL mechanisms. One way to support that is to adjust KIP-43
> slightly. We can model the SaslMechanismRequest as a regular request (with
> standard request header) and add that to our protocol definition. Version 0
>

The current compatibility story for older clients in KIP-43 is that we send
the mechanism first as that can be distinguished from the bytes sent by the
GSSAPI in 0.9.0.0. If we use the standard request header for
SaslMechanismRequest (which I agree would be a nice thing to do) then we
would be sending the api key (INT16) first.


> of this request indicates that it supports GSSAPI and SASL Plain. If we
> support any additional mechanism in the future, we will bump up the version
> of SaslMechanismRequest. We also add in the protocol documentation that the
> SASL authentication protocol is SaslMechanismRequest followed by token
> exchange from SASL library. If we pick the current proposal in KIP-35, when
> the client issues ApiRequest, we will return the supported versions
> for SaslMechanismRequest as well. Does this work for you?
>

Currently, authentication would have to succeed before any application
layer request can be sent. To make sure I understand correctly, are you
suggesting that we would change it so that an ApiVersionRequest would be
possible before authentication happens (so that the client would then know
the supported versions of SaslMechanismRequest)?

Thanks,
Ismael


Re: [DISCUSS] KIP-50 - Enhance Authorizer interface to be aware of supported Principal Types

2016-04-10 Thread Jun Rao
Ashish,

A third option is, in 0.10.0, to just sanity-check the principal type in the
implementation of addAcls/removeAcls of Authorizer, but not change the
Authorizer API to add the getDescription() method. This fixes the immediate
issue that an ACL rule with the wrong principal type is silently ignored.
Knowing the valid user types is nice, but not critical (we can include the
supported user types in the UnsupportedPrincipalTypeException thrown from
addAcls/removeAcls). This will give us more time to clean up the Authorizer
API post 0.10.0.

Thanks

Jun

On Fri, Apr 8, 2016 at 9:04 AM, Ashish Singh  wrote:

> Thanks for the input Don. One of the possible paths for Option 2 is to
> completely drop Scala interface, would that be Ok with you folks?
>
> On Thursday, April 7, 2016, Don Bosco Durai  wrote:
>
> > Ranger team would prefer option #2. Right now, we have to access some of
> > the nested constants using constructs like Group$.MODULE$, which is not
> > intuitive in Java.
> >
> > Thanks
> >
> > Bosco
> >
> >
> >
> >
> > On 4/7/16, 4:30 PM, "Ashish Singh" >
> > wrote:
> >
> > >Harsha/ Don,
> > >
> > >Are you guys OK with option 2? I am not aware of all the existing
> > >authorizer implementations, however ranger has one for sure. Getting
> > direct
> > >feedback from you guys will be really valuable.
> > >
> > >On Thu, Apr 7, 2016 at 3:52 PM, Ismael Juma  > > wrote:
> > >
> > >> Hi Don,
> > >>
> > >> This is true in Java 7, but Java 8 introduces default methods and this
> > >> workaround is no longer required. During the Interceptor KIP
> > discussion, it
> > >> was decided that it was fine to stick to interfaces given that we are
> > >> likely to move to Java 8 in the nearish future (probably no later than
> > the
> > >> Java 9 release).
> > >>
> > >> Ismael
> > >>
> > >> On Thu, Apr 7, 2016 at 11:36 PM, Don Bosco Durai  > > wrote:
> > >>
> > >> > Hi Ashish
> > >> >
> > >> > If we are going by option #2, then I can suggest we give an abstract
> > >> > implementation of the Interface and recommend anyone implementing
> > their
> > >> own
> > >> > plugin to extend from the abstract class, rather than implement the
> > >> > interface?
> > >> >
> > >> > The advantage is, in the future if we add any new methods in the
> > >> > Interface (e.g. similar to getDescription()), then we can give a dummy
> > >> > implementation of the new method and this won't break the compilation of
> > >> > any external implementation. Else over time it will be challenging for
> > >> > anyone customizing the implementation to keep track of changes to the
> > >> > Interface.
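
Sketched below is the difference between the two evolution strategies under discussion, using a hypothetical getDescription() addition: with Java 8 default methods the interface itself carries a fallback so existing implementations keep compiling, while an abstract base class achieves the same for Java 7. Names are illustrative, not the actual Authorizer API.

{code}
interface ExampleAuthorizer {
    boolean authorize(String principal, String operation, String resource);

    // Added later; the default body means existing implementations still compile (Java 8+).
    default String getDescription() {
        return "";
    }
}

// Java 7 alternative: implementors extend this base class instead of the
// interface, and any new method gets a dummy body here.
abstract class BaseAuthorizer implements ExampleAuthorizer {
    @Override
    public String getDescription() {
        return "";
    }
}
{code}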
> > >> >
> > >> > Thanks
> > >> >
> > >> > Bosco
> > >> >
> > >> >
> > >> >
> > >> >
> > >> > On 4/7/16, 11:21 AM, "Ashish Singh"  > > wrote:
> > >> >
> > >> > >Hello Harsha,
> > >> > >
> > >> > >On Thu, Apr 7, 2016 at 11:03 AM, Harsha  > > wrote:
> > >> > >
> > >> > >"My only ask is to have this in 0.10. As Jay pointed out, right now
> > >> > >> there
> > >> > >> are not many implementations out there, we might want to fix it
> > ASAP."
> > >> > >>
> > >> > >> Probably there aren't many implementations but there are lot of
> > users
> > >> > >> using these implementations in production clusters.
> > >> > >> Isn't this going to break the rolling upgrade?
> > >> > >
> > >> > >It will and it is a concern, in my previous mail I have mentioned
> > this
> > >> as
> > >> > >an issue if we choose to go this route. However, if we actually
> > decide
> > >> to
> > >> > >do this, I would say it is better to do it sooner than later, as
> > fewer
> > >> > >implementations will be affected. Below is excerpt from my previous
> > >> mail.
> > >> > >
> > >> > >Increase scope of KIP-50 to move authorizer and related classes to
> a
> > >> > >separate package. The new package will have java interface. This
> will
> > >> > allow
> > >> > >implementations to not depend on kafka core and just on authorizer
> > >> > package,
> > >> > >make authorization interface follow kafka’s coding standards and
> will
> > >> > allow
> > >> > >java implementations to be cleaner. We can either completely drop
> > scala
> > >> > >interface, which might be a pain for existing implementations, or
> we
> > can
> > >> > >have scala interface wrap java interface. Later allows a cleaner
> > >> > >deprecation path for existing scala authorizer interface, however
> it
> > may
> > >> > or
> > >> > >may not be feasible as Kafka server will have to somehow decide
> which
> > >> > >interface it should be looking for while loading authorizer
> > >> > implementation,
> > >> > >this can probably be solved with a config or some reflection. If we
> > >> choose
> > >> > >to go this route, I can dig deeper.
> > >> > >
> > >> > >If we go with option 2 and commit on getting this in ASAP,
> > 

[jira] [Commented] (KAFKA-3160) Kafka LZ4 framing code miscalculates header checksum

2016-04-10 Thread Magnus Edenhill (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15234245#comment-15234245
 ] 

Magnus Edenhill commented on KAFKA-3160:


[~dana.powers] My broker patch adds a new Attribute bit to specify the fixed 
LZ4F framing, but it still relies on clients being backwards compatible with the 
broken framing format; that was before KIP-31 was a thing, though.

Your proposed solution of reusing the KIP-31 behaviour is much better; I'd 
definitely like to see this in broker 0.10.
This will also formally add LZ4 support to the protocol (it is not even 
mentioned in the Kafka protocol docs) and be compatible with KIP-35 (e.g., 
ProduceRequest >= v2 supports LZ4).

I'm a strong +1 on this.

> Kafka LZ4 framing code miscalculates header checksum
> 
>
> Key: KAFKA-3160
> URL: https://issues.apache.org/jira/browse/KAFKA-3160
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Affects Versions: 0.8.2.0, 0.8.2.1, 0.9.0.0, 0.8.2.2, 0.9.0.1
>Reporter: Dana Powers
>Assignee: Magnus Edenhill
>  Labels: compatibility, compression, lz4
>
> KAFKA-1493 partially implements the LZ4 framing specification, but it 
> incorrectly calculates the header checksum. This causes 
> KafkaLZ4BlockInputStream to raise an error 
> [IOException(DESCRIPTOR_HASH_MISMATCH)] if a client sends *correctly* framed 
> LZ4 data. It also causes KafkaLZ4BlockOutputStream to generate incorrectly 
> framed LZ4 data, which means clients decoding LZ4 messages from kafka will 
> always receive incorrectly framed data.
> Specifically, the current implementation includes the 4-byte MagicNumber in 
> the checksum, which is incorrect.
> http://cyan4973.github.io/lz4/lz4_Frame_format.html
> Third-party clients that attempt to use off-the-shelf lz4 framing find that 
> brokers reject messages as having a corrupt checksum. So currently non-java 
> clients must 'fixup' lz4 packets to deal with the broken checksum.
> Magnus first identified this issue in librdkafka; kafka-python has the same 
> problem.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-43: Kafka SASL enhancements

2016-04-10 Thread Jun Rao
Magnus's proposal versions each SASL token. Perhaps you had something
different in mind? I agree that we want to make the part that Kafka
controls in SASL extensible. For now, the mechanism part is the only thing
that we had in mind. Is there anything else you think would be useful? We
could rename SaslMechanismRequest to something more general,
like SaslNegotiationRequest, if we anticipate additional info in the
future. One difference between Kafka and Zookeeper is that Zookeeper
chooses to use a single port for both plaintext and SASL. So the Zookeeper
client will need to further communicate whether plaintext or SASL is used
for a connection. In Kafka, we have a separate port for SASL.

Thanks,

Jun

On Sun, Apr 10, 2016 at 10:43 AM, Jay Kreps  wrote:

> I don't think we are versioning their bytes but providing a framework for
> any additional data needed (in this case the mechanism at the least is
> needed but it could be more), right? I think this is what the authenticate
> api in zookeeper does, right?
>
> -Jay
>
> On Sun, Apr 10, 2016 at 10:08 AM, Jun Rao  wrote:
>
> > The way that we want to extend SASL mechanism is to first add the
> mechanism
> > exchange part. Once the mechanism is selected, we will exchange the SASL
> > tokens specific to the selected mechanism. Magnus proposes to model both
> > the mechanism exchange and token exchange as independent Kafka
> > request/response. My concern is that modeling the token exchange part as
> a
> > separate request feels weird. Those tokens are provided by the SASL
> > library. Once a mechanism is selected, Kafka doesn't control their
> content.
> > So why would Kafka want to version them? If you look at Magnus's
> proposal,
> > the server essentially ignores the header in the token exchange request
> > since it always expects the same header once a given mechanism is
> > determined.
> >
> > Based on that, I was suggesting an alternative approach to just model the
> > mechanism exchange part as a Kafka request, since this is the only part
> > that Kafka controls. We then just document the authentication protocol to
> > be SaslMechanismRequest followed by standard token exchanges from the
> SASL
> > library based on the agreed upon SASL mechanism. This allows us to extend
> > mechanisms in the future. It is a bit ad hoc. However, the authentication
> > logic doesn't completely fit into the independent Kafka request protocol.
> >
> > Thanks,
> >
> > Jun
> >
> > On Sun, Apr 10, 2016 at 9:02 AM, Jay Kreps  wrote:
> >
> > > I understood Magnus's complaint to be introducing a non-extensible ad
> hoc
> > > protocol. I would second that. The responses seem to be all about how
> the
> > > java code is organized (do we process things in the KafkaApis layer,
> > etc).
> > > These are separate questions, right?
> > >
> > > -Jay
> > >
> > > On Fri, Apr 8, 2016 at 4:19 PM, Jun Rao  wrote:
> > >
> > > > Hi, Magnus,
> > > >
> > > > You brought up a few things in your proposal. I am trying to itemize
> > them
> > > > below so that we can discuss them individually.
> > > >
> > > > 1. You are proposing moving the SASL authentication logic into the
> > > > application and modeling it just as regular requests such as
> > > produce/fetch.
> > > > The issue is that there is dependency between the authentication part
> > and
> > > > regular requests. The client is not expected to send any regular
> > request
> > > > before the authentication completes. If we mix them together, the
> > > > application layer will need to maintain additional states to manage
> > such
> > > > dependency. In the current implementation, authentication is done
> below
> > > the
> > > > application layer. So, when a connection is ready, the application is
> > > free
> > > > to send any regular requests. This simplifies the logic in the
> > > application
> > > > layer.
> > > >
> > > > 2. You are proposing having a single port that supports both
> plaintext
> > > and
> > > > SASL, instead of our current approach of having separate ports.
> Indeed,
> > > > there are other projects like Zookeeper do it that way. I think both
> > > > approaches are possible, but it's not clear to me that one is clearly
> > > > better than the other. Even if you have just a single port, people
> > would
> > > > likely want to have the ability to disable plaintext connections if
> > SASL
> > > is
> > > > enabled. So, you would still need some configuration on the server
> > side.
> > > > This is then not too much different from the multiple port approach.
> > > >
> > > > 3. You are proposing to give a client the ability to support multiple
> > > SASL
> > > > mechanisms and choose one to use dynamically. I am not sure if there
> > is a
> > > > real use case for that. Setting the credential for one authentication
> > > > mechanism can already be tricky. Configuring multiple of those in a
> > > single
> > > > client can cause more confusing. Also, different SASL 

[jira] [Commented] (KAFKA-3160) Kafka LZ4 framing code miscalculates header checksum

2016-04-10 Thread Dana Powers (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15234221#comment-15234221
 ] 

Dana Powers commented on KAFKA-3160:


Magnus: have you made any progress on this? The more I think about it, the more 
I think this needs to get included w/ KIP-31. If the goal of KIP-31 is to avoid 
recompression, and the goal of this JIRA is to fix the compression format, and 
in all cases we need to maintain compatibility with old clients, then I think 
the only way to solve all conditions is to make the pre-KIP-31 FetchRequest / 
ProduceRequest versions use the broken LZ4 format, and require the fixed format 
in the new FetchRequest / ProduceRequest version:

Old 0.8/0.9 clients (current behavior): produce messages w/ broken checksum; 
consume messages w/ incorrect checksum only
New 0.10 clients (proposed behavior): produce messages in "new KIP-31 format" 
w/ correct checksum; consume messages in "new KIP-31 format" w/ correct 
checksum only

Proposed behavior for 0.10 broker:
 - convert all "old format" messages to "new KIP-31 format" + fix checksum to 
correct value
 - require incoming "new KIP-31 format" messages to have correct checksum, 
otherwise throw error
 - when serving requests for "old format", fixup checksum to be incorrect when 
converting "new KIP-31 format" messages to old format

Thoughts?

> Kafka LZ4 framing code miscalculates header checksum
> 
>
> Key: KAFKA-3160
> URL: https://issues.apache.org/jira/browse/KAFKA-3160
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Affects Versions: 0.8.2.0, 0.8.2.1, 0.9.0.0, 0.8.2.2, 0.9.0.1
>Reporter: Dana Powers
>Assignee: Magnus Edenhill
>  Labels: compatibility, compression, lz4
>
> KAFKA-1493 partially implements the LZ4 framing specification, but it 
> incorrectly calculates the header checksum. This causes 
> KafkaLZ4BlockInputStream to raise an error 
> [IOException(DESCRIPTOR_HASH_MISMATCH)] if a client sends *correctly* framed 
> LZ4 data. It also causes KafkaLZ4BlockOutputStream to generate incorrectly 
> framed LZ4 data, which means clients decoding LZ4 messages from kafka will 
> always receive incorrectly framed data.
> Specifically, the current implementation includes the 4-byte MagicNumber in 
> the checksum, which is incorrect.
> http://cyan4973.github.io/lz4/lz4_Frame_format.html
> Third-party clients that attempt to use off-the-shelf lz4 framing find that 
> brokers reject messages as having a corrupt checksum. So currently non-java 
> clients must 'fixup' lz4 packets to deal with the broken checksum.
> Magnus first identified this issue in librdkafka; kafka-python has the same 
> problem.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-4 Metadata Schema

2016-04-10 Thread Jun Rao
Grant,

The limitation with the current MetadataResponse is that if a broker is
down, all replicas on that broker will be missing in the assigned replica
list in the response. Now, imagine that you want to use MetadataRequest to
describe a topic; it's weird that you don't see the full assigned
replica list if some brokers are not available. So, I feel that we should
fix this properly if we want to move the admin tools to use
MetadataRequest. I understand that this takes some extra work. However, we
don't have to rush this into 0.10.0.0 since there is no usage of the new
MetadataResponse anyway in 0.10.0.0, right?

Thanks,

Jun

On Fri, Apr 8, 2016 at 12:55 PM, Grant Henke  wrote:

> Hi Jun,
>
> I am looking at the changes required for the below request:
>
> 5. You will return no error and 4,5,6 as replicas. The response also
> > includes a list of live brokers. So the client can figure out 5 is not
> live
> > directly w/o relying on the error code.
>
>
> The challenge here is that I need to support both version 0 and version 1s
> behavior. This means I would need to pass the version into the Metadata
> cache or create a new getTopicMetadataV0 to call from KafkaApis.
>
> I would also need to modify the MetadataResponse.PartitionMetadata quite a
> bit. Today it expects a Node for every replica, however if the broker is
> not live I don't have a valid host and port to use. It's a bit unfortunate
> that a node object is required, since on the wire only a broker id is used,
> but it exists for client convenience. For the same reason I would need to
> change how MetadataResponse response is decoded, since it assumes the
> broker id for replicas, isr, etc will be in the broker list to translate
> back to a Node. Today it silently drops the entry if the id is not in the
> broker list, which we may not have realized.
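
One possible shape of that change, sketched here purely for illustration (not the actual MetadataResponse code): keep the broker id visible by falling back to a placeholder Node instead of silently dropping replica ids that are missing from the live-broker list.

{code}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.Node;

final class ReplicaDecodingSketch {
    static List<Node> replicas(int[] replicaIds, Map<Integer, Node> liveBrokers) {
        List<Node> nodes = new ArrayList<>(replicaIds.length);
        for (int id : replicaIds) {
            Node node = liveBrokers.get(id);
            // Placeholder keeps the broker id but has no usable host/port.
            nodes.add(node != null ? node : new Node(id, "", -1));
        }
        return nodes;
    }
}
{code}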
>
> Before I start making all the changes to support this, is it worth that
> much change to fix this issue? I understand the metadata response is not
> optimal, but is the current behavior bad enough? Can this be handled in a
> separate PR?
>
> Thanks,
> Grant
>
>
> On Thu, Apr 7, 2016 at 1:07 PM, Grant Henke  wrote:
>
> > Thanks for the feedback Guozhang and Gwen.
> >
> > Gwen, I agree with you on this. I am not sure its something we can/should
> > tackle here. Especially before the release. I can leave the delete flag
> off
> > of the changes.
> >
> > What that means for KIP-4, is that a client won't be able to
> differentiate
> > between a topic that is gone vs marked for deletion. This means a delete
> > and then create action may fail with a topic exists exception...which the
> > user could retry until succeeded. I think that is reasonable, and much
> > safer.
> >
> > After that we can work on creating more tests and improving the delete
> > behavior.
> >
> >
> >
> > On Thu, Apr 7, 2016 at 12:55 PM, Gwen Shapira  wrote:
> >
> >> Given that we are very close to the release, if we are changing the
> >> Metadata cache + topic deletion logic, I'd like a good number of system
> >> tests to appear with the patch.
> >>
> >> On Thu, Apr 7, 2016 at 10:53 AM, Gwen Shapira 
> wrote:
> >>
> >> > This will change some logic though, right?
> >> >
> >> > IIRC, right now produce/fetch requests to marked-for-deletion topics
> >> fail
> >> > because the topics are simple not around. You get a generic "doesn't
> >> exist"
> >> > error. If we keep these topics and add a flag, we'll need to find all
> >> the
> >> > places with this implicit logic and correct for it.
> >> >
> >> > And since our tests for topic deletion are clearly inadequate... I'm a
> >> bit
> >> > scared :)
> >> >
> >> > Gwen
> >> >
> >> > On Thu, Apr 7, 2016 at 10:34 AM, Guozhang Wang 
> >> wrote:
> >> >
> >> >> Hmm, I think since in the original protocol metadata responses do not have
> >> >> information for "marked for deletion" topics, we want to stop returning
> >> >> that topic in the response by cleaning the metadata cache once
> >> >> it is marked for deletion.
> >> >>
> >> >> With the new format, I think it is OK to delay the metadata cleaning.
> >> >>
> >> >> Guozhang
> >> >>
> >> >> On Thu, Apr 7, 2016 at 8:35 AM, Grant Henke 
> >> wrote:
> >> >>
> >> >> > I am testing the marked for deletion flag in the metadata and ran
> >> >> > into some challenges.
> >> >> >
> >> >> > It turns out that as soon as a topic is marked for deletion it may
> >> >> > be purged from the metadata cache. This means that Metadata responses
> >> >> > can't/don't return the topic. Though the topic may still exist if it's
> >> >> > not ready to be completely deleted or is in the process of being
> >> >> > deleted.
> >> >> >
> >> >> > This poses a challenge because a user would have no way to tell if a
> >> >> > topic still exists, and is marked for deletion, other than to try and
> >> >>

Re: [DISCUSS] KIP-43: Kafka SASL enhancements

2016-04-10 Thread Jay Kreps
I don't think we are versioning their bytes but providing a framework for
any additional data needed (in this case at least the mechanism is
needed, but it could be more), right? I think this is what the authenticate
api in zookeeper does, right?

-Jay

On Sun, Apr 10, 2016 at 10:08 AM, Jun Rao  wrote:

> The way that we want to extend SASL mechanism is to first add the mechanism
> exchange part. Once the mechanism is selected, we will exchange the SASL
> tokens specific to the selected mechanism. Magnus proposes to model both
> the mechanism exchange and token exchange as independent Kafka
> request/response. My concern is that modeling the token exchange part as a
> separate request feels weird. Those tokens are provided by the SASL
> library. Once a mechanism is selected, Kafka doesn't control their content.
> So why would Kafka want to version them? If you look at Magnus's proposal,
> the server essentially ignores the header in the token exchange request
> since it always expects the same header once a given mechanism is
> determined.
>
> Based on that, I was suggesting an alternative approach to just model the
> mechanism exchange part as a Kafka request, since this is the only part
> that Kafka controls. We then just document the authentication protocol to
> be SaslMechanismRequest followed by standard token exchanges from the SASL
> library based on the agreed upon SASL mechanism. This allows us to extend
> mechanisms in the future. It is a bit ad hoc. However, the authentication
> logic doesn't completely fit into the independent Kafka request protocol.
>
> Thanks,
>
> Jun
>
> On Sun, Apr 10, 2016 at 9:02 AM, Jay Kreps  wrote:
>
> > I understood Magnus's complaint to be introducing a non-extensible ad hoc
> > protocol. I would second that. The responses seem to be all about how the
> > java code is organized (do we process things in the KafkaApis layer,
> etc).
> > These are separate questions, right?
> >
> > -Jay
> >
> > On Fri, Apr 8, 2016 at 4:19 PM, Jun Rao  wrote:
> >
> > > Hi, Magnus,
> > >
> > > You brought up a few things in your proposal. I am trying to itemize
> them
> > > below so that we can discuss them individually.
> > >
> > > 1. You are proposing moving the SASL authentication logic into the
> > > application and modeling it just as regular requests such as
> > produce/fetch.
> > > The issue is that there is dependency between the authentication part
> and
> > > regular requests. The client is not expected to send any regular
> request
> > > before the authentication completes. If we mix them together, the
> > > application layer will need to maintain additional states to manage
> such
> > > dependency. In the current implementation, authentication is done below
> > the
> > > application layer. So, when a connection is ready, the application is
> > free
> > > to send any regular requests. This simplifies the logic in the
> > application
> > > layer.
> > >
> > > 2. You are proposing having a single port that supports both plaintext
> > and
> > > SASL, instead of our current approach of having separate ports. Indeed,
> > > there are other projects like Zookeeper that do it that way. I think both
> > > approaches are possible, but it's not clear to me that one is clearly
> > > better than the other. Even if you have just a single port, people
> would
> > > likely want to have the ability to disable plaintext connections if
> SASL
> > is
> > > enabled. So, you would still need some configuration on the server
> side.
> > > This is then not too much different from the multiple port approach.
> > >
> > > 3. You are proposing to give a client the ability to support multiple
> > SASL
> > > mechanisms and choose one to use dynamically. I am not sure if there
> is a
> > > real use case for that. Setting the credential for one authentication
> > > mechanism can already be tricky. Configuring multiple of those in a
> > single
> > > client can cause more confusion. Also, different SASL mechanisms are
> > > typically associated with different users. It seems unlikely for an
> > > administrator to give the same application two different user ids for
> > > authorization. The reason that the current proposal returns a list of
> > > enabled mechanisms is not for the client to make a choice, but rather for
> the
> > > client to know the available options if the client gets an
> > > UnsupportedMechanism exception. Gwen gave an example that a client may
> > want
> > > to switch from one authentication mechanism to another. I am not sure
> if
> > we
> > > need to optimize for the transition phase. A simpler approach is to
> have
> > > the new authentication mechanism enabled and new ACL added on the
> broker
> > > side first. Then enable the new authentication mechanism on the client.
> > >
> > > 4. You are proposing to wrap the tokens from the SASL library in Kafka
> > > request protocol. It feels a bit weird to put a Kafka version on each
> > SASL
> 

[jira] [Updated] (KAFKA-3160) Kafka LZ4 framing code miscalculates header checksum

2016-04-10 Thread Dana Powers (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dana Powers updated KAFKA-3160:
---
Description: 
KAFKA-1493 partially implements the LZ4 framing specification, but it 
incorrectly calculates the header checksum. This causes 
KafkaLZ4BlockInputStream to raise an error 
[IOException(DESCRIPTOR_HASH_MISMATCH)] if a client sends *correctly* framed 
LZ4 data. It also causes KafkaLZ4BlockOutputStream to generate incorrectly 
framed LZ4 data, which means clients decoding LZ4 messages from kafka will 
always receive incorrectly framed data.

Specifically, the current implementation includes the 4-byte MagicNumber in the 
checksum, which is incorrect.
http://cyan4973.github.io/lz4/lz4_Frame_format.html

Third-party clients that attempt to use off-the-shelf lz4 framing find that 
brokers reject messages as having a corrupt checksum. So currently non-java 
clients must 'fixup' lz4 packets to deal with the broken checksum.

Magnus first identified this issue in librdkafka; kafka-python has the same 
problem.
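For reference, a minimal sketch of the correct calculation, using the xxhash implementation bundled with Kafka's lz4-java dependency; per the frame format spec, the HC byte is derived from XXH32 of the frame descriptor only, so hashing must start after the 4-byte magic number. The broken behavior corresponds to hashing from offset 0, which pulls the magic bytes into the hash.

    import net.jpountz.xxhash.XXHash32;
    import net.jpountz.xxhash.XXHashFactory;

    public class Lz4HeaderChecksumSketch {
        // Computes the frame descriptor checksum (HC) over the descriptor bytes only
        // (FLG, BD, optional content size) -- never over the preceding magic number.
        static byte headerChecksum(byte[] header, int descriptorOffset, int descriptorLength) {
            XXHash32 xxhash = XXHashFactory.fastestInstance().hash32();
            int hash = xxhash.hash(header, descriptorOffset, descriptorLength, 0);
            return (byte) ((hash >> 8) & 0xFF); // HC is the second byte of the 32-bit hash
        }
    }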

  was:
KAFKA-1493 partially implements the LZ4 framing specification, but it 
incorrectly calculates the header checksum. This causes 
KafkaLZ4BlockInputStream to raise an error 
[IOException(DESCRIPTOR_HASH_MISMATCH)] if a client sends *correctly* framed 
LZ4 data. It also causes the kafka broker to always return incorrectly framed 
LZ4 data to clients.

Specifically, the current implementation includes the 4-byte MagicNumber in the 
checksum, which is incorrect.
http://cyan4973.github.io/lz4/lz4_Frame_format.html

Third-party clients that attempt to use off-the-shelf lz4 framing find that 
brokers reject messages as having a corrupt checksum. So currently non-java 
clients must 'fixup' lz4 packets to deal with the broken checksum.

Magnus first identified this issue in librdkafka; kafka-python has the same 
problem.


> Kafka LZ4 framing code miscalculates header checksum
> 
>
> Key: KAFKA-3160
> URL: https://issues.apache.org/jira/browse/KAFKA-3160
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Affects Versions: 0.8.2.0, 0.8.2.1, 0.9.0.0, 0.8.2.2, 0.9.0.1
>Reporter: Dana Powers
>Assignee: Magnus Edenhill
>  Labels: compatibility, compression, lz4
>
> KAFKA-1493 partially implements the LZ4 framing specification, but it 
> incorrectly calculates the header checksum. This causes 
> KafkaLZ4BlockInputStream to raise an error 
> [IOException(DESCRIPTOR_HASH_MISMATCH)] if a client sends *correctly* framed 
> LZ4 data. It also causes KafkaLZ4BlockOutputStream to generate incorrectly 
> framed LZ4 data, which means clients decoding LZ4 messages from kafka will 
> always receive incorrectly framed data.
> Specifically, the current implementation includes the 4-byte MagicNumber in 
> the checksum, which is incorrect.
> http://cyan4973.github.io/lz4/lz4_Frame_format.html
> Third-party clients that attempt to use off-the-shelf lz4 framing find that 
> brokers reject messages as having a corrupt checksum. So currently non-java 
> clients must 'fixup' lz4 packets to deal with the broken checksum.
> Magnus first identified this issue in librdkafka; kafka-python has the same 
> problem.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-43: Kafka SASL enhancements

2016-04-10 Thread Jun Rao
The way that we want to extend SASL mechanism is to first add the mechanism
exchange part. Once the mechanism is selected, we will exchange the SASL
tokens specific to the selected mechanism. Magnus proposes to model both
the mechanism exchange and token exchange as independent Kafka
request/response. My concern is that modeling the token exchange part as a
separate request feels weird. Those tokens are provided by the SASL
library. Once a mechanism is selected, Kafka doesn't control their content.
So why would Kafka want to version them? If you look at Magnus's proposal,
the server essentially ignores the header in the token exchange request
since it always expects the same header once a given mechanism is
determined.

Based on that, I was suggesting an alternative approach to just model the
mechanism exchange part as a Kafka request, since this is the only part
that Kafka controls. We then just document the authentication protocol to
be SaslMechanismRequest followed by standard token exchanges from the SASL
library based on the agreed upon SASL mechanism. This allows us to extend
mechanisms in the future. It is a bit ad hoc. However, the authentication
logic doesn't completely fit into the independent Kafka request protocol.
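To illustrate the split being described, a rough sketch follows; the Transport interface and its methods are stand-ins for whatever the broker ends up exposing (not real Kafka APIs), and only the mechanism-selection step would be a versioned Kafka request.

    import javax.security.auth.callback.CallbackHandler;
    import javax.security.sasl.Sasl;
    import javax.security.sasl.SaslClient;
    import javax.security.sasl.SaslException;

    public class SaslHandshakeSketch {
        // Hypothetical transport: one Kafka-framed request to agree on the mechanism,
        // then opaque SASL token exchanges that Kafka neither versions nor interprets.
        interface Transport {
            void sendMechanismRequest(String mechanism);
            byte[] exchangeToken(byte[] token);
        }

        static void authenticate(Transport transport, CallbackHandler handler) throws SaslException {
            // Step 1: the only part Kafka controls -- selecting the mechanism.
            transport.sendMechanismRequest("GSSAPI");

            // Step 2: from here the SASL library owns the token contents.
            // "broker.example.com" is an assumed server name for illustration.
            SaslClient client = Sasl.createSaslClient(new String[]{"GSSAPI"}, null,
                    "kafka", "broker.example.com", null, handler);
            byte[] token = client.hasInitialResponse()
                    ? client.evaluateChallenge(new byte[0]) : new byte[0];
            while (!client.isComplete()) {
                byte[] challenge = transport.exchangeToken(token);
                token = client.evaluateChallenge(challenge);
                // A production client would also send any final token produced here.
            }
        }
    }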

Thanks,

Jun

On Sun, Apr 10, 2016 at 9:02 AM, Jay Kreps  wrote:

> I understood Magnus's complaint to be introducing a non-extensible ad hoc
> protocol. I would second that. The responses seem to be all about how the
> java code is organized (do we process things in the KafkaApis layer, etc).
> These are separate questions, right?
>
> -Jay
>
> On Fri, Apr 8, 2016 at 4:19 PM, Jun Rao  wrote:
>
> > Hi, Magnus,
> >
> > You brought up a few things in your proposal. I am trying to itemize them
> > below so that we can discuss them individually.
> >
> > 1. You are proposing moving the SASL authentication logic into the
> > application and modeling it just as regular requests such as
> produce/fetch.
> > The issue is that there is dependency between the authentication part and
> > regular requests. The client is not expected to send any regular request
> > before the authentication completes. If we mix them together, the
> > application layer will need to maintain additional states to manage such
> > dependency. In the current implementation, authentication is done below
> the
> > application layer. So, when a connection is ready, the application is
> free
> > to send any regular requests. This simplifies the logic in the
> application
> > layer.
> >
> > 2. You are proposing having a single port that supports both plaintext
> and
> > SASL, instead of our current approach of having separate ports. Indeed,
> > there are other projects like Zookeeper that do it that way. I think both
> > approaches are possible, but it's not clear to me that one is clearly
> > better than the other. Even if you have just a single port, people would
> > likely want to have the ability to disable plaintext connections if SASL
> is
> > enabled. So, you would still need some configuration on the server side.
> > This is then not too much different from the multiple port approach.
> >
> > 3. You are proposing to give a client the ability to support multiple
> SASL
> > mechanisms and choose one to use dynamically. I am not sure if there is a
> > real use case for that. Setting the credential for one authentication
> > mechanism can already be tricky. Configuring multiple of those in a
> single
> > client can cause more confusion. Also, different SASL mechanisms are
> > typically associated with different users. It seems unlikely for an
> > administrator to give the same application two different user ids for
> > authorization. The reason that the current proposal returns a list of
> > enabled mechanisms is not for the client to make a choice, but rather for the
> > client to know the available options if the client gets an
> > UnsupportedMechanism exception. Gwen gave an example that a client may
> want
> > to switch from one authentication mechanism to another. I am not sure if
> we
> > need to optimize for the transition phase. A simpler approach is to have
> > the new authentication mechanism enabled and new ACL added on the broker
> > side first. Then enable the new authentication mechanism on the client.
> >
> > 4. You are proposing to wrap the tokens from the SASL library in Kafka
> > request protocol. It feels a bit weird to put a Kafka version on each
> SASL
> > token. Those tokens are generated by the SASL library, which is
> responsible
> > for maintaining the compatibility.
> >
> > 5. Your main request is how a client can know that the broker now
> > supports new SASL mechanisms. One way to support that is to adjust
> KIP-43
> > slightly. We can model the SaslMechanismRequest as a regular request
> (with
> > standard request header) and add that to our protocol definition.
> Version 0
> > of this request indicates that it supports GSSAPI and SASL Plain. If we
> >