PingHao edited a comment on issue #24922: [SPARK-28120][SS] Rocksdb state storage implementation
URL: https://github.com/apache/spark/pull/24922#issuecomment-539565660
 
 
   I repackaged this and plugged it into a Spark 2.4.3 system (https://github.com/PingHao/spark-statestore-rocksdb). Here are my observations:
   1. We use flatMapGroupsWithState, and it failed right at the beginning:
   
   2019-10-07 15:19:57.968Z ERROR [Executor task launch worker for task 65] 
org.apache.spark.util.Utils - Aborting task
   java.lang.IllegalArgumentException: requirement failed: Cannot getRange 
after already committed or aborted
           at scala.Predef$.require(Predef.scala:281)
           at 
org.apache.spark.sql.execution.streaming.state.rocksdb.RocksDbStateStoreProvider$RocksDbStateStore.getRange(RocksDbStateStoreProvider.scala:149)
           at 
org.apache.spark.sql.execution.streaming.state.FlatMapGroupsWithStateExecHelper$StateManagerImplBase.getAllState(FlatMapGroupsWithStateExecHelper.scala:107)
           at 
org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$InputProcessor.processTimedOutState(FlatMapGroupsWithStateExec.scala:181)
           at 
org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec.$anonfun$doExecute$3(FlatMapGroupsWithStateExec.scala:128)
           at 
scala.collection.Iterator$ConcatIteratorCell.headIterator(Iterator.scala:248)
           at 
scala.collection.Iterator$ConcatIterator.advance(Iterator.scala:194)
           at 
scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:225)
           at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
           at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown
 Source)
           at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
           at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
           at 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$2(WriteToDataSourceV2Exec.scala:117)
           at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
           at 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:116)
           at 
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.$anonfun$doExecute$2(WriteToDataSourceV2Exec.scala:67)
           at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
           at org.apache.spark.scheduler.Task.run(Task.scala:121)
           at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
           at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
           at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
           at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
           at java.base/java.lang.Thread.run(Thread.java:834)
           
   I fixed this by changing RocksDbStateStoreProvider.scala:
   override def getRange(
       start: Option[UnsafeRow],
       end: Option[UnsafeRow]): Iterator[UnsafeRowPair] = {
     require(state == UPDATING || state == LOADED,
       s"Cannot getRange after already committed or aborted: state is $state")
     iterator()
   }
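   (Presumably the store is still in the LOADED state when processTimedOutState calls getAllState before anything has been written for the batch, which is why the original UPDATING-only check fails; allowing LOADED here, or starting the transaction first as in the update at the end of this comment, avoids that.)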
   2. RocksDB checkpoint creation had quite a high time cost, sometimes > 20 seconds, and it became the biggest delay for many Spark tasks. I then checked my three Spark worker nodes: the data partition was on devicemapper on one of them and on xfs on the other two. After moving all of them to an ext4 partition the results are much better: it is now < 10 ms in most cases, but can still occasionally exceed 100 ms. A minimal sketch of where this cost is measured is shown right after this item.
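
   For reference, here is a minimal, self-contained sketch of how a RocksDB checkpoint is created and timed through the RocksJava API. This is not the provider's actual code, and the paths are illustrative, but it shows the call whose latency depends on the underlying filesystem:

    import org.rocksdb.{Checkpoint, Options, RocksDB}

    object CheckpointTiming {
      def main(args: Array[String]): Unit = {
        RocksDB.loadLibrary()
        val options = new Options().setCreateIfMissing(true)
        val db = RocksDB.open(options, "/tmp/rocksdb-demo")             // illustrative local db path
        try {
          db.put("key".getBytes("UTF-8"), "value".getBytes("UTF-8"))
          val checkpoint = Checkpoint.create(db)
          val start = System.nanoTime()
          // createCheckpoint hard-links the SST files (when the target is on the same
          // filesystem) and copies the small metadata files (MANIFEST, CURRENT, OPTIONS),
          // so its latency is dominated by filesystem behaviour rather than data size.
          checkpoint.createCheckpoint("/tmp/rocksdb-demo-checkpoint")   // target must not exist yet
          val elapsedMs = (System.nanoTime() - start) / 1000000
          println(s"Checkpoint creation took $elapsedMs ms")
          checkpoint.close()
        } finally {
          db.close()
          options.close()
        }
      }
    }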
   
   3. All Spark executors got stuck when one executor tried to load a snapshot file from the Spark checkpoint. Note that this is a 3-node system with 56 CPU cores and 167 partitions, and the Spark checkpoint resides on a shared NFS partition (I know it would be better to use HDFS).
   My theory is that the downloading executor tries to download the snapshot at almost the same time as another executor is writing it, so it gets an incomplete file and throws. I'm thinking of either retrying when this happens (see the sketch below) or making the task fail and letting the Spark scheduler rerun it somehow.
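
   A minimal sketch of that retry idea, assuming some download-and-open helper; the names below are hypothetical and not part of the current provider code:

    import scala.util.control.NonFatal

    object SnapshotRetry {
      // Retry a snapshot load a few times before giving up, on the assumption that a
      // concurrent writer may briefly leave the snapshot directory incomplete (for
      // example missing CURRENT) on shared storage such as NFS.
      def withRetries[T](attempts: Int, backoffMs: Long)(loadSnapshot: => T): T =
        try loadSnapshot
        catch {
          case NonFatal(e) if attempts > 1 =>
            // Back off and retry; the concurrent writer may have finished by then.
            Thread.sleep(backoffMs)
            withRetries(attempts - 1, backoffMs * 2)(loadSnapshot)
          // On the last attempt the exception propagates, the task fails, and Spark
          // can reschedule it on another executor.
        }

      // Hypothetical usage around whatever download-and-open logic the store provider uses:
      // withRetries(attempts = 3, backoffMs = 500) { downloadAndOpenSnapshot(path) }
    }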
   
   
   On host C, which tried to download the snapshot:
   2019-10-08 14:30:38.756Z INFO [Executor task launch worker for task 53012] 
org.apache.spark.sql.execution.streaming.state.rocksdb.RocksDbStateStoreProvider
 - Will download file:/checkpoint/state/0/149/257.snapshot at location 
/logs/tmp/state_96281597/0/149/257.tar
   2019-10-08 14:30:39.375Z INFO [Executor task launch worker for task 53012] 
org.apache.spark.sql.execution.streaming.state.rocksdb.RocksDbStateStoreProvider
 - Loading state
   for 257 and partition 149 took 626 ms.
   2019-10-08 14:30:39.386Z ERROR [Executor task launch worker for task 53012] 
org.apache.spark.util.Utils - Aborting task
   java.lang.IllegalStateException: Error while creating rocksDb instance While 
opening a file for sequentially reading: 
/logs/checkpoint/state_96281597/0/149/257/CURRENT: No
   such file or directory
           at 
org.apache.spark.sql.execution.streaming.state.rocksdb.RocksDbInstance.open(RocksDbInstance.scala:65)
           at 
org.apache.spark.sql.execution.streaming.state.rocksdb.RocksDbStateStoreProvider$RocksDbStateStore.iterator(RocksDbStateStoreProvider.scala:230)
           at 
org.apache.spark.sql.execution.streaming.state.rocksdb.RocksDbStateStoreProvider$RocksDbStateStore.getRange(RocksDbStateStoreProvider.scala:151)
           at 
org.apache.spark.sql.execution.streaming.state.FlatMapGroupsWithStateExecHelper$StateManagerImplBase.getAllState(FlatMapGroupsWithStateExecHelper.scala:107)
           at 
org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$InputProcessor.processTimedOutState(FlatMapGroupsWithStateExec.scala:181)
           at 
org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec.$anonfun$doExecute$3(FlatMapGroupsWithStateExec.scala:128)
           at 
scala.collection.Iterator$ConcatIteratorCell.headIterator(Iterator.scala:248)
           at 
scala.collection.Iterator$ConcatIterator.advance(Iterator.scala:194)
           at 
scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:225)
           at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
           at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown
 Source)
           at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
           at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
           at 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$2(WriteToDataSourceV2Exec.scala:117)
           at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
           at 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:116)
           at 
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.$anonfun$doExecute$2(WriteToDataSourceV2Exec.scala:67)
           at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
           at org.apache.spark.scheduler.Task.run(Task.scala:121)
           at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
           at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
           at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
           at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
           at java.base/java.lang.Thread.run(Thread.java:834)
   Caused by: org.rocksdb.RocksDBException: While opening a file for 
sequentially reading: /logs/checkpoint/state_96281597/0/149/257/CURRENT: No 
such file or directory
           at org.rocksdb.RocksDB.openROnly(Native Method)
           at org.rocksdb.RocksDB.openReadOnly(RocksDB.java:370)
           at 
org.apache.spark.sql.execution.streaming.state.rocksdb.RocksDbInstance.open(RocksDbInstance.scala:55)
           ... 24 more
   
   On host A, where the snapshot was created:
   
   [geo@plno-cto-fx2s1a l]$ grep 52786 executor.log
   2019-10-08 14:30:09.992Z INFO [Executor task launch worker for task 52786] 
org.apache.spark.sql.execution.streaming.state.StateStore - Retrieved reference 
to StateStoreCoordinator: 
org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef@6e8a89d9
   2019-10-08 14:30:09.992Z INFO [Executor task launch worker for task 52786] 
org.apache.spark.sql.execution.streaming.state.StateStore - Reported that the 
loaded instance 
StateStoreProviderId(StateStoreId(file:/checkpoint/state,0,149,default),987a396d-8b24-4426-94c7-d1c67d91496b)
 is active
   2019-10-08 14:30:09.992Z INFO [Executor task launch worker for task 52786] 
org.apache.spark.sql.execution.streaming.state.rocksdb.RocksDbStateStoreProvider
 - get Store for version 256
   2019-10-08 14:30:09.993Z INFO [Executor task launch worker for task 52786] 
org.apache.spark.sql.execution.streaming.state.rocksdb.RocksDbStateStoreProvider
 - Loading state into the db for 256 and partition 149
   2019-10-08 14:30:09.993Z INFO [Executor task launch worker for task 52786] 
org.apache.spark.sql.execution.streaming.state.rocksdb.RocksDbStateStoreProvider
 - Loading state for 256 and partition 149 took 0 ms.
   2019-10-08 14:30:13.840Z INFO [Executor task launch worker for task 52786] 
org.apache.spark.sql.execution.streaming.state.rocksdb.OptimisticTransactionDbInstance
 - Creating Checkpoint at /logs/checkpoint/state_96281597/0/149/257 took 444 ms.
   
   On the Spark state directory:
   [geo@plno-cto-fx2s1a 149]$ tar tvf 257.snapshot
   -rw-r--r-- 0/0          144670 2019-10-08 09:30 001183.sst
   -rw-r--r-- 0/0         5602748 2019-10-08 09:30 001192.sst
   -rw-r--r-- 0/0            5223 2019-10-08 09:30 OPTIONS-001191
   -rw-r--r-- 0/0        29864343 2019-10-08 09:30 001182.sst
   -rw-r--r-- 0/0             366 2019-10-08 09:30 MANIFEST-001188
   -rw-r--r-- 0/0              16 2019-10-08 09:30 CURRENT
   
   --- Update on No. 3
   No. 3 was fixed after changing getRange to this:
   override def getRange(
       start: Option[UnsafeRow],
       end: Option[UnsafeRow]): Iterator[UnsafeRowPair] = {
     initTransaction()  // <----- change here
     require(state == UPDATING,
       s"Cannot getRange after already committed or aborted: state is $state")
     iterator()
   }
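   My understanding is that initTransaction() moves the store into the UPDATING state before the require check, so getRange also works when the store has only been loaded; presumably this also makes the extra LOADED case from the fix in No. 1 unnecessary.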
