Sure, I will create a JIRA for that. In addition, I would like to confirm: would it be possible to reuse the connection builder object across queries and across jobs? I.e., if I create a singleton class that creates a ClusterBuilder instance, could I use it across queries?
I have attempted that between a streaming API and a batch API, but would like to confirm the same. Please check the following piece of code and let me know your input. Please find the attached files.

Regards,
Jagadisha G

On Tue, Sep 26, 2017 at 5:41 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> Hi Jagadish,
>
> Yes, that indeed is something missing. If that is something you’re interested in, could you perhaps open a JIRA for that (AFAIK there isn’t one for the feature yet).
>
> Gordon
>
> On 26 September 2017 at 2:09:37 PM, Jagadish Gangulli (jagadi...@gmail.com) wrote:
>
> Thanks Gordon,
>
> I have a few more queries on the same lines: if I have to perform fetches, i.e. select queries, I have to go for the batch queries; no streaming support is available.
>
> Regards,
> Jagadisha G
>
> On Tue, Sep 26, 2017 at 3:40 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>
>> Hi Jagadish,
>>
>> Yes, you are right that the Flink Cassandra connector uses the DataStax drivers internally, which is also the case for all the other Flink connectors; e.g., the Kafka connector uses the Kafka Java client, the Elasticsearch connector uses the ES Java client, etc.
>>
>> The main advantage when using these Flink first-class supported connectors is basically the following:
>> - Most importantly, the connectors work with Flink’s checkpointing mechanism to achieve exactly-once or at-least-once guarantees. You can read more about that here [1].
>> - The connectors are built on Flink’s abstractions of streaming sources / sinks. What this means is you can basically swap out / plug in / add sources or sinks to various external systems without altering the main business logic in your processing pipeline. I.e., also sinking your data to Elasticsearch would be as simple as adding an Elasticsearch sink to your pipeline output alongside your Cassandra sink.
>>
>> Hope this clarifies some points for you!
>>
>> Cheers,
>> Gordon
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/stream_checkpointing.html
>>
>> On 26 September 2017 at 11:03:16 AM, Jagadish Gangulli (jagadi...@gmail.com) wrote:
>>
>> Hi,
>>
>> I have recently gotten into application development with Flink. We are trying to use the Flink Cassandra connectors to get data in and out of Cassandra.
>>
>> We attempted both the DataStax drivers and the Flink Cassandra connector. In this process we felt that the Flink Cassandra connector is more of a wrapper on top of the DataStax Cassandra drivers.
>>
>> Hence, could someone please explain the benefits of the flink-cassandra-connector over the DataStax driver APIs? We are looking for the APIs which are better in terms of performance. Please let me know your thoughts.
>>
>> Thanks & Regards,
>> Jagadisha G
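P.S. On Gordon's point above about plugging in additional sinks: the sketch below is roughly how I picture adding an Elasticsearch sink next to the Cassandra one, reusing the stringStream from the attached FlinkCassandraConnector. It is untested and assumes the flink-connector-elasticsearch5 dependency; the cluster name, transport address, index and type names are all placeholders.

// Imports assumed (not in the attached files): java.net.InetSocketAddress, java.util.*,
//   org.apache.flink.api.common.functions.RuntimeContext,
//   org.apache.flink.api.java.tuple.Tuple1,
//   org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction,
//   org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer,
//   org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink,
//   org.elasticsearch.client.Requests

Map<String, String> esConfig = new HashMap<>();
esConfig.put("cluster.name", "my-es-cluster");   // placeholder cluster name
esConfig.put("bulk.flush.max.actions", "1");     // flush on every record, just for the demo

List<InetSocketAddress> transports = new ArrayList<>();
transports.add(new InetSocketAddress("127.0.0.1", 9300));  // placeholder ES transport address

// Same stringStream that already feeds the Cassandra sink in FlinkCassandraConnector.
stringStream.addSink(new ElasticsearchSink<>(esConfig, transports,
        new ElasticsearchSinkFunction<Tuple1<String>>() {
            @Override
            public void process(Tuple1<String> element, RuntimeContext ctx, RequestIndexer indexer) {
                Map<String, String> json = new HashMap<>();
                json.put("body", element.f0);
                indexer.add(Requests.indexRequest()
                        .index("messages")   // placeholder index name
                        .type("message")     // placeholder type name
                        .source(json));
            }
        }));

If I understand correctly, the business logic (the map into Tuple1) stays untouched; only the extra addSink call is added, which I think is what Gordon meant by swapping sinks in and out.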
package flinkConnector;

import java.util.ArrayList;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

public class FlinkCassandraConnector {

    // Sample payloads to write into Cassandra.
    private static final ArrayList<String> messages = new ArrayList<>(20);

    static {
        for (long i = 180; i <= 190; i++) {
            messages.add("cassandra-" + i);
        }
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Wrap each message in a Tuple1 so it matches the single bind parameter of the INSERT.
        DataStream<Tuple1<String>> stringStream = env.fromCollection(messages)
                .map(new MapFunction<String, Tuple1<String>>() {
                    @Override
                    public Tuple1<String> map(String value) throws Exception {
                        return Tuple1.of(value);
                    }
                });

        try {
            // Reuse the shared ClusterBuilder for both the sink and the input format.
            ClusterBuilder clusterObj = ClusterBuilderProvider.getClusterBuilder();

            CassandraSink.addSink(stringStream)
                    .setQuery("INSERT INTO test.message (body) values (?);")
                    .setClusterBuilder(clusterObj)
                    .build();

            String SELECT_QUERY = "SELECT body FROM test.message;";
            DataStream<Tuple1<String>> inputDS = env.createInput(
                    new CassandraInputFormat<Tuple1<String>>(SELECT_QUERY, clusterObj),
                    TupleTypeInfo.of(new TypeHint<Tuple1<String>>() {}));
            inputDS.print();

            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
package flinkConnector;

import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Cluster.Builder;

public class ClusterBuilderProvider {

    // Single shared ClusterBuilder instance, created once when the class is loaded.
    private static final ClusterBuilder clusterBuilder = new ClusterBuilder() {
        @Override
        protected Cluster buildCluster(Builder builder) {
            return builder.addContactPoint("192.168.15.40").build();
        }
    };

    static ClusterBuilder getClusterBuilder() {
        // The field is initialised eagerly, so the same instance is always returned.
        return clusterBuilder;
    }
}
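P.P.S. On the "across jobs" part of my question: since CassandraInputFormat and CassandraOutputFormat also take a ClusterBuilder, my understanding is that the same ClusterBuilderProvider can be handed to a separate batch job as well. A rough, untested sketch follows (the BatchReuseJob class name and the test.message_copy table are made up for illustration):

package flinkConnector;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

public class BatchReuseJob {   // hypothetical class name, for illustration only

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Same shared builder as in the streaming job above.
        ClusterBuilder clusterObj = ClusterBuilderProvider.getClusterBuilder();

        // Read the rows written by the streaming job.
        DataSet<Tuple1<String>> rows = env.createInput(
                new CassandraInputFormat<Tuple1<String>>("SELECT body FROM test.message;", clusterObj),
                TupleTypeInfo.of(new TypeHint<Tuple1<String>>() {}));

        // Write them back out through the same builder (into a second table, assumed to exist).
        rows.output(new CassandraOutputFormat<Tuple1<String>>(
                "INSERT INTO test.message_copy (body) values (?);", clusterObj));

        env.execute("cassandra batch reuse");
    }
}

One thing I am not fully sure about: as far as I can tell, the static instance is only "shared" within a single JVM/classloader while the job graph is being built; at runtime each parallel sink or input-format task deserializes its own copy of the ClusterBuilder and builds its own Cluster, so the reuse is really on the client side. Please correct me if that is wrong.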