[jira] [Comment Edited] (SPARK-20168) Enable kinesis to start stream from Initial position specified by a timestamp

2018-09-04 Thread Vladimir Pchelko (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-20168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16602909#comment-16602909
 ] 

Vladimir Pchelko edited comment on SPARK-20168 at 9/4/18 11:04 AM:
---

[~srowen]


 This bug must be covered by unit tests.

 


was (Author: vpchelko):
[~srowen]
 this bug must be covered by unit tests

> Enable kinesis to start stream from Initial position specified by a timestamp
> -
>
> Key: SPARK-20168
> URL: https://issues.apache.org/jira/browse/SPARK-20168
> Project: Spark
>  Issue Type: Improvement
>  Components: DStreams
>Affects Versions: 2.1.0
>Reporter: Yash Sharma
>Assignee: Yash Sharma
>Priority: Minor
>  Labels: kinesis, streaming
> Fix For: 2.4.0
>
>
> The Kinesis client can resume from a specified timestamp when creating a stream. 
> We should have an option to pass a timestamp in the config to allow Kinesis to 
> resume from that timestamp.
> I have started initial work and will post a PR after I test the patch -
> https://github.com/yssharma/spark/commit/11269abf8b2a533a1b10ceee80ac2c3a2a80c4e8
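For illustration, a minimal sketch of how such a timestamp option might be resolved from configuration. The InitialPosition names and the parsing rule here are invented for the sketch, assuming a config value that is either a keyword or epoch milliseconds; this is not the actual Kinesis connector API:

```scala
import java.util.Date

// Illustrative model of the three resume modes (hypothetical names).
sealed trait InitialPosition
case object Latest extends InitialPosition
case object TrimHorizon extends InitialPosition
case class AtTimestamp(ts: Date) extends InitialPosition

// Resolve the position from a config string: a known keyword, or
// epoch milliseconds interpreted as the timestamp to resume from.
def resolveInitialPosition(conf: String): InitialPosition = conf.toLowerCase match {
  case "latest"       => Latest
  case "trim_horizon" => TrimHorizon
  case millis         => AtTimestamp(new Date(millis.toLong))
}
```

A stream builder could then branch on the resolved value when constructing the receiver.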



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20168) Enable kinesis to start stream from Initial position specified by a timestamp

2018-09-04 Thread Vladimir Pchelko (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-20168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16602909#comment-16602909
 ] 

Vladimir Pchelko commented on SPARK-20168:
--

[~srowen]
 This bug must be covered by unit tests.







[jira] [Commented] (SPARK-18805) InternalMapWithStateDStream make java.lang.StackOverflowError

2016-12-22 Thread Vladimir Pchelko (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15771504#comment-15771504
 ] 

Vladimir Pchelko commented on SPARK-18805:
--

I have faced a similar problem; there are two 'problems' with mapWithState:

1. spark.streaming.concurrentJobs
2. lack of memory combined with high GC time

In both cases I noticed strange/magic errors.

It seems that in your case the application is unrecoverable due to lack of memory.

> InternalMapWithStateDStream make java.lang.StackOverflowError 
> --
>
> Key: SPARK-18805
> URL: https://issues.apache.org/jira/browse/SPARK-18805
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 1.6.3, 2.0.2
> Environment: mesos
>Reporter: etienne
>
> When loading InternalMapWithStateDStream from a checkpoint, if isValidTime is 
> true and there is no generatedRDD for the given time, there is an infinite loop:
> 1) compute is called on InternalMapWithStateDStream
> 2) InternalMapWithStateDStream tries to generate the previous RDD
> 3) The stream looks in generatedRDDs to check whether the RDD is already 
> generated for the given time
> 4) It does not find the RDD, so it checks whether the time is valid
> 5) If the time is valid, compute is called on InternalMapWithStateDStream again
> 6) restart from 1)
> Here is the exception that illustrates this error:
> {code}
> Exception in thread "streaming-start" java.lang.StackOverflowError
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>   at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>   at scala.Option.orElse(Option.scala:289)
>   at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
>   at 
> org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:134)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>   at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>   at scala.Option.orElse(Option.scala:289)
>   at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
>   at 
> org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:134)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> {code}
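The cycle above can be reduced to a minimal standalone sketch (not Spark's actual code): a memoized getOrCompute whose compute path re-enters getOrCompute for the same time without ever reaching a base case, so the stack grows until it overflows:

```scala
object Loop {
  // Stand-in for generatedRDDs: time -> "RDD"
  val generated = scala.collection.mutable.Map.empty[Int, String]

  def getOrCompute(time: Int): String =
    generated.getOrElse(time, compute(time)) // cache miss falls through to compute

  def compute(time: Int): String =
    getOrCompute(time) // asks for the same time again: the cycle never terminates
}

// Demonstrate that the mutual recursion blows the stack.
val overflowed =
  try { Loop.getOrCompute(1); false }
  catch { case _: StackOverflowError => true }
```

In the real code the cache is populated only after compute succeeds, which is why a missing entry at a "valid" time keeps the loop alive.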



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18925) Reduce memory usage of mapWithState

2016-12-21 Thread Vladimir Pchelko (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vladimir Pchelko updated SPARK-18925:
-
Description: 
With default settings mapWithState leads to storing up to 10 copies of 
MapWithStateRDD in memory 
(DStream, InternalMapWithStateDStream, DEFAULT_CHECKPOINT_DURATION_MULTIPLIER, 
rememberDuration, minRememberDuration).

In my project we quickly get an OutOfMemoryError, because we have to track many 
millions of events at 2-3KB per event -> about 50 GB per MapWithStateRDD. 
Using a cluster with 500GB+ of memory is unacceptable for our task.
Reducing CHECKPOINT_DURATION_MULTIPLIER is also unacceptable: it slightly 'fixes' 
the memory issue, but leads to a new one - we are unable to process data in 
real time, because the checkpointing duration is several times longer than the 
batchInterval.

So I investigated the mapWithState process and concluded that for the proper 
functioning of mapWithState, we only need the current MapWithStateRDD and the 
last checkpointed MapWithStateRDD.

To fix the memory issue in my project, I override clearMetadata for 
InternalMapWithStateDStream and unpersist all old RDDs except the last 
checkpointed one:
{code}
var oldRDDs = generatedRDDs.filter(_._1 <= (time - slideDuration))
// keep the most recent checkpointed RDD
val checkpointedKeys = oldRDDs.filter(_._2.isCheckpointed).keys
if (checkpointedKeys.nonEmpty) {
  oldRDDs -= checkpointedKeys.max
}
// ... (copy/paste of the rest of DStream.clearMetadata)
{code}

Please correct me.
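The retention rule above ("drop everything old except the newest checkpointed RDD") can be sketched standalone; a plain Map stands in for generatedRDDs and a Boolean for isCheckpointed, so all names here are illustrative:

```scala
object Retention {
  // entries: time -> isCheckpointed
  def keysToDrop(entries: Map[Int, Boolean], cutoff: Int): Set[Int] = {
    // everything at or before the cutoff is a candidate for unpersisting
    val old = entries.filter { case (t, _) => t <= cutoff }
    val checkpointed = old.collect { case (t, true) => t }
    // spare the newest checkpointed entry, drop the rest of the old ones
    if (checkpointed.nonEmpty) old.keySet - checkpointed.max
    else old.keySet
  }
}
```

With batches 1..5 where 2 and 4 are checkpointed and a cutoff of 4, this keeps 4 (newest checkpointed) and 5 (current) and drops 1, 2, 3.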

  was:
With default settings mapWithState leads to storing up to 10 copies of 
MapWithStateRDD in memory: 
(DSream, InternalMapWithStateDStream, DEFAULT_CHECKPOINT_DURATION_MULTIPLIER, 
rememberDuration, minRememberDuration)

In my project we quickly get OutOfMemory, because we have to track many 
millions of events * 2-3KB per event -> about 50 GB per MapWithStateRDD. 
Using cluster with +500GB memory is unacceptable for our task.
Reducing CHECKPOINT_DURATION_MULTIPLIER is unacceptable, it slightly 'fixes' 
memory issue, but leads to new one - we unable process data in real-time - 
because the checkpointing duration is in several times longer than 
batchInterval.

So I investigated the mapWithState process and concluded that for proper 
functioning of mapWithState, we need the current and the last checkpointed 
MapWithStateRDD.

To fix memory issues in my project: I override clearMetadata for 
InternalMapWithStateDStream and unpersist all oldRDDs:
  val oldRDDs = generatedRDDs.filter(_._1 <= (time - slideDuration))
except the last checkpointed
val checkpointedKeys = oldRDDs.filter(_._2.isCheckpointed).keys
if (checkpointedKeys.nonEmpty) {
  oldRDDs -= checkpointedKeys.max
}
... (C/P of DStream clearMetadata)


Please correct me.


> Reduce memory usage of mapWithState
> ---
>
> Key: SPARK-18925
> URL: https://issues.apache.org/jira/browse/SPARK-18925
> Project: Spark
>  Issue Type: Improvement
>  Components: DStreams
>Affects Versions: 2.0.0, 2.0.1, 2.0.2
>Reporter: Vladimir Pchelko
>






[jira] [Updated] (SPARK-18925) Reduce memory usage of mapWithState

2016-12-21 Thread Vladimir Pchelko (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vladimir Pchelko updated SPARK-18925:
-
Description: 
With default settings mapWithState leads to storing up to 10 copies of 
MapWithStateRDD in memory 
(DStream, InternalMapWithStateDStream, DEFAULT_CHECKPOINT_DURATION_MULTIPLIER, 
rememberDuration, minRememberDuration).

In my project we quickly get an OutOfMemoryError, because we have to track many 
millions of events at 2-3KB per event -> about 50 GB per MapWithStateRDD. 
Using a cluster with 500GB+ of memory is unacceptable for our task.
Reducing CHECKPOINT_DURATION_MULTIPLIER is also unacceptable: it slightly 'fixes' 
the memory issue, but leads to a new one - we are unable to process data in 
real time, because the checkpointing duration is several times longer than the 
batchInterval.

So I investigated the mapWithState process and concluded that for the proper 
functioning of mapWithState, we only need the current MapWithStateRDD and the 
last checkpointed MapWithStateRDD.

To fix the memory issue in my project, I override clearMetadata for 
InternalMapWithStateDStream and unpersist all old RDDs except the last 
checkpointed one:
{code}
var oldRDDs = generatedRDDs.filter(_._1 <= (time - slideDuration))
// keep the most recent checkpointed RDD
val checkpointedKeys = oldRDDs.filter(_._2.isCheckpointed).keys
if (checkpointedKeys.nonEmpty) {
  oldRDDs -= checkpointedKeys.max
}
// ... (copy/paste of the rest of DStream.clearMetadata)
{code}

Please correct me.

  was:
With default settings mapWithState leads to storing up to 10 copies of 
MapWithStateRDD in memory: 
(DSream, InternalMapWithStateDStream, DEFAULT_CHECKPOINT_DURATION_MULTIPLIER, 
rememberDuration, minRememberDuration)

In my project we quickly get OutOfMemory, because we have to track many 
millions of events * 2-3KB per event -> about 50 GB per MapWithStateRDD. 
Using cluster with +500GB memory is unacceptable for our task.
Reducing CHECKPOINT_DURATION_MULTIPLIER is unacceptable, it slightly 'fixes' 
memory issue, but leads to new one - we unable process data in real-time - 
because the checkpointing duration is in several times longer that 
batchInterval.

So I investigated the mapWithState process and concluded that for proper 
functioning of mapWithState, we need the current and the last checkpointed 
MapWithStateRDD.

To fix memory issues in my project: I override clearMetadata for 
InternalMapWithStateDStream and unpersist all oldRDDs:
  val oldRDDs = generatedRDDs.filter(_._1 <= (time - slideDuration))
except the last checkpointed
val checkpointedKeys = oldRDDs.filter(_._2.isCheckpointed).keys
if (checkpointedKeys.nonEmpty) {
  oldRDDs -= checkpointedKeys.max
}
... (C/P of DStream clearMetadata)


Please correct me.








[jira] [Comment Edited] (SPARK-18564) mapWithState: add configuration for DEFAULT_CHECKPOINT_DURATION_MULTIPLIER

2016-12-21 Thread Vladimir Pchelko (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15766561#comment-15766561
 ] 

Vladimir Pchelko edited comment on SPARK-18564 at 12/21/16 9:21 AM:


Currently the user can modify the interval of mapWithState checkpointing, for example:
{code}
val CUSTOM_CHECKPOINT_DURATION_MULTIPLIER = ...
val stateDStream = sourceDStream.mapWithState(...)
stateDStream.checkpoint(batchInterval * CUSTOM_CHECKPOINT_DURATION_MULTIPLIER)
{code}


was (Author: vpchelko):
Currently user can modify interval of mapWithState checkpoint, for example:
{code}
val CUSTOM_CHECKPOINT_DURATION_MULTIPLIER = ...
val stateDStream = sourceDStream.mapWithState(...)
stateDStream.checkpoint(batchInterval * CUSTOM_CHECKPOINT_DURATION_MULTIPLIER)
{code}

> mapWithState: add configuration for DEFAULT_CHECKPOINT_DURATION_MULTIPLIER
> --
>
> Key: SPARK-18564
> URL: https://issues.apache.org/jira/browse/SPARK-18564
> Project: Spark
>  Issue Type: Improvement
>Reporter: Daniel Haviv
>
> Currently mapWithState checkpoints the whole state every 10 batches.
> Checkpointing a large state can cause huge delays. Exposing 
> DEFAULT_CHECKPOINT_DURATION_MULTIPLIER as a configuration parameter would let 
> the user mitigate these delays.
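A minimal sketch of the proposed knob, assuming a hypothetical configuration key (the key name is invented for illustration; the default of 10 matches today's hard-coded multiplier):

```scala
object CheckpointInterval {
  // Today's hard-coded DEFAULT_CHECKPOINT_DURATION_MULTIPLIER
  val DefaultMultiplier = 10

  // Derive the checkpoint interval (ms) from the batch interval and an
  // optional multiplier read from configuration (hypothetical key).
  def intervalMs(batchIntervalMs: Long, conf: Map[String, String]): Long = {
    val multiplier = conf
      .get("spark.streaming.mapWithState.checkpointMultiplier")
      .map(_.toInt)
      .getOrElse(DefaultMultiplier)
    batchIntervalMs * multiplier
  }
}
```

With a 5-second batch interval, the default yields a 50-second checkpoint interval, while setting the multiplier to 2 would bring it down to 10 seconds.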






[jira] [Comment Edited] (SPARK-18564) mapWithState: add configuration for DEFAULT_CHECKPOINT_DURATION_MULTIPLIER

2016-12-21 Thread Vladimir Pchelko (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15766561#comment-15766561
 ] 

Vladimir Pchelko edited comment on SPARK-18564 at 12/21/16 9:21 AM:


Currently the user can modify the interval of mapWithState checkpointing, for example:
{code}
val CUSTOM_CHECKPOINT_DURATION_MULTIPLIER = ...
val stateDStream = sourceDStream.mapWithState(...)
stateDStream.checkpoint(batchInterval * CUSTOM_CHECKPOINT_DURATION_MULTIPLIER)
{code}


was (Author: vpchelko):
Currently the user can modify interval of mapWithState checkpoint, for example:
{code}
val CUSTOM_CHECKPOINT_DURATION_MULTIPLIER = ...
val stateDStream = sourceDStream.mapWithState(...)
stateDStream.checkpoint(batchInterval * CUSTOM_CHECKPOINT_DURATION_MULTIPLIER)
{code}







[jira] [Comment Edited] (SPARK-18564) mapWithState: add configuration for DEFAULT_CHECKPOINT_DURATION_MULTIPLIER

2016-12-21 Thread Vladimir Pchelko (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15766561#comment-15766561
 ] 

Vladimir Pchelko edited comment on SPARK-18564 at 12/21/16 9:20 AM:


Currently the user can modify the interval of mapWithState checkpointing, for example:
{code}
val CUSTOM_CHECKPOINT_DURATION_MULTIPLIER = ...
val stateDStream = sourceDStream.mapWithState(...)
stateDStream.checkpoint(batchInterval * CUSTOM_CHECKPOINT_DURATION_MULTIPLIER)
{code}


was (Author: vpchelko):
Currently user can modify interval of mapWithState checkpoint, for example
{code}
val CUSTOM_CHECKPOINT_DURATION_MULTIPLIER = ...
val stateDStream = sourceDStream.mapWithState(...)
stateDStream.checkpoint(batchInterval * CUSTOM_CHECKPOINT_DURATION_MULTIPLIER)
{code}







[jira] [Commented] (SPARK-18564) mapWithState: add configuration for DEFAULT_CHECKPOINT_DURATION_MULTIPLIER

2016-12-21 Thread Vladimir Pchelko (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15766561#comment-15766561
 ] 

Vladimir Pchelko commented on SPARK-18564:
--

Currently the user can modify the interval of mapWithState checkpointing, for example:
{code}
val CUSTOM_CHECKPOINT_DURATION_MULTIPLIER = ...
val stateDStream = sourceDStream.mapWithState(...)
stateDStream.checkpoint(batchInterval * CUSTOM_CHECKPOINT_DURATION_MULTIPLIER)
{code}







[jira] [Updated] (SPARK-18925) Reduce memory usage of mapWithState

2016-12-19 Thread Vladimir Pchelko (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vladimir Pchelko updated SPARK-18925:
-
Description: 
With default settings mapWithState leads to storing up to 10 copies of 
MapWithStateRDD in memory 
(DStream, InternalMapWithStateDStream, DEFAULT_CHECKPOINT_DURATION_MULTIPLIER, 
rememberDuration, minRememberDuration).

In my project we quickly get an OutOfMemoryError, because we have to track many 
millions of events at 2-3KB per event -> about 50 GB per MapWithStateRDD. 
Using a cluster with 500GB+ of memory is unacceptable for our task.
Reducing CHECKPOINT_DURATION_MULTIPLIER is also unacceptable: it slightly 'fixes' 
the memory issue, but leads to a new one - we are unable to process data in 
real time, because the checkpointing duration is several times longer than the 
batchInterval.

So I investigated the mapWithState process and concluded that for the proper 
functioning of mapWithState, we only need the current MapWithStateRDD and the 
last checkpointed MapWithStateRDD.

To fix the memory issue in my project, I override clearMetadata for 
InternalMapWithStateDStream and unpersist all old RDDs except the last 
checkpointed one:
{code}
var oldRDDs = generatedRDDs.filter(_._1 <= (time - slideDuration))
// keep the most recent checkpointed RDD
val checkpointedKeys = oldRDDs.filter(_._2.isCheckpointed).keys
if (checkpointedKeys.nonEmpty) {
  oldRDDs -= checkpointedKeys.max
}
// ... (copy/paste of the rest of DStream.clearMetadata)
{code}

Please correct me.

  was:
With default settings mapWithState leads to storing up to 10 copies of 
MapWithStateRDD in memory: 
(DSream, InternalMapWithStateDStream, DEFAULT_CHECKPOINT_DURATION_MULTIPLIER, 
rememberDuration, minRememberDuration)

In my project we qucikly get OutOfMemory, because we have to track many 
millions of events * 2-3KB per event -> about 50 GB per MapWithStateRDD. 
Using cluster with +500GB memory is unacceptable for our task.
Reducing CHECKPOINT_DURATION_MULTIPLIER is unacceptable, it slightly 'fixes' 
memory issue, but lead to new one - we unable to process in real-time - because 
the checkpointing duration is in several times longer that batchInterval.

So I inverstigated the mapWithState process and concluded that for proper 
functioning of mapWithState, we need the current and the last checkpointed 
MapWithStateRDD.

To fix memory issues in my project: I override clearMetadata for 
InternalMapWithStateDStream and unpersist all oldRDDs:
  val oldRDDs = generatedRDDs.filter(_._1 <= (time - slideDuration))
except the last checkpointed
val checkpointedKeys = oldRDDs.filter(_._2.isCheckpointed).keys
if (checkpointedKeys.nonEmpty) {
  oldRDDs -= checkpointedKeys.max
}
... (C/P of DStream clearMetadata)


Please correct me.








[jira] [Updated] (SPARK-18925) Reduce memory usage of mapWithState

2016-12-19 Thread Vladimir Pchelko (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vladimir Pchelko updated SPARK-18925:
-
Description: 
With default settings mapWithState leads to storing up to 10 copies of 
MapWithStateRDD in memory 
(DStream, InternalMapWithStateDStream, DEFAULT_CHECKPOINT_DURATION_MULTIPLIER, 
rememberDuration, minRememberDuration).

In my project we quickly get an OutOfMemoryError, because we have to track many 
millions of events at 2-3KB per event -> about 50 GB per MapWithStateRDD. 
Using a cluster with 500GB+ of memory is unacceptable for our task.
Reducing CHECKPOINT_DURATION_MULTIPLIER is also unacceptable: it slightly 'fixes' 
the memory issue, but leads to a new one - we are unable to process data in 
real time, because the checkpointing duration is several times longer than the 
batchInterval.

So I investigated the mapWithState process and concluded that for the proper 
functioning of mapWithState, we only need the current MapWithStateRDD and the 
last checkpointed MapWithStateRDD.

To fix the memory issue in my project, I override clearMetadata for 
InternalMapWithStateDStream and unpersist all old RDDs except the last 
checkpointed one:
{code}
var oldRDDs = generatedRDDs.filter(_._1 <= (time - slideDuration))
// keep the most recent checkpointed RDD
val checkpointedKeys = oldRDDs.filter(_._2.isCheckpointed).keys
if (checkpointedKeys.nonEmpty) {
  oldRDDs -= checkpointedKeys.max
}
// ... (copy/paste of the rest of DStream.clearMetadata)
{code}

Please correct me.

  was:
With default settings mapWithState leads to storing up to 10 copies of 
MapWithStateRDD in memory: 
(DSream, InternalMapWithStateDStream, DEFAULT_CHECKPOINT_DURATION_MULTIPLIER, 
rememberDuration, minRememberDuration)

In my project we quikly runs OutOfMemory, because we have to track many 
millions of events * 2-3KB per event -> about 50 GB per MapWithStateRDD. 
Using cluster with +500GB memory is unacceptable for our task.
Reducing CHECKPOINT_DURATION_MULTIPLIER is unacceptable, it slightly 'fixes' 
memory issue, but lead to new one - we unable to process in real-time - because 
the checkpointing duration is in several times longer that batchInterval.

So I inverstigated the mapWithState process and concluded that for proper 
functioning of mapWithState, we need the current and the last checkpointed 
MapWithStateRDD.

To fix memory issues in my project: I override clearMetadata for 
InternalMapWithStateDStream and unpersist all oldRDDs:
  val oldRDDs = generatedRDDs.filter(_._1 <= (time - slideDuration))
except the last checkpointed
val checkpointedKeys = oldRDDs.filter(_._2.isCheckpointed).keys
if (checkpointedKeys.nonEmpty) {
  oldRDDs -= checkpointedKeys.max
}
... (C/P of DStream clearMetadata)


Please correct me.








[jira] [Updated] (SPARK-18925) Reduce memory usage of mapWithState

2016-12-19 Thread Vladimir Pchelko (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vladimir Pchelko updated SPARK-18925:
-
Attachment: (was: 
0001-override-clearMetadata-for-InternalMapWithStateDStre.patch)







[jira] [Updated] (SPARK-18925) Reduce memory usage of mapWithState

2016-12-19 Thread Vladimir Pchelko (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vladimir Pchelko updated SPARK-18925:
-
Attachment: 0001-override-clearMetadata-for-InternalMapWithStateDStre.patch

> Reduce memory usage of mapWithState
> ---
>
> Key: SPARK-18925
> URL: https://issues.apache.org/jira/browse/SPARK-18925
> Project: Spark
>  Issue Type: Improvement
>  Components: DStreams
>Affects Versions: 2.0.0, 2.0.1, 2.0.2
>Reporter: Vladimir Pchelko
> Attachments: 
> 0001-override-clearMetadata-for-InternalMapWithStateDStre.patch
>
>






[jira] [Updated] (SPARK-18925) Reduce memory usage of mapWithState

2016-12-19 Thread Vladimir Pchelko (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vladimir Pchelko updated SPARK-18925:
-
Affects Version/s: 2.0.0
   2.0.1
   2.0.2
 Priority: Major  (was: Minor)
  Component/s: DStreams

> Reduce memory usage of mapWithState
> ---
>
> Key: SPARK-18925
> URL: https://issues.apache.org/jira/browse/SPARK-18925
> Project: Spark
>  Issue Type: Improvement
>  Components: DStreams
>Affects Versions: 2.0.0, 2.0.1, 2.0.2
>Reporter: Vladimir Pchelko
>






[jira] [Created] (SPARK-18925) Reduce memory usage of mapWithState

2016-12-19 Thread Vladimir Pchelko (JIRA)
Vladimir Pchelko created SPARK-18925:


 Summary: Reduce memory usage of mapWithState
 Key: SPARK-18925
 URL: https://issues.apache.org/jira/browse/SPARK-18925
 Project: Spark
  Issue Type: Improvement
Reporter: Vladimir Pchelko
Priority: Minor


With default settings mapWithState leads to storing up to 10 copies of 
MapWithStateRDD in memory (see DStream, InternalMapWithStateDStream, 
DEFAULT_CHECKPOINT_DURATION_MULTIPLIER, rememberDuration, minRememberDuration).

In my project we quickly ran into OutOfMemory, because we have to track many 
millions of events at 2-3 KB per event - about 50 GB per MapWithStateRDD. 
Using a cluster with 500+ GB of memory is unacceptable for our task.
Reducing CHECKPOINT_DURATION_MULTIPLIER is unacceptable: it slightly 'fixes' the 
memory issue but leads to a new one - we are unable to process in real time, because 
the checkpointing duration is several times longer than the batchInterval.

So I investigated the mapWithState process and concluded that for proper 
functioning of mapWithState we only need the current and the last checkpointed 
MapWithStateRDD.

To fix the memory issues in my project, I override clearMetadata in 
InternalMapWithStateDStream and unpersist all oldRDDs:
  val oldRDDs = generatedRDDs.filter(_._1 <= (time - slideDuration))
  // except the last checkpointed
  val checkpointedKeys = oldRDDs.filter(_._2.isCheckpointed).keys
  if (checkpointedKeys.nonEmpty) {
    oldRDDs -= checkpointedKeys.max
  }
  ... (C/P of DStream clearMetadata)


Please correct me.


