import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel
import org.apache.nifi._
import org.apache.nifi.remote._
import org.apache.nifi.remote.client._
import org.apache.nifi.spark._

// Site-to-Site client builder pointing at the NiFi instance and output port
// that this script reads from. The builder is configured in place and then
// reused to produce every receiver config below.
val c = new SiteToSiteClient.Builder
c.url("http://localhost:8090/nifi")
c.portName("Data For Spark")

// One config per receiver. Array.fill evaluates its element expression once
// per slot, so buildConfig() is still invoked three times, exactly as the
// three explicit assignments did; the array itself is never reassigned.
val z: Array[SiteToSiteClientConfig] = Array.fill(3)(c.buildConfig())

// Streaming context with 2-second batches. `sc` is not defined in this
// script; presumably it is the SparkContext provided by the Spark shell.
val ssc = new StreamingContext(sc, Seconds(2))
// Single-receiver variant, kept for reference:
//val stream1 = ssc.receiverStream(new NiFiReceiver(c.buildConfig,StorageLevel.MEMORY_ONLY))
// Create one NiFi receiver stream per config in `z`, then merge them all
// into a single stream for downstream processing.
val streams = for (config <- z) yield ssc.receiverStream(new NiFiReceiver(config, StorageLevel.MEMORY_ONLY))
val stream1 = ssc.union(streams)

// Decode each packet's payload as text. getContent yields a byte array, and
// calling toString on an array returns its identity string ("[B@...") rather
// than the data, so the bytes are decoded into a String explicitly.
// NOTE(review): assumes the FlowFile content is UTF-8 text; confirm against the NiFi flow.
val text = stream1.map(d => new String(d.getContent, java.nio.charset.StandardCharsets.UTF_8))
// Split every record on single spaces and count occurrences of each word
// within each batch.
val words = text.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
// Output action: print the counts of every batch.
wordCounts.print()

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate
