[jira] [Assigned] (KAFKA-3846) Connect record types should include timestamps
[ https://issues.apache.org/jira/browse/KAFKA-3846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shikhar Bhushan reassigned KAFKA-3846:
--------------------------------------

    Assignee: Shikhar Bhushan  (was: Ewen Cheslack-Postava)

> Connect record types should include timestamps
> ----------------------------------------------
>
>                 Key: KAFKA-3846
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3846
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>    Affects Versions: 0.10.0.0
>            Reporter: Ewen Cheslack-Postava
>            Assignee: Shikhar Bhushan
>              Labels: needs-kip
>             Fix For: 0.10.1.0
>
> Timestamps were added to records in the previous release, however this does
> not get propagated automatically to Connect because it uses custom wrappers
> to add fields and rename some for clarity.
>
> The addition of timestamps should be trivial, but can be really useful (e.g.
> in sink connectors that would like to include timestamp info if available but
> when it is not stored in the value).
>
> This is public API so it will need a KIP despite being very uncontentious.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (KAFKA-3846) Connect record types should include timestamps
[ https://issues.apache.org/jira/browse/KAFKA-3846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346891#comment-15346891 ]

Shikhar Bhushan commented on KAFKA-3846:
----------------------------------------

https://cwiki.apache.org/confluence/display/KAFKA/KIP-65%3A+Expose+timestamps+to+Connect
[jira] [Work started] (KAFKA-3846) Connect record types should include timestamps
[ https://issues.apache.org/jira/browse/KAFKA-3846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Work on KAFKA-3846 started by Shikhar Bhushan.
----------------------------------------------
[jira] [Commented] (KAFKA-2935) Remove vestigial CLUSTER_CONFIG in WorkerConfig
[ https://issues.apache.org/jira/browse/KAFKA-2935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15290294#comment-15290294 ]

Shikhar Bhushan commented on KAFKA-2935:
----------------------------------------

I could not find any documentation reference to this config, so I hope the above patch is GTG.

p.s. Can I please be added as a contributor for JIRA :)

> Remove vestigial CLUSTER_CONFIG in WorkerConfig
> -----------------------------------------------
>
>                 Key: KAFKA-2935
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2935
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.9.0.0
>            Reporter: Ewen Cheslack-Postava
>            Assignee: Ewen Cheslack-Postava
>
> This config isn't used anywhere anymore. Its previous reason for existence is
> now handled by DistributedConfig.GROUP_ID_CONFIG.
[jira] [Commented] (KAFKA-3335) Kafka Connect hangs in shutdown hook
[ https://issues.apache.org/jira/browse/KAFKA-3335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15290333#comment-15290333 ]

Shikhar Bhushan commented on KAFKA-3335:
----------------------------------------

Currently {{start()}} looks like

{code:java}
    public void start() {
        try {
            log.info("Kafka Connect starting");

            Runtime.getRuntime().addShutdownHook(shutdownHook);

            herder.start();
            rest.start(herder);

            log.info("Kafka Connect started");
        } finally {
            startLatch.countDown();
        }
    }
{code}

Would it make sense to only add the shutdown hook at the end of this method?

> Kafka Connect hangs in shutdown hook
> ------------------------------------
>
>                 Key: KAFKA-3335
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3335
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Ben Kirwin
>
> The `Connect` class can run into issues during start, such as:
> {noformat}
> Exception in thread "main" org.apache.kafka.connect.errors.ConnectException:
> Could not look up partition metadata for offset backing store topic in
> allotted period. This could indicate a connectivity issue, unavailable topic
> partitions, or if this is your first use of the topic it may have taken too
> long to create.
>         at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:130)
>         at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:85)
>         at org.apache.kafka.connect.runtime.Worker.start(Worker.java:108)
>         at org.apache.kafka.connect.runtime.Connect.start(Connect.java:56)
>         at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:62)
> {noformat}
> This exception halts the startup process. It also triggers the shutdown
> hook... which blocks waiting for the service to start up before calling stop.
> This causes the process to hang forever.
>
> There's a few things that could be done here, but it would be nice to bound
> the amount of time the process spends trying to exit gracefully.
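The reordering the comment proposes can be sketched as follows. This is a minimal stand-alone sketch, not the real {{Connect}} class: {{startComponents()}} stands in for the {{herder.start()}} / {{rest.start(herder)}} calls, and {{hookRegistered()}} is a hypothetical accessor added only so the ordering is observable.

```java
import java.util.concurrent.CountDownLatch;

public class ConnectStartSketch {
    private final CountDownLatch startLatch = new CountDownLatch(1);
    private final Thread shutdownHook = new Thread(this::stop);
    private boolean hookRegistered = false;

    public void start() {
        try {
            startComponents();  // stand-in for herder.start(); rest.start(herder);
            // Hook is registered only once startup succeeded, so a startup
            // failure can no longer trigger a hook that blocks on startLatch.
            Runtime.getRuntime().addShutdownHook(shutdownHook);
            hookRegistered = true;
        } finally {
            startLatch.countDown();  // released whether or not startup succeeded
        }
    }

    private void startComponents() { /* placeholder for component startup */ }

    private void stop() {
        try {
            startLatch.await();  // safe: the hook only exists if start() completed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public boolean hookRegistered() { return hookRegistered; }

    public static void main(String[] args) {
        ConnectStartSketch connect = new ConnectStartSketch();
        connect.start();
        System.out.println("hook registered after startup: " + connect.hookRegistered());
    }
}
```

If {{startComponents()}} throws, the hook was never registered, so the JVM can exit without waiting on the latch.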
[jira] [Created] (KAFKA-4010) ConfigDef.toRst() should create sections for each group
Shikhar Bhushan created KAFKA-4010:
--------------------------------------

             Summary: ConfigDef.toRst() should create sections for each group
                 Key: KAFKA-4010
                 URL: https://issues.apache.org/jira/browse/KAFKA-4010
             Project: Kafka
          Issue Type: Improvement
            Reporter: Shikhar Bhushan
            Priority: Minor

Currently the ordering seems a bit arbitrary. There is a logical grouping that
connectors are now able to specify with the 'group' field, which we should use
as section headers. Also it would be good to generate {{:ref:}} for each
section.
[jira] [Created] (KAFKA-3962) ConfigDef support for resource-specific configuration
Shikhar Bhushan created KAFKA-3962:
--------------------------------------

             Summary: ConfigDef support for resource-specific configuration
                 Key: KAFKA-3962
                 URL: https://issues.apache.org/jira/browse/KAFKA-3962
             Project: Kafka
          Issue Type: Improvement
            Reporter: Shikhar Bhushan

It often comes up with connectors that you want some piece of configuration
that should be overridable at the topic-level, table-level, etc.

There are a couple of possible ways to allow for this:

1. Support for map-style config properties "k1:v1,k2:v2". There are escaping
considerations to think through here. Also, how should the user override
fallback/default values -- perhaps {{*}} as a special resource?

2. Templatized configs -- so we can define {{$resource.some.property}} with
the ConfigDef API, and have getter variants that take the resource argument.
The default value is more naturally overridable here, by the user setting
{{some.property}}.
[jira] [Updated] (KAFKA-3962) ConfigDef support for resource-specific configuration
[ https://issues.apache.org/jira/browse/KAFKA-3962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shikhar Bhushan updated KAFKA-3962:
-----------------------------------
    Description:
It often comes up with connectors that you want some piece of configuration that should be overridable at the topic-level, table-level, etc.

The ConfigDef API should allow for defining these resource-overridable config properties, and we should have getter variants that accept a resource argument and return the more specific config value (falling back to the default).

There are a couple of possible ways to allow for this:

1. Support for map-style config properties "resource1:v1,resource2:v2". There are escaping considerations to think through here. Also, how should the user override fallback/default values -- perhaps {{*}} as a special resource?

2. Templatized configs -- so you would define {{$resource.some.property}}. The default value is more naturally overridable here, by the user setting {{some.property}} without the {{$resource}} prefix.

  was:
It often comes up with connectors that you want some piece of configuration that should be overridable at the topic-level, table-level, etc.

There are a couple of possible ways to allow for this:

1. Support for map-style config properties "k1:v1,k2:v2". There are escaping considerations to think through here. Also, how should the user override fallback/default values -- perhaps {{*}} as a special resource?

2. Templatized configs -- so we can define {{$resource.some.property}} with the ConfigDef API, and have getter variants that take the resource argument. The default value is more naturally overridable here, by the user setting {{some.property}}.
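Option 1 from the description can be sketched as below. This is a hypothetical stand-alone sketch, not ConfigDef API: the class and method names are made up, {{*}} is treated as the special fallback resource the description suggests, and the escaping considerations it mentions (a literal ',' or ':' inside a value) are deliberately not handled.

```java
import java.util.HashMap;
import java.util.Map;

public class ResourceConfigSketch {
    // Parse a map-style config value like "resource1:v1,resource2:v2,*:default".
    public static Map<String, String> parse(String value) {
        Map<String, String> result = new HashMap<>();
        for (String entry : value.split(",")) {
            String[] kv = entry.split(":", 2);  // split only on the first ':'
            result.put(kv[0].trim(), kv.length > 1 ? kv[1].trim() : "");
        }
        return result;
    }

    // Resolve the value for a resource, falling back to the "*" entry.
    public static String get(Map<String, String> overrides, String resource) {
        return overrides.getOrDefault(resource, overrides.get("*"));
    }

    public static void main(String[] args) {
        Map<String, String> cfg = parse("orders:100,users:50,*:10");
        System.out.println(get(cfg, "orders"));  // per-resource override
        System.out.println(get(cfg, "events"));  // no entry, so "*" fallback
    }
}
```

A getter variant on ConfigDef, per the description, would do this resolution internally given a resource argument.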
[jira] [Updated] (KAFKA-4048) Connect does not support RetriableException consistently for sinks
[ https://issues.apache.org/jira/browse/KAFKA-4048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shikhar Bhushan updated KAFKA-4048:
-----------------------------------
    Summary: Connect does not support RetriableException consistently for sinks  (was: Connect does not support RetriableException consistently for sources & sinks)
[jira] [Updated] (KAFKA-4048) Connect does not support RetriableException consistently for sinks
[ https://issues.apache.org/jira/browse/KAFKA-4048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shikhar Bhushan updated KAFKA-4048:
-----------------------------------
    Description:
We only allow for handling {{RetriableException}} from calls to {{SinkTask.put()}}, but this is something we should support also for {{flush()}} and arguably also {{open()}}.

  was:
We only allow for handling {{RetriableException}} from calls to {{SinkTask.put()}}, but this is something we should support also for {{flush()}} and arguably also {{open()}}.

We don't have support for {{RetriableException}} with {{SourceTask}}.
[jira] [Updated] (KAFKA-4042) DistributedHerder thread can die because of connector & task lifecycle exceptions
[ https://issues.apache.org/jira/browse/KAFKA-4042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shikhar Bhushan updated KAFKA-4042:
-----------------------------------
    Component/s: KafkaConnect
[jira] [Created] (KAFKA-4048) Connect does not support RetriableException consistently for sources & sinks
Shikhar Bhushan created KAFKA-4048:
--------------------------------------

             Summary: Connect does not support RetriableException consistently for sources & sinks
                 Key: KAFKA-4048
                 URL: https://issues.apache.org/jira/browse/KAFKA-4048
             Project: Kafka
          Issue Type: Bug
          Components: KafkaConnect
            Reporter: Shikhar Bhushan
            Assignee: Shikhar Bhushan

We only allow for handling {{RetriableException}} from calls to
{{SinkTask.put()}}, but this is something we should support also for
{{flush()}} and arguably also {{open()}}.

We don't have support for {{RetriableException}} with {{SourceTask}}.
[jira] [Updated] (KAFKA-4042) DistributedHerder thread can die because of connector & task lifecycle exceptions
[ https://issues.apache.org/jira/browse/KAFKA-4042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shikhar Bhushan updated KAFKA-4042:
-----------------------------------
    Summary: DistributedHerder thread can die because of connector & task lifecycle exceptions  (was: Missing error handling in Worker.startConnector() can cause Herder thread to die)
[jira] [Updated] (KAFKA-4042) DistributedHerder thread can die because of connector & task lifecycle exceptions
[ https://issues.apache.org/jira/browse/KAFKA-4042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shikhar Bhushan updated KAFKA-4042:
-----------------------------------
    Description:
As one example, there isn't exception handling in {{DistributedHerder.startConnector()}} or the call-chain for it originating in the {{tick()}} on the herder thread, and it can throw an exception because of a bad class name in the connector config. (report of issue in wild: https://groups.google.com/d/msg/confluent-platform/EnleFnXpZCU/3B_gRxsRAgAJ)

  was:
While errors in {{WorkerConnector}} are handled using the {{statusListener.onFailure()}} callback, before the {{WorkerConnector}} is created in {{Worker.startConnector()}} there can be exceptions because of e.g. a bad class name and these are currently not handled, causing the {{Herder}} thread to die.

Report of issue in wild: https://groups.google.com/d/msg/confluent-platform/EnleFnXpZCU/3B_gRxsRAgAJ
[jira] [Created] (KAFKA-4042) Missing error handling in Worker.startConnector() can cause Herder thread to die
Shikhar Bhushan created KAFKA-4042:
--------------------------------------

             Summary: Missing error handling in Worker.startConnector() can cause Herder thread to die
                 Key: KAFKA-4042
                 URL: https://issues.apache.org/jira/browse/KAFKA-4042
             Project: Kafka
          Issue Type: Bug
            Reporter: Shikhar Bhushan
            Assignee: Shikhar Bhushan

While errors in {{WorkerConnector}} are handled using the
{{statusListener.onFailure()}} callback, before the {{WorkerConnector}} is
created in {{Worker.startConnector()}} there can be exceptions because of
e.g. a bad class name, and these are currently not handled, causing the
{{Herder}} thread to die.

Report of issue in wild:
https://groups.google.com/d/msg/confluent-platform/EnleFnXpZCU/3B_gRxsRAgAJ
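The direction of the fix can be sketched as below. All names here are hypothetical stand-ins for the Connect runtime classes, not the actual {{Worker}} / {{StatusListener}} API: the point is only that an exception thrown while instantiating a connector (e.g. a bad class name) is caught and routed to a failure callback instead of escaping to the herder thread.

```java
public class StartConnectorSketch {
    // Stand-in for the status callback the description mentions.
    interface StatusListener {
        void onFailure(String connector, Throwable cause);
    }

    public static boolean startConnector(String className, StatusListener listener) {
        try {
            // The real runtime would also configure and start the instance;
            // this sketch only performs the instantiation that can throw.
            Class.forName(className).getDeclaredConstructor().newInstance();
            return true;
        } catch (Exception e) {
            listener.onFailure(className, e);  // report instead of killing the caller thread
            return false;
        }
    }

    public static void main(String[] args) {
        boolean ok = startConnector("com.example.NoSuchConnector",
                (name, cause) -> System.out.println(name + " failed: " + cause));
        System.out.println("started: " + ok);
    }
}
```

With this shape, a bad class name in a connector config marks that connector as failed and the calling thread carries on.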
[jira] [Assigned] (KAFKA-3054) Connect Herder fail forever if sent a wrong connector config or task config
[ https://issues.apache.org/jira/browse/KAFKA-3054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shikhar Bhushan reassigned KAFKA-3054:
--------------------------------------

    Assignee: Shikhar Bhushan  (was: jin xing)

> Connect Herder fail forever if sent a wrong connector config or task config
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-3054
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3054
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.9.0.0
>            Reporter: jin xing
>            Assignee: Shikhar Bhushan
>            Priority: Blocker
>             Fix For: 0.10.1.0
>
> The Connector Herder throws ConnectException and shuts down if sent a wrong
> config; restarting the herder will keep failing with the wrong config. It
> makes sense that the herder should stay available when starting a connector
> or task fails. After receiving a delete connector request, the herder can
> delete the wrong config from "config storage".
[jira] [Created] (KAFKA-4678) Create separate page for Connect docs
Shikhar Bhushan created KAFKA-4678:
--------------------------------------

             Summary: Create separate page for Connect docs
                 Key: KAFKA-4678
                 URL: https://issues.apache.org/jira/browse/KAFKA-4678
             Project: Kafka
          Issue Type: Improvement
          Components: documentation, KafkaConnect
            Reporter: Shikhar Bhushan
            Assignee: Ewen Cheslack-Postava
            Priority: Minor

The single-page http://kafka.apache.org/documentation/ is quite long, and will
get even longer with the inclusion of info on Kafka Connect's included
transformations.

Recently Kafka Streams documentation was split off to its own page with a
short overview in the main doc page. We should do the same for
{{connect.html}}.
[jira] [Resolved] (KAFKA-3054) Connect Herder fail forever if sent a wrong connector config or task config
[ https://issues.apache.org/jira/browse/KAFKA-3054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shikhar Bhushan resolved KAFKA-3054.
------------------------------------
    Resolution: Done
[jira] [Commented] (KAFKA-3054) Connect Herder fail forever if sent a wrong connector config or task config
[ https://issues.apache.org/jira/browse/KAFKA-3054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15427362#comment-15427362 ]

Shikhar Bhushan commented on KAFKA-3054:
----------------------------------------

Addressing this in KAFKA-4042, which should take care of remaining robustness issues in the {{DistributedHerder}} from bad connector or task configs.
[jira] [Updated] (KAFKA-4042) DistributedHerder thread can die because of connector & task lifecycle exceptions
[ https://issues.apache.org/jira/browse/KAFKA-4042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shikhar Bhushan updated KAFKA-4042:
-----------------------------------
    Fix Version/s: 0.10.1.0
[jira] [Work started] (KAFKA-4042) Missing error handling in Worker.startConnector() can cause Herder thread to die
[ https://issues.apache.org/jira/browse/KAFKA-4042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Work on KAFKA-4042 started by Shikhar Bhushan.
----------------------------------------------
[jira] [Work started] (KAFKA-3054) Connect Herder fail forever if sent a wrong connector config or task config
[ https://issues.apache.org/jira/browse/KAFKA-3054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Work on KAFKA-3054 started by Shikhar Bhushan.
----------------------------------------------
[jira] [Created] (KAFKA-4068) FileSinkTask - use JsonConverter to serialize
Shikhar Bhushan created KAFKA-4068:
--------------------------------------

             Summary: FileSinkTask - use JsonConverter to serialize
                 Key: KAFKA-4068
                 URL: https://issues.apache.org/jira/browse/KAFKA-4068
             Project: Kafka
          Issue Type: Improvement
          Components: KafkaConnect
            Reporter: Shikhar Bhushan
            Assignee: Shikhar Bhushan
            Priority: Minor

People new to Connect often try out hooking up e.g. a Kafka topic with Avro
data to the file sink connector, only to find the file contain values like:

{noformat}
org.apache.kafka.connect.data.Struct@ca1bf85a
org.apache.kafka.connect.data.Struct@c298db6a
org.apache.kafka.connect.data.Struct@44108fbd
{noformat}

This is because currently the {{FileSinkConnector}} is meant as a toy example
that expects the schema to be {{Schema.STRING_SCHEMA}}, though it just
{{toString()}}'s the value without verifying that.

A better experience would probably be if we used
{{JsonConverter.fromConnectData()}} for serializing to the file.
[jira] [Updated] (KAFKA-4070) Implement a useful Struct.toString()
[ https://issues.apache.org/jira/browse/KAFKA-4070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shikhar Bhushan updated KAFKA-4070:
-----------------------------------
    Description:
Logging of {{Struct}}'s does not currently provide any useful output, and users also find it unhelpful e.g. when hooking up a Kafka topic with Avro data with the {{FileSinkConnector}} which simply {{toString()}}s the values to the file.

  was:
Logging of {{Struct}}s does not currently provide any useful output, and users also find it unhelpful e.g. when hooking up a Kafka topic with Avro data with the {{FileSinkConnector}} which simply {{toString()}}s the values to the file.
[jira] [Updated] (KAFKA-4070) Implement a useful Struct.toString()
[ https://issues.apache.org/jira/browse/KAFKA-4070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shikhar Bhushan updated KAFKA-4070:
-----------------------------------
    Description:
Logging of {{Struct}}'s does not currently provide any useful output, and users also find it unhelpful e.g. when hooking up a Kafka topic with Avro data with the {{FileSinkConnector}} which simply {{toString()}}'s the values to the file.

  was:
Logging of {{Struct}}'s does not currently provide any useful output, and users also find it unhelpful e.g. when hooking up a Kafka topic with Avro data with the {{FileSinkConnector}} which simply {{toString()}}s the values to the file.
[jira] [Created] (KAFKA-4070) Implement a useful Struct.toString()
Shikhar Bhushan created KAFKA-4070:
--------------------------------------

             Summary: Implement a useful Struct.toString()
                 Key: KAFKA-4070
                 URL: https://issues.apache.org/jira/browse/KAFKA-4070
             Project: Kafka
          Issue Type: Improvement
          Components: KafkaConnect
            Reporter: Shikhar Bhushan
            Priority: Minor

Logging of {{Struct}}s does not currently provide any useful output, and users
also find it unhelpful e.g. when hooking up a Kafka topic with Avro data with
the {{FileSinkConnector}} which simply {{toString()}}s the values to the file.
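What a more useful {{toString()}} might render can be sketched with a minimal struct-like class. This is a hypothetical stand-in, not the real {{org.apache.kafka.connect.data.Struct}} (which holds a {{Schema}} plus field values); the point is only replacing the default {{ClassName@hashCode}} output with the field names and values.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class StructToStringSketch {
    // Ordered field map, standing in for schema-ordered field values.
    private final Map<String, Object> fields = new LinkedHashMap<>();

    public StructToStringSketch put(String field, Object value) {
        fields.put(field, value);
        return this;
    }

    @Override
    public String toString() {
        // Render "Struct{field=value,...}" so logs and the file sink output
        // are readable, instead of the default identity hash string.
        return fields.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining(",", "Struct{", "}"));
    }

    public static void main(String[] args) {
        StructToStringSketch s = new StructToStringSketch().put("id", 42).put("name", "kafka");
        System.out.println(s);  // Struct{id=42,name=kafka}
    }
}
```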
[jira] [Resolved] (KAFKA-4068) FileSinkTask - use JsonConverter to serialize
[ https://issues.apache.org/jira/browse/KAFKA-4068?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shikhar Bhushan resolved KAFKA-4068.
------------------------------------
    Resolution: Not A Problem

I was thinking JSON since it would be easy to serialize to a human-readable format with that. But if we want to implement a more useful {{Struct.toString()}} in any case for debugging purposes, we should probably do that instead. Fair point about keeping the file sink connector as simple as possible.

Closing this in favor of KAFKA-4070.
[jira] [Closed] (KAFKA-4127) Possible data loss
[ https://issues.apache.org/jira/browse/KAFKA-4127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shikhar Bhushan closed KAFKA-4127.
----------------------------------

> Possible data loss
> ------------------
>
>                 Key: KAFKA-4127
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4127
>             Project: Kafka
>          Issue Type: Bug
>         Environment: Normal three node Kafka cluster. All machines running linux.
>            Reporter: Ramnatthan Alagappan
>
> I am running a three node Kafka cluster. ZooKeeper runs in a standalone mode.
>
> When I create a new message topic, I see the following sequence of system calls:
> mkdir("/appdir/my-topic1-0")
> creat("/appdir/my-topic1-0/.log")
>
> I have configured Kafka to write the messages persistently to the disk before
> acknowledging the client. Specifically, I have set flush.interval_messages to
> 1, min_insync_replicas to 3, and disabled dirty election. Now, I insert a
> new message into the created topic.
>
> I see that Kafka writes the message to the log file and flushes the data down
> to disk by carefully fsync'ing the log file. I get an acknowledgment back
> from the cluster after the message is safely persisted on all three replicas
> and written to disk.
>
> Unfortunately, Kafka can still lose data since it does not explicitly fsync
> the directory to persist the directory entries of the topic directory and the
> log file. If a crash happens after acknowledging the client, it is possible
> for Kafka to lose the directory entry for the topic directory or the log file.
>
> Many systems carefully issue fsync to the parent directory when a new file or
> directory is created. This is required for the file to be completely
> persisted to the disk.
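The pattern the report describes (fsync the parent directory so the new file's directory entry is itself durable) can be sketched in Java as below. This is a sketch of the general technique, not how Kafka implements it; opening a directory read-only and calling {{force()}} is a Linux-specific idiom, and the file names here are made up.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class DirFsyncSketch {
    // fsync a directory: on Linux a directory can be opened read-only and
    // force(true) issues fsync(2) on it, persisting its directory entries.
    public static void fsyncDir(Path dir) throws IOException {
        try (FileChannel ch = FileChannel.open(dir, StandardOpenOption.READ)) {
            ch.force(true);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("topic-0");   // stands in for the topic dir
        Path log = dir.resolve("segment.log");             // hypothetical log file name
        Files.write(log, new byte[]{1, 2, 3});
        // A real implementation would also fsync the log file's own contents here.
        fsyncDir(dir);  // make the creat() of the log file durable
        System.out.println("fsynced " + dir);
    }
}
```

Lucene uses the same open-directory-and-force idiom for this purpose on POSIX systems.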
[jira] [Commented] (KAFKA-4127) Possible data loss
[ https://issues.apache.org/jira/browse/KAFKA-4127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15468012#comment-15468012 ] Shikhar Bhushan commented on KAFKA-4127: Dupe of KAFKA-3968 > Possible data loss > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-4127) Possible data loss
[ https://issues.apache.org/jira/browse/KAFKA-4127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shikhar Bhushan resolved KAFKA-4127. Resolution: Duplicate > Possible data loss > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-3962) ConfigDef support for resource-specific configuration
[ https://issues.apache.org/jira/browse/KAFKA-3962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15468352#comment-15468352 ] Shikhar Bhushan edited comment on KAFKA-3962 at 9/6/16 7:49 PM: This is also realizable today by using {{ConfigDef.originalsWithPrefix()}}, if the template format is {{some.property.$resource}} rather than {{$resource.some.property}} as suggested above. There isn't library-level validation support when doing this, though. was (Author: shikhar): This is also realizable today by using {{ConfigDef.originalsWithPrefix()}}, if the template format is {{some.property.$resource}} rather than {{$resource.some.property}} as suggested above. There isn't framework-level validation support when doing this, though. > ConfigDef support for resource-specific configuration > - > > Key: KAFKA-3962 > URL: https://issues.apache.org/jira/browse/KAFKA-3962 > Project: Kafka > Issue Type: Improvement >Reporter: Shikhar Bhushan > > It often comes up with connectors that you want some piece of configuration > to be overridable at the topic level, table level, etc. > The ConfigDef API should allow for defining these resource-overridable config > properties, and we should have getter variants that accept a resource > argument and return the more specific config value (falling back to the > default). > There are a couple of possible ways to allow for this: > 1. Support for map-style config properties "resource1:v1,resource2:v2". There > are escaping considerations to think through here. Also, how should the user > override fallback/default values -- perhaps {{*}} as a special resource? > 2. Templatized configs -- so you would define {{$resource.some.property}}. > The default value is more naturally overridable here, by the user setting > {{some.property}} without the {{$resource}} prefix. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3962) ConfigDef support for resource-specific configuration
[ https://issues.apache.org/jira/browse/KAFKA-3962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15468352#comment-15468352 ] Shikhar Bhushan commented on KAFKA-3962: This is also realizable today by using {{ConfigDef.originalsWithPrefix()}}, if the template format is {{some.property.$resource}} rather than {{$resource.some.property}} as suggested above. There isn't framework-level validation support when doing this, though. > ConfigDef support for resource-specific configuration > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
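The prefix-based workaround described in the comment can be illustrated with plain Java. This sketch mimics the behavior of Kafka's {{originalsWithPrefix()}} helper rather than calling it, and the method and class names here are illustrative, not a real Connect API:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of resource-specific config lookup via key prefixes: a value under
// "some.property.<resource>" overrides the un-prefixed "some.property" default.
public class PrefixedConfigDemo {
    // Mirrors the behavior of Kafka's AbstractConfig.originalsWithPrefix(prefix):
    // collect entries starting with the prefix, with the prefix stripped.
    static Map<String, String> originalsWithPrefix(Map<String, String> originals, String prefix) {
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<String, String> e : originals.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                out.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return out;
    }

    // Resource-specific value, falling back to the un-prefixed default.
    static String valueFor(Map<String, String> originals, String property, String resource) {
        String specific = originalsWithPrefix(originals, property + ".").get(resource);
        return specific != null ? specific : originals.get(property);
    }
}
```

As the comment notes, this pattern gives per-resource overrides today, but without library-level validation of the prefixed keys.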
[jira] [Resolved] (KAFKA-4048) Connect does not support RetriableException consistently for sinks
[ https://issues.apache.org/jira/browse/KAFKA-4048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shikhar Bhushan resolved KAFKA-4048. Resolution: Not A Problem Turns out all exceptions from {{task.flush()}} are treated as retriable (see {{WorkerSinkTask.commitOffsets()}}), so there is nothing to do here. > Connect does not support RetriableException consistently for sinks > -- > > Key: KAFKA-4048 > URL: https://issues.apache.org/jira/browse/KAFKA-4048 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Shikhar Bhushan >Assignee: Shikhar Bhushan > > We only allow for handling {{RetriableException}} from calls to > {{SinkTask.put()}}, but this is something we should support also for > {{flush()}} and arguably also {{open()}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-4115) Grow default heap settings for distributed Connect from 256M to 1G
Shikhar Bhushan created KAFKA-4115: -- Summary: Grow default heap settings for distributed Connect from 256M to 1G Key: KAFKA-4115 URL: https://issues.apache.org/jira/browse/KAFKA-4115 Project: Kafka Issue Type: Improvement Components: KafkaConnect Reporter: Shikhar Bhushan Assignee: Shikhar Bhushan Currently, both {{connect-standalone.sh}} and {{connect-distributed.sh}} start the Connect JVM with the default heap settings from {{kafka-run-class.sh}} of {{-Xmx256M}}. At least for distributed connect, we should default to a much higher limit like 1G. While the 'correct' sizing is workload dependent, with a system where you can run arbitrary connector plugins which may perform buffering of data, we should provide for more headroom. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-4100) Connect Struct schemas built using SchemaBuilder with no fields cause NPE in Struct constructor
Shikhar Bhushan created KAFKA-4100: -- Summary: Connect Struct schemas built using SchemaBuilder with no fields cause NPE in Struct constructor Key: KAFKA-4100 URL: https://issues.apache.org/jira/browse/KAFKA-4100 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 0.10.0.1 Reporter: Shikhar Bhushan Assignee: Shikhar Bhushan Priority: Minor Fix For: 0.10.1.0 Avro records can legitimately have 0 fields (though it's arguable how useful that is). When using the Confluent Schema Registry's {{AvroConverter}} with such a schema: {noformat}
java.lang.NullPointerException
  at org.apache.kafka.connect.data.Struct.<init>(Struct.java:56)
  at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:980)
  at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:782)
  at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:103)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:358)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:227)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:171)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
  at java.util.concurrent.FutureTask.run(FutureTask.java:262)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
{noformat} This is because it is using the {{SchemaBuilder}} to create the Struct schema, which provides a {{field(..)}} builder for each field. If there are no fields, the list stays null. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Work started] (KAFKA-4100) Connect Struct schemas built using SchemaBuilder with no fields cause NPE in Struct constructor
[ https://issues.apache.org/jira/browse/KAFKA-4100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-4100 started by Shikhar Bhushan. -- > Connect Struct schemas built using SchemaBuilder with no fields cause NPE in > Struct constructor > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
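The bug pattern behind KAFKA-4100 is easy to reproduce in miniature. The Builder and countFields below are hypothetical stand-ins, not the real SchemaBuilder or Struct: the field list is allocated lazily on the first {{field(..)}} call, so a zero-field schema hands a null list to the consumer, which then throws an NPE.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal reproduction of the lazily-initialized-list bug described above
// (hypothetical classes, not Kafka Connect code): when field() is never
// called, build() returns null instead of an empty list.
public class NullFieldsDemo {
    static class Builder {
        List<String> fields; // stays null if field() is never called

        Builder field(String name) {
            if (fields == null) {
                fields = new ArrayList<>();
            }
            fields.add(name);
            return this;
        }

        List<String> build() { return fields; }
    }

    // Stand-in for the Struct constructor's use of the list: NPE on null.
    static int countFields(List<String> fields) {
        return fields.size();
    }
}
```

The straightforward fix is for the builder to start from an empty list (or for {{build()}} to substitute one), so that zero-field schemas behave like any other.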
[jira] [Created] (KAFKA-4159) Allow overriding producer & consumer properties at the connector level
Shikhar Bhushan created KAFKA-4159: -- Summary: Allow overriding producer & consumer properties at the connector level Key: KAFKA-4159 URL: https://issues.apache.org/jira/browse/KAFKA-4159 Project: Kafka Issue Type: New Feature Components: KafkaConnect Reporter: Shikhar Bhushan Assignee: Ewen Cheslack-Postava As an example use case: overriding a sink connector's consumer's partition assignment strategy. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4161) Allow connectors to request flush via the context
[ https://issues.apache.org/jira/browse/KAFKA-4161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15491296#comment-15491296 ] Shikhar Bhushan commented on KAFKA-4161: bq. Probably worth clarifying whether we're really talking about just flush here or offset commit as well. Flush really only exists in order to support offset commit (from the framework's perspective), but since you mention full buffers I think you might be getting at a slightly different use case for connectors. Sorry I wasn't clear: flushing data & offset commit are currently coupled, as you pointed out. If we want to avoid unnecessary redelivery of records it is best to commit offsets with the 'most current' knowledge of them, which we currently have after calling {{flush()}}. bq. In general, I think it'd actually be even better to just get rid of the idea of having to flush as a common operation as it hurts throughput to have to flush entirely to commit offsets (we are flushing the pipeline, which is never good). Ideally we could do what the framework does with source connectors and just track which data has been successfully delivered and use that for the majority of offset commits. We'd still need it for cases like shutdown where we want to make sure all data has been sent, but since the framework controls delivery of data, maybe it's even better just to wait for that data to be written. Good points, I agree it would be better to make it so {{flush()}} is not routine since it can hurt throughput. I think we can deprecate it altogether. As a proposal: {noformat}
abstract class SinkTask {
    ..
    // New method
    public Map<TopicPartition, OffsetAndMetadata> flushedOffsets() {
        throw new NotImplementedException();
    }

    @Deprecated
    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }
    ..
}
{noformat} Then the periodic offset-commit logic would use {{flushedOffsets()}}, and if that is not implemented, fall back to calling {{flush()}} as currently, so it can commit the offset state as of the last {{put()}} call.
I don't think {{flush()}} is needed even at shutdown. Tasks are already being advised via {{close()}} and can choose to flush any buffered data from there. We can do a final offset commit based on the {{flushedOffsets()}} after {{close()}} (though this does imply a quirk that even after a {{TopicPartition}} is closed we expect tasks to keep offset state around in the map returned by {{flushedOffsets()}}). Additionally, it would be good to have a {{context.requestCommit()}} in the spirit of {{context.requestFlush()}} as I was originally proposing. The motivation is that connectors can optimize for avoiding unnecessary redelivery when recovering from failures. Connectors can choose whatever policies are best, like number-of-records or size-based batching/buffering for writing to the destination system as part of the normal flow of calls to {{put()}}, and request a commit when they have actually written data to the destination system. There need not be a strong guarantee that offset committing actually happens after such a request, so that we don't commit offsets too often and can choose to only do it after some minimum interval, e.g. in case a connector always requests a commit after a put. bq. The main reason I think we even need the explicit flush() is that some connectors may have very long delays between flushes (e.g. any object stores like S3) such that they need to be told directly that they need to write all their data (or discard it). I don't believe it is currently possible for a connector to communicate that it wants to discard data rather than write it out when {{flush()}} is called (aside from, I guess, throwing an exception...). With the above proposal the decision of when and whether or not to write data would be completely up to connectors. bq. Was there a specific connector & scenario you were thinking about here? 
This came up in a thread on the user list ('Sink Connector feature request: SinkTask.putAndReport()') > Allow connectors to request flush via the context > - > > Key: KAFKA-4161 > URL: https://issues.apache.org/jira/browse/KAFKA-4161 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Reporter: Shikhar Bhushan >Assignee: Ewen Cheslack-Postava > Labels: needs-kip > > It is desirable to have, in addition to the time-based flush interval, volume > or size-based commits. E.g. a sink connector which is buffering in terms of > number of records may want to request a flush when the buffer is full, or > when sufficient amount of data has been buffered in a file. > Having a method like say {{requestFlush()}} on the {{SinkTaskContext}} would > allow for connectors to have flexible policies around flushes. This would be >
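The {{flushedOffsets()}} idea sketched in the comment above could look roughly like this inside a task implementation. This is illustrative only: the topic-partition is modeled as a String and the offset as a long, and none of these names are real Connect API:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a sink task tracking "flushed" offsets as it writes, so the
// framework could commit without forcing a pipeline flush (illustrative
// stand-in; not Kafka Connect's actual SinkTask API).
public class FlushedOffsetsDemo {
    private final Map<String, Long> flushed = new HashMap<>();

    // Called from put(): write the record to the destination system, then
    // record the next offset to consume for its topic-partition.
    void putAndWrite(String topicPartition, long offset) {
        // ... actual write to the destination system would happen here ...
        flushed.put(topicPartition, offset + 1); // commit offset = last written + 1
    }

    // What the framework's periodic commit would read instead of calling flush().
    Map<String, Long> flushedOffsets() {
        return new HashMap<>(flushed);
    }
}
```

The key property is that the returned map only ever reflects records the task has actually written, so committing from it never risks losing data and avoids the throughput cost of a forced flush.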
[jira] [Created] (KAFKA-4173) SchemaProjector should successfully project when source schema field is missing and target schema field is optional
Shikhar Bhushan created KAFKA-4173: -- Summary: SchemaProjector should successfully project when source schema field is missing and target schema field is optional Key: KAFKA-4173 URL: https://issues.apache.org/jira/browse/KAFKA-4173 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Shikhar Bhushan Assignee: Shikhar Bhushan As reported in https://github.com/confluentinc/kafka-connect-hdfs/issues/115 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-4161) Allow connectors to request flush via the context
Shikhar Bhushan created KAFKA-4161: -- Summary: Allow connectors to request flush via the context Key: KAFKA-4161 URL: https://issues.apache.org/jira/browse/KAFKA-4161 Project: Kafka Issue Type: New Feature Components: KafkaConnect Reporter: Shikhar Bhushan Assignee: Ewen Cheslack-Postava It is desirable to have, in addition to the time-based flush interval, volume or size-based commits. E.g. a sink connector which is buffering in terms of number of records may want to request a flush when the buffer is full, or when a sufficient amount of data has been buffered in a file. Having a method like, say, {{requestFlush()}} on the {{SinkTaskContext}} would allow for connectors to have flexible policies around flushes. This would be in addition to the time-interval-based flushes that are controlled with {{offset.flush.interval.ms}}, for which the clock should be reset when any kind of flush happens. We should probably also support requesting flushes via the {{SourceTaskContext}} for consistency, though a use case doesn't come to mind off the bat. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-4154) Kafka Connect fails to shutdown if it has not completed startup
[ https://issues.apache.org/jira/browse/KAFKA-4154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shikhar Bhushan updated KAFKA-4154: --- Fix Version/s: (was: 0.10.0.2) > Kafka Connect fails to shutdown if it has not completed startup > --- > > Key: KAFKA-4154 > URL: https://issues.apache.org/jira/browse/KAFKA-4154 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Shikhar Bhushan >Assignee: Shikhar Bhushan > > To reproduce: > 1. Start Kafka Connect in distributed mode without Kafka running > {{./bin/connect-distributed.sh config/connect-distributed.properties}} > 2. Ctrl+C fails to terminate the process > thread dump: > {noformat} > Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.92-b14 mixed mode): > "Thread-1" #13 prio=5 os_prio=31 tid=0x7fc29a18a800 nid=0x7007 waiting on > condition [0x73129000] >java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x0007bd7d91d8> (a > java.util.concurrent.CountDownLatch$Sync) > at > java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at > java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.stop(DistributedHerder.java:357) > at > org.apache.kafka.connect.runtime.Connect.stop(Connect.java:71) > at > org.apache.kafka.connect.runtime.Connect$ShutdownHook.run(Connect.java:93) > "SIGINT handler" #27 daemon prio=9 os_prio=31 tid=0x7fc29aa6a000 > nid=0x560f in Object.wait() [0x71a61000] >java.lang.Thread.State: WAITING (on object monitor) > at 
java.lang.Object.wait(Native Method) > - waiting on <0x0007bd63db38> (a > org.apache.kafka.connect.runtime.Connect$ShutdownHook) > at java.lang.Thread.join(Thread.java:1245) > - locked <0x0007bd63db38> (a > org.apache.kafka.connect.runtime.Connect$ShutdownHook) > at java.lang.Thread.join(Thread.java:1319) > at > java.lang.ApplicationShutdownHooks.runHooks(ApplicationShutdownHooks.java:106) > at > java.lang.ApplicationShutdownHooks$1.run(ApplicationShutdownHooks.java:46) > at java.lang.Shutdown.runHooks(Shutdown.java:123) > at java.lang.Shutdown.sequence(Shutdown.java:167) > at java.lang.Shutdown.exit(Shutdown.java:212) > - locked <0x0007b0244600> (a java.lang.Class for > java.lang.Shutdown) > at java.lang.Terminator$1.handle(Terminator.java:52) > at sun.misc.Signal$1.run(Signal.java:212) > at java.lang.Thread.run(Thread.java:745) > "kafka-producer-network-thread | producer-1" #15 daemon prio=5 os_prio=31 > tid=0x7fc29a0b7000 nid=0x7a03 runnable [0x72608000] >java.lang.Thread.State: RUNNABLE > at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method) > at > sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198) > at > sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117) > at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) > - locked <0x0007bd7788d8> (a sun.nio.ch.Util$2) > - locked <0x0007bd7788e8> (a > java.util.Collections$UnmodifiableSet) > - locked <0x0007bd77> (a sun.nio.ch.KQueueSelectorImpl) > at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) > at > org.apache.kafka.common.network.Selector.select(Selector.java:470) > at > org.apache.kafka.common.network.Selector.poll(Selector.java:286) > at > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134) > at java.lang.Thread.run(Thread.java:745) > "DistributedHerder" #14 prio=5 os_prio=31 tid=0x7fc29a11e000 
nid=0x7803 > waiting on condition [0x72505000] >java.lang.Thread.State: TIMED_WAITING (sleeping) > at java.lang.Thread.sleep(Native Method) >
[jira] [Updated] (KAFKA-4154) Kafka Connect fails to shutdown if it has not completed startup
[ https://issues.apache.org/jira/browse/KAFKA-4154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shikhar Bhushan updated KAFKA-4154: --- Fix Version/s: (was: 0.10.1.0) 0.10.0.2 > Kafka Connect fails to shutdown if it has not completed startup >
[jira] [Created] (KAFKA-4154) Kafka Connect fails to shutdown if it has not completed startup
Shikhar Bhushan created KAFKA-4154: -- Summary: Kafka Connect fails to shutdown if it has not completed startup Key: KAFKA-4154 URL: https://issues.apache.org/jira/browse/KAFKA-4154 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Shikhar Bhushan Assignee: Shikhar Bhushan Fix For: 0.10.1.0 To reproduce: 1. Start Kafka Connect in distributed mode without Kafka running {{./bin/connect-distributed.sh config/connect-distributed.properties}} 2. Ctrl+C fails to terminate the process thread dump: {noformat} Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.92-b14 mixed mode): "Thread-1" #13 prio=5 os_prio=31 tid=0x7fc29a18a800 nid=0x7007 waiting on condition [0x73129000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x0007bd7d91d8> (a java.util.concurrent.CountDownLatch$Sync) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) at org.apache.kafka.connect.runtime.distributed.DistributedHerder.stop(DistributedHerder.java:357) at org.apache.kafka.connect.runtime.Connect.stop(Connect.java:71) at org.apache.kafka.connect.runtime.Connect$ShutdownHook.run(Connect.java:93) "SIGINT handler" #27 daemon prio=9 os_prio=31 tid=0x7fc29aa6a000 nid=0x560f in Object.wait() [0x71a61000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method) - waiting on <0x0007bd63db38> (a org.apache.kafka.connect.runtime.Connect$ShutdownHook) at java.lang.Thread.join(Thread.java:1245) - locked <0x0007bd63db38> (a 
org.apache.kafka.connect.runtime.Connect$ShutdownHook) at java.lang.Thread.join(Thread.java:1319) at java.lang.ApplicationShutdownHooks.runHooks(ApplicationShutdownHooks.java:106) at java.lang.ApplicationShutdownHooks$1.run(ApplicationShutdownHooks.java:46) at java.lang.Shutdown.runHooks(Shutdown.java:123) at java.lang.Shutdown.sequence(Shutdown.java:167) at java.lang.Shutdown.exit(Shutdown.java:212) - locked <0x0007b0244600> (a java.lang.Class for java.lang.Shutdown) at java.lang.Terminator$1.handle(Terminator.java:52) at sun.misc.Signal$1.run(Signal.java:212) at java.lang.Thread.run(Thread.java:745) "kafka-producer-network-thread | producer-1" #15 daemon prio=5 os_prio=31 tid=0x7fc29a0b7000 nid=0x7a03 runnable [0x72608000] java.lang.Thread.State: RUNNABLE at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method) at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198) at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117) at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) - locked <0x0007bd7788d8> (a sun.nio.ch.Util$2) - locked <0x0007bd7788e8> (a java.util.Collections$UnmodifiableSet) - locked <0x0007bd77> (a sun.nio.ch.KQueueSelectorImpl) at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) at org.apache.kafka.common.network.Selector.select(Selector.java:470) at org.apache.kafka.common.network.Selector.poll(Selector.java:286) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134) at java.lang.Thread.run(Thread.java:745) "DistributedHerder" #14 prio=5 os_prio=31 tid=0x7fc29a11e000 nid=0x7803 waiting on condition [0x72505000] java.lang.Thread.State: TIMED_WAITING (sleeping) at java.lang.Thread.sleep(Native Method) at org.apache.kafka.common.utils.SystemTime.sleep(SystemTime.java:37) at 
org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:299) at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1310) at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:131) at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:86) at org.apache.kafka.connect.runtime.Worker.start(Worker.java:115) at
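The hang above comes from {{DistributedHerder.stop()}} blocking indefinitely on a {{CountDownLatch}} that is only counted down after startup completes. A minimal, hypothetical sketch (names are illustrative, not Connect's actual API) of bounding that wait so a shutdown hook cannot block forever:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: wait on a stop latch with a timeout so a shutdown hook
// cannot hang forever when startup never completed and the latch is never
// counted down.
public class BoundedShutdown {
    public static boolean awaitStop(CountDownLatch stopLatch, long timeoutMs) {
        try {
            // Returns false on timeout instead of blocking indefinitely.
            return stopLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // preserve interruption status
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch neverCounted = new CountDownLatch(1);
        // Startup never finished, so the latch stays at 1: the bounded wait
        // returns false and a forced shutdown can proceed.
        assert !awaitStop(neverCounted, 50);
    }
}
```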
[jira] [Reopened] (KAFKA-4183) Logical converters in JsonConverter don't properly handle null values
[ https://issues.apache.org/jira/browse/KAFKA-4183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shikhar Bhushan reopened KAFKA-4183: Assignee: Shikhar Bhushan (was: Ewen Cheslack-Postava) [~rhauch] Reopening this, I noticed an issue with handling default values. E.g. this test {noformat} @Test public void timestampToConnectDefval() { Schema schema = Timestamp.builder().defaultValue(new java.util.Date(42)).schema(); String msg = "{ \"schema\": { \"type\": \"int64\", \"name\": \"org.apache.kafka.connect.data.Timestamp\", \"version\": 1, \"default\": 42 }, \"payload\": null }"; SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, msg.getBytes()); assertEquals(schema, schemaAndValue.schema()); assertEquals(new java.util.Date(42), schemaAndValue.value()); } {noformat} Happy to create a followup PR since I'm poking around with it > Logical converters in JsonConverter don't properly handle null values > - > > Key: KAFKA-4183 > URL: https://issues.apache.org/jira/browse/KAFKA-4183 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.10.0.1 >Reporter: Randall Hauch >Assignee: Shikhar Bhushan > Fix For: 0.10.1.0 > > > The {{JsonConverter.TO_CONNECT_LOGICAL_CONVERTERS}} map contains > {{LogicalTypeConverter}} implementations to convert from the raw value into > the corresponding logical type value, and they are used during > deserialization of message keys and/or values. However, these implementations > do not handle the case when the input raw value is null, which can happen > when a key or value has a schema that is or contains a field that is > _optional_. > Consider a Kafka Connect schema of type STRUCT that contains a field "date" > with an optional schema of type {{org.apache.kafka.connect.data.Date}}. When > the key or value with this schema contains a null "date" field and is > serialized, the logical serializer properly will serialize the null value. 
> However, upon _deserialization_, the > {{JsonConverter.TO_CONNECT_LOGICAL_CONVERTERS}} are used to convert the > literal value (which is null) to a logical value. All of the > {{JsonConverter.TO_CONNECT_LOGICAL_CONVERTERS}} implementations will throw a > NullPointerException when the input value is null. > For example: > {code:java} > java.lang.NullPointerException > at > org.apache.kafka.connect.json.JsonConverter$14.convert(JsonConverter.java:224) > at > org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:731) > at > org.apache.kafka.connect.json.JsonConverter.access$100(JsonConverter.java:53) > at > org.apache.kafka.connect.json.JsonConverter$12.convert(JsonConverter.java:200) > at > org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:727) > at > org.apache.kafka.connect.json.JsonConverter.jsonToConnect(JsonConverter.java:354) > at > org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:343) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
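The converter-layer fix discussed here amounts to a single null check before delegating to any logical type converter, so individual converter implementations never see a null raw value. A hedged sketch, using {{java.util.function.Function}} as a stand-in for Connect's {{LogicalTypeConverter}} interface:

```java
import java.util.Date;
import java.util.function.Function;

// Sketch of null-safe conversion at the converter layer: check for null once
// before delegating, so no LogicalTypeConverter implementation needs its own
// null handling. Function<Object, Object> stands in for the real interface.
public class NullSafeConvert {
    public static Object toLogical(Object raw, Function<Object, Object> converter) {
        if (raw == null) {
            return null;  // optional field with no value: pass null through
        }
        return converter.apply(raw);
    }

    public static void main(String[] args) {
        Function<Object, Object> timestamp = v -> new Date(((Number) v).longValue());
        assert toLogical(null, timestamp) == null;  // no NPE for a null raw value
        assert toLogical(42L, timestamp).equals(new Date(42L));
    }
}
```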
[jira] [Resolved] (KAFKA-3906) Connect logical types do not support nulls.
[ https://issues.apache.org/jira/browse/KAFKA-3906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shikhar Bhushan resolved KAFKA-3906. Resolution: Fixed I think we should handle null values at the converter layer to avoid duplication in logical type impls, as [~ewencp] suggested in the PR. KAFKA-4183 fixes the null handling for logical types in {{JsonConverter}}. > Connect logical types do not support nulls. > --- > > Key: KAFKA-3906 > URL: https://issues.apache.org/jira/browse/KAFKA-3906 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.10.0.0 >Reporter: Jeremy Custenborder >Assignee: Ewen Cheslack-Postava > > The logical types for Kafka Connect do not support null data values. Date, > Decimal, Time, and Timestamp all will throw null reference exceptions if a > null is passed in to their fromLogical and toLogical methods. Date, Time, and > Timestamp require signature changes for these methods to support nullable > types. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3906) Connect logical types do not support nulls.
[ https://issues.apache.org/jira/browse/KAFKA-3906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15497533#comment-15497533 ] Shikhar Bhushan commented on KAFKA-3906: [~jcustenborder] did this come up in the context of {{JsonConverter}}, and if so can it be closed since KAFKA-4183 patched that? > Connect logical types do not support nulls. > --- > > Key: KAFKA-3906 > URL: https://issues.apache.org/jira/browse/KAFKA-3906 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.10.0.0 >Reporter: Jeremy Custenborder >Assignee: Ewen Cheslack-Postava > > The logical types for Kafka Connect do not support null data values. Date, > Decimal, Time, and Timestamp all will throw null reference exceptions if a > null is passed in to their fromLogical and toLogical methods. Date, Time, and > Timestamp require signature changes for these methods to support nullable > types. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4161) Allow connectors to request flush via the context
[ https://issues.apache.org/jira/browse/KAFKA-4161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15496820#comment-15496820 ] Shikhar Bhushan commented on KAFKA-4161: We could also implement KAFKA-3462 here by having the semantics that connectors that want to disable offset tracking by Connect can return an empty map from {{flushedOffsets()}}. Maybe {{flushedOffsets()}} isn't the best name - really want a name implying {{commitableOffsets()}}. > Allow connectors to request flush via the context > - > > Key: KAFKA-4161 > URL: https://issues.apache.org/jira/browse/KAFKA-4161 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Reporter: Shikhar Bhushan >Assignee: Ewen Cheslack-Postava > Labels: needs-kip > > It is desirable to have, in addition to the time-based flush interval, volume > or size-based commits. E.g. a sink connector which is buffering in terms of > number of records may want to request a flush when the buffer is full, or > when sufficient amount of data has been buffered in a file. > Having a method like say {{requestFlush()}} on the {{SinkTaskContext}} would > allow for connectors to have flexible policies around flushes. This would be > in addition to the time interval based flushes that are controlled with > {{offset.flush.interval.ms}}, for which the clock should be reset when any > kind of flush happens. > We should probably also support requesting flushes via the > {{SourceTaskContext}} for consistency though a use-case doesn't come to mind > off the bat. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-4161) Decouple flush and offset commits
[ https://issues.apache.org/jira/browse/KAFKA-4161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shikhar Bhushan updated KAFKA-4161: --- Summary: Decouple flush and offset commits (was: Allow connectors to request flush via the context) > Decouple flush and offset commits > - > > Key: KAFKA-4161 > URL: https://issues.apache.org/jira/browse/KAFKA-4161 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Reporter: Shikhar Bhushan >Assignee: Ewen Cheslack-Postava > Labels: needs-kip > > It is desirable to have, in addition to the time-based flush interval, volume > or size-based commits. E.g. a sink connector which is buffering in terms of > number of records may want to request a flush when the buffer is full, or > when sufficient amount of data has been buffered in a file. > Having a method like say {{requestFlush()}} on the {{SinkTaskContext}} would > allow for connectors to have flexible policies around flushes. This would be > in addition to the time interval based flushes that are controlled with > {{offset.flush.interval.ms}}, for which the clock should be reset when any > kind of flush happens. > We should probably also support requesting flushes via the > {{SourceTaskContext}} for consistency though a use-case doesn't come to mind > off the bat. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-4342) Kafka-connect- support tinyint values
[ https://issues.apache.org/jira/browse/KAFKA-4342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shikhar Bhushan resolved KAFKA-4342. Resolution: Not A Problem The Connect schema type {{Schema.Type.INT8}} accurately maps to a signed Java {{byte}}. Given the absence of unsigned types in Java, I think we just have to live with that... We can followup on the JDBC connector issue you created https://github.com/confluentinc/kafka-connect-jdbc/pull/152 > Kafka-connect- support tinyint values > - > > Key: KAFKA-4342 > URL: https://issues.apache.org/jira/browse/KAFKA-4342 > Project: Kafka > Issue Type: Bug >Reporter: Sagar Rao > > We have been using Kafka-connect-jdbc actively for one of our projects and > one of the issues that we have noticed is the way it handles the tinyint > values. > Our database is on mysql and mysql allows both signed and unsigned values to > be stored. So, it can have values going upto 255 but when kafka-connect sees > values beyond 128, it fails. > Reason being, in the ConnectSchema class, the INT8 maps to a Byte which is a > signed value. If we look at the jdbc docs then this is what they say about > handling tinyint values: > https://docs.oracle.com/javase/6/docs/technotes/guides/jdbc/getstart/mapping.html > 8.3.4 TINYINT > The JDBC type TINYINT represents an 8-bit integer value between 0 and 255 > that may be signed or unsigned. > The corresponding SQL type, TINYINT, is currently supported by only a subset > of the major databases. Portable code may therefore prefer to use the JDBC > SMALLINT type, which is widely supported. > The recommended Java mapping for the JDBC TINYINT type is as either a Java > byte or a Java short. The 8-bit Java byte type represents a signed value from > -128 to 127, so it may not always be appropriate for larger TINYINT values, > whereas the 16-bit Java short will always be able to hold all TINYINT values. > I had submitted a PR for this last week. 
But it failed in the jenkins build > for unrelated test case. So, if someone can take a look at this or suggest > something then it would be great: > https://github.com/apache/kafka/pull/2044 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
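The signedness issue behind the resolution is easy to demonstrate: Java's {{byte}} wraps 200 to -56, and widening with {{& 0xFF}} recovers the unsigned value, which is why the JDBC docs recommend the 16-bit {{short}} as the always-safe mapping:

```java
// Why INT8 cannot represent unsigned TINYINT values directly: Java's byte is
// signed, so 200 wraps to -56. Masking with 0xFF widens to an int holding the
// unsigned value, which always fits in a 16-bit short.
public class TinyintDemo {
    public static int toUnsigned(byte b) {
        return b & 0xFF;  // widen to int, keeping the low 8 bits unsigned
    }

    public static void main(String[] args) {
        byte b = (byte) 200;
        assert b == -56;              // signed interpretation of the same bits
        assert toUnsigned(b) == 200;  // unsigned value recovered
    }
}
```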
[jira] [Created] (KAFKA-4356) o.a.k.common.utils.SystemTime.sleep() swallows InterruptedException
Shikhar Bhushan created KAFKA-4356: -- Summary: o.a.k.common.utils.SystemTime.sleep() swallows InterruptedException Key: KAFKA-4356 URL: https://issues.apache.org/jira/browse/KAFKA-4356 Project: Kafka Issue Type: Bug Reporter: Shikhar Bhushan Priority: Minor {{org.apache.kafka.common.utils.SystemTime.sleep()}} catches and ignores {{InterruptedException}}. When doing so, the interruption state should normally still be restored with {{Thread.currentThread().interrupt()}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
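The fix suggested in the report is the standard pattern of restoring the interrupt flag rather than swallowing it:

```java
// Interrupt-preserving sleep: if interrupted, restore the thread's interrupt
// flag so callers (e.g. a shutdown sequence) can still observe the interrupt.
public class InterruptibleSleep {
    public static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // restore interruption status
        }
    }

    public static void main(String[] args) {
        Thread.currentThread().interrupt();
        sleep(10);                    // returns immediately; does not swallow
        assert Thread.interrupted();  // the flag was restored, not lost
    }
}
```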
[jira] [Assigned] (KAFKA-3910) Cyclic schema support in ConnectSchema and SchemaBuilder
[ https://issues.apache.org/jira/browse/KAFKA-3910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shikhar Bhushan reassigned KAFKA-3910: -- Assignee: Shikhar Bhushan (was: Ewen Cheslack-Postava) > Cyclic schema support in ConnectSchema and SchemaBuilder > > > Key: KAFKA-3910 > URL: https://issues.apache.org/jira/browse/KAFKA-3910 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 0.10.0.0 >Reporter: John Hofman >Assignee: Shikhar Bhushan > > Cyclic schema's are not supported by ConnectSchema or SchemaBuilder. > Subsequently the AvroConverter (confluentinc/schema-registry) hits a stack > overflow when converting a cyclic avro schema, e.g: > {code} > {"type":"record", > "name":"list","fields":[{"name":"value","type":"int"},{"name":"next","type":["null","list"]}]} > {code} > This is a blocking issue for all connectors running on the connect framework > with data containing cyclic references. The AvroConverter cannot support > cyclic schema's until the underlying ConnectSchema and SchemaBuilder do. 
> To reproduce the stack-overflow (Confluent-3.0.0): > Produce some cyclic data: > {code} > bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test > --property value.schema='{"type":"record", > "name":"list","fields":[{"name":"value","type":"int"},{"name":"next","type":["null","list"]}]}' > {"value":1,"next":null} > {"value":1,"next":{"list":{"value":2,"next":null}}} > {code} > Then try to consume it with connect: > {code:title=connect-console-sink.properties} > name=local-console-sink > connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector > tasks.max=1 > topics=test > {code} > {code} > ./bin/connect-standalone > ./etc/schema-registry/connect-avro-standalone.properties > connect-console-sink.properties > … start up logging … > java.lang.StackOverflowError > at org.apache.avro.JsonProperties.getJsonProp(JsonProperties.java:54) > at org.apache.avro.JsonProperties.getProp(JsonProperties.java:45) > at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1055) > at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1103) > at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1137) > at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1103) > at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1137) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
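A cycle-aware traversal has to track nodes already seen by identity; the sketch below (illustrative only, not the {{ConnectSchema}} API) shows how an {{IdentityHashMap}} turns the unbounded recursion behind the {{StackOverflowError}} above into a terminating walk:

```java
import java.util.IdentityHashMap;
import java.util.Map;

// Hypothetical sketch of cycle-aware traversal: remember nodes already visited
// (by identity, not equals), so a self-referential structure like the "list"
// schema above terminates instead of recursing until stack overflow.
public class CycleAwareWalk {
    public static class Node {
        public String name;
        public Node next;
    }

    public static int depth(Node n, Map<Node, Boolean> seen) {
        if (n == null || seen.containsKey(n)) {
            return 0;  // stop at the end of the chain or on a cycle
        }
        seen.put(n, Boolean.TRUE);
        return 1 + depth(n.next, seen);
    }

    public static void main(String[] args) {
        Node a = new Node();
        Node b = new Node();
        a.next = b;
        b.next = a;  // cycle: a -> b -> a
        assert depth(a, new IdentityHashMap<>()) == 2;  // terminates
    }
}
```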
[jira] [Work started] (KAFKA-4161) Decouple flush and offset commits
[ https://issues.apache.org/jira/browse/KAFKA-4161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-4161 started by Shikhar Bhushan. -- > Decouple flush and offset commits > - > > Key: KAFKA-4161 > URL: https://issues.apache.org/jira/browse/KAFKA-4161 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Shikhar Bhushan >Assignee: Shikhar Bhushan > Labels: needs-kip > > It is desirable to have, in addition to the time-based flush interval, volume > or size-based commits. E.g. a sink connector which is buffering in terms of > number of records may want to request a flush when the buffer is full, or > when sufficient amount of data has been buffered in a file. > Having a method like say {{requestFlush()}} on the {{SinkTaskContext}} would > allow for connectors to have flexible policies around flushes. This would be > in addition to the time interval based flushes that are controlled with > {{offset.flush.interval.ms}}, for which the clock should be reset when any > kind of flush happens. > We should probably also support requesting flushes via the > {{SourceTaskContext}} for consistency though a use-case doesn't come to mind > off the bat. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4161) Decouple flush and offset commits
[ https://issues.apache.org/jira/browse/KAFKA-4161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15637718#comment-15637718 ] Shikhar Bhushan commented on KAFKA-4161: Created KIP-89 for this https://cwiki.apache.org/confluence/display/KAFKA/KIP-89%3A+Allow+sink+connectors+to+decouple+flush+and+offset+commit > Decouple flush and offset commits > - > > Key: KAFKA-4161 > URL: https://issues.apache.org/jira/browse/KAFKA-4161 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Shikhar Bhushan >Assignee: Shikhar Bhushan > Labels: needs-kip > > It is desirable to have, in addition to the time-based flush interval, volume > or size-based commits. E.g. a sink connector which is buffering in terms of > number of records may want to request a flush when the buffer is full, or > when sufficient amount of data has been buffered in a file. > Having a method like say {{requestFlush()}} on the {{SinkTaskContext}} would > allow for connectors to have flexible policies around flushes. This would be > in addition to the time interval based flushes that are controlled with > {{offset.flush.interval.ms}}, for which the clock should be reset when any > kind of flush happens. > We should probably also support requesting flushes via the > {{SourceTaskContext}} for consistency though a use-case doesn't come to mind > off the bat. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-4161) Decouple flush and offset commits
[ https://issues.apache.org/jira/browse/KAFKA-4161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shikhar Bhushan updated KAFKA-4161: --- Issue Type: Improvement (was: New Feature) > Decouple flush and offset commits > - > > Key: KAFKA-4161 > URL: https://issues.apache.org/jira/browse/KAFKA-4161 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Shikhar Bhushan >Assignee: Ewen Cheslack-Postava > Labels: needs-kip > > It is desirable to have, in addition to the time-based flush interval, volume > or size-based commits. E.g. a sink connector which is buffering in terms of > number of records may want to request a flush when the buffer is full, or > when sufficient amount of data has been buffered in a file. > Having a method like say {{requestFlush()}} on the {{SinkTaskContext}} would > allow for connectors to have flexible policies around flushes. This would be > in addition to the time interval based flushes that are controlled with > {{offset.flush.interval.ms}}, for which the clock should be reset when any > kind of flush happens. > We should probably also support requesting flushes via the > {{SourceTaskContext}} for consistency though a use-case doesn't come to mind > off the bat. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3462) Allow SinkTasks to disable consumer offset commit
[ https://issues.apache.org/jira/browse/KAFKA-3462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shikhar Bhushan updated KAFKA-3462: --- Issue Type: Improvement (was: Bug) > Allow SinkTasks to disable consumer offset commit > -- > > Key: KAFKA-3462 > URL: https://issues.apache.org/jira/browse/KAFKA-3462 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 0.10.1.0 >Reporter: Liquan Pei >Assignee: Liquan Pei >Priority: Minor > Fix For: 0.10.2.0 > > Original Estimate: 48h > Remaining Estimate: 48h > > SinkTasks should be able to disable consumer offset commit if they manage > offsets in the sink data store rather than using Kafka consumer offsets. For > example, an HDFS connector might record offsets in HDFS to provide exactly > once delivery. When the SinkTask is started or a rebalance occurs, the task > would reload offsets from HDFS. In this case, disabling consumer offset > commit will save some CPU cycles and network IOs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4306) Connect workers won't shut down if brokers are not available
[ https://issues.apache.org/jira/browse/KAFKA-4306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15586568#comment-15586568 ] Shikhar Bhushan commented on KAFKA-4306: KAFKA-4154 is another issue relating to broker-unavailability preventing shutdown, but I believe there it's because it failed to finish startup in the first place. > Connect workers won't shut down if brokers are not available > > > Key: KAFKA-4306 > URL: https://issues.apache.org/jira/browse/KAFKA-4306 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.10.1.0 >Reporter: Gwen Shapira >Assignee: Ewen Cheslack-Postava > > If brokers are not available and we try to shut down connect workers, sink > connectors will be stuck in a loop retrying to commit offsets: > 2016-10-17 09:39:14,907] INFO Marking the coordinator 192.168.1.9:9092 (id: > 2147483647 rack: null) dead for group connect-dump-kafka-config1 > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:600) > [2016-10-17 09:39:14,907] ERROR Commit of > WorkerSinkTask{id=dump-kafka-config1-0} offsets threw an unexpected > exception: (org.apache.kafka.connect.runtime.WorkerSinkTask:194) > org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset > commit failed with a retriable exception. You should retry committing offsets. > Caused by: > org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException > We should probably limit the number of retries before doing "unclean" > shutdown. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3462) Allow SinkTasks to disable consumer offset commit
[ https://issues.apache.org/jira/browse/KAFKA-3462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shikhar Bhushan updated KAFKA-3462: --- Resolution: Fixed Status: Resolved (was: Patch Available) This will be handled with KIP-89 / KAFKA-4161. Tasks that wish to disable framework-managed offset commits can return an empty map from {{SinkTask.preCommit()}} to make it a no-op. > Allow SinkTasks to disable consumer offset commit > -- > > Key: KAFKA-3462 > URL: https://issues.apache.org/jira/browse/KAFKA-3462 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 0.10.1.0 >Reporter: Liquan Pei >Assignee: Liquan Pei >Priority: Minor > Fix For: 0.10.2.0 > > Original Estimate: 48h > Remaining Estimate: 48h > > SinkTasks should be able to disable consumer offset commit if they manage > offsets in the sink data store rather than using Kafka consumer offsets. For > example, an HDFS connector might record offsets in HDFS to provide exactly > once delivery. When the SinkTask is started or a rebalance occurs, the task > would reload offsets from HDFS. In this case, disabling consumer offset > commit will save some CPU cycles and network IOs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
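As a simplified illustration of that contract, where returning an empty map tells the framework to commit nothing (plain {{String}}/{{Long}} maps stand in for Connect's {{TopicPartition}}/{{OffsetAndMetadata}} types; this is a sketch of the semantics, not the real API):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Sketch of the preCommit contract: a task that manages offsets in the sink
// store returns an empty map, making the framework's offset commit a no-op.
// String/Long stand in for TopicPartition/OffsetAndMetadata.
public class PreCommitSketch {
    public static Map<String, Long> preCommit(Map<String, Long> currentOffsets,
                                              boolean selfManagedOffsets) {
        // Empty map => framework commits nothing; otherwise commit as usual.
        return selfManagedOffsets ? Collections.emptyMap() : currentOffsets;
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("test-0", 42L);
        assert preCommit(offsets, true).isEmpty();          // commit disabled
        assert preCommit(offsets, false).get("test-0") == 42L;
    }
}
```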
[jira] [Commented] (KAFKA-4375) Kafka consumer may swallow some interrupts meant for the calling thread
[ https://issues.apache.org/jira/browse/KAFKA-4375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15634044#comment-15634044 ] Shikhar Bhushan commented on KAFKA-4375: Good to have a report of this being a problem, I opened KAFKA-4356 recently after chancing on the code. It seems like an oversight rather than by design. > Kafka consumer may swallow some interrupts meant for the calling thread > --- > > Key: KAFKA-4375 > URL: https://issues.apache.org/jira/browse/KAFKA-4375 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Stig Rohde Døssing > > Apache Storm has added a new data source ("spout") based on the Kafka 0.9 > consumer. Storm interacts with the consumer by having one thread per spout > instance loop calls to poll/commitSync etc. When Storm shuts down, another > thread indicates that the looping threads should shut down by interrupting > them, and joining them. > If one of the looping threads happen to be interrupted while executing > certain sleeps in some consumer methods (commitSync and committed at least), > the interrupt can be lost because they contain a call to SystemTime.sleep, > which swallows the interrupt. > Is this behavior by design, or can SystemTime be changed to reset the thread > interrupt flag when catching an InterruptedException? > I haven't checked the rest of the client code, so it's possible that this is > an issue in other parts of the code too. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-4356) o.a.k.common.utils.SystemTime.sleep() swallows InterruptedException
[ https://issues.apache.org/jira/browse/KAFKA-4356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shikhar Bhushan resolved KAFKA-4356. Resolution: Duplicate > o.a.k.common.utils.SystemTime.sleep() swallows InterruptedException > --- > > Key: KAFKA-4356 > URL: https://issues.apache.org/jira/browse/KAFKA-4356 > Project: Kafka > Issue Type: Bug >Reporter: Shikhar Bhushan >Priority: Minor > > {{org.apache.kafka.common.utils.SystemTime.sleep()}} catches and ignores > {{InterruptedException}}. When doing so normally the interruption state > should still be restored with {{Thread.currentThread().interrupt()}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3209) Support single message transforms in Kafka Connect
[ https://issues.apache.org/jira/browse/KAFKA-3209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15729737#comment-15729737 ] Shikhar Bhushan commented on KAFKA-3209: [~snisarg] and [~jjchorrobe], I revived the discussion thread and I'd welcome your thoughts on there about this proposal: https://cwiki.apache.org/confluence/display/KAFKA/Connect+Transforms+-+Proposed+Design > Support single message transforms in Kafka Connect > -- > > Key: KAFKA-3209 > URL: https://issues.apache.org/jira/browse/KAFKA-3209 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Neha Narkhede > Labels: needs-kip > Fix For: 0.10.2.0 > > > Users should be able to perform light transformations on messages between a > connector and Kafka. This is needed because some transformations must be > performed before the data hits Kafka (e.g. filtering certain types of events > or PII filtering). It's also useful for very light, single-message > modifications that are easier to perform inline with the data import/export. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3209) Support single message transforms in Kafka Connect
[ https://issues.apache.org/jira/browse/KAFKA-3209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15743300#comment-15743300 ] Shikhar Bhushan commented on KAFKA-3209: Thanks [~snisarg]. I self-assigned it as I don't believe you were actively working on it and the ticket was unassigned, but I'd be happy to collaborate if you have time. Yes, I think we should continue the work by updating KIP-66. I'm working on drafting the proposal into KIP form and I'll send a ML update when it's ready. > Support single message transforms in Kafka Connect > -- > > Key: KAFKA-3209 > URL: https://issues.apache.org/jira/browse/KAFKA-3209 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Neha Narkhede >Assignee: Shikhar Bhushan > Labels: needs-kip > Fix For: 0.10.2.0 > > > Users should be able to perform light transformations on messages between a > connector and Kafka. This is needed because some transformations must be > performed before the data hits Kafka (e.g. filtering certain types of events > or PII filtering). It's also useful for very light, single-message > modifications that are easier to perform inline with the data import/export. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-4524) ConfigDef.Type.LIST does not handle escaping
Shikhar Bhushan created KAFKA-4524: -- Summary: ConfigDef.Type.LIST does not handle escaping Key: KAFKA-4524 URL: https://issues.apache.org/jira/browse/KAFKA-4524 Project: Kafka Issue Type: Bug Reporter: Shikhar Bhushan {{ConfigDef.Type.LIST}} expects a CSV list, but does not handle escaping. It is not possible to provide values containing commas. We should probably adopt the semi-standard way of escaping CSV as in https://tools.ietf.org/html/rfc4180 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
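The RFC 4180 convention referenced above is straightforward to sketch: quote any field containing a delimiter, quote, or newline, and double embedded quote characters:

```java
// Sketch of RFC 4180-style escaping for list values that contain commas:
// wrap the field in double quotes and double any embedded quotes.
public class Rfc4180 {
    public static String quote(String field) {
        if (field.contains(",") || field.contains("\"") || field.contains("\n")) {
            return "\"" + field.replace("\"", "\"\"") + "\"";
        }
        return field;  // no special characters: emit as-is
    }

    public static void main(String[] args) {
        assert quote("plain").equals("plain");
        assert quote("a,b").equals("\"a,b\"");            // comma kept inside quotes
        assert quote("say \"hi\"").equals("\"say \"\"hi\"\"\"");
    }
}
```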
[jira] [Assigned] (KAFKA-3209) Support single message transforms in Kafka Connect
[ https://issues.apache.org/jira/browse/KAFKA-3209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shikhar Bhushan reassigned KAFKA-3209: -- Assignee: Shikhar Bhushan > Support single message transforms in Kafka Connect > -- > > Key: KAFKA-3209 > URL: https://issues.apache.org/jira/browse/KAFKA-3209 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Neha Narkhede >Assignee: Shikhar Bhushan > Labels: needs-kip > Fix For: 0.10.2.0 > > > Users should be able to perform light transformations on messages between a > connector and Kafka. This is needed because some transformations must be > performed before the data hits Kafka (e.g. filtering certain types of events > or PII filtering). It's also useful for very light, single-message > modifications that are easier to perform inline with the data import/export. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4598) Create new SourceTask commit callback method that takes offsets param
[ https://issues.apache.org/jira/browse/KAFKA-4598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15802110#comment-15802110 ] Shikhar Bhushan commented on KAFKA-4598: In the meantime the workaround is to use {{SourceTask.commitRecord(SourceRecord)}} to keep track of committable offset state. > Create new SourceTask commit callback method that takes offsets param > - > > Key: KAFKA-4598 > URL: https://issues.apache.org/jira/browse/KAFKA-4598 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Shikhar Bhushan >Assignee: Ewen Cheslack-Postava > > {{SourceTask.commit()}} can be invoked concurrently with a > {{SourceTask.poll()}} in progress. Thus it is currently not possible to know > what offset state the commit call corresponds to. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
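The workaround can be sketched as keeping an atomically updated committable offset that the {{commitRecord()}} callback advances and {{commit()}} reads, so the commit path sees a consistent value even while {{poll()}} is in progress. The class below is a hypothetical illustration, not Connect API:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the commitRecord-based workaround: commitRecord()
// advances a committable offset as records are acknowledged, and commit()
// reads it, independent of any poll() concurrently producing more records.
public class OffsetTracker {
    private final AtomicLong committable = new AtomicLong(-1L);

    // Analogue of SourceTask.commitRecord(SourceRecord): called per record.
    public void recordAcked(long offset) {
        committable.set(offset);
    }

    // Analogue of SourceTask.commit(): reads a consistent snapshot.
    public long committableOffset() {
        return committable.get();
    }

    public static void main(String[] args) {
        OffsetTracker tracker = new OffsetTracker();
        tracker.recordAcked(5L);
        assert tracker.committableOffset() == 5L;
    }
}
```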
[jira] [Created] (KAFKA-4598) Create new SourceTask commit callback method that takes offsets param
Shikhar Bhushan created KAFKA-4598: -- Summary: Create new SourceTask commit callback method that takes offsets param Key: KAFKA-4598 URL: https://issues.apache.org/jira/browse/KAFKA-4598 Project: Kafka Issue Type: Improvement Components: KafkaConnect Reporter: Shikhar Bhushan Assignee: Ewen Cheslack-Postava {{SourceTask.commit()}} can be invoked concurrently with a {{SourceTask.poll()}} in progress. Thus it is currently not possible to know what offset state the commit call corresponds to. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4598) Create new SourceTask commit callback method that takes offsets param
[ https://issues.apache.org/jira/browse/KAFKA-4598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15805460#comment-15805460 ] Shikhar Bhushan commented on KAFKA-4598: Yeah, that's a reasonable alternative with the caveat you pointed out. > Create new SourceTask commit callback method that takes offsets param > - > > Key: KAFKA-4598 > URL: https://issues.apache.org/jira/browse/KAFKA-4598 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Shikhar Bhushan >Assignee: Ewen Cheslack-Postava > > {{SourceTask.commit()}} can be invoked concurrently with a > {{SourceTask.poll()}} in progress. Thus it is currently not possible to know > what offset state the commit call corresponds to. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3513) Transient failure of OffsetValidationTest
[ https://issues.apache.org/jira/browse/KAFKA-3513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15791790#comment-15791790 ] Shikhar Bhushan commented on KAFKA-3513: There was a failure in the last run http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2017-01-01--001.1483262815--apache--trunk--3d7e884/report.html {noformat} Module: kafkatest.tests.client.consumer_test Class: OffsetValidationTest Method: test_broker_failure Arguments: { "clean_shutdown": true, "enable_autocommit": true } {noformat} > Transient failure of OffsetValidationTest > - > > Key: KAFKA-3513 > URL: https://issues.apache.org/jira/browse/KAFKA-3513 > Project: Kafka > Issue Type: Bug > Components: consumer, system tests >Reporter: Ewen Cheslack-Postava >Assignee: Jason Gustafson > > http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2016-04-05--001.1459840046--apache--trunk--31e263e/report.html > The version of the test fails in this case is: > Module: kafkatest.tests.client.consumer_test > Class: OffsetValidationTest > Method: test_broker_failure > Arguments: > { > "clean_shutdown": true, > "enable_autocommit": false > } > and others passed. It's unclear if the parameters actually have any impact on > the failure. > I did some initial triage and it looks like the test code isn't seeing all > the group members join the group (receive partition assignments), but it > appears from the logs that they all did. This could indicate a simple timing > issue, but I haven't been able to verify that yet. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Work started] (KAFKA-4575) Transient failure in ConnectDistributedTest.test_pause_and_resume_sink in consuming messages after resuming source connector
[ https://issues.apache.org/jira/browse/KAFKA-4575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-4575 started by Shikhar Bhushan. -- > Transient failure in ConnectDistributedTest.test_pause_and_resume_sink in > consuming messages after resuming source connector > > > Key: KAFKA-4575 > URL: https://issues.apache.org/jira/browse/KAFKA-4575 > Project: Kafka > Issue Type: Test > Components: KafkaConnect, system tests >Reporter: Shikhar Bhushan >Assignee: Shikhar Bhushan > > http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2016-12-29--001.1483003056--apache--trunk--dc55025/report.html > {noformat} > [INFO - 2016-12-29 08:56:23,050 - runner_client - log - lineno:221]: > RunnerClient: > kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_pause_and_resume_sink: > Summary: Failed to consume messages after resuming source connector > Traceback (most recent call last): > File > "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py", > line 123, in run > data = self.run_test() > File > "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py", > line 176, in run_test > return self.test_context.function(self.test) > File > "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/tests/connect/connect_distributed_test.py", > line 267, in test_pause_and_resume_sink > err_msg="Failed to consume messages after resuming source connector") > File > "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/utils/util.py", > line 36, in wait_until > raise TimeoutError(err_msg) > TimeoutError: Failed to consume messages after resuming source connector > {noformat} > We recently fixed KAFKA-4527 and this is a new kind of failure in the same > test. 
[jira] [Commented] (KAFKA-4575) Transient failure in ConnectDistributedTest.test_pause_and_resume_sink in consuming messages after resuming source connector
[ https://issues.apache.org/jira/browse/KAFKA-4575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15799807#comment-15799807 ] Shikhar Bhushan commented on KAFKA-4575: By the way, the error message is misleading; it should read 'after resuming _sink_ connector'.
[jira] [Updated] (KAFKA-4575) Transient failure in ConnectDistributedTest.test_pause_and_resume_sink in consuming messages after resuming sink connector
[ https://issues.apache.org/jira/browse/KAFKA-4575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shikhar Bhushan updated KAFKA-4575: --- Summary: Transient failure in ConnectDistributedTest.test_pause_and_resume_sink in consuming messages after resuming sink connector (was: Transient failure in ConnectDistributedTest.test_pause_and_resume_sink in consuming messages after resuming source connector)
[jira] [Updated] (KAFKA-4575) Transient failure in ConnectDistributedTest.test_pause_and_resume_sink in consuming messages after resuming source connector
[ https://issues.apache.org/jira/browse/KAFKA-4575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shikhar Bhushan updated KAFKA-4575: --- Component/s: system tests, KafkaConnect
[jira] [Assigned] (KAFKA-4575) Transient failure in ConnectDistributedTest.test_pause_and_resume_sink in consuming messages after resuming source connector
[ https://issues.apache.org/jira/browse/KAFKA-4575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shikhar Bhushan reassigned KAFKA-4575: -- Assignee: Shikhar Bhushan
[jira] [Created] (KAFKA-4575) Transient failure in ConnectDistributedTest.test_pause_and_resume_sink in consuming messages after resuming source connector
Shikhar Bhushan created KAFKA-4575: -- Summary: Transient failure in ConnectDistributedTest.test_pause_and_resume_sink in consuming messages after resuming source connector Key: KAFKA-4575 URL: https://issues.apache.org/jira/browse/KAFKA-4575 Project: Kafka Issue Type: Test Reporter: Shikhar Bhushan http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2016-12-29--001.1483003056--apache--trunk--dc55025/report.html {noformat} [INFO - 2016-12-29 08:56:23,050 - runner_client - log - lineno:221]: RunnerClient: kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_pause_and_resume_sink: Summary: Failed to consume messages after resuming source connector Traceback (most recent call last): File "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py", line 123, in run data = self.run_test() File "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py", line 176, in run_test return self.test_context.function(self.test) File "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/tests/connect/connect_distributed_test.py", line 267, in test_pause_and_resume_sink err_msg="Failed to consume messages after resuming source connector") File "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/utils/util.py", line 36, in wait_until raise TimeoutError(err_msg) TimeoutError: Failed to consume messages after resuming source connector {noformat} We recently fixed KAFKA-4527 and this is a new kind of failure in the same test. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
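The `TimeoutError` in the traceback above comes from a polling helper. A minimal sketch of the ducktape-style `wait_until` pattern (this is an illustrative reimplementation, not ducktape's actual source):

```python
import time

def wait_until(condition, timeout_sec, backoff_sec=0.1, err_msg=""):
    """Sketch of a ducktape-style wait_until: repeatedly poll `condition`
    until it returns truthy, else raise TimeoutError with the test's
    err_msg (here, the 'Failed to consume messages...' string)."""
    deadline = time.monotonic() + timeout_sec
    while time.monotonic() < deadline:
        if condition():
            return
        time.sleep(backoff_sec)
    raise TimeoutError(err_msg)

# The test resumes the connector, then waits for consumption to make progress:
consumed = ["msg-1"]  # simulate a message arriving after resume
wait_until(lambda: len(consumed) > 0, timeout_sec=1,
           err_msg="Failed to consume messages after resuming sink connector")
```

A transient failure of this kind means the condition never became true within the timeout — either the sink genuinely stopped consuming after resume, or the timeout is too tight for the test environment.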
[jira] [Created] (KAFKA-4574) Transient failure in ZooKeeperSecurityUpgradeTest.test_zk_security_upgrade with security_protocol = SASL_PLAINTEXT, SSL
Shikhar Bhushan created KAFKA-4574: -- Summary: Transient failure in ZooKeeperSecurityUpgradeTest.test_zk_security_upgrade with security_protocol = SASL_PLAINTEXT, SSL Key: KAFKA-4574 URL: https://issues.apache.org/jira/browse/KAFKA-4574 Project: Kafka Issue Type: Test Components: system tests Reporter: Shikhar Bhushan http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2016-12-29--001.1483003056--apache--trunk--dc55025/report.html {{ZooKeeperSecurityUpgradeTest.test_zk_security_upgrade}} failed with these {{security_protocol}} parameters {noformat} test_id: kafkatest.tests.core.zookeeper_security_upgrade_test.ZooKeeperSecurityUpgradeTest.test_zk_security_upgrade.security_protocol=SASL_PLAINTEXT status: FAIL run time: 3 minutes 44.094 seconds 1 acked message did not make it to the Consumer. They are: [5076]. We validated that the first 1 of these missing messages correctly made it into Kafka's data files. This suggests they were lost on their way to the consumer. Traceback (most recent call last): File "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py", line 123, in run data = self.run_test() File "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py", line 176, in run_test return self.test_context.function(self.test) File "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py", line 321, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py", line 117, in test_zk_security_upgrade self.run_produce_consume_validate(self.run_zk_migration) File 
"/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/tests/produce_consume_validate.py", line 101, in run_produce_consume_validate self.validate() File "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/tests/produce_consume_validate.py", line 163, in validate assert success, msg AssertionError: 1 acked message did not make it to the Consumer. They are: [5076]. We validated that the first 1 of these missing messages correctly made it into Kafka's data files. This suggests they were lost on their way to the consumer. {noformat} {noformat} test_id: kafkatest.tests.core.zookeeper_security_upgrade_test.ZooKeeperSecurityUpgradeTest.test_zk_security_upgrade.security_protocol=SSL status: FAIL run time: 3 minutes 50.578 seconds 1 acked message did not make it to the Consumer. They are: [3559]. We validated that the first 1 of these missing messages correctly made it into Kafka's data files. This suggests they were lost on their way to the consumer. Traceback (most recent call last): File "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py", line 123, in run data = self.run_test() File "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py", line 176, in run_test return self.test_context.function(self.test) File "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py", line 321, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py", line 117, in test_zk_security_upgrade self.run_produce_consume_validate(self.run_zk_migration) File "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/tests/produce_consume_validate.py", 
line 101, in run_produce_consume_validate self.validate() File "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/tests/produce_consume_validate.py", line 163, in validate assert success, msg AssertionError: 1 acked message did not make it to the Consumer. They are: [3559]. We validated that the first 1 of these missing messages correctly made it into Kafka's data files. This suggests they were lost on their way to the consumer. {noformat} Previously: KAFKA-3985 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Work started] (KAFKA-3209) Support single message transforms in Kafka Connect
[ https://issues.apache.org/jira/browse/KAFKA-3209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-3209 started by Shikhar Bhushan. -- > Support single message transforms in Kafka Connect > -- > > Key: KAFKA-3209 > URL: https://issues.apache.org/jira/browse/KAFKA-3209 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Neha Narkhede >Assignee: Shikhar Bhushan > Labels: needs-kip > Fix For: 0.10.2.0 > > > Users should be able to perform light transformations on messages between a > connector and Kafka. This is needed because some transformations must be > performed before the data hits Kafka (e.g. filtering certain types of events > or PII filtering). It's also useful for very light, single-message > modifications that are easier to perform inline with the data import/export. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
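The single-message-transform idea in KAFKA-3209 — light, inline modifications between a connector and Kafka, including filtering — can be sketched as a chain of per-record functions. This is a conceptual Python sketch, not the Connect `Transformation` Java API:

```python
def apply_transforms(record, transforms):
    """Sketch of chained single-message transforms: each transform maps a
    record to a (possibly modified) record, or returns None to filter it
    out (e.g. dropping PII or unwanted event types) before it reaches Kafka."""
    for transform in transforms:
        record = transform(record)
        if record is None:
            return None          # record filtered; stop the chain
    return record

# Two illustrative transforms: strip a PII field, drop DEBUG-level events.
redact_ssn = lambda r: {k: v for k, v in r.items() if k != "ssn"}
drop_debug = lambda r: None if r.get("level") == "DEBUG" else r

assert apply_transforms({"id": 1, "ssn": "x"}, [redact_ssn, drop_debug]) == {"id": 1}
assert apply_transforms({"id": 2, "level": "DEBUG"}, [redact_ssn, drop_debug]) is None
```

Running records through such a chain before they hit Kafka is exactly the case the issue calls out: transformations like PII filtering must happen pre-write, not in a downstream consumer.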