[jira] [Commented] (KAFKA-3539) KafkaProducer.send() may block even though it returns the Future
[ https://issues.apache.org/jira/browse/KAFKA-3539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17380730#comment-17380730 ] Ewen Cheslack-Postava commented on KAFKA-3539:
--

[~moses.nakamura] I spent a bunch of time on the clients in the past, but I've barely been involved for the past few years, so I'm not really sure of the current state of the implementation and tests (e.g. I wasn't working on the clients when EoS was implemented). What tests fail due to minor changes? If they are unit tests, failures should be unexpected unless you are changing public API, which would require a KIP anyway. You might also just be seeing flakiness in integration tests, which unfortunately is expected. Providing a list of the tests that break and whether it's compilation or runtime issues would probably help, but someone more active can probably provide better guidance.

> KafkaProducer.send() may block even though it returns the Future
>
> Key: KAFKA-3539
> URL: https://issues.apache.org/jira/browse/KAFKA-3539
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Reporter: Oleg Zhurakousky
> Priority: Critical
> Labels: needs-discussion, needs-kip
>
> You can get more details from the us...@kafka.apache.org by searching on the thread with the subject "KafkaProducer block on send".
> The bottom line is that a method that returns a Future must never block, since that essentially violates the Future contract: it was specifically designed to return immediately, passing control back to the user to check for completion, cancel, etc.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-3539) KafkaProducer.send() may block even though it returns the Future
[ https://issues.apache.org/jira/browse/KAFKA-3539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17343043#comment-17343043 ] Ewen Cheslack-Postava commented on KAFKA-3539:
--

[~mjsax] They aren't really duplicates. It's possible to also just have races between whatever creates a topic and a process that tries to produce to it. I think 3450 implicitly assumes a simple sequential model where create should complete, and only then processes are deployed. In practice, there are lots of convergence-based systems that can do without this with the right preflight checks (topic creation may happen in parallel to deploying an app that uses it, but does not have permissions to create topics). They may be related, but at least the high-level description of 3450 wrt topic auto creation doesn't quite align with this one.

[~moses.nakamura] this is a very tricky thing to get right. At first glance we could tie anything blocking (or with a simple timeout) to the producer constructor, but at that point we don't know what info to fetch. Adding a queue can help, but adds overhead and really just delays the issue. Configurable timeouts let you set your tolerance for blocking operations (you're going to spend at least some time in the send() call anyway). Alternatively, we could not wait on metadata at all and only check if it's available, request it if it is not, and then bail with a RetriableException. Pretty much any solution proposed so far is going to need a KIP (it's at a minimum a behavioral change; for your option 1 it's an API addition). A good starting point if you want to address this is to enumerate the variety of options and their pros/cons/compatibility issues.
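As a concrete illustration of the "configurable timeouts" option discussed above: the producer's `max.block.ms` setting bounds how long `send()` may block waiting for metadata or buffer space. A minimal sketch, assuming only that you want fail-fast behavior (the broker address and the 500 ms budget are arbitrary examples):

```java
import java.util.Properties;

public class ProducerBlockingConfig {
    // Sketch: bound how long KafkaProducer.send() may block.
    // max.block.ms caps the time spent waiting for metadata (or buffer space)
    // before send() throws a TimeoutException instead of blocking further.
    static Properties boundedBlockingProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("max.block.ms", "500"); // default is 60000; fail fast instead
        return props;
    }

    public static void main(String[] args) {
        Properties props = boundedBlockingProps();
        System.out.println("max.block.ms=" + props.getProperty("max.block.ms"));
    }
}
```

With this config, a `send()` that cannot obtain metadata throws a `TimeoutException` after at most roughly 500 ms rather than blocking for the default 60 s; it does not make `send()` truly non-blocking, which is why the ticket stays open.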
[jira] [Commented] (KAFKA-6009) Fix formatting of autogenerated docs tables
[ https://issues.apache.org/jira/browse/KAFKA-6009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17305855#comment-17305855 ] Ewen Cheslack-Postava commented on KAFKA-6009:
--

[~zacky] yes, of course. I've assigned the ticket to you.

> Fix formatting of autogenerated docs tables
>
> Key: KAFKA-6009
> URL: https://issues.apache.org/jira/browse/KAFKA-6009
> Project: Kafka
> Issue Type: Sub-task
> Components: documentation
> Reporter: Ewen Cheslack-Postava
> Assignee: Takeshi Yamasaki
> Priority: Major
>
> In reviewing https://github.com/apache/kafka/pull/3987 I noticed that the autogenerated tables currently differ from the manually created ones. The manual ones have 3 columns -- metric/attribute name, description, and mbean name with a regex. The new ones also have 3 columns, but for some reason the first one is just empty, the second is the metric/attribute name, the last one is the description, and there is no regex.
> We could potentially just drop to two columns since the regex column is generally very repetitive and is now handled by a header row giving the general group mbean name info/format. The one thing that seems to currently be missing is the regex that would restrict the format of these (although these weren't really technically enforced and some of the restrictions are being removed, e.g. see some of the follow-up discussion to https://cwiki.apache.org/confluence/display/KAFKA/KIP-190%3A+Handle+client-ids+consistently+between+clients+and+brokers).

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-6009) Fix formatting of autogenerated docs tables
[ https://issues.apache.org/jira/browse/KAFKA-6009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava reassigned KAFKA-6009:

Assignee: Takeshi Yamasaki
[jira] [Commented] (KAFKA-3988) KafkaConfigBackingStore assumes configs will be stored as schemaless maps
[ https://issues.apache.org/jira/browse/KAFKA-3988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17294009#comment-17294009 ] Ewen Cheslack-Postava commented on KAFKA-3988:
--

[~pachima...@gmail.com] It's marked WONTFIX because we concluded it didn't make much sense to fix – as mentioned above, KIP-174 gets rid of the ability to even configure the internal converters. You should remove the internal.converter settings that specify schemas.enable=true.

> KafkaConfigBackingStore assumes configs will be stored as schemaless maps
>
> Key: KAFKA-3988
> URL: https://issues.apache.org/jira/browse/KAFKA-3988
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 0.10.0.0
> Reporter: Ewen Cheslack-Postava
> Assignee: Ewen Cheslack-Postava
> Priority: Major
> Original Estimate: 4h
> Remaining Estimate: 4h
>
> If you use an internal key/value converter that drops schema information (as is the default in the config files we provide, since we use JsonConverter with schemas.enable=false), the schemas we use that are structs get converted to maps since we don't know the structure to decode them to. Because our tests run with these settings, we haven't validated that the code works if schemas are preserved.
> When they are preserved, we'll hit an error message like this:
> {quote}
> [2016-07-25 07:36:34,828] ERROR Found connector configuration (connector-test-mysql-jdbc) in wrong format: class org.apache.kafka.connect.data.Struct (org.apache.kafka.connect.storage.KafkaConfigBackingStore:498)
> {quote}
> because the code currently checks that it is working with a map. We should actually be checking for either a Struct or a Map. This same problem probably affects a couple of other types of data in the same class, as connector configs, task configs, Connect task lists, and target states are all Structs.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
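For concreteness, a sketch of the worker-config fix suggested in the comment above, for a pre-KIP-174 worker (the property names are the real Connect worker settings; the fix is simply deleting the internal.converter lines so the schemaless defaults apply):

```properties
# Problematic pre-KIP-174 settings: internal converters preserving schemas
# internal.key.converter=org.apache.kafka.connect.json.JsonConverter
# internal.key.converter.schemas.enable=true
# internal.value.converter=org.apache.kafka.connect.json.JsonConverter
# internal.value.converter.schemas.enable=true

# Fix: remove the internal.converter.* lines entirely; the worker then uses
# schemaless JSON for its internal topics. The regular (record) converters
# are unaffected:
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
```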
[jira] [Commented] (KAFKA-10380) Make dist flatten rocksdbjni
[ https://issues.apache.org/jira/browse/KAFKA-10380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17181366#comment-17181366 ] Ewen Cheslack-Postava commented on KAFKA-10380:
---

Probably the best solution would be to reorganize the binary dist to have subdirectories for clients, core, streams, connect, etc, but that gets tricky because then bin scripts need to deal with more classpath stuff or jars get duplicated, we'd need to figure out how config files get organized, etc. But that would substantially reduce the footprint for certain use cases, e.g. Connect also pulls in jetty/jersey since it has a REST API, both of which are pretty large with their transitive dependencies.

> Make dist flatten rocksdbjni
>
> Key: KAFKA-10380
> URL: https://issues.apache.org/jira/browse/KAFKA-10380
> Project: Kafka
> Issue Type: Task
> Components: build
> Affects Versions: 2.6.0
> Reporter: Adrian Cole
> Priority: Major
>
> I was looking for ways to reduce the size of our Kafka image, and the most notable opportunity is handling rocksdbjni differently. It is currently a 15MB jar.
> As mentioned in its description, rocksdbjni includes binaries for a lot of OS choices:
> du -k librocksdbjni-*
> 7220 librocksdbjni-linux-aarch64.so
> 8756 librocksdbjni-linux-ppc64le.so
> 7220 librocksdbjni-linux32.so
> 7932 librocksdbjni-linux64.so
> 5440 librocksdbjni-osx.jnilib
> 4616 librocksdbjni-win64.dll
> It may not seem obvious in normal dists, which aim to work for many operating systems, what the problem is here. When creating docker images, we currently would need to repackage this to scrub out the irrelevant OS items or accept files larger than alpine itself.
> While this might be something to kick back to rocksdb, having some options here would be great.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-5896) Kafka Connect task threads never interrupted
[ https://issues.apache.org/jira/browse/KAFKA-5896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17177973#comment-17177973 ] Ewen Cheslack-Postava commented on KAFKA-5896:
--

[~yazgoo] It looks like this was marked "Abandoned". Reiterating my previous comment:

> Really my complaint is that Java's interrupt semantics are terrible and try to give the feeling of preemption when it can't, and therefore leave a ton of bugs and overpromised, underdelivered guarantees in their wake.

It's fine to reopen if you think it needs more discussion, I just don't see a way to actually fix the issue – Thread.interrupt doesn't do what we'd want and afaik the JVM doesn't provide anything that does. So given those constraints, I think it's probably better to identify the connector that is behaving badly and work with upstream to address it.

> Kafka Connect task threads never interrupted
>
> Key: KAFKA-5896
> URL: https://issues.apache.org/jira/browse/KAFKA-5896
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Reporter: Nick Pillitteri
> Assignee: Nick Pillitteri
> Priority: Minor
>
> h2. Problem
> Kafka Connect tasks associated with connectors are run in their own threads. When tasks are stopped or restarted, a flag is set - {{stopping}} - to indicate the task should stop processing records. However, if the thread the task is running in is blocked (waiting for a lock or performing I/O) it's possible the task will never stop.
> I've created a connector specifically to demonstrate this issue (along with some more detailed instructions for reproducing the issue): https://github.com/smarter-travel-media/hang-connector
> I believe this is an issue because it means that a single badly behaved connector (any connector that does I/O without timeouts) can cause the Kafka Connect worker to get into a state where the only solution is to restart the JVM.
> I think, but couldn't reproduce, that this is the cause of this problem on Stack Overflow: https://stackoverflow.com/questions/43802156/inconsistent-connector-state-connectexception-task-already-exists-in-this-work
> h2. Expected Result
> I would expect the Worker to eventually interrupt the thread that the task is running in. In the past, across various other libraries, this is what I've seen done when a thread needs to be forcibly stopped.
> h2. Actual Result
> In actuality, the Worker sets a {{stopping}} flag and lets the thread run indefinitely. It uses a timeout while waiting for the task to stop, but after this timeout has expired it simply sets a {{cancelled}} flag. This means that every time a task is restarted, a new thread running the task will be created. Thus a task may end up with multiple instances all running in their own threads when there's only supposed to be a single thread.
> h2. Steps to Reproduce
> The problem can be replicated by using the connector available here: https://github.com/smarter-travel-media/hang-connector
> Apologies for how involved the steps are.
> I've created a patch that forcibly interrupts threads after they fail to shut down gracefully here: https://github.com/smarter-travel-media/kafka/commit/295c747a9fd82ee8b30556c89c31e0bfcce5a2c5
> I've confirmed that this fixes the issue. I can add some unit tests and submit a PR if people agree that this is a bug and interrupting threads is the right fix.
> Thanks!

-- This message was sent by Atlassian Jira (v8.3.4#803005)
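The limitation described in the comment is easy to demonstrate: `Thread.interrupt()` only sets a status flag, which a thread busy in non-interruptible work (like a connector blocked in I/O without timeouts) is free to ignore. A self-contained sketch:

```java
public class InterruptDemo {
    public static void main(String[] args) throws InterruptedException {
        final boolean[] flagSeen = {false};
        Thread busy = new Thread(() -> {
            // Simulates a task stuck in non-interruptible work (e.g. blocking
            // socket I/O without a timeout): nothing here checks the interrupt
            // flag or calls an interruptible method, so the "work" runs to
            // completion regardless of interrupt().
            long end = System.nanoTime() + 300_000_000L; // ~300 ms of busy work
            while (System.nanoTime() < end) { /* ignores interrupt status */ }
            flagSeen[0] = Thread.currentThread().isInterrupted();
        });
        busy.start();
        busy.interrupt(); // merely sets a status flag; nothing is preempted
        busy.join();
        System.out.println("finished despite interrupt, flag=" + flagSeen[0]);
    }
}
```

Only code that checks `isInterrupted()` or blocks in an interruptible method (`sleep`, `wait`, NIO interruptible channels) reacts to the flag, which is why interrupting a badly behaved connector thread cannot be guaranteed to stop it.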
[jira] [Commented] (KAFKA-9774) Create official Docker image for Kafka Connect
[ https://issues.apache.org/jira/browse/KAFKA-9774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17070633#comment-17070633 ] Ewen Cheslack-Postava commented on KAFKA-9774:
--

My first reaction to this was that it should have a KIP, because it's a significant public support, compatibility, and maintenance commitment for the project... Maybe if it's very clearly highlighted as unofficial, no compatibility guarantees, etc, though I'm not sure how many people want that type of image being published from the project. As simple off-the-top-of-my-head examples, what's the commitment to publication during releases (is this being added to [https://cwiki.apache.org/confluence/display/KAFKA/Release+Process]?), periodic updates to get base image CVE updates, verification, testing, etc?

> Create official Docker image for Kafka Connect
>
> Key: KAFKA-9774
> URL: https://issues.apache.org/jira/browse/KAFKA-9774
> Project: Kafka
> Issue Type: Task
> Components: build, KafkaConnect, packaging
> Affects Versions: 2.4.1
> Reporter: Jordan Moore
> Priority: Major
> Labels: build, features
> Attachments: image-2020-03-27-05-04-46-792.png, image-2020-03-27-05-05-59-024.png
>
> This is a ticket for creating an *official* apache/kafka-connect Docker image.
> Does this need a KIP? - I don't think so. This would be a new feature, not any API change.
> Why is this needed?
> # Kafka Connect is stateless. I believe this is why a Kafka image is not created?
> # It scales much more easily with Docker and orchestrators. It operates much like any other serverless / "microservice" web application
> # People struggle with deploying it because it is packaged _with Kafka_, which leads some to believe it needs to _*run* with Kafka_ on the same machine.
> I think there is a separate ticket for creating an official Docker image for Kafka but clearly none exist. I reached out to Confluent about this, but heard nothing yet.
> !image-2020-03-27-05-05-59-024.png|width=740,height=196!
> Zookeeper already has one, btw
> !image-2020-03-27-05-04-46-792.png|width=739,height=288!
> *References*:
> [Docs for Official Images|https://docs.docker.com/docker-hub/official_images/]

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9468) config.storage.topic partition count issue is hard to debug
[ https://issues.apache.org/jira/browse/KAFKA-9468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17022513#comment-17022513 ] Ewen Cheslack-Postava commented on KAFKA-9468:
--

[~rhauch] Allowing things to run in that environment leads to incorrect (and likely unrecoverable, without a ton of manual effort) state. I'd consider it a bug, which doesn't need a KIP. The other consideration is potential fallout. It seems hard to believe someone would be running for very long without hitting problems caused by having more than one partition, given that at least on restart/worker addition you bulk load that data and could easily end up with misordered data across multiple reconfigurations for the same connector.

> config.storage.topic partition count issue is hard to debug
>
> Key: KAFKA-9468
> URL: https://issues.apache.org/jira/browse/KAFKA-9468
> Project: Kafka
> Issue Type: Improvement
> Components: KafkaConnect
> Affects Versions: 1.0.2, 1.1.1, 2.0.1, 2.1.1, 2.2.2, 2.4.0, 2.3.1
> Reporter: Evelyn Bayes
> Priority: Minor
>
> When you run Connect distributed with 2 or more workers and config.storage.topic has more than 1 partition, you can end up with one of the workers rebalancing endlessly:
> [2020-01-13 12:53:23,535] INFO [Worker clientId=connect-1, groupId=connect-cluster] Current config state offset 37 is behind group assignment 63, reading to end of config log (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> [2020-01-13 12:53:23,584] INFO [Worker clientId=connect-1, groupId=connect-cluster] Finished reading to end of log and updated config snapshot, new config log offset: 37 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> [2020-01-13 12:53:23,584] INFO [Worker clientId=connect-1, groupId=connect-cluster] Current config state offset 37 does not match group assignment 63. Forcing rebalance. (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> In case any person viewing this doesn't know: you are only ever meant to create this topic with one partition.
> *Suggested Solution*
> Make the Connect worker check the partition count when it starts, and if the partition count is > 1, Kafka Connect stops and logs the reason why.
> I think this is reasonable as it would stop users just starting out from building it incorrectly, and would be easy to fix early. For those upgrading, this would easily be caught in a PRE-PROD environment. And even if they upgraded directly in PROD, you would only be impacted if you upgraded all Connect workers at the same time.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
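The suggested startup check could look roughly like the sketch below. `requireSinglePartition` is a hypothetical helper; a real worker would obtain the partition count from the admin client's topic description rather than a hard-coded argument:

```java
public class ConfigTopicCheck {
    // Hypothetical preflight helper: refuse to start if config.storage.topic
    // has more than one partition, since config ordering is only guaranteed
    // within a single partition.
    static void requireSinglePartition(String topic, int partitionCount) {
        if (partitionCount != 1) {
            throw new IllegalStateException("Topic '" + topic + "' has "
                + partitionCount + " partitions, but config.storage.topic "
                + "requires exactly 1 to preserve config ordering");
        }
    }

    public static void main(String[] args) {
        requireSinglePartition("connect-configs", 1); // ok, worker proceeds
        try {
            requireSinglePartition("connect-configs", 25); // misconfigured topic
        } catch (IllegalStateException e) {
            System.out.println("startup aborted: " + e.getMessage());
        }
    }
}
```

Failing fast at startup with an explicit message turns the endless-rebalance symptom into an immediately debuggable error, which is the behavior the ticket asks for.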
[jira] [Closed] (KAFKA-5635) KIP-181 Kafka-Connect integrate with kafka ReST Proxy
[ https://issues.apache.org/jira/browse/KAFKA-5635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava closed KAFKA-5635. > KIP-181 Kafka-Connect integrate with kafka ReST Proxy > - > > Key: KAFKA-5635 > URL: https://issues.apache.org/jira/browse/KAFKA-5635 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Dhananjay Patkar >Priority: Major > Labels: features, newbie > > Kafka connect currently uses kafka clients which directly connect to kafka > brokers. > In a use case, wherein I have many kafka connect [producers] running remotely > its a challenge to configure broker information on every connect agent. > Also, in case of IP change [upgrade or cluster re-creation], we need to > update every remote connect configuration. > If kafka connect source connectors talk to ReST endpoint then client is > unaware of broker details. This way we can transparently upgrade / re-create > kafka cluster as long as ReST endpoint remains same. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-8586) Source task producers silently fail to send records
[ https://issues.apache.org/jira/browse/KAFKA-8586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16870869#comment-16870869 ] Ewen Cheslack-Postava commented on KAFKA-8586:
--

The `recordSent` call could probably be moved into the `else` block that validates there wasn't an exception. This goes way back to Connect v1 – the first AuthenticationException that arguably violates the semantics of Retriable vs non-Retriable exceptions appeared after that code was written, but in the same release iirc.

However, let's not overestimate the impact here, but also account for changes in 2.3.0. Until 2.3.0, Connect was basically using a single producer config for all connectors (via `producer.` overrides). For this issue in particular, it *could* be using different principals for offset commits and task producers (since iirc only the task producers take the overrides), or the topics could have different ACLs. It's likely the most common case is not differentiating, and thus the lack of any reports of this. (I'd have to review what e2e coverage we have – we have tests to generally discover loss like this, but in the context specifically of different principals or ACLs, I wouldn't be terribly surprised if we didn't have coverage today.)

Now, thinking about impact moving forward, 2.3.0 will have [https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy]. This is more interesting because combined with [https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations], Connect is in a much more reasonable position for users to truly go multi-tenant, have their own secrets, use a bunch of different principals, and make much more extensive use of ACLs. I'm not sure it makes it *that* much more likely to hit this issue (folks that weren't satisfied with the state of security for multi-tenant use would have used multiple clusters but still potentially used different principals between internal and source task output topics), but it might make the pattern more common.

We should also talk about fallout and workarounds. There are 2 major failure modes that I can see. The first is probably the most common: when you are trying to configure a connector for the first time, trying to iterate to a good config, and hit auth failures. In this case, the follow-up would be to clear out all offset data since nothing useful would have been produced anyway. The second case is if somebody mucks with ACLs after a connector is successfully configured. In this case, you'd need an offset *reset*, rather than deletion. I suspect case 1 is far more common and a bigger issue than case 2, and while source offset tooling isn't good/existent, at least the storage is de facto, and in the future by default, standardized [https://cwiki.apache.org/confluence/display/KAFKA/KIP-174+-+Deprecate+and+remove+internal+converter+configs+in+WorkerConfig] in a way that makes it possible to work through this problem (at least for case 1).

In terms of solutions, we can move `recordSent` so that offsets aren't committed for records that failed to send, but we'd also still need to do something like aggressively propagate the exception to turn it into a task failure if we want to respect those failure modes. Users would then just have to iterate on restarting all the tasks. Interestingly, we do config validation, but I don't think it validates any ability to connect or do anything against the brokers. It would be interesting to pre-flight check some of this, but I don't think it's a full solution because, e.g., a source connector might just start producing to a new topic it doesn't have permissions to.
Additionally, permissions might change on the fly, so even a pre-flight against known topics might not fully cover us. > Source task producers silently fail to send records > --- > > Key: KAFKA-8586 > URL: https://issues.apache.org/jira/browse/KAFKA-8586 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.0 >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > > The Connect framework marks source records as successfully sent when they are > dispatched to the producer, instead of when they are actually sent to Kafka. > [This is assumed to be good > enough|https://github.com/apache/kafka/blob/3e9d1c1411c5268de382f9dfcc95bdf66d0063a0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L324-L331] > since the Connect framework sets up its producer to use infinite retries on > retriable errors, but in the case of an authorization or authentication > failure with a secured Kafka broker, the errors aren't retriable and cause > the producer to invoke its send callback with an exception and the
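The fix sketched in the comment — only acknowledging a record in the no-exception branch of the producer callback and propagating failures — can be modeled without a broker. `onCompletion` mirrors the shape of the producer callback; `sent` and `taskFailure` are hypothetical stand-ins for WorkerSourceTask state, not the real fields:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CallbackSketch {
    static final AtomicInteger sent = new AtomicInteger();
    static volatile Exception taskFailure;

    // Only count a source record as sent once the send actually succeeded;
    // otherwise surface the error so the task fails instead of silently
    // committing offsets for records that were never delivered.
    static void onCompletion(Object metadata, Exception exception) {
        if (exception != null) {
            taskFailure = exception;   // propagate -> task failure, no offset commit
        } else {
            sent.incrementAndGet();    // recordSent() equivalent: confirmed delivery only
        }
    }

    public static void main(String[] args) {
        onCompletion(new Object(), null);                                     // successful send
        onCompletion(null, new RuntimeException("auth failure (simulated)")); // failed send
        System.out.println("sent=" + sent.get() + " failed=" + (taskFailure != null));
    }
}
```

The point of the model: a non-retriable authorization error leaves the task visibly failed rather than letting offset commits run ahead of what was actually written.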
[jira] [Closed] (KAFKA-4048) Connect does not support RetriableException consistently for sinks
[ https://issues.apache.org/jira/browse/KAFKA-4048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava closed KAFKA-4048. > Connect does not support RetriableException consistently for sinks > -- > > Key: KAFKA-4048 > URL: https://issues.apache.org/jira/browse/KAFKA-4048 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Shikhar Bhushan >Assignee: Shikhar Bhushan >Priority: Major > > We only allow for handling {{RetriableException}} from calls to > {{SinkTask.put()}}, but this is something we should support also for > {{flush()}} and arguably also {{open()}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8423) Update ducktape to not use deprecated APIs
[ https://issues.apache.org/jira/browse/KAFKA-8423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16849908#comment-16849908 ] Ewen Cheslack-Postava commented on KAFKA-8423:
--

[~htewari] The warning is actually coming from the paramiko library, so in a sense we're two levels removed from the actual source of the problem. This is, indeed, an issue in ducktape in that the dependency on paramiko would need to be updated there, but checking the most recent released version of paramiko atm (2.4.2), [https://github.com/paramiko/paramiko/blob/2.4.2/paramiko/kex_ecdh_nist.py#L39] indicates paramiko is still using the same methods and hasn't updated. To fix this we might need to contribute a patch upstream to paramiko, wait for them to release, then update the paramiko dependency in ducktape, release ducktape, and finally update the ducktape dependency here in Kafka.

> Update ducktape to not use deprecated APIs
>
> Key: KAFKA-8423
> URL: https://issues.apache.org/jira/browse/KAFKA-8423
> Project: Kafka
> Issue Type: Improvement
> Components: system tests
> Affects Versions: 2.3.0
> Reporter: Matthias J. Sax
> Assignee: Hardik Tewari
> Priority: Major
>
> Running system tests locally, I see the following warnings:
> {code:java}
> /usr/local/lib/python2.7/dist-packages/paramiko/kex_ecdh_nist.py:39: CryptographyDeprecationWarning: encode_point has been deprecated on EllipticCurvePublicNumbers and will be removed in a future version. Please use EllipticCurvePublicKey.public_bytes to obtain both compressed and uncompressed point encoding.
> m.add_string(self.Q_C.public_numbers().encode_point())
> /usr/local/lib/python2.7/dist-packages/paramiko/kex_ecdh_nist.py:94: CryptographyDeprecationWarning: Support for unsafe construction of public numbers from encoded data will be removed in a future version. Please use EllipticCurvePublicKey.from_encoded_point
> self.curve, Q_S_bytes
> /usr/local/lib/python2.7/dist-packages/paramiko/kex_ecdh_nist.py:109: CryptographyDeprecationWarning: encode_point has been deprecated on EllipticCurvePublicNumbers and will be removed in a future version. Please use EllipticCurvePublicKey.public_bytes to obtain both compressed and uncompressed point encoding.
> hm.add_string(self.Q_C.public_numbers().encode_point())
> {code}
> We should update the code to not use deprecated APIs.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-2480) Handle non-CopycatExceptions from SinkTasks
[ https://issues.apache.org/jira/browse/KAFKA-2480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16794652#comment-16794652 ] Ewen Cheslack-Postava commented on KAFKA-2480:
--

[~olkuznsmith] I see, you probably got confused because the PR for KAFKA-2481 accidentally pointed here. The difficulty is that both semantics might be desired.

The intent of the timeout as implemented is to handle *buffering* connectors. In that case, they may *accept* data without necessarily committing it synchronously. If they encounter a failure that keeps them from even getting the data sent on the network (e.g. the downstream system has an availability issue), they want to express to the framework that they still have work to do but are having some problem accomplishing it, so they need to back off; but if *no more data* shows up, they don't want to wait indefinitely – they want to express the *maximum* amount of time the framework should wait before passing control back to the task to retry whatever operation was failing, even if there isn't new data available. But if new data becomes available, the connector may want to accept it immediately. It may be destined for some location that doesn't have the same issue, or can be buffered, etc. For example, this is how the HDFS connector uses this timeout functionality.

On the other hand, a connector that, e.g., deals with a rate-limited API may know exactly how long it needs to wait before it's worth passing control back *at all* (or any other case where you know the issue won't be resolved until *at least* some amount of time has passed). This has come up and been discussed as a possible improvement to `RetriableException` (since you should be throwing that if you can't even buffer the data that's being included in the `put()` call). I don't think there's a Jira (at least I'm not finding one), but it was probably discussed on the mailing list.
There's also KAFKA-3819 on the source side, which is another variant of "time management" convenience utilities. > Handle non-CopycatExceptions from SinkTasks > --- > > Key: KAFKA-2480 > URL: https://issues.apache.org/jira/browse/KAFKA-2480 > Project: Kafka > Issue Type: Sub-task > Components: KafkaConnect >Reporter: Ewen Cheslack-Postava >Assignee: Ewen Cheslack-Postava >Priority: Major > Fix For: 0.9.0.0 > > > Currently we catch Throwable in WorkerSinkTask, but we just log the > exception. This can lead to data loss because it indicates the messages in > the {{put(records)}} call probably were not handled properly. We need to > decide what the policy for handling these types of exceptions should be -- > try repeating the same records again, risking duplication? or skip them, > risking loss? or kill the task immediately and require intervention since > it's unclear what happened? > SourceTasks don't have the same concern -- they can throw other exceptions > and as long as we catch them, it is up to the connector to ensure that it > does not lose data as a result. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
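The "maximum wait for buffering connectors" semantics described in the comment can be modeled in a stand-alone sketch. The real mechanism is `SinkTaskContext.timeout(long)` plus throwing `RetriableException` from `put()`; the classes below are simplified stand-ins, not the Connect API:

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

public class BufferingSinkSketch {
    // Stand-in for org.apache.kafka.connect.errors.RetriableException.
    static class RetriableException extends RuntimeException {}

    final Queue<String> buffer = new ArrayDeque<>();
    long requestedBackoffMs = -1;      // stand-in for SinkTaskContext.timeout(...)
    boolean downstreamHealthy = false; // simulate a downstream availability issue

    // A buffering put(): accept records even while the downstream system is
    // unavailable, then request a *maximum* backoff before the next retry --
    // new data arriving sooner is still accepted into the buffer.
    void put(List<String> records) {
        buffer.addAll(records);
        if (!flushDownstream()) {
            requestedBackoffMs = 5_000;    // retry in at most 5s even with no new data
            throw new RetriableException();
        }
    }

    boolean flushDownstream() {
        if (!downstreamHealthy) return false; // e.g. HDFS temporarily unreachable
        buffer.clear();
        return true;
    }

    public static void main(String[] args) {
        BufferingSinkSketch task = new BufferingSinkSketch();
        try {
            task.put(List.of("r1", "r2"));
        } catch (RetriableException e) {
            System.out.println("buffered=" + task.buffer.size()
                + " backoffMs=" + task.requestedBackoffMs);
        }
    }
}
```

The rate-limited-API case in the comment wants the opposite guarantee, a *minimum* wait before `put()` is called again, which is exactly what this timeout does not express.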
[jira] [Resolved] (KAFKA-7813) JmxTool throws NPE when --object-name is omitted
[ https://issues.apache.org/jira/browse/KAFKA-7813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-7813. -- Resolution: Fixed Fix Version/s: 2.3.0 Issue resolved by pull request 6139 [https://github.com/apache/kafka/pull/6139] > JmxTool throws NPE when --object-name is omitted > > > Key: KAFKA-7813 > URL: https://issues.apache.org/jira/browse/KAFKA-7813 > Project: Kafka > Issue Type: Bug >Reporter: Attila Sasvari >Assignee: huxihx >Priority: Minor > Fix For: 2.3.0 > > > Running the JMX tool without the --object-name parameter results in a > NullPointerException: > {code} > $ bin/kafka-run-class.sh kafka.tools.JmxTool --jmx-url > service:jmx:rmi:///jndi/rmi://127.0.0.1:/jmxrmi > ... > Exception in thread "main" java.lang.NullPointerException > at kafka.tools.JmxTool$$anonfun$3.apply(JmxTool.scala:143) > at kafka.tools.JmxTool$$anonfun$3.apply(JmxTool.scala:143) > at > scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93) > at scala.collection.immutable.List.exists(List.scala:84) > at kafka.tools.JmxTool$.main(JmxTool.scala:143) > at kafka.tools.JmxTool.main(JmxTool.scala) > {code} > Documentation of the tool says: > {code} > --object-name A JMX object name to use as a query. > >This can contain wild cards, and > >this option can be given multiple > >times to specify more than one > >query. If no objects are specified > >all objects will be queried. > {code} > Running the tool with {{--object-name ''}} also results in an NPE: > {code} > $ bin/kafka-run-class.sh kafka.tools.JmxTool --jmx-url > service:jmx:rmi:///jndi/rmi://127.0.0.1:/jmxrmi --object-name '' > ... 
> Exception in thread "main" java.lang.NullPointerException > at kafka.tools.JmxTool$.main(JmxTool.scala:197) > at kafka.tools.JmxTool.main(JmxTool.scala) > {code} > Running the tool with --object-name without an argument, the tool fails with > OptionMissingRequiredArgumentException: > {code} > $ bin/kafka-run-class.sh kafka.tools.JmxTool --jmx-url > service:jmx:rmi:///jndi/rmi://127.0.0.1:/jmxrmi --object-name > Exception in thread "main" joptsimple.OptionMissingRequiredArgumentException: > Option object-name requires an argument > at > joptsimple.RequiredArgumentOptionSpec.detectOptionArgument(RequiredArgumentOptionSpec.java:48) > at > joptsimple.ArgumentAcceptingOptionSpec.handleOption(ArgumentAcceptingOptionSpec.java:257) > at joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:513) > at > joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56) > at joptsimple.OptionParser.parse(OptionParser.java:396) > at kafka.tools.JmxTool$.main(JmxTool.scala:104) > at kafka.tools.JmxTool.main(JmxTool.scala) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-7197) Release a milestone build for Scala 2.13.0 M3
[ https://issues.apache.org/jira/browse/KAFKA-7197?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava reassigned KAFKA-7197: Assignee: Dejan Stojadinović > Release a milestone build for Scala 2.13.0 M3 > - > > Key: KAFKA-7197 > URL: https://issues.apache.org/jira/browse/KAFKA-7197 > Project: Kafka > Issue Type: Improvement >Reporter: Martynas Mickevičius >Assignee: Dejan Stojadinović >Priority: Minor > > Releasing a milestone version for Scala 2.13.0-M3 (and maybe even for > 2.13.0-M4, which has new collections) would be helpful to kickstart Kafka > ecosystem adoption for 2.13.0. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-2480) Handle non-CopycatExceptions from SinkTasks
[ https://issues.apache.org/jira/browse/KAFKA-2480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16794612#comment-16794612 ] Ewen Cheslack-Postava commented on KAFKA-2480: -- [~olkuznsmith] This doesn't seem to have anything to do with this JIRA, but the behavior is intended. If you want the behavior you're describing, it's easy to follow up after poll returns by sleeping as long as you need to in order to block until you hit your business-logic-timeout. Or, if you want to continue fetching, just wrap the poll calls in a loop and accumulate the results (but be sure to have some cap, or else you're bound to OOM). The vast majority of users would not want this, as it just blocks processing and limits throughput, potentially to the point that you can't ever catch up. The interface is designed to be low-level enough to implement any of these patterns, and it is a common pattern (e.g., going way back to `select`). > Handle non-CopycatExceptions from SinkTasks > --- > > Key: KAFKA-2480 > URL: https://issues.apache.org/jira/browse/KAFKA-2480 > Project: Kafka > Issue Type: Sub-task > Components: KafkaConnect >Reporter: Ewen Cheslack-Postava >Assignee: Ewen Cheslack-Postava >Priority: Major > Fix For: 0.9.0.0 > > > Currently we catch Throwable in WorkerSinkTask, but we just log the > exception. This can lead to data loss because it indicates the messages in > the {{put(records)}} call probably were not handled properly. We need to > decide what the policy for handling these types of exceptions should be -- > try repeating the same records again, risking duplication? or skip them, > risking loss? or kill the task immediately and require intervention since > it's unclear what happened? > SourceTasks don't have the same concern -- they can throw other exceptions > and as long as we catch them, it is up to the connector to ensure that it > does not lose data as a result. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
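The capped accumulation loop suggested in the comment above can be sketched generically; the `Supplier<List<T>>` here is a stand-in for whatever poll call is being wrapped, not a real Kafka API:

```java
// Accumulate batches from repeated polls, but cap the total so an unbounded
// backlog cannot buffer its way into an OOM. Stops early when a poll comes
// back empty, at which point the caller can sleep or back off as needed.
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class PollAccumulator {
    static <T> List<T> accumulate(Supplier<List<T>> poll, int maxRecords) {
        List<T> out = new ArrayList<>();
        while (out.size() < maxRecords) {
            List<T> batch = poll.get();
            if (batch.isEmpty()) {
                break; // nothing available right now
            }
            out.addAll(batch); // may overshoot the cap by at most one batch
        }
        return out;
    }
}
```

The cap is the important part: without it, a consumer that falls behind a fast producer would accumulate indefinitely.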
[jira] [Updated] (KAFKA-7834) Extend collected logs in system test services to include heap dumps
[ https://issues.apache.org/jira/browse/KAFKA-7834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-7834: - Fix Version/s: 2.1.2 > Extend collected logs in system test services to include heap dumps > --- > > Key: KAFKA-7834 > URL: https://issues.apache.org/jira/browse/KAFKA-7834 > Project: Kafka > Issue Type: Improvement > Components: system tests >Reporter: Konstantine Karantasis >Assignee: Konstantine Karantasis >Priority: Major > Fix For: 2.2.0, 2.0.2, 2.3.0, 2.1.2 > > > Overall I'd suggest enabling by default: > {\{-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath="}} > in the major system test services, so that a heap dump is captured on OOM. > Given these flags, we should also extend the set of collected logs in each > service to include the predetermined filename for the heap dump. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7834) Extend collected logs in system test services to include heap dumps
[ https://issues.apache.org/jira/browse/KAFKA-7834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-7834. -- Resolution: Fixed Fix Version/s: (was: 1.1.2) (was: 3.0.0) (was: 1.0.3) 2.3.0 Issue resolved by pull request 6158 [https://github.com/apache/kafka/pull/6158] > Extend collected logs in system test services to include heap dumps > --- > > Key: KAFKA-7834 > URL: https://issues.apache.org/jira/browse/KAFKA-7834 > Project: Kafka > Issue Type: Improvement > Components: system tests >Reporter: Konstantine Karantasis >Assignee: Konstantine Karantasis >Priority: Major > Fix For: 2.3.0, 2.2.0, 2.0.2 > > > Overall I'd suggest enabling by default: > {\{-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath="}} > in the major system test services, so that a heap dump is captured on OOM. > Given these flags, we should also extend the set of collected logs in each > service to include the predetermined filename for the heap dump. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5117) Kafka Connect REST endpoints reveal Password typed values
[ https://issues.apache.org/jira/browse/KAFKA-5117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-5117: - Fix Version/s: 2.0.2 2.1.1 2.2.0 > Kafka Connect REST endpoints reveal Password typed values > - > > Key: KAFKA-5117 > URL: https://issues.apache.org/jira/browse/KAFKA-5117 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.10.2.0 >Reporter: Thomas Holmes >Assignee: Chris Egerton >Priority: Major > Labels: needs-kip > Fix For: 2.2.0, 2.1.1, 2.0.2 > > > A Kafka Connect connector can specify ConfigDef keys as type of Password. > This type was added to prevent logging the values (instead "[hidden]" is > logged). > This change does not apply to the values returned by executing a GET on > {{connectors/\{connector-name\}}} and > {{connectors/\{connector-name\}/config}}. This creates an easily accessible > way for an attacker who has infiltrated your network to gain access to > potential secrets that should not be available. > I have started on a code change that addresses this issue by parsing the > config values through the ConfigDef for the connector and returning their > output instead (which leads to the masking of Password typed configs as > [hidden]). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7461) Connect Values converter should have coverage of logical types
[ https://issues.apache.org/jira/browse/KAFKA-7461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-7461. -- Resolution: Fixed Fix Version/s: 2.2.0 Issue resolved by pull request 6077 [https://github.com/apache/kafka/pull/6077] > Connect Values converter should have coverage of logical types > -- > > Key: KAFKA-7461 > URL: https://issues.apache.org/jira/browse/KAFKA-7461 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 1.1.1, 2.0.0 >Reporter: Ewen Cheslack-Postava >Assignee: Andrew Schofield >Priority: Blocker > Labels: newbie, test > Fix For: 2.2.0 > > Original Estimate: 24h > Remaining Estimate: 24h > > Per fix from KAFKA-7460, we've got some gaps in testing for the Values > converter added in KIP-145, in particular for logical types. It looks like > there are a few other gaps (e.g. from quick scan of coverage, maybe the float > types as well), but logical types seem to be the bulk other than trivial > wrapper methods. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7503) Integration Test Framework for Connect
[ https://issues.apache.org/jira/browse/KAFKA-7503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-7503. -- Resolution: Fixed Fix Version/s: 2.0.2 2.1.1 2.2.0 Issue resolved by pull request 5516 [https://github.com/apache/kafka/pull/5516] > Integration Test Framework for Connect > -- > > Key: KAFKA-7503 > URL: https://issues.apache.org/jira/browse/KAFKA-7503 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Reporter: Arjun Satish >Assignee: Arjun Satish >Priority: Minor > Fix For: 2.2.0, 2.1.1, 2.0.2 > > > Implement a framework which enables writing and executing integration tests > against real connect workers and kafka brokers. The worker and brokers would > run within the same process the test is running (which is similar to how > integration tests are written in Streams and Core). The complexity of these > tests would lie somewhere between unit tests and system tests. The main > utility is to be able to run end-to-end tests within the java test framework, > and facilitate development of large features which could modify many parts of > the framework. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7551) Refactor to create both producer & consumer in Worker
[ https://issues.apache.org/jira/browse/KAFKA-7551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-7551. -- Resolution: Fixed Merged [https://github.com/apache/kafka/pull/5842], my bad for screwing up closing the Jira along with the fix... > Refactor to create both producer & consumer in Worker > - > > Key: KAFKA-7551 > URL: https://issues.apache.org/jira/browse/KAFKA-7551 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Reporter: Magesh kumar Nandakumar >Assignee: Magesh kumar Nandakumar >Priority: Minor > Fix For: 2.2.0 > > > In distributed mode, the producer is created in the Worker and the consumer > is created in the WorkerSinkTask. The proposal is to refactor it so that both > of them are created in Worker. This will not affect any functionality and is > just a refactoring to make the code consistent. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7620) ConfigProvider is broken for KafkaConnect when TTL is not null
[ https://issues.apache.org/jira/browse/KAFKA-7620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-7620. -- Resolution: Fixed Fix Version/s: 2.0.2 2.1.1 2.2.0 Issue resolved by pull request 5914 [https://github.com/apache/kafka/pull/5914] > ConfigProvider is broken for KafkaConnect when TTL is not null > -- > > Key: KAFKA-7620 > URL: https://issues.apache.org/jira/browse/KAFKA-7620 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.0.0, 2.0.1 >Reporter: Ye Ji >Assignee: Robert Yokota >Priority: Major > Fix For: 2.2.0, 2.1.1, 2.0.2 > > > If the ConfigData returned by ConfigProvider.get implementations has non-null > and non-negative ttl, it will trigger infinite recursion, here is an excerpt > of the stack trace: > {code:java} > at > org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:62) > at > org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:56) > at > org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:49) > at > org.apache.kafka.connect.runtime.distributed.ClusterConfigState.connectorConfig(ClusterConfigState.java:121) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.connectorConfigReloadAction(DistributedHerder.java:648) > at > org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:62) > at > org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:56) > at > org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:49) > {code} > Basically, > 1) if a non-null ttl is returned from the config provider, connect runtime > will try to schedule a reload in the future, > 2) scheduleReload function reads the config again to see if it is a restart > or not, by calling > 
org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform to > transform the config > 3) the transform function calls config provider, and gets a non-null ttl, > causing scheduleReload being called, we are back to step 1. > To reproduce, simply fork the provided > [FileConfigProvider|https://github.com/apache/kafka/blob/3cdc78e6bb1f83973a14ce1550fe3874f7348b05/clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java], > and add a non-negative ttl to the ConfigData returned by the get functions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
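The cycle in steps 1–3 can be modeled in a few lines. The names here are hypothetical — this is not the real `WorkerConfigTransformer`, just a sketch of the recursion and of the kind of re-entrancy guard that would break it:

```java
// Minimal model of the TTL recursion described above: transform() consults the
// provider, sees a non-null TTL, and schedules a reload; scheduleReload()
// re-reads the config via transform(), which sees the TTL again -- and so on.
// The 'scheduling' flag is a sketch of a guard that breaks the cycle.
class ConfigTransformerSketch {
    final long providerTtlMs;   // TTL reported by the config provider
    int transformCalls = 0;
    boolean scheduling = false; // guard: are we already inside scheduleReload?

    ConfigTransformerSketch(long ttlMs) {
        this.providerTtlMs = ttlMs;
    }

    void transform() {
        transformCalls++;
        if (providerTtlMs >= 0 && !scheduling) {
            scheduleReload();
        }
    }

    void scheduleReload() {
        scheduling = true;
        try {
            transform(); // without the guard this would recurse until StackOverflowError
        } finally {
            scheduling = false;
        }
    }
}
```

With the guard in place, one external `transform()` call produces exactly one nested re-read instead of unbounded recursion.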
[jira] [Resolved] (KAFKA-7560) PushHttpMetricsReporter should not convert metric value to double
[ https://issues.apache.org/jira/browse/KAFKA-7560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-7560. -- Resolution: Fixed Fix Version/s: 2.1.0 2.2.0 Issue resolved by pull request 5886 [https://github.com/apache/kafka/pull/5886] > PushHttpMetricsReporter should not convert metric value to double > - > > Key: KAFKA-7560 > URL: https://issues.apache.org/jira/browse/KAFKA-7560 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.1.0 >Reporter: Stanislav Kozlovski >Assignee: Dong Lin >Priority: Blocker > Fix For: 2.2.0, 2.1.0 > > > Currently PushHttpMetricsReporter will convert value from > KafkaMetric.metricValue() to double. This will not work for non-numerical > metrics such as version in AppInfoParser whose value can be string. This has > caused issue for PushHttpMetricsReporter which in turn caused system test > kafkatest.tests.client.quota_test.QuotaTest.test_quota to fail with the > following exception: > {code:java} > File "/opt/kafka-dev/tests/kafkatest/tests/client/quota_test.py", line 196, > in validate metric.value for k, metrics in > producer.metrics(group='producer-metrics', name='outgoing-byte-rate', > client_id=producer.client_id) for metric in metrics ValueError: max() arg is > an empty sequence > {code} > Since we allow metric value to be object, PushHttpMetricsReporter should also > read metric value as object and pass it to the http server. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema
[ https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663144#comment-16663144 ] Ewen Cheslack-Postava commented on KAFKA-7481: -- [~lindong] I think anything here ends up having at least *some* blocker component for 2.1 because we're either a) changing the previously understood (even if not promised) semantics for a setting or b) needing to at least do some docs/upgrade-notes clarifications. I think we may also need to consider both short- and long-term solutions. Anything relying on KIP-35 to check inter-broker protocol compatibility does not seem like a good idea for 2.1 this late in the release cycle. (I mentioned it for completeness and for getting to a good long-term solution even if we have a short-term hack, but I don't think that's a practical change to get into 2.1 at this point.) Also, even if we switch to using KIP-35, there's a whole compatibility story wrt existing settings that would need to be worked out. re: your specific proposal, it generally sounds reasonable but wrt testing surface area, it actually does increase it to some degree because we would need tests that validate correct behavior via KIP-35 checks while upgrading. Longer term, I definitely think making things "just work" rather than having the user manage protocol versions manually during upgrade is better. Just not sure we can actually make that happen immediately for this release. > Consider options for safer upgrade of offset commit value schema > > > Key: KAFKA-7481 > URL: https://issues.apache.org/jira/browse/KAFKA-7481 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Priority: Blocker > Fix For: 2.1.0 > > > KIP-211 and KIP-320 add new versions of the offset commit value schema. The > use of the new schema version is controlled by the > `inter.broker.protocol.version` configuration. 
Once the new inter-broker > version is in use, it is not possible to downgrade since the older brokers > will not be able to parse the new schema. > The options at the moment are the following: > 1. Do nothing. Users can try the new version and keep > `inter.broker.protocol.version` locked to the old release. Downgrade will > still be possible, but users will not be able to test new capabilities which > depend on inter-broker protocol changes. > 2. Instead of using `inter.broker.protocol.version`, we could use > `message.format.version`. This would basically extend the use of this config > to apply to all persistent formats. The advantage is that it allows users to > upgrade the broker and begin using the new inter-broker protocol while still > allowing downgrade. But features which depend on the persistent format could > not be tested. > Any other options? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema
[ https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16662460#comment-16662460 ] Ewen Cheslack-Postava commented on KAFKA-7481: -- [~lindong] we might just want to update the script with a flag to override any remaining blockers (and maybe even just list them in the notes for the RC). Ideally we don't hit this type of situation regularly, but when we do, might be better to be able to override on the release mgr side rather than possibly losing stuff by adjusting priorities on the JIRA. > Consider options for safer upgrade of offset commit value schema > > > Key: KAFKA-7481 > URL: https://issues.apache.org/jira/browse/KAFKA-7481 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Priority: Critical > Fix For: 2.1.0 > > > KIP-211 and KIP-320 add new versions of the offset commit value schema. The > use of the new schema version is controlled by the > `inter.broker.protocol.version` configuration. Once the new inter-broker > version is in use, it is not possible to downgrade since the older brokers > will not be able to parse the new schema. > The options at the moment are the following: > 1. Do nothing. Users can try the new version and keep > `inter.broker.protocol.version` locked to the old release. Downgrade will > still be possible, but users will not be able to test new capabilities which > depend on inter-broker protocol changes. > 2. Instead of using `inter.broker.protocol.version`, we could use > `message.format.version`. This would basically extend the use of this config > to apply to all persistent formats. The advantage is that it allows users to > upgrade the broker and begin using the new inter-broker protocol while still > allowing downgrade. But features which depend on the persistent format could > not be tested. > Any other options? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema
[ https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16659935#comment-16659935 ] Ewen Cheslack-Postava commented on KAFKA-7481: -- [~ijuma] As it stands today, is the only use case for inter.broker.protocol.version to hold back the version during rolling upgrade? Do we even really need this with KIP-35? It sounds like aside from the two rolling bounce upgrade, you expect people wouldn't generally be using this today (given you say the benefit is minimal)? I'm fine overloading one of them if we think the existing use case for it is either ok to compromise with additional restrictions OR tradeoffs aren't weird like they are for log.message.format.version. Test surface area is a fair point, wrt complexity I guess I don't see that much of a difference for users since they just copy/paste 2 lines rather than 1 and otherwise probably blindly follow the directions. > Consider options for safer upgrade of offset commit value schema > > > Key: KAFKA-7481 > URL: https://issues.apache.org/jira/browse/KAFKA-7481 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Priority: Blocker > Fix For: 2.1.0 > > > KIP-211 and KIP-320 add new versions of the offset commit value schema. The > use of the new schema version is controlled by the > `inter.broker.protocol.version` configuration. Once the new inter-broker > version is in use, it is not possible to downgrade since the older brokers > will not be able to parse the new schema. > The options at the moment are the following: > 1. Do nothing. Users can try the new version and keep > `inter.broker.protocol.version` locked to the old release. Downgrade will > still be possible, but users will not be able to test new capabilities which > depend on inter-broker protocol changes. > 2. Instead of using `inter.broker.protocol.version`, we could use > `message.format.version`. 
This would basically extend the use of this config > to apply to all persistent formats. The advantage is that it allows users to > upgrade the broker and begin using the new inter-broker protocol while still > allowing downgrade. But features which depend on the persistent format could > not be tested. > Any other options? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema
[ https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16658580#comment-16658580 ] Ewen Cheslack-Postava commented on KAFKA-7481: -- [~ijuma] Arguably your suggestion isn't something we should do in a minor version since it is a breaking semantic change, though I don't know whether we've really committed to the reversibility of inter broker protocol version. The upgrade notes already don't really point out that you can roll that back. In fact, reading them now I'm not sure how many people even realize this is possible – we never really explain anything about *why* the upgrade process is the way it is. At a bare minimum, if we're aware of any cases where people have rolled back without reverting the broker version entirely, I would be wary of changing the semantics like this. I think the tension between the two existing configs is that inter.broker.protocol.version *today* is basically all the stateless protocol stuff, which makes it safe to upgrade/downgrade, whereas log.message.format.version is about persisted format, but for client-facing data. The consumer state (and other examples from the KIP) are internal but persistent, which makes neither a great fit. re: being too complicated to explain, it seems like today we basically just rely on telling them the mechanics and only for log.message.format.version do we go into a bit more detail, and only because of the perf impact. If we just documented what to do with the new config and it just fits in alongside one of the other two for the common case, is it really that much more complicated? We should probably also be clear about upgrade/downgrade scenarios. For example, the rejected alternatives state: {quote} # We considered overloading `inter.broker.protocol.version` to include changes to the persistent metadata format. 
The main drawback is that users cannot test features which depend on the inter-broker protocol behavior without committing to the upgrade. For example, KIP-320 makes changes to replication protocol which may require additional testing. {quote} It took me a while to realize that "without committing to the upgrade" specifically means the *broker version upgrade*, not the *upgrade to the metadata format*. Even with a new config, you could roll back further use of that format, but brokers could still process the data (as mentioned elsewhere in the KIP wrt an in-progress rolling update to turn on the new format) as long as it is compatible (and it seems likely we'd always have a compatibility path since we could always have clusters with both formats). I'm hesitant to add more configs since we already have tons, but I also wouldn't want to conflate two things in one config unless we're really convinced they belong together. I'd rather have the configs and then hide them behind some aggregate config that overrides them and handles the 99% use case as cleanly as possible than end up with fewer configs but which are harder to reason about. > Consider options for safer upgrade of offset commit value schema > > > Key: KAFKA-7481 > URL: https://issues.apache.org/jira/browse/KAFKA-7481 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Priority: Blocker > Fix For: 2.1.0 > > > KIP-211 and KIP-320 add new versions of the offset commit value schema. The > use of the new schema version is controlled by the > `inter.broker.protocol.version` configuration. Once the new inter-broker > version is in use, it is not possible to downgrade since the older brokers > will not be able to parse the new schema. > The options at the moment are the following: > 1. Do nothing. Users can try the new version and keep > `inter.broker.protocol.version` locked to the old release. 
Downgrade will > still be possible, but users will not be able to test new capabilities which > depend on inter-broker protocol changes. > 2. Instead of using `inter.broker.protocol.version`, we could use > `message.format.version`. This would basically extend the use of this config > to apply to all persistent formats. The advantage is that it allows users to > upgrade the broker and begin using the new inter-broker protocol while still > allowing downgrade. But features which depend on the persistent format could > not be tested. > Any other options? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6490) JSON SerializationException Stops Connect
[ https://issues.apache.org/jira/browse/KAFKA-6490?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-6490. -- Resolution: Fixed Fix Version/s: 2.0.0 Closing as this is effectively fixed by [https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect] which allows you to configure how errors are handled, and should apply to errors in Converters, Transformations, and Connectors. > JSON SerializationException Stops Connect > - > > Key: KAFKA-6490 > URL: https://issues.apache.org/jira/browse/KAFKA-6490 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: William R. Speirs >Assignee: Prasanna Subburaj >Priority: Major > Fix For: 2.0.0 > > Attachments: KAFKA-6490_v1.patch > > > If you configure KafkaConnect to parse JSON messages, and you send it a > non-JSON message, the SerializationException message will bubble up to the > top, and stop KafkaConnect. While I understand sending non-JSON to a JSON > serializer is a bad idea, I think that a single malformed message stopping > all of KafkaConnect is even worse. > The data exception is thrown here: > [https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L305] > > From the call here: > [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L476] > This bubbles all the way up to the top, and KafkaConnect simply stops with > the message: {{ERROR WorkerSinkTask\{id=elasticsearch-sink-0} Task threw an > uncaught and unrecoverable exception > (org.apache.kafka.connect.runtime.WorkerTask:172)}} > Thoughts on adding a {{try/catch}} around the {{for}} loop in > WorkerSinkTask's {{convertMessages}} so messages that don't properly parse > are logged, but simply ignored? This way KafkaConnect can keep working even > when it encounters a message it cannot decode? 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
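The KIP-298 options referenced in the resolution above make this behavior configurable per connector. A sketch of sink connector properties that log and skip bad records instead of killing the task (the dead letter queue topic name is illustrative):

```properties
# Tolerate conversion/transformation errors instead of failing the task
errors.tolerance=all
# Log failure details (including the failed message) for post-mortem analysis
errors.log.enable=true
errors.log.include.messages=true
# Route failed records to a dead letter queue topic (sink connectors only)
errors.deadletterqueue.topic.name=dlq-elasticsearch-sink
errors.deadletterqueue.topic.replication.factor=1
```

The default remains `errors.tolerance=none`, which preserves the original fail-fast behavior described in this issue.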
[jira] [Commented] (KAFKA-7510) KStreams RecordCollectorImpl leaks data to logs on error
[ https://issues.apache.org/jira/browse/KAFKA-7510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655830#comment-16655830 ] Ewen Cheslack-Postava commented on KAFKA-7510: -- [~mjsax] We should be consistent. Do not log the data. If it is acceptable at any level, it would only be acceptable at TRACE. > KStreams RecordCollectorImpl leaks data to logs on error > > > Key: KAFKA-7510 > URL: https://issues.apache.org/jira/browse/KAFKA-7510 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Mr Kafka >Priority: Major > Labels: user-experience > > org.apache.kafka.streams.processor.internals.RecordCollectorImpl leaks data > on error as it dumps the *value* / message payload to the logs. > This is problematic as it may contain personally identifiable information > (pii) or other secret information to plain text log files which can then be > propagated to other log systems i.e Splunk. > I suggest the *key*, and *value* fields be moved to debug level as it is > useful for some people while error level contains the *errorMessage, > timestamp, topic* and *stackTrace*. > {code:java} > private void recordSendError( > final K key, > final V value, > final Long timestamp, > final String topic, > final Exception exception > ) { > String errorLogMessage = LOG_MESSAGE; > String errorMessage = EXCEPTION_MESSAGE; > if (exception instanceof RetriableException) { > errorLogMessage += PARAMETER_HINT; > errorMessage += PARAMETER_HINT; > } > log.error(errorLogMessage, key, value, timestamp, topic, > exception.toString()); > sendException = new StreamsException( > String.format( > errorMessage, > logPrefix, > "an error caught", > key, > value, > timestamp, > topic, > exception.toString() > ), > exception); > }{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-5117) Kafka Connect REST endpoints reveal Password typed values
[ https://issues.apache.org/jira/browse/KAFKA-5117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-5117. -- Resolution: Duplicate Assignee: Ewen Cheslack-Postava Fix Version/s: 2.0.0 > Kafka Connect REST endpoints reveal Password typed values > - > > Key: KAFKA-5117 > URL: https://issues.apache.org/jira/browse/KAFKA-5117 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.10.2.0 >Reporter: Thomas Holmes >Assignee: Ewen Cheslack-Postava >Priority: Major > Labels: needs-kip > Fix For: 2.0.0 > > > A Kafka Connect connector can specify ConfigDef keys as type of Password. > This type was added to prevent logging the values (instead "[hidden]" is > logged). > This change does not apply to the values returned by executing a GET on > {{connectors/\{connector-name\}}} and > {{connectors/\{connector-name\}/config}}. This creates an easily accessible > way for an attacker who has infiltrated your network to gain access to > potential secrets that should not be available. > I have started on a code change that addresses this issue by parsing the > config values through the ConfigDef for the connector and returning their > output instead (which leads to the masking of Password typed configs as > [hidden]). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5117) Kafka Connect REST endpoints reveal Password typed values
[ https://issues.apache.org/jira/browse/KAFKA-5117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16640116#comment-16640116 ] Ewen Cheslack-Postava commented on KAFKA-5117: -- Going to close this since [https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations] addresses this problem. Feel free to reopen if that doesn't sufficiently address the issue. > Kafka Connect REST endpoints reveal Password typed values > - > > Key: KAFKA-5117 > URL: https://issues.apache.org/jira/browse/KAFKA-5117 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.10.2.0 >Reporter: Thomas Holmes >Priority: Major > Labels: needs-kip > > A Kafka Connect connector can specify ConfigDef keys as type of Password. > This type was added to prevent logging the values (instead "[hidden]" is > logged). > This change does not apply to the values returned by executing a GET on > {{connectors/\{connector-name\}}} and > {{connectors/\{connector-name\}/config}}. This creates an easily accessible > way for an attacker who has infiltrated your network to gain access to > potential secrets that should not be available. > I have started on a code change that addresses this issue by parsing the > config values through the ConfigDef for the connector and returning their > output instead (which leads to the masking of Password typed configs as > [hidden]). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
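For context on the masking mechanism both issues rely on: Connect's Password config type wraps the secret in org.apache.kafka.common.config.types.Password, whose toString() returns "[hidden]". A stdlib-only mimic of that idea (not the actual Kafka class):

```java
// Stdlib-only mimic of org.apache.kafka.common.config.types.Password:
// toString() hides the secret, so accidental logging or string-based
// serialization shows "[hidden]" instead of the real value.
public final class MaskedPassword {
    private final String value;

    public MaskedPassword(String value) {
        this.value = value;
    }

    public String value() {
        return value;  // explicit accessor is the only way to read the secret
    }

    @Override
    public String toString() {
        return "[hidden]";
    }

    public static void main(String[] args) {
        MaskedPassword p = new MaskedPassword("s3cr3t");
        System.out.println("connection.password = " + p);  // prints [hidden]
    }
}
```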
[jira] [Commented] (KAFKA-7461) Connect Values converter should have coverage of logical types
[ https://issues.apache.org/jira/browse/KAFKA-7461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16640101#comment-16640101 ] Ewen Cheslack-Postava commented on KAFKA-7461: -- [~ritabrata1808] I didn't look carefully at which types were missing coverage, but yeah, that looks like the right general direction. > Connect Values converter should have coverage of logical types > -- > > Key: KAFKA-7461 > URL: https://issues.apache.org/jira/browse/KAFKA-7461 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 1.1.1, 2.0.0 >Reporter: Ewen Cheslack-Postava >Assignee: Ewen Cheslack-Postava >Priority: Blocker > Labels: newbie, test > Original Estimate: 24h > Remaining Estimate: 24h > > Per fix from KAFKA-7460, we've got some gaps in testing for the Values > converter added in KIP-145, in particular for logical types. It looks like > there are a few other gaps (e.g. from quick scan of coverage, maybe the float > types as well), but logical types seem to be the bulk other than trivial > wrapper methods. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7476) SchemaProjector is not properly handling Date-based logical types
[ https://issues.apache.org/jira/browse/KAFKA-7476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-7476. -- Resolution: Fixed Fix Version/s: 2.1.0 0.10.2.3 2.0.1 0.9.0.2 1.0.3 0.11.0.4 0.10.1.2 0.10.0.2 2.2.0 1.1.2 Issue resolved by pull request 5736 [https://github.com/apache/kafka/pull/5736] > SchemaProjector is not properly handling Date-based logical types > - > > Key: KAFKA-7476 > URL: https://issues.apache.org/jira/browse/KAFKA-7476 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Robert Yokota >Assignee: Robert Yokota >Priority: Major > Fix For: 1.1.2, 2.2.0, 0.10.0.2, 0.10.1.2, 0.11.0.4, 1.0.3, > 0.9.0.2, 2.0.1, 0.10.2.3, 2.1.0 > > > SchemaProjector is not properly handling Date-based logical types. An > exception of the following form is thrown: > {{Caused by: java.lang.ClassCastException: java.util.Date cannot be cast to > java.lang.Number}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
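The failure mode and the shape of the guard can be sketched as follows; the actual fix in PR 5736 may differ, and projectPrimitive is an illustrative name:

```java
import java.util.Date;

// Sketch: numeric promotion must not be applied to Date-based logical
// values, which are java.util.Date at runtime, not java.lang.Number.
public class DateProjectionGuard {
    static Object projectPrimitive(Object value) {
        if (value instanceof Date) {
            return value;  // logical Date/Time/Timestamp: pass through unchanged
        }
        if (value instanceof Number) {
            return ((Number) value).longValue();  // e.g. widen int32 -> int64
        }
        throw new IllegalArgumentException("unsupported type: " + value.getClass());
    }

    public static void main(String[] args) {
        Date d = new Date(0L);
        // Without the instanceof Date check, this line would throw
        // ClassCastException: java.util.Date cannot be cast to java.lang.Number.
        System.out.println(projectPrimitive(d));
    }
}
```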
[jira] [Resolved] (KAFKA-7460) Connect Values converter uses incorrect date format string
[ https://issues.apache.org/jira/browse/KAFKA-7460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-7460. -- Resolution: Fixed Fix Version/s: 2.1.0 2.0.1 1.1.2 Issue resolved by pull request 5718 [https://github.com/apache/kafka/pull/5718] > Connect Values converter uses incorrect date format string > -- > > Key: KAFKA-7460 > URL: https://issues.apache.org/jira/browse/KAFKA-7460 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.1.0, 1.1.1, 2.0.0 >Reporter: Ewen Cheslack-Postava >Assignee: Ewen Cheslack-Postava >Priority: Blocker > Fix For: 1.1.2, 2.0.1, 2.1.0 > > > Discovered in KAFKA-6684, the converter is using week date year (YYYY) > instead of plain year (yyyy) and day in year (DD) instead of date in month > (dd). > Filing this so we have independent tracking of the issue since it was only > tangentially related and discovered in that issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7461) Connect Values converter should have coverage of logical types
Ewen Cheslack-Postava created KAFKA-7461: Summary: Connect Values converter should have coverage of logical types Key: KAFKA-7461 URL: https://issues.apache.org/jira/browse/KAFKA-7461 Project: Kafka Issue Type: Improvement Components: KafkaConnect Affects Versions: 2.0.0, 1.1.1 Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Per fix from KAFKA-7460, we've got some gaps in testing for the Values converter added in KIP-145, in particular for logical types. It looks like there are a few other gaps (e.g. from quick scan of coverage, maybe the float types as well), but logical types seem to be the bulk other than trivial wrapper methods. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7460) Connect Values converter uses incorrect date format string
Ewen Cheslack-Postava created KAFKA-7460: Summary: Connect Values converter uses incorrect date format string Key: KAFKA-7460 URL: https://issues.apache.org/jira/browse/KAFKA-7460 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 2.0.0, 1.1.1, 1.1.0 Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Discovered in KAFKA-6684, the converter is using week date year (YYYY) instead of plain year (yyyy) and day in year (DD) instead of date in month (dd). Filing this so we have independent tracking of the issue since it was only tangentially related and discovered in that issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
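The pattern-letter confusion described above is easy to reproduce with java.text.SimpleDateFormat: 'Y' (week year) and 'D' (day in year) silently diverge from 'y' and 'd' near year boundaries.

```java
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.Locale;
import java.util.TimeZone;

// 2017-12-31 is a Sunday: in Locale.US it opens week 1 of week-year 2018,
// and it is day 365 of the year, so "YYYY-MM-DD" misformats the date.
public class DatePatternDemo {
    public static void main(String[] args) {
        TimeZone utc = TimeZone.getTimeZone("UTC");
        Calendar cal = new GregorianCalendar(utc, Locale.US);
        cal.clear();
        cal.set(2017, Calendar.DECEMBER, 31);
        Date d = cal.getTime();

        SimpleDateFormat buggy = new SimpleDateFormat("YYYY-MM-DD", Locale.US);
        SimpleDateFormat fixed = new SimpleDateFormat("yyyy-MM-dd", Locale.US);
        buggy.setTimeZone(utc);
        fixed.setTimeZone(utc);

        System.out.println(buggy.format(d));  // 2018-12-365
        System.out.println(fixed.format(d));  // 2017-12-31
    }
}
```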
[jira] [Resolved] (KAFKA-7434) DeadLetterQueueReporter throws NPE if transform throws NPE
[ https://issues.apache.org/jira/browse/KAFKA-7434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-7434. -- Resolution: Fixed Fix Version/s: 2.1.0 2.0.1 Issue resolved by pull request 5700 [https://github.com/apache/kafka/pull/5700] > DeadLetterQueueReporter throws NPE if transform throws NPE > -- > > Key: KAFKA-7434 > URL: https://issues.apache.org/jira/browse/KAFKA-7434 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.0.0 > Environment: jdk 8 >Reporter: Michal Borowiecki >Assignee: Michal Borowiecki >Priority: Major > Fix For: 2.0.1, 2.1.0 > > > A NPE thrown from a transform in a connector configured with > errors.deadletterqueue.context.headers.enable=true > causes DeadLetterQueueReporter to break with a NPE. > {code} > Executing stage 'TRANSFORMATION' with class > 'org.apache.kafka.connect.transforms.Flatten$Value', where consumed record is > {topic='', partition=1, offset=0, timestamp=1537370573366, > timestampType=CreateTime}. 
> (org.apache.kafka.connect.runtime.errors.LogReporter) > java.lang.NullPointerException > Task threw an uncaught and unrecoverable exception > (org.apache.kafka.connect.runtime.WorkerTask) > java.lang.NullPointerException > at > org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.toBytes(DeadLetterQueueReporter.java:202) > at > org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.populateContextHeaders(DeadLetterQueueReporter.java:172) > at > org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.report(DeadLetterQueueReporter.java:146) > at > org.apache.kafka.connect.runtime.errors.ProcessingContext.report(ProcessingContext.java:137) > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:108) > at > org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:44) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:532) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > {code} > > This is caused by populateContextHeaders only checking if the Throwable is > not null, but not checking that the message 
in the Throwable is not null > before trying to serialize the message: > [https://github.com/apache/kafka/blob/cfd33b313c9856ae2b4b45ed3d4aac41d6ef5a6b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java#L170-L177] > {code:java} > if (context.error() != null) { > headers.add(ERROR_HEADER_EXCEPTION, > toBytes(context.error().getClass().getName())); > headers.add(ERROR_HEADER_EXCEPTION_MESSAGE, > toBytes(context.error().getMessage())); > {code} > toBytes throws an NPE if passed null as the parameter. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
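The missing null check described above can be sketched as follows; the actual fix landed in PR 5700 and may differ in detail. Throwable.getMessage() is null for exceptions constructed without a message (e.g. new NullPointerException()), so the guard must test the message, not just the Throwable:

```java
import java.nio.charset.StandardCharsets;

// Sketch of the null guard: skip the message header entirely when the
// Throwable carries no message, instead of passing null into toBytes.
public class SafeHeaderBytes {
    static byte[] toBytes(String s) {
        if (s == null) {
            // Mirrors the behaviour that triggered the original NPE.
            throw new NullPointerException("value must not be null");
        }
        return s.getBytes(StandardCharsets.UTF_8);
    }

    static byte[] messageHeaderBytes(Throwable error) {
        String msg = error.getMessage();
        return msg == null ? null : toBytes(msg);  // null means: omit the header
    }

    public static void main(String[] args) {
        System.out.println(messageHeaderBytes(new NullPointerException()));  // no NPE
    }
}
```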
[jira] [Commented] (KAFKA-6684) Support casting values with bytes schema to string
[ https://issues.apache.org/jira/browse/KAFKA-6684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16608631#comment-16608631 ] Ewen Cheslack-Postava commented on KAFKA-6684: -- [~amitsela] fyi, "not ready yet" was just from previous release manager pointing out that the PR wasn't quite ready to be merged. it wasn't a quality comment, just a comment that it wasn't getting merged before the next release. i followed up on the most recently updated PR and this looks good to be merged soon! > Support casting values with bytes schema to string > --- > > Key: KAFKA-6684 > URL: https://issues.apache.org/jira/browse/KAFKA-6684 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Amit Sela >Priority: Critical > Fix For: 2.1.0 > > > Casting from BYTES is not supported, which means that casting LogicalTypes is > not supported. > This proposes to allow casting anything to a string, kind of like Java's > {{toString()}}, such that if the object is actually a LogicalType it can be > "serialized" as string instead of bytes+schema. > > {noformat} > Examples: > BigDecimal will cast to the string representation of the number. > Timestamp will cast to the string representation of the timestamp, or maybe > UTC mmddTHH:MM:SS.f format? > {noformat} > > Worst case, bytes are "casted" to whatever the {{toString()}} returns - its > up to the user to know the data. > This would help when using a JSON sink, or anything that's not Avro. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-4932) Add UUID Serde
[ https://issues.apache.org/jira/browse/KAFKA-4932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16608605#comment-16608605 ] Ewen Cheslack-Postava commented on KAFKA-4932: -- [~brandon.kirchner] We included a few config parameters in the serdes that I think weren't in the original KIP. none are controversial, but perhaps we could update the KIP to include them, just for documentation purposes? > Add UUID Serde > -- > > Key: KAFKA-4932 > URL: https://issues.apache.org/jira/browse/KAFKA-4932 > Project: Kafka > Issue Type: Improvement > Components: clients, streams >Reporter: Jeff Klukas >Assignee: Brandon Kirchner >Priority: Minor > Labels: needs-kip, newbie > Fix For: 2.1.0 > > > I propose adding serializers and deserializers for the java.util.UUID class. > I have many use cases where I want to set the key of a Kafka message to be a > UUID. Currently, I need to turn UUIDs into strings or byte arrays and use > their associated Serdes, but it would be more convenient to serialize and > deserialize UUIDs directly. > I'd propose that the serializer and deserializer use the 36-byte string > representation, calling UUID.toString and UUID.fromString, and then using the > existing StringSerializer / StringDeserializer to finish the job. We would > also wrap these in a Serde and modify the streams Serdes class to include > this in the list of supported types. > Optionally, we could have the deserializer support a 16-byte representation > and it would check the size of the input byte array to determine whether it's > a binary or string representation of the UUID. It's not well defined whether > the most significant bits or least significant go first, so this deserializer > would have to support only one or the other. > Similary, if the deserializer supported a 16-byte representation, there could > be two variants of the serializer, a UUIDStringSerializer and a > UUIDBytesSerializer. 
> I would be willing to write this PR, but am looking for feedback about > whether there are significant concerns here around ambiguity of what the byte > representation of a UUID should be, or if there's desire to keep the list of > built-in Serdes minimal such that a PR would be unlikely to be accepted. > KIP Link: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-206%3A+Add+support+for+UUID+serialization+and+deserialization -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-4932) Add UUID Serde
[ https://issues.apache.org/jira/browse/KAFKA-4932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-4932. -- Resolution: Fixed Issue resolved by pull request 4438 [https://github.com/apache/kafka/pull/4438] > Add UUID Serde > -- > > Key: KAFKA-4932 > URL: https://issues.apache.org/jira/browse/KAFKA-4932 > Project: Kafka > Issue Type: Improvement > Components: clients, streams >Reporter: Jeff Klukas >Assignee: Brandon Kirchner >Priority: Minor > Labels: needs-kip, newbie > Fix For: 2.1.0 > > > I propose adding serializers and deserializers for the java.util.UUID class. > I have many use cases where I want to set the key of a Kafka message to be a > UUID. Currently, I need to turn UUIDs into strings or byte arrays and use > their associated Serdes, but it would be more convenient to serialize and > deserialize UUIDs directly. > I'd propose that the serializer and deserializer use the 36-byte string > representation, calling UUID.toString and UUID.fromString, and then using the > existing StringSerializer / StringDeserializer to finish the job. We would > also wrap these in a Serde and modify the streams Serdes class to include > this in the list of supported types. > Optionally, we could have the deserializer support a 16-byte representation > and it would check the size of the input byte array to determine whether it's > a binary or string representation of the UUID. It's not well defined whether > the most significant bits or least significant go first, so this deserializer > would have to support only one or the other. > Similary, if the deserializer supported a 16-byte representation, there could > be two variants of the serializer, a UUIDStringSerializer and a > UUIDBytesSerializer. 
> I would be willing to write this PR, but am looking for feedback about > whether there are significant concerns here around ambiguity of what the byte > representation of a UUID should be, or if there's desire to keep the list of > built-in Serdes minimal such that a PR would be unlikely to be accepted. > KIP Link: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-206%3A+Add+support+for+UUID+serialization+and+deserialization -- This message was sent by Atlassian JIRA (v7.6.3#76005)
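The string-based serde the issue proposes reduces to a few lines; this is an illustrative stdlib-only sketch, not the serializer classes that were merged:

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Sketch of the proposed UUID serde: serialize via UUID.toString(),
// deserialize via UUID.fromString(), UTF-8 bytes in between.
public class UuidStringSerde {
    static byte[] serialize(UUID uuid) {
        return uuid == null ? null : uuid.toString().getBytes(StandardCharsets.UTF_8);
    }

    static UUID deserialize(byte[] data) {
        return data == null ? null : UUID.fromString(new String(data, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        byte[] bytes = serialize(id);
        System.out.println(bytes.length);       // 36: the canonical string form
        System.out.println(deserialize(bytes).equals(id));  // round trip holds
    }
}
```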
[jira] [Resolved] (KAFKA-7353) Connect logs 'this' for anonymous inner classes
[ https://issues.apache.org/jira/browse/KAFKA-7353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-7353. -- Resolution: Fixed Fix Version/s: 2.1.0 2.0.1 1.0.3 1.1.2 Issue resolved by pull request 5583 [https://github.com/apache/kafka/pull/5583] > Connect logs 'this' for anonymous inner classes > --- > > Key: KAFKA-7353 > URL: https://issues.apache.org/jira/browse/KAFKA-7353 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.2, 1.1.1, 2.0.0 >Reporter: Kevin Lafferty >Priority: Minor > Fix For: 1.1.2, 1.0.3, 2.0.1, 2.1.0 > > > Some classes in the Kafka Connect runtime create anonymous inner classes that > log 'this', resulting in log messages that can't be correlated with any other > messages. These should scope 'this' to the outer class to have consistent log > messages. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16595907#comment-16595907 ] Ewen Cheslack-Postava commented on KAFKA-2260: -- I think it is worth pointing out that this is one *proposed solution* to a (set of) challenge(s), but isn't the only possible solution. I'm seeing this referenced elsewhere as if it is the only fix, but some might have already been addressed by [https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging], and even if they have not, we should address *current* gaps rather than work off of 3 year old issues. Despite this particular improvement not having been closed in that time, there was separate progress which changes the nature of the problem. > Allow specifying expected offset on produce > --- > > Key: KAFKA-2260 > URL: https://issues.apache.org/jira/browse/KAFKA-2260 > Project: Kafka > Issue Type: Improvement > Components: producer >Reporter: Ben Kirwin >Priority: Minor > Attachments: KAFKA-2260.patch, expected-offsets.patch > > > I'd like to propose a change that adds a simple CAS-like mechanism to the > Kafka producer. This update has a small footprint, but enables a bunch of > interesting uses in stream processing or as a commit log for process state. > h4. Proposed Change > In short: > - Allow the user to attach a specific offset to each message produced. > - The server assigns offsets to messages in the usual way. However, if the > expected offset doesn't match the actual offset, the server should fail the > produce request instead of completing the write. > This is a form of optimistic concurrency control, like the ubiquitous > check-and-set -- but instead of checking the current value of some state, it > checks the current offset of the log. > h4. Motivation > Much like check-and-set, this feature is only useful when there's very low > contention. 
Happily, when Kafka is used as a commit log or as a > stream-processing transport, it's common to have just one producer (or a > small number) for a given partition -- and in many of these cases, predicting > offsets turns out to be quite useful. > - We get the same benefits as the 'idempotent producer' proposal: a producer > can retry a write indefinitely and be sure that at most one of those attempts > will succeed; and if two producers accidentally write to the end of the > partition at once, we can be certain that at least one of them will fail. > - It's possible to 'bulk load' Kafka this way -- you can write a list of n > messages consecutively to a partition, even if the list is much larger than > the buffer size or the producer has to be restarted. > - If a process is using Kafka as a commit log -- reading from a partition to > bootstrap, then writing any updates to that same partition -- it can be sure > that it's seen all of the messages in that partition at the moment it does > its first (successful) write. > There's a bunch of other similar use-cases here, but they all have roughly > the same flavour. > h4. Implementation > The major advantage of this proposal over other suggested transaction / > idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a > currently-unused field, adds no new APIs, and requires very little new code > or additional work from the server. > - Produced messages already carry an offset field, which is currently ignored > by the server. This field could be used for the 'expected offset', with a > sigil value for the current behaviour. (-1 is a natural choice, since it's > already used to mean 'next available offset'.) > - We'd need a new error and error code for a 'CAS failure'. > - The server assigns offsets to produced messages in > {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. 
After this > changed, this method would assign offsets in the same way -- but if they > don't match the offset in the message, we'd return an error instead of > completing the write. > - To avoid breaking existing clients, this behaviour would need to live > behind some config flag. (Possibly global, but probably more useful > per-topic?) > I understand all this is unsolicited and possibly strange: happy to answer > questions, and if this seems interesting, I'd be glad to flesh this out into > a full KIP or patch. (And apologies if this is the wrong venue for this sort > of thing!) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
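The check-and-set behaviour proposed above can be sketched with an in-memory log; all names are illustrative, and -1 is the sigil for "next available offset" as in the proposal:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the proposed conditional produce: the log assigns the next
// offset as usual, but if the producer supplied an expected offset that
// disagrees, the append is rejected instead of completing the write.
public class ExpectedOffsetLog {
    private final List<String> records = new ArrayList<>();

    /** Returns the assigned offset, or -1 on a CAS failure. */
    long append(String record, long expectedOffset) {
        long next = records.size();  // next available offset
        if (expectedOffset != -1 && expectedOffset != next) {
            return -1;               // would surface as a new error code in Kafka
        }
        records.add(record);
        return next;
    }

    public static void main(String[] args) {
        ExpectedOffsetLog log = new ExpectedOffsetLog();
        System.out.println(log.append("a", -1));  // 0: sigil, current behaviour
        System.out.println(log.append("b", 1));   // 1: expectation matches
        System.out.println(log.append("c", 1));   // -1: stale expectation rejected
    }
}
```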
[jira] [Resolved] (KAFKA-7242) Externalized secrets are revealed in task configuration
[ https://issues.apache.org/jira/browse/KAFKA-7242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-7242. -- Resolution: Fixed Fix Version/s: 2.1.0 2.0.1 Issue resolved by pull request 5475 [https://github.com/apache/kafka/pull/5475] > Externalized secrets are revealed in task configuration > --- > > Key: KAFKA-7242 > URL: https://issues.apache.org/jira/browse/KAFKA-7242 > Project: Kafka > Issue Type: Bug >Reporter: Bahdan Siamionau >Assignee: Robert Yokota >Priority: Major > Fix For: 2.0.1, 2.1.0 > > > Trying to use new [externalized > secrets|https://issues.apache.org/jira/browse/KAFKA-6886] feature I noticed > that task configuration is being saved in config topic with disclosed > secrets. It seems like the main goal of feature was not achieved - secrets > are still persisted in plain-text. Probably I'm misusing this new config, > please correct me if I wrong. > I'm running connect in distributed mode, creating connector with following > config: > {code:java} > { > "name" : "jdbc-sink-test", > "config" : { > "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector", > "tasks.max" : "1", > "config.providers" : "file", > "config.providers.file.class" : > "org.apache.kafka.common.config.provider.FileConfigProvider", > "config.providers.file.param.secrets" : "/opt/mysecrets", > "topics" : "test_topic", > "connection.url" : "${file:/opt/mysecrets:url}", > "connection.user" : "${file:/opt/mysecrets:user}", > "connection.password" : "${file:/opt/mysecrets:password}", > "insert.mode" : "upsert", > "pk.mode" : "record_value", > "pk.field" : "id" > } > } > {code} > Connector works fine, placeholders are substituted with correct values from > file, but then updated config is written into the topic again (see 3 > following records in config topic): > {code:java} > key: connector-jdbc-sink-test > value: > { > "properties": { > "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", > "tasks.max": "1", 
> "config.providers": "file", > "config.providers.file.class": > "org.apache.kafka.common.config.provider.FileConfigProvider", > "config.providers.file.param.secrets": "/opt/mysecrets", > "topics": "test_topic", > "connection.url": "${file:/opt/mysecrets:url}", > "connection.user": "${file:/opt/mysecrets:user}", > "connection.password": "${file:/opt/mysecrets:password}", > "insert.mode": "upsert", > "pk.mode": "record_value", > "pk.field": "id", > "name": "jdbc-sink-test" > } > } > key: task-jdbc-sink-test-0 > value: > { > "properties": { > "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", > "config.providers.file.param.secrets": "/opt/mysecrets", > "connection.password": "actualpassword", > "tasks.max": "1", > "topics": "test_topic", > "config.providers": "file", > "pk.field": "id", > "task.class": "io.confluent.connect.jdbc.sink.JdbcSinkTask", > "connection.user": "datawarehouse", > "name": "jdbc-sink-test", > "config.providers.file.class": > "org.apache.kafka.common.config.provider.FileConfigProvider", > "connection.url": > "jdbc:postgresql://actualurl:5432/datawarehouse?stringtype=unspecified", > "insert.mode": "upsert", > "pk.mode": "record_value" > } > } > key: commit-jdbc-sink-test > value: > { > "tasks":1 > } > {code} > Please advice have I misunderstood the goal of the given feature, have I > missed smth in configuration or is it actually a bug? Thank you -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7225) Kafka Connect ConfigProvider not invoked before validation
[ https://issues.apache.org/jira/browse/KAFKA-7225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-7225. -- Resolution: Fixed > Kafka Connect ConfigProvider not invoked before validation > -- > > Key: KAFKA-7225 > URL: https://issues.apache.org/jira/browse/KAFKA-7225 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.0.0 >Reporter: Nacho Munoz >Assignee: Robert Yokota >Priority: Minor > Fix For: 2.0.1, 2.1.0 > > > When trying to register a JDBC connector with externalised secrets (e.g. > connection.password) the validation fails and the endpoint returns a 500. I > think that the problem is that the config transformer is not being invoked > before the validation so trying to exercise the credentials against the > database fails. I have checked that publishing the connector configuration > directly to the connect-config topic to skip the validation and restarting > the server is enough to get the connector working so that confirms that we > are just missing to call config transformer before validating the connector. > Please let me know if you need further information. > I'm happy to open a PR to address this issue given that I think that this is > easy enough to fix for a new contributor to the project. So please feel free > to assign the resolution of the bug to me. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7225) Kafka Connect ConfigProvider not invoked before validation
[ https://issues.apache.org/jira/browse/KAFKA-7225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-7225: - Fix Version/s: 2.1.0 2.0.1 > Kafka Connect ConfigProvider not invoked before validation > -- > > Key: KAFKA-7225 > URL: https://issues.apache.org/jira/browse/KAFKA-7225 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.0.0 >Reporter: Nacho Munoz >Assignee: Robert Yokota >Priority: Minor > Fix For: 2.0.1, 2.1.0 > > > When trying to register a JDBC connector with externalised secrets (e.g. > connection.password) the validation fails and the endpoint returns a 500. I > think that the problem is that the config transformer is not being invoked > before the validation so trying to exercise the credentials against the > database fails. I have checked that publishing the connector configuration > directly to the connect-config topic to skip the validation and restarting > the server is enough to get the connector working so that confirms that we > are just missing to call config transformer before validating the connector. > Please let me know if you need further information. > I'm happy to open a PR to address this issue given that I think that this is > easy enough to fix for a new contributor to the project. So please feel free > to assign the resolution of the bug to me. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5071) ERROR StreamThread:783 StreamThread-128 - stream-thread [StreamThread-128] Failed to commit StreamTask 0_304 state: org.apache.kafka.streams.errors.ProcessorStateExcepti
[ https://issues.apache.org/jira/browse/KAFKA-5071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-5071: - Summary: ERROR StreamThread:783 StreamThread-128 - stream-thread [StreamThread-128] Failed to commit StreamTask 0_304 state: org.apache.kafka.streams.errors.ProcessorStateException: task [0_304] Failed to flush state store fmdbt (was: 2017-04-11 18:18:45.574 ERROR StreamThread:783 StreamThread-128 - stream-thread [StreamThread-128] Failed to commit StreamTask 0_304 state: org.apache.kafka.streams.errors.ProcessorStateException: task [0_304] Failed to flush state store fmdbt ) > ERROR StreamThread:783 StreamThread-128 - stream-thread [StreamThread-128] > Failed to commit StreamTask 0_304 state: > org.apache.kafka.streams.errors.ProcessorStateException: task [0_304] Failed > to flush state store fmdbt > - > > Key: KAFKA-5071 > URL: https://issues.apache.org/jira/browse/KAFKA-5071 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0 > Environment: Linux >Reporter: Dhana >Priority: Major > Attachments: RocksDB_Issue_commitFailedonFlush.7z > > > Scenario: we use two consumer(applicaion -puse10) in different machine. > using 400 partitions, 200 streams/consumer. > config: > bootstrap.servers=10.16.34.29:9092,10.16.35.134:9092,10.16.38.27:9092 > zookeeper.connect=10.16.34.29:2181,10.16.35.134:2181,10.16.38.27:2181 > num.stream.threads=200 > pulse.per.pdid.count.enable=false > replication.factor=2 > state.dir=/opt/rocksdb > max.poll.records=50 > session.timeout.ms=18 > request.timeout.ms=502 > max.poll.interval.ms=500 > fetch.max.bytes=102400 > max.partition.fetch.bytes=102400 > heartbeat.interval.ms = 6 > Logs - attached. 
> Error: > 2017-04-11 18:18:45.170 INFO VehicleEventsStreamProcessor:219 > StreamThread-32 - Current size of Treemap is 4 for pdid > skga11041730gedvcl2pdid2236 > 2017-04-11 18:18:45.170 INFO VehicleEventsStreamProcessor:245 > StreamThread-32 - GE to be processed pdid skga11041730gedvcl2pdid2236 and > uploadTimeStamp 2017-04-11 17:46:06.883 > 2017-04-11 18:18:45.175 INFO VehicleEventsStreamProcessor:179 > StreamThread-47 - Arrived GE uploadTimestamp 2017-04-11 17:46:10.911 pdid > skga11041730gedvcl2pdid2290 > 2017-04-11 18:18:45.176 INFO VehicleEventsStreamProcessor:219 > StreamThread-47 - Current size of Treemap is 4 for pdid > skga11041730gedvcl2pdid2290 > 2017-04-11 18:18:45.176 INFO VehicleEventsStreamProcessor:245 > StreamThread-47 - GE to be processed pdid skga11041730gedvcl2pdid2290 and > uploadTimeStamp 2017-04-11 17:46:06.911 > 2017-04-11 18:18:45.571 INFO StreamThread:737 StreamThread-128 - > stream-thread [StreamThread-128] Committing all tasks because the commit > interval 3ms has elapsed > 2017-04-11 18:18:45.571 INFO StreamThread:775 StreamThread-128 - > stream-thread [StreamThread-128] Committing task StreamTask 0_304 > 2017-04-11 18:18:45.574 ERROR StreamThread:783 StreamThread-128 - > stream-thread [StreamThread-128] Failed to commit StreamTask 0_304 state: > org.apache.kafka.streams.errors.ProcessorStateException: task [0_304] Failed > to flush state store fmdbt > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:325) > at > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72) > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280) > at > org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:777) > at > 
org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:764) > at > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:739) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:661) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) > Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error > while executing flush from store fmdbt > at > com.harman.analytics.stream.base.stores.HarmanRocksDBStore.flushInternal(HarmanRocksDBStore.java:353) > at > com.harman.analytics.stream.base.stores.HarmanRocksDBStore.flush(HarmanRocksDBStore.java:342) > at > com.harman.analytics.stream.base.stores.HarmanPersistentKVStore.flush(HarmanPersistentKVStore.java:72) >
[jira] [Resolved] (KAFKA-7228) DeadLetterQueue throws a NullPointerException
[ https://issues.apache.org/jira/browse/KAFKA-7228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-7228. -- Resolution: Fixed > DeadLetterQueue throws a NullPointerException > - > > Key: KAFKA-7228 > URL: https://issues.apache.org/jira/browse/KAFKA-7228 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Reporter: Arjun Satish >Assignee: Arjun Satish >Priority: Major > Fix For: 2.0.1, 2.1.0 > > > Using the dead letter queue results in a NPE: > {code:java} > [2018-08-01 07:41:08,907] ERROR WorkerSinkTask{id=s3-sink-yanivr-2-4} Task > threw an uncaught and unrecoverable exception > (org.apache.kafka.connect.runtime.WorkerTask) > java.lang.NullPointerException > at > org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.report(DeadLetterQueueReporter.java:124) > at > org.apache.kafka.connect.runtime.errors.ProcessingContext.report(ProcessingContext.java:137) > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:108) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:513) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193) > at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at 
java.lang.Thread.run(Thread.java:748) > [2018-08-01 07:41:08,908] ERROR WorkerSinkTask{id=s3-sink-yanivr-2-4} Task is > being killed and will not recover until manually restarted > (org.apache.kafka.connect.runtime.WorkerTask) > {code} > DLQ reporter does not get a {{errorHandlingMetrics}} object when initialized > through the WorkerSinkTask. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6889) Fix Streams system test to only specify used log files
[ https://issues.apache.org/jira/browse/KAFKA-6889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517504#comment-16517504 ] Ewen Cheslack-Postava commented on KAFKA-6889: -- Globs might be nice, but it looks like the services are just using it incorrectly in the first place. You shouldn't put all log files for all subclasses into the base class. Each Service class should make sure it includes the logs that it actually generates. From a quick glance at the base class, the JMX logs and the streams stderr/stdout stuff is all it guarantees will be there. Subclasses should override the value of `logs` for their own class. Also, while less ideal than a glob, you can always construct the `logs` list dynamically, e.g. to handle a dynamic number of log files on the same node. One of the reasons to avoid globs is that people tend to do excessively expensive things like just collecting everything by default, which becomes exceedingly expensive for passing tests. > Fix Streams system test to only specify used log files > -- > > Key: KAFKA-6889 > URL: https://issues.apache.org/jira/browse/KAFKA-6889 > Project: Kafka > Issue Type: Task > Components: streams, system tests >Reporter: Matthias J. Sax >Priority: Minor > Labels: beginner, easyfix > > In `streams.py` the class `StreamsTestBaseService` lists many log files that > are only used in certain tests. For all tests, that do not use those log > files, during the test run WARN messages are produced: > {noformat} > [WARNING - 2018-05-09 13:51:22,065 - test - compress_service_logs - > lineno:131]: Error compressing log /mnt/streams/streams.stdout.0-6: service > ['worker7']>: ubuntu@worker7: Command 'cd "$(dirname > /mnt/streams/streams.stdout.0-6)" && f="$(basename > /mnt/streams/streams.stdout.0-6)" && tar czf "$f.tgz" "$f" && rm -rf > /mnt/streams/streams.stdout.0-6' returned non-zero exit status 2. 
Remote > error message: tar: streams.stdout.0-6: Cannot stat: No such file or directory > tar: Exiting with failure status due to previous errors > {noformat} > Those messages spam the output and might be misleading. We should update the > Streams system tests accordingly such that each test only specifies the > log-file names it actually uses to avoid the WARN message. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7068) ConfigTransformer doesn't handle null values
[ https://issues.apache.org/jira/browse/KAFKA-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-7068. -- Resolution: Fixed Fix Version/s: 2.1.0 Issue resolved by pull request 5241 [https://github.com/apache/kafka/pull/5241] > ConfigTransformer doesn't handle null values > > > Key: KAFKA-7068 > URL: https://issues.apache.org/jira/browse/KAFKA-7068 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.0.0 >Reporter: Magesh kumar Nandakumar >Priority: Blocker > Fix For: 2.0.0, 2.1.0 > > > ConfigTransformer fails with NPE when the input configs have keys with null > values. This is a blocker for 2.0.0 since connectors configs can have null > values. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
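As a rough illustration of the failure mode (this is not the actual ConfigTransformer code), a transformation pass that assumes every config value is non-null throws an NPE on a null password, while the fixed behavior has to pass null values through untouched:

```java
import java.util.HashMap;
import java.util.Map;

public class NullSafeTransform {
    // Naive pass: dereferences every value, so a null value (legal in connector
    // configs) throws a NullPointerException.
    static Map<String, String> transformNaive(Map<String, String> configs) {
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<String, String> e : configs.entrySet()) {
            out.put(e.getKey(), e.getValue().trim()); // NPE when the value is null
        }
        return out;
    }

    // Null-safe pass: leaves null values as-is, which is what the fix required.
    static Map<String, String> transformSafe(Map<String, String> configs) {
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<String, String> e : configs.entrySet()) {
            String v = e.getValue();
            out.put(e.getKey(), v == null ? null : v.trim());
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> configs = new HashMap<>();
        configs.put("name", "my-connector");
        configs.put("connection.password", null); // null values are legal here
        System.out.println(transformSafe(configs).get("connection.password")); // prints: null
    }
}
```

The `trim()` stand-in is hypothetical; the real transformer substitutes `${provider:...}` variables, but the null-handling contract is the same.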
[jira] [Assigned] (KAFKA-7058) ConnectSchema#equals() broken for array-typed default values
[ https://issues.apache.org/jira/browse/KAFKA-7058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava reassigned KAFKA-7058: Resolution: Fixed Assignee: Ewen Cheslack-Postava Reviewer: Ewen Cheslack-Postava Fix Version/s: 2.1.0 1.1.1 1.0.2 0.11.0.3 2.0.0 0.10.2.2 0.10.1.2 0.10.0.2 0.9.0.2 > ConnectSchema#equals() broken for array-typed default values > > > Key: KAFKA-7058 > URL: https://issues.apache.org/jira/browse/KAFKA-7058 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0, 1.1.0 >Reporter: Gunnar Morling >Assignee: Ewen Cheslack-Postava >Priority: Major > Fix For: 0.9.0.2, 0.10.0.2, 0.10.1.2, 0.10.2.2, 2.0.0, 0.11.0.3, > 1.0.2, 1.1.1, 2.1.0 > > > {{ConnectSchema#equals()}} calls {{Objects#equals()}} for the schemas' > default values, but this doesn't work correctly if the default values in fact > are arrays. In this case, always {{false}} will be returned, also if the > default value arrays actually are the same. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
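The underlying Java behavior is easy to demonstrate in isolation: `Objects.equals()` on two arrays falls back to `Object.equals()`, i.e. reference identity, so an equality check over default values needs `Arrays.equals()` (or `Arrays.deepEquals()` for nested arrays):

```java
import java.util.Arrays;
import java.util.Objects;

public class ArrayDefaultEquality {
    public static void main(String[] args) {
        byte[] a = {1, 2, 3};
        byte[] b = {1, 2, 3};
        // Objects.equals delegates to Object.equals, which for arrays compares references:
        System.out.println(Objects.equals(a, b));      // false
        // Element-wise comparison is what a schema equality check actually needs:
        System.out.println(Arrays.equals(a, b));       // true
        // For arrays of arrays, deepEquals recurses into the elements:
        System.out.println(Arrays.deepEquals(new Object[]{a}, new Object[]{b})); // true
    }
}
```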
[jira] [Resolved] (KAFKA-7047) Connect isolation whitelist does not include SimpleHeaderConverter
[ https://issues.apache.org/jira/browse/KAFKA-7047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-7047. -- Resolution: Fixed Reviewer: Ewen Cheslack-Postava Fix Version/s: 2.1.0 1.1.1 2.0.0 > Connect isolation whitelist does not include SimpleHeaderConverter > -- > > Key: KAFKA-7047 > URL: https://issues.apache.org/jira/browse/KAFKA-7047 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.1.0 >Reporter: Randall Hauch >Assignee: Randall Hauch >Priority: Critical > Fix For: 2.0.0, 1.1.1, 2.1.0 > > > The SimpleHeaderConverter added in 1.1.0 was never added to the PluginUtils > whitelist so that this header converter is loaded in isolation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7039) DelegatingClassLoader creates plugin instance even if its not Versioned
[ https://issues.apache.org/jira/browse/KAFKA-7039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-7039. -- Resolution: Fixed Fix Version/s: 2.1.0 Issue resolved by pull request 5191 [https://github.com/apache/kafka/pull/5191] > DelegatingClassLoader creates plugin instance even if its not Versioned > --- > > Key: KAFKA-7039 > URL: https://issues.apache.org/jira/browse/KAFKA-7039 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.0.0 >Reporter: Magesh kumar Nandakumar >Assignee: Magesh kumar Nandakumar >Priority: Blocker > Fix For: 2.0.0, 2.1.0 > > > The versioned interface was introduced as part of > [KIP-285|https://cwiki.apache.org/confluence/display/KAFKA/KIP-285%3A+Connect+Rest+Extension+Plugin]. > DelegatingClassLoader is now attempting to create an instance of all the > plugins, even if it's not required. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7056) Connect's new numeric converters should be in a different package
[ https://issues.apache.org/jira/browse/KAFKA-7056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-7056. -- Resolution: Fixed Fix Version/s: 2.1.0 Issue resolved by pull request 5222 [https://github.com/apache/kafka/pull/5222] > Connect's new numeric converters should be in a different package > - > > Key: KAFKA-7056 > URL: https://issues.apache.org/jira/browse/KAFKA-7056 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.0.0 >Reporter: Randall Hauch >Assignee: Randall Hauch >Priority: Critical > Fix For: 2.0.0, 2.1.0 > > > KIP-305 added several new primitive converters, but placed them alongside > {{StringConverter}} in the {{...connect.storage}} package rather than > alongside {{ByteArrayConverter}} in the {{...connect.converters}} package. We > should move them to the {{converters}} package. See > https://github.com/apache/kafka/pull/5198 for a discussion. > Need to also update the plugins whitelist (see KAFKA-7043). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7009) Mute logger for reflections.org at the warn level in system tests
[ https://issues.apache.org/jira/browse/KAFKA-7009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-7009. -- Resolution: Fixed Fix Version/s: 2.1.0 0.11.0.3 0.10.2.2 0.10.1.2 0.10.0.2 Issue resolved by pull request 5151 [https://github.com/apache/kafka/pull/5151] > Mute logger for reflections.org at the warn level in system tests > - > > Key: KAFKA-7009 > URL: https://issues.apache.org/jira/browse/KAFKA-7009 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect, system tests >Affects Versions: 1.1.0 >Reporter: Randall Hauch >Assignee: Randall Hauch >Priority: Critical > Fix For: 0.10.0.2, 0.10.1.2, 0.10.2.2, 2.0.0, 0.11.0.3, 2.1.0 > > > AK's Log4J configuration file for Connect includes [these > lines|https://github.com/apache/kafka/blob/trunk/config/connect-log4j.properties#L25]: > {code} > log4j.logger.org.apache.zookeeper=ERROR > log4j.logger.org.I0Itec.zkclient=ERROR > log4j.logger.org.reflections=ERROR > {code} > The last one suppresses lots of Reflections warnings like the following that > are output during classpath scanning and are harmless: > {noformat} > [2018-06-06 13:52:39,448] WARN could not create Vfs.Dir from url. ignoring > the exception and continuing (org.reflections.Reflections) > org.reflections.ReflectionsException: could not create Vfs.Dir from url, no > matching UrlType was found > [file:/usr/bin/../share/java/confluent-support-metrics/*] > either use fromURL(final URL url, final List urlTypes) or use the > static setDefaultURLTypes(final List urlTypes) or > addDefaultURLTypes(UrlType urlType) with your specialized UrlType. 
> at org.reflections.vfs.Vfs.fromURL(Vfs.java:109) > at org.reflections.vfs.Vfs.fromURL(Vfs.java:91) > at org.reflections.Reflections.scan(Reflections.java:240) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.scan(DelegatingClassLoader.java:373) > at org.reflections.Reflections$1.run(Reflections.java:198) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {noformat} > The last line also needs to be added to [Connect's Log4J configuration file in > the AK system > tests|https://github.com/apache/kafka/blob/trunk/tests/kafkatest/services/templates/connect_log4j.properties]. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7031) Kafka Connect API module depends on Jersey
[ https://issues.apache.org/jira/browse/KAFKA-7031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-7031. -- Resolution: Fixed Fix Version/s: 2.1.0 Issue resolved by pull request 5190 [https://github.com/apache/kafka/pull/5190] > Kafka Connect API module depends on Jersey > -- > > Key: KAFKA-7031 > URL: https://issues.apache.org/jira/browse/KAFKA-7031 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.0.0 >Reporter: Randall Hauch >Assignee: Magesh kumar Nandakumar >Priority: Blocker > Fix For: 2.0.0, 2.1.0 > > > The Kafka Connect API module for 2.0.0 brings in Jersey dependencies. When I > run {{mvn dependency:tree}} on a project that depends only on the snapshot > version of {{org.apache.kafka:kafka-connect-api}}, the following are shown: > {noformat} > [INFO] +- org.apache.kafka:connect-api:jar:2.0.0-SNAPSHOT:compile > [INFO] | +- org.slf4j:slf4j-api:jar:1.7.25:compile > [INFO] | \- > org.glassfish.jersey.containers:jersey-container-servlet:jar:2.27:compile > [INFO] | +- > org.glassfish.jersey.containers:jersey-container-servlet-core:jar:2.27:compile > [INFO] | | \- > org.glassfish.hk2.external:javax.inject:jar:2.5.0-b42:compile > [INFO] | +- org.glassfish.jersey.core:jersey-common:jar:2.27:compile > [INFO] | | +- javax.annotation:javax.annotation-api:jar:1.2:compile > [INFO] | | \- org.glassfish.hk2:osgi-resource-locator:jar:1.0.1:compile > [INFO] | +- org.glassfish.jersey.core:jersey-server:jar:2.27:compile > [INFO] | | +- org.glassfish.jersey.core:jersey-client:jar:2.27:compile > [INFO] | | +- > org.glassfish.jersey.media:jersey-media-jaxb:jar:2.27:compile > [INFO] | | \- javax.validation:validation-api:jar:1.1.0.Final:compile > [INFO] | \- javax.ws.rs:javax.ws.rs-api:jar:2.1:compile > ... 
> {noformat} > This may have been an unintended side effect of the > [KIP-285|https://cwiki.apache.org/confluence/display/KAFKA/KIP-285%3A+Connect+Rest+Extension+Plugin] > effort, which added the REST extension for Connect. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6161) Introduce new serdes interfaces with empty configure() and close()
[ https://issues.apache.org/jira/browse/KAFKA-6161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510622#comment-16510622 ] Ewen Cheslack-Postava commented on KAFKA-6161: -- [~chia7712] Really up to [~evis] (we prefer to have existing assignee pass ownership on), but if we don't hear back, then sure, feel free to re-assign to yourself. > Introduce new serdes interfaces with empty configure() and close() > -- > > Key: KAFKA-6161 > URL: https://issues.apache.org/jira/browse/KAFKA-6161 > Project: Kafka > Issue Type: Improvement > Components: clients, streams >Reporter: Evgeny Veretennikov >Assignee: Evgeny Veretennikov >Priority: Major > Labels: needs-kip > > {{Serializer}}, {{Deserializer}} and {{Serde}} interfaces have methods > {{configure()}} and {{close()}}. Pretty often one want to leave these methods > empty. For example, a lot of serializers inside > {{org.apache.kafka.common.serialization}} package have these methods empty: > {code} > @Override > public void configure(Map configs, boolean isKey) { > // nothing to do > } > @Override > public void close() { > // nothing to do > } > {code} > To avoid such boilerplate, we may create new interfaces (like > {{UnconfiguredSerializer}}), in which we will define these methods empty. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6914) Kafka Connect - Plugins class should have a constructor that can take in parent ClassLoader
[ https://issues.apache.org/jira/browse/KAFKA-6914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510619#comment-16510619 ] Ewen Cheslack-Postava commented on KAFKA-6914: -- [~sriramks85] Spring Boot is handy for creating your own apps, but I'm not sure it's really the right solution for *repackaging* an existing app. Spring Boot combined with pluggable systems that require plugin discovery are likely to generally run into problems. Since it's a custom approach to Spring Boot, it's hard to generically support that deployment model. The current solution you have is definitely touching internal APIs that are not guaranteed to be stable, so it's not a great long term solution. The proposed fix is also relying on internal APIs that, again, are not guaranteed to be stable. I think a real solution here probably requires some further thought, more broadly than just Spring Boot, about how custom classloaders are used and how they interact with plugin-based systems like Connect. Are there any similar examples that you know of for plugin-based systems that face these same challenges? And relatedly, for your particular case, does just using an uber jar approach not work? Are you trying to support a variety of connectors with potentially conflicting dependencies, or are you packaging a single connector for deployment on OpenShift? [~flyaruu] This seems like it is probably reasonable – I don't think you're missing intention, but rather from the 99% use case we see the system classloader *is* the DelegatingClassLoader's classloader, so it's probably more of an oversight/bug. [~kkonstantine] thoughts? Seems like a pretty trivial fix (though difficult to include good tests for, as per usual with this classloader stuff). 
> Kafka Connect - Plugins class should have a constructor that can take in > parent ClassLoader > --- > > Key: KAFKA-6914 > URL: https://issues.apache.org/jira/browse/KAFKA-6914 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Sriram KS >Priority: Minor > > Currently the Plugins class has a single constructor that takes in a map of props. > Please make the Plugins class have a constructor that takes in a ClassLoader as > well and use it to set the DelegatingClassLoader's parent ClassLoader. > Reason: > This will be useful if I already have a managed ClassLoader environment, > like a Spring Boot app which resolves my class dependencies using my > Maven/Gradle dependency management. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6914) Kafka Connect - Plugins class should have a constructor that can take in parent ClassLoader
[ https://issues.apache.org/jira/browse/KAFKA-6914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-6914: - Fix Version/s: (was: 1.1.1) > Kafka Connect - Plugins class should have a constructor that can take in > parent ClassLoader > --- > > Key: KAFKA-6914 > URL: https://issues.apache.org/jira/browse/KAFKA-6914 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Sriram KS >Priority: Minor > > Currently the Plugins class has a single constructor that takes in a map of props. > Please make the Plugins class have a constructor that takes in a ClassLoader as > well and use it to set the DelegatingClassLoader's parent ClassLoader. > Reason: > This will be useful if I already have a managed ClassLoader environment, > like a Spring Boot app which resolves my class dependencies using my > Maven/Gradle dependency management. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7043) Connect isolation whitelist does not include new primitive converters (KIP-305)
[ https://issues.apache.org/jira/browse/KAFKA-7043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-7043. -- Resolution: Fixed Fix Version/s: 2.1.0 Issue resolved by pull request 5198 [https://github.com/apache/kafka/pull/5198] > Connect isolation whitelist does not include new primitive converters > (KIP-305) > --- > > Key: KAFKA-7043 > URL: https://issues.apache.org/jira/browse/KAFKA-7043 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.0.0 >Reporter: Randall Hauch >Assignee: Randall Hauch >Priority: Blocker > Fix For: 2.0.0, 2.1.0 > > > KIP-305 added several new primitive converters, but the PR did not add them > to the whitelist for the plugin isolation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7003) Add headers with error context in messages written to the Connect DeadLetterQueue topic
[ https://issues.apache.org/jira/browse/KAFKA-7003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-7003. -- Resolution: Fixed Fix Version/s: 2.1.0 2.0.0 Issue resolved by pull request 5159 [https://github.com/apache/kafka/pull/5159] > Add headers with error context in messages written to the Connect > DeadLetterQueue topic > --- > > Key: KAFKA-7003 > URL: https://issues.apache.org/jira/browse/KAFKA-7003 > Project: Kafka > Issue Type: Task >Reporter: Arjun Satish >Priority: Major > Fix For: 2.0.0, 2.1.0 > > > This was added to the KIP after the feature freeze. > If the property {{errors.deadletterqueue.}}{{context.headers.enable}} is set > to {{*true*}}, the following headers will be added to the produced raw > message (only if they don't already exist in the message). All values will be > serialized as UTF-8 strings. > ||Header Name||Description|| > |__connect.errors.topic|Name of the topic that contained the message.| > |__connect.errors.task.id|The numeric ID of the task that encountered the > error (encoded as a UTF-8 string).| > |__connect.errors.stage|The name of the stage where the error occurred.| > |__connect.errors.partition|The numeric ID of the partition in the original > topic that contained the message (encoded as a UTF-8 string).| > |__connect.errors.offset|The numeric value of the message offset in the > original topic (encoded as a UTF-8 string).| > |__connect.errors.exception.stacktrace|The stacktrace of the exception.| > |__connect.errors.exception.message|The message in the exception.| > |__connect.errors.exception.class.name|The fully qualified classname of the > exception that was thrown during the execution.| > |__connect.errors.connector.name|The name of the connector which encountered > the error.| > |__connect.errors.class.name|The fully qualified name of the class that > caused the error.| -- This message was sent by Atlassian JIRA (v7.6.3#76005)
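Since every context header value is written as a UTF-8 string, a consumer of the DLQ topic has to decode the bytes and, for numeric fields like partition and offset, parse them back. A minimal sketch (the plain map below stands in for `record.headers()` on a consumed DLQ record; the values are illustrative):

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class DlqHeaderDecode {
    // Per the KIP, all DLQ context header values are UTF-8 strings,
    // including numeric fields such as partition and offset.
    static String decode(byte[] value) {
        return value == null ? null : new String(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, byte[]> headers = new HashMap<>();
        headers.put("__connect.errors.topic", "orders".getBytes(StandardCharsets.UTF_8));
        headers.put("__connect.errors.offset", "42".getBytes(StandardCharsets.UTF_8));

        String topic = decode(headers.get("__connect.errors.topic"));
        long offset = Long.parseLong(decode(headers.get("__connect.errors.offset")));
        System.out.println(topic + "@" + offset); // prints: orders@42
    }
}
```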
[jira] [Commented] (KAFKA-6161) Introduce new serdes interfaces with empty configure() and close()
[ https://issues.apache.org/jira/browse/KAFKA-6161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504252#comment-16504252 ] Ewen Cheslack-Postava commented on KAFKA-6161: -- lol, thanks for the offer [~mjsax], that was more of an offer that if someone wants to move this forward they could update relevant parts, sort out any KIP necessary, and file PR. I added `needs-kip` annotation, which I clearly missed in an earlier comment when I mentioned it needs a KIP to make compatibility clear, though we might be able to skip that as of JDK8 support. > Introduce new serdes interfaces with empty configure() and close() > -- > > Key: KAFKA-6161 > URL: https://issues.apache.org/jira/browse/KAFKA-6161 > Project: Kafka > Issue Type: Improvement > Components: clients, streams >Reporter: Evgeny Veretennikov >Assignee: Evgeny Veretennikov >Priority: Major > Labels: needs-kip > > {{Serializer}}, {{Deserializer}} and {{Serde}} interfaces have methods > {{configure()}} and {{close()}}. Pretty often one want to leave these methods > empty. For example, a lot of serializers inside > {{org.apache.kafka.common.serialization}} package have these methods empty: > {code} > @Override > public void configure(Map configs, boolean isKey) { > // nothing to do > } > @Override > public void close() { > // nothing to do > } > {code} > To avoid such boilerplate, we may create new interfaces (like > {{UnconfiguredSerializer}}), in which we will define these methods empty. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6161) Introduce new serdes interfaces with empty configure() and close()
[ https://issues.apache.org/jira/browse/KAFKA-6161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504243#comment-16504243 ] Ewen Cheslack-Postava commented on KAFKA-6161: -- I think it is now a compatible change to just provide default impls, but we should have some documentation that this is true. If it is the case, we can have a very minimal KIP process and just add the default impls, as long as the KIP discussion also agrees on the correct default implementations (the compatibility isn't the only issue – we also need to agree on a valid default implementation). Note that this is a bit different than the original request for an alternative set of interfaces that provide these by default. Given we can use default impls now, we'd probably also want to revise the description of this JIRA. > Introduce new serdes interfaces with empty configure() and close() > -- > > Key: KAFKA-6161 > URL: https://issues.apache.org/jira/browse/KAFKA-6161 > Project: Kafka > Issue Type: Improvement > Components: clients, streams >Reporter: Evgeny Veretennikov >Assignee: Evgeny Veretennikov >Priority: Major > Labels: needs-kip > > {{Serializer}}, {{Deserializer}} and {{Serde}} interfaces have methods > {{configure()}} and {{close()}}. Pretty often one want to leave these methods > empty. For example, a lot of serializers inside > {{org.apache.kafka.common.serialization}} package have these methods empty: > {code} > @Override > public void configure(Map configs, boolean isKey) { > // nothing to do > } > @Override > public void close() { > // nothing to do > } > {code} > To avoid such boilerplate, we may create new interfaces (like > {{UnconfiguredSerializer}}), in which we will define these methods empty. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
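With Java 8 interface default methods, the change discussed above can be sketched as follows; this is a simplified stand-in for the real `org.apache.kafka.common.serialization.Deserializer`, not the actual API that a KIP would propose:

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Simplified stand-in for org.apache.kafka.common.serialization.Deserializer.
interface Deserializer<T> {
    default void configure(Map<String, ?> configs, boolean isKey) {
        // no-op by default; override only when configuration is actually needed
    }

    T deserialize(String topic, byte[] data);

    default void close() {
        // no-op by default
    }
}

public class DefaultMethodSerde {
    public static void main(String[] args) {
        // With defaults in place only deserialize() is abstract, so the interface
        // is functional and a lambda suffices for simple implementations:
        Deserializer<String> stringDeserializer =
                (topic, data) -> new String(data, StandardCharsets.UTF_8);
        System.out.println(
                stringDeserializer.deserialize("t", "hello".getBytes(StandardCharsets.UTF_8)));
        // prints: hello
    }
}
```

This is the sense in which default implementations are compatible: existing implementors that already define `configure()` and `close()` compile and run unchanged.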
[jira] [Updated] (KAFKA-6161) Introduce new serdes interfaces with empty configure() and close()
[ https://issues.apache.org/jira/browse/KAFKA-6161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-6161: - Labels: needs-kip (was: ) > Introduce new serdes interfaces with empty configure() and close() > -- > > Key: KAFKA-6161 > URL: https://issues.apache.org/jira/browse/KAFKA-6161 > Project: Kafka > Issue Type: Improvement > Components: clients, streams >Reporter: Evgeny Veretennikov >Assignee: Evgeny Veretennikov >Priority: Major > Labels: needs-kip > > {{Serializer}}, {{Deserializer}} and {{Serde}} interfaces have methods > {{configure()}} and {{close()}}. Pretty often one want to leave these methods > empty. For example, a lot of serializers inside > {{org.apache.kafka.common.serialization}} package have these methods empty: > {code} > @Override > public void configure(Map configs, boolean isKey) { > // nothing to do > } > @Override > public void close() { > // nothing to do > } > {code} > To avoid such boilerplate, we may create new interfaces (like > {{UnconfiguredSerializer}}), in which we will define these methods empty. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-3992) InstanceAlreadyExistsException Error for Consumers Starting in Parallel
[ https://issues.apache.org/jira/browse/KAFKA-3992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-3992: - Fix Version/s: (was: 2.1.0) > InstanceAlreadyExistsException Error for Consumers Starting in Parallel > --- > > Key: KAFKA-3992 > URL: https://issues.apache.org/jira/browse/KAFKA-3992 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.9.0.0, 0.10.0.0 >Reporter: Alexander Cook >Assignee: Ewen Cheslack-Postava >Priority: Major > > I see the following error sometimes when I start multiple consumers at about > the same time in the same process (separate threads). Everything seems to > work fine afterwards, so should this not actually be an ERROR level message, > or could there be something going wrong that I don't see? > Let me know if I can provide any more info! > Error processing messages: Error registering mbean > kafka.consumer:type=consumer-node-metrics,client-id=consumer-1,node-id=node--1 > org.apache.kafka.common.KafkaException: Error registering mbean > kafka.consumer:type=consumer-node-metrics,client-id=consumer-1,node-id=node--1 > > Caused by: javax.management.InstanceAlreadyExistsException: > kafka.consumer:type=consumer-node-metrics,client-id=consumer-1,node-id=node--1 > Here is the full stack trace: > M[?:com.ibm.streamsx.messaging.kafka.KafkaConsumerV9.produceTuples:-1] - > Error processing messages: Error registering mbean > kafka.consumer:type=consumer-node-metrics,client-id=consumer-1,node-id=node--1 > org.apache.kafka.common.KafkaException: Error registering mbean > kafka.consumer:type=consumer-node-metrics,client-id=consumer-1,node-id=node--1 > at > org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:159) > at > org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:77) > at > org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:288) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:177) > at 
org.apache.kafka.common.metrics.Sensor.add(Sensor.java:162) > at > org.apache.kafka.common.network.Selector$SelectorMetrics.maybeRegisterConnectionMetrics(Selector.java:641) > at org.apache.kafka.common.network.Selector.poll(Selector.java:268) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:126) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:186) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:857) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:829) > at > com.ibm.streamsx.messaging.kafka.KafkaConsumerV9.produceTuples(KafkaConsumerV9.java:129) > at > com.ibm.streamsx.messaging.kafka.KafkaConsumerV9$1.run(KafkaConsumerV9.java:70) > at java.lang.Thread.run(Thread.java:785) > at > com.ibm.streams.operator.internal.runtime.OperatorThreadFactory$2.run(OperatorThreadFactory.java:137) > Caused by: javax.management.InstanceAlreadyExistsException: > kafka.consumer:type=consumer-node-metrics,client-id=consumer-1,node-id=node--1 > at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:449) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1910) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:978) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:912) > at > 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:336) > at > com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:534) > at > org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:157) > ... 18 more -- This message was sent by Atlassian JIRA (v7.6.3#76005)
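The failure mode in the stack trace above is generic JMX behavior rather than anything Kafka-specific: the platform MBeanServer rejects a second registration under an ObjectName that is already taken. A minimal sketch reproducing it (the class names `MBeanCollisionDemo`/`Metric` are made up for illustration), which also shows why giving each consumer a distinct `client.id` avoids the error:

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class MBeanCollisionDemo {

    // Trivial standard MBean: JMX requires a public interface named <Impl>MBean.
    public interface MetricMBean { int getValue(); }
    public static class Metric implements MetricMBean {
        public int getValue() { return 0; }
    }

    /**
     * Registers an MBean under the same naming pattern the consumer metrics use.
     * Returns true on success, false if that ObjectName is already registered,
     * which is the collision the stack trace above reports.
     */
    public static boolean register(String clientId) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName(
            "kafka.consumer:type=consumer-node-metrics,client-id=" + clientId);
        try {
            server.registerMBean(new Metric(), name);
            return true;
        } catch (InstanceAlreadyExistsException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(register("consumer-1")); // true
        System.out.println(register("consumer-1")); // false: duplicate ObjectName
        System.out.println(register("consumer-2")); // true: distinct client.id
    }
}
```

Setting an explicit, unique `client.id` per consumer sidesteps the race entirely, since each instance then registers under its own ObjectName.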
[jira] [Resolved] (KAFKA-6997) Kafka run class doesn't exclude test-sources jar
[ https://issues.apache.org/jira/browse/KAFKA-6997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-6997. -- Resolution: Fixed Issue resolved by pull request 5139 [https://github.com/apache/kafka/pull/5139] > Kafka run class doesn't exclude test-sources jar > > > Key: KAFKA-6997 > URL: https://issues.apache.org/jira/browse/KAFKA-6997 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 2.0.0 >Reporter: Magesh kumar Nandakumar >Assignee: Magesh kumar Nandakumar >Priority: Minor > Fix For: 2.0.0 > > > kafka-run-class.sh has a flag INCLUDE_TEST_JAR. This doesn't exclude > test-sources jar files when the flag is set to false. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
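The gap is easy to state as a predicate: when test jars are excluded, `*-test-sources.jar` must be filtered out along with `*-test.jar`. A sketch of that check (the shell script itself works with regexes; the class and method names here are illustrative only):

```java
public class ClasspathFilterSketch {

    /**
     * Illustrative version of kafka-run-class.sh's jar filtering: when
     * INCLUDE_TEST_JARS is false, both the *-test.jar and *-test-sources.jar
     * variants should be dropped from the classpath.
     */
    public static boolean shouldInclude(String jarName, boolean includeTestJars) {
        boolean isTestJar = jarName.endsWith("-test.jar")
                || jarName.endsWith("-test-sources.jar");
        return includeTestJars || !isTestJar;
    }
}
```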
[jira] [Resolved] (KAFKA-6981) Missing Connector Config (errors.deadletterqueue.topic.name) kills Connect Clusters
[ https://issues.apache.org/jira/browse/KAFKA-6981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-6981. -- Resolution: Fixed Issue resolved by pull request 5125 [https://github.com/apache/kafka/pull/5125] > Missing Connector Config (errors.deadletterqueue.topic.name) kills Connect > Clusters > --- > > Key: KAFKA-6981 > URL: https://issues.apache.org/jira/browse/KAFKA-6981 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.0.0 >Reporter: Arjun Satish >Assignee: Arjun Satish >Priority: Major > Fix For: 2.0.0 > > > The trunk version of AK currently tries to incorrectly read the property > (errors.deadletterqueue.topic.name) when starting a sink connector. This > fails no matter what the contents of the connector config are. The > ConnectorConfig does not define this property, and any calls to getString > will throw a ConfigException (since only known properties are retained by > AbstractConfig). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6914) Kafka Connect - Plugins class should have a constructor that can take in parent ClassLoader
[ https://issues.apache.org/jira/browse/KAFKA-6914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16486603#comment-16486603 ] Ewen Cheslack-Postava commented on KAFKA-6914: -- [~sriramks85] Not sure I understand the use case here. Kafka Connect is its own service and it then loads all the plugins. How would Spring Boot fit in here? > Kafka Connect - Plugins class should have a constructor that can take in > parent ClassLoader > --- > > Key: KAFKA-6914 > URL: https://issues.apache.org/jira/browse/KAFKA-6914 > Project: Kafka > Issue Type: Bug >Reporter: Sriram KS >Priority: Major > Fix For: 1.1.1 > > > Currently Plugins class has a single constructor that takes in map of props. > Please make Plugin class to have a constructor that takes in a classLoader as > well and use it to set DelegationClassLoader's parent classLoader. > Reason: > This will be useful if i am already having a managed class Loader environment > like a Spring boot app which resolves my class dependencies using my > maven/gradle dependency management. -- This message was sent by Atlassian JIRA (v7.6.3#76005)

[jira] [Resolved] (KAFKA-5807) Check Connector.config() and Transformation.config() returns a valid ConfigDef
[ https://issues.apache.org/jira/browse/KAFKA-5807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-5807. -- Resolution: Fixed Fix Version/s: 2.0.0 Issue resolved by pull request 3762 [https://github.com/apache/kafka/pull/3762] > Check Connector.config() and Transformation.config() returns a valid ConfigDef > -- > > Key: KAFKA-5807 > URL: https://issues.apache.org/jira/browse/KAFKA-5807 > Project: Kafka > Issue Type: Bug >Reporter: Jeremy Custenborder >Assignee: Jeremy Custenborder >Priority: Minor > Fix For: 2.0.0 > > > NPE is thrown when a developer returns a null when overloading > Connector.validate(). > {code} > [2017-08-23 13:36:30,086] ERROR Stopping after connector error > (org.apache.kafka.connect.cli.ConnectStandalone:99) > java.lang.NullPointerException > at > org.apache.kafka.connect.connector.Connector.validate(Connector.java:134) > at > org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:254) > at > org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:158) > at > org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:93) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
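The NPE in the trace above arises because the framework dereferences whatever `Connector.config()` hands back without checking for null. A stripped-down model of the failure and of a fail-fast guard (the interfaces here are stand-ins for the real Connect API, not the actual classes):

```java
import java.util.Objects;

public class NullConfigSketch {

    // Stand-ins for Connect's real interfaces (illustrative only).
    public interface ConfigDef { void validate(); }
    public interface Connector { ConfigDef config(); }

    /** Unguarded behavior: dereferences config() directly, so a connector
     *  returning null triggers an opaque NullPointerException. */
    public static void validateUnguarded(Connector c) {
        c.config().validate();
    }

    /** Guarded style: fail fast with a message naming the actual problem. */
    public static void validateGuarded(Connector c) {
        ConfigDef def = Objects.requireNonNull(c.config(),
                c.getClass().getName() + ".config() must return a non-null ConfigDef");
        def.validate();
    }
}
```

The guard trades a bare NPE deep in the herder for an immediate error that points at the offending connector class, which is the spirit of the fix here.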
[jira] [Commented] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()
[ https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480973#comment-16480973 ] Ewen Cheslack-Postava commented on KAFKA-6566: -- [~rayokota] Merged back through 0.10.2, if we want it any earlier we'll need to file follow up PRs as it does not backport cleanly to those branches. > SourceTask#stop() not called after exception raised in poll() > - > > Key: KAFKA-6566 > URL: https://issues.apache.org/jira/browse/KAFKA-6566 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Gunnar Morling >Assignee: Robert Yokota >Priority: Blocker > Fix For: 0.10.2.2, 2.0.0, 0.11.0.3, 1.0.2, 1.1.1 > > > Having discussed this with [~rhauch], it has been my assumption that > {{SourceTask#stop()}} will be called by the Kafka Connect framework in case > an exception has been raised in {{poll()}}. That's not the case, though. As > an example see the connector and task below. > Calling {{stop()}} after an exception in {{poll()}} seems like a very useful > action to take, as it'll allow the task to clean up any resources such as > releasing any database connections, right after that failure and not only > once the connector is stopped. 
> {code}
> package com.example;
>
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
>
> import org.apache.kafka.common.config.ConfigDef;
> import org.apache.kafka.connect.connector.Task;
> import org.apache.kafka.connect.source.SourceConnector;
> import org.apache.kafka.connect.source.SourceRecord;
> import org.apache.kafka.connect.source.SourceTask;
>
> public class TestConnector extends SourceConnector {
>
>     @Override
>     public String version() {
>         return null;
>     }
>
>     @Override
>     public void start(Map<String, String> props) {
>     }
>
>     @Override
>     public Class<? extends Task> taskClass() {
>         return TestTask.class;
>     }
>
>     @Override
>     public List<Map<String, String>> taskConfigs(int maxTasks) {
>         return Collections.singletonList(Collections.singletonMap("foo", "bar"));
>     }
>
>     @Override
>     public void stop() {
>     }
>
>     @Override
>     public ConfigDef config() {
>         return new ConfigDef();
>     }
>
>     public static class TestTask extends SourceTask {
>
>         @Override
>         public String version() {
>             return null;
>         }
>
>         @Override
>         public void start(Map<String, String> props) {
>         }
>
>         @Override
>         public List<SourceRecord> poll() throws InterruptedException {
>             throw new RuntimeException();
>         }
>
>         @Override
>         public void stop() {
>             System.out.println("stop() called");
>         }
>     }
> }
> {code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()
[ https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-6566: - Fix Version/s: 0.10.2.2 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()
[ https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-6566. -- Resolution: Fixed -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()
[ https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-6566: - Reviewer: Ewen Cheslack-Postava -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()
[ https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-6566: - Fix Version/s: 1.1.1 1.0.2 0.11.0.3 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6891) send.buffer.bytes should be allowed to set -1 in KafkaConnect
[ https://issues.apache.org/jira/browse/KAFKA-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16469767#comment-16469767 ] Ewen Cheslack-Postava commented on KAFKA-6891: -- [~olkuznsmith] Yeah, agreed. The validation in the Connector config hasn't changed since Connect was released in 0.9 but the change for the clients happened in an 0.10 release, explaining the discrepancy. Would be nice if some of these shared configs had a nicer way of sharing all the settings (default values, validation, etc) in addition to just the doc name and string :( > send.buffer.bytes should be allowed to set -1 in KafkaConnect > - > > Key: KAFKA-6891 > URL: https://issues.apache.org/jira/browse/KAFKA-6891 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.1.0 >Reporter: Oleg Kuznetsov >Priority: Major > > *send.buffer.bytes* and *receive.buffer.bytes* are declared with *atLeast(0)* > constraint in *DistributedConfig*, whereas *-1* should be also allowed to set -- This message was sent by Atlassian JIRA (v7.6.3#76005)
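The requested change amounts to a widened validation range: -1 is the conventional sentinel for "use the OS default buffer size", so the bound should be between(-1, MAX_VALUE) rather than atLeast(0). A sketch of that predicate in plain Java (not the real `ConfigDef.Range` API):

```java
public class BufferConfigSketch {

    /**
     * Validity check for send.buffer.bytes / receive.buffer.bytes:
     * -1 means "use the OS default"; anything below -1 is rejected.
     */
    public static boolean isValidBufferBytes(long value) {
        return value >= -1 && value <= Integer.MAX_VALUE;
    }
}
```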
[jira] [Resolved] (KAFKA-5141) WorkerTest.testCleanupTasksOnStop transient failure due to NPE
[ https://issues.apache.org/jira/browse/KAFKA-5141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-5141. -- Resolution: Fixed Assignee: Ewen Cheslack-Postava Not sure of the fix, but we haven't seen this test failure in a long time. Pretty sure it has been fixed somewhere along the way. We can re-open if we see the same issue again. > WorkerTest.testCleanupTasksOnStop transient failure due to NPE > -- > > Key: KAFKA-5141 > URL: https://issues.apache.org/jira/browse/KAFKA-5141 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Ewen Cheslack-Postava >Assignee: Ewen Cheslack-Postava >Priority: Major > Labels: transient-unit-test-failure > > https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3281/testReport/junit/org.apache.kafka.connect.runtime/WorkerTest/testCleanupTasksOnStop/ > Looks like the potential culprit is a NullPointerException when trying to > start a connector. It's likely being caught and logged via a catch > (Throwable). From the lines being executed it looks like the null might be > due to the instantiation of the Connector returning null, although I don't > see how that is possible given the current code. We may need more logging > output to track the issue down. 
> {quote} > Error Message > java.lang.AssertionError: > Expectation failure on verify: > WorkerSourceTask.run(): expected: 1, actual: 0 > Stacktrace > java.lang.AssertionError: > Expectation failure on verify: > WorkerSourceTask.run(): expected: 1, actual: 0 > at org.easymock.internal.MocksControl.verify(MocksControl.java:225) > at > org.powermock.api.easymock.internal.invocationcontrol.EasyMockMethodInvocationControl.verify(EasyMockMethodInvocationControl.java:132) > at org.powermock.api.easymock.PowerMock.verify(PowerMock.java:1466) > at org.powermock.api.easymock.PowerMock.verifyAll(PowerMock.java:1405) > at > org.apache.kafka.connect.runtime.WorkerTest.testCleanupTasksOnStop(WorkerTest.java:480) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at org.junit.internal.runners.TestMethod.invoke(TestMethod.java:68) > at > org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.runTestMethod(PowerMockJUnit44RunnerDelegateImpl.java:310) > at org.junit.internal.runners.MethodRoadie$2.run(MethodRoadie.java:89) > at > org.junit.internal.runners.MethodRoadie.runBeforesThenTestThenAfters(MethodRoadie.java:97) > at > org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.executeTest(PowerMockJUnit44RunnerDelegateImpl.java:294) > at > org.powermock.modules.junit4.internal.impl.PowerMockJUnit47RunnerDelegateImpl$PowerMockJUnit47MethodRunner.executeTestInSuper(PowerMockJUnit47RunnerDelegateImpl.java:127) > at > org.powermock.modules.junit4.internal.impl.PowerMockJUnit47RunnerDelegateImpl$PowerMockJUnit47MethodRunner.executeTest(PowerMockJUnit47RunnerDelegateImpl.java:82) > at > 
org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.runBeforesThenTestThenAfters(PowerMockJUnit44RunnerDelegateImpl.java:282) > at org.junit.internal.runners.MethodRoadie.runTest(MethodRoadie.java:87) > at org.junit.internal.runners.MethodRoadie.run(MethodRoadie.java:50) > at > org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.invokeTestMethod(PowerMockJUnit44RunnerDelegateImpl.java:207) > at > org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.runMethods(PowerMockJUnit44RunnerDelegateImpl.java:146) > at > org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$1.run(PowerMockJUnit44RunnerDelegateImpl.java:120) > at > org.junit.internal.runners.ClassRoadie.runUnprotected(ClassRoadie.java:34) > at > org.junit.internal.runners.ClassRoadie.runProtected(ClassRoadie.java:44) > at > org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.run(PowerMockJUnit44RunnerDelegateImpl.java:122) > at > org.powermock.modules.junit4.common.internal.impl.JUnit4TestSuiteChunkerImpl.run(JUnit4TestSuiteChunkerImpl.java:106) > at > org.powermock.modules.junit4.common.internal.impl.AbstractCommonPowerMockRunner.run(AbstractCommonPowerMockRunner.java:53) > at > org.powermock.modules.juni
[jira] [Commented] (KAFKA-4882) Remove internal converter configuration from example property files
[ https://issues.apache.org/jira/browse/KAFKA-4882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16458284#comment-16458284 ] Ewen Cheslack-Postava commented on KAFKA-4882: -- KAFKA-5540 is a generalization of this and looks to have a clean patch that can land soon. > Remove internal converter configuration from example property files > --- > > Key: KAFKA-4882 > URL: https://issues.apache.org/jira/browse/KAFKA-4882 > Project: Kafka > Issue Type: Bug >Reporter: Gwen Shapira >Assignee: Mitch Seymour >Priority: Major > Labels: newbie > > Our internal converter configuration is shown in > connect-distributed.properties and connect-standalone.properties. > This tempts users to change it. > In particular, they seem to believe it needs to be identical to key/value > converters. > In reality, Connect doesn't deal well with anything other than schemaless > JSON as the internal converter. Users get errors and find it hard to figure > out what went wrong (since this is internal, they are not expected to). > Let's stop tempting users into shooting their own feet? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5635) KIP-181 Kafka-Connect integrate with kafka ReST Proxy
[ https://issues.apache.org/jira/browse/KAFKA-5635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16458273#comment-16458273 ] Ewen Cheslack-Postava commented on KAFKA-5635: -- [~dhananjaydp] I'm inclined to close this as WONTFIX as long as REST proxies are third party – there's not really a clean way to support this generically since even with Confluent's REST proxy there are differences between v1 and v2 APIs (due to old/new Java clients) that are difficult to generalize. I think the intended goal here is great (i.e., provide compatibility), but ultimately it seems to mostly expose a limitation of the service you're working with. A REST API is a good compatibility layer to provide, but I would think most realistic cloud Kafka services that care about things like performance and scalability that are core to Kafka's value prop would support a native interface as well. Without it, many services layered on top of those protocols wouldn't work – Connect and Streams from Apache Kafka itself, as well as numerous other services such as KSQL, Spark Streaming, Flink, etc. I'm not convinced that adding REST proxy support to *all* of these in order to make them compatible with cloud services that don't expose Kafka's basic protocol layer would be in the interest of the project. > KIP-181 Kafka-Connect integrate with kafka ReST Proxy > - > > Key: KAFKA-5635 > URL: https://issues.apache.org/jira/browse/KAFKA-5635 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Dhananjay Patkar >Priority: Major > Labels: features, newbie > > Kafka connect currently uses kafka clients which directly connect to kafka > brokers. > In a use case, wherein I have many kafka connect [producers] running remotely > its a challenge to configure broker information on every connect agent. > Also, in case of IP change [upgrade or cluster re-creation], we need to > update every remote connect configuration. 
> If kafka connect source connectors talk to ReST endpoint then client is > unaware of broker details. This way we can transparently upgrade / re-create > kafka cluster as long as ReST endpoint remains same. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6684) Support casting values with bytes schema to string
[ https://issues.apache.org/jira/browse/KAFKA-6684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16458262#comment-16458262 ] Ewen Cheslack-Postava commented on KAFKA-6684: -- The idea here seems quite similar to the SimpleHeaderConverter included with KIP-145. We might want to consider a) standardizing all these casts/transformations across the different mechanisms (Cast transformation and SimpleHeaderConverter) and b) reusing as much code as possible. For (b), we might be able to just check whether something is a logical type and then always delegate to shared code from SimpleHeaderConverter so we get a minimal implementation w/ all the support for different types. > Support casting values with bytes schema to string > --- > > Key: KAFKA-6684 > URL: https://issues.apache.org/jira/browse/KAFKA-6684 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Amit Sela >Priority: Critical > Fix For: 2.0.0 > > > Casting from BYTES is not supported, which means that casting LogicalTypes is > not supported. > This proposes to allow casting anything to a string, kind of like Java's > {{toString()}}, such that if the object is actually a LogicalType it can be > "serialized" as string instead of bytes+schema. > > {noformat} > Examples: > BigDecimal will cast to the string representation of the number. > Timestamp will cast to the string representation of the timestamp, or maybe > UTC mmddTHH:MM:SS.f format? > {noformat} > > Worst case, bytes are "casted" to whatever the {{toString()}} returns - its > up to the user to know the data. > This would help when using a JSON sink, or anything that's not Avro. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
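The proposal boils down to a `toString()`-style fallback: logical types render via their natural string form, and raw bytes decode rather than fail. A rough sketch of that behavior (UTF-8 decoding for raw bytes is an assumption, and the class and method names here are made up, not part of the Cast transformation):

```java
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;

public class CastToStringSketch {

    /**
     * Sketch of the proposed fallback: any value can become a string.
     * A logical type such as Decimal (modeled here directly as its Java
     * representation, BigDecimal) uses toString(); raw byte arrays are
     * decoded as UTF-8 instead of being rejected.
     */
    public static String castToString(Object value) {
        if (value instanceof byte[]) {
            return new String((byte[]) value, StandardCharsets.UTF_8);
        }
        return String.valueOf(value); // BigDecimal, timestamps, etc. via toString()
    }

    public static void main(String[] args) {
        System.out.println(castToString(new BigDecimal("12.34"))); // 12.34
        System.out.println(castToString("plain bytes".getBytes(StandardCharsets.UTF_8)));
    }
}
```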
[jira] [Assigned] (KAFKA-6735) Document how to skip findbugs / checkstyle when running unit test
[ https://issues.apache.org/jira/browse/KAFKA-6735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava reassigned KAFKA-6735: Assignee: Ewen Cheslack-Postava > Document how to skip findbugs / checkstyle when running unit test > - > > Key: KAFKA-6735 > URL: https://issues.apache.org/jira/browse/KAFKA-6735 > Project: Kafka > Issue Type: Test >Reporter: Ted Yu >Assignee: Ewen Cheslack-Postava >Priority: Minor > > Even when running single unit test, findbugs dependency would result in some > time spent before the test is actually run. > We should document how findbugs dependency can be skipped in such scenario: > {code} > -x findbugsMain -x findbugsTest -x checkStyleMain -x checkStyleTest > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6735) Document how to skip findbugs / checkstyle when running unit test
[ https://issues.apache.org/jira/browse/KAFKA-6735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16431672#comment-16431672 ] Ewen Cheslack-Postava commented on KAFKA-6735: -- Why should we document outs for these? They should never result in merging PRs more quickly, as seems to be the goal here, since any failure mentioned here should result in CI failures that a committer would never merge since the CI builds failed. > Document how to skip findbugs / checkstyle when running unit test > - > > Key: KAFKA-6735 > URL: https://issues.apache.org/jira/browse/KAFKA-6735 > Project: Kafka > Issue Type: Test >Reporter: Ted Yu >Priority: Minor > > Even when running single unit test, findbugs dependency would result in some > time spent before the test is actually run. > We should document how findbugs dependency can be skipped in such scenario: > {code} > -x findbugsMain -x findbugsTest -x checkStyleMain -x checkStyleTest > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-6728) Kafka Connect Header Null Pointer Exception
[ https://issues.apache.org/jira/browse/KAFKA-6728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava reassigned KAFKA-6728: Assignee: Randall Hauch > Kafka Connect Header Null Pointer Exception > --- > > Key: KAFKA-6728 > URL: https://issues.apache.org/jira/browse/KAFKA-6728 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.1.0 > Environment: Linux Mint >Reporter: Philippe Hong >Assignee: Randall Hauch >Priority: Critical > Fix For: 1.2.0, 1.1.1 > > > I am trying to use the newly released Kafka Connect that supports headers by > using the standalone connector to write to a text file (so in this case I am > only using the sink component) > I am sadly greeted by a NullPointerException : > {noformat} > ERROR WorkerSinkTask{id=local-file-sink-0} Task threw an uncaught and > unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172) > java.lang.NullPointerException > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertHeadersFor(WorkerSinkTask.java:501) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:469) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173) > at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > {noformat} > I launched zookeeper and kafka 1.1.0 locally and sent a > 
ProducerRecord[String, Array[Byte]] using a KafkaProducer[String, > Array[Byte]] with a header that has a key and value. > I can read the record with a console consumer as well as using a > KafkaConsumer (where in this case I can see the content of the header of the > record I sent previously) so no problem here. > I only made two changes to the kafka configuration: > - I used the StringConverter for the key and the ByteArrayConverter for > the value. > - I also changed the topic where the sink would connect to. > If I forgot something please tell me so as it is the first time I am creating > an issue on Jira. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6728) Kafka Connect Header Null Pointer Exception
[ https://issues.apache.org/jira/browse/KAFKA-6728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-6728. -- Resolution: Fixed Fix Version/s: 1.1.1 1.2.0 Issue resolved by pull request 4815 [https://github.com/apache/kafka/pull/4815] > Kafka Connect Header Null Pointer Exception > --- > > Key: KAFKA-6728 > URL: https://issues.apache.org/jira/browse/KAFKA-6728 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.1.0 > Environment: Linux Mint >Reporter: Philippe Hong >Priority: Critical > Fix For: 1.2.0, 1.1.1 > > > I am trying to use the newly released Kafka Connect that supports headers by > using the standalone connector to write to a text file (so in this case I am > only using the sink component) > I am sadly greeted by a NullPointerException : > {noformat} > ERROR WorkerSinkTask{id=local-file-sink-0} Task threw an uncaught and > unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172) > java.lang.NullPointerException > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertHeadersFor(WorkerSinkTask.java:501) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:469) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173) > at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at 
java.lang.Thread.run(Thread.java:748) > {noformat} > I launched zookeeper and kafka 1.1.0 locally and sent a > ProducerRecord[String, Array[Byte]] using a KafkaProducer[String, > Array[Byte]] with a header that has a key and value. > I can read the record with a console consumer as well as using a > KafkaConsumer (where in this case I can see the content of the header of the > record I sent previously) so no problem here. > I only made two changes to the kafka configuration: > - I used the StringConverter for the key and the ByteArrayConverter for > the value. > - I also changed the topic where the sink would connect to. > If I forgot something please tell me so as it is the first time I am creating > an issue on Jira. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-6725) Indicate "isClosing" in the SinkTaskContext
[ https://issues.apache.org/jira/browse/KAFKA-6725?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava reassigned KAFKA-6725: Assignee: Matt Farmer > Indicate "isClosing" in the SinkTaskContext > --- > > Key: KAFKA-6725 > URL: https://issues.apache.org/jira/browse/KAFKA-6725 > Project: Kafka > Issue Type: New Feature >Reporter: Matt Farmer >Assignee: Matt Farmer >Priority: Major > Labels: connect, connect-api > > Addition of the isClosing method to SinkTaskContext per this KIP. > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75977607 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6717) TopicPartition Assigned twice to a consumer group for 2 consumer instances
[ https://issues.apache.org/jira/browse/KAFKA-6717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16418435#comment-16418435 ] Ewen Cheslack-Postava commented on KAFKA-6717: -- Are both consumers assigned the partition in the same generation? As consumers join the group, it will rebalance and change the assignment. If you are just looking at which consumers are assigned which partitions, it could appear that two of them are assigned the same partitions at the same time. I see you are using the NoOpConsumerRebalanceListener. With this, you wouldn't see when partitions were assigned or revoked. What are you doing to verify that both consumer instances are assigned the same partitions at the same time? Without correct handling of partition assignments and revocation, you definitely could see data processed twice. In fact, without additional steps taken to ensure no duplicates, *at least once* handling is what Kafka consumers would normally provide as long as they handle offset commits properly. > TopicPartition Assigned twice to a consumer group for 2 consumer instances > -- > > Key: KAFKA-6717 > URL: https://issues.apache.org/jira/browse/KAFKA-6717 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.1 >Reporter: Yuancheng PENG >Priority: Major > > I'm using {{StickyAssignor}} for consuming more than 100 topics with a certain > pattern. > There are 10 consumers with the same group id. > I expected each topic-partition to be assigned to only one consumer instance. > However, some topic-partitions are assigned twice to 2 different consumer > instances, hence the consumer group processes duplicate messages. > {code:java} > props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, > Collections.singletonList(StickyAssignor.class)); > KafkaConsumer c = new KafkaConsumer<>(props); > c.subscribe(Pattern.compile(TOPIC_PATTERN), new > NoOpConsumerRebalanceListener()); > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
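Assignments only conflict if they overlap within a single rebalance generation; overlap observed across different generations is just normal rebalancing. A minimal pure-JDK way to check for real overlap (a hypothetical helper, not a Kafka API; partitions are represented here as plain "topic-partition" strings) might look like:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class AssignmentChecker {
    // Given each consumer's assignment captured in the SAME rebalance
    // generation, return the partitions claimed by more than one consumer.
    // Overlap across *different* generations is expected rebalancing noise;
    // overlap within one generation would point to a real assignor bug.
    public static Set<String> doublyAssigned(Map<String, Set<String>> assignmentsByConsumer) {
        Set<String> seen = new HashSet<>();
        Set<String> duplicates = new HashSet<>();
        for (Set<String> partitions : assignmentsByConsumer.values()) {
            for (String topicPartition : partitions) {
                if (!seen.add(topicPartition)) {
                    duplicates.add(topicPartition);
                }
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> generation5 = new TreeMap<>();
        generation5.put("consumer-1", new HashSet<>(Arrays.asList("t-0", "t-1")));
        generation5.put("consumer-2", new HashSet<>(Arrays.asList("t-1", "t-2")));
        System.out.println(doublyAssigned(generation5)); // prints [t-1]
    }
}
```

Capturing per-generation snapshots like this requires a real ConsumerRebalanceListener (rather than the NoOpConsumerRebalanceListener in the quoted code), since that is where assignment and revocation become visible.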
[jira] [Commented] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics
[ https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407352#comment-16407352 ] Ewen Cheslack-Postava commented on KAFKA-3821: -- So a couple of points -- * That new SourceRecordReceiver interface doesn't necessarily avoid allocations, it likely just moves them around. The framework would still want to track the collection generated from a single poll() call because in the case of a connector throwing an error mid-poll(), we'd want to not actually write any of the messages generated thus far. So we basically just move the list into SourceRecordReceiver. In fact, doing so could make things worse because whereas a Connector may be able to figure out how large the list needs to be and do a single allocation, the framework has no idea and would probably do multiple rounds of expanding the list. * In any case, I really think that one allocation isn't worth worrying about unless someone has profiling to show it. We allocate so many objects just for a single SourceRecord, *especially* in pretty common cases of complex object structure, that this doesn't seem worth optimizing. * I think we should focus on optimizing for the common case, which is that there will be multiple messages. When that's not the case, the performance impact seems unimportant since it would probably mean you have relatively infrequent events. * Adding more interfaces adds to cognitive load and makes it harder to learn how to write connectors. Context objects already provide a place to request the framework do things outside the common workflow, so it seems like a natural place to add this functionality if we decided to. Same deal for the EOS stuff, which could potentially just be transaction APIs in the Context. * Just stylistically, Kafka's public APIs tend to try to keep things simple and straightforward (for some definition of those words that I honestly am not sure I could give a clear explanation of). 
I don't want to discourage the discussion of how to solve this problem for Debezium's use case, but I do want to make sure we're taking into account broader goals for the framework when figuring out how to solve it (e.g. SourceRecordReceiver may work, but I would argue returning a list of records makes things easier since it's obvious from one line of code how to implement it). It might also help to explain better why the general idea here just rubs me the wrong way. Mostly it boils down to sort of breaking the abstraction. The way offsets are handled for source connectors was supposed to mirror Kafka's log abstraction such that an offset really is unique. I guess maybe a gap in understanding for me is why offsets *wouldn't* be unique during snapshots and how rewind works for these snapshots. I get that not all systems or use cases can map perfectly, but a) would another option be to change how those offsets are handled by Debezium, b) would another option be having Debezium read 1 record ahead so that, before returning the record and/or constructing its offset, it can determine whether this is the final record of the snapshot, and c) do we have other use cases that demonstrate an impedance mismatch that would be well answered by the solutions being proposed here? > Allow Kafka Connect source tasks to produce offset without writing to topics > > > Key: KAFKA-3821 > URL: https://issues.apache.org/jira/browse/KAFKA-3821 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 0.9.0.1 >Reporter: Randall Hauch >Priority: Major > Labels: needs-kip > Fix For: 1.2.0 > > > Provide a way for a {{SourceTask}} implementation to record a new offset for > a given partition without necessarily writing a source record to a topic. > Consider a connector task that uses the same offset when producing an unknown > number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a > database). 
Once the task completes those records, the connector wants to > update the offsets (e.g., the snapshot is complete) but has no more records > to be written to a topic. With this change, the task could simply supply an > updated offset. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
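The first bullet's argument — the framework must buffer everything a single poll() produces so that a mid-poll failure writes nothing, which is why a push-style receiver only moves the allocation rather than removing it — can be sketched in pure JDK. This is a hypothetical harness, not Connect's actual WorkerSourceTask:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

public class BatchedPollSketch {
    // Buffer everything one poll() produces; if the task throws mid-batch,
    // dispatch none of the partial output. Note the task side can pre-size
    // the buffer (it may know how many records it has); a framework-side
    // receiver could not, and would grow the list in multiple rounds.
    public static <R> List<R> pollOnce(int expected, Supplier<R> task, List<R> dispatched) {
        List<R> batch = new ArrayList<>(expected);
        try {
            for (int i = 0; i < expected; i++) {
                batch.add(task.get()); // may throw partway through
            }
        } catch (RuntimeException e) {
            return Collections.emptyList(); // discard the partial batch entirely
        }
        dispatched.addAll(batch); // only complete batches reach the producer
        return batch;
    }
}
```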
[jira] [Created] (KAFKA-6676) System tests do not handle ZK chroot properly with SCRAM
Ewen Cheslack-Postava created KAFKA-6676: Summary: System tests do not handle ZK chroot properly with SCRAM Key: KAFKA-6676 URL: https://issues.apache.org/jira/browse/KAFKA-6676 Project: Kafka Issue Type: Bug Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava This is related to the issue observed in KAFKA-6672. There, we are now automatically creating parent nodes if they do not exist. However, if using a chroot within ZK and that chroot does not yet exist, you get an error message about "Path length must be > 0" as it tries to create all the parent paths. It would probably be better to be able to detect this issue and account for it, but currently system test code will fail if you use SCRAM and a chroot because while Kafka will create the chroot when it starts up, there are some commands related to security that may need to be executed before that and assume the chroot will already be there. We're currently missing this because while the chroot option is there, nothing in Kafka's tests is currently exercising it. So given what is apparently a common assumption in tools that the chroot already exists (since I think the core kafka server is the only thing that handles creating it if needed), I think the fix here would be two-fold: # Make KafkaService ensure the chroot exists before running any commands that might need it. # On at least one test that exercises security support, use a zk_chroot so that functionality is at least reasonably well exercised. It would be good to have this in both trunk and 1.1 branches. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6651) SchemaBuilder should not allow Arrays or Maps to be created by type()
[ https://issues.apache.org/jira/browse/KAFKA-6651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401270#comment-16401270 ] Ewen Cheslack-Postava commented on KAFKA-6651: -- Could go either way. Based on the comment, I think I originally exposed that constructor to support use cases that might do something like maintain a Map to dynamically generate schemas. I don't know that any actually do that in practice (though since you encountered this [~jcustenborder] maybe you are using a pattern like that?). I think most that need something dynamic end up just using a big switch statement with cases for each type. If nobody uses it, the other option would be to deprecate and remove that constructor. > SchemaBuilder should not allow Arrays or Maps to be created by type() > - > > Key: KAFKA-6651 > URL: https://issues.apache.org/jira/browse/KAFKA-6651 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Jeremy Custenborder >Priority: Minor > > The following code should throw an exception because we cannot set > valueSchema() or keySchema() once the builder is returned. > {code:java} > SchemaBuilder.type(Schema.Type.ARRAY); > SchemaBuilder.type(Schema.Type.MAP);{code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
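The "big switch statement with cases for each type" pattern mentioned above can be sketched without the Connect API on the classpath. The Type enum below is a stand-in for Connect's Schema.Type, and the returned strings stand in for the SchemaBuilder factory calls real connector code would make:

```java
public class SchemaDispatchSketch {
    // Stand-in for Connect's Schema.Type; in connector code each case would
    // return SchemaBuilder.int8(), SchemaBuilder.int32(), SchemaBuilder.string(), etc.
    public enum Type { INT8, INT32, STRING, ARRAY, MAP }

    // Dynamic dispatch over primitive types. ARRAY and MAP are rejected up
    // front: they need an element (or key/value) schema before a builder can
    // be constructed, which is exactly why a bare type() dispatch should not
    // be allowed to create them.
    public static String builderFor(Type type) {
        switch (type) {
            case INT8:   return "int8";
            case INT32:  return "int32";
            case STRING: return "string";
            case ARRAY:
            case MAP:
                throw new IllegalArgumentException(
                    type + " needs an element schema; use the array()/map() builders");
            default:
                throw new IllegalArgumentException("unhandled type: " + type);
        }
    }
}
```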
[jira] [Commented] (KAFKA-6661) Sink connectors that explicitly 'resume' topic partitions can resume a paused task
[ https://issues.apache.org/jira/browse/KAFKA-6661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401255#comment-16401255 ] Ewen Cheslack-Postava commented on KAFKA-6661: -- Oh, also, I marked it as 1.1.1 release, but if we end up doing another RC we'll want to adjust this to 1.1.0. > Sink connectors that explicitly 'resume' topic partitions can resume a paused > task > -- > > Key: KAFKA-6661 > URL: https://issues.apache.org/jira/browse/KAFKA-6661 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.9.0.0, 0.10.0.0, 0.11.0.0, 1.0.0 >Reporter: Randall Hauch >Assignee: Randall Hauch >Priority: Critical > Fix For: 0.10.0.2, 0.10.1.2, 0.10.2.2, 0.11.0.3, 1.0.2, 1.1.1 > > > Sink connectors are allowed to use the {{SinkTaskContext}}'s methods to > explicitly pause and resume topic partitions. This is useful when connectors > need additional time processing the records for specific topic partitions > (e.g., the external system has an outage). > However, when the sink connector has been paused via the REST API, the worker > for the sink tasks pause the consumer. When the connector is polled, the poll > request might timeout and return no records. Connect then calls the task's > {{put(...)}} method (with no records), and this allows the task to optionally > call any of the {{SinkTaskContext}}'s pause or resume methods. If it calls > resume, this will unexpectedly resume the paused consumer, causing the > consumer to return messages and the connector to process those messages -- > despite the connector still being paused. > This is reported against 1.0, but the affected code has not been changed > since at least 0.9.0.0. > A workaround is to remove rather than pause a connector. It's inconvenient, > but it works. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6661) Sink connectors that explicitly 'resume' topic partitions can resume a paused task
[ https://issues.apache.org/jira/browse/KAFKA-6661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401254#comment-16401254 ] Ewen Cheslack-Postava commented on KAFKA-6661: -- [~rhauch] Got this cherry-picked back to 0.10.0. If we want 0.9.0 as well, we probably need a separate PR since there was enough code movement to make it non-trivial. On a related note, it wasn't too bad in this case, but for things we want to backport it's better to separate the must-have stuff from the nice-to-have improvements. It's not so much the risk (in this case it was just a toString and some log statements), but that the larger the patch, the less likely we get a clean cherry-pick, and that's a lot more time consuming in cases where we want to cherry-pick through a bunch of release branches. In this case the pain was really caused by a different commit that made mostly cosmetic improvements, which caused the cherry-pick to hit conflicts, but it's something to keep in mind in the future. > Sink connectors that explicitly 'resume' topic partitions can resume a paused > task > -- > > Key: KAFKA-6661 > URL: https://issues.apache.org/jira/browse/KAFKA-6661 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.9.0.0, 0.10.0.0, 0.11.0.0, 1.0.0 >Reporter: Randall Hauch >Assignee: Randall Hauch >Priority: Critical > Fix For: 0.10.0.2, 0.10.1.2, 0.10.2.2, 0.11.0.3, 1.0.2, 1.1.1 > > > Sink connectors are allowed to use the {{SinkTaskContext}}'s methods to > explicitly pause and resume topic partitions. This is useful when connectors > need additional time processing the records for specific topic partitions > (e.g., the external system has an outage). > However, when the sink connector has been paused via the REST API, the worker > for the sink tasks pause the consumer. When the connector is polled, the poll > request might timeout and return no records. 
Connect then calls the task's > {{put(...)}} method (with no records), and this allows the task to optionally > call any of the {{SinkTaskContext}}'s pause or resume methods. If it calls > resume, this will unexpectedly resume the paused consumer, causing the > consumer to return messages and the connector to process those messages -- > despite the connector still being paused. > This is reported against 1.0, but the affected code has not been changed > since at least 0.9.0.0. > A workaround is to remove rather than pause a connector. It's inconvenient, > but it works. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6661) Sink connectors that explicitly 'resume' topic partitions can resume a paused task
[ https://issues.apache.org/jira/browse/KAFKA-6661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-6661: - Fix Version/s: 1.1.1 1.0.2 0.11.0.3 0.10.2.2 0.10.1.2 0.10.0.2 > Sink connectors that explicitly 'resume' topic partitions can resume a paused > task > -- > > Key: KAFKA-6661 > URL: https://issues.apache.org/jira/browse/KAFKA-6661 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.9.0.0, 0.10.0.0, 0.11.0.0, 1.0.0 >Reporter: Randall Hauch >Assignee: Randall Hauch >Priority: Critical > Fix For: 0.10.0.2, 0.10.1.2, 0.10.2.2, 0.11.0.3, 1.0.2, 1.1.1 > > > Sink connectors are allowed to use the {{SinkTaskContext}}'s methods to > explicitly pause and resume topic partitions. This is useful when connectors > need additional time processing the records for specific topic partitions > (e.g., the external system has an outage). > However, when the sink connector has been paused via the REST API, the worker > for the sink tasks pause the consumer. When the connector is polled, the poll > request might timeout and return no records. Connect then calls the task's > {{put(...)}} method (with no records), and this allows the task to optionally > call any of the {{SinkTaskContext}}'s pause or resume methods. If it calls > resume, this will unexpectedly resume the paused consumer, causing the > consumer to return messages and the connector to process those messages -- > despite the connector still being paused. > This is reported against 1.0, but the affected code has not been changed > since at least 0.9.0.0. > A workaround is to remove rather than pause a connector. It's inconvenient, > but it works. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6626) Performance bottleneck in Kafka Connect sendRecords
[ https://issues.apache.org/jira/browse/KAFKA-6626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16393217#comment-16393217 ] Ewen Cheslack-Postava commented on KAFKA-6626: -- It cannot be a regular map, see the comment right above that field: https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L79-L80 > Performance bottleneck in Kafka Connect sendRecords > --- > > Key: KAFKA-6626 > URL: https://issues.apache.org/jira/browse/KAFKA-6626 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.0.0 >Reporter: Maciej Bryński >Priority: Major > Attachments: MapPerf.java, image-2018-03-08-08-35-19-247.png > > > Kafka Connect is using IdentityHashMap for storing records. > [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L239] > Unfortunately this solution is very slow (2-4 times slower than normal > HashMap / HashSet). > Benchmark result (code in attachment). > {code:java} > Identity 4220 > Set 2115 > Map 1941 > Fast Set 2121 > {code} > Things are even worse when using default GC configuration > (-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 > -XX:InitiatingHeapOccupancyPercent=35 -Djava.awt.headless=true) > {code:java} > Identity 7885 > Set 2364 > Map 1548 > Fast Set 1520 > {code} > Java version > {code:java} > java version "1.8.0_152" > Java(TM) SE Runtime Environment (build 1.8.0_152-b16) > Java HotSpot(TM) 64-Bit Server VM (build 25.152-b16, mixed mode) > {code} > This problem is greatly slowing Kafka Connect. > !image-2018-03-08-08-35-19-247.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
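The reason an identity map is required there is that two in-flight records can be equals() to each other while still being distinct messages that must be tracked separately. A hypothetical pure-JDK demo (the Record class is a stand-in, not Connect's SourceRecord):

```java
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

public class IdentityMapDemo {
    // Stand-in for a record: two instances can be equals() even though
    // they are distinct in-flight messages.
    public static final class Record {
        final String value;
        public Record(String value) { this.value = value; }
        @Override public boolean equals(Object o) {
            return o instanceof Record && ((Record) o).value.equals(value);
        }
        @Override public int hashCode() { return value.hashCode(); }
    }

    // Insert two records into the given map and report how many distinct
    // keys survive. With equals()-based maps, equal records collapse into
    // one entry; with identity semantics, both are kept.
    public static int distinctKeys(Map<Record, Boolean> map, Record a, Record b) {
        map.put(a, Boolean.TRUE);
        map.put(b, Boolean.TRUE);
        return map.size();
    }

    public static void main(String[] args) {
        Record a = new Record("same-payload");
        Record b = new Record("same-payload"); // equal, but a different message
        System.out.println(distinctKeys(new HashMap<>(), a, b));         // prints 1
        System.out.println(distinctKeys(new IdentityHashMap<>(), a, b)); // prints 2
    }
}
```

So a plain HashMap would silently conflate equal records, losing track of outstanding messages; the extra cost of IdentityHashMap buys correctness here.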
[jira] [Commented] (KAFKA-5939) Add a dryrun option to release.py
[ https://issues.apache.org/jira/browse/KAFKA-5939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16390528#comment-16390528 ] Ewen Cheslack-Postava commented on KAFKA-5939: -- I think dry run is fine if we're clear about what it means. The scariest part of developing the script originally was the final steps around pushing tags. Otherwise most of it is only affecting stuff that's easy to clean up. To me, the most useful dry run would: * Still prompt about the steps that would upload, but just say what they would do * Say what tagging it would do, but skip it * Still do the full build and provide artifacts, which allows you to do some test run and validation without really generating an RC I'm not sure how the publication to the maven repo would work as a dry-run, maybe just publish to a local directory. The other thing which could possibly be handled here or could be a different Jira is to have a local copy of all artifacts when you've completed. This would apply to the regular release artifacts (it was a pain to pull them back down from my Kafka home directory in order to promote the release) and the maven artifacts (which is useful for doing validation, which should also be automated). > Add a dryrun option to release.py > - > > Key: KAFKA-5939 > URL: https://issues.apache.org/jira/browse/KAFKA-5939 > Project: Kafka > Issue Type: New Feature > Components: tools >Reporter: Damian Guy >Priority: Major > > It would be great to add a `dryrun` feature to `release.py` so that it can be > used to test changes to the scripts etc. At the moment you need to make sure > all JIRAs are closed for the release, have no uncommited changes etc, which > is a bit of a hassle when you just want to test a change you've made to the > script. There may be other things that need to be skipped, too -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6127) Streams should never block infinitely
[ https://issues.apache.org/jira/browse/KAFKA-6127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16389095#comment-16389095 ] Ewen Cheslack-Postava commented on KAFKA-6127: -- Isn't a basic `poll()` also an issue since it blocks on group membership? > Streams should never block infinitely > - > > Key: KAFKA-6127 > URL: https://issues.apache.org/jira/browse/KAFKA-6127 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 >Reporter: Matthias J. Sax >Priority: Major > > Streams uses three consumer APIs that can block infinitely: {{commitSync()}}, > {{committed()}}, and {{position()}}. > If we block within one operation, the whole {{StreamThread}} would block, and > the instance does not make any progress, becomes unresponsive (for example, > {{KafkaStreams#close()}} suffers), and we also might drop out of the consumer > group. > We might consider using {{wakeup()}} calls to unblock those operations to > keep {{StreamThread}} in a responsive state. > Note: there are discussions about adding timeouts to those calls, and thus, we could > get {{TimeoutExceptions}}. This would be easier to handle than using > {{wakeup()}}. Thus, we should keep an eye on those discussions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
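The timeout direction mentioned in the note can be sketched generically in pure JDK: run the blocking call on a helper thread and bound the wait, so the caller gets a TimeoutException instead of blocking forever. This is an illustrative pattern only, not how KafkaConsumer implements its timeouts:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedBlockingCall {
    // Bound an otherwise-unbounded blocking call so the calling thread
    // (a stand-in for StreamThread here) stays responsive.
    public static <T> T callWithTimeout(Callable<T> blockingCall, long timeoutMs) throws Exception {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            Future<T> future = exec.submit(blockingCall);
            try {
                return future.get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                future.cancel(true); // interrupt the call, playing the role of consumer.wakeup()
                throw e;
            } catch (ExecutionException e) {
                throw new RuntimeException(e.getCause()); // surface the call's own failure
            }
        } finally {
            exec.shutdownNow();
        }
    }
}
```

A TimeoutException from a bounded call is much easier for the StreamThread loop to handle than coordinating wakeup() calls from another thread, which matches the note's preference.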
[jira] [Resolved] (KAFKA-5999) Offset Fetch Request
[ https://issues.apache.org/jira/browse/KAFKA-5999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-5999. -- Resolution: Invalid Assignee: Ewen Cheslack-Postava Closing as Invalid until we get some clarification about the issue. > Offset Fetch Request > > > Key: KAFKA-5999 > URL: https://issues.apache.org/jira/browse/KAFKA-5999 > Project: Kafka > Issue Type: Improvement >Reporter: Zhao Weilong >Assignee: Ewen Cheslack-Postava >Priority: Major > > New kafka (found in 10.2.1) support new feature for all topic which is put > number of topics -1. (v2) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5799) New KafkaSpoutConfig(Scheme)-ByteArrayKeyValueScheme
[ https://issues.apache.org/jira/browse/KAFKA-5799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16389064#comment-16389064 ] Ewen Cheslack-Postava commented on KAFKA-5799: -- Is there an actual Kafka issue here? It looks to me like this is just an issue with the interfaces/types used in Storm. Kafka allows you to deserialize to whatever types you like, and the KeyValueScheme issues seem to be limitations of Storm's interfaces. Perhaps this is better filed against Storm? > New KafkaSpoutConfig(Scheme)-ByteArrayKeyValueScheme > > > Key: KAFKA-5799 > URL: https://issues.apache.org/jira/browse/KAFKA-5799 > Project: Kafka > Issue Type: New Feature >Affects Versions: 0.11.0.0 > Environment: apache-storm 1.1.0 >Reporter: Juhong NamGung >Priority: Minor > Attachments: 1.JPG, 2.JPG, bakvs.JPG > > > I am trying to integrate Kafka with Apache Storm. > I want to get data from Kafka using KafkaSpout in Apache Storm. > To get data from Kafka using KafkaSpout, the SpoutConfig scheme must be set. > (Scheme is an interface that dictates how the ByteBuffer consumed from Kafka > gets transformed into a Storm tuple.) > I want to get both the key and the value from Kafka, so I used the KafkaSpoutConfig > ‘KeyValueSchemeAsMultiScheme’. > KeyValueSchemeAsMultiScheme’s constructor is as follows. > [^2.JPG] > But, as you can see in the picture, the only implementing class of the > KeyValueScheme interface is StringKeyValueScheme. > [^1.JPG] > Using StringKeyValueScheme causes problems when importing Integer data from > Kafka, because StringKeyValueScheme deserializes the ByteBuffer to a String. > So I implemented ByteArrayKeyValueScheme, which deserializes the ByteBuffer to a > byte array. > ByteArrayKeyValueScheme imports data as a byte array. > If you use ByteArrayKeyValueScheme, you can import data of any type from > Kafka without error. > (But you should then convert the byte array to the data type that you want (e.g.
> String, Integer...)) > [^bakvs.JPG] > {code:java} > import java.nio.ByteBuffer; > import java.util.List; > import org.apache.storm.kafka.KeyValueScheme; > import org.apache.storm.spout.RawScheme; > import org.apache.storm.tuple.Values; > import com.google.common.collect.ImmutableMap; > public class ByteArrayKeyValueScheme extends RawScheme implements > KeyValueScheme { > @Override > public List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer > value) { > if (key == null) { > return deserialize(value); > } > Object keytuple = deserialize(key).get(0); > Object valuetuple = deserialize(value).get(0); > return new Values(ImmutableMap.of(keytuple, valuetuple)); > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5626) Producer should be able to negotiate ProduceRequest version with broker
[ https://issues.apache.org/jira/browse/KAFKA-5626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16387319#comment-16387319 ] Ewen Cheslack-Postava commented on KAFKA-5626: -- [~lindong] Given KIP-97, is this relevant anymore? > Producer should be able to negotiate ProduceRequest version with broker > --- > > Key: KAFKA-5626 > URL: https://issues.apache.org/jira/browse/KAFKA-5626 > Project: Kafka > Issue Type: Improvement >Reporter: Dong Lin >Assignee: Dong Lin >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-5471) Original Kafka paper link broken
[ https://issues.apache.org/jira/browse/KAFKA-5471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-5471. -- Resolution: Fixed Assignee: Ewen Cheslack-Postava Updated the link to what appears to be a more permanent NetDB pdf link. > Original Kafka paper link broken > > > Key: KAFKA-5471 > URL: https://issues.apache.org/jira/browse/KAFKA-5471 > Project: Kafka > Issue Type: Bug > Components: documentation >Reporter: Jeremy Hanna >Assignee: Ewen Cheslack-Postava >Priority: Trivial > > Currently on the [Kafka papers and presentations > site|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+papers+and+presentations] > the original Kafka paper is linked but it's a broken link. > Currently it links to > [here|http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf] > but that person may have taken the paper down. I found it > [here|http://notes.stephenholiday.com/Kafka.pdf] but that could have a > similar problem in the future. We should be able to put the file as an > attachment in the confluence wiki to make it a more permanent link. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5306) Official init.d scripts
[ https://issues.apache.org/jira/browse/KAFKA-5306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16387300#comment-16387300 ] Ewen Cheslack-Postava commented on KAFKA-5306: -- I think the challenge here is that these vary from distro to distro. Some use systemd, some are still on init.d; installation paths, config files, etc would vary between distros. And some distros prefer having their own builds from src using their own conventions vs using binaries from upstream. Given the myriad distributions, I'm not sure how this would be maintainable other than a super flexible, customizable script, and even then I'm not sure it could cover differences such as systemd vs init.d. Any ideas as to how this could scale out for the variety of distros? > Official init.d scripts > --- > > Key: KAFKA-5306 > URL: https://issues.apache.org/jira/browse/KAFKA-5306 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.10.2.1 > Environment: Ubuntu 14.04 >Reporter: Shahar >Priority: Minor > > It would be great to have an officially supported init.d script for starting > and stopping Kafka as a service. -- This message was sent by Atlassian JIRA (v7.6.3#76005)