[jira] [Commented] (FLINK-4497) Add support for Scala tuples and case classes to Cassandra sink
[ https://issues.apache.org/jira/browse/FLINK-4497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16025213#comment-16025213 ] ASF GitHub Bot commented on FLINK-4497: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2633 > Add support for Scala tuples and case classes to Cassandra sink > --- > > Key: FLINK-4497 > URL: https://issues.apache.org/jira/browse/FLINK-4497 > Project: Flink > Issue Type: Improvement > Components: Cassandra Connector >Affects Versions: 1.1.0 >Reporter: Elias Levy >Assignee: Chesnay Schepler > Fix For: 1.4.0 > > > The new Cassandra sink only supports streams of Flink Java tuples and Java > POJOs that have been annotated for use by Datastax Mapper. The sink should > be extended to support Scala types and case classes. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-4497) Add support for Scala tuples and case classes to Cassandra sink
[ https://issues.apache.org/jira/browse/FLINK-4497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15992687#comment-15992687 ] ASF GitHub Bot commented on FLINK-4497: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2633#discussion_r114289764 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java --- @@ -389,4 +403,54 @@ private TestCassandraTupleWriteAheadSink(String tableName, TypeSerializer se this.tableName = tableName; } } + + @Test + public void testCassandraScalaTupleAtLeastOnceSinkBuilderDetection() throws Exception { + Class> c = (Class>) new Tuple1<>("hello").getClass(); + Seq> typeInfos = JavaConverters.asScalaBufferConverter( + Arrays.>asList(new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO})).asScala(); + Seq fieldNames = JavaConverters.asScalaBufferConverter( + Arrays.asList(new String[]{"_1"})).asScala(); + + CaseClassTypeInfo> typeInfo = new CaseClassTypeInfo>(c, null, typeInfos, fieldNames) { + @Override + public TypeSerializer> createSerializer(ExecutionConfig config) { + return null; + } + }; + + StreamExecutionEnvironment env = mock(StreamExecutionEnvironment.class); --- End diff -- I'll fix this and rebase the PR later today. > Add support for Scala tuples and case classes to Cassandra sink > --- > > Key: FLINK-4497 > URL: https://issues.apache.org/jira/browse/FLINK-4497 > Project: Flink > Issue Type: Improvement > Components: Cassandra Connector >Affects Versions: 1.1.0 >Reporter: Elias Levy >Assignee: Chesnay Schepler > > The new Cassandra sink only supports streams of Flink Java tuples and Java > POJOs that have been annotated for use by Datastax Mapper. The sink should > be extended to support Scala types and case classes. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-4497) Add support for Scala tuples and case classes to Cassandra sink
[ https://issues.apache.org/jira/browse/FLINK-4497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15992670#comment-15992670 ] ASF GitHub Bot commented on FLINK-4497: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2633#discussion_r114286727 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java --- @@ -389,4 +403,54 @@ private TestCassandraTupleWriteAheadSink(String tableName, TypeSerializer se this.tableName = tableName; } } + + @Test + public void testCassandraScalaTupleAtLeastOnceSinkBuilderDetection() throws Exception { + Class> c = (Class>) new Tuple1<>("hello").getClass(); + Seq> typeInfos = JavaConverters.asScalaBufferConverter( + Arrays.>asList(new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO})).asScala(); + Seq fieldNames = JavaConverters.asScalaBufferConverter( + Arrays.asList(new String[]{"_1"})).asScala(); + + CaseClassTypeInfo> typeInfo = new CaseClassTypeInfo>(c, null, typeInfos, fieldNames) { + @Override + public TypeSerializer> createSerializer(ExecutionConfig config) { + return null; + } + }; + + StreamExecutionEnvironment env = mock(StreamExecutionEnvironment.class); --- End diff -- I think there is no need to mock this. You can use a regular execution environment. Unless you call `execute()`, no mini cluster would be spawned anyways. > Add support for Scala tuples and case classes to Cassandra sink > --- > > Key: FLINK-4497 > URL: https://issues.apache.org/jira/browse/FLINK-4497 > Project: Flink > Issue Type: Improvement > Components: Cassandra Connector >Affects Versions: 1.1.0 >Reporter: Elias Levy >Assignee: Chesnay Schepler > > The new Cassandra sink only supports streams of Flink Java tuples and Java > POJOs that have been annotated for use by Datastax Mapper. The sink should > be extended to support Scala types and case classes. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-4497) Add support for Scala tuples and case classes to Cassandra sink
[ https://issues.apache.org/jira/browse/FLINK-4497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15991423#comment-15991423 ] ASF GitHub Bot commented on FLINK-4497: --- Github user StephenWithPH commented on the issue: https://github.com/apache/flink/pull/2633 @zentol & @StephanEwen ... anything I can do to get this ready to merge? > Add support for Scala tuples and case classes to Cassandra sink > --- > > Key: FLINK-4497 > URL: https://issues.apache.org/jira/browse/FLINK-4497 > Project: Flink > Issue Type: Improvement > Components: Cassandra Connector >Affects Versions: 1.1.0 >Reporter: Elias Levy >Assignee: Chesnay Schepler > > The new Cassandra sink only supports streams of Flink Java tuples and Java > POJOs that have been annotated for use by Datastax Mapper. The sink should > be extended to support Scala types and case classes. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-4497) Add support for Scala tuples and case classes to Cassandra sink
[ https://issues.apache.org/jira/browse/FLINK-4497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15628645#comment-15628645 ] ASF GitHub Bot commented on FLINK-4497: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/2633 @StephanEwen I integrated all of your suggestions. > Add support for Scala tuples and case classes to Cassandra sink > --- > > Key: FLINK-4497 > URL: https://issues.apache.org/jira/browse/FLINK-4497 > Project: Flink > Issue Type: Improvement > Components: Cassandra Connector >Affects Versions: 1.1.0 >Reporter: Elias Levy >Assignee: Chesnay Schepler > > The new Cassandra sink only supports streams of Flink Java tuples and Java > POJOs that have been annotated for use by Datastax Mapper. The sink should > be extended to support Scala types and case classes. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4497) Add support for Scala tuples and case classes to Cassandra sink
[ https://issues.apache.org/jira/browse/FLINK-4497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15608662#comment-15608662 ] ASF GitHub Bot commented on FLINK-4497: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2633 Look pretty good. I think it would be good to try to make the Cassandra Module to the user Scala independent. That way, we don't introduce a hard requirement for Scala versioning and make this Scala version independent as soon as Flink's runtime is Scala Version independent. We can do that the following way: - Set the Scala dependency to "provided". Flink pulls Scala anyways, Scala users have Scala anyways always as a dependency. Users that use Scala from Java can always add an additional Scala dependency in their project. - Add the Scala library as a test dependency - Drop the Scala compiler plugin - Use a case class for the tests that comes in the Scala library (such as Tuple2) - Migrate the test to Java - the sink class is in Java anyways What do you think about that? > Add support for Scala tuples and case classes to Cassandra sink > --- > > Key: FLINK-4497 > URL: https://issues.apache.org/jira/browse/FLINK-4497 > Project: Flink > Issue Type: Improvement > Components: Cassandra Connector >Affects Versions: 1.1.0 >Reporter: Elias Levy >Assignee: Chesnay Schepler > > The new Cassandra sink only supports streams of Flink Java tuples and Java > POJOs that have been annotated for use by Datastax Mapper. The sink should > be extended to support Scala types and case classes. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4497) Add support for Scala tuples and case classes to Cassandra sink
[ https://issues.apache.org/jira/browse/FLINK-4497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15575736#comment-15575736 ] ASF GitHub Bot commented on FLINK-4497: --- Github user eliaslevy commented on the issue: https://github.com/apache/flink/pull/2633 👍 > Add support for Scala tuples and case classes to Cassandra sink > --- > > Key: FLINK-4497 > URL: https://issues.apache.org/jira/browse/FLINK-4497 > Project: Flink > Issue Type: Improvement > Components: Cassandra Connector >Affects Versions: 1.1.0 >Reporter: Elias Levy >Assignee: Chesnay Schepler > > The new Cassandra sink only supports streams of Flink Java tuples and Java > POJOs that have been annotated for use by Datastax Mapper. The sink should > be extended to support Scala types and case classes. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4497) Add support for Scala tuples and case classes to Cassandra sink
[ https://issues.apache.org/jira/browse/FLINK-4497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15575145#comment-15575145 ] ASF GitHub Bot commented on FLINK-4497: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/2633 The important class is `CassandraScalaProductSink` which extracts the fields from the scala classes. Apart from that it behaves exactly like the `CassandraTupleSink`. As a result of that this common behavior was refactored into a common abstract class. The `CassandraSinkBuilder` was slightly refactored to fail on unsupported types and not simply assume POJO's. In addition, the `sanityCheck()` method is now guaranteed to be called. > Add support for Scala tuples and case classes to Cassandra sink > --- > > Key: FLINK-4497 > URL: https://issues.apache.org/jira/browse/FLINK-4497 > Project: Flink > Issue Type: Improvement > Components: Cassandra Connector >Affects Versions: 1.1.0 >Reporter: Elias Levy >Assignee: Chesnay Schepler > > The new Cassandra sink only supports streams of Flink Java tuples and Java > POJOs that have been annotated for use by Datastax Mapper. The sink should > be extended to support Scala types and case classes. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4497) Add support for Scala tuples and case classes to Cassandra sink
[ https://issues.apache.org/jira/browse/FLINK-4497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15575138#comment-15575138 ] ASF GitHub Bot commented on FLINK-4497: --- GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/2633 [FLINK-4497] CassadraSink support for scala tuples/case classes This PR adds support for scala tuples/case classes to the streaming at-least-once-sink. If this pattern works we can extend it to other sinks as well in a follow-up. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 4497_cass_scala Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2633.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2633 commit 949b9b1ad59a2c7435d5f64ca5a2a3d1833de7bc Author: zentol Date: 2016-10-10T13:30:48Z [FLINK-4497] [cassandra] Scala Case Classes / Tuple support > Add support for Scala tuples and case classes to Cassandra sink > --- > > Key: FLINK-4497 > URL: https://issues.apache.org/jira/browse/FLINK-4497 > Project: Flink > Issue Type: Improvement > Components: Cassandra Connector >Affects Versions: 1.1.0 >Reporter: Elias Levy >Assignee: Chesnay Schepler > > The new Cassandra sink only supports streams of Flink Java tuples and Java > POJOs that have been annotated for use by Datastax Mapper. The sink should > be extended to support Scala types and case classes. -- This message was sent by Atlassian JIRA (v6.3.4#6332)