Hi Folks:
Just checking if anyone has any pointers on the Cassandra Spark connector issue I mentioned below: an IllegalArgumentException when executing save after repartitioning by Cassandra replica:

val customersRdd = customers.rdd.repartitionByCassandraReplica("test", "customers")
customersRdd.saveToCassandra("test", "customer")

Thanks
On Wednesday, July 18, 2018, 10:45:25 AM PDT, M Singh
<[email protected]> wrote:
Hi Cassandra/Spark experts:
I am working on a project that saves a Spark dataframe to Cassandra, and I am getting an exception about an invalid row size (see below). I traced the code in the connector, and it appears that the row size (3 below) differs from the expected column count (which turns out to be 1). I am trying to follow the example from
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md
except that my customer table has two more fields than just the id used there. I think the example works because its table has only one column (customer_id), but I need to save the additional fields. I've searched but have not found any resolution to this.
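To illustrate my reading of the failing check (this is a paraphrase, not the actual connector source): the replica partitioner seems to build a row writer for only the partition key column, so a 3-field Row trips the requirement. In spark-shell:

// Paraphrase of my reading of the check in SqlRowWriter.readColumnValues;
// the names below are illustrative, not the connector's actual code.
val columnNames = Seq("customer_id")          // partition key of test.customer
val row = org.apache.spark.sql.Row("1", 1, 1) // customer_id, order_id, value
require(row.size == columnNames.size,
  s"Invalid row size: ${row.size} instead of ${columnNames.size}")
// => requirement failed: Invalid row size: 3 instead of 1.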
I am using Spark 2.3.0 and spark-cassandra-connector:2.3.0-s_2.11.
Just a little background: saving the dataframe to Cassandra directly does work, but it is very slow, so I am checking whether repartitionByCassandraReplica will make it faster. I have tried various combinations of batch row size, concurrent writers, etc. on the dataframe, and it is still very slow; that is why I am now looking at calling repartitionByCassandraReplica before saving to the Cassandra table. If there are other options for making dataframe saves to Cassandra faster, please let me know.
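For reference, these are the kinds of write settings I have been experimenting with (the values are illustrative only, and none of them made a noticeable difference for me):

import com.datastax.spark.connector._
import com.datastax.spark.connector.writer.{RowsInBatch, WriteConf}

// Illustrative tuning values only; these did not noticeably help in my tests.
val writeConf = WriteConf(
  batchSize = RowsInBatch(64), // rows per batch
  parallelismLevel = 16        // concurrent batches per Spark task
)

val df = Seq(("1", 1, 1), ("2", 2, 2)).toDF("customer_id", "order_id", "value")
df.rdd
  .map(r => (r.getString(0), r.getInt(1), r.getInt(2)))
  .saveToCassandra("test", "customer",
    SomeColumns("customer_id", "order_id", "value"), writeConf)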
Here is my scenario:

Cassandra table in keyspace test:

create table customer (customer_id text primary key, order_id int, value int);

Spark shell commands:

import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._

case class Customer(customer_id: String, order_id: Int, value: Int)

val customers = Seq(Customer("1", 1, 1), Customer("2", 2, 2)).toDF("customer_id", "order_id", "value")
val customersRdd = customers.rdd.repartitionByCassandraReplica("test", "customers")
customersRdd.saveToCassandra("test", "customer")
At this point I get an exception:

java.lang.IllegalArgumentException: requirement failed: Invalid row size: 3 instead of 1.
  at scala.Predef$.require(Predef.scala:224)
  at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:23)
  at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:12)
  at com.datastax.spark.connector.writer.BoundStatementBuilder.bind(BoundStatementBuilder.scala:99)
  at com.datastax.spark.connector.rdd.partitioner.TokenGenerator.getPartitionKeyBufferFor(TokenGenerator.scala:38)
  at com.datastax.spark.connector.rdd.partitioner.ReplicaPartitioner.getPartition(ReplicaPartitioner.scala:70)
  at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
  at org.apache.spark.scheduler.Task.run(Task.scala:108)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

18/07/18 10:27:51 ERROR Executor: Exception in task 1.0 in stage 6.0 (TID 4)
java.lang.IllegalArgumentException: requirement failed: Invalid row size: 3 instead of 1.
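For completeness, the workaround I am considering (but have not verified) is to keep the data as an RDD of my case class instead of an RDD[Row], on the assumption that the connector can then resolve the partition key by field name rather than by position:

// Unverified idea: use an RDD of the Customer case class (defined above)
// rather than RDD[Row], assuming the partition key (customer_id) is then
// resolved by field name instead of by position.
val typedRdd = customers.rdd.map(r => Customer(r.getString(0), r.getInt(1), r.getInt(2)))
val repartitioned = typedRdd.repartitionByCassandraReplica("test", "customer")
repartitioned.saveToCassandra("test", "customer")

If that is not the intended usage, a pointer to the right pattern would be much appreciated.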
Thanks for your help.