bq. reportError("Exception while streaming travis", e) I assume there was none of the above in your job.
What Spark release are you using?

Thanks

On Sat, Mar 5, 2016 at 4:57 AM, Dominik Safaric <dominiksafa...@gmail.com> wrote:
> Dear all,
>
> Lately, as part of a scientific research project, I've been developing an
> application that streams (or at least should stream) data from Travis CI and
> GitHub, using their REST APIs. The purpose of this is to get insight into
> the commit-build relationship, in order to perform further analyses.
>
> For this, I've implemented the following custom Travis receiver:
>
> object TravisUtils {
>
>   def createStream(ctx: StreamingContext, storageLevel: StorageLevel): ReceiverInputDStream[Build] =
>     new TravisInputDStream(ctx, storageLevel)
> }
>
> private[streaming]
> class TravisInputDStream(ctx: StreamingContext, storageLevel: StorageLevel)
>     extends ReceiverInputDStream[Build](ctx) {
>
>   def getReceiver(): Receiver[Build] = new TravisReceiver(storageLevel)
> }
>
> private[streaming]
> class TravisReceiver(storageLevel: StorageLevel)
>     extends Receiver[Build](storageLevel) with Logging {
>
>   def onStart(): Unit = {
>     new BuildStream().addListener(new BuildListener {
>
>       override def onBuildsReceived(numberOfBuilds: Int): Unit = {
>       }
>
>       override def onBuildRepositoryReceived(build: Build): Unit = {
>         store(build)
>       }
>
>       override def onException(e: Exception): Unit = {
>         reportError("Exception while streaming travis", e)
>       }
>     })
>   }
>
>   def onStop(): Unit = {
>   }
> }
>
> The receiver uses my custom-made Travis API library (developed in Java using
> the Apache Async Client). However, the problem is the following: the data I
> should be receiving is continuous and constantly changing, i.e. it is being
> pushed to Travis and GitHub all the time. As an example, GitHub records
> approx. 350 events per second, including push events, commit comments, and
> similar.
>
> But when streaming either GitHub or Travis, I do get the data from the first
> two batches, but afterwards the RDDs that make up the DStream are empty -
> although there is data to be streamed!
>
> I've checked a couple of things so far, including the HttpClient used for
> issuing requests to the API, but none of them actually solved the problem.
>
> Therefore, my question is: what could be going on? Why isn't Spark streaming
> the data after a certain period passes? Below, you may find the context and
> configuration:
>
> val configuration = new SparkConf().setAppName("StreamingSoftwareAnalytics").setMaster("local[2]")
>
> val ctx = new StreamingContext(configuration, Seconds(3))
>
> val stream = GitHubUtils.createStream(ctx, StorageLevel.MEMORY_AND_DISK_SER)
>
> // RDD IS EMPTY - that is what is happening!
> stream.window(Seconds(9)).foreachRDD(rdd => {
>   if (rdd.isEmpty()) { println("RDD IS EMPTY") } else {
>     rdd.collect().foreach(event => println(event.getRepo.getName + " " + event.getId))
>   }
> })
>
> ctx.start()
> ctx.awaitTermination()
>
> Thanks in advance!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Travis-CI-and-GitHub-custom-receiver-continuous-data-but-empty-RDD-tp26406.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
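One more thing worth checking on the receiver side: if the underlying async client dies or drops its connection without ever calling reportError, the receiver just sits there and every batch after that comes out empty. Below is a rough sketch of a more defensive receiver that polls in its own thread and asks Spark to restart it on failure. It uses only the standard Receiver API; fetchNewBuilds() is a placeholder for whatever call your Travis client actually makes, not a real method:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class PollingTravisReceiver(storageLevel: StorageLevel)
    extends Receiver[String](storageLevel) {

  def onStart(): Unit = {
    // onStart must return quickly, so do the actual work on a daemon thread
    val poller = new Thread("travis-poller") {
      override def run(): Unit = receiveLoop()
    }
    poller.setDaemon(true)
    poller.start()
  }

  def onStop(): Unit = ()  // the loop below checks isStopped() and exits on its own

  private def receiveLoop(): Unit = {
    try {
      while (!isStopped()) {
        fetchNewBuilds().foreach(b => store(b))   // push each record into Spark
        Thread.sleep(1000)
      }
    } catch {
      case t: Throwable =>
        // restart() tears the receiver down and schedules a fresh onStart(),
        // instead of leaving a dead receiver that only produces empty batches
        restart("Error while polling Travis", t)
    }
  }

  // Placeholder - substitute your Travis client call here
  private def fetchNewBuilds(): Seq[String] = Seq.empty
}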