Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21048#discussion_r180938989
  
    --- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
 ---
    @@ -471,6 +470,41 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
         }
       }
     
    +  test("error writing [version].delta cancels the output stream") {
    +
    +    val hadoopConf = new Configuration()
    +    hadoopConf.set(
    +      SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key,
    +      classOf[TestCheckpointFileManager].getName)
    +    val remoteDir = Utils.createTempDir().getAbsolutePath
    +
    +    val provider = newStoreProvider(
    +      opId = Random.nextInt, partition = 0, dir = remoteDir, hadoopConf = 
hadoopConf)
    +
    +    // Disable failure of output stream and generate versions
    +    TestCheckpointFileManager.shouldFailInCreateAtomic = false
    +    for (version <- 1 to 10) {
    +      val store = provider.getStore(version - 1)
    +      put(store, version.toString, version) // update "1" -> 1, "2" -> 2, 
...
    +      store.commit()
    +    }
    +    val version10Data = (1L to 10).map(_.toString).map(x => x -> x).toSet
    +
    +    val store = provider.getStore(10)
    +    // Fail commit for next version and verify that reloading resets the 
files
    +    TestCheckpointFileManager.shouldFailInCreateAtomic = true
    +    put(store, "11", 11)
    +    val e = intercept[IllegalStateException] { quietly { store.commit() } }
    +    assert(e.getCause.isInstanceOf[IOException], "Was waiting the 
IOException to be thrown")
    +    TestCheckpointFileManager.shouldFailInCreateAtomic = false
    +
    +    // Abort commit for next version and verify that reloading resets the 
files
    +    val store2 = provider.getStore(10)
    +    put(store2, "11", 11)
    +    store2.abort()
    +    assert(TestCheckpointFileManager.cancelCalledInCreateAtomic)
    --- End diff --
    
    can you verify that it was false before the `abort`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to