You haven't really added a sink in Flink terminology; you're just performing a side effect within a map operator. So while it may work, if you want to add a proper sink you need an object that extends SinkFunction or RichSinkFunction. The method call on the stream should be ".addSink(…)".
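As a rough illustration of what such a sink could look like, here is a minimal sketch. It assumes the Flink 1.x RichSinkFunction API (open(Configuration), invoke(value), close()) and the DataStax Java driver 3.x. The class name CassandraQuerySink, the constructor parameters, and the idea that each record is a ready-to-run CQL statement are all made up for the example, and withCredentials(...) only stands in for whatever customized auth you actually use.

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/** Hypothetical custom sink: executes each incoming record as a CQL statement. */
class CassandraQuerySink extends RichSinkFunction<String> {
    private static final long serialVersionUID = 1L;

    // Plain serializable settings; these travel with the job to the workers.
    // (Names are assumptions for this sketch.)
    private final String contactPoint;
    private final String username;
    private final String password;

    // Connection objects are transient: they are created in open() on each
    // parallel task instead of being serialized with the function.
    private transient Cluster cluster;
    private transient Session session;

    CassandraQuerySink(String contactPoint, String username, String password) {
        this.contactPoint = contactPoint;
        this.username = username;
        this.password = password;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Called once per parallel instance before any record arrives.
        // Replace withCredentials(...) with your own auth setup.
        cluster = Cluster.builder()
                .addContactPoint(contactPoint)
                .withCredentials(username, password)
                .build();
        session = cluster.connect();
    }

    @Override
    public void invoke(String cql) throws Exception {
        // Called once per record; here the record itself is the statement.
        session.execute(cql);
    }

    @Override
    public void close() throws Exception {
        // Release the connection when the task shuts down.
        if (session != null) {
            session.close();
        }
        if (cluster != null) {
            cluster.close();
        }
    }
}
```

On a DataStream<String> of statements you would then attach it with .addSink(new CassandraQuerySink(host, user, password)) in place of the final .map(...). Because the session is opened in open(), every parallel task gets its own connection in the JVM where it actually runs.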
Also, the dbSession isn't really Flink state, as it does not vary with the position in, or the content of, the stream. It's a necessary helper object, yes, but you don't need Flink to checkpoint it. You can still use the sinks provided with flink-connector-cassandra and customize the cluster building by passing your own ClusterBuilder into the constructor.

-Shannon

From: Meghashyam Sandeep V <vr1meghash...@gmail.com>
Date: Friday, December 9, 2016 at 12:26 PM
To: <user@flink.apache.org>, <d...@flink.apache.org>
Subject: Reg. custom sinks in Flink

Hi there,

I have a Flink streaming app where my source is Kafka and a custom sink to Cassandra (I can't use the standard C* sink that comes with Flink as I have customized auth to C*). I currently have the following:

    messageStream
        .rebalance()
        .map(s -> { return mapper.readValue(s, JsonNode.class); })
        .filter(/* filter some messages */)
        .map((MapFunction<JsonNode, String>) message -> {
            getDbSession().execute("QUERY_TO_EXEC");
        })

    private static Session getDbSession() {
        if (dbSession == null && store != null) {
            dbSession = getSession();
        }
        return dbSession;
    }

1. Is this the right way to add a custom sink? As you can see, I have dbSession as a class variable here and I'm storing its state.

2. This setup works fine in a standalone Flink (java -jar MyJar.jar). When I run using Flink with YARN on EMR I get an NPE at the session, which is kind of weird.

Thanks