GitHub user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21048#discussion_r180938989 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala --- @@ -471,6 +470,41 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] } } + test("error writing [version].delta cancels the output stream") { + + val hadoopConf = new Configuration() + hadoopConf.set( + SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, + classOf[TestCheckpointFileManager].getName) + val remoteDir = Utils.createTempDir().getAbsolutePath + + val provider = newStoreProvider( + opId = Random.nextInt, partition = 0, dir = remoteDir, hadoopConf = hadoopConf) + + // Disable failure of output stream and generate versions + TestCheckpointFileManager.shouldFailInCreateAtomic = false + for (version <- 1 to 10) { + val store = provider.getStore(version - 1) + put(store, version.toString, version) // update "1" -> 1, "2" -> 2, ... + store.commit() + } + val version10Data = (1L to 10).map(_.toString).map(x => x -> x).toSet + + val store = provider.getStore(10) + // Fail commit for next version and verify that reloading resets the files + TestCheckpointFileManager.shouldFailInCreateAtomic = true + put(store, "11", 11) + val e = intercept[IllegalStateException] { quietly { store.commit() } } + assert(e.getCause.isInstanceOf[IOException], "Was waiting the IOException to be thrown") + TestCheckpointFileManager.shouldFailInCreateAtomic = false + + // Abort commit for next version and verify that reloading resets the files + val store2 = provider.getStore(10) + put(store2, "11", 11) + store2.abort() + assert(TestCheckpointFileManager.cancelCalledInCreateAtomic) --- End diff -- Can you also verify that `TestCheckpointFileManager.cancelCalledInCreateAtomic` was false before the call to `abort()`?
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org