[jira] [Commented] (KAFKA-6572) kafka-consumer-groups does not reset offsets to specified datetime correctly
[ https://issues.apache.org/jira/browse/KAFKA-6572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16374023#comment-16374023 ] huxihx commented on KAFKA-6572: --- [~slucas] Could it be possible that some records in those segments have no timestamps, or that you ever ran kafka-delete-records.sh before? > kafka-consumer-groups does not reset offsets to specified datetime correctly > > > Key: KAFKA-6572 > URL: https://issues.apache.org/jira/browse/KAFKA-6572 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.0.0 >Reporter: Sharon Lucas >Priority: Major > > With Kafka 1.0.0 we are seeing a problem using the kafka-consumer-groups.sh > --reset-offsets option to reset offsets to a specific date/time in our > production environment. > We first tried to use the kafka-consumer-groups.sh command with the > --reset-offsets option and with option --to-datetime 2018-02-10T00:00:00.000 > in our staging environment and it worked correctly. Running the following > command changed it to start processing logs from February 12, 2018 (4 days > ago) for a topic that had a large lag. We did a dry run to verify before > running with the --execute option. > {code:java} > root@mlpstagemon0101a:/# /opt/kafka/bin/kafka-consumer-groups.sh > --bootstrap-server NN.NNN.NN.NN:9092 --group logstash-elasticsearch-latest > --to-datetime 2018-02-12T00:00:00.000-06:00 --reset-offsets --topic > staging-mon01-rg-elasticsearch --execute{code} > We stopped the kafka mirrors that process this topic before resetting the > offsets and started the kafka mirrors after resetting the offsets. We > verified that it correctly started processing logs from February 12, 2018. > Then we tried resetting offsets in a production environment for a topic that > had a very large lag using option --to-datetime 2018-02-10T00:00:00.000 and > it did not work as expected.
We stopped the kafka mirrors that process this > topic before resetting the offsets and did a dry run to see what the new > offsets would be: > {code:java} > root@mlplon0401e:# /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server > NN.N.NNN.NNN:9092 --group mirror-consumer-ams03-geo-earliest --to-datetime > 2018-02-10T00:00:00.000 --reset-offsets --topic prod_in-ams03-geo-ca_access > Note: This will not show information about old Zookeeper-based consumers. > TOPIC PARTITION NEW-OFFSET > prod_in-ams03-geo-ca_access 52 52084147 > prod_in-ams03-geo-ca_access 106 52154199 > prod_in-ams03-geo-ca_access 75 52148673 > prod_in-ams03-geo-ca_access 61 52130753 > prod_in-ams03-geo-ca_access 49 52151667 > prod_in-ams03-geo-ca_access 48 52145233 > prod_in-ams03-geo-ca_access 27 52092805 > prod_in-ams03-geo-ca_access 26 52139644 > prod_in-ams03-geo-ca_access 65 52157504 > prod_in-ams03-geo-ca_access 105 52166289 > prod_in-ams03-geo-ca_access 38 52160464 > prod_in-ams03-geo-ca_access 22 52093451 > prod_in-ams03-geo-ca_access 4 52151660 > prod_in-ams03-geo-ca_access 90 52160296 > prod_in-ams03-geo-ca_access 25 52161691 > prod_in-ams03-geo-ca_access 13 52145828 > prod_in-ams03-geo-ca_access 56 52162867 > prod_in-ams03-geo-ca_access 42 52072094 > prod_in-ams03-geo-ca_access 7 52069496 > prod_in-ams03-geo-ca_access 117 52087078 > prod_in-ams03-geo-ca_access 32 52073732 > prod_in-ams03-geo-ca_access 102 52082022 > prod_in-ams03-geo-ca_access 76 52141018 > prod_in-ams03-geo-ca_access 83 52154542 > prod_in-ams03-geo-ca_access 72 52095051 > prod_in-ams03-geo-ca_access 85 52149907 > prod_in-ams03-geo-ca_access 119 52134435 > prod_in-ams03-geo-ca_access 113 52159340 > prod_in-ams03-geo-ca_access 55 52146597 > prod_in-ams03-geo-ca_access 18 52149079 > prod_in-ams03-geo-ca_access 35 52149058 > prod_in-ams03-geo-ca_access 99 52143277 > prod_in-ams03-geo-ca_access 41 52158872 > prod_in-ams03-geo-ca_access 112 52083901 > prod_in-ams03-geo-ca_access 34 52137932 > 
prod_in-ams03-geo-ca_access 89 52158135 > prod_in-ams03-geo-ca_access 40 5212 > prod_in-ams03-geo-ca_access 53 52138400 > prod_in-ams03-geo-ca_access 19 52144966 > prod_in-ams03-geo-ca_access 44 52166404 > prod_in-ams03-geo-ca_access 31 52155685 > prod_in-ams03-geo-ca_access 10 52152151 > prod_in-ams03-geo-ca_access 98 52145378 > prod_in-ams03-geo-ca_access 69 52153436 > prod_in-ams03-geo-ca_access 92
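huxihx's question about missing timestamps is relevant because --to-datetime resolves each partition's new offset by searching for the earliest record whose timestamp is at or after the requested instant; records written without timestamps cannot participate in that search. Below is a minimal, self-contained sketch of that lookup idea (not Kafka's actual implementation; the class name and the -1 sentinel mirroring the broker's "no timestamp" value are illustrative assumptions):

```java
import java.util.Arrays;
import java.util.List;

public class OffsetForTimeSketch {
    static final long NO_TIMESTAMP = -1L; // records produced without a timestamp

    /** Returns the earliest offset whose timestamp is >= targetTs, or -1 if none qualifies. */
    static long offsetForTime(List<long[]> records, long targetTs) { // each entry: {offset, timestamp}
        for (long[] r : records) {
            if (r[1] == NO_TIMESTAMP) continue;   // untimestamped records are skipped entirely
            if (r[1] >= targetTs) return r[0];
        }
        return -1L; // lookup fails: the reset tool cannot map this datetime to an offset
    }

    public static void main(String[] args) {
        List<long[]> log = Arrays.asList(
            new long[]{100, 1518220800000L},      // 2018-02-10T00:00:00Z
            new long[]{101, NO_TIMESTAMP},        // e.g. written by an old producer client
            new long[]{102, 1518393600000L});     // 2018-02-12T00:00:00Z
        System.out.println(offsetForTime(log, 1518393600000L)); // prints 102
        System.out.println(offsetForTime(log, 1518480000000L)); // target past log end: prints -1
    }
}
```

The -1 case is why a partition full of untimestamped (or deleted) records can leave the reported "NEW-OFFSET" somewhere unexpected.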
[jira] [Resolved] (KAFKA-6578) Connect distributed and standalone worker 'main()' methods should catch and log all exceptions
[ https://issues.apache.org/jira/browse/KAFKA-6578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-6578. Resolution: Fixed Fix Version/s: 1.1.0 > Connect distributed and standalone worker 'main()' methods should catch and > log all exceptions > -- > > Key: KAFKA-6578 > URL: https://issues.apache.org/jira/browse/KAFKA-6578 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.10.0.0 >Reporter: Randall Hauch >Priority: Critical > Fix For: 1.1.0 > > > Currently, the {{main}} methods in {{ConnectDistributed}} and > {{ConnectStandalone}} do not catch and log most of the potential exceptions. > That means that when such an exception does occur, Java does terminate the > process and report it to stderr, but does not log the exception in the log. > We should add a try block around most of the existing code in the main method > to catch any Throwable exception, log it, and either rethrow it or explicitly > exit with a non-zero return code. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6578) Connect distributed and standalone worker 'main()' methods should catch and log all exceptions
[ https://issues.apache.org/jira/browse/KAFKA-6578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373981#comment-16373981 ] ASF GitHub Bot commented on KAFKA-6578: --- hachikuji closed pull request #4609: KAFKA-6578: Changed the Connect distributed and standalone main method to log all exceptions URL: https://github.com/apache/kafka/pull/4609 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java index 3b7ec87f644..54854fe4b80 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java @@ -58,52 +58,59 @@ public static void main(String[] args) throws Exception { Exit.exit(1); } -Time time = Time.SYSTEM; -log.info("Kafka Connect distributed worker initializing ..."); -long initStart = time.hiResClockMs(); -WorkerInfo initInfo = new WorkerInfo(); -initInfo.logAll(); +try { +Time time = Time.SYSTEM; +log.info("Kafka Connect distributed worker initializing ..."); +long initStart = time.hiResClockMs(); +WorkerInfo initInfo = new WorkerInfo(); +initInfo.logAll(); -String workerPropsFile = args[0]; -Map workerProps = !workerPropsFile.isEmpty() ? -Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap(); +String workerPropsFile = args[0]; +Map workerProps = !workerPropsFile.isEmpty() ? +Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap(); -log.info("Scanning for plugin classes. 
This might take a moment ..."); -Plugins plugins = new Plugins(workerProps); -plugins.compareAndSwapWithDelegatingLoader(); -DistributedConfig config = new DistributedConfig(workerProps); +log.info("Scanning for plugin classes. This might take a moment ..."); +Plugins plugins = new Plugins(workerProps); +plugins.compareAndSwapWithDelegatingLoader(); +DistributedConfig config = new DistributedConfig(workerProps); -String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config); -log.debug("Kafka cluster ID: {}", kafkaClusterId); +String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config); +log.debug("Kafka cluster ID: {}", kafkaClusterId); -RestServer rest = new RestServer(config); -URI advertisedUrl = rest.advertisedUrl(); -String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort(); +RestServer rest = new RestServer(config); +URI advertisedUrl = rest.advertisedUrl(); +String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort(); -KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(); -offsetBackingStore.configure(config); +KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(); +offsetBackingStore.configure(config); -Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore); +Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore); -Converter internalValueConverter = worker.getInternalValueConverter(); -StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter); -statusBackingStore.configure(config); +Converter internalValueConverter = worker.getInternalValueConverter(); +StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter); +statusBackingStore.configure(config); -ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(internalValueConverter, config); +ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(internalValueConverter, 
config); -DistributedHerder herder = new DistributedHerder(config, time, worker, -kafkaClusterId, statusBackingStore, configBackingStore, -advertisedUrl.toString()); -final Connect connect = new Connect(herder, rest); -log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart); -try { -connect.start(); -} catch (Exception e) { -log.err
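The diff above (heavily reformatted by the mailing list) boils down to one structural change: move the worker bootstrap inside a try block so any Throwable is logged before the process exits non-zero. A stripped-down, hypothetical sketch of that pattern, with a returned exit code standing in for Connect's Exit.exit so it can be exercised directly:

```java
public class SafeMainSketch {
    /** Runs the bootstrap body, logging any Throwable and mapping it to a non-zero exit code. */
    static int runSafely(Runnable bootstrap) {
        try {
            bootstrap.run();
            return 0;
        } catch (Throwable t) {
            // In ConnectDistributed/ConnectStandalone this would be log.error(...) via slf4j
            System.err.println("Stopping due to error: " + t);
            return 2; // non-zero so supervisors and operators notice the failure
        }
    }

    public static void main(String[] args) {
        System.out.println(runSafely(() -> {}));                                           // prints 0
        System.out.println(runSafely(() -> { throw new IllegalStateException("boom"); })); // prints 2
    }
}
```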
[jira] [Commented] (KAFKA-6433) Connect distributed workers should fail if their config is "incompatible" with leader's
[ https://issues.apache.org/jira/browse/KAFKA-6433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373961#comment-16373961 ] Ewen Cheslack-Postava commented on KAFKA-6433: -- This needs a lot of thought around upgrades, compatibility, and debuggability. There are all sorts of weird issues you can get into with something like this. I agree that the general goal of checking that important configs are aligned is absolutely the right thing to do. Today, unless I'm forgetting something, we basically only check that the group and config offset match. Lots of other things could potentially mismatch and cause problems. But things "matching" can be tricky. Topic names are pretty straightforward and we can validate easily. Validating anything like "the same set of connectors" is tricky given both versioning and upgrading a cluster with a *new* connector. Same for converters and transformations. We'd need to define clear rules for what "compatibility" means here and when a node is allowed to run a connector/task. And who is the source of truth? Who defines what's new? Personally, I'd argue it's actually clearer to have a log message saying "couldn't start connector X because class not found" from node Y than have to determine why all connectors/tasks are running on node Z because node W wasn't allowed to join worker group N for some mismatch of connectors. It might fail faster, but it tells you exactly what the problem is and leads to a clear resolution. > Connect distributed workers should fail if their config is "incompatible" > with leader's > --- > > Key: KAFKA-6433 > URL: https://issues.apache.org/jira/browse/KAFKA-6433 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Randall Hauch >Priority: Major > Labels: needs-kip > > Currently, each distributed worker config must have the same `worker.id` and > must use the same internal topics for configs, offsets, and status. 
> Additionally, each worker must be configured to have the same connectors, > SMTs, and converters; confusing error messages will result when some workers > are able to deploy connector tasks with SMTs while others fail when they are > missing plugins the other workers do have. > Ideally, a Connect worker would only be allowed to join the cluster if it > were "compatible" with the existing cluster, where "compatible" perhaps > includes using the same internal topics and having the same set of plugins. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
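To make the narrower check Ewen describes concrete (topic names and group settings are easy to compare; plugin sets are not), a joining worker could compare only a fixed list of critical keys against the leader's config. This is a hypothetical sketch of such a predicate; the key list and class name are assumptions, not Connect code:

```java
import java.util.*;

public class WorkerCompatSketch {
    // Hypothetical list of configs that must match across a Connect cluster.
    static final List<String> CRITICAL_KEYS = Arrays.asList(
        "group.id", "config.storage.topic", "offset.storage.topic", "status.storage.topic");

    /** Returns the critical keys on which the two configs disagree (empty means compatible). */
    static List<String> incompatibleKeys(Map<String, String> leader, Map<String, String> joiner) {
        List<String> mismatches = new ArrayList<>();
        for (String key : CRITICAL_KEYS) {
            if (!Objects.equals(leader.get(key), joiner.get(key))) mismatches.add(key);
        }
        return mismatches;
    }

    public static void main(String[] args) {
        Map<String, String> leader = new HashMap<>();
        leader.put("group.id", "connect-cluster");
        leader.put("config.storage.topic", "connect-configs");
        Map<String, String> joiner = new HashMap<>(leader);
        joiner.put("config.storage.topic", "connect-configs-old");
        System.out.println(incompatibleKeys(leader, joiner)); // prints [config.storage.topic]
    }
}
```

Validating "same set of connectors/converters" would need version-aware rules that a simple key comparison like this cannot express, which is the hard part raised in the comment.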
[jira] [Commented] (KAFKA-6585) Consolidate duplicated logic on reset tools
[ https://issues.apache.org/jira/browse/KAFKA-6585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373951#comment-16373951 ] Aditya Vivek commented on KAFKA-6585: - Thanks Jason :) > Consolidate duplicated logic on reset tools > --- > > Key: KAFKA-6585 > URL: https://issues.apache.org/jira/browse/KAFKA-6585 > Project: Kafka > Issue Type: Improvement >Reporter: Guozhang Wang >Assignee: Aditya Vivek >Priority: Minor > Labels: newbie > > The consumer reset tool and streams reset tool today share a lot of common > logic, such as resetting to a datetime. We can consolidate it into a > common class which directly depends on the admin client and simply let these > tools use the class. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5740) Use separate file for HTTP logs
[ https://issues.apache.org/jira/browse/KAFKA-5740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373949#comment-16373949 ] Ewen Cheslack-Postava commented on KAFKA-5740: -- Isn't this something that can be handled via log4j configs to separate the two types of log traffic? Is this about defaults or some other issue that can't be handled via log4j? I'd cast a strong *no* vote against removing the HTTP logs from the default Connect logs. They provide too much useful information when debugging issues. Just the HTTP status info often reveals something important (whether it's bad input triggering something in the 4xx range or something broken in Connect triggering the 5xx range). > Use separate file for HTTP logs > --- > > Key: KAFKA-5740 > URL: https://issues.apache.org/jira/browse/KAFKA-5740 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 0.11.0.0 >Reporter: Randall Hauch >Priority: Critical > Labels: monitoring, usability > > Currently the HTTP logs are interspersed in the normal output. However, > client usage/requests should be logged to a separate file. > Question: should the HTTP logs be *removed* from the normal Connect logs? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5575) SchemaBuilder should have a method to clone an existing Schema.
[ https://issues.apache.org/jira/browse/KAFKA-5575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373925#comment-16373925 ] Ewen Cheslack-Postava commented on KAFKA-5575: -- [~rhauch] I think I didn't trigger a vote on the KIP yet though. Really I just grab public API changes that someone's hesitating on KIPs for and write the KIP because I think the 15min it takes for the simple ones isn't a big deal and gives the community, and especially frequent, direct contributors, a chance to catch issues like incompatibilities. I think a few minutes investment + a couple of days turn-around time for a public API change in an API that's used very broadly is actually quite a low cost. In this case, there's really nothing I can see that would be a problem beyond what's already mentioned in the KIP, so I think we should just move forward w/ voting and execution. > SchemaBuilder should have a method to clone an existing Schema. > --- > > Key: KAFKA-5575 > URL: https://issues.apache.org/jira/browse/KAFKA-5575 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Jeremy Custenborder >Assignee: Jeremy Custenborder >Priority: Minor > Labels: needs-kip > > Now that Transformations have landed in Kafka Connect we should have an easy > way to do quick modifications to schemas. For example, changing the name of a > schema shouldn't take much more than a one-liner. I should be able to do > stuff like this: > {code:java} > return SchemaBuilder.from(Schema.STRING_SCHEMA).name("MyNewName").build() > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Issue Comment Deleted] (KAFKA-6583) Metadata should include number of state stores for task
[ https://issues.apache.org/jira/browse/KAFKA-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-6583: -- Comment: was deleted (was: This issue will block KAFKA-4696 until resolved.) > Metadata should include number of state stores for task > --- > > Key: KAFKA-6583 > URL: https://issues.apache.org/jira/browse/KAFKA-6583 > Project: Kafka > Issue Type: New Feature > Components: streams >Affects Versions: 0.10.2.0, 0.11.0.0 >Reporter: Richard Yu >Priority: Critical > Labels: needs-kip > > Currently, for clients to be more evenly balanced, stateful tasks should be > distributed in such a manner that they are spread equally. However, for such > an awareness to be implemented during task assignment, the present rebalance > protocol metadata would also need to contain the number of state stores in a > particular task. This would allow us to "weight" tasks during assignment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6551) Unbounded queues in WorkerSourceTask cause OutOfMemoryError
[ https://issues.apache.org/jira/browse/KAFKA-6551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373906#comment-16373906 ] Ewen Cheslack-Postava commented on KAFKA-6551: -- Seems reasonable – this should only be an issue if producing to the topic is failing and we generate a large backlog, but very good point that this should be bounded, at least roughly, and pause poll()ing until it is resolved. A bit hard to say what the right metric for measurement is since this holds onto the entire record. Maybe # of records will work in practice just because you can set it to a reasonable default and never think about it again while still not hitting any OOMs. But any large messages could make that assumption fail. > Unbounded queues in WorkerSourceTask cause OutOfMemoryError > --- > > Key: KAFKA-6551 > URL: https://issues.apache.org/jira/browse/KAFKA-6551 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Gunnar Morling >Priority: Major > > A Debezium user reported an {{OutOfMemoryError}} to us, with over 50,000 > messages in the {{WorkerSourceTask#outstandingMessages}} map. > This map is unbounded and I can't see any way of "rate limiting" which would > control how many records are added to it. Growth can only indirectly be > limited by reducing the offset flush interval, but as connectors can return > large amounts of messages in single {{poll()}} calls that's not sufficient in > all cases. Note the user reported this issue during snapshotting a database, > i.e. a high number of records arrived in a very short period of time. > To solve the problem I'd suggest making this map backpressure-aware and thus > prevent its indefinite growth, so that no further records will be polled from > the connector until messages have been taken out of the map again. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
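One way to read the suggestion in code: bound the outstanding-record count with a counting semaphore and treat failure to acquire a permit as the signal to stop poll()ing. A simplified, hypothetical sketch (record-count bound only; a byte-size bound would address the large-message caveat raised above):

```java
import java.util.concurrent.Semaphore;

public class BoundedOutstandingSketch {
    private final Semaphore permits;

    BoundedOutstandingSketch(int maxOutstanding) {
        this.permits = new Semaphore(maxOutstanding);
    }

    /** Called before dispatching a record to the producer; false means "pause poll()". */
    boolean tryDispatch() { return permits.tryAcquire(); }

    /** Called from the producer callback once the record has been acked. */
    void ack() { permits.release(); }

    public static void main(String[] args) {
        BoundedOutstandingSketch q = new BoundedOutstandingSketch(2);
        System.out.println(q.tryDispatch()); // true
        System.out.println(q.tryDispatch()); // true
        System.out.println(q.tryDispatch()); // false -> the task should stop polling
        q.ack();                             // an ack frees a slot
        System.out.println(q.tryDispatch()); // true again
    }
}
```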
[jira] [Commented] (KAFKA-5828) Allow explicitly setting polling interval for Kafka connectors
[ https://issues.apache.org/jira/browse/KAFKA-5828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373773#comment-16373773 ] Ewen Cheslack-Postava commented on KAFKA-5828: -- I don't think this is best solved at the Connect level. Could you just use [quotas|http://kafka.apache.org/documentation.html#design_quotas] to rate limit the connector? > Allow explicitly setting polling interval for Kafka connectors > -- > > Key: KAFKA-5828 > URL: https://issues.apache.org/jira/browse/KAFKA-5828 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Behrang Saeedzadeh >Priority: Major > > I have a Kafka cluster deployed in our internal data center and a Kafka > Connect server deployed in AWS that gets data records from a topic on the > Kafka cluster and writes them into a Kinesis stream. > We want to ensure that our Kafka connect server does not saturate the > available bandwidth between our internal data center and AWS. > Using {{max.partition.fetch.bytes}} we can limit the upper bound of data that > can be fetched in each poll call. If we can also configure the poll interval, > then we can limit how much data is transferred per partition per second. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6583) Metadata should include number of state stores for task
[ https://issues.apache.org/jira/browse/KAFKA-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373759#comment-16373759 ] Matthias J. Sax commented on KAFKA-6583: Thanks for adding the comment. It's better to "Link" Jiras to each other directly. You can find "Link" under the "More" button. > Metadata should include number of state stores for task > --- > > Key: KAFKA-6583 > URL: https://issues.apache.org/jira/browse/KAFKA-6583 > Project: Kafka > Issue Type: New Feature > Components: streams >Affects Versions: 0.10.2.0, 0.11.0.0 >Reporter: Richard Yu >Priority: Critical > Labels: needs-kip > > Currently, for clients to be more evenly balanced, stateful tasks should be > distributed in such a manner that they are spread equally. However, for such > an awareness to be implemented during task assignment, the present rebalance > protocol metadata would also need to contain the number of state stores in a > particular task. This would allow us to "weight" tasks during assignment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6002) Kafka Connect Transform transforming JSON string into actual object
[ https://issues.apache.org/jira/browse/KAFKA-6002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373755#comment-16373755 ] Ewen Cheslack-Postava commented on KAFKA-6002: -- [~edvard.poliakov] Not sure if you made progress on this. The schema support in JsonConverter doesn't use json-schema.org style, partly because it is quite complicated. To be honest, the inline schema support with JSON is really more for demonstrative purposes – baking in and supporting a ton of formats adds a lot of overhead to the project, so we wanted to stick to shipping just one format with the framework and leave the rest to be community supported. However, that meant we needed to include something that could have both schema and schemaless modes in order to demonstrate both modes and ensure everything works with both modes. We ended up doing this with JSON and an ad hoc schema format. But generally when using a schema, you want something that doesn't need to ship the full schema inline with the message because that's quite heavyweight – often times the schema ends up larger than the message data itself! For a complete JSON w/ schemas solution, I would probably suggest implementing Converters that look a lot like what Confluent has for Avro and using json-schema.org to express the schemas. The one difference is that now that we have headers, I'd put the schema ID information into a header instead and make the value just the JSON payload (whereas Avro has some additional framing in the value itself). For a transformation that does this you *could* just omit the schema entirely. That is an option in Connect. Basically this would just mean that the transform only works when the user/connectors expect schemaless data. Regarding inference, you can also just do this on a per-message basis instead of continuously updating a schema. 
There is a risk that you end up with lots of schemas because of this (since each could be unique), but for a lot of cases that may not be expected. I also have an SMT that infers schemas, so it does something similar to what you'd need here [https://github.com/ewencp/kafka/commit/3abb54a8062fe727ddaabc4dd5a552dd0b465a03] I didn't complete both modes, but the idea was to allow either inferring on a per-message basis *or* specifying a schema (whether the JsonConverter variant or json-schema.org style) and validating & adding it to the record. I think offering those two options in your SMT would give good flexibility as well. > Kafka Connect Transform transforming JSON string into actual object > --- > > Key: KAFKA-6002 > URL: https://issues.apache.org/jira/browse/KAFKA-6002 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Edvard Poliakov >Priority: Minor > > My colleague and I have been working on a new Transform that takes a JSON > string and transforms it into an actual object, like this: > {code} > { > "a" : "{\"b\": 23}" > } > {code} > into > {code} > { > "a" : { >"b" : 23 > } > } > {code} > There is no robust way of building a Schema from a JSON object itself, as it > can be something like an empty array or a null, that doesn't provide any info > on the schema of the object. So I see two options here. > 1. For a transform to take in schema as a transform parameter. The problem I > found with this is that it is not clear what JSON schema specification should > be used for this? 
I assume it would be reasonable to use > http://json-schema.org/, but it doesn't seem that Kafka Connect supports it > currently; moreover, reading through the JsonConverter class in Kafka Connect, I > am not able to understand what spec the Json Schema used in that class > follows, for example the {{asConnectSchema}} method on {{JsonConverter}}, [see > here|https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L415]. > 2. On each object received, keep updating the schema, but I can't see a > standard and robust way of handling edge cases. > I am happy to create a pull request for this transform, if we can agree on > something here. :) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
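The per-message inference discussed above boils down to mapping each decoded JSON value to a schema type, and the issue's empty-array/null caveat is exactly where it breaks down. A toy sketch over plain Java objects (stdlib only; the type names are illustrative stand-ins, not Connect's org.apache.kafka.connect.data API, which the real SMT would build instead):

```java
import java.util.*;

public class InferSchemaSketch {
    /** Infers a Connect-like schema type name for a decoded JSON value. */
    static String inferType(Object value) {
        if (value == null) return "OPTIONAL";      // a bare null carries no schema information
        if (value instanceof Boolean) return "BOOLEAN";
        if (value instanceof Integer || value instanceof Long) return "INT64";
        if (value instanceof Double) return "FLOAT64";
        if (value instanceof String) return "STRING";
        if (value instanceof List) {               // arrays: infer from the first element
            List<?> list = (List<?>) value;
            return list.isEmpty() ? "ARRAY<?>" : "ARRAY<" + inferType(list.get(0)) + ">";
        }
        if (value instanceof Map) {                // objects become structs, one field per key
            StringBuilder sb = new StringBuilder("STRUCT{");
            ((Map<?, ?>) value).forEach((k, v) -> sb.append(k).append(':').append(inferType(v)).append(';'));
            return sb.append('}').toString();
        }
        throw new IllegalArgumentException("unsupported: " + value.getClass());
    }

    public static void main(String[] args) {
        Map<String, Object> obj = new LinkedHashMap<>();
        obj.put("b", 23);
        System.out.println(inferType(obj));                      // prints STRUCT{b:INT64;}
        System.out.println(inferType(Collections.emptyList()));  // prints ARRAY<?> – undecidable from the value alone
    }
}
```

The ARRAY<?> and OPTIONAL cases show why the issue concludes there is no robust schema for values like an empty array or a null without an externally supplied schema.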
[jira] [Updated] (KAFKA-6583) Metadata should include number of state stores for task
[ https://issues.apache.org/jira/browse/KAFKA-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-6583: -- Issue Type: New Feature (was: Improvement) > Metadata should include number of state stores for task > --- > > Key: KAFKA-6583 > URL: https://issues.apache.org/jira/browse/KAFKA-6583 > Project: Kafka > Issue Type: New Feature > Components: streams >Affects Versions: 0.10.2.0, 0.11.0.0 >Reporter: Richard Yu >Priority: Critical > Labels: needs-kip > > Currently, for clients to be more evenly balanced, stateful tasks should be > distributed in such a manner that they are spread equally. However, for such > an awareness to be implemented during task assignment, the present rebalance > protocol metadata would also need to contain the number of state stores in a > particular task. This would allow us to "weight" tasks during assignment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-6583) Metadata should include number of state stores for task
[ https://issues.apache.org/jira/browse/KAFKA-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373124#comment-16373124 ] Richard Yu edited comment on KAFKA-6583 at 2/23/18 12:52 AM: - This issue will block KAFKA-4696 until resolved. was (Author: yohan123): This issue will block https://issues.apache.org/jira/browse/KAFKA-4696 until resolved. > Metadata should include number of state stores for task > --- > > Key: KAFKA-6583 > URL: https://issues.apache.org/jira/browse/KAFKA-6583 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.0, 0.11.0.0 >Reporter: Richard Yu >Priority: Critical > Labels: needs-kip > > Currently, for clients to be more evenly balanced, stateful tasks should be > distributed in such a manner that they are spread equally. However, for such > an awareness to be implemented during task assignment, the present rebalance > protocol metadata would also need to contain the number of state stores in a > particular task. This would allow us to "weight" tasks during assignment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6583) Metadata should include number of state stores for task
[ https://issues.apache.org/jira/browse/KAFKA-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-6583: -- Labels: needs-kip (was: ) > Metadata should include number of state stores for task > --- > > Key: KAFKA-6583 > URL: https://issues.apache.org/jira/browse/KAFKA-6583 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.0, 0.11.0.0 >Reporter: Richard Yu >Priority: Critical > Labels: needs-kip > > Currently, for clients to be more evenly balanced, stateful tasks should be > distributed in such a manner that they are spread equally. However, for such > an awareness to be implemented during task assignment, the present rebalance > protocol metadata would also need to contain the number of state stores in a > particular task. This would allow us to "weight" tasks during assignment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-4696) Streams standby task assignment should be state-store aware
[ https://issues.apache.org/jira/browse/KAFKA-4696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu reassigned KAFKA-4696: - Assignee: Richard Yu > Streams standby task assignment should be state-store aware > --- > > Key: KAFKA-4696 > URL: https://issues.apache.org/jira/browse/KAFKA-4696 > Project: Kafka > Issue Type: Sub-task > Components: streams >Affects Versions: 0.10.2.0, 0.11.0.0 >Reporter: Damian Guy >Assignee: Richard Yu >Priority: Major > > Task Assignment is currently not aware of which tasks have State Stores. This > can result in uneven balance of standby task assignment as all tasks are > assigned, but only those tasks with state-stores are ever created by > {{StreamThread}}. So what seems like an optimal strategy during assignment > time could be sub-optimal post-assignment. > For example, let's say we have 4 tasks (2 with state-stores), 2 clients, > numStandbyReplicas = 1. Each client would get 2 active and 2 standby tasks. > One of the clients may end up with both state-store tasks, while the other > has none. > Further to this, standby task configuration is currently "all or nothing". It > might make sense to allow more fine grained configuration, i.e., the ability > to specify the number of standby replicas individually for each stateful > operator. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
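The imbalance in the example (4 tasks, 2 stateful, 2 clients, both stateful tasks landing on one client) disappears if stateful and stateless tasks are each dealt out round-robin separately. A hypothetical sketch of that idea only, not the actual StreamsPartitionAssignor logic:

```java
import java.util.*;

public class StatefulAssignSketch {
    /** Deals stateful tasks round-robin first, then stateless, so state stores spread evenly. */
    static Map<String, List<String>> assign(List<String> stateful, List<String> stateless,
                                            List<String> clients) {
        Map<String, List<String>> out = new LinkedHashMap<>();
        for (String c : clients) out.put(c, new ArrayList<>());
        int i = 0;
        for (String task : stateful) out.get(clients.get(i++ % clients.size())).add(task);
        for (String task : stateless) out.get(clients.get(i++ % clients.size())).add(task);
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<String>> a = assign(
            Arrays.asList("0_0-stateful", "0_1-stateful"),  // tasks with state stores
            Arrays.asList("1_0", "1_1"),                    // stateless tasks
            Arrays.asList("clientA", "clientB"));
        System.out.println(a); // each client ends up with exactly one stateful task
    }
}
```

The metadata change discussed in KAFKA-6583 would supply the stateful/stateless split (or a store count per task) that a weighting scheme like this needs.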
[jira] [Commented] (KAFKA-4696) Streams standby task assignment should be state-store aware
[ https://issues.apache.org/jira/browse/KAFKA-4696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373589#comment-16373589 ] ASF GitHub Bot commented on KAFKA-4696: --- ConcurrencyPractitioner opened a new pull request #4615: [KAFKA-4696] Streams standby task assignment should be state-store aware URL: https://github.com/apache/kafka/pull/4615 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Streams standby task assignment should be state-store aware > --- > > Key: KAFKA-4696 > URL: https://issues.apache.org/jira/browse/KAFKA-4696 > Project: Kafka > Issue Type: Sub-task > Components: streams >Affects Versions: 0.10.2.0, 0.11.0.0 >Reporter: Damian Guy >Priority: Major > > Task Assignment is currently not aware of which tasks have State Stores. This > can result in uneven balance of standby task assignment as all tasks are > assigned, but only those tasks with state-stores are ever created by > {{StreamThread}}. So what seems like an optimal strategy during assignment > time could be sub-optimal post-assignment. > For example, let's say we have 4 tasks (2 with state-stores), 2 clients, > numStandbyReplicas = 1. Each client would get 2 active and 2 standby tasks. > One of the clients may end up with both state-store tasks, while the other > has none. > Further to this, standby task configuration is currently "all or nothing". It > might make sense to allow more fine grained configuration, i.e., the ability > to specify the number of standby replicas individually for each stateful > operator. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6585) Consolidate duplicated logic on reset tools
[ https://issues.apache.org/jira/browse/KAFKA-6585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373546#comment-16373546 ] Jason Gustafson commented on KAFKA-6585: [~adityavivek94] I went ahead and assigned it to you. Of course Guozhang won't mind :). > Consolidate duplicated logic on reset tools > --- > > Key: KAFKA-6585 > URL: https://issues.apache.org/jira/browse/KAFKA-6585 > Project: Kafka > Issue Type: Improvement >Reporter: Guozhang Wang >Assignee: Aditya Vivek >Priority: Minor > Labels: newbie > > The consumer reset tool and streams reset tool today shares lot of common > logics such as resetting to a datetime etc. We can consolidate them into a > common class which directly depend on admin client at simply let these tools > to use the class. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-6585) Consolidate duplicated logic on reset tools
[ https://issues.apache.org/jira/browse/KAFKA-6585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson reassigned KAFKA-6585: -- Assignee: Aditya Vivek > Consolidate duplicated logic on reset tools > --- > > Key: KAFKA-6585 > URL: https://issues.apache.org/jira/browse/KAFKA-6585 > Project: Kafka > Issue Type: Improvement >Reporter: Guozhang Wang >Assignee: Aditya Vivek >Priority: Minor > Labels: newbie > > The consumer reset tool and streams reset tool today shares lot of common > logics such as resetting to a datetime etc. We can consolidate them into a > common class which directly depend on admin client at simply let these tools > to use the class. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5157) Options for handling corrupt data during deserialization
[ https://issues.apache.org/jira/browse/KAFKA-5157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373432#comment-16373432 ] Matthias J. Sax commented on KAFKA-5157: It is by design. It's of course something on which different people have different opinions, and both approaches have advantages/disadvantages. The design aligns with other components, like serializers/deserializers, that use the same pattern. > Options for handling corrupt data during deserialization > > > Key: KAFKA-5157 > URL: https://issues.apache.org/jira/browse/KAFKA-5157 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Eno Thereska >Assignee: Eno Thereska >Priority: Major > Labels: kip, user-experience > Fix For: 1.0.0 > > > When there is a bad formatted data in the source topics, deserialization will > throw a runtime exception all the way to the users. And since deserialization > happens before it was ever processed at the beginning of the topology, today > there is no ways to handle such errors on the user-app level. > We should consider allowing users to handle such "poison pills" in a > customizable way. > https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers -- This message was sent by Atlassian JIRA (v7.6.3#76005)
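The "poison pill" handling KIP-161 proposes can be illustrated with a minimal handler pattern: deserialization failures are routed to a pluggable callback that decides between skipping the record and failing. The interface below is a hypothetical stand-in for Kafka Streams' real org.apache.kafka.streams.errors.DeserializationExceptionHandler, with plain byte arrays in place of consumer records:

```java
import java.nio.charset.StandardCharsets;
import java.util.*;

// Minimal sketch of the poison-pill handler pattern discussed in KIP-161.
// The handler interface is hypothetical; only the continue-vs-fail decision
// mirrors the real Streams API.
public class PoisonPillSketch {
    enum Response { CONTINUE, FAIL }

    interface DeserializationHandler {
        Response handle(byte[] rawValue, Exception error);
    }

    static List<Integer> consumeAll(List<byte[]> records, DeserializationHandler handler) {
        List<Integer> out = new ArrayList<>();
        for (byte[] raw : records) {
            try {
                out.add(Integer.parseInt(new String(raw, StandardCharsets.UTF_8)));
            } catch (NumberFormatException e) {
                if (handler.handle(raw, e) == Response.FAIL) {
                    throw new IllegalStateException("Deserialization failed", e);
                }
                // CONTINUE: skip the corrupt record and keep processing.
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<byte[]> records = Arrays.asList(
                "1".getBytes(StandardCharsets.UTF_8),
                "garbage".getBytes(StandardCharsets.UTF_8),
                "3".getBytes(StandardCharsets.UTF_8));
        // Log-and-continue: the corrupt record is skipped instead of crashing.
        System.out.println(consumeAll(records, (raw, e) -> Response.CONTINUE)); // [1, 3]
    }
}
```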
[jira] [Commented] (KAFKA-6585) Consolidate duplicated logic on reset tools
[ https://issues.apache.org/jira/browse/KAFKA-6585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373401#comment-16373401 ] Aditya Vivek commented on KAFKA-6585: - [~guozhang] Can I pick this up? > Consolidate duplicated logic on reset tools > --- > > Key: KAFKA-6585 > URL: https://issues.apache.org/jira/browse/KAFKA-6585 > Project: Kafka > Issue Type: Improvement >Reporter: Guozhang Wang >Priority: Minor > Labels: newbie > > The consumer reset tool and streams reset tool today shares lot of common > logics such as resetting to a datetime etc. We can consolidate them into a > common class which directly depend on admin client at simply let these tools > to use the class. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6586) Refactor Connect executables
[ https://issues.apache.org/jira/browse/KAFKA-6586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-6586: - Description: The main methods in {{ConnectDistributed}} and {{ConnectStandalone}} have a lot of duplication, and it'd be good to refactor to centralize the logic. We can pull most of this logic into an abstract class that {{ConnectStandalone}} and {{ConnectDistributed}} both extend. At a glance, the differences between the two are different config and Herder implementations and some different initialization logic. (was: The main methods in {{ConnectDistributed}} and {{ConnectStandalone}} have a lot of duplication, and it'd be good to refactor to centralize the logic.) > Refactor Connect executables > > > Key: KAFKA-6586 > URL: https://issues.apache.org/jira/browse/KAFKA-6586 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Randall Hauch >Priority: Minor > > The main methods in {{ConnectDistributed}} and {{ConnectStandalone}} have a > lot of duplication, and it'd be good to refactor to centralize the logic. We > can pull most of this logic into an abstract class that {{ConnectStandalone}} > and {{ConnectDistributed}} both extend. At a glance, the differences between > the two are different config and Herder implementations and some different > initialization logic. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6586) Refactor Connect executables
Randall Hauch created KAFKA-6586: Summary: Refactor Connect executables Key: KAFKA-6586 URL: https://issues.apache.org/jira/browse/KAFKA-6586 Project: Kafka Issue Type: Improvement Components: KafkaConnect Reporter: Randall Hauch The main methods in {{ConnectDistributed}} and {{ConnectStandalone}} have a lot of duplication, and it'd be good to refactor to centralize the logic. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
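The proposed refactoring can be sketched as a template-method base class, assuming the shared startup flow is hoisted into the base and only herder creation stays mode-specific. All names below are illustrative stand-ins for the real ConnectStandalone/ConnectDistributed classes, and strings stand in for the actual config and Herder objects:

```java
// Hypothetical sketch of hoisting the duplicated main() logic of
// ConnectStandalone/ConnectDistributed into an abstract base class.
public class ConnectRefactorSketch {
    abstract static class AbstractConnectCli {
        final StringBuilder log = new StringBuilder();

        // Shared startup flow: load config, build the herder, start it.
        final void run(String configFile) {
            log.append("loaded ").append(configFile).append("; ");
            log.append("herder=").append(createHerder()).append("; started");
        }

        // Each mode supplies only its own Herder implementation.
        abstract String createHerder();
    }

    static class ConnectStandalone extends AbstractConnectCli {
        @Override String createHerder() { return "StandaloneHerder"; }
    }

    static class ConnectDistributed extends AbstractConnectCli {
        @Override String createHerder() { return "DistributedHerder"; }
    }

    public static void main(String[] args) {
        AbstractConnectCli cli = new ConnectDistributed();
        cli.run("connect-distributed.properties");
        System.out.println(cli.log); // loaded connect-distributed.properties; herder=DistributedHerder; started
    }
}
```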
[jira] [Updated] (KAFKA-6585) Consolidate duplicated logic on reset tools
[ https://issues.apache.org/jira/browse/KAFKA-6585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-6585: --- Summary: Consolidate duplicated logic on reset tools (was: Sonsolidate duplicated logic on reset tools) > Consolidate duplicated logic on reset tools > --- > > Key: KAFKA-6585 > URL: https://issues.apache.org/jira/browse/KAFKA-6585 > Project: Kafka > Issue Type: Improvement >Reporter: Guozhang Wang >Priority: Minor > Labels: newbie > > The consumer reset tool and streams reset tool today shares lot of common > logics such as resetting to a datetime etc. We can consolidate them into a > common class which directly depend on admin client at simply let these tools > to use the class. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6585) Sonsolidate duplicated logic on reset tools
Guozhang Wang created KAFKA-6585: Summary: Sonsolidate duplicated logic on reset tools Key: KAFKA-6585 URL: https://issues.apache.org/jira/browse/KAFKA-6585 Project: Kafka Issue Type: Improvement Reporter: Guozhang Wang The consumer reset tool and streams reset tool today shares lot of common logics such as resetting to a datetime etc. We can consolidate them into a common class which directly depend on admin client at simply let these tools to use the class. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
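As a sketch of what such a shared class might offer (names are hypothetical, not an existing Kafka class), the reset strategies both tools duplicate, such as resolving a --to-datetime string to a timestamp for offsetsForTimes and shifting offsets by a delta, could live in one helper:

```java
import java.time.OffsetDateTime;
import java.util.*;

// Hypothetical shared reset helper that both the consumer reset tool and the
// streams reset tool could call; method names are illustrative only.
public class ResetLogicSketch {
    // --to-datetime: resolve an ISO-8601 string (with offset) to epoch millis,
    // the timestamp a tool would pass to Consumer.offsetsForTimes.
    static long toDateTime(String iso) {
        return OffsetDateTime.parse(iso).toInstant().toEpochMilli();
    }

    // --shift-by: move each partition's offset by a signed delta, clamped at 0.
    static Map<Integer, Long> shiftBy(Map<Integer, Long> current, long delta) {
        Map<Integer, Long> out = new HashMap<>();
        current.forEach((p, off) -> out.put(p, Math.max(0, off + delta)));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(toDateTime("2018-02-10T00:00:00.000+00:00")); // 1518220800000
        System.out.println(shiftBy(Map.of(0, 100L, 1, 3L), -5));
    }
}
```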
[jira] [Updated] (KAFKA-5157) Options for handling corrupt data during deserialization
[ https://issues.apache.org/jira/browse/KAFKA-5157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-5157: --- Description: When there is a bad formatted data in the source topics, deserialization will throw a runtime exception all the way to the users. And since deserialization happens before it was ever processed at the beginning of the topology, today there is no ways to handle such errors on the user-app level. We should consider allowing users to handle such "poison pills" in a customizable way. https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers was: When there is a bad formatted data in the source topics, deserialization will throw a runtime exception all the way to the users. And since deserialization happens before it was ever processed at the beginning of the topology, today there is no ways to handle such errors on the user-app level. We should consider allowing users to handle such "poison pills" in a customizable way. > Options for handling corrupt data during deserialization > > > Key: KAFKA-5157 > URL: https://issues.apache.org/jira/browse/KAFKA-5157 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Eno Thereska >Assignee: Eno Thereska >Priority: Major > Labels: kip, user-experience > Fix For: 1.0.0 > > > When there is a bad formatted data in the source topics, deserialization will > throw a runtime exception all the way to the users. And since deserialization > happens before it was ever processed at the beginning of the topology, today > there is no ways to handle such errors on the user-app level. > We should consider allowing users to handle such "poison pills" in a > customizable way. > https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5157) Options for handling corrupt data during deserialization
[ https://issues.apache.org/jira/browse/KAFKA-5157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-5157: --- Labels: kip user-experience (was: user-experience) > Options for handling corrupt data during deserialization > > > Key: KAFKA-5157 > URL: https://issues.apache.org/jira/browse/KAFKA-5157 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Eno Thereska >Assignee: Eno Thereska >Priority: Major > Labels: kip, user-experience > Fix For: 1.0.0 > > > When there is a bad formatted data in the source topics, deserialization will > throw a runtime exception all the way to the users. And since deserialization > happens before it was ever processed at the beginning of the topology, today > there is no ways to handle such errors on the user-app level. > We should consider allowing users to handle such "poison pills" in a > customizable way. > https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6583) Metadata should include number of state stores for task
[ https://issues.apache.org/jira/browse/KAFKA-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373124#comment-16373124 ] Richard Yu commented on KAFKA-6583: --- This issue will block https://issues.apache.org/jira/browse/KAFKA-4696 until resolved. > Metadata should include number of state stores for task > --- > > Key: KAFKA-6583 > URL: https://issues.apache.org/jira/browse/KAFKA-6583 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.0, 0.11.0.0 >Reporter: Richard Yu >Priority: Critical > > Currently, in the need for clients to be more evenly balanced, stateful tasks > should be distributed in such a manner that it will be spread equally. > However, for such an awareness to be implemented during task assignment, it > would require the need for the present rebalance protocol metadata to also > contain the number of state stores in a particular task. This way, it will > allow us to "weight" tasks during assignment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-4696) Streams standby task assignment should be state-store aware
[ https://issues.apache.org/jira/browse/KAFKA-4696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-4696: -- External issue URL: (was: https://issues.apache.org/jira/browse/KAFKA-6583) > Streams standby task assignment should be state-store aware > --- > > Key: KAFKA-4696 > URL: https://issues.apache.org/jira/browse/KAFKA-4696 > Project: Kafka > Issue Type: Sub-task > Components: streams >Affects Versions: 0.10.2.0, 0.11.0.0 >Reporter: Damian Guy >Priority: Major > > Task Assignment is currently not aware of which tasks have State Stores. This > can result in uneven balance of standby task assignment as all tasks are > assigned, but only those tasks with state-stores are ever created by > {{StreamThread}}. So what seems like an optimal strategy during assignment > time could be sub-optimal post-assignment. > For example, lets say we have 4 tasks (2 with state-stores), 2 clients, > numStandbyReplicas = 1. Each client would get 2 active and 2 standby tasks. > One of the clients may end up with both state-store tasks, while the other > has none. > Further to this, standby task configuration is currently "all or nothing". It > might make sense to allow more fine grained configuration, i.e., the ability > to specify the number of standby replicas individually for each stateful > operator. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-4696) Streams standby task assignment should be state-store aware
[ https://issues.apache.org/jira/browse/KAFKA-4696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-4696: -- External issue URL: https://issues.apache.org/jira/browse/KAFKA-6583 > Streams standby task assignment should be state-store aware > --- > > Key: KAFKA-4696 > URL: https://issues.apache.org/jira/browse/KAFKA-4696 > Project: Kafka > Issue Type: Sub-task > Components: streams >Affects Versions: 0.10.2.0, 0.11.0.0 >Reporter: Damian Guy >Priority: Major > > Task Assignment is currently not aware of which tasks have State Stores. This > can result in uneven balance of standby task assignment as all tasks are > assigned, but only those tasks with state-stores are ever created by > {{StreamThread}}. So what seems like an optimal strategy during assignment > time could be sub-optimal post-assignment. > For example, lets say we have 4 tasks (2 with state-stores), 2 clients, > numStandbyReplicas = 1. Each client would get 2 active and 2 standby tasks. > One of the clients may end up with both state-store tasks, while the other > has none. > Further to this, standby task configuration is currently "all or nothing". It > might make sense to allow more fine grained configuration, i.e., the ability > to specify the number of standby replicas individually for each stateful > operator. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6584) Session expiration concurrent with ZooKeeper leadership failover may lead to broker registration failure
Chris Thunes created KAFKA-6584: --- Summary: Session expiration concurrent with ZooKeeper leadership failover may lead to broker registration failure Key: KAFKA-6584 URL: https://issues.apache.org/jira/browse/KAFKA-6584 Project: Kafka Issue Type: Bug Affects Versions: 1.0.0 Reporter: Chris Thunes It seems that an edge case exists which can lead to sessions "un-expiring" during a ZooKeeper leadership failover. Additional details can be found in ZOOKEEPER-2985. This leads to a NODEEXISTS error when attempting to re-create the ephemeral brokers/ids/\{id} node in ZkUtils.registerBrokerInZk. We experienced this issue on each node within a 3-node Kafka cluster running 1.0.0. All three nodes continued running (producers and consumers appeared unaffected), but none of the nodes were considered online and partition leadership could not be re-assigned. I took a quick look at trunk and I believe the issue is still present, but has moved into KafkaZkClient.checkedEphemeralCreate which will [raise an error|https://github.com/apache/kafka/blob/90e0bbe/core/src/main/scala/kafka/zk/KafkaZkClient.scala#L1512] when it finds that the brokers/ids/\{id} node exists, but belongs to the old (believed expired) session. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6583) Metadata should include number of state stores for task
[ https://issues.apache.org/jira/browse/KAFKA-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-6583: -- Affects Version/s: 0.10.2.0 0.11.0.0 > Metadata should include number of state stores for task > --- > > Key: KAFKA-6583 > URL: https://issues.apache.org/jira/browse/KAFKA-6583 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.0, 0.11.0.0 >Reporter: Richard Yu >Priority: Critical > > Currently, in the need for clients to be more evenly balanced, stateful tasks > should be distributed in such a manner that it will be spread equally. > However, for such an awareness to be implemented during task assignment, it > would require the need for the present rebalance protocol metadata to also > contain the number of state stores in a particular task. This way, it will > allow us to "weight" tasks during assignment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6583) Metadata should include number of state stores for task
Richard Yu created KAFKA-6583: - Summary: Metadata should include number of state stores for task Key: KAFKA-6583 URL: https://issues.apache.org/jira/browse/KAFKA-6583 Project: Kafka Issue Type: Improvement Reporter: Richard Yu Currently, in the need for clients to be more evenly balanced, stateful tasks should be distributed in such a manner that it will be spread equally. However, for such an awareness to be implemented during task assignment, it would require the need for the present rebalance protocol metadata to also contain the number of state stores in a particular task. This way, it will allow us to "weight" tasks during assignment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6583) Metadata should include number of state stores for task
[ https://issues.apache.org/jira/browse/KAFKA-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-6583: -- Component/s: streams > Metadata should include number of state stores for task > --- > > Key: KAFKA-6583 > URL: https://issues.apache.org/jira/browse/KAFKA-6583 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.0, 0.11.0.0 >Reporter: Richard Yu >Priority: Critical > > Currently, in the need for clients to be more evenly balanced, stateful tasks > should be distributed in such a manner that it will be spread equally. > However, for such an awareness to be implemented during task assignment, it > would require the need for the present rebalance protocol metadata to also > contain the number of state stores in a particular task. This way, it will > allow us to "weight" tasks during assignment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
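One way to carry this extra weight information is to append a per-task store count to the rebalance subscription userdata so the leader can weight tasks during assignment. The (version, taskId, storeCount) layout below is invented for illustration and is not Kafka's actual subscription wire format:

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: encode a state-store count per task into rebalance
// metadata. Field layout is illustrative, not Kafka's real encoding.
public class TaskMetadataSketch {
    static ByteBuffer encode(int version, int taskId, int storeCount) {
        ByteBuffer buf = ByteBuffer.allocate(12);
        buf.putInt(version).putInt(taskId).putInt(storeCount);
        buf.flip(); // prepare the buffer for reading
        return buf;
    }

    static int[] decode(ByteBuffer buf) {
        return new int[] { buf.getInt(), buf.getInt(), buf.getInt() };
    }

    public static void main(String[] args) {
        int[] fields = decode(encode(1, 7, 3));
        System.out.println(fields[2]); // store count: 3
    }
}
```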
[jira] [Comment Edited] (KAFKA-6572) kafka-consumer-groups does not reset offsets to specified datetime correctly
[ https://issues.apache.org/jira/browse/KAFKA-6572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372986#comment-16372986 ] Sharon Lucas edited comment on KAFKA-6572 at 2/22/18 4:03 PM: -- [~huxi_2b] No. There were tons of messages whose timestamp was greater than the timestamp we specified. was (Author: slucas): No. There were tons of messages whose timestamp was greater than the timestamp we specified. > kafka-consumer-groups does not reset offsets to specified datetime correctly > > > Key: KAFKA-6572 > URL: https://issues.apache.org/jira/browse/KAFKA-6572 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.0.0 >Reporter: Sharon Lucas >Priority: Major > > With Kafka 1.0.0 we are seeing a problem using the kafka-consumer-groups.sh > --reset-offsets option to reset offsets to a specific date/time in our > production environment. > We first tried to use the kafka-consumer-groups.sh command with the > --reset-offsets option and with option --to-datetime 2018-02-10T00:00:00.000 > in our staging environment and it worked correctly. Running the following > command changed it to start processing logs from February 12, 2018 (4 days > ago) for a topic that had a large lag. We did a dry run to verify before > running with the --execute option. > {code:java} > root@mlpstagemon0101a:/# /opt/kafka/bin/kafka-consumer-groups.sh > --bootstrap-server NN.NNN.NN.NN:9092 --group logstash-elasticsearch-latest > --to-datetime 2018-02-12T00:00:00.000-06:00 --reset-offsets --topic > staging-mon01-rg-elasticsearch --execute{code} > We stopped the kafka mirrors that process this topic before resetting the > offsets and started the kafka mirrors after resetting the offsets. We > verified that it correctly started processing logs from February 12, 2018. > Then we tried resetting offsets in a production environment for a topic that > had a very large lag using option --to-datetime 2018-02-10T00:00:00.000 and > it did not work as expected.
We stopped the kafka mirrors that process this > topic before resetting the offsets and did a dry run to see what the new > offsets would be: > {code:java} > root@mlplon0401e:# /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server > NN.N.NNN.NNN:9092 --group mirror-consumer-ams03-geo-earliest --to-datetime > 2018-02-10T00:00:00.000 --reset-offsets --topic prod_in-ams03-geo-ca_access > Note: This will not show information about old Zookeeper-based consumers. > ^@^@^@^@ > TOPIC PARTITION NEW-OFFSET > prod_in-ams03-geo-ca_access 52 52084147 > prod_in-ams03-geo-ca_access 106 52154199 > prod_in-ams03-geo-ca_access 75 52148673 > prod_in-ams03-geo-ca_access 61 52130753 > prod_in-ams03-geo-ca_access 49 52151667 > prod_in-ams03-geo-ca_access 48 52145233 > prod_in-ams03-geo-ca_access 27 52092805 > prod_in-ams03-geo-ca_access 26 52139644 > prod_in-ams03-geo-ca_access 65 52157504 > prod_in-ams03-geo-ca_access 105 52166289 > prod_in-ams03-geo-ca_access 38 52160464 > prod_in-ams03-geo-ca_access 22 52093451 > prod_in-ams03-geo-ca_access 4 52151660 > prod_in-ams03-geo-ca_access 90 52160296 > prod_in-ams03-geo-ca_access 25 52161691 > prod_in-ams03-geo-ca_access 13 52145828 > prod_in-ams03-geo-ca_access 56 52162867 > prod_in-ams03-geo-ca_access 42 52072094 > prod_in-ams03-geo-ca_access 7 52069496 > prod_in-ams03-geo-ca_access 117 52087078 > prod_in-ams03-geo-ca_access 32 52073732 > prod_in-ams03-geo-ca_access 102 52082022 > prod_in-ams03-geo-ca_access 76 52141018 > prod_in-ams03-geo-ca_access 83 52154542 > prod_in-ams03-geo-ca_access 72 52095051 > prod_in-ams03-geo-ca_access 85 52149907 > prod_in-ams03-geo-ca_access 119 52134435 > prod_in-ams03-geo-ca_access 113 52159340 > prod_in-ams03-geo-ca_access 55 52146597 > prod_in-ams03-geo-ca_access 18 52149079 > prod_in-ams03-geo-ca_access 35 52149058 > prod_in-ams03-geo-ca_access 99 52143277 > prod_in-ams03-geo-ca_access 41 52158872 > prod_in-ams03-geo-ca_access 112 52083901 > prod_in-ams03-geo-ca_access 34 52137932 > 
prod_in-ams03-geo-ca_access 89 52158135 > prod_in-ams03-geo-ca_access 40 5212 > prod_in-ams03-geo-ca_access 53 52138400 > prod_in-ams03-geo-ca_access 19 52144966 > prod_in-ams03-geo-ca_access 44 52166404 > prod_in-ams03-geo-ca_access 31 52155685 > prod_in-ams03-geo-ca_access 10 52
[jira] [Commented] (KAFKA-6572) kafka-consumer-groups does not reset offsets to specified datetime correctly
[ https://issues.apache.org/jira/browse/KAFKA-6572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372986#comment-16372986 ] Sharon Lucas commented on KAFKA-6572: - No. There were tons of messages whose timestamp was greater than the timestamp we specified. > kafka-consumer-groups does not reset offsets to specified datetime correctly > > > Key: KAFKA-6572 > URL: https://issues.apache.org/jira/browse/KAFKA-6572 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.0.0 >Reporter: Sharon Lucas >Priority: Major > > With Kafka 1.0.0 we are seeing a problem using the kafka-consumer-groups.sh > --reset-offsets option to reset offsets to a specific date/time in our > production environment. > We first tried to use the kafka-consumer-groups.sh command with the > --reset-offsets option and with option --to-datetime 2018-02-10T00:00:00.000 > in our staging environment and it worked correctly. Running the following > command changed it to start processing logs from February 12, 2018 (4 days > ago) for a topic that had a large lag. We did a dry run to verify before > running with the --execute option. > {code:java} > root@mlpstagemon0101a:/# /opt/kafka/bin/kafka-consumer-groups.sh > --bootstrap-server NN.NNN.NN.NN:9092 --group logstash-elasticsearch-latest > --to-datetime 2018-02-12T00:00:00.000-06:00 --reset-offsets --topic > staging-mon01-rg-elasticsearch --execute{code} > We stopped the kafka mirrors that process this topic before resetting the > offsets and started the kafka mirrors after resetting the offsets. We > verified that it correctly started processing logs from February 12, 2018. > Then we tried resetting offsets in a production environment for a topic that > had a very large lag using option --to-datetime 2018-02-10T00:00:00.000 and > it did not work as expected.
We stopped the kafka mirrors that process this > topic before resetting the offsets and did a dry run to see what the new > offsets would be: > {code:java} > root@mlplon0401e:# /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server > NN.N.NNN.NNN:9092 --group mirror-consumer-ams03-geo-earliest --to-datetime > 2018-02-10T00:00:00.000 --reset-offsets --topic prod_in-ams03-geo-ca_access > Note: This will not show information about old Zookeeper-based consumers. > ^@^@^@^@ > TOPIC PARTITION NEW-OFFSET > prod_in-ams03-geo-ca_access 52 52084147 > prod_in-ams03-geo-ca_access 106 52154199 > prod_in-ams03-geo-ca_access 75 52148673 > prod_in-ams03-geo-ca_access 61 52130753 > prod_in-ams03-geo-ca_access 49 52151667 > prod_in-ams03-geo-ca_access 48 52145233 > prod_in-ams03-geo-ca_access 27 52092805 > prod_in-ams03-geo-ca_access 26 52139644 > prod_in-ams03-geo-ca_access 65 52157504 > prod_in-ams03-geo-ca_access 105 52166289 > prod_in-ams03-geo-ca_access 38 52160464 > prod_in-ams03-geo-ca_access 22 52093451 > prod_in-ams03-geo-ca_access 4 52151660 > prod_in-ams03-geo-ca_access 90 52160296 > prod_in-ams03-geo-ca_access 25 52161691 > prod_in-ams03-geo-ca_access 13 52145828 > prod_in-ams03-geo-ca_access 56 52162867 > prod_in-ams03-geo-ca_access 42 52072094 > prod_in-ams03-geo-ca_access 7 52069496 > prod_in-ams03-geo-ca_access 117 52087078 > prod_in-ams03-geo-ca_access 32 52073732 > prod_in-ams03-geo-ca_access 102 52082022 > prod_in-ams03-geo-ca_access 76 52141018 > prod_in-ams03-geo-ca_access 83 52154542 > prod_in-ams03-geo-ca_access 72 52095051 > prod_in-ams03-geo-ca_access 85 52149907 > prod_in-ams03-geo-ca_access 119 52134435 > prod_in-ams03-geo-ca_access 113 52159340 > prod_in-ams03-geo-ca_access 55 52146597 > prod_in-ams03-geo-ca_access 18 52149079 > prod_in-ams03-geo-ca_access 35 52149058 > prod_in-ams03-geo-ca_access 99 52143277 > prod_in-ams03-geo-ca_access 41 52158872 > prod_in-ams03-geo-ca_access 112 52083901 > prod_in-ams03-geo-ca_access 34 52137932 > 
prod_in-ams03-geo-ca_access 89 52158135 > prod_in-ams03-geo-ca_access 40 5212 > prod_in-ams03-geo-ca_access 53 52138400 > prod_in-ams03-geo-ca_access 19 52144966 > prod_in-ams03-geo-ca_access 44 52166404 > prod_in-ams03-geo-ca_access 31 52155685 > prod_in-ams03-geo-ca_access 10 52152151 > prod_in-ams03-geo-ca_access 98 52145378 > prod_in-ams03-geo-ca_access 69 52153436 > prod_in-ams03-geo-ca_access 92 52093455 > prod_in-ams0
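Worth noting when comparing the two runs above: the staging command (which worked) carried an explicit -06:00 offset in its --to-datetime value, while the production command did not, so a zone-less string has to be resolved against some default time zone. The sketch below only shows how the two strings differ once parsed; it is an observation about the commands as shown, not a confirmed root cause:

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

// Sketch: the staging --to-datetime string carries its own offset, while the
// production-style string is zone-less and must be anchored to some zone
// (UTC is chosen here purely for illustration).
public class ResetDatetimeSketch {
    static long parseWithOffset(String s) {
        return OffsetDateTime.parse(s).toInstant().toEpochMilli();
    }

    static long parseZonelessAsUtc(String s) {
        return LocalDateTime.parse(s).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        // Staging-style string, offset included: unambiguous instant.
        System.out.println(parseWithOffset("2018-02-12T00:00:00.000-06:00")); // 1518415200000
        // Production-style string, no offset: meaning depends on the chosen zone.
        System.out.println(parseZonelessAsUtc("2018-02-10T00:00:00.000"));    // 1518220800000
    }
}
```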
[jira] [Commented] (KAFKA-3450) Producer blocks on send to topic that doesn't exist if auto create is disabled
[ https://issues.apache.org/jira/browse/KAFKA-3450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372875#comment-16372875 ] Josef Ludvíček commented on KAFKA-3450: --- Same problem with 0.11.0.1. Is there any plan to resolve this issue, or any recommendation to work around it? > Producer blocks on send to topic that doesn't exist if auto create is disabled > -- > > Key: KAFKA-3450 > URL: https://issues.apache.org/jira/browse/KAFKA-3450 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 0.9.0.1 >Reporter: Michal Turek >Priority: Critical > > {{producer.send()}} is blocked for {{max.block.ms}} (default 60 seconds) if > the destination topic doesn't exist and if their automatic creation is > disabled. Warning from NetworkClient containing UNKNOWN_TOPIC_OR_PARTITION is > logged every 100 ms in a loop until the 60 seconds timeout expires, but the > operation is not recoverable. > Preconditions > - Kafka 0.9.0.1 with default configuration and auto.create.topics.enable=false > - Kafka 0.9.0.1 clients. > Example minimalist code > https://github.com/avast/kafka-tests/blob/master/src/main/java/com/avast/kafkatests/othertests/nosuchtopic/NoSuchTopicTest.java > {noformat} > /** > * Test of sending to a topic that does not exist while automatic creation of > topics is disabled in Kafka (auto.create.topics.enable=false).
> */ > public class NoSuchTopicTest { > private static final Logger LOGGER = > LoggerFactory.getLogger(NoSuchTopicTest.class); > public static void main(String[] args) { > Properties properties = new Properties(); > properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, > "localhost:9092"); > properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, > NoSuchTopicTest.class.getSimpleName()); > properties.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000"); > // Default is 60 seconds > try (Producer<String, String> producer = new > KafkaProducer<>(properties, new StringSerializer(), new StringSerializer())) { > LOGGER.info("Sending message"); > producer.send(new ProducerRecord<>("ThisTopicDoesNotExist", > "key", "value"), (metadata, exception) -> { > if (exception != null) { > LOGGER.error("Send failed: {}", exception.toString()); > } else { > LOGGER.info("Send successful: {}-{}/{}", > metadata.topic(), metadata.partition(), metadata.offset()); > } > }); > LOGGER.info("Sending message"); > producer.send(new ProducerRecord<>("ThisTopicDoesNotExistToo", > "key", "value"), (metadata, exception) -> { > if (exception != null) { > LOGGER.error("Send failed: {}", exception.toString()); > } else { > LOGGER.info("Send successful: {}-{}/{}", > metadata.topic(), metadata.partition(), metadata.offset()); > } > }); > } > } > } > {noformat} > Related output > {noformat} > 2016-03-23 12:44:37.725 INFO c.a.k.o.nosuchtopic.NoSuchTopicTest [main]: > Sending message (NoSuchTopicTest.java:26) > 2016-03-23 12:44:37.830 WARN o.a.kafka.clients.NetworkClient > [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching > metadata with correlation id 0 : > {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582) > 2016-03-23 12:44:37.928 WARN o.a.kafka.clients.NetworkClient > [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching > metadata with correlation id 1 : > {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582) >
2016-03-23 12:44:38.028 WARN o.a.kafka.clients.NetworkClient > [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching > metadata with correlation id 2 : > {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582) > 2016-03-23 12:44:38.130 WARN o.a.kafka.clients.NetworkClient > [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching > metadata with correlation id 3 : > {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582) > 2016-03-23 12:44:38.231 WARN o.a.kafka.clients.NetworkClient > [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching > metadata with correlation id 4 : > {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582) > 2016-03-23 12:44:38.332 WARN o.a.kafka.clients.NetworkClient > [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching > metadata with correlation id 5 : > {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582) > 2016-03-23 12:44:38.433
[jira] [Created] (KAFKA-6582) Partitions get underreplicated, with a single ISR, and doesn't recover. Other brokers do not take over and we need to manually restart the broker.
Jurriaan Pruis created KAFKA-6582: - Summary: Partitions get underreplicated, with a single ISR, and doesn't recover. Other brokers do not take over and we need to manually restart the broker. Key: KAFKA-6582 URL: https://issues.apache.org/jira/browse/KAFKA-6582 Project: Kafka Issue Type: Bug Components: network Affects Versions: 1.0.0 Environment: Ubuntu 16.04 Linux kafka04 4.4.0-109-generic #132-Ubuntu SMP Tue Jan 9 19:52:39 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux java version "9.0.1" Java(TM) SE Runtime Environment (build 9.0.1+11) Java HotSpot(TM) 64-Bit Server VM (build 9.0.1+11, mixed mode) but also tried with the latest JVM 8 before with the same result. Reporter: Jurriaan Pruis Partitions get underreplicated, with a single ISR, and doesn't recover. Other brokers do not take over and we need to manually restart the 'single ISR' broker (if you describe the partitions of replicated topic it is clear that some partitions are only in sync on this broker). This bug resembles KAFKA-4477 a lot, but since that issue is marked as resolved this is probably something else but similar. We have the same issue (or at least it looks pretty similar) on Kafka 1.0. Since upgrading to Kafka 1.0 in November 2017 we've had these issues (we've upgraded from Kafka 0.10.2.1). This happens almost every 24-48 hours on a random broker. This is why we currently have a cronjob which restarts every broker every 24 hours. 
During this issue the ISR shows the following server log: {code:java} [2018-02-20 12:02:08,342] WARN Attempting to send response via channel for which there is no open connection, connection id 10.132.0.32:9092-10.14.148.20:56352-96708 (kafka.network.Processor) [2018-02-20 12:02:08,364] WARN Attempting to send response via channel for which there is no open connection, connection id 10.132.0.32:9092-10.14.150.25:54412-96715 (kafka.network.Processor) [2018-02-20 12:02:08,349] WARN Attempting to send response via channel for which there is no open connection, connection id 10.132.0.32:9092-10.14.149.18:35182-96705 (kafka.network.Processor) [2018-02-20 12:02:08,379] WARN Attempting to send response via channel for which there is no open connection, connection id 10.132.0.32:9092-10.14.150.25:54456-96717 (kafka.network.Processor) [2018-02-20 12:02:08,448] WARN Attempting to send response via channel for which there is no open connection, connection id 10.132.0.32:9092-10.14.159.20:36388-96720 (kafka.network.Processor) [2018-02-20 12:02:08,683] WARN Attempting to send response via channel for which there is no open connection, connection id 10.132.0.32:9092-10.14.157.110:41922-96740 (kafka.network.Processor) {code} Also on the ISR broker, the controller log shows this: {code:java} [2018-02-20 12:02:14,927] INFO [Controller-3-to-broker-3-send-thread]: Controller 3 connected to 10.132.0.32:9092 (id: 3 rack: null) for sending state change requests (kafka.controller.RequestSendThread) [2018-02-20 12:02:14,927] INFO [Controller-3-to-broker-0-send-thread]: Controller 3 connected to 10.132.0.10:9092 (id: 0 rack: null) for sending state change requests (kafka.controller.RequestSendThread) [2018-02-20 12:02:14,928] INFO [Controller-3-to-broker-1-send-thread]: Controller 3 connected to 10.132.0.12:9092 (id: 1 rack: null) for sending state change requests (kafka.controller.RequestSendThread){code} And the non-ISR brokers show these kind of errors: {code:java} 2018-02-20 
12:02:29,204] WARN [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Error in fetch to broker 3, request (type=FetchRequest, replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={..}, isolationLevel=READ_UNCOMMITTED) (kafka.server.ReplicaFetcherThread) java.io.IOException: Connection to 3 was disconnected before the response was read at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:95) at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:96) at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:205) at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:41) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:149) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64) {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6581) ConsumerGroupCommand hangs if even one of the partition is unavailable
[ https://issues.apache.org/jira/browse/KAFKA-6581?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sahil Aggarwal updated KAFKA-6581: -- Priority: Minor (was: Major) > ConsumerGroupCommand hangs if even one of the partition is unavailable > -- > > Key: KAFKA-6581 > URL: https://issues.apache.org/jira/browse/KAFKA-6581 > Project: Kafka > Issue Type: Bug > Components: admin, core, tools >Affects Versions: 0.10.0.0 >Reporter: Sahil Aggarwal >Priority: Minor > Fix For: 0.10.0.2 > > > ConsumerGroupCommand.scala uses consumer internally to get the position for > each partition but if the partition is unavailable the call > consumer.position(topicPartition) will block indefinitely. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6581) ConsumerGroupCommand hangs if even one of the partition is unavailable
[ https://issues.apache.org/jira/browse/KAFKA-6581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372688#comment-16372688 ] ASF GitHub Bot commented on KAFKA-6581: --- SahilAggarwal opened a new pull request #4612: KAFKA-6581: Fix the ConsumerGroupCommand indefinite execution if one of the partition is unavailable. URL: https://github.com/apache/kafka/pull/4612 * Checks if partition available before calling consumer.position * Adds timeout on consumer.position() call. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > ConsumerGroupCommand hangs if even one of the partition is unavailable > -- > > Key: KAFKA-6581 > URL: https://issues.apache.org/jira/browse/KAFKA-6581 > Project: Kafka > Issue Type: Bug > Components: admin, core, tools >Affects Versions: 0.10.0.0 >Reporter: Sahil Aggarwal >Priority: Major > Fix For: 0.10.0.2 > > > ConsumerGroupCommand.scala uses consumer internally to get the position for > each partition but if the partition is unavailable the call > consumer.position(topicPartition) will block indefinitely. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
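The fix direction described in the pull request above — bounding the otherwise-indefinite wait on consumer.position() — can be sketched in plain Java without the Kafka jars. This is an illustrative model of the pattern, not the actual ConsumerGroupCommand change: the method name, the fake lookup, and the -1 sentinel are assumptions.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch of bounding a potentially-blocking offset lookup with a timeout,
// in the spirit of the KAFKA-6581 fix: run the lookup on an executor and
// give up instead of hanging when the partition is unavailable.
public class BoundedPositionLookup {
    // Runs `lookup`, but returns -1 instead of blocking past `timeoutMs`.
    static long fetchPositionWithTimeout(Callable<Long> lookup, long timeoutMs) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Long> future = executor.submit(lookup);
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return -1L; // partition unavailable: report "unknown" rather than hang forever
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdownNow(); // interrupt a still-blocked lookup
        }
    }

    public static void main(String[] args) {
        // A lookup that answers promptly yields its offset...
        System.out.println(fetchPositionWithTimeout(() -> 42L, 1000));
        // ...while one that blocks past the timeout yields the sentinel.
        System.out.println(fetchPositionWithTimeout(() -> {
            Thread.sleep(5_000);
            return 0L;
        }, 200));
    }
}
```

Later Kafka clients side-step the need for this wrapper by exposing timeout-accepting overloads on the consumer itself, but a bounded external wait like the above works against any blocking call.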
[jira] [Commented] (KAFKA-4477) Node reduces its ISR to itself, and doesn't recover. Other nodes do not take leadership, cluster remains sick until node is restarted.
[ https://issues.apache.org/jira/browse/KAFKA-4477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372689#comment-16372689 ] AMOUSSOU DJANGBAN Baruch commented on KAFKA-4477: - Ok Thanks > Node reduces its ISR to itself, and doesn't recover. Other nodes do not take > leadership, cluster remains sick until node is restarted. > -- > > Key: KAFKA-4477 > URL: https://issues.apache.org/jira/browse/KAFKA-4477 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.10.1.0 > Environment: RHEL7 > java version "1.8.0_66" > Java(TM) SE Runtime Environment (build 1.8.0_66-b17) > Java HotSpot(TM) 64-Bit Server VM (build 25.66-b17, mixed mode) >Reporter: Michael Andre Pearce >Assignee: Apurva Mehta >Priority: Critical > Labels: reliability > Fix For: 0.10.1.1 > > Attachments: 2016_12_15.zip, 72_Server_Thread_Dump.txt, > 73_Server_Thread_Dump.txt, 74_Server_Thread_Dump, issue_node_1001.log, > issue_node_1001_ext.log, issue_node_1002.log, issue_node_1002_ext.log, > issue_node_1003.log, issue_node_1003_ext.log, kafka.jstack, > server_1_72server.log, server_2_73_server.log, server_3_74Server.log, > state_change_controller.tar.gz > > > We have encountered a critical issue that has re-occured in different > physical environments. We haven't worked out what is going on. We do though > have a nasty work around to keep service alive. > We do have not had this issue on clusters still running 0.9.01. > We have noticed a node randomly shrinking for the partitions it owns the > ISR's down to itself, moments later we see other nodes having disconnects, > followed by finally app issues, where producing to these partitions is > blocked. > It seems only by restarting the kafka instance java process resolves the > issues. > We have had this occur multiple times and from all network and machine > monitoring the machine never left the network, or had any other glitches. > Below are seen logs from the issue. 
> Node 7: > [2016-12-01 07:01:28,112] INFO Partition > [com_ig_trade_v1_position_event--demo--compacted,10] on broker 7: Shrinking > ISR for partition [com_ig_trade_v1_position_event--demo--compacted,10] from > 1,2,7 to 7 (kafka.cluster.Partition) > All other nodes: > [2016-12-01 07:01:38,172] WARN [ReplicaFetcherThread-0-7], Error in fetch > kafka.server.ReplicaFetcherThread$FetchRequest@5aae6d42 > (kafka.server.ReplicaFetcherThread) > java.io.IOException: Connection to 7 was disconnected before the response was > read > All clients: > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.NetworkException: The server disconnected > before a response was received. > After this occurs, we then suddenly see on the sick machine an increasing > amount of close_waits and file descriptors. > As a work around to keep service we are currently putting in an automated > process that tails and regex's for: and where new_partitions hit just itself > we restart the node. > "\[(?P.+)\] INFO Partition \[.*\] on broker .* Shrinking ISR for > partition \[.*\] from (?P.+) to (?P.+) > \(kafka.cluster.Partition\)" -- This message was sent by Atlassian JIRA (v7.6.3#76005)
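The workaround regex quoted above lost its named groups in the archive: the bare `(?P.+)` fragments were presumably Python-style `(?P<name>...)` groups before the angle-bracketed names were stripped. A reconstruction of its intent — with assumed group names, translated to Java's `(?<name>...)` syntax, and matched against the sample log line from the report — might look like:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Reconstruction of the intent of the "Shrinking ISR" monitoring regex from
// KAFKA-4477. The group names are assumptions (the originals were stripped
// by the mail archive), and this is a sketch of the workaround, not the
// reporter's actual tooling.
public class ShrinkIsrMonitor {
    static final Pattern SHRINK = Pattern.compile(
        "\\[(?<date>.+)\\] INFO Partition \\[.*\\] on broker (?<broker>\\d+): "
        + "Shrinking ISR for partition \\[.*\\] from (?<oldIsr>.+) to (?<newIsr>.+) "
        + "\\(kafka\\.cluster\\.Partition\\)");

    // True when the new ISR collapsed to just the logging broker itself,
    // i.e. the condition on which the workaround restarts the node.
    static boolean shrunkToSelf(String logLine) {
        Matcher m = SHRINK.matcher(logLine);
        return m.find() && m.group("newIsr").trim().equals(m.group("broker"));
    }

    public static void main(String[] args) {
        String line = "[2016-12-01 07:01:28,112] INFO Partition "
            + "[com_ig_trade_v1_position_event--demo--compacted,10] on broker 7: "
            + "Shrinking ISR for partition "
            + "[com_ig_trade_v1_position_event--demo--compacted,10] from 1,2,7 to 7 "
            + "(kafka.cluster.Partition)";
        System.out.println(shrunkToSelf(line)); // the sample line does collapse to broker 7
    }
}
```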
[jira] [Commented] (KAFKA-4477) Node reduces its ISR to itself, and doesn't recover. Other nodes do not take leadership, cluster remains sick until node is restarted.
[ https://issues.apache.org/jira/browse/KAFKA-4477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372683#comment-16372683 ] Håkon Åmdal commented on KAFKA-4477: Not sure, but if they use Kafka 1.0.0, all those commits seems to be included. > Node reduces its ISR to itself, and doesn't recover. Other nodes do not take > leadership, cluster remains sick until node is restarted. > -- > > Key: KAFKA-4477 > URL: https://issues.apache.org/jira/browse/KAFKA-4477 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.10.1.0 > Environment: RHEL7 > java version "1.8.0_66" > Java(TM) SE Runtime Environment (build 1.8.0_66-b17) > Java HotSpot(TM) 64-Bit Server VM (build 25.66-b17, mixed mode) >Reporter: Michael Andre Pearce >Assignee: Apurva Mehta >Priority: Critical > Labels: reliability > Fix For: 0.10.1.1 > > Attachments: 2016_12_15.zip, 72_Server_Thread_Dump.txt, > 73_Server_Thread_Dump.txt, 74_Server_Thread_Dump, issue_node_1001.log, > issue_node_1001_ext.log, issue_node_1002.log, issue_node_1002_ext.log, > issue_node_1003.log, issue_node_1003_ext.log, kafka.jstack, > server_1_72server.log, server_2_73_server.log, server_3_74Server.log, > state_change_controller.tar.gz > > > We have encountered a critical issue that has re-occured in different > physical environments. We haven't worked out what is going on. We do though > have a nasty work around to keep service alive. > We do have not had this issue on clusters still running 0.9.01. > We have noticed a node randomly shrinking for the partitions it owns the > ISR's down to itself, moments later we see other nodes having disconnects, > followed by finally app issues, where producing to these partitions is > blocked. > It seems only by restarting the kafka instance java process resolves the > issues. > We have had this occur multiple times and from all network and machine > monitoring the machine never left the network, or had any other glitches. 
> Below are seen logs from the issue. > Node 7: > [2016-12-01 07:01:28,112] INFO Partition > [com_ig_trade_v1_position_event--demo--compacted,10] on broker 7: Shrinking > ISR for partition [com_ig_trade_v1_position_event--demo--compacted,10] from > 1,2,7 to 7 (kafka.cluster.Partition) > All other nodes: > [2016-12-01 07:01:38,172] WARN [ReplicaFetcherThread-0-7], Error in fetch > kafka.server.ReplicaFetcherThread$FetchRequest@5aae6d42 > (kafka.server.ReplicaFetcherThread) > java.io.IOException: Connection to 7 was disconnected before the response was > read > All clients: > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.NetworkException: The server disconnected > before a response was received. > After this occurs, we then suddenly see on the sick machine an increasing > amount of close_waits and file descriptors. > As a work around to keep service we are currently putting in an automated > process that tails and regex's for: and where new_partitions hit just itself > we restart the node. > "\[(?P.+)\] INFO Partition \[.*\] on broker .* Shrinking ISR for > partition \[.*\] from (?P.+) to (?P.+) > \(kafka.cluster.Partition\)" -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-4477) Node reduces its ISR to itself, and doesn't recover. Other nodes do not take leadership, cluster remains sick until node is restarted.
[ https://issues.apache.org/jira/browse/KAFKA-4477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372665#comment-16372665 ] AMOUSSOU DJANGBAN Baruch commented on KAFKA-4477: - Thanks [~hawk_aa] Are you know if in confluent stack this problem are fixed? AM. > Node reduces its ISR to itself, and doesn't recover. Other nodes do not take > leadership, cluster remains sick until node is restarted. > -- > > Key: KAFKA-4477 > URL: https://issues.apache.org/jira/browse/KAFKA-4477 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.10.1.0 > Environment: RHEL7 > java version "1.8.0_66" > Java(TM) SE Runtime Environment (build 1.8.0_66-b17) > Java HotSpot(TM) 64-Bit Server VM (build 25.66-b17, mixed mode) >Reporter: Michael Andre Pearce >Assignee: Apurva Mehta >Priority: Critical > Labels: reliability > Fix For: 0.10.1.1 > > Attachments: 2016_12_15.zip, 72_Server_Thread_Dump.txt, > 73_Server_Thread_Dump.txt, 74_Server_Thread_Dump, issue_node_1001.log, > issue_node_1001_ext.log, issue_node_1002.log, issue_node_1002_ext.log, > issue_node_1003.log, issue_node_1003_ext.log, kafka.jstack, > server_1_72server.log, server_2_73_server.log, server_3_74Server.log, > state_change_controller.tar.gz > > > We have encountered a critical issue that has re-occured in different > physical environments. We haven't worked out what is going on. We do though > have a nasty work around to keep service alive. > We do have not had this issue on clusters still running 0.9.01. > We have noticed a node randomly shrinking for the partitions it owns the > ISR's down to itself, moments later we see other nodes having disconnects, > followed by finally app issues, where producing to these partitions is > blocked. > It seems only by restarting the kafka instance java process resolves the > issues. > We have had this occur multiple times and from all network and machine > monitoring the machine never left the network, or had any other glitches. 
> Below are seen logs from the issue. > Node 7: > [2016-12-01 07:01:28,112] INFO Partition > [com_ig_trade_v1_position_event--demo--compacted,10] on broker 7: Shrinking > ISR for partition [com_ig_trade_v1_position_event--demo--compacted,10] from > 1,2,7 to 7 (kafka.cluster.Partition) > All other nodes: > [2016-12-01 07:01:38,172] WARN [ReplicaFetcherThread-0-7], Error in fetch > kafka.server.ReplicaFetcherThread$FetchRequest@5aae6d42 > (kafka.server.ReplicaFetcherThread) > java.io.IOException: Connection to 7 was disconnected before the response was > read > All clients: > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.NetworkException: The server disconnected > before a response was received. > After this occurs, we then suddenly see on the sick machine an increasing > amount of close_waits and file descriptors. > As a work around to keep service we are currently putting in an automated > process that tails and regex's for: and where new_partitions hit just itself > we restart the node. > "\[(?P.+)\] INFO Partition \[.*\] on broker .* Shrinking ISR for > partition \[.*\] from (?P.+) to (?P.+) > \(kafka.cluster.Partition\)" -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-4477) Node reduces its ISR to itself, and doesn't recover. Other nodes do not take leadership, cluster remains sick until node is restarted.
[ https://issues.apache.org/jira/browse/KAFKA-4477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372644#comment-16372644 ] Håkon Åmdal commented on KAFKA-4477: We ended up running our own build of Kafka 0.11.0 where we cherry picked these commits: * KAFKA-6042: Avoid deadlock between two groups with delayed operations * KAFKA-6003; Accept appends on replicas and when rebuilding the log unconditionally * KAFKA-5970; Use ReentrantLock for delayed operation lock to avoid blocking On the top of my head, I cannot remember which commit exactly solved this problem, but we've run without issues since November 2017. > Node reduces its ISR to itself, and doesn't recover. Other nodes do not take > leadership, cluster remains sick until node is restarted. > -- > > Key: KAFKA-4477 > URL: https://issues.apache.org/jira/browse/KAFKA-4477 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.10.1.0 > Environment: RHEL7 > java version "1.8.0_66" > Java(TM) SE Runtime Environment (build 1.8.0_66-b17) > Java HotSpot(TM) 64-Bit Server VM (build 25.66-b17, mixed mode) >Reporter: Michael Andre Pearce >Assignee: Apurva Mehta >Priority: Critical > Labels: reliability > Fix For: 0.10.1.1 > > Attachments: 2016_12_15.zip, 72_Server_Thread_Dump.txt, > 73_Server_Thread_Dump.txt, 74_Server_Thread_Dump, issue_node_1001.log, > issue_node_1001_ext.log, issue_node_1002.log, issue_node_1002_ext.log, > issue_node_1003.log, issue_node_1003_ext.log, kafka.jstack, > server_1_72server.log, server_2_73_server.log, server_3_74Server.log, > state_change_controller.tar.gz > > > We have encountered a critical issue that has re-occured in different > physical environments. We haven't worked out what is going on. We do though > have a nasty work around to keep service alive. > We do have not had this issue on clusters still running 0.9.01. 
> We have noticed a node randomly shrinking for the partitions it owns the > ISR's down to itself, moments later we see other nodes having disconnects, > followed by finally app issues, where producing to these partitions is > blocked. > It seems only by restarting the kafka instance java process resolves the > issues. > We have had this occur multiple times and from all network and machine > monitoring the machine never left the network, or had any other glitches. > Below are seen logs from the issue. > Node 7: > [2016-12-01 07:01:28,112] INFO Partition > [com_ig_trade_v1_position_event--demo--compacted,10] on broker 7: Shrinking > ISR for partition [com_ig_trade_v1_position_event--demo--compacted,10] from > 1,2,7 to 7 (kafka.cluster.Partition) > All other nodes: > [2016-12-01 07:01:38,172] WARN [ReplicaFetcherThread-0-7], Error in fetch > kafka.server.ReplicaFetcherThread$FetchRequest@5aae6d42 > (kafka.server.ReplicaFetcherThread) > java.io.IOException: Connection to 7 was disconnected before the response was > read > All clients: > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.NetworkException: The server disconnected > before a response was received. > After this occurs, we then suddenly see on the sick machine an increasing > amount of close_waits and file descriptors. > As a work around to keep service we are currently putting in an automated > process that tails and regex's for: and where new_partitions hit just itself > we restart the node. > "\[(?P.+)\] INFO Partition \[.*\] on broker .* Shrinking ISR for > partition \[.*\] from (?P.+) to (?P.+) > \(kafka.cluster.Partition\)" -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-4477) Node reduces its ISR to itself, and doesn't recover. Other nodes do not take leadership, cluster remains sick until node is restarted.
[ https://issues.apache.org/jira/browse/KAFKA-4477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372629#comment-16372629 ] AMOUSSOU DJANGBAN Baruch commented on KAFKA-4477: - Hi All, I have same issue actually with kafka 0.11.0. Are you have a solution for this problem? I have a big impact for all my application? Many thanks > Node reduces its ISR to itself, and doesn't recover. Other nodes do not take > leadership, cluster remains sick until node is restarted. > -- > > Key: KAFKA-4477 > URL: https://issues.apache.org/jira/browse/KAFKA-4477 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.10.1.0 > Environment: RHEL7 > java version "1.8.0_66" > Java(TM) SE Runtime Environment (build 1.8.0_66-b17) > Java HotSpot(TM) 64-Bit Server VM (build 25.66-b17, mixed mode) >Reporter: Michael Andre Pearce >Assignee: Apurva Mehta >Priority: Critical > Labels: reliability > Fix For: 0.10.1.1 > > Attachments: 2016_12_15.zip, 72_Server_Thread_Dump.txt, > 73_Server_Thread_Dump.txt, 74_Server_Thread_Dump, issue_node_1001.log, > issue_node_1001_ext.log, issue_node_1002.log, issue_node_1002_ext.log, > issue_node_1003.log, issue_node_1003_ext.log, kafka.jstack, > server_1_72server.log, server_2_73_server.log, server_3_74Server.log, > state_change_controller.tar.gz > > > We have encountered a critical issue that has re-occured in different > physical environments. We haven't worked out what is going on. We do though > have a nasty work around to keep service alive. > We do have not had this issue on clusters still running 0.9.01. > We have noticed a node randomly shrinking for the partitions it owns the > ISR's down to itself, moments later we see other nodes having disconnects, > followed by finally app issues, where producing to these partitions is > blocked. > It seems only by restarting the kafka instance java process resolves the > issues. 
> We have had this occur multiple times and from all network and machine > monitoring the machine never left the network, or had any other glitches. > Below are seen logs from the issue. > Node 7: > [2016-12-01 07:01:28,112] INFO Partition > [com_ig_trade_v1_position_event--demo--compacted,10] on broker 7: Shrinking > ISR for partition [com_ig_trade_v1_position_event--demo--compacted,10] from > 1,2,7 to 7 (kafka.cluster.Partition) > All other nodes: > [2016-12-01 07:01:38,172] WARN [ReplicaFetcherThread-0-7], Error in fetch > kafka.server.ReplicaFetcherThread$FetchRequest@5aae6d42 > (kafka.server.ReplicaFetcherThread) > java.io.IOException: Connection to 7 was disconnected before the response was > read > All clients: > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.NetworkException: The server disconnected > before a response was received. > After this occurs, we then suddenly see on the sick machine an increasing > amount of close_waits and file descriptors. > As a work around to keep service we are currently putting in an automated > process that tails and regex's for: and where new_partitions hit just itself > we restart the node. > "\[(?P.+)\] INFO Partition \[.*\] on broker .* Shrinking ISR for > partition \[.*\] from (?P.+) to (?P.+) > \(kafka.cluster.Partition\)" -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6572) kafka-consumer-groups does not reset offsets to specified datetime correctly
[ https://issues.apache.org/jira/browse/KAFKA-6572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372618#comment-16372618 ] huxihx commented on KAFKA-6572: --- [~slucas] Currently, the partition's offset will be set to the LEO (log end offset) if there is no such message whose timestamp is greater than the given timestamp. Could it be possible in your case? > kafka-consumer-groups does not reset offsets to specified datetime correctly > > > Key: KAFKA-6572 > URL: https://issues.apache.org/jira/browse/KAFKA-6572 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.0.0 >Reporter: Sharon Lucas >Priority: Major > > With Kafka 1.0.0 we are seeing a problem using the kafka-consumer-groups.sh > --reset-offsets option to reset offsets to a specific date/time in our > production environment. > We first tried to use the kafka-consumer-groups.sh command with the > --reset-offsets option and with option --to-datetime 2018-02-10T00:00:00.000 > in our staging environment and it worked correctly. Running the following > command changed it to start processing logs from February 12, 2018 (4 days > ago) for a topic that had a large lag. We did a dry run to verify before > running with the --execute option. > {code:java} > root@mlpstagemon0101a:/# /opt/kafka/bin/kafka-consumer-groups.sh > --bootstrap-server NN.NNN.NN.NN:9092 --group logstash-elasticsearch-latest > --to-datetime 2018-02-12T00:00:00.000-06:00 --reset-offsets --topic > staging-mon01-rg-elasticsearch --execute{code} > We stopped the kafka mirrors that process this topic before resetting the > offsets and started the kafka mirrors after resetting the offsets. We > verified that it correctly started processing logs from February 12, 2018. > Then we tried resetting offsets in a production environment for a topic that > had a very large lag using option --to-datetime 2018-02-10T00:00:00.000 and > it did not work as expected. 
We stopped the kafka mirrors that process this > topic before resetting the offsets and did a dry run to see what the new > offsets would be: > {code:java} > root@mlplon0401e:# /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server > NN.N.NNN.NNN:9092 --group mirror-consumer-ams03-geo-earliest --to-datetime > 2018-02-10T00:00:00.000 --reset-offsets --topic prod_in-ams03-geo-ca_access > Note: This will not show information about old Zookeeper-based consumers. > ^@^@^@^@ > TOPIC PARTITION NEW-OFFSET > prod_in-ams03-geo-ca_access 52 52084147 > prod_in-ams03-geo-ca_access 106 52154199 > prod_in-ams03-geo-ca_access 75 52148673 > prod_in-ams03-geo-ca_access 61 52130753 > prod_in-ams03-geo-ca_access 49 52151667 > prod_in-ams03-geo-ca_access 48 52145233 > prod_in-ams03-geo-ca_access 27 52092805 > prod_in-ams03-geo-ca_access 26 52139644 > prod_in-ams03-geo-ca_access 65 52157504 > prod_in-ams03-geo-ca_access 105 52166289 > prod_in-ams03-geo-ca_access 38 52160464 > prod_in-ams03-geo-ca_access 22 52093451 > prod_in-ams03-geo-ca_access 4 52151660 > prod_in-ams03-geo-ca_access 90 52160296 > prod_in-ams03-geo-ca_access 25 52161691 > prod_in-ams03-geo-ca_access 13 52145828 > prod_in-ams03-geo-ca_access 56 52162867 > prod_in-ams03-geo-ca_access 42 52072094 > prod_in-ams03-geo-ca_access 7 52069496 > prod_in-ams03-geo-ca_access 117 52087078 > prod_in-ams03-geo-ca_access 32 52073732 > prod_in-ams03-geo-ca_access 102 52082022 > prod_in-ams03-geo-ca_access 76 52141018 > prod_in-ams03-geo-ca_access 83 52154542 > prod_in-ams03-geo-ca_access 72 52095051 > prod_in-ams03-geo-ca_access 85 52149907 > prod_in-ams03-geo-ca_access 119 52134435 > prod_in-ams03-geo-ca_access 113 52159340 > prod_in-ams03-geo-ca_access 55 52146597 > prod_in-ams03-geo-ca_access 18 52149079 > prod_in-ams03-geo-ca_access 35 52149058 > prod_in-ams03-geo-ca_access 99 52143277 > prod_in-ams03-geo-ca_access 41 52158872 > prod_in-ams03-geo-ca_access 112 52083901 > prod_in-ams03-geo-ca_access 34 52137932 > 
prod_in-ams03-geo-ca_access 89 52158135 > prod_in-ams03-geo-ca_access 40 5212 > prod_in-ams03-geo-ca_access 53 52138400 > prod_in-ams03-geo-ca_access 19 52144966 > prod_in-ams03-geo-ca_access 44 52166404 > prod_in-ams03-geo-ca_access 31 52155685 > prod_in-ams03-geo-ca_access 10 52152151 > prod_in-ams03-geo-ca_access 98 52145378 > prod_in-ams03-geo-ca_ac
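huxihx's explanation above has a testable shape: a reset-to-datetime resolves to the first offset whose record timestamp is at or after the target, and falls back to the log-end offset (LEO) when no record qualifies — which would park the group at the tail of the partition instead of back in time. A toy model of that lookup (an illustration of the semantics, not Kafka's actual time-index code):

```java
// Toy model of the timestamp-to-offset resolution behind --to-datetime.
// Illustrative only; offsets and timestamps are invented.
public class TimestampResetModel {
    // timestamps[i] is the timestamp of the record at offset baseOffset + i,
    // assumed non-decreasing (as with LogAppendTime).
    static long offsetForTimestamp(long baseOffset, long[] timestamps, long target) {
        for (int i = 0; i < timestamps.length; i++) {
            if (timestamps[i] >= target) {
                return baseOffset + i; // first record at or after the target time
            }
        }
        // No record is recent enough: fall back to the log-end offset, i.e.
        // the tail of the partition -- matching the behavior described above.
        return baseOffset + timestamps.length;
    }

    public static void main(String[] args) {
        long[] ts = {100L, 200L, 300L};
        System.out.println(offsetForTimestamp(50, ts, 150)); // lands on the 200-timestamp record
        System.out.println(offsetForTimestamp(50, ts, 999)); // target beyond all records: LEO
    }
}
```

Under this model, records with missing or out-of-order timestamps (e.g. pre-0.10 messages, or segments touched by kafka-delete-records.sh) could make the lookup fail to find a qualifying record and hence resolve to the LEO.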
[jira] [Commented] (KAFKA-6577) Connect standalone SASL file source and sink test fails without explanation
[ https://issues.apache.org/jira/browse/KAFKA-6577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372606#comment-16372606 ]

ASF GitHub Bot commented on KAFKA-6577:
---------------------------------------

dguy closed pull request #4610: KAFKA-6577: Fix Connect system tests and add debug messages
URL: https://github.com/apache/kafka/pull/4610

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 4afa47dda1a..3b7ec87f644 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -74,6 +74,7 @@ public static void main(String[] args) throws Exception {
         DistributedConfig config = new DistributedConfig(workerProps);
         String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+        log.debug("Kafka cluster ID: {}", kafkaClusterId);
         RestServer rest = new RestServer(config);
         URI advertisedUrl = rest.advertisedUrl();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index 17699053541..413cb46cf28 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -78,6 +78,7 @@ public static void main(String[] args) throws Exception {
         StandaloneConfig config = new StandaloneConfig(workerProps);
         String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+        log.debug("Kafka cluster ID: {}", kafkaClusterId);
         RestServer rest = new RestServer(config);
         URI advertisedUrl = rest.advertisedUrl();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index b34e48390e1..e51b365cec6 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -432,6 +432,7 @@ public void putTargetState(String connector, TargetState state) {
         Runnable createTopics = new Runnable() {
             @Override
             public void run() {
+                log.debug("Creating admin client to manage Connect internal config topic");
                 try (TopicAdmin admin = new TopicAdmin(adminProps)) {
                     admin.createTopics(topicDescription);
                 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index f29f3c23d03..fb8ad97b48d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -94,6 +94,7 @@ public void configure(final WorkerConfig config) {
         Runnable createTopics = new Runnable() {
             @Override
             public void run() {
+                log.debug("Creating admin client to manage Connect internal offset topic");
                 try (TopicAdmin admin = new TopicAdmin(adminProps)) {
                     admin.createTopics(topicDescription);
                 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index 8ca21ebb350..6710808f9a9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -157,6 +157,7 @@ public void onCompletion(Throwable error, ConsumerRecord record)
         Runnable createTopics = new Runnable() {
             @Override
             public void run() {
+                log.debug("Creating admin client to manage Connect internal status topic");
                 try (TopicAdmin admin = new TopicAdmin(adminProps)) {
                     admin.createTopics(topicDescription);
                 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java b/connect/runtime/src/main/java/org/apache/kafka/con
[jira] [Resolved] (KAFKA-6577) Connect standalone SASL file source and sink test fails without explanation
[ https://issues.apache.org/jira/browse/KAFKA-6577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Damian Guy resolved KAFKA-6577.
-------------------------------
    Resolution: Fixed
 Fix Version/s: 1.2.0

Issue resolved by pull request 4610
[https://github.com/apache/kafka/pull/4610]

> Connect standalone SASL file source and sink test fails without explanation
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-6577
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6577
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect, system tests
>    Affects Versions: 1.1.0
>            Reporter: Randall Hauch
>            Assignee: Randall Hauch
>            Priority: Blocker
>             Fix For: 1.2.0, 1.1.0
>
> The {{tests/kafkatest/tests/connect/connect_test.py::ConnectStandaloneFileTest.test_file_source_and_sink}} test is failing with the SASL configuration without a sufficient explanation. During the test, the Connect worker fails to start, but the Connect log contains no useful information.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-5157) Options for handling corrupt data during deserialization
[ https://issues.apache.org/jira/browse/KAFKA-5157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372564#comment-16372564 ]

Marcel "childNo͡.de" Trautwein edited comment on KAFKA-5157 at 2/22/18 9:08 AM:
--------------------------------------------------------------------------------

Just a question ... what I "dislike" here, from a naive point of view, is that if you want to configure your {{DeserializationExceptionHandler}} implementation, you have the well-defined {{Configurable}} interface, but what gets injected is the {{StreamsConfig}}, so you have to put your configuration into the StreamsConfig map to access it later in the implementation. Is this really the desired/designed procedure and/or best practice?

P.S.: yes, I know, the discussion of the KIP was/is in [http://mail-archives.apache.org/mod_mbox/kafka-dev/201705.mbox/%3c3da51c0a-f7d8-4134-b641-06eb3f2a1...@gmail.com%3e] :)

was (Author: childnode):
Just a question ... what I "dislike" here, from a naive point of view, is that if you want to configure your {{DeserializationExceptionHandler}} implementation, you have the well-defined {{Configurable}} interface, but what gets injected is the {{StreamsConfig}}, so you have to put your configuration into the StreamsConfig map to access it later in the implementation. Is this really the desired/designed procedure and/or best practice?

> Options for handling corrupt data during deserialization
> --------------------------------------------------------
>
>                 Key: KAFKA-5157
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5157
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Eno Thereska
>            Assignee: Eno Thereska
>            Priority: Major
>              Labels: user-experience
>             Fix For: 1.0.0
>
> When there is badly formatted data in the source topics, deserialization will throw a runtime exception all the way to the user. And since deserialization happens before the record is ever processed at the beginning of the topology, today there is no way to handle such errors at the user-app level.
> We should consider allowing users to handle such "poison pills" in a customizable way.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (KAFKA-5157) Options for handling corrupt data during deserialization
[ https://issues.apache.org/jira/browse/KAFKA-5157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372564#comment-16372564 ]

Marcel "childNo͡.de" Trautwein commented on KAFKA-5157:
-------------------------------------------------------

Just a question ... what I "dislike" here, from a naive point of view, is that if you want to configure your {{DeserializationExceptionHandler}} implementation, you have the well-defined {{Configurable}} interface, but what gets injected is the {{StreamsConfig}}, so you have to put your configuration into the StreamsConfig map to access it later in the implementation. Is this really the desired/designed procedure and/or best practice?

> Options for handling corrupt data during deserialization
> --------------------------------------------------------
>
>                 Key: KAFKA-5157
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5157
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Eno Thereska
>            Assignee: Eno Thereska
>            Priority: Major
>              Labels: user-experience
>             Fix For: 1.0.0
>
> When there is badly formatted data in the source topics, deserialization will throw a runtime exception all the way to the user. And since deserialization happens before the record is ever processed at the beginning of the topology, today there is no way to handle such errors at the user-app level.
> We should consider allowing users to handle such "poison pills" in a customizable way.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
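The pattern the comment questions — handler settings riding along in the StreamsConfig map and being picked up in {{configure()}} — can be sketched as follows. The {{Configurable}} and response types below are simplified, self-contained stand-ins for the real Kafka interfaces ({{org.apache.kafka.common.Configurable}}, {{org.apache.kafka.streams.errors.DeserializationExceptionHandler}}), and the config key and skip-limit behaviour are hypothetical, purely for illustration:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for org.apache.kafka.common.Configurable so this
// sketch compiles on its own.
interface Configurable {
    void configure(Map<String, ?> configs);
}

// Simplified stand-in for DeserializationHandlerResponse.
enum Response { CONTINUE, FAIL }

// A handler whose own settings are passed through the StreamsConfig map and
// read back in configure() — the procedure the comment asks about.
class LogAndContinueOnBadRecord implements Configurable {
    static final String SKIP_LIMIT_CONFIG = "my.handler.skip.limit"; // hypothetical key

    private long skipLimit = Long.MAX_VALUE;
    private long skipped = 0;

    @Override
    public void configure(Map<String, ?> configs) {
        // The full config map is injected; pull out only our own key.
        Object v = configs.get(SKIP_LIMIT_CONFIG);
        if (v != null) skipLimit = Long.parseLong(v.toString());
    }

    Response handle(Exception deserializationError) {
        skipped++;
        return skipped <= skipLimit ? Response.CONTINUE : Response.FAIL;
    }
}

class HandlerDemo {
    public static void main(String[] args) {
        // In a real app these entries would sit in the same Properties that
        // back StreamsConfig, next to the handler class name itself.
        Map<String, Object> streamsConfigMap = new HashMap<>();
        streamsConfigMap.put(LogAndContinueOnBadRecord.SKIP_LIMIT_CONFIG, "2");

        LogAndContinueOnBadRecord handler = new LogAndContinueOnBadRecord();
        handler.configure(streamsConfigMap); // Streams calls this with the config map

        System.out.println(handler.handle(new RuntimeException("bad record 1"))); // CONTINUE
        System.out.println(handler.handle(new RuntimeException("bad record 2"))); // CONTINUE
        System.out.println(handler.handle(new RuntimeException("bad record 3"))); // FAIL
    }
}
```

The design consequence the comment highlights: because only the shared config map is injected, custom handler keys must be namespaced (as with {{my.handler.skip.limit}} above) to avoid colliding with Streams' own settings.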
[jira] [Created] (KAFKA-6581) ConsumerGroupCommand hangs if even one of the partition is unavailable
Sahil Aggarwal created KAFKA-6581:
-------------------------------------

             Summary: ConsumerGroupCommand hangs if even one of the partition is unavailable
                 Key: KAFKA-6581
                 URL: https://issues.apache.org/jira/browse/KAFKA-6581
             Project: Kafka
          Issue Type: Bug
          Components: admin, core, tools
    Affects Versions: 0.10.0.0
            Reporter: Sahil Aggarwal
             Fix For: 0.10.0.2

ConsumerGroupCommand.scala internally uses a consumer to get the position of each partition, but if a partition is unavailable the call consumer.position(topicPartition) will block indefinitely.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
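One general way to avoid this class of hang is to bound the blocking call with a timeout. The sketch below simulates the stuck {{consumer.position()}} call with a hypothetical stand-in (no broker involved) and guards it with a {{Future}}; it is an illustration of the timeout pattern, not the fix that was shipped. (Later Kafka releases addressed this API-side: KIP-266 in Kafka 2.0 added timeout overloads such as {{position(TopicPartition, Duration)}}.)

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PositionWithTimeout {

    // Hypothetical stand-in for consumer.position(topicPartition): blocks
    // forever, as it does when the partition is unavailable.
    static long blockingPosition() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
        return -1L;
    }

    // Run the potentially-blocking call on another thread and bound the wait;
    // returns null instead of hanging when the timeout expires.
    static Long positionOrNull(long timeoutMs) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Long> f = pool.submit(PositionWithTimeout::blockingPosition);
            try {
                return f.get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                f.cancel(true); // interrupt the stuck call
                return null;    // treat the partition as unavailable
            }
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        Long pos = positionOrNull(200);
        System.out.println(pos == null ? "timed out" : pos.toString());
    }
}
```

Running this prints "timed out" after roughly 200 ms instead of hanging, which is the behaviour a tool like ConsumerGroupCommand would want when a partition has no available leader.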