[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...

2018-02-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5272


---


[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...

2018-02-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5272#discussion_r168343968
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
 ---
@@ -95,15 +94,14 @@ public void writeRecord(OUT record) throws IOException {
             throw new IOException("write record failed", exception);
         }
 
-        Object[] fields = new Object[record.getArity()];
-        for (int i = 0; i < record.getArity(); i++) {
-            fields[i] = record.getField(i);
-        }
+        Object[] fields = extractFields(record);
         ResultSetFuture result = session.executeAsync(prepared.bind(fields));
         Futures.addCallback(result, callback);
     }
 
-    /**
+    protected abstract Object[] extractFields(OUT record);
--- End diff --

That's a good point! I agree, let's keep it as it is.


---


[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...

2018-02-14 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5272#discussion_r168338536
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
 ---
@@ -95,15 +94,14 @@ public void writeRecord(OUT record) throws IOException {
             throw new IOException("write record failed", exception);
         }
 
-        Object[] fields = new Object[record.getArity()];
-        for (int i = 0; i < record.getArity(); i++) {
-            fields[i] = record.getField(i);
-        }
+        Object[] fields = extractFields(record);
         ResultSetFuture result = session.executeAsync(prepared.bind(fields));
         Futures.addCallback(result, callback);
     }
 
-    /**
+    protected abstract Object[] extractFields(OUT record);
--- End diff --

I am hesitant to make this change. The Cassandra code does not document what 
PreparedStatement.bind does with the input fields. The current implementation 
serializes the field values and does not keep a reference, so it is OK to 
reuse the fields array across invocations even when executeAsync is used. I 
would still hesitate to rely on that, because it is not stated in the 
Cassandra client documentation and so might be subject to future changes. I 
agree that performance is a concern here. One way to improve it is to add a 
getFields() method to Row, so that CassandraRowOutputFormat can reuse the 
Row's own field array instead of creating a new Object[].
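
To illustrate the concern, here is a minimal sketch of why reusing one array 
is only safe when the consumer copies the values before the next write. The 
class and method names are hypothetical stand-ins and do not model the 
Cassandra driver API; whether the real driver keeps a reference is exactly the 
undocumented detail discussed above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for an asynchronous client; not the Cassandra driver API.
final class FieldReuseDemo {

    /** Queues the caller's array itself, as a client that keeps a reference would. */
    static List<Object[]> queueByReference(Object[][] records) {
        List<Object[]> pending = new ArrayList<>();
        Object[] reused = new Object[records[0].length];
        for (Object[] record : records) {
            System.arraycopy(record, 0, reused, 0, reused.length);
            pending.add(reused); // every queued entry aliases the same array
        }
        return pending;
    }

    /** Queues a snapshot of the values, as a client that serializes eagerly would. */
    static List<Object[]> queueByCopy(Object[][] records) {
        List<Object[]> pending = new ArrayList<>();
        Object[] reused = new Object[records[0].length];
        for (Object[] record : records) {
            System.arraycopy(record, 0, reused, 0, reused.length);
            // Snapshot taken before the next record overwrites the reused array.
            pending.add(Arrays.copyOf(reused, reused.length));
        }
        return pending;
    }

    public static void main(String[] args) {
        Object[][] records = {{"a", 1}, {"b", 2}};
        // Aliased entries all show the last record: [[b, 2], [b, 2]]
        System.out.println(Arrays.deepToString(queueByReference(records).toArray()));
        // Copied entries keep their own values: [[a, 1], [b, 2]]
        System.out.println(Arrays.deepToString(queueByCopy(records).toArray()));
    }
}
```

If the client behaved like queueByReference, a reused array would silently 
corrupt writes that are still in flight, which is why allocating per record is 
the conservative choice.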


---


[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...

2018-02-14 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5272#discussion_r168330940
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraTupleOutputFormat.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+/**
+ * OutputFormat to write Flink {@link Tuple}s into a Cassandra cluster.
+ *
+ * @param  Type of elements to write, it must extend {@link Tuple}.
+ */
+public class CassandraTupleOutputFormat extends 
CassandraOutputFormat {
--- End diff --

done


---


[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...

2018-02-14 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5272#discussion_r168331011
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
 ---
@@ -37,11 +36,11 @@
 import java.io.IOException;
 
 /**
- * OutputFormat to write {@link org.apache.flink.api.java.tuple.Tuple} 
into Apache Cassandra.
+ * CassandraOutputFormat is the common abstract class for writing into 
Apache Cassandra.
  *
- * @param  type of Tuple
+ * @param  Type of the elements to write.
  */
-public class CassandraOutputFormat extends 
RichOutputFormat {
+public abstract class CassandraOutputFormat extends 
RichOutputFormat {
--- End diff --

done


---


[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...

2018-02-14 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5272#discussion_r168330917
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraRowOutputFormat.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra;
+
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.types.Row;
+
+/**
+ * OutputFormat to write Flink {@link Row}s into a Cassandra cluster.
+ *
+ * @param  Type of elements to write, it must extend {@link Row}.
+ */
+public class CassandraRowOutputFormat extends 
CassandraOutputFormat {
--- End diff --

done


---


[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...

2018-02-14 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5272#discussion_r168330890
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ---
@@ -457,8 +458,8 @@ public void testCassandraTableSink() throws Exception {
}
 
@Test
-   public void testCassandraBatchFormats() throws Exception {
-   OutputFormat> sink = new 
CassandraOutputFormat<>(injectTableName(INSERT_DATA_QUERY), builder);
+   public void testCassandraBatchTupleFormats() throws Exception {
--- End diff --

done


---


[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...

2018-02-14 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5272#discussion_r168330869
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ---
@@ -482,6 +483,23 @@ public void testCassandraBatchFormats() throws 
Exception {
Assert.assertEquals(20, result.size());
}
 
+   @Test
+   public void testCassandraBatchRowFormats() throws Exception {
--- End diff --

done


---


[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...

2018-02-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5272#discussion_r168176395
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraRowOutputFormat.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra;
+
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.types.Row;
+
+/**
+ * OutputFormat to write Flink {@link Row}s into a Cassandra cluster.
+ *
+ * @param  Type of elements to write, it must extend {@link Row}.
+ */
+public class CassandraRowOutputFormat extends 
CassandraOutputFormat {
--- End diff --

I don't think we have to handle subclasses of `Row`. Change `` to ``?


---


[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...

2018-02-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5272#discussion_r168169778
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraTupleOutputFormat.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+/**
+ * OutputFormat to write Flink {@link Tuple}s into a Cassandra cluster.
+ *
+ * @param  Type of elements to write, it must extend {@link Tuple}.
+ */
+public class CassandraTupleOutputFormat extends 
CassandraOutputFormat {
--- End diff --

Create another class `CassandraOutputFormat` that extends this class and 
doesn't override anything.
That new class exists only for backwards compatibility and should be 
deprecated.
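
A minimal sketch of that pattern, using simplified stand-in classes. The names 
`TupleFormatSketch` and `LegacyFormatSketch` are hypothetical, and the 
constructor is reduced to a single query string for illustration.

```java
// Simplified stand-in for the new, preferred class.
class TupleFormatSketch<OUT> {
    private final String insertQuery;

    TupleFormatSketch(String insertQuery) {
        this.insertQuery = insertQuery;
    }

    String getInsertQuery() {
        return insertQuery;
    }
}

/**
 * Keeps the old class name available so existing user code still compiles.
 *
 * @deprecated use {@link TupleFormatSketch} instead.
 */
@Deprecated
class LegacyFormatSketch<OUT> extends TupleFormatSketch<OUT> {
    // Only forwards the constructor; no behavior is overridden.
    LegacyFormatSketch(String insertQuery) {
        super(insertQuery);
    }
}

final class DeprecationDemo {
    public static void main(String[] args) {
        // Old call sites keep working and now produce a deprecation warning.
        TupleFormatSketch<String> format = new LegacyFormatSketch<>("INSERT INTO t (a) VALUES (?)");
        System.out.println(format.getInsertQuery());
    }
}
```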



---


[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...

2018-02-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5272#discussion_r168177375
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ---
@@ -482,6 +483,23 @@ public void testCassandraBatchFormats() throws 
Exception {
Assert.assertEquals(20, result.size());
}
 
+   @Test
+   public void testCassandraBatchRowFormats() throws Exception {
--- End diff --

rename to `testCassandraBatchRowFormat` (-s)


---


[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...

2018-02-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5272#discussion_r168177338
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ---
@@ -457,8 +458,8 @@ public void testCassandraTableSink() throws Exception {
}
 
@Test
-   public void testCassandraBatchFormats() throws Exception {
-   OutputFormat> sink = new 
CassandraOutputFormat<>(injectTableName(INSERT_DATA_QUERY), builder);
+   public void testCassandraBatchTupleFormats() throws Exception {
--- End diff --

rename to `testCassandraBatchTupleFormat` (-s)


---


[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...

2018-02-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5272#discussion_r168169354
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
 ---
@@ -37,11 +36,11 @@
 import java.io.IOException;
 
 /**
- * OutputFormat to write {@link org.apache.flink.api.java.tuple.Tuple} 
into Apache Cassandra.
+ * CassandraOutputFormat is the common abstract class for writing into 
Apache Cassandra.
  *
- * @param  type of Tuple
+ * @param  Type of the elements to write.
  */
-public class CassandraOutputFormat extends 
RichOutputFormat {
+public abstract class CassandraOutputFormat extends 
RichOutputFormat {
--- End diff --

Rename to `CassandraOutputFormatBase`


---


[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...

2018-02-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5272#discussion_r168175822
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
 ---
@@ -95,15 +94,14 @@ public void writeRecord(OUT record) throws IOException {
             throw new IOException("write record failed", exception);
         }
 
-        Object[] fields = new Object[record.getArity()];
-        for (int i = 0; i < record.getArity(); i++) {
-            fields[i] = record.getField(i);
-        }
+        Object[] fields = extractFields(record);
         ResultSetFuture result = session.executeAsync(prepared.bind(fields));
         Futures.addCallback(result, callback);
     }
 
-    /**
+    protected abstract Object[] extractFields(OUT record);
--- End diff --

Add an `Object[] fields` parameter that can be reused across invocations of 
`extractFields()`.
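
A rough sketch of what such a signature could look like. All names here are 
hypothetical, and a `List<Object>` stands in for the record type so the 
example compiles without Flink. Reuse like this is only safe if the consumer 
of the array does not keep a reference to it.

```java
import java.util.Arrays;
import java.util.List;

abstract class ReusingFormatSketch<OUT> {
    // Array handed back to extractFields() on every call.
    private Object[] reusable = new Object[0];

    /** Fills {@code reuse} when it has the right length, otherwise returns a new array. */
    protected abstract Object[] extractFields(OUT record, Object[] reuse);

    final Object[] prepare(OUT record) {
        reusable = extractFields(record, reusable);
        return reusable;
    }
}

final class ListFormatSketch extends ReusingFormatSketch<List<Object>> {
    @Override
    protected Object[] extractFields(List<Object> record, Object[] reuse) {
        Object[] target = reuse.length == record.size() ? reuse : new Object[record.size()];
        for (int i = 0; i < target.length; i++) {
            target[i] = record.get(i);
        }
        return target;
    }
}

final class ReuseDemo {
    public static void main(String[] args) {
        ListFormatSketch format = new ListFormatSketch();
        Object[] first = format.prepare(Arrays.<Object>asList("a", 1));
        Object[] second = format.prepare(Arrays.<Object>asList("b", 2));
        // The second call overwrote the array returned by the first call.
        System.out.println(first == second);
        System.out.println(Arrays.toString(first));
    }
}
```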


---


[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...

2018-01-10 Thread suez1224
GitHub user suez1224 opened a pull request:

https://github.com/apache/flink/pull/5272

[Flink-8397][Connectors]Support Row type for Cassandra OutputFormat


## Brief change log
  - Add CassandraOutputFormatBase
  - Add CassandraRowOutputFormat
  - Add CassandraTupleOutputFormat
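
The resulting class layout can be sketched roughly as follows. This is not 
the code from the pull request: `TupleStandIn` and `RowStandIn` are 
hypothetical replacements for Flink's `Tuple` and `Row`, and `describe()` 
stands in for `writeRecord()`, which in the real class binds the extracted 
fields to a prepared statement.

```java
import java.util.Arrays;

// Hypothetical stand-ins; only the two accessors used below are modelled.
final class TupleStandIn {
    private final Object[] values;
    TupleStandIn(Object... values) { this.values = values.clone(); }
    int getArity() { return values.length; }
    Object getField(int pos) { return values[pos]; }
}

final class RowStandIn {
    private final Object[] values;
    RowStandIn(Object... values) { this.values = values.clone(); }
    int getArity() { return values.length; }
    Object getField(int pos) { return values[pos]; }
}

/** Shared logic lives in the base class; subclasses only extract the fields. */
abstract class OutputFormatBaseSketch<OUT> {
    protected abstract Object[] extractFields(OUT record);

    // Stand-in for writeRecord(): here the fields are only rendered as text.
    final String describe(OUT record) {
        return Arrays.toString(extractFields(record));
    }
}

final class TupleOutputFormatSketch extends OutputFormatBaseSketch<TupleStandIn> {
    @Override
    protected Object[] extractFields(TupleStandIn record) {
        Object[] fields = new Object[record.getArity()];
        for (int i = 0; i < fields.length; i++) {
            fields[i] = record.getField(i);
        }
        return fields;
    }
}

final class RowOutputFormatSketch extends OutputFormatBaseSketch<RowStandIn> {
    @Override
    protected Object[] extractFields(RowStandIn record) {
        Object[] fields = new Object[record.getArity()];
        for (int i = 0; i < fields.length; i++) {
            fields[i] = record.getField(i);
        }
        return fields;
    }
}

final class HierarchyDemo {
    public static void main(String[] args) {
        System.out.println(new TupleOutputFormatSketch().describe(new TupleStandIn("k", 1)));
        System.out.println(new RowOutputFormatSketch().describe(new RowStandIn("k", 2)));
    }
}
```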


## Verifying this change

This change adds tests and can be verified by running 
CassandraConnectorITCase.{testCassandraBatchTupleFormats, 
testCassandraBatchRowFormats}

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: ( / )
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/suez1224/flink FLINK-8397

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5272.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5272


commit 9a42ee7e18d043b06c4d28a6dc54c0ace7fae6c2
Author: Shuyi Chen 
Date:   2018-01-10T01:40:41Z

Support Row type for Cassandra OutputFormat




---