You haven't really added a sink in Flink terminology; you're just performing a 
side effect within a map operator. So while it may work, if you want a sink 
proper you need to have an object that extends SinkFunction or 
RichSinkFunction, and the method call on the stream should be ".addSink(…)".
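
As a rough sketch (assuming the RichSinkFunction API as of Flink 1.x; the class name CassandraJsonSink and the helper createSessionWithCustomAuth are illustrative, not from your code), it could look like this:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import com.datastax.driver.core.Session;

public class CassandraJsonSink extends RichSinkFunction<String> {

    // transient: the driver Session is not serializable, so it must not
    // travel with the job graph; it is created on the task manager instead.
    private transient Session session;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Called once per parallel sink instance, on the task manager —
        // the right place to build the session with your customized auth.
        session = createSessionWithCustomAuth();  // hypothetical helper
    }

    @Override
    public void invoke(String value) throws Exception {
        session.execute("QUERY_TO_EXEC");  // placeholder query from your mail
    }

    @Override
    public void close() throws Exception {
        if (session != null) {
            session.close();
        }
    }

    private Session createSessionWithCustomAuth() {
        // ... build Cluster/Session with your auth here ...
        throw new UnsupportedOperationException("fill in");
    }
}
```

You would then wire it up with messageStream.map(…).addSink(new CassandraJsonSink()); — and note that building the session in open() also explains your YARN NPE: a field initialized only on the client does not exist on the task managers.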

Also, the dbSession isn't really Flink state, as it does not vary based on the 
position in, or content of, the stream. It's a necessary helper object, yes, 
but you don't need Flink to checkpoint it.

You can still use the sinks provided with flink-connector-cassandra and 
customize the cluster building by passing your own ClusterBuilder into the 
constructor.
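
A sketch of that approach (assuming the flink-connector-cassandra API, i.e. CassandraSink.addSink / setClusterBuilder; the host, query, and credentials are placeholders for your setup):

```java
import com.datastax.driver.core.Cluster;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

CassandraSink.addSink(resultStream)                     // a DataStream of tuples/POJOs
    .setQuery("INSERT INTO my_ks.my_table (id, payload) VALUES (?, ?);")
    .setClusterBuilder(new ClusterBuilder() {
        @Override
        protected Cluster buildCluster(Cluster.Builder builder) {
            // Plug your customized authentication in here.
            return builder
                .addContactPoint("cassandra-host")       // assumed host
                .withCredentials("user", "pass")         // or a custom AuthProvider
                .build();
        }
    })
    .build();
```

The ClusterBuilder is serialized with the job, so each sink instance builds its own properly-authenticated Cluster on the task manager.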

-Shannon

From: Meghashyam Sandeep V <vr1meghash...@gmail.com>
Date: Friday, December 9, 2016 at 12:26 PM
To: <user@flink.apache.org>, <d...@flink.apache.org>
Subject: Reg. custom sinks in Flink

Hi there,

I have a Flink streaming app where my source is Kafka and the sink is a custom 
sink to Cassandra (I can't use the standard C* sink that comes with Flink, as I 
have customized auth to C*). I currently have the following:


messageStream
        .rebalance()
        .map(s -> mapper.readValue(s, JsonNode.class))
        .filter(/* filter some messages */)
        .map((MapFunction<JsonNode, String>) message -> {
            getDbSession().execute("QUERY_TO_EXEC");
            return message.toString();
        });

private static Session getDbSession() {
    if (dbSession == null && store != null) {
        dbSession = getSession();
    }
    return dbSession;
}

1. Is this the right way to add a custom sink? As you can see, I have dbSession 
as a class variable here and I'm storing its state.

2. This setup works fine in standalone Flink (java -jar MyJar.jar). When I run 
Flink with YARN on EMR, I get an NPE at the session, which is kind of weird.


Thanks
