Hello!

What's the best way of debugging a CEP pattern/stream?
Basically, I have a Flink (Scala) program that consumes events from
RabbitMQ. The source is working ok because I can see the inputStream
printed. Then, I defined one pattern, created a patternStream, and finally
an output stream with a created PatternProcessFunction.

I want to make sure that both my pattern and my process function are defined
correctly, because so far I'm not getting any elements in my output stream,
even though I'm manually inserting events in the source that should
cause a match. Is there a way I can debug this using IntelliJ (I set some
breakpoints, but of course they don't work) or with any other method? I also
tried some classic System.out.println calls, with no success.
Is it possible to write tests that can be run later to check the expected
behavior?

I'm leaving here most of my code in case you detect some structural or
conceptual error (for example, before this version I was not setting the
time characteristics for the stream so it didn't work...)

package org.angoglez
import dto.events.{HourlyReading, OutputEvent}
import dto.schemas.GenericJsonDeserializationSchema
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner,
WatermarkStrategy}
import org.apache.flink.api.scala._
import org.apache.flink.cep.scala.CEP
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment
import org.apache.flink.streaming.api.scala.{DataStream,
StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.rabbitmq.common.
RMQConnectionConfig
import patterns.WaterPatterns
import patterns.functions.HourlyReadingProcessFunction
import source.RabbitMQSource

import java.time.{Duration, Instant, ZoneId, ZoneOffset}

/** Local entry point: consumes hourly readings from RabbitMQ, applies the
  * "night-time flow fraud" CEP pattern to them and prints both the raw input
  * stream and the stream of matches.
  */
object MainRunner {

  /** Builds the streaming topology and runs it in a local environment.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {

    // ENVIRONMENT, CONFIG, RABBIT SOURCE, CREATION OF DATASTREAM
    val conf = new Configuration()
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    env.enableCheckpointing(100)

    val connectionConfig: RMQConnectionConfig = new RMQConnectionConfig.Builder()
      .setHost("localhost")
      .setPort(5672)
      .setVirtualHost("xxxx")
      .setUserName("xxxx")
      .setPassword("xxxx")
      .build()

    type eventType = HourlyReading
    // Not gonna include the DeserializationSchema here, it works properly
    // 'cause I can see it printed.
    val deserializationSchema = new GenericJsonDeserializationSchema[eventType]

    val rabbitMQSource =
      new RabbitMQSource[eventType](connectionConfig, "tests-flink", deserializationSchema)

    val serializableTimestampAssigner = new SerializableTimestampAssigner[eventType] {
      // Resolve the offset that applies at the event's own local date-time.
      // Using the offset valid "now" (as before) shifts event time by the
      // DST difference whenever the event and the job run fall on opposite
      // sides of a clock change.
      // NOTE(review): assumes obtainTimestamp yields a java.time.LocalDateTime
      // expressed in the system default zone - confirm.
      def extractTimestamp(element: eventType, recordTimestamp: Long): Long =
        element.obtainTimestamp.atZone(ZoneId.systemDefault).toInstant.toEpochMilli
    }

    // NOTE(review): in event time, CEP buffers elements until a watermark
    // passes their timestamp. With a 5 s out-of-orderness bound, a match is
    // only emitted once a later event pushes the watermark forward - keep this
    // in mind when testing with a few hand-inserted events.
    val inputStream: DataStream[eventType] = env
      // same as above, the RMQSource works well, not added here.
      .addSource(rabbitMQSource.obtainRabbitSource)
      .assignTimestampsAndWatermarks {
        WatermarkStrategy
          .forBoundedOutOfOrderness(Duration.ofSeconds(5))
          .withTimestampAssigner(serializableTimestampAssigner)
      }

    inputStream.print() // printed ok

    // PATTERN DEFINITION AND MATCH PROCESSING

    // Two strictly consecutive readings, both between hours 1 and 6, from the
    // same meter and both in "M3", whose volume difference is below 2 or
    // above 3.
    val pattern =
      Pattern
        .begin[HourlyReading]("a1-nighttimeflowfraud")
        .where(event => FunctionUtils.isBetweenHoursRange(event.dateTime, 1, 6))
        .next("a2-nighttimeflowfraud")
        .where(event => FunctionUtils.isBetweenHoursRange(event.dateTime, 1, 6))
        .where { (event: HourlyReading, context: Context[HourlyReading]) =>
          // headOption instead of toList.head: an empty first stage simply
          // rejects the event instead of throwing inside the operator.
          context
            .getEventsForPattern("a1-nighttimeflowfraud")
            .headOption
            .exists { previous =>
              val volumeDelta = event.volume - previous.volume
              previous.serialNumber == event.serialNumber &&
              previous.unit == "M3" && event.unit == "M3" &&
              (volumeDelta < 2 || volumeDelta > 3)
            }
        }

    val patternStream = CEP.pattern(inputStream, pattern)

    val outputStream: DataStream[OutputEvent] =
      patternStream.process(new HourlyReadingProcessFunction) // below

    outputStream.print() // nothing here

    env.execute()
  }
}


//////////////// PATTERN PROCESS FUNCTION CLASS
///////////////////////////////

/** Converts every match of the two-stage night-time pattern into one
  * [[OutputEvent]] carrying the date-times of both matched readings.
  */
class HourlyReadingProcessFunction
    extends PatternProcessFunction[HourlyReading, OutputEvent] {

  /** Returns the first event recorded under the given stage name. */
  private def firstEventOf(
      matched: util.Map[String, util.List[HourlyReading]],
      stage: String
  ): HourlyReading =
    matched.get(stage).get(0)

  /** Called once per match: logs the matched readings to standard output
    * and emits a summary event.
    *
    * @param patterns matched events, keyed by pattern stage name
    * @param ctx      context supplied by the CEP operator (unused here)
    * @param out      collector that receives the resulting [[OutputEvent]]
    */
  def processMatch(
      patterns: util.Map[String, util.List[HourlyReading]],
      ctx: PatternProcessFunction.Context,
      out: Collector[OutputEvent]
  ): Unit = {
    System.out.println("PROCESSING MATCH")

    val firstReading = firstEventOf(patterns, "a1-nighttimeflowfraud")
    val secondReading = firstEventOf(patterns, "a2-nighttimeflowfraud")

    System.out.println(firstReading)
    System.out.println(secondReading)

    // OutputEvent is declared as: case class OutputEvent(str: String)
    val firstSeen = firstReading.dateTime.toString
    val secondSeen = secondReading.dateTime.toString
    out.collect(OutputEvent(s"Event observed: $firstSeen - $secondSeen"))
  }
}





Thanks a lot in advance!!


*Ana Gómez González*

<http://twitter.com/angoglez> <https://www.linkedin.com/in/angoglez/>

Reply via email to