[jira] [Comment Edited] (SPARK-20168) Enable kinesis to start stream from Initial position specified by a timestamp
[ https://issues.apache.org/jira/browse/SPARK-20168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16602909#comment-16602909 ]

Vladimir Pchelko edited comment on SPARK-20168 at 9/4/18 11:04 AM:
-------------------------------------------------------------------

[~srowen] this bug must be covered by unit tests.

was (Author: vpchelko):
[~srowen] this bug must be covered by unit tests

> Enable kinesis to start stream from Initial position specified by a timestamp
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-20168
>                 URL: https://issues.apache.org/jira/browse/SPARK-20168
>             Project: Spark
>          Issue Type: Improvement
>          Components: DStreams
>    Affects Versions: 2.1.0
>            Reporter: Yash Sharma
>            Assignee: Yash Sharma
>            Priority: Minor
>              Labels: kinesis, streaming
>             Fix For: 2.4.0
>
>
> The Kinesis client can resume from a specified timestamp while creating a stream.
> We should have an option to pass a timestamp in config to allow Kinesis to resume from the given timestamp.
> Have started initial work and will be posting a PR after I test the patch -
> https://github.com/yssharma/spark/commit/11269abf8b2a533a1b10ceee80ac2c3a2a80c4e8

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20168) Enable kinesis to start stream from Initial position specified by a timestamp
[ https://issues.apache.org/jira/browse/SPARK-20168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16602909#comment-16602909 ]

Vladimir Pchelko commented on SPARK-20168:
------------------------------------------

[~srowen] this bug must be covered by unit tests
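For context, the feature this ticket delivered in 2.4.0 is exposed through the `KinesisInputDStream` builder in the `spark-streaming-kinesis-asl` module. A minimal sketch of starting a stream from a timestamp follows; the stream, region, endpoint, and checkpoint-app names are placeholders, and `ssc` is assumed to be an already-created `StreamingContext`:

```scala
import java.util.Date

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream}

// Resume the Kinesis stream from a specific point in time instead of
// LATEST or TRIM_HORIZON. Here: one hour before the application starts.
val startAt = new Date(System.currentTimeMillis() - 3600 * 1000L)

val kinesisStream = KinesisInputDStream.builder
  .streamingContext(ssc)                 // ssc: an existing StreamingContext
  .streamName("my-kinesis-stream")       // placeholder stream name
  .endpointUrl("https://kinesis.us-east-1.amazonaws.com")
  .regionName("us-east-1")
  .initialPosition(new KinesisInitialPositions.AtTimestamp(startAt))
  .checkpointAppName("my-app")           // placeholder DynamoDB lease table name
  .checkpointInterval(Seconds(10))
  .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
  .build()
```

Under the hood this maps to the Kinesis client's AT_TIMESTAMP shard iterator type, so records are replayed from the first record at or after the given timestamp.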
[jira] [Commented] (SPARK-18805) InternalMapWithStateDStream make java.lang.StackOverflowError
[ https://issues.apache.org/jira/browse/SPARK-18805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15771504#comment-15771504 ]

Vladimir Pchelko commented on SPARK-18805:
------------------------------------------

I faced a similar problem. There are two 'problems' with mapWithState:
1. spark.streaming.concurrentJobs
2. lack of memory with high GC time

In both cases I noticed strange/magic errors. It seems that in your case the application is unrecoverable due to lack of memory.

> InternalMapWithStateDStream make java.lang.StackOverflowError
> -------------------------------------------------------------
>
>                 Key: SPARK-18805
>                 URL: https://issues.apache.org/jira/browse/SPARK-18805
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 1.6.3, 2.0.2
>        Environment: mesos
>            Reporter: etienne
>
> When loading InternalMapWithStateDStream from a checkpoint, if isValidTime is true and there is no generatedRDD at the given time, there is an infinite loop:
> 1) compute is called on InternalMapWithStateDStream
> 2) InternalMapWithStateDStream tries to generate the previousRDD
> 3) The stream looks in generatedRDDs to check whether the RDD is already generated for the given time
> 4) It does not find the RDD, so it checks if the time is valid
> 5) If the time is valid, it calls compute on InternalMapWithStateDStream
> 6) restart from 1)
> Here is the exception that illustrates this error:
> {code}
> Exception in thread "streaming-start" java.lang.StackOverflowError
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> 	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> 	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
> 	at scala.Option.orElse(Option.scala:289)
> 	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
> 	at org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:134)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> 	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> 	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
> 	at scala.Option.orElse(Option.scala:289)
> 	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
> 	at org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:134)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> {code}
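The loop in steps 1-6 is mutual recursion between getOrCompute and compute with no reachable base case once the RDD cache is empty. A stripped-down model of the cycle (hypothetical names, not Spark's actual classes):

```scala
import scala.collection.mutable

// Minimal model of the DStream.getOrCompute <-> compute cycle described above.
// After restoring from a checkpoint, generatedRDDs is empty, and isValidTime
// keeps returning true, so each cache miss triggers compute, which asks for
// the previous batch, which misses again -- the recursion never bottoms out,
// matching the repeating frames in the StackOverflowError trace.
object GetOrComputeLoop {
  private val generatedRDDs = mutable.Map.empty[Int, String] // time -> "RDD"
  private def isValidTime(time: Int): Boolean = true         // never rejects

  def getOrCompute(time: Int): Option[String] =
    generatedRDDs.get(time).orElse(compute(time))            // cache miss -> compute

  private def compute(time: Int): Option[String] =
    if (isValidTime(time)) getOrCompute(time - 1).map(_ + "*") // needs previous RDD
    else None                                                  // base case, never taken
}

// Calling GetOrComputeLoop.getOrCompute(0) recurses until the stack overflows.
```

The fix direction implied by the report is to give the recursion a floor: either a populated generatedRDDs entry (a restored checkpointed RDD) or an isValidTime that rejects times before the checkpoint.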
[jira] [Updated] (SPARK-18925) Reduce memory usage of mapWithState
[ https://issues.apache.org/jira/browse/SPARK-18925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vladimir Pchelko updated SPARK-18925:
-------------------------------------
Description:

With default settings mapWithState leads to storing up to 10 copies of MapWithStateRDD in memory (DStream, InternalMapWithStateDStream, DEFAULT_CHECKPOINT_DURATION_MULTIPLIER, rememberDuration, minRememberDuration).

In my project we quickly get OutOfMemory, because we have to track many millions of events at 2-3 KB per event -> about 50 GB per MapWithStateRDD. Using a cluster with 500+ GB of memory is unacceptable for our task.

Reducing CHECKPOINT_DURATION_MULTIPLIER is unacceptable: it slightly 'fixes' the memory issue, but leads to a new one - we are unable to process data in real time, because the checkpointing duration is several times longer than batchInterval.

So I investigated the mapWithState process and concluded that for the proper functioning of mapWithState, we need only the current MapWithStateRDD and the last checkpointed MapWithStateRDD.

To fix the memory issues in my project, I override clearMetadata for InternalMapWithStateDStream and unpersist all oldRDDs:

{code}
val oldRDDs = generatedRDDs.filter(_._1 <= (time - slideDuration))
{code}

except the last checkpointed one:

{code}
val checkpointedKeys = oldRDDs.filter(_._2.isCheckpointed).keys
if (checkpointedKeys.nonEmpty) {
  oldRDDs -= checkpointedKeys.max
}
... (C/P of DStream clearMetadata)
{code}

Please correct me.

was:
With default settings mapWithState leads to storing up to 10 copies of MapWithStateRDD in memory:
(DSream, InternalMapWithStateDStream, DEFAULT_CHECKPOINT_DURATION_MULTIPLIER, rememberDuration, minRememberDuration)
In my project we quickly get OutOfMemory, because we have to track many millions of events * 2-3KB per event -> about 50 GB per MapWithStateRDD. Using cluster with +500GB memory is unacceptable for our task.
Reducing CHECKPOINT_DURATION_MULTIPLIER is unacceptable, it slightly 'fixes' memory issue, but leads to new one - we unable process data in real-time - because the checkpointing duration is in several times longer than batchInterval.
So I investigated the mapWithState process and concluded that for proper functioning of mapWithState, we need the current and the last checkpointed MapWithStateRDD.
To fix memory issues in my project: I override clearMetadata for InternalMapWithStateDStream and unpersist all oldRDDs:
val oldRDDs = generatedRDDs.filter(_._1 <= (time - slideDuration))
except the last checkpointed
val checkpointedKeys = oldRDDs.filter(_._2.isCheckpointed).keys
if (checkpointedKeys.nonEmpty) {
  oldRDDs -= checkpointedKeys.max
}
... (C/P of DStream clearMetadata)
Please correct me.

> Reduce memory usage of mapWithState
> -----------------------------------
>
>                 Key: SPARK-18925
>                 URL: https://issues.apache.org/jira/browse/SPARK-18925
>             Project: Spark
>          Issue Type: Improvement
>          Components: DStreams
>    Affects Versions: 2.0.0, 2.0.1, 2.0.2
>            Reporter: Vladimir Pchelko
>
> With default settings mapWithState leads to storing up to 10 copies of MapWithStateRDD in memory (DStream, InternalMapWithStateDStream, DEFAULT_CHECKPOINT_DURATION_MULTIPLIER, rememberDuration, minRememberDuration).
> In my project we quickly get OutOfMemory, because we have to track many millions of events at 2-3 KB per event -> about 50 GB per MapWithStateRDD. Using a cluster with 500+ GB of memory is unacceptable for our task.
> Reducing CHECKPOINT_DURATION_MULTIPLIER is unacceptable: it slightly 'fixes' the memory issue, but leads to a new one - we are unable to process data in real time, because the checkpointing duration is several times longer than batchInterval.
> So I investigated the mapWithState process and concluded that for the proper functioning of mapWithState, we need only the current MapWithStateRDD and the last checkpointed MapWithStateRDD.
> To fix the memory issues in my project, I override clearMetadata for InternalMapWithStateDStream and unpersist all oldRDDs except the last checkpointed one. Please correct me.
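The clearMetadata override described in the ticket, assembled into a fuller sketch. This is illustrative, not drop-in: clearMetadata and generatedRDDs are private[streaming] internals of DStream (as of the 2.0.x line the ticket targets), so the subclass below is hypothetical and only the eviction logic comes from the description:

```scala
import org.apache.spark.streaming.Time

// Sketch: unpersist every generated RDD older than the current batch EXCEPT
// the most recent checkpointed one, which mapWithState still needs as the
// recovery/lineage anchor. Assumes access to DStream internals
// (generatedRDDs, slideDuration, dependencies).
override private[streaming] def clearMetadata(time: Time): Unit = {
  val oldRDDs = generatedRDDs.filter(_._1 <= (time - slideDuration))

  // Keep the newest checkpointed RDD out of the eviction set.
  val checkpointedKeys = oldRDDs.filter(_._2.isCheckpointed).keys
  val toClear =
    if (checkpointedKeys.nonEmpty) oldRDDs - checkpointedKeys.max
    else oldRDDs

  toClear.values.foreach(_.unpersist(blocking = false))
  generatedRDDs --= toClear.keys

  // Then let parent DStreams clear their metadata, as the stock
  // DStream.clearMetadata does.
  dependencies.foreach(_.clearMetadata(time))
}
```

The design point being argued: with the default multiplier of 10, up to ten ~50 GB MapWithStateRDDs stay cached, while only two (current plus last checkpointed) are actually needed for correctness.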
[jira] [Updated] (SPARK-18925) Reduce memory usage of mapWithState
[ https://issues.apache.org/jira/browse/SPARK-18925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vladimir Pchelko updated SPARK-18925:
-------------------------------------
Description:

With default settings mapWithState leads to storing up to 10 copies of MapWithStateRDD in memory (DStream, InternalMapWithStateDStream, DEFAULT_CHECKPOINT_DURATION_MULTIPLIER, rememberDuration, minRememberDuration).

In my project we quickly get OutOfMemory, because we have to track many millions of events at 2-3 KB per event -> about 50 GB per MapWithStateRDD. Using a cluster with 500+ GB of memory is unacceptable for our task.

Reducing CHECKPOINT_DURATION_MULTIPLIER is unacceptable: it slightly 'fixes' the memory issue, but leads to a new one - we are unable to process data in real time, because the checkpointing duration is several times longer than batchInterval.

So I investigated the mapWithState process and concluded that for the proper functioning of mapWithState, we need only the current and the last checkpointed MapWithStateRDD.

To fix the memory issues in my project, I override clearMetadata for InternalMapWithStateDStream and unpersist all oldRDDs:

{code}
val oldRDDs = generatedRDDs.filter(_._1 <= (time - slideDuration))
{code}

except the last checkpointed one:

{code}
val checkpointedKeys = oldRDDs.filter(_._2.isCheckpointed).keys
if (checkpointedKeys.nonEmpty) {
  oldRDDs -= checkpointedKeys.max
}
... (C/P of DStream clearMetadata)
{code}

Please correct me.

was:
With default settings mapWithState leads to storing up to 10 copies of MapWithStateRDD in memory:
(DSream, InternalMapWithStateDStream, DEFAULT_CHECKPOINT_DURATION_MULTIPLIER, rememberDuration, minRememberDuration)
In my project we quickly get OutOfMemory, because we have to track many millions of events * 2-3KB per event -> about 50 GB per MapWithStateRDD. Using cluster with +500GB memory is unacceptable for our task.
Reducing CHECKPOINT_DURATION_MULTIPLIER is unacceptable, it slightly 'fixes' memory issue, but leads to new one - we unable process data in real-time - because the checkpointing duration is in several times longer that batchInterval.
So I investigated the mapWithState process and concluded that for proper functioning of mapWithState, we need the current and the last checkpointed MapWithStateRDD.
To fix memory issues in my project: I override clearMetadata for InternalMapWithStateDStream and unpersist all oldRDDs:
val oldRDDs = generatedRDDs.filter(_._1 <= (time - slideDuration))
except the last checkpointed
val checkpointedKeys = oldRDDs.filter(_._2.isCheckpointed).keys
if (checkpointedKeys.nonEmpty) {
  oldRDDs -= checkpointedKeys.max
}
... (C/P of DStream clearMetadata)
Please correct me.
[jira] [Comment Edited] (SPARK-18564) mapWithState: add configuration for DEFAULT_CHECKPOINT_DURATION_MULTIPLIER
[ https://issues.apache.org/jira/browse/SPARK-18564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15766561#comment-15766561 ]

Vladimir Pchelko edited comment on SPARK-18564 at 12/21/16 9:21 AM:
--------------------------------------------------------------------

Currently the user can modify the interval of mapWithState checkpoints, for example:

{code}
val CUSTOM_CHECKPOINT_DURATION_MULTIPLIER = ...
val stateDStream = sourceDStream.mapWithState(...)
stateDStream.checkpoint(batchInterval * CUSTOM_CHECKPOINT_DURATION_MULTIPLIER)
{code}

was (Author: vpchelko):
Currently user can modify interval of mapWithState checkpoint, for example:

{code}
val CUSTOM_CHECKPOINT_DURATION_MULTIPLIER = ...
val stateDStream = sourceDStream.mapWithState(...)
stateDStream.checkpoint(batchInterval * CUSTOM_CHECKPOINT_DURATION_MULTIPLIER)
{code}

> mapWithState: add configuration for DEFAULT_CHECKPOINT_DURATION_MULTIPLIER
> --------------------------------------------------------------------------
>
>                 Key: SPARK-18564
>                 URL: https://issues.apache.org/jira/browse/SPARK-18564
>             Project: Spark
>          Issue Type: Improvement
>            Reporter: Daniel Haviv
>
> Currently mapWithState checkpoints the whole state every 10 batches.
> Large state checkpointing can cause huge delays. Exposing DEFAULT_CHECKPOINT_DURATION_MULTIPLIER as a configuration parameter would let the user mitigate these delays.
[jira] [Comment Edited] (SPARK-18564) mapWithState: add configuration for DEFAULT_CHECKPOINT_DURATION_MULTIPLIER
[ https://issues.apache.org/jira/browse/SPARK-18564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15766561#comment-15766561 ]

Vladimir Pchelko edited comment on SPARK-18564 at 12/21/16 9:21 AM:
--------------------------------------------------------------------

Currently the user can modify interval of mapWithState checkpoints, for example:

{code}
val CUSTOM_CHECKPOINT_DURATION_MULTIPLIER = ...
val stateDStream = sourceDStream.mapWithState(...)
stateDStream.checkpoint(batchInterval * CUSTOM_CHECKPOINT_DURATION_MULTIPLIER)
{code}

was (Author: vpchelko):
Currently the user can modify interval of mapWithState checkpoint, for example:

{code}
val CUSTOM_CHECKPOINT_DURATION_MULTIPLIER = ...
val stateDStream = sourceDStream.mapWithState(...)
stateDStream.checkpoint(batchInterval * CUSTOM_CHECKPOINT_DURATION_MULTIPLIER)
{code}
[jira] [Comment Edited] (SPARK-18564) mapWithState: add configuration for DEFAULT_CHECKPOINT_DURATION_MULTIPLIER
[ https://issues.apache.org/jira/browse/SPARK-18564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15766561#comment-15766561 ]

Vladimir Pchelko edited comment on SPARK-18564 at 12/21/16 9:20 AM:
--------------------------------------------------------------------

Currently user can modify interval of mapWithState checkpoint, for example:

{code}
val CUSTOM_CHECKPOINT_DURATION_MULTIPLIER = ...
val stateDStream = sourceDStream.mapWithState(...)
stateDStream.checkpoint(batchInterval * CUSTOM_CHECKPOINT_DURATION_MULTIPLIER)
{code}

was (Author: vpchelko):
Currently user can modify interval of mapWithState checkpoint, for example

{code}
val CUSTOM_CHECKPOINT_DURATION_MULTIPLIER = ...
val stateDStream = sourceDStream.mapWithState(...)
stateDStream.checkpoint(batchInterval * CUSTOM_CHECKPOINT_DURATION_MULTIPLIER)
{code}
[jira] [Commented] (SPARK-18564) mapWithState: add configuration for DEFAULT_CHECKPOINT_DURATION_MULTIPLIER
[ https://issues.apache.org/jira/browse/SPARK-18564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15766561#comment-15766561 ]

Vladimir Pchelko commented on SPARK-18564:
------------------------------------------

Currently user can modify interval of mapWithState checkpoint, for example

{code}
val CUSTOM_CHECKPOINT_DURATION_MULTIPLIER = ...
val stateDStream = sourceDStream.mapWithState(...)
stateDStream.checkpoint(batchInterval * CUSTOM_CHECKPOINT_DURATION_MULTIPLIER)
{code}
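The workaround in the comment above, with the surrounding mapWithState plumbing filled in. The update function, key/value types, and batch interval are illustrative; only the final `checkpoint(...)` call is the point being made:

```scala
import org.apache.spark.streaming.{Seconds, State, StateSpec}

// Hypothetical state function: keep a running count per key.
def updateCount(key: String, value: Option[Int], state: State[Long]): Long = {
  val next = state.getOption.getOrElse(0L) + value.getOrElse(0)
  state.update(next)
  next
}

val batchInterval = Seconds(5)
// Checkpoint the state every 4 batches instead of the hard-coded default of 10.
val CUSTOM_CHECKPOINT_DURATION_MULTIPLIER = 4

// sourceDStream is assumed to be an existing DStream[(String, Int)].
val stateDStream = sourceDStream.mapWithState(StateSpec.function(updateCount _))
stateDStream.checkpoint(batchInterval * CUSTOM_CHECKPOINT_DURATION_MULTIPLIER)
```

This works because DStream.checkpoint accepts any Duration that is a multiple of the batch interval, so the per-stream call overrides the default multiplier without any new configuration key.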
[jira] [Updated] (SPARK-18925) Reduce memory usage of mapWithState
[ https://issues.apache.org/jira/browse/SPARK-18925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vladimir Pchelko updated SPARK-18925:
-------------------------------------
Description:

With default settings mapWithState leads to storing up to 10 copies of MapWithStateRDD in memory (DStream, InternalMapWithStateDStream, DEFAULT_CHECKPOINT_DURATION_MULTIPLIER, rememberDuration, minRememberDuration).

In my project we quickly get OutOfMemory, because we have to track many millions of events at 2-3 KB per event -> about 50 GB per MapWithStateRDD. Using a cluster with 500+ GB of memory is unacceptable for our task.

Reducing CHECKPOINT_DURATION_MULTIPLIER is unacceptable: it slightly 'fixes' the memory issue, but leads to a new one - we are unable to process data in real time, because the checkpointing duration is several times longer than batchInterval.

So I investigated the mapWithState process and concluded that for the proper functioning of mapWithState, we need only the current and the last checkpointed MapWithStateRDD.

To fix the memory issues in my project, I override clearMetadata for InternalMapWithStateDStream and unpersist all oldRDDs:

{code}
val oldRDDs = generatedRDDs.filter(_._1 <= (time - slideDuration))
{code}

except the last checkpointed one:

{code}
val checkpointedKeys = oldRDDs.filter(_._2.isCheckpointed).keys
if (checkpointedKeys.nonEmpty) {
  oldRDDs -= checkpointedKeys.max
}
... (C/P of DStream clearMetadata)
{code}

Please correct me.

was:
With default settings mapWithState leads to storing up to 10 copies of MapWithStateRDD in memory:
(DSream, InternalMapWithStateDStream, DEFAULT_CHECKPOINT_DURATION_MULTIPLIER, rememberDuration, minRememberDuration)
In my project we qucikly get OutOfMemory, because we have to track many millions of events * 2-3KB per event -> about 50 GB per MapWithStateRDD. Using cluster with +500GB memory is unacceptable for our task.
Reducing CHECKPOINT_DURATION_MULTIPLIER is unacceptable, it slightly 'fixes' memory issue, but lead to new one - we unable to process in real-time - because the checkpointing duration is in several times longer that batchInterval.
So I inverstigated the mapWithState process and concluded that for proper functioning of mapWithState, we need the current and the last checkpointed MapWithStateRDD.
To fix memory issues in my project: I override clearMetadata for InternalMapWithStateDStream and unpersist all oldRDDs:
val oldRDDs = generatedRDDs.filter(_._1 <= (time - slideDuration))
except the last checkpointed
val checkpointedKeys = oldRDDs.filter(_._2.isCheckpointed).keys
if (checkpointedKeys.nonEmpty) {
  oldRDDs -= checkpointedKeys.max
}
... (C/P of DStream clearMetadata)
Please correct me.
[jira] [Updated] (SPARK-18925) Reduce memory usage of mapWithState
[ https://issues.apache.org/jira/browse/SPARK-18925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vladimir Pchelko updated SPARK-18925:
-------------------------------------
Description:

With default settings mapWithState leads to storing up to 10 copies of MapWithStateRDD in memory (DStream, InternalMapWithStateDStream, DEFAULT_CHECKPOINT_DURATION_MULTIPLIER, rememberDuration, minRememberDuration).

In my project we quickly get OutOfMemory, because we have to track many millions of events at 2-3 KB per event -> about 50 GB per MapWithStateRDD. Using a cluster with 500+ GB of memory is unacceptable for our task.

Reducing CHECKPOINT_DURATION_MULTIPLIER is unacceptable: it slightly 'fixes' the memory issue, but leads to a new one - we are unable to process data in real time, because the checkpointing duration is several times longer than batchInterval.

So I investigated the mapWithState process and concluded that for the proper functioning of mapWithState, we need only the current and the last checkpointed MapWithStateRDD.

To fix the memory issues in my project, I override clearMetadata for InternalMapWithStateDStream and unpersist all oldRDDs:

{code}
val oldRDDs = generatedRDDs.filter(_._1 <= (time - slideDuration))
{code}

except the last checkpointed one:

{code}
val checkpointedKeys = oldRDDs.filter(_._2.isCheckpointed).keys
if (checkpointedKeys.nonEmpty) {
  oldRDDs -= checkpointedKeys.max
}
... (C/P of DStream clearMetadata)
{code}

Please correct me.

was:
With default settings mapWithState leads to storing up to 10 copies of MapWithStateRDD in memory:
(DSream, InternalMapWithStateDStream, DEFAULT_CHECKPOINT_DURATION_MULTIPLIER, rememberDuration, minRememberDuration)
In my project we quikly runs OutOfMemory, because we have to track many millions of events * 2-3KB per event -> about 50 GB per MapWithStateRDD. Using cluster with +500GB memory is unacceptable for our task.
Reducing CHECKPOINT_DURATION_MULTIPLIER is unacceptable, it slightly 'fixes' memory issue, but lead to new one - we unable to process in real-time - because the checkpointing duration is in several times longer that batchInterval.
So I inverstigated the mapWithState process and concluded that for proper functioning of mapWithState, we need the current and the last checkpointed MapWithStateRDD.
To fix memory issues in my project: I override clearMetadata for InternalMapWithStateDStream and unpersist all oldRDDs:
val oldRDDs = generatedRDDs.filter(_._1 <= (time - slideDuration))
except the last checkpointed
val checkpointedKeys = oldRDDs.filter(_._2.isCheckpointed).keys
if (checkpointedKeys.nonEmpty) {
  oldRDDs -= checkpointedKeys.max
}
... (C/P of DStream clearMetadata)
Please correct me.
[jira] [Updated] (SPARK-18925) Reduce memory usage of mapWithState
[ https://issues.apache.org/jira/browse/SPARK-18925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vladimir Pchelko updated SPARK-18925:
-------------------------------------
Attachment: (was: 0001-override-clearMetadata-for-InternalMapWithStateDStre.patch)
[jira] [Updated] (SPARK-18925) Reduce memory usage of mapWithState
[ https://issues.apache.org/jira/browse/SPARK-18925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vladimir Pchelko updated SPARK-18925: - Attachment: 0001-override-clearMetadata-for-InternalMapWithStateDStre.patch > Reduce memory usage of mapWithState > --- > > Key: SPARK-18925 > URL: https://issues.apache.org/jira/browse/SPARK-18925 > Project: Spark > Issue Type: Improvement > Components: DStreams >Affects Versions: 2.0.0, 2.0.1, 2.0.2 >Reporter: Vladimir Pchelko > Attachments: > 0001-override-clearMetadata-for-InternalMapWithStateDStre.patch > > > With default settings, mapWithState leads to storing up to 10 copies of > MapWithStateRDD in memory (DStream, InternalMapWithStateDStream, > DEFAULT_CHECKPOINT_DURATION_MULTIPLIER, rememberDuration, minRememberDuration). > In my project we quickly hit OutOfMemoryError, because we have to track many > millions of events at 2-3 KB per event, i.e. about 50 GB per MapWithStateRDD. > Using a cluster with 500+ GB of memory is unacceptable for our task. > Reducing CHECKPOINT_DURATION_MULTIPLIER is also unacceptable: it slightly 'fixes' > the memory issue, but leads to a new one - we are unable to process in real time, > because the checkpointing duration is several times longer than the batchInterval. > So I investigated the mapWithState process and concluded that for the proper > functioning of mapWithState, we only need the current and the last checkpointed > MapWithStateRDD. > To fix the memory issues in my project, I override clearMetadata for > InternalMapWithStateDStream and unpersist all oldRDDs: > val oldRDDs = generatedRDDs.filter(_._1 <= (time - slideDuration)) > except the last checkpointed one: > val checkpointedKeys = oldRDDs.filter(_._2.isCheckpointed).keys > if (checkpointedKeys.nonEmpty) { > oldRDDs -= checkpointedKeys.max > } > ... (C/P of DStream clearMetadata) > Please correct me.
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-18925) Reduce memory usage of mapWithState
[ https://issues.apache.org/jira/browse/SPARK-18925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vladimir Pchelko updated SPARK-18925: - Affects Version/s: 2.0.0 2.0.1 2.0.2 Priority: Major (was: Minor) Component/s: DStreams > Reduce memory usage of mapWithState > --- > > Key: SPARK-18925 > URL: https://issues.apache.org/jira/browse/SPARK-18925 > Project: Spark > Issue Type: Improvement > Components: DStreams >Affects Versions: 2.0.0, 2.0.1, 2.0.2 >Reporter: Vladimir Pchelko > > With default settings, mapWithState leads to storing up to 10 copies of > MapWithStateRDD in memory (DStream, InternalMapWithStateDStream, > DEFAULT_CHECKPOINT_DURATION_MULTIPLIER, rememberDuration, minRememberDuration). > In my project we quickly hit OutOfMemoryError, because we have to track many > millions of events at 2-3 KB per event, i.e. about 50 GB per MapWithStateRDD. > Using a cluster with 500+ GB of memory is unacceptable for our task. > Reducing CHECKPOINT_DURATION_MULTIPLIER is also unacceptable: it slightly 'fixes' > the memory issue, but leads to a new one - we are unable to process in real time, > because the checkpointing duration is several times longer than the batchInterval. > So I investigated the mapWithState process and concluded that for the proper > functioning of mapWithState, we only need the current and the last checkpointed > MapWithStateRDD. > To fix the memory issues in my project, I override clearMetadata for > InternalMapWithStateDStream and unpersist all oldRDDs: > val oldRDDs = generatedRDDs.filter(_._1 <= (time - slideDuration)) > except the last checkpointed one: > val checkpointedKeys = oldRDDs.filter(_._2.isCheckpointed).keys > if (checkpointedKeys.nonEmpty) { > oldRDDs -= checkpointedKeys.max > } > ... (C/P of DStream clearMetadata) > Please correct me. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-18925) Reduce memory usage of mapWithState
Vladimir Pchelko created SPARK-18925: Summary: Reduce memory usage of mapWithState Key: SPARK-18925 URL: https://issues.apache.org/jira/browse/SPARK-18925 Project: Spark Issue Type: Improvement Reporter: Vladimir Pchelko Priority: Minor With default settings, mapWithState leads to storing up to 10 copies of MapWithStateRDD in memory (DStream, InternalMapWithStateDStream, DEFAULT_CHECKPOINT_DURATION_MULTIPLIER, rememberDuration, minRememberDuration). In my project we quickly hit OutOfMemoryError, because we have to track many millions of events at 2-3 KB per event, i.e. about 50 GB per MapWithStateRDD. Using a cluster with 500+ GB of memory is unacceptable for our task. Reducing CHECKPOINT_DURATION_MULTIPLIER is also unacceptable: it slightly 'fixes' the memory issue, but leads to a new one - we are unable to process in real time, because the checkpointing duration is several times longer than the batchInterval. So I investigated the mapWithState process and concluded that for the proper functioning of mapWithState, we only need the current and the last checkpointed MapWithStateRDD. To fix the memory issues in my project, I override clearMetadata for InternalMapWithStateDStream and unpersist all oldRDDs: val oldRDDs = generatedRDDs.filter(_._1 <= (time - slideDuration)) except the last checkpointed one: val checkpointedKeys = oldRDDs.filter(_._2.isCheckpointed).keys if (checkpointedKeys.nonEmpty) { oldRDDs -= checkpointedKeys.max } ... (C/P of DStream clearMetadata) Please correct me. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
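[Editor's note] The clearMetadata override quoted in the issue can be sketched as a standalone Scala snippet. `ClearMetadataSketch` and `keysToUnpersist` are hypothetical names, and batch times and checkpoint status are modeled as plain `Long`/`Boolean` values rather than real DStream/RDD classes; this is only a sketch of the selection logic (drop every generated RDD older than `time - slideDuration`, except the most recent checkpointed one):

```scala
// Sketch of the proposed clearMetadata selection logic, without Spark classes.
// `generated` maps a batch time to whether that batch's RDD is checkpointed,
// standing in for InternalMapWithStateDStream.generatedRDDs.
object ClearMetadataSketch {
  // Returns the batch times whose RDDs may safely be unpersisted.
  def keysToUnpersist(generated: Map[Long, Boolean],
                      time: Long,
                      slideDuration: Long): Set[Long] = {
    // All RDDs older than the current batch window, as in the quoted filter.
    val oldRDDs = generated.filter { case (t, _) => t <= time - slideDuration }
    // Keep the newest checkpointed RDD: mapWithState needs it for recovery.
    val checkpointedKeys = oldRDDs.filter(_._2).keys
    val keep: Set[Long] =
      if (checkpointedKeys.nonEmpty) Set(checkpointedKeys.max) else Set.empty
    oldRDDs.keySet -- keep
  }
}
```

With batches at times 1..5 where batches 2 and 4 are checkpointed, clearing at time 6 with slideDuration 1 would unpersist batches 1, 2, 3 and 5 while retaining 4 (the latest checkpoint) plus the current RDD, which is the memory bound the proposal aims for.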