[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14611104#comment-14611104 ] Guozhang Wang commented on KAFKA-2168: Committed the follow-up patch to trunk. Closing.

> New consumer poll() can block other calls like position(), commit(), and
> close() indefinitely
>
> Key: KAFKA-2168
> URL: https://issues.apache.org/jira/browse/KAFKA-2168
> Project: Kafka
> Issue Type: Sub-task
> Components: clients, consumer
> Reporter: Ewen Cheslack-Postava
> Assignee: Jason Gustafson
> Priority: Critical
> Fix For: 0.8.3
>
> Attachments: KAFKA-2168.patch, KAFKA-2168_2015-06-01_16:03:38.patch,
> KAFKA-2168_2015-06-02_17:09:37.patch, KAFKA-2168_2015-06-03_18:20:23.patch,
> KAFKA-2168_2015-06-03_21:06:42.patch, KAFKA-2168_2015-06-04_14:36:04.patch,
> KAFKA-2168_2015-06-05_12:01:28.patch, KAFKA-2168_2015-06-05_12:44:48.patch,
> KAFKA-2168_2015-06-11_14:09:59.patch, KAFKA-2168_2015-06-18_14:39:36.patch,
> KAFKA-2168_2015-06-19_09:19:02.patch, KAFKA-2168_2015-06-22_16:34:37.patch,
> KAFKA-2168_2015-06-23_09:39:07.patch, KAFKA-2168_2015-06-30_10:54:22.patch
>
> The new consumer is currently using very coarse-grained synchronization. For
> most methods this isn't a problem since they finish quickly once the lock is
> acquired, but poll() might run for a long time (and commonly will, since
> polling with long timeouts is a normal use case). This means any operation
> invoked from another thread may block until the poll() call completes.
>
> Some example use cases where this can be a problem:
> * A shutdown hook is registered to trigger shutdown and invokes close(). It
> gets invoked from another thread and blocks indefinitely.
> * A user wants to manage offset commits themselves in a background thread. If
> the commit policy is not purely time-based, it's not currently possible to
> make sure the call to commit() will be processed promptly.
>
> Two possible solutions to this:
> 1. Make sure a lock is not held during the actual select call. Since we have
> multiple layers (KafkaConsumer -> NetworkClient -> Selector -> nio Selector)
> this is probably hard to make work cleanly since locking is currently only
> performed at the KafkaConsumer level and we'd want it unlocked around a
> single line of code in Selector.
> 2. Wake up the selector before synchronizing for certain operations. This
> would require some additional coordination to make sure the caller of
> wakeup() is able to acquire the lock promptly (instead of, e.g., the poll()
> thread being woken up and then promptly reacquiring the lock with a
> subsequent long poll() call).

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
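Solution 2 from the description can be sketched in plain Java. This is a hypothetical illustration only, not the patch attached to this issue and not the KafkaConsumer API: the class `WakeupBeforeLock`, the helper `runExclusive`, the `waiters` counter, and the stand-in `poll`/`commit`/`close` methods are invented names, and a bare `java.nio.channels.Selector` with no registered channels stands in for the NetworkClient/Selector layers. A caller first announces itself, then calls `Selector.wakeup()`, then takes the lock; the polling thread checks that announcement before starting another long `select()`, which is one way to provide the "additional coordination" the description asks for.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch of "wake up the selector, then synchronize"; not KafkaConsumer code. */
public class WakeupBeforeLock {
    private final ReentrantLock lock = new ReentrantLock(true); // fair: queued callers go first
    private final AtomicInteger waiters = new AtomicInteger();  // callers that announced they want the lock
    private final Selector selector;                            // stand-in for the network layer
    private boolean closed = false;                             // guarded by lock
    private int commits = 0;                                    // guarded by lock

    public WakeupBeforeLock() throws IOException {
        this.selector = Selector.open();
    }

    /** Stand-in for poll(): holds the lock while blocked in select(). Returns false once closed. */
    public boolean poll(long timeoutMs) throws IOException {
        lock.lock();
        try {
            if (closed) return false;
            // The extra coordination: if another thread is waiting, skip the long select()
            // and release the lock instead of holding it for another full timeout.
            if (waiters.get() == 0) {
                selector.select(timeoutMs);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    /** Announce intent, wake the selector, then take the lock. */
    private void runExclusive(Runnable action) {
        waiters.incrementAndGet();
        try {
            // Makes an in-progress select() return now, or the next select() if none is running.
            selector.wakeup();
            lock.lock();
            try {
                action.run();
            } finally {
                lock.unlock();
            }
        } finally {
            waiters.decrementAndGet();
        }
    }

    /** Stand-in for commit(); here it only counts calls. */
    public void commit() {
        runExclusive(() -> commits++);
    }

    public void close() {
        runExclusive(() -> {
            closed = true;
            try {
                selector.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }

    public int commitCount() {
        lock.lock();
        try { return commits; } finally { lock.unlock(); }
    }

    /** Long poll() on one thread; commit() and close() from another. Returns elapsed ms. */
    static long demoMillis() throws Exception {
        WakeupBeforeLock consumer = new WakeupBeforeLock();
        Thread poller = new Thread(() -> {
            try {
                while (consumer.poll(60_000)) { } // long timeouts are the normal use case
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        poller.start();
        Thread.sleep(100); // give the poller time to block inside select()
        long start = System.nanoTime();
        consumer.commit();
        consumer.close();
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        poller.join(5_000);
        if (poller.isAlive() || consumer.commitCount() != 1) {
            throw new IllegalStateException("poller still running or commit lost");
        }
        return elapsedMs;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("commit() + close() returned after " + demoMillis() + " ms");
    }
}
```

Running `main` should report a small elapsed time rather than anything near the 60-second poll timeout. A fair `ReentrantLock` alone would not be enough: if the waiting thread has not yet queued on the lock when `select()` returns, the poller could take the lock again and block for another full timeout, which is the race the description warns about. The `waiters` check closes that gap, at the cost of the poller briefly spinning while a caller acquires the lock; a leftover wakeup can also make one later `select()` return early, which is harmless here.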
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14608753#comment-14608753 ] Jason Gustafson commented on KAFKA-2168: [~ewencp], there were some minor issues from the code reviews that I've tried to address in the recent patch. Once these are accepted, we had better stick a fork in it.
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14608743#comment-14608743 ] Jason Gustafson commented on KAFKA-2168: Updated reviewboard https://reviews.apache.org/r/34789/diff/ against branch upstream/trunk
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14607548#comment-14607548 ] Ewen Cheslack-Postava commented on KAFKA-2168: This version was committed to trunk. Were we expecting any follow up patches in this JIRA or should we close this?
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14597914#comment-14597914 ] Jason Gustafson commented on KAFKA-2168: Updated reviewboard https://reviews.apache.org/r/34789/diff/ against branch upstream/trunk
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14596850#comment-14596850 ] Jason Gustafson commented on KAFKA-2168: Updated reviewboard https://reviews.apache.org/r/34789/diff/ against branch upstream/trunk
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14593579#comment-14593579 ] Jason Gustafson commented on KAFKA-2168: Updated reviewboard https://reviews.apache.org/r/34789/diff/ against branch upstream/trunk
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14592582#comment-14592582 ] Jason Gustafson commented on KAFKA-2168: Updated reviewboard https://reviews.apache.org/r/34789/diff/ against branch upstream/trunk
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14582528#comment-14582528 ] Jay Kreps commented on KAFKA-2168: Hey [~guozhang], have you had a chance to look at this? It would be good to get your thoughts as it relates somewhat to the refactoring you did...
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14582524#comment-14582524 ] Jason Gustafson commented on KAFKA-2168: Updated reviewboard https://reviews.apache.org/r/34789/diff/ against branch upstream/trunk
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14575088#comment-14575088 ] Jason Gustafson commented on KAFKA-2168: Updated reviewboard https://reviews.apache.org/r/34789/diff/ against branch upstream/trunk
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14575032#comment-14575032 ] Jason Gustafson commented on KAFKA-2168: Updated reviewboard https://reviews.apache.org/r/34789/diff/ against branch upstream/trunk
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14573627#comment-14573627 ] Jason Gustafson commented on KAFKA-2168: Updated reviewboard https://reviews.apache.org/r/34789/diff/ against branch upstream/trunk
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14572083#comment-14572083 ] Jason Gustafson commented on KAFKA-2168: Updated reviewboard https://reviews.apache.org/r/34789/diff/ against branch upstream/trunk
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14571944#comment-14571944 ] Jason Gustafson commented on KAFKA-2168: Updated reviewboard https://reviews.apache.org/r/34789/diff/ against branch upstream/trunk
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14570036#comment-14570036 ] Jason Gustafson commented on KAFKA-2168: Updated reviewboard https://reviews.apache.org/r/34789/diff/ against branch upstream/trunk
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14568212#comment-14568212 ] Jason Gustafson commented on KAFKA-2168: The most recent patch attempts to follow [~guozhang]'s overall advice above. Most of the calls are still blocking, but I have moved the blocking code out of Coordinator/Fetcher and into KafkaConsumer. This makes it possible to use wakeup() from the consumer without splitting the logic across multiple classes. The consumer is no longer synchronized, which makes it unsafe for multi-threaded access, but wakeup() can be safely used from other threads. This should also resolve KAFKA-2230. Note also that this patch will likely have to be updated if KAFKA-2123 is accepted.
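The design described in this comment is an unsynchronized client whose only cross-thread entry point is `wakeup()`. It might look roughly like the sketch below. `UnsynchronizedClient` and `WakeupSignal` are hypothetical names, not classes from the patch, and the client is assumed to sit directly on a JDK `Selector`. Because the blocking loop lives in the client itself, one flag check per iteration is enough to abort it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical exception telling the owning thread that wakeup() interrupted a blocking call.
class WakeupSignal extends RuntimeException {
    WakeupSignal() { super("interrupted by wakeup()"); }
}

// Hypothetical sketch: nothing is synchronized and the client belongs to one thread,
// except wakeup(), which any thread may call.
class UnsynchronizedClient {
    private final Selector selector;
    private final AtomicBoolean wakeupRequested = new AtomicBoolean(false);

    UnsynchronizedClient() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Thread-safe: aborts the current (or next) blocking call on the owning thread. */
    void wakeup() {
        wakeupRequested.set(true);
        selector.wakeup();
    }

    /** Blocks for up to timeoutMs; throws WakeupSignal if wakeup() was called. */
    int poll(long timeoutMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            // Consume the flag so each wakeup() aborts at most one blocking call.
            if (wakeupRequested.getAndSet(false)) {
                throw new WakeupSignal();
            }
            long remainingMs = (deadline - System.nanoTime()) / 1_000_000L;
            if (remainingMs <= 0) {
                return 0;   // timed out; select(0) would block forever, so never call it
            }
            try {
                int ready = selector.select(remainingMs);
                if (ready > 0) {
                    return ready;   // a real client would process the selected keys here
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }
}
```

If `wakeup()` lands between the flag check and `select()`, the pending selector wakeup makes `select()` return at once, so the next iteration still sees the flag.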
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14568198#comment-14568198 ] Jason Gustafson commented on KAFKA-2168: Updated reviewboard https://reviews.apache.org/r/34789/diff/ against branch upstream/trunk
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14563968#comment-14563968 ] Jason Gustafson commented on KAFKA-2168: This patch is a work in progress. Using the wakeup() method to invoke close() on NetworkClient does not work as easily as I thought it would, because of the completeAll() methods, which invoke poll() in a loop.
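The difficulty with loops like completeAll() can be shown in isolation. Waking the selector ends only the select() that is currently running, so a loop that simply calls poll() again keeps blocking. In the hypothetical sketch below, `LoopingClient`, `completeAllNaive()` and `completeAll()` are illustrative names. The fix assumed here is that every iteration re-checks a shared flag set by the thread that requested the close.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of how a wakeup gets "lost" inside a completeAll()-style loop.
class LoopingClient {
    private final Selector selector;
    private final AtomicBoolean closing = new AtomicBoolean(false);

    LoopingClient() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** One bounded network poll; returns early if the selector is woken up. */
    void pollOnce(long timeoutMs) {
        try {
            selector.select(timeoutMs);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Called from another thread, e.g. by close(). */
    void requestClose() {
        closing.set(true);
        selector.wakeup();
    }

    /** Naive: the wakeup ends one select(), but the loop just polls again. */
    boolean completeAllNaive(BooleanSupplier done, long pollMs, int maxPolls) {
        for (int i = 0; i < maxPolls; i++) {
            if (done.getAsBoolean()) return true;
            pollOnce(pollMs);
        }
        return done.getAsBoolean();
    }

    /** Fixed: each iteration checks the shared flag and gives up promptly. */
    boolean completeAll(BooleanSupplier done, long pollMs, int maxPolls) {
        for (int i = 0; i < maxPolls; i++) {
            if (done.getAsBoolean()) return true;
            if (closing.get()) return false;
            pollOnce(pollMs);
        }
        return done.getAsBoolean();
    }
}
```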
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14563843#comment-14563843 ] Jason Gustafson commented on KAFKA-2168: I've added a patch which removes synchronization and allows a prompt close (using the wakeup call on the underlying selector). It does not expose the wakeup call in the consumer interface, however, since that seems to be a bit trickier. I think we may want to move that to a separate ticket.
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14563832#comment-14563832 ] Jason Gustafson commented on KAFKA-2168: Created reviewboard https://reviews.apache.org/r/34789/diff/ against branch upstream/trunk
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14563753#comment-14563753 ] Guozhang Wang commented on KAFKA-2168: -- I would also prefer to stick with single-threaded consumer usage, and I agree that KAFKA-2123 would be important to have then.
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14563212#comment-14563212 ] Jason Gustafson commented on KAFKA-2168: Talked with Neha, Ewen, and Jay last night. Consensus was to remove the synchronization of KafkaConsumer and provide a wakeup() method which can be used to interrupt a long poll. This should solve the issue from this ticket, though it may hinge on KAFKA-1894, which removes polling loops from the current consumer. Note that this explicitly makes the consumer unsafe for multi-threaded access, though we will provide a thread-safe close() method which can be called (for example) from a shutdown hook. I'm going to update this ticket to reflect this change and submit a patch.
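A thread-safe close() of the kind proposed here could work like this: set a flag, wake the selector, and let the owning thread's loop notice the flag and release resources. The class below is a hypothetical sketch over a plain JDK `Selector`, not the consumer's implementation. Having `close(timeoutMs)` also wait for the loop to exit is an assumption about what a caller such as a shutdown hook would want.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: the client is used by one thread, but close() may be
// called from any thread (for example a JVM shutdown hook).
class CloseableClient {
    private final Selector selector;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final CountDownLatch stopped = new CountDownLatch(1);

    CloseableClient() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Owning thread: poll until close() is requested, then release resources. */
    void runLoop(long pollMs) {
        try {
            while (!closed.get()) {
                selector.select(pollMs);   // returns early when close() wakes the selector
                // ... process ready keys / fetched records here ...
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            try {
                selector.close();
            } catch (IOException ignored) {
                // best effort during shutdown
            }
            stopped.countDown();
        }
    }

    /** Thread-safe: set the flag, wake the selector, wait up to timeoutMs for the loop to exit. */
    boolean close(long timeoutMs) throws InterruptedException {
        if (closed.compareAndSet(false, true)) {
            selector.wakeup();   // pre-arms the next select() if none is running yet
        }
        return stopped.await(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```

From a shutdown hook this might be registered as `Runtime.getRuntime().addShutdownHook(new Thread(() -> { try { client.close(10_000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }))`, with `runLoop()` running on the application's consumer thread.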
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14562104#comment-14562104 ] Jay Kreps commented on KAFKA-2168: -- Yeah, for processing messages we really thought about two models:
1. N fetcher threads with N clients feeding M processors through a blocking queue (M could be 1)
2. N threads, each with a client which does its own processing
I think what you are suggesting is
3. M processor threads each fetching and processing using a shared client
I wonder how well (3) will work given that poll() returns all available messages, so it may not distribute load very well. [~ewencp] your use cases are good. Here is my take on those:
1. See above.
2. Doesn't this work now? The concern with allowing simultaneous commit() and poll() is just that it will expose weird intermediate states where your offset is being updated.
3. I don't think the consumer's close() is blocking in the way the producer's is. The producer has to block until all sent messages are gone, but the consumer just shuts down your TCP connections and exits, so I'm not sure if this is needed?
4. I think metrics() can actually be called in parallel from any number of threads and isn't blocked on poll().
5. This is a good point. It may be okay just to wait, though, right, and have MM just set a low timeout?
At a high level my concern is just that this may be a fairly deep change and break a lot of assumptions. That code is very single-threaded top to bottom, so I'm worried about trying to change that and kind of hoping we can wave our hands and not need to :-)
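Model 1 keeps every client confined to its own thread and hands records to processors through a queue. A minimal, self-contained sketch follows. `fetchBatch()` is a hypothetical stand-in for a real client's poll(), and records are plain `long` values, so only the threading structure is meant to carry over.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of model 1: N fetcher threads, each owning its own client,
// feed M processor threads through a bounded blocking queue.
class FetcherProcessorPipeline {
    private static final long POISON = -1L;   // end-of-stream marker, one per processor

    /** Stand-in for one client's poll(): returns the next batch for this fetcher. */
    static List<Long> fetchBatch(int fetcherId, int batchNo, int batchSize) {
        List<Long> batch = new ArrayList<>(batchSize);
        for (int i = 0; i < batchSize; i++) {
            batch.add((long) fetcherId * 1_000_000 + batchNo * batchSize + i);
        }
        return batch;
    }

    /** Runs the pipeline and returns how many records the processors handled. */
    static long run(int fetchers, int processors, int batchesPerFetcher, int batchSize)
            throws InterruptedException {
        BlockingQueue<Long> queue = new ArrayBlockingQueue<>(1_000);
        AtomicLong processed = new AtomicLong();
        ExecutorService fetchPool = Executors.newFixedThreadPool(fetchers);
        ExecutorService processPool = Executors.newFixedThreadPool(processors);

        for (int p = 0; p < processors; p++) {
            processPool.submit(() -> {
                try {
                    while (queue.take() != POISON) {
                        processed.incrementAndGet();   // "processing" is just counting here
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        for (int f = 0; f < fetchers; f++) {
            final int id = f;
            fetchPool.submit(() -> {
                try {
                    for (int b = 0; b < batchesPerFetcher; b++) {
                        for (long record : fetchBatch(id, b, batchSize)) {
                            queue.put(record);   // blocks when processors fall behind
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        fetchPool.shutdown();
        fetchPool.awaitTermination(1, TimeUnit.MINUTES);
        for (int p = 0; p < processors; p++) {
            queue.put(POISON);   // one stop marker per processor
        }
        processPool.shutdown();
        processPool.awaitTermination(1, TimeUnit.MINUTES);
        return processed.get();
    }
}
```

The bounded `ArrayBlockingQueue` provides back-pressure. When processors fall behind, `put()` blocks the fetchers instead of letting memory grow.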
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14562107#comment-14562107 ] Ewen Cheslack-Postava commented on KAFKA-2168: -- Agreed, it's a matter of tradeoffs. [~nehanarkhede], I think the other tradeoff you didn't mention is which one is easier for the user of the API. The non-threadsafe version definitely leaves more work to the user (some of which, like correctly choosing timeouts, we know is error-prone since we've had related bugs in the producer). close() should be straightforward if we go for a non-threadsafe approach: we just need to set a flag, wake up the selector, and make sure all the consumer classes obey that flag by exiting immediately. I think if we go that route, KAFKA-2123 (at a minimum, adding callbacks to async commits) becomes critical, since the expectation is that you handle commits in the same thread as poll(). Any unavailability of the coordinator could block processing indefinitely while waiting for a commit if you need to know that it actually completes. I think it would also require a wakeup() mechanism that doesn't exist in the public API yet, because you can't always predict the necessary timeout, e.g. if you want to be able to respond to external events such as a network message. close() and wakeup() should be the only two methods that actually require thread-safety guarantees.
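A minimal sketch of the close() path described above for a non-threadsafe consumer: one thread owns the poll loop, and close() from any other thread only sets a flag and calls `java.nio.channels.Selector.wakeup()`. `PollLoop` and `CloseFromAnotherThread` are hypothetical names, and a bare `Selector` with no registered channels stands in for the consumer's network layer; this is an illustration under those assumptions, not the actual KafkaConsumer code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical poll loop standing in for a non-threadsafe consumer.
class PollLoop {
    private final Selector selector;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    PollLoop() throws IOException {
        selector = Selector.open();
    }

    // Runs on the single polling thread.
    void run(long pollTimeoutMs) throws IOException {
        try {
            while (!closed.get()) {
                // Blocks for up to pollTimeoutMs unless wakeup() is called. A wakeup()
                // that arrives before select() starts makes the next select() return at once.
                selector.select(pollTimeoutMs);
                // ... process ready channels here, checking `closed` between steps ...
            }
        } finally {
            selector.close();
        }
    }

    // The one method meant to be called from another thread (e.g. a shutdown hook):
    // set the flag, then wake the selector so the poll thread notices it.
    void close() {
        if (closed.compareAndSet(false, true)) {
            selector.wakeup();
        }
    }
}

public class CloseFromAnotherThread {
    // Returns true if close() ends a 60-second poll long before its timeout.
    static boolean closesPromptly() throws Exception {
        PollLoop loop = new PollLoop();
        Thread poller = new Thread(() -> {
            try {
                loop.run(60_000);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        poller.start();
        Thread.sleep(100);   // give the poller time to block in select()
        loop.close();        // called from a different thread than the poller
        poller.join(5_000);
        return !poller.isAlive();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("closed promptly: " + closesPromptly());
    }
}
```

The `Selector.wakeup()` contract is what makes this safe: if no selection is in progress when it is called, the next `select()` returns immediately, so a close() that lands between loop iterations is not lost.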
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14562061#comment-14562061 ] Neha Narkhede commented on KAFKA-2168: -- There are tradeoffs to having multiple threads per consumer instance vs. having a consumer instance per thread. The consumer code is simpler and the throughput is better in the latter design, but the former design uses fewer TCP connections. Some of the concerns [~ewencp] brings up above can be mitigated if there is a separate consumer instance per user thread, and others can be mitigated by the user picking a poll() timeout they are comfortable blocking on. All of this would mean explicitly stating that the consumer APIs are not threadsafe and that the user should create one consumer instance per thread instead of sharing one. We still need to make sure close() can be called from a separate thread, as [~ewencp] correctly points out, though the change isn't large if we go down this route. It seems simpler to stick to the original intention of the design and not share consumer instances across threads?
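If the consumer is documented as not threadsafe, misuse can be made to fail fast instead of silently blocking. The guard below is a hypothetical sketch (`SingleThreadGuard` and `GuardDemo` are not Kafka classes): each public method would call `acquire()` on entry and `release()` in a `finally` block, while close() and any wakeup() method would deliberately skip the guard so they stay callable from another thread.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical guard for an object documented as single-threaded.
class SingleThreadGuard {
    private static final long NO_OWNER = -1L;
    private final AtomicLong owner = new AtomicLong(NO_OWNER);
    private final AtomicInteger depth = new AtomicInteger(0);

    void acquire() {
        long me = Thread.currentThread().getId();
        // Fail fast if another thread is already inside the object, instead of blocking.
        if (owner.get() != me && !owner.compareAndSet(NO_OWNER, me)) {
            throw new ConcurrentModificationException(
                "object is not safe for multi-threaded access");
        }
        depth.incrementAndGet();   // re-entrant calls from the owning thread are allowed
    }

    void release() {
        if (depth.decrementAndGet() == 0) {
            owner.set(NO_OWNER);
        }
    }
}

public class GuardDemo {
    // Returns true if a second thread is rejected while the first holds the guard.
    static boolean rejectsSecondThread() throws InterruptedException {
        SingleThreadGuard guard = new SingleThreadGuard();
        guard.acquire();                   // the main thread is now "inside" the object
        boolean[] rejected = {false};
        Thread other = new Thread(() -> {
            try {
                guard.acquire();
                guard.release();
            } catch (ConcurrentModificationException e) {
                rejected[0] = true;
            }
        });
        other.start();
        other.join();                      // join() makes rejected[0] visible here
        guard.release();
        return rejected[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("second thread rejected: " + rejectsSecondThread());
    }
}
```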
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14559650#comment-14559650 ] Ewen Cheslack-Postava commented on KAFKA-2168: -- Some reasons you might want to use the consumer from multiple threads:
1. I don't think it's necessarily addressed by this JIRA, but if processing messages is expensive, or the processing code is easier to write as synchronous calls even if it requires accessing some network resource, you might want multiple threads to be able to call poll(). This should already behave correctly.
2. Managing offset commits in a separate thread from polling. If you need to coordinate some other action with offset commit, your choices are currently to compute timeouts for poll() carefully so the work can happen in the same thread, or to try committing from another thread. The code is much simpler to write if you can just fire up a thread that does a sync commit plus whatever other operation you need, then sleeps for the next commit interval. If you do this right, you can continue processing messages during the offset commit, even if it ends up delayed for some reason.
3. close() is probably the most obvious case, given the feedback we've had on the producer's close() method blocking indefinitely: you want to be able to call close() from a separate thread if you keep a thread dedicated to poll()ing. For example, using a shutdown hook requires this. The feedback on the producer made it clear that this is important and that close() should also have a timeout.
4. Metrics. MetricsReporter is the "right" way to get metrics, but that only works if what you care about is already covered. I don't think per-topic-partition position() and committed() are currently reported. I'm not sure what the plan is there, since reporting these metrics in something like mirrormaker might be too much, but some applications will want to track that info in metrics. This is another case where just firing up a thread to periodically check the state of the consumer and report it via whatever metrics package they use is probably the easiest implementation.
5. Any time you may need to make dynamic changes to the consumer in response to external events. For example, consider a mirrormaker-like service. If you want to be able to dynamically reconfigure the consumer to add new topics to the job, subscribe() will block indefinitely if poll() has a long timeout and new data isn't flowing into the topics you're already subscribed to. A wakeup() method isn't good enough here, since you need to manage the subsequent race between the thread trying to subscribe() and the poll()ing thread.
At a minimum, the current state, where the javadoc guarantees thread safety but calls can block indefinitely, is a problem. If we want it to be a single-threaded API, then we should just leave locking up to the user (although we'd probably still want at least some sort of wakeup() method so they could interrupt long poll() calls).
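A small, self-contained illustration of the blocking problem discussed in this issue, assuming a toy `CoarseConsumer` (hypothetical) whose methods are all `synchronized` and whose poll() holds the monitor for its whole timeout. `Thread.sleep()` stands in for the blocking select and, unlike `Object.wait()`, does not release the monitor, so a commit() from a second thread cannot start until poll() returns.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical toy consumer with coarse-grained locking: every public method
// synchronizes on the whole object.
class CoarseConsumer {
    private final CountDownLatch pollStarted = new CountDownLatch(1);

    synchronized void poll(long timeoutMs) throws InterruptedException {
        pollStarted.countDown();
        Thread.sleep(timeoutMs);   // stand-in for a blocking select(); the monitor stays held
    }

    synchronized void commit() {
        // would send an offset commit; returns as soon as the monitor is acquired
    }

    // Deliberately not synchronized, so callers can wait without needing the monitor.
    void awaitPollStarted() throws InterruptedException {
        pollStarted.await();
    }
}

public class CoarseLockDemo {
    // Returns how many milliseconds commit() waited behind a concurrent poll(pollTimeoutMs).
    static long commitDelayMs(long pollTimeoutMs) throws InterruptedException {
        CoarseConsumer consumer = new CoarseConsumer();
        Thread poller = new Thread(() -> {
            try {
                consumer.poll(pollTimeoutMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        poller.start();
        consumer.awaitPollStarted();
        long start = System.nanoTime();
        consumer.commit();         // blocks until poll() releases the monitor
        long waitedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        poller.join();
        return waitedMs;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("commit() waited about " + commitDelayMs(1_000) + " ms");
    }
}
```

With a realistic long poll timeout, the same wait grows to the full timeout, which is the indefinite blocking the javadoc's thread-safety promise does not account for.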
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14559563#comment-14559563 ] Jay Kreps commented on KAFKA-2168: -- This is a pretty big redesign of the consumer. The single-threadedness was intentional in the original design. Can we start by sketching out why you want multiple threads to use the same consumer instance?
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14555396#comment-14555396 ] Guozhang Wang commented on KAFKA-2168: -- Currently the NetworkClient's blocking calls, "poll" and "completeAll" (which triggers "poll" as well), are used in different layers:
1. KafkaConsumer.poll() naturally triggers client.poll().
2. Coordinator.ensureCoordinatorReady(), to find a node to ask for the consumer coordinator.
3. Coordinator.sendAndReceive(), for blocking requests such as sync offset commit, offset fetch, join group, and coordinator fetch.
4. Fetcher.awaitMetadataUpdate(), for blocking on metadata updates.
5. Fetcher.offsetBefore(), to block until list-offset responses are successfully received.
Apart from 1), the other four uses of NetworkClient's blocking calls actually break the API contract that consumer.poll(timeout) blocks for no longer than the timeout period (for 3), sync offset commit is acceptable since it is blocking by definition). In addition, 4) duplicates existing functionality, since there is also KafkaConsumer.awaitMetadataUpdate. So maybe we could consider combining this with the fix for KAFKA-1894 and see if the following is possible:
1. Move all "poll" and "completeAll" calls from Coordinator and Fetcher into KafkaConsumer so that none of the Coordinator / Fetcher functions block. For example, if the coordinator is not known, instead of blocking on ensureCoordinatorReady we should just call client.send(consumer-metadata-request) and return, and depend on the callback to handle coordinator discovery. This is mainly for KAFKA-1894, and it will of course complicate the logic of Coordinator and Fetcher since they now need to maintain more internal state.
2. Move Fetcher.resetOffset/OffsetBefore into subscription, and remove Fetcher.awaitMetadataUpdate. After this, metadata becomes read-only to the fetcher.
3. After the first two steps, the finer-grained synchronization approach would be simpler since we would only need to synchronize on subscriptions.
A side effect, though, would be that calls to consumer.poll(timeout) may now return much more quickly with no data during partition reassignment, sync offset commit, etc., and people would need to call it multiple times before getting data. But as long as we document the usage clearly, I think this is fine.
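A sketch of the non-blocking coordinator lookup proposed in step 1, under stated assumptions: `AsyncSender`, `NonBlockingCoordinator`, `QueuedSender`, and `NonBlockingDemo` are hypothetical names rather than Kafka classes, and plain strings stand in for the consumer-metadata request and the coordinator's identity. Instead of looping until the coordinator is found, the lookup sends at most one request and returns; a callback records the result when the poll loop delivers the response.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical async network interface: send() only queues the request; the
// callback runs later, when the polling thread receives the response.
interface AsyncSender {
    void send(String request, Consumer<String> onResponse);
}

// Hypothetical non-blocking coordinator lookup; names are illustrative only.
class NonBlockingCoordinator {
    private final AsyncSender sender;
    private String coordinator;             // null until discovered
    private boolean lookupInFlight = false;

    NonBlockingCoordinator(AsyncSender sender) {
        this.sender = sender;
    }

    // Never blocks: returns true if the coordinator is known; otherwise ensures
    // exactly one lookup is outstanding and returns false so the caller can move on.
    boolean ensureCoordinatorKnown() {
        if (coordinator != null) {
            return true;
        }
        if (!lookupInFlight) {
            lookupInFlight = true;
            sender.send("consumer-metadata-request", response -> {
                lookupInFlight = false;
                coordinator = response;     // the callback completes discovery
            });
        }
        return false;
    }
}

// Test double that holds callbacks until deliverAll() simulates a network poll.
class QueuedSender implements AsyncSender {
    final List<Consumer<String>> pending = new ArrayList<>();

    @Override
    public void send(String request, Consumer<String> onResponse) {
        pending.add(onResponse);
    }

    void deliverAll(String response) {
        List<Consumer<String>> ready = new ArrayList<>(pending);
        pending.clear();
        ready.forEach(callback -> callback.accept(response));
    }
}

public class NonBlockingDemo {
    public static void main(String[] args) {
        QueuedSender sender = new QueuedSender();
        NonBlockingCoordinator coordinator = new NonBlockingCoordinator(sender);
        System.out.println(coordinator.ensureCoordinatorKnown()); // false: lookup sent
        System.out.println(coordinator.ensureCoordinatorKnown()); // false: no duplicate request
        System.out.println(sender.pending.size());                // 1
        sender.deliverAll("broker-1");                            // the "poll" delivers the response
        System.out.println(coordinator.ensureCoordinatorKnown()); // true
    }
}
```

This also shows the side effect mentioned in the comment: the first call only starts discovery and returns without a result, so the caller has to come back on a later poll.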
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14554867#comment-14554867 ] Jason Gustafson commented on KAFKA-2168: Yes, that is the tradeoff, but at least it would be confined to the KafkaConsumer class.
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14554799#comment-14554799 ] Ewen Cheslack-Postava commented on KAFKA-2168: -- But isn't that going to make a mess of all the methods in KafkaConsumer, since we need to do this everywhere we currently synchronize? And I don't see a good way of providing it as a generic utility, since you need the body of the method inside the try/finally block.
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14554743#comment-14554743 ] Jason Gustafson commented on KAFKA-2168: Note that the coordinator has a couple of cases where poll is called in a loop. There's a separate issue to fix this: KAFKA-1894. We might want to hold off on this until that is resolved.
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14554724#comment-14554724 ] Jason Gustafson commented on KAFKA-2168: I feel a little wary about finer-grained synchronization given all the state in the consumer, the network client, and the selector. I actually think the two-lock approach is the least intrusive since it only touches the KafkaConsumer and preserves the current coarse synchronization design, but I agree that it's unusual. Here's an idea of what it might look like in the code:
{code:java}
lock.queue();        // hypothetical: join the queue before contending for the critical lock
try {
    client.wakeup(); // interrupt a long poll() that may be holding the lock
    lock.lock();
    // critical section
} finally {
    lock.unlock();
}
{code}
Definitely weird, but not that hard to understand. You'd still run into the same problem if multiple threads are trying to poll, but that seems like unintended usage anyway.
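The two-lock pseudocode relies on a `lock.queue()` operation that `java.util.concurrent` does not provide. One runnable way to express the same idea is sketched below, assuming two plain `ReentrantLock`s (a short-lived queue lock that every thread takes before the main lock) and a bare `java.nio.channels.Selector` in place of the network client. `TwoLockConsumer`, `pollOnce`, `runExclusive`, and `TwoLockDemo` are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical consumer skeleton illustrating the two-lock idea.
class TwoLockConsumer {
    private final ReentrantLock queueLock = new ReentrantLock(); // taken briefly, first
    private final ReentrantLock mainLock = new ReentrantLock();  // guards consumer state
    private final Selector selector;

    TwoLockConsumer() throws IOException {
        selector = Selector.open();
    }

    // Every thread passes through queueLock before taking mainLock. A waiter that
    // holds queueLock therefore stops the poll thread from grabbing mainLock again.
    private void enter() {
        queueLock.lock();
        try {
            mainLock.lock();
        } finally {
            queueLock.unlock();
        }
    }

    // Called repeatedly by the poll thread; holds mainLock across the long select().
    void pollOnce(long timeoutMs) throws IOException {
        enter();
        try {
            selector.select(timeoutMs);
        } finally {
            mainLock.unlock();
        }
    }

    // For commit(), position(), close(), etc. called from other threads.
    void runExclusive(Runnable action) {
        queueLock.lock();
        try {
            selector.wakeup();   // cut the poll thread's select() short
            mainLock.lock();     // acquired as soon as that select() returns
        } finally {
            queueLock.unlock();
        }
        try {
            action.run();
        } finally {
            mainLock.unlock();
        }
    }
}

public class TwoLockDemo {
    // Returns how long runExclusive() took while another thread polls with a 60 s timeout.
    static long exclusiveLatencyMs() throws Exception {
        TwoLockConsumer consumer = new TwoLockConsumer();
        Thread poller = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    consumer.pollOnce(60_000);
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        poller.setDaemon(true);
        poller.start();
        Thread.sleep(200);                   // let the poller block in select()
        long start = System.nanoTime();
        consumer.runExclusive(() -> { /* e.g. commit offsets */ });
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        poller.interrupt();                  // an interrupted select() returns at once
        poller.join(5_000);
        return elapsedMs;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("runExclusive took " + exclusiveLatencyMs() + " ms");
    }
}
```

The queue lock is what addresses the starvation concern from the issue description: once the poll thread's select() is woken and it releases the main lock, its next attempt to re-enter must first take the queue lock, which the waiting thread holds until it owns the main lock.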
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553343#comment-14553343 ] Ewen Cheslack-Postava commented on KAFKA-2168: -- Actually, now I realize another solution is to only remove synchronization from the one place it's a problem: things that might call NetworkClient.poll() with long timeouts. Could we use synchronized(this) around everything *except* the NetworkClient.poll() calls, and then have anything using NetworkClient synchronize on it? This is still finer-grained locking, but I think it could end up having a pretty minimal impact on the current code. The drawback is that since NetworkClient is used by all the classes, the requirement of locking gets spread across all of them.
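A sketch of this narrower-locking proposal, with hypothetical `SlowNetworkClient`, `SplitLockConsumer`, and `SplitLockDemo` classes standing in for the real ones: consumer state is guarded by `this`, every network-client call is guarded by the client's own monitor, and the long network poll runs without holding `this`. Methods that touch only consumer state, such as position(), then no longer wait on poll().

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for NetworkClient whose poll() blocks for the full timeout.
class SlowNetworkClient {
    final CountDownLatch pollEntered = new CountDownLatch(1);

    void poll(long timeoutMs) throws InterruptedException {
        pollEntered.countDown();
        Thread.sleep(timeoutMs);      // stand-in for a blocking select()
    }
}

// Consumer state is guarded by `this`; all network client use is guarded by the
// client's monitor. Rule: never acquire `this` while holding the client monitor.
class SplitLockConsumer {
    final SlowNetworkClient client = new SlowNetworkClient();
    private long position = 0;

    void poll(long timeoutMs) throws InterruptedException {
        synchronized (this) {
            // e.g. decide which partitions to fetch, using consumer state
        }
        synchronized (client) {
            client.poll(timeoutMs);   // `this` is not held during the long call
        }
        synchronized (this) {
            position++;               // e.g. apply fetched records to consumer state
        }
    }

    // Touches only consumer state, so it never waits for a network poll.
    synchronized long position() {
        return position;
    }
}

public class SplitLockDemo {
    // Returns {ms that position() took during a 1 s poll, position after the poll}.
    static long[] run() throws InterruptedException {
        SplitLockConsumer consumer = new SplitLockConsumer();
        Thread poller = new Thread(() -> {
            try {
                consumer.poll(1_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        poller.start();
        consumer.client.pollEntered.await();
        long start = System.nanoTime();
        consumer.position();
        long latencyMs = (System.nanoTime() - start) / 1_000_000;
        poller.join();
        return new long[] {latencyMs, consumer.position()};
    }

    public static void main(String[] args) throws InterruptedException {
        long[] result = run();
        System.out.println("position() took " + result[0] + " ms; position = " + result[1]);
    }
}
```

Anything that needs the network, such as a sync commit, would still take the client monitor and therefore still wait for the poll (or need a wakeup), which is the drawback noted above: the locking requirement spreads to every class that uses the client. Lock ordering matters too; taking the client monitor while holding `this` is fine, but the reverse order could deadlock.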
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553332#comment-14553332 ] Ewen Cheslack-Postava commented on KAFKA-2168: -- For option 1, it's probably worth pointing out that we already have some finer-grained synchronization (metrics and metadata, since those are shared by many other components, and the producer doesn't synchronize at the level of KafkaProducer, only on its internals). So we're already double locking in a lot of cases. My concern with option 2 is that it's a pretty unusual approach, which makes the code harder to understand.
Scanning through the code, there aren't that many places in KafkaConsumer where multiple components are used together in a way that would require synchronization. updateFetchPositions and refreshCommittedOffsets might, since they use subscriptions + fetcher and subscriptions + coordinator together, respectively. Especially with SubscriptionState we'd need to be careful, since some of its methods return an internal collection and flags, and the subsequent processing might need to be synchronized as a whole to be sure not to miss anything. For example, partition reassignment checks a flag, performs the reassignment, and then resets the flag; we'd need to make sure that a subscription made during that time wouldn't get lost. The other case is poll(). I thought this might be hard to reason about if some state was changing while it was executing, but I think it's not a problem as long as a few of the steps can be synchronized, in particular partition reassignment and offset commit.
By the way, I mapped out the dependencies. It's roughly 4 layers: subscriptions + metadata at the bottom, NetworkClient above that (using only metadata), then coordinator and fetcher in the next layer (both using all three), and KafkaConsumer at the top. But KafkaConsumer touches all of them, so it kind of breaks any layering. Some of the things in KafkaConsumer that require synchronization could possibly still move into a lower-level component (possibly something new) if we move some of the code around.
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553167#comment-14553167 ] Jason Gustafson commented on KAFKA-2168: I've started taking a look at this issue. Here are a couple of options:

1. Finer-grained synchronization: In KafkaConsumer, it looks like synchronization is needed primarily around the SubscriptionState instance and the NetworkClient. It may be possible to push the synchronization into these classes and avoid the direct synchronization in KafkaConsumer. This may work, but it will make reasoning about the correctness of the consumer more difficult.

2. Coarse-grained synchronization with wakeup: As Ewen suggests, the selector can be woken up before the operation that needs to be done. A second lock (or a queue) could be introduced to solve the starvation problem mentioned above. Basically, each thread would have to acquire the second lock (or traverse the queue) before being able to acquire the critical lock. This logic could be encapsulated in a separate class to avoid polluting the consumer too much.

Either approach will add some new complexity to the consumer. There is also the option of doing nothing. In that case, interactions with the consumer must come from the polling thread (in between polls). This puts the burden on the user to implement the hooks in the polling thread to commit offsets, do seeks, or whatever else is needed.

> New consumer poll() can block other calls like position(), commit(), and
> close() indefinitely
> -
>
> Key: KAFKA-2168
> URL: https://issues.apache.org/jira/browse/KAFKA-2168
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
>Reporter: Ewen Cheslack-Postava
>Assignee: Ewen Cheslack-Postava
>
> The new consumer is currently using very coarse-grained synchronization.
> For most methods this isn't a problem since they finish quickly once the lock is
> acquired, but poll() might run for a long time (and commonly will since
> polling with long timeouts is a normal use case). This means any operations
> invoked from another thread may block until the poll() call completes.
> Some example use cases where this can be a problem:
> * A shutdown hook is registered to trigger shutdown and invokes close(). It
> gets invoked from another thread and blocks indefinitely.
> * User wants to manage offset commit themselves in a background thread. If
> the commit policy is not purely time based, it's not currently possible to
> make sure the call to commit() will be processed promptly.
> Two possible solutions to this:
> 1. Make sure a lock is not held during the actual select call. Since we have
> multiple layers (KafkaConsumer -> NetworkClient -> Selector -> nio Selector)
> this is probably hard to make work cleanly since locking is currently only
> performed at the KafkaConsumer level and we'd want it unlocked around a
> single line of code in Selector.
> 2. Wake up the selector before synchronizing for certain operations. This
> would require some additional coordination to make sure the caller of
> wakeup() is able to acquire the lock promptly (instead of, e.g., the poll()
> thread being woken up and then promptly reacquiring the lock with a
> subsequent long poll() call).

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
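The "doing nothing" option in Jason's comment leaves it to the user to route work to the polling thread. One way that could look is sketched below, assuming a hypothetical PollLoopSketch helper (not part of any Kafka API) and a caller-supplied pollOnce callback standing in for a single short poll() call. Other threads enqueue actions such as a commit or a seek; the polling thread runs them between polls; and a shutdown hook only flips a flag instead of calling close() from another thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical helper, not a Kafka API. C is the consumer type; the consumer
// object itself is only ever touched by the thread that calls run().
class PollLoopSketch<C> {
    private final C consumer;
    private final Consumer<C> pollOnce; // assumption: one poll() with a short timeout
    private final ConcurrentLinkedQueue<Runnable> pending = new ConcurrentLinkedQueue<>();
    private volatile boolean running = true;

    PollLoopSketch(C consumer, Consumer<C> pollOnce) {
        this.consumer = consumer;
        this.pollOnce = pollOnce;
    }

    // Any thread: queue an action (commit, seek, ...) for the polling thread.
    // The returned future completes after the polling thread has run it.
    <T> CompletableFuture<T> submit(Function<C, T> action) {
        CompletableFuture<T> result = new CompletableFuture<>();
        pending.add(() -> {
            try {
                result.complete(action.apply(consumer));
            } catch (RuntimeException e) {
                result.completeExceptionally(e);
            }
        });
        return result;
    }

    // Any thread, e.g. a shutdown hook: request exit after the current poll.
    void shutdown() {
        running = false;
    }

    // Polling thread only: poll, then run queued actions in between polls.
    void run() {
        while (running) {
            pollOnce.accept(consumer);
            runPending();
        }
        runPending(); // flush actions queued before shutdown; close() would go here
    }

    private void runPending() {
        Runnable task;
        while ((task = pending.poll()) != null) {
            task.run();
        }
    }
}
```

With this approach, queued work waits until the current poll returns, so the poll timeout effectively bounds how promptly a commit from another thread is handled, which is the same tension described in the issue. In this sketch, an action submitted after the loop has exited is never run.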