Re: [DISCUSS] KIP-232: Detect outdated metadata by adding ControllerMetadataEpoch field

2018-01-09 Thread Dong Lin
Regarding the use of the global epoch in 65), it is very similar to the
proposal of the metadata_epoch we discussed earlier. The main difference is
that this epoch is incremented when we create/expand/delete a topic and does
not change when the controller re-sends metadata.

I looked at our previous discussion. It seems that we preferred
partition_epoch over metadata_epoch because 1) we prefer not to have an
ever-growing metadata_epoch and 2) we can better reset offsets when a topic
is re-created. The use of a global topic_epoch avoids the drawback of a
quickly growing metadata_epoch. Though the global epoch does not allow us
to recognize an invalid offset committed before a topic re-creation, we can
probably just delete the offsets when we delete a topic. Thus I am not very
sure whether it is still worthwhile to have a per-partition partition_epoch
if the metadata already has the global epoch.


On Tue, Jan 9, 2018 at 6:58 PM, Dong Lin  wrote:

> Hey Jun,
>
> Thanks so much. These comments are very useful. Please see my comments below.
>
> On Mon, Jan 8, 2018 at 5:52 PM, Jun Rao  wrote:
>
>> Hi, Dong,
>>
>> Thanks for the updated KIP. A few more comments.
>>
>> 60. Perhaps having a partition epoch is more flexible since in the future,
>> we may support deleting a partition as well.
>>
>
> Yeah, I have considered this. I think we can probably still support
> deleting a partition by using the topic_epoch -- when a partition of a
> topic is deleted or created, the epoch of all partitions of this topic will
> be incremented by 1. Therefore, if that partition is re-created later, the
> epoch of that partition will still be larger than its epoch before the
> deletion, which still allows the client to order the metadata for the
> purpose of this KIP. Does this sound reasonable?
>
> The advantage of using topic_epoch instead of partition_epoch is that the
> size of the /brokers/topics/[topic] znode and the request/response size can
> be smaller. We have a limit on the maximum size of a znode (typically 1 MB).
> Using a per-partition epoch would effectively reduce the number of
> partitions that can be described by the /brokers/topics/[topic] znode.
>
> One use-case of partition_epoch is for the client to detect that the
> committed offset, whether from the Kafka offsets topic or from an external
> store, is invalid after partition deletion and re-creation. However, it
> seems that we can also address this use-case with other approaches. For
> example, when AdminClient deletes partitions, it can also delete the
> committed offsets for those partitions from the offsets topic. If the user
> stores offsets externally, it might make sense for the user to similarly
> remove the offsets of related partitions after these partitions are
> deleted. So I am not sure that we should use partition_epoch in this KIP.
>
>
>>
>> 61. It seems that the leader epoch returned in the position() call should
>> be the leader epoch returned in the fetch response, not the one in the
>> metadata cache of the client.
>
>
> I think this is a good idea. Just to double check, this change does not
> affect the correctness or performance of this KIP. But it can be useful if
> we want to use the leader_epoch to better handle the offset reset in case of
> unclean leader election, which is listed in the future work. Is this
> understanding correct?
>
> I have updated the KIP to specify that the leader_epoch returned by
> position() should be the largest leader_epoch of those already consumed
> messages whose offset < position. If no message has been consumed since
> consumer initialization, the leader_epoch from seek() or
> OffsetFetchResponse should be used. The offset included in the
> OffsetCommitRequest will also be determined in a similar manner.
>
>
>>
>> 62. I am wondering if we should return the partition epoch in the fetch
>> response as well. In the current proposal, if a topic is recreated and the
>> new leader is on the same broker as the old one, there is nothing to force
>> the metadata refresh in the client. So, the client may still associate the
>> offset with the old partition epoch.
>>
>
> Could you help me understand the problem if a client associates an old
> partition_epoch (or the topic_epoch as of the current KIP) with the offset?
> The main purpose of the topic_epoch is to be able to drop leader_epoch to 0
> after a partition is deleted and re-created. I guess you may be thinking
> about using the partition_epoch to detect that the committed offset is
> invalid? In that case, I am wondering if the alternative approach described
> in 60) would be reasonable.
>
>
>>
>> 63. There is some subtle coordination between the LeaderAndIsrRequest and
>> UpdateMetadataRequest. Currently, when a leader changes, the controller
>> first sends the LeaderAndIsrRequest to the assigned replicas and the
>> UpdateMetadataRequest to every broker. So, there could be a small window
>> when the leader already receives the new partition epoch in the
>> 

Re: [DISCUSS] KIP-212: Enforce set of legal characters for connector names

2018-01-09 Thread Ewen Cheslack-Postava
Great point, I'm always for exclusions where they make sense. I just prefer
including by default, with exclusions when necessary, over listing explicit
inclusions and being restrictive (and making security updates immediately as
needed).

If you have a set of characters you think we should exclude, I think it
would be good to add them here or in a subsequent KIP!

-Ewen

On Tue, Jan 9, 2018 at 1:30 PM, Colin McCabe  wrote:

> On Sat, Jan 6, 2018, at 16:00, Ewen Cheslack-Postava wrote:
> > re: whitespace characters, I'm fine with the restriction since I don't
> see
> > it becoming an issue in practice. I just don't see any reason to restrict
> > it so it seems like we're going out of our way and doing extra work to be
> > restrictive, but without clear motivation.
>
> There are very good reasons not to support control characters in file
> names, topic names, log files, etc.
>
> See http://seclists.org/fulldisclosure/2003/Feb/att-341/Termulation.txt
>
> There are a bunch of CVEs about this, too.  Because of the (in my opinion,
> mistaken) decision to allow control characters in UNIX filenames, even
> echoing a file name to your terminal is a security vulnerability.
>
> best,
> Colin
>
>
> >
> > In general my default approach (without context of a specific system)
> would
> > be to accept anything that we can encode in UTF-8 and only apply
> > restrictions where it becomes necessary (e.g. we need to define a
> delimiter
> > for some reason). The constraints of URLs introduce some complexity (you
> > need escaping), but probably generally still allow this. If I can use an
> > emoji when naming things, then I'm probably happy :) Whitespace
> characters
> > definitely have some other issues (e.g. you can have non-visible
> whitespace
> > which obscures which connector you're actually working with), but despite
> > the JIRA linked, I wasn't really convinced they need special handling. It
> > seems like a really weird issue to encounter in the first place.
> >
> > -Ewen
> >
> > On Fri, Jan 5, 2018 at 8:10 AM, Randall Hauch  wrote:
> >
> > > Sönke, I'm happy with the current proposal.
> > >
> > > Ewen, the proposal allows any characters in the name as long as they
> are
> > > properly escaped/encoded. That seems to adhere to the robustness
> principle.
> > > The only exception is that the proposal trims leading and trailing
> > > whitespace characters in an attempt to reduce user errors. Can you
> please
> > > clarify that you're okay with this behavior? I agree that technically
> we
> > > can (and currently do) support whitespace-only names, but users have
> > > reported this as problematic, and it also would be confusing for most
> user
> > > interfaces.
> > >
> > > Best regards,
> > >
> > > Randall
> > >
> > > On Thu, Jan 4, 2018 at 10:31 PM, Ewen Cheslack-Postava <
> e...@confluent.io>
> > > wrote:
> > >
> > > > Very late to the game here, but a few thoughts:
> > > >
> > > > 1. Regarding whether KIP is necessary, I don't mind doing it for
> > > > documentation sake, but I would classify any mishandling of connector
> > > names
> > > > here as a bug. Which doesn't require a KIP to fix.
> > > >
> > > > 2. For support of characters, Kafka has some history of just being
> > > > restrictive (e.g., see topic name restrictions), but I personally
> > > disagree
> > > > with this approach. I think it is better to be liberal in what we
> accept
> > > > and just document limitations. I think our default should be to
> accept
> > > any
> > > > user input and document why we can't handle certain inputs and how
> the
> > > user
> > > > should adapt if we can't. In general I try to work under the
> robustness
> > > > principle: *Be conservative in what you do, be liberal in what you
> accept
> > > > from others*
> > > >
> > > > 3. Related to 2, there were some cases like whitespace-only connector
> > > > names. This seems extremely weird and not critical, so I'm fine not
> > > > supporting it officially, but technically I don't see any reason it
> > > > shouldn't be supported with any appropriate escaping (i.e. what
> would it
> > > > break for us?).
> > > >
> > > > But in general, I think just being more explicit about expectations
> is
> > > > great and it'd be great to set baseline expectations.
> > > >
> > > > -Ewen
> > > >
> > > >
> > > >
> > > > On Mon, Nov 20, 2017 at 12:33 AM, Sönke Liebau <
> > > > soenke.lie...@opencore.com.invalid> wrote:
> > > >
> > > > > @Randall: are you happy with the KIP as it stands so I can call
> for a
> > > > vote,
> > > > > or are there any outstanding items still to discuss?
> > > > >
> > > > > Same question to anybody else who'd like to participate of course
> :)
> > > > >
> > > > > On Thu, Nov 16, 2017 at 5:35 PM, Sönke Liebau <
> > > > soenke.lie...@opencore.com>
> > > > > wrote:
> > > > >
> > > > > > Sounds good. I've added a few sentences to this effect to the
> KIP.
> > > > > >
> > > > > > On Thu, Nov 16, 2017 at 5:02 PM, Randall Hauch 

[jira] [Created] (KAFKA-6436) Provide a metric indicating broker cluster membership state

2018-01-09 Thread Lukasz Mierzwa (JIRA)
Lukasz Mierzwa created KAFKA-6436:
-

 Summary: Provide a metric indicating broker cluster membership state
 Key: KAFKA-6436
 URL: https://issues.apache.org/jira/browse/KAFKA-6436
 Project: Kafka
  Issue Type: Wish
  Components: metrics
Reporter: Lukasz Mierzwa
Priority: Minor


When deploying Kafka config changes, each instance needs to be restarted (since
there's no graceful reload) and that requires coordination to keep all
partitions on-line. Part of the automation I have waits after restarting each
instance until the restarted broker is back in sync on all partitions; to do
that I query for:

{noformat}
kafka.server:name=BrokerState,type=KafkaServer to be 3 (broker is up & running)
kafka.server:clientId=Replica,name=MaxLag,type=ReplicaFetcherManager = 0 
(there's no lag)
{noformat}

I've noticed that there's a race for the MaxLag metric - when replica fetcher
threads are starting, this metric will be initialized with a 0 value; then (I
assume) once all threads connect to the leaders it's populated with the
"correct" MaxLag value computed from all those threads. This means that there's
a window where I can query for those metrics and get the expected BrokerState=3
and MaxLag=0, which I would interpret as "done restarting this instance", but a
few seconds later MaxLag might jump to a huge value.
Right now my workaround is to require multiple consecutive queries to return
the expected metric values, which seems to protect me from hitting that window.
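
A rough sketch of that workaround (the JMX URL/port, the consecutive-poll
threshold, and reading the gauges via the "Value" attribute are assumptions
for illustration, not something Kafka prescribes):

{noformat}
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class WaitForBrokerReady {
    public static void main(String[] args) throws Exception {
        // Hypothetical JMX endpoint of the restarted broker.
        JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://broker1:9999/jmxrmi");
        try (JMXConnector jmxc = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbs = jmxc.getMBeanServerConnection();
            ObjectName state = new ObjectName(
                "kafka.server:name=BrokerState,type=KafkaServer");
            ObjectName maxLag = new ObjectName(
                "kafka.server:clientId=Replica,name=MaxLag,type=ReplicaFetcherManager");
            int consecutive = 0;
            while (consecutive < 5) {  // require 5 consecutive good polls
                int brokerState = ((Number) mbs.getAttribute(state, "Value")).intValue();
                long lag = ((Number) mbs.getAttribute(maxLag, "Value")).longValue();
                // Reset the streak whenever we catch the race window.
                consecutive = (brokerState == 3 && lag == 0) ? consecutive + 1 : 0;
                Thread.sleep(1000);
            }
        }
    }
}
{noformat}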
It would be nice if there were a metric like "ClusterState", initialized as 0,
that would be set to 1 only once all replica fetcher threads have started,
completed reconnecting to the leaders, and the proper MaxLag is set (or there
are no replicas on the given broker).
Alternatively, MaxLag could be initialized with -1 and set to 0 later only if
that's the actual max lag computed after getting replication offsets from the
leaders (if that would work).

If there were a "ClusterState" metric it could also be used to signal when a
broker loses connectivity with the rest of the cluster; I don't think there is
such a metric right now (is there?).





Jenkins build is back to normal : kafka-trunk-jdk8 #2320

2018-01-09 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-232: Detect outdated metadata by adding ControllerMetadataEpoch field

2018-01-09 Thread Dong Lin
Hey Jun,

Thanks so much. These comments are very useful. Please see my comments below.

On Mon, Jan 8, 2018 at 5:52 PM, Jun Rao  wrote:

> Hi, Dong,
>
> Thanks for the updated KIP. A few more comments.
>
> 60. Perhaps having a partition epoch is more flexible since in the future,
> we may support deleting a partition as well.
>

Yeah, I have considered this. I think we can probably still support deleting
a partition by using the topic_epoch -- when a partition of a topic is
deleted or created, the epoch of all partitions of this topic will be
incremented by 1. Therefore, if that partition is re-created later, the
epoch of that partition will still be larger than its epoch before the
deletion, which still allows the client to order the metadata for the
purpose of this KIP. Does this sound reasonable?

The advantage of using topic_epoch instead of partition_epoch is that the
size of the /brokers/topics/[topic] znode and the request/response size can
be smaller. We have a limit on the maximum size of a znode (typically 1 MB).
Using a per-partition epoch would effectively reduce the number of
partitions that can be described by the /brokers/topics/[topic] znode.
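
To make the size argument concrete, here is a rough sketch of the
/brokers/topics/[topic] znode data under each option; the epoch fields are
hypothetical illustrations of the two proposals, not an actual format:

{noformat}
// Global topic_epoch: one extra field regardless of partition count.
{"version":1,"topic_epoch":7,"partitions":{"0":[1,2],"1":[2,0]}}

// Per-partition partition_epoch: one extra field per partition, which eats
// into the ~1 MB znode budget as the partition count grows.
{"version":1,
 "partitions":{"0":[1,2],"1":[2,0]},
 "partition_epochs":{"0":5,"1":9}}
{noformat}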

One use-case of partition_epoch is for the client to detect that the
committed offset, whether from the Kafka offsets topic or from an external
store, is invalid after partition deletion and re-creation. However, it
seems that we can also address this use-case with other approaches. For
example, when AdminClient deletes partitions, it can also delete the
committed offsets for those partitions from the offsets topic. If the user
stores offsets externally, it might make sense for the user to similarly
remove the offsets of related partitions after these partitions are
deleted. So I am not sure that we should use partition_epoch in this KIP.
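
For illustration, the cleanup could look like this through the admin API;
note that deleteConsumerGroupOffsets did not exist at the time of this
discussion (it arrived much later, in Kafka 2.4), so this is only a sketch
of the suggested approach:

{noformat}
import java.util.Collections;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.TopicPartition;

public class DeleteTopicWithOffsets {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Delete the topic itself.
            admin.deleteTopics(Collections.singleton("my-topic")).all().get();
            // Then drop the committed offsets for the deleted partitions so a
            // re-created topic does not inherit stale offsets.
            Set<TopicPartition> partitions = new HashSet<>();
            for (int p = 0; p < 4; p++)
                partitions.add(new TopicPartition("my-topic", p));
            admin.deleteConsumerGroupOffsets("my-group", partitions).all().get();
        }
    }
}
{noformat}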


>
> 61. It seems that the leader epoch returned in the position() call should
> be the leader epoch returned in the fetch response, not the one in the
> metadata cache of the client.


I think this is a good idea. Just to double check, this change does not
affect the correctness or performance of this KIP. But it can be useful if
we want to use the leader_epoch to better handle the offset reset in case of
unclean leader election, which is listed in the future work. Is this
understanding correct?

I have updated the KIP to specify that the leader_epoch returned by
position() should be the largest leader_epoch of those already consumed
messages whose offset < position. If no message has been consumed since
consumer initialization, the leader_epoch from seek() or
OffsetFetchResponse should be used. The offset included in the
OffsetCommitRequest will also be determined in a similar manner.
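
A minimal sketch of the consumer-side bookkeeping this implies; the
ConsumerRecord#leaderEpoch() accessor used below only appeared in later Kafka
releases (2.1), so this illustrates the proposed semantics rather than code
that existed at the time:

{noformat}
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

// consumer: an existing, subscribed KafkaConsumer<String, String>.
// Track, per partition, the largest leader epoch among consumed records;
// fall back to the epoch from seek()/OffsetFetchResponse when nothing has
// been consumed yet.
Map<TopicPartition, Integer> largestConsumedEpoch = new HashMap<>();

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    TopicPartition tp = new TopicPartition(record.topic(), record.partition());
    record.leaderEpoch().ifPresent(epoch ->
        largestConsumedEpoch.merge(tp, epoch, Math::max));
}
// The epoch attached to position()/OffsetCommitRequest for tp would then be
// largestConsumedEpoch.getOrDefault(tp, epochFromSeekOrOffsetFetch).
{noformat}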


>
> 62. I am wondering if we should return the partition epoch in the fetch
> response as well. In the current proposal, if a topic is recreated and the
> new leader is on the same broker as the old one, there is nothing to force
> the metadata refresh in the client. So, the client may still associate the
> offset with the old partition epoch.
>

Could you help me understand the problem if a client associates an old
partition_epoch (or the topic_epoch as of the current KIP) with the offset?
The main purpose of the topic_epoch is to be able to drop leader_epoch to 0
after a partition is deleted and re-created. I guess you may be thinking
about using the partition_epoch to detect that the committed offset is
invalid? In that case, I am wondering if the alternative approach described
in 60) would be reasonable.


>
> 63. There is some subtle coordination between the LeaderAndIsrRequest and
> UpdateMetadataRequest. Currently, when a leader changes, the controller
> first sends the LeaderAndIsrRequest to the assigned replicas and the
> UpdateMetadataRequest to every broker. So, there could be a small window
> when the leader already receives the new partition epoch in the
> LeaderAndIsrRequest, but the metadata cache in the broker hasn't been
> updated with the latest partition epoch. Not sure what's the best way to
> address this issue. Perhaps we can update the metadata cache on the broker
> with both LeaderAndIsrRequest and UpdateMetadataRequest. The challenge is
> that the two have slightly different data. For example, only the latter has
> all endpoints.
>

I am not sure whether this is a problem. Could you explain a bit more what
specific problem this small window can cause?

Since a client can fetch metadata from any broker in the cluster, and given
that different brokers receive requests (e.g. LeaderAndIsrRequest and
UpdateMetadataRequest) in arbitrary order, the metadata received by a client
can be arbitrarily ordered (either newer or older) compared to the broker's
leadership state, even if a given broker receives LeaderAndIsrRequest and
UpdateMetadataRequest simultaneously. So I am not sure it is useful to
update the broker's cache with LeaderAndIsrRequest.


> 64. The enforcement of leader epoch in Offset commit: We allow a consumer
> to set an arbitrary offset. So it's 

Build failed in Jenkins: kafka-trunk-jdk7 #3086

2018-01-09 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] MINOR: Menu updates and navigation (#4405)

--
[...truncated 399.38 KB...]
kafka.server.epoch.LeaderEpochFileCacheTest > shouldPersistEpochsBetweenInstances STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldPersistEpochsBetweenInstances PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldNotClearAnythingIfOffsetToFirstOffset STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldNotClearAnythingIfOffsetToFirstOffset PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldNotLetOffsetsGoBackwardsEvenIfEpochsProgress STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldNotLetOffsetsGoBackwardsEvenIfEpochsProgress PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldGetFirstOffsetOfSubsequentEpochWhenOffsetRequestedForPreviousEpoch STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldGetFirstOffsetOfSubsequentEpochWhenOffsetRequestedForPreviousEpoch PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest2 STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest2 PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearEarliestOnEmptyCache STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearEarliestOnEmptyCache PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldPreserveResetOffsetOnClearEarliestIfOneExists STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldPreserveResetOffsetOnClearEarliestIfOneExists PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldReturnInvalidOffsetIfEpochIsRequestedWhichIsNotCurrentlyTracked STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldReturnInvalidOffsetIfEpochIsRequestedWhichIsNotCurrentlyTracked PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldFetchEndOffsetOfEmptyCache STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldFetchEndOffsetOfEmptyCache PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldRetainLatestEpochOnClearAllEarliestAndUpdateItsOffset STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldRetainLatestEpochOnClearAllEarliestAndUpdateItsOffset PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearAllEntries STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearAllEntries PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearLatestOnEmptyCache STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearLatestOnEmptyCache PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldNotResetEpochHistoryHeadIfUndefinedPassed STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldNotResetEpochHistoryHeadIfUndefinedPassed PASSED

kafka.server.epoch.LeaderEpochIntegrationTest > shouldIncreaseLeaderEpochBetweenLeaderRestarts STARTED

kafka.server.epoch.LeaderEpochIntegrationTest > shouldIncreaseLeaderEpochBetweenLeaderRestarts PASSED

kafka.server.epoch.LeaderEpochIntegrationTest > shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader STARTED

kafka.server.epoch.LeaderEpochIntegrationTest > shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader PASSED

kafka.server.epoch.LeaderEpochIntegrationTest > shouldSendLeaderEpochRequestAndGetAResponse STARTED

kafka.server.epoch.LeaderEpochIntegrationTest > shouldSendLeaderEpochRequestAndGetAResponse PASSED

kafka.server.epoch.OffsetsForLeaderEpochTest > shouldGetEpochsFromReplica STARTED

kafka.server.epoch.OffsetsForLeaderEpochTest > shouldGetEpochsFromReplica PASSED

kafka.server.epoch.OffsetsForLeaderEpochTest > shouldReturnUnknownTopicOrPartitionIfThrown STARTED

kafka.server.epoch.OffsetsForLeaderEpochTest > shouldReturnUnknownTopicOrPartitionIfThrown PASSED

kafka.server.epoch.OffsetsForLeaderEpochTest > shouldReturnNoLeaderForPartitionIfThrown STARTED

kafka.server.epoch.OffsetsForLeaderEpochTest > shouldReturnNoLeaderForPartitionIfThrown PASSED

kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest > shouldSurviveFastLeaderChange STARTED

kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest > shouldSurviveFastLeaderChange PASSED

kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest > offsetsShouldNotGoBackwards STARTED

kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest > offsetsShouldNotGoBackwards PASSED

kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest > shouldFollowLeaderEpochBasicWorkflow STARTED

kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest > shouldFollowLeaderEpochBasicWorkflow PASSED


[jira] [Created] (KAFKA-6435) Application Reset Tool might delete incorrect internal topics

2018-01-09 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-6435:
--

 Summary: Application Reset Tool might delete incorrect internal topics
 Key: KAFKA-6435
 URL: https://issues.apache.org/jira/browse/KAFKA-6435
 Project: Kafka
  Issue Type: Bug
  Components: streams, tools
Affects Versions: 1.0.0
Reporter: Matthias J. Sax


The Streams application reset tool deletes all topics that start with
{{<application.id>-}}.

If people have two versions of the same application and name them {{"app"}} and 
{{"app-v2"}}, resetting {{"app"}} would also delete the internal topics of 
{{"app-v2"}}.

We either need to disallow the dash in the application ID, or improve the topic 
identification logic in the reset tool to fix this.
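
A quick sketch of the collision (the topic names follow the
<application.id>-...-changelog convention Streams uses for internal topics;
the store name is made up):

{noformat}
public class ResetToolPrefixCollision {
    public static void main(String[] args) {
        String appId = "app";
        String ownTopic = "app-store-changelog";          // belongs to "app"
        String otherAppsTopic = "app-v2-store-changelog"; // belongs to "app-v2"

        // The prefix matching described above selects both topics:
        System.out.println(ownTopic.startsWith(appId + "-"));       // true -- intended
        System.out.println(otherAppsTopic.startsWith(appId + "-")); // true -- the bug
    }
}
{noformat}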





Build failed in Jenkins: kafka-trunk-jdk8 #2319

2018-01-09 Thread Apache Jenkins Server
See 


Changes:

[junrao] KAFKA-4247; Prevent CLASSPATH from beginning with a single colon

--
[...truncated 401.78 KB...]

kafka.integration.TopicMetadataTest > testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.TopicMetadataTest > testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.TopicMetadataTest > testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.TopicMetadataTest > testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooLow STARTED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooLow PASSED

kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooLow STARTED

kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooLow PASSED

kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooHigh STARTED

kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooHigh PASSED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooHigh STARTED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooHigh PASSED

kafka.integration.MinIsrConfigTest > testDefaultKafkaConfig STARTED

kafka.integration.MinIsrConfigTest > testDefaultKafkaConfig PASSED

kafka.integration.FetcherTest > testFetcher STARTED

kafka.integration.FetcherTest > testFetcher PASSED

kafka.integration.PrimitiveApiTest > testMultiProduce STARTED

kafka.integration.PrimitiveApiTest > testMultiProduce PASSED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch STARTED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch PASSED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize STARTED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize PASSED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests STARTED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests PASSED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch STARTED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch PASSED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetchWithCompression STARTED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetchWithCompression PASSED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic STARTED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic PASSED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest STARTED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest PASSED

kafka.zk.KafkaZkClientTest > testBrokerRegistrationMethods STARTED

kafka.zk.KafkaZkClientTest > testBrokerRegistrationMethods PASSED

kafka.zk.KafkaZkClientTest > testSetGetAndDeletePartitionReassignment STARTED

kafka.zk.KafkaZkClientTest > testSetGetAndDeletePartitionReassignment PASSED

kafka.zk.KafkaZkClientTest > testGetDataAndVersion STARTED

kafka.zk.KafkaZkClientTest > testGetDataAndVersion PASSED

kafka.zk.KafkaZkClientTest > testGetChildren STARTED

kafka.zk.KafkaZkClientTest > testGetChildren PASSED

kafka.zk.KafkaZkClientTest > testSetAndGetConsumerOffset STARTED

kafka.zk.KafkaZkClientTest > testSetAndGetConsumerOffset PASSED

kafka.zk.KafkaZkClientTest > testClusterIdMethods STARTED

kafka.zk.KafkaZkClientTest > testClusterIdMethods PASSED

kafka.zk.KafkaZkClientTest > testEntityConfigManagementMethods STARTED

kafka.zk.KafkaZkClientTest > testEntityConfigManagementMethods PASSED

kafka.zk.KafkaZkClientTest > testCreateRecursive STARTED

kafka.zk.KafkaZkClientTest > testCreateRecursive PASSED

kafka.zk.KafkaZkClientTest > testGetConsumerOffsetNoData STARTED

kafka.zk.KafkaZkClientTest > testGetConsumerOffsetNoData PASSED

kafka.zk.KafkaZkClientTest > testDeleteTopicPathMethods STARTED

kafka.zk.KafkaZkClientTest > testDeleteTopicPathMethods PASSED

kafka.zk.KafkaZkClientTest > testAclManagementMethods STARTED

kafka.zk.KafkaZkClientTest > testAclManagementMethods PASSED

kafka.zk.KafkaZkClientTest > testPreferredReplicaElectionMethods STARTED

kafka.zk.KafkaZkClientTest > testPreferredReplicaElectionMethods PASSED

kafka.zk.KafkaZkClientTest > testPropagateLogDir STARTED

kafka.zk.KafkaZkClientTest > testPropagateLogDir PASSED

kafka.zk.KafkaZkClientTest > testGetDataAndStat STARTED

kafka.zk.KafkaZkClientTest > testGetDataAndStat PASSED

kafka.zk.KafkaZkClientTest > testCreateTopLevelPaths STARTED

kafka.zk.KafkaZkClientTest > testCreateTopLevelPaths PASSED

kafka.zk.KafkaZkClientTest > testBrokerSequenceIdMethods STARTED

kafka.zk.KafkaZkClientTest > testBrokerSequenceIdMethods PASSED

kafka.zk.KafkaZkClientTest > testCreateSequentialPersistentPath STARTED

kafka.zk.KafkaZkClientTest > testCreateSequentialPersistentPath PASSED

kafka.zk.KafkaZkClientTest > testConditionalUpdatePath 

Re: [DISCUSS] KIP 226 - Dynamic Broker Configuration

2018-01-09 Thread Colin McCabe
Thanks, Rajini.  That makes sense.

regards,
Colin

On Tue, Jan 9, 2018, at 14:38, Rajini Sivaram wrote:
> Hi Colin,
> 
> Thank you for reviewing.
> 
> Yes, validation is done on the broker, not the client.
> 
> All configs from ZooKeeper are processed, and any config that cannot be
> applied is logged as a warning. This includes any configs that are not
> dynamic in the broker version or any configs that are not supported in the
> broker version. If you downgrade to a version that is older than this KIP
> (1.0, for example), then you don't get any warnings, however.
> 
> 
> On Tue, Jan 9, 2018 at 9:38 PM, Colin McCabe  wrote:
> 
> > On Mon, Dec 18, 2017, at 13:40, Jason Gustafson wrote:
> > > Hi Rajini,
> > >
> > > Looking good. Just a few questions.
> > >
> > > 1. (Related to Jay's comment) Is the validate() method on Reconfigurable
> > > necessary? I would have thought we'd validate using the ConfigDef. Do you
> > > have a use case in mind in which the reconfigurable component only
> > permits
> > > certain reconfigurations?
> >
> > Hi,
> >
> > Sorry if this is a dumb question, but when we talk about validating on the
> > ConfigDef, we're talking about validating on the server side, right?  The
> > software on the client side might be older or newer than the software on
> > the broker side, so it seems inadvisable to do the validation there.
> >
> > Also, after a software downgrade, when the broker is restarted, it might
> > find that there is a configuration key that is stored in ZK that is not
> > dynamic in its (older) software version.  It seems like, with the current
> > proposal, the broker will use the value found in the local configuration
> > (config file) rather than the new ZK version.  Should the broker print out
> > a WARN message in that scenario?
> >
> > best,
> > Colin
> >
> > > 2. Should Reconfigurable extend Configurable or is the initial
> > > configuration also done through reconfigure()? I ask because not all
> > > plugin interfaces currently extend Configurable (e.g.
> > > KafkaPrincipalBuilder).
> > > 3. You mentioned a couple changes to DescribeConfigsOptions and
> > > DescribeConfigsResult. Perhaps we should list the changes explicitly? One
> > > not totally obvious case is what the synonyms() getter would return if
> > the
> > > option is not specified (i.e. should it raise an exception or return an
> > > empty list?).
> > > 4. Config entries in the DescribeConfigs response have an is_default
> > flag.
> > > Could that be replaced with the more general config_source?
> > > 5. Bit of an internal question, but how do you handle config
> > dependencies?
> > > For example, suppose I want to add a listener and configure its principal
> > > builder at once. You'd have to validate the principal builder config in
> > the
> > > context of the listener config, so I guess the order of the entries in
> > > AlterConfigs is significant?
> > > 6. KIP-48 (delegation tokens) gives us a master secret which is shared by
> > > all brokers. Do you think we would make this dynamically configurable?
> > > Alternatively, it might be possible to use it to encrypt the other
> > > passwords we store in zookeeper.
> > >
> > > Thanks,
> > > Jason
> > >
> > >
> > >
> > > On Mon, Dec 18, 2017 at 10:16 AM, Rajini Sivaram <
> > rajinisiva...@gmail.com>
> > > wrote:
> > >
> > > > Hi Jay,
> > > >
> > > > Thank you for reviewing the KIP.
> > > >
> > > > 1) Yes, makes sense. I will update the PR. There are some config
> > updates
> > > > that may be allowed depending on the context (e.g. some security
> > configs
> > > > can be updated for new listeners, but not existing listeners). Perhaps
> > it
> > > > is ok to mark them dynamic in the documentation. AdminClient would give
> > > > appropriate error messages if the update is not allowed.
> > > > 2) Internally, in the implementation, a mixture of direct config
> > updates
> > > > (e.g log config as you have pointed out) and reconfigure method
> > invocations
> > > > (e.g. SslFactory) are used. For configurable plugins (e.g. metrics
> > > > reporter), we require the Reconfigurable interface to ensure that we
> > can
> > > > validate any custom configs and avoid reconfiguration for plugin
> > versions
> > > > that don't support it.
> > > >
> > > >
> > > > On Mon, Dec 18, 2017 at 5:49 PM, Jay Kreps  wrote:
> > > >
> > > > > Two thoughts on implementation (shouldn't affect the KIP):
> > > > >
> > > > >1. It might be nice to add a parameter to ConfigDef which says
> > > > whether a
> > > > >configuration is dynamically updatable or not so that we can give
> > > > error
> > > > >messages if it isn't and also have it reflected in the
> > auto-generated
> > > > > docs.
> > > > >2. For many systems they don't really need to take action if a
> > config
> > > > >changes, they just need to use the new value. Changing them all to
> > > > >Reconfigurable requires managing a fair amount of mutability in
> > each
> 

Build failed in Jenkins: kafka-trunk-jdk7 #3085

2018-01-09 Thread Apache Jenkins Server
See 


Changes:

[junrao] KAFKA-4247; Prevent CLASSPATH from beginning with a single colon

--
[...truncated 399.51 KB...]

kafka.server.epoch.LeaderEpochFileCacheTest > shouldPersistEpochsBetweenInstances STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldPersistEpochsBetweenInstances PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldNotClearAnythingIfOffsetToFirstOffset STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldNotClearAnythingIfOffsetToFirstOffset PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldNotLetOffsetsGoBackwardsEvenIfEpochsProgress STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldNotLetOffsetsGoBackwardsEvenIfEpochsProgress PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldGetFirstOffsetOfSubsequentEpochWhenOffsetRequestedForPreviousEpoch STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldGetFirstOffsetOfSubsequentEpochWhenOffsetRequestedForPreviousEpoch PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest2 STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest2 PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearEarliestOnEmptyCache STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearEarliestOnEmptyCache PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldPreserveResetOffsetOnClearEarliestIfOneExists STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldPreserveResetOffsetOnClearEarliestIfOneExists PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldReturnInvalidOffsetIfEpochIsRequestedWhichIsNotCurrentlyTracked STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldReturnInvalidOffsetIfEpochIsRequestedWhichIsNotCurrentlyTracked PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldFetchEndOffsetOfEmptyCache STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldFetchEndOffsetOfEmptyCache PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldRetainLatestEpochOnClearAllEarliestAndUpdateItsOffset STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldRetainLatestEpochOnClearAllEarliestAndUpdateItsOffset PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearAllEntries STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearAllEntries PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearLatestOnEmptyCache STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearLatestOnEmptyCache PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldNotResetEpochHistoryHeadIfUndefinedPassed STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldNotResetEpochHistoryHeadIfUndefinedPassed PASSED

kafka.server.epoch.LeaderEpochIntegrationTest > shouldIncreaseLeaderEpochBetweenLeaderRestarts STARTED

kafka.server.epoch.LeaderEpochIntegrationTest > shouldIncreaseLeaderEpochBetweenLeaderRestarts PASSED

kafka.server.epoch.LeaderEpochIntegrationTest > shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader STARTED

kafka.server.epoch.LeaderEpochIntegrationTest > shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader PASSED

kafka.server.epoch.LeaderEpochIntegrationTest > shouldSendLeaderEpochRequestAndGetAResponse STARTED

kafka.server.epoch.LeaderEpochIntegrationTest > shouldSendLeaderEpochRequestAndGetAResponse PASSED

kafka.server.epoch.OffsetsForLeaderEpochTest > shouldGetEpochsFromReplica STARTED

kafka.server.epoch.OffsetsForLeaderEpochTest > shouldGetEpochsFromReplica PASSED

kafka.server.epoch.OffsetsForLeaderEpochTest > shouldReturnUnknownTopicOrPartitionIfThrown STARTED

kafka.server.epoch.OffsetsForLeaderEpochTest > shouldReturnUnknownTopicOrPartitionIfThrown PASSED

kafka.server.epoch.OffsetsForLeaderEpochTest > shouldReturnNoLeaderForPartitionIfThrown STARTED

kafka.server.epoch.OffsetsForLeaderEpochTest > shouldReturnNoLeaderForPartitionIfThrown PASSED

kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest > shouldSurviveFastLeaderChange STARTED

kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest > shouldSurviveFastLeaderChange PASSED

kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest > offsetsShouldNotGoBackwards STARTED

kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest > offsetsShouldNotGoBackwards PASSED

kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest > shouldFollowLeaderEpochBasicWorkflow STARTED

kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest > shouldFollowLeaderEpochBasicWorkflow PASSED


Re: [DISCUSS] KIP 226 - Dynamic Broker Configuration

2018-01-09 Thread Rajini Sivaram
Hi Colin,

Thank you for reviewing.

Yes, validation is done on the broker, not the client.

All configs from ZooKeeper are processed, and any config that cannot be
applied is logged as a warning. This includes any configs that are not
dynamic in the broker version or any configs that are not supported in the
broker version. If you downgrade to a version that is older than this KIP
(1.0, for example), then you don't get any warnings, however.
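
For reference, a minimal sketch of a dynamic broker config update through the
AdminClient as enabled by this KIP; the broker id and the config key
(log.cleaner.threads, one of the dynamically updatable settings) are just
examples:

{noformat}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class DynamicBrokerConfigUpdate {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
            Config update = new Config(Collections.singleton(
                new ConfigEntry("log.cleaner.threads", "2")));
            // Validation happens broker-side; a key that is not dynamic in the
            // broker's version fails here instead of being silently ignored.
            admin.alterConfigs(Collections.singletonMap(broker, update)).all().get();
        }
    }
}
{noformat}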


On Tue, Jan 9, 2018 at 9:38 PM, Colin McCabe  wrote:

> On Mon, Dec 18, 2017, at 13:40, Jason Gustafson wrote:
> > Hi Rajini,
> >
> > Looking good. Just a few questions.
> >
> > 1. (Related to Jay's comment) Is the validate() method on Reconfigurable
> > necessary? I would have thought we'd validate using the ConfigDef. Do you
> > have a use case in mind in which the reconfigurable component only
> permits
> > certain reconfigurations?
>
> Hi,
>
> Sorry if this is a dumb question, but when we talk about validating on the
> ConfigDef, we're talking about validating on the server side, right?  The
> software on the client side might be older or newer than the software on
> the broker side, so it seems inadvisable to do the validation there.
>
> Also, after a software downgrade, when the broker is restarted, it might
> find that there is a configuration key that is stored in ZK that is not
> dynamic in its (older) software version.  It seems like, with the current
> proposal, the broker will use the value found in the local configuration
> (config file) rather than the new ZK version.  Should the broker print out
> a WARN message in that scenario?
>
> best,
> Colin
>
> > 2. Should Reconfigurable extend Configurable or is the initial
> > configuration also done through reconfigure()? I ask because not all
> > plugin interfaces currently extend Configurable (e.g.
> > KafkaPrincipalBuilder).
> > 3. You mentioned a couple changes to DescribeConfigsOptions and
> > DescribeConfigsResult. Perhaps we should list the changes explicitly? One
> > not totally obvious case is what the synonyms() getter would return if
> the
> > option is not specified (i.e. should it raise an exception or return an
> > empty list?).
> > 4. Config entries in the DescribeConfigs response have an is_default
> flag.
> > Could that be replaced with the more general config_source?
> > 5. Bit of an internal question, but how do you handle config
> dependencies?
> > For example, suppose I want to add a listener and configure its principal
> > builder at once. You'd have to validate the principal builder config in
> the
> > context of the listener config, so I guess the order of the entries in
> > AlterConfigs is significant?
> > 6. KIP-48 (delegation tokens) gives us a master secret which is shared by
> > all brokers. Do you think we would make this dynamically configurable?
> > Alternatively, it might be possible to use it to encrypt the other
> > passwords we store in zookeeper.
> >
> > Thanks,
> > Jason
> >
> >
> >
> > On Mon, Dec 18, 2017 at 10:16 AM, Rajini Sivaram <
> rajinisiva...@gmail.com>
> > wrote:
> >
> > > Hi Jay,
> > >
> > > Thank you for reviewing the KIP.
> > >
> > > 1) Yes, makes sense. I will update the PR. There are some config
> updates
> > > that may be allowed depending on the context (e.g. some security
> configs
> > > can be updated for new listeners, but not existing listeners). Perhaps
> it
> > > is ok to mark them dynamic in the documentation. AdminClient would give
> > > appropriate error messages if the update is not allowed.
> > > 2) Internally, in the implementation, a mixture of direct config
> updates
> > > (e.g log config as you have pointed out) and reconfigure method
> invocations
> > > (e.g. SslFactory) are used. For configurable plugins (e.g. metrics
> > > reporter), we require the Reconfigurable interface to ensure that we
> can
> > > validate any custom configs and avoid reconfiguration for plugin
> versions
> > > that don't support it.
> > >
> > >
> > > On Mon, Dec 18, 2017 at 5:49 PM, Jay Kreps  wrote:
> > >
> > > > Two thoughts on implementation (shouldn't affect the KIP):
> > > >
> > > >1. It might be nice to add a parameter to ConfigDef which says
> > > whether a
> > > >configuration is dynamically updatable or not so that we can give
> > > error
> > > >messages if it isn't and also have it reflected in the
> auto-generated
> > > > docs.
> > > >2. For many systems they don't really need to take action if a
> config
> > > >changes, they just need to use the new value. Changing them all to
> > > >Reconfigurable requires managing a fair amount of mutability in
> each
> > > > class
> > > >that accepts changes. Some need this since they need to take
> actions
> > > > when a
> > > >config changes, but it seems like many just need to update some
> value.
> > > > For
> > > >the latter you might just be able to do something like what we do
> for
> > > >LogConfig where there is a single 

Re: [DISCUSS] KIP 226 - Dynamic Broker Configuration

2018-01-09 Thread Colin McCabe
On Mon, Dec 18, 2017, at 13:40, Jason Gustafson wrote:
> Hi Rajini,
> 
> Looking good. Just a few questions.
> 
> 1. (Related to Jay's comment) Is the validate() method on Reconfigurable
> necessary? I would have thought we'd validate using the ConfigDef. Do you
> have a use case in mind in which the reconfigurable component only permits
> certain reconfigurations?

Hi,

Sorry if this is a dumb question, but when we talk about validating on the 
ConfigDef, we're talking about validating on the server side, right?  The 
software on the client side might be older or newer than the software on the 
broker side, so it seems inadvisable to do the validation there.

Also, after a software downgrade, when the broker is restarted, it might find 
that there is a configuration key that is stored in ZK that is not dynamic in 
its (older) software version.  It seems like, with the current proposal, the 
broker will use the value found in the local configuration (config file) rather 
than the new ZK version.  Should the broker print out a WARN message in that 
scenario?

best,
Colin

> 2. Should Reconfigurable extend Configurable or is the initial
> configuration also done through reconfigure()? I ask because not all
> plugin interfaces currently extend Configurable (e.g.
> KafkaPrincipalBuilder).
> 3. You mentioned a couple changes to DescribeConfigsOptions and
> DescribeConfigsResult. Perhaps we should list the changes explicitly? One
> not totally obvious case is what the synonyms() getter would return if the
> option is not specified (i.e. should it raise an exception or return an
> empty list?).
> 4. Config entries in the DescribeConfigs response have an is_default flag.
> Could that be replaced with the more general config_source?
> 5. Bit of an internal question, but how do you handle config dependencies?
> For example, suppose I want to add a listener and configure its principal
> builder at once. You'd have to validate the principal builder config in the
> context of the listener config, so I guess the order of the entries in
> AlterConfigs is significant?
> 6. KIP-48 (delegation tokens) gives us a master secret which is shared by
> all brokers. Do you think we would make this dynamically configurable?
> Alternatively, it might be possible to use it to encrypt the other
> passwords we store in zookeeper.
> 
> Thanks,
> Jason
> 
> 
> 
> On Mon, Dec 18, 2017 at 10:16 AM, Rajini Sivaram 
> wrote:
> 
> > Hi Jay,
> >
> > Thank you for reviewing the KIP.
> >
> > 1) Yes, makes sense. I will update the PR. There are some config updates
> > that may be allowed depending on the context (e.g. some security configs
> > can be updated for new listeners, but not existing listeners). Perhaps it
> > is ok to mark them dynamic in the documentation. AdminClient would give
> > appropriate error messages if the update is not allowed.
> > 2) Internally, in the implementation, a mixture of direct config updates
> > (e.g log config as you have pointed out) and reconfigure method invocations
> > (e.g. SslFactory) are used. For configurable plugins (e.g. metrics
> > reporter), we require the Reconfigurable interface to ensure that we can
> > validate any custom configs and avoid reconfiguration for plugin versions
> > that don't support it.
> >
> >
> > On Mon, Dec 18, 2017 at 5:49 PM, Jay Kreps  wrote:
> >
> > > Two thoughts on implementation (shouldn't affect the KIP):
> > >
> > >1. It might be nice to add a parameter to ConfigDef which says
> > whether a
> > >configuration is dynamically updatable or not so that we can give
> > error
> > >messages if it isn't and also have it reflected in the auto-generated
> > > docs.
> > >2. For many systems they don't really need to take action if a config
> > >changes, they just need to use the new value. Changing them all to
> > >Reconfigurable requires managing a fair amount of mutability in each
> > > class
> > >that accepts changes. Some need this since they need to take actions
> > > when a
> > >config changes, but it seems like many just need to update some value.
> > > For
> > >the latter you might just be able to do something like what we do for
> > >LogConfig where there is a single CurrentConfig instance that has a
> > >reference to the current KafkaConfig and always reference your
> > > configurable
> > >parameters via this (e.g. config.current.myConfig). Dunno if that is
> > >actually better, but thought I'd throw it out there.
> > >
> > > -Jay
> > >
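
A rough sketch of the single-holder pattern Jay describes in point 2 (all
names here are illustrative; this is not the actual LogConfig
implementation):

{noformat}
// A single mutable holder; components read the current snapshot on every
// access instead of implementing Reconfigurable themselves.
class CurrentConfig {
    private volatile ConfigSnapshot current;

    CurrentConfig(ConfigSnapshot initial) { this.current = initial; }

    ConfigSnapshot current() { return current; }

    // Called by the config-update path when a dynamic change arrives.
    void update(ConfigSnapshot next) { this.current = next; }
}

// Immutable snapshot of the values a component cares about.
class ConfigSnapshot {
    final int myConfig;
    ConfigSnapshot(int myConfig) { this.myConfig = myConfig; }
}

// A component holding a CurrentConfig reference then just reads
// config.current().myConfig, picking up updates without callbacks.
{noformat}
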
> > > On Sun, Dec 10, 2017 at 8:09 AM, Rajini Sivaram  > >
> > > wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > Thank you!
> > > >
> > > > 5. Yes, that makes sense. Agree that we don't want to add protocol
> > > changes
> > > > to *UpdateMetadataRequest* in this KIP. I have moved the update of
> > > > *log.message.format.version* and *inter.broker.protocol.version* to
> > > reduce
> > > > restarts during upgrade to* 

Re: [DISCUSS] KIP-212: Enforce set of legal characters for connector names

2018-01-09 Thread Colin McCabe
On Sat, Jan 6, 2018, at 16:00, Ewen Cheslack-Postava wrote:
> re: whitespace characters, I'm fine with the restriction since I don't see
> it becoming an issue in practice. I just don't see any reason to restrict
> it so it seems like we're going out of our way and doing extra work to be
> restrictive, but without clear motivation.

There are very good reasons not to support control characters in file names, 
topic names, log files, etc.

See http://seclists.org/fulldisclosure/2003/Feb/att-341/Termulation.txt

There are a bunch of CVEs about this, too.  Because of the (in my opinion, 
mistaken) decision to allow control characters in UNIX filenames, even echoing 
a file name to your terminal is a security vulnerability.
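
For illustration only, a name check along these lines would trim surrounding
whitespace and reject control characters while otherwise accepting arbitrary
UTF-8 (a sketch, not the rule set the KIP actually specifies):

{noformat}
// Trim surrounding whitespace, reject empty and control-character names,
// and otherwise accept any UTF-8 (emoji included).
static String validateConnectorName(String name) {
    String trimmed = name.trim();
    if (trimmed.isEmpty())
        throw new IllegalArgumentException("Connector name must not be empty");
    trimmed.codePoints().forEach(cp -> {
        if (Character.isISOControl(cp))
            throw new IllegalArgumentException(
                "Connector name must not contain control characters");
    });
    return trimmed;  // validateConnectorName(" my-connector ") -> "my-connector"
}
{noformat}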

best,
Colin


> 
> In general my default approach (without context of a specific system) would
> be to accept anything that we can encode in UTF-8 and only apply
> restrictions where it becomes necessary (e.g. we need to define a delimiter
> for some reason). The constraints of URLs introduce some complexity (you
> need escaping), but probably generally still allow this. If I can use an
> emoji when naming things, then I'm probably happy :) Whitespace characters
> definitely have some other issues (e.g. you can have non-visible whitespace
> which obscures which connector you're actually working with), but despite
> the JIRA linked, I wasn't really convinced they need special handling. It
> seems like a really weird issue to encounter in the first place.
> 
> -Ewen
> 
> On Fri, Jan 5, 2018 at 8:10 AM, Randall Hauch  wrote:
> 
> > Sönke, I'm happy with the current proposal.
> >
> > Ewen, the proposal allows any characters in the name as long as they are
> > properly escaped/encoded. That seems to adhere to the robustness principle.
> > The only exception is that the proposal trims leading and trailing
> > whitespace characters in an attempt to reduce user errors. Can you please
> > clarify that you're okay with this behavior? I agree that technically we
> > can (and currently do) support whitespace-only names, but users have
> > reported this as problematic, and it also would be confusing for most user
> > interfaces.
> >
> > Best regards,
> >
> > Randall
> >
> > On Thu, Jan 4, 2018 at 10:31 PM, Ewen Cheslack-Postava 
> > wrote:
> >
> > > Very late to the game here, but a few thoughts:
> > >
> > > 1. Regarding whether KIP is necessary, I don't mind doing it for
> > > documentation sake, but I would classify any mishandling of connector
> > names
> > > here as a bug. Which doesn't require a KIP to fix.
> > >
> > > 2. For support of characters, Kafka has some history of just being
> > > restrictive (e.g., see topic name restrictions), but I personally
> > disagree
> > > with this approach. I think it is better to be liberal in what we accept
> > > and just document limitations. I think our default should be to accept
> > any
> > > user input and document why we can't handle certain inputs and how the
> > user
> > > should adapt if we can't. In general I try to work under the robustness
> > > principle: *Be conservative in what you do, be liberal in what you accept
> > > from others*
> > >
> > > 3. Related to 2, there were some cases like whitespace-only connector
> > > names. This seems extremely weird and not critical, so I'm fine not
> > > supporting it officially, but technically I don't see any reason it
> > > shouldn't be supported with any appropriate escaping (i.e. what would it
> > > break for us?).
> > >
> > > But in general, I think just being more explicit about expectations is
> > > great and it'd be great to set baseline expectations.
> > >
> > > -Ewen
> > >
> > >
> > >
> > > On Mon, Nov 20, 2017 at 12:33 AM, Sönke Liebau <
> > > soenke.lie...@opencore.com.invalid> wrote:
> > >
> > > > @Randall: are you happy with the KIP as it stands so I can call for a
> > > vote,
> > > > or are there any outstanding items still to discuss?
> > > >
> > > > Same question to anybody else who'd like to participate of course :)
> > > >
> > > > On Thu, Nov 16, 2017 at 5:35 PM, Sönke Liebau <
> > > soenke.lie...@opencore.com>
> > > > wrote:
> > > >
> > > > > Sounds good. I've added a few sentences to this effect to the KIP.
> > > > >
> > > > > On Thu, Nov 16, 2017 at 5:02 PM, Randall Hauch 
> > > wrote:
> > > > >
> > > > >> Nice job updating the KIP. The PR (
> > > > >> https://github.com/apache/kafka/pull/2755/files) for the proposed
> > > > >> implementation does prevent names from being empty, and it trims
> > > > >> whitespace
> > > > >> from the name only when creating a new connector. However, the KIP's
> > > > >> "Proposed Change" section should probably be very clear about this,
> > > and
> > > > >> the
> > > > >> migration section should address how a connector that was created
> > with
> > > > >> leading and/or trailing whitespace characters will still be able to
> > be
> > > > >> updated and deleted. I think that decreases the 

Re: [DISCUSS] KIP-240: AdminClient.listReassignments AdminClient.describeReassignments

2018-01-09 Thread Colin McCabe
What if we had an internal topic which watchers could listen to for information 
about partition reassignments?  The information could be in JSON, so if we want 
to add new fields later, we always could.  

This avoids introducing a new AdminClient API.  For clients that want to be 
notified about partition reassignments in a timely fashion, this avoids the 
"polling an AdminClient API in a tight loop" antipattern.  It allows watchers 
to be notified in a simple and natural way about what is going on.  Access can 
be controlled by the existing topic ACL mechanisms.
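
To illustrate the consumption side of this idea: the topic name and JSON
schema below are entirely hypothetical (nothing like this was specified), and
the sketch uses the newer poll(Duration) consumer API:

{noformat}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReassignmentWatcher {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "reassignment-watcher");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Hypothetical internal topic the controller would publish to.
            consumer.subscribe(Collections.singleton("__partition_reassignments"));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    // e.g. {"topic":"t","partition":0,"state":"completed"} -- made-up schema
                    System.out.println("Reassignment event: " + record.value());
                }
            }
        }
    }
}
{noformat}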

best,
Colin


On Fri, Dec 22, 2017, at 06:48, Tom Bentley wrote:
> Hi Steven,
> 
> I must admit that I didn't really consider that option. I can see how
> attractive it is from your perspective. In practice it would come with lots
> of edge cases which would need to be thought through:
> 
> 1. What happens if the controller can't produce a record to this topic
> because the partition's leader is unavailable?
> 2. One solution to that is for the topic to be replicated on every broker,
> so that the controller could elect itself leader on controller failover.
> But that raises another problem: What if, upon controller failover, the
> controller is ineligible for leader election because it's not in the ISR?
> 3. The above questions suggest the controller might not always be able to
> produce to the topic, but the controller isn't able to control when other
> brokers catch up replicating moved partitions and has to deal with those
> events. The controller would have to record (in memory) that the
> reassignment was complete, but hadn't been published, and publish later,
> when it was able to.
> 4. Further to 3, we would need to recover the in-memory state of
> reassignments on controller failover. But now we have to consider what
> happens if the controller cannot *consume* from the topic.
> 
> This seems pretty complicated to me. I think each of the above points has
> alternatives (or compromises) which might make the problem more tractable,
> so I'd welcome hearing from anyone who has ideas on that. In particular
> there are parallels with consumer offsets which might be worth thinking
> about some more.
> 
> It would be useful to better define the use case we're trying to cater to
> here.
> 
> * Is it just a notification that a given reassignment has finished that
> you're interested in?
> * What are the consequences if such a notification is delayed, or dropped
> entirely?
> 
> Regards,
> 
> Tom
> 
> 
> 
> On 19 December 2017 at 20:34, Steven Aerts  wrote:
> 
> > Hello Tom,
> >
> >
> > when you were working out KIP-236, did you consider migrating the
> > reassignment
> > state from zookeeper to an internal kafka topic, keyed by partition
> > and log compacted?
> >
> > It would allow an admin client and controller to easily subscribe for
> > those changes,
> > without the need to extend the network protocol as discussed in KIP-240.
> >
> > This is just a theoretical idea I wanted to share, as I can't find a
> > reason why it would
> > be a stupid idea.
> > But I assume that in practice, this will imply too much change to the
> > code base to be
> > viable.
> >
> >
> > Regards,
> >
> >
> >Steven
> >
> >
> > 2017-12-18 11:49 GMT+01:00 Tom Bentley :
> > > Hi Steven,
> > >
> > > I think it would be useful to be able to subscribe yourself on updates of
> > >> reassignment changes.
> > >
> > >
> > > I agree this would be really useful, but, to the extent I understand the
> > > networking underpinnings of the admin client, it might be difficult to do
> > > well in practice. Part of the problem is that you might "set a watch" (to
> > > borrow the ZK terminology) via one broker (or the controller), only for
> > > that broker to fail (or the controller be re-elected). Obviously you can
> > > detect the loss of connection and set a new watch via a different broker
> > > (or the new controller), but that couldn't be transparent to the user,
> > > because the AdminClient doesn't know what changed while it was
> > > disconnected/not watching.
> > >
> > > Another issue is that to avoid races you really need to combine fetching
> > > the current state with setting the watch (as is done in the native
> > > ZooKeeper API). I think there are lots of subtle issues of this sort which
> > > would need to be addressed to make something reliable.
> > >
> > > In the meantime, ZooKeeper already has a (proven and mature) API for
> > > watches, so there is, in principle, a good workaround. I say "in principle"
> > > because in the KIP-236 proposal right now the /admin/reassign_partitions
> > > znode is legacy and the reassignment is represented by
> > > /admin/reassignments/$topic/$partition. That naming scheme for the znode
> > > would make it harder for ZooKeeper clients like yours because such clients
> > > would need to set a child watch per topic. The original proposal for the
> > > naming scheme was 
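
A minimal sketch of the combined read-and-watch call described above, using
the native ZooKeeper client. The znode path follows the per-topic layout from
this thread; error handling and re-registration after reconnects are omitted:

    import java.util.List;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;

    public class ReassignmentZkWatcher {
        public static void main(String[] args) throws Exception {
            ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> { });

            Watcher watcher = new Watcher() {
                public void process(WatchedEvent event) {
                    // ZooKeeper watches are one-shot: on each notification the
                    // client must re-read and re-register, or it can miss changes.
                    System.out.println("reassignments changed: " + event.getPath());
                }
            };

            // getChildren returns the current state and sets the watch in one
            // call, which avoids the race between a separate read and watch
            // registration. With the per-topic layout above, a client needs one
            // such child watch per topic under /admin/reassignments.
            List<String> topics = zk.getChildren("/admin/reassignments", watcher);
            System.out.println("topics with reassignments in flight: " + topics);
        }
    }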

Jenkins build is back to normal : kafka-1.0-jdk7 #121

2018-01-09 Thread Apache Jenkins Server
See 




Re: [VOTE] KIP-233: Simplify StreamsBuilder#addGlobalStore

2018-01-09 Thread Damian Guy
+1

On Wed, 3 Jan 2018 at 03:49 Matthias J. Sax  wrote:

> It must be two different names, as we add two processors to the topology:
> a source processor that only reads the data from a topic, and the actual
> processor that maintains the global table.
>
>
> -Matthias
>
> On 1/2/18 7:14 PM, Ewen Cheslack-Postava wrote:
> > +1 binding, seems like a nice simplification.
> >
> > Regarding the source and processor name, do they actually need to be unique
> > or could they use the same value? Since these use incrementing integers, it
> > could be nice for debuggability/understanding to have them use the same
> > name if possible instead of generating 2 separate names.
> >
> > -Ewen
> >
> > On Tue, Jan 2, 2018 at 9:12 AM, Guozhang Wang 
> wrote:
> >
> >> On a side note, could you update the "compatibility and upgrade" section
> >> to describe what kind of upgrade steps users would need to take when they
> >> start making code changes to leverage the new API? I feel they need to
> >> rename topics, etc.
> >>
> >> On Tue, Jan 2, 2018 at 9:10 AM, Guozhang Wang 
> wrote:
> >>
> >>> +1, thanks!
> >>>
> >>> On Wed, Dec 27, 2017 at 6:01 PM, Ted Yu  wrote:
> >>>
>  +1
> 
>  On Wed, Dec 27, 2017 at 12:15 PM, Bill Bejeck 
> >> wrote:
> 
> > +1
> >
> > On Wed, Dec 27, 2017 at 3:07 PM, Matthias J. Sax <
> >> matth...@confluent.io
> >
> > wrote:
> >
> >> +1
> >>
> >> On 12/26/17 9:00 PM, Panuwat Anawatmongkhon wrote:
> >>> Hi all,
> >>> I would like to start the vote thread.
> >>> This is link for the kip.
> >>>
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-233%
>  3A+Simplify+
> >> StreamsBuilder%23addGlobalStore
> >>>
> >>> Cheers
> >>>
> >>
> >>
> >
> 
> >>>
> >>>
> >>>
> >>> --
> >>> -- Guozhang
> >>>
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
>
>
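
For reference, a rough sketch of what a call to the simplified API could look
like from user code, assuming the name-free overload proposed in the KIP; the
store name, topic and updater below are illustrative:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    public class AddGlobalStoreExample {

        // State update processor: keeps the global store in sync with the topic.
        static class GlobalStoreUpdater extends AbstractProcessor<String, String> {
            private KeyValueStore<String, String> store;

            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
                super.init(context);
                store = (KeyValueStore<String, String>) context.getStateStore("global-store");
            }

            @Override
            public void process(String key, String value) {
                store.put(key, value);
            }
        }

        public static void main(String[] args) {
            // Logging is disabled because the source topic itself serves as the
            // changelog for a global store.
            StoreBuilder<KeyValueStore<String, String>> storeBuilder =
                Stores.keyValueStoreBuilder(
                    Stores.inMemoryKeyValueStore("global-store"),
                    Serdes.String(), Serdes.String())
                .withLoggingDisabled();

            StreamsBuilder builder = new StreamsBuilder();
            // No source/processor names are passed: per the KIP they are now
            // generated internally (two distinct names, as Matthias notes above).
            builder.addGlobalStore(storeBuilder, "global-input-topic",
                Consumed.with(Serdes.String(), Serdes.String()),
                GlobalStoreUpdater::new);
        }
    }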


Re: [DISCUSS] KIP 226 - Dynamic Broker Configuration

2018-01-09 Thread Rajini Sivaram
Hi Ismael,

Thank you for reviewing the KIP.

*password.encoder.iterations: 2048*: That was a mistake in the doc, changed
to 4096, which is the minimum we use for SCRAM credentials.

*password.encoder.key.length: 128: *That is a key size that works with the
default cipher algorithm. Will change if we change that.

I wasn't sure what to choose for these two, so chose common ones. Lastpass
docs say they use *PBKDF2WithHmacSHA256 with AES*:

   - *password.encoder.keyfactory.algorithm: **PBKDF2WithHmacSHAn: *I think
   PBKDF2 is typically used as the SecretKeyFactory algorithm for password
   encryption. But not sure if we should choose something different,
   particularly if we want to support Java7 which doesn't support
   *PBKDF2WithHmacSHA512*.
   - password.encoder.cipher.algorithm: *AES/CBC/PKCS5Padding: *I haven't
   looked at the AES/GCM variant; do you know if that is better?
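
As a rough illustration of how these defaults combine, here is a minimal JCE
sketch: PBKDF2WithHmacSHA512 with 4096 iterations derives a 128-bit AES key
from the secret, and AES/CBC/PKCS5Padding provides the 2-way encryption. Salt
and IV handling is simplified here; both would have to be stored alongside
the encoded value:

    import java.nio.charset.StandardCharsets;
    import java.security.SecureRandom;
    import java.util.Base64;
    import javax.crypto.Cipher;
    import javax.crypto.SecretKeyFactory;
    import javax.crypto.spec.IvParameterSpec;
    import javax.crypto.spec.PBEKeySpec;
    import javax.crypto.spec.SecretKeySpec;

    public class PasswordEncoderSketch {
        public static void main(String[] args) throws Exception {
            char[] secret = "password-encoder-secret".toCharArray();
            byte[] salt = new byte[16];
            new SecureRandom().nextBytes(salt);

            // Derive a 128-bit AES key from the secret (4096 iterations).
            SecretKeyFactory factory =
                SecretKeyFactory.getInstance("PBKDF2WithHmacSHA512");
            PBEKeySpec spec = new PBEKeySpec(secret, salt, 4096, 128);
            SecretKeySpec key =
                new SecretKeySpec(factory.generateSecret(spec).getEncoded(), "AES");

            // 2-way encryption: the broker must be able to recover the original.
            Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
            cipher.init(Cipher.ENCRYPT_MODE, key);
            byte[] iv = cipher.getIV();
            byte[] encoded =
                cipher.doFinal("keystore-password".getBytes(StandardCharsets.UTF_8));
            System.out.println("encoded: " + Base64.getEncoder().encodeToString(encoded));

            // Decryption needs the same key material plus the stored salt and IV.
            Cipher decipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
            decipher.init(Cipher.DECRYPT_MODE, key, new IvParameterSpec(iv));
            System.out.println("decoded: " +
                new String(decipher.doFinal(encoded), StandardCharsets.UTF_8));
        }
    }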



On Tue, Jan 9, 2018 at 11:34 AM, Ismael Juma  wrote:

> Hi Rajini,
>
> Quick question (sorry if this was already discussed). How were the
> following chosen?
>
> Name: password.encoder.keyfactory.algorithm  Type: String Default:
> PBKDF2WithHmacSHA512 if available, otherwise PBKDF2WithHmacSHA1 (e.g.
> Java7)
> Name: password.encoder.cipher.algorithm  Type: String  Default:
> AES/CBC/PKCS5Padding
> Name: password.encoder.key.length Type: Integer  Default: 128
> Name: password.encoder.iterations  Type: Integer Default: 2048
>
> Also, was an AES/GCM variant considered as the default cipher algorithm?
>
> Ismael
>
> On Mon, Nov 20, 2017 at 1:57 PM, Rajini Sivaram 
> wrote:
>
> > Hi all,
> >
> > I have submitted KIP-226 to enable dynamic reconfiguration of brokers
> > without restart:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 226+-+Dynamic+Broker+Configuration
> >
> > The KIP proposes to extend the current dynamic replication quota
> > configuration for brokers to support dynamic reconfiguration of a limited
> > set of configuration options that are typically updated during the
> lifetime
> > of a broker.
> >
> > Feedback and suggestions are welcome.
> >
> > Thank you...
> >
> > Regards,
> >
> > Rajini
> >
>


Re: [DISCUSS] KIP 226 - Dynamic Broker Configuration

2018-01-09 Thread Ismael Juma
Hi Rajini,

Quick question (sorry if this was already discussed). How were the
following chosen?

Name: password.encoder.keyfactory.algorithm  Type: String Default:
PBKDF2WithHmacSHA512 if available, otherwise PBKDF2WithHmacSHA1 (e.g. Java7)
Name: password.encoder.cipher.algorithm  Type: String  Default:
AES/CBC/PKCS5Padding
Name: password.encoder.key.length Type: Integer  Default: 128
Name: password.encoder.iterations  Type: Integer Default: 2048

Also, was an AES/GCM variant considered as the default cipher algorithm?

Ismael

On Mon, Nov 20, 2017 at 1:57 PM, Rajini Sivaram 
wrote:

> Hi all,
>
> I have submitted KIP-226 to enable dynamic reconfiguration of brokers
> without restart:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 226+-+Dynamic+Broker+Configuration
>
> The KIP proposes to extend the current dynamic replication quota
> configuration for brokers to support dynamic reconfiguration of a limited
> set of configuration options that are typically updated during the lifetime
> of a broker.
>
> Feedback and suggestions are welcome.
>
> Thank you...
>
> Regards,
>
> Rajini
>


Re: [VOTE] KIP-226 - Dynamic Broker Configuration

2018-01-09 Thread Rajini Sivaram
Thank you, Jun! I have updated the KIP.

If there are no other comments or concerns, I will close the vote later
today.

Thanks,

Rajini

On Mon, Jan 8, 2018 at 10:57 PM, Jun Rao  wrote:

> Hi, Rajini,
>
> Thanks for the explanation. Then your suggestion sounds good to me.
>
> Jun
>
> On Mon, Jan 8, 2018 at 1:32 PM, Rajini Sivaram 
> wrote:
>
> > Hi Jun,
> >
> > No, password.encoder.secret cannot be updated dynamically at the moment.
> > Dynamic configs are stored in ZooKeeper and since ZK is not secure, all
> > password configs in ZK are encrypted using password.encoder.secret. We
> > cannot make password.encoder.secret dynamic since it would need another
> > secret to encrypt it for storing in ZK and that secret would need to be
> > static and cannot be rotated.
> >
> > On Mon, Jan 8, 2018 at 6:33 PM, Jun Rao  wrote:
> >
> > > Hi, Rajini,
> > >
> > > Could password.encoder.secret be updated dynamically? If so, each broker
> > > will still have access to the old secret when password.encoder.secret is
> > > updated. Perhaps that's a simpler way to handle changing the secret than
> > > introducing an extra config.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Fri, Jan 5, 2018 at 3:09 AM, Rajini Sivaram <
> rajinisiva...@gmail.com>
> > > wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > We are using 2-way encryption. The password configs encoded are
> > > > keystore/truststore passwords and JAAS configuration. We need to be able
> > > > to extract the actual values for these, so we cannot use a 1-way hash. So
> > > > if we have the old secret, we can decrypt and get the original values.
> > > >
> > > > Thank you,
> > > >
> > > > Rajini
> > > >
> > > > On Fri, Jan 5, 2018 at 12:11 AM, Jun Rao  wrote:
> > > >
> > > > > Hi, Rajini,
> > > > >
> > > > > Does providing the old-secret help? My understanding is that the encoded
> > > > > passwd is the result of a 1-way hash with the secret. So, one can't decode
> > > > > the passwd with old-secret. If that's the case, one still needs to provide
> > > > > the unencrypted passwd to re-encode with the new secret?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Thu, Jan 4, 2018 at 1:28 AM, Rajini Sivaram <
> > > rajinisiva...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Jun/Jason,
> > > > > >
> > > > > > I was wondering whether it is worth adding a new property (static
> > > > > > config in server.properties) to pass in the previous encoder password
> > > > > > as well when changing the encoder password. So you would set:
> > > > > >
> > > > > >- password.encoder.secret=new-password
> > > > > >- password.encoder.old.secret=old-password
> > > > > >
> > > > > > When the broker starts up and loads passwords from ZK, it would check
> > > > > > if old-password is being used. If so, it would re-encode all passwords
> > > > > > in ZK using new-password and store them back in ZK. If the new-password
> > > > > > is already in use in ZK, the old one will be ignored. This needs an
> > > > > > extra property, but makes it simpler for the user since all other
> > > > > > passwords can be used from ZK.
> > > > > >
> > > > > > What do you think?
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Wed, Jan 3, 2018 at 6:01 PM, Rajini Sivaram <
> > > > rajinisiva...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Jason,
> > > > > > >
> > > > > > > Thank you for reviewing and voting.
> > > > > > >
> > > > > > > Thanks, I had missed the rename. Have updated the KIP.
> > > > > > >
> > > > > > > The configs can be defined in the static server.properties or in
> > > > > > > ZooKeeper. If a ZK config cannot be decoded (or is not valid), we
> > > > > > > log an error and revert to the static config or default. When
> > > > > > > updating the secret used by the encoder, we expect all password
> > > > > > > values to be specified in server.properties. And the decoding or
> > > > > > > sanity check of the password in ZK would fail with the new secret,
> > > > > > > so we would use the password values from server.properties. Once
> > > > > > > the broker starts up, the values can be reset in ZK using
> > > > > > > AdminClient and they will be encoded using the new secret.
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Jan 3, 2018 at 5:34 PM, Jason Gustafson <
> > > ja...@confluent.io>
> > > > > > > wrote:
> > > > > > >
> > > > > > >> +1 Thanks for the KIP. One minor nit: I think we changed
> > > > > > >> ConfigSource.TOPIC_CONFIG to ConfigSource.DYNAMIC_TOPIC_CONFIG in
> > > > > > >> the PR.
> > > > > > >>
> > > > > > >> As far as updating secrets, I wasn't sure I understood how that
> > > > > > >> will work. Do the password 
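
To make the rotation proposed earlier in this thread concrete, the two
properties would sit in server.properties roughly like this (names from the
proposal above; the old secret is only needed for the restart that performs
the re-encoding and can be removed afterwards):

    # server.properties while rotating the encoder secret
    password.encoder.secret=new-password
    password.encoder.old.secret=old-password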

[jira] [Resolved] (KAFKA-6171) [1.0.0] Logging is broken with Windows and Java 9

2018-01-09 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-6171.
--
Resolution: Duplicate

> [1.0.0] Logging is broken with Windows and Java 9
> -
>
> Key: KAFKA-6171
> URL: https://issues.apache.org/jira/browse/KAFKA-6171
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 1.0.0
> Environment: Windows 10, Java 9.0.1 and 10 build 29
>Reporter: Juergen Zimmermann
>
> When I start a Kafka server on Windows using Java 9.0.1 (same for 10 build 
> 29), then I get the following error:
> {{[2017-11-04 10:05:11,457] WARN Error processing 
> kafka.log:type=LogManager,name=LogDirectoryOffline,logDirectory=C:\Zimmermann\kafka\logs
>  (com.yammer.metrics.reporting.JmxReporter)
> javax.management.MalformedObjectNameException: Invalid character ':' in value 
> part of property
> at 
> java.management/javax.management.ObjectName.construct(ObjectName.java:621)
> at 
> java.management/javax.management.ObjectName.<init>(ObjectName.java:1406)
> at 
> com.yammer.metrics.reporting.JmxReporter.onMetricAdded(JmxReporter.java:395)
> at 
> com.yammer.metrics.core.MetricsRegistry.notifyMetricAdded(MetricsRegistry.java:516)
> at 
> com.yammer.metrics.core.MetricsRegistry.getOrAdd(MetricsRegistry.java:491)
> at 
> com.yammer.metrics.core.MetricsRegistry.newGauge(MetricsRegistry.java:79)
> at 
> kafka.metrics.KafkaMetricsGroup$class.newGauge(KafkaMetricsGroup.scala:74)
> at kafka.log.LogManager.newGauge(LogManager.scala:50)
> at kafka.log.LogManager$$anonfun$6.apply(LogManager.scala:117)
> at kafka.log.LogManager$$anonfun$6.apply(LogManager.scala:116)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at kafka.log.LogManager.<init>(LogManager.scala:116)
> at kafka.log.LogManager$.apply(LogManager.scala:799)
> at kafka.server.KafkaServer.startup(KafkaServer.scala:222)
> at 
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
> at kafka.Kafka$.main(Kafka.scala:92)
> at kafka.Kafka.main(Kafka.scala)}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5850) Py4JJavaError: An error occurred while calling o40.loadClass. : java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper

2018-01-09 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-5850.
--
Resolution: Invalid

Please reopen if you think the issue is related to Kafka.


> Py4JJavaError: An error occurred while calling o40.loadClass. : 
> java.lang.ClassNotFoundException: 
> org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper
> -
>
> Key: KAFKA-5850
> URL: https://issues.apache.org/jira/browse/KAFKA-5850
> Project: Kafka
>  Issue Type: Bug
>Reporter: Saurabh Bidwai
>
> ---
> Py4JJavaError Traceback (most recent call last)
>  in ()
> > 1 streamer(sc)
>  in streamer(sc)
>   5 pwords = 
> load_wordlist("/home/daasuser/twitter/kafkatweets/Twitter-Sentiment-Analysis-Using-Spark-Streaming-And-Kafka/Dataset/positive.txt")
>   6 nwords = 
> load_wordlist("/home/daasuser/twitter/kafkatweets/Twitter-Sentiment-Analysis-Using-Spark-Streaming-And-Kafka/Dataset/negative.txt")
> > 7 counts = stream(ssc, pwords, nwords, 600)
>   8 make_plot(counts)
>  in stream(ssc, pwords, nwords, duration)
>   1 def stream(ssc, pwords, nwords, duration):
> > 2 kstream = KafkaUtils.createDirectStream(ssc, topics = 
> ['twitterstream'], kafkaParams = {"metadata.broker.list": 
> ["dn1001:6667","dn2001:6667","dn3001:6667","dn4001:6667"]})
>   3 tweets = kstream.map(lambda x: x[1].encode("utf-8", "ignore"))
>   4 
>   5 # Each element of tweets will be the text of a tweet.
> /usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/streaming/kafka.py
>  in createDirectStream(ssc, topics, kafkaParams, fromOffsets, keyDecoder, 
> valueDecoder, messageHandler)
> 150 if 'ClassNotFoundException' in str(e.java_exception):
> 151 KafkaUtils._printErrorMsg(ssc.sparkContext)
> --> 152 raise e
> 153 
> 154 stream = DStream(jstream, ssc, ser).map(func)
> Py4JJavaError: An error occurred while calling o40.loadClass.
> : java.lang.ClassNotFoundException: 
> org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>   at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
>   at py4j.Gateway.invoke(Gateway.java:259)
>   at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>   at py4j.commands.CallCommand.execute(CallCommand.java:79)
>   at py4j.GatewayConnection.run(GatewayConnection.java:209)
>   at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-3893) Kafka Broker ID disappears from /brokers/ids

2018-01-09 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-3893.
--
Resolution: Fixed

Please reopen if the issue still exists.


> Kafka Broker ID disappears from /brokers/ids
> 
>
> Key: KAFKA-3893
> URL: https://issues.apache.org/jira/browse/KAFKA-3893
> Project: Kafka
>  Issue Type: Bug
>Reporter: chaitra
>Priority: Critical
>
> Kafka version used: 0.8.2.1
> Zookeeper version: 3.4.6
> We have a scenario where the Kafka broker ID in the ZooKeeper path
> /brokers/ids just disappears.
> We see the ZooKeeper connection active and no network issue.
> The ZooKeeper connection timeout is set to 6000ms in server.properties.
> Hence Kafka is not participating in the cluster.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6434) Kafka-consumer-groups.sh reset-offsets does not work properly for a non-existent group

2018-01-09 Thread George Smith (JIRA)
George Smith created KAFKA-6434:
---

 Summary: Kafka-consumer-groups.sh reset-offsets does not work 
properly for a non-existent group
 Key: KAFKA-6434
 URL: https://issues.apache.org/jira/browse/KAFKA-6434
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 0.11.0.2
Reporter: George Smith


Our use case: We are migrating a Spark streaming app to Kafka Streams. We want 
to continue processing from the last offsets processed by the Spark streaming 
app, so we want to define a new consumer group with given offsets. The new app 
has not been launched yet (we don't want side effects of processing in the db), 
so the new consumer group does not exist.
I was happy to see that the updated kafka-consumer-groups.sh supports the 
reset-offsets method. Unfortunately, it does not seem to work as expected.

{code}
kafka-consumer-groups.bat --reset-offsets --bootstrap-server localhost:9092 
--topic testTopic:0 --group testGROUP --to-offset 10

Note: This will only show information about consumers that use the Java 
consumer API (non-ZooKeeper-based consumers).

TOPIC  PARTITION  NEW-OFFSET
testTopic  0  10
{code}

Now I want to check offsets for the group:
{code}
kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group 
testGROUP

Error: Consumer group 'testGROUP' does not exist.
{code}
That's strange, isn't it?

On the other hand, when I use kafka-streams-application-reset.sh, the group is 
created; unfortunately, that tool does not support setting specific offsets for 
partitions (only resetting to the beginning is supported), and it does not 
support a secured Kafka connection...
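
One thing worth double-checking, assuming the 0.11.0.x tool behaves as its 
help text describes: reset-offsets only previews the target offsets unless 
--execute is passed, which would also explain why the group never gets 
created. For example:

{code}
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group testGROUP \
  --topic testTopic:0 --reset-offsets --to-offset 10 --execute
{code}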




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)