Sure, I will create a Jira for that.

In addition to that, I would like to confirm whether it is possible to reuse
the connection builder object across queries and across jobs, i.e., if I
create a singleton class that creates a single connection builder instance,
can I use it across queries?

I have attempted that between a streaming API and a batch API, but would
like to confirm that this is a supported pattern. Please check the following
piece of code and let me know your input. Please find the attached files.


Regards,
Jagadisha G





On Tue, Sep 26, 2017 at 5:41 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi Jagadish,
>
> Yes, that indeed is something missing. If that is something you’re
> interested in, could you perhaps open a JIRA for that (AFAIK there isn’t
> one for the feature yet).
>
> Gordon
>
>
> On 26 September 2017 at 2:09:37 PM, Jagadish Gangulli (jagadi...@gmail.com)
> wrote:
>
> Thanks Gordon,
>
> Have a few more queries along the same lines: if I have to perform fetches,
> i.e. SELECT queries, do I have to go for the batch APIs, since no streaming
> support is available?
>
> Regards,
> Jagadisha G
>
> On Tue, Sep 26, 2017 at 3:40 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
>
>> Hi Jagadish,
>>
>> Yes, you are right that the Flink Cassandra connector uses the Datastax
>> drivers internally, which is also the case for all the other Flink
>> connectors; e.g., the Kafka connector uses the Kafka Java client,
>> the Elasticsearch connector uses the ES Java client, etc.
>>
>> The main advantage when using these Flink first-class supported
>> connectors is basically the following:
>> - Most importantly, the connectors work with Flink’s checkpointing
>> mechanism to achieve exactly-once or at-least-once guarantees. You can read
>> more about that here [1]; a minimal snippet follows right after this list.
>> - The connectors are built on Flink’s abstractions of streaming sources /
>> sinks. What this means is you can basically swap out / plug-in / add
>> sources or sinks to various external systems without altering the main
>> business logic in your processing pipeline. i.e., also sinking your data to
>> Elasticsearch would be as simple as also adding an Elasticsearch sink to
>> your pipeline output alongside your Cassandra sink.
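>>
>> As a quick sketch of the checkpointing point (the 5-second interval below
>> is arbitrary), enabling it is just a matter of configuring the environment;
>> the sinks you attach to the pipeline then pick up whichever guarantee they
>> support:
>>
>> // CheckpointingMode is org.apache.flink.streaming.api.CheckpointingMode.
>> StreamExecutionEnvironment env =
>>     StreamExecutionEnvironment.getExecutionEnvironment();
>> // Take a checkpoint every 5 seconds with exactly-once semantics for the
>> // pipeline's state; connectors layer their own at-least-once or
>> // exactly-once behaviour on top of these checkpoints.
>> env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);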
>>
>> Hope this clarifies some points for you!
>>
>> Cheers,
>> Gordon
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/stream_checkpointing.html
>>
>> On 26 September 2017 at 11:03:16 AM, Jagadish Gangulli (
>> jagadi...@gmail.com) wrote:
>>
>> Hi,
>>
>> I have recently started application development with Flink. We are trying
>> to use the Apache Flink connectors to get data in and out of Cassandra.
>>
>> We attempted both the DataStax drivers and the Flink Cassandra connectors.
>> In this process we felt that the Flink Cassandra connector is more of a
>> wrapper on top of the DataStax Cassandra drivers.
>>
>> Hence, could someone please explain the benefits of the Flink Cassandra
>> connectors over the DataStax driver APIs? We are looking for the APIs that
>> are better in terms of performance. Please let me know your thoughts.
>>
>> Thanks & Regards,
>> Jagadisha G
>>
>>
>
// FlinkCassandraConnector.java
package flinkConnector;

import java.util.ArrayList;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;


public class FlinkCassandraConnector {

    // Sample payload rows that will be written to Cassandra.
    private static final ArrayList<String> messages = new ArrayList<>();

    static {
        for (long i = 180; i <= 190; i++) {
            messages.add("cassandra-" + i);
        }
    }

    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Wrap each String in a Tuple1 so it matches the single bind parameter of the INSERT query.
        DataStream<Tuple1<String>> stringStream = env.fromCollection(messages)
                .map(new MapFunction<String, Tuple1<String>>() {

                    @Override
                    public Tuple1<String> map(String value) throws Exception {
                        return Tuple1.of(value);
                    }
                });

        try {
            // Shared ClusterBuilder instance, reused by both the sink and the input format below.
            ClusterBuilder clusterObj = ClusterBuilderProvider.getClusterBuilder();

            // Write path: sink the tuples into Cassandra.
            CassandraSink.addSink(stringStream)
                    .setQuery("INSERT INTO test.message (body) values (?);")
                    .setClusterBuilder(clusterObj)
                    .build();

            // Read path: fetch the rows back through the Cassandra input format, reusing the same builder.
            String SELECT_QUERY = "SELECT body FROM test.message;";

            DataStream<Tuple1<String>> inputDS = env.createInput(
                    new CassandraInputFormat<Tuple1<String>>(SELECT_QUERY, clusterObj),
                    TupleTypeInfo.of(new TypeHint<Tuple1<String>>() {}));

            inputDS.print();
            env.execute();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

// ClusterBuilderProvider.java
package flinkConnector;

import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Cluster.Builder;

public class ClusterBuilderProvider {

    // Single shared ClusterBuilder; eager initialization makes the instance
    // safe to hand out from multiple threads without extra synchronization.
    private static final ClusterBuilder clusterBuilder = new ClusterBuilder() {

        @Override
        protected Cluster buildCluster(Builder builder) {
            return builder.addContactPoint("192.168.15.40").build();
        }
    };

    private ClusterBuilderProvider() {
        // Utility class; not meant to be instantiated.
    }

    static ClusterBuilder getClusterBuilder() {
        return clusterBuilder;
    }

}
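
For completeness, this is roughly what I mean on the batch side, reusing the
same provider. The class name and the test.message_copy target table below
are just placeholders for this sketch, not code we actually run:

// FlinkCassandraBatchJob.java (sketch only)
package flinkConnector;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

public class FlinkCassandraBatchJob {

    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Same shared ClusterBuilder as in the streaming job above.
        ClusterBuilder clusterObj = ClusterBuilderProvider.getClusterBuilder();

        // Batch read: run the SELECT through the Cassandra input format.
        DataSet<Tuple1<String>> rows = env.createInput(
                new CassandraInputFormat<Tuple1<String>>("SELECT body FROM test.message;", clusterObj),
                TupleTypeInfo.of(new TypeHint<Tuple1<String>>() {}));

        // Batch write: copy the rows into a second (hypothetical) table.
        rows.output(new CassandraOutputFormat<Tuple1<String>>(
                "INSERT INTO test.message_copy (body) values (?);", clusterObj));

        env.execute("batch job reusing the shared ClusterBuilder");
    }

}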
