bq.         reportError("Exception while streaming travis", e)

I assume none of the above showed up in your job's logs.

What Spark release are you using?
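
If you want to confirm whether the receiver ever hit reportError(), you can also
register a StreamingListener on the StreamingContext and watch receiver errors and
per-batch record counts. A rough sketch, assuming ctx is the StreamingContext from
your snippet (the println's are only illustrative):

import org.apache.spark.streaming.scheduler.{StreamingListener,
  StreamingListenerBatchCompleted, StreamingListenerReceiverError}

ctx.addStreamingListener(new StreamingListener {
  // fired when a receiver calls reportError()
  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit =
    println("Receiver error: " + receiverError.receiverInfo.lastErrorMessage)

  // fired after every batch; numRecords shows whether the receiver is still storing data
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit =
    println("Batch completed, records = " + batchCompleted.batchInfo.numRecords)
})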

Thanks

On Sat, Mar 5, 2016 at 4:57 AM, Dominik Safaric <dominiksafa...@gmail.com>
wrote:

> Dear all,
>
> Lately, as part of a research project, I've been developing an application
> that streams (or at least should stream) data from Travis CI and GitHub using
> their REST APIs. The purpose is to gain insight into the commit-build
> relationship, in order to perform further analyses.
>
> For this, I've implemented the following custom Travis receiver:
>
>
> object TravisUtils {
>
>   // Factory method, following the usual XxxUtils.createStream pattern
>   def createStream(ctx: StreamingContext, storageLevel: StorageLevel): ReceiverInputDStream[Build] =
>     new TravisInputDStream(ctx, storageLevel)
> }
>
> private[streaming]
> class TravisInputDStream(ctx: StreamingContext, storageLevel: StorageLevel)
>   extends ReceiverInputDStream[Build](ctx) {
>
>   def getReceiver(): Receiver[Build] = new TravisReceiver(storageLevel)
> }
>
> private[streaming]
> class TravisReceiver(storageLevel: StorageLevel)
>   extends Receiver[Build](storageLevel) with Logging {
>
>   def onStart(): Unit = {
>     // Attach a listener to the Travis build stream; every received build is
>     // handed to Spark via store()
>     new BuildStream().addListener(new BuildListener {
>
>       override def onBuildsReceived(numberOfBuilds: Int): Unit = {
>       }
>
>       override def onBuildRepositoryReceived(build: Build): Unit = {
>         store(build)
>       }
>
>       override def onException(e: Exception): Unit = {
>         reportError("Exception while streaming travis", e)
>       }
>     })
>   }
>
>   def onStop(): Unit = {
>   }
> }
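>
> A variant of onException that asks Spark to restart the receiver, rather than
> only reporting the failure, would look roughly like this (just a sketch,
> reusing the same BuildStream/BuildListener API as above; restart() re-runs
> onStop()/onStart() after a delay):
>
>   def onStart(): Unit = {
>     new BuildStream().addListener(new BuildListener {
>       override def onBuildsReceived(numberOfBuilds: Int): Unit = {}
>       override def onBuildRepositoryReceived(build: Build): Unit = store(build)
>       override def onException(e: Exception): Unit =
>         restart("Exception while streaming travis", e) // reconnect instead of only reporting
>     })
>   }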
>
> The receiver uses my custom-made Travis API library (developed in Java using
> Apache Async Client). However, the problem is the following: the data I should
> be receiving is continuous and constantly changing, i.e. it is being pushed to
> Travis and GitHub all the time. As an example, GitHub records roughly 350
> events per second, including push events, commit comments and similar.
>
> But when streaming either GitHub or Travis, I do get data in the first two
> batches; afterwards, however, the RDDs that make up the DStream are empty,
> although there is data to be streamed!
>
> So far I've checked a couple of things, including the HttpClient used for
> issuing requests to the API, but none of them actually solved the problem.
>
> Therefore, my question is: what could be going on? Why does Spark stop
> streaming the data after a certain period? Below you can find the streaming
> context and configuration:
>
> val configuration = new SparkConf()
>   .setAppName("StreamingSoftwareAnalytics")
>   .setMaster("local[2]")
>
> val ctx = new StreamingContext(configuration, Seconds(3))
>
> val stream = GitHubUtils.createStream(ctx, StorageLevel.MEMORY_AND_DISK_SER)
>
> // RDD IS EMPTY - that is what is happening!
> stream.window(Seconds(9)).foreachRDD(rdd => {
>   if (rdd.isEmpty()) {
>     println("RDD IS EMPTY")
>   } else {
>     rdd.collect().foreach(event => println(event.getRepo.getName + " " + event.getId))
>   }
> })
>
> ctx.start()
> ctx.awaitTermination()
>
> Thanks in advance!
>
>
>
