Folks,
    I am trying to install zeppelin in our environment. I am really excited
about the promise of zeppelin and in fact if it does what it says on the
tin. I can get rid of the "reporting engine" of my application.

I have a cluster of 3 machines with

- Spark 1.5
- Cassandra 2.1.8
- libthrift 0.9.2
- cassandra-thrift 2.1.8
- spark-cassandra-connector_2.10-1.5.0-RC1.jar
- spark-cassandra-connector-java_2.10-1.5.0-RC1.jar

I cannot change this setup as my core application is built on the above
versions.

I did a git clone of master and built it using the command
 mvn clean package -Pcassandra-spark-1.5 -Dhadoop.version=2.6.0
-Phadoop-2.6 -DskipTests

so I suppose I have zepelin 0.6.0

The spark script in zeppelin is a simple fetch from cassandra and display
the records

%spark
import com.datastax.spark.connector._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import com.datastax.spark.connector.CassandraRow
import com.cybersparta.athena.events.checkpointfw.CheckpointfwlogEvent
import com.cybersparta.athena.events.correlation.CorrelatedEvent
import com.cybersparta.athena.events.checkpointfw.CheckpointfwlogEventSerde
import com.google.gson.JsonParser
import com.google.gson.JsonObject


val KEYSPACE = "cybersparta"
val TABLE    = "correlatedevents"

var stratioQueryStr = "{filter: {upper: \"2016-03-18 22:17:01.145+0000\",
lower: \"2016-03-16 22:17:01.145+0000\", include_lower: \"true\","
stratioQueryStr += "field: \"TIMESTAMP\", include_upper: \"true\", type:
\"range\" }, "
stratioQueryStr += "query: {field: \"RULE_CODE\", type: \"match\", value:
\"B:0010\" }}"

val rdd = sc.cassandraTable(KEYSPACE,TABLE).where("stratio_col=?",
stratioQueryStr)


The query runs fine and returns 539 records. Now I am trying to print the
elements just so that I can manipulate the fields and draw charts.


The issue is with the next set of lines


rdd.count()

val nonStdRdd = rdd.map{
    case(row:CassandraRow) =>
        val DATE_TIME_STORE_FORMATTER =
DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSZ")
        val evtTime =
DateTime.parse(row.getString("TIMESTAMP"),DATE_TIME_STORE_FORMATTER)

        val lst = row.getList[String]("CORRELATED_EVENTS").map{
            case (obj) =>
                obj.asInstanceOf[String]
        }
        lst(0)
}

nonStdRdd.take(10).foreach(println)


1) If I run this on a "spark standalone cluster", the app gets accepted at
the SPARK master as I see it in the MASTER UI but it is in a WAITING state.
There is nothing else running on the cluster at the moment and I just can't
seem to get it out of this state. There is nothing in the spark logs other
than saying the application was accepted. None of the workers have the app.

2) If I run it with "local[*]" it works fine but if I replace
'nonStdRdd.take(10).foreach(println)'
with nonStdRdd.foreach(println), the browser hangs, it is taking up 30% of
CPU. Looks like the "sparkContext" in this scenario is running within the
browser and it is strange that to print 539 strings it should hang.

I am stuck either ways, any help would be appreciated.

I suspect it is a library version mis-match but not able to pinpoint it, I
am confident that the above listed libs and the versions are what zeppelin
is picking up as I have removed all other libs. Also if I get rid of any of
those zeppelin does not initialize.


----------------------------------- output
-------------------------------------------------


rdd:
com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow]
= CassandraTableScanRDD[1] at RDD at CassandraRDD.scala:15

stuck here forever



Regards
-Ravi Gurram

Reply via email to