[jira] [Work logged] (BEAM-8352) Reading records in background may lead to OOM errors
[ https://issues.apache.org/jira/browse/BEAM-8352?focusedWorklogId=339445&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-339445 ]

ASF GitHub Bot logged work on BEAM-8352:
----------------------------------------
                Author: ASF GitHub Bot
            Created on: 06/Nov/19 16:09
            Start Date: 06/Nov/19 16:09
    Worklog Time Spent: 10m
      Work Description: aromanenko-dev commented on pull request #9745: [BEAM-8352] Add "withMaxCapacityPerShard()" to KinesisIO.Read
URL: https://github.com/apache/beam/pull/9745

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
-------------------
    Worklog Id: (was: 339445)
    Time Spent: 2h 40m (was: 2.5h)

> Reading records in background may lead to OOM errors
> ----------------------------------------------------
>
>                 Key: BEAM-8352
>                 URL: https://issues.apache.org/jira/browse/BEAM-8352
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kinesis
>    Affects Versions: 2.2.0, 2.3.0, 2.4.0, 2.5.0, 2.6.0, 2.7.0, 2.8.0, 2.9.0, 2.10.0, 2.11.0, 2.12.0, 2.13.0, 2.14.0, 2.15.0
>            Reporter: Mateusz Juraszek
>            Assignee: Alexey Romanenko
>            Priority: Major
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> We have faced OOM errors in our Dataflow job containing Kinesis sources. After investigation, it turned out that the issue was caused by Kinesis sources consuming more records than the pipeline could handle in time.
> Looking into the Kinesis connector's code, the internal queue (recordsQueue) that records are put into in the background is set up for each ShardReadersPool (one per source, i.e. per Kinesis stream). The size of the queue is set to `queueCapacityPerShard * number of shards`, so the more shards there are, the bigger the queue. There is no way to limit the maximum capacity of the queue (queueCapacityPerShard is not configurable either; it is set to DEFAULT_CAPACITY_PER_SHARD=10_000). Additionally, records are not differentiated by size, so the amount of data placed in the queue can grow to the point where an OOM error is thrown.
> It would be great to be able to limit the number of records read in the background to some sensible value. As a first step, a simple solution would be to allow configuring the maximum queue size for a source when the KinesisIO read is created.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
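The capacity arithmetic described in the report can be sketched as follows. This is a minimal illustration, not the connector's actual code; the method names are hypothetical, while `DEFAULT_CAPACITY_PER_SHARD` is the constant the report mentions and the per-shard cap mirrors the knob the PR proposes.

```java
// Illustrative sketch, not the connector's actual code: how the records
// queue capacity in ShardReadersPool scales with the shard count, and how
// a user-supplied per-shard cap would bound it. Method names are hypothetical.
public class QueueCapacity {
    // Hard-coded in the connector; not user-configurable before the fix.
    static final int DEFAULT_CAPACITY_PER_SHARD = 10_000;

    // Current behaviour: total capacity grows linearly with the shard count.
    static long unboundedCapacity(int numShards) {
        return (long) DEFAULT_CAPACITY_PER_SHARD * numShards;
    }

    // Proposed behaviour: the user supplies a smaller per-shard capacity.
    static long cappedCapacity(int numShards, int maxCapacityPerShard) {
        int perShard = Math.min(DEFAULT_CAPACITY_PER_SHARD, maxCapacityPerShard);
        return (long) perShard * numShards;
    }
}
```

With 64 shards the default gives a 640,000-record queue, which is how a stream with many shards can exhaust the heap even though each per-shard default looks reasonable.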
[jira] [Work logged] (BEAM-8352) Reading records in background may lead to OOM errors
[ https://issues.apache.org/jira/browse/BEAM-8352?focusedWorklogId=338644&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-338644 ]

ASF GitHub Bot logged work on BEAM-8352:
----------------------------------------
                Author: ASF GitHub Bot
            Created on: 05/Nov/19 10:16
            Start Date: 05/Nov/19 10:16
    Worklog Time Spent: 10m
      Work Description: aromanenko-dev commented on issue #9745: [BEAM-8352] Add "withMaxCapacityPerShard()" to KinesisIO.Read
URL: https://github.com/apache/beam/pull/9745#issuecomment-549391342

@lukecwik If we agree on the current solution (limit the internal message queue by number of records), then I'm fine with merging it.

Issue Time Tracking
-------------------
    Worklog Id: (was: 338644)
    Time Spent: 2.5h (was: 2h 20m)
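The approach agreed above, bounding the internal queue by record count, can be sketched with a standard bounded blocking queue. The class and method names below are hypothetical; the real connector keeps a similar queue inside ShardReadersPool.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Minimal sketch of a records queue bounded by record count. Once the queue
// is full, the blocking put() stalls the background shard readers instead of
// letting them buffer records without bound.
public class RecordCountBoundedQueue<T> {
    private final BlockingQueue<T> recordsQueue;

    public RecordCountBoundedQueue(int capacityPerShard, int numShards) {
        this.recordsQueue = new ArrayBlockingQueue<>(capacityPerShard * numShards);
    }

    // Background shard readers would use the blocking variant.
    public void putRecord(T record) throws InterruptedException {
        recordsQueue.put(record);
    }

    // Non-blocking variant: returns false when the queue is at capacity.
    public boolean offerRecord(T record) {
        return recordsQueue.offer(record);
    }

    // The consuming side (the pipeline) drains the queue.
    public T pollRecord() {
        return recordsQueue.poll();
    }

    public int remainingCapacity() {
        return recordsQueue.remainingCapacity();
    }
}
```

The count-based bound is simple, but as the later discussion notes, it ignores record size: 40,000 one-byte records and 40,000 one-megabyte records occupy the same number of slots.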
[jira] [Work logged] (BEAM-8352) Reading records in background may lead to OOM errors
[ https://issues.apache.org/jira/browse/BEAM-8352?focusedWorklogId=77&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-77 ]

ASF GitHub Bot logged work on BEAM-8352:
----------------------------------------
                Author: ASF GitHub Bot
            Created on: 24/Oct/19 12:57
            Start Date: 24/Oct/19 12:57
    Worklog Time Spent: 10m
      Work Description: aromanenko-dev commented on issue #9745: [BEAM-8352] Add "withMaxCapacityPerShard()" to KinesisIO.Read
URL: https://github.com/apache/beam/pull/9745#issuecomment-545905039

@kennknowles This one is definitely not a release blocker, since it's not a regression and this bug can occur only under certain load conditions. We haven't come to a final solution yet (I raised several questions in the discussion above), and the final fix may affect the user API. So I'd rather not merge this (even as experimental) until we decide how to proceed. For now, I just removed `Fix Version/s` in the associated Jira. Sorry for the noise.

Issue Time Tracking
-------------------
    Worklog Id: (was: 77)
    Time Spent: 2h (was: 1h 50m)
[jira] [Work logged] (BEAM-8352) Reading records in background may lead to OOM errors
[ https://issues.apache.org/jira/browse/BEAM-8352?focusedWorklogId=332767&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-332767 ]

ASF GitHub Bot logged work on BEAM-8352:
----------------------------------------
                Author: ASF GitHub Bot
            Created on: 23/Oct/19 18:27
            Start Date: 23/Oct/19 18:27
    Worklog Time Spent: 10m
      Work Description: kennknowles commented on issue #9745: [BEAM-8352] Add "withMaxCapacityPerShard()" to KinesisIO.Read
URL: https://github.com/apache/beam/pull/9745#issuecomment-545575978

This is associated with a bug marked release-blocking. Is that accurate? Is it important enough that we should go with an early approach and make improvements later? Maybe mark it experimental/deprecated immediately if you know it will not last... or, if it is not urgent to make 2.17.0, perhaps adjust the Jira.

Issue Time Tracking
-------------------
    Worklog Id: (was: 332767)
    Time Spent: 1h 50m (was: 1h 40m)
[jira] [Work logged] (BEAM-8352) Reading records in background may lead to OOM errors
[ https://issues.apache.org/jira/browse/BEAM-8352?focusedWorklogId=326989&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-326989 ]

ASF GitHub Bot logged work on BEAM-8352:
----------------------------------------
                Author: ASF GitHub Bot
            Created on: 11/Oct/19 16:53
            Start Date: 11/Oct/19 16:53
    Worklog Time Spent: 10m
      Work Description: aromanenko-dev commented on issue #9745: [BEAM-8352] Add "withMaxCapacityPerShard()" to KinesisIO.Read
URL: https://github.com/apache/beam/pull/9745#issuecomment-541141921

I agree that a limitation based on payload size would be better, though it's still unclear how we could manage it dynamically.
1. Should the initial value be based on the process heap size by default, or configured by the user?
2. How much free space should we leave unused to prevent OOM errors from newly arrived records (we don't know their size in advance)?
3. Do we need to adapt the maximum queue size at runtime?
4. If yes, what would trigger that?
Btw, do you already have a solution for the Java portability container problem? Or maybe we have something similar implemented for another IO?

Issue Time Tracking
-------------------
    Worklog Id: (was: 326989)
    Time Spent: 1h 40m (was: 1.5h)
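The size-based alternative behind these questions can be sketched as a queue bounded by total payload bytes rather than record count. This is a hypothetical illustration only; it sidesteps the open questions above (default budget, headroom for records of unknown size, runtime adaptation) by using a fixed byte budget and non-blocking rejection.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch: bound the records queue by total payload bytes
// instead of record count. offer() returning false is the backpressure
// signal: the background reader must retry later.
public class ByteBudgetQueue {
    private final long maxBytes;     // fixed budget; choosing it is an open question
    private long queuedBytes = 0;
    private final Queue<byte[]> queue = new ArrayDeque<>();

    public ByteBudgetQueue(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    // Accept a record only if it fits in the remaining byte budget.
    public synchronized boolean offer(byte[] payload) {
        if (queuedBytes + payload.length > maxBytes) {
            return false;
        }
        queue.add(payload);
        queuedBytes += payload.length;
        return true;
    }

    public synchronized byte[] poll() {
        byte[] payload = queue.poll();
        if (payload != null) {
            queuedBytes -= payload.length;
        }
        return payload;
    }

    public synchronized long queuedBytes() {
        return queuedBytes;
    }
}
```

Note the accounting counts only payload bytes, not per-record object overhead, which is one reason a real implementation would need headroom below the hard budget.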
[jira] [Work logged] (BEAM-8352) Reading records in background may lead to OOM errors
[ https://issues.apache.org/jira/browse/BEAM-8352?focusedWorklogId=326443&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-326443 ]

ASF GitHub Bot logged work on BEAM-8352:
----------------------------------------
                Author: ASF GitHub Bot
            Created on: 10/Oct/19 16:45
            Start Date: 10/Oct/19 16:45
    Worklog Time Spent: 10m
      Work Description: lukecwik commented on issue #9745: [BEAM-8352] Add "withMaxCapacityPerShard()" to KinesisIO.Read
URL: https://github.com/apache/beam/pull/9745#issuecomment-540673245

Yeah, the process needs to have a memory budget, and every "largeish" thing should be accounted for. I have been thinking about how best to do this for the Java portability container, since it would allow removing several knobs that already exist.

Issue Time Tracking
-------------------
    Worklog Id: (was: 326443)
    Time Spent: 1.5h (was: 1h 20m)
[jira] [Work logged] (BEAM-8352) Reading records in background may lead to OOM errors
[ https://issues.apache.org/jira/browse/BEAM-8352?focusedWorklogId=326425&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-326425 ]

ASF GitHub Bot logged work on BEAM-8352:
----------------------------------------
                Author: ASF GitHub Bot
            Created on: 10/Oct/19 16:28
            Start Date: 10/Oct/19 16:28
    Worklog Time Spent: 10m
      Work Description: aromanenko-dev commented on issue #9745: [BEAM-8352] Add "withMaxCapacityPerShard()" to KinesisIO.Read
URL: https://github.com/apache/beam/pull/9745#issuecomment-540666549

@lukecwik Regarding your questions:
1. The default value was (and stays) 10K records per shard, and it worked well for streams without a large number of shards per worker (we only hit this issue after 2 years of KinesisIO use by different users). So, in general, we can conclude that the user doesn't need to configure it manually, and this option should be used only in corner cases.
2. Yes, this is an area for improvement. Though it's not very clear under which conditions the user would need to change this option dynamically at runtime.

> you can assume each is the maximum size of 1MiB

I mean the maximum amount of available memory that can be allocated for this queue. Actually, it leads to the more complicated question of how to implement backpressure effectively, so that the consumer/producer pair works well in terms of both consumed memory and overall performance.

Issue Time Tracking
-------------------
    Worklog Id: (was: 326425)
    Time Spent: 1h 10m (was: 1h)
[jira] [Work logged] (BEAM-8352) Reading records in background may lead to OOM errors
[ https://issues.apache.org/jira/browse/BEAM-8352?focusedWorklogId=326405&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-326405 ]

ASF GitHub Bot logged work on BEAM-8352:
Author: ASF GitHub Bot
Created on: 10/Oct/19 15:48
Start Date: 10/Oct/19 15:48
Worklog Time Spent: 10m

Work Description: lukecwik commented on issue #9745: [BEAM-8352] Add "withMaxCapacityPerShard()" to KinesisIO.Read
URL: https://github.com/apache/beam/pull/9745#issuecomment-540650009

You can use the size of the KinesisRecord's data field, or you can assume each record is the maximum size of 1 MiB (https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Record.html).

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 326405)
Time Spent: 1h (was: 50m)

> Reading records in background may lead to OOM errors
>
> Key: BEAM-8352
> URL: https://issues.apache.org/jira/browse/BEAM-8352
> Project: Beam
> Issue Type: Bug
> Components: io-java-kinesis
> Affects Versions: 2.2.0, 2.3.0, 2.4.0, 2.5.0, 2.6.0, 2.7.0, 2.8.0, 2.9.0, 2.10.0, 2.11.0, 2.12.0, 2.13.0, 2.14.0, 2.15.0
> Reporter: Mateusz Juraszek
> Assignee: Alexey Romanenko
> Priority: Major
> Fix For: 2.17.0
>
> Time Spent: 1h
> Remaining Estimate: 0h
>
> We have faced OOM errors in our Dataflow job containing Kinesis sources. After investigation, it turned out that the issue was caused by the Kinesis sources consuming more records than the pipeline could handle in time.
> Looking into the Kinesis connector's code, the internal queue (recordsQueue) into which records are put in the background is set up per ShardReadersPool (created for each source, i.e. each Kinesis stream). The size of the queue is set to `queueCapacityPerShard * number of shards`, so the more shards, the bigger the queue. There is no way to limit the maximum capacity of the queue (queueCapacityPerShard is not configurable either; it is set to DEFAULT_CAPACITY_PER_SHARD=10_000). Additionally, record sizes are not taken into account, so the amount of data placed in the queue can grow to the point where an OOM is thrown.
> It would be great to be able to limit the number of records read in the background to some sensible value. As a first step, a simple solution would be to allow configuring the maximum queue size per source when creating the KinesisIO read.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
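The 1 MiB worst-case record size suggested in the comment above makes it easy to bound how much memory the default queue can theoretically hold. A minimal sketch, with constant names mirroring those in the issue description and an illustrative shard count:

```java
public class QueueMemoryEstimate {
    // Mirrors the connector's hard-coded default described in the issue.
    static final int DEFAULT_CAPACITY_PER_SHARD = 10_000;
    // Kinesis Record data blob limit: 1 MiB.
    static final long MAX_RECORD_BYTES = 1_048_576;

    /** Worst-case bytes held by recordsQueue for a stream with the given shard count. */
    static long worstCaseQueueBytes(int numShards) {
        long capacity = (long) DEFAULT_CAPACITY_PER_SHARD * numShards;
        return capacity * MAX_RECORD_BYTES;
    }

    public static void main(String[] args) {
        // Even a modest 4-shard stream can queue ~39 GiB of record data at the default cap.
        System.out.println(worstCaseQueueBytes(4) / (1L << 30) + " GiB");
    }
}
```

This illustrates why the count-based default alone cannot prevent OOM: the bound scales with shard count and assumes nothing about record size.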
[jira] [Work logged] (BEAM-8352) Reading records in background may lead to OOM errors
[ https://issues.apache.org/jira/browse/BEAM-8352?focusedWorklogId=326402&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-326402 ]

ASF GitHub Bot logged work on BEAM-8352:
Author: ASF GitHub Bot
Created on: 10/Oct/19 15:44
Start Date: 10/Oct/19 15:44
Worklog Time Spent: 10m

Work Description: aromanenko-dev commented on issue #9745: [BEAM-8352] Add "withMaxCapacityPerShard()" to KinesisIO.Read
URL: https://github.com/apache/beam/pull/9745#issuecomment-540648157

@lukecwik Yes, I agree that we need to reduce the number of tuning knobs to as few as possible and try to configure transforms automatically and dynamically where feasible. But the question is how we can estimate a maximum size in bytes in this case...

Issue Time Tracking
---
Worklog Id: (was: 326402)
Time Spent: 50m (was: 40m)
[jira] [Work logged] (BEAM-8352) Reading records in background may lead to OOM errors
[ https://issues.apache.org/jira/browse/BEAM-8352?focusedWorklogId=326389&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-326389 ]

ASF GitHub Bot logged work on BEAM-8352:
Author: ASF GitHub Bot
Created on: 10/Oct/19 15:15
Start Date: 10/Oct/19 15:15
Worklog Time Spent: 10m

Work Description: lukecwik commented on issue #9745: [BEAM-8352] Add "withMaxCapacityPerShard()" to KinesisIO.Read
URL: https://github.com/apache/beam/pull/9745#issuecomment-540635413

You're right that you can provide an immediate solution right now, and this is an acceptable change that solves a real problem. What I wanted to highlight about "tuning" flags in general is that:
1) Users need to know to set the flag, but what value can we suggest to them?
2) It doesn't adapt dynamically over time, so users need to choose a very conservative value.

Issue Time Tracking
---
Worklog Id: (was: 326389)
Time Spent: 40m (was: 0.5h)
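On the first point above — what value to suggest — one conservative heuristic is to work backwards from a per-worker heap budget, using the 1 MiB worst-case record size mentioned earlier in the thread. The helper below is hypothetical, not part of KinesisIO; the budget and shard count are illustrative:

```java
public class CapacitySuggestion {
    // Kinesis Record data blob limit: 1 MiB (worst-case per-record memory).
    static final long MAX_RECORD_BYTES = 1_048_576;

    /**
     * Per-shard queue capacity such that, even if every queued record is at its
     * 1 MiB maximum, the whole queue stays within the given heap budget.
     * Clamped to [1, 10_000] so it never exceeds the connector's current default.
     */
    static int suggestMaxCapacityPerShard(long heapBudgetBytes, int numShards) {
        long perShardBytes = heapBudgetBytes / numShards;
        long capacity = perShardBytes / MAX_RECORD_BYTES;
        return (int) Math.max(1, Math.min(capacity, 10_000));
    }

    public static void main(String[] args) {
        // Budget 2 GiB of heap for a 16-shard stream.
        System.out.println(suggestMaxCapacityPerShard(2L << 30, 16)); // 128
    }
}
```

The clamp illustrates the "very conservative" trade-off from the second point: a value safe for worst-case record sizes throttles throughput when records are actually small.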
[jira] [Work logged] (BEAM-8352) Reading records in background may lead to OOM errors
[ https://issues.apache.org/jira/browse/BEAM-8352?focusedWorklogId=326370&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-326370 ]

ASF GitHub Bot logged work on BEAM-8352:
Author: ASF GitHub Bot
Created on: 10/Oct/19 14:43
Start Date: 10/Oct/19 14:43
Worklog Time Spent: 10m

Work Description: aromanenko-dev commented on issue #9745: [BEAM-8352] Add "withMaxCapacityPerShard()" to KinesisIO.Read
URL: https://github.com/apache/beam/pull/9745#issuecomment-540621214

@lukecwik I think we **can** do this by adding a consumed-message size counter and checking it before putting new messages into the queue. If the total size is exceeded, we can block fetching and adding new messages until records are dequeued and there is free space for new records. At the same time, it would be a more complicated implementation compared to what we have now, and I don't see much profit in it. Am I missing something?

Issue Time Tracking
---
Worklog Id: (was: 326370)
Time Spent: 0.5h (was: 20m)
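The byte-counting approach described in the comment above can be sketched with a semaphore whose permits represent free bytes: the shard reader blocks in put() until consumers free space. This is an illustrative stand-in, not the connector's actual ShardReadersPool implementation; the class and names are hypothetical:

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.function.ToIntFunction;

/** Sketch of a queue bounded by total payload bytes rather than record count. */
class ByteBoundedQueue<T> {
    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
    private final Semaphore freeBytes;        // permits == bytes still available
    private final ToIntFunction<T> sizeOf;    // e.g. record -> record.getData().remaining()

    ByteBoundedQueue(int maxBytes, ToIntFunction<T> sizeOf) {
        this.freeBytes = new Semaphore(maxBytes);
        this.sizeOf = sizeOf;
    }

    /** Blocks the producer (the shard reader) until the record's bytes fit the budget. */
    void put(T record) throws InterruptedException {
        freeBytes.acquire(sizeOf.applyAsInt(record));
        queue.add(record);
    }

    /** Returns null if empty; returns the record's bytes to the budget on success. */
    T poll() {
        T record = queue.poll();
        if (record != null) {
            freeBytes.release(sizeOf.applyAsInt(record));
        }
        return record;
    }
}
```

A real implementation would also need to handle a single record larger than the whole budget (put() would block forever), which hints at the extra complexity the comment weighs against the simpler count-based cap.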
[jira] [Work logged] (BEAM-8352) Reading records in background may lead to OOM errors
[ https://issues.apache.org/jira/browse/BEAM-8352?focusedWorklogId=325834&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-325834 ]

ASF GitHub Bot logged work on BEAM-8352:
Author: ASF GitHub Bot
Created on: 09/Oct/19 17:30
Start Date: 09/Oct/19 17:30
Worklog Time Spent: 10m

Work Description: lukecwik commented on issue #9745: [BEAM-8352] Add "withMaxCapacityPerShard()" to KinesisIO.Read
URL: https://github.com/apache/beam/pull/9745#issuecomment-540105696

Can't we instead use some other limit, such as the sum of queued message sizes, to limit memory pressure?

Issue Time Tracking
---
Worklog Id: (was: 325834)
Time Spent: 20m (was: 10m)
[jira] [Work logged] (BEAM-8352) Reading records in background may lead to OOM errors
[ https://issues.apache.org/jira/browse/BEAM-8352?focusedWorklogId=325159&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-325159 ]

ASF GitHub Bot logged work on BEAM-8352:
Author: ASF GitHub Bot
Created on: 08/Oct/19 16:13
Start Date: 08/Oct/19 16:13
Worklog Time Spent: 10m

Work Description: aromanenko-dev commented on pull request #9745: [BEAM-8352] Add "withMaxCapacityPerShard()" to KinesisIO.Read
URL: https://github.com/apache/beam/pull/9745

Added a new configuration option `withMaxCapacityPerShard()` to `KinesisIO.Read` to provide a way to control the size of `ShardReadersPool.recordsQueue` in case the default value (10K records per shard) causes OOM errors for streams with a large number of shards or with large messages that can't fit into memory.