Regarding 2) I don't think so; that would require access to the DataStax
MappingManager. We could add something similar to the ClusterBuilder for
that, though.
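
For tuple streams, where a query can be set, the TTL can go into the CQL
itself. A rough sketch (the keyspace/table/column names are made up):

CassandraSink.addSink(tupleStream) // e.g. a DataStream<Tuple2<String, String>>
        .setQuery("INSERT INTO my_ks.messages (id, body) VALUES (?, ?) USING TTL 3600;")
        .setClusterBuilder(new ClusterBuilder() {
            @Override
            protected Cluster buildCluster(Cluster.Builder builder) {
                return buildCassandraCluster(); // your existing helper
            }
        })
        .build();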

Regards,
Chesnay

On 12.12.2016 16:15, Meghashyam Sandeep V wrote:
Hi Till,

Thanks for the information.

1. What do you mean by 'subtask'? Is it every partition or every message in the stream?

2. I tried using CassandraSink with a POJO. Is there a way to specify a TTL, given that I can't use a query when I have a DataStream of POJOs?

CassandraSink.addSink(messageStream)
        .setClusterBuilder(new ClusterBuilder() {
            @Override
            protected Cluster buildCluster(Cluster.Builder builder) {
                return buildCassandraCluster();
            }
        })
        .build();
Thanks,

On Mon, Dec 12, 2016 at 3:17 AM, Till Rohrmann <trohrm...@apache.org> wrote:

    Hi Meghashyam,

    1.

        You can perform initializations in the open() method of the
        RichSinkFunction interface. The open() method will be called
        once for every subtask when it is initialized. If you want to
        share the resource across multiple subtasks running in the
        same JVM, you can also store the dbSession in a class variable
        (see the sketch below).

    2.

        The Flink community is currently working on adding security
        support, including SSL encryption, to Flink. So maybe in the
        future you can use Flink’s Cassandra sink again.
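
        A minimal sketch of that pattern (buildCassandraCluster() stands
        in for your own cluster/auth setup from the other mails):

        import com.datastax.driver.core.Session;
        import org.apache.flink.configuration.Configuration;
        import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

        public class CassandraMessageSink extends RichSinkFunction<String> {
            // Class variable: shared by all subtasks in the same JVM.
            private static Session dbSession;

            @Override
            public void open(Configuration parameters) throws Exception {
                // Called once per subtask before any elements arrive.
                synchronized (CassandraMessageSink.class) {
                    if (dbSession == null) {
                        dbSession = buildCassandraCluster().connect();
                    }
                }
            }

            @Override
            public void invoke(String value) throws Exception {
                dbSession.execute(value);
            }
        }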

    Cheers,
    Till


    On Fri, Dec 9, 2016 at 8:05 PM, Meghashyam Sandeep V
    <vr1meghash...@gmail.com> wrote:

        Thanks a lot for the quick reply, Shannon.

        1. I will create a class that extends SinkFunction and write
        my connection logic there. My only question here is: will a
        dbSession be created for each message/partition, which might
        affect performance? That's the reason I added this line, to
        create a connection once and use it along the datastream:

        if (dbSession == null && store != null) { dbSession = getSession(); }
        2. I couldn't use flink-connector-cassandra because I have SSL
        enabled for my C* cluster, and I couldn't get it to work with
        all my SSL config (truststore, keystore, etc.) added to the
        cluster building. I didn't find a proper example of
        flink-connector-cassandra with SSL enabled.


        Thanks

        On Fri, Dec 9, 2016 at 10:54 AM, Shannon Carey
        <sca...@expedia.com> wrote:

            You haven't really added a sink in Flink terminology;
            you're just performing a side effect within a map
            operator. So while it may work, if you want to add a
            proper sink you need to have an object that extends
            SinkFunction or RichSinkFunction. The method call on
            the stream should be ".addSink(…)".

            Also, the dbSession isn't really Flink state, as it will
            not vary based on the position in, or content of, the
            stream. It's a necessary helper object, yes, but you
            don't need Flink to checkpoint it.

            You can still use the sinks provided with
            flink-connector-cassandra and customize the cluster
            building by passing your own ClusterBuilder into the
            constructor.
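
            For instance, something along these lines (untested sketch;
            the contact point and the loadSslContextFromKeystores()
            helper are placeholders for your own setup):

            import javax.net.ssl.SSLContext;
            import com.datastax.driver.core.Cluster;
            import com.datastax.driver.core.JdkSSLOptions;
            import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
            import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

            CassandraSink.addSink(messageStream)
                    .setClusterBuilder(new ClusterBuilder() {
                        @Override
                        protected Cluster buildCluster(Cluster.Builder builder) {
                            // Build the SSLContext on the task manager rather
                            // than capturing a non-serializable one from the
                            // driver program.
                            SSLContext sslContext = loadSslContextFromKeystores();
                            return builder
                                    .addContactPoint("cassandra-host")
                                    .withSSL(JdkSSLOptions.builder()
                                            .withSSLContext(sslContext)
                                            .build())
                                    .build();
                        }
                    })
                    .build();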

            -Shannon

            From: Meghashyam Sandeep V <vr1meghash...@gmail.com>
            Date: Friday, December 9, 2016 at 12:26 PM
            To: <user@flink.apache.org>, <d...@flink.apache.org>
            Subject: Reg. custom sinks in Flink

            Hi there,

            I have a Flink streaming app where my source is Kafka and
            a custom sink to Cassandra (I can't use the standard C*
            sink that comes with Flink, as I have customized auth to
            C*). I currently have the following:

            messageStream
                    .rebalance()
                    .map(s -> {
                        return mapper.readValue(s, JsonNode.class);
                    })
                    .filter(/* filter some messages */)
                    .map((MapFunction<JsonNode, String>) message -> {
                        getDbSession().execute("QUERY_TO_EXEC");
                        return null; // side effect only; the mapped value is unused
                    });

            private static Session getDbSession() {
                if (dbSession == null && store != null) {
                    dbSession = getSession();
                }
                return dbSession;
            }

            1. Is this the right way to add a custom sink? As you can
            see, I have dbSession as a class variable here and I'm
            storing its state.

            2. This setup works fine in standalone Flink (java -jar
            MyJar.jar). When I run using Flink with YARN on EMR, I get
            an NPE at the session, which is kind of weird.

            Thanks
