In my Flink program, after a couple of map, union and connect, I have a
final filter and a sink. Something like this (after abstracting out

val filteredEvents: DataStream[NotificationEvent]
  = allThisStuffWorking

  *.filter(x => check(x.f1, x.f2, someStuff)) //BUG*
  .addSink(new NotificationSinkFunction(notifier))

The check function returns a Boolean and does not access anything other
than parameters passed. Here is relevant part of Notification Sink Function:

class NotificationSinkFunction(notifier: Notifier)
      extends SinkFunction[NotificationEvent] {

  val LOG: Logger = LoggerFactory.getLogger(this.getClass)

  def invoke(event: NotificationEvent): Unit = {
    LOG.info("Log this notification detail")
    *notifier.send(**event.f1, event.f2) //BUG*

If I comment out the lines highlighted and marked with //BUG, the Flink
pipeline works and print the log messages, and Flink shows this execution
plan at the end:

filtered_users -> Sink: send_notification

[image: Inline image 1]

But with either of those two lines marked as BUG above, Flink makes and
executes plan only till filtered_user and does not print the log message.

[image: Inline image 2]

How can I figure out what is wrong with the check function or notifier send
function that prevents Flink from making the full plan. What are the
typical mistakes leading to this?


Reply via email to