[jira] [Comment Edited] (KAFKA-4794) Add access to OffsetStorageReader from SourceConnector
[ https://issues.apache.org/jira/browse/KAFKA-4794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16089196#comment-16089196 ]

Oleg Kuznetsov edited comment on KAFKA-4794 at 7/17/17 12:07 AM:
-----------------------------------------------------------------

[~jasong35] [~ewencp] Do you have any feedback regarding this KIP?

was (Author: olkuznsmith):
[~jasong35] Do you have any feedback regarding this KIP?

> Add access to OffsetStorageReader from SourceConnector
> ------------------------------------------------------
>
>                 Key: KAFKA-4794
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4794
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>    Affects Versions: 0.10.2.0
>            Reporter: Florian Hussonnois
>            Priority: Minor
>              Labels: needs-kip
>
> Currently the offsets storage is only accessible from SourceTask, so that
> tasks can be initialized properly after a restart, a crash or a
> reconfiguration request.
> To implement more complex connectors that need to track the progress of
> each task, it would be helpful to have access to an OffsetStorageReader
> instance from the SourceConnector.
> That way, we could have a background thread that requests a task
> reconfiguration based on source offsets.
> This improvement proposal comes from a customer project that needs to
> periodically scan directories on shared storage to detect new files and
> stream them into Kafka.
> The connector implementation is pretty straightforward.
> The connector uses a background thread to periodically scan directories.
> When new input files are detected, a task reconfiguration is requested.
> Then the connector assigns a subset of files to each task.
> Each task stores source offsets for the last sent record. The source
> offset data are:
>  - the size of the file
>  - the byte offset
>  - the byte size
> Tasks become idle when the assigned files are completed (that is, when
> recordBytesOffsets + recordBytesSize = fileBytesSize).
> Then, the connector should be able to track offsets for each assigned file.
> When all tasks have finished, the connector can stop them or assign new
> files by requesting a task reconfiguration.
> Moreover, another advantage of monitoring source offsets from the connector
> is the ability to detect slow or failed tasks and, if necessary, to restart
> all tasks.
> If you think this improvement is OK, I can work on a pull request.
> Thanks,

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
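The completion condition given in the KAFKA-4794 description above (recordBytesOffsets + recordBytesSize = fileBytesSize) can be sketched in a few lines. This is only an illustration under stated assumptions: the class `FileProgress` and its method names are hypothetical, it does not use the Kafka Connect API, and it says nothing about how a connector would actually obtain the offsets.

```java
import java.util.List;

// Hypothetical helper: the three fields mirror the source offset data listed
// in the issue description. Nothing here is part of the Kafka Connect API.
final class FileProgress {
    final long fileBytesSize;     // the size of the file
    final long recordBytesOffset; // byte offset of the last sent record
    final long recordBytesSize;   // byte size of the last sent record

    FileProgress(long fileBytesSize, long recordBytesOffset, long recordBytesSize) {
        this.fileBytesSize = fileBytesSize;
        this.recordBytesOffset = recordBytesOffset;
        this.recordBytesSize = recordBytesSize;
    }

    // A file is complete when the last sent record ends exactly at the end of the file.
    boolean isComplete() {
        return recordBytesOffset + recordBytesSize == fileBytesSize;
    }

    // True when every file in the list is complete; a connector could use such a
    // check to decide when to request a task reconfiguration.
    static boolean allComplete(List<FileProgress> files) {
        return files.stream().allMatch(FileProgress::isComplete);
    }
}
```

A background thread in the connector could call `allComplete` on the progress of all assigned files and request a reconfiguration once it returns true.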
[jira] [Commented] (KAFKA-4794) Add access to OffsetStorageReader from SourceConnector
[ https://issues.apache.org/jira/browse/KAFKA-4794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16089196#comment-16089196 ]

Oleg Kuznetsov commented on KAFKA-4794:
---------------------------------------

[~jasong35] Do you have any feedback regarding this KIP?
[jira] [Updated] (KAFKA-5598) Make OffsetReader accessible in ConnectorContext
[ https://issues.apache.org/jira/browse/KAFKA-5598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Oleg Kuznetsov updated KAFKA-5598:
----------------------------------
    Description: It is useful to read offset in SourceConnector, but now OffsetReader is accessible only in SourceTaskContext.  (was: It is useful to read offset in SourceConnector, but now OffsetReader is accessible only in SourceTask.)

> Make OffsetReader accessible in ConnectorContext
> ------------------------------------------------
>
>                 Key: KAFKA-5598
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5598
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>    Affects Versions: 0.11.0.0
>            Reporter: Oleg Kuznetsov
>
> It is useful to read offset in SourceConnector, but now OffsetReader is
> accessible only in SourceTaskContext.
[jira] [Created] (KAFKA-5598) Make OffsetReader accessible in ConnectorContext
Oleg Kuznetsov created KAFKA-5598:
-------------------------------------

             Summary: Make OffsetReader accessible in ConnectorContext
                 Key: KAFKA-5598
                 URL: https://issues.apache.org/jira/browse/KAFKA-5598
             Project: Kafka
          Issue Type: Improvement
          Components: KafkaConnect
    Affects Versions: 0.11.0.0
            Reporter: Oleg Kuznetsov

It is useful to read offset in SourceConnector, but now OffsetReader is accessible only in SourceTask.
[jira] [Commented] (KAFKA-5595) Illegal state in SocketServer; attempt to send with another send in progress
[ https://issues.apache.org/jira/browse/KAFKA-5595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16089143#comment-16089143 ]

ASF GitHub Bot commented on KAFKA-5595:
---------------------------------------

GitHub user rajinisivaram reopened a pull request:

    https://github.com/apache/kafka/pull/3530

    KAFKA-5595: Ensure client connection ids are not reused too quickly

    When there are broker delays that cause a response to take longer than
    `connections.max.idle.ms`, connections may be closed by the broker (as
    well as by the client) before the response is processed. If the port is
    reused, the broker may send the outstanding response to a new connection
    with the reused port. The new connection will end up with a correlation
    id mismatch, requiring a process restart. This is also a security
    exposure, since clients receive responses intended for the wrong
    connection.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rajinisivaram/kafka KAFKA-5595

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/3530.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3530

commit 99f3374b5c3b0cb5efcdfe4c553aac943297b016
Author: Rajini Sivaram
Date:   2017-07-15T13:01:47Z

    KAFKA-5595: Ensure client connection ids are not reused too quickly

> Illegal state in SocketServer; attempt to send with another send in progress
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-5595
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5595
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jason Gustafson
>
> I have seen this a couple of times, but I'm not sure of the conditions
> associated with it.
> {code}
> java.lang.IllegalStateException: Attempt to begin a send operation with prior send operation still in progress.
>       at org.apache.kafka.common.network.KafkaChannel.setSend(KafkaChannel.java:138)
>       at org.apache.kafka.common.network.Selector.send(Selector.java:248)
>       at kafka.network.Processor.sendResponse(SocketServer.scala:488)
>       at kafka.network.Processor.processNewResponses(SocketServer.scala:466)
>       at kafka.network.Processor.run(SocketServer.scala:431)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
> Prior to this event, I see a lot of this message in the logs (always for
> the same connection id):
> {code}
> Attempting to send response via channel for which there is no open connection, connection id 7
> {code}
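The pull request description above says that a stale response can reach a new connection when the client port is reused. One way to avoid that kind of collision is to make the connection id unique even when the address pair repeats, for example by appending a counter. The sketch below only illustrates that idea: the class name, the id layout and the counter are assumptions, and it is not taken from the pull request.

```java
// Hypothetical sketch: builds connection ids that stay distinct even when
// the same local/remote address pair is seen again after a port is reused.
final class ConnectionIdGenerator {
    private int nextIndex = 0;

    // Not thread-safe; assumes one generator per network thread.
    String next(String localHost, int localPort, String remoteHost, int remotePort) {
        String id = localHost + ":" + localPort + "-"
                + remoteHost + ":" + remotePort + "-" + nextIndex;
        // Wrap around to zero instead of overflowing to a negative index.
        nextIndex = (nextIndex == Integer.MAX_VALUE) ? 0 : nextIndex + 1;
        return id;
    }
}
```

With such an id, a response queued for a closed connection would no longer match a newer connection that happens to use the same port, because the trailing index differs.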
[jira] [Commented] (KAFKA-5595) Illegal state in SocketServer; attempt to send with another send in progress
[ https://issues.apache.org/jira/browse/KAFKA-5595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16089137#comment-16089137 ]

Rajini Sivaram commented on KAFKA-5595:
---------------------------------------

I think this issue can occur when idle connections are closed by the broker. This would cause the channel to be removed when responses are still pending. And if ports are reused, it can result in the IllegalStateException.
[jira] [Commented] (KAFKA-5595) Illegal state in SocketServer; attempt to send with another send in progress
[ https://issues.apache.org/jira/browse/KAFKA-5595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16089098#comment-16089098 ]

Rajini Sivaram commented on KAFKA-5595:
---------------------------------------

[~ijuma] I don't think the scenario that you described can occur. The broker mutes connections while requests are being processed. Even if the client disconnected, the disconnection would be processed only when the response was ready to be sent. And the response is sent before any disconnections are processed.
[jira] [Commented] (KAFKA-5595) Illegal state in SocketServer; attempt to send with another send in progress
[ https://issues.apache.org/jira/browse/KAFKA-5595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16089093#comment-16089093 ]

ASF GitHub Bot commented on KAFKA-5595:
---------------------------------------

Github user rajinisivaram closed the pull request at:

    https://github.com/apache/kafka/pull/3530
[jira] [Commented] (KAFKA-3834) Consumer should not block in poll on coordinator discovery
[ https://issues.apache.org/jira/browse/KAFKA-3834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16088866#comment-16088866 ]

Abhishek Agarwal commented on KAFKA-3834:
-----------------------------------------

[~hachikuji] If the IPs are misconfigured or the broker is not available, then the group_coordinator_request will never be sent to that node, will it? Won't the lookupCoordinator() call itself fail before the client blocks in awaitMetadataUpdate()?

> Consumer should not block in poll on coordinator discovery
> ----------------------------------------------------------
>
>                 Key: KAFKA-3834
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3834
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>
> Currently we block indefinitely in poll() when discovering the coordinator
> for the group. Instead, we can return an empty record set when the passed
> timeout expires. The downside is that it may obscure the underlying problem
> (which is usually misconfiguration), but users typically have to look at the
> logs to figure out the problem anyway.
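The behaviour proposed in the KAFKA-3834 description above (return an empty record set once the passed timeout expires, rather than blocking indefinitely) can be sketched independently of the consumer internals. Everything named here is hypothetical: `BoundedPoll`, the `coordinatorKnown` check and the `fetch` callback stand in for whatever the real consumer does, and the 1 ms back-off is an arbitrary choice.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Hypothetical sketch of a poll that gives up on coordinator discovery
// when the caller's timeout expires instead of blocking forever.
final class BoundedPoll {
    static <T> List<T> poll(long timeoutMs,
                            BooleanSupplier coordinatorKnown,
                            Supplier<List<T>> fetch) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!coordinatorKnown.getAsBoolean()) {
            // Overflow-safe deadline check: an expired timeout yields an empty result.
            if (System.nanoTime() - deadline >= 0) {
                return Collections.emptyList();
            }
            // Back off briefly before checking again.
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1));
        }
        return fetch.get();
    }
}
```

As the description notes, the trade-off is that an empty result can hide a misconfiguration, so a real implementation would presumably still log why discovery has not completed.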
[jira] [Assigned] (KAFKA-5597) Autogenerate Producer sender metrics
[ https://issues.apache.org/jira/browse/KAFKA-5597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

James Cheng reassigned KAFKA-5597:
----------------------------------

    Assignee: James Cheng

> Autogenerate Producer sender metrics
> ------------------------------------
>
>                 Key: KAFKA-5597
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5597
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: James Cheng
>            Assignee: James Cheng
[jira] [Commented] (KAFKA-5597) Autogenerate Producer sender metrics
[ https://issues.apache.org/jira/browse/KAFKA-5597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16088851#comment-16088851 ]

ASF GitHub Bot commented on KAFKA-5597:
---------------------------------------

GitHub user wushujames opened a pull request:

    https://github.com/apache/kafka/pull/3535

    KAFKA-5597: Autogenerate producer sender metrics.

    Subtask of https://issues.apache.org/jira/browse/KAFKA-3480

    The changes are very similar to what was done for the consumer in
    https://issues.apache.org/jira/browse/KAFKA-5191 (pull request
    https://github.com/apache/kafka/pull/2993)

    A screenshot of the docs is here:
    ![producer metrics docs](https://user-images.githubusercontent.com/677529/28245950-96b0c20a-69c6-11e7-8631-11fd55a0ad92.png)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/wushujames/kafka producer_sender_metrics_docs

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/3535.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3535

commit 33f876fa471048ec6ddd7c05dbd5935d0157eca3
Author: James Cheng
Date:   2017-07-16T08:29:30Z

    Autogenerate producer sender metrics.
[jira] [Created] (KAFKA-5597) Autogenerate Producer sender metrics
James Cheng created KAFKA-5597:
----------------------------------

             Summary: Autogenerate Producer sender metrics
                 Key: KAFKA-5597
                 URL: https://issues.apache.org/jira/browse/KAFKA-5597
             Project: Kafka
          Issue Type: Sub-task
            Reporter: James Cheng
[jira] [Assigned] (KAFKA-3539) KafkaProducer.send() may block even though it returns the Future
[ https://issues.apache.org/jira/browse/KAFKA-3539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Evgeny Veretennikov reassigned KAFKA-3539:
------------------------------------------

    Assignee:     (was: Evgeny Veretennikov)

> KafkaProducer.send() may block even though it returns the Future
> ----------------------------------------------------------------
>
>                 Key: KAFKA-3539
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3539
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Oleg Zhurakousky
>            Priority: Critical
>
> You can get more details from the us...@kafka.apache.org list by searching
> for the thread with the subject "KafkaProducer block on send".
> The bottom line is that a method that returns a Future must never block,
> since blocking essentially violates the Future contract: a Future was
> specifically designed to return immediately, passing control back to the
> user to check for completion, cancel, etc.
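The contract described in the KAFKA-3539 report above (a method that returns a Future should hand control back immediately) can be illustrated with a small generic sketch. It is not how KafkaProducer is implemented and does not propose a fix; `NonBlockingSend` and `sendAsync` are hypothetical names, and the only library call used is the standard `CompletableFuture.supplyAsync`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch: any potentially slow preparation runs on another
// thread, so the caller gets the future back without waiting for it.
final class NonBlockingSend {
    static <T> CompletableFuture<T> sendAsync(Supplier<T> slowPreparation) {
        // supplyAsync schedules the supplier on the common pool and returns at once.
        return CompletableFuture.supplyAsync(slowPreparation);
    }
}
```

A caller can then decide for itself whether to wait with `join()` or `get(timeout, unit)`, or to attach a callback, which is the kind of control the report argues a Future-returning method should preserve.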
[jira] [Commented] (KAFKA-3539) KafkaProducer.send() may block even though it returns the Future
[ https://issues.apache.org/jira/browse/KAFKA-3539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16088823#comment-16088823 ]

ASF GitHub Bot commented on KAFKA-3539:
---------------------------------------

Github user evis closed the pull request at:

    https://github.com/apache/kafka/pull/3478