Hello All,
I'm using Structured Streaming and am trying to use a UDF to parse each row.
Here is the requirement:
- We can receive alerts for a particular KPI with type 'major' or 'critical'.
- For a KPI, if we get a 'major' alert (e.g. <kpi>__major) and also a
'critical' alert (<kpi>__critical), we need to ignore the major alert and
consider only the critical one.
There are ~25 alerts, whose names are stored in the array AlarmKeys.alarms_all.
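The major/critical rule above can be sketched in plain Python on one row of alarm counts. This assumes alarm names follow the <kpi>__<severity> pattern the draft code splits on; effective_alarms and the KPI names cpu, mem and disk are hypothetical, for illustration only.

```python
def effective_alarms(counts):
    """Apply the major/critical rule to one row of alarm counts.

    counts maps hypothetical alarm names "<kpi>__<severity>" to counts.
    Returns only alarms that fired (count > 0), dropping "<kpi>__major"
    whenever "<kpi>__critical" also fired.
    """
    fired = {name: n for name, n in counts.items() if n is not None and n > 0}
    result = {}
    for name, n in fired.items():
        kpi, sep, severity = name.partition("__")
        if sep and severity == "major" and (kpi + "__critical") in fired:
            continue  # the critical alarm supersedes the major one
        result[name] = n
    return result


print(effective_alarms({"cpu__major": 2, "cpu__critical": 1,
                        "mem__major": 3, "disk__critical": 0}))
# prints {'cpu__critical': 1, 'mem__major': 3}
```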
UDF Code (draft):
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def convertStructToStr(APP_CAUSE, tenantName, window, <one>, <two>__major,
                       <three>__major, <four>__critical, <five>__major,
                       <six>__critical):
    res = "{window: " + str(window) + ", type: 10m, applianceName: " + \
        str(APP_CAUSE) + ", "
    mystring = ""
    first = True
    for curr_alarm in AlarmKeys.alarms_all:
        alsplit = curr_alarm.split('__')
        if len(alsplit) == 2:
            # Only account for the critical alarm if both major & critical are there
            if alsplit[1] == 'major':
                critical_alarm = alsplit[0] + "__critical"
                # This is the part I can't get to work: col() builds a Column
                # expression, it does not return this row's value
                if int(col(critical_alarm)) > 0:
                    continue
        if int(col(curr_alarm)) > 0:
            if first:
                mystring = "{} {}({})".format(mystring,
                    AlarmKeys.alarm_exp[curr_alarm], str(col(curr_alarm)))
                first = False
            else:
                mystring = "{}, {}({})".format(mystring,
                    AlarmKeys.alarm_exp[curr_alarm], str(col(curr_alarm)))
    res += "insight: " + mystring + "}"
    return res
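One way to keep a UDF while still being able to check whether a column exists is to pass all columns as a single struct, e.g. struct(*df.columns). A struct column arrives in a Python UDF as a pyspark.sql.Row, and row.asDict() gives a {column name: value} dict, so the UDF can look up <kpi>__critical by name and treat a missing column as "not fired". A minimal sketch under that assumption; build_insight and make_convert_udf are hypothetical names, and the output string format simply copies the draft above.

```python
def build_insight(row, alarms_all, alarm_exp):
    """Build the insight string for one row given as {column_name: value}."""
    parts = []
    for alarm in alarms_all:
        count = row.get(alarm)  # None if the column is absent or NULL
        if count is None or int(count) <= 0:
            continue
        kpi, sep, severity = alarm.partition("__")
        critical = row.get(kpi + "__critical")  # None if that column is absent
        if sep and severity == "major" and critical is not None and int(critical) > 0:
            continue  # a critical alarm for the same KPI supersedes the major one
        parts.append("{}({})".format(alarm_exp[alarm], count))
    return "{{window: {}, type: 10m, applianceName: {}, insight: {}}}".format(
        row.get("window"), row.get("APP_CAUSE"), ", ".join(parts))


def make_convert_udf(alarms_all, alarm_exp):
    """Wrap build_insight as a Spark UDF that takes one struct column."""
    # Imported here so build_insight can be tested without a Spark installation.
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    @udf(returnType=StringType())
    def convert_struct_to_str(row):
        # A struct column is passed to a Python UDF as a pyspark.sql.Row.
        return build_insight(row.asDict(), alarms_all, alarm_exp)

    return convert_struct_to_str
```

Usage would then look something like convert = make_convert_udf(AlarmKeys.alarms_all, AlarmKeys.alarm_exp) followed by df.select(convert(struct(*df.columns)).alias("insight")), with struct imported from pyspark.sql.functions.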
# Structured Streaming query using the UDF; for now this prints the data to the console.
# Eventually, I'll write the data to Kafka instead.
query = df.select(convertStructToStr(*df.columns)) \
    .writeStream \
    .format("console") \
    .outputMode("complete") \
    .option("numRows", 100) \
    .option("checkpointLocation",
            "/Users/karanalang/PycharmProjects/Kafka/checkpoint") \
    .start()
query.awaitTermination()
Additional details on Stack Overflow:
https://stackoverflow.com/questions/71243726/structured-streaming-udf-logic-based-on-checking-if-a-column-is-present-in-t
My question: can this be done using a UDF? Since I'm passing column values
to the UDF, I have no way to check whether the 'critical' column for a
particular KPI is present in the DataFrame.
Any suggestions on the best way to solve this problem?
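Another option is to do the presence check on the driver instead of inside a UDF: df.columns is a plain Python list, so the code that builds the query can see which <kpi>__critical columns exist and build native Spark expressions (when, concat_ws) with no Python UDF at all. A sketch under the same <kpi>__<severity> naming assumption; resolve_alarm_columns and insight_column are hypothetical helpers, not an existing API.

```python
def resolve_alarm_columns(df_columns, alarms_all):
    """Return (alarm, critical_or_None) for every alarm column the DataFrame has."""
    present = set(df_columns)
    plan = []
    for alarm in alarms_all:
        if alarm not in present:
            continue  # column missing from this DataFrame: skip it on the driver
        kpi, sep, severity = alarm.partition("__")
        critical = kpi + "__critical"
        if sep and severity == "major" and critical in present:
            plan.append((alarm, critical))  # major alarm that a critical one may suppress
        else:
            plan.append((alarm, None))
    return plan


def insight_column(df_columns, alarms_all, alarm_exp):
    """Build one string Column listing the alarms that fired, using native functions."""
    # Imported here so resolve_alarm_columns can be tested without Spark.
    from pyspark.sql import functions as F

    parts = []
    for alarm, critical in resolve_alarm_columns(df_columns, alarms_all):
        fired = F.col(alarm) > 0
        if critical is not None:
            # Treat a NULL critical count as 0 so it does not suppress the major alarm.
            fired = fired & ~(F.coalesce(F.col(critical), F.lit(0)) > 0)
        text = F.format_string("%s(%s)", F.lit(alarm_exp[alarm]),
                               F.col(alarm).cast("string"))
        # when() without otherwise() yields NULL when the condition is not true.
        parts.append(F.when(fired, text))
    # concat_ws skips NULL inputs, so alarms that did not fire leave no extra ", ".
    return F.concat_ws(", ", *parts)
```

Usage might be df.select("window", "APP_CAUSE", insight_column(df.columns, AlarmKeys.alarms_all, AlarmKeys.alarm_exp).alias("insight")). Built-in expressions typically avoid the Python serialization cost of a UDF, and the set of columns is fixed once the query starts, so checking it on the driver is usually enough.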
tia!