Re: Cassandra POJO sink flink 1.4.0 in scala

2017-12-28 Thread shashank agarwal
Hi Michael, thanks for the response. Actually I have already solved the issue; I was sharing how I solved it. To sink Scala classes like a Java POJO, we had to convert the stream to a Java stream first, but in 1.4 that is already done by the connector, so there is no need to do that in 1.4. We have to write the Scala

Re: Cassandra POJO sink flink 1.4.0 in scala

2017-12-27 Thread Michael Fong
Hi, shashank agarwal Not sure I can fully answer your question, but after digging through some code, I am not sure the C* connector totally supports Scala case class + CQL data mapping at the moment. I may be totally wrong, and you need to

Re: Cassandra POJO sink flink 1.4.0 in scala

2017-12-27 Thread Michael Fong
Hi, shashank agarwal AFAIK, on the Java side, for a POJO data type you don't need to set a query, since the CQL data mapping takes care of that, whereas when dealing with Java tuples you do need to provide an upsert query so that Cassandra knows what to insert into the table. The Scala tuple case is clear,
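The tuple path described above can be sketched in plain Scala. The keyspace, table, and column names below are hypothetical; in Flink's Cassandra sink a query string like this is attached to the sink builder (via its `setQuery` method) because tuples carry no annotation-based mapping, only positional fields.

```scala
// Hedged sketch: the kind of upsert query a tuple-based Cassandra sink needs.
// Keyspace/table/column names ("example", "orders", ...) are hypothetical.
object TupleUpsertQuery {
  // With tuples there is no CQL data mapping, so the column order must be
  // spelled out explicitly in an INSERT (which is an upsert in CQL).
  val upsert: String =
    "INSERT INTO example.orders (order_name, user) VALUES (?, ?);"

  def main(args: Array[String]): Unit = println(upsert)
}
```

The `?` placeholders are bound positionally from the tuple's fields, which is exactly the information the annotation-based POJO mapping would otherwise supply.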

Re: Cassandra POJO sink flink 1.4.0 in scala

2017-12-21 Thread Timo Walther
Thanks for letting us know. The netty issue will be fixed in Flink 1.4.1. For case classes there is also a dedicated Cassandra sink (every case class is a Product):
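The "every case class is a Product" remark can be verified in plain Scala (the class and field names here are hypothetical):

```scala
// Minimal sketch of why a Scala case class can feed a Product-based sink:
// the compiler makes every case class extend Product automatically.
case class Order(name: String, user: String)

object ProductDemo {
  def main(args: Array[String]): Unit = {
    val o = Order("book", "alice")
    // Product exposes the fields positionally, which is what a
    // tuple/Product-oriented sink uses instead of CQL annotation mapping.
    println(o.isInstanceOf[Product]) // true
    println(o.productArity)          // 2
  }
}
```

This positional access is why a dedicated Product sink can serialize case classes without any per-class annotations.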

Re: Cassandra POJO sink flink 1.4.0 in scala

2017-12-21 Thread shashank agarwal
Hi, I have added netty-all 4.0 as a dependency and now it's working fine. The only thing was that I had to create the POJO class in Scala like this: @SerialVersionUID(507L) @Table(keyspace = "twtt", name = "order") class OrderFinal( @BeanProperty var name: String,
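A fuller sketch of the class pattern quoted above (the second field and the no-arg constructor are assumptions, since the preview is truncated; the DataStax mapper annotation is shown as a comment so the sketch compiles without the driver on the classpath):

```scala
import scala.beans.BeanProperty

// Hedged reconstruction of the POJO pattern from the thread. In the real
// code the class also carries the DataStax mapper annotation, e.g.
//   @Table(keyspace = "twtt", name = "order")
// from com.datastax.driver.mapping.annotations.
@SerialVersionUID(507L)
class OrderFinal(
    @BeanProperty var name: String,   // @BeanProperty generates getName/setName
    @BeanProperty var user: String    // hypothetical second field
) extends Serializable {
  // Object mappers typically instantiate POJOs reflectively,
  // so a no-arg auxiliary constructor is provided.
  def this() = this("", "")
}
```

`@BeanProperty` is what gives the Scala class the JavaBean-style getters and setters that annotation-driven mappers expect.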

Re: Cassandra POJO sink flink 1.4.0 in scala

2017-12-19 Thread Timo Walther
Hi Shashank, the exception you get is a known issue [0] that will be fixed in Flink 1.4.1. We improved the dependency management, but it seems this currently causes some problems with the Cassandra connector. As a workaround you can add netty (version 4.0) to your dependencies. This
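The workaround could look like the following build.sbt fragment. This is a sketch: the thread only says "netty (version 4.0)", so the exact artifact and patch version below are assumptions.

```scala
// build.sbt fragment (sketch): pinning netty-all from the 4.0 line as the
// workaround suggested in the thread. The patch version is an assumption.
libraryDependencies += "io.netty" % "netty-all" % "4.0.56.Final"
```

Pinning the dependency explicitly overrides whatever netty version leaks in transitively through the connector.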

Re: Cassandra POJO sink flink 1.4.0 in scala

2017-12-19 Thread shashank agarwal
I have tried that by creating the class with a static companion object: @SerialVersionUID(507L) @Table(keyspace = "neofp", name = "order_detail") class OrderFinal( @BeanProperty var order_name: String, @BeanProperty var user: String ) extends Serializable {

Re: Cassandra POJO sink flink 1.4.0 in scala

2017-12-19 Thread Timo Walther
Hi Shashank, Scala case classes are treated as a special tuple type in Flink. If you want to make a POJO out of it, just remove the "case" keyword and make sure that the class is static (in the companion object). I hope that helps. Timo On 12/19/17 at 11:04 AM, shashank agarwal wrote:
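Timo's suggestion can be sketched as follows (names are hypothetical). The point of nesting the class in an object is that it then has no reference to an enclosing instance, i.e. it behaves like a static Java class, which POJO-style frameworks require:

```scala
import scala.beans.BeanProperty

object Pojos {
  // A plain (non-case) class: removing the "case" keyword means Flink no
  // longer treats it as a tuple type. Nested in an object, it carries no
  // outer pointer, making it effectively "static".
  class OrderPojo(
      @BeanProperty var orderName: String,
      @BeanProperty var user: String
  ) extends Serializable {
    def this() = this("", "") // no-arg constructor for POJO conventions
  }
}
```

Unlike the case class version, this class is not a Product, so it goes down the POJO path rather than the tuple path.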

Cassandra POJO sink flink 1.4.0 in scala

2017-12-19 Thread shashank agarwal
Hi, I have upgraded Flink from 1.3.2 to 1.4.0. I am using the Cassandra sink in my Scala application. Before sinking, I was converting my Scala DataStream to a Java stream and sinking that into Cassandra. I have created the POJO class in Scala like this: @SerialVersionUID(507L) @Table(keyspace = "neofp", name =