Hi all,

I'm trying to move from mapWithState to Structured Streaming in Spark v2.2.1,
but I've run into a problem.

To convert the Kafka data, which has a binary (protobuf) value, into SQL
columns, I'm taking the dataset from readStream and doing:

Dataset<Row> s = dataset.selectExpr("timestamp", "CAST(key as string)",
        "ETBinnedDeserialize(value) AS message");

ETBinnedDeserialize is a UDF, registered like this:

spark.udf().register("ETBinnedDeserialize",
        (UDF1<byte[], Object>) ETProtobufDecoder::deserialize,
        Encoders.bean(BinnedForET.class).schema());

ETProtobufDecoder::deserialize looks like this

public static Object deserialize(byte[] bytes) {
    ExpressionEncoder<BinnedForET> expressionEncoder =
            (ExpressionEncoder<BinnedForET>) Encoders.bean(BinnedForET.class);
    BinnedForET binned = .... // Convert binary to pojo
    InternalRow row = expressionEncoder.toRow(binned);
    return row;
}

The key point from all this is that the schema for message comes from
Encoders.bean(BinnedForET.class), and the object the UDF returns is the result
of the same encoder's toRow method.
Yet I get a scala.MatchError. So if not toRow, what should I be calling?

Here is the error:
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$27: (binary) => struct<day:int,hour:int,measCount:int,minute:int,month:int,servingSectorHandle:int,year:int>)
....
Caused by: scala.MatchError: [0,16,c,0,0,1,576c,7e2] (of class org.apache.spark.sql.catalyst.expressions.UnsafeRow)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:236)

I've reduced the class to 7 ints to make it as simple as possible, so there
are no Strings to complicate the struct.

The object in the MatchError seems to be a struct with an initial zero (a null
indicator?) followed by the 7 ints I expect, in hex, yet it doesn't match.
Maybe it's obviously wrong to a Scala programmer?
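
To check that reading of the dump, it can be decoded by hand. Below is a
minimal sketch in plain Java (no Spark needed). It assumes that each
comma-separated value is one 8-byte word printed in hex, that the first word
is a null-tracking bitset, and that the remaining words follow the field order
shown in the struct<...> type of the error. The class name
UnsafeRowDumpDecoder is made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Decodes a row dump such as "[0,16,c,0,0,1,576c,7e2]" into named int fields. */
final class UnsafeRowDumpDecoder {

    // Field names in the order shown in the struct<...> type of the error message.
    static final String[] FIELDS = {
        "day", "hour", "measCount", "minute", "month", "servingSectorHandle", "year"
    };

    /**
     * Assumption: word 0 is a null-tracking bitset (bit i set means field i is null)
     * and word i + 1 holds the value of int field i, printed in hex.
     */
    static Map<String, Integer> decode(String dump) {
        String body = dump.trim();
        if (!body.startsWith("[") || !body.endsWith("]")) {
            throw new IllegalArgumentException("expected [w0,w1,...] but got: " + dump);
        }
        String[] words = body.substring(1, body.length() - 1).split(",");
        if (words.length != FIELDS.length + 1) {
            throw new IllegalArgumentException("expected " + (FIELDS.length + 1) + " words");
        }
        long nullBits = Long.parseUnsignedLong(words[0].trim(), 16);
        Map<String, Integer> out = new LinkedHashMap<>();
        for (int i = 0; i < FIELDS.length; i++) {
            boolean isNull = ((nullBits >>> i) & 1L) != 0;
            // Keep only the low 32 bits of the word, since every field is an int.
            Integer value = isNull ? null : (int) Long.parseUnsignedLong(words[i + 1].trim(), 16);
            out.put(FIELDS[i], value);
        }
        return out;
    }

    public static void main(String[] args) {
        // Decode the value quoted in the MatchError above.
        System.out.println(decode("[0,16,c,0,0,1,576c,7e2]"));
    }
}
```

Under those assumptions the dump decodes to day=22, hour=12, measCount=0,
minute=0, month=1, servingSectorHandle=22380, year=2018, with no nulls. So the
values themselves look right, and the mismatch would seem to be about the type
of the returned object rather than its contents.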

Any ideas what I should be calling instead of (or after?) toRow to return the 
right thing?

Cheers
Iain 
