Github user HeartSaVioR commented on the issue:
https://github.com/apache/spark/pull/21700
I would like to add some numbers to show how much this patch helps
end users of Apache Spark.
I built and published a project that implements some stateful use cases
based on the IoT Trucking example.
https://github.com/HeartSaVioR/iot-trucking-app-spark-structured-streaming
Running these apps, I can see that the cache (loadedMaps) in
HDFSBackedStateStoreProvider consumes much more memory than a single version
of the state. The overhead is not 10~30% but more than 1500%, and even more
than 8000% in a specific case, depending on the update ratio of the state.
(Capturing the overall map size of the provider requires applying patch #21469.)
The tables below show the results, obtained by publishing query status to a
Kafka topic and querying that data via Spark SQL.
https://gist.github.com/HeartSaVioR/9d53b39052d4779a4c77e71ff7e989a3
> Before applying the patch (`spark.sql.streaming.minBatchesToRetain` set to default value 100)
* stream-stream join (IotTruckingAppJoinedAbnormalEvents.scala)
batchId | numRowsTotal | numRowsUpdated | memoryUsedBytes | providerLoadedMapSize | stateExcessLoadingOverheadPercentage
-- | -- | -- | -- | -- | --
319 | 765456 | 2632 | 185499903 | 3307747279 | 17.8315310439811928
* window aggregation (IotTruckingAppMovingAggregationsOnSpeed.scala)
batchId | numRowsTotal | numRowsUpdated | memoryUsedBytes | providerLoadedMapSize | stateExcessLoadingOverheadPercentage
-- | -- | -- | -- | -- | --
142 | 184 | 138 | 72103 | 6214927 | 86.1951236425668835
* deduplication (IotTruckingAppDistinctPairDriverAndTruck.scala)
batchId | numRowsTotal | numRowsUpdated | memoryUsedBytes | providerLoadedMapSize | stateExcessLoadingOverheadPercentage
-- | -- | -- | -- | -- | --
634 | 598 | 0 | 136279 | 6587839 | 48.3408228707284321
> After applying this patch (`spark.sql.streaming.maxBatchesToRetainInMemory` set to default value 2)
* stream-stream join (IotTruckingAppJoinedAbnormalEvents.scala)
batchId | numRowsTotal | numRowsUpdated | memoryUsedBytes | providerLoadedMapSize | stateExcessLoadingOverheadPercentage
-- | -- | -- | -- | -- | --
127 | 298452 | 4170 | 71023679 | 84454399 | 1.1891020035726395
* window aggregation (IotTruckingAppMovingAggregationsOnSpeed.scala)
batchId | numRowsTotal | numRowsUpdated | memoryUsedBytes | providerLoadedMapSize | stateExcessLoadingOverheadPercentage
-- | -- | -- | -- | -- | --
132 | 184 | 138 | 72319 | 162647 | 2.2490216955433565
* deduplication (IotTruckingAppDistinctPairDriverAndTruck.scala)
batchId | numRowsTotal | numRowsUpdated | memoryUsedBytes | providerLoadedMapSize | stateExcessLoadingOverheadPercentage
-- | -- | -- | -- | -- | --
133 | 598 | 0 | 136079 | 227863 | 1.6744905532815497
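
For readers who want to check the arithmetic: the values in the last column match `providerLoadedMapSize / memoryUsedBytes`, so they read as a ratio (17.83 means roughly 17.8x) rather than a literal percentage. The following is a minimal Python sketch that recomputes this from the numbers in the tables above. The helper names `cache_ratio` and `excess_percent` are hypothetical and used only for illustration; they are not part of Spark or of the patch.

```python
# (memoryUsedBytes, providerLoadedMapSize) copied from the tables above.
BEFORE = {
    "stream-stream join": (185499903, 3307747279),
    "window aggregation": (72103, 6214927),
    "deduplication": (136279, 6587839),
}
AFTER = {
    "stream-stream join": (71023679, 84454399),
    "window aggregation": (72319, 162647),
    "deduplication": (136079, 227863),
}


def cache_ratio(memory_used_bytes: int, provider_loaded_map_size: int) -> float:
    """Size of the whole loadedMaps cache relative to one version of state."""
    return provider_loaded_map_size / memory_used_bytes


def excess_percent(memory_used_bytes: int, provider_loaded_map_size: int) -> float:
    """Extra memory held by the cache beyond one state version, in percent."""
    return (cache_ratio(memory_used_bytes, provider_loaded_map_size) - 1.0) * 100.0


if __name__ == "__main__":
    for label, rows in (("before", BEFORE), ("after", AFTER)):
        for query, (used, loaded) in rows.items():
            print(
                f"{label:6} {query:20} "
                f"ratio={cache_ratio(used, loaded):8.4f} "
                f"excess={excess_percent(used, loaded):10.1f}%"
            )
```

Read this way, the "more than 1500%" and "more than 8000%" figures correspond to the stream-stream join and window aggregation rows before the patch.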