SAMZA-1370; Memory leak in CachedStore when using ByteBufferSerde as key Serde
ByteBufferSerde uses a relative bulk get to serialize the provided ByteBuffer, which advances that buffer's position. ByteBuffer's `equals` and `hashCode` depend only on its remaining elements, i.e. on its position. This means that when ByteBuffers are used as keys in the CachedStore, flushing the cache contents to the underlying store changes the keys' hashCodes. Since a key's hashCode no longer matches the one used when it was inserted into the map, the LinkedHashMap cannot correctly evict or remove these entries, leading to a memory leak. This change makes ByteBufferSerde duplicate the provided ByteBuffer before copying from it. The duplicate shares the buffer's content but has its own independent position, so the caller's buffer is left unchanged. This approach is preferred over absolute gets because ByteBuffer (as of Java 8) has no absolute bulk get method. Author: Prateek Maheshwari <pmahe...@linkedin.com> Reviewers: Jagadish <jagad...@apache.org> Closes #251 from prateekm/bytebufferserde Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c27a2532 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c27a2532 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c27a2532 Branch: refs/heads/0.14.0 Commit: c27a253283bbc09e82c4e9d669bf54d1460c5299 Parents: d32e8bb Author: Prateek Maheshwari <pmahe...@linkedin.com> Authored: Thu Jul 27 15:10:43 2017 -0700 Committer: Jagadish <jagad...@apache.org> Committed: Thu Jul 27 15:10:43 2017 -0700 ---------------------------------------------------------------------- .../versioned/jobs/configuration-table.html | 8 ++++++- .../samza/serializers/ByteBufferSerde.scala | 5 ++-- .../samza/serializers/TestByteBufferSerde.scala | 24 +++++++++++++++----- 3 files changed, 27 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/c27a2532/docs/learn/documentation/versioned/jobs/configuration-table.html ---------------------------------------------------------------------- diff --git 
a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index 3263856..bdf477a 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -1081,6 +1081,8 @@ <dl> <dt><code>org.apache.samza.serializers.ByteSerdeFactory</code></dt> <dd>A no-op serde which passes through the undecoded byte array.</dd> + <dt><code>org.apache.samza.serializers.ByteBufferSerdeFactory</code></dt> + <dd>Encodes <code>java.nio.ByteBuffer</code> objects.</dd> <dt><code>org.apache.samza.serializers.IntegerSerdeFactory</code></dt> <dd>Encodes <code>java.lang.Integer</code> objects as binary (4 bytes fixed-length big-endian encoding).</dd> <dt><code>org.apache.samza.serializers.StringSerdeFactory</code></dt> @@ -1090,7 +1092,11 @@ <dt><code>org.apache.samza.serializers.LongSerdeFactory</code></dt> <dd>Encodes <code>java.lang.Long</code> as binary (8 bytes fixed-length big-endian encoding).</dd> <dt><code>org.apache.samza.serializers.DoubleSerdeFactory</code></dt> - <dd>Encodes <code>java.lang.Double</code> as binray (8 bytes double-precision float point).</dd> + <dd>Encodes <code>java.lang.Double</code> as binary (8 bytes double-precision float point).</dd> + <dt><code>org.apache.samza.serializers.UUIDSerdeFactory</code></dt> + <dd>Encodes <code>java.util.UUID</code> objects.</dd> + <dt><code>org.apache.samza.serializers.SerializableSerdeFactory</code></dt> + <dd>Encodes <code>java.io.Serializable</code> objects.</dd> <dt><code>org.apache.samza.serializers.MetricsSnapshotSerdeFactory</code></dt> <dd>Encodes <code>org.apache.samza.metrics.reporter.MetricsSnapshot</code> objects (which are used for <a href="../container/metrics.html">reporting metrics</a>) as JSON.</dd> http://git-wip-us.apache.org/repos/asf/samza/blob/c27a2532/samza-core/src/main/scala/org/apache/samza/serializers/ByteBufferSerde.scala 
---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/ByteBufferSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/ByteBufferSerde.scala index 05c3e38..adb8781 100644 --- a/samza-core/src/main/scala/org/apache/samza/serializers/ByteBufferSerde.scala +++ b/samza-core/src/main/scala/org/apache/samza/serializers/ByteBufferSerde.scala @@ -23,8 +23,7 @@ import org.apache.samza.config.Config import java.nio.ByteBuffer /** - * A serializer for bytes that is effectively a no-op but can be useful for - * binary messages. + * A serializer for ByteBuffers. */ class ByteBufferSerdeFactory extends SerdeFactory[ByteBuffer] { def getSerde(name: String, config: Config): Serde[ByteBuffer] = new ByteBufferSerde @@ -34,7 +33,7 @@ class ByteBufferSerde extends Serde[ByteBuffer] { def toBytes(byteBuffer: ByteBuffer) = { if (byteBuffer != null) { val bytes = new Array[Byte](byteBuffer.remaining()) - byteBuffer.get(bytes) + byteBuffer.duplicate().get(bytes) bytes } else { null http://git-wip-us.apache.org/repos/asf/samza/blob/c27a2532/samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala index 9401d70..eddfb0a 100644 --- a/samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala +++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala @@ -26,16 +26,28 @@ import java.nio.ByteBuffer class TestByteBufferSerde { @Test - def test { + def testSerde { val serde = new ByteBufferSerde assertNull(serde.toBytes(null)) assertNull(serde.fromBytes(null)) val bytes = "A lazy way of creating a byte array".getBytes() - val testBytes = ByteBuffer.wrap(bytes) - testBytes.mark() - 
assertArrayEquals(serde.toBytes(testBytes), bytes) - testBytes.reset() - assertEquals(serde.fromBytes(bytes), testBytes) + val byteBuffer = ByteBuffer.wrap(bytes) + byteBuffer.mark() + assertArrayEquals(serde.toBytes(byteBuffer), bytes) + byteBuffer.reset() + assertEquals(serde.fromBytes(bytes), byteBuffer) + } + + @Test + def testSerializationPreservesInput { + val serde = new ByteBufferSerde + val bytes = "A lazy way of creating a byte array".getBytes() + val byteBuffer = ByteBuffer.wrap(bytes) + byteBuffer.get() // advance position by 1 + serde.toBytes(byteBuffer) + + assertEquals(byteBuffer.capacity(), byteBuffer.limit()) + assertEquals(1, byteBuffer.position()) } } \ No newline at end of file