Thanks Chiwan. It worked.
Now I have a simple streaming program in Spark Scala that receives streaming
data via Kafka. Please see attached.
I am trying to make it work with Flink + Kafka.
Any hints will be appreciated.
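For reference, something like the sketch below is roughly the shape I am aiming
for. It is untested and makes a few assumptions: the Kafka 0.8 consumer class
FlinkKafkaConsumer08 from the flink-connector-kafka-0.8 artifact (placed on the
shell or job classpath, e.g. via --addclasspath as Chiwan suggested), a
SimpleStringSchema deserializer, and the same broker, Zookeeper address, group
id and topic as in the attached Spark program.

import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object TestStreamFlink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Kafka / Zookeeper settings, mirroring the attached Spark job
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "rhes564:9092")
    properties.setProperty("zookeeper.connect", "rhes564:2181")
    properties.setProperty("group.id", "StreamTest")
    // Consume the topic as a stream of strings
    val lines = env.addSource(
      new FlinkKafkaConsumer08[String]("newtopic", new SimpleStringSchema(), properties))
    // Same filter / split / count pipeline as the Spark version
    lines.filter(_.contains("Sending messages")).
      flatMap(_.split("\n,")).
      map(word => (word, 1)).
      keyBy(0).
      sum(1).
      print()
    env.execute("TestStreamFlink")
  }
}

I suspect the connector jar also has to match the Flink release (for Flink 1.0
the Kafka 0.8 connector artifact is flink-connector-kafka-0.8_2.10), which may
be why the flink-connector-kafka-0.10.1.jar I mentioned below was not picked up.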
Thanks
Dr Mich Talebzadeh
LinkedIn
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
http://talebzadehmich.wordpress.com
On 18 April 2016 at 02:43, Chiwan Park <[email protected]> wrote:
> Hi Mich,
>
> You can add external dependencies to the Scala shell using the `--addclasspath`
> option. There is a more detailed description in the documentation [1].
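> For example, starting the shell with something like `bin/start-scala-shell.sh
> --addclasspath /path/to/flink-connector-kafka.jar` puts the connector on the
> shell classpath (the jar path here is only a placeholder).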
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/scala_shell.html#adding-external-dependencies
>
> Regards,
> Chiwan Park
>
> > On Apr 17, 2016, at 6:04 PM, Mich Talebzadeh <[email protected]> wrote:
> >
> > Hi,
> >
> > In the Spark shell I can load the Kafka jar file through the spark-shell option --jars
> >
> > spark-shell --master spark://50.140.197.217:7077 --jars /home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar
> >
> > This works fine.
> >
> > In Flink I have added the jar file /home/hduser/jars/flink-connector-kafka-0.10.1.jar to the CLASSPATH.
> >
> > However, I don't get any support for it within the Flink shell:
> >
> > Scala-Flink> import org.apache.flink.streaming.connectors.kafka
> > <console>:54: error: object connectors is not a member of package org.apache.flink.streaming
> >        import org.apache.flink.streaming.connectors.kafka
> >                                                  ^
> >
> > Any ideas will be appreciated.
> >
> > Dr Mich Talebzadeh
> >
> > LinkedIn
> > https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >
> > http://talebzadehmich.wordpress.com
> >
>
>
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils
//
object TestStream_assembly {
  def main(args: Array[String]) {
    val conf = new SparkConf().
      setAppName("TestStream_assembly").
      setMaster("local[2]").
      set("spark.driver.allowMultipleContexts", "true").
      set("spark.hadoop.validateOutputSpecs", "false")
    val sc = new SparkContext(conf)
    // Create sqlContext based on HiveContext
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._
    val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    // Streaming context with a 55-second batch interval
    val ssc = new StreamingContext(conf, Seconds(55))
    // Kafka connection settings
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "rhes564:9092",
      "schema.registry.url" -> "http://rhes564:8081",
      "zookeeper.connect" -> "rhes564:2181",
      "group.id" -> "StreamTest")
    val topic = Set("newtopic")
    // Direct (receiver-less) stream of (key, value) pairs from Kafka
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder,
      StringDecoder](ssc, kafkaParams, topic)
    messages.cache()
    //
    // Get the lines (message values)
    //
    val lines = messages.map(_._2)
    // Check for the marker message, split it and print a word count per batch
    val showResults = lines.filter(_.contains("Sending messages")).
      flatMap(line => line.split("\n,")).
      map(word => (word, 1)).
      reduceByKey(_ + _).
      print(1000)
    ssc.start()
    ssc.awaitTermination()
    //ssc.stop()
  }
}