PingHao edited a comment on issue #24922: [SPARK-28120][SS] Rocksdb state storage implementation
URL: https://github.com/apache/spark/pull/24922#issuecomment-539565660

I repackaged this and plugged it into a Spark 2.4.3 system (https://github.com/PingHao/spark-statestore-rocksdb). Here are my observations:

1. We use flatMapGroupsWithState, and it caused a failure right at the beginning:

```
2019-10-07 15:19:57.968Z ERROR [Executor task launch worker for task 65] org.apache.spark.util.Utils - Aborting task
java.lang.IllegalArgumentException: requirement failed: Cannot getRange after already committed or aborted
  at scala.Predef$.require(Predef.scala:281)
  at org.apache.spark.sql.execution.streaming.state.rocksdb.RocksDbStateStoreProvider$RocksDbStateStore.getRange(RocksDbStateStoreProvider.scala:149)
  at org.apache.spark.sql.execution.streaming.state.FlatMapGroupsWithStateExecHelper$StateManagerImplBase.getAllState(FlatMapGroupsWithStateExecHelper.scala:107)
  at org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$InputProcessor.processTimedOutState(FlatMapGroupsWithStateExec.scala:181)
  at org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec.$anonfun$doExecute$3(FlatMapGroupsWithStateExec.scala:128)
  at scala.collection.Iterator$ConcatIteratorCell.headIterator(Iterator.scala:248)
  at scala.collection.Iterator$ConcatIterator.advance(Iterator.scala:194)
  at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:225)
  at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
  at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$2(WriteToDataSourceV2Exec.scala:117)
  at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
  at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:116)
  at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.$anonfun$doExecute$2(WriteToDataSourceV2Exec.scala:67)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:121)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  at java.base/java.lang.Thread.run(Thread.java:834)
```

I fixed this by changing the state check in `getRange` in RocksDbStateStoreProvider.scala to `state == UPDATING || state == LOADED`:

```scala
override def getRange(
    start: Option[UnsafeRow],
    end: Option[UnsafeRow]): Iterator[UnsafeRowPair] = {
  require(state == UPDATING || state == LOADED,
    s"Cannot getRange after already committed or aborted: state is $state")
  iterator()
}
```

2. RocksDB checkpoint creation had quite a high time cost, sometimes > 20 seconds, and it became the biggest source of delay for many Spark tasks.
I then checked my three Spark worker nodes; it turned out that one data partition was on devicemapper and the other two were on xfs. After moving all of them to an ext4 partition, the results are much better: checkpoint creation is now < 10 ms in most cases, although it can still occasionally exceed 100 ms.

3. All Spark executors get stuck when one executor tries to load a snapshot file from the Spark checkpoint. Note that this is a 3-node system with 56 CPU cores and 167 partitions, and the Spark checkpoint resides on a shared NFS partition (I know HDFS would be better). My theory was that the downloading executor tries to download the snapshot at almost the same moment another executor is writing it, so it gets an incomplete file and fails. I'm thinking we should either retry when this happens, or simply fail the task and let the Spark scheduler rerun it.

On host C, trying to download:

```
2019-10-08 14:30:38.756Z INFO [Executor task launch worker for task 53012] org.apache.spark.sql.execution.streaming.state.rocksdb.RocksDbStateStoreProvider - Will download file:/checkpoint/state/0/149/257.snapshot at location /logs/tmp/state_96281597/0/149/257.tar
2019-10-08 14:30:39.375Z INFO [Executor task launch worker for task 53012] org.apache.spark.sql.execution.streaming.state.rocksdb.RocksDbStateStoreProvider - Loading state for 257 and partition 149 took 626 ms.
2019-10-08 14:30:39.386Z ERROR [Executor task launch worker for task 53012] org.apache.spark.util.Utils - Aborting task
java.lang.IllegalStateException: Error while creating rocksDb instance While opening a file for sequentially reading: /logs/checkpoint/state_96281597/0/149/257/CURRENT: No such file or directory
  at org.apache.spark.sql.execution.streaming.state.rocksdb.RocksDbInstance.open(RocksDbInstance.scala:65)
  at org.apache.spark.sql.execution.streaming.state.rocksdb.RocksDbStateStoreProvider$RocksDbStateStore.iterator(RocksDbStateStoreProvider.scala:230)
  at org.apache.spark.sql.execution.streaming.state.rocksdb.RocksDbStateStoreProvider$RocksDbStateStore.getRange(RocksDbStateStoreProvider.scala:151)
  at org.apache.spark.sql.execution.streaming.state.FlatMapGroupsWithStateExecHelper$StateManagerImplBase.getAllState(FlatMapGroupsWithStateExecHelper.scala:107)
  at org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$InputProcessor.processTimedOutState(FlatMapGroupsWithStateExec.scala:181)
  at org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec.$anonfun$doExecute$3(FlatMapGroupsWithStateExec.scala:128)
  at scala.collection.Iterator$ConcatIteratorCell.headIterator(Iterator.scala:248)
  at scala.collection.Iterator$ConcatIterator.advance(Iterator.scala:194)
  at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:225)
  at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
  at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$2(WriteToDataSourceV2Exec.scala:117)
  at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
  at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:116)
  at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.$anonfun$doExecute$2(WriteToDataSourceV2Exec.scala:67)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:121)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.rocksdb.RocksDBException: While opening a file for sequentially reading: /logs/checkpoint/state_96281597/0/149/257/CURRENT: No such file or directory
  at org.rocksdb.RocksDB.openROnly(Native Method)
  at org.rocksdb.RocksDB.openReadOnly(RocksDB.java:370)
  at org.apache.spark.sql.execution.streaming.state.rocksdb.RocksDbInstance.open(RocksDbInstance.scala:55)
  ... 24 more
```

On host A, where the snapshot was created:

```
[geo@plno-cto-fx2s1a l]$ grep 52786 executor.log
2019-10-08 14:30:09.992Z INFO [Executor task launch worker for task 52786] org.apache.spark.sql.execution.streaming.state.StateStore - Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef@6e8a89d9
2019-10-08 14:30:09.992Z INFO [Executor task launch worker for task 52786] org.apache.spark.sql.execution.streaming.state.StateStore - Reported that the loaded instance StateStoreProviderId(StateStoreId(file:/checkpoint/state,0,149,default),987a396d-8b24-4426-94c7-d1c67d91496b) is active
2019-10-08 14:30:09.992Z INFO [Executor task launch worker for task 52786] org.apache.spark.sql.execution.streaming.state.rocksdb.RocksDbStateStoreProvider - get Store for version 256
2019-10-08 14:30:09.993Z INFO [Executor task launch worker for task 52786] org.apache.spark.sql.execution.streaming.state.rocksdb.RocksDbStateStoreProvider - Loading state into the db for 256 and partition 149
2019-10-08 14:30:09.993Z INFO [Executor task launch worker for task 52786] org.apache.spark.sql.execution.streaming.state.rocksdb.RocksDbStateStoreProvider - Loading state for 256 and partition 149 took 0 ms.
2019-10-08 14:30:13.840Z INFO [Executor task launch worker for task 52786] org.apache.spark.sql.execution.streaming.state.rocksdb.OptimisticTransactionDbInstance - Creating Checkpoint at /logs/checkpoint/state_96281597/0/149/257 took 444 ms.
```

On the Spark state directory:

```
[geo@plno-cto-fx2s1a 149]$ tar tvf 257.snapshot
-rw-r--r-- 0/0   144670 2019-10-08 09:30 001183.sst
-rw-r--r-- 0/0  5602748 2019-10-08 09:30 001192.sst
-rw-r--r-- 0/0     5223 2019-10-08 09:30 OPTIONS-001191
-rw-r--r-- 0/0 29864343 2019-10-08 09:30 001182.sst
-rw-r--r-- 0/0      366 2019-10-08 09:30 MANIFEST-001188
-rw-r--r-- 0/0       16 2019-10-08 09:30 CURRENT
```

--- Update on No. 3: it turns out the snapshot could not be used because multiple tasks tried to fetch the same file from the Spark checkpoint into the same local directory at the same time, so a race condition occurred, as in the following case (a possible mitigation is sketched after these log lines):
```
2019-10-08 17:05:41.110Z INFO [Executor task launch worker for task 76931] org.apache.spark.sql.execution.streaming.state.rocksdb.RocksDbStateStoreProvider - Will download file:/checkpoint/state/0/136/364.snapshot at location /logs/tmp/state_96281597/0/136/364.tar
2019-10-08 17:05:41.684Z INFO [Executor task launch worker for task 76947] org.apache.spark.sql.execution.streaming.state.rocksdb.RocksDbStateStoreProvider - Will download file:/checkpoint/state/0/136/364.snapshot at location /logs/tmp/state_96281597/0/136/364.tar
2019-10-08 17:05:41.916Z INFO [Executor task launch worker for task 76949] org.apache.spark.sql.execution.streaming.state.rocksdb.RocksDbStateStoreProvider - Will download file:/checkpoint/state/0/136/364.snapshot at location /logs/tmp/state_96281597/0/136/364.tar
```
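Given that root cause, one possible mitigation (besides letting the task fail and be rescheduled) is to make the local download idempotent and single-writer per target path. Below is a minimal sketch, not code from this PR: `SnapshotDownloader`, `ensureLocalSnapshot`, and the paths are hypothetical, and the actual copy from the checkpoint location is left to a function argument. The idea is to serialize concurrent tasks on the final local path, download into a uniquely named temp file, and atomically rename it into place so nothing ever reads a partially written snapshot.

```scala
import java.nio.file.{Files, Path, StandardCopyOption}
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper (not part of this PR): makes "download snapshot to the
// local state directory" safe when several tasks in the same executor ask for
// the same snapshot version at once.
object SnapshotDownloader {

  // One lock object per final local path, so tasks in this JVM serialize on it.
  private val locks = new ConcurrentHashMap[String, AnyRef]()

  /**
   * Ensure `finalPath` holds a complete copy of the snapshot. `download` is
   * whatever actually fetches the remote snapshot (e.g. 364.snapshot from the
   * checkpoint directory on NFS/HDFS) into the path it is given.
   */
  def ensureLocalSnapshot(finalPath: Path)(download: Path => Unit): Path = {
    val lock = locks.computeIfAbsent(finalPath.toString, _ => new AnyRef)
    lock.synchronized {
      if (!Files.exists(finalPath)) {
        Files.createDirectories(finalPath.getParent)
        // Download into a unique temp file so no one ever sees a partial copy.
        val tmp = finalPath.resolveSibling(s"${finalPath.getFileName}.${UUID.randomUUID()}.tmp")
        download(tmp)
        // Atomic rename publishes the complete file under the expected name.
        Files.move(tmp, finalPath, StandardCopyOption.ATOMIC_MOVE)
      }
      finalPath
    }
  }
}
```

With something like this, the three tasks in the log above would either reuse the first completed copy or publish their own complete copy via rename, instead of writing over each other's `/logs/tmp/state_96281597/0/136/364.tar`. The same temp-file-plus-rename pattern on the writer side would also cover the original theory in No. 3 (a reader seeing a snapshot that is still being written to the checkpoint directory), provided the checkpoint filesystem offers atomic renames.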
