Hi Folks:
I am working on a project to save a Spark DataFrame to Cassandra and am
getting an exception about an invalid row size (see below). I traced the
code in the connector, and it appears that the row size (3 in my case) is
different from the column count the writer expects (which turns out to be 1).
I am trying to follow the example from
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md
except that my customer record has two more fields than just the id used in
the example. I think the example works because it has only one column
(the customer id), but I need to save the additional fields as well. I've
searched but have not found any resolution to this.
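
For reference, the repartitionByCassandraReplica example on that doc page is
essentially the single-partition-key-column case. Reconstructing it roughly
from memory (the names and values here are approximate, not quoted verbatim
from the doc):

  import com.datastax.spark.connector._

  // A case class holding only the partition key column of test.customers
  case class CustomerID(cust_id: Int)
  val idsOfInterest = sc.parallelize(1 to 1000).map(CustomerID(_))
  // Moves each row to a Spark partition on a node that is a replica for its token
  val repartitioned = idsOfInterest.repartitionByCassandraReplica("test", "customers", 10)

That works because the RDD carries exactly the one partition key column,
whereas my rows carry two extra fields.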
I am using Spark 2.3.0 and spark-cassandra-connector:2.3.0-s_2.11.
A little background: saving the DataFrame to Cassandra directly does work,
but it is very slow, so I am trying to see whether
repartitionByCassandraReplica will make it faster. I've tried various
combinations of batch row size, concurrent writers, etc. on the DataFrame
write and it is still very slow, which is why I am now looking at calling
repartitionByCassandraReplica before saving to the Cassandra table. If there
are other options to make saving a DataFrame to Cassandra faster, please let
me know.
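
For concreteness, this is roughly the working-but-slow DataFrame write (using
the customers DataFrame defined in the scenario below), with the kind of
write-tuning options I have been experimenting with. The option names are
from the connector's reference documentation; the values below are
illustrative, not my exact settings, and they can also be set on the
SparkConf instead of per write:

  // Plain DataFrame save; works, but is very slow for my data volume
  customers.write
    .format("org.apache.spark.sql.cassandra")
    .options(Map(
      "keyspace" -> "test",
      "table" -> "customer",
      // Write-tuning knobs I've varied (example values only)
      "spark.cassandra.output.batch.size.rows" -> "200",
      "spark.cassandra.output.concurrent.writes" -> "10"))
    .mode("append")
    .save()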
Here is my scenario. Cassandra table in keyspace test:

  create table customer (
    customer_id text primary key,
    order_id int,
    value int
  );

Spark shell commands:

  import com.datastax.spark.connector._
  import org.apache.spark.sql.cassandra._

  case class Customer(customer_id: String, order_id: Int, value: Int)

  val customers = Seq(Customer("1", 1, 1), Customer("2", 2, 2))
    .toDF("customer_id", "order_id", "value")
  val customersRdd = customers.rdd.repartitionByCassandraReplica("test", "customer")
  customersRdd.saveToCassandra("test", "customer")

At this point I get an exception:

  java.lang.IllegalArgumentException: requirement failed: Invalid row size: 3 instead of 1.
    at scala.Predef$.require(Predef.scala:224)
    at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:23)
    at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:12)
    at com.datastax.spark.connector.writer.BoundStatementBuilder.bind(BoundStatementBuilder.scala:99)
    at com.datastax.spark.connector.rdd.partitioner.TokenGenerator.getPartitionKeyBufferFor(TokenGenerator.scala:38)
    at com.datastax.spark.connector.rdd.partitioner.ReplicaPartitioner.getPartition(ReplicaPartitioner.scala:70)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

  18/07/18 10:27:51 ERROR Executor: Exception in task 1.0 in stage 6.0 (TID 4)
  java.lang.IllegalArgumentException: requirement failed: Invalid row size: 3 instead of 1.
Thanks for your help.
