[jira] [Resolved] (KAFKA-10547) Add topic IDs to MetadataResponse

2020-12-18 Thread dengziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dengziming resolved KAFKA-10547.

Resolution: Done

> Add topic IDs to MetadataResponse
> ---------------------------------
>
> Key: KAFKA-10547
> URL: https://issues.apache.org/jira/browse/KAFKA-10547
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: dengziming
>Priority: Major
>
> Will be able to use TopicDescription to identify the topic ID



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Build failed in Jenkins: Kafka » kafka-trunk-jdk8 #301

2020-12-18 Thread Apache Jenkins Server
See 


Changes:

[github] Add ConfigurableProducerSpec to Trogdor for improved E2E latency 
tracking. (#9736)

[github] KAFKA-10547; Add TopicId in MetadataResponse (#9622)

[github] KAFKA-10545: Create topic IDs and propagate to brokers (#9626)


--
[...truncated 3.48 MB...]
org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualWithNullForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualWithNullForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestampWithProducerRecord

Build failed in Jenkins: Kafka » kafka-trunk-jdk15 #347

2020-12-18 Thread Apache Jenkins Server
See 


Changes:

[github] Add ConfigurableProducerSpec to Trogdor for improved E2E latency 
tracking. (#9736)

[github] KAFKA-10547; Add TopicId in MetadataResponse (#9622)

[github] KAFKA-10545: Create topic IDs and propagate to brokers (#9626)


--
[...truncated 3.51 MB...]

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.WindowStoreBuilder@7bef19eb, 
timestamped = false, caching = false, logging = false] PASSED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.WindowStoreBuilder@3648fa5, 
timestamped = false, caching = false, logging = false] STARTED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.WindowStoreBuilder@3648fa5, 
timestamped = false, caching = false, logging = false] PASSED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.WindowStoreBuilder@728a4406, 
timestamped = false, caching = false, logging = false] STARTED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.WindowStoreBuilder@728a4406, 
timestamped = false, caching = false, logging = false] PASSED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.SessionStoreBuilder@8195b6c, 
timestamped = false, caching = true, logging = true] STARTED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.SessionStoreBuilder@8195b6c, 
timestamped = false, caching = true, logging = true] PASSED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.SessionStoreBuilder@bba3bc5, 
timestamped = false, caching = true, logging = true] STARTED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.SessionStoreBuilder@bba3bc5, 
timestamped = false, caching = true, logging = true] PASSED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.SessionStoreBuilder@2c8494b0, 
timestamped = false, caching = true, logging = false] STARTED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.SessionStoreBuilder@2c8494b0, 
timestamped = false, caching = true, logging = false] PASSED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.SessionStoreBuilder@c8e5f52, 
timestamped = false, caching = true, logging = false] STARTED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.SessionStoreBuilder@c8e5f52, 
timestamped = false, caching = true, logging = false] PASSED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.SessionStoreBuilder@4e8ea794, 
timestamped = false, caching = false, logging = true] STARTED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.SessionStoreBuilder@4e8ea794, 
timestamped = false, caching = false, logging = true] PASSED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.SessionStoreBuilder@14b0c993, 
timestamped = false, caching = false, logging = true] STARTED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.SessionStoreBuilder@14b0c993, 
timestamped = false, caching = false, logging = true] PASSED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.SessionStoreBuilder@1c19eea1, 
timestamped = false, caching = false, logging = false] STARTED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.SessionStoreBuilder@1c19eea1, 
timestamped = false, caching = false, logging = false] PASSED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 

Build failed in Jenkins: Kafka » kafka-trunk-jdk11 #327

2020-12-18 Thread Apache Jenkins Server
See 


Changes:

[github] Add ConfigurableProducerSpec to Trogdor for improved E2E latency 
tracking. (#9736)

[github] KAFKA-10547; Add TopicId in MetadataResponse (#9622)

[github] KAFKA-10545: Create topic IDs and propagate to brokers (#9626)


--
[...truncated 3.51 MB...]

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestampWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfTimestampIsDifferentForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfTimestampIsDifferentForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualWithNullForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualWithNullForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue STARTED


[jira] [Created] (KAFKA-10870) Consumer should handle REBALANCE_IN_PROGRESS from JoinGroup

2020-12-18 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-10870:
---

 Summary: Consumer should handle REBALANCE_IN_PROGRESS from 
JoinGroup
 Key: KAFKA-10870
 URL: https://issues.apache.org/jira/browse/KAFKA-10870
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson


We hit a timeout when persisting group metadata to the __consumer_offsets topic:
{code}
[2020-12-18 18:06:07,889] DEBUG Created a new incremental FetchContext for 
session id 5832, epoch 53: added 0 partition(s), updated 0 partition(s), 
removed 0 partition(s) (kafka.server.FetchManager)
[2020-12-18 18:06:08,209] DEBUG [GroupMetadataManager brokerId=1] Metadata from 
group test_group_id with generation 1 failed when appending to log due to 
org.apache.kafka.common.errors.TimeoutException 
(kafka.coordinator.group.GroupMetadataManager)
[2020-12-18 18:06:08,210] WARN [GroupCoordinator 1]: Failed to persist metadata 
for group test_group_id: The group is rebalancing, so a rejoin is needed. 
(kafka.coordinator.group.GroupCoordinator)
{code}

This in turn resulted in a REBALANCE_IN_PROGRESS being returned from the 
JoinGroup:

{code}
[2020-12-18 18:06:08,211] INFO Completed 
request:RequestHeader(apiKey=JOIN_GROUP, apiVersion=7, 
clientId=consumer-test_group_id-test_group_id-instance-1, correlationId=3) -- 
{group_id=test_group_id,session_timeout_ms=6,rebalance_timeout_ms=30,member_id=,group_instance_id=test_group_id-instance-1,protocol_type=consumer,protocols=[{name=range,metadata=java.nio.HeapByteBuffer[pos=0
 lim=26 
cap=26],_tagged_fields={}}],_tagged_fields={}},response:{throttle_time_ms=0,error_code=27,generation_id=1,protocol_type=consumer,protocol_name=range,leader=test_group_id-instance-2-32e72316-2c3f-40d6-bc34-8ec23d633d34,member_id=,members=[],_tagged_fields={}}
 from connection 
172.31.46.222:9092-172.31.44.169:41310-6;totalTime:5014.825,requestQueueTime:0.193,localTime:11.575,remoteTime:5002.195,throttleTime:0.66,responseQueueTime:0.105,sendTime:0.094,sendIoTime:0.038,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=apache-kafka-java,
 softwareVersion=5.5.3-ce) (kafka.request.logger)  
{code}

The consumer has no logic to handle REBALANCE_IN_PROGRESS from JoinGroup.

{code}
[2020-12-18 18:06:08,210] ERROR [Consumer instanceId=test_group_id-instance-1, 
clientId=consumer-test_group_id-test_group_id-instance-1, 
groupId=test_group_id] Attempt to join group failed due to unexpected error
: The group is rebalancing, so a rejoin is needed. 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-12-18 18:06:08,211] INFO [Consumer instanceId=test_group_id-instance-1, 
clientId=consumer-test_group_id-test_group_id-instance-1, 
groupId=test_group_id] Join group failed with
org.apache.kafka.common.KafkaException: Unexpected error in join group
response: The group is rebalancing, so
a rejoin is needed. 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-12-18 18:06:08,211] ERROR Error during processing, terminating consumer 
process:  (org.apache.kafka.tools.VerifiableConsumer)
org.apache.kafka.common.KafkaException: Unexpected error in join group 
response: The group is rebalancing, so a rejoin is needed.
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:653)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:574)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1096)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1076)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
{code}
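The shape of the fix is to treat REBALANCE_IN_PROGRESS as a retriable join error and rejoin, rather than surfacing it as an unexpected, fatal error. A minimal sketch of that decision logic — the enum and class names here are illustrative stand-ins, not the actual AbstractCoordinator internals:

```java
// Illustrative stand-ins for the consumer's JoinGroup error handling;
// not the real AbstractCoordinator types.
enum JoinGroupError { NONE, REBALANCE_IN_PROGRESS, UNKNOWN_MEMBER_ID, UNEXPECTED }

class JoinGroupRetryPolicy {
    /** Returns true if the consumer should rejoin instead of failing. */
    static boolean shouldRejoin(JoinGroupError error) {
        switch (error) {
            case REBALANCE_IN_PROGRESS: // group is rebalancing: rejoin, not fatal
            case UNKNOWN_MEMBER_ID:     // reset the member id, then rejoin
                return true;
            default:
                return false;           // NONE (success) or a genuinely fatal error
        }
    }
}
```

Today REBALANCE_IN_PROGRESS effectively falls into the default branch, which appears to be why the VerifiableConsumer above terminates.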



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10545) Create topic IDs and propagate to brokers

2020-12-18 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan resolved KAFKA-10545.

Resolution: Done

> Create topic IDs and propagate to brokers
> -----------------------------------------
>
> Key: KAFKA-10545
> URL: https://issues.apache.org/jira/browse/KAFKA-10545
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> First step for KIP-516
> The goals are:
>  * Create and store topic IDs in a ZK Node and controller memory.
>  * Propagate topic ID to brokers with updated LeaderAndIsrRequest, 
> UpdateMetadata
>  * Store topic ID in memory on broker, persistent file in log



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Build failed in Jenkins: Kafka » kafka-trunk-jdk15 #346

2020-12-18 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-10619: Idempotent producer will get authorized once it has a 
WRITE access to at least one topic (KIP-679) (#9485)


--
[...truncated 7.01 MB...]

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestampWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfTimestampIsDifferentForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfTimestampIsDifferentForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualWithNullForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualWithNullForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue PASSED


Jenkins build is back to normal : Kafka » kafka-trunk-jdk11 #326

2020-12-18 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-698: Add Explicit User Initialization of Broker-side State to Kafka Streams

2020-12-18 Thread Matthias J. Sax
Thanks for the KIP, Bruno.

(1) To add my 2ct to the previous discussion: While I am a big advocate
for simplicity (which was always a strength of Kafka Streams), I think
that making `init()` mandatory would be ok, because for people who are
getting started, it is just one more line of code. In contrast to
Sophie's example about input topics: it's more work to actually create
input topics, and thus I don't think it would be too much of a problem.

Nevertheless, after 4 years of having apps in production with Kafka
Streams, it also does not seem to be too big of a risk, and thus
auto-topic creation seems to be a working solution for many use cases.

Thus, I tend to think we should keep both long term.


(2) About `MissingInternalTopicException`: should we add a getter method
to expose which topics are missing and/or have an incorrect number of
partitions (based on Guozhang's comment on the voting thread)?
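A hedged sketch of what such an exception with getters could look like — the class and method names are assumptions, not a settled API:

```java
import java.util.Set;

// Hypothetical shape only; the KIP has not settled on this API.
class MissingInternalTopicException extends RuntimeException {
    private final Set<String> missingTopics;
    private final Set<String> misconfiguredTopics; // wrong partition count

    MissingInternalTopicException(Set<String> missing, Set<String> misconfigured) {
        super("Missing internal topics: " + missing
                + "; wrong partition count: " + misconfigured);
        this.missingTopics = missing;
        this.misconfiguredTopics = misconfigured;
    }

    public Set<String> missingTopics() { return missingTopics; }
    public Set<String> misconfiguredTopics() { return misconfiguredTopics; }
}
```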


(3) Should we allow people to change configs of those topics if they
already exist, using `init()`? Cf:

 - https://issues.apache.org/jira/browse/KAFKA-7803
 - https://issues.apache.org/jira/browse/KAFKA-7591

This might be out of scope for the KIP, but I thought I'd bring it up
anyway. Thoughts?


(4) I agree that adding an overload that takes a timeout is a good idea.


(5) Should `init()` throw an exception if _any_ internal topic exists
already? You propose that it would not throw if _all_ internal topics
exist, but I think we should throw in this case, too. Otherwise, users
could write code that calls `init()` unconditionally, and thus the guard
we try to establish would be void (as it could happen that _all_
internal topics got deleted, especially if there is only a single one;
and this case would not be detected).

Nit: can we document the exception type in the public API change code
snippet?

(5b) The comment on `initParameters()` is confusing:

> // specifies to disable all setup of internal topics

It sounds as if internal topics are not created at all.

(5c) Why do we need a getter on `InitParameters`?


(6) Migration:

> Migrating from the current behavior to INTERNAL_TOPIC_SETUP set to 
> MANUAL_SETUP can be done without any specific migration plan. Users need to 
> set INTERNAL_TOPIC_SETUP to MANUAL_SETUP.  

Add: "and need to change their code to call `init()` accordingly."



-Matthias


On 12/11/20 6:51 AM, John Roesler wrote:
> Thanks Bruno,
> 
> It’s your decision. I was thinking that it’s appropriate to mention such 
> discussed and rejected/deferred items under “Rejected Alternatives” so that 
> future generations will know whether we even thought of it and why we didn’t 
> do it right now.
> 
> Thanks,
> John
> 
> On Fri, Dec 11, 2020, at 04:00, Bruno Cadonna wrote:
>> Thanks Sophie for the feedback!
>>
>> I agree with you on not logging a warning for a decision we haven't 
>> taken, and on logging a warning for possible data loss with automatic 
>> initialization. I would say this is an implementation detail that we can 
>> further discuss on the PR.
>>
>> Regarding the parameter class, I followed the grammar here:
>>
>> https://cwiki.apache.org/confluence/x/Lw6dC
>>
>> We've already used it for other KIPs and I thought we agreed to use it 
>> for new parameter classes.
>>
>>
>> John,
>>
>> I am in favor of not mentioning a possible future decision about 
>> dismissing automatic initialization in this KIP. I think we should 
>> discuss and decide on the dismissal in a separate KIP.
>>
>> Best,
>> Bruno
>>
>>
>> On 10.12.20 20:48, Sophie Blee-Goldman wrote:
>>> Hey John,
>>>
>>> I think we should avoid logging a warning that implies we've committed
>>> to changing a default unless we've absolutely committed to it, which it
>>> sounds like we have not (fwiw I'm also on the fence, but leaning towards
>>> leaving it automatic -- just think of how many people already forget to
>>> create their source topics before startup and struggle with that). This is
>>> probably part of a larger discussion on whether to default to
>>> OOTB-friendly or production-ready settings, which should probably be
>>> considered holistically rather than on a case-by-case basis.
>>>
>>> That said, I'm totally down with logging a warning that data loss is
>>> possible when using automatic initialization, if that's what you meant.
>>>
>>> Bruno,
>>>
>>> Thanks for the KIP, it looks good in general but I'm wondering if we
>>> can make the InitParameters API a bit more aligned to the
>>> config/parameter classes used throughout Streams (eg Materialized).
>>>
>>> For example something like
>>>
>>> public class Initialized {
>>>
>>>     public static Initialized withSetupInternalTopicsIfIncompleteEnabled();
>>>     public static Initialized withSetupInternalTopicsIfIncompleteDisabled();
>>>     // we also don't tend to have getters for these kinds of classes,
>>>     // but maybe we should start :)
>>> }
>>>
>>>
>>> On Thu, Dec 10, 2020 at 9:33 AM John Roesler  wrote:
>>>
 Thanks, Bruno,

 I 

Re: [DISCUSS] KIP-700: Add Describe Cluster API

2020-12-18 Thread Ismael Juma
Thanks for the KIP. I agree with keeping it simple in the first iteration.
If we can include the authorized operations clean-up without too much
effort, that would be great. But if it increases the scope a lot, I'd punt
it to the future.

Ismael

On Mon, Dec 14, 2020 at 8:10 AM David Jacot  wrote:

> Hi all,
>
> I'd like to propose a small KIP:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-700%3A+Add+Describe+Cluster+API
>
> Please let me know what you think.
>
> Best,
> David
>
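For illustration, the cluster-level subset that a dedicated DescribeCluster response could carry, decoupled from the per-topic payload that dominates MetadataResponse — the field names here are assumptions, not the final protocol schema:

```java
import java.util.List;

// Sketch of the cluster-level data only; the real schema lives in the KIP.
record Node(int id, String host, int port, String rack) {}

record DescribeClusterResponse(String clusterId,
                               int controllerId,
                               List<Node> brokers,
                               int clusterAuthorizedOperations) {}
```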


Re: [DISCUSS] KIP-700: Add Describe Cluster API

2020-12-18 Thread Gwen Shapira
On Fri, Dec 18, 2020, 12:48 PM Colin McCabe  wrote:

> On Fri, Dec 18, 2020, at 08:32, Gwen Shapira wrote:
> > Agree. Once we have the basic API we can decide what belongs and what
> > doesn't. I remember requests for broker status, version, and various
> > other things. Considering all those options together will be too much
> > at this point.
> >
>
> Hi Gwen,
>
> I agree that there are a lot of potential improvements, and we want to
> avoid putting too much in this KIP.  But I still think it would be helpful
> to have one or two new user-visible things in this KIP, just to demonstrate
> why we need a new RPC.  Or if that's too much, can we identify a few things
> in a "future work" section?  I think it adds a lot to the motivation for
> this KIP.  After all, if you don't think we're going to add anything more
> to describeCluster, there is not a strong case for a new RPC.
>

I think the complexity of MetadataResponse alone justifies a cleaner API,
but if adding JMX port (or broker version, or broker status) is a must-have
in your mind, I have no objection - I have few use-cases for the
broker-status response.


> > One thing that may be worth considering is whether you want to include
> > not just the controller ID but also its epoch. This will allow
> > resolving cases where brokers respond with outdated information. I
> > haven't thought it through, so maybe it doesn't make sense here, but I
> > was wondering if this was considered.
>
> I think we should hold off on adding more epochs right now until we have a
> more general solution.  With KIP-500 we can potentially put an epoch on the
> whole metadata response, which would allow us to have read-after-write
> consistency for metadata-- something people have wanted for a while.
>

Oh, this is wonderful news! We can add the metadata epoch separately (I did
not suggest a new epoch, I suggested publishing the existing broker and
controller epochs in the response, but we can add this later... as I just
argued above).


>
> best,
> Colin
>
>
> >
> > On Fri, Dec 18, 2020 at 4:51 AM David Jacot  wrote:
> > >
> > > Hi Ryan,
> > >
> > > Thanks for your feedback. That's an interesting idea but that raises
> more
> > > questions. At the moment, `describeCluster` only requires to be
> > > authenticated
> > > (if authentication is enabled) to get the basic information about the
> > > cluster.
> > > If we add the JMX port, is it something that we would provide without
> > > requiring
> > > more permissions? I am not sure about this.
> > >
> > > I lean towards keeping this KIP focused on introducing the new API and
> to
> > > add new information with separate KIPs. There might be more information
> > > that we want to add as part of KIP-500.
> > >
> > > I will be happy to hear what other members of the community think about
> > > this.
> > >
> > > Best,
> > > David
> > >
> > > On Thu, Dec 17, 2020 at 5:57 AM Colin McCabe 
> wrote:
> > >
> > > > Hi David,
> > > >
> > > > This seems reasonable.  It would be nice to have an API specifically
> for
> > > > describeCluster, so that we could extend this API without adding more
> > > > fields to the already large MetadataRequest.
> > > >
> > > > As you mention in the KIP, KIP-700 would allow us to deprecate
> > > > MetadataRequest#ClusterAuthorizedOperations.  So it seems like this
> KIP
> > > > should specify a new version of MetadataRequest where the related
> fields
> > > > are absent, right?
> > > >
> > > > best,
> > > > Colin
> > > >
> > > >
> > > > On Mon, Dec 14, 2020, at 08:10, David Jacot wrote:
> > > > > Hi all,
> > > > >
> > > > > I'd like to propose a small KIP:
> > > > >
> > > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-700%3A+Add+Describe+Cluster+API
> > > > >
> > > > > Please let me know what you think.
> > > > >
> > > > > Best,
> > > > > David
> > > > >
> > > >
> >
> >
> >
> > --
> > Gwen Shapira
> > Engineering Manager | Confluent
> > 650.450.2760 | @gwenshap
> > Follow us: Twitter | blog
> >
>


Re: [DISCUSS] KIP-700: Add Describe Cluster API

2020-12-18 Thread Colin McCabe
On Fri, Dec 18, 2020, at 08:32, Gwen Shapira wrote:
> Agree. Once we have the basic API we can decide what belongs and what
> doesn't. I remember requests for broker status, version, and various
> other things. Considering all those options together will be too much
> at this point.
> 

Hi Gwen,

I agree that there are a lot of potential improvements, and we want to avoid 
putting too much in this KIP.  But I still think it would be helpful to have 
one or two new user-visible things in this KIP, just to demonstrate why we need 
a new RPC.  Or if that's too much, can we identify a few things in a "future 
work" section?  I think it adds a lot to the motivation for this KIP.  After 
all, if you don't think we're going to add anything more to describeCluster, 
there is not a strong case for a new RPC.

> One thing that may be worth considering is whether you want to include
> not just the controller ID but also its epoch. This will allow
> resolving cases where brokers respond with outdated information. I
> haven't thought it through, so maybe it doesn't make sense here, but I
> was wondering if this was considered.

I think we should hold off on adding more epochs right now until we have a more 
general solution.  With KIP-500 we can potentially put an epoch on the whole 
metadata response, which would allow us to have read-after-write consistency 
for metadata-- something people have wanted for a while.

best,
Colin


> 
> On Fri, Dec 18, 2020 at 4:51 AM David Jacot  wrote:
> >
> > Hi Ryan,
> >
> > Thanks for your feedback. That's an interesting idea but that raises more
> > questions. At the moment, `describeCluster` only requires to be
> > authenticated
> > (if authentication is enabled) to get the basic information about the
> > cluster.
> > If we add the JMX port, is it something that we would provide without
> > requiring
> > more permissions? I am not sure about this.
> >
> > I lean towards keeping this KIP focused on introducing the new API and to
> > add new information with separate KIPs. There might be more information
> > that we want to add as part of KIP-500.
> >
> > I will be happy to hear what other members of the community think about
> > this.
> >
> > Best,
> > David
> >
> > On Thu, Dec 17, 2020 at 5:57 AM Colin McCabe  wrote:
> >
> > > Hi David,
> > >
> > > This seems reasonable.  It would be nice to have an API specifically for
> > > describeCluster, so that we could extend this API without adding more
> > > fields to the already large MetadataRequest.
> > >
> > > As you mention in the KIP, KIP-700 would allow us to deprecate
> > > MetadataRequest#ClusterAuthorizedOperations.  So it seems like this KIP
> > > should specify a new version of MetadataRequest where the related fields
> > > are absent, right?
> > >
> > > best,
> > > Colin
> > >
> > >
> > > On Mon, Dec 14, 2020, at 08:10, David Jacot wrote:
> > > > Hi all,
> > > >
> > > > I'd like to propose a small KIP:
> > > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-700%3A+Add+Describe+Cluster+API
> > > >
> > > > Please let me know what you think.
> > > >
> > > > Best,
> > > > David
> > > >
> > >
> 
> 
> 
> -- 
> Gwen Shapira
> Engineering Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter | blog
>


Re: [VOTE] voting on KIP-631: the quorum-based Kafka controller

2020-12-18 Thread Colin McCabe
Hi all,

I'm going to close the vote in a few hours.  Thanks to everyone who reviewed 
and voted.

best,
Colin


On Fri, Dec 18, 2020, at 10:08, Jun Rao wrote:
> Thanks, Colin. +1
> 
> Jun
> 
> On Thu, Dec 17, 2020 at 2:24 AM David Jacot  wrote:
> 
> > Thanks for driving this KIP, Colin. The KIP is really well written. This is
> > so exciting!
> >
> > +1 (binding)
> >
> > Best,
> > David
> >
> > On Wed, Dec 16, 2020 at 11:51 PM Colin McCabe  wrote:
> >
> > > On Wed, Dec 16, 2020, at 13:08, Ismael Juma wrote:
> > > > Thanks for all the work on the KIP. Given the magnitude of the KIP, I
> > > > expect that some tweaks will be made as the code is implemented,
> > reviewed
> > > > and tested. I'm overall +1 (binding).
> > > >
> > >
> > > Thanks, Ismael.
> > >
> > > > A few comments below:
> > > > 1. It's a bit weird for kafka-storage to output a random uuid. Would it
> > > be
> > > > better to have a dedicated command for that?
> > >
> > > I'm not sure.  The nice thing about putting it in kafka-storage.sh is
> > that
> > > it's there when you need it.  I also think that having subcommands, like
> > we
> > > do here, really reduces the "clutter" that we have in some other
> > > command-line tools.  When you get help about the "info" subcommand, you
> > > don't see flags for any other subcommand, for example.  I guess we can
> > move
> > > this later if it seems more intuitive though.
> > >
> > > > Also, since we use base64
> > > > encoded uuids nearly everywhere (including cluster and topic ids), it
> > > would
> > > > be good to follow that pattern instead of the less compact
> > > > "51380268-1036-410d-a8fc-fb3b55f48033".
> > >
> > > Good idea.  I have updated this to use base64 encoded UUIDs.
> > >
> > > > 2. This is a nit, but I think it would be better to talk about built-in
> > > > quorum mode instead of KIP-500 mode. It's more self descriptive than a
> > > KIP
> > > > reference.
> > >
> > > I do like the sound of "quorum mode."  I guess the main question is, if
> > we
> > > later implement raft quorums for regular topics, would that nomenclature
> > be
> > > confusing?  I guess we could talk about "metadata quorum mode" to avoid
> > > confusion.  Hmm.
> > >
> > > > 3. Did we consider using `session` (like the group coordinator) instead
> > > of
> > > `registration` in `broker.registration.timeout.ms`?
> > >
> > > Hmm, broker.session.timeout.ms does sound better.  I changed it to that.
> > >
> > > > 4. The flat id space for the controller and broker while requiring a
> > > > different id in embedded mode seems a bit unintuitive. Are there any
> > > other
> > > > systems that do this? I know we covered some of the reasons in the
> > > "Shared
> > > > IDs between Multiple Nodes" rejected alternatives section, but it
> > didn't
> > > > seem totally convincing to me.
> > >
> > > One of my concerns here is that using separate ID spaces for controllers
> > > versus brokers would potentially lead to metrics or logging collisions.
> > We
> > > can take a look at that again once the implementation is further along, I
> > > guess, to see how often that is a problem in practice.
> > >
> > > > 5. With regards to the controller process listening on a separate port,
> > > it
> > > > may be worth adding a sentence about the forwarding KIP as that is a
> > main
> > > > reason why the controller port doesn't need to be accessible.
> > >
> > > Good idea... I added a short reference to KIP-590 in the "Networking"
> > > section.
> > >
> > > > 6. The internal topic seems to be called @metadata. I'm personally not
> > > > convinced about the usage of @ in this way. I think I would go with the
> > > > same convention we have used for other existing internal topics.
> > >
> > > I knew this one would be controversial :)
> > >
> > > I guess the main argument here is that using @ avoids collisions with any
> > > existing topic.  Leading underscores, even double underscores, can be
> > used
> > > by users to create new topics, but an "at sign" cannot.  It would be nice
> > to
> > > have a namespace for system topics that we knew nobody else could break
> > > into.
> > >
> > > > 7. We talk about the metadata.format feature flag. Is this intended to
> > > > allow for single roll upgrades?
> > > > 8. Could the incarnation id be called registration id? Or is there a
> > > reason
> > > > why this would be a bad name?
> > >
> > > I liked "incarnation id" because it expresses the idea that each new
> > > incarnation of the broker gets a different one.  I think "registration
> > id"
> > > might be confused with "the broker id is the ID we're registering."
> > >
> > > > 9. Could `CurMetadataOffset` be called `CurrentMetadataOffset` for
> > > > `BrokerRegistrationRequest`? The abbreviation here doesn't seem to help
> > > > much and makes things slightly less readable. It would also make it
> > > > consistent with `BrokerHeartbeatRequest`.
> > >
> > > Yeah, the abbreviated name is inconsistent.  I will change it to
> > > 
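The compact id form agreed on above (base64 instead of the dashed "51380268-1036-410d-a8fc-fb3b55f48033" form) can be illustrated with a short encoding sketch. This assumes URL-safe base64 without padding, which turns a 128-bit id into 22 characters; it is an illustration of the scheme, not Kafka's actual Uuid class:

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;

class CompactUuid {
    // Render a 128-bit id in the compact form discussed above: URL-safe
    // base64 without padding, 22 characters instead of the 36-character
    // dashed representation.
    static String toCompact(UUID uuid) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(uuid.getMostSignificantBits());
        buf.putLong(uuid.getLeastSignificantBits());
        // 16 bytes encode to 24 base64 chars including two '=' pads;
        // dropping the padding leaves a 22-character id.
        return Base64.getUrlEncoder().withoutPadding().encodeToString(buf.array());
    }

    public static void main(String[] args) {
        UUID dashed = UUID.fromString("51380268-1036-410d-a8fc-fb3b55f48033");
        System.out.println(toCompact(dashed)); // 22-char compact form
    }
}
```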

[jira] [Created] (KAFKA-10869) Gate topic IDs behind IBP 2.8

2020-12-18 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-10869:
--

 Summary: Gate topic IDs behind IBP 2.8
 Key: KAFKA-10869
 URL: https://issues.apache.org/jira/browse/KAFKA-10869
 Project: Kafka
  Issue Type: Sub-task
Reporter: Justine Olshan
Assignee: Justine Olshan


We want to do this so we don't lose topic IDs upon downgrades. If we downgrade 
and write to the topic node in ZK, the topic ID will be lost.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Request to be added as a contributor in Kafka JIRA board

2020-12-18 Thread Matthias J. Sax
Done.

On 12/17/20 11:23 PM, Prathibha Muralidharan wrote:
> Hi,
> 
> Please add me as a contributor in Kafka JIRA board and Confluence Wiki. My
> JIRA Id is Prathibha. My Confluence ID is prathibha.
> 
> Thanks,
> 
> Prathibha
> 
> -
> Prathibha Muralidharan
> SWE @ Connect Team
> Confluent
> Mountain View, CA
> -
> 


Re: [VOTE] KIP-695: Improve Streams Time Synchronization

2020-12-18 Thread Matthias J. Sax
Sorry that I am late to the game.

+1 (binding)

We always knew about this gap when we did KIP-353, and thus, I am glad
that we finally address it.


-Matthias

On 12/18/20 10:16 AM, John Roesler wrote:
> Thanks for your question, Ismael,
> 
> Are you concerned about the consumer performance, streams performance or both?
> 
> On the consumer side, this is only creating one extra struct for each 
> response partition to represent the metadata that we already have access to 
> internally. I don’t think this would have a measurable performance impact.
> 
> On the streams side, I would definitely like to ensure that performance 
> doesn’t decrease for users. I ran our internal benchmarks on my POC branch 
> and found that the measured throughput across all operations is within the 
> 99% confidence interval of the baseline performance of trunk. I also deployed 
> our internal soak test from my POC branch, which includes a join operation, 
> and I observe that the throughput of that soak cluster is identical to the 
> soak for trunk.
> 
> This result is to be expected, since the semantics improved here would 
> only kick in for Join/Merge operations where Streams is processing faster 
> than it can fetch some partitions on average. I would expect Streams to catch 
> up to the fetches occasionally, but not on average. 
> 
> It’s also worth noting that we have seen increasing numbers of users 
> complaining of incorrect join results due to the current implementation. Even 
> if the new implementation showed a modest drop in performance, I would 
> advocate for correct results over top performance by default.
> 
> Finally, to assuage any lingering concerns, there is a configuration 
> available to completely disable the new semantics proposed here and revert to 
> the prior behavior. 
> 
> These details seem worth mentioning in the KIP. I’ll update the document 
> shortly. 
> 
> Thanks again,
> John
> 
> On Fri, Dec 18, 2020, at 11:45, Ismael Juma wrote:
>> Hi John,
>>
>> It would be good to make sure these changes have no measurable performance
>> impact for the use cases that don't need it. Have we given this some
>> thought? And what would be the perf testing strategy to verify this?
>>
>> Ismael
>>
>> On Fri, Dec 18, 2020 at 8:39 AM John Roesler  wrote:
>>
>>> Thanks for the votes and reviews, all. I'll wait for a
>>> response from Jason before closing the vote, since he asked
>>> for clarification.
>>>
>>> The present count is:
>>> * 3 binding +1 (Guozhang, Bill, and myself)
>>> * 2 non-binding +1 (Bruno and Walker)
>>>
>>> I have updated the KIP document in response to the requests
>>> for clarification:
>>> 1) The new metadata() map actually just contains immutable
>>> Metadata objects representing the metadata received in the
>>> last round of fetch responses, so I decided to stick with
>>> `receivedMetadata`, as that is an accurate representation of
>>> the timestamp's meaning.
>>>
>>> 2) I added a javadoc clarifying that the metadata partitions
>>> may be a superset of the data partitions in the same
>>> ConsumerRecords
>>>
>>> 3) I confirmed that the position we are returning is the
>>> next offset to fetch after the current returned records.
>>> This is equivalent to the "current position" of the consumer
>>> after the call to poll() that returns this ConsumerRecords
>>> object
>>>
>>> 4) (Jason's question about whether we include metadata for
>>> all partitions or just the latest fetch responses) I've
>>> clarified the javadoc to state that the metadata is only
>>> what was included in the latest fetches.
>>>
>>> Thanks,
>>> -John
>>>
>>>
>>> On Thu, 2020-12-17 at 11:42 -0500, Bill Bejeck wrote:
 Hi John,

 I've made a pass over the KIP and I think it will be a good addition.

 Modulo Jason's question, I'm a +1 (binding).

 Thanks,
 Bill

 On Wed, Dec 16, 2020 at 1:29 PM Jason Gustafson 
>>> wrote:

> Hi John,
>
> Just one question. It wasn't very clear to me exactly when the metadata
> would be returned in `ConsumerRecords`. Would we /always/ include the
> metadata for all partitions that are assigned, or would it be based on
>>> the
> latest fetches?
>
> Thanks,
> Jason
>
> On Fri, Dec 11, 2020 at 4:07 PM John Roesler 
>>> wrote:
>
>> Thanks, Guozhang!
>>
>> All of your feedback sounds good to me. I’ll update the KIP when I am
> able.
>>
>> 3) I believe it is the position after the fetch, but I will confirm.
>>> I
>> think omitting position may render beginning and end offsets useless
>>> as
>> well, which leaves only lag. That would be fine with me, but it also
> seems
>> nice to supply this extra metadata since it is well defined and
>>> probably
>> handy for others. Therefore, I’d go the route of specifying the exact
>> semantics and keeping it.
>>
>> Thanks for the review,
>> John
>>
>> On Fri, Dec 11, 2020, at 17:36, Guozhang Wang 

[jira] [Created] (KAFKA-10868) Avoid double wrapping KafkaException

2020-12-18 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10868:
---

 Summary: Avoid double wrapping KafkaException
 Key: KAFKA-10868
 URL: https://issues.apache.org/jira/browse/KAFKA-10868
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen








Re: [VOTE] KIP-695: Improve Streams Time Synchronization

2020-12-18 Thread John Roesler
Thanks for your question, Ismael,

Are you concerned about the consumer performance, streams performance or both?

On the consumer side, this is only creating one extra struct for each response 
partition to represent the metadata that we already have access to internally. 
I don’t think this would have a measurable performance impact.

On the streams side, I would definitely like to ensure that performance doesn’t 
decrease for users. I ran our internal benchmarks on my POC branch and found 
that the measured throughput across all operations is within the 99% confidence 
interval of the baseline performance of trunk. I also deployed our internal 
soak test from my POC branch, which includes a join operation, and I observe 
that the throughput of that soak cluster is identical to the soak for trunk.

This result is to be expected, since the semantics improved here would only 
kick in for Join/Merge operations where Streams is processing faster than it 
can fetch some partitions on average. I would expect Streams to catch up to the 
fetches occasionally, but not on average. 

It’s also worth noting that we have seen increasing numbers of users 
complaining of incorrect join results due to the current implementation. Even 
if the new implementation showed a modest drop in performance, I would advocate 
for correct results over top performance by default.

Finally, to assuage any lingering concerns, there is a configuration available 
to completely disable the new semantics proposed here and revert to the prior 
behavior. 

These details seem worth mentioning in the KIP. I’ll update the document 
shortly. 

Thanks again,
John

On Fri, Dec 18, 2020, at 11:45, Ismael Juma wrote:
> Hi John,
> 
> It would be good to make sure these changes have no measurable performance
> impact for the use cases that don't need it. Have we given this some
> thought? And what would be the perf testing strategy to verify this?
> 
> Ismael
> 
> On Fri, Dec 18, 2020 at 8:39 AM John Roesler  wrote:
> 
> > Thanks for the votes and reviews, all. I'll wait for a
> > response from Jason before closing the vote, since he asked
> > for clarification.
> >
> > The present count is:
> > * 3 binding +1 (Guozhang, Bill, and myself)
> > * 2 non-binding +1 (Bruno and Walker)
> >
> > I have updated the KIP document in response to the requests
> > for clarification:
> > 1) The new metadata() map actually just contains immutable
> > Metadata objects representing the metadata received in the
> > last round of fetch responses, so I decided to stick with
> > `receivedMetadata`, as that is an accurate representation of
> > the timestamp's meaning.
> >
> > 2) I added a javadoc clarifying that the metadata partitions
> > may be a superset of the data partitions in the same
> > ConsumerRecords
> >
> > 3) I confirmed that the position we are returning is the
> > next offset to fetch after the current returned records.
> > This is equivalent to the "current position" of the consumer
> > after the call to poll() that returns this ConsumerRecords
> > object
> >
> > 4) (Jason's question about whether we include metadata for
> > all partitions or just the latest fetch responses) I've
> > clarified the javadoc to state that the metadata is only
> > what was included in the latest fetches.
> >
> > Thanks,
> > -John
> >
> >
> > On Thu, 2020-12-17 at 11:42 -0500, Bill Bejeck wrote:
> > > Hi John,
> > >
> > > I've made a pass over the KIP and I think it will be a good addition.
> > >
> > > Modulo Jason's question, I'm a +1 (binding).
> > >
> > > Thanks,
> > > Bill
> > >
> > > On Wed, Dec 16, 2020 at 1:29 PM Jason Gustafson 
> > wrote:
> > >
> > > > Hi John,
> > > >
> > > > Just one question. It wasn't very clear to me exactly when the metadata
> > > > would be returned in `ConsumerRecords`. Would we /always/ include the
> > > > metadata for all partitions that are assigned, or would it be based on
> > the
> > > > latest fetches?
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > > On Fri, Dec 11, 2020 at 4:07 PM John Roesler 
> > wrote:
> > > >
> > > > > Thanks, Guozhang!
> > > > >
> > > > > All of your feedback sounds good to me. I’ll update the KIP when I am
> > > > able.
> > > > >
> > > > > 3) I believe it is the position after the fetch, but I will confirm.
> > I
> > > > > think omitting position may render beginning and end offsets useless
> > as
> > > > > well, which leaves only lag. That would be fine with me, but it also
> > > > seems
> > > > > nice to supply this extra metadata since it is well defined and
> > probably
> > > > > handy for others. Therefore, I’d go the route of specifying the exact
> > > > > semantics and keeping it.
> > > > >
> > > > > Thanks for the review,
> > > > > John
> > > > >
> > > > > On Fri, Dec 11, 2020, at 17:36, Guozhang Wang wrote:
> > > > > > Hello John,
> > > > > >
> > > > > > Thanks for the updates! I've made a pass on the KIP and also the
> > POC
> > > > PR,
> > > > > > here are some minor comments:
> > > > > >
> > > > 

Re: [VOTE] voting on KIP-631: the quorum-based Kafka controller

2020-12-18 Thread Jun Rao
Thanks, Colin. +1

Jun

On Thu, Dec 17, 2020 at 2:24 AM David Jacot  wrote:

> Thanks for driving this KIP, Colin. The KIP is really well written. This is
> so exciting!
>
> +1 (binding)
>
> Best,
> David
>
> On Wed, Dec 16, 2020 at 11:51 PM Colin McCabe  wrote:
>
> > On Wed, Dec 16, 2020, at 13:08, Ismael Juma wrote:
> > > Thanks for all the work on the KIP. Given the magnitude of the KIP, I
> > > expect that some tweaks will be made as the code is implemented,
> reviewed
> > > and tested. I'm overall +1 (binding).
> > >
> >
> > Thanks, Ismael.
> >
> > > A few comments below:
> > > 1. It's a bit weird for kafka-storage to output a random uuid. Would it
> > be
> > > better to have a dedicated command for that?
> >
> > I'm not sure.  The nice thing about putting it in kafka-storage.sh is
> that
> > it's there when you need it.  I also think that having subcommands, like
> we
> > do here, really reduces the "clutter" that we have in some other
> > command-line tools.  When you get help about the "info" subcommand, you
> > don't see flags for any other subcommand, for example.  I guess we can
> move
> > this later if it seems more intuitive though.
> >
> > > Also, since we use base64
> > > encoded uuids nearly everywhere (including cluster and topic ids), it
> > would
> > > be good to follow that pattern instead of the less compact
> > > "51380268-1036-410d-a8fc-fb3b55f48033".
> >
> > Good idea.  I have updated this to use base64 encoded UUIDs.
> >
> > > 2. This is a nit, but I think it would be better to talk about built-in
> > > quorum mode instead of KIP-500 mode. It's more self descriptive than a
> > KIP
> > > reference.
> >
> > I do like the sound of "quorum mode."  I guess the main question is, if
> we
> > later implement raft quorums for regular topics, would that nomenclature
> be
> > confusing?  I guess we could talk about "metadata quorum mode" to avoid
> > confusion.  Hmm.
> >
> > > 3. Did we consider using `session` (like the group coordinator) instead
> > of
> > > `registration` in `broker.registration.timeout.ms`?
> >
> > Hmm, broker.session.timeout.ms does sound better.  I changed it to that.
> >
> > > 4. The flat id space for the controller and broker while requiring a
> > > different id in embedded mode seems a bit unintuitive. Are there any
> > other
> > > systems that do this? I know we covered some of the reasons in the
> > "Shared
> > > IDs between Multiple Nodes" rejected alternatives section, but it
> didn't
> > > seem totally convincing to me.
> >
> > One of my concerns here is that using separate ID spaces for controllers
> > versus brokers would potentially lead to metrics or logging collisions.
> We
> > can take a look at that again once the implementation is further along, I
> > guess, to see how often that is a problem in practice.
> >
> > > 5. With regards to the controller process listening on a separate port,
> > it
> > > may be worth adding a sentence about the forwarding KIP as that is a
> main
> > > reason why the controller port doesn't need to be accessible.
> >
> > Good idea... I added a short reference to KIP-590 in the "Networking"
> > section.
> >
> > > 6. The internal topic seems to be called @metadata. I'm personally not
> > > convinced about the usage of @ in this way. I think I would go with the
> > > same convention we have used for other existing internal topics.
> >
> > I knew this one would be controversial :)
> >
> > I guess the main argument here is that using @ avoids collisions with any
> > existing topic.  Leading underscores, even double underscores, can be
> used
> > by users to create new topics, but an "at sign" cannot.  It would be nice
> to
> > have a namespace for system topics that we knew nobody else could break
> > into.
> >
> > > 7. We talk about the metadata.format feature flag. Is this intended to
> > > allow for single roll upgrades?
> > > 8. Could the incarnation id be called registration id? Or is there a
> > reason
> > > why this would be a bad name?
> >
> > I liked "incarnation id" because it expresses the idea that each new
> > incarnation of the broker gets a different one.  I think "registration
> id"
> > might be confused with "the broker id is the ID we're registering."
> >
> > > 9. Could `CurMetadataOffset` be called `CurrentMetadataOffset` for
> > > `BrokerRegistrationRequest`? The abbreviation here doesn't seem to help
> > > much and makes things slightly less readable. It would also make it
> > > consistent with `BrokerHeartbeatRequest`.
> >
> > Yeah, the abbreviated name is inconsistent.  I will change it to
> > CurrentMetadataOffset.
> >
> > > 10. Should `UnregisterBrokerRecord` be `DeregisterBrokerRecord`?
> >
> > Hmm, "Register/Unregister" is more consistent with "Fence/Unfence" which
> > is why I went with Unregister.  It looks like they're both in the
> > dictionary, so I'm not sure if "deregister" has an advantage...
> >
> > > 11. Broker metrics typically have a PerSec suffix, should we stick with
> > > that for the 
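The namespace argument in the quoted discussion above rests on Kafka's topic-name validation: names may only use ASCII alphanumerics, '.', '_' and '-', so users can create underscore-prefixed names but never one containing '@'. A minimal sketch of that character rule (simplified — the real validation also limits length and rejects "." and ".."):

```java
import java.util.regex.Pattern;

class TopicNames {
    // Simplified version of Kafka's legal-character rule for topic names.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");

    static boolean legalCharacters(String name) {
        return LEGAL.matcher(name).matches();
    }

    public static void main(String[] args) {
        // Users can create double-underscore names...
        System.out.println(legalCharacters("__consumer_offsets")); // true
        // ...but an '@'-prefixed name is never creatable by users.
        System.out.println(legalCharacters("@metadata"));          // false
    }
}
```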

Re: [DISCUSS] KIP-631: The Quorum-based Kafka Controller

2020-12-18 Thread Jun Rao
Hi, Colin,

Thanks for the reply. The KIP looks good to me now. Thanks for your
diligence.

Jun

On Thu, Dec 17, 2020 at 5:15 PM Colin McCabe  wrote:

> On Thu, Dec 17, 2020, at 17:02, Jun Rao wrote:
> > Hi, Colin,
> >
> > Thanks for the reply. Sounds good. One more comment.
> >
> > 231. Currently, we have sasl.mechanism.inter.broker.protocol for inter
> > broker communication. It seems that we need a similar config for
> specifying
> > the sasl mechanism for the communication between the broker and the
> > controller.
> >
>
> Hi Jun,
>
> Yeah... sounds like we could use a new configuration key for this.  I
> added sasl.mechanism.controller.protocol for this.
>
> regards,
> Colin
>
> > Jun
> >
> > On Thu, Dec 17, 2020 at 2:29 PM Colin McCabe  wrote:
> >
> > > On Thu, Dec 17, 2020, at 10:19, Jun Rao wrote:
> > > > Hi, Colin,
> > > >
> > > > Thanks for the reply.
> > > >
> > > > 211. Hmm, I still don't see a clear benefit of registering a broker
> > > before
> > > > recovery. It's possible for the recovery to take time. However,
> during
> > > the
> > > > recovery mode, it seems the broker will still be in the fenced mode
> and
> > > > won't be able to do work for the clients. So, registering and
> > > heartbeating
> > > > early seems to only add unnecessary overhead. For your point on topic
> > > > creation, I thought we now allow replicas to be created on
> > > > unregistered brokers.
> > > >
> > >
> > > Hi Jun,
> > >
> > > Thanks again for the reviews.
> > >
> > > Topics cannot be created on unregistered brokers.  They can be created
> on
> > > registered but fenced brokers.  So for that reason I think it makes
> sense
> > > to register as early as possible.
> > >
> > > > 230. Currently, we do have a ControllerId field in MetadataResponse.
> In
> > > the
> > > > early discussion, I thought that we want to expose the controller for
> > > > debugging purposes, but not used by the client library.
> > > >
> > >
> > > The current plan is that we will expose the controller node ID, but the
> > > controller will not be included in the list of nodes in the metadata
> > > response.
> > >
> > > It's not really possible to include the controller in that list of
> nodes
> > > because the controller may not share the same set of listeners as the
> > > broker.  So, for example, maybe the controller endpoint is using a
> > > different type of security than the broker.  So while we could pass
> back a
> > > hostname and port, the client would have no way to connect since it
> doesn't
> > > know what security settings to use.
> > >
> > > regards,
> > > Colin
> > >
> > > > Jun
> > > >
> > > > On Wed, Dec 16, 2020 at 9:13 PM Colin McCabe 
> wrote:
> > > >
> > > > > On Wed, Dec 16, 2020, at 18:13, Jun Rao wrote:
> > > > > > Hi, Colin,
> > > > > >
> > > > > > Thanks for the reply. Just a couple of more comments.
> > > > > >
> > > > > > 211. Currently, the broker only registers itself in ZK after log
> > > > > recovery.
> > > > > > Is there any benefit to change that? As you mentioned, the broker
> > > can't
> > > > > do
> > > > > > much before completing log recovery.
> > > > > >
> > > > >
> > > > > Hi Jun,
> > > > >
> > > > > Previously, it wasn't possible to register in ZK without
> immediately
> > > > > getting added to the MetadataResponse.  So I think that's the main
> > > reason
> > > > > why registration was delayed until after log recovery.  Since that
> > > > > constraint doesn't exist any more, there seems to be no reason to
> delay
> > > > > registration.
> > > > >
> > > > > I think delaying registration would have some major downsides.  If
> log
> > > > > recovery takes a while, that's a longer window during which someone
> > > else
> > > > > could register a broker with the same ID.  Having parts of the
> cluster
> > > > > missing for a while gives up some of the benefit of separating
> > > registration
> > > > > from fencing.  For example, if a broker somehow gets unregistered
> and
> > > we
> > > > > want to re-register it, but we have to wait for a 10 minute log
> > > recovery to
> > > > > do that, that could be a window during which topics can't be
> created
> > > that
> > > > > need to be on that broker, and so forth.
> > > > >
> > > > > > 230. Regarding MetadataResponse, there is a slight awkwardness.
> We
> > > return
> > > > > > rack for each node. However, if that node is for the controller,
> the
> > > rack
> > > > > > field is not really relevant. Should we clean it up here or in
> > > another
> > > > > KIP
> > > > > > like KIP-700?
> > > > >
> > > > > Oh, controllers don't appear in the MetadataResponses returned to
> > > clients,
> > > > > since clients can't access them.  I should have been more clear
> about
> > > that
> > > > > in the KIP-- I added a sentence to "Networking" describing this.
> > > > >
> > > > > best,
> > > > > Colin
> > > > >
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Wed, Dec 16, 2020 at 4:23 PM Colin McCabe  >
> > > wrote:
> > > > > >

Re: [VOTE] KIP-695: Improve Streams Time Synchronization

2020-12-18 Thread Ismael Juma
Hi John,

It would be good to make sure these changes have no measurable performance
impact for the use cases that don't need it. Have we given this some
thought? And what would be the perf testing strategy to verify this?

Ismael

On Fri, Dec 18, 2020 at 8:39 AM John Roesler  wrote:

> Thanks for the votes and reviews, all. I'll wait for a
> response from Jason before closing the vote, since he asked
> for clarification.
>
> The present count is:
> * 3 binding +1 (Guozhang, Bill, and myself)
> * 2 non-binding +1 (Bruno and Walker)
>
> I have updated the KIP document in response to the requests
> for clarification:
> 1) The new metadata() map actually just contains immutable
> Metadata objects representing the metadata received in the
> last round of fetch responses, so I decided to stick with
> `receivedMetadata`, as that is an accurate representation of
> the timestamp's meaning.
>
> 2) I added a javadoc clarifying that the metadata partitions
> may be a superset of the data partitions in the same
> ConsumerRecords
>
> 3) I confirmed that the position we are returning is the
> next offset to fetch after the current returned records.
> This is equivalent to the "current position" of the consumer
> after the call to poll() that returns this ConsumerRecords
> object
>
> 4) (Jason's question about whether we include metadata for
> all partitions or just the latest fetch responses) I've
> clarified the javadoc to state that the metadata is only
> what was included in the latest fetches.
>
> Thanks,
> -John
>
>
> On Thu, 2020-12-17 at 11:42 -0500, Bill Bejeck wrote:
> > Hi John,
> >
> > I've made a pass over the KIP and I think it will be a good addition.
> >
> > Modulo Jason's question, I'm a +1 (binding).
> >
> > Thanks,
> > Bill
> >
> > On Wed, Dec 16, 2020 at 1:29 PM Jason Gustafson 
> wrote:
> >
> > > Hi John,
> > >
> > > Just one question. It wasn't very clear to me exactly when the metadata
> > > would be returned in `ConsumerRecords`. Would we /always/ include the
> > > metadata for all partitions that are assigned, or would it be based on
> the
> > > latest fetches?
> > >
> > > Thanks,
> > > Jason
> > >
> > > On Fri, Dec 11, 2020 at 4:07 PM John Roesler 
> wrote:
> > >
> > > > Thanks, Guozhang!
> > > >
> > > > All of your feedback sounds good to me. I’ll update the KIP when I am
> > > able.
> > > >
> > > > 3) I believe it is the position after the fetch, but I will confirm.
> I
> > > > think omitting position may render beginning and end offsets useless
> as
> > > > well, which leaves only lag. That would be fine with me, but it also
> > > seems
> > > > nice to supply this extra metadata since it is well defined and
> probably
> > > > handy for others. Therefore, I’d go the route of specifying the exact
> > > > semantics and keeping it.
> > > >
> > > > Thanks for the review,
> > > > John
> > > >
> > > > On Fri, Dec 11, 2020, at 17:36, Guozhang Wang wrote:
> > > > > Hello John,
> > > > >
> > > > > Thanks for the updates! I've made a pass on the KIP and also the
> POC
> > > PR,
> > > > > here are some minor comments:
> > > > >
> > > > > 1) nit: "receivedTimestamp" -> it seems the metadata keep getting
> > > > updated,
> > > > > and we do not create a new object but just update the values
> in-place,
> > > so
> > > > > maybe calling it `lastUpdateTimestamp` is better?
> > > > >
> > > > > 2) It will be great to verify in javadocs that the new API
> > > > > "ConsumerRecords#metadata(): Map<TopicPartition, Metadata>" may
> return
> > > a
> > > > > superset of TopicPartitions than the existing API that returns the
> data
> > > > by
> > > > > partitions, in case users assume their map key-entries would
> always be
> > > > the
> > > > > same.
> > > > >
> > > > > 3) The "position()" API of the call needs better clarification: is
> it
> > > the
> > > > > current position AFTER the records are returned, or is it BEFORE
> the
> > > > > records are returned? Personally I'd suggest we do not include it
> if it
> > > > is
> > > > > not used anywhere yet just to avoid possible misusage, but I'm fine
> if
> > > you
> > > > > like to keep it still; in that case just clarify its semantics.
> > > > >
> > > > >
> > > > > Other than that, I'm +1 on the KIP as well!
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Fri, Dec 11, 2020 at 8:15 AM Walker Carlson <
> wcarl...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Thanks for the KIP!
> > > > > >
> > > > > > +1 (non-binding)
> > > > > >
> > > > > > walker
> > > > > >
> > > > > > On Wed, Dec 9, 2020 at 11:40 AM Bruno Cadonna <
> br...@confluent.io>
> > > > wrote:
> > > > > >
> > > > > > > Thanks for the KIP, John!
> > > > > > >
> > > > > > > +1 (non-binding)
> > > > > > >
> > > > > > > Best,
> > > > > > > Bruno
> > > > > > >
> > > > > > > On 08.12.20 18:03, John Roesler wrote:
> > > > > > > > Hello all,
> > > > > > > >
> > > > > > > > There hasn't been much discussion on KIP-695 so far, so I'd
> > > > > > > > like to go ahead and call for a vote.
> > > > > > > >
> > > > > 

[jira] [Created] (KAFKA-10867) Implement improved semantics using the ConsumerRecords metadata

2020-12-18 Thread John Roesler (Jira)
John Roesler created KAFKA-10867:


 Summary: Implement improved semantics using the ConsumerRecords 
metadata
 Key: KAFKA-10867
 URL: https://issues.apache.org/jira/browse/KAFKA-10867
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: John Roesler
Assignee: John Roesler
 Fix For: 2.8.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10866) Add fetched metadata to ConsumerRecords

2020-12-18 Thread John Roesler (Jira)
John Roesler created KAFKA-10866:


 Summary: Add fetched metadata to ConsumerRecords
 Key: KAFKA-10866
 URL: https://issues.apache.org/jira/browse/KAFKA-10866
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: John Roesler
Assignee: John Roesler
 Fix For: 2.8.0


Consumer-side changes for KIP-695



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] KIP-695: Improve Streams Time Synchronization

2020-12-18 Thread John Roesler
Thanks for the votes and reviews, all. I'll wait for a
response from Jason before closing the vote, since he asked
for clarification.

The present count is:
* 3 binding +1 (Guozhang, Bill, and myself)
* 2 non-binding +1 (Bruno and Walker)

I have updated the KIP document in response to the requests
for clarification:
1) The new metadata() map actually just contains immutable
Metadata objects representing the metadata received in the
last round of fetch responses, so I decided to stick with
`receivedMetadata`, as that is an accurate representation of
the timestamp's meaning.

2) I added a javadoc clarifying that the metadata partitions
may be a superset of the data partitions in the same
ConsumerRecords

3) I confirmed that the position we are returning is the
next offset to fetch after the current returned records.
This is equivalent to the "current position" of the consumer
after the call to poll() that returns this ConsumerRecords
object

4) (Jason's question about whether we include metadata for
all partitions or just the latest fetch responses) I've
clarified the javadoc to state that the metadata is only
what was included in the latest fetches.
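
To make the clarified semantics above concrete, here is a small self-contained model of the per-partition metadata: an immutable snapshot whose position is the next offset to fetch after the returned records, with lag derived from it. The class and field names are hypothetical, chosen for illustration; they are not the actual consumer classes from the KIP.

```java
// Hypothetical model of KIP-695 per-partition metadata; not the actual
// Kafka consumer classes. Illustrates the clarified semantics:
// - the snapshot is immutable, taken from the latest fetch response;
// - position is the next offset to fetch after the returned records;
// - lag is derived as endOffset - position.
public final class FetchMetadataModel {
    record Metadata(long receivedTimestamp, long position, long endOffset) {
        // Records remaining between the consumer's position and the log end.
        long lag() {
            return endOffset - position;
        }
    }

    public static void main(String[] args) {
        // A fetch returned records up to offset 41, so position (the next
        // offset to fetch) is 42; the broker reported end offset 50.
        Metadata m = new Metadata(1_608_300_000_000L, 42L, 50L);
        System.out.println("position=" + m.position() + " lag=" + m.lag());
    }
}
```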

Thanks,
-John


On Thu, 2020-12-17 at 11:42 -0500, Bill Bejeck wrote:
> Hi John,
> 
> I've made a pass over the KIP and I think it will be a good addition.
> 
> Modulo Jason's question, I'm a +1 (binding).
> 
> Thanks,
> Bill
> 
> On Wed, Dec 16, 2020 at 1:29 PM Jason Gustafson  wrote:
> 
> > Hi John,
> > 
> > Just one question. It wasn't very clear to me exactly when the metadata
> > would be returned in `ConsumerRecords`. Would we /always/ include the
> > metadata for all partitions that are assigned, or would it be based on the
> > latest fetches?
> > 
> > Thanks,
> > Jason
> > 
> > On Fri, Dec 11, 2020 at 4:07 PM John Roesler  wrote:
> > 
> > > Thanks, Guozhang!
> > > 
> > > All of your feedback sounds good to me. I’ll update the KIP when I am
> > able.
> > > 
> > > 3) I believe it is the position after the fetch, but I will confirm. I
> > > think omitting position may render beginning and end offsets useless as
> > > well, which leaves only lag. That would be fine with me, but it also
> > seems
> > > nice to supply this extra metadata since it is well defined and probably
> > > handy for others. Therefore, I’d go the route of specifying the exact
> > > semantics and keeping it.
> > > 
> > > Thanks for the review,
> > > John
> > > 
> > > On Fri, Dec 11, 2020, at 17:36, Guozhang Wang wrote:
> > > > Hello John,
> > > > 
> > > > Thanks for the updates! I've made a pass on the KIP and also the POC
> > PR,
> > > > here are some minor comments:
> > > > 
> > > > 1) nit: "receivedTimestamp" -> it seems the metadata keep getting
> > > updated,
> > > > and we do not create a new object but just update the values in-place,
> > so
> > > > maybe calling it `lastUpdateTimestamp` is better?
> > > > 
> > > > 2) It will be great to verify in javadocs that the new API
> > > > "ConsumerRecords#metadata(): Map<TopicPartition, Metadata>" may return
> > a
> > > > superset of TopicPartitions than the existing API that returns the data
> > > by
> > > > partitions, in case users assume their map key-entries would always be
> > > the
> > > > same.
> > > > 
> > > > 3) The "position()" API of the call needs better clarification: is it
> > the
> > > > current position AFTER the records are returned, or is it BEFORE the
> > > > records are returned? Personally I'd suggest we do not include it if it
> > > is
> > > > not used anywhere yet just to avoid possible misusage, but I'm fine if
> > you
> > > > like to keep it still; in that case just clarify its semantics.
> > > > 
> > > > 
> > > > Other than that, I'm +1 on the KIP as well!
> > > > 
> > > > 
> > > > Guozhang
> > > > 
> > > > 
> > > > On Fri, Dec 11, 2020 at 8:15 AM Walker Carlson 
> > > > wrote:
> > > > 
> > > > > Thanks for the KIP!
> > > > > 
> > > > > +1 (non-binding)
> > > > > 
> > > > > walker
> > > > > 
> > > > > On Wed, Dec 9, 2020 at 11:40 AM Bruno Cadonna 
> > > wrote:
> > > > > 
> > > > > > Thanks for the KIP, John!
> > > > > > 
> > > > > > +1 (non-binding)
> > > > > > 
> > > > > > Best,
> > > > > > Bruno
> > > > > > 
> > > > > > On 08.12.20 18:03, John Roesler wrote:
> > > > > > > Hello all,
> > > > > > > 
> > > > > > > There hasn't been much discussion on KIP-695 so far, so I'd
> > > > > > > like to go ahead and call for a vote.
> > > > > > > 
> > > > > > > As a reminder, the purpose of KIP-695 is to improve on the
> > > > > > > "task idling" feature we introduced in KIP-353. This KIP
> > > > > > > will allow Streams to offer deterministic time semantics in
> > > > > > > join-type topologies. For example, it makes sure that
> > > > > > > when you join two topics, we collate the records by
> > > > > > > timestamp. That was always the intent with task idling (KIP-
> > > > > > > 353), but it turns out the previous mechanism couldn't
> > > > > > > provide the desired semantics.
> > > > > > > 
> > > > > > > The details are here:
> > > > > > > 

Re: [DISCUSS] KIP-700: Add Describe Cluster API

2020-12-18 Thread Gwen Shapira
Agree. Once we have the basic API we can decide what belongs and what
doesn't. I remember requests for broker status, version, and various
other things. Considering all those options together will be too much
at this point.

One thing that may be worth considering is whether you want to include
not just the controller ID but also its epoch. This will allow
resolving cases where brokers respond with outdated information. I
haven't thought it through, so maybe it doesn't make sense here, but I
was wondering if this was considered.
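
The epoch idea can be illustrated with a tiny freshness check: a client keeps only the controller info carrying the highest epoch it has seen, so a lagging broker's stale answer is ignored. The types below are hypothetical, for illustration only; they are not the actual DescribeCluster response classes.

```java
// Hypothetical sketch of using a controller epoch to discard outdated
// responses; not the actual Kafka protocol or admin client classes.
public final class ControllerInfoCache {
    record ControllerInfo(int controllerId, int controllerEpoch) {}

    private ControllerInfo current;

    // Accept the update only if it is strictly fresher than what we hold;
    // a broker lagging behind reports an older epoch and is ignored.
    boolean maybeUpdate(ControllerInfo incoming) {
        if (current == null || incoming.controllerEpoch() > current.controllerEpoch()) {
            current = incoming;
            return true;
        }
        return false;
    }

    ControllerInfo current() {
        return current;
    }
}
```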

On Fri, Dec 18, 2020 at 4:51 AM David Jacot  wrote:
>
> Hi Ryan,
>
> Thanks for your feedback. That's an interesting idea but that raises more
> questions. At the moment, `describeCluster` only requires to be
> authenticated
> (if authentication is enabled) to get the basic information about the
> cluster.
> If we add the JMX port, is it something that we would provide without
> requiring
> more permissions? I am not sure about this.
>
> I lean towards keeping this KIP focused on introducing the new API and to
> add new information with separate KIPs. There might be more information
> that we want to add as part of KIP-500.
>
> I will be happy to hear what other members of the community think about
> this.
>
> Best,
> David
>
> On Thu, Dec 17, 2020 at 5:57 AM Colin McCabe  wrote:
>
> > Hi David,
> >
> > This seems reasonable.  It would be nice to have an API specifically for
> > describeCluster, so that we could extend this API without adding more
> > fields to the already large MetadataRequest.
> >
> > As you mention in the KIP, KIP-700 would allow us to deprecate
> > MetadataRequest#ClusterAuthorizedOperations.  So it seems like this KIP
> > should specify a new version of MetadataRequest where the related fields
> > are absent, right?
> >
> > best,
> > Colin
> >
> >
> > On Mon, Dec 14, 2020, at 08:10, David Jacot wrote:
> > > Hi all,
> > >
> > > I'd like to propose a small KIP:
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-700%3A+Add+Describe+Cluster+API
> > >
> > > Please let me know what you think.
> > >
> > > Best,
> > > David
> > >
> >



-- 
Gwen Shapira
Engineering Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog


[jira] [Resolved] (KAFKA-10100) LiveLeaders field in LeaderAndIsrRequest is not used anymore

2020-12-18 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-10100.
-
Resolution: Won't Fix

This is not worthwhile. 

> LiveLeaders field in LeaderAndIsrRequest is not used anymore
> 
>
> Key: KAFKA-10100
> URL: https://issues.apache.org/jira/browse/KAFKA-10100
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>
> We have noticed that the `LiveLeaders` field in the LeaderAndIsrRequest is 
> not used anywhere but still populated by the controller.
> It seems that that field was introduced in AK `0.8.0` and was supposed to be 
> removed in AK `0.8.1`: 
> [https://github.com/apache/kafka/blob/0.8.0/core/src/main/scala/kafka/cluster/Partition.scala#L194.]
> I think that we can safely deprecate the field and stop populating it for all 
> versions > 0.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Request to be added as a contributor in Kafka JIRA board

2020-12-18 Thread Prathibha Muralidharan
Hi,

Please add me as a contributor in Kafka JIRA board and Confluence Wiki. My
JIRA Id is Prathibha. My Confluence ID is prathibha.

Thanks,

Prathibha

-
Prathibha Muralidharan
SWE @ Connect Team
Confluent
Mountain View, CA
-


[jira] [Created] (KAFKA-10865) Improve trace-logging for Transformations (including Predicates)

2020-12-18 Thread Robin Moffatt (Jira)
Robin Moffatt created KAFKA-10865:
-

 Summary: Improve trace-logging for Transformations (including 
Predicates)
 Key: KAFKA-10865
 URL: https://issues.apache.org/jira/browse/KAFKA-10865
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Robin Moffatt


I've been spending [a bunch of time poking around 
SMTs|https://rmoff.net/categories/twelvedaysofsmt/] recently, and one common 
challenge I've had is being able to debug when things don't behave as I expect.
  
 I know that there is the {{TransformationChain}} logger, but this only gives 
(IIUC) the input record
{code:java}
[2020-12-17 09:38:58,057] TRACE [sink-simulator-day12-00|task-0] Applying 
transformation io.confluent.connect.transforms.Filter$Value to 
SinkRecord{kafkaOffset=10551, timestampType=CreateTime} 
ConnectRecord{topic='day12-sys01', kafkaPartition=0, 
key=2c2ceb9b-8b31-4ade-a757-886ebfb7a398, keySchema=Schema{STRING}, 
value=Struct{units=16,product=Founders Breakfast 
Stout,amount=30.41,txn_date=Sat Dec 12 18:21:18 GMT 2020,source=SYS01}, 
valueSchema=Schema{io.mdrogalis.Gen0:STRUCT}, timestamp=1608197938054, 
headers=ConnectHeaders(headers=)} 
(org.apache.kafka.connect.runtime.TransformationChain:47)
{code}
 
 I think it would be really useful to also have trace level logging that 
included:
 - the _output_ of *each* transform
 - the evaluation and result of any `predicate`s


I have been using 
{{com.github.jcustenborder.kafka.connect.simulator.SimulatorSinkConnector}} 
which is really useful for seeing the final record:
{code:java}
[2020-12-17 09:38:58,057] INFO [sink-simulator-day12-00|task-0] 
record.value=Struct{units=16,product=Founders Breakfast 
Stout,amount=30.41,txn_date=Sat Dec 12 18:21:18 GMT 2020,source=SYS01} 
(com.github.jcustenborder.kafka.connect.simulator.SimulatorSinkTask:50)
{code}
 
 But it doesn't include things like the topic name (which is often changed by 
common SMTs).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-700: Add Describe Cluster API

2020-12-18 Thread David Jacot
Hi Colin,

Thanks for your feedback.

That is correct. I mention this in the future work section. I thought that
we
could remove it when we could also remove TopicAuthorizedOperations
in order to remain consistent in the API. We can also do it immediately.
I don't have a strong preference either way. What do you think?

Best,
David

On Fri, Dec 18, 2020 at 1:51 PM David Jacot  wrote:

> Hi Ryan,
>
> Thanks for your feedback. That's an interesting idea but that raises more
> questions. At the moment, `describeCluster` only requires to be
> authenticated
> (if authentication is enabled) to get the basic information about the
> cluster.
> If we add the JMX port, is it something that we would provide without
> requiring
> more permissions? I am not sure about this.
>
> I lean towards keeping this KIP focused on introducing the new API and to
> add new information with separate KIPs. There might be more information
> that we want to add as part of KIP-500.
>
> I will be happy to hear what other members of the community think about
> this.
>
> Best,
> David
>
> On Thu, Dec 17, 2020 at 5:57 AM Colin McCabe  wrote:
>
>> Hi David,
>>
>> This seems reasonable.  It would be nice to have an API specifically for
>> describeCluster, so that we could extend this API without adding more
>> fields to the already large MetadataRequest.
>>
>> As you mention in the KIP, KIP-700 would allow us to deprecate
>> MetadataRequest#ClusterAuthorizedOperations.  So it seems like this KIP
>> should specify a new version of MetadataRequest where the related fields
>> are absent, right?
>>
>> best,
>> Colin
>>
>>
>> On Mon, Dec 14, 2020, at 08:10, David Jacot wrote:
>> > Hi all,
>> >
>> > I'd like to propose a small KIP:
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-700%3A+Add+Describe+Cluster+API
>> >
>> > Please let me know what you think.
>> >
>> > Best,
>> > David
>> >
>>
>


Re: [DISCUSS] KIP-700: Add Describe Cluster API

2020-12-18 Thread David Jacot
Hi Ryan,

Thanks for your feedback. That's an interesting idea but that raises more
questions. At the moment, `describeCluster` only requires to be
authenticated
(if authentication is enabled) to get the basic information about the
cluster.
If we add the JMX port, is it something that we would provide without
requiring
more permissions? I am not sure about this.

I lean towards keeping this KIP focused on introducing the new API and to
add new information with separate KIPs. There might be more information
that we want to add as part of KIP-500.

I will be happy to hear what other members of the community think about
this.

Best,
David

On Thu, Dec 17, 2020 at 5:57 AM Colin McCabe  wrote:

> Hi David,
>
> This seems reasonable.  It would be nice to have an API specifically for
> describeCluster, so that we could extend this API without adding more
> fields to the already large MetadataRequest.
>
> As you mention in the KIP, KIP-700 would allow us to deprecate
> MetadataRequest#ClusterAuthorizedOperations.  So it seems like this KIP
> should specify a new version of MetadataRequest where the related fields
> are absent, right?
>
> best,
> Colin
>
>
> On Mon, Dec 14, 2020, at 08:10, David Jacot wrote:
> > Hi all,
> >
> > I'd like to propose a small KIP:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-700%3A+Add+Describe+Cluster+API
> >
> > Please let me know what you think.
> >
> > Best,
> > David
> >
>


Re: Improving crash-restart catchup speed

2020-12-18 Thread Gokul Ramanan Subramanian
Thanks. Makes sense.

On Wed, Dec 16, 2020 at 2:02 AM Haruki Okada  wrote:

> Hi.
> One possible solution I can imagine is to use replication throttle.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-73+Replication+Quotas
>
> You can set lower replication quota for other brokers -> failed broker
> traffic during catching-up to prevent exhausting storage throughput.
>
> On Tue, Dec 15, 2020 at 20:56 Gokul Ramanan Subramanian :
>
> > Hi.
> >
> >
> > When a broker crashes and restarts after a long time (say 30 minutes),
> > the broker has to do some work to sync all its replicas with the
> > leaders. If the broker is a preferred replica for some partition, the
> > broker may become the leader as part of a preferred replica leader
> > election, while it is still catching up on some other partitions.
> >
> >
> > This scenario can lead to a high incoming throughput on the broker
> > during the catch up phase and cause back pressure with certain storage
> > volumes (which have a fixed max throughput). This backpressure can
> > slow down recovery time, and manifest in the form of client
> > application errors in producing / consuming data on / from the
> > recovering broker.
> >
> >
> > I am looking for solutions to mitigate this problem. There are two
> > solutions that I am aware of.
> >
> >
> > 1. Scale out the cluster to have more brokers, so that the replication
> > traffic is smaller per broker during recovery.
> >
> >
> > 2. Keep preferred replica leader elections disabled and manually run
> > one instance of preferred replica leader election after the broker has
> > come back up and fully caught up.
> >
> >
> > Are there other solutions?
> >
>
>
> --
> 
> Okada Haruki
> ocadar...@gmail.com
> 
>
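
As a rough illustration of the throttling trade-off discussed above: if the recovering broker's volume has a fixed maximum throughput, the replication throttle should leave headroom for live produce traffic, and the catch-up time then follows directly. All figures in this sketch are made up for illustration; real clusters should be measured.

```java
// Back-of-the-envelope catch-up estimate under a replication throttle
// (KIP-73 style). Illustrative only; not a sizing tool.
public final class CatchupEstimate {
    // Max safe replication rate: volume ceiling minus live produce traffic.
    static long safeThrottleBytesPerSec(long volumeMaxBps, long liveProduceBps) {
        return Math.max(0, volumeMaxBps - liveProduceBps);
    }

    // Seconds to replicate lagBytes at the throttled rate, rounded up.
    static long catchupSeconds(long lagBytes, long throttleBps) {
        return (lagBytes + throttleBps - 1) / throttleBps;
    }

    public static void main(String[] args) {
        long volumeMax = 250L * 1024 * 1024;  // assume a 250 MB/s volume ceiling
        long livePd = 150L * 1024 * 1024;     // assume 150 MB/s ongoing produce load
        long lag = 180L * 1024 * 1024 * 1024; // assume 180 GB to catch up
        long throttle = safeThrottleBytesPerSec(volumeMax, livePd);
        System.out.println("catch-up ~" + catchupSeconds(lag, throttle) + "s");
    }
}
```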


Build failed in Jenkins: Kafka » kafka-trunk-jdk11 #325

2020-12-18 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-10846: Grow buffer in FileSourceStreamTask only when needed 
(#9735)


--
[...truncated 6.99 MB...]

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.WindowStoreBuilder@4e7b8cd6, 
timestamped = false, caching = true, logging = false] STARTED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.WindowStoreBuilder@4e7b8cd6, 
timestamped = false, caching = true, logging = false] PASSED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.WindowStoreBuilder@20a8c60e, 
timestamped = false, caching = false, logging = true] STARTED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.WindowStoreBuilder@20a8c60e, 
timestamped = false, caching = false, logging = true] PASSED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.WindowStoreBuilder@1129f1a4, 
timestamped = false, caching = false, logging = true] STARTED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.WindowStoreBuilder@1129f1a4, 
timestamped = false, caching = false, logging = true] PASSED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.WindowStoreBuilder@9c370fe, 
timestamped = false, caching = false, logging = true] STARTED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.WindowStoreBuilder@9c370fe, 
timestamped = false, caching = false, logging = true] PASSED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.WindowStoreBuilder@1bcc502c, 
timestamped = false, caching = false, logging = false] STARTED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.WindowStoreBuilder@1bcc502c, 
timestamped = false, caching = false, logging = false] PASSED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.WindowStoreBuilder@7d327ded, 
timestamped = false, caching = false, logging = false] STARTED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.WindowStoreBuilder@7d327ded, 
timestamped = false, caching = false, logging = false] PASSED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.WindowStoreBuilder@23c8073e, 
timestamped = false, caching = false, logging = false] STARTED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.WindowStoreBuilder@23c8073e, 
timestamped = false, caching = false, logging = false] PASSED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.SessionStoreBuilder@319ec739, 
timestamped = false, caching = true, logging = true] STARTED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.SessionStoreBuilder@319ec739, 
timestamped = false, caching = true, logging = true] PASSED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.SessionStoreBuilder@48aabc16, 
timestamped = false, caching = true, logging = true] STARTED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.SessionStoreBuilder@48aabc16, 
timestamped = false, caching = true, logging = true] PASSED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.SessionStoreBuilder@26997dfb, 
timestamped = false, caching = true, logging = false] STARTED

org.apache.kafka.streams.test.MockProcessorContextStateStoreTest > 
shouldEitherInitOrThrow[builder = 
org.apache.kafka.streams.state.internals.SessionStoreBuilder@26997dfb, 
timestamped = false, caching = true, logging = false]