[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5272 ---
[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5272#discussion_r168343968

--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java ---
@@ -95,15 +94,14 @@ public void writeRecord(OUT record) throws IOException {
 			throw new IOException("write record failed", exception);
 		}

-		Object[] fields = new Object[record.getArity()];
-		for (int i = 0; i < record.getArity(); i++) {
-			fields[i] = record.getField(i);
-		}
+		Object[] fields = extractFields(record);

 		ResultSetFuture result = session.executeAsync(prepared.bind(fields));
 		Futures.addCallback(result, callback);
 	}

-	/**
+	protected abstract Object[] extractFields(OUT record);
--- End diff --

That's a good point! I agree, let's keep it as it is.

---
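The `extractFields` hook settled on in this thread can be sketched with plain-Java stand-ins (the `Row`, `OutputFormatBase`, and `RowOutputFormat` names below are simplified stand-ins for illustration, not the actual Flink classes):

```java
// Stand-in for Flink's Row; only the shape of the pattern matters here.
class Row {
    private final Object[] values;

    Row(Object... values) {
        this.values = values;
    }

    int getArity() {
        return values.length;
    }

    Object getField(int i) {
        return values[i];
    }
}

// Base class: field extraction is delegated to subclasses, allocating a
// fresh array per record (the behavior the thread decided to keep).
abstract class OutputFormatBase<OUT> {
    protected abstract Object[] extractFields(OUT record);

    // Stand-in for the part of writeRecord that builds the bind arguments.
    Object[] bindArgs(OUT record) {
        return extractFields(record);
    }
}

// Row-specific subclass: copies each field into a new Object[].
class RowOutputFormat extends OutputFormatBase<Row> {
    @Override
    protected Object[] extractFields(Row record) {
        Object[] fields = new Object[record.getArity()];
        for (int i = 0; i < record.getArity(); i++) {
            fields[i] = record.getField(i);
        }
        return fields;
    }
}
```

Allocating per record costs a little garbage, but it guarantees no shared mutable buffer ever reaches the asynchronous statement execution.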
[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...
Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/5272#discussion_r168338536

--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java ---
@@ -95,15 +94,14 @@ public void writeRecord(OUT record) throws IOException {
 			throw new IOException("write record failed", exception);
 		}

-		Object[] fields = new Object[record.getArity()];
-		for (int i = 0; i < record.getArity(); i++) {
-			fields[i] = record.getField(i);
-		}
+		Object[] fields = extractFields(record);

 		ResultSetFuture result = session.executeAsync(prepared.bind(fields));
 		Futures.addCallback(result, callback);
 	}

-	/**
+	protected abstract Object[] extractFields(OUT record);
--- End diff --

I am hesitant to make this change. There is no documentation in the Cassandra code on what PreparedStatement.bind does with the input fields. The actual code serializes the field values and won't keep a reference, so it would be OK to reuse the fields array across invocations even if executeAsync is used; still, I would hesitate to do so because it is not stated in the Cassandra client documentation, so it might be subject to future changes. I agree performance is a concern here; one way to improve it would be to add a getFields() method to Row, so we can reuse it for CassandraRowOutputFormat instead of creating a new Object[].

---
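The `getFields()` idea floated in the comment above could look like this sketch (plain-Java stand-in; this accessor is a proposal from the review, not an existing method on Flink's `Row`):

```java
// Stand-in Row with the proposed accessor.
class Row {
    private final Object[] fields;

    Row(Object... fields) {
        this.fields = fields;
    }

    // Proposed accessor: expose the backing array so a Row-based output
    // format can pass it to PreparedStatement.bind directly instead of
    // copying every field into a fresh Object[] per record.
    Object[] getFields() {
        return fields;
    }
}
```

The trade-off is the usual one for exposing internals: it avoids a per-record copy, but callers could then mutate the Row's backing array.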
[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...
Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/5272#discussion_r168330940

--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraTupleOutputFormat.java ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+/**
+ * OutputFormat to write Flink {@link Tuple}s into a Cassandra cluster.
+ *
+ * @param Type of elements to write, it must extend {@link Tuple}.
+ */
+public class CassandraTupleOutputFormat extends CassandraOutputFormat {
--- End diff --

done

---
[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...
Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/5272#discussion_r168331011

--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java ---
@@ -37,11 +36,11 @@
 import java.io.IOException;

 /**
- * OutputFormat to write {@link org.apache.flink.api.java.tuple.Tuple} into Apache Cassandra.
+ * CassandraOutputFormat is the common abstract class for writing into Apache Cassandra.
  *
- * @param type of Tuple
+ * @param Type of the elements to write.
  */
-public class CassandraOutputFormat extends RichOutputFormat {
+public abstract class CassandraOutputFormat extends RichOutputFormat {
--- End diff --

done

---
[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...
Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/5272#discussion_r168330917

--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraRowOutputFormat.java ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra;
+
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.types.Row;
+
+/**
+ * OutputFormat to write Flink {@link Row}s into a Cassandra cluster.
+ *
+ * @param Type of elements to write, it must extend {@link Row}.
+ */
+public class CassandraRowOutputFormat extends CassandraOutputFormat {
--- End diff --

done

---
[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...
Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/5272#discussion_r168330890

--- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---
@@ -457,8 +458,8 @@ public void testCassandraTableSink() throws Exception {
 	}

 	@Test
-	public void testCassandraBatchFormats() throws Exception {
-		OutputFormat> sink = new CassandraOutputFormat<>(injectTableName(INSERT_DATA_QUERY), builder);
+	public void testCassandraBatchTupleFormats() throws Exception {
--- End diff --

done

---
[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...
Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/5272#discussion_r168330869

--- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---
@@ -482,6 +483,23 @@ public void testCassandraBatchFormats() throws Exception {
 		Assert.assertEquals(20, result.size());
 	}

+	@Test
+	public void testCassandraBatchRowFormats() throws Exception {
--- End diff --

done

---
[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5272#discussion_r168176395

--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraRowOutputFormat.java ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra;
+
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.types.Row;
+
+/**
+ * OutputFormat to write Flink {@link Row}s into a Cassandra cluster.
+ *
+ * @param Type of elements to write, it must extend {@link Row}.
+ */
+public class CassandraRowOutputFormat extends CassandraOutputFormat {
--- End diff --

I don't think we have to handle subclasses of `Row`. Change `` to ``?

---
[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5272#discussion_r168169778

--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraTupleOutputFormat.java ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+/**
+ * OutputFormat to write Flink {@link Tuple}s into a Cassandra cluster.
+ *
+ * @param Type of elements to write, it must extend {@link Tuple}.
+ */
+public class CassandraTupleOutputFormat extends CassandraOutputFormat {
--- End diff --

Create another class `CassandraOutputFormat` that extends this class and doesn't override anything. This class is for backwards compatibility and should be deprecated.

---
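The backwards-compatibility pattern suggested here can be sketched as follows (simplified stand-ins with generics omitted, not the actual Flink class bodies):

```java
// New public implementation under the clearer name.
class CassandraTupleOutputFormat {
    // ... actual write logic would live here ...
}

/**
 * Old name kept only for backwards compatibility; it overrides nothing
 * and simply inherits everything from CassandraTupleOutputFormat.
 *
 * @deprecated use CassandraTupleOutputFormat instead.
 */
@Deprecated
class CassandraOutputFormat extends CassandraTupleOutputFormat {
}
```

Existing user code that instantiates the old class keeps compiling and working, while the `@Deprecated` annotation makes the compiler steer new code toward the new name.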
[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5272#discussion_r168177375

--- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---
@@ -482,6 +483,23 @@ public void testCassandraBatchFormats() throws Exception {
 		Assert.assertEquals(20, result.size());
 	}

+	@Test
+	public void testCassandraBatchRowFormats() throws Exception {
--- End diff --

rename to `testCassandraBatchRowFormat` (-s)

---
[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5272#discussion_r168177338

--- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---
@@ -457,8 +458,8 @@ public void testCassandraTableSink() throws Exception {
 	}

 	@Test
-	public void testCassandraBatchFormats() throws Exception {
-		OutputFormat> sink = new CassandraOutputFormat<>(injectTableName(INSERT_DATA_QUERY), builder);
+	public void testCassandraBatchTupleFormats() throws Exception {
--- End diff --

rename to `testCassandraBatchTupleFormat` (-s)

---
[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5272#discussion_r168169354

--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java ---
@@ -37,11 +36,11 @@
 import java.io.IOException;

 /**
- * OutputFormat to write {@link org.apache.flink.api.java.tuple.Tuple} into Apache Cassandra.
+ * CassandraOutputFormat is the common abstract class for writing into Apache Cassandra.
  *
- * @param type of Tuple
+ * @param Type of the elements to write.
  */
-public class CassandraOutputFormat extends RichOutputFormat {
+public abstract class CassandraOutputFormat extends RichOutputFormat {
--- End diff --

Rename to `CassandraOutputFormatBase`

---
[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5272#discussion_r168175822

--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java ---
@@ -95,15 +94,14 @@ public void writeRecord(OUT record) throws IOException {
 			throw new IOException("write record failed", exception);
 		}

-		Object[] fields = new Object[record.getArity()];
-		for (int i = 0; i < record.getArity(); i++) {
-			fields[i] = record.getField(i);
-		}
+		Object[] fields = extractFields(record);

 		ResultSetFuture result = session.executeAsync(prepared.bind(fields));
 		Futures.addCallback(result, callback);
 	}

-	/**
+	protected abstract Object[] extractFields(OUT record);
--- End diff --

Add an `Object[] fields` parameter that can be reused across invocations of `extractFields()`.

---
[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...
GitHub user suez1224 opened a pull request: https://github.com/apache/flink/pull/5272

[Flink-8397][Connectors]Support Row type for Cassandra OutputFormat

## Brief change log

- Add CassandraOutputFormatBase
- Add CassandraRowOutputFormat
- Add CassandraTupleOutputFormat

## Verifying this change

This change added tests and can be verified by CassandraConnectorITCase.{testCassandraBatchTupleFormats, testCassandraBatchRowFormats}

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: ( / )
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/suez1224/flink FLINK-8397

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5272.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #5272

commit 9a42ee7e18d043b06c4d28a6dc54c0ace7fae6c2
Author: Shuyi Chen
Date: 2018-01-10T01:40:41Z

    Support Row type for Cassandra OutputFormat

---