[jira] [Comment Edited] (KAFKA-4794) Add access to OffsetStorageReader from SourceConnector

2017-07-16 Thread Oleg Kuznetsov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16089196#comment-16089196
 ] 

Oleg Kuznetsov edited comment on KAFKA-4794 at 7/17/17 12:07 AM:
-

[~jasong35] [~ewencp] Do you have any feedback regarding this KIP?


was (Author: olkuznsmith):
[~jasong35] Do you have any feedback regarding this KIP?

> Add access to OffsetStorageReader from SourceConnector
> --
>
> Key: KAFKA-4794
> URL: https://issues.apache.org/jira/browse/KAFKA-4794
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Florian Hussonnois
>Priority: Minor
>  Labels: needs-kip
>
> Currently, offset storage is only accessible from SourceTask, where it is used to 
> properly initialize tasks after a restart, a crash or a reconfiguration 
> request.
> To implement more complex connectors that need to track the progress of 
> each task, it would be helpful to have access to an OffsetStorageReader instance 
> from the SourceConnector.
> That way, we could have a background thread that requests a task 
> reconfiguration based on source offsets.
> This improvement proposal comes from a customer project that needs to 
> periodically scan directories on shared storage to detect new files and 
> stream them into Kafka.
> The connector implementation is pretty straightforward.
> The connector uses a background thread to periodically scan directories. When 
> new input files are detected, a task reconfiguration is requested. Then the 
> connector assigns a subset of the files to each task.
> Each task stores source offsets for the last sent record. The source offset 
> data are:
>  - the file size (fileBytesSize)
>  - the record's byte offset (recordBytesOffset)
>  - the record's size in bytes (recordBytesSize)
> Tasks become idle when their assigned files are complete (i.e. 
> recordBytesOffset + recordBytesSize = fileBytesSize).
> The connector should therefore be able to track offsets for each assigned file. 
> When all tasks have finished, the connector can stop them or assign new files 
> by requesting a task reconfiguration.
> Another advantage of monitoring source offsets from the connector 
> is the ability to detect slow or failed tasks and, if necessary, restart all 
> tasks.
> If you think this improvement is OK, I can work on a pull request.
> Thanks,
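A minimal sketch of the bookkeeping described above, assuming the three offset fields are stored in each file's source offset map under the hypothetical keys `fileBytesSize`, `recordBytesOffset` and `recordBytesSize` (the issue names the fields but not the keys). It shows the per-file completion check a connector-side monitor would evaluate, plus a simple round-robin split of detected files across at most `maxTasks` task configs. It deliberately avoids the Kafka API, since the point of the issue is that a SourceConnector cannot currently obtain an OffsetStorageReader to feed such a check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helpers for the file-streaming connector described in KAFKA-4794.
// The offset keys below are assumptions; the issue names the fields, not the keys.
final class FileOffsetTracking {

    static final String FILE_SIZE = "fileBytesSize";
    static final String RECORD_OFFSET = "recordBytesOffset";
    static final String RECORD_SIZE = "recordBytesSize";

    /** True once the last sent record ends exactly at the end of the file. */
    static boolean isFileComplete(Map<String, ?> sourceOffset) {
        if (sourceOffset == null) {
            return false; // no offset committed yet for this file
        }
        // Read as Number so that both Integer and Long values are accepted.
        long fileSize = ((Number) sourceOffset.get(FILE_SIZE)).longValue();
        long recordOffset = ((Number) sourceOffset.get(RECORD_OFFSET)).longValue();
        long recordSize = ((Number) sourceOffset.get(RECORD_SIZE)).longValue();
        return recordOffset + recordSize == fileSize;
    }

    /** Splits the detected files round-robin into at most maxTasks groups, one per task config. */
    static List<List<String>> assignFiles(List<String> files, int maxTasks) {
        if (maxTasks < 1) {
            throw new IllegalArgumentException("maxTasks must be at least 1");
        }
        int groupCount = Math.min(files.size(), maxTasks);
        List<List<String>> groups = new ArrayList<>();
        for (int i = 0; i < groupCount; i++) {
            groups.add(new ArrayList<>());
        }
        for (int i = 0; i < files.size(); i++) {
            groups.get(i % groupCount).add(files.get(i));
        }
        return groups;
    }

    public static void main(String[] args) {
        System.out.println(isFileComplete(Map.of(FILE_SIZE, 100L, RECORD_OFFSET, 90L, RECORD_SIZE, 10L))); // true
        System.out.println(assignFiles(List.of("a.log", "b.log", "c.log"), 2)); // [[a.log, c.log], [b.log]]
    }
}
```

With offsets readable from the connector, a background thread could call `isFileComplete` for every assigned file and request a task reconfiguration once all of them return true.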



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4794) Add access to OffsetStorageReader from SourceConnector

2017-07-16 Thread Oleg Kuznetsov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16089196#comment-16089196
 ] 

Oleg Kuznetsov commented on KAFKA-4794:
---

[~jasong35] Do you have any feedback regarding this KIP?






[jira] [Updated] (KAFKA-5598) Make OffsetReader accessible in ConnectorContext

2017-07-16 Thread Oleg Kuznetsov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleg Kuznetsov updated KAFKA-5598:
--
Description: It would be useful to read offsets in SourceConnector, but currently 
OffsetReader is accessible only in SourceTaskContext.  (was: It would be useful to 
read offsets in SourceConnector, but currently OffsetReader is accessible only in 
SourceTask.)

> Make OffsetReader accessible in ConnectorContext
> 
>
> Key: KAFKA-5598
> URL: https://issues.apache.org/jira/browse/KAFKA-5598
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Oleg Kuznetsov
>
> It would be useful to read offsets in SourceConnector, but currently OffsetReader is 
> accessible only in SourceTaskContext.





[jira] [Created] (KAFKA-5598) Make OffsetReader accessible in ConnectorContext

2017-07-16 Thread Oleg Kuznetsov (JIRA)
Oleg Kuznetsov created KAFKA-5598:
-

 Summary: Make OffsetReader accessible in ConnectorContext
 Key: KAFKA-5598
 URL: https://issues.apache.org/jira/browse/KAFKA-5598
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 0.11.0.0
Reporter: Oleg Kuznetsov


It would be useful to read offsets in SourceConnector, but currently OffsetReader is 
accessible only in SourceTask.





[jira] [Commented] (KAFKA-5595) Illegal state in SocketServer; attempt to send with another send in progress

2017-07-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16089143#comment-16089143
 ] 

ASF GitHub Bot commented on KAFKA-5595:
---

GitHub user rajinisivaram reopened a pull request:

https://github.com/apache/kafka/pull/3530

KAFKA-5595: Ensure client connection ids are not reused too quickly

When broker delays cause a response to take longer than 
`connections.max.idle.ms`, connections may be closed by the broker (as well as 
by the client) before the response is processed.
If the port is then reused, the broker may send the outstanding response to the new 
connection that reuses the port. The new connection will end up with a 
correlation id mismatch, requiring a process restart. This is also a security 
exposure, since a client can receive a response intended for a different connection.
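One way to keep connection ids from being reused too quickly is sketched below, assuming ids are built from the local and remote host:port pairs: append a monotonically increasing counter so that a connection accepted on a reused port gets a new id, and a stale response addressed to the old id can no longer be routed to it. The class and method names are hypothetical, and this is not necessarily the change made in pull request #3530.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: make connection ids unique even when host:port pairs repeat.
final class ConnectionIds {
    private final AtomicLong nextIndex = new AtomicLong();

    /** Address-only id: identical whenever the same local/remote host:port pair is reused. */
    static String addressOnlyId(String localHost, int localPort, String remoteHost, int remotePort) {
        return localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort;
    }

    /** Address id plus a per-connection counter, so every accepted connection is distinct. */
    String uniqueId(String localHost, int localPort, String remoteHost, int remotePort) {
        return addressOnlyId(localHost, localPort, remoteHost, remotePort)
                + "-" + nextIndex.getAndIncrement();
    }

    public static void main(String[] args) {
        ConnectionIds ids = new ConnectionIds();
        String first = ids.uniqueId("10.0.0.1", 9092, "10.0.0.2", 51000);
        String reused = ids.uniqueId("10.0.0.1", 9092, "10.0.0.2", 51000); // same port after a close
        System.out.println(first);  // 10.0.0.1:9092-10.0.0.2:51000-0
        System.out.println(reused); // 10.0.0.1:9092-10.0.0.2:51000-1
        // Without the counter, both connections would share one id.
        System.out.println(addressOnlyId("10.0.0.1", 9092, "10.0.0.2", 51000)
                .equals(addressOnlyId("10.0.0.1", 9092, "10.0.0.2", 51000))); // true
    }
}
```

A response queued for `...-0` can then be dropped once that connection is gone, instead of being written to `...-1`.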

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rajinisivaram/kafka KAFKA-5595

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3530.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3530


commit 99f3374b5c3b0cb5efcdfe4c553aac943297b016
Author: Rajini Sivaram 
Date:   2017-07-15T13:01:47Z

KAFKA-5595: Ensure client connection ids are not reused too quickly




> Illegal state in SocketServer; attempt to send with another send in progress
> 
>
> Key: KAFKA-5595
> URL: https://issues.apache.org/jira/browse/KAFKA-5595
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>
> I have seen this a couple of times, but I'm not sure of the conditions associated 
> with it. 
> {code}
> java.lang.IllegalStateException: Attempt to begin a send operation with prior 
> send operation still in progress.
>   at 
> org.apache.kafka.common.network.KafkaChannel.setSend(KafkaChannel.java:138)
>   at org.apache.kafka.common.network.Selector.send(Selector.java:248)
>   at kafka.network.Processor.sendResponse(SocketServer.scala:488)
>   at kafka.network.Processor.processNewResponses(SocketServer.scala:466)
>   at kafka.network.Processor.run(SocketServer.scala:431)
>   at java.lang.Thread.run(Thread.java:748)
> {code}
> Prior to this event, I see a lot of this message in the logs (always for the 
> same connection id):
> {code}
> Attempting to send response via channel for which there is no open 
> connection, connection id 7
> {code}





[jira] [Commented] (KAFKA-5595) Illegal state in SocketServer; attempt to send with another send in progress

2017-07-16 Thread Rajini Sivaram (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16089137#comment-16089137
 ] 

Rajini Sivaram commented on KAFKA-5595:
---

I think this issue can occur when idle connections are closed by the broker. 
This would cause the channel to be removed while responses are still pending, 
and if ports are then reused, it can result in the IllegalStateException.






[jira] [Commented] (KAFKA-5595) Illegal state in SocketServer; attempt to send with another send in progress

2017-07-16 Thread Rajini Sivaram (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16089098#comment-16089098
 ] 

Rajini Sivaram commented on KAFKA-5595:
---

[~ijuma] I don't think the scenario that you described can occur. The broker mutes 
connections while requests are being processed. Even if the client disconnected, the 
disconnection would be processed only when the response was ready to be sent, 
and the response is sent before any disconnections are processed.






[jira] [Commented] (KAFKA-5595) Illegal state in SocketServer; attempt to send with another send in progress

2017-07-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16089093#comment-16089093
 ] 

ASF GitHub Bot commented on KAFKA-5595:
---

Github user rajinisivaram closed the pull request at:

https://github.com/apache/kafka/pull/3530







[jira] [Commented] (KAFKA-3834) Consumer should not block in poll on coordinator discovery

2017-07-16 Thread Abhishek Agarwal (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16088866#comment-16088866
 ] 

Abhishek Agarwal commented on KAFKA-3834:
-

[~hachikuji] If the IPs are misconfigured or the broker is unavailable, then the 
group_coordinator_request will never be sent to that node, right? Won't the 
lookupCoordinator() call itself fail before the client blocks in 
awaitMetadataUpdate()?

> Consumer should not block in poll on coordinator discovery
> --
>
> Key: KAFKA-3834
> URL: https://issues.apache.org/jira/browse/KAFKA-3834
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>
> Currently we block indefinitely in poll() when discovering the coordinator 
> for the group. Instead, we can return an empty record set when the passed 
> timeout expires. The downside is that it may obscure the underlying problem 
> (which is usually misconfiguration), but users typically have to look at the 
> logs to figure out the problem anyway. 
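A minimal sketch of the proposed behavior, assuming coordinator lookup and record fetching can be modeled as two callbacks (`lookupCoordinator` and `fetchRecords` are hypothetical names, not KafkaConsumer internals): retry discovery with a backoff, and return an empty list once the poll timeout expires instead of blocking indefinitely.

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical bounded poll: coordinator discovery may not outlive the poll timeout.
final class BoundedCoordinatorPoll {

    static <R> List<R> poll(Supplier<Optional<String>> lookupCoordinator,
                            Supplier<List<R>> fetchRecords,
                            long timeoutMs,
                            long retryBackoffMs) {
        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            if (lookupCoordinator.get().isPresent()) {
                return fetchRecords.get(); // coordinator known: proceed with the normal fetch
            }
            long remainingNanos = deadlineNanos - System.nanoTime();
            if (remainingNanos <= 0) {
                return Collections.emptyList(); // timeout expired: hand control back to the caller
            }
            try {
                // Back off before retrying, but never sleep past the deadline.
                Thread.sleep(Math.min(retryBackoffMs, TimeUnit.NANOSECONDS.toMillis(remainingNanos)));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return Collections.emptyList();
            }
        }
    }

    public static void main(String[] args) {
        List<String> timedOut = poll(Optional::empty, () -> List.of("record-1"), 200, 50);
        System.out.println(timedOut.isEmpty()); // true
        List<String> fetched = poll(() -> Optional.of("coordinator-1"), () -> List.of("record-1"), 200, 50);
        System.out.println(fetched); // [record-1]
    }
}
```

As the description notes, the trade-off is that a persistent misconfiguration now shows up as repeated empty polls rather than a hang, so the cause still has to be found in the logs.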





[jira] [Assigned] (KAFKA-5597) Autogenerate Producer sender metrics

2017-07-16 Thread James Cheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

James Cheng reassigned KAFKA-5597:
--

Assignee: James Cheng

> Autogenerate Producer sender metrics
> 
>
> Key: KAFKA-5597
> URL: https://issues.apache.org/jira/browse/KAFKA-5597
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: James Cheng
>Assignee: James Cheng
>






[jira] [Commented] (KAFKA-5597) Autogenerate Producer sender metrics

2017-07-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16088851#comment-16088851
 ] 

ASF GitHub Bot commented on KAFKA-5597:
---

GitHub user wushujames opened a pull request:

https://github.com/apache/kafka/pull/3535

KAFKA-5597: Autogenerate producer sender metrics.

Subtask of https://issues.apache.org/jira/browse/KAFKA-3480

The changes are very similar to what was done for the consumer in 
https://issues.apache.org/jira/browse/KAFKA-5191 (pull request 
https://github.com/apache/kafka/pull/2993)

A screenshot of the docs is here:
![producer metrics 
docs](https://user-images.githubusercontent.com/677529/28245950-96b0c20a-69c6-11e7-8631-11fd55a0ad92.png)
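The idea behind autogenerating the metric documentation can be sketched as rendering a list of metric definitions into an HTML table, so the docs are produced from the same definitions the code registers. This is a hypothetical, self-contained illustration: `MetricTemplate` is a minimal stand-in class defined here, and the single metric entry and MBean name in `main` are examples only, not the generator from the pull request.

```java
import java.util.List;

// Hypothetical sketch: generate metric docs from metric definitions instead of by hand.
final class MetricDocs {

    /** Minimal stand-in for a metric definition: an attribute name plus a description. */
    static final class MetricTemplate {
        final String name;
        final String description;

        MetricTemplate(String name, String description) {
            this.name = name;
            this.description = description;
        }
    }

    /** Escapes the characters that would otherwise break the generated HTML. */
    static String escapeHtml(String s) {
        return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
    }

    /** Renders one MBean's metrics as an HTML table, one row per metric. */
    static String toHtmlTable(String mbeanName, List<MetricTemplate> metrics) {
        StringBuilder html = new StringBuilder();
        html.append("<table>\n");
        html.append("<tr><td colspan=\"2\">").append(escapeHtml(mbeanName)).append("</td></tr>\n");
        html.append("<tr><th>Attribute name</th><th>Description</th></tr>\n");
        for (MetricTemplate m : metrics) {
            html.append("<tr><td>").append(escapeHtml(m.name))
                .append("</td><td>").append(escapeHtml(m.description))
                .append("</td></tr>\n");
        }
        html.append("</table>\n");
        return html.toString();
    }

    public static void main(String[] args) {
        List<MetricTemplate> metrics = List.of(
            new MetricTemplate("record-send-rate", "The average number of records sent per second."));
        System.out.print(toHtmlTable("kafka.producer:type=producer-metrics,client-id=<client-id>", metrics));
    }
}
```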


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wushujames/kafka producer_sender_metrics_docs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3535.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3535


commit 33f876fa471048ec6ddd7c05dbd5935d0157eca3
Author: James Cheng 
Date:   2017-07-16T08:29:30Z

Autogenerate producer sender metrics.




> Autogenerate Producer sender metrics
> 
>
> Key: KAFKA-5597
> URL: https://issues.apache.org/jira/browse/KAFKA-5597
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: James Cheng
>






[jira] [Created] (KAFKA-5597) Autogenerate Producer sender metrics

2017-07-16 Thread James Cheng (JIRA)
James Cheng created KAFKA-5597:
--

 Summary: Autogenerate Producer sender metrics
 Key: KAFKA-5597
 URL: https://issues.apache.org/jira/browse/KAFKA-5597
 Project: Kafka
  Issue Type: Sub-task
Reporter: James Cheng








[jira] [Assigned] (KAFKA-3539) KafkaProducer.send() may block even though it returns the Future

2017-07-16 Thread Evgeny Veretennikov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Evgeny Veretennikov reassigned KAFKA-3539:
--

Assignee: (was: Evgeny Veretennikov)

> KafkaProducer.send() may block even though it returns the Future
> 
>
> Key: KAFKA-3539
> URL: https://issues.apache.org/jira/browse/KAFKA-3539
> Project: Kafka
>  Issue Type: Bug
>Reporter: Oleg Zhurakousky
>Priority: Critical
>
> You can get more details from the us...@kafka.apache.org mailing list by searching for the 
> thread with the subject "KafkaProducer block on send".
> The bottom line is that a method that returns a Future must never block, since 
> blocking essentially violates the Future contract: a Future is specifically designed to 
> return immediately, passing control back to the user to check for completion, 
> cancel, etc.
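A sketch of one way to honor the Future contract, assuming the potentially blocking part of a send (for example, waiting for metadata or buffer space) can be wrapped in a callback: hand that work to an executor and return a CompletableFuture immediately. `NonBlockingSend` and `sendAsync` are hypothetical names, not the KafkaProducer API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical wrapper: the potentially blocking part of a send runs on a worker
// thread, so the Future-returning method itself always returns immediately.
final class NonBlockingSend {
    private final ExecutorService worker = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "send-worker");
        t.setDaemon(true); // do not keep the JVM alive just for this worker
        return t;
    });

    /** Schedules the blocking work and returns without waiting for it. */
    <T> CompletableFuture<T> sendAsync(Supplier<T> blockingSend) {
        return CompletableFuture.supplyAsync(blockingSend, worker);
    }

    void close() {
        worker.shutdown();
    }

    public static void main(String[] args) {
        NonBlockingSend sender = new NonBlockingSend();
        CountDownLatch metadataReady = new CountDownLatch(1);
        CompletableFuture<String> result = sender.sendAsync(() -> {
            try {
                metadataReady.await(); // stands in for send() waiting on metadata
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return "acked";
        });
        System.out.println(result.isDone()); // false: control came back before the wait ended
        metadataReady.countDown();
        System.out.println(result.join()); // acked
        sender.close();
    }
}
```

This only moves the wait off the caller's thread. The single-thread executor's queue is unbounded, which removes back-pressure, and `cancel` on a CompletableFuture does not interrupt work that is already running, so a real producer would need to bound the queue or fail fast instead.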





[jira] [Commented] (KAFKA-3539) KafkaProducer.send() may block even though it returns the Future

2017-07-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16088823#comment-16088823
 ] 

ASF GitHub Bot commented on KAFKA-3539:
---

Github user evis closed the pull request at:

https://github.com/apache/kafka/pull/3478


> KafkaProducer.send() may block even though it returns the Future
> 
>
> Key: KAFKA-3539
> URL: https://issues.apache.org/jira/browse/KAFKA-3539
> Project: Kafka
>  Issue Type: Bug
>Reporter: Oleg Zhurakousky
>Assignee: Evgeny Veretennikov
>Priority: Critical
>
> You can get more details from the us...@kafka.apache.org mailing list by searching for the 
> thread with the subject "KafkaProducer block on send".
> The bottom line is that a method that returns a Future must never block, since 
> blocking essentially violates the Future contract: a Future is specifically designed to 
> return immediately, passing control back to the user to check for completion, 
> cancel, etc.


