Hello,

I am using FlatMapGroupsWithStateFunction in my Spark Structured Streaming application:

    FlatMapGroupsWithStateFunction<String, Row, SessionInfo, SessionUpdate> idstateUpdateFunction =
        new FlatMapGroupsWithStateFunction<String, Row, SessionInfo, SessionUpdate>() { ... };
The SessionUpdate class starts throwing the exception below once I add the code marked with "// added". The same milestones attribute with setter/getter was also added to SessionInfo (the state class), and it does not throw an exception there.

    public static class SessionUpdate implements Serializable {

        private static final long serialVersionUID = -3858977319192658483L;

        private String instanceId;
        private ArrayList<GenericRowWithSchema> milestones = new ArrayList<GenericRowWithSchema>(); // added
        private Timestamp processingTimeoutTimestamp;

        public SessionUpdate() {
            super();
        }

        public SessionUpdate(String instanceId, ArrayList<GenericRowWithSchema> milestones, // added
                Timestamp processingTimeoutTimestamp) {
            super();
            this.instanceId = instanceId;
            this.milestones = milestones; // added
            this.processingTimeoutTimestamp = processingTimeoutTimestamp;
        }

        public String getInstanceId() {
            return instanceId;
        }

        public void setInstanceId(String instanceId) {
            this.instanceId = instanceId;
        }

        public ArrayList<GenericRowWithSchema> getMilestones() { // added
            return milestones;
        }

        public void setMilestones(ArrayList<GenericRowWithSchema> milestones) { // added
            this.milestones = milestones;
        }

        public Timestamp getProcessingTimeoutTimestamp() {
            return processingTimeoutTimestamp;
        }

        public void setProcessingTimeoutTimestamp(Timestamp processingTimeoutTimestamp) {
            this.processingTimeoutTimestamp = processingTimeoutTimestamp;
        }
    }

Exception:

ERROR cannot resolve 'named_struct()' due to data type mismatch: input to function named_struct requires at least one argument;;
'SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, oracle.insight.spark.event_processor.EventProcessor$SessionUpdate, true]).getInstanceId, true, false) AS instanceId#62, mapobjects(MapObjects_loopValue2, MapObjects_loopIsNull2, ObjectType(class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema), if (isnull(lambdavariable(MapObjects_loopValue2, MapObjects_loopIsNull2, ObjectType(class
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema), true))) null else named_struct(), assertnotnull(input[0, oracle.insight.spark.event_processor.EventProcessor$SessionUpdate, true]).getMilestones, None) AS milestones#63, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, assertnotnull(input[0, oracle.insight.spark.event_processor.EventProcessor$SessionUpdate, true]).getProcessingTimeoutTimestamp, true, false) AS processingTimeoutTimestamp#64] +- FlatMapGroupsWithState <function3>, cast(value#54 as string).toString, createexternalrow(EventTime#23.toString, InstanceID#24.toString, Model#25.toString, Milestone#26.toString, Region#27.toString, SalesOrganization#28.toString, ProductName#29.toString, ReasonForQuoteReject#30.toString, ReasonforRejectionBy#31.toString, OpportunityAmount#32.toJavaBigDecimal, Discount#33.toJavaBigDecimal, TotalQuoteAmount#34.toJavaBigDecimal, NetQuoteAmount#35.toJavaBigDecimal, ApprovedDiscount#36.toJavaBigDecimal, TotalOrderAmount#37.toJavaBigDecimal, StructField(EventTime,StringType,true), StructField(InstanceID,StringType,true), StructField(Model,StringType,true), StructField(Milestone,StringType,true), StructField(Region,StringType,true), StructField(SalesOrganization,StringType,true), StructField(ProductName,StringType,true), StructField(ReasonForQuoteReject,StringType,true), StructField(ReasonforRejectionBy,StringType,true), ... 
6 more fields), [value#54], [EventTime#23, InstanceID#24, Model#25, Milestone#26, Region#27, SalesOrganization#28, ProductName#29, ReasonForQuoteReject#30, ReasonforRejectionBy#31, OpportunityAmount#32, Discount#33, TotalQuoteAmount#34, NetQuoteAmount#35, ApprovedDiscount#36, TotalOrderAmount#37], obj#61: oracle.insight.spark.event_processor.EventProcessor$SessionUpdate, class[instanceId[0]: string, milestones[0]: array<struct<>>, processingTimeoutTimestamp[0]: timestamp], Append, false, ProcessingTimeTimeout

The schema looks like:

    {"Name":"EventTime", "DataType":"TimestampType"},
    {"Name":"InstanceID", "DataType":"STRING", "Length":100},
    {"Name":"Model", "DataType":"STRING", "Length":100},
    {"Name":"Milestone", "DataType":"STRING", "Length":100},
    {"Name":"Region", "DataType":"STRING", "Length":100},
    {"Name":"SalesOrganization", "DataType":"STRING", "Length":100},
    {"Name":"ProductName", "DataType":"STRING", "Length":100},
    {"Name":"ReasonForQuoteReject", "DataType":"STRING", "Length":100},
    {"Name":"ReasonforRejectionBy", "DataType":"STRING", "Length":100},
    //Note: the DECIMAL columns below use org.apache.spark.sql.types.DataTypes.createDecimalType(precision(), scale())
    {"Name":"OpportunityAmount", "DataType":"DECIMAL", "Precision":38, "Scale":2},
    {"Name":"Discount", "DataType":"DECIMAL", "Precision":38, "Scale":2},
    {"Name":"TotalQuoteAmount", "DataType":"DECIMAL", "Precision":38, "Scale":2},
    {"Name":"NetQuoteAmount", "DataType":"DECIMAL", "Precision":38, "Scale":2},
    {"Name":"ApprovedDiscount", "DataType":"DECIMAL", "Precision":38, "Scale":2},
    {"Name":"TotalOrderAmount", "DataType":"DECIMAL", "Precision":38, "Scale":2}

Could someone please advise how to debug what is wrong in this use case?

Thanks,
Robin Kuttaiah
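In case it helps frame the question: the plan shows milestones being serialized as array<struct<>>, i.e. an empty struct, which I suspect means the bean encoder cannot derive any fields from GenericRowWithSchema (its schema is only known at runtime). The sketch below is what I am considering as a workaround, a plain serializable bean in place of GenericRowWithSchema; the Milestone class and its field names here are made up for illustration, not from my actual job. I would like to confirm whether this is the right direction.

```java
import java.io.Serializable;
import java.sql.Timestamp;
import java.util.ArrayList;

// Hypothetical plain bean standing in for one milestone row. A bean like this
// gives Spark's bean encoder a concrete struct<name:string,reachedAt:timestamp>
// to serialize to, instead of the empty struct<> derived for GenericRowWithSchema.
class Milestone implements Serializable {
    private String name;
    private Timestamp reachedAt;

    public Milestone() { }

    public Milestone(String name, Timestamp reachedAt) {
        this.name = name;
        this.reachedAt = reachedAt;
    }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public Timestamp getReachedAt() { return reachedAt; }
    public void setReachedAt(Timestamp reachedAt) { this.reachedAt = reachedAt; }
}

public class MilestoneDemo {
    public static void main(String[] args) {
        ArrayList<Milestone> milestones = new ArrayList<>();
        milestones.add(new Milestone("QuoteApproved", new Timestamp(0L)));
        // SessionUpdate would then declare:
        //   private ArrayList<Milestone> milestones = new ArrayList<Milestone>();
        System.out.println(milestones.get(0).getName());
    }
}
```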