[jira] [Commented] (KAFKA-7595) Kafka Streams: KTable to KTable join introduces duplicates in downstream KTable
[ https://issues.apache.org/jira/browse/KAFKA-7595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676728#comment-16676728 ] Damian Guy commented on KAFKA-7595: --- [~vvcephei] your reasoning seems valid to me
> Kafka Streams: KTable to KTable join introduces duplicates in downstream KTable
>
> Key: KAFKA-7595
> URL: https://issues.apache.org/jira/browse/KAFKA-7595
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.0.0
> Reporter: Vik Gamov
> Priority: Major
>
> When performing a KTable to KTable join after an aggregation, there are duplicates in the resulting KTable.
> 1. Caching disabled, no materialized => duplicates
> {code}
> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> KTable ratingCounts = ratingsById.count();
> KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);
> KTable ratingAverage = ratingSums.join(ratingCounts,
>     (sum, count) -> sum / count.doubleValue());
> {code}
> 2. Caching disabled, materialized => duplicates
> {code}
> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> KTable ratingCounts = ratingsById.count();
> KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);
> KTable ratingAverage = ratingSums.join(ratingCounts,
>     (sum, count) -> sum / count.doubleValue(),
>     Materialized.as("average-ratings"));
> {code}
> 3. Caching enabled, materialized => all good
> {code}
> // Enable record cache of size 10 MB.
> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
> // Set commit interval to 1 second.
> streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
> KTable ratingCounts = ratingsById.count();
> KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);
> KTable ratingAverage = ratingSums.join(ratingCounts,
>     (sum, count) -> sum / count.doubleValue(),
>     Materialized.as("average-ratings"));
> {code}
>
> Demo app:
> [https://github.com/tlberglund/streams-movie-demo/blob/master/streams/src/main/java/io/confluent/demo/StreamsDemo.java#L107]
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6555) Making state store queryable during restoration
[ https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16380293#comment-16380293 ] Damian Guy commented on KAFKA-6555: --- I think if we are going to allow stale reads from restoring tasks then it should probably be configurable, as this may not always be desirable. Imagine a situation where there is a lot of data to restore: you could be reading different incorrect values for quite a while. Sure, there may be situations where you don't care, but I'm sure there are situations where you do.
> Making state store queryable during restoration
>
> Key: KAFKA-6555
> URL: https://issues.apache.org/jira/browse/KAFKA-6555
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Ashish Surana
> Assignee: Ashish Surana
> Priority: Major
>
> State stores in Kafka Streams are currently only queryable when the StreamTask is in RUNNING state. The idea is to make them queryable even in the RESTORATION (PARTITION_ASSIGNED) state, as the time spent on restoration can be huge, and making the data inaccessible during this time can mean downtime not suitable for many applications.
> When the active partition goes down, one of the following occurs:
> # One of the standby replica partitions gets promoted to active: the replica task has to restore the remaining state from the changelog topic before it can become RUNNING. The time taken for this depends on how far the replica is lagging behind. During this restoration time the state store for that partition is currently not queryable, resulting in downtime for the partition. We can make the state store partition queryable for the data already present in the state store.
> # When there is no replica or standby task, the active task will be started on one of the existing nodes. That node has to build the entire state from the changelog topic, which can take a lot of time depending on how big the changelog topic is, and keeping the state store not queryable during this time means downtime for the partition.
> This is a very important improvement, as it could significantly improve the availability of microservices developed using Kafka Streams.
> I am working on a patch for this change. Any feedback or comments are welcome.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
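Damian's point above, that stale reads from a restoring store should be opt-in, can be illustrated with a minimal self-contained sketch. None of these names come from the Kafka Streams API; this is only the shape of the proposed configuration knob.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the configurable stale-read idea discussed
// above; class and method names are invented, not Kafka Streams API.
public class RestorableStoreSketch {
    private final Map<String, Long> store = new HashMap<>();
    private volatile boolean restoring = true;
    private final boolean allowStaleReads; // the proposed per-application knob

    public RestorableStoreSketch(boolean allowStaleReads) {
        this.allowStaleReads = allowStaleReads;
    }

    public void put(String key, long value) { store.put(key, value); }

    public void restorationComplete() { restoring = false; }

    // Queries during restoration succeed only when stale reads are enabled;
    // otherwise the caller gets the current RUNNING-only behaviour.
    public Long get(String key) {
        if (restoring && !allowStaleReads) {
            throw new IllegalStateException("store is restoring; stale reads disabled");
        }
        return store.get(key);
    }
}
```

Whether rejecting queries or serving possibly stale values is right depends on the application, which is exactly why a config is suggested in the comment above.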
[jira] [Resolved] (KAFKA-6577) Connect standalone SASL file source and sink test fails without explanation
[ https://issues.apache.org/jira/browse/KAFKA-6577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-6577. --- Resolution: Fixed Fix Version/s: 1.2.0 Issue resolved by pull request 4610 [https://github.com/apache/kafka/pull/4610] > Connect standalone SASL file source and sink test fails without explanation > --- > > Key: KAFKA-6577 > URL: https://issues.apache.org/jira/browse/KAFKA-6577 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect, system tests >Affects Versions: 1.1.0 >Reporter: Randall Hauch >Assignee: Randall Hauch >Priority: Blocker > Fix For: 1.2.0, 1.1.0 > > > The > {{tests/kafkatest/tests/connect/connect_test.py::ConnectStandaloneFileTest.test_file_source_and_sink}} > test is failing with the SASL configuration without a sufficient > explanation. During the test, the Connect worker fails to start, but the > Connect log contains no useful information. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5123) Refactor ZkUtils readData* methods
[ https://issues.apache.org/jira/browse/KAFKA-5123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5123: -- Fix Version/s: (was: 1.1.0) > Refactor ZkUtils readData* methods > --- > > Key: KAFKA-5123 > URL: https://issues.apache.org/jira/browse/KAFKA-5123 > Project: Kafka > Issue Type: Bug >Reporter: Balint Molnar >Assignee: Balint Molnar >Priority: Minor > > Usually only the data value is required but every readData method in the > ZkUtils returns a Tuple with the data and the stat. > https://github.com/apache/kafka/pull/2888 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5313) Improve exception handling on coordinator interactions
[ https://issues.apache.org/jira/browse/KAFKA-5313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5313: -- Fix Version/s: (was: 1.1.0) > Improve exception handling on coordinator interactions > -- > > Key: KAFKA-5313 > URL: https://issues.apache.org/jira/browse/KAFKA-5313 > Project: Kafka > Issue Type: Sub-task > Components: streams >Affects Versions: 0.11.0.0 >Reporter: Eno Thereska >Assignee: Matthias J. Sax >Priority: Major > > Exceptions during assignment of tasks are caught in ConsumerCoordinator.java, and Streams becomes aware of them during the StreamThread.onPartitionsAssigned() and StreamThread.onPartitionsRevoked() methods. Eventually these exceptions go through StreamThread.pollRequests() all the way up to StreamThread.runLoop() and will halt the stream thread that is processing them. Other stream threads may continue processing; however, it is likely they will soon experience problems too. > Exceptions here include LockExceptions that are thrown if tasks cannot use a particular directory because previous tasks did not release their locks on it during reassignment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6136) Transient test failure: SaslPlainSslEndToEndAuthorizationTest.testTwoConsumersWithDifferentSaslCredentials
[ https://issues.apache.org/jira/browse/KAFKA-6136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-6136: -- Fix Version/s: (was: 1.1.0) > Transient test failure: > SaslPlainSslEndToEndAuthorizationTest.testTwoConsumersWithDifferentSaslCredentials > -- > > Key: KAFKA-6136 > URL: https://issues.apache.org/jira/browse/KAFKA-6136 > Project: Kafka > Issue Type: Bug >Reporter: Ismael Juma >Priority: Major > Labels: transient-unit-test-failure > > Looks like a cleanup issue: > {code} > testTwoConsumersWithDifferentSaslCredentials – > kafka.api.SaslPlainSslEndToEndAuthorizationTest > a few seconds > Error > org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to > access group: group > Stacktrace > org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to > access group: group > Standard Output > [2017-10-27 00:37:47,919] ERROR ZKShutdownHandler is not registered, so > ZooKeeper server won't take any action on ERROR or SHUTDOWN server state > changes (org.apache.zookeeper.server.ZooKeeperServer:472) > Adding ACLs for resource `Cluster:kafka-cluster`: > User:admin has Allow permission for operations: ClusterAction from > hosts: * > Current ACLs for resource `Cluster:kafka-cluster`: > User:admin has Allow permission for operations: ClusterAction from > hosts: * > [2017-10-27 00:37:48,961] ERROR [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Error for partition __consumer_offsets-0 from broker 0 > (kafka.server.ReplicaFetcherThread:107) > org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to > access topics: [Topic authorization failed.] > [2017-10-27 00:37:48,967] ERROR [ReplicaFetcher replicaId=1, leaderId=0, > fetcherId=0] Error for partition __consumer_offsets-0 from broker 0 > (kafka.server.ReplicaFetcherThread:107) > org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to > access topics: [Topic authorization failed.] 
> Adding ACLs for resource `Topic:*`: > User:admin has Allow permission for operations: Read from hosts: * > Current ACLs for resource `Topic:*`: > User:admin has Allow permission for operations: Read from hosts: * > [2017-10-27 00:37:52,330] ERROR ZKShutdownHandler is not registered, so > ZooKeeper server won't take any action on ERROR or SHUTDOWN server state > changes (org.apache.zookeeper.server.ZooKeeperServer:472) > [2017-10-27 00:37:52,345] ERROR ZKShutdownHandler is not registered, so > ZooKeeper server won't take any action on ERROR or SHUTDOWN server state > changes (org.apache.zookeeper.server.ZooKeeperServer:472) > Adding ACLs for resource `Cluster:kafka-cluster`: > User:admin has Allow permission for operations: ClusterAction from > hosts: * > Current ACLs for resource `Cluster:kafka-cluster`: > User:admin has Allow permission for operations: ClusterAction from > hosts: * > [2017-10-27 00:37:53,459] ERROR [ReplicaFetcher replicaId=1, leaderId=0, > fetcherId=0] Error for partition __consumer_offsets-0 from broker 0 > (kafka.server.ReplicaFetcherThread:107) > org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to > access topics: [Topic authorization failed.] > [2017-10-27 00:37:53,462] ERROR [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Error for partition __consumer_offsets-0 from broker 0 > (kafka.server.ReplicaFetcherThread:107) > org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to > access topics: [Topic authorization failed.] 
> Adding ACLs for resource `Topic:*`: > User:admin has Allow permission for operations: Read from hosts: * > Current ACLs for resource `Topic:*`: > User:admin has Allow permission for operations: Read from hosts: * > Adding ACLs for resource `Topic:e2etopic`: > User:user has Allow permission for operations: Write from hosts: * > User:user has Allow permission for operations: Describe from hosts: * > Adding ACLs for resource `Cluster:kafka-cluster`: > User:user has Allow permission for operations: Create from hosts: * > Current ACLs for resource `Topic:e2etopic`: > User:user has Allow permission for operations: Write from hosts: * > User:user has Allow permission for operations: Describe from hosts: * > Adding ACLs for resource `Topic:e2etopic`: > User:user has Allow permission for operations: Read from hosts: * > User:user has Allow permission for operations: Describe from hosts: * > Adding ACLs for resource `Group:group`: > User:user has Allow permission for operations: Read from hosts: * > Current ACLs for resource `Topic:e2etopic`: > User:user has Allow permission for
[jira] [Updated] (KAFKA-4879) KafkaConsumer.position may hang forever when deleting a topic
[ https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-4879: -- Fix Version/s: (was: 1.1.0) 2.0.0
> KafkaConsumer.position may hang forever when deleting a topic
>
> Key: KAFKA-4879
> URL: https://issues.apache.org/jira/browse/KAFKA-4879
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.10.2.0
> Reporter: Shixiong Zhu
> Assignee: Jason Gustafson
> Priority: Major
> Fix For: 2.0.0
>
> KafkaConsumer.position may hang forever when deleting a topic. The problem is this line
> https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374
> The timeout is "Long.MAX_VALUE", so it will just retry forever on UnknownTopicOrPartitionException.
> Here is a reproducer
> {code}
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import java.util.Collections;
> import java.util.Properties;
> import java.util.Set;
>
> public class KafkaReproducer {
>   public static void main(String[] args) {
>     // Make sure "delete.topic.enable" is set to true.
>     // Please create the topic "test" with 3 partitions manually.
>     // The issue is gone when there is only one partition.
>     String topic = "test";
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "localhost:9092");
>     props.put("group.id", "testgroup");
>     props.put("value.deserializer", StringDeserializer.class.getName());
>     props.put("key.deserializer", StringDeserializer.class.getName());
>     props.put("enable.auto.commit", "false");
>     KafkaConsumer<String, String> kc = new KafkaConsumer<>(props);
>     kc.subscribe(Collections.singletonList(topic));
>     kc.poll(0);
>     Set<TopicPartition> partitions = kc.assignment();
>     System.out.println("partitions: " + partitions);
>     kc.pause(partitions);
>     kc.seekToEnd(partitions);
>     System.out.println("please delete the topic in 30 seconds");
>     try {
>       // Sleep 30 seconds to give us enough time to delete the topic.
>       Thread.sleep(30000);
>     } catch (InterruptedException e) {
>       e.printStackTrace();
>     }
>     System.out.println("sleep end");
>     for (TopicPartition p : partitions) {
>       System.out.println(p + " offset: " + kc.position(p));
>     }
>     System.out.println("cannot reach here");
>     kc.close();
>   }
> }
> {code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6529) Broker leaks memory and file descriptors after sudden client disconnects
[ https://issues.apache.org/jira/browse/KAFKA-6529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357015#comment-16357015 ] Damian Guy commented on KAFKA-6529: --- [~ijuma] should this go in to 1.1?
> Broker leaks memory and file descriptors after sudden client disconnects
>
> Key: KAFKA-6529
> URL: https://issues.apache.org/jira/browse/KAFKA-6529
> Project: Kafka
> Issue Type: Bug
> Components: network
> Affects Versions: 1.0.0, 0.11.0.2
> Reporter: Graham Campbell
> Priority: Major
> Fix For: 1.1.0, 0.11.0.3, 1.0.2
>
> If a producer forcefully disconnects from a broker while it has staged receives, that connection enters a limbo state where it is no longer processed by the SocketServer.Processor, leaking the file descriptor for the socket and the memory used for the staged receive queue for that connection.
> We noticed this during an upgrade from 0.9.0.2 to 0.11.0.2. Immediately after the rolling restart to upgrade, open file descriptors on the brokers started climbing uncontrollably. In a few cases brokers reached our configured max open files limit of 100k and crashed before we rolled back.
> We tracked this down to a buildup of muted connections in the Selector.closingChannels list. If a client disconnects from the broker with multiple pending produce requests, when the broker attempts to send an ack to the client it receives an IOException because the TCP socket has been closed. This triggers the Selector to close the channel, but because it still has pending requests, it adds it to Selector.closingChannels to process those requests. However, because that exception was triggered by trying to send a response, the SocketServer.Processor has marked the channel as muted and will no longer process it at all.
> *Reproduced by:*
> Starting a Kafka broker/cluster
> Client produces several messages and then disconnects abruptly (e.g. _./rdkafka_performance -P -x 100 -b broker:9092 -t test_topic_)
> Broker then leaks the file descriptor previously used for the TCP socket and the memory for unprocessed messages
> *Proposed solution (which we've implemented internally)*
> Whenever an exception is encountered when writing to a socket in Selector.pollSelectionKeys(...), record that the connection failed a send by adding the KafkaChannel ID to Selector.failedSends. Then re-raise the exception to still trigger the socket disconnection logic. Since every exception raised in this function triggers a disconnect, we also treat any exception while writing to the socket as a failed send.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
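The proposed solution above can be sketched in isolation. `failedSends` and the channel ID are the only names borrowed from the description; the `Channel` interface below is an invented stand-in for the real KafkaChannel, not Kafka's actual Selector code.

```java
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch of the proposed fix: any exception while writing a
// response records the connection as a failed send, then is re-raised so
// the normal disconnection logic still runs.
public class FailedSendTracker {
    private final Set<String> failedSends = new HashSet<>();

    // Stand-in for KafkaChannel: just an id and a write that may fail.
    public interface Channel {
        String id();
        void write(byte[] payload) throws IOException;
    }

    public void send(Channel channel, byte[] payload) throws IOException {
        try {
            channel.write(payload);
        } catch (IOException e) {
            // Record the failed send so the channel is not left in limbo,
            // then re-raise to trigger the disconnection path.
            failedSends.add(channel.id());
            throw e;
        }
    }

    public Set<String> failedSends() { return failedSends; }
}
```

The key property is that the exception still propagates: the disconnect logic runs as before, but the channel is no longer forgotten just because it was muted.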
[jira] [Updated] (KAFKA-5919) Adding checks on "version" field for tools using it
[ https://issues.apache.org/jira/browse/KAFKA-5919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5919: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Adding checks on "version" field for tools using it > --- > > Key: KAFKA-5919 > URL: https://issues.apache.org/jira/browse/KAFKA-5919 > Project: Kafka > Issue Type: Bug > Components: tools >Reporter: Paolo Patierno >Assignee: Paolo Patierno >Priority: Minor > Fix For: 2.0.0 > > > Hi, > the kafka-delete-records script allows the user to pass information about the records to delete through a JSON file. Such a file, as described in the command help, is made up of a "partitions" array and a "version" field. Reading [KIP-107|https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient] and the DeleteRecords API (Key: 21) description, it's not clear what this field is for, and it's not even used at all (in the current implementation). > It turned out that the field is there for backward compatibility in the future, where the JSON format could change. This JIRA is about adding more checks on the "version" field, keeping it not mandatory but assuming the earliest version (currently 1) if it's omitted from the JSON file. > The same goes for kafka-reassign-partitions, which takes a topics-to-move JSON file as input (used with the --generate option) and the partitions-to-move.json file (used with the --execute option). In both cases the same logic can be applied as above. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6448) Mx4jLoader#props.getBoolean("kafka_mx4jenable", false) conflict with the annotation
[ https://issues.apache.org/jira/browse/KAFKA-6448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-6448: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Mx4jLoader#props.getBoolean("kafka_mx4jenable", false) conflict with the annotation > --- > > Key: KAFKA-6448 > URL: https://issues.apache.org/jira/browse/KAFKA-6448 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 1.0.0 >Reporter: Hongyuan Li >Priority: Minor > Fix For: 2.0.0 > > Attachments: KAFKA-6448-1.patch, KAFKA-6448-2.patch > > > The annotation says > {code}*This feature must be enabled with -Dmx4jenable=true*{code} > *which is not compatible with the code* > {code} > props.getBoolean("kafka_mx4jenable", false) > {code} > Patch KAFKA-6448-1.patch modifies the code, and KAFKA-6448-2.patch modifies the annotation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5532) Making bootstrap.servers property a first citizen option for the ProducerPerformance
[ https://issues.apache.org/jira/browse/KAFKA-5532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5532: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Making bootstrap.servers property a first citizen option for the ProducerPerformance > > > Key: KAFKA-5532 > URL: https://issues.apache.org/jira/browse/KAFKA-5532 > Project: Kafka > Issue Type: Improvement > Components: tools >Reporter: Paolo Patierno >Assignee: Paolo Patierno >Priority: Trivial > Fix For: 2.0.0 > > > Hi, > using the ProducerPerformance tool you have to specify the bootstrap.servers option through the producer-props or producer-config option. It would be better to have bootstrap.servers as a first-class option, like in all the other tools, i.e. a dedicated --bootstrap-servers option. > Thanks, > Paolo. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5517) Support linking to particular configuration parameters
[ https://issues.apache.org/jira/browse/KAFKA-5517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5517: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Support linking to particular configuration parameters > -- > > Key: KAFKA-5517 > URL: https://issues.apache.org/jira/browse/KAFKA-5517 > Project: Kafka > Issue Type: Improvement > Components: documentation >Reporter: Tom Bentley >Assignee: Tom Bentley >Priority: Minor > Labels: patch-available > Fix For: 2.0.0 > > > Currently the configuration parameters are documented in long tables, and it's only possible to link to the heading before a particular table. When discussing configuration parameters on forums it would be helpful to be able to link to the particular parameter under discussion. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5479) Docs for authorization omit authorizer.class.name
[ https://issues.apache.org/jira/browse/KAFKA-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5479: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Docs for authorization omit authorizer.class.name > - > > Key: KAFKA-5479 > URL: https://issues.apache.org/jira/browse/KAFKA-5479 > Project: Kafka > Issue Type: Improvement > Components: documentation >Reporter: Tom Bentley >Assignee: Tom Bentley >Priority: Minor > Labels: patch-available > Fix For: 2.0.0 > > > The documentation in §7.4 Authorization and ACLs doesn't mention the > {{authorizer.class.name}} setting. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5692) Refactor PreferredReplicaLeaderElectionCommand to use AdminClient
[ https://issues.apache.org/jira/browse/KAFKA-5692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5692: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Refactor PreferredReplicaLeaderElectionCommand to use AdminClient > - > > Key: KAFKA-5692 > URL: https://issues.apache.org/jira/browse/KAFKA-5692 > Project: Kafka > Issue Type: Improvement >Reporter: Tom Bentley >Assignee: Tom Bentley >Priority: Minor > Labels: kip, patch-available > Fix For: 2.0.0 > > > The PreferredReplicaLeaderElectionCommand currently uses a direct connection > to zookeeper. The zookeeper dependency should be deprecated and an > AdminClient API created to be used instead. > This change will require a KIP. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5359) Exceptions from RequestFuture lack parts of the stack trace
[ https://issues.apache.org/jira/browse/KAFKA-5359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5359: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Exceptions from RequestFuture lack parts of the stack trace > --- > > Key: KAFKA-5359 > URL: https://issues.apache.org/jira/browse/KAFKA-5359 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Magnus Reftel >Assignee: Vahid Hashemian >Priority: Minor > Fix For: 2.0.0 > > > When an exception occurs within a task that reports its result using a > RequestFuture, that exception is stored in a field on the RequestFuture using > the {{raise}} method. In many places in the code where such futures are > completed, that exception is then thrown directly using {{throw > future.exception();}} (see e.g. > [Fetcher.getTopicMetadata|https://github.com/apache/kafka/blob/aebba89a2b9b5ea6a7cab2599555232ef3fe21ad/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L316]). > This means that the exception that ends up in client code only has stack > traces related to the original exception, but nothing leading up to the > completion of the future. The client therefore gets no indication of what was > going on in the client code - only that it somehow ended up in the Kafka > libraries, and that a task failed at some point. > One solution to this is to use the exceptions from the future as causes for > chained exceptions, so that the client gets a stack trace that shows what the > client was doing, in addition to getting the stack traces for the exception > in the task. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
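The chained-exception suggestion above amounts to wrapping the future's stored exception as a cause rather than rethrowing it bare, so both the task's stack trace and the caller's survive. A minimal illustration, using a plain `CompletableFuture` as a stand-in for Kafka's `RequestFuture`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ChainedFutureException {
    // Rethrowing future.exception() directly loses the caller's stack
    // trace; wrapping it as a cause preserves both sets of frames.
    public static RuntimeException wrap(Throwable taskFailure) {
        return new RuntimeException("request failed", taskFailure);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        // Simulate a task that failed and stored its exception in the future.
        future.completeExceptionally(new IllegalStateException("boom in task"));
        try {
            future.get();
        } catch (ExecutionException | InterruptedException e) {
            RuntimeException chained = wrap(e.getCause());
            // The caller's frames live on `chained`; the task's on its cause.
            System.out.println(chained.getCause().getMessage()); // prints "boom in task"
        }
    }
}
```

Printing `chained` with a full stack trace would show both "request failed" at the call site and "boom in task" with the task's frames as the cause, which is the behaviour the report asks for.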
[jira] [Updated] (KAFKA-4950) ConcurrentModificationException when iterating over Kafka Metrics
[ https://issues.apache.org/jira/browse/KAFKA-4950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-4950: -- Fix Version/s: (was: 1.1.0) 2.0.0
> ConcurrentModificationException when iterating over Kafka Metrics
>
> Key: KAFKA-4950
> URL: https://issues.apache.org/jira/browse/KAFKA-4950
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.10.1.1
> Reporter: Dumitru Postoronca
> Assignee: Sébastien Launay
> Priority: Minor
> Fix For: 2.0.0, 1.0.2
>
> It looks like when calling {{PartitionStates.partitionSet()}}, while the resulting HashMap is being built, the internal state of the allocations can change, which leads to a ConcurrentModificationException during the copy operation.
> {code}
> java.util.ConcurrentModificationException
>     at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
>     at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
>     at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
>     at java.util.HashSet.<init>(HashSet.java:119)
>     at org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:66)
>     at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:291)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:783)
>     at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
>     at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
> {code}
> {code}
> // client code:
> import java.util.Collections;
> import java.util.HashMap;
> import java.util.Map;
> import com.codahale.metrics.Gauge;
> import com.codahale.metrics.Metric;
> import com.codahale.metrics.MetricSet;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.MetricName;
> import static com.codahale.metrics.MetricRegistry.name;
>
> public class KafkaMetricSet implements MetricSet {
>     private final KafkaConsumer client;
>
>     public KafkaMetricSet(KafkaConsumer client) {
>         this.client = client;
>     }
>
>     @Override
>     public Map<String, Metric> getMetrics() {
>         final Map<String, Metric> gauges = new HashMap<String, Metric>();
>         Map<MetricName, ? extends org.apache.kafka.common.Metric> m = client.metrics();
>         for (final Map.Entry<MetricName, ? extends org.apache.kafka.common.Metric> e : m.entrySet()) {
>             gauges.put(name(e.getKey().group(), e.getKey().name(), "count"), new Gauge<Double>() {
>                 @Override
>                 public Double getValue() {
>                     return e.getValue().value(); // exception thrown here
>                 }
>             });
>         }
>         return Collections.unmodifiableMap(gauges);
>     }
> }
> {code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-4794) Add access to OffsetStorageReader from SourceConnector
[ https://issues.apache.org/jira/browse/KAFKA-4794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-4794: -- Fix Version/s: (was: 1.1.0) 2.0.0
> Add access to OffsetStorageReader from SourceConnector
>
> Key: KAFKA-4794
> URL: https://issues.apache.org/jira/browse/KAFKA-4794
> Project: Kafka
> Issue Type: Improvement
> Components: KafkaConnect
> Affects Versions: 0.10.2.0
> Reporter: Florian Hussonnois
> Priority: Minor
> Labels: needs-kip
> Fix For: 2.0.0
>
> Currently the offset storage is only accessible from SourceTask, to be able to properly initialize tasks after a restart, a crash, or a reconfiguration request.
> To implement more complex connectors that need to track the progress of each task, it would be helpful to have access to an OffsetStorageReader instance from the SourceConnector.
> That way, we could have a background thread that requests a task reconfiguration based on source offsets.
> This improvement proposal comes from a customer project that needs to periodically scan directories on a shared storage for detecting and streaming new files into Kafka.
> The connector implementation is pretty straightforward.
> The connector uses a background thread to periodically scan directories. When new input files are detected, a task reconfiguration is requested. Then the connector assigns a file subset to each task.
> Each task stores source offsets for the last sent record. The source offset data are:
> - the size of the file
> - the bytes offset
> - the bytes size
> Tasks become idle when their assigned files are completed (i.e. recordBytesOffsets + recordBytesSize = fileBytesSize).
> The connector should then be able to track offsets for each assigned file. When all tasks have finished, the connector can stop them or assign new files by requesting a task reconfiguration.
> Moreover, another advantage of monitoring source offsets from the connector is to detect slow or failed tasks and, if necessary, to be able to restart all tasks.
> If you think this improvement is OK, I can work on a pull request.
> Thanks,
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
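The completion condition described above (recordBytesOffsets + recordBytesSize = fileBytesSize) could be evaluated by such a connector's monitoring thread roughly as follows. The map keys mirror the field names in the description; the exact shape of the offset map is an assumption for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class FileOffsetCheck {
    // A task's source offset for a file, per the description above: total
    // file size, byte offset of the last sent record, and that record's size.
    public static boolean isFileComplete(Map<String, Long> offset) {
        long fileBytesSize = offset.get("fileBytesSize");
        long recordBytesOffsets = offset.get("recordBytesOffsets");
        long recordBytesSize = offset.get("recordBytesSize");
        // The file is done once the last record ends exactly at the file end.
        return recordBytesOffsets + recordBytesSize == fileBytesSize;
    }

    public static void main(String[] args) {
        Map<String, Long> offset = new HashMap<>();
        offset.put("fileBytesSize", 100L);
        offset.put("recordBytesOffsets", 90L);
        offset.put("recordBytesSize", 10L);
        System.out.println(isFileComplete(offset)); // prints "true"
    }
}
```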
[jira] [Updated] (KAFKA-4126) No relevant log when the topic is non-existent
[ https://issues.apache.org/jira/browse/KAFKA-4126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-4126: -- Fix Version/s: (was: 1.1.0) 2.0.0 > No relevant log when the topic is non-existent > -- > > Key: KAFKA-4126 > URL: https://issues.apache.org/jira/browse/KAFKA-4126 > Project: Kafka > Issue Type: Bug >Reporter: Balázs Barnabás >Assignee: Vahid Hashemian >Priority: Minor > Fix For: 2.0.0 > > > When a producer sends a ProducerRecord to a Kafka topic that doesn't exist, there is no relevant debug/error log that points out the error. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-4893) async topic deletion conflicts with max topic length
[ https://issues.apache.org/jira/browse/KAFKA-4893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-4893: -- Fix Version/s: (was: 1.1.0) 2.0.0 > async topic deletion conflicts with max topic length > > > Key: KAFKA-4893 > URL: https://issues.apache.org/jira/browse/KAFKA-4893 > Project: Kafka > Issue Type: Bug >Reporter: Onur Karaman >Assignee: Vahid Hashemian >Priority: Minor > Fix For: 2.0.0 > > > As per the > [documentation|http://kafka.apache.org/documentation/#basic_ops_add_topic], > topics can be only 249 characters long to line up with typical filesystem > limitations: > {quote} > Each sharded partition log is placed into its own folder under the Kafka log > directory. The name of such folders consists of the topic name, appended by a > dash (\-) and the partition id. Since a typical folder name can not be over > 255 characters long, there will be a limitation on the length of topic names. > We assume the number of partitions will not ever be above 100,000. Therefore, > topic names cannot be longer than 249 characters. This leaves just enough > room in the folder name for a dash and a potentially 5 digit long partition > id. > {quote} > {{kafka.common.Topic.maxNameLength}} is set to 249 and is used during > validation. > This limit ends up not being quite right since topic deletion ends up > renaming the directory to the form {{topic-partition.uniqueId-delete}} as can > be seen in {{LogManager.asyncDelete}}: > {code} > val dirName = new StringBuilder(removedLog.name) > .append(".") > > .append(java.util.UUID.randomUUID.toString.replaceAll("-","")) > .append(Log.DeleteDirSuffix) > .toString() > {code} > So the unique id and "-delete" suffix end up hogging some of the characters. 
> Deleting a long-named topic results in a log message such as the following: > {code} > kafka.common.KafkaStorageException: Failed to rename log directory from > /tmp/kafka-logs0/0-0 > to > /tmp/kafka-logs0/0-0.797bba3fb2464729840f87769243edbb-delete > at kafka.log.LogManager.asyncDelete(LogManager.scala:439) > at > kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:142) > at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137) > at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213) > at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:221) > at kafka.cluster.Partition.delete(Partition.scala:137) > at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:230) > at > kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:260) > at > kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:259) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:259) > at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:174) > at kafka.server.KafkaApis.handle(KafkaApis.scala:86) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:64) > at java.lang.Thread.run(Thread.java:745) > {code} > The topic after this point still exists but has Leader set to -1 and the > controller recognizes the topic deletion as incomplete (the topic znode is > still in /admin/delete_topics). > I don't believe LinkedIn has any topic name this long but I'm making the > ticket in case anyone runs into this problem. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
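The character budget implied by the KAFKA-4893 report can be checked with a small sketch. The 255-character filename limit, the 5-digit partition id, the 32-hex-character UUID, and the "-delete" suffix all come from the issue text; the resulting 209-character "safe" figure is my arithmetic, not a number from the ticket:

```java
public class TopicNameBudget {

    // Typical filesystem limit on a single directory-name component.
    static final int MAX_FILENAME = 255;

    // Steady state: the log dir is "<topic>-<partition>" with up to a
    // 5-digit partition id, which is where the documented 249 comes from.
    static int maxTopicLength() {
        return MAX_FILENAME - "-99999".length(); // 255 - 6 = 249
    }

    // During async deletion the dir is renamed to
    // "<topic>-<partition>.<32-hex-uuid>-delete", consuming 40 more characters.
    static int maxSafeTopicLength() {
        int deleteOverhead = ".".length() + 32 + "-delete".length(); // 40
        return maxTopicLength() - deleteOverhead; // 209
    }
}
```

So a topic name that passes the 249-character validation can still make the deletion-time rename exceed the filesystem limit, which matches the KafkaStorageException above.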
[jira] [Updated] (KAFKA-3999) Consumer bytes-fetched metric uses decompressed message size
[ https://issues.apache.org/jira/browse/KAFKA-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-3999: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Consumer bytes-fetched metric uses decompressed message size > > > Key: KAFKA-3999 > URL: https://issues.apache.org/jira/browse/KAFKA-3999 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.9.0.1, 0.10.0.0 >Reporter: Jason Gustafson >Assignee: Vahid Hashemian >Priority: Minor > Fix For: 2.0.0 > > > It looks like the computation for the bytes-fetched metrics uses the size of > the decompressed message set. I would have expected it to be based off of the > raw size of the fetch responses. Perhaps it would be helpful to expose both > the raw and decompressed fetch sizes? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-4931) stop script fails due to 4096 ps output limit
[ https://issues.apache.org/jira/browse/KAFKA-4931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-4931: -- Fix Version/s: (was: 1.1.0) 2.0.0 > stop script fails due to 4096 ps output limit > -- > > Key: KAFKA-4931 > URL: https://issues.apache.org/jira/browse/KAFKA-4931 > Project: Kafka > Issue Type: Bug > Components: tools >Affects Versions: 0.10.2.0 >Reporter: Amit Jain >Assignee: Tom Bentley >Priority: Minor > Labels: patch-available > Fix For: 2.0.0 > > > When run, the script bin/zookeeper-server-stop.sh fails to stop the ZooKeeper > server process if the ps output exceeds the 4096-character limit of Linux. I > think that instead of ps we can use ${JAVA_HOME}/bin/jps -vl | grep QuorumPeerMain; > it would correctly stop the ZooKeeper process. Currently we are using kill with > PIDS=$(ps ax | grep java | grep -i QuorumPeerMain | grep -v grep | awk > '{print $1}') -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-3733) Avoid long command lines by setting CLASSPATH in environment
[ https://issues.apache.org/jira/browse/KAFKA-3733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-3733: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Avoid long command lines by setting CLASSPATH in environment > > > Key: KAFKA-3733 > URL: https://issues.apache.org/jira/browse/KAFKA-3733 > Project: Kafka > Issue Type: Improvement > Components: tools >Reporter: Adrian Muraru >Assignee: Adrian Muraru >Priority: Minor > Fix For: 2.0.0 > > > {{kafka-run-class.sh}} sets the JVM classpath on the command line via {{-cp}}. > This generates long command lines that get trimmed by the shell in commands > like ps, pgrep, etc. > An alternative is to set the CLASSPATH in the environment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-3575) Using console consumer to access a topic that does not exist, cannot use "Control + C" to exit process
[ https://issues.apache.org/jira/browse/KAFKA-3575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-3575: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Use console consumer access topic that does not exist, can not use "Control + > C" to exit process > > > Key: KAFKA-3575 > URL: https://issues.apache.org/jira/browse/KAFKA-3575 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.9.0.0 > Environment: SUSE Linux Enterprise Server 11 SP3 >Reporter: NieWang >Assignee: Tom Bentley >Priority: Minor > Labels: patch-available > Fix For: 2.0.0 > > > 1. use "sh kafka-console-consumer.sh --zookeeper 10.252.23.133:2181 --topic > topic_02" start console consumer. topic_02 does not exist. > 2. you can not use "Control + C" to exit console consumer process. The > process is blocked. > 3. use jstack check process stack, as follows: > linux:~ # jstack 122967 > 2016-04-18 15:46:06 > Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.66-b17 mixed mode): > "Attach Listener" #29 daemon prio=9 os_prio=0 tid=0x01781800 > nid=0x1e0c8 waiting on condition [0x] >java.lang.Thread.State: RUNNABLE > "Thread-4" #27 prio=5 os_prio=0 tid=0x018a4000 nid=0x1e08a waiting on > condition [0x7ffbe5ac] >java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0xe00ed3b8> (a > java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) > at 
kafka.tools.ConsoleConsumer$$anon$1.run(ConsoleConsumer.scala:101) > "SIGINT handler" #28 daemon prio=9 os_prio=0 tid=0x019d5800 > nid=0x1e089 in Object.wait() [0x7ffbe5bc1000] >java.lang.Thread.State: WAITING (on object monitor) > at java.lang.Object.$$YJP$$wait(Native Method) > at java.lang.Object.wait(Object.java) > at java.lang.Thread.join(Thread.java:1245) > - locked <0xe71fd4e8> (a kafka.tools.ConsoleConsumer$$anon$1) > at java.lang.Thread.join(Thread.java:1319) > at > java.lang.ApplicationShutdownHooks.runHooks(ApplicationShutdownHooks.java:106) > at > java.lang.ApplicationShutdownHooks$1.run(ApplicationShutdownHooks.java:46) > at java.lang.Shutdown.runHooks(Shutdown.java:123) > at java.lang.Shutdown.sequence(Shutdown.java:167) > at java.lang.Shutdown.exit(Shutdown.java:212) > - locked <0xe00abfd8> (a java.lang.Class for > java.lang.Shutdown) > at java.lang.Terminator$1.handle(Terminator.java:52) > at sun.misc.Signal$1.run(Signal.java:212) > at java.lang.Thread.run(Thread.java:745) > "metrics-meter-tick-thread-2" #20 daemon prio=5 os_prio=0 > tid=0x7ffbec77a800 nid=0x1e079 waiting on condition [0x7ffbe66c8000] >java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0xe6fa6438> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) > at > java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088) > at > java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809) > at > java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127) > at > 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > "metrics-meter-tick-thread-1" #19 daemon prio=5 os_prio=0 > tid=0x7ffbec783000 nid=0x1e078 waiting on condition [0x7ffbe67c9000] >java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0xe6fa6438> (a >
[jira] [Updated] (KAFKA-5914) Return MessageFormatVersion and MessageMaxBytes in MetadataResponse
[ https://issues.apache.org/jira/browse/KAFKA-5914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5914: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Return MessageFormatVersion and MessageMaxBytes in MetadataResponse > --- > > Key: KAFKA-5914 > URL: https://issues.apache.org/jira/browse/KAFKA-5914 > Project: Kafka > Issue Type: Sub-task >Reporter: Apurva Mehta >Assignee: Apurva Mehta >Priority: Major > Fix For: 2.0.0 > > > As part of KIP-192, we want to send two additional fields in the > {{TopicMetadata}} which is part of the {{MetadataResponse}}. These fields are > the {{MessageFormatVersion}} and the {{MessageMaxBytes}}. > The {{MessageFormatVersion}} is required to implement > https://issues.apache.org/jira/browse/KAFKA-5794 . The latter will be > implemented in a future release, but with the changes proposed here, the said > future release will be backward compatible with 1.0.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5952) Refactor Consumer Fetcher metrics
[ https://issues.apache.org/jira/browse/KAFKA-5952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5952: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Refactor Consumer Fetcher metrics > - > > Key: KAFKA-5952 > URL: https://issues.apache.org/jira/browse/KAFKA-5952 > Project: Kafka > Issue Type: Sub-task >Reporter: James Cheng >Assignee: James Cheng >Priority: Major > Fix For: 2.0.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5951) Autogenerate Producer RecordAccumulator metrics
[ https://issues.apache.org/jira/browse/KAFKA-5951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5951: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Autogenerate Producer RecordAccumulator metrics > --- > > Key: KAFKA-5951 > URL: https://issues.apache.org/jira/browse/KAFKA-5951 > Project: Kafka > Issue Type: Sub-task >Reporter: James Cheng >Assignee: James Cheng >Priority: Major > Fix For: 2.0.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6390) Update ZooKeeper to 3.4.11, Gradle and other minor updates
[ https://issues.apache.org/jira/browse/KAFKA-6390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-6390: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Update ZooKeeper to 3.4.11, Gradle and other minor updates > -- > > Key: KAFKA-6390 > URL: https://issues.apache.org/jira/browse/KAFKA-6390 > Project: Kafka > Issue Type: Bug >Reporter: Ismael Juma >Assignee: Ismael Juma >Priority: Major > Fix For: 2.0.0 > > > https://issues.apache.org/jira/browse/ZOOKEEPER-2614 is a helpful fix. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5944) Add unit tests for handling of authentication failures in clients
[ https://issues.apache.org/jira/browse/KAFKA-5944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5944: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Add unit tests for handling of authentication failures in clients > - > > Key: KAFKA-5944 > URL: https://issues.apache.org/jira/browse/KAFKA-5944 > Project: Kafka > Issue Type: Test > Components: clients >Reporter: Rajini Sivaram >Assignee: Vahid Hashemian >Priority: Major > Fix For: 2.0.0 > > > KAFKA-5854 improves the handling of authentication failures in clients and has added > integration tests and some basic client-side tests that create actual > connections to a mock server. It would be good to add a set of tests for > producers, consumers, etc. that use MockClient to add more extensive tests for > various scenarios. > cc [~hachikuji] [~vahid] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5886) Introduce delivery.timeout.ms producer config (KIP-91)
[ https://issues.apache.org/jira/browse/KAFKA-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5886: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Introduce delivery.timeout.ms producer config (KIP-91) > -- > > Key: KAFKA-5886 > URL: https://issues.apache.org/jira/browse/KAFKA-5886 > Project: Kafka > Issue Type: Improvement > Components: producer >Reporter: Sumant Tambe >Assignee: Sumant Tambe >Priority: Major > Fix For: 2.0.0 > > > We propose adding a new timeout delivery.timeout.ms. The window of > enforcement includes batching in the accumulator, retries, and the inflight > segments of the batch. With this config, the user has a guaranteed upper > bound on when a record will either get sent, fail or expire from the point > when send returns. In other words we no longer overload request.timeout.ms to > act as a weak proxy for accumulator timeout and instead introduce an explicit > timeout that users can rely on without exposing any internals of the producer > such as the accumulator. > See > [KIP-91|https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer] > for more details. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
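A sketch of how the new producer config from KIP-91 might be set once available. The property names delivery.timeout.ms and request.timeout.ms and their relationship follow the KIP text above; the concrete values and broker address are illustrative only, and plain java.util.Properties is used so no Kafka classes are required:

```java
import java.util.Properties;

public class ProducerTimeoutConfig {

    // Builds producer properties around the proposed delivery.timeout.ms.
    // All numeric values and the broker address are illustrative.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        // Upper bound on the whole send: time in the accumulator, retries,
        // and in-flight requests, measured from when send() returns.
        props.put("delivery.timeout.ms", "120000");
        // Bounds a single request only; it no longer doubles as a
        // weak proxy for the accumulator timeout.
        props.put("request.timeout.ms", "30000");
        props.put("linger.ms", "5");
        return props;
    }
}
```

Note the invariant a sensible configuration should satisfy: delivery.timeout.ms must be at least linger.ms + request.timeout.ms, otherwise a single batch could time out before even one request attempt completes.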
[jira] [Updated] (KAFKA-5637) Document compatibility and release policies
[ https://issues.apache.org/jira/browse/KAFKA-5637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5637: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Document compatibility and release policies > --- > > Key: KAFKA-5637 > URL: https://issues.apache.org/jira/browse/KAFKA-5637 > Project: Kafka > Issue Type: Improvement > Components: documentation >Reporter: Ismael Juma >Assignee: Sönke Liebau >Priority: Major > Fix For: 2.0.0 > > > We should document our compatibility and release policies in one place so > that people have the correct expectations. This is generally important, but > more so now that we are releasing 1.0.0. > I extracted the following topics from the mailing list thread as the ones > that should be documented as a minimum: > *Code stability* > * Explanation of stability annotations and their implications > * Explanation of what public apis are > * *Discussion point: * Do we want to keep the _unstable_ annotation or is > _evolving_ sufficient going forward? > *Support duration* > * How long are versions supported? > * How far are bugfixes backported? > * How far are security fixes backported? > * How long are protocol versions supported by subsequent code versions? > * How long are older clients supported? > * How long are older brokers supported? > I will create an initial pull request to add a section to the documentation > as basis for further discussion. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5445) Document exceptions thrown by AdminClient methods
[ https://issues.apache.org/jira/browse/KAFKA-5445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5445: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Document exceptions thrown by AdminClient methods > - > > Key: KAFKA-5445 > URL: https://issues.apache.org/jira/browse/KAFKA-5445 > Project: Kafka > Issue Type: Improvement > Components: admin, clients >Reporter: Ismael Juma >Assignee: Andrey Dyachkov >Priority: Major > Fix For: 2.0.0, 1.0.2 > > > AdminClient should document the exceptions that users may have to handle. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5403) Transactions system test should dedup consumed messages by offset
[ https://issues.apache.org/jira/browse/KAFKA-5403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5403: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Transactions system test should dedup consumed messages by offset > - > > Key: KAFKA-5403 > URL: https://issues.apache.org/jira/browse/KAFKA-5403 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.0 >Reporter: Apurva Mehta >Assignee: Apurva Mehta >Priority: Major > Fix For: 2.0.0 > > > In KAFKA-5396, we saw that the consumers which verify the data in multiple > topics could read the same offsets multiple times, for instance when a > rebalance happens. > This would detect spurious duplicates, causing the test to fail. We should > dedup the consumed messages by offset and only fail the test if we have > duplicate values for a unique set of offsets. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
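A minimal sketch of the dedup rule proposed in KAFKA-5403: a record re-read at the same (topic, partition, offset) with the same value is a benign re-delivery (e.g. after a rebalance), while a different value at the same offset is a genuine duplicate that should fail the test. The class and method names here are hypothetical, not taken from the actual system test:

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetDedup {

    // First value seen for each "topic-partition@offset" coordinate.
    private final Map<String, String> seen = new HashMap<>();

    // Returns true only for a genuine duplicate: the same offset consumed
    // again but carrying a different value than the first read.
    boolean isConflict(String topic, int partition, long offset, String value) {
        String key = topic + "-" + partition + "@" + offset;
        String prev = seen.putIfAbsent(key, value);
        return prev != null && !prev.equals(value);
    }
}
```

With this rule, rebalance-induced re-reads are silently collapsed and only real data corruption (two distinct values claiming the same offset) trips the assertion.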
[jira] [Updated] (KAFKA-4701) Allow kafka brokers to dynamically reload truststore without restarting.
[ https://issues.apache.org/jira/browse/KAFKA-4701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-4701: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Allow kafka brokers to dynamically reload truststore without restarting. > > > Key: KAFKA-4701 > URL: https://issues.apache.org/jira/browse/KAFKA-4701 > Project: Kafka > Issue Type: Improvement > Components: security >Reporter: Allen Xiang >Priority: Major > Labels: security > Fix For: 2.0.0 > > > Right now in order to add SSL clients(update broker truststores), a rolling > restart of all brokers is required. This is very time consuming and > unnecessary. A dynamic truststore manager is needed to reload truststore from > file system without restarting brokers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5272) Improve validation for Alter Configs (KIP-133)
[ https://issues.apache.org/jira/browse/KAFKA-5272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5272: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Improve validation for Alter Configs (KIP-133) > -- > > Key: KAFKA-5272 > URL: https://issues.apache.org/jira/browse/KAFKA-5272 > Project: Kafka > Issue Type: Sub-task >Reporter: Ismael Juma >Priority: Major > Fix For: 2.0.0 > > > TopicConfigHandler.processConfigChanges() warns about certain topic configs. > We should include such validations in alter configs and reject the change if > the validation fails. Note that this should be done without changing the > behaviour of the ConfigCommand (as it does not have access to the broker > configs). > We should consider adding other validations like KAFKA-4092 and KAFKA-4680. > Finally, the AlterConfigsPolicy mentioned in KIP-133 will be added at the > same time. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5276) Support derived and prefixed configs in DescribeConfigs (KIP-133)
[ https://issues.apache.org/jira/browse/KAFKA-5276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5276: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Support derived and prefixed configs in DescribeConfigs (KIP-133) > - > > Key: KAFKA-5276 > URL: https://issues.apache.org/jira/browse/KAFKA-5276 > Project: Kafka > Issue Type: Sub-task >Reporter: Ismael Juma >Priority: Major > Fix For: 2.0.0 > > > The broker supports config overrides per listener. The way we do that is by > prefixing the configs with the listener name. These configs are not defined > by ConfigDef and they don't appear in `values()`. They do appear in > `originals()`. We should change the code so that we return these configs. > Because these configs are read-only, nothing needs to be done for > AlterConfigs. > With regards to derived configs, an example is advertised.listeners, which > falls back to listeners. This is currently done outside AbstractConfig. We > should look into including these into AbstractConfig so that the fallback > happens for the returned configs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5029) cleanup javadocs and logging
[ https://issues.apache.org/jira/browse/KAFKA-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5029: -- Fix Version/s: (was: 1.1.0) 2.0.0 > cleanup javadocs and logging > > > Key: KAFKA-5029 > URL: https://issues.apache.org/jira/browse/KAFKA-5029 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Ismael Juma >Priority: Major > Fix For: 2.0.0 > > > Remove state change logger, splitting it up into the controller logs or > broker logs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-4914) Partition re-assignment tool should check types before persisting state in ZooKeeper
[ https://issues.apache.org/jira/browse/KAFKA-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-4914: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Partition re-assignment tool should check types before persisting state in > ZooKeeper > > > Key: KAFKA-4914 > URL: https://issues.apache.org/jira/browse/KAFKA-4914 > Project: Kafka > Issue Type: Improvement > Components: admin >Affects Versions: 0.10.1.1 >Reporter: Nick Travers >Assignee: Nick Travers >Priority: Major > Fix For: 2.0.0 > > > The partition-reassignment too currently allows non-type-safe information to > be persisted into ZooKeeper, which can result in a ClassCastException at > runtime for brokers. > Specifically, this occurred when the broker assignment field was a List of > Strings, instead of a List of Integers. > {code} > 2017-03-15 01:44:04,572 ERROR > [ZkClient-EventThread-36-samsa-zkserver.stage.sjc1.square:26101/samsa] > controller.ReplicaStateMachine$BrokerChangeListener - [BrokerChangeListener > on Controller 10]: Error while handling broker changes > java.lang.ClassCastException: java.lang.String cannot be cast to > java.lang.Integer > at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101) > at > kafka.controller.KafkaController$$anonfun$8$$anonfun$apply$2.apply(KafkaController.scala:436) > at > scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93) > at scala.collection.immutable.List.exists(List.scala:84) > at > kafka.controller.KafkaController$$anonfun$8.apply(KafkaController.scala:436) > at > kafka.controller.KafkaController$$anonfun$8.apply(KafkaController.scala:435) > at > scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99) > at > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230) > at 
scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) > at scala.collection.mutable.HashMap.foreach(HashMap.scala:99) > at > scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247) > at > scala.collection.TraversableLike$class.filter(TraversableLike.scala:259) > at scala.collection.AbstractTraversable.filter(Traversable.scala:104) > at > kafka.controller.KafkaController.onBrokerStartup(KafkaController.scala:435) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:374) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:358) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:358) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:357) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:356) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:356) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:355) > at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:843) > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-2435) More optimally balanced partition assignment strategy
[ https://issues.apache.org/jira/browse/KAFKA-2435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-2435: -- Fix Version/s: (was: 1.1.0) 2.0.0 > More optimally balanced partition assignment strategy > - > > Key: KAFKA-2435 > URL: https://issues.apache.org/jira/browse/KAFKA-2435 > Project: Kafka > Issue Type: Improvement >Reporter: Andrew Olson >Assignee: Andrew Olson >Priority: Major > Fix For: 2.0.0 > > Attachments: KAFKA-2435.patch > > > While the roundrobin partition assignment strategy is an improvement over the > range strategy, when the consumer topic subscriptions are not identical > (previously disallowed but will be possible as of KAFKA-2172) it can produce > heavily skewed assignments. As suggested > [here|https://issues.apache.org/jira/browse/KAFKA-2172?focusedCommentId=14530767=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14530767] > it would be nice to have a strategy that attempts to assign an equal number > of partitions to each consumer in a group, regardless of how similar their > individual topic subscriptions are. We can accomplish this by tracking the > number of partitions assigned to each consumer, and having the partition > assignment loop assign each partition to a consumer interested in that topic > with the least number of partitions assigned. > Additionally, we can optimize the distribution fairness by adjusting the > partition assignment order: > * Topics with fewer consumers are assigned first. > * In the event of a tie for least consumers, the topic with more partitions > is assigned first. > The general idea behind these two rules is to keep the most flexible > assignment choices available as long as possible by starting with the most > constrained partitions/consumers. > This JIRA addresses the original high-level consumer. For the new consumer, > see KAFKA-3297. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
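The two ordering rules and the greedy least-loaded loop described in KAFKA-2435 can be sketched as follows. This is a simplified standalone model, not the actual Kafka assignor: consumers and topics are plain strings, and partitions are represented as "topic-p" labels:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class BalancedAssignor {

    // consumerTopics: consumer -> topics it subscribes to
    // partitionCounts: topic -> number of partitions
    static Map<String, List<String>> assign(Map<String, Set<String>> consumerTopics,
                                            Map<String, Integer> partitionCounts) {
        // Which consumers are interested in each topic.
        Map<String, List<String>> interested = new HashMap<>();
        for (String t : partitionCounts.keySet()) {
            List<String> cs = new ArrayList<>();
            for (Map.Entry<String, Set<String>> e : consumerTopics.entrySet())
                if (e.getValue().contains(t)) cs.add(e.getKey());
            interested.put(t, cs);
        }

        // Rule 1: topics with fewer interested consumers are assigned first.
        // Rule 2: on a tie, the topic with more partitions goes first.
        List<String> topics = new ArrayList<>(partitionCounts.keySet());
        topics.sort(Comparator
                .comparingInt((String t) -> interested.get(t).size())
                .thenComparingInt(t -> -partitionCounts.get(t)));

        Map<String, List<String>> assignment = new HashMap<>();
        for (String c : consumerTopics.keySet()) assignment.put(c, new ArrayList<>());

        for (String t : topics) {
            for (int p = 0; p < partitionCounts.get(t); p++) {
                // Give each partition to the interested consumer
                // that currently holds the fewest partitions.
                String best = null;
                for (String c : interested.get(t))
                    if (best == null || assignment.get(c).size() < assignment.get(best).size())
                        best = c;
                if (best != null) assignment.get(best).add(t + "-" + p);
            }
        }
        return assignment;
    }
}
```

For example, with c1 subscribed to {a, b}, c2 subscribed only to {b}, and two partitions each: topic a is assigned first (fewer interested consumers), both of its partitions necessarily go to c1, and the greedy rule then steers both partitions of b to c2, yielding a 2/2 split where plain round-robin could skew the load.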
[jira] [Updated] (KAFKA-3297) More optimally balanced partition assignment strategy (new consumer)
[ https://issues.apache.org/jira/browse/KAFKA-3297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-3297: -- Fix Version/s: (was: 1.1.0) 2.0.0 > More optimally balanced partition assignment strategy (new consumer) > > > Key: KAFKA-3297 > URL: https://issues.apache.org/jira/browse/KAFKA-3297 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Andrew Olson >Assignee: Andrew Olson >Priority: Major > Fix For: 2.0.0 > > > While the roundrobin partition assignment strategy is an improvement over the > range strategy, when the consumer topic subscriptions are not identical > (previously disallowed but will be possible as of KAFKA-2172) it can produce > heavily skewed assignments. As suggested > [here|https://issues.apache.org/jira/browse/KAFKA-2172?focusedCommentId=14530767=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14530767] > it would be nice to have a strategy that attempts to assign an equal number > of partitions to each consumer in a group, regardless of how similar their > individual topic subscriptions are. We can accomplish this by tracking the > number of partitions assigned to each consumer, and having the partition > assignment loop assign each partition to a consumer interested in that topic > with the least number of partitions assigned. > Additionally, we can optimize the distribution fairness by adjusting the > partition assignment order: > * Topics with fewer consumers are assigned first. > * In the event of a tie for least consumers, the topic with more partitions > is assigned first. > The general idea behind these two rules is to keep the most flexible > assignment choices available as long as possible by starting with the most > constrained partitions/consumers. > This JIRA addresses the new consumer. For the original high-level consumer, > see KAFKA-2435. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-4879) KafkaConsumer.position may hang forever when deleting a topic
[ https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356998#comment-16356998 ] Damian Guy commented on KAFKA-4879: --- [~hachikuji] is this going to make it for 1.1? > KafkaConsumer.position may hang forever when deleting a topic > - > > Key: KAFKA-4879 > URL: https://issues.apache.org/jira/browse/KAFKA-4879 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.10.2.0 >Reporter: Shixiong Zhu >Assignee: Jason Gustafson >Priority: Major > Fix For: 1.1.0 > > > KafkaConsumer.position may hang forever when deleting a topic. The problem is > this line > https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374 > The timeout is "Long.MAX_VALUE", and it will just retry forever for > UnknownTopicOrPartitionException. > Here is a reproducer > {code} > import org.apache.kafka.clients.consumer.KafkaConsumer; > import org.apache.kafka.common.TopicPartition; > import org.apache.kafka.common.serialization.StringDeserializer; > import java.util.Collections; > import java.util.Properties; > import java.util.Set; > public class KafkaReproducer { > public static void main(String[] args) { > // Make sure "delete.topic.enable" is set to true. > // Please create the topic test with "3" partitions manually. > // The issue is gone when there is only one partition. 
> String topic = "test"; > Properties props = new Properties(); > props.put("bootstrap.servers", "localhost:9092"); > props.put("group.id", "testgroup"); > props.put("value.deserializer", StringDeserializer.class.getName()); > props.put("key.deserializer", StringDeserializer.class.getName()); > props.put("enable.auto.commit", "false"); > KafkaConsumer<String, String> kc = new KafkaConsumer<>(props); > kc.subscribe(Collections.singletonList(topic)); > kc.poll(0); > Set<TopicPartition> partitions = kc.assignment(); > System.out.println("partitions: " + partitions); > kc.pause(partitions); > kc.seekToEnd(partitions); > System.out.println("please delete the topic in 30 seconds"); > try { > // Sleep 30 seconds to give us enough time to delete the topic. > Thread.sleep(30000); > } catch (InterruptedException e) { > e.printStackTrace(); > } > System.out.println("sleep end"); > for (TopicPartition p : partitions) { > System.out.println(p + " offset: " + kc.position(p)); > } > System.out.println("cannot reach here"); > kc.close(); > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-4307) Inconsistent parameters between console producer and consumer
[ https://issues.apache.org/jira/browse/KAFKA-4307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-4307: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Inconsistent parameters between console producer and consumer > - > > Key: KAFKA-4307 > URL: https://issues.apache.org/jira/browse/KAFKA-4307 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.10.1.0 >Reporter: Gwen Shapira >Assignee: Balint Molnar >Priority: Major > Labels: newbie > Fix For: 2.0.0 > > > kafka-console-producer uses --broker-list while kafka-console-consumer uses > --bootstrap-server. > Let's add --bootstrap-server to the producer for some consistency? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-4665) Inconsistent handling of non-existing topics in offset fetch handling
[ https://issues.apache.org/jira/browse/KAFKA-4665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-4665: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Inconsistent handling of non-existing topics in offset fetch handling > - > > Key: KAFKA-4665 > URL: https://issues.apache.org/jira/browse/KAFKA-4665 > Project: Kafka > Issue Type: Bug > Components: consumer, core >Reporter: Jason Gustafson >Assignee: Vahid Hashemian >Priority: Major > Fix For: 2.0.0 > > > For version 0 of the offset fetch API, the broker returns > UNKNOWN_TOPIC_OR_PARTITION for any topics/partitions which do not exist at > the time of fetching. In later versions, we skip this check. We do, however, > continue to return UNKNOWN_TOPIC_OR_PARTITION for authorization errors (i.e. > if the principal does not have Describe access to the corresponding topic). > We should probably make this behavior consistent across versions. > Note also that currently the consumer raises {{KafkaException}} when it > encounters an UNKNOWN_TOPIC_OR_PARTITION error in the offset fetch response, > which is inconsistent with how we usually handle this error. This probably > doesn't cause any problems currently only because of the inconsistency > mentioned in the first paragraph above. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-4862) Kafka client connect to a shutdown node will block for a long time
[ https://issues.apache.org/jira/browse/KAFKA-4862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-4862: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Kafka client connect to a shutdown node will block for a long time > -- > > Key: KAFKA-4862 > URL: https://issues.apache.org/jira/browse/KAFKA-4862 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.9.0.0, 0.10.2.0 >Reporter: Pengwei >Assignee: Pengwei >Priority: Major > Fix For: 2.0.0 > > > Currently in our test env, we found that after one of the broker nodes crashes (reboot > or OS crash), the client may keep connecting to the crashed node to send > metadata or other requests, and it takes several minutes for the client to > notice that the connection has timed out and to try another node for the > request. So the client may remain unaware of the metadata change for > several minutes. > We don't have a connection timeout for the network client; we should add a > connection timeout for the client. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
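What "add a connection timeout for the client" means can be sketched at the socket level. This is illustrative only, using a hypothetical `tryConnect` helper under the assumption that a bounded connect attempt lets the caller fail over to another broker from metadata; it is not the NetworkClient's actual code:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class ConnectWithTimeout {
    // Bound how long we wait for a broker connection instead of relying on the
    // OS default, so a crashed node fails fast and the caller can try another
    // node from its metadata (sketch; names are made up for illustration).
    static boolean tryConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // caller should pick another node and retry there
        }
    }

    public static void main(String[] args) {
        // Against a dead node this returns within ~2 seconds rather than the
        // several minutes described in the ticket.
        System.out.println(tryConnect("localhost", 65533, 2000));
    }
}
```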
[jira] [Updated] (KAFKA-4249) Document how to customize GC logging options for broker
[ https://issues.apache.org/jira/browse/KAFKA-4249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-4249: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Document how to customize GC logging options for broker > --- > > Key: KAFKA-4249 > URL: https://issues.apache.org/jira/browse/KAFKA-4249 > Project: Kafka > Issue Type: Improvement > Components: documentation >Affects Versions: 0.10.0.1 >Reporter: Jim Hoagland >Assignee: Tom Bentley >Priority: Major > Fix For: 2.0.0 > > > We wanted to enable GC logging for the Kafka broker and saw that you can set > GC_LOG_ENABLED=true. However, this didn't do what we wanted. For example, > the GC log will be overwritten every time the broker gets restarted. It > wasn't clear how we could avoid that (no documentation of it that I can find), > so I did some research by looking at the source code and did some testing and > found that KAFKA_GC_LOG_OPTS could be set with alternate JVM options prior to > starting the broker. I posted my solution to StackOverflow: > > http://stackoverflow.com/questions/39854424/how-to-enable-gc-logging-for-apache-kafka-brokers-while-preventing-log-file-ove > (feel free to critique) > That solution is now public, but it seems like the Kafka documentation should > say how to do this. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-4203) Java producer default max message size does not align with broker default
[ https://issues.apache.org/jira/browse/KAFKA-4203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-4203: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Java producer default max message size does not align with broker default > - > > Key: KAFKA-4203 > URL: https://issues.apache.org/jira/browse/KAFKA-4203 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8.1 >Reporter: Grant Henke >Assignee: Ismael Juma >Priority: Major > Fix For: 2.0.0 > > > The Java producer sets max.request.size = 1048576 (the base 2 version of 1 MB > (MiB)) > The broker sets max.message.bytes = 1000012 (the base 10 value of 1 MB + 12 > bytes for overhead) > This means that by default the producer can try to produce messages larger > than the broker will accept, resulting in RecordTooLargeExceptions. > There were no similar issues in the old producer because it sets > max.message.size = 1000000 (the base 10 value of 1 MB) > I propose we increase the broker default for max.message.bytes to 1048588 > (the base 2 value of 1 MB (MiB) + 12 bytes for overhead) so that any message > produced with default configs from either producer does not result in a > RecordTooLargeException. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
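The mismatch in the ticket above is plain arithmetic; the values below are computed from the definitions given in the report ("base 2 1 MB", "base 10 1 MB + 12 bytes"), not read from any Kafka config:

```java
// Compare the producer and broker default message-size limits described in
// KAFKA-4203. All numbers are derived from their stated definitions.
public class MaxMessageSizeCheck {
    public static void main(String[] args) {
        long producerMaxRequestSize = 1024 * 1024;     // base-2 1 MB (MiB) = 1048576
        long brokerMaxMessageBytes = 1_000_000 + 12;   // base-10 1 MB + 12 bytes overhead
        long proposedBrokerDefault = 1024 * 1024 + 12; // base-2 1 MB + 12 bytes = 1048588

        // With current defaults, a max-size producer request exceeds the broker limit.
        System.out.println(producerMaxRequestSize > brokerMaxMessageBytes);  // prints true
        // With the proposed default, a max-size request from either producer fits.
        System.out.println(proposedBrokerDefault >= producerMaxRequestSize); // prints true
    }
}
```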
[jira] [Updated] (KAFKA-3554) Generate actual data with specific compression ratio and add multi-thread support in the ProducerPerformance tool.
[ https://issues.apache.org/jira/browse/KAFKA-3554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-3554: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Generate actual data with specific compression ratio and add multi-thread > support in the ProducerPerformance tool. > -- > > Key: KAFKA-3554 > URL: https://issues.apache.org/jira/browse/KAFKA-3554 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.9.0.1 >Reporter: Jiangjie Qin >Assignee: Jiangjie Qin >Priority: Major > Fix For: 2.0.0 > > > Currently ProducerPerformance always generates payloads with the same > bytes. This does not work well for testing compressed data because the > payload is extremely compressible no matter how big it is. > We can make some changes to make it more useful for compressed messages. > Currently I am generating payloads containing integers from a given range. > By adjusting the range of the integers, we can get different compression > ratios. > API-wise, we can either let the user specify the integer range or the expected > compression ratio (we will do some probing to get the corresponding range for > the users). > Besides that, in many cases, it is useful to have multiple producer threads > when a single producer thread is the bottleneck. Admittedly people can > run multiple ProducerPerformance instances to achieve a similar result, but it is still > different from how people actually use the producer. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
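The range-based payload idea can be sketched as follows. The `PayloadGenerator` class and its parameters are hypothetical, not the actual ProducerPerformance code: drawing integers from a narrow range yields highly compressible payloads, while a wide range yields nearly incompressible ones.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
import java.util.zip.GZIPOutputStream;

// Sketch: fill a payload with random integers from [lower, upper). A narrow
// range repeats values often (high compression ratio); a wide one rarely does.
public class PayloadGenerator {
    private final Random random = new Random(42);
    private final int lower;
    private final int upper;

    public PayloadGenerator(int lower, int upper) {
        this.lower = lower;
        this.upper = upper;
    }

    public byte[] payload(int sizeInBytes) {
        ByteBuffer buffer = ByteBuffer.allocate(sizeInBytes);
        while (buffer.remaining() >= Integer.BYTES) {
            buffer.putInt(lower + random.nextInt(upper - lower));
        }
        return buffer.array();
    }

    // Gzip size as a rough proxy for how compressible a payload is.
    static int gzipSize(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(data);
        }
        return bos.size();
    }

    public static void main(String[] args) throws IOException {
        byte[] narrow = new PayloadGenerator(0, 4).payload(4096);       // few distinct ints
        byte[] wide = new PayloadGenerator(0, 1 << 30).payload(4096);   // mostly random bytes
        System.out.println(gzipSize(narrow) < gzipSize(wide)); // prints true
    }
}
```

Probing for a range that hits a user-specified target ratio, as the ticket suggests, would just binary-search over `upper - lower` using a measurement like `gzipSize`.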
[jira] [Updated] (KAFKA-3689) Exception when attempting to decrease connection count for address with no connections
[ https://issues.apache.org/jira/browse/KAFKA-3689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-3689: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Exception when attempting to decrease connection count for address with no > connections > -- > > Key: KAFKA-3689 > URL: https://issues.apache.org/jira/browse/KAFKA-3689 > Project: Kafka > Issue Type: Bug > Components: network >Affects Versions: 0.9.0.1 > Environment: ubuntu 14.04, > java version "1.7.0_95" > OpenJDK Runtime Environment (IcedTea 2.6.4) (7u95-2.6.4-0ubuntu0.14.04.2) > OpenJDK 64-Bit Server VM (build 24.95-b01, mixed mode) > 3 broker cluster (all 3 servers identical - Intel Xeon E5-2670 @2.6GHz, > 8cores, 16 threads 64 GB RAM & 1 TB Disk) > Kafka Cluster is managed by 3 server ZK cluster (these servers are different > from Kafka broker servers). All 6 servers are connected via 10G switch. > Producers run from external servers. >Reporter: Buvaneswari Ramanan >Assignee: Ryan P >Priority: Major > Fix For: 2.0.0 > > Attachments: KAFKA-3689.log.redacted, kafka-3689-instrumentation.patch > > Original Estimate: 72h > Remaining Estimate: 72h > > As per Ismael Juma's suggestion in email thread to us...@kafka.apache.org > with the same subject, I am creating this bug report. > The following error occurs in one of the brokers in our 3 broker cluster, > which serves about 8000 topics. These topics are single partitioned with a > replication factor = 3. Each topic gets data at a low rate – 200 bytes per > sec. Leaders are balanced across the topics. > Producers run from external servers (4 Ubuntu servers with same config as the > brokers), each producing to 2000 topics utilizing kafka-python library. > This error message occurs repeatedly in one of the servers. Between the hours > of 10:30am and 1:30pm on 5/9/16, there were about 10 Million such > occurrences. This was right after a cluster restart. > This is not the first time we got this error in this broker. 
In those > instances, error occurred hours / days after cluster restart. > = > [2016-05-09 10:38:43,932] ERROR Processor got uncaught exception. > (kafka.network.Processor) > java.lang.IllegalArgumentException: Attempted to decrease connection count > for address with no connections, address: /X.Y.Z.144 (actual network address > masked) > at > kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565) > at > kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565) > at scala.collection.MapLike$class.getOrElse(MapLike.scala:128) > at scala.collection.AbstractMap.getOrElse(Map.scala:59) > at kafka.network.ConnectionQuotas.dec(SocketServer.scala:564) > at > kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:450) > at > kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:445) > at scala.collection.Iterator$class.foreach(Iterator.scala:742) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at kafka.network.Processor.run(SocketServer.scala:445) > at java.lang.Thread.run(Thread.java:745) > [2016-05-09 10:38:43,932] ERROR Processor got uncaught exception. 
> (kafka.network.Processor) > java.lang.IllegalArgumentException: Attempted to decrease connection count > for address with no connections, address: /X.Y.Z.144 > at > kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565) > at > kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565) > at scala.collection.MapLike$class.getOrElse(MapLike.scala:128) > at scala.collection.AbstractMap.getOrElse(Map.scala:59) > at kafka.network.ConnectionQuotas.dec(SocketServer.scala:564) > at > kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:450) > at > kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:445) > at scala.collection.Iterator$class.foreach(Iterator.scala:742) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at kafka.network.Processor.run(SocketServer.scala:445) > at java.lang.Thread.run(Thread.java:745) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-3190) KafkaProducer should not invoke callback in send()
[ https://issues.apache.org/jira/browse/KAFKA-3190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-3190: -- Fix Version/s: (was: 1.1.0) 2.0.0 > KafkaProducer should not invoke callback in send() > -- > > Key: KAFKA-3190 > URL: https://issues.apache.org/jira/browse/KAFKA-3190 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 0.9.0.0 >Reporter: Jiangjie Qin >Assignee: Jiangjie Qin >Priority: Critical > Fix For: 2.0.0 > > > Currently KafkaProducer will invoke callback.onComplete() if it receives an > ApiException during send(). This breaks the guarantee that callback will be > invoked in order. It seems ApiException in send() only comes from metadata > refresh. If so, we can probably simply throw it instead of invoking > callback(). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-3438) Rack Aware Replica Reassignment should warn of overloaded brokers
[ https://issues.apache.org/jira/browse/KAFKA-3438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-3438: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Rack Aware Replica Reassignment should warn of overloaded brokers > - > > Key: KAFKA-3438 > URL: https://issues.apache.org/jira/browse/KAFKA-3438 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.10.0.0 >Reporter: Ben Stopford >Assignee: Vahid Hashemian >Priority: Major > Fix For: 2.0.0 > > > We've changed the replica reassignment code to be rack aware. > One problem that might catch users out would be that they rebalance the > cluster using kafka-reassign-partitions.sh but their rack configuration means > that a high proportion of replicas are pushed onto a single, or small > number of, brokers. > This should be an easy problem to avoid, by changing the rack assignment > information, but we should probably warn users if they are going to create > something that is unbalanced. > So imagine I have a Kafka cluster of 12 nodes spread over two racks with rack > awareness enabled. If I add a 13th machine, on a new rack, and run the > rebalance tool, that new machine will get ~6x as many replicas as the least > loaded broker. > Suggest a warning be added to the tool output when --generate is called. > "The most loaded broker has 2.3x as many replicas as the least loaded > broker. This is likely due to an uneven distribution of brokers across racks. > You're advised to alter the rack config so there are approximately the same > number of brokers per rack" and displays the individual rack→#brokers and > broker→#replicas data for the proposed move. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
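The suggested warning boils down to computing the replica-count skew of a proposed assignment. A minimal sketch with made-up broker counts, not the reassignment tool's actual code:

```java
import java.util.Collections;
import java.util.Map;

public class ReassignmentSkewCheck {
    // Ratio between the most and least loaded broker's replica counts.
    static double skew(Map<Integer, Integer> replicasPerBroker) {
        int max = Collections.max(replicasPerBroker.values());
        int min = Collections.min(replicasPerBroker.values());
        return (double) max / min;
    }

    public static void main(String[] args) {
        // Illustrative numbers: brokers on two established racks plus one new
        // broker alone on a new rack, which rack awareness overloads.
        Map<Integer, Integer> proposed = Map.of(1, 80, 2, 82, 3, 81, 13, 190);
        double ratio = skew(proposed);
        if (ratio > 1.5) { // threshold is an arbitrary choice for the sketch
            System.out.printf("Warning: most loaded broker has %.1fx as many replicas as "
                    + "the least loaded broker; check the rack configuration.%n", ratio);
        }
    }
}
```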
[jira] [Updated] (KAFKA-3177) Kafka consumer can hang when position() is called on a non-existing partition.
[ https://issues.apache.org/jira/browse/KAFKA-3177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-3177: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Kafka consumer can hang when position() is called on a non-existing partition. > -- > > Key: KAFKA-3177 > URL: https://issues.apache.org/jira/browse/KAFKA-3177 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.9.0.0 >Reporter: Jiangjie Qin >Assignee: Jason Gustafson >Priority: Critical > Fix For: 2.0.0 > > > This can be easily reproduced as follows: > {code} > { > ... > consumer.assign(Collections.singleton(someNonExistingTopicPartition)); > consumer.position(someNonExistingTopicPartition); > ... > } > {code} > It seems when position() is called we will try to do the following: > 1. Fetch committed offsets. > 2. If there are no committed offsets, try to reset the offset using the reset > strategy. In sendListOffsetRequest(), if the consumer does not know the > TopicPartition, it will refresh its metadata and retry. In this case, because > the partition does not exist, we fall into an infinite loop of refreshing > topic metadata. > Another orthogonal issue is that if the topic in the above code piece does > not exist, the position() call will actually create the topic, because > topic metadata requests can currently auto-create topics. > This is a known separate issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-3096) Leader is not set to -1 when it is shutdown if followers are down
[ https://issues.apache.org/jira/browse/KAFKA-3096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16351886#comment-16351886 ] Damian Guy commented on KAFKA-3096: --- [~ijuma] is this still an issue? Can I move it out of 1.1? The PR referenced was closed a long time ago > Leader is not set to -1 when it is shutdown if followers are down > - > > Key: KAFKA-3096 > URL: https://issues.apache.org/jira/browse/KAFKA-3096 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.9.0.0 >Reporter: Ismael Juma >Assignee: Ismael Juma >Priority: Major > Labels: reliability > Fix For: 1.1.0 > > > Assuming a cluster with 2 brokers with unclean leader election disabled: > 1. Start brokers 0 and 1 > 2. Perform partition assignment > 3. Broker 0 is elected leader > 4. Produce message and wait until metadata is propagated > 6. Shutdown follower > 7. Produce message > 8. Shutdown leader > 9. Start follower > 10. Wait for leader election > Expected: leader is -1 > Actual: leader is 0 > We have a test for this, but a bug in `waitUntilLeaderIsElectedOrChanged` > means that `newLeaderOpt` is not being checked. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-3866) KerberosLogin refresh time bug and other improvements
[ https://issues.apache.org/jira/browse/KAFKA-3866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16351884#comment-16351884 ] Damian Guy commented on KAFKA-3866: --- [~ijuma] the PR for this hasn't been touched for more than a year. Can we move it out of 1.1? > KerberosLogin refresh time bug and other improvements > - > > Key: KAFKA-3866 > URL: https://issues.apache.org/jira/browse/KAFKA-3866 > Project: Kafka > Issue Type: Bug > Components: security >Affects Versions: 0.10.0.0 >Reporter: Ismael Juma >Assignee: Ismael Juma >Priority: Major > Fix For: 1.1.0, 1.0.2 > > > ZOOKEEPER-2295 describes a bug in the Kerberos refresh time logic that is > also present in our KerberosLogin class. While looking at the code, I found a > number of things that could be improved. More details in the PR. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5327) Console Consumer should only poll for up to max messages
[ https://issues.apache.org/jira/browse/KAFKA-5327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5327: -- Fix Version/s: (was: 1.1.0) 1.2.0 > Console Consumer should only poll for up to max messages > > > Key: KAFKA-5327 > URL: https://issues.apache.org/jira/browse/KAFKA-5327 > Project: Kafka > Issue Type: Improvement > Components: tools >Reporter: Dustin Cote >Assignee: huxihx >Priority: Minor > Fix For: 1.2.0 > > > The ConsoleConsumer has a --max-messages flag that can be used to limit the > number of messages consumed. However, the number of records actually consumed > is governed by max.poll.records. This means you see one message on the > console, but your offset has moved forward a default of 500, which is kind of > counterintuitive. It would be good to only commit offsets for messages we > have printed to the console. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
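One way to get the behavior the ticket asks for is to cap `max.poll.records` at the `--max-messages` budget, so a poll never fetches (and the consumer never commits) past what is printed. This is a hedged sketch of that idea with an illustrative helper, not the actual ConsoleConsumer implementation:

```java
import java.util.Properties;

public class MaxMessagesConfig {
    // Cap max.poll.records at the remaining --max-messages budget (sketch).
    static Properties consumerProps(int maxMessages, int defaultMaxPollRecords) {
        Properties props = new Properties();
        int maxPollRecords = Math.min(maxMessages, defaultMaxPollRecords);
        props.put("max.poll.records", Integer.toString(maxPollRecords));
        return props;
    }

    public static void main(String[] args) {
        // With --max-messages 1, at most one record is polled per batch, so
        // the committed offset cannot jump 500 records past what was printed.
        System.out.println(consumerProps(1, 500).getProperty("max.poll.records")); // prints 1
    }
}
```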
[jira] [Updated] (KAFKA-5792) Transient failure in KafkaAdminClientTest.testHandleTimeout
[ https://issues.apache.org/jira/browse/KAFKA-5792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5792: -- Fix Version/s: (was: 1.1.0) 1.2.0 > Transient failure in KafkaAdminClientTest.testHandleTimeout > --- > > Key: KAFKA-5792 > URL: https://issues.apache.org/jira/browse/KAFKA-5792 > Project: Kafka > Issue Type: Bug >Reporter: Apurva Mehta >Assignee: Colin P. McCabe >Priority: Major > Labels: transient-unit-test-failure > Fix For: 1.2.0 > > > The {{KafkaAdminClientTest.testHandleTimeout}} test occasionally fails with > the following: > {noformat} > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node > assignment. > at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:213) > at > org.apache.kafka.clients.admin.KafkaAdminClientTest.testHandleTimeout(KafkaAdminClientTest.java:356) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298) > at > 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292) > at java.util.concurrent.FutureTask.run(FutureTask.java:262) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting > for a node assignment. > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6240) Support dynamic updates of frequently updated broker configs
[ https://issues.apache.org/jira/browse/KAFKA-6240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-6240: -- Fix Version/s: (was: 1.1.0) 1.2.0 > Support dynamic updates of frequently updated broker configs > > > Key: KAFKA-6240 > URL: https://issues.apache.org/jira/browse/KAFKA-6240 > Project: Kafka > Issue Type: New Feature > Components: core >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.2.0 > > > See > [KIP-226|https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration] > for details. > Implementation will be done under sub-tasks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6007) Connect can't validate against transforms in plugins.path
[ https://issues.apache.org/jira/browse/KAFKA-6007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-6007: -- Fix Version/s: (was: 1.0.2) (was: 0.11.0.3) (was: 1.1.0) 1.2.0 > Connect can't validate against transforms in plugins.path > - > > Key: KAFKA-6007 > URL: https://issues.apache.org/jira/browse/KAFKA-6007 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.11.0.0, 0.11.0.1, 1.0.0 >Reporter: Stephane Maarek >Assignee: Konstantine Karantasis >Priority: Major > Fix For: 1.2.0 > > > Kafka Connect can't validate a custom transformation if placed in plugins > path. > Here's the output I get on the validate call: > {code:java} > Invalid value com.mycorp.kafka.transforms.impl.FlattenSinkRecord for > configuration transforms.Flat.type: Class > com.mycorp.kafka.transforms.impl.FlattenSinkRecord could not be found. > Invalid value null for configuration transforms.Flat.type: Not a > Transformation > "recommended_values": [ > "com.mycorp.kafka.transforms.Flatten$Key", > "com.mycorp.kafka.transforms.Flatten$Value", > "com.mycorp.kafka.transforms.impl.FlattenSinkRecord", > "org.apache.kafka.connect.transforms.Cast$Key", > "org.apache.kafka.connect.transforms.Cast$Value", > "org.apache.kafka.connect.transforms.ExtractField$Key", > "org.apache.kafka.connect.transforms.ExtractField$Value", > "org.apache.kafka.connect.transforms.Flatten$Key", > "org.apache.kafka.connect.transforms.Flatten$Value", > "org.apache.kafka.connect.transforms.HoistField$Key", > "org.apache.kafka.connect.transforms.HoistField$Value", > "org.apache.kafka.connect.transforms.InsertField$Key", > "org.apache.kafka.connect.transforms.InsertField$Value", > "org.apache.kafka.connect.transforms.MaskField$Key", > "org.apache.kafka.connect.transforms.MaskField$Value", > "org.apache.kafka.connect.transforms.RegexRouter", > "org.apache.kafka.connect.transforms.ReplaceField$Key", > "org.apache.kafka.connect.transforms.ReplaceField$Value", > 
"org.apache.kafka.connect.transforms.SetSchemaMetadata$Key", > "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value", > "org.apache.kafka.connect.transforms.TimestampConverter$Key", > "org.apache.kafka.connect.transforms.TimestampConverter$Value", > "org.apache.kafka.connect.transforms.TimestampRouter", > "org.apache.kafka.connect.transforms.ValueToKey"], > {code} > As you can see the class appear in the recommended values (!) but can't be > picked up on the validate call. > I believe it's because the recommender implements class discovery using > plugins: > https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L194 > But the class inference itself doesn't: > https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L199 > (I'm not an expert in class loading though, just a guess... Unsure how to fix) > A quick fix is to add the transformations in the ClassPath itself, but that > defeats the point a bit. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6476) Document dynamic config update
[ https://issues.apache.org/jira/browse/KAFKA-6476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-6476: -- Fix Version/s: (was: 1.1.0) 1.2.0 > Document dynamic config update > -- > > Key: KAFKA-6476 > URL: https://issues.apache.org/jira/browse/KAFKA-6476 > Project: Kafka > Issue Type: Sub-task > Components: core, documentation >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.2.0 > > > Add documentation for dynamic broker config update. > Include: > - Command line options for kafka-configs.sh with examples > - Configs that can be updated along with constraints applied -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6452) Add documentation for delegation token authentication mechanism
[ https://issues.apache.org/jira/browse/KAFKA-6452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-6452: -- Fix Version/s: (was: 1.1.0) 1.2.0 > Add documentation for delegation token authentication mechanism > --- > > Key: KAFKA-6452 > URL: https://issues.apache.org/jira/browse/KAFKA-6452 > Project: Kafka > Issue Type: Sub-task > Components: documentation >Reporter: Manikumar >Assignee: Manikumar >Priority: Major > Fix For: 1.2.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6111) Tests for KafkaZkClient
[ https://issues.apache.org/jira/browse/KAFKA-6111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-6111: -- Fix Version/s: (was: 1.1.0) 1.2.0 > Tests for KafkaZkClient > --- > > Key: KAFKA-6111 > URL: https://issues.apache.org/jira/browse/KAFKA-6111 > Project: Kafka > Issue Type: Sub-task >Reporter: Ismael Juma >Priority: Major > Fix For: 1.2.0 > > > Some methods in KafkaZkClient have no tests at the moment and we need to fix > that. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5834) AbstractConfig.logUnused() may log confusing warning information.
[ https://issues.apache.org/jira/browse/KAFKA-5834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5834: -- Fix Version/s: (was: 1.1.0) 1.2.0 > AbstractConfig.logUnused() may log confusing warning information. > - > > Key: KAFKA-5834 > URL: https://issues.apache.org/jira/browse/KAFKA-5834 > Project: Kafka > Issue Type: Bug > Components: log >Reporter: Jiangjie Qin >Priority: Major > Fix For: 1.2.0 > > > Currently {{AbstractConfig.logUnused()}} logs unused configurations at > WARN level. It is a little weird because as long as there is a configurable > class taking a configuration, that configuration will be logged as unused at > WARN level even if it is actually used. It seems better to log at INFO > level instead, or maybe it can take a log level argument to allow the > caller to decide which log level should be used. > [~hachikuji] [~ijuma] what do you think? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6492) LogSegment.truncateTo() should always resize the index file
[ https://issues.apache.org/jira/browse/KAFKA-6492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-6492: -- Fix Version/s: (was: 1.1.0) 1.2.0 > LogSegment.truncateTo() should always resize the index file > --- > > Key: KAFKA-6492 > URL: https://issues.apache.org/jira/browse/KAFKA-6492 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.10.0.2, 0.10.1.1, 0.10.2.1, 1.0.0, 0.11.0.2 >Reporter: Jiangjie Qin >Priority: Major > Fix For: 1.2.0 > > > The bug is the following: > # Initially on a follower broker there are two segments, segment 0 and segment 1. > Segment 0 is empty (maybe due to log compaction) > # The log is truncated to 0. > # LogSegment.truncateTo() will not find a message to truncate in segment 0, so > it will skip resizing the index/timeindex files. > # When a new message is fetched, Log.maybeRoll() will try to roll a new > segment because the index file of segment 0 is already full (max size is 0) > # After creating the new segment 0, the replica fetcher thread finds that > a segment 0 already exists, so it just throws an exception and dies. > The fix would be to let the broker make sure the index files of active segments > are always resized properly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5857) Excessive heap usage on controller node during reassignment
[ https://issues.apache.org/jira/browse/KAFKA-5857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5857: -- Fix Version/s: (was: 1.1.0) 1.2.0 > Excessive heap usage on controller node during reassignment > --- > > Key: KAFKA-5857 > URL: https://issues.apache.org/jira/browse/KAFKA-5857 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.11.0.0 > Environment: CentOs 7, Java 1.8 >Reporter: Raoufeh Hashemian >Priority: Major > Labels: reliability > Fix For: 1.2.0 > > Attachments: CPU.png, disk_write_x.png, memory.png, > reassignment_plan.txt > > > I was trying to expand our Kafka cluster of 6 broker nodes to 12 broker > nodes. > Before expansion, we had a single topic with 960 partitions and a replication > factor of 3. So each node had 480 partitions. The size of data in each node > was 3TB. > To do the expansion, I submitted a partition reassignment plan (see attached > file for the current/new assignments). The plan was optimized to minimize > data movement and be rack aware. > When I submitted the plan, it took approximately 3 hours for moving data from > old to new nodes to complete. After that, it started deleting source > partitions (I say this based on the number of file descriptors) and > rebalancing leaders, which was not successful. Meanwhile, the heap usage > in the controller node started to go up with a large slope (along with long > GC times), and it took 5 hours for the controller to run out of memory; then > another controller started to show the same behaviour for another 4 hours. At > this time ZooKeeper ran out of disk and the service stopped. > To recover from this condition: > 1) Removed zk logs to free up disk and restarted all 3 zk nodes > 2) Deleted the /kafka/admin/reassign_partitions node from zk > 3) Had to do unclean restarts of the Kafka service on the OOM controller nodes, which > took 3 hours to complete. After this stage there were still 676 under > replicated partitions.
> 4) Did a clean restart on all 12 broker nodes. > After step 4, the number of under-replicated partitions went to 0. > So I was wondering whether this memory footprint on the controller is expected for > ~1k partitions? Did we do something wrong, or is it a bug? > Attached are some resource usage graphs from this 30-hour event and the > reassignment plan. I'll try to add log files as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5870) Idempotent producer: a producerId reset causes undesirable behavior for inflight batches to other partitions
[ https://issues.apache.org/jira/browse/KAFKA-5870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5870: -- Fix Version/s: (was: 1.1.0) 1.2.0 > Idempotent producer: a producerId reset causes undesirable behavior for > inflight batches to other partitions > > > Key: KAFKA-5870 > URL: https://issues.apache.org/jira/browse/KAFKA-5870 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.0 >Reporter: Apurva Mehta >Assignee: Apurva Mehta >Priority: Major > Labels: exactly-once > Fix For: 1.2.0 > > > Currently, if we have to reset the producer id for any reason (for instance > if batches to a partition get expired, if we get an > {{OutOfOrderSequenceException}}, etc) we could cause batches to other > --healthy-- partitions to fail with a spurious > {{OutOfOrderSequenceException}}. > This is detailed in this PR discussion: > https://github.com/apache/kafka/pull/3743#discussion_r137907630 > Ideally, we would want all inflight batches to be handled to completion > rather than potentially failing them prematurely. Further, since we want to > tighten up the semantics of the {{OutOfOrderSequenceException}}, at the very > least we should raise another exception in this case, because there is no > data loss on the broker when the client gives up. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6292) KafkaConsumer ran into Unknown error fetching data for topic-partition caused by integer overflow in FileLogInputStream
[ https://issues.apache.org/jira/browse/KAFKA-6292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-6292: -- Fix Version/s: (was: 1.1.0) 1.2.0 > KafkaConsumer ran into Unknown error fetching data for topic-partition caused > by integer overflow in FileLogInputStream > > > Key: KAFKA-6292 > URL: https://issues.apache.org/jira/browse/KAFKA-6292 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 0.11.0.0, 0.11.0.1, 1.0.0, 0.11.0.2 > Environment: OS: Red Hat Enterprise Linux Server release 7.3 (Maipo) > Kafka: kafka_2.12-0.11.0.0 > JDK: jdk1.8.0_121 >Reporter: Terence Yi >Priority: Major > Labels: easyfix, reliability > Fix For: 1.2.0 > > > Steps to reproduce: > * Broker config to reproduce this bug: > {code:java} > # The maximum size of a log segment file. When this size is reached a new > log segment will be created. > #2G > log.segment.bytes=2147483647 > {code} > * Setup: > producer sends messages constantly. > consumer polls continuously. > topic has 1 partition and replication factor 1. > min.insync.replicas=1 > producer has "acks=all" > consumer has the default "enable.auto.commit=false" > consumer manually commits offsets via commitSync after handling messages. > Kafka runs standalone. > * Observed log on the consumer side (for me, after running for 12 hours): > {code:java} > 2017-12-18 07:11:01.013 WARN sep105v1 > [app-consumer-subscription-pool-4-thread-20] > org.apache.kafka.clients.consumer.internals.Fetcher {} Unknown error fetching > data for topic-partition DDI.DISPATCHER.P_TVIN.W_SL.P_appx.P_ul.P_pos-0 > {code} > * Observed server.log in Kafka/logs: > {code:java} > [2017-12-14 04:52:21,144] ERROR [Replica Manager on Broker 3]: Error > processing fetch operation on partition > DDI.DISPATCHER.P_TVIN.W_SL.P_appx.P_ul.P_pos-0, offset 4043314339 > (kafka.server.ReplicaManager) > org.apache.kafka.common.KafkaException: java.io.EOFException: Failed to read > `log header` from file channel `sun.nio.ch.FileChannelImpl@5604ea91`. 
> Expected to read 17 bytes, but reached end of file after reading 0 bytes. > Started read from position 2147483643. > at > org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:40) > at > org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:24) > at > org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79) > at > org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45) > at > org.apache.kafka.common.record.FileRecords.searchForOffsetWithSize(FileRecords.java:279) > at kafka.log.LogSegment.translateOffset(LogSegment.scala:176) > at kafka.log.LogSegment.read(LogSegment.scala:228) > at kafka.log.Log.read(Log.scala:938) > at kafka.server.ReplicaManager.read$1(ReplicaManager.scala:719) > at > kafka.server.ReplicaManager.$anonfun$readFromLocalLog$6(ReplicaManager.scala:780) > at > scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:59) > at > scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:52) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:779) > at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:617) > at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:615) > at kafka.server.KafkaApis.handle(KafkaApis.scala:98) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.io.EOFException: Failed to read `log header` from file > channel `sun.nio.ch.FileChannelImpl@5604ea91`. Expected to read 17 bytes, but > reached end of file after reading 0 bytes. Started read from position > 2147483643. 
> at org.apache.kafka.common.utils.Utils.readFullyOrFail(Utils.java:751) > at > org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:66) > at > org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:40) > at > org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:35) > ... 18 more > {code} > * Impact: > # After the EOF exception occurs, the consumer fails to consume the remaining > messages. > # After the segment log files that caused the EOF exception were deleted > by the log cleaner thread, the consumer recovered and resumed consuming messages. > # No impact from the producer's point of view. > * Analysis:
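The overflow in the report above can be demonstrated in isolation. The 17-byte header size and the read position come straight from the error log; the bound check below is an illustrative stand-in for the affected int arithmetic, not Kafka's actual FileLogInputStream code:

```java
public class OverflowDemo {
    // Values taken from the error log above; the check itself is illustrative.
    static final int HEADER_SIZE = 17;      // "Expected to read 17 bytes"
    static final int POSITION = 2147483643; // "Started read from position 2147483643"

    public static void main(String[] args) {
        // With log.segment.bytes = 2147483647 (Integer.MAX_VALUE), the end of the
        // header lies past Integer.MAX_VALUE, so 32-bit arithmetic wraps around:
        int endOfHeader = POSITION + HEADER_SIZE;
        System.out.println(endOfHeader);            // -2147483636 (wrapped negative)
        System.out.println(endOfHeader > POSITION); // false: a naive bound check misfires

        // Widening to long before adding avoids the wrap:
        long safeEnd = (long) POSITION + HEADER_SIZE;
        System.out.println(safeEnd);                // 2147483660
    }
}
```

With a segment size just under the int limit, any position near the end of the segment triggers this wrap, which matches the EOF-while-reading-the-header symptom in the stack trace.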
[jira] [Updated] (KAFKA-6463) Review logging level for user errors in AdminManager
[ https://issues.apache.org/jira/browse/KAFKA-6463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-6463: -- Fix Version/s: (was: 1.1.0) 1.2.0 > Review logging level for user errors in AdminManager > > > Key: KAFKA-6463 > URL: https://issues.apache.org/jira/browse/KAFKA-6463 > Project: Kafka > Issue Type: Improvement >Reporter: Rajini Sivaram >Priority: Major > Fix For: 1.2.0 > > > AdminManager currently logs errors due to bad requests at INFO level (e.g. > alter configs with a bad value). In other components, I think user > errors are either not logged or are logged at a lower level. We should > review the logging in AdminManager. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6342) Move workaround for JSON parsing of non-escaped strings
[ https://issues.apache.org/jira/browse/KAFKA-6342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-6342: -- Fix Version/s: (was: 1.1.0) 1.2.0 > Move workaround for JSON parsing of non-escaped strings > --- > > Key: KAFKA-6342 > URL: https://issues.apache.org/jira/browse/KAFKA-6342 > Project: Kafka > Issue Type: Task > Components: core >Reporter: Rajini Sivaram >Assignee: Umesh Chaudhary >Priority: Major > Fix For: 1.2.0 > > > KAFKA-6319 added a workaround to parse invalid JSON persisted using older > versions of Kafka because special characters were not escaped. The workaround > is required in 1.0.1 to enable parsing invalid JSON from ACL configs in > ZooKeeper. We can move the workaround out of kafka.utils.Json#parseFull for > 1.1.0 so that it is applied only to ACLs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6028) Improve the quota throttle communication.
[ https://issues.apache.org/jira/browse/KAFKA-6028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-6028: -- Fix Version/s: (was: 1.1.0) 1.2.0 > Improve the quota throttle communication. > - > > Key: KAFKA-6028 > URL: https://issues.apache.org/jira/browse/KAFKA-6028 > Project: Kafka > Issue Type: Improvement > Components: clients, core >Affects Versions: 1.0.0 >Reporter: Jiangjie Qin >Assignee: Jiangjie Qin >Priority: Major > Fix For: 1.2.0 > > > Currently, if a client is throttled due to a quota violation, the broker will > only send back a response to the client after the throttle time has passed. > In this case, the clients don't know how long the response will be throttled > and might hit the request timeout before the response is returned. As a result, > the clients will retry the request, resulting in an even longer throttle > time. > The above scenario can happen when a large group of clients is sending records to > the brokers. We saw this when a MapReduce job pushed data to the Kafka > cluster. > To improve this, the broker can return the response with the throttle time > immediately after processing the request. After that, the broker will mute > the channel for this client. A correct client implementation should back off > for that long before sending the next request. If the client ignores the > throttle time and sends the next request immediately, the channel will be > muted and the request won't be processed until the throttle time has passed. > A KIP will follow with more details. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
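The mute/back-off contract described above can be sketched as follows. This is a minimal illustration assuming a made-up ThrottleAwareClient class; it is not Kafka's client code:

```java
// Sketch of the proposed client-side behavior: when a response carries a
// throttle time, a well-behaved client backs off before sending the next
// request, mirroring the broker's muting of the channel.
public class ThrottleAwareClient {
    private long mutedUntilMs = 0;

    /** The broker now returns the response immediately, carrying a throttle time. */
    void onResponse(long nowMs, long throttleTimeMs) {
        mutedUntilMs = nowMs + throttleTimeMs;
    }

    /** The client refuses to send until the throttle deadline has passed. */
    boolean maySend(long nowMs) {
        return nowMs >= mutedUntilMs;
    }

    public static void main(String[] args) {
        ThrottleAwareClient c = new ThrottleAwareClient();
        c.onResponse(1000, 500);             // throttled for 500 ms at t=1000
        System.out.println(c.maySend(1200)); // false: still muted
        System.out.println(c.maySend(1500)); // true: throttle elapsed
    }
}
```

A client that skips the back-off gains nothing, since the broker keeps the channel muted for the same duration anyway.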
[jira] [Updated] (KAFKA-6415) KafkaLog4jAppender deadlocks when logging from producer network thread
[ https://issues.apache.org/jira/browse/KAFKA-6415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-6415: -- Fix Version/s: (was: 1.1.0) 1.2.0 > KafkaLog4jAppender deadlocks when logging from producer network thread > -- > > Key: KAFKA-6415 > URL: https://issues.apache.org/jira/browse/KAFKA-6415 > Project: Kafka > Issue Type: Bug > Components: log >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.2.0 > > > When a log entry is appended to a Kafka topic using KafkaLog4jAppender, the > producer.send operation may block waiting for metadata. This can result in > deadlocks in a couple of scenarios if a log entry from the producer network > thread is also at a log level that results in the entry being appended to a > Kafka topic. > 1. The producer's network thread will attempt to send data to a Kafka topic, and > this is unsafe since producer.send may block waiting for metadata, causing a > deadlock since the thread will not process the metadata request/response. > 2. KafkaLog4jAppender#append is invoked while holding the lock of the logger. > So the thread waiting for metadata in the initial send will be holding the > logger lock. If the producer network thread has a log entry that needs to be > appended, it will attempt to acquire the logger lock and deadlock. > This was probably the case right from the beginning when KafkaLog4jAppender > was introduced, but did not cause any issues so far since there were only > debug log entries in that path which were not logged to a Kafka topic by any > of the tests. A recent info-level log entry introduced by the commit > https://github.com/apache/kafka/commit/a3aea3cf4dbedb293f2d7859e0298bebc8e2185f > is causing system test failures in log4j_appender_test.py due to the > deadlock. > The asynchronous append case can be fixed by moving all send operations to a > separate thread. 
But KafkaLog4jAppender also has a syncSend option which > blocks append while holding the logger lock until the send completes. Not > sure how this can be fixed if we want to support log appends from the > producer network thread. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
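The asynchronous-append fix suggested above can be sketched like this, with a stand-in send() in place of producer.send(); class and method names are illustrative, not KafkaLog4jAppender's actual API:

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// append() only enqueues, and a single dedicated thread performs the
// potentially blocking send, so neither the logger lock nor the producer
// network thread can ever be held up by a metadata wait.
public class AsyncAppenderSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    final List<String> sent = new CopyOnWriteArrayList<>(); // visible for inspection

    public AsyncAppenderSketch() {
        Thread sender = new Thread(() -> {
            try {
                while (true) {
                    send(queue.take()); // may block; only this thread ever does
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "log-appender-sender");
        sender.setDaemon(true);
        sender.start();
    }

    /** Called under the logger lock; never blocks on the producer. */
    public void append(String entry) {
        queue.offer(entry);
    }

    private void send(String entry) {
        // Stand-in for producer.send(...), which may block waiting for metadata.
        sent.add(entry);
    }
}
```

This only covers the asynchronous case; as the comment notes, the syncSend path cannot be decoupled this way because it must hold the caller until the send completes.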
[jira] [Updated] (KAFKA-6438) NSEE while concurrently creating and deleting a topic
[ https://issues.apache.org/jira/browse/KAFKA-6438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-6438: -- Fix Version/s: (was: 1.1.0) 1.2.0 > NSEE while concurrently creating and deleting a topic > - > > Key: KAFKA-6438 > URL: https://issues.apache.org/jira/browse/KAFKA-6438 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 1.0.0 > Environment: kafka_2.11-1.0.0.jar > OpenJDK Runtime Environment (build 1.8.0_102-b14), OpenJDK 64-Bit Server VM > (build 25.102-b14, mixed mode) > CentOS Linux release 7.3.1611 (Core) >Reporter: Adam Kotwasinski >Priority: Major > Labels: reliability > Fix For: 1.2.0 > > > It appears that deleting a topic and creating it at the same time can cause an > NSEE, which later results in a forced controller shutdown. > Most probably topics are being created because consumers/producers are still > active (yes, this means the deletion is happening blindly). > The main problem here (for me) is the controller switch; the data loss and > following unclean election are acceptable (as we admit to deleting blindly). 
> Environment description: > 20 kafka brokers > 80k partitions (20k topics 4partitions each) > 3 node ZK > Incident: > {code:java} > [2018-01-09 11:19:05,912] INFO [Topic Deletion Manager 6], Partition deletion > callback for mytopic-2,mytopic-0,mytopic-1,mytopic-3 > (kafka.controller.TopicDeletionManager) > [2018-01-09 11:19:06,237] INFO [Controller id=6] New leader and ISR for > partition mytopic-0 is {"leader":-1,"leader_epoch":1,"isr":[]} > (kafka.controller.KafkaController) > [2018-01-09 11:19:06,412] INFO [Topic Deletion Manager 6], Deletion for > replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of > topic mytopic in progress (kafka.controller.TopicDeletionManager) > [2018-01-09 11:19:07,218] INFO [Topic Deletion Manager 6], Deletion for > replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of > topic mytopic in progress (kafka.controller.TopicDeletionManager) > [2018-01-09 11:19:07,304] INFO [Topic Deletion Manager 6], Deletion for > replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of > topic mytopic in progress (kafka.controller.TopicDeletionManager) > [2018-01-09 11:19:07,383] INFO [Topic Deletion Manager 6], Deletion for > replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of > topic mytopic in progress (kafka.controller.TopicDeletionManager) > [2018-01-09 11:19:07,510] INFO [Topic Deletion Manager 6], Deletion for > replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of > topic mytopic in progress (kafka.controller.TopicDeletionManager) > [2018-01-09 11:19:07,661] INFO [Topic Deletion Manager 6], Deletion for > replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of > topic mytopic in progress (kafka.controller.TopicDeletionManager) > [2018-01-09 11:19:07,728] INFO [Topic Deletion Manager 6], Deletion for > replicas 9,10,11 for partition mytopic-0,mytopic-1,mytopic-2 of topic mytopic > in progress 
(kafka.controller.TopicDeletionManager) > [2018-01-09 11:19:07,924] INFO [PartitionStateMachine controllerId=6] > Invoking state change to OfflinePartition for partitions > mytopic-2,mytopic-0,mytopic-1,mytopic-3 > (kafka.controller.PartitionStateMachine) > [2018-01-09 11:19:07,924] INFO [PartitionStateMachine controllerId=6] > Invoking state change to NonExistentPartition for partitions > mytopic-2,mytopic-0,mytopic-1,mytopic-3 > (kafka.controller.PartitionStateMachine) > [2018-01-09 11:19:08,592] INFO [Controller id=6] New topics: [Set(mytopic, > other, other2)], deleted topics: [Set()], new partition replica assignment > [Map(other-0 -> Vector(8), mytopic-2 -> Vector(6), mytopic-0 -> Vector(4), > other-2 -> Vector(10), mytopic-1 -> Vector(5), mytopic-3 -> Vector(7), > other-1 -> Vector(9), other-3 -> Vector(11))] > (kafka.controller.KafkaController) > [2018-01-09 11:19:08,593] INFO [Controller id=6] New topic creation callback > for other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 > (kafka.controller.KafkaController) > [2018-01-09 11:19:08,596] INFO [Controller id=6] New partition creation > callback for > other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 > (kafka.controller.KafkaController) > [2018-01-09 11:19:08,596] INFO [PartitionStateMachine controllerId=6] > Invoking state change to NewPartition for partitions > other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 > (kafka.controller.PartitionStateMachine) > [2018-01-09 11:19:08,642] INFO [PartitionStateMachine controllerId=6] > Invoking state change to OnlinePartition for
[jira] [Updated] (KAFKA-6512) Java Producer: Excessive memory usage with compression enabled
[ https://issues.apache.org/jira/browse/KAFKA-6512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-6512: -- Fix Version/s: (was: 1.1.0) 1.2.0 > Java Producer: Excessive memory usage with compression enabled > -- > > Key: KAFKA-6512 > URL: https://issues.apache.org/jira/browse/KAFKA-6512 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.0.0 > Environment: Windows 10 >Reporter: Kyle Tinker >Priority: Major > Fix For: 1.2.0 > > Attachments: KafkaSender.java > > > h2. User Story > As a user of the Java producer, I want a predictable memory usage for the > Kafka client so that I can ensure that my system is sized appropriately and > will be stable even under heavy usage. > As a user of the Java producer, I want a smaller memory footprint so that my > systems don't consume as many resources. > h2. Acceptance Criteria > * Enabling Compression in Kafka should not significantly increase the memory > usage of Kafka > * The memory usage of Kafka's Java Producer should be roughly in line with > the buffer size (buffer.memory) and the number of producers declared. > h2. Additional Information > I've observed high memory usage in the producer when enabling compression > (gzip or lz4). I don't observe the behavior with compression off, but with > it on I'll run out of heap (2GB). Using a Java profiler, I see the data is > in the KafkaLZ4BlockOutputStream (or related class for gzip). I see that > MemoryRecordsBuilder:closeForRecordAppends() is trying to deal with this, but > is not successful. I'm most likely network bottlenecked, so I expect the > producer buffers to be full while the job is running and potentially a lot of > unacknowledged records. > I've tried using the default buffer.memory with 20 producers (across 20 > threads) and sending data as quickly as I can. I've also tried 1MB of > buffer.memory, which seemed to reduce memory consumption but I could still > run OOM in certain cases. 
I have max.in.flight.requests.per.connection set > to 1. In short, I should only have ~20 MB (20* 1MB) of data in buffers, but > I can easily exhaust 2000 MB used by Kafka. > In looking at the code more, it looks like the KafkaLZ4BlockOutputStream > doesn't clear the compressedBuffer or buffer when close() is called. In my > heap dump, both of those are ~65k size each, meaning that each batch is > taking up ~148k of space, of which 131k is buffers. (buffer.memory=1,000,000 > and messages are 1k each until the batch fills). > Kafka tries to manage memory usage by calling > MemoryRecordsBuilder:closeForRecordAppends(), which as documented as "Release > resources required for record appends (e.g. compression buffers)". However, > this method doesn't actually clear those buffers because > KafkaLZ4BlockOutputStream.close() only writes the block and end mark and > closes the output stream. It doesn't actually clear the buffer and > compressedBuffer in KafkaLZ4BlockOutputStream. Those stay allocated in RAM > until the block is acknowledged by the broker, processed in > Sender:handleProduceResponse(), and the batch is deallocated. This memory > usage therefore increases, possibly without bound. In my test program, the > program died with approximately 345 unprocessed batches per producer (20 > producers), despite having max.in.flight.requests.per.connection=1. > h2. Steps to Reproduce > # Create a topic test with plenty of storage > # Use a connection with a very fast upload pipe and limited download. This > allows the outbound data to go out, but acknowledgements to be delayed > flowing in. > # Download KafkaSender.java (attached to this ticket) > # Set line 17 to reference your Kafka broker > # Run the program with a 1GB Xmx value > h2. 
Possible solutions > There are a few possible optimizations I can think of: > # We could declare KafkaLZ4BlockOutputStream.buffer and compressedBuffer as > non-final and null them in the close() method > # We could declare the MemoryRecordsBuilder.appendStream non-final and null > it in the closeForRecordAppends() method > # We could have the ProducerBatch discard the recordsBuilder in > closeForRecordAppends(); however, this is likely a bad idea because the > recordsBuilder contains significant metadata that is likely needed after the > stream is closed. It is also final. > # We could try to limit the number of non-acknowledged batches in flight. > This would bound the maximum memory usage but may negatively impact > performance. > > Fix #1 would only improve LZ4, and not any other algorithm. > Fix #2 would improve all algorithms, compressed or otherwise. Of the options > proposed here, it seems the best.
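Fixes #1 and #2 above share the same shape: make the buffer fields non-final and drop the references on close. A minimal sketch, assuming illustrative class and field names that mirror the description rather than Kafka's actual code:

```java
import java.io.IOException;
import java.io.OutputStream;

// Once close() nulls the fields, the ~64 KB compression buffers become
// garbage-collectable even while the batch object itself stays reachable
// waiting for broker acknowledgement.
public class BufferReleasingOutputStream extends OutputStream {
    private byte[] buffer = new byte[64 * 1024];           // uncompressed staging
    private byte[] compressedBuffer = new byte[64 * 1024]; // compressed output

    @Override
    public void write(int b) throws IOException {
        if (buffer == null) throw new IOException("stream closed");
        // ... compression logic elided ...
    }

    @Override
    public void close() {
        // Write the end mark and flush (elided), then release the buffers
        // instead of keeping them reachable from the retained batch.
        buffer = null;
        compressedBuffer = null;
    }
}
```

The point of the sketch is the lifetime change only: per-batch memory after close drops from the buffer sizes to the size of the retained metadata.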
[jira] [Updated] (KAFKA-6335) SimpleAclAuthorizerTest#testHighConcurrencyModificationOfResourceAcls fails intermittently
[ https://issues.apache.org/jira/browse/KAFKA-6335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-6335: -- Fix Version/s: (was: 1.1.0) 1.2.0 > SimpleAclAuthorizerTest#testHighConcurrencyModificationOfResourceAcls fails > intermittently > -- > > Key: KAFKA-6335 > URL: https://issues.apache.org/jira/browse/KAFKA-6335 > Project: Kafka > Issue Type: Test >Reporter: Ted Yu >Assignee: Manikumar >Priority: Major > Fix For: 1.2.0 > > > From > https://builds.apache.org/job/kafka-pr-jdk9-scala2.12/3045/testReport/junit/kafka.security.auth/SimpleAclAuthorizerTest/testHighConcurrencyModificationOfResourceAcls/ > : > {code} > java.lang.AssertionError: expected acls Set(User:36 has Allow permission for > operations: Read from hosts: *, User:7 has Allow permission for operations: > Read from hosts: *, User:21 has Allow permission for operations: Read from > hosts: *, User:39 has Allow permission for operations: Read from hosts: *, > User:43 has Allow permission for operations: Read from hosts: *, User:3 has > Allow permission for operations: Read from hosts: *, User:35 has Allow > permission for operations: Read from hosts: *, User:15 has Allow permission > for operations: Read from hosts: *, User:16 has Allow permission for > operations: Read from hosts: *, User:22 has Allow permission for operations: > Read from hosts: *, User:26 has Allow permission for operations: Read from > hosts: *, User:11 has Allow permission for operations: Read from hosts: *, > User:38 has Allow permission for operations: Read from hosts: *, User:8 has > Allow permission for operations: Read from hosts: *, User:28 has Allow > permission for operations: Read from hosts: *, User:32 has Allow permission > for operations: Read from hosts: *, User:25 has Allow permission for > operations: Read from hosts: *, User:41 has Allow permission for operations: > Read from hosts: *, User:44 has Allow permission for operations: Read from > hosts: *, User:48 has Allow 
permission for operations: Read from hosts: *, > User:2 has Allow permission for operations: Read from hosts: *, User:9 has > Allow permission for operations: Read from hosts: *, User:14 has Allow > permission for operations: Read from hosts: *, User:46 has Allow permission > for operations: Read from hosts: *, User:13 has Allow permission for > operations: Read from hosts: *, User:5 has Allow permission for operations: > Read from hosts: *, User:29 has Allow permission for operations: Read from > hosts: *, User:45 has Allow permission for operations: Read from hosts: *, > User:6 has Allow permission for operations: Read from hosts: *, User:37 has > Allow permission for operations: Read from hosts: *, User:23 has Allow > permission for operations: Read from hosts: *, User:19 has Allow permission > for operations: Read from hosts: *, User:24 has Allow permission for > operations: Read from hosts: *, User:17 has Allow permission for operations: > Read from hosts: *, User:34 has Allow permission for operations: Read from > hosts: *, User:12 has Allow permission for operations: Read from hosts: *, > User:42 has Allow permission for operations: Read from hosts: *, User:4 has > Allow permission for operations: Read from hosts: *, User:47 has Allow > permission for operations: Read from hosts: *, User:18 has Allow permission > for operations: Read from hosts: *, User:31 has Allow permission for > operations: Read from hosts: *, User:49 has Allow permission for operations: > Read from hosts: *, User:33 has Allow permission for operations: Read from > hosts: *, User:1 has Allow permission for operations: Read from hosts: *, > User:27 has Allow permission for operations: Read from hosts: *) but got > Set(User:36 has Allow permission for operations: Read from hosts: *, User:7 > has Allow permission for operations: Read from hosts: *, User:21 has Allow > permission for operations: Read from hosts: *, User:39 has Allow permission > for operations: Read from hosts: *, User:43 
has Allow permission for > operations: Read from hosts: *, User:3 has Allow permission for operations: > Read from hosts: *, User:35 has Allow permission for operations: Read from > hosts: *, User:15 has Allow permission for operations: Read from hosts: *, > User:16 has Allow permission for operations: Read from hosts: *, User:22 has > Allow permission for operations: Read from hosts: *, User:26 has Allow > permission for operations: Read from hosts: *, User:11 has Allow permission > for operations: Read from hosts: *, User:38 has Allow permission for > operations: Read from hosts: *, User:8 has Allow permission for operations: > Read from hosts: *, User:28 has Allow permission for
[jira] [Updated] (KAFKA-5964) Add more unit tests for SslTransportLayer
[ https://issues.apache.org/jira/browse/KAFKA-5964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5964: -- Fix Version/s: (was: 1.1.0) 1.2.0 > Add more unit tests for SslTransportLayer > - > > Key: KAFKA-5964 > URL: https://issues.apache.org/jira/browse/KAFKA-5964 > Project: Kafka > Issue Type: Test >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.2.0 > > > Add unit tests for the edge cases updated in KAFKA-5920: > 1. Test that handshake failures are propagated as SslAuthenticationException > even if there are I/O exceptions in any of the read/write operations > 2. Test that received data is processed even after an I/O exception -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6494) Extend ConfigCommand to update broker config using new AdminClient
[ https://issues.apache.org/jira/browse/KAFKA-6494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-6494: -- Fix Version/s: (was: 1.1.0) 1.2.0 > Extend ConfigCommand to update broker config using new AdminClient > -- > > Key: KAFKA-6494 > URL: https://issues.apache.org/jira/browse/KAFKA-6494 > Project: Kafka > Issue Type: Sub-task >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.2.0 > > > Add --bootstrap-server and --command-config options for new AdminClient. > Update ConfigCommand to use new AdminClient for dynamic broker config updates > in KIP-226. Full conversion of ConfigCommand to new AdminClient will be done > later under KIP-248. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5693) TopicCreationPolicy and AlterConfigsPolicy overlap
[ https://issues.apache.org/jira/browse/KAFKA-5693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5693: -- Fix Version/s: (was: 1.1.0) 1.2.0 > TopicCreationPolicy and AlterConfigsPolicy overlap > -- > > Key: KAFKA-5693 > URL: https://issues.apache.org/jira/browse/KAFKA-5693 > Project: Kafka > Issue Type: Bug >Reporter: Tom Bentley >Priority: Minor > Labels: kip > Fix For: 1.2.0 > > > The administrator of a cluster can configure a {{CreateTopicPolicy}}, which > has access to the topic configs as well as other metadata to make its > decision about whether a topic creation is allowed. Thus in theory the > decision could be based on a combination of the replication factor and > the topic configs, for example. > Separately there is an AlterConfigPolicy, which only has access to the > configs (and can apply to configurable entities other than just topics). > There are potential issues with this. For example, although the > CreateTopicPolicy is checked at creation time, it is not checked for any later > alterations to the topic config. So policies which depend on both the topic > configs and other topic metadata could be worked around by changing the > configs after creation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5782) Avoid unnecessary PID reset when expiring batches.
[ https://issues.apache.org/jira/browse/KAFKA-5782?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5782: -- Fix Version/s: (was: 1.1.0) 1.2.0 > Avoid unnecessary PID reset when expiring batches. > > > Key: KAFKA-5782 > URL: https://issues.apache.org/jira/browse/KAFKA-5782 > Project: Kafka > Issue Type: Improvement > Components: producer >Affects Versions: 0.11.0.0 >Reporter: Jiangjie Qin >Priority: Major > Fix For: 1.2.0 > > > This is more of an efficiency optimization. Currently we reset the PID when > batch expiration happens and one of the expired batches is in retry mode, > on the assumption that we don't know whether the batch in retry has been appended > on the broker or not. However, if the batch was in retry due to a retriable > exception returned by the broker, the batch was not appended. In this case, we > do not need to reset the PID. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5661) Develop an understanding of how to tune transactions for optimal performance
[ https://issues.apache.org/jira/browse/KAFKA-5661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5661: -- Fix Version/s: (was: 1.1.0) 1.2.0 > Develop an understanding of how to tune transactions for optimal performance > - > > Key: KAFKA-5661 > URL: https://issues.apache.org/jira/browse/KAFKA-5661 > Project: Kafka > Issue Type: Sub-task >Affects Versions: 0.11.0.0 >Reporter: Apurva Mehta >Assignee: Apurva Mehta >Priority: Major > Fix For: 1.2.0 > > > Currently, we don't have an idea of the throughput curve for transactions > across a range of different workloads. > Thus we would like to understand how to tune transactions so that they are > viable across a broad range of workloads. For instance, what knobs can you > tweak if you use small messages and still get acceptable transactional > performance? We don't understand the performance curve across variables like > message size, batch size, transaction duration, linger.ms, etc., and it would > be good to get an understanding of this area and publish recommended > configurations for different workloads. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5736) Improve error message in Connect when all kafka brokers are down
[ https://issues.apache.org/jira/browse/KAFKA-5736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5736: -- Fix Version/s: (was: 1.1.0) 1.2.0 > Improve error message in Connect when all kafka brokers are down > > > Key: KAFKA-5736 > URL: https://issues.apache.org/jira/browse/KAFKA-5736 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.11.0.0 >Reporter: Konstantine Karantasis >Assignee: Konstantine Karantasis >Priority: Major > Fix For: 1.2.0 > > Original Estimate: 3h > Remaining Estimate: 3h > > Currently, when all the Kafka brokers are down, Kafka Connect fails with > a pretty unintuitive message when it tries to, for instance, reconfigure > tasks. > Example output: > {code:java} > [2017-08-15 19:12:09,959] ERROR Failed to reconfigure connector's tasks, > retrying after backoff: > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > java.lang.IllegalArgumentException: CircularIterator can only be used on > non-empty lists > at > org.apache.kafka.common.utils.CircularIterator.<init>(CircularIterator.java:29) > at > org.apache.kafka.clients.consumer.RoundRobinAssignor.assign(RoundRobinAssignor.java:61) > at > org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.assign(AbstractPartitionAssignor.java:68) > at > ... (connector code) > at > org.apache.kafka.connect.runtime.Worker.connectorTaskConfigs(Worker.java:230) > {code} > The error message needs to be improved, since its root cause is the absence > of Kafka brokers available for assignment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5527) Idempotent/transactional Producer part 2 (KIP-98)
[ https://issues.apache.org/jira/browse/KAFKA-5527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5527: -- Fix Version/s: (was: 1.1.0) 1.2.0 > Idempotent/transactional Producer part 2 (KIP-98) > - > > Key: KAFKA-5527 > URL: https://issues.apache.org/jira/browse/KAFKA-5527 > Project: Kafka > Issue Type: New Feature >Reporter: Ismael Juma >Priority: Major > Fix For: 1.2.0 > > > KAFKA-4815 tracks the items that were included in 0.11.0.0. This JIRA is for > tracking the ones that did not make it. Setting "Fix version" to 0.11.1.0, > but that is subject to change. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5883) Run tests on Java 9 with --illegal-access=deny
[ https://issues.apache.org/jira/browse/KAFKA-5883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5883: -- Fix Version/s: (was: 1.1.0) 1.2.0 > Run tests on Java 9 with --illegal-access=deny > - > > Key: KAFKA-5883 > URL: https://issues.apache.org/jira/browse/KAFKA-5883 > Project: Kafka > Issue Type: Task >Reporter: Ismael Juma >Priority: Major > Fix For: 1.2.0 > > > The default was changed from --illegal-access=deny to --illegal-access=warn > late in the Java 9 cycle. By using the former, we will ensure that our code > is not relying on functionality that will be removed in a future Java version. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5795) Make the idempotent producer the default producer setting
[ https://issues.apache.org/jira/browse/KAFKA-5795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5795: -- Fix Version/s: (was: 1.1.0) 1.2.0 > Make the idempotent producer the default producer setting > - > > Key: KAFKA-5795 > URL: https://issues.apache.org/jira/browse/KAFKA-5795 > Project: Kafka > Issue Type: New Feature >Reporter: Apurva Mehta >Assignee: Apurva Mehta >Priority: Major > Fix For: 1.2.0 > > > We would like to turn on idempotence by default. The KIP is here: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-185%3A+Make+exactly+once+in+order+delivery+per+partition+the+default+producer+setting -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5543) We don't remove the LastStableOffsetLag metric when a partition is moved away from a broker
[ https://issues.apache.org/jira/browse/KAFKA-5543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5543: -- Fix Version/s: (was: 1.1.0) 1.2.0 > We don't remove the LastStableOffsetLag metric when a partition is moved away > from a broker > --- > > Key: KAFKA-5543 > URL: https://issues.apache.org/jira/browse/KAFKA-5543 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.0 >Reporter: Apurva Mehta >Assignee: Apurva Mehta >Priority: Major > Fix For: 1.2.0 > > > Reported by [~junrao], we have a small leak where the `LastStableOffsetLag` > metric is not removed along with the other metrics in the > `Partition.removeMetrics` method. This could create a leak when partitions > are reassigned or a topic is deleted. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6276) AdminClient may leave some futures hanging after shutdown
[ https://issues.apache.org/jira/browse/KAFKA-6276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-6276: -- Fix Version/s: (was: 1.1.0) 1.2.0 > AdminClient may leave some futures hanging after shutdown > - > > Key: KAFKA-6276 > URL: https://issues.apache.org/jira/browse/KAFKA-6276 > Project: Kafka > Issue Type: Bug > Components: admin >Reporter: Jason Gustafson >Assignee: Colin P. McCabe >Priority: Major > Fix For: 1.2.0 > > > When the admin client closes, it should ensure that any pending futures get > cancelled. For the most part it does so, but from glancing at the code, it > seems possible that some calls which haven't been sent may be left hanging > after shutdown. In particular, this collection is not cleaned on shutdown: > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L961. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
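The shape of the fix implied by KAFKA-6276 can be sketched as follows. This is illustrative only, not KafkaAdminClient's actual internals: a client tracks its not-yet-sent calls and fails them all on close() so no caller's future hangs forever.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustrative sketch: drain pending calls on shutdown instead of
// leaving their futures incomplete.
public class DrainOnClose {
    private final List<CompletableFuture<String>> pendingCalls = new ArrayList<>();

    // Queue a call; in a real client this would eventually be sent to a broker.
    public CompletableFuture<String> submit() {
        CompletableFuture<String> future = new CompletableFuture<>();
        pendingCalls.add(future);
        return future;
    }

    // On shutdown, fail every call that was never sent.
    public void close() {
        for (CompletableFuture<String> future : pendingCalls) {
            future.completeExceptionally(new IllegalStateException("client closed"));
        }
        pendingCalls.clear();
    }
}
```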
[jira] [Updated] (KAFKA-5780) Long shutdown time when updated to 0.11.0
[ https://issues.apache.org/jira/browse/KAFKA-5780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5780: -- Fix Version/s: (was: 1.1.0) 1.2.0 > Long shutdown time when updated to 0.11.0 > - > > Key: KAFKA-5780 > URL: https://issues.apache.org/jira/browse/KAFKA-5780 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.11.0.0 > Environment: CentOS Linux release 7.3.1611, Kernel 3.10 >Reporter: Raoufeh Hashemian >Assignee: Apurva Mehta >Priority: Major > Fix For: 1.2.0 > > Attachments: broker_shutdown.png, shutdown.log, > shutdown_controller.log, shutdown_statechange.log > > > When we switched from Kafka 0.10.2 to Kafka 0.11.0, we faced a problem with > stopping the Kafka service on a broker node. > Our cluster consists of 6 broker nodes. We had an existing topic when we > switched to Kafka 0.11.0. Since then, gracefully stopping the service on a > Kafka broker node results in the following warning message being repeated > every 100 ms in the broker log, and the shutdown takes approximately 45 > minutes to complete. > {code:java} > @4000599714da1e582e4c [2017-08-18 16:24:48,509] WARN Connection to node > 1002 could not be established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > @4000599714da245483a4 [2017-08-18 16:24:48,609] WARN Connection to node > 1002 could not be established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > @4000599714da2a51177c [2017-08-18 16:24:48,709] WARN Connection to node > 1002 could not be established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > {code} > Below are the last log lines when the shutdown is complete: > {code:java} > @400059971afd31113dbc [2017-08-18 16:50:59,823] WARN Connection to node > 1002 could not be established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > @400059971afd361200bc [2017-08-18 16:50:59,907] INFO Shutdown complete. 
> (kafka.log.LogManager) > @400059971afd36afa04c [2017-08-18 16:50:59,917] INFO Terminate ZkClient > event thread. (org.I0Itec.zkclient.ZkEventThread) > @400059971afd36dd6edc [2017-08-18 16:50:59,920] INFO Session: > 0x35d68c9e76702a4 closed (org.apache.zookeeper.ZooKeeper) > @400059971afd36deca84 [2017-08-18 16:50:59,920] INFO EventThread shut > down for session: 0x35d68c9e76702a4 (org.apache.zookeeper.ClientCnxn) > @400059971afd36f6afb4 [2017-08-18 16:50:59,922] INFO [Kafka Server 1002], > shut down completed (kafka.server.KafkaServer) > {code} > I should note that I stopped the producers before shutting down the broker. > If I repeat the process after bringing up the service, the shutdown takes less > than a minute. However, if I start the producers even for a short time and > repeat the process, it will again take around 45 minutes to do a graceful > shutdown. > The attached files show the brokers' CPU usage during the shutdown period (the light > blue curve is the node on which the broker service is shutting down). > The size of the topic is 2.3 TB per broker. > I was wondering whether this is expected behaviour, a bug, or a > misconfiguration? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5682) Consumer should include partition in exceptions raised during record parsing/validation
[ https://issues.apache.org/jira/browse/KAFKA-5682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5682: -- Fix Version/s: (was: 1.1.0) 1.2.0 > Consumer should include partition in exceptions raised during record > parsing/validation > --- > > Key: KAFKA-5682 > URL: https://issues.apache.org/jira/browse/KAFKA-5682 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Jason Gustafson >Priority: Major > Labels: needs-kip > Fix For: 1.2.0 > > > When we encounter an exception when validating a fetched record or when > deserializing it, we raise it to the user and keep the consumer's current > position at the offset of the failed record. The expectation is that the user > will either propagate the exception and shutdown or seek past the failed > record. However, in the latter case, there is no way for the user to know > which topic partition had the failed record. We should consider exposing an > exception type to expose this information which users can catch. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
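One possible shape for the exception type KAFKA-5682 asks for (the name and accessors here are hypothetical, not the final consumer API): carry the topic, partition, and offset of the failed record so the caller can catch the exception and seek past it.

```java
// Hypothetical exception exposing the location of a record that failed
// parsing/validation; seeking to offset + 1 would skip the bad record.
public class FaultyRecordException extends RuntimeException {
    private final String topic;
    private final int partition;
    private final long offset;

    public FaultyRecordException(String topic, int partition, long offset, Throwable cause) {
        super("Failed to parse record at " + topic + "-" + partition + " offset " + offset, cause);
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }

    public String topic() { return topic; }
    public int partition() { return partition; }
    public long offset() { return offset; }
}
```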
[jira] [Updated] (KAFKA-5482) A CONCURRENT_TRANSACTIONS error for the first AddPartitionsToTxn request slows down transactions significantly
[ https://issues.apache.org/jira/browse/KAFKA-5482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5482: -- Fix Version/s: (was: 1.1.0) 1.2.0 > A CONCURRENT_TRANSACTIONS error for the first AddPartitionsToTxn request > slows down transactions significantly > -- > > Key: KAFKA-5482 > URL: https://issues.apache.org/jira/browse/KAFKA-5482 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.0 >Reporter: Apurva Mehta >Assignee: Apurva Mehta >Priority: Major > Fix For: 1.2.0 > > > Here is the issue. > # When we commit a transaction, the producer sends an `EndTxn` request to > the coordinator. The coordinator writes the `PrepareCommit` message to the > transaction log and then returns the response to the client. It writes the > transaction markers and the final 'CompleteCommit' message asynchronously. > # In the meantime, if the client starts another transaction, it will send an > `AddPartitions` request on the next `Sender.run` loop. If the markers haven't > been written yet, then the coordinator will return a retriable > `CONCURRENT_TRANSACTIONS` error to the client. > # The current behavior in the producer is to sleep for `retryBackoffMs` > before retrying the request. The current default for this is 100ms. So the > producer will sleep for 100ms before sending the `AddPartitions` again. This > puts a floor on the latency for back-to-back transactions. > This has been worked around in > https://issues.apache.org/jira/browse/KAFKA-5477 by reducing the retryBackoff > for the first AddPartitions request. But we need a stronger solution: like > having the commit block until the transaction is complete, or delaying the > AddPartitions until batches are actually ready to be sent for the transaction. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5945) Improve handling of authentication failures when credentials are removed
[ https://issues.apache.org/jira/browse/KAFKA-5945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5945: -- Fix Version/s: (was: 1.1.0) 1.2.0 > Improve handling of authentication failures when credentials are removed > > > Key: KAFKA-5945 > URL: https://issues.apache.org/jira/browse/KAFKA-5945 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 1.0.0 >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.2.0 > > > KAFKA-5854 improves the handling of authentication failures. The scope of > KAFKA-5854 was limited to a specific scenario - provide better feedback to > applications when security is misconfigured. The PR improves diagnostics for > this scenario by throwing an AuthenticationException and also avoids retries. > To enable this, the first request initiated by any public API was updated to > throw authentication exceptions. > This JIRA is for a more extensive handling of authentication exceptions which > also includes proper handling of credential updates at any time. If a > credential is removed, then we could see an authentication exception from any > request, and we want to propagate this properly. This needs quite extensive > testing and is less likely to be hit by users, so it will be done later under > this JIRA. > The gaps that need covering are: > 1. Ensure authentication failures are processed in the Network client for any > request > 2. Ensure metadata refresh failures are handled properly at any time > 3. Heartbeat threads and other background threads should handle > authentication failures. Threads should not terminate on failure, but should > avoid retries until the application performs a new operation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5601) Refactor ReassignPartitionsCommand to use AdminClient
[ https://issues.apache.org/jira/browse/KAFKA-5601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5601: -- Fix Version/s: (was: 1.1.0) 1.2.0 > Refactor ReassignPartitionsCommand to use AdminClient > - > > Key: KAFKA-5601 > URL: https://issues.apache.org/jira/browse/KAFKA-5601 > Project: Kafka > Issue Type: Improvement >Reporter: Tom Bentley >Assignee: Tom Bentley >Priority: Major > Labels: kip > Fix For: 1.2.0 > > > Currently the {{ReassignPartitionsCommand}} (used by > {{kafka-reassign-partitions.sh}}) talks directly to ZooKeeper. It would be > better to have it use the AdminClient API instead. > This would entail creating two new protocol APIs, one to initiate the request > and another to request the status of an in-progress reassignment. As such > this would require a KIP. > This touches on the work of KIP-166, but that proposes to use the > {{ReassignPartitionsCommand}} API, so should not be affected so long as that > API is maintained. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5503) Idempotent producer ignores shutdown while fetching ProducerId
[ https://issues.apache.org/jira/browse/KAFKA-5503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5503: -- Fix Version/s: (was: 1.0.2) (was: 1.1.0) 1.2.0 > Idempotent producer ignores shutdown while fetching ProducerId > -- > > Key: KAFKA-5503 > URL: https://issues.apache.org/jira/browse/KAFKA-5503 > Project: Kafka > Issue Type: Sub-task > Components: clients, core, producer >Affects Versions: 0.11.0.0 >Reporter: Jason Gustafson >Assignee: Evgeny Veretennikov >Priority: Major > Fix For: 1.2.0 > > > When using the idempotent producer, we initially block the sender thread > while we attempt to get the ProducerId. During this time, a concurrent call > to close() will be ignored. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5907) Support aggregatedJavadoc in Java 9
[ https://issues.apache.org/jira/browse/KAFKA-5907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5907: -- Fix Version/s: (was: 1.1.0) 1.2.0 > Support aggregatedJavadoc in Java 9 > --- > > Key: KAFKA-5907 > URL: https://issues.apache.org/jira/browse/KAFKA-5907 > Project: Kafka > Issue Type: Improvement >Reporter: Ismael Juma >Priority: Major > Fix For: 1.2.0 > > > The Java 9 Javadoc tool has some improvements including a search bar. > However, it currently fails with a number of errors like: > {code} > > Task :aggregatedJavadoc > /Users/ijuma/src/kafka/streams/src/main/java/org/apache/kafka/streams/Topology.java:29: > error: package org.apache.kafka.streams.processor.internals does not exist > import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; >^ > /Users/ijuma/src/kafka/streams/src/main/java/org/apache/kafka/streams/Topology.java:30: > error: package org.apache.kafka.streams.processor.internals does not exist > import org.apache.kafka.streams.processor.internals.ProcessorNode; >^ > /Users/ijuma/src/kafka/streams/src/main/java/org/apache/kafka/streams/Topology.java:31: > error: package org.apache.kafka.streams.processor.internals does not exist > import org.apache.kafka.streams.processor.internals.ProcessorTopology; >^ > /Users/ijuma/src/kafka/streams/src/main/java/org/apache/kafka/streams/Topology.java:32: > error: package org.apache.kafka.streams.processor.internals does not exist > import org.apache.kafka.streams.processor.internals.SinkNode; > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-3806) Adjust default values of log.retention.hours and offsets.retention.minutes
[ https://issues.apache.org/jira/browse/KAFKA-3806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-3806: -- Fix Version/s: (was: 1.1.0) 1.2.0 > Adjust default values of log.retention.hours and offsets.retention.minutes > -- > > Key: KAFKA-3806 > URL: https://issues.apache.org/jira/browse/KAFKA-3806 > Project: Kafka > Issue Type: Improvement > Components: config >Affects Versions: 0.9.0.1, 0.10.0.0 >Reporter: Michal Turek >Priority: Minor > Fix For: 1.2.0 > > > The combination of the default values of log.retention.hours (168 hours = 7 days) and > offsets.retention.minutes (1440 minutes = 1 day) may be dangerous in special > cases. Offset retention should always be greater than log retention. > We have observed the following scenario and issue: > - Producing of data to a topic was disabled two days ago by a producer update; the > topic wasn't deleted. > - Consumer consumed all data and properly committed offsets to Kafka. > - Consumer made no more offset commits for that topic because there was no > more incoming data and there was nothing to confirm. (We have auto-commit > disabled; I'm not sure how it behaves with auto-commit enabled.) > - After one day: Kafka cleared too-old offsets according to > offsets.retention.minutes. > - After two days: A long-running consumer was restarted after an update; it > didn't find any committed offsets for that topic since they were deleted by > offsets.retention.minutes, so it started consuming from the beginning. > - The messages were still in Kafka due to the larger log.retention.hours; about 5 > days of messages were read again. > Known workaround to solve this issue: > - Explicitly configure log.retention.hours and offsets.retention.minutes, > don't use the defaults. > Proposals: > - Increase the default value of offsets.retention.minutes to be at least twice > log.retention.hours. 
> - Check these values during Kafka startup and log a warning if > offsets.retention.minutes is smaller than log.retention.hours. > - Add a note to the migration guide about the differences between storing offsets > in ZooKeeper and in Kafka (http://kafka.apache.org/documentation.html#upgrade). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
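The startup-warning proposal above can be sketched like this (illustrative, not broker code). With the defaults from the ticket, 168 hours of log retention versus 1440 minutes (1 day) of offset retention, the check fires:

```java
import java.util.concurrent.TimeUnit;

// Sketch of the proposed startup check: warn when committed offsets can
// expire before the log data they refer to does.
public class RetentionSanityCheck {
    public static boolean offsetsOutliveLog(long logRetentionHours, long offsetsRetentionMinutes) {
        return TimeUnit.MINUTES.toHours(offsetsRetentionMinutes) >= logRetentionHours;
    }

    public static void main(String[] args) {
        // Defaults from the ticket: log.retention.hours=168, offsets.retention.minutes=1440.
        if (!offsetsOutliveLog(168, 1440)) {
            System.out.println("WARN: offsets.retention.minutes is smaller than "
                + "log.retention.hours; restarted consumers may re-read old data");
        }
    }
}
```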
[jira] [Updated] (KAFKA-6304) The controller should allow updating the partition reassignment for the partitions being reassigned
[ https://issues.apache.org/jira/browse/KAFKA-6304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-6304: -- Fix Version/s: (was: 1.1.0) 1.2.0 > The controller should allow updating the partition reassignment for the > partitions being reassigned > --- > > Key: KAFKA-6304 > URL: https://issues.apache.org/jira/browse/KAFKA-6304 > Project: Kafka > Issue Type: Improvement > Components: controller, core >Affects Versions: 1.0.0 >Reporter: Jiangjie Qin >Priority: Major > Fix For: 1.2.0 > > > Currently the controller will not process the partition reassignment of a > partition if the partition is already being reassigned. > The issue is that if there is a broker failure during the partition > reassignment, the partition reassignment may never finish. And the users may > want to cancel the partition reassignment. However, the controller will > refuse to do that unless the user manually deletes the partition reassignment zk > path, forces a controller switch, and then issues the revert command. This is > pretty involved. It seems reasonable for the controller to cancel the > ongoing stuck reassignment and replace it with the updated partition > assignment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5729) Consumer should verify offset commits are from assigned partitions
[ https://issues.apache.org/jira/browse/KAFKA-5729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5729: -- Fix Version/s: (was: 1.1.0) 1.2.0 > Consumer should verify offset commits are from assigned partitions > -- > > Key: KAFKA-5729 > URL: https://issues.apache.org/jira/browse/KAFKA-5729 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Jason Gustafson >Priority: Major > Fix For: 1.2.0 > > > We need to think through the compatibility implications since we currently allow > this, but at a minimum, we should verify that only offsets from dynamically > assigned partitions can be committed. The lack of this validation tends to > mask problems in the partition revocation and assignment process. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
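The validation KAFKA-5729 proposes amounts to a subset check before accepting a commit. A minimal sketch (using plain strings for partitions; not the real consumer code):

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative check: reject offset commits for partitions outside the
// current dynamic assignment.
public class CommitValidation {
    // Returns the committed partitions that are not currently assigned.
    public static Set<String> unassigned(Set<String> assignment, Map<String, Long> offsetsToCommit) {
        Set<String> bad = new HashSet<>(offsetsToCommit.keySet());
        bad.removeAll(assignment);
        return bad;
    }

    public static void validate(Set<String> assignment, Map<String, Long> offsetsToCommit) {
        Set<String> bad = unassigned(assignment, offsetsToCommit);
        if (!bad.isEmpty()) {
            throw new IllegalStateException("Cannot commit offsets for unassigned partitions: " + bad);
        }
    }
}
```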
[jira] [Updated] (KAFKA-4693) Consumer subscription change during rebalance causes exception
[ https://issues.apache.org/jira/browse/KAFKA-4693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-4693: -- Fix Version/s: (was: 1.1.0) 1.2.0 > Consumer subscription change during rebalance causes exception > -- > > Key: KAFKA-4693 > URL: https://issues.apache.org/jira/browse/KAFKA-4693 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Jason Gustafson >Priority: Minor > Fix For: 1.2.0 > > > After every rebalance, the consumer validates that the assignment received > contains only partitions from topics that were subscribed. If not, then we > raise an exception to the user. It is possible for a wakeup or an interrupt > to leave the consumer with a rebalance in progress (e.g. with a JoinGroup to > the coordinator in-flight). If the user then changes the topic subscription, > then this validation upon completion of the rebalance will fail. We should > probably detect the subscription change, eat the exception, and request > another rebalance. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6045) All access to log should fail if log is closed
[ https://issues.apache.org/jira/browse/KAFKA-6045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-6045: -- Fix Version/s: (was: 1.1.0) 1.2.0 > All access to log should fail if log is closed > -- > > Key: KAFKA-6045 > URL: https://issues.apache.org/jira/browse/KAFKA-6045 > Project: Kafka > Issue Type: Bug >Reporter: Dong Lin >Priority: Major > Fix For: 1.2.0 > > > After log.close() or log.closeHandlers() is called for a given log, all uses > of the Log's API should fail with a proper exception. For example, > log.appendAsLeader() should throw KafkaStorageException. APIs such as > Log.activeProducersWithLastSequence() should also fail, but not necessarily > with KafkaStorageException, since KafkaStorageException indicates a failure > to access disk. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
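The guard requested here is a fail-fast check at the top of every public log operation. A minimal sketch, not kafka.log.Log itself (IllegalStateException stands in for KafkaStorageException to keep the example self-contained):

```java
// Illustrative guard: once closed, every API call on the log fails fast.
public class GuardedLog {
    private volatile boolean closed = false;

    private void checkOpen() {
        if (closed) {
            // The real code would throw KafkaStorageException for disk-access paths.
            throw new IllegalStateException("Log is closed");
        }
    }

    public void appendAsLeader(String record) {
        checkOpen();
        // ... append to the active segment ...
    }

    public void close() {
        closed = true;
    }
}
```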
[jira] [Updated] (KAFKA-5950) AdminClient should retry based on returned error codes
[ https://issues.apache.org/jira/browse/KAFKA-5950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5950: -- Fix Version/s: (was: 1.1.0) 1.2.0 > AdminClient should retry based on returned error codes > -- > > Key: KAFKA-5950 > URL: https://issues.apache.org/jira/browse/KAFKA-5950 > Project: Kafka > Issue Type: Bug >Reporter: Ismael Juma >Assignee: Andrey Dyachkov >Priority: Major > Labels: needs-kip > Fix For: 1.2.0 > > > The AdminClient only retries if the request fails with a retriable error. If > a response is returned, then a retry is never attempted. This is inconsistent > with other clients that check the error codes in the response and retry for > each retriable error code. > We should consider if it makes sense to adopt this behaviour in the > AdminClient so that users don't have to do it themselves. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
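The behaviour being proposed can be sketched as a retry loop keyed on the error code carried in the response, rather than only on transport failures. The error names mirror Kafka's Errors enum, but the loop itself is illustrative, not KafkaAdminClient code:

```java
import java.util.Iterator;
import java.util.List;

// Illustrative sketch: retry when the response carries a retriable error
// code, stop on success or a fatal code.
public class ErrorCodeRetry {
    enum Error {
        NONE(false), NOT_CONTROLLER(true), REQUEST_TIMED_OUT(true), TOPIC_ALREADY_EXISTS(false);
        final boolean retriable;
        Error(boolean retriable) { this.retriable = retriable; }
    }

    // Replays a scripted sequence of responses until a non-retriable one
    // arrives or attempts run out.
    static Error callWithRetries(List<Error> responses, int maxAttempts) {
        Iterator<Error> it = responses.iterator();
        Error last = Error.NONE;
        for (int attempt = 0; attempt < maxAttempts && it.hasNext(); attempt++) {
            last = it.next();
            if (!last.retriable) {
                return last;  // success (NONE) or a fatal error: stop retrying
            }
        }
        return last;
    }
}
```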
[jira] [Updated] (KAFKA-3473) KIP-237: More Controller Health Metrics
[ https://issues.apache.org/jira/browse/KAFKA-3473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-3473: -- Fix Version/s: (was: 1.1.0) 1.2.0 > KIP-237: More Controller Health Metrics > --- > > Key: KAFKA-3473 > URL: https://issues.apache.org/jira/browse/KAFKA-3473 > Project: Kafka > Issue Type: Improvement > Components: controller >Affects Versions: 1.0.1 >Reporter: Jiangjie Qin >Assignee: Dong Lin >Priority: Major > Fix For: 1.2.0 > > > Currently the controller appends the requests to brokers into the controller channel > manager queue during state transitions, i.e. state transitions are propagated > asynchronously. We need to track the request queue time on the > controller side to see how long the state propagation is delayed after the > state transition finishes on the controller. > We also want to have metrics to monitor the ControllerEventManager queue size > and the average time it takes for an event to wait in this queue before being > processed. > See > https://cwiki.apache.org/confluence/display/KAFKA/KIP-237%3A+More+Controller+Health+Metrics > for more detail. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-4125) Provide low-level Processor API meta data in DSL layer
[ https://issues.apache.org/jira/browse/KAFKA-4125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-4125: -- Fix Version/s: (was: 1.1.0) 1.2.0 > Provide low-level Processor API meta data in DSL layer > -- > > Key: KAFKA-4125 > URL: https://issues.apache.org/jira/browse/KAFKA-4125 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Matthias J. Sax >Priority: Minor > Labels: kip > Fix For: 1.2.0 > > > For the Processor API, the user can get metadata like record offset, timestamp, etc. > via the provided {{Context}} object. It might be useful to allow users to > access this information in the DSL layer, too. > The idea would be to do it "the Flink way", i.e., by providing > RichFunctions; {{mapValues()}} for example. > It takes a {{ValueMapper}} that only has the method > {noformat} > V2 apply(V1 value); > {noformat} > Thus, you cannot get any metadata within apply (it's completely "blind"). > We would add two more interfaces: {{RichFunction}} with a method > {{open(Context context)}} and > {noformat} > RichValueMapper extends ValueMapper , RichFunction > {noformat} > This way, the user can choose to implement a Rich- or Standard-function and > we do not need to change existing APIs. Both can be handed into > {{KStream.mapValues()}} for example. Internally, we check if a Rich > function is provided and, if yes, hand in the {{Context}} object once, to > make it available to the user, who can now access it within {{apply()}} -- of > course, the user must set a member variable in {{open()}} to hold the > reference to the Context object. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
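The interface design described in KAFKA-4125 can be sketched as below. The names (RichFunction, RichValueMapper, Context) follow the ticket text but are illustrative, not the final Streams API:

```java
// Sketch of the Rich-function idea: a RichFunction receives the Context
// once via open(); a plain ValueMapper keeps working unchanged.
public class RichFunctions {
    interface Context { long offset(); }

    interface ValueMapper<V1, V2> { V2 apply(V1 value); }

    interface RichFunction { void open(Context context); }

    interface RichValueMapper<V1, V2> extends ValueMapper<V1, V2>, RichFunction {}

    // The DSL layer would check at runtime whether a Rich function was
    // handed in, and only then call open() with the Context.
    static <V1, V2> V2 invoke(ValueMapper<V1, V2> mapper, Context ctx, V1 value) {
        if (mapper instanceof RichFunction) {
            ((RichFunction) mapper).open(ctx);
        }
        return mapper.apply(value);
    }
}
```

Existing ValueMapper implementations skip the `instanceof` branch entirely, which is why no existing API needs to change.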
[jira] [Updated] (KAFKA-6489) Fetcher.retrieveOffsetsByTimes() should add all the topics to the metadata refresh topics set.
[ https://issues.apache.org/jira/browse/KAFKA-6489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-6489: -- Fix Version/s: (was: 1.1.0) 1.2.0 > Fetcher.retrieveOffsetsByTimes() should add all the topics to the metadata > refresh topics set. > -- > > Key: KAFKA-6489 > URL: https://issues.apache.org/jira/browse/KAFKA-6489 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 1.0.0 >Reporter: Jiangjie Qin >Assignee: Jiangjie Qin >Priority: Major > Fix For: 1.2.0 > > > Currently, if users call KafkaConsumer.offsetsForTimes() with a large set of > partitions, the consumer will add one topic at a time for the metadata > refresh. We should add all the topics to the metadata refresh set and just do one > metadata refresh. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6501) Add test to verify markPartitionsForTruncation after fetcher thread pool resize
[ https://issues.apache.org/jira/browse/KAFKA-6501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-6501: -- Fix Version/s: (was: 1.1.0) 1.2.0 > Add test to verify markPartitionsForTruncation after fetcher thread pool > resize > > > Key: KAFKA-6501 > URL: https://issues.apache.org/jira/browse/KAFKA-6501 > Project: Kafka > Issue Type: Sub-task >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.2.0 > > > Follow-on task from KAFKA-6242 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6253) Improve sink connector topic regex validation
[ https://issues.apache.org/jira/browse/KAFKA-6253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-6253: -- Fix Version/s: (was: 1.1.0) 1.2.0 > Improve sink connector topic regex validation > - > > Key: KAFKA-6253 > URL: https://issues.apache.org/jira/browse/KAFKA-6253 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Ewen Cheslack-Postava >Assignee: Jeff Klukas >Priority: Major > Fix For: 1.2.0 > > > KAFKA-3073 adds topic regex support for sink connectors. The addition > requires that you only specify one of topics or topics.regex settings. This > is being validated in one place, but not during submission of connectors. We > should improve this since this means it's possible to get a bad connector > config into the config topic. > For more detailed discussion, see > https://github.com/apache/kafka/pull/4151#pullrequestreview-77300221 -- This message was sent by Atlassian JIRA (v7.6.3#76005)