HeartSaVioR commented on code in PR #41099: URL: https://github.com/apache/spark/pull/41099#discussion_r1205088664
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala: ########## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.state + +import java.io.{DataInputStream, DataOutputStream, FileNotFoundException, IOException} + +import scala.util.control.NonFatal + +import com.google.common.io.ByteStreams +import org.apache.commons.io.IOUtils +import org.apache.hadoop.fs.{FSError, Path} + +import org.apache.spark.internal.Logging +import org.apache.spark.io.CompressionCodec +import org.apache.spark.sql.execution.streaming.CheckpointFileManager +import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream + +/** + * Write changes to the key value state store instance to a changelog file. + * There are 2 types of records, put and delete. + * A put record is written as: | key length | key content | value length | value content | + * A delete record is written as: | key length | key content | -1 | + * Write an Int -1 to signal the end of file. + * The overall changelog format is: | put record | delete record | ... | put record | -1 | + */ +class StateStoreChangelogWriter(fm: CheckpointFileManager, file: Path, Review Comment: nit: style for two lines are not very clear, but longer lines for better fit to style is OK. ``` class StateStoreChangelogWriter( fm: CheckpointFileManager, file: Path, compressionCode: CompressionCode) extends Logging { ``` ########## docs/structured-streaming-programming-guide.md: ########## @@ -2389,6 +2394,12 @@ If you want to cap RocksDB memory usage in your Spark Structured Streaming deplo You can also determine the max allowed memory for RocksDB instances by setting the `spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB` value to a static number or as a fraction of the physical memory available on the node. Limits for individual RocksDB instances can also be configured by setting `spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB` and `spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber` to the required values. By default, RocksDB internal defaults are used for these settings. +##### RocksDB State Store Changelog Checkpointing Review Comment: Can we provide higher-level of description how this works? We even don't explain what does changelog means. I understand we have no explanation for changelog checkpointing for HDFS backed state store provider which is unfortunate, but for RocksDB state store provider, users have to make a decision whether to use old one (incremental checkpointing) or new one, which requires them to understand the characteristics of two options before choosing one. 
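For reference, a minimal opt-in sketch that could pair with such a description (only the `changelogCheckpointing.enabled` key comes from the quoted docs; the provider-class key and the session setup are illustrative assumptions, not part of this PR):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: run streaming queries with the RocksDB state store provider and
// changelog checkpointing enabled. The commit path then writes a per-batch changelog
// instead of uploading a full RocksDB snapshot; snapshot uploads move to background maintenance.
val spark = SparkSession.builder()
  .appName("rocksdb-changelog-checkpointing-demo")
  .master("local[2]")
  // Assumed key for selecting the provider; the tests use SQLConf.STATE_STORE_PROVIDER_CLASS.
  .config("spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
  // Documented in the quoted guide section.
  .config("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
  .getOrCreate()
```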
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala: ########## @@ -243,6 +243,7 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration } override def createTempFile(path: Path): FSDataOutputStream = { + fs.create(path, true, 4096) Review Comment: Looks to be accidentally added? ########## docs/structured-streaming-programming-guide.md: ########## @@ -2389,6 +2394,12 @@ If you want to cap RocksDB memory usage in your Spark Structured Streaming deplo You can also determine the max allowed memory for RocksDB instances by setting the `spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB` value to a static number or as a fraction of the physical memory available on the node. Limits for individual RocksDB instances can also be configured by setting `spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB` and `spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber` to the required values. By default, RocksDB internal defaults are used for these settings. +##### RocksDB State Store Changelog Checkpointing +Changelog checkpointing reduces latency of the stateful streaming query. This checkpointing mechanism avoids cost of capturing and uploading snapshots of RocksDB instances in the commit phase of RocksDB state store. +You can enable RocksDB State Store changelog checkpointing by setting `spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled` config to `true`. +Changelog checkpointing is backward compatible. In a version of spark that supports changelog checkpointing, you can turn on changelog checkpointing for a streaming query without discarding the existing checkpoint. Review Comment: Shall we describe this from the RocksDB state store provider's point of view? Something along the lines of: > The RocksDB state store provider supports a smooth transition between checkpoint formats. You can switch from traditional incremental checkpointing to changelog checkpointing, and vice versa, which also means you can turn on changelog checkpointing for an existing streaming query created with an older Spark version. I'm not a native English speaker, so someone may want to help refine the wording. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala: ########## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.spark.sql.execution.streaming.state + +import java.io.{DataInputStream, DataOutputStream, FileNotFoundException, IOException} + +import scala.util.control.NonFatal + +import com.google.common.io.ByteStreams +import org.apache.commons.io.IOUtils +import org.apache.hadoop.fs.{FSError, Path} + +import org.apache.spark.internal.Logging +import org.apache.spark.io.CompressionCodec +import org.apache.spark.sql.execution.streaming.CheckpointFileManager +import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream + +/** + * Write changes to the key value state store instance to a changelog file. + * There are 2 types of records, put and delete. + * A put record is written as: | key length | key content | value length | value content | + * A delete record is written as: | key length | key content | -1 | + * Write an Int -1 to signal the end of file. + * The overall changelog format is: | put record | delete record | ... | put record | -1 | + */ +class StateStoreChangelogWriter(fm: CheckpointFileManager, file: Path, + compressionCodec: CompressionCodec) extends Logging { + + private def compressStream(outputStream: DataOutputStream): DataOutputStream = { + val compressed = compressionCodec.compressedOutputStream(outputStream) + new DataOutputStream(compressed) + } + + private var backingFileStream: CancellableFSDataOutputStream = + fm.createAtomic(file, overwriteIfPossible = true) + private var compressedStream: DataOutputStream = compressStream(backingFileStream) + var size = 0 + + def put(key: Array[Byte], value: Array[Byte]): Unit = { + assert(compressedStream != null) + compressedStream.writeInt(key.size) + compressedStream.write(key) + compressedStream.writeInt(value.size) + compressedStream.write(value) + size += 1 + } + + def delete(key: Array[Byte]): Unit = { + assert(compressedStream != null) + compressedStream.writeInt(key.size) + compressedStream.write(key) + // -1 in the value field means record deletion. + compressedStream.writeInt(-1) + size += 1 + } + + def abort(): Unit = { + try { + if (backingFileStream != null) backingFileStream.cancel() + if (compressedStream != null) IOUtils.closeQuietly(compressedStream) + } catch { + // Closing the compressedStream causes the stream to write/flush flush data into the + // rawStream. Since the rawStream is already closed, there may be errors. + // Usually its an IOException. However, Hadoop's RawLocalFileSystem wraps + // IOException into FSError. + case e: FSError if e.getCause.isInstanceOf[IOException] => + case NonFatal(ex) => + logInfo(s"Failed to cancel changelog file $file for state store provider " + + s"with exception=$ex") + } finally { + backingFileStream = null + compressedStream = null + } + } + + def commit(): Unit = { + try { + // -1 in the key length field mean EOF. + compressedStream.writeInt(-1) + compressedStream.close() + } catch { + case e: Throwable => + abort() + logError(s"Fail to commit changelog file $file because of exception $e") + throw e + } finally { + backingFileStream = null + compressedStream = null + } + } +} + + +/** + * Read an iterator of change record from the changelog file. 
+ * A record is represented by ByteArrayPair(key: Array[Byte], value: Array[Byte]) + * A put record is returned as a ByteArrayPair(key, value) + * A delete record is return as a ByteArrayPair(key, null) + */ +class StateStoreChangelogReader(fm: CheckpointFileManager, fileToRead: Path, + compressionCodec: CompressionCodec) + extends Iterator[ByteArrayPair] with Logging { + + private def decompressStream(inputStream: DataInputStream): DataInputStream = { + val compressed = compressionCodec.compressedInputStream(inputStream) + new DataInputStream(compressed) + } + + private val sourceStream = try { + fm.open(fileToRead) + } catch { + case f: FileNotFoundException => + throw new IllegalStateException( + s"Error reading streaming state file of $this: $fileToRead does not exist. " + Review Comment: This class is not a case class. Which information you'd want to put in `$this`? ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala: ########## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.state + +import java.io.{DataInputStream, DataOutputStream, FileNotFoundException, IOException} + +import scala.util.control.NonFatal + +import com.google.common.io.ByteStreams +import org.apache.commons.io.IOUtils +import org.apache.hadoop.fs.{FSError, Path} + +import org.apache.spark.internal.Logging +import org.apache.spark.io.CompressionCodec +import org.apache.spark.sql.execution.streaming.CheckpointFileManager +import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream + +/** + * Write changes to the key value state store instance to a changelog file. + * There are 2 types of records, put and delete. + * A put record is written as: | key length | key content | value length | value content | + * A delete record is written as: | key length | key content | -1 | + * Write an Int -1 to signal the end of file. + * The overall changelog format is: | put record | delete record | ... 
| put record | -1 | + */ +class StateStoreChangelogWriter(fm: CheckpointFileManager, file: Path, + compressionCodec: CompressionCodec) extends Logging { + + private def compressStream(outputStream: DataOutputStream): DataOutputStream = { + val compressed = compressionCodec.compressedOutputStream(outputStream) + new DataOutputStream(compressed) + } + + private var backingFileStream: CancellableFSDataOutputStream = + fm.createAtomic(file, overwriteIfPossible = true) + private var compressedStream: DataOutputStream = compressStream(backingFileStream) + var size = 0 + + def put(key: Array[Byte], value: Array[Byte]): Unit = { + assert(compressedStream != null) + compressedStream.writeInt(key.size) + compressedStream.write(key) + compressedStream.writeInt(value.size) + compressedStream.write(value) + size += 1 + } + + def delete(key: Array[Byte]): Unit = { + assert(compressedStream != null) + compressedStream.writeInt(key.size) + compressedStream.write(key) + // -1 in the value field means record deletion. + compressedStream.writeInt(-1) + size += 1 + } + + def abort(): Unit = { + try { + if (backingFileStream != null) backingFileStream.cancel() + if (compressedStream != null) IOUtils.closeQuietly(compressedStream) + } catch { + // Closing the compressedStream causes the stream to write/flush flush data into the + // rawStream. Since the rawStream is already closed, there may be errors. + // Usually its an IOException. However, Hadoop's RawLocalFileSystem wraps + // IOException into FSError. + case e: FSError if e.getCause.isInstanceOf[IOException] => + case NonFatal(ex) => + logInfo(s"Failed to cancel changelog file $file for state store provider " + + s"with exception=$ex") + } finally { + backingFileStream = null + compressedStream = null + } + } + + def commit(): Unit = { + try { + // -1 in the key length field mean EOF. + compressedStream.writeInt(-1) + compressedStream.close() + } catch { + case e: Throwable => + abort() + logError(s"Fail to commit changelog file $file because of exception $e") + throw e + } finally { + backingFileStream = null + compressedStream = null + } + } +} + + +/** + * Read an iterator of change record from the changelog file. + * A record is represented by ByteArrayPair(key: Array[Byte], value: Array[Byte]) + * A put record is returned as a ByteArrayPair(key, value) + * A delete record is return as a ByteArrayPair(key, null) + */ +class StateStoreChangelogReader(fm: CheckpointFileManager, fileToRead: Path, + compressionCodec: CompressionCodec) + extends Iterator[ByteArrayPair] with Logging { + + private def decompressStream(inputStream: DataInputStream): DataInputStream = { + val compressed = compressionCodec.compressedInputStream(inputStream) + new DataInputStream(compressed) + } + + private val sourceStream = try { + fm.open(fileToRead) + } catch { + case f: FileNotFoundException => + throw new IllegalStateException( + s"Error reading streaming state file of $this: $fileToRead does not exist. " + + s"If the stream job is restarted with a new or updated state operation, please" + + s" create a new checkpoint location or clear the existing checkpoint location.", f) + } + private val input: DataInputStream = decompressStream(sourceStream) + // A buffer that hold the next record to return. 
+ private var byteArrayPair: ByteArrayPair = null + private var eof = false + + override def hasNext: Boolean = { + maybeReadNext() + byteArrayPair != null + } + + override def next(): ByteArrayPair = { + maybeReadNext() + val nextByteArrayPair = byteArrayPair + byteArrayPair = null + nextByteArrayPair + } + + def close(): Unit = { if (input != null) input.close() } + + private def maybeReadNext(): Unit = { + if (!eof && byteArrayPair == null) { + val keySize = input.readInt() + // A -1 key size mean end of file. + if (keySize == -1) { + eof = true + } else if (keySize < 0) { + throw new IOException( + s"Error reading streaming state file $fileToRead of $this: key size cannot be $keySize") Review Comment: Same: what would be the actual print of $this? ########## sql/core/src/test/scala/org/apache/spark/sql/streaming/RocksDBStateStoreTest.scala: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import org.scalactic.source.Position +import org.scalatest.Tag + +import org.apache.spark.sql.execution.streaming.state.{RocksDBConf, RocksDBStateStoreProvider} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SQLTestUtils + +trait RocksDBStateStoreTest extends SQLTestUtils { + override protected def test(testName: String, testTags: Tag*)(testBody: => Any) + (implicit pos: Position): Unit = { + super.test(testName + " (RocksDBStateStore)", testTags: _*) { + withSQLConf(rocksdbChangelogCheckpointingConfKey -> "false", + SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName) { + testBody + } + // in case tests have any code that needs to execute after every test + super.afterEach() + } + + super.test(testName + " (RocksDBStateStore with changelog checkpointing)", testTags: _*) { + // in case tests have any code that needs to execute before every test + super.beforeEach() + withSQLConf(rocksdbChangelogCheckpointingConfKey -> "true", + SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName) { + testBody + } + } + } + Review Comment: nit: extra empty line ########## sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala: ########## @@ -1101,3 +1101,7 @@ object FlatMapGroupsWithStateSuite { throw new TestFailedException("Could get watermark when not expected", 20) } } +/* Review Comment: I guess this is temporary and we uncomment this before merging. 
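Since the record layout comes up in several comments, here is a rough round-trip sketch of the writer/reader pair quoted above, for reference only; the paths, the "lz4" codec, and the assumption that `ByteArrayPair` exposes `key`/`value` fields are illustrative choices, not taken from the PR:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.SparkConf
import org.apache.spark.io.CompressionCodec
import org.apache.spark.sql.execution.streaming.CheckpointFileManager

// Write a put and a delete, then read them back. On disk this is
// | 2 | k1 | 2 | v1 | 2 | k2 | -1 | -1 |, matching the scaladoc above.
val root = new Path("/tmp/changelog-demo")
val fm = CheckpointFileManager.create(root, new Configuration())
val codec = CompressionCodec.createCodec(new SparkConf(), "lz4")
val file = new Path(root, "1.changelog")

val writer = new StateStoreChangelogWriter(fm, file, codec)
writer.put("k1".getBytes("UTF-8"), "v1".getBytes("UTF-8")) // put record
writer.delete("k2".getBytes("UTF-8"))                      // delete record: -1 value length
writer.commit()                                            // trailing -1 marks EOF

val reader = new StateStoreChangelogReader(fm, file, codec)
reader.foreach { pair =>
  // Assumed field names; per the scaladoc a delete comes back with a null value.
  println(s"key ${new String(pair.key, "UTF-8")}, deleted = ${pair.value == null}")
}
reader.close()
```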
########## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala: ########## @@ -956,3 +956,57 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions { } } } + +class RocksDBStateStoreStreamingAggregationSuite + extends StreamingAggregationSuite with RocksDBStateStoreTest { + import testImplicits._ + def snapshotVersionsPresent(dir: File): Seq[Long] = { + dir.listFiles.filter(_.getName.endsWith(".zip")) + .map(_.getName.stripSuffix(".zip")) + .map(_.toLong) + .sorted + } + def changelogVersionsPresent(dir: File): Seq[Long] = { Review Comment: nit: empty line between two methods. apply to all code changes. ########## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala: ########## @@ -956,3 +956,57 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions { } } } + +class RocksDBStateStoreStreamingAggregationSuite + extends StreamingAggregationSuite with RocksDBStateStoreTest { + import testImplicits._ + def snapshotVersionsPresent(dir: File): Seq[Long] = { + dir.listFiles.filter(_.getName.endsWith(".zip")) + .map(_.getName.stripSuffix(".zip")) + .map(_.toLong) + .sorted + } + def changelogVersionsPresent(dir: File): Seq[Long] = { + dir.listFiles.filter(_.getName.endsWith(".changelog")) + .map(_.getName.stripSuffix(".changelog")) + .map(_.toLong) + .sorted + } + test("Streaming aggregation RocksDB State Store backward compatibility.") { + val checkpointDir = Utils.createTempDir().getCanonicalFile + checkpointDir.delete() + + val rocksDBStateDir = new File(checkpointDir.getAbsolutePath, "/state/0/0") Review Comment: nit: `dirForPartition0`, to be super clear ########## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala: ########## @@ -279,7 +280,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] val timeoutDuration = 1.minute quietly { - withSpark(new SparkContext(conf)) { sc => + withSpark(SparkContext.getOrCreate(conf)) { sc => Review Comment: 1. SparkContext.getOrCreate() does not allow you to set the config if there is active SparkContext. If there is no active SparkContext then config takes effect. Hopefully we don't seem to set any configs so we don't mess up anything, but worth noting. 2. If you stop active SparkContext (withSpark), SparkSession containing SparkContext will also be merely not functioning till you set the SparkContext again. That said, if you have a test which intends to spin a new SparkContext, it (test suite) should not extend shared SparkSession. There is another trait named LocalSparkContext which spins up and tears down a new SparkContext per test, but this may be also overkill for this test suite because only a few tests require SparkContext instance to be set. I'd say let's roll back unless your change requires SparkSession to be set. ########## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala: ########## @@ -865,10 +1076,14 @@ class RocksDBSuite extends SparkFunSuite { } } + private def sqlConf = SQLConf.get Review Comment: I don't feel like this is a good idea in terms of isolation. The reason we do withSQLConf is simple - we don't want to leave the modified config to be applied for remaining tests. Unlike the intention of using def, this will give the same config instance across multiple calls, because SparkSession is shared among test in this suite, and any modification will affect all remaining tests. 
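To make the isolation point concrete, this is roughly the set-then-restore behavior that `withSQLConf` provides and that a bare `def sqlConf = SQLConf.get` skips (the helper name is hypothetical; `withSQLConf` itself is the thing to use in the suite):

```scala
import org.apache.spark.sql.internal.SQLConf

// Minimal sketch of withSQLConf-style scoping: the conf change is visible only inside
// the body and the previous value is restored afterwards, so other tests sharing the
// same SparkSession (and therefore the same SQLConf instance) are unaffected.
def withConf[T](key: String, value: String)(body: => T): T = {
  val conf = SQLConf.get
  val previous = if (conf.contains(key)) Some(conf.getConfString(key)) else None
  conf.setConfString(key, value)
  try body finally {
    previous match {
      case Some(old) => conf.setConfString(key, old) // restore the earlier value
      case None => conf.unsetConf(key)               // or remove the key entirely
    }
  }
}

// e.g. withConf(rocksdbChangelogCheckpointingConfKey, "true") { /* test body */ }
```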
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala: ########## @@ -280,34 +342,34 @@ class RocksDBFileManager( val path = new Path(dfsRootDir) // All versions present in DFS, sorted - val sortedVersions = fm.list(path, onlyZipFiles) + val sortedSnapshotVersions = fm.list(path, onlyZipFiles) .map(_.getPath.getName.stripSuffix(".zip")) .map(_.toLong) .sorted // Return if no versions generated yet - if (sortedVersions.isEmpty) return + if (sortedSnapshotVersions.isEmpty) return // Find the versions to delete - val maxVersionPresent = sortedVersions.last - val minVersionPresent = sortedVersions.head - val minVersionToRetain = - math.max(minVersionPresent, maxVersionPresent - numVersionsToRetain + 1) - val versionsToDelete = sortedVersions.takeWhile(_ < minVersionToRetain).toSet[Long] + val maxSnapshotVersionPresent = sortedSnapshotVersions.last + val minSnapshotVersionPresent = sortedSnapshotVersions.head + + // In order to reconstruct numVersionsToRetain version, retain the latest snapshot + // that satisfies (version <= maxSnapshotVersionPresent - numVersionsToRetain + 1) + val minVersionToRetain = sortedSnapshotVersions Review Comment: Sounds good! ########## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala: ########## @@ -125,6 +132,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid put(store, "a", 0, 1) assert(get(store, "a", 0) === Some(1)) assert(store.commit() === 1) + provider.doMaintenance() Review Comment: This only ensures custom metrics, and other metrics like number of keys are available without explicit maintenance, right? Just to double check. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala: ########## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.state + +import java.io.{DataInputStream, DataOutputStream, FileNotFoundException, IOException} + +import scala.util.control.NonFatal + +import com.google.common.io.ByteStreams +import org.apache.commons.io.IOUtils +import org.apache.hadoop.fs.{FSError, Path} + +import org.apache.spark.internal.Logging +import org.apache.spark.io.CompressionCodec +import org.apache.spark.sql.execution.streaming.CheckpointFileManager +import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream + +/** + * Write changes to the key value state store instance to a changelog file. + * There are 2 types of records, put and delete. 
+ * A put record is written as: | key length | key content | value length | value content | + * A delete record is written as: | key length | key content | -1 | + * Write an Int -1 to signal the end of file. + * The overall changelog format is: | put record | delete record | ... | put record | -1 | + */ +class StateStoreChangelogWriter(fm: CheckpointFileManager, file: Path, + compressionCodec: CompressionCodec) extends Logging { + + private def compressStream(outputStream: DataOutputStream): DataOutputStream = { + val compressed = compressionCodec.compressedOutputStream(outputStream) + new DataOutputStream(compressed) + } + + private var backingFileStream: CancellableFSDataOutputStream = + fm.createAtomic(file, overwriteIfPossible = true) + private var compressedStream: DataOutputStream = compressStream(backingFileStream) + var size = 0 + + def put(key: Array[Byte], value: Array[Byte]): Unit = { + assert(compressedStream != null) + compressedStream.writeInt(key.size) + compressedStream.write(key) + compressedStream.writeInt(value.size) + compressedStream.write(value) + size += 1 + } + + def delete(key: Array[Byte]): Unit = { + assert(compressedStream != null) + compressedStream.writeInt(key.size) + compressedStream.write(key) + // -1 in the value field means record deletion. + compressedStream.writeInt(-1) + size += 1 + } + + def abort(): Unit = { + try { + if (backingFileStream != null) backingFileStream.cancel() + if (compressedStream != null) IOUtils.closeQuietly(compressedStream) + } catch { + // Closing the compressedStream causes the stream to write/flush flush data into the + // rawStream. Since the rawStream is already closed, there may be errors. + // Usually its an IOException. However, Hadoop's RawLocalFileSystem wraps + // IOException into FSError. + case e: FSError if e.getCause.isInstanceOf[IOException] => + case NonFatal(ex) => + logInfo(s"Failed to cancel changelog file $file for state store provider " + + s"with exception=$ex") + } finally { + backingFileStream = null + compressedStream = null + } + } + + def commit(): Unit = { + try { + // -1 in the key length field mean EOF. + compressedStream.writeInt(-1) + compressedStream.close() + } catch { + case e: Throwable => + abort() + logError(s"Fail to commit changelog file $file because of exception $e") + throw e + } finally { + backingFileStream = null + compressedStream = null + } + } +} + + +/** + * Read an iterator of change record from the changelog file. + * A record is represented by ByteArrayPair(key: Array[Byte], value: Array[Byte]) + * A put record is returned as a ByteArrayPair(key, value) + * A delete record is return as a ByteArrayPair(key, null) + */ +class StateStoreChangelogReader(fm: CheckpointFileManager, fileToRead: Path, Review Comment: nit: same here for style ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala: ########## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.state + +import java.io.{DataInputStream, DataOutputStream, FileNotFoundException, IOException} + +import scala.util.control.NonFatal + +import com.google.common.io.ByteStreams +import org.apache.commons.io.IOUtils +import org.apache.hadoop.fs.{FSError, Path} + +import org.apache.spark.internal.Logging +import org.apache.spark.io.CompressionCodec +import org.apache.spark.sql.execution.streaming.CheckpointFileManager +import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream + +/** + * Write changes to the key value state store instance to a changelog file. + * There are 2 types of records, put and delete. + * A put record is written as: | key length | key content | value length | value content | + * A delete record is written as: | key length | key content | -1 | + * Write an Int -1 to signal the end of file. + * The overall changelog format is: | put record | delete record | ... | put record | -1 | + */ +class StateStoreChangelogWriter(fm: CheckpointFileManager, file: Path, + compressionCodec: CompressionCodec) extends Logging { + + private def compressStream(outputStream: DataOutputStream): DataOutputStream = { + val compressed = compressionCodec.compressedOutputStream(outputStream) + new DataOutputStream(compressed) + } + + private var backingFileStream: CancellableFSDataOutputStream = + fm.createAtomic(file, overwriteIfPossible = true) + private var compressedStream: DataOutputStream = compressStream(backingFileStream) + var size = 0 + + def put(key: Array[Byte], value: Array[Byte]): Unit = { + assert(compressedStream != null) + compressedStream.writeInt(key.size) + compressedStream.write(key) + compressedStream.writeInt(value.size) + compressedStream.write(value) + size += 1 + } + + def delete(key: Array[Byte]): Unit = { + assert(compressedStream != null) + compressedStream.writeInt(key.size) + compressedStream.write(key) + // -1 in the value field means record deletion. + compressedStream.writeInt(-1) + size += 1 + } + + def abort(): Unit = { + try { + if (backingFileStream != null) backingFileStream.cancel() + if (compressedStream != null) IOUtils.closeQuietly(compressedStream) + } catch { + // Closing the compressedStream causes the stream to write/flush flush data into the + // rawStream. Since the rawStream is already closed, there may be errors. + // Usually its an IOException. However, Hadoop's RawLocalFileSystem wraps + // IOException into FSError. + case e: FSError if e.getCause.isInstanceOf[IOException] => + case NonFatal(ex) => + logInfo(s"Failed to cancel changelog file $file for state store provider " + + s"with exception=$ex") + } finally { + backingFileStream = null + compressedStream = null + } + } + + def commit(): Unit = { + try { + // -1 in the key length field mean EOF. 
+ compressedStream.writeInt(-1) + compressedStream.close() + } catch { + case e: Throwable => + abort() + logError(s"Fail to commit changelog file $file because of exception $e") + throw e + } finally { + backingFileStream = null + compressedStream = null + } + } +} + + +/** + * Read an iterator of change record from the changelog file. + * A record is represented by ByteArrayPair(key: Array[Byte], value: Array[Byte]) + * A put record is returned as a ByteArrayPair(key, value) + * A delete record is return as a ByteArrayPair(key, null) + */ +class StateStoreChangelogReader(fm: CheckpointFileManager, fileToRead: Path, + compressionCodec: CompressionCodec) + extends Iterator[ByteArrayPair] with Logging { Review Comment: Have you indicated that there is a helper class NextIterator in Spark which does the thing you are implementing here? Just to check. Please ignore if you already went through and found a blocker. ########## sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala: ########## @@ -1101,3 +1101,7 @@ object FlatMapGroupsWithStateSuite { throw new TestFailedException("Could get watermark when not expected", 20) } } +/* +class RocksDBStateStoreFlatMapGroupsWithStateSuite + extends FlatMapGroupsWithStateSuite with RocksDBStateStoreTest {} Review Comment: nit: {} is not necessary for class with no body implementation ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala: ########## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.state + +import java.io.{DataInputStream, DataOutputStream, FileNotFoundException, IOException} + +import scala.util.control.NonFatal + +import com.google.common.io.ByteStreams +import org.apache.commons.io.IOUtils +import org.apache.hadoop.fs.{FSError, Path} + +import org.apache.spark.internal.Logging +import org.apache.spark.io.CompressionCodec +import org.apache.spark.sql.execution.streaming.CheckpointFileManager +import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream + +/** + * Write changes to the key value state store instance to a changelog file. + * There are 2 types of records, put and delete. + * A put record is written as: | key length | key content | value length | value content | + * A delete record is written as: | key length | key content | -1 | + * Write an Int -1 to signal the end of file. + * The overall changelog format is: | put record | delete record | ... 
| put record | -1 | + */ +class StateStoreChangelogWriter(fm: CheckpointFileManager, file: Path, + compressionCodec: CompressionCodec) extends Logging { + + private def compressStream(outputStream: DataOutputStream): DataOutputStream = { + val compressed = compressionCodec.compressedOutputStream(outputStream) + new DataOutputStream(compressed) + } + + private var backingFileStream: CancellableFSDataOutputStream = + fm.createAtomic(file, overwriteIfPossible = true) + private var compressedStream: DataOutputStream = compressStream(backingFileStream) + var size = 0 + + def put(key: Array[Byte], value: Array[Byte]): Unit = { + assert(compressedStream != null) + compressedStream.writeInt(key.size) + compressedStream.write(key) + compressedStream.writeInt(value.size) + compressedStream.write(value) + size += 1 + } + + def delete(key: Array[Byte]): Unit = { + assert(compressedStream != null) + compressedStream.writeInt(key.size) + compressedStream.write(key) + // -1 in the value field means record deletion. + compressedStream.writeInt(-1) + size += 1 + } + + def abort(): Unit = { + try { + if (backingFileStream != null) backingFileStream.cancel() + if (compressedStream != null) IOUtils.closeQuietly(compressedStream) + } catch { + // Closing the compressedStream causes the stream to write/flush flush data into the + // rawStream. Since the rawStream is already closed, there may be errors. + // Usually its an IOException. However, Hadoop's RawLocalFileSystem wraps + // IOException into FSError. + case e: FSError if e.getCause.isInstanceOf[IOException] => + case NonFatal(ex) => + logInfo(s"Failed to cancel changelog file $file for state store provider " + + s"with exception=$ex") + } finally { + backingFileStream = null + compressedStream = null + } + } + + def commit(): Unit = { + try { + // -1 in the key length field mean EOF. + compressedStream.writeInt(-1) + compressedStream.close() + } catch { + case e: Throwable => + abort() + logError(s"Fail to commit changelog file $file because of exception $e") + throw e + } finally { + backingFileStream = null + compressedStream = null + } + } +} + + +/** + * Read an iterator of change record from the changelog file. + * A record is represented by ByteArrayPair(key: Array[Byte], value: Array[Byte]) + * A put record is returned as a ByteArrayPair(key, value) + * A delete record is return as a ByteArrayPair(key, null) + */ +class StateStoreChangelogReader(fm: CheckpointFileManager, fileToRead: Path, + compressionCodec: CompressionCodec) + extends Iterator[ByteArrayPair] with Logging { + + private def decompressStream(inputStream: DataInputStream): DataInputStream = { + val compressed = compressionCodec.compressedInputStream(inputStream) + new DataInputStream(compressed) + } + + private val sourceStream = try { + fm.open(fileToRead) + } catch { + case f: FileNotFoundException => + throw new IllegalStateException( + s"Error reading streaming state file of $this: $fileToRead does not exist. " + + s"If the stream job is restarted with a new or updated state operation, please" + Review Comment: nit: s isn't needed, same as below ########## sql/core/src/test/scala/org/apache/spark/sql/streaming/RocksDBStateStoreTest.scala: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import org.scalactic.source.Position +import org.scalatest.Tag + +import org.apache.spark.sql.execution.streaming.state.{RocksDBConf, RocksDBStateStoreProvider} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SQLTestUtils + +trait RocksDBStateStoreTest extends SQLTestUtils { + override protected def test(testName: String, testTags: Tag*)(testBody: => Any) + (implicit pos: Position): Unit = { + super.test(testName + " (RocksDBStateStore)", testTags: _*) { + withSQLConf(rocksdbChangelogCheckpointingConfKey -> "false", + SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName) { + testBody + } + // in case tests have any code that needs to execute after every test + super.afterEach() + } + + super.test(testName + " (RocksDBStateStore with changelog checkpointing)", testTags: _*) { + // in case tests have any code that needs to execute before every test + super.beforeEach() + withSQLConf(rocksdbChangelogCheckpointingConfKey -> "true", + SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName) { + testBody + } + } + } + + + def rocksdbChangelogCheckpointingConfKey: String = RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + Review Comment: nit: val ########## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala: ########## @@ -24,14 +24,275 @@ import scala.language.implicitConversions import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration +import org.scalactic.source.Position +import org.scalatest.Tag import org.apache.spark._ import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} import org.apache.spark.util.{ThreadUtils, Utils} -class RocksDBSuite extends SparkFunSuite { + +trait AlsoTestWithChangelogCheckpointingEnabled extends SQLTestUtils { + override protected def test(testName: String, testTags: Tag*)(testBody: => Any) + (implicit pos: Position): Unit = { + super.test(testName, testTags: _*) { + withSQLConf(rocksdbChangelogCheckpointingConfKey -> "false", + SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName) { + testBody + } + // in case tests have any code that needs to execute after every test + super.afterEach() + } + + super.test(testName + " (with changelog checkpointing)", testTags: _*) { + // in case tests have any code that needs to execute before every test + super.beforeEach() + withSQLConf(rocksdbChangelogCheckpointingConfKey -> "true", + SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName) { + testBody + } + } + } + + def rocksdbChangelogCheckpointingConfKey: String = RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + Review Comment: nit: val, as it never changes. 
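Going back to the NextIterator question above: a reader built on `org.apache.spark.util.NextIterator` could look roughly like the sketch below. It is only an illustration; it would have to live under an `org.apache.spark` package (NextIterator is `private[spark]`), and a plain tuple stands in for `ByteArrayPair`.

```scala
import java.io.{DataInputStream, IOException}

import com.google.common.io.ByteStreams
import org.apache.hadoop.fs.Path

import org.apache.spark.io.CompressionCodec
import org.apache.spark.sql.execution.streaming.CheckpointFileManager
import org.apache.spark.util.NextIterator

// Sketch: NextIterator supplies the hasNext/next buffering and close-once handling,
// so only the record-decoding loop remains.
class ChangelogReaderSketch(
    fm: CheckpointFileManager,
    fileToRead: Path,
    compressionCodec: CompressionCodec)
  extends NextIterator[(Array[Byte], Array[Byte])] {

  private val input = new DataInputStream(
    compressionCodec.compressedInputStream(fm.open(fileToRead)))

  override protected def getNext(): (Array[Byte], Array[Byte]) = {
    val keySize = input.readInt()
    if (keySize == -1) {
      finished = true // trailing -1 marks EOF
      null
    } else if (keySize < 0) {
      throw new IOException(s"Error reading $fileToRead: key size cannot be $keySize")
    } else {
      val key = new Array[Byte](keySize)
      ByteStreams.readFully(input, key, 0, keySize)
      val valueSize = input.readInt()
      if (valueSize < 0) {
        (key, null) // delete record
      } else {
        val value = new Array[Byte](valueSize)
        ByteStreams.readFully(input, value, 0, valueSize)
        (key, value)
      }
    }
  }

  override protected def close(): Unit = input.close()
}
```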
########## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala: ########## @@ -483,3 +483,6 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest { ) } } + +class RocksDBStateStoreStreamingDeduplicationSuite + extends StreamingDeduplicationSuite with RocksDBStateStoreTest {} Review Comment: ditto ########## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala: ########## @@ -956,3 +956,57 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions { } } } + +class RocksDBStateStoreStreamingAggregationSuite + extends StreamingAggregationSuite with RocksDBStateStoreTest { + import testImplicits._ + def snapshotVersionsPresent(dir: File): Seq[Long] = { + dir.listFiles.filter(_.getName.endsWith(".zip")) + .map(_.getName.stripSuffix(".zip")) + .map(_.toLong) + .sorted + } + def changelogVersionsPresent(dir: File): Seq[Long] = { + dir.listFiles.filter(_.getName.endsWith(".changelog")) + .map(_.getName.stripSuffix(".changelog")) + .map(_.toLong) + .sorted + } + test("Streaming aggregation RocksDB State Store backward compatibility.") { + val checkpointDir = Utils.createTempDir().getCanonicalFile + checkpointDir.delete() + + val rocksDBStateDir = new File(checkpointDir.getAbsolutePath, "/state/0/0") + val inputData = MemoryStream[Int] + val aggregated = + inputData.toDF() + .groupBy($"value") + .agg(count("*")) + .as[(Int, Long)] + + // Run the stream with changelog checkpointing disabled. + testStream(aggregated, Update)( + StartStream(checkpointLocation = checkpointDir.getAbsolutePath, + additionalConfs = Map(rocksdbChangelogCheckpointingConfKey -> "false")), + AddData(inputData, 3), + CheckLastBatch((3, 1)), + AddData(inputData, 3, 2), + CheckLastBatch((3, 2), (2, 1)), + StopStream + ) + assert(changelogVersionsPresent(rocksDBStateDir).isEmpty) + assert(snapshotVersionsPresent(rocksDBStateDir) == List(1L, 2L)) + + // Run the stream with changelog checkpointing enabled. + testStream(aggregated, Update)( + StartStream(checkpointLocation = checkpointDir.getAbsolutePath, + additionalConfs = Map(rocksdbChangelogCheckpointingConfKey -> "true")), + AddData(inputData, 3, 2, 1), + CheckLastBatch((3, 3), (2, 2), (1, 1)), + // By default we run in new tuple mode. + AddData(inputData, 4, 4, 4, 4), + CheckLastBatch((4, 4)) + ) + assert(changelogVersionsPresent(rocksDBStateDir) == List(3L, 4L)) Review Comment: Let's flip the config once again to check bi-directional migration. ########## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala: ########## @@ -865,10 +1076,14 @@ class RocksDBSuite extends SparkFunSuite { } } + private def sqlConf = SQLConf.get + + private def dbConf = RocksDBConf(StateStoreConf(sqlConf)) Review Comment: Same here, doesn't seem to be good in terms of isolation. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala: ########## @@ -134,6 +137,27 @@ class RocksDBFileManager( private val onlyZipFiles = new PathFilter { override def accept(path: Path): Boolean = path.toString.endsWith(".zip") } + private val onlyDeltaFiles = new PathFilter { + override def accept(path: Path): Boolean = path.toString.endsWith(".changelog") + } + + private lazy val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf) + + private def codec = CompressionCodec.createCodec(sparkConf, codecName) Review Comment: Thanks for explanation. OK as it is. Not a big deal. 
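On the bi-directional migration point above, a rough sketch of a third run against the same checkpoint location, flipping the flag back to cover changelog-to-incremental migration (names reuse the quoted test; the version assertion is illustrative since it depends on when snapshots are uploaded):

```scala
testStream(aggregated, Update)(
  StartStream(checkpointLocation = checkpointDir.getAbsolutePath,
    additionalConfs = Map(rocksdbChangelogCheckpointingConfKey -> "false")),
  AddData(inputData, 1),
  // 1 was seen once in the changelog-enabled run, so its count becomes 2.
  CheckLastBatch((1, 2)),
  StopStream
)
// Back on incremental checkpointing, new commits should produce snapshot files again.
assert(snapshotVersionsPresent(rocksDBStateDir).contains(5L))
```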
########## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala: ########## @@ -24,14 +24,275 @@ import scala.language.implicitConversions import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration +import org.scalactic.source.Position +import org.scalatest.Tag import org.apache.spark._ import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} import org.apache.spark.util.{ThreadUtils, Utils} -class RocksDBSuite extends SparkFunSuite { + +trait AlsoTestWithChangelogCheckpointingEnabled extends SQLTestUtils { + override protected def test(testName: String, testTags: Tag*)(testBody: => Any) + (implicit pos: Position): Unit = { + super.test(testName, testTags: _*) { + withSQLConf(rocksdbChangelogCheckpointingConfKey -> "false", + SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName) { + testBody + } + // in case tests have any code that needs to execute after every test + super.afterEach() + } + + super.test(testName + " (with changelog checkpointing)", testTags: _*) { + // in case tests have any code that needs to execute before every test + super.beforeEach() + withSQLConf(rocksdbChangelogCheckpointingConfKey -> "true", + SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName) { + testBody + } + } + } + + def rocksdbChangelogCheckpointingConfKey: String = RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + Review Comment: While we are here, let's put this in top so that any reference will be placed after this is defined. Compiler wouldn't complain for forward reference, but would be helpful for readers. ########## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala: ########## @@ -279,7 +280,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] val timeoutDuration = 1.minute quietly { - withSpark(new SparkContext(conf)) { sc => + withSpark(SparkContext.getOrCreate(conf)) { sc => Review Comment: I feel like you had to do this because withSQLConf may require to have SparkSession. But I suspect this test won't use SQLConf being initialized in shared SparkSession, which makes withSQLConf be no-op. Have you checked that all tests are running intentionally, testing both incremental checkpointing and changelog checkpointing? 
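For the constant-style suggestions about `rocksdbChangelogCheckpointingConfKey` (here and in the next comment), the conventional shape would be roughly as below; the literal key is taken from the quoted documentation rather than from the truncated expression in the diff:

```scala
trait AlsoTestWithChangelogCheckpointingEnabled extends SQLTestUtils {
  // All-uppercase constant, declared before anything that references it.
  // The value is the config key from the programming guide section quoted earlier.
  val ROCKSDB_CHANGELOG_CHECKPOINTING_CONF_KEY: String =
    "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled"

  // ... test overrides follow, referring to ROCKSDB_CHANGELOG_CHECKPOINTING_CONF_KEY
}
```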
########## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala: ########## @@ -24,14 +24,275 @@ import scala.language.implicitConversions import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration +import org.scalactic.source.Position +import org.scalatest.Tag import org.apache.spark._ import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} import org.apache.spark.util.{ThreadUtils, Utils} -class RocksDBSuite extends SparkFunSuite { + +trait AlsoTestWithChangelogCheckpointingEnabled extends SQLTestUtils { + override protected def test(testName: String, testTags: Tag*)(testBody: => Any) + (implicit pos: Position): Unit = { + super.test(testName, testTags: _*) { + withSQLConf(rocksdbChangelogCheckpointingConfKey -> "false", + SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName) { + testBody + } + // in case tests have any code that needs to execute after every test + super.afterEach() + } + + super.test(testName + " (with changelog checkpointing)", testTags: _*) { + // in case tests have any code that needs to execute before every test + super.beforeEach() + withSQLConf(rocksdbChangelogCheckpointingConfKey -> "true", + SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName) { + testBody + } + } + } + + def rocksdbChangelogCheckpointingConfKey: String = RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + Review Comment: If you want to use this as constant, let's follow the standard. Define it as val with all uppercases with underline as separator, and preferably, put them in companion object. Probably doesn't need to apply latter as it's a trait. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
