[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-10-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4696


---


[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-28 Thread mcfongtw
Github user mcfongtw commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r141785837
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -35,37 +43,47 @@ To use this connector, add the following dependency to 
your project:
 
 {% endhighlight %}
 
-Note that the streaming connectors are currently not part of the binary 
distribution. See how to link with them for cluster execution [here]({{ 
site.baseurl}}/dev/linking.html).
+Note that the streaming connectors are currently __NOT__ part of the 
binary distribution. See how to link with them for cluster execution [here]({{ 
site.baseurl}}/dev/linking.html).
+
+## Installing Apache Cassandra
+There are multiple ways to bring up a Cassandra instance on local machine:
+
+1. Follow the instructions from [Cassandra Getting Started 
page](http://cassandra.apache.org/doc/latest/getting_started/index.html).
+2. Launch a container running Cassandra from [Official Docker 
Repository](https://hub.docker.com/_/cassandra/)
 
- Installing Apache Cassandra
-Follow the instructions from the [Cassandra Getting Started 
page](http://wiki.apache.org/cassandra/GettingStarted).
+## Cassandra Sink
+Flink Cassandra connector currently supports Apache Cassandra as a data 
sink.
 
- Cassandra Sink
+### Configurations
 
 Flink's Cassandra sink are created by using the static 
CassandraSink.addSink(DataStream input) method.
-This method returns a CassandraSinkBuilder, which offers methods to 
further configure the sink.
+This method returns a CassandraSinkBuilder, which offers methods to 
further configure the sink, and finally `build()` the sink instance.
 
 The following configuration methods can be used:
 
-1. setQuery(String query)
-2. setHost(String host[, int port])
-3. setClusterBuilder(ClusterBuilder builder)
-4. enableWriteAheadLog([CheckpointCommitter committer])
-5. build()
+1. _setQuery(String query)_
+* sets the upsert query that is executed for every record the sink 
receives.
+* internally treated as CQL prepared statement, in which parameters 
could be shared or anonymous.
--- End diff --

It is true that we don't need to get to that level of details. I could 
rephrase to what you suggested. 


---


[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-28 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r141615691
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficent to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, Cassandra Sink guarantees at-least-once 
delivery of action requests to C* instance.
 
- Example
+Note:However, current Cassandra Sink implementation 
does not flush the pending mutations  before the checkpoint was triggered. 
Thus, some in-flight mutations might not be replayed when the job recovered. 

+
+More details on [checkpoints docs]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ 
site.baseurl }}/dev/connectors/guarantees.html)
+
+To enable fault tolerant guarantee, checkpointing of the topology needs to 
be enabled at the execution environment:
 
 
 
 {% highlight java %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-@Override
-public Cluster buildCluster(Cluster.Builder builder) {
-  return builder.addContactPoint("127.0.0.1").build();
-}
-  })
-  .build();
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
 {% endhighlight %}
 
 
 {% highlight scala %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-override def buildCluster(builder: Cluster.Builder): Cluster = {
-  builder.addContactPoint("127.0.0.1").build()
-}
-  })
-  .build()
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+{% endhighlight %}
+
+
+
+## Examples
+
+The Cassandra sinks currently support both Java Tuple and POJO data types, 
and Flink automatically detects which type of input is used. For general use 
case of those streaming data type, please refer to [Supported Data Types]({{ 
site.baseurl }}/dev/api_concepts.html). We show two implementations based on 
[SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java),
 for Pojo and Java Tuple data types respectively.
+
+In all these examples, we assumed the associated Keyspace `example` and 
Table `wordcount` have been created.
+
+
+
+{% highlight sql %}
+CREATE KEYSPACE IF NOT EXISTS example
+WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 
'1'};
+CREATE TABLE IF NOT EXISTS example.wordcount (
+word text,
+count bigint,
+PRIMARY KEY(word)
+);
+{% endhighlight %}
+
+
+
+### Cassandra Sink Example for Streaming Java Tuple Data Type
+While storing the result with Java Tuple data type to a Cassandra sink, it 
is required to set a CQL upsert statement (via setQuery('stmt')) to persist 
each record back to database. With the upsert query cached as 
`PreparedStatement`, Cassandra connector internally converts each Tuple 
elements as parameters to the statement.
+
+For details about `PreparedStatement` and `BoundStatement`, please visit 
[DataStax Java Driver 
manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/)
+
+Please note that if the upsert query were not set, an 
`IllegalArgumentException` would be thrown with the following error message 
`Query must not be null or empty.`
+
+
+
+{% highlight java %}
+// get the execution environment
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// get input data by connecting to the socket
+DataStream text = env.socketTextStream(hostname, port, "\n");
+
+// parse the data, group it, window it, and aggregate the counts
+DataStream> result = text
+
+.flatMap(new FlatMapFunction>() {
+@Override
+public void flatMap(String value, Collector> out) {
+// normalize and split the line
+String[] words = value.toLowerCase().split("\\W+");
+
+// emit the pairs
+for 

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-28 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r141611782
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -35,37 +43,47 @@ To use this connector, add the following dependency to 
your project:
 
 {% endhighlight %}
 
-Note that the streaming connectors are currently not part of the binary 
distribution. See how to link with them for cluster execution [here]({{ 
site.baseurl}}/dev/linking.html).
+Note that the streaming connectors are currently __NOT__ part of the 
binary distribution. See how to link with them for cluster execution [here]({{ 
site.baseurl}}/dev/linking.html).
+
+## Installing Apache Cassandra
+There are multiple ways to bring up a Cassandra instance on local machine:
+
+1. Follow the instructions from [Cassandra Getting Started 
page](http://cassandra.apache.org/doc/latest/getting_started/index.html).
+2. Launch a container running Cassandra from [Official Docker 
Repository](https://hub.docker.com/_/cassandra/)
 
- Installing Apache Cassandra
-Follow the instructions from the [Cassandra Getting Started 
page](http://wiki.apache.org/cassandra/GettingStarted).
+## Cassandra Sink
+Flink Cassandra connector currently supports Apache Cassandra as a data 
sink.
 
- Cassandra Sink
+### Configurations
 
 Flink's Cassandra sink are created by using the static 
CassandraSink.addSink(DataStream input) method.
-This method returns a CassandraSinkBuilder, which offers methods to 
further configure the sink.
+This method returns a CassandraSinkBuilder, which offers methods to 
further configure the sink, and finally `build()` the sink instance.
 
 The following configuration methods can be used:
 
-1. setQuery(String query)
-2. setHost(String host[, int port])
-3. setClusterBuilder(ClusterBuilder builder)
-4. enableWriteAheadLog([CheckpointCommitter committer])
-5. build()
+1. _setQuery(String query)_
+* sets the upsert query that is executed for every record the sink 
receives.
--- End diff --

sets -> Sets


---


[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-28 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r141613315
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,77 +96,189 @@ Note that that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficent to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, Cassandra Sink guarantees at-least-once 
delivery of action requests to C* instance.
+
+Note: 
However, current Cassandra Sink implementation does not flush the pending 
mutations before the checkpoint was triggered. Thus, some in-flight mutations 
might not be replayed when the job recovered. 
+
+More details on [checkpoints docs]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ 
site.baseurl }}/dev/connectors/guarantees.html)
 
- Example
+## Examples
+
+The Cassandra sinks currently support both Tuple and POJO data types, and 
Flink automatically detects which type of input is used. For general use case 
of those streaming data type, please refer to [Supported Data Types]({{ 
site.baseurl }}/dev/api_concepts.html). We show two implementations based on 
[SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java),
 for Pojo and Tuple data types respectively.
+
+In all these examples, we assumed the associated Keyspace `example` and 
Table `wordcount` have been created.
+
+
+
+{% highlight sql %}
+CREATE KEYSPACE IF NOT EXISTS example
+WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 
'1'};
+CREATE TABLE IF NOT EXISTS example.wordcount (
+word text,
+count bigint,
+PRIMARY KEY(word)
+);
+{% endhighlight %}
+
+
+
+### Cassandra Sink Example for Streaming Tuple Data Type
+While storing the result with Java/Scala Tuple data type to a Cassandra 
sink, it is required to set a CQL upsert statement (via setQuery('stmt')) to 
persist each record back to database. With the upsert query cached as 
`PreparedStatement`, Cassandra connector internally converts each Tuple 
elements as parameters to the statement.
--- End diff --

-> back to **the** database.

I would reword the second part of second sentence to something like ", each 
Tuple element is converted to parameters of the statement."


---


[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-28 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r141612269
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -35,37 +43,47 @@ To use this connector, add the following dependency to 
your project:
 
 {% endhighlight %}
 
-Note that the streaming connectors are currently not part of the binary 
distribution. See how to link with them for cluster execution [here]({{ 
site.baseurl}}/dev/linking.html).
+Note that the streaming connectors are currently __NOT__ part of the 
binary distribution. See how to link with them for cluster execution [here]({{ 
site.baseurl}}/dev/linking.html).
+
+## Installing Apache Cassandra
+There are multiple ways to bring up a Cassandra instance on local machine:
+
+1. Follow the instructions from [Cassandra Getting Started 
page](http://cassandra.apache.org/doc/latest/getting_started/index.html).
+2. Launch a container running Cassandra from [Official Docker 
Repository](https://hub.docker.com/_/cassandra/)
 
- Installing Apache Cassandra
-Follow the instructions from the [Cassandra Getting Started 
page](http://wiki.apache.org/cassandra/GettingStarted).
+## Cassandra Sink
+Flink Cassandra connector currently supports Apache Cassandra as a data 
sink.
 
- Cassandra Sink
+### Configurations
 
 Flink's Cassandra sink are created by using the static 
CassandraSink.addSink(DataStream input) method.
-This method returns a CassandraSinkBuilder, which offers methods to 
further configure the sink.
+This method returns a CassandraSinkBuilder, which offers methods to 
further configure the sink, and finally `build()` the sink instance.
 
 The following configuration methods can be used:
 
-1. setQuery(String query)
-2. setHost(String host[, int port])
-3. setClusterBuilder(ClusterBuilder builder)
-4. enableWriteAheadLog([CheckpointCommitter committer])
-5. build()
+1. _setQuery(String query)_
+* sets the upsert query that is executed for every record the sink 
receives.
+* internally treated as CQL prepared statement, in which parameters 
could be shared or anonymous.
+* __DO__ set the upsert query for processing __Tuple__ data type
+* __DO NOT__ set the query for processing __POJO__ data type.
+2. _setClusterBuilder()_
+* sets the cluster builder that is used to configure the connection to 
cassandra with more sophisticated settings such as consistency level, retry 
policy and etc.
+3. _setHost(String host[, int port])_
+* simple version of setClusterBuilder() with host/port information to 
connect to Cassandra instances
+4. _enableWriteAheadLog([CheckpointCommitter committer])_
+* an __optional__ setting
+* allows exactly-once processing for non-deterministic algorithms.
+5. _build()_
+* finalizes the configuration and constructs the CassandraSink 
instance.
--- End diff --

finalizes -> Finalizes


---


[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-28 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r141611926
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -35,37 +43,47 @@ To use this connector, add the following dependency to 
your project:
 
 {% endhighlight %}
 
-Note that the streaming connectors are currently not part of the binary 
distribution. See how to link with them for cluster execution [here]({{ 
site.baseurl}}/dev/linking.html).
+Note that the streaming connectors are currently __NOT__ part of the 
binary distribution. See how to link with them for cluster execution [here]({{ 
site.baseurl}}/dev/linking.html).
+
+## Installing Apache Cassandra
+There are multiple ways to bring up a Cassandra instance on local machine:
+
+1. Follow the instructions from [Cassandra Getting Started 
page](http://cassandra.apache.org/doc/latest/getting_started/index.html).
+2. Launch a container running Cassandra from [Official Docker 
Repository](https://hub.docker.com/_/cassandra/)
 
- Installing Apache Cassandra
-Follow the instructions from the [Cassandra Getting Started 
page](http://wiki.apache.org/cassandra/GettingStarted).
+## Cassandra Sink
+Flink Cassandra connector currently supports Apache Cassandra as a data 
sink.
 
- Cassandra Sink
+### Configurations
 
 Flink's Cassandra sink are created by using the static 
CassandraSink.addSink(DataStream input) method.
-This method returns a CassandraSinkBuilder, which offers methods to 
further configure the sink.
+This method returns a CassandraSinkBuilder, which offers methods to 
further configure the sink, and finally `build()` the sink instance.
 
 The following configuration methods can be used:
 
-1. setQuery(String query)
-2. setHost(String host[, int port])
-3. setClusterBuilder(ClusterBuilder builder)
-4. enableWriteAheadLog([CheckpointCommitter committer])
-5. build()
+1. _setQuery(String query)_
+* sets the upsert query that is executed for every record the sink 
receives.
+* internally treated as CQL prepared statement, in which parameters 
could be shared or anonymous.
+* __DO__ set the upsert query for processing __Tuple__ data type
--- End diff --

missing period.


---


[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-28 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r141611417
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -35,37 +43,47 @@ To use this connector, add the following dependency to 
your project:
 
 {% endhighlight %}
 
-Note that the streaming connectors are currently not part of the binary 
distribution. See how to link with them for cluster execution [here]({{ 
site.baseurl}}/dev/linking.html).
+Note that the streaming connectors are currently __NOT__ part of the 
binary distribution. See how to link with them for cluster execution [here]({{ 
site.baseurl}}/dev/linking.html).
+
+## Installing Apache Cassandra
+There are multiple ways to bring up a Cassandra instance on local machine:
+
+1. Follow the instructions from [Cassandra Getting Started 
page](http://cassandra.apache.org/doc/latest/getting_started/index.html).
+2. Launch a container running Cassandra from [Official Docker 
Repository](https://hub.docker.com/_/cassandra/)
 
- Installing Apache Cassandra
-Follow the instructions from the [Cassandra Getting Started 
page](http://wiki.apache.org/cassandra/GettingStarted).
+## Cassandra Sink
+Flink Cassandra connector currently supports Apache Cassandra as a data 
sink.
 
- Cassandra Sink
+### Configurations
 
 Flink's Cassandra sink are created by using the static 
CassandraSink.addSink(DataStream input) method.
--- End diff --

sink -> sinks; remove by (while we're at it)


---


[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-28 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r141611240
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -35,37 +43,47 @@ To use this connector, add the following dependency to 
your project:
 
 {% endhighlight %}
 
-Note that the streaming connectors are currently not part of the binary 
distribution. See how to link with them for cluster execution [here]({{ 
site.baseurl}}/dev/linking.html).
+Note that the streaming connectors are currently __NOT__ part of the 
binary distribution. See how to link with them for cluster execution [here]({{ 
site.baseurl}}/dev/linking.html).
+
+## Installing Apache Cassandra
+There are multiple ways to bring up a Cassandra instance on local machine:
+
+1. Follow the instructions from [Cassandra Getting Started 
page](http://cassandra.apache.org/doc/latest/getting_started/index.html).
+2. Launch a container running Cassandra from [Official Docker 
Repository](https://hub.docker.com/_/cassandra/)
 
- Installing Apache Cassandra
-Follow the instructions from the [Cassandra Getting Started 
page](http://wiki.apache.org/cassandra/GettingStarted).
+## Cassandra Sink
+Flink Cassandra connector currently supports Apache Cassandra as a data 
sink.
--- End diff --

This line is a bit weird, i Would just remove it.


---


[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-28 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r141612087
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -35,37 +43,47 @@ To use this connector, add the following dependency to 
your project:
 
 {% endhighlight %}
 
-Note that the streaming connectors are currently not part of the binary 
distribution. See how to link with them for cluster execution [here]({{ 
site.baseurl}}/dev/linking.html).
+Note that the streaming connectors are currently __NOT__ part of the 
binary distribution. See how to link with them for cluster execution [here]({{ 
site.baseurl}}/dev/linking.html).
+
+## Installing Apache Cassandra
+There are multiple ways to bring up a Cassandra instance on local machine:
+
+1. Follow the instructions from [Cassandra Getting Started 
page](http://cassandra.apache.org/doc/latest/getting_started/index.html).
+2. Launch a container running Cassandra from [Official Docker 
Repository](https://hub.docker.com/_/cassandra/)
 
- Installing Apache Cassandra
-Follow the instructions from the [Cassandra Getting Started 
page](http://wiki.apache.org/cassandra/GettingStarted).
+## Cassandra Sink
+Flink Cassandra connector currently supports Apache Cassandra as a data 
sink.
 
- Cassandra Sink
+### Configurations
 
 Flink's Cassandra sink are created by using the static 
CassandraSink.addSink(DataStream input) method.
-This method returns a CassandraSinkBuilder, which offers methods to 
further configure the sink.
+This method returns a CassandraSinkBuilder, which offers methods to 
further configure the sink, and finally `build()` the sink instance.
 
 The following configuration methods can be used:
 
-1. setQuery(String query)
-2. setHost(String host[, int port])
-3. setClusterBuilder(ClusterBuilder builder)
-4. enableWriteAheadLog([CheckpointCommitter committer])
-5. build()
+1. _setQuery(String query)_
+* sets the upsert query that is executed for every record the sink 
receives.
+* internally treated as CQL prepared statement, in which parameters 
could be shared or anonymous.
+* __DO__ set the upsert query for processing __Tuple__ data type
+* __DO NOT__ set the query for processing __POJO__ data type.
--- End diff --

type -> types


---


[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-28 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r141614919
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,77 +96,189 @@ Note that that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficent to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, Cassandra Sink guarantees at-least-once 
delivery of action requests to C* instance.
+
+Note: 
However, current Cassandra Sink implementation does not flush the pending 
mutations before the checkpoint was triggered. Thus, some in-flight mutations 
might not be replayed when the job recovered. 
+
+More details on [checkpoints docs]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ 
site.baseurl }}/dev/connectors/guarantees.html)
 
- Example
+## Examples
+
+The Cassandra sinks currently support both Tuple and POJO data types, and 
Flink automatically detects which type of input is used. For general use case 
of those streaming data type, please refer to [Supported Data Types]({{ 
site.baseurl }}/dev/api_concepts.html). We show two implementations based on 
[SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java),
 for Pojo and Tuple data types respectively.
+
+In all these examples, we assumed the associated Keyspace `example` and 
Table `wordcount` have been created.
+
+
+
+{% highlight sql %}
+CREATE KEYSPACE IF NOT EXISTS example
+WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 
'1'};
+CREATE TABLE IF NOT EXISTS example.wordcount (
+word text,
+count bigint,
+PRIMARY KEY(word)
+);
+{% endhighlight %}
+
+
+
+### Cassandra Sink Example for Streaming Tuple Data Type
+While storing the result with Java/Scala Tuple data type to a Cassandra 
sink, it is required to set a CQL upsert statement (via setQuery('stmt')) to 
persist each record back to database. With the upsert query cached as 
`PreparedStatement`, Cassandra connector internally converts each Tuple 
elements as parameters to the statement.
+
+For details about `PreparedStatement` and `BoundStatement`, please visit 
[DataStax Java Driver 
manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/)
 
 
 
 {% highlight java %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-@Override
-public Cluster buildCluster(Cluster.Builder builder) {
-  return builder.addContactPoint("127.0.0.1").build();
-}
-  })
-  .build();
+// get the execution environment
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// get input data by connecting to the socket
+DataStream text = env.socketTextStream(hostname, port, "\n");
+
+// parse the data, group it, window it, and aggregate the counts
+DataStream> result = text
+.flatMap(new FlatMapFunction>() {
+@Override
+public void flatMap(String value, Collector> out) {
+// normalize and split the line
+String[] words = value.toLowerCase().split("\\s");
+
+// emit the pairs
+for (String word : words) {
+//Do not accept empty word, since word is defined as 
primary key in C* table
+if (!word.isEmpty()) {
+out.collect(new Tuple2(word, 1L));
+}
+}
+}
+})
+.keyBy(0)
+.timeWindow(Time.seconds(5))
+.sum(1)
+;
+
+CassandraSink.addSink(result)
+.setQuery("INSERT INTO example.wordcount(word, count) values (?, 
?);")
+.setHost("127.0.0.1")
+.build();
 {% endhighlight %}
 
+
 
 {% highlight scala %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-override def buildCluster(builder: Cluster.Builder): Cluster = {
-  builder.addContactPoint("127.0.0.1").build()
-}
-  })
+val env: StreamExecutionEnvironment = 

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-28 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r141611894
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -35,37 +43,47 @@ To use this connector, add the following dependency to 
your project:
 
 {% endhighlight %}
 
-Note that the streaming connectors are currently not part of the binary 
distribution. See how to link with them for cluster execution [here]({{ 
site.baseurl}}/dev/linking.html).
+Note that the streaming connectors are currently __NOT__ part of the 
binary distribution. See how to link with them for cluster execution [here]({{ 
site.baseurl}}/dev/linking.html).
+
+## Installing Apache Cassandra
+There are multiple ways to bring up a Cassandra instance on local machine:
+
+1. Follow the instructions from [Cassandra Getting Started 
page](http://cassandra.apache.org/doc/latest/getting_started/index.html).
+2. Launch a container running Cassandra from [Official Docker 
Repository](https://hub.docker.com/_/cassandra/)
 
- Installing Apache Cassandra
-Follow the instructions from the [Cassandra Getting Started 
page](http://wiki.apache.org/cassandra/GettingStarted).
+## Cassandra Sink
+Flink Cassandra connector currently supports Apache Cassandra as a data 
sink.
 
- Cassandra Sink
+### Configurations
 
 Flink's Cassandra sink are created by using the static 
CassandraSink.addSink(DataStream input) method.
-This method returns a CassandraSinkBuilder, which offers methods to 
further configure the sink.
+This method returns a CassandraSinkBuilder, which offers methods to 
further configure the sink, and finally `build()` the sink instance.
 
 The following configuration methods can be used:
 
-1. setQuery(String query)
-2. setHost(String host[, int port])
-3. setClusterBuilder(ClusterBuilder builder)
-4. enableWriteAheadLog([CheckpointCommitter committer])
-5. build()
+1. _setQuery(String query)_
+* sets the upsert query that is executed for every record the sink 
receives.
+* internally treated as CQL prepared statement, in which parameters 
could be shared or anonymous.
--- End diff --

-> The query is internally treated as a CQL ...


---


[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-28 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r141614161
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,77 +96,189 @@ Note that that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficent to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, Cassandra Sink guarantees at-least-once 
delivery of action requests to C* instance.
+
+Note: 
However, current Cassandra Sink implementation does not flush the pending 
mutations before the checkpoint was triggered. Thus, some in-flight mutations 
might not be replayed when the job recovered. 
+
+More details on [checkpoints docs]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ 
site.baseurl }}/dev/connectors/guarantees.html)
 
- Example
+## Examples
+
+The Cassandra sinks currently support both Tuple and POJO data types, and 
Flink automatically detects which type of input is used. For general use case 
of those streaming data type, please refer to [Supported Data Types]({{ 
site.baseurl }}/dev/api_concepts.html). We show two implementations based on 
[SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java),
 for Pojo and Tuple data types respectively.
+
+In all these examples, we assumed the associated Keyspace `example` and 
Table `wordcount` have been created.
+
+
+
+{% highlight sql %}
+CREATE KEYSPACE IF NOT EXISTS example
+WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 
'1'};
+CREATE TABLE IF NOT EXISTS example.wordcount (
+word text,
+count bigint,
+PRIMARY KEY(word)
+);
+{% endhighlight %}
+
+
+
+### Cassandra Sink Example for Streaming Tuple Data Type
+While storing the result with Java/Scala Tuple data type to a Cassandra 
sink, it is required to set a CQL upsert statement (via setQuery('stmt')) to 
persist each record back to database. With the upsert query cached as 
`PreparedStatement`, Cassandra connector internally converts each Tuple 
elements as parameters to the statement.
+
+For details about `PreparedStatement` and `BoundStatement`, please visit 
[DataStax Java Driver 
manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/)
 
 
 
 {% highlight java %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-@Override
-public Cluster buildCluster(Cluster.Builder builder) {
-  return builder.addContactPoint("127.0.0.1").build();
-}
-  })
-  .build();
+// get the execution environment
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// get input data by connecting to the socket
+DataStream text = env.socketTextStream(hostname, port, "\n");
+
+// parse the data, group it, window it, and aggregate the counts
+DataStream> result = text
+.flatMap(new FlatMapFunction>() {
+@Override
+public void flatMap(String value, Collector> out) {
+// normalize and split the line
+String[] words = value.toLowerCase().split("\\s");
+
+// emit the pairs
+for (String word : words) {
+//Do not accept empty word, since word is defined as 
primary key in C* table
+if (!word.isEmpty()) {
+out.collect(new Tuple2(word, 1L));
+}
+}
+}
+})
+.keyBy(0)
+.timeWindow(Time.seconds(5))
+.sum(1)
+;
--- End diff --

move to previous line


---


[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-28 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r141612126
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -35,37 +43,47 @@ To use this connector, add the following dependency to 
your project:
 
 {% endhighlight %}
 
-Note that the streaming connectors are currently not part of the binary 
distribution. See how to link with them for cluster execution [here]({{ 
site.baseurl}}/dev/linking.html).
+Note that the streaming connectors are currently __NOT__ part of the 
binary distribution. See how to link with them for cluster execution [here]({{ 
site.baseurl}}/dev/linking.html).
+
+## Installing Apache Cassandra
+There are multiple ways to bring up a Cassandra instance on local machine:
+
+1. Follow the instructions from [Cassandra Getting Started 
page](http://cassandra.apache.org/doc/latest/getting_started/index.html).
+2. Launch a container running Cassandra from [Official Docker 
Repository](https://hub.docker.com/_/cassandra/)
 
- Installing Apache Cassandra
-Follow the instructions from the [Cassandra Getting Started 
page](http://wiki.apache.org/cassandra/GettingStarted).
+## Cassandra Sink
+Flink Cassandra connector currently supports Apache Cassandra as a data 
sink.
 
- Cassandra Sink
+### Configurations
 
 Flink's Cassandra sink are created by using the static 
CassandraSink.addSink(DataStream input) method.
-This method returns a CassandraSinkBuilder, which offers methods to 
further configure the sink.
+This method returns a CassandraSinkBuilder, which offers methods to 
further configure the sink, and finally `build()` the sink instance.
 
 The following configuration methods can be used:
 
-1. setQuery(String query)
-2. setHost(String host[, int port])
-3. setClusterBuilder(ClusterBuilder builder)
-4. enableWriteAheadLog([CheckpointCommitter committer])
-5. build()
+1. _setQuery(String query)_
+* sets the upsert query that is executed for every record the sink 
receives.
+* internally treated as CQL prepared statement, in which parameters 
could be shared or anonymous.
+* __DO__ set the upsert query for processing __Tuple__ data type
+* __DO NOT__ set the query for processing __POJO__ data type.
+2. _setClusterBuilder()_
+* sets the cluster builder that is used to configure the connection to 
cassandra with more sophisticated settings such as consistency level, retry 
policy and etc.
--- End diff --

sets -> Sets


---


[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-22 Thread mcfongtw
Github user mcfongtw commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140620313
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficent to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, Cassandra Sink guarantees at-least-once 
delivery of action requests to C* instance.
 
- Example
+Note:However, current Cassandra Sink implementation 
does not flush the pending mutations  before the checkpoint was triggered. 
Thus, some in-flight mutations might not be replayed when the job recovered. 

+
+More details on [checkpoints docs]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ 
site.baseurl }}/dev/connectors/guarantees.html)
+
+To enable fault tolerant guarantee, checkpointing of the topology needs to 
be enabled at the execution environment:
 
 
 
 {% highlight java %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-@Override
-public Cluster buildCluster(Cluster.Builder builder) {
-  return builder.addContactPoint("127.0.0.1").build();
-}
-  })
-  .build();
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
 {% endhighlight %}
 
 
 {% highlight scala %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-override def buildCluster(builder: Cluster.Builder): Cluster = {
-  builder.addContactPoint("127.0.0.1").build()
-}
-  })
-  .build()
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+{% endhighlight %}
+
+
+
+## Examples
+
+The Cassandra sinks currently support both Java Tuple and POJO data types, 
and Flink automatically detects which type of input is used. For general use 
case of those streaming data type, please refer to [Supported Data Types]({{ 
site.baseurl }}/dev/api_concepts.html). We show two implementations based on 
[SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java),
 for Pojo and Java Tuple data types respectively.
+
+In all these examples, we assumed the associated Keyspace `example` and 
Table `wordcount` have been created.
+
+
+
+{% highlight sql %}
+CREATE KEYSPACE IF NOT EXISTS example
+WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 
'1'};
+CREATE TABLE IF NOT EXISTS example.wordcount (
+word text,
+count bigint,
+PRIMARY KEY(word)
+);
+{% endhighlight %}
+
+
+
+### Cassandra Sink Example for Streaming Java Tuple Data Type
+While storing the result with Java Tuple data type to a Cassandra sink, it 
is required to set a CQL upsert statement (via setQuery('stmt')) to persist 
each record back to database. With the upsert query cached as 
`PreparedStatement`, Cassandra connector internally converts each Tuple 
elements as parameters to the statement.
+
+For details about `PreparedStatement` and `BoundStatement`, please visit 
[DataStax Java Driver 
manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/)
+
+Please note that if the upsert query were not set, an 
`IllegalArgumentException` would be thrown with the following error message 
`Query must not be null or empty.`
+
+
+
+{% highlight java %}
+// get the execution environment
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// get input data by connecting to the socket
+DataStream text = env.socketTextStream(hostname, port, "\n");
+
+// parse the data, group it, window it, and aggregate the counts
+DataStream> result = text
+
+.flatMap(new FlatMapFunction>() {
+@Override
+public void flatMap(String value, Collector> out) {
+// normalize and split the line
+String[] words = value.toLowerCase().split("\\W+");
+
+// emit the pairs
+for 

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-22 Thread mcfongtw
Github user mcfongtw commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140620261
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficent to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, Cassandra Sink guarantees at-least-once 
delivery of action requests to C* instance.
 
- Example
+Note:However, current Cassandra Sink implementation 
does not flush the pending mutations  before the checkpoint was triggered. 
Thus, some in-flight mutations might not be replayed when the job recovered. 

+
+More details on [checkpoints docs]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ 
site.baseurl }}/dev/connectors/guarantees.html)
+
+To enable fault tolerant guarantee, checkpointing of the topology needs to 
be enabled at the execution environment:
--- End diff --

Very true. Thanks.


---


[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-22 Thread mcfongtw
Github user mcfongtw commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140620296
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficent to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, Cassandra Sink guarantees at-least-once 
delivery of action requests to C* instance.
 
- Example
+Note:However, current Cassandra Sink implementation 
does not flush the pending mutations  before the checkpoint was triggered. 
Thus, some in-flight mutations might not be replayed when the job recovered. 

+
+More details on [checkpoints docs]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ 
site.baseurl }}/dev/connectors/guarantees.html)
+
+To enable fault tolerant guarantee, checkpointing of the topology needs to 
be enabled at the execution environment:
 
 
 
 {% highlight java %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-@Override
-public Cluster buildCluster(Cluster.Builder builder) {
-  return builder.addContactPoint("127.0.0.1").build();
-}
-  })
-  .build();
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
 {% endhighlight %}
 
 
 {% highlight scala %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-override def buildCluster(builder: Cluster.Builder): Cluster = {
-  builder.addContactPoint("127.0.0.1").build()
-}
-  })
-  .build()
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+{% endhighlight %}
+
+
+
+## Examples
+
+The Cassandra sinks currently support both Java Tuple and POJO data types, 
and Flink automatically detects which type of input is used. For general use 
case of those streaming data type, please refer to [Supported Data Types]({{ 
site.baseurl }}/dev/api_concepts.html). We show two implementations based on 
[SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java),
 for Pojo and Java Tuple data types respectively.
+
+In all these examples, we assumed the associated Keyspace `example` and 
Table `wordcount` have been created.
+
+
+
+{% highlight sql %}
+CREATE KEYSPACE IF NOT EXISTS example
+WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 
'1'};
+CREATE TABLE IF NOT EXISTS example.wordcount (
+word text,
+count bigint,
+PRIMARY KEY(word)
+);
+{% endhighlight %}
+
+
+
+### Cassandra Sink Example for Streaming Java Tuple Data Type
+While storing the result with Java Tuple data type to a Cassandra sink, it 
is required to set a CQL upsert statement (via setQuery('stmt')) to persist 
each record back to database. With the upsert query cached as 
`PreparedStatement`, Cassandra connector internally converts each Tuple 
elements as parameters to the statement.
+
+For details about `PreparedStatement` and `BoundStatement`, please visit 
[DataStax Java Driver 
manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/)
+
+Please note that if the upsert query were not set, an 
`IllegalArgumentException` would be thrown with the following error message 
`Query must not be null or empty.`
+
+
+
+{% highlight java %}
+// get the execution environment
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// get input data by connecting to the socket
+DataStream text = env.socketTextStream(hostname, port, "\n");
+
+// parse the data, group it, window it, and aggregate the counts
+DataStream> result = text
+
+.flatMap(new FlatMapFunction>() {
+@Override
+public void flatMap(String value, Collector> out) {
+// normalize and split the line
+String[] words = value.toLowerCase().split("\\W+");
+
+// emit the pairs
+for 

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-22 Thread mcfongtw
Github user mcfongtw commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140620211
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficent to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, Cassandra Sink guarantees at-least-once 
delivery of action requests to C* instance.
 
- Example
+Note:However, current Cassandra Sink implementation 
does not flush the pending mutations  before the checkpoint was triggered. 
Thus, some in-flight mutations might not be replayed when the job recovered. 

+
+More details on [checkpoints docs]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ 
site.baseurl }}/dev/connectors/guarantees.html)
+
+To enable fault tolerant guarantee, checkpointing of the topology needs to 
be enabled at the execution environment:
 
 
 
 {% highlight java %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-@Override
-public Cluster buildCluster(Cluster.Builder builder) {
-  return builder.addContactPoint("127.0.0.1").build();
-}
-  })
-  .build();
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
 {% endhighlight %}
 
 
 {% highlight scala %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-override def buildCluster(builder: Cluster.Builder): Cluster = {
-  builder.addContactPoint("127.0.0.1").build()
-}
-  })
-  .build()
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+{% endhighlight %}
+
+
+
+## Examples
+
+The Cassandra sinks currently support both Java Tuple and POJO data types, 
and Flink automatically detects which type of input is used. For general use 
case of those streaming data type, please refer to [Supported Data Types]({{ 
site.baseurl }}/dev/api_concepts.html). We show two implementations based on 
[SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java),
 for Pojo and Java Tuple data types respectively.
+
+In all these examples, we assumed the associated Keyspace `example` and 
Table `wordcount` have been created.
+
+
+
+{% highlight sql %}
+CREATE KEYSPACE IF NOT EXISTS example
+WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 
'1'};
+CREATE TABLE IF NOT EXISTS example.wordcount (
+word text,
+count bigint,
+PRIMARY KEY(word)
+);
+{% endhighlight %}
+
+
+
+### Cassandra Sink Example for Streaming Java Tuple Data Type
+While storing the result with Java Tuple data type to a Cassandra sink, it 
is required to set a CQL upsert statement (via setQuery('stmt')) to persist 
each record back to database. With the upsert query cached as 
`PreparedStatement`, Cassandra connector internally converts each Tuple 
elements as parameters to the statement.
+
+For details about `PreparedStatement` and `BoundStatement`, please visit 
[DataStax Java Driver 
manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/)
+
+Please note that if the upsert query were not set, an 
`IllegalArgumentException` would be thrown with the following error message 
`Query must not be null or empty.`
+
+
+
+{% highlight java %}
+// get the execution environment
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// get input data by connecting to the socket
+DataStream text = env.socketTextStream(hostname, port, "\n");
+
+// parse the data, group it, window it, and aggregate the counts
+DataStream> result = text
+
+.flatMap(new FlatMapFunction>() {
+@Override
+public void flatMap(String value, Collector> out) {
+// normalize and split the line
+String[] words = value.toLowerCase().split("\\W+");
+
+// emit the pairs
+for 

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-22 Thread mcfongtw
Github user mcfongtw commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140620229
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficent to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, Cassandra Sink guarantees at-least-once 
delivery of action requests to C* instance.
 
- Example
+Note:However, current Cassandra Sink implementation 
does not flush the pending mutations  before the checkpoint was triggered. 
Thus, some in-flight mutations might not be replayed when the job recovered. 

--- End diff --

Good eye. Thanks


---


[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140518123
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficent to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, Cassandra Sink guarantees at-least-once 
delivery of action requests to C* instance.
 
- Example
+Note:However, current Cassandra Sink implementation 
does not flush the pending mutations  before the checkpoint was triggered. 
Thus, some in-flight mutations might not be replayed when the job recovered. 

+
+More details on [checkpoints docs]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ 
site.baseurl }}/dev/connectors/guarantees.html)
+
+To enable fault tolerant guarantee, checkpointing of the topology needs to 
be enabled at the execution environment:
 
 
 
 {% highlight java %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-@Override
-public Cluster buildCluster(Cluster.Builder builder) {
-  return builder.addContactPoint("127.0.0.1").build();
-}
-  })
-  .build();
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
 {% endhighlight %}
 
 
 {% highlight scala %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-override def buildCluster(builder: Cluster.Builder): Cluster = {
-  builder.addContactPoint("127.0.0.1").build()
-}
-  })
-  .build()
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+{% endhighlight %}
+
+
+
+## Examples
+
+The Cassandra sinks currently support both Java Tuple and POJO data types, 
and Flink automatically detects which type of input is used. For general use 
case of those streaming data type, please refer to [Supported Data Types]({{ 
site.baseurl }}/dev/api_concepts.html). We show two implementations based on 
[SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java),
 for Pojo and Java Tuple data types respectively.
+
+In all these examples, we assumed the associated Keyspace `example` and 
Table `wordcount` have been created.
+
+
+
+{% highlight sql %}
+CREATE KEYSPACE IF NOT EXISTS example
+WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 
'1'};
+CREATE TABLE IF NOT EXISTS example.wordcount (
+word text,
+count bigint,
+PRIMARY KEY(word)
+);
+{% endhighlight %}
+
+
+
+### Cassandra Sink Example for Streaming Java Tuple Data Type
+While storing the result with Java Tuple data type to a Cassandra sink, it 
is required to set a CQL upsert statement (via setQuery('stmt')) to persist 
each record back to database. With the upsert query cached as 
`PreparedStatement`, Cassandra connector internally converts each Tuple 
elements as parameters to the statement.
+
+For details about `PreparedStatement` and `BoundStatement`, please visit 
[DataStax Java Driver 
manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/)
+
+Please note that if the upsert query were not set, an 
`IllegalArgumentException` would be thrown with the following error message 
`Query must not be null or empty.`
+
+
+
+{% highlight java %}
+// get the execution environment
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// get input data by connecting to the socket
+DataStream text = env.socketTextStream(hostname, port, "\n");
+
+// parse the data, group it, window it, and aggregate the counts
+DataStream> result = text
+
+.flatMap(new FlatMapFunction>() {
+@Override
+public void flatMap(String value, Collector> out) {
+// normalize and split the line
+String[] words = value.toLowerCase().split("\\W+");
+
+// emit the pairs
+for 

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140517823
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficent to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, Cassandra Sink guarantees at-least-once 
delivery of action requests to C* instance.
 
- Example
+Note:However, current Cassandra Sink implementation 
does not flush the pending mutations  before the checkpoint was triggered. 
Thus, some in-flight mutations might not be replayed when the job recovered. 

+
+More details on [checkpoints docs]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ 
site.baseurl }}/dev/connectors/guarantees.html)
+
+To enable fault tolerant guarantee, checkpointing of the topology needs to 
be enabled at the execution environment:
 
 
 
 {% highlight java %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-@Override
-public Cluster buildCluster(Cluster.Builder builder) {
-  return builder.addContactPoint("127.0.0.1").build();
-}
-  })
-  .build();
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
 {% endhighlight %}
 
 
 {% highlight scala %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-override def buildCluster(builder: Cluster.Builder): Cluster = {
-  builder.addContactPoint("127.0.0.1").build()
-}
-  })
-  .build()
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+{% endhighlight %}
+
+
+
+## Examples
+
+The Cassandra sinks currently support both Java Tuple and POJO data types, 
and Flink automatically detects which type of input is used. For general use 
case of those streaming data type, please refer to [Supported Data Types]({{ 
site.baseurl }}/dev/api_concepts.html). We show two implementations based on 
[SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java),
 for Pojo and Java Tuple data types respectively.
+
+In all these examples, we assumed the associated Keyspace `example` and 
Table `wordcount` have been created.
+
+
+
+{% highlight sql %}
+CREATE KEYSPACE IF NOT EXISTS example
+WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 
'1'};
+CREATE TABLE IF NOT EXISTS example.wordcount (
+word text,
+count bigint,
+PRIMARY KEY(word)
+);
+{% endhighlight %}
+
+
+
+### Cassandra Sink Example for Streaming Java Tuple Data Type
+While storing the result with Java Tuple data type to a Cassandra sink, it 
is required to set a CQL upsert statement (via setQuery('stmt')) to persist 
each record back to database. With the upsert query cached as 
`PreparedStatement`, Cassandra connector internally converts each Tuple 
elements as parameters to the statement.
+
+For details about `PreparedStatement` and `BoundStatement`, please visit 
[DataStax Java Driver 
manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/)
+
+Please note that if the upsert query were not set, an 
`IllegalArgumentException` would be thrown with the following error message 
`Query must not be null or empty.`
+
+
+
+{% highlight java %}
+// get the execution environment
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// get input data by connecting to the socket
+DataStream text = env.socketTextStream(hostname, port, "\n");
+
+// parse the data, group it, window it, and aggregate the counts
+DataStream> result = text
+
+.flatMap(new FlatMapFunction>() {
+@Override
+public void flatMap(String value, Collector> out) {
+// normalize and split the line
+String[] words = value.toLowerCase().split("\\W+");
+
+// emit the pairs
+for 

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140517182
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficent to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, Cassandra Sink guarantees at-least-once 
delivery of action requests to C* instance.
 
- Example
+Note:However, current Cassandra Sink implementation 
does not flush the pending mutations  before the checkpoint was triggered. 
Thus, some in-flight mutations might not be replayed when the job recovered. 

+
+More details on [checkpoints docs]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ 
site.baseurl }}/dev/connectors/guarantees.html)
+
+To enable fault tolerant guarantee, checkpointing of the topology needs to 
be enabled at the execution environment:
 
 
 
 {% highlight java %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-@Override
-public Cluster buildCluster(Cluster.Builder builder) {
-  return builder.addContactPoint("127.0.0.1").build();
-}
-  })
-  .build();
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
 {% endhighlight %}
 
 
 {% highlight scala %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-override def buildCluster(builder: Cluster.Builder): Cluster = {
-  builder.addContactPoint("127.0.0.1").build()
-}
-  })
-  .build()
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+{% endhighlight %}
+
+
+
+## Examples
+
+The Cassandra sinks currently support both Java Tuple and POJO data types, 
and Flink automatically detects which type of input is used. For general use 
case of those streaming data type, please refer to [Supported Data Types]({{ 
site.baseurl }}/dev/api_concepts.html). We show two implementations based on 
[SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java),
 for Pojo and Java Tuple data types respectively.
+
+In all these examples, we assumed the associated Keyspace `example` and 
Table `wordcount` have been created.
+
+
+
+{% highlight sql %}
+CREATE KEYSPACE IF NOT EXISTS example
+WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 
'1'};
+CREATE TABLE IF NOT EXISTS example.wordcount (
+word text,
+count bigint,
+PRIMARY KEY(word)
+);
+{% endhighlight %}
+
+
+
+### Cassandra Sink Example for Streaming Java Tuple Data Type
+While storing the result with Java Tuple data type to a Cassandra sink, it 
is required to set a CQL upsert statement (via setQuery('stmt')) to persist 
each record back to database. With the upsert query cached as 
`PreparedStatement`, Cassandra connector internally converts each Tuple 
elements as parameters to the statement.
+
+For details about `PreparedStatement` and `BoundStatement`, please visit 
[DataStax Java Driver 
manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/)
+
+Please note that if the upsert query were not set, an 
`IllegalArgumentException` would be thrown with the following error message 
`Query must not be null or empty.`
--- End diff --

I would leave this out. Its one of those things that are easily out-dated, 
and don't provide immediate value when reading the docs for the first time. If 
a user stumbles upon this the error message should be self-explanatory; if it 
isn't we should change it accordingly.


---


[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140516418
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficent to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, Cassandra Sink guarantees at-least-once 
delivery of action requests to C* instance.
 
- Example
+Note:However, current Cassandra Sink implementation 
does not flush the pending mutations  before the checkpoint was triggered. 
Thus, some in-flight mutations might not be replayed when the job recovered. 

+
+More details on [checkpoints docs]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ 
site.baseurl }}/dev/connectors/guarantees.html)
+
+To enable fault tolerant guarantee, checkpointing of the topology needs to 
be enabled at the execution environment:
--- End diff --

imo this is a bit redundant, we are already linking to the checkpoint docs 
after all.


---


[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140517696
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficent to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, Cassandra Sink guarantees at-least-once 
delivery of action requests to C* instance.
 
- Example
+Note:However, current Cassandra Sink implementation 
does not flush the pending mutations  before the checkpoint was triggered. 
Thus, some in-flight mutations might not be replayed when the job recovered. 

+
+More details on [checkpoints docs]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ 
site.baseurl }}/dev/connectors/guarantees.html)
+
+To enable fault tolerant guarantee, checkpointing of the topology needs to 
be enabled at the execution environment:
 
 
 
 {% highlight java %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-@Override
-public Cluster buildCluster(Cluster.Builder builder) {
-  return builder.addContactPoint("127.0.0.1").build();
-}
-  })
-  .build();
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
 {% endhighlight %}
 
 
 {% highlight scala %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-override def buildCluster(builder: Cluster.Builder): Cluster = {
-  builder.addContactPoint("127.0.0.1").build()
-}
-  })
-  .build()
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+{% endhighlight %}
+
+
+
+## Examples
+
+The Cassandra sinks currently support both Java Tuple and POJO data types, 
and Flink automatically detects which type of input is used. For general use 
case of those streaming data type, please refer to [Supported Data Types]({{ 
site.baseurl }}/dev/api_concepts.html). We show two implementations based on 
[SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java),
 for Pojo and Java Tuple data types respectively.
+
+In all these examples, we assumed the associated Keyspace `example` and 
Table `wordcount` have been created.
+
+
+
+{% highlight sql %}
+CREATE KEYSPACE IF NOT EXISTS example
+WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 
'1'};
+CREATE TABLE IF NOT EXISTS example.wordcount (
+word text,
+count bigint,
+PRIMARY KEY(word)
+);
+{% endhighlight %}
+
+
+
+### Cassandra Sink Example for Streaming Java Tuple Data Type
+While storing the result with Java Tuple data type to a Cassandra sink, it 
is required to set a CQL upsert statement (via setQuery('stmt')) to persist 
each record back to database. With the upsert query cached as 
`PreparedStatement`, Cassandra connector internally converts each Tuple 
elements as parameters to the statement.
+
+For details about `PreparedStatement` and `BoundStatement`, please visit 
[DataStax Java Driver 
manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/)
+
+Please note that if the upsert query were not set, an 
`IllegalArgumentException` would be thrown with the following error message 
`Query must not be null or empty.`
+
+
+
+{% highlight java %}
+// get the execution environment
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// get input data by connecting to the socket
+DataStream text = env.socketTextStream(hostname, port, "\n");
+
+// parse the data, group it, window it, and aggregate the counts
+DataStream> result = text
+
+.flatMap(new FlatMapFunction>() {
+@Override
+public void flatMap(String value, Collector> out) {
+// normalize and split the line
+String[] words = value.toLowerCase().split("\\W+");
+
+// emit the pairs
+for 

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140516135
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficent to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, Cassandra Sink guarantees at-least-once 
delivery of action requests to C* instance.
 
- Example
+Note:However, current Cassandra Sink implementation 
does not flush the pending mutations  before the checkpoint was triggered. 
Thus, some in-flight mutations might not be replayed when the job recovered. 

--- End diff --

theres a double space after mutations


---


[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140517986
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficent to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, Cassandra Sink guarantees at-least-once 
delivery of action requests to C* instance.
 
- Example
+Note:However, current Cassandra Sink implementation 
does not flush the pending mutations  before the checkpoint was triggered. 
Thus, some in-flight mutations might not be replayed when the job recovered. 

+
+More details on [checkpoints docs]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ 
site.baseurl }}/dev/connectors/guarantees.html)
+
+To enable fault tolerant guarantee, checkpointing of the topology needs to 
be enabled at the execution environment:
 
 
 
 {% highlight java %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-@Override
-public Cluster buildCluster(Cluster.Builder builder) {
-  return builder.addContactPoint("127.0.0.1").build();
-}
-  })
-  .build();
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
 {% endhighlight %}
 
 
 {% highlight scala %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-override def buildCluster(builder: Cluster.Builder): Cluster = {
-  builder.addContactPoint("127.0.0.1").build()
-}
-  })
-  .build()
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+{% endhighlight %}
+
+
+
+## Examples
+
+The Cassandra sinks currently support both Java Tuple and POJO data types, 
and Flink automatically detects which type of input is used. For general use 
case of those streaming data type, please refer to [Supported Data Types]({{ 
site.baseurl }}/dev/api_concepts.html). We show two implementations based on 
[SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java),
 for Pojo and Java Tuple data types respectively.
+
+In all these examples, we assumed the associated Keyspace `example` and 
Table `wordcount` have been created.
+
+
+
+{% highlight sql %}
+CREATE KEYSPACE IF NOT EXISTS example
+WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 
'1'};
+CREATE TABLE IF NOT EXISTS example.wordcount (
+word text,
+count bigint,
+PRIMARY KEY(word)
+);
+{% endhighlight %}
+
+
+
+### Cassandra Sink Example for Streaming Java Tuple Data Type
+While storing the result with Java Tuple data type to a Cassandra sink, it 
is required to set a CQL upsert statement (via setQuery('stmt')) to persist 
each record back to database. With the upsert query cached as 
`PreparedStatement`, Cassandra connector internally converts each Tuple 
elements as parameters to the statement.
+
+For details about `PreparedStatement` and `BoundStatement`, please visit 
[DataStax Java Driver 
manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/)
+
+Please note that if the upsert query were not set, an 
`IllegalArgumentException` would be thrown with the following error message 
`Query must not be null or empty.`
+
+
+
+{% highlight java %}
+// get the execution environment
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// get input data by connecting to the socket
+DataStream text = env.socketTextStream(hostname, port, "\n");
+
+// parse the data, group it, window it, and aggregate the counts
+DataStream> result = text
+
+.flatMap(new FlatMapFunction>() {
+@Override
+public void flatMap(String value, Collector> out) {
+// normalize and split the line
+String[] words = value.toLowerCase().split("\\W+");
+
+// emit the pairs
+for 

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-21 Thread mcfongtw
GitHub user mcfongtw opened a pull request:

https://github.com/apache/flink/pull/4696

[FLINK-7632] [document] Overhaul on Cassandra connector doc

## What is the purpose of the change
Refactor Cassandra connector documentation by providing 
- in-depth information about Cassandra Pojo sink and Cassandra Tuple sink.
- meaningful examples to show distinction between streaming over Java Tuple 
and Pojo data types.

## Brief change log
- Refactor the current usage of on Cassandra Sink w/ more in-depth 
information.
- Provides examples for Pojo and Java Tuple data types


## Verifying this change
Have tested w/ local doc server (w/ docker setup)

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive):no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no

## Documentation
  - Does this pull request introduce a new feature? no



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mcfongtw/flink FLINK-7632

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4696.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4696






---