anishshri-db commented on code in PR #48401:
URL: https://github.com/apache/spark/pull/48401#discussion_r1817397479
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateTypesEncoderUtils.scala:
##########
@@ -143,23 +158,118 @@ class StateTypesEncoder[V](
}
}
-object StateTypesEncoder {
+
+object UnsafeRowTypesEncoder {
def apply[V](
keyEncoder: ExpressionEncoder[Any],
valEncoder: Encoder[V],
stateName: String,
- hasTtl: Boolean = false): StateTypesEncoder[V] = {
- new StateTypesEncoder[V](keyEncoder, valEncoder, stateName, hasTtl)
+ hasTtl: Boolean = false): UnsafeRowTypesEncoder[V] = {
+ new UnsafeRowTypesEncoder[V](keyEncoder, valEncoder, stateName, hasTtl)
+ }
+}
+
+/**
+ * Helper class providing APIs to encode the grouping key, and user provided values
+ * to Spark [[UnsafeRow]].
+ *
+ * CAUTION: StateTypesEncoder class instance is *not* thread-safe.
+ * This class reuses the keyProjection and valueProjection for encoding grouping
+ * key and state value respectively. As UnsafeProjection is not thread safe, this
+ * class is also not thread safe.
+ *
+ * @param keyEncoder - SQL encoder for the grouping key, key type is implicit
+ * @param valEncoder - SQL encoder for value of type `S`
+ * @param stateName - name of logical state partition
+ * @tparam V - value type
+ */
+class AvroTypesEncoder[V](
+ keyEncoder: ExpressionEncoder[Any],
+ valEncoder: Encoder[V],
+ stateName: String,
+ hasTtl: Boolean,
+ avroSerde: Option[AvroEncoderSpec]) extends StateTypesEncoder[V,
Array[Byte]] {
+
+ val out = new ByteArrayOutputStream
+
+ /** Variables reused for value conversions between spark sql and object */
+ private val keySerializer = keyEncoder.createSerializer()
+ private val valExpressionEnc = encoderFor(valEncoder)
+ private val objToRowSerializer = valExpressionEnc.createSerializer()
+ private val rowToObjDeserializer =
valExpressionEnc.resolveAndBind().createDeserializer()
+
+ private val keySchema = keyEncoder.schema
+ private val keyAvroType = SchemaConverters.toAvroType(keySchema)
+
+ // case class -> dataType
+ private val valSchema: StructType = valEncoder.schema
+ // dataType -> avroType
+ private val valueAvroType = SchemaConverters.toAvroType(valSchema)
+
+ override def encodeGroupingKey(): Array[Byte] = {
+ val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
Review Comment:
Let's assert in these functions that the Avro serde is available.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]