Hello all,

I am working on a custom sink implementation, but am having weird issues with checkpointing.
I am using a custom ListState to checkpoint, and it looks like this:

    private var checkpointMessages: ListState[Bucket] = _

My snapshot function looks like:

    @throws[IOException]
    def snapshotState(context: FunctionSnapshotContext): Unit = {
      checkpointMessages.clear()
      for ((bucketName, bucket) <- bufferedMessages) {
        // cloning to avoid any concurrent modification issues
        var new_buffer = new ListBuffer[GenericRecord]()
        bucket.buffer.foreach(f => new_buffer += f)
        val new_bucket = new Bucket(bucketName, new_buffer, bucket.timestamp)
        if (shouldUpload(bucketName)) uploadFile(bucketName)
        else checkpointMessages.add(new_bucket)
      }
    }

where class Bucket is:

    @SerialVersionUID(1L)
    class Bucket(var name: String, var buffer: ListBuffer[GenericRecord], var timestamp: Long) extends Serializable {
      def this(name: String) = {
        this(name, ListBuffer[GenericRecord](), new Date().getTime)
      }
    }

The bufferedMessages signature is:

    private val bufferedMessages = collection.mutable.Map[String, Bucket]()

The basic idea behind this implementation is that I maintain multiple buffers and push messages (org.apache.avro.generic.GenericRecord) into them in the sink's invoke method; upon reaching certain thresholds, I archive them on S3.

I have tried running this both locally in IntelliJ and on a cluster.

In IntelliJ the process runs for a bit (it checkpoints 3-4 times) and then errors out with the exception below:

    #
    # A fatal error has been detected by the Java Runtime Environment:
    #
    #  SIGSEGV (0xb) at pc=0x000000010d46440c, pid=25232, tid=0x0000000000003903
    #
    # JRE version: Java(TM) SE Runtime Environment (8.0_131-b11) (build 1.8.0_131-b11)
    # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.131-b11 mixed mode bsd-amd64 compressed oops)
    # Problematic frame:
    # V  [libjvm.dylib+0x46440c]
    #
    # Core dump written. Default location: /cores/core or core.25232
    #
    # An error report file with more information is saved as:
    # hs_err_pid25232.log
    #
    # If you would like to submit a bug report, please visit:
    #   http://bugreport.java.com/bugreport/crash.jsp
    # The crash happened outside the Java Virtual Machine in native code.
    # See problematic frame for where to report the bug.
    #
    Disconnected from the target VM, address: '127.0.0.1:60979', transport: 'socket'

    Process finished with exit code 134 (interrupted by signal 6: SIGABRT)

I managed to collect a core dump:
https://gist.github.com/neoeahit/38a02955c1de7501561fba2e593d5f6a

On a cluster I start to see concurrent serialization issues:
https://gist.github.com/neoeahit/75a078f3672dd4c234c5cd25eba05c47

My initial guess was that this is happening due to the size of the ListState, but I checked and the number of records in the buffer is only around 10k.

Due to the nature of the application, we have to implement this in a custom sink.

Could someone please help me or guide me in troubleshooting this further?

--
Thanking in advance,
Vipul