Hi Shashank,

The exception you get is a known issue [0] that will be fixed in Flink 1.4.1. We improved the dependency management, but it seems this causes some problems with the Cassandra connector right now.

So as a workaround you can add Netty (version 4.0) to your dependencies. This should fix the problem until the new Flink version is released. Please let us know if it works.
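
For example, with sbt that could look like this (a sketch; netty-all and the exact 4.0.x version are assumptions, align them with the Cassandra driver on your classpath):

// build.sbt -- sketch only; artifact and patch version are assumptions
libraryDependencies += "io.netty" % "netty-all" % "4.0.44.Final"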

Regards,
Timo

[0] https://issues.apache.org/jira/browse/FLINK-8295



On 12/19/17 at 1:41 PM, shashank agarwal wrote:
I have tried that by creating a class with a companion (static) object:

import com.datastax.driver.mapping.annotations.Table
import scala.beans.BeanProperty

@SerialVersionUID(507L)
@Table(keyspace = "neofp", name = "order_detail")
class OrderFinal(
    @BeanProperty var order_name: String,
    @BeanProperty var user: String) extends Serializable {

  def this() {
    this("NA", "NA")
  }
}

object OrderFinal
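
A minimal sketch of how this class is wired to the sink, assuming a local Cassandra node and the 1.4 connector's builder API (the host, sample element, and job name are illustrative):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.cassandra.CassandraSink

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Illustrative element; a real job would read from a source.
val orders: DataStream[OrderFinal] = env.fromElements(new OrderFinal("o1", "u1"))

// OrderFinal is a plain class (not a Product), so addSink should pick the
// CassandraPojoSinkBuilder and no setQuery(...) is needed -- the @Table
// annotation supplies keyspace and table.
CassandraSink.addSink(orders)
  .setHost("127.0.0.1") // assumption: a local Cassandra node
  .build()

env.execute("cassandra-pojo-sink") // illustrative job name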



When running with 1.4.0 it gives the following error:


java.lang.NoClassDefFoundError: Could not initialize class com.datastax.driver.core.NettyUtil
at com.datastax.driver.core.NettyOptions.eventLoopGroup(NettyOptions.java:96)
at com.datastax.driver.core.Connection$Factory.<init>(Connection.java:706)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1365)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:88)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:64)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)

12/19/2017 18:04:33 Job execution switched to status FAILING.
(same NoClassDefFoundError stack trace as above)

On Tue, Dec 19, 2017 at 3:34 PM, shashank agarwal <shashank...@gmail.com> wrote:

    Hi,

    I have upgraded Flink from 1.3.2 to 1.4.0. I am using the
    Cassandra sink in my Scala application. Before sinking, I was
    converting my Scala DataStream to a Java stream and writing it to
    Cassandra. I have created the POJO class in Scala like this:

    @SerialVersionUID(507L)
    @Table(keyspace = "neofp", name = "order_detail")
    case class OrderFinal(
        @BeanProperty var order_name: String,
        @BeanProperty var user: String) extends Serializable {

      def this() {
        this("NA", "NA")
      }
    }

    and this was working fine with the sink. After upgrading to 1.4.0
    it gives the error "Query must not be null or empty."

    After digging into the CassandraSink code, I found that it treats
    it as a case class and runs the CassandraScalaProductSinkBuilder,
    which does a sanity check for the existence of a query.

    So how can I create a POJO class in Scala so that CassandraSink
    uses the CassandraPojoSinkBuilder?
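
    (For contrast, a minimal sketch of keeping the case class and
    satisfying the Product path instead, assuming the 1.4 connector's
    builder API; the host and INSERT statement are illustrative
    assumptions based on the @Table annotation above:)

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.cassandra.CassandraSink

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Illustrative element; a real job would read from a source.
    val orders: DataStream[OrderFinal] = env.fromElements(OrderFinal("NA", "NA"))

    // A case class routes through CassandraScalaProductSinkBuilder, so an
    // explicit query is required; without setQuery(...) the sanity check
    // fails with "Query must not be null or empty."
    CassandraSink.addSink(orders)
      .setHost("127.0.0.1") // assumption: a local Cassandra node
      .setQuery("INSERT INTO neofp.order_detail (order_name, user) VALUES (?, ?);")
      .build()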

    As a workaround, for now I have downgraded only the connector to 1.3.2.


    Thanks
    Shashank




--
Thanks & Regards

SHASHANK AGARWAL
 ---  Trying to mobilize the things....

