Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2019-04-03 Thread Harsha
Hi Viktor,
  

"Now, will the consumer be able to consume a remote segment if:
- the remote segment is stored in the remote storage, BUT
- the leader broker failed right after this AND
- the follower which is to become a leader didn't scan yet for a new
segment?"

If I understand correctly, the scenario is that a local log segment has been 
copied to remote storage, but the leader fails before writing the index files 
and leadership moves to a follower. In this case we consider the log segment 
copy failed, and the newly elected leader will resume copying from the last 
known offset in remote storage. Consumers looking for an offset that might be 
in the failed copy will continue to read the data from local disk, since a 
local log segment is deleted only after it has been copied successfully.
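
To make the failover case concrete, here is a minimal sketch of the bookkeeping described above, written in Python purely for illustration. PartitionView, Segment and all method names are hypothetical and are not classes from the KIP; the sketch also assumes the log starts at offset 0.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass(frozen=True)
class Segment:
    base_offset: int
    end_offset: int  # inclusive


@dataclass
class PartitionView:
    """Hypothetical per-partition bookkeeping; not a class from the KIP."""
    local_segments: List[Segment] = field(default_factory=list)
    # Highest offset whose segment data AND index files are both in remote
    # storage. A copy interrupted before the index is written never advances it.
    remote_end_offset: int = -1

    def mark_copy_complete(self, segment: Segment) -> None:
        self.remote_end_offset = max(self.remote_end_offset, segment.end_offset)

    def next_copy_offset(self) -> int:
        # A newly elected leader resumes from the last known remote offset.
        return self.remote_end_offset + 1

    def deletable_local_segments(self) -> List[Segment]:
        # Local segments may be deleted only after a successful copy.
        return [s for s in self.local_segments
                if s.end_offset <= self.remote_end_offset]

    def read_source(self, offset: int) -> str:
        # Prefer local data whenever the offset is still on local disk.
        for s in self.local_segments:
            if s.base_offset <= offset <= s.end_offset:
                return "local"
        if 0 <= offset <= self.remote_end_offset:
            return "remote"
        return "out_of_range"


first, second = Segment(0, 99), Segment(100, 199)
view = PartitionView(local_segments=[first, second])
view.mark_copy_complete(first)  # data and index both written
# The copy of `second` is interrupted by a leader failure before its index
# is written, so mark_copy_complete(second) is never called.
print(view.next_copy_offset())          # where the new leader resumes
print(view.read_source(150))            # where a consumer at offset 150 reads
print(view.deletable_local_segments())  # what may be removed from local disk
```

The point of the sketch is that the interrupted segment never becomes deletable, so a consumer positioned inside it keeps reading from local disk.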

"As a follow-up question, what are your experiences, does a failover in a
broker causes bigger than usual churn in the consumers? (I'm thinking about
the time required to rebuild remote index files.)"

Rebuilding the remote index files will only happen if remote storage is 
missing all the copied index files. Failover will not trigger this rebuild.


Hi Ryanne,

"Harsha, can you comment on this alternative approach: instead of fetching
directly from remote storage via a new API, implement something like
paging, where segments are paged-in and out of cold storage based on access
frequency/recency? For example, when a remote segment is accessed, it could
be first fetched to disk and then read from there. I suppose this would
require less code changes, or at least less API changes."

Copying a whole log segment from remote storage is inefficient. When tiered 
storage is enabled, users might prefer hardware with smaller disks, and having 
to copy log segments back to local disk, especially with multiple consumers on 
multiple topics triggering this, might negatively affect the available local 
storage.
What we proposed in the KIP doesn't affect the existing APIs, and we didn't 
call for any API changes.

"And related to paging, does the proposal address what happens when a broker
runs out of HDD space? Maybe we should have a way to configure a max number
of segments or bytes stored on each broker, after which older or
least-recently-used segments are kicked out, even if they aren't expired
per the retention policy? Otherwise, I suppose tiered storage requires some
babysitting to ensure that brokers don't run out of local storage, despite
having access to potentially unbounded cold storage."

Existing Kafka behavior will not change with the addition of tiered storage, 
and enabling it will not change that behavior either.
Just like today, it's up to the operator to make sure disk space is monitored 
and to take the necessary actions before it becomes a fatal failure for the 
broker. We don't stop users from configuring the retention period to infinite, 
and they can easily run out of space.
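
As an illustration of the kind of operator-side check meant here, a minimal Python sketch follows. It is not part of Kafka or the KIP; the function names and the 0.85 threshold are assumptions chosen for the example, and the path passed to check_log_dir would be whatever directory the broker's log.dirs points at.

```python
import shutil


def usage_ratio(used_bytes: int, total_bytes: int) -> float:
    """Fraction of the volume that is in use."""
    if total_bytes <= 0:
        raise ValueError("total_bytes must be positive")
    return used_bytes / total_bytes


def needs_attention(used_bytes: int, total_bytes: int,
                    threshold: float = 0.85) -> bool:
    """True once usage reaches the (assumed) alert threshold."""
    return usage_ratio(used_bytes, total_bytes) >= threshold


def check_log_dir(path: str, threshold: float = 0.85) -> bool:
    """Check the volume that holds a broker log directory."""
    usage = shutil.disk_usage(path)  # named tuple: total, used, free
    return needs_attention(usage.used, usage.total, threshold)
```

A check like this would typically run from existing monitoring and alert well before the disk is full.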

We did not consider these as alternatives because copying segments in and out 
of local disk is not efficient, hence the reason we didn't add them to 
Alternatives Considered :).



Thanks,
Harsha






On Wed, Apr 3, 2019, at 7:51 AM, Ryanne Dolan wrote:
> Harsha, can you comment on this alternative approach: instead of fetching
> directly from remote storage via a new API, implement something like
> paging, where segments are paged-in and out of cold storage based on access
> frequency/recency? For example, when a remote segment is accessed, it could
> be first fetched to disk and then read from there. I suppose this would
> require less code changes, or at least less API changes.
> 
> And related to paging, does the proposal address what happens when a broker
> runs out of HDD space? Maybe we should have a way to configure a max number
> of segments or bytes stored on each broker, after which older or
> least-recently-used segments are kicked out, even if they aren't expired
> per the retention policy? Otherwise, I suppose tiered storage requires some
> babysitting to ensure that brokers don't run out of local storage, despite
> having access to potentially unbounded cold storage.
> 
> Just some things to add to Alternatives Considered :)
> 
> Ryanne
> 
> On Wed, Apr 3, 2019 at 8:21 AM Viktor Somogyi-Vass 
> wrote:
> 
> > Hi Harsha,
> >
> > Thanks for the answer, makes sense.
> > In the meantime one edge case popped up in my mind but first let me
> > summarize what I understand if I interpret your KIP correctly.
> >
> > So basically whenever the leader RSM copies over a segment to the remote
> > storage, the leader RLM will append an entry to its remote index files with
> > the remote position. After this LogManager can delete the local segment.
> > Parallel to this RLM followers are periodically scanning the remote storage
> > for files and if they find a new one they update their indices.
> >
> > Now, will the consumer be able to consume a remote segment if:
> > - the remote segment is stored in the remote storage, BUT
> > - the leader broker failed 

[jira] [Created] (KAFKA-8188) Zookeeper Connection Issue Take Down the Whole kafka cluster

2019-04-03 Thread Candice Wan (JIRA)
Candice Wan created KAFKA-8188:
--

 Summary: Zookeeper Connection Issue Take Down the Whole kafka cluster
 Key: KAFKA-8188
 URL: https://issues.apache.org/jira/browse/KAFKA-8188
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.1.1
Reporter: Candice Wan
 Attachments: thread_dump.log

We recently upgraded to 2.1.1 and saw the ZooKeeper connection issues below, 
which took down the whole cluster. We have 3 nodes in the cluster, 2 of which 
had issues.

2019-04-03 08:25:19.603 [main-SendThread(iaase3184.svr.emea.jpmchase.net:36100)] WARN org.apache.zookeeper.ClientCnxn - Unable to reconnect to ZooKeeper service, session 0x10071ff9baf0001 has expired
2019-04-03 08:25:19.603 [main-SendThread(iaase3184.svr.emea.jpmchase.net:36100)] INFO org.apache.zookeeper.ClientCnxn - Unable to reconnect to ZooKeeper service, session 0x10071ff9baf0001 has expired, closing socket connection
2019-04-03 08:25:19.605 [main-EventThread] INFO org.apache.zookeeper.ClientCnxn - EventThread shut down for session: 0x10071ff9baf0001
2019-04-03 08:25:19.605 [zk-session-expiry-handler0] INFO kafka.zookeeper.ZooKeeperClient - [ZooKeeperClient] Session expired.
2019-04-03 08:25:19.609 [zk-session-expiry-handler0] INFO kafka.zookeeper.ZooKeeperClient - [ZooKeeperClient] Initializing a new session to vsie5p0551.svr.emea.jpmchase.net:36100,iaase3184.svr.emea.jpmchase.net:36100,iaase3360.svr.emea.jpmchase.net:36100.
2019-04-03 08:25:19.610 [zk-session-expiry-handler0] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=vsie5p0551.svr.emea.jpmchase.net:36100,iaase3184.svr.emea.jpmchase.net:36100,iaase3360.svr.emea.jpmchase.net:36100 sessionTimeout=6000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@12f8b1d8
2019-04-03 08:25:19.610 [zk-session-expiry-handler0] INFO o.apache.zookeeper.ClientCnxnSocket - jute.maxbuffer value is 4194304 Bytes
2019-04-03 08:25:19.611 [zk-session-expiry-handler0-SendThread(vsie5p0551.svr.emea.jpmchase.net:36100)] WARN org.apache.zookeeper.ClientCnxn - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: 'file:/app0/common/config/ldap-auth.config'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it.
2019-04-03 08:25:19.611 [zk-session-expiry-handler0-SendThread(vsie5p0551.svr.emea.jpmchase.net:36100)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server vsie5p0551.svr.emea.jpmchase.net/169.30.47.206:36100
2019-04-03 08:25:19.611 [zk-session-expiry-handler0-EventThread] ERROR kafka.zookeeper.ZooKeeperClient - [ZooKeeperClient] Auth failed.
2019-04-03 08:25:19.611 [zk-session-expiry-handler0-SendThread(vsie5p0551.svr.emea.jpmchase.net:36100)] INFO org.apache.zookeeper.ClientCnxn - Socket connection established, initiating session, client: /169.20.222.18:56876, server: vsie5p0551.svr.emea.jpmchase.net/169.30.47.206:36100
2019-04-03 08:25:19.612 [controller-event-thread] INFO k.controller.PartitionStateMachine - [PartitionStateMachine controllerId=3] Stopped partition state machine
2019-04-03 08:25:19.613 [controller-event-thread] INFO kafka.controller.ReplicaStateMachine - [ReplicaStateMachine controllerId=3] Stopped replica state machine
2019-04-03 08:25:19.614 [controller-event-thread] INFO kafka.controller.KafkaController - [Controller id=3] Resigned
2019-04-03 08:25:19.615 [controller-event-thread] INFO kafka.zk.KafkaZkClient - Creating /brokers/ids/3 (is it secure? false)
2019-04-03 08:25:19.628 [zk-session-expiry-handler0-SendThread(vsie5p0551.svr.emea.jpmchase.net:36100)] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server vsie5p0551.svr.emea.jpmchase.net/169.30.47.206:36100, sessionid = 0x1007f4d2b81, negotiated timeout = 6000
2019-04-03 08:25:19.631 [/config/changes-event-process-thread] INFO k.c.ZkNodeChangeNotificationListener - Processing notification(s) to /config/changes
2019-04-03 08:25:19.637 [controller-event-thread] ERROR k.zk.KafkaZkClient$CheckedEphemeral - Error while creating ephemeral at /brokers/ids/3, node already exists and owner '72182936680464385' does not match current session '72197563457011712'
2019-04-03 08:25:19.637 [controller-event-thread] INFO kafka.zk.KafkaZkClient - Result of znode creation at /brokers/ids/3 is: NODEEXISTS
2019-04-03 08:25:19.644 [controller-event-thread] ERROR k.c.ControllerEventManager$ControllerEventThread - [ControllerEventThread controllerId=3] Error processing event RegisterBrokerAndReelect
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists
 at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
 at 

Re: [VOTE] KIP-422: Use the default value of max.poll.interval in Streams

2019-04-03 Thread John Roesler
Thanks all. The KIP-442 vote has passed with 3 binding votes (Guozhang,
Bill, and Damian) and one non-binding vote (me) in favor and none against.

I'll update the KIP page.

-John

On Fri, Mar 29, 2019 at 10:29 AM Damian Guy  wrote:

> +1
>
> On Wed, 27 Mar 2019 at 21:38, John Roesler  wrote:
>
> > Ah, good point, Guozhang. I'll remove that mention from the KIP.
> >
> > On Wed, Mar 27, 2019 at 3:30 PM Bill Bejeck  wrote:
> >
> > > +1 for me,
> > >
> > > Thanks,
> > > Bill
> > >
> > > On Wed, Mar 27, 2019 at 4:13 PM Guozhang Wang 
> > wrote:
> > >
> > > > +1 from me.
> > > >
> > > > > Though note that we cannot make such changes in older versions since
> > > > > even if we release new versions out of those branches they are
> > > > > considered bug-fix only and hence should not have any interface
> > > > > impacting changes.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Wed, Mar 27, 2019 at 12:55 PM John Roesler 
> > wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > > Since the KIP is so small, I'm going to optimistically start the vote
> > > > > > for KIP-442 to remove our "max int" default max.poll.interval.ms in
> > > > > > Streams and fall back to the Consumer default of five minutes.
> > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-442%3A+Return+to+default+max+poll+interval+in+Streams
> > > > >
> > > > > Permalink: https://cwiki.apache.org/confluence/x/1COGBg
> > > > >
> > > > > See also: https://issues.apache.org/jira/browse/KAFKA-6399
> > > > >
> > > > > > Please let me know if you have any objections and wish to return to
> > > > > > the discussion phase!
> > > > >
> > > > > Thanks,
> > > > > -John
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
>
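
For readers skimming the archive, the effect of the change voted on above can be sketched as a small config-resolution function. This is an illustration in Python, not the Streams implementation: effective_max_poll_interval_ms and the two dictionaries are hypothetical names, and the only values taken from the thread are the "max int" Streams default and the five-minute Consumer default.

```python
KEY = "max.poll.interval.ms"
CONSUMER_DEFAULT_MAX_POLL_INTERVAL_MS = 5 * 60 * 1000  # five minutes
JAVA_INT_MAX = 2**31 - 1  # the "max int" value mentioned in the thread


def effective_max_poll_interval_ms(user_config: dict,
                                   streams_defaults: dict) -> int:
    """Resolve the value: user setting, then Streams default, then Consumer default."""
    if KEY in user_config:
        return int(user_config[KEY])
    return streams_defaults.get(KEY, CONSUMER_DEFAULT_MAX_POLL_INTERVAL_MS)


# Streams-level defaults before and after the change described in the KIP.
streams_defaults_before = {KEY: JAVA_INT_MAX}
streams_defaults_after = {}

print(effective_max_poll_interval_ms({}, streams_defaults_before))
print(effective_max_poll_interval_ms({}, streams_defaults_after))
```

An explicit user setting wins in both cases; only the fallback changes.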


Build failed in Jenkins: kafka-2.2-jdk8 #75

2019-04-03 Thread Apache Jenkins Server
See 


Changes:

[rhauch] KAFKA-8126: Flaky Test

--
[...truncated 2.73 MB...]

kafka.coordinator.group.GroupCoordinatorTest > testValidJoinGroup STARTED

kafka.coordinator.group.GroupCoordinatorTest > testValidJoinGroup PASSED

kafka.coordinator.group.GroupCoordinatorTest > 
shouldDelayRebalanceUptoRebalanceTimeout STARTED

kafka.coordinator.group.GroupCoordinatorTest > 
shouldDelayRebalanceUptoRebalanceTimeout PASSED

kafka.coordinator.group.GroupCoordinatorTest > testFetchOffsets STARTED

kafka.coordinator.group.GroupCoordinatorTest > testFetchOffsets PASSED

kafka.coordinator.group.GroupCoordinatorTest > 
testSessionTimeoutDuringRebalance STARTED

kafka.coordinator.group.GroupCoordinatorTest > 
testSessionTimeoutDuringRebalance PASSED

kafka.coordinator.group.GroupCoordinatorTest > testNewMemberJoinExpiration 
STARTED

kafka.coordinator.group.GroupCoordinatorTest > testNewMemberJoinExpiration 
PASSED

kafka.coordinator.group.GroupCoordinatorTest > testFetchTxnOffsetsWithAbort 
STARTED

kafka.coordinator.group.GroupCoordinatorTest > testFetchTxnOffsetsWithAbort 
PASSED

kafka.coordinator.group.GroupCoordinatorTest > testSyncGroupLeaderAfterFollower 
STARTED

kafka.coordinator.group.GroupCoordinatorTest > testSyncGroupLeaderAfterFollower 
PASSED

kafka.coordinator.group.GroupCoordinatorTest > testSyncGroupFromUnknownMember 
STARTED

kafka.coordinator.group.GroupCoordinatorTest > testSyncGroupFromUnknownMember 
PASSED

kafka.coordinator.group.GroupCoordinatorTest > testValidLeaveGroup STARTED

kafka.coordinator.group.GroupCoordinatorTest > testValidLeaveGroup PASSED

kafka.coordinator.group.GroupCoordinatorTest > testDescribeGroupInactiveGroup 
STARTED

kafka.coordinator.group.GroupCoordinatorTest > testDescribeGroupInactiveGroup 
PASSED

kafka.coordinator.group.GroupCoordinatorTest > 
testFetchTxnOffsetsIgnoreSpuriousCommit STARTED

kafka.coordinator.group.GroupCoordinatorTest > 
testFetchTxnOffsetsIgnoreSpuriousCommit PASSED

kafka.coordinator.group.GroupCoordinatorTest > testPendingMembersLeavesGroup 
STARTED

kafka.coordinator.group.GroupCoordinatorTest > testPendingMembersLeavesGroup 
PASSED

kafka.coordinator.group.GroupCoordinatorTest > testSyncGroupNotCoordinator 
STARTED

kafka.coordinator.group.GroupCoordinatorTest > testSyncGroupNotCoordinator 
PASSED

kafka.coordinator.group.GroupCoordinatorTest > testBasicFetchTxnOffsets STARTED

kafka.coordinator.group.GroupCoordinatorTest > testBasicFetchTxnOffsets PASSED

kafka.coordinator.group.GroupCoordinatorTest > 
shouldResetRebalanceDelayWhenNewMemberJoinsGroupInInitialRebalance STARTED

kafka.coordinator.group.GroupCoordinatorTest > 
shouldResetRebalanceDelayWhenNewMemberJoinsGroupInInitialRebalance PASSED

kafka.coordinator.group.GroupCoordinatorTest > 
testHeartbeatUnknownConsumerExistingGroup STARTED

kafka.coordinator.group.GroupCoordinatorTest > 
testHeartbeatUnknownConsumerExistingGroup PASSED

kafka.coordinator.group.GroupCoordinatorTest > testValidHeartbeat STARTED

kafka.coordinator.group.GroupCoordinatorTest > testValidHeartbeat PASSED

kafka.coordinator.group.GroupCoordinatorTest > 
testRequestHandlingWhileLoadingInProgress STARTED

kafka.coordinator.group.GroupCoordinatorTest > 
testRequestHandlingWhileLoadingInProgress PASSED

kafka.network.SocketServerTest > testGracefulClose STARTED

kafka.network.SocketServerTest > testGracefulClose PASSED

kafka.network.SocketServerTest > 
testSendActionResponseWithThrottledChannelWhereThrottlingAlreadyDone STARTED

kafka.network.SocketServerTest > 
testSendActionResponseWithThrottledChannelWhereThrottlingAlreadyDone PASSED

kafka.network.SocketServerTest > controlThrowable STARTED

kafka.network.SocketServerTest > controlThrowable PASSED

kafka.network.SocketServerTest > testRequestMetricsAfterStop STARTED

kafka.network.SocketServerTest > testRequestMetricsAfterStop PASSED

kafka.network.SocketServerTest > testConnectionIdReuse STARTED

kafka.network.SocketServerTest > testConnectionIdReuse PASSED

kafka.network.SocketServerTest > testClientDisconnectionUpdatesRequestMetrics 
STARTED

kafka.network.SocketServerTest > testClientDisconnectionUpdatesRequestMetrics 
PASSED

kafka.network.SocketServerTest > testProcessorMetricsTags STARTED

kafka.network.SocketServerTest > testProcessorMetricsTags PASSED

kafka.network.SocketServerTest > testMaxConnectionsPerIp STARTED

kafka.network.SocketServerTest > testMaxConnectionsPerIp PASSED

kafka.network.SocketServerTest > testConnectionId STARTED

kafka.network.SocketServerTest > testConnectionId PASSED

kafka.network.SocketServerTest > 
testBrokerSendAfterChannelClosedUpdatesRequestMetrics STARTED

kafka.network.SocketServerTest > 
testBrokerSendAfterChannelClosedUpdatesRequestMetrics PASSED

kafka.network.SocketServerTest > testNoOpAction STARTED

kafka.network.SocketServerTest > testNoOpAction PASSED

kafka.network.SocketServerTest > 

Build failed in Jenkins: kafka-2.0-jdk8 #242

2019-04-03 Thread Apache Jenkins Server
See 


Changes:

[rhauch] KAFKA-8126: Flaky Test

--
[...truncated 899.79 KB...]
kafka.server.ServerGenerateBrokerIdTest > 
testConsistentBrokerIdFromUserConfigAndMetaProps PASSED

kafka.server.ServerShutdownTest > testCleanShutdownAfterFailedStartup STARTED

kafka.server.ServerShutdownTest > testCleanShutdownAfterFailedStartup PASSED

kafka.server.ServerShutdownTest > testConsecutiveShutdown STARTED

kafka.server.ServerShutdownTest > testConsecutiveShutdown PASSED

kafka.server.ServerShutdownTest > testControllerShutdownDuringSend STARTED

kafka.server.ServerShutdownTest > testControllerShutdownDuringSend PASSED

kafka.server.ServerShutdownTest > 
testCleanShutdownAfterFailedStartupDueToCorruptLogs STARTED

kafka.server.ServerShutdownTest > 
testCleanShutdownAfterFailedStartupDueToCorruptLogs PASSED

kafka.server.ServerShutdownTest > testCleanShutdown STARTED

kafka.server.ServerShutdownTest > testCleanShutdown PASSED

kafka.server.ServerShutdownTest > testCleanShutdownWithDeleteTopicEnabled 
STARTED

kafka.server.ServerShutdownTest > testCleanShutdownWithDeleteTopicEnabled PASSED

unit.kafka.server.FetchRequestDownConversionConfigTest > 
testV1FetchWithDownConversionDisabled STARTED

unit.kafka.server.FetchRequestDownConversionConfigTest > 
testV1FetchWithDownConversionDisabled PASSED

unit.kafka.server.FetchRequestDownConversionConfigTest > testV1FetchFromReplica 
STARTED

unit.kafka.server.FetchRequestDownConversionConfigTest > testV1FetchFromReplica 
PASSED

unit.kafka.server.FetchRequestDownConversionConfigTest > 
testLatestFetchWithDownConversionDisabled STARTED

unit.kafka.server.FetchRequestDownConversionConfigTest > 
testLatestFetchWithDownConversionDisabled PASSED

unit.kafka.server.FetchRequestDownConversionConfigTest > 
testV1FetchWithTopicLevelOverrides STARTED

unit.kafka.server.FetchRequestDownConversionConfigTest > 
testV1FetchWithTopicLevelOverrides PASSED

> Task :kafka-2.0-jdk8:core:copyDependantLibs
> Task :kafka-2.0-jdk8:core:jar
> Task :kafka-2.0-jdk8:connect:api:compileJava UP-TO-DATE
> Task :kafka-2.0-jdk8:connect:api:processResources NO-SOURCE
> Task :kafka-2.0-jdk8:connect:api:classes UP-TO-DATE
> Task :kafka-2.0-jdk8:connect:api:copyDependantLibs UP-TO-DATE
> Task :kafka-2.0-jdk8:connect:api:jar UP-TO-DATE
> Task :kafka-2.0-jdk8:connect:json:compileJava UP-TO-DATE
> Task :kafka-2.0-jdk8:connect:json:processResources NO-SOURCE
> Task :kafka-2.0-jdk8:connect:json:classes UP-TO-DATE
> Task :kafka-2.0-jdk8:connect:json:copyDependantLibs UP-TO-DATE
> Task :kafka-2.0-jdk8:connect:json:jar UP-TO-DATE
> Task :kafka-2.0-jdk8:streams:compileJava UP-TO-DATE
> Task :kafka-2.0-jdk8:streams:processResources NO-SOURCE
> Task :kafka-2.0-jdk8:streams:classes UP-TO-DATE
> Task :kafka-2.0-jdk8:streams:copyDependantLibs
> Task :kafka-2.0-jdk8:streams:jar UP-TO-DATE
> Task :kafka-2.0-jdk8:streams:test-utils:compileJava UP-TO-DATE
> Task :kafka-2.0-jdk8:streams:test-utils:processResources NO-SOURCE
> Task :kafka-2.0-jdk8:streams:test-utils:classes UP-TO-DATE
> Task :kafka-2.0-jdk8:streams:test-utils:copyDependantLibs
> Task :kafka-2.0-jdk8:streams:test-utils:jar UP-TO-DATE

> Task :kafka-2.0-jdk8:streams:compileTestJava
:169:
 warning: non-varargs call of varargs method with inexact argument type for 
last parameter;
builder.addProcessor("processor", new MockProcessorSupplier(), null);
   ^
  cast to String for a varargs call
  cast to String[] for a non-varargs call and to suppress this warning
:200:
 warning: non-varargs call of varargs method with inexact argument type for 
last parameter;
builder.addSink("sink", "topic", null, null, null, null);
   ^
  cast to String for a varargs call
  cast to String[] for a non-varargs call and to suppress this warning
:190:
 warning: non-varargs call of varargs method with inexact argument type for 
last parameter;
topology.addProcessor("processor", new MockProcessorSupplier(), 
null);
^
  cast to String for a varargs call
  cast to String[] for a non-varargs call and to suppress this warning
:230:
 warning: non-varargs call of varargs method with inexact argument type for 
last parameter;

Re: [DISCUSS] KIP-411: Add option to make Kafka Connect task client ID values unique

2019-04-03 Thread pdavidson
Thanks Randall, I updated the proposal as suggested. Let me know if any
other changes need to be made, otherwise I think the KIP-411 proposal is
ready to finalize.  I will aim to call a vote on Friday.

On Mon, Mar 25, 2019 at 7:12 AM Ryanne Dolan  wrote:

> Randall, Paul, the proposal looks great, thanks.
>
> Ryanne
>
> On Mon, Mar 25, 2019, 9:03 AM Randall Hauch  wrote:
>
> > Paul,
> >
> > Thanks for updating the KIP with the proposal. I do think the KIP should
> > at least mention that the prior behavior is to allow the worker to override
> > the `producer.client.id` or `consumer.client.id`, which is entirely
> > possible (though unlikely since there would be an MBean conflict, as
> > pointed out in the discussion). It might be sufficient to just add a
> > sentence to the "Compatibility, Deprecation, and Migration Plan" section,
> > like "Any client IDs specified in the worker configuration via
> > `producer.client.id` or `consumer.client.id` properties will be unchanged,
> > as those will take precedence." Thoughts?
> >
> > Ryanne,
> >
> > IIUC your last message, I think the latest KIP proposal will align pretty
> > closely with your suggestion. Can you review and confirm?
> >
> > Best regards,
> >
> > Randall
> >
> > On Fri, Mar 1, 2019 at 3:04 PM Ryanne Dolan 
> wrote:
> >
> > > Paul, Randall, I don't think most people will care to exercise so much
> > > control over the client IDs, so long as they are filled in automatically
> > > in a way that eliminates duplicate metrics and remains somewhat legible.
> > > If we let the user specify a pattern or something, we're really just
> > > making the user worry about these requirements.
> > >
> > > For example, if they specify "foo" as the client.id, they'll get a bunch
> > > of exceptions about that MBean already existing. So they'll try
> > > "${connectorName}-foo", which won't work because connectors that get
> > > restarted will re-use the same client ID and the same MBean again. And so
> > > on, until they end up solving the same problem we are trying to solve
> > > here.
> > >
> > > I think you at least need something like "connect--producer-dlq" to
> > > avoid MBeans being re-registered within the same JVM. I believe the task
> > > ID is based on the connector name, so you'd get e.g.
> > > "connect-myconnector-1-producer".
> > >
> > > Ryanne
> > >
> > >
> > > On Fri, Mar 1, 2019 at 12:44 PM Paul Davidson
> > >  wrote:
> > >
> > > > Thanks Randall.  I like your suggestion: as you say, this would make it
> > > > possible to usefully override the default client id properties.
> > > >
> > > > I'm not sure how we would handle the dead-letter queue case though -
> > > > maybe we could automatically add a "dlq-" prefix to the producer client
> > > > id?
> > > >
> > > > If there is agreement on this change I will update the KIP and the PR
> > > > (when I find some time).
> > > >
> > > >
> > > > On Thu, Feb 21, 2019 at 8:12 AM Randall Hauch 
> > wrote:
> > > >
> > > > > > Hi, Paul. Thanks for the update to KIP-411 to reflect adding defaults,
> > > > > > and creating/updating https://github.com/apache/kafka/pull/6097 to
> > > > > > reflect this approach.
> > > > > >
> > > > > > Now that we've avoided adding a new config and have changed the default
> > > > > > `client.id` to include some context, the connector name, and task
> > > > > > number, I think it makes overriding the client ID via worker config
> > > > > > `producer.client.id` or `consumer.client.id` properties less valuable
> > > > > > because those overridden client IDs will be exactly the same for all
> > > > > > connectors and tasks.
> > > > > >
> > > > > > On one hand, we can leave this as-is, and any users that include
> > > > > > `producer.client.id` and `consumer.client.id` in their worker configs
> > > > > > keep the same (sort of useless) behavior. In fact, most users would
> > > > > > probably be better off by removing these worker config properties and
> > > > > > instead relying upon the defaults.
> > > > > >
> > > > > > On the other, similar to what Ewen suggested earlier (in a different
> > > > > > context), we could add support for users to optionally use
> > > > > > "${connectorName}" and "${task}" in their overridden client ID property
> > > > > > and have Connect replace these (if found) with the connector name and
> > > > > > task number. Any existing properties that don't use these variables
> > > > > > would behave as-is, but this way the users could define their own
> > > > > > client IDs yet still get the benefit of uniquely identifying each of
> > > > > > the clients. For example, if my worker config contained the following:
> > > > > >
> > > > > > producer.client.id=connect-cluster-A-${connectorName}-${task}-producer
> > > > > > consumer.client.id=connect-cluster-A-${connectorName}-${task}-consumer
> > > > >
> > > > > Thoughts?
> > > > >
> > > > > 

Build failed in Jenkins: kafka-2.1-jdk8 #156

2019-04-03 Thread Apache Jenkins Server
See 


Changes:

[rhauch] KAFKA-8126: Flaky Test

--
[...truncated 1.90 MB...]
   ^
:236:
 object ZkUtils in package utils is deprecated: This is an internal class that 
is no longer used by Kafka and will be removed in a future release. Please use 
org.apache.kafka.clients.admin.AdminClient instead.
val sensitive = ZkUtils.sensitivePath(path)
^
:265:
 object ZkUtils in package utils is deprecated: This is an internal class that 
is no longer used by Kafka and will be removed in a future release. Please use 
org.apache.kafka.clients.admin.AdminClient instead.
val unsecureZkUtils = ZkUtils(zkConnect, 6000, 6000, false)
  ^
:283:
 class ZkUtils in package utils is deprecated: This is an internal class that 
is no longer used by Kafka and will be removed in a future release. Please use 
org.apache.kafka.clients.admin.AdminClient instead.
  private def deleteRecursive(zkUtils: ZkUtils, path: String): Try[Boolean] = {
   ^
:123:
 constructor PartitionData in class PartitionData is deprecated: see 
corresponding Javadoc for more information.
  val partitionOffsetCommitData = new 
OffsetCommitRequest.PartitionData(15L, 23, "")
  ^
:143:
 method poll in class KafkaConsumer is deprecated: see corresponding Javadoc 
for more information.
  consumer.poll(0).count() == 1
   ^
:192:
 method poll in class KafkaConsumer is deprecated: see corresponding Javadoc 
for more information.
  consumer.poll(0).count() > 0
   ^
:207:
 method poll in class KafkaConsumer is deprecated: see corresponding Javadoc 
for more information.
  consumer.poll(0)
   ^
:58:
 constructor PartitionData in class PartitionData is deprecated: see 
corresponding Javadoc for more information.
new ListOffsetRequest.PartitionData(ListOffsetRequest.LATEST_TIMESTAMP, 
10)).asJava).build(0)
^
:90:
 constructor PartitionData in class PartitionData is deprecated: see 
corresponding Javadoc for more information.
new ListOffsetRequest.PartitionData(ListOffsetRequest.LATEST_TIMESTAMP, 
15)).asJava).build()
^
:91:
 value offsets in class PartitionData is deprecated: see corresponding Javadoc 
for more information.
val consumerOffsets = 
sendListOffsetsRequest(request).responseData.get(topicPartition).offsets.asScala

   ^
:118:
 constructor PartitionData in class PartitionData is deprecated: see 
corresponding Javadoc for more information.
new ListOffsetRequest.PartitionData(ListOffsetRequest.LATEST_TIMESTAMP, 
15)).asJava).build()
^
:119:
 value offsets in class PartitionData is deprecated: see corresponding Javadoc 
for more information.
val consumerOffsets = 
sendListOffsetsRequest(request).responseData.get(topicPartition).offsets.asScala

   ^
:146:
 constructor PartitionData in class PartitionData is deprecated: see 
corresponding Javadoc for more information.
  new 
ListOffsetRequest.PartitionData(ListOffsetRequest.EARLIEST_TIMESTAMP, 
1)).asJava).build()
  ^
:147:
 value offsets in class 

Build failed in Jenkins: kafka-trunk-jdk11 #418

2019-04-03 Thread Apache Jenkins Server
See 


Changes:

[rhauch] KAFKA-8126: Flaky Test

--
[...truncated 2.36 MB...]
org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testUnknownConnectorPaused PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testConnectorPausedRunningTaskOnly STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testConnectorPausedRunningTaskOnly PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testConnectorResumedRunningTaskOnly STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testConnectorResumedRunningTaskOnly PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testTaskConfigAdded STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testTaskConfigAdded PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testJoinLeaderCatchUpFails STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testJoinLeaderCatchUpFails PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testInconsistentConfigs STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testInconsistentConfigs PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testJoinAssignment STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testJoinAssignment PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRebalance STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRebalance PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRebalanceFailedConnector STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRebalanceFailedConnector PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testHaltCleansUpWorker STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testHaltCleansUpWorker PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testConnectorNameConflictsWithWorkerGroupId STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testConnectorNameConflictsWithWorkerGroupId PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartUnknownConnector STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartUnknownConnector PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartConnectorRedirectToLeader STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartConnectorRedirectToLeader PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartConnectorRedirectToOwner STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartConnectorRedirectToOwner PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartUnknownTask STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartUnknownTask PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRequestProcessingOrder STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRequestProcessingOrder PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartTaskRedirectToLeader STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartTaskRedirectToLeader PASSED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testLeaderPerformAssignmentSingleTaskConnectors STARTED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testLeaderPerformAssignmentSingleTaskConnectors PASSED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testNormalJoinGroupFollower STARTED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testNormalJoinGroupFollower PASSED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testMetadata STARTED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testMetadata PASSED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testLeaderPerformAssignment1 STARTED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testLeaderPerformAssignment1 PASSED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testLeaderPerformAssignment2 STARTED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testLeaderPerformAssignment2 PASSED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testJoinLeaderCannotAssign STARTED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 

[jira] [Created] (KAFKA-8187) State store record loss across multiple reassignments when using standby tasks

2019-04-03 Thread William Greer (JIRA)
William Greer created KAFKA-8187:


 Summary: State store record loss across multiple reassignments 
when using standby tasks
 Key: KAFKA-8187
 URL: https://issues.apache.org/jira/browse/KAFKA-8187
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.0.1
Reporter: William Greer


Overview:
There is a race condition that can cause a partitioned state store to be 
missing records up to an offset when using standby tasks.

When a reassignment occurs and a task is migrated to a StandbyTask in another 
StreamThread/TaskManager on the same JVM, lock contention can prevent the 
StandbyTask on the currently assigned StreamThread from acquiring the lock. The 
acquisition is not retried, because all of the active StreamTasks for that 
StreamThread are already running. If the StandbyTask does not acquire the lock 
before the StreamThread enters the RUNNING state, then the StandbyTask will not 
consume any records. If there is no subsequent reassignment before the second 
execution of the stateDirCleaner thread, then the task directory for the 
StandbyTask will be deleted. When the next reassignment occurs, the offset that 
was read by the StandbyTask at creation time, before acquiring the lock, will 
be written back to the state store directory, which re-creates the state store 
directory.


An example:
StreamThread(A) and StreamThread(B) are running on the same JVM in the same 
streams application.

StreamThread(A) has StandbyTask 1_0
StreamThread(B) has no tasks

A reassignment is triggered by another host in the streams application fleet.

StreamThread(A) is notified with a PARTITIONS_REVOKED event of the thread's one 
task
StreamThread(B) is notified with a PARTITIONS_ASSIGNED event of a standby task 
for 1_0

Here begins the race condition.
StreamThread(B) creates the StandbyTask which reads the current checkpoint from 
disk.
StreamThread(B) then attempts to updateNewAndRestoringTasks() for its assigned 
tasks. [0]
StreamThread(B) initializes the new tasks for the active and standby tasks. [1] 
[2]
StreamThread(B) attempts to lock the state directory for task 1_0 but fails 
with a LockException [3], since StreamThread(A) still holds the lock.
StreamThread(B) returns true from updateNewAndRestoringTasks() due to the check 
at [4] which only checks that the active assigned tasks are running.
StreamThread(B) state is set to RUNNING
StreamThread(A) closes the previous StandbyTask specifically calling 
closeStateManager() [5]
StreamThread(A) state is set to RUNNING

Streams application for this host has completed re-balancing and is now in the 
RUNNING state.

State at this point is the following: State directory exists for 1_0 and all 
data is present.

Then, at a point 1 to 2 intervals of [6] (which defaults to 10 minutes) after 
the reassignment has completed, the stateDirCleaner thread will execute [7].

The stateDirCleaner will then do [8], which finds the directory 1_0, finds that 
there isn't an active lock for that directory, acquires the lock, and deletes 
the directory.

State at this point is the following: State directory does not exist for 1_0.

When the next reassignment occurs, the offset that was read by StreamThread(B) 
during construction of the StandbyTask for 1_0 will be written back to disk. 
This write re-creates the state store directory and writes the .checkpoint file 
with the old offset.

State at this point is the following: State directory exists for 1_0 with a 
'.checkpoint' file in it, but there is no other state store data in the 
directory.

If this host is assigned the active task for 1_0 then all the history in the 
state store will be missing from before the offset that was read at the 
previous reassignment. 
If this host is assigned the standby task for 1_0 then the lock will be 
acquired and the standby will start to consume records, but it will still be 
missing all records from before the offset that was read at the previous 
reassignment.
If this host is not assigned 1_0, then the state directory will get cleaned up 
by the stateDirCleaner thread 10 to 20 minutes later and the record loss issue 
will be hidden.
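
The sequence above can be sketched as a toy reproduction. This is only an 
illustration under stated assumptions: ToyStandbyTask, cleanUnlockedDir and 
simulate are hypothetical names, not Kafka Streams classes or methods, and 
locking is not modeled at all (the standby simply never holds a lock). It only 
shows how a cached offset written back after the cleaner has run leaves a 
.checkpoint file with no store data behind it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Toy model only: none of these types are Kafka Streams classes.
final class StaleCheckpointSketch {

    /** Stand-in for a standby task that caches the checkpoint offset when created. */
    static final class ToyStandbyTask {
        private final Path taskDir;
        private final String offsetReadAtCreation;

        ToyStandbyTask(Path taskDir) throws IOException {
            this.taskDir = taskDir;
            byte[] raw = Files.readAllBytes(taskDir.resolve(".checkpoint"));
            this.offsetReadAtCreation = new String(raw, StandardCharsets.UTF_8);
        }

        /** Writes the cached offset back, re-creating the directory if it is gone. */
        void close() throws IOException {
            Files.createDirectories(taskDir);
            Files.write(taskDir.resolve(".checkpoint"),
                        offsetReadAtCreation.getBytes(StandardCharsets.UTF_8));
        }
    }

    /** Stand-in for the cleaner: removes a task directory that nobody has locked. */
    static void cleanUnlockedDir(Path taskDir) throws IOException {
        try (Stream<Path> paths = Files.walk(taskDir)) {
            // Reverse order deletes children before their parent directory.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
        }
    }

    /** Returns true if the run ends with a .checkpoint file but no store data. */
    static boolean simulate() {
        try {
            Path taskDir = Files.createTempDirectory("state").resolve("1_0");
            Files.createDirectories(taskDir);
            Files.write(taskDir.resolve(".checkpoint"), "100".getBytes(StandardCharsets.UTF_8));
            Files.write(taskDir.resolve("store.data"), "records".getBytes(StandardCharsets.UTF_8));

            ToyStandbyTask standby = new ToyStandbyTask(taskDir); // reads offset, never gets the lock
            cleanUnlockedDir(taskDir);                            // cleaner sees no lock, deletes 1_0
            standby.close();                                      // next reassignment: stale offset written

            return Files.exists(taskDir.resolve(".checkpoint"))
                && !Files.exists(taskDir.resolve("store.data"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("checkpoint without store data: " + simulate());
    }
}
```

In this toy model the final directory holds only the old offset, which matches 
the "State at this point" description given for the last step.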

[0] 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L865-L869
[1] 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L324-L340
[2] 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java#L65-L84
[3] 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L212-L236
[4] 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L332
[5] 

Build failed in Jenkins: kafka-trunk-jdk8 #3510

2019-04-03 Thread Apache Jenkins Server
See 


Changes:

[rhauch] KAFKA-8126: Flaky Test

--
[...truncated 2.35 MB...]
org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualWithNullForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldReturnIsOpen 
STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldReturnIsOpen 
PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldDeleteAndReturnPlainValue STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 

Kafka Jenkins not showing recent builds in history

2019-04-03 Thread Sönke Liebau
Hi everybody,

I looked through recent Jenkins builds for a while today and it somehow
looks off to me.

Both jobs [1] [2] don't show any builds that are more recent than March
19th in the "build history".
Only the "last failed" and "last unsuccessful" permalinks show recent
dates. Build pages can be accessed by changing the build id in the link
though.

That seems weird to me, I would have expected builds to show up in the
history, no matter if they were successful or not.

Can someone shed some light on this for me? I am probably missing something
obvious.

Best regards,
Sönke



[1] https://builds.apache.org/job/kafka-pr-jdk11-scala2.12
[2] https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/


[jira] [Resolved] (KAFKA-5141) WorkerTest.testCleanupTasksOnStop transient failure due to NPE

2019-04-03 Thread Randall Hauch (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-5141.
--
Resolution: Fixed

> WorkerTest.testCleanupTasksOnStop transient failure due to NPE
> --
>
> Key: KAFKA-5141
> URL: https://issues.apache.org/jira/browse/KAFKA-5141
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect, unit tests
>Affects Versions: 2.3.0
>Reporter: Ewen Cheslack-Postava
>Priority: Critical
>  Labels: flaky-test, transient-unit-test-failure
> Fix For: 2.0.2, 2.3.0, 2.1.2, 2.2.1
>
>
> https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3281/testReport/junit/org.apache.kafka.connect.runtime/WorkerTest/testCleanupTasksOnStop/
> Looks like the potential culprit is a NullPointerException when trying to 
> start a connector. It's likely being caught and logged via a catch 
> (Throwable). From the lines being executed it looks like the null might be 
> due to the instantiation of the Connector returning null, although I don't 
> see how that is possible given the current code. We may need more logging 
> output to track the issue down.
> {quote}
> Error Message
> java.lang.AssertionError: 
>   Expectation failure on verify:
> WorkerSourceTask.run(): expected: 1, actual: 0
> Stacktrace
> java.lang.AssertionError: 
>   Expectation failure on verify:
> WorkerSourceTask.run(): expected: 1, actual: 0
>   at org.easymock.internal.MocksControl.verify(MocksControl.java:225)
>   at 
> org.powermock.api.easymock.internal.invocationcontrol.EasyMockMethodInvocationControl.verify(EasyMockMethodInvocationControl.java:132)
>   at org.powermock.api.easymock.PowerMock.verify(PowerMock.java:1466)
>   at org.powermock.api.easymock.PowerMock.verifyAll(PowerMock.java:1405)
>   at 
> org.apache.kafka.connect.runtime.WorkerTest.testCleanupTasksOnStop(WorkerTest.java:480)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at org.junit.internal.runners.TestMethod.invoke(TestMethod.java:68)
>   at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.runTestMethod(PowerMockJUnit44RunnerDelegateImpl.java:310)
>   at org.junit.internal.runners.MethodRoadie$2.run(MethodRoadie.java:89)
>   at 
> org.junit.internal.runners.MethodRoadie.runBeforesThenTestThenAfters(MethodRoadie.java:97)
>   at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.executeTest(PowerMockJUnit44RunnerDelegateImpl.java:294)
>   at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit47RunnerDelegateImpl$PowerMockJUnit47MethodRunner.executeTestInSuper(PowerMockJUnit47RunnerDelegateImpl.java:127)
>   at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit47RunnerDelegateImpl$PowerMockJUnit47MethodRunner.executeTest(PowerMockJUnit47RunnerDelegateImpl.java:82)
>   at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.runBeforesThenTestThenAfters(PowerMockJUnit44RunnerDelegateImpl.java:282)
>   at org.junit.internal.runners.MethodRoadie.runTest(MethodRoadie.java:87)
>   at org.junit.internal.runners.MethodRoadie.run(MethodRoadie.java:50)
>   at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.invokeTestMethod(PowerMockJUnit44RunnerDelegateImpl.java:207)
>   at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.runMethods(PowerMockJUnit44RunnerDelegateImpl.java:146)
>   at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$1.run(PowerMockJUnit44RunnerDelegateImpl.java:120)
>   at 
> org.junit.internal.runners.ClassRoadie.runUnprotected(ClassRoadie.java:34)
>   at 
> org.junit.internal.runners.ClassRoadie.runProtected(ClassRoadie.java:44)
>   at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.run(PowerMockJUnit44RunnerDelegateImpl.java:122)
>   at 
> org.powermock.modules.junit4.common.internal.impl.JUnit4TestSuiteChunkerImpl.run(JUnit4TestSuiteChunkerImpl.java:106)
>   at 
> org.powermock.modules.junit4.common.internal.impl.AbstractCommonPowerMockRunner.run(AbstractCommonPowerMockRunner.java:53)
>   at 
> org.powermock.modules.junit4.PowerMockRunner.run(PowerMockRunner.java:59)
>   at 
> 

[jira] [Resolved] (KAFKA-8126) Flaky Test org.apache.kafka.connect.runtime.WorkerTest.testAddRemoveTask

2019-04-03 Thread Randall Hauch (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-8126.
--
Resolution: Fixed

> Flaky Test org.apache.kafka.connect.runtime.WorkerTest.testAddRemoveTask
> 
>
> Key: KAFKA-8126
> URL: https://issues.apache.org/jira/browse/KAFKA-8126
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect, unit tests
>Reporter: Guozhang Wang
>Priority: Major
> Fix For: 2.0.2, 2.3.0, 2.1.2, 2.2.1
>
>
> {code}
> Stacktrace
> java.lang.AssertionError: 
>   Expectation failure on verify:
> WorkerSourceTask.run(): expected: 1, actual: 0
>   at org.easymock.internal.MocksControl.verify(MocksControl.java:242)
>   at 
> org.powermock.api.easymock.internal.invocationcontrol.EasyMockMethodInvocationControl.verify(EasyMockMethodInvocationControl.java:126)
>   at org.powermock.api.easymock.PowerMock.verify(PowerMock.java:1476)
>   at org.powermock.api.easymock.PowerMock.verifyAll(PowerMock.java:1415)
>   at 
> org.apache.kafka.connect.runtime.WorkerTest.testAddRemoveTask(WorkerTest.java:589)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at org.junit.internal.runners.TestMethod.invoke(TestMethod.java:68)
> {code}
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/3330/testReport/junit/org.apache.kafka.connect.runtime/WorkerTest/testAddRemoveTask/



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8063) Flaky Test WorkerTest#testConverterOverrides

2019-04-03 Thread Randall Hauch (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-8063.
--
Resolution: Fixed

> Flaky Test WorkerTest#testConverterOverrides
> 
>
> Key: KAFKA-8063
> URL: https://issues.apache.org/jira/browse/KAFKA-8063
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect, unit tests
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.0.2, 2.3.0, 2.1.2, 2.2.1
>
>
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/20068/testReport/junit/org.apache.kafka.connect.runtime/WorkerTest/testConverterOverrides/]
> {quote}java.lang.AssertionError: Expectation failure on verify: 
> WorkerSourceTask.run(): expected: 1, actual: 1 at 
> org.easymock.internal.MocksControl.verify(MocksControl.java:242){quote}
> STDOUT
> {quote}[2019-03-07 02:28:25,482] (Test worker) ERROR Failed to start 
> connector test-connector (org.apache.kafka.connect.runtime.Worker:234) 
> org.apache.kafka.connect.errors.ConnectException: Failed to find Connector at 
> org.easymock.internal.MockInvocationHandler.invoke(MockInvocationHandler.java:46)
>  at 
> org.easymock.internal.ObjectMethodsFilter.invoke(ObjectMethodsFilter.java:101)
>  at 
> org.easymock.internal.ClassProxyFactory$MockMethodInterceptor.intercept(ClassProxyFactory.java:97)
>  at 
> org.apache.kafka.connect.runtime.isolation.Plugins$$EnhancerByCGLIB$$205db954.newConnector()
>  at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:226) 
> at 
> org.apache.kafka.connect.runtime.WorkerTest.testStartConnectorFailure(WorkerTest.java:256)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498) at 
> org.junit.internal.runners.TestMethod.invoke(TestMethod.java:68) at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.runTestMethod(PowerMockJUnit44RunnerDelegateImpl.java:326)
>  at org.junit.internal.runners.MethodRoadie$2.run(MethodRoadie.java:89) at 
> org.junit.internal.runners.MethodRoadie.runBeforesThenTestThenAfters(MethodRoadie.java:97)
>  at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.executeTest(PowerMockJUnit44RunnerDelegateImpl.java:310)
>  at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit47RunnerDelegateImpl$PowerMockJUnit47MethodRunner.executeTestInSuper(PowerMockJUnit47RunnerDelegateImpl.java:131)
>  at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit47RunnerDelegateImpl$PowerMockJUnit47MethodRunner.access$100(PowerMockJUnit47RunnerDelegateImpl.java:59)
>  at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit47RunnerDelegateImpl$PowerMockJUnit47MethodRunner$TestExecutorStatement.evaluate(PowerMockJUnit47RunnerDelegateImpl.java:147)
>  at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit47RunnerDelegateImpl$PowerMockJUnit47MethodRunner.evaluateStatement(PowerMockJUnit47RunnerDelegateImpl.java:107)
>  at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit47RunnerDelegateImpl$PowerMockJUnit47MethodRunner.executeTest(PowerMockJUnit47RunnerDelegateImpl.java:82)
>  at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.runBeforesThenTestThenAfters(PowerMockJUnit44RunnerDelegateImpl.java:298)
>  at org.junit.internal.runners.MethodRoadie.runTest(MethodRoadie.java:87) at 
> org.junit.internal.runners.MethodRoadie.run(MethodRoadie.java:50) at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.invokeTestMethod(PowerMockJUnit44RunnerDelegateImpl.java:218)
>  at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.runMethods(PowerMockJUnit44RunnerDelegateImpl.java:160)
>  at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$1.run(PowerMockJUnit44RunnerDelegateImpl.java:134)
>  at 
> org.junit.internal.runners.ClassRoadie.runUnprotected(ClassRoadie.java:34) at 
> org.junit.internal.runners.ClassRoadie.runProtected(ClassRoadie.java:44) at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.run(PowerMockJUnit44RunnerDelegateImpl.java:136)
>  at 
> org.powermock.modules.junit4.common.internal.impl.JUnit4TestSuiteChunkerImpl.run(JUnit4TestSuiteChunkerImpl.java:117)
>  at 
> org.powermock.modules.junit4.common.internal.impl.AbstractCommonPowerMockRunner.run(AbstractCommonPowerMockRunner.java:57)
>  at 

Jenkins build is back to normal : kafka-trunk-jdk8 #3509

2019-04-03 Thread Apache Jenkins Server
See 




Jenkins build is back to normal : kafka-trunk-jdk11 #417

2019-04-03 Thread Apache Jenkins Server
See 




Re: [VOTE] KIP-443: Return to default segment.ms and segment.index.bytes in Streams repartition topics

2019-04-03 Thread Guozhang Wang
Hello Sönke,

Thanks for the quick catch! I've fixed the test with this change.

Guozhang

On Wed, Apr 3, 2019 at 5:53 AM Sönke Liebau
 wrote:

> Hi Guozhang,
>
> I've left a comment on the merged pull request, but not sure if you'll get
> notified about that since the PR was already merged, so I'll write here as
> well.
> Setting this to -1 needs to be reflected in the test
> shouldAddInternalTopicConfigForRepartitionTopics
> as well, as this currently checks for Long.MAX_VALUE [1] and consistently
> fails.
>
> Best regards,
> Sönke
>
> [1]
>
> https://github.com/apache/kafka/blob/213466b3d4fd21b332c0b6882fea36cf1affef1c/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java#L647
>
>
> On Wed, Apr 3, 2019 at 2:07 AM Guozhang Wang  wrote:
>
> > Hello folks,
> >
> > I'm closing this voting thread now, thanks to all who have provided
> > feedback!
> >
> > Here's a quick tally:
> >
> > Binding +1: 4 (Damian, Bill, Manikumar, Guozhang)
> > Non-binding +1: 2 (John, Mickael).
> >
> >
> > Guozhang
> >
> > On Fri, Mar 29, 2019 at 11:32 AM Guozhang Wang 
> wrote:
> >
> > > Ah I see, my bad :) Yes that was the documented value in `TopicConfig`,
> > > and I agree we should just change that as well.
> > >
> > > Will update the KIP.
> > >
> > >
> > >
> > > On Fri, Mar 29, 2019 at 11:27 AM Mickael Maison <
> > mickael.mai...@gmail.com>
> > > wrote:
> > >
> > >> Hi Guozhang,
> > >>
> > >> I know the KIP is about segments configuration but I'm talking about
> > >> retention.ms which is also explicitly set on repartition topics
> > >>
> > >>
> >
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java#L39
> > >> Streams is setting it to Long.MAX_VALUE, but -1 is the "documented"
> > >> way to disable the time limit. That's why I said "for consistency" as
> > >> in practice it's not going to change anything.
> > >>
> > >> On Fri, Mar 29, 2019 at 5:09 PM Guozhang Wang 
> > wrote:
> > >> >
> > >> > Hello Mickael,
> > >> >
> > >> > segment.ms default value in TopicConfig is 7 days, I think this is
> a
> > >> > sufficient default value. Do you have any motivations to set it to
> -1?
> > >> >
> > >> >
> > >> > Guozhang
> > >> >
> > >> > On Fri, Mar 29, 2019 at 9:42 AM Mickael Maison <
> > >> mickael.mai...@gmail.com>
> > >> > wrote:
> > >> >
> > >> > > +1 (non binding)
> > >> > > For consistency, should we also set retention.ms to -1 instead of
> > >> > > Long.MAX_VALUE?
> > >> > >
> > >> > > On Fri, Mar 29, 2019 at 3:59 PM Manikumar <
> > manikumar.re...@gmail.com>
> > >> > > wrote:
> > >> > > >
> > >> > > > +1 (binding)
> > >> > > >
> > >> > > > Thanks for the KIP.
> > >> > > >
> > >> > > > On Fri, Mar 29, 2019 at 9:04 PM Damian Guy <
> damian@gmail.com>
> > >> wrote:
> > >> > > >
> > >> > > > > +1
> > >> > > > >
> > >> > > > > On Fri, 29 Mar 2019 at 01:59, John Roesler  >
> > >> wrote:
> > >> > > > >
> > >> > > > > > +1 (nonbinding) from me.
> > >> > > > > >
> > >> > > > > > On Thu, Mar 28, 2019 at 7:08 PM Guozhang Wang <
> > >> wangg...@gmail.com>
> > >> > > > > wrote:
> > >> > > > > >
> > >> > > > > > > Hello folks,
> > >> > > > > > >
> > >> > > > > > > I'd like to directly start a voting thread on this simple
> > KIP
> > >> to
> > >> > > change
> > >> > > > > > the
> > >> > > > > > > default override values for repartition topics:
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > >
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-443%3A+Return+to+default+segment.ms+and+segment.index.bytes+in+Streams+repartition+topics
> > >> > > > > > >
> > >> > > > > > > The related PR can be found here as well:
> > >> > > > > > > https://github.com/apache/kafka/pull/6511
> > >> > > > > > >
> > >> > > > > > > If you have any thoughts or feedbacks, they are more than
> > >> welcomed
> > >> > > as
> > >> > > > > > well.
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > > -- Guozhang
> > >> > > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > >
> > >> >
> > >> >
> > >> > --
> > >> > -- Guozhang
> > >>
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
> >
> > --
> > -- Guozhang
> >
>
>
> --
> Sönke Liebau
> Partner
> Tel. +49 179 7940878
> OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
>


-- 
-- Guozhang
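
For illustration, a minimal sketch of the retention.ms override discussed in
this thread. The class and method names are hypothetical and this is not the
actual RepartitionTopicConfig code; only the retention.ms key and the choice
between -1 and Long.MAX_VALUE come from the messages above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in; not the real RepartitionTopicConfig.
final class RepartitionRetentionSketch {

    // Topic-level config key named in the thread.
    static final String RETENTION_MS = "retention.ms";

    /** Overrides using -1, the documented way to disable the time limit. */
    static Map<String, String> overrides() {
        Map<String, String> overrides = new HashMap<>();
        overrides.put(RETENTION_MS, "-1");
        return overrides;
    }

    /** True if the config disables time-based retention in the documented way. */
    static boolean usesDocumentedUnlimited(Map<String, String> config) {
        return "-1".equals(config.get(RETENTION_MS));
    }

    public static void main(String[] args) {
        Map<String, String> previous = new HashMap<>();
        previous.put(RETENTION_MS, String.valueOf(Long.MAX_VALUE));
        System.out.println("new override documented: " + usesDocumentedUnlimited(overrides()));
        System.out.println("old override documented: " + usesDocumentedUnlimited(previous));
    }
}
```

A test that still compares against Long.MAX_VALUE would fail against
overrides() in the same way as described for
shouldAddInternalTopicConfigForRepartitionTopics.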


Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2019-04-03 Thread Ryanne Dolan
Harsha, can you comment on this alternative approach: instead of fetching
directly from remote storage via a new API, implement something like
paging, where segments are paged-in and out of cold storage based on access
frequency/recency? For example, when a remote segment is accessed, it could
be first fetched to disk and then read from there. I suppose this would
require fewer code changes, or at least fewer API changes.

And related to paging, does the proposal address what happens when a broker
runs out of HDD space? Maybe we should have a way to configure a max number
of segments or bytes stored on each broker, after which older or
least-recently-used segments are kicked out, even if they aren't expired
per the retention policy? Otherwise, I suppose tiered storage requires some
babysitting to ensure that brokers don't run out of local storage, despite
having access to potentially unbounded cold storage.
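
To make the idea concrete, here is a rough sketch of a per-broker byte budget
with least-recently-used eviction. Everything in it is hypothetical
(LocalSegmentBudgetSketch, add, touch, isLocal are not part of the KIP or of
Kafka), and it assumes any evicted segment has already been copied to remote
storage.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; not part of KIP-405 or the Kafka code base.
final class LocalSegmentBudgetSketch {
    private final long maxLocalBytes;
    private long usedBytes = 0;
    // Access-ordered map: iteration runs from least to most recently used.
    private final LinkedHashMap<String, Long> segments = new LinkedHashMap<>(16, 0.75f, true);

    LocalSegmentBudgetSketch(long maxLocalBytes) {
        this.maxLocalBytes = maxLocalBytes;
    }

    /** Registers a local segment and returns the names evicted to stay within budget. */
    List<String> add(String name, long sizeBytes) {
        Long previous = segments.put(name, sizeBytes);
        usedBytes += sizeBytes - (previous == null ? 0 : previous);

        List<String> evicted = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = segments.entrySet().iterator();
        while (usedBytes > maxLocalBytes && it.hasNext()) {
            Map.Entry<String, Long> eldest = it.next();
            if (eldest.getKey().equals(name)) {
                continue; // never evict the segment that was just added
            }
            usedBytes -= eldest.getValue();
            evicted.add(eldest.getKey());
            it.remove();
        }
        return evicted;
    }

    /** Marks a segment as recently read so it is evicted later. */
    void touch(String name) {
        segments.get(name);
    }

    boolean isLocal(String name) {
        return segments.containsKey(name);
    }

    public static void main(String[] args) {
        LocalSegmentBudgetSketch budget = new LocalSegmentBudgetSketch(100);
        budget.add("a", 40);
        budget.add("b", 40);
        budget.touch("a");
        System.out.println("evicted: " + budget.add("c", 40));
    }
}
```

Eviction here ignores the retention policy entirely, which is the point of the
suggestion: the budget bounds local disk use even when segments have not
expired.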

Just some things to add to Alternatives Considered :)

Ryanne

On Wed, Apr 3, 2019 at 8:21 AM Viktor Somogyi-Vass 
wrote:

> Hi Harsha,
>
> Thanks for the answer, makes sense.
> In the meantime one edge case popped up in my mind but first let me
> summarize what I understand if I interpret your KIP correctly.
>
> So basically whenever the leader RSM copies over a segment to the remote
> storage, the leader RLM will append an entry to its remote index files with
> the remote position. After this LogManager can delete the local segment.
> Parallel to this RLM followers are periodically scanning the remote storage
> for files and if they find a new one they update their indices.
>
> Now, will the consumer be able to consume a remote segment if:
> - the remote segment is stored in the remote storage, BUT
> - the leader broker failed right after this AND
> - the follower which is to become a leader didn't scan yet for a new
> segment?
> Would this result in an OffsetOutOfRangeException or would the failover
> halt the consume request until the new leader has the latest information?
> As a follow-up question, what are your experiences, does a failover in a
> broker cause bigger than usual churn in the consumers? (I'm thinking about
> the time required to rebuild remote index files.)
>
> Thanks,
> Viktor
>
> On Mon, Apr 1, 2019 at 8:49 PM Harsha  wrote:
>
> > Hi Eno,
> >
> >   Thanks for the comments. Answers are inline
> >
> > "Performance & durability
> > --
> > - would be good to have more discussion on performance implications of
> > tiering. Copying the data from the local storage to the remote storage is
> > going to be expensive in terms of network bandwidth and will affect
> > foreground traffic to Kafka potentially reducing its throughput and
> > latency."
> >
> > Good point. We've run our local tests with 10GigE cards. Even though our
> > clients' bandwidth requirements are high, with 1000s of clients producing /
> > consuming data, we never hit our limits on network bandwidth. More often
> > we hit CPU and memory limits than network bandwidth limits. But this is
> > something to be taken care of by the operator if they want to enable tiered
> > storage.
> > Also, as mentioned in the KIP and previous threads, clients requesting older
> > data is very rare and is often used as an insurance policy. What is proposed
> > here does increase bandwidth in terms of shipping log segments to remote, but
> > access patterns determine how much we end up reading from the remote tier.
> >
> >
> > "- throttling the copying of the data above might be a solution, however,
> > if you have a few TB of data to move to the slower remote tier the risk is
> > that the movement will never complete on time under high Kafka load. Do we
> > need a scheduler to use idle time to do the copying?"
> >
> > In our design, we are going to have a scheduler in RLM which will
> > periodically copy inactive (rolled-over) log segments.
> > We are not sure idle time is easy to calculate and schedule a copy around.
> > Moreover, we want to copy the segments as soon as they are available.
> > Throttling is something we can take into account and provide options to
> > tune.
> >
> >
> > "- Have you considered having two options: 1) a slow tier only (e.g., all
> > the data on HDFS) and 2) a fast tier only like Kafka today. This would
> > avoid copying data between the tiers. Customers that can tolerate a slower
> > tier with a better price/GB can just choose option (1). Would be good to
> > put in Alternatives considered."
> >
> > What we want to have is the Kafka that users know today, with fast local
> > disk access and a fast data serving layer. The tiered storage option might
> > not be for everyone, and most users who are happy with Kafka today
> > shouldn't see changes to their operation because of this KIP.
> >
> > Fundamentally, we believe data in remote tiered storage is accessed very
> > infrequently. We expect anyone reading from remote tiered storage to
> > expect a slower read response (mostly backfills).
> >
> 

Re: [DISCUSS] KIP-252: Extend ACLs to allow filtering based on ip ranges and subnets

2019-04-03 Thread Colin McCabe
Hi Sönke,

Maybe a reasonable design here would be to not allow creating ACLs based on ip 
ranges and subnets unless the inter-broker protocol setting has been upgraded.  
If an upgrade is done correctly, the IBP should not be upgraded until all the 
brokers have been upgraded, so there shouldn't be older brokers in the cluster 
erroneously giving access to things they shouldn't.  In that case, perhaps we 
can hold off on introducing an ACL versioning scheme for now.
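
To make the gating idea concrete, here is a minimal sketch of the check a broker could run before accepting a new ACL. Everything named here is an assumption for illustration: the threshold version MIN_IBP_FOR_RANGE_ACLS, the helper names, and the convention that a host containing "/" denotes a subnet are not taken from the KIP.

```python
# Hypothetical threshold: the first inter-broker protocol version that
# understands IP-range ACLs. The real value would be fixed by the KIP.
MIN_IBP_FOR_RANGE_ACLS = (2, 3)


def parse_ibp(version: str) -> tuple:
    """Turn a string such as "2.2" or "2.3-IV1" into a comparable tuple."""
    base = version.split("-")[0]
    return tuple(int(part) for part in base.split("."))


def is_range_host(host: str) -> bool:
    """Assumption: any host containing '/' is a subnet, e.g. "10.0.0.0/8"."""
    return "/" in host


def may_create_acl(host: str, cluster_ibp: str) -> bool:
    """Plain hosts are always allowed; ranges only after the IBP upgrade."""
    if not is_range_host(host):
        return True
    return parse_ibp(cluster_ibp) >= MIN_IBP_FOR_RANGE_ACLS
```

With this shape, a cluster still on the old IBP keeps accepting the ACLs it accepts today and only the new range form is refused until the upgrade is complete.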

Another thing that is important here is having some way of rejecting malformed 
ip address ranges in the CreateAcls call.  This is probably not too difficult, 
but it should be spelled out.  We could use INVALID_REQUEST as the error code 
for this situation, or maybe create a new one to be more specific.
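
As a rough sketch of what rejecting malformed ranges could look like, the following uses Python's standard ipaddress module. The function name validate_acl_host is hypothetical, and treating a range with host bits set (such as 192.168.0.1/24) as malformed is an assumption, not something this thread has decided.

```python
import ipaddress

INVALID_REQUEST = "INVALID_REQUEST"  # error code name suggested above


def validate_acl_host(host: str):
    """Return None if the host is acceptable, otherwise an error code."""
    if host == "*":
        return None  # wildcard host, as used by existing ACLs
    try:
        if "/" in host:
            # strict=True rejects ranges with host bits set, e.g. 10.0.0.1/8
            ipaddress.ip_network(host, strict=True)
        else:
            ipaddress.ip_address(host)
    except ValueError:
        return INVALID_REQUEST
    return None
```

Both IPv4 and IPv6 notations go through the same call, so a single check covers either address family.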

best,
Colin


On Wed, Apr 3, 2019, at 04:58, Sönke Liebau wrote:
> All,
> 
> as this thread has now been dormant for about three months again, I am
> willing to consider the attempt at looking at a larger versioning scheme
> for ACLs as failed.
> 
> I am away for a long weekend tomorrow and will start a [VOTE] thread on
> implementing this as is on Monday, as I personally consider the security
> implications of these ACLs in a mixed version cluster quite minimal and
> addressable via the release notes.
> 
> Best,
> Sönke
> 
> On Sat, Mar 16, 2019 at 1:32 PM Sönke Liebau 
> wrote:
> 
> > Just a quick bump, as this has been quiet for a while again.
> >
> > On Tue, Jan 8, 2019 at 12:44 PM Sönke Liebau 
> > wrote:
> >
> >> Hi Colin,
> >>
> >> thanks for your response!
> >>
> >> in theory we could get away without any additional path changes I
> >> think.. I am still somewhat unsure about the best way of addressing
> >> this. I'll outline my current idea and concerns that I still have,
> >> maybe you have some thoughts on it.
> >>
> >> ACLs are currently stored in two places in ZK: /kafka-acl and
> >> /kafka-acl-extended based on whether they make use of prefixes or not.
> >> The reasoning[1] for this is not fundamentally changed by anything we
> >> are discussing here, so I think that split will need to remain.
> >>
> >> ACLs are then stored in the form of a json array:
> >> [zk: 127.0.0.1:2181(CONNECTED) 9] get /kafka-acl/Topic/*
> >>
> >> {"version":1,"acls":[{"principal":"User:sliebau","permissionType":"Allow","operation":"Read","host":"*"},{"principal":"User:sliebau","permissionType":"Allow","operation":"Describe","host":"*"},{"principal":"User:sliebau2","permissionType":"Allow","operation":"Describe","host":"*"},{"principal":"User:sliebau2","permissionType":"Allow","operation":"Read","host":"*"}]}
> >>
> >> What we could do is add a version property to the individual ACL
> >> elements like so:
> >> [
> >>   {
> >>     "principal": "User:sliebau",
> >>     "permissionType": "Allow",
> >>     "operation": "Read",
> >>     "host": "*",
> >>     "acl_version": "1"
> >>   }
> >> ]
> >>
> >> We define the current state of ACLs as version 0 and the Authorizer
> >> will default a missing "acl_version" element to this value for
> >> backwards compatibility. So there should hopefully be no need to
> >> migrate existing ACLs (concerns notwithstanding, see later).
> >>
> >> Additionally the authorizer will get a max_supported_acl_version
> >> setting which will cause it to ignore any ACLs with a version larger
> >> than what is set here, hence allowing for controlled upgrading similar
> >> to the process using the inter-broker protocol version. If ACLs are
> >> ignored this way we should probably log a warning in case this was
> >> unintentional. Maybe even have a setting that controls whether startup
> >> is even possible when not all ACLs are in effect.
> >>
> >> As I mentioned I have a few concerns, question marks still outstanding on
> >> this:
> >> - This approach would necessitate being backwards compatible with all
> >> earlier versions of ACLs unless we also add a min_acl_version setting
> >> - which would put the topic of ACL migrations back on the agenda.
> >> - Do we need to touch the wire protocol for the admin client for this?
> >> In theory I think not, as the authorizer would write ACLs in the most
> >> current version it knows (unless forced down by
> >> max_supported_acl_version), but this takes any control over this away
> >> from the user.
> >> - This adds json parsing logic to the Authorizer, as it would have to
> >> check the version first, look up the proper ACL schema for that
> >> version and then re-parse the ACL string with that schema - should not
> >> be a real issue if the initial parsing is robust, but strictly
> >> speaking we are parsing something that we don't know the schema for
> >> which might create issues with updates down the line.
> >>
> >> Beyond the practical concerns outlined above there are also some
> >> broader things maybe worth thinking about. The long term goal is to
> >> move away from Zookeeper and other data like consumer group offsets
> >> has already been moved into Kafka topics - is that something that we'd
> >> want to consider for ACLs as 

Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2019-04-03 Thread Viktor Somogyi-Vass
Hi Harsha,

Thanks for the answer, makes sense.
In the meantime one edge case popped up in my mind but first let me
summarize what I understand if I interpret your KIP correctly.

So basically whenever the leader RSM copies over a segment to the remote
storage, the leader RLM will append an entry to its remote index files with
the remote position. After this LogManager can delete the local segment.
Parallel to this RLM followers are periodically scanning the remote storage
for files and if they find a new one they update their indices.

Now, will the consumer be able to consume a remote segment if:
- the remote segment is stored in the remote storage, BUT
- the leader broker failed right after this AND
- the follower which is to become a leader didn't scan yet for a new
segment?
Would this result in an OffsetOutOfRangeException or would the failover
halt the consume request until the new leader has the latest information?
As a follow-up question, what are your experiences: does a failover in a
broker cause bigger than usual churn in the consumers? (I'm thinking about
the time required to rebuild remote index files.)
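
To make the window in question concrete, here is a toy model of it. This is only a sketch of my reading of the flow summarized above, not the KIP's implementation: the Replica class, leader_copy and follower_scan are hypothetical names, and segments are reduced to their base offsets.

```python
class Replica:
    """Toy replica: tracks which segment base offsets it can serve."""

    def __init__(self, local_segments):
        self.local_segments = set(local_segments)  # segments on local disk
        self.remote_index = set()                  # remote segments it knows of

    def can_serve(self, base_offset):
        return (base_offset in self.local_segments
                or base_offset in self.remote_index)


def leader_copy(leader, remote_storage, base_offset):
    """Leader ships a segment, indexes it, then drops the local copy."""
    remote_storage.add(base_offset)
    leader.remote_index.add(base_offset)
    leader.local_segments.discard(base_offset)


def follower_scan(follower, remote_storage):
    """Periodic scan: the follower learns every segment in remote storage."""
    follower.remote_index |= remote_storage
```

In this model a follower that still holds the segment locally can serve it before its next scan. The problematic case is a follower that has already dropped its local copy and has not scanned yet: it cannot serve the offset until follower_scan runs.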

Thanks,
Viktor

On Mon, Apr 1, 2019 at 8:49 PM Harsha  wrote:

> Hi Eno,
>
>   Thanks for the comments. Answers are inline
>
> "Performance & durability
> --
> - would be good to have more discussion on performance implications of
> tiering. Copying the data from the local storage to the remote storage is
> going to be expensive in terms of network bandwidth and will affect
> foreground traffic to Kafka potentially reducing its throughput and
> latency."
>
> Good point. We've run our local tests with 10GigE cards. Even though our
> clients' bandwidth requirements are high, with 1000s of clients producing /
> consuming data, we never hit our limits on network bandwidth. More often
> we hit CPU or memory limits than network bandwidth. But this is something
> to be taken care of by the operator if they want to enable tiered storage.
> Also, as mentioned in the KIP/previous threads, clients requesting older
> data is very rare and is often used as an insurance policy. What is
> proposed here does increase bandwidth in terms of shipping log segments to
> remote, but access patterns determine how much we end up reading from the
> remote tier.
>
>
> "- throttling the copying of the data above might be a solution, however,
> if you have a few TB of data to move to the slower remote tier the risk is
> that the movement will never complete on time under high Kafka load. Do we
> need a scheduler to use idle time to do the copying?"
>
> In our design, we are going to have a scheduler in RLM which will
> periodically copy inactive (rolled-over) log segments.
> We are not sure idle time is easy to calculate and schedule a copy around.
> Moreover, we want to copy the segments as soon as they are available.
> Throttling is something we can take into account and provide options to
> tune.
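
For illustration only, one pass of such a scheduler could look like the sketch below. It assumes a simple per-pass byte budget as the throttle, which is my own stand-in and not something the KIP specifies; Segment and run_copy_pass are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class Segment:
    base_offset: int
    size_bytes: int
    active: bool = False   # the active segment is still being written to
    copied: bool = False   # already shipped to the remote tier


def run_copy_pass(segments, byte_budget):
    """Copy rolled-over, not-yet-copied segments oldest first within a budget."""
    copied_now = []
    for seg in sorted(segments, key=lambda s: s.base_offset):
        if seg.active or seg.copied:
            continue
        if seg.size_bytes > byte_budget:
            break  # keep offset order: never skip ahead of an older segment
        byte_budget -= seg.size_bytes
        seg.copied = True  # stand-in for the actual upload to the remote tier
        copied_now.append(seg.base_offset)
    return copied_now
```

Calling run_copy_pass periodically drains the backlog in offset order, and the budget bounds how much bandwidth a single pass can take from foreground traffic.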
>
>
> "- Have you considered having two options: 1) a slow tier only (e.g., all
> the data on HDFS) and 2) a fast tier only like Kafka today. This would
> avoid copying data between the tiers. Customers that can tolerate a slower
> tier with a better price/GB can just choose option (1). Would be good to
> put in Alternatives considered."
>
> What we want to have is the Kafka that users know today, with fast local
> disk access and a fast data serving layer. The tiered storage option might
> not be for everyone, and most users who are happy with Kafka today
> shouldn't see changes to their operation because of this KIP.
>
> Fundamentally, we believe data in remote tiered storage is accessed very
> infrequently. We expect anyone reading from remote tiered storage to
> expect a slower read response (mostly backfills).
>
> Making an explicit change like a slow/fast tier will only bring more
> confusion and operational complexity into play. With tiered storage, only
> users who want to use cheaper long-term storage can enable it, and others
> can operate Kafka as it is today. It will give a good balance of serving
> the latest reads from local disk almost all the time and shipping older
> data to, and reading it from, the remote tier when clients need the older
> data. If necessary, we can revisit slow/fast-tier options at a later
> point.
>
>
> "Topic configs
> --
> - related to performance but also availability, we need to discuss the
> replication mode for the remote tier. For example, if the Kafka topics used
> to have 3-way replication, will they continue to have 3-way replication on
> the remote tier? Will the user configure that replication? In S3 for
> example, one can choose from different S3 tiers like STD or SIA, but there
> is no direct control over the replication factor like in Kafka."
>
> No. Remote tier is expected to be reliable storage with its own
> replication mechanisms.
>
>
> " how will security and ACLs be configured for the remote tier. E.g., if
> user A does not have 

Re: [VOTE] KIP-443: Return to default segment.ms and segment.index.bytes in Streams repartition topics

2019-04-03 Thread Sönke Liebau
Hi Guozhang,

I've left a comment on the merged pull request, but not sure if you'll get
notified about that since the PR was already merged, so I'll write here as
well.
Setting this to -1 needs to be reflected in the test
shouldAddInternalTopicConfigForRepartitionTopics
as well, as this currently checks for Long.MAX_VALUE [1] and consistently
fails.

Best regards,
Sönke

[1]
https://github.com/apache/kafka/blob/213466b3d4fd21b332c0b6882fea36cf1affef1c/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java#L647


On Wed, Apr 3, 2019 at 2:07 AM Guozhang Wang  wrote:

> Hello folks,
>
> I'm closing this voting thread now, thanks to all who have provided
> feedback!
>
> Here's a quick tally:
>
> Binding +1: 4 (Damian, Bill, Manikumar, Guozhang)
> Non-binding +1: 2 (John, Mickael).
>
>
> Guozhang
>
> On Fri, Mar 29, 2019 at 11:32 AM Guozhang Wang  wrote:
>
> > Ah I see, my bad :) Yes that was the documented value in `TopicConfig`,
> > and I agree we should just change that as well.
> >
> > Will update the KIP.
> >
> >
> >
> > On Fri, Mar 29, 2019 at 11:27 AM Mickael Maison <mickael.mai...@gmail.com>
> > wrote:
> >
> >> Hi Guozhang,
> >>
> >> I know the KIP is about segments configuration but I'm talking about
> >> retention.ms which is also explicitly set on repartition topics
> >>
> >>
> >> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java#L39
> >> Streams is setting it to Long.MAX_VALUE, but -1 is the "documented"
> >> way to disable the time limit. That's why I said "for consistency" as
> >> in practice it's not going to change anything.
> >>
> >> On Fri, Mar 29, 2019 at 5:09 PM Guozhang Wang wrote:
> >> >
> >> > Hello Mickael,
> >> >
> >> > segment.ms default value in TopicConfig is 7 days, I think this is a
> >> > sufficient default value. Do you have any motivations to set it to -1?
> >> >
> >> >
> >> > Guozhang
> >> >
> >> > On Fri, Mar 29, 2019 at 9:42 AM Mickael Maison <mickael.mai...@gmail.com>
> >> > wrote:
> >> >
> >> > > +1 (non binding)
> >> > > For consistency, should we also set retention.ms to -1 instead of
> >> > > Long.MAX_VALUE?
> >> > >
> >> > > On Fri, Mar 29, 2019 at 3:59 PM Manikumar <manikumar.re...@gmail.com>
> >> > > wrote:
> >> > > >
> >> > > > +1 (binding)
> >> > > >
> >> > > > Thanks for the KIP.
> >> > > >
> >> > > > On Fri, Mar 29, 2019 at 9:04 PM Damian Guy wrote:
> >> > > >
> >> > > > > +1
> >> > > > >
> >> > > > > On Fri, 29 Mar 2019 at 01:59, John Roesler wrote:
> >> > > > >
> >> > > > > > +1 (nonbinding) from me.
> >> > > > > >
> >> > > > > > On Thu, Mar 28, 2019 at 7:08 PM Guozhang Wang <wangg...@gmail.com>
> >> > > > > > wrote:
> >> > > > > >
> >> > > > > > > Hello folks,
> >> > > > > > >
> >> > > > > > > I'd like to directly start a voting thread on this simple KIP to
> >> > > > > > > change the default override values for repartition topics:
> >> > > > > > >
> >> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-443%3A+Return+to+default+segment.ms+and+segment.index.bytes+in+Streams+repartition+topics
> >> > > > > > >
> >> > > > > > > The related PR can be found here as well:
> >> > > > > > > https://github.com/apache/kafka/pull/6511
> >> > > > > > >
> >> > > > > > > If you have any thoughts or feedback, they are more than welcome
> >> > > > > > > as well.
> >> > > > > > >
> >> > > > > > > -- Guozhang
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > >
> >> >
> >> >
> >> > --
> >> > -- Guozhang
> >>
> >
> >
> > --
> > -- Guozhang
> >
>
>
> --
> -- Guozhang
>


-- 
Sönke Liebau
Partner
Tel. +49 179 7940878
OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany


Re: [DISCUSS] KIP-252: Extend ACLs to allow filtering based on ip ranges and subnets

2019-04-03 Thread Sönke Liebau
All,

as this thread has now been dormant for about three months again, I am
willing to consider the attempt at looking at a larger versioning scheme
for ACLs as failed.

I am away for a long weekend tomorrow and will start a [VOTE] thread on
implementing this as is on Monday, as I personally consider the security
implications of these ACLs in a mixed version cluster quite minimal and
addressable via the release notes.

Best,
Sönke

On Sat, Mar 16, 2019 at 1:32 PM Sönke Liebau 
wrote:

> Just a quick bump, as this has been quiet for a while again.
>
> On Tue, Jan 8, 2019 at 12:44 PM Sönke Liebau 
> wrote:
>
>> Hi Colin,
>>
>> thanks for your response!
>>
>> in theory we could get away without any additional path changes I
>> think.. I am still somewhat unsure about the best way of addressing
>> this. I'll outline my current idea and concerns that I still have,
>> maybe you have some thoughts on it.
>>
>> ACLs are currently stored in two places in ZK: /kafka-acl and
>> /kafka-acl-extended based on whether they make use of prefixes or not.
>> The reasoning[1] for this is not fundamentally changed by anything we
>> are discussing here, so I think that split will need to remain.
>>
>> ACLs are then stored in the form of a json array:
>> [zk: 127.0.0.1:2181(CONNECTED) 9] get /kafka-acl/Topic/*
>>
>> {"version":1,"acls":[{"principal":"User:sliebau","permissionType":"Allow","operation":"Read","host":"*"},{"principal":"User:sliebau","permissionType":"Allow","operation":"Describe","host":"*"},{"principal":"User:sliebau2","permissionType":"Allow","operation":"Describe","host":"*"},{"principal":"User:sliebau2","permissionType":"Allow","operation":"Read","host":"*"}]}
>>
>> What we could do is add a version property to the individual ACL
>> elements like so:
>> [
>>   {
>>     "principal": "User:sliebau",
>>     "permissionType": "Allow",
>>     "operation": "Read",
>>     "host": "*",
>>     "acl_version": "1"
>>   }
>> ]
>>
>> We define the current state of ACLs as version 0 and the Authorizer
>> will default a missing "acl_version" element to this value for
>> backwards compatibility. So there should hopefully be no need to
>> migrate existing ACLs (concerns notwithstanding, see later).
>>
>> Additionally the authorizer will get a max_supported_acl_version
>> setting which will cause it to ignore any ACLs with a version larger
>> than what is set here, hence allowing for controlled upgrading similar
>> to the process using the inter-broker protocol version. If ACLs are
>> ignored this way we should probably log a warning in case this was
>> unintentional. Maybe even have a setting that controls whether startup
>> is even possible when not all ACLs are in effect.
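
A small sketch of how that filtering could behave, assuming the znode layout quoted above with an optional per-entry "acl_version" field. The function name effective_acls is hypothetical; the default of 0 for a missing version and the max_supported_acl_version cut-off follow the description in the quoted text.

```python
import json


def effective_acls(znode_json: str, max_supported_acl_version: int):
    """Split stored ACLs into those in effect and those ignored as too new."""
    accepted, ignored = [], []
    for acl in json.loads(znode_json)["acls"]:
        # A missing "acl_version" means version 0, for backwards compatibility.
        version = int(acl.get("acl_version", 0))
        if version <= max_supported_acl_version:
            accepted.append(acl)
        else:
            ignored.append(acl)  # the caller would log a warning for these
    return accepted, ignored
```

Returning the ignored entries separately gives the authorizer what it needs to log the warning, or to refuse startup if a stricter setting asks for that.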
>>
>> As I mentioned I have a few concerns, question marks still outstanding on
>> this:
>> - This approach would necessitate being backwards compatible with all
>> earlier versions of ACLs unless we also add a min_acl_version setting
>> - which would put the topic of ACL migrations back on the agenda.
>> - Do we need to touch the wire protocol for the admin client for this?
>> In theory I think not, as the authorizer would write ACLs in the most
>> current version it knows (unless forced down by
>> max_supported_acl_version), but this takes any control over this away
>> from the user.
>> - This adds json parsing logic to the Authorizer, as it would have to
>> check the version first, look up the proper ACL schema for that
>> version and then re-parse the ACL string with that schema - should not
>> be a real issue if the initial parsing is robust, but strictly
>> speaking we are parsing something that we don't know the schema for
>> which might create issues with updates down the line.
>>
>> Beyond the practical concerns outlined above there are also some
>> broader things maybe worth thinking about. The long term goal is to
>> move away from Zookeeper and other data like consumer group offsets
>> has already been moved into Kafka topics - is that something that we'd
>> want to consider for ACLs as well? With the current storage model we'd
>> need more than one topic for this to cleanly separate resources and
>> prefixed ACLs - if we consider pursuing this option it might be a
>> chance for a "larger" change to the format which introduces versioning
>> and allows storing everything in one compacted topic.
>>
>> Any thoughts on this?
>>
>> Best regards,
>> Sönke
>>
>>
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-290%3A+Support+for+Prefixed+ACLs
>>
>>
>> On Sat, Dec 22, 2018 at 5:51 AM Colin McCabe  wrote:
>> >
>> > Hi Sönke,
>> >
>> > One path forward would be to forbid the new ACL types from being
>> created until the inter-broker protocol had been upgraded.  We'd also have
>> to figure out how the new ACLs were stored in ZooKeeper.  There are a bunch
>> of proposals in this thread that could work for that-- I really hope we
>> don't keep changing the ZK path each time there is a version bump.
>> >
>> > best,
>> > Colin
>> >
>> >
>> > On Thu, Nov 29, 2018, at 14:25, Sönke 

[jira] [Created] (KAFKA-8186) Apply for translation of the Chinese version, I hope to get authorization!

2019-04-03 Thread Yuan Yifan (JIRA)
Yuan Yifan created KAFKA-8186:
-

 Summary: Apply for translation of the Chinese version, I hope to 
get authorization! 
 Key: KAFKA-8186
 URL: https://issues.apache.org/jira/browse/KAFKA-8186
 Project: Kafka
  Issue Type: Wish
Reporter: Yuan Yifan


Hello everyone, we are [ApacheCN|https://www.apachecn.org/], an open-source 
community in China, focusing on Big Data and AI.

Recently, we have been making progress on translating Kafka documents.

 - [Source Of Document|https://github.com/apachecn/kafka-doc-zh]
 - [Document Preview|http://kafka.apachecn.org/]

There are several reasons:
 *1. The English level of many Chinese users is not very good.*
 *2. Network problems, you know (China's magic network)!*
 *3. Online blogs are very messy.*

We are very willing to do some Chinese localization for your project. If 
possible, please give us some authorization and create a link on your project 
homepage.

Yifan Yuan from Apache CN

You may contact me by mail [tsingjyuj...@163.com|mailto:tsingjyuj...@163.com] 
for more details



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Build failed in Jenkins: kafka-trunk-jdk8 #3508

2019-04-03 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-7190: KIP-443; Remove streams overrides on repartition topics

--
[...truncated 2.35 MB...]
org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestampWithProducerRecord
 PASSED

> Task :examples:test NO-SOURCE

> Task :streams:examples:test

org.apache.kafka.streams.examples.wordcount.WordCountProcessorTest > test 
STARTED

org.apache.kafka.streams.examples.wordcount.WordCountProcessorTest > test PASSED

> Task :streams:streams-scala:test

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
 > testShouldCountClicksPerRegionWithNamedRepartitionTopic STARTED

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
 > testShouldCountClicksPerRegionWithNamedRepartitionTopic PASSED

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
 > testShouldCountClicksPerRegionJava STARTED

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
 > testShouldCountClicksPerRegionJava PASSED

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
 > testShouldCountClicksPerRegion STARTED

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
 > testShouldCountClicksPerRegion PASSED

org.apache.kafka.streams.scala.kstream.ConsumedTest > Create a Consumed should 
create a Consumed with Serdes STARTED

org.apache.kafka.streams.scala.kstream.ConsumedTest > Create a Consumed should 
create a Consumed with Serdes PASSED

org.apache.kafka.streams.scala.kstream.ConsumedTest > Create a Consumed with 
timestampExtractor and resetPolicy should create a Consumed with Serdes, 
timestampExtractor and resetPolicy STARTED

org.apache.kafka.streams.scala.kstream.ConsumedTest > Create a Consumed with 
timestampExtractor and resetPolicy should create a Consumed