Sure I can; everything is on localhost. It only happens when I want to write to two or more tables in the same schema.
A G 2015-05-29 16:10 GMT+02:00 Yana Kadiyska <yana.kadiy...@gmail.com>: > are you able to connect to your cassandra installation via > > cassandra_home$ ./bin/cqlsh > > This exception generally means that your cassandra instance is not > reachable/accessible > > On Fri, May 29, 2015 at 6:11 AM, Antonio Giambanco <antogia...@gmail.com> > wrote: > >> Hi all, >> I have in a single server installed spark 1.3.1 and cassandra 2.0.14 >> I'm coding a simple java class for Spark Streaming as follow: >> >> - reading header events from flume sink >> - based on header I write the event body on navigation or transaction >> table (cassandra) >> >> unfortunatly I get NoHostAvailableException, if I comment the code for >> saving one of the two tables everything works >> >> >> *here the code* >> >> public static void main(String[] args) { >> >> // Create a local StreamingContext with two working thread and >> batch interval of 1 second >> SparkConf conf = new >> SparkConf().setMaster("local[2]").setAppName("DWXNavigationApp"); >> >> conf.set("spark.cassandra.connection.host", "127.0.0.1"); >> conf.set("spark.cassandra.connection.native.port","9042"); >> conf.set("spark.cassandra.output.batch.size.rows", "1"); >> conf.set("spark.cassandra.output.concurrent.writes", "1"); >> >> >> final JavaStreamingContext jssc = new JavaStreamingContext(conf, >> Durations.seconds(1)); >> >> JavaReceiverInputDStream<SparkFlumeEvent> flumeStreamNavig = >> FlumeUtils.createPollingStream(jssc, "127.0.0.1", 8888); >> >> >> JavaDStream<String> logRowsNavig = flumeStreamNavig.map( >> new Function<SparkFlumeEvent,String>(){ >> >> @Override >> public String call(SparkFlumeEvent arg0) throws >> Exception { >> // TODO Auto-generated method stub0. 
>> >> Map<CharSequence,CharSequence> headers = >> arg0.event().getHeaders(); >> >> ByteBuffer bytePayload = arg0.event().getBody(); >> String s = headers.get("source_log").toString() + >> "#" + new String(bytePayload.array()); >> System.out.println("RIGA: " + s); >> return s; >> } >> }); >> >> >> logRowsNavig.foreachRDD( >> new Function<JavaRDD<String>,Void>(){ >> @Override >> public Void call(JavaRDD<String> rows) throws >> Exception { >> >> if(!rows.isEmpty()){ >> >> //String header = >> getHeaderFronRow(rows.collect()); >> >> List<Navigation> listNavigation = new >> ArrayList<Navigation>(); >> List<Transaction> listTransaction = new >> ArrayList<Transaction>(); >> >> for(String row : rows.collect()){ >> >> String header = row.substring(0, >> row.indexOf("#")); >> >> if(header.contains("controller_log")){ >> >> listNavigation.add(createNavigation(row)); >> System.out.println("Added Element in >> Navigation List"); >> >> }else >> if(header.contains("business_log")){ >> >> listTransaction.add(createTransaction(row)); >> System.out.println("Added Element in >> Transaction List"); >> } >> >> } >> >> >> if(!listNavigation.isEmpty()){ >> JavaRDD<Navigation> navigationRows= >> jssc.sparkContext().parallelize(listNavigation); >> >> >> javaFunctions(navigationRows).writerBuilder("cassandrasink", "navigation", >> mapToRow(Navigation.class)).saveToCassandra(); >> } >> >> >> if(!listTransaction.isEmpty()){ >> JavaRDD<Transaction> transactionRows= >> jssc.sparkContext().parallelize(listTransaction); >> >> >> javaFunctions(transactionRows).writerBuilder("cassandrasink", >> "transaction", mapToRow(Transaction.class)).saveToCassandra(); >> >> } >> >> } >> return null; >> >> } >> }); >> >> jssc.start(); // Start the computation >> jssc.awaitTermination(); // Wait for the computation to >> terminate >> } >> >> >> *here the exception* >> >> >> 15/05/29 11:19:29 ERROR QueryExecutor: Failed to execute: >> >> com.datastax.spark.connector.writer.RichBatchStatement@ab76b83 >> >> 
com.datastax.driver.core.exceptions.NoHostAvailableException: All >> >> host(s) tried for query failed (no host was tried) >> >> at >> >> >> com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:107) >> >> at >> >> com.datastax.driver.core.SessionManager.execute(SessionManager.java:538) >> >> at >> >> >> com.datastax.driver.core.SessionManager.executeQuery(SessionManager.java:577) >> >> at >> >> >> com.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:119) >> >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> >> at >> >> >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) >> >> at >> >> >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> >> at java.lang.reflect.Method.invoke(Method.java:601) >> >> at >> >> >> com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33) >> >> at $Proxy17.executeAsync(Unknown Source) >> >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> >> at >> >> >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) >> >> at >> >> >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> >> at java.lang.reflect.Method.invoke(Method.java:601) >> >> at >> >> >> com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33) >> >> at $Proxy17.executeAsync(Unknown Source) >> >> at >> >> >> com.datastax.spark.connector.writer.QueryExecutor$$anonfun$$init$$1.apply(QueryExecutor.scala:11) >> >> at >> >> >> com.datastax.spark.connector.writer.QueryExecutor$$anonfun$$init$$1.apply(QueryExecutor.scala:11) >> >> at >> >> >> com.datastax.spark.connector.writer.AsyncExecutor.executeAsync(AsyncExecutor.scala:31) >> >> at >> >> >> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:137) >> >> at >> >> >> 
com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:136) >> >> at scala.collection.Iterator$class.foreach(Iterator.scala:727) >> >> at >> >> >> com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31) >> >> at >> >> >> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:136) >> >> at >> >> >> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:120) >> >> at >> >> >> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:100) >> >> at >> >> >> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:99) >> >> at >> >> >> com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151) >> >> at >> >> >> com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99) >> >> at >> >> >> com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120) >> >> at >> >> >> com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36) >> >> at >> >> >> com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36) >> >> at >> >> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) >> >> at org.apache.spark.scheduler.Task.run(Task.scala:64) >> >> at >> >> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) >> >> at >> >> >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) >> >> at >> >> >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) >> >> at java.lang.Thread.run(Thread.java:722) >> >> 15/05/29 11:19:29 ERROR Executor: Exception in task 1.0 in stage 15.0 >> (TID 20) >> >> java.io.IOException: Failed to write statements to >> cassandrasink.navigation. 
>> >> at >> >> >> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:145) >> >> at >> >> >> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:120) >> >> at >> >> >> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:100) >> >> at >> >> >> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:99) >> >> at >> >> >> com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151) >> >> at >> >> >> com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99) >> >> at >> >> >> com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120) >> >> at >> >> >> com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36) >> >> at >> >> >> com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36) >> >> at >> >> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) >> >> at org.apache.spark.scheduler.Task.run(Task.scala:64) >> >> at >> >> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) >> >> at >> >> >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) >> >> at >> >> >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) >> >> at java.lang.Thread.run(Thread.java:722) >> >> 15/05/29 11:19:29 WARN TaskSetManager: Lost task 1.0 in stage 15.0 (TID >> 20, localhost): java.io.IOException: Failed to write statements to >> cassandrasink.navigation. 
>> >> at >> >> >> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:145) >> >> at >> >> >> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:120) >> >> at >> >> >> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:100) >> >> at >> >> >> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:99) >> >> at >> >> >> com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151) >> >> at >> >> >> com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99) >> >> at >> >> >> com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120) >> >> at >> >> >> com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36) >> >> at >> >> >> com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36) >> >> at >> >> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) >> >> at org.apache.spark.scheduler.Task.run(Task.scala:64) >> >> at >> >> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) >> >> at >> >> >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) >> >> at >> >> >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) >> >> at java.lang.Thread.run(Thread.java:722) >> >> >> >> A G >> >> >