Hello all,

I am working on a custom sink implementation, but I am running into strange
checkpointing issues.

I am using a custom ListState to checkpoint, and it looks like this:

private var checkpointMessages: ListState[Bucket] = _


My snapshot function looks like:

@throws[IOException]
def snapshotState(context: FunctionSnapshotContext): Unit = {
  checkpointMessages.clear()
  for ((bucketName, bucket) <- bufferedMessages) {
    // Clone the buffer to avoid any concurrent modification issues
    val new_buffer = new ListBuffer[GenericRecord]()
    bucket.buffer.foreach(f => new_buffer += f)

    val new_bucket = new Bucket(bucketName, new_buffer, bucket.timestamp)

    // Upload buckets that hit a threshold; checkpoint the rest
    if (shouldUpload(bucketName)) uploadFile(bucketName)
    else checkpointMessages.add(new_bucket)
  }
}

where the Bucket class is:

@SerialVersionUID(1L)
class Bucket(var name: String,
             var buffer: ListBuffer[GenericRecord],
             var timestamp: Long) extends Serializable {
  def this(name: String) = {
    this(name, ListBuffer[GenericRecord](), new Date().getTime)
  }
}


The bufferedMessages declaration is:

private val bufferedMessages = collection.mutable.Map[String, Bucket]()


The basic idea behind this implementation is that I maintain multiple buffers
and push messages (org.apache.avro.generic.GenericRecord) into them in the
sink's invoke method. Once a buffer reaches certain thresholds, I archive it
to S3.
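
To make the idea concrete, here is a minimal, dependency-free sketch of the
buffering logic. It is not the actual sink: records are plain Strings instead
of GenericRecord, the "upload" only writes to an in-memory map instead of S3,
and the names SimpleBucket, BucketBuffers and maxRecords are hypothetical
stand-ins for illustration only.

```scala
import scala.collection.mutable
import scala.collection.mutable.ListBuffer

// Simplified stand-in for Bucket; holds Strings instead of GenericRecord.
final class SimpleBucket(val name: String,
                         val buffer: ListBuffer[String] = ListBuffer.empty[String],
                         val timestamp: Long = System.currentTimeMillis())

// maxRecords is a hypothetical threshold; the real sink decides in shouldUpload.
final class BucketBuffers(maxRecords: Int) {
  private val buffered = mutable.Map[String, SimpleBucket]()

  // Stand-in for S3: remembers what was "uploaded" per bucket.
  val uploaded = mutable.Map[String, List[String]]()

  // Mirrors invoke: append the record, then archive if the threshold is hit.
  def add(bucketName: String, record: String): Unit = {
    val bucket = buffered.getOrElseUpdate(bucketName, new SimpleBucket(bucketName))
    bucket.buffer += record
    if (bucket.buffer.size >= maxRecords) upload(bucketName)
  }

  // Mirrors uploadFile: hand the records off and drop the bucket.
  private def upload(bucketName: String): Unit =
    buffered.remove(bucketName).foreach { b =>
      uploaded(bucketName) = uploaded.getOrElse(bucketName, Nil) ++ b.buffer.toList
    }

  // Mirrors snapshotState: copy every buffer so that later appends cannot
  // change what was captured.
  def snapshot(): List[SimpleBucket] =
    buffered.values.map { b =>
      new SimpleBucket(b.name, ListBuffer.empty[String] ++= b.buffer, b.timestamp)
    }.toList

  // Number of records still waiting in a bucket.
  def pending(bucketName: String): Int =
    buffered.get(bucketName).map(_.buffer.size).getOrElse(0)
}
```

In this sketch a snapshot taken after two records keeps exactly those two
records, even if a third record later triggers the upload and empties the
live bucket.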

I have tried running this both locally in IntelliJ and on a cluster.

In IntelliJ the process runs for a bit (it checkpoints 3-4 times) and then
fails with the error below:


# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x000000010d46440c, pid=25232, tid=0x0000000000003903
#
# JRE version: Java(TM) SE Runtime Environment (8.0_131-b11) (build 1.8.0_131-b11)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.131-b11 mixed mode bsd-amd64 compressed oops)
# Problematic frame:
# V  [libjvm.dylib+0x46440c]
#
# Core dump written. Default location: /cores/core or core.25232
#
# An error report file with more information is saved as:
# hs_err_pid25232.log
#
# If you would like to submit a bug report, please visit:
#   http://bugreport.java.com/bugreport/crash.jsp
# The crash happened outside the Java Virtual Machine in native code.
# See problematic frame for where to report the bug.
#
Disconnected from the target VM, address: '127.0.0.1:60979', transport: 'socket'

Process finished with exit code 134 (interrupted by signal 6: SIGABRT)

I managed to collect a core dump:
https://gist.github.com/neoeahit/38a02955c1de7501561fba2e593d5f6a.

On a cluster I start to see concurrent serialization issues:
https://gist.github.com/neoeahit/75a078f3672dd4c234c5cd25eba05c47

My initial guess is that this is caused by the size of the ListState, but I
checked and there are only around 10k records in the buffer. Due to the
nature of the application, we have to implement this in a custom sink.

Could someone please help me troubleshoot this further?

-- 
Thanks in advance,
Vipul
