Re: Question about Spark Streaming Receiver Failure
I have checked Dibyendu's code, and it looks like his implementation has an auto-restart mechanism in src/main/java/consumer/kafka/client/KafkaReceiver.java:

    private void start() {
      // Start the thread that receives data over a connection
      KafkaConfig kafkaConfig = new KafkaConfig(_props);
      ZkState zkState = new ZkState(kafkaConfig);
      _kConsumer = new KafkaConsumer(kafkaConfig, zkState, this);
      _kConsumer.open(_partitionId);

      // Any uncaught failure in the consumer thread asks Spark to
      // restart this receiver after a 5-second delay.
      Thread.UncaughtExceptionHandler eh = new Thread.UncaughtExceptionHandler() {
        public void uncaughtException(Thread th, Throwable ex) {
          restart("Restarting Receiver for Partition " + _partitionId, ex, 5000);
        }
      };

      _consumerThread = new Thread(_kConsumer);
      _consumerThread.setDaemon(true);
      _consumerThread.setUncaughtExceptionHandler(eh);
      _consumerThread.start();
    }

I also checked Spark's native Kafka receiver implementation, and it does not appear to have any auto-restart support.

Any comments from Dibyendu?

On Mon, Mar 16, 2015 at 3:39 PM, Akhil Das wrote:

> As I've seen, once I kill my receiver on one machine, it will
> automatically spawn another receiver on another machine or on the same
> machine.
>
> Thanks
> Best Regards
>
> On Mon, Mar 16, 2015 at 1:08 PM, Jun Yang wrote:
>
>> Dibyendu,
>>
>> Thanks for the reply.
>>
>> I am reading your project homepage now.
>>
>> One quick question I care about is:
>>
>> If the receivers fail for some reason (for example, they are killed
>> brutally by someone else), is there any mechanism for the receivers to
>> fail over automatically?
>>
>> On Mon, Mar 16, 2015 at 3:25 PM, Dibyendu Bhattacharya <
>> dibyendu.bhattach...@gmail.com> wrote:
>>
>>> Which version of Spark are you running?
>>>
>>> You can try this Low Level Consumer:
>>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer
>>>
>>> This is designed to recover from various failures and has a very good
>>> fault-recovery mechanism built in. It is being used by many users, and
>>> at present we at Pearson have been running this receiver in production
>>> for almost 3 months without any issue.
>>>
>>> You can give this a try.
>>>
>>> Regards,
>>> Dibyendu
>>>
>>> On Mon, Mar 16, 2015 at 12:47 PM, Akhil Das wrote:
>>>
>>>> You need to figure out why the receivers failed in the first place.
>>>> Look in your worker logs and see what really happened. When you run a
>>>> streaming job continuously for a longer period there will usually be a
>>>> lot of logs (you can enable log rotation, etc.), and if you are doing
>>>> groupBy, join, etc. types of operations, there will be a lot of shuffle
>>>> data. So you need to check the worker logs and see what happened
>>>> (whether the disk is full, etc.). We have streaming pipelines running
>>>> for weeks without any issues.
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Mon, Mar 16, 2015 at 12:40 PM, Jun Yang wrote:
>>>>
>>>>> Guys,
>>>>>
>>>>> We have a project which builds upon Spark Streaming.
>>>>>
>>>>> We use Kafka as the input stream and create 5 receivers.
>>>>>
>>>>> When this application had run for around 90 hours, all 5 receivers
>>>>> failed for some unknown reason.
>>>>>
>>>>> In my understanding, it is not guaranteed that a Spark Streaming
>>>>> receiver will recover from faults automatically.
>>>>>
>>>>> So I just want to figure out a way to do fault recovery to deal with
>>>>> receiver failure.
>>>>>
>>>>> There is a JIRA comment that mentions using StreamingListener to
>>>>> monitor the status of receivers:
>>>>>
>>>>> https://issues.apache.org/jira/browse/SPARK-2381?focusedCommentId=14056836&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14056836
>>>>>
>>>>> However, I haven't found any public documentation on how to do this.
>>>>>
>>>>> Has anyone met the same issue and dealt with it?
>>>>>
>>>>> Our environment:
>>>>>    Spark 1.3.0
>>>>>    Dual-master configuration
>>>>>    Kafka 0.8.2
>>>>>
>>>>> Thanks
>>>>>
>>>>> --
>>>>> yangjun...@gmail.com
>>>>> http://hi.baidu.com/yjpro
>>>>>
>>>>
>>>
>>
>> --
>> yangjun...@gmail.com
>> http://hi.baidu.com/yjpro
>

--
yangjun...@gmail.com
http://hi.baidu.com/yjpro
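The hook Dibyendu's handler relies on is part of Spark's public Receiver API: a custom receiver can call restart(), and the driver stops and re-launches the receiver (possibly on another executor) after a configurable delay. A minimal Scala sketch, assuming the Spark 1.3-era org.apache.spark.streaming.receiver.Receiver API; the socket source here is just an illustrative stand-in:

    import java.net.Socket
    import scala.io.Source
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver

    // Illustrative receiver that reads lines from a socket and asks the
    // driver to restart it whenever the receive loop dies.
    class RestartingSocketReceiver(host: String, port: Int)
      extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

      def onStart(): Unit = {
        val t = new Thread("Socket Receiver") {
          override def run(): Unit = receive()
        }
        t.setDaemon(true)
        t.start()
      }

      def onStop(): Unit = {} // the socket is closed in receive()'s finally block

      private def receive(): Unit = {
        var socket: Socket = null
        try {
          socket = new Socket(host, port)
          for (line <- Source.fromInputStream(socket.getInputStream).getLines())
            store(line)
          // The stream ended normally; schedule a reconnect.
          restart("Socket stream ended, restarting receiver")
        } catch {
          case t: Throwable =>
            // restart() reports the error and re-launches this receiver
            // asynchronously after a short, configurable delay.
            restart("Error receiving data", t)
        } finally {
          if (socket != null) socket.close()
        }
      }
    }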
Re: Question about Spark Streaming Receiver Failure
Dibyendu,

Thanks for the reply.

I am reading your project homepage now.

One quick question I care about is:

If the receivers fail for some reason (for example, they are killed brutally by someone else), is there any mechanism for the receivers to fail over automatically?

On Mon, Mar 16, 2015 at 3:25 PM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> Which version of Spark are you running?
>
> You can try this Low Level Consumer:
> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer
>
> This is designed to recover from various failures and has a very good
> fault-recovery mechanism built in. It is being used by many users, and at
> present we at Pearson have been running this receiver in production for
> almost 3 months without any issue.
>
> You can give this a try.
>
> Regards,
> Dibyendu
>
> On Mon, Mar 16, 2015 at 12:47 PM, Akhil Das wrote:
>
>> You need to figure out why the receivers failed in the first place. Look
>> in your worker logs and see what really happened. When you run a streaming
>> job continuously for a longer period there will usually be a lot of logs
>> (you can enable log rotation, etc.), and if you are doing groupBy, join,
>> etc. types of operations, there will be a lot of shuffle data. So you need
>> to check the worker logs and see what happened (whether the disk is full,
>> etc.). We have streaming pipelines running for weeks without any issues.
>>
>> Thanks
>> Best Regards
>>
>> On Mon, Mar 16, 2015 at 12:40 PM, Jun Yang wrote:
>>
>>> Guys,
>>>
>>> We have a project which builds upon Spark Streaming.
>>>
>>> We use Kafka as the input stream and create 5 receivers.
>>>
>>> When this application had run for around 90 hours, all 5 receivers
>>> failed for some unknown reason.
>>>
>>> In my understanding, it is not guaranteed that a Spark Streaming
>>> receiver will recover from faults automatically.
>>>
>>> So I just want to figure out a way to do fault recovery to deal with
>>> receiver failure.
>>>
>>> There is a JIRA comment that mentions using StreamingListener to
>>> monitor the status of receivers:
>>>
>>> https://issues.apache.org/jira/browse/SPARK-2381?focusedCommentId=14056836&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14056836
>>>
>>> However, I haven't found any public documentation on how to do this.
>>>
>>> Has anyone met the same issue and dealt with it?
>>>
>>> Our environment:
>>>    Spark 1.3.0
>>>    Dual-master configuration
>>>    Kafka 0.8.2
>>>
>>> Thanks
>>>
>>> --
>>> yangjun...@gmail.com
>>> http://hi.baidu.com/yjpro
>>>
>>
>

--
yangjun...@gmail.com
http://hi.baidu.com/yjpro
Re: Question about Spark Streaming Receiver Failure
Akhil,

I have checked the logs. There isn't any clue as to why the 5 receivers failed.

That's why I assume receiver failure will be a common issue, and that we need to figure out a way to detect this kind of failure and fail over.

Thanks

On Mon, Mar 16, 2015 at 3:17 PM, Akhil Das wrote:

> You need to figure out why the receivers failed in the first place. Look
> in your worker logs and see what really happened. When you run a streaming
> job continuously for a longer period there will usually be a lot of logs
> (you can enable log rotation, etc.), and if you are doing groupBy, join,
> etc. types of operations, there will be a lot of shuffle data. So you need
> to check the worker logs and see what happened (whether the disk is full,
> etc.). We have streaming pipelines running for weeks without any issues.
>
> Thanks
> Best Regards
>
> On Mon, Mar 16, 2015 at 12:40 PM, Jun Yang wrote:
>
>> Guys,
>>
>> We have a project which builds upon Spark Streaming.
>>
>> We use Kafka as the input stream and create 5 receivers.
>>
>> When this application had run for around 90 hours, all 5 receivers
>> failed for some unknown reason.
>>
>> In my understanding, it is not guaranteed that a Spark Streaming
>> receiver will recover from faults automatically.
>>
>> So I just want to figure out a way to do fault recovery to deal with
>> receiver failure.
>>
>> There is a JIRA comment that mentions using StreamingListener to
>> monitor the status of receivers:
>>
>> https://issues.apache.org/jira/browse/SPARK-2381?focusedCommentId=14056836&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14056836
>>
>> However, I haven't found any public documentation on how to do this.
>>
>> Has anyone met the same issue and dealt with it?
>>
>> Our environment:
>>    Spark 1.3.0
>>    Dual-master configuration
>>    Kafka 0.8.2
>>
>> Thanks
>>
>> --
>> yangjun...@gmail.com
>> http://hi.baidu.com/yjpro
>>
>

--
yangjun...@gmail.com
http://hi.baidu.com/yjpro
Question about Spark Streaming Receiver Failure
Guys,

We have a project which builds upon Spark Streaming.

We use Kafka as the input stream and create 5 receivers.

When this application had run for around 90 hours, all 5 receivers failed for some unknown reason.

In my understanding, it is not guaranteed that a Spark Streaming receiver will recover from faults automatically.

So I just want to figure out a way to do fault recovery to deal with receiver failure.

There is a JIRA comment that mentions using StreamingListener to monitor the status of receivers:

https://issues.apache.org/jira/browse/SPARK-2381?focusedCommentId=14056836&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14056836

However, I haven't found any public documentation on how to do this.

Has anyone met the same issue and dealt with it?

Our environment:
   Spark 1.3.0
   Dual-master configuration
   Kafka 0.8.2

Thanks

--
yangjun...@gmail.com
http://hi.baidu.com/yjpro
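For what it's worth, the listener approach from that JIRA comment is small enough to sketch. This is a hedged illustration assuming the org.apache.spark.streaming.scheduler API as of Spark 1.3; the alerting logic is a placeholder:

    import org.apache.spark.streaming.scheduler._

    // Fires whenever a receiver reports an error or stops, so external
    // monitoring/restart logic can react to receiver failures.
    class ReceiverMonitor extends StreamingListener {
      override def onReceiverError(e: StreamingListenerReceiverError): Unit = {
        val info = e.receiverInfo
        // Placeholder: send an alert / trigger fail-over here.
        println(s"Receiver ${info.name} (stream ${info.streamId}) failed " +
          s"on ${info.location}: ${info.lastErrorMessage}")
      }

      override def onReceiverStopped(e: StreamingListenerReceiverStopped): Unit =
        println(s"Receiver ${e.receiverInfo.name} stopped")
    }

    // Registration on the StreamingContext:
    //   ssc.addStreamingListener(new ReceiverMonitor)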
Is It Feasible for Spark 1.1 Broadcast to Fully Utilize the Ethernet Card Throughput?
Guys,

I have a question regarding Spark 1.1's broadcast implementation.

In our pipeline, we have a large multi-class LR model, which is about 1 GiB in size. To benefit from Spark's parallelism, the natural approach is to broadcast this model file to the worker nodes.

However, it looks like broadcast performance is not very good. While the model file was being broadcast, I monitored the network throughput of the worker nodes: their receive/transmit throughput was only around 30~40 MiB/s (our server boxes are equipped with Ethernet cards capable of roughly 100 MiB/s).

Is this a real limitation of Spark 1.1's broadcast implementation? Or are there configuration options or tricks that can make Spark broadcast perform better?

Thanks

--
yangjun...@gmail.com
http://hi.baidu.com/yjpro
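One knob worth checking is which broadcast implementation is in effect. HttpBroadcast serves every block from the driver, so the driver's single NIC caps aggregate throughput, while TorrentBroadcast distributes blocks peer-to-peer among executors. A hedged sketch, assuming the spark.broadcast.factory and spark.broadcast.blockSize settings as documented for Spark 1.1; the values and the toy model array are illustrative:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._

    object BroadcastTuning {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("broadcast-tuning")
          // Pin the BitTorrent-style implementation (reportedly the 1.1
          // default, but worth setting explicitly if in doubt).
          .set("spark.broadcast.factory",
               "org.apache.spark.broadcast.TorrentBroadcastFactory")
          // Block size in KiB; larger blocks mean fewer pieces for a ~1 GiB object.
          .set("spark.broadcast.blockSize", "8192")

        val sc = new SparkContext(conf)

        // Small stand-in for the real 1 GiB model.
        val model = Array.fill(1 << 20)(1.0)
        val bcModel = sc.broadcast(model)

        // Each executor fetches the broadcast once, then reuses it locally.
        val total = sc.parallelize(1 to 1000, 100)
          .map(i => bcModel.value(i % bcModel.value.length))
          .sum()
        println(total)
        sc.stop()
      }
    }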
Re: k-means clustering
Guys,

As to the question about pre-processing: you could simply migrate your pre-processing logic to Spark before running K-means.

I have only used Scala on Spark and haven't used the Python bindings, but I think the basic steps must be the same (a rough Scala sketch follows the quoted message below).

BTW, if your data set is big, with huge sparse high-dimensional feature vectors, K-means may not work as well as you expect. I think this is still an optimization direction for Spark MLlib.

On Wed, Nov 19, 2014 at 2:21 PM, amin mohebbi wrote:

> Hi there,
>
> I would like to do "text clustering" using k-means and Spark on a massive
> dataset. As you know, before running the k-means, I have to apply
> pre-processing methods such as TF-IDF and NLTK on my big dataset. The
> following is my code in Python:
>
> if __name__ == '__main__':
>     # Cluster a bunch of text documents.
>     import re
>     import sys
>     import csv
>     import string
>     import nltk
>     from collections import defaultdict
>     from nltk.stem.porter import PorterStemmer
>
>     k = 6
>     vocab = {}
>     xs = []
>     ns = []
>     cat = []
>     filename = '2013-01.csv'
>     with open(filename, newline='') as f:
>         try:
>             newsreader = csv.reader(f)
>             for row in newsreader:
>                 ns.append(row[3])
>                 cat.append(row[4])
>         except csv.Error as e:
>             sys.exit('file %s, line %d: %s'
>                      % (filename, newsreader.line_num, e))
>
>     # Regexes to remove special characters and numbers.
>     remove_spl_char_regex = re.compile('[%s]' % re.escape(string.punctuation))
>     remove_num = re.compile('[\d]+')
>     # nltk.download()
>     stop_words = nltk.corpus.stopwords.words('english')
>
>     for a in ns:
>         x = defaultdict(float)
>         a1 = a.strip().lower()
>         a2 = remove_spl_char_regex.sub(" ", a1)  # Remove special characters
>         a3 = remove_num.sub("", a2)              # Remove numbers
>         # Remove stop words
>         words = a3.split()
>         filter_stop_words = [w for w in words if w not in stop_words]
>         stemed = [PorterStemmer().stem_word(w) for w in filter_stop_words]
>         ws = sorted(stemed)
>         # ws = re.findall(r"\w+", a1)
>         for w in ws:
>             vocab.setdefault(w, len(vocab))
>             x[vocab[w]] += 1
>         xs.append(x.items())
>
> Can anyone explain to me how I can do the pre-processing step before
> running k-means using Spark?
>
>
> Best Regards
>
> .......................................................
>
> Amin Mohebbi
>
> PhD candidate in Software Engineering
> at University of Malaysia
>
> Tel: +60 18 2040 017
>
> E-Mail: tp025...@ex.apiit.edu.my
>
> amin_...@me.com

--
yangjun...@gmail.com
http://hi.baidu.com/yjpro
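To make the suggestion concrete, here is a rough Scala sketch of the same pre-processing done with Spark before calling MLlib's K-means. It assumes Spark 1.1+ (for org.apache.spark.mllib.feature.HashingTF); the file name and column index are taken from the Python snippet, the stop-word list is a placeholder, and the naive comma split would need a real CSV parser for quoted fields:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.mllib.clustering.KMeans
    import org.apache.spark.mllib.feature.HashingTF

    object TextKMeans {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("text-kmeans"))

        // Placeholder stop-word list; in practice, load NLTK's English list.
        val stopWords = sc.broadcast(Set("the", "a", "an", "and", "of", "to", "in"))

        // Column 3 holds the document text, as in the Python snippet.
        val docs = sc.textFile("2013-01.csv")
          .map(_.split(",")(3))                          // naive CSV split
          .map(_.toLowerCase.replaceAll("[^a-z ]", " ")) // drop punctuation/digits
          .map(_.split("\\s+").toSeq
                .filter(w => w.nonEmpty && !stopWords.value(w)))

        // Hash terms into fixed-size term-frequency vectors; no global
        // vocabulary needs to be built on the driver.
        val tf = new HashingTF(1 << 18)
        val vectors = docs.map(words => tf.transform(words)).cache()

        val model = KMeans.train(vectors, 6 /* k */, 20 /* maxIterations */)
        println("computed " + model.clusterCenters.length + " cluster centers")
        sc.stop()
      }
    }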
Questions Regarding to MPI Program Migration to Spark
Guys,

Recently we have been migrating our backend pipeline to Spark.

In our pipeline, we have an MPI-based HAC (hierarchical agglomerative clustering) implementation. To ensure result consistency across the migration, we also want to port this MPI code to Spark.

However, during the migration process, I found that there are some possible limitations with Spark.

In the original MPI implementation, the logic looks like the following:

Node 0 (master node):

   Get the complete document data, stored in g_doc_data.
   Get the document subset for which this node needs to calculate the
   distance metrics, stored in l_dist_metric_data.

   while (exit condition is not met) {
       Find the locally closest node pair, notated as l_closest_pair.
       Get the globally closest node pair from the other nodes via
       MPI_Allreduce, notated as g_closest_pair.
       Merge the globally closest node pair and update the document
       data g_doc_data.
       Re-calculate the distance metrics for those node pairs affected
       by the above merge, updating l_dist_metric_data.
   }

Node 1/2/.../P (slave nodes):

   Get the complete document data, stored in g_doc_data.
   Get the document subset for which this node needs to calculate the
   distance metrics, stored in l_dist_metric_data.

   while (exit condition is not met) {
       Find the locally closest node pair, notated as l_closest_pair.
       Get the globally closest node pair from the other nodes via
       MPI_Allreduce, notated as g_closest_pair.
       Merge the globally closest node pair and update the document
       data g_doc_data.
       Re-calculate the distance metrics for those node pairs affected
       by the above merge, updating l_dist_metric_data.
   }

The essential difficulty in migrating the above logic to Spark is:

In the original implementation, between iterations the computation nodes need to hold local state (namely g_doc_data and l_dist_metric_data), and in Spark there doesn't appear to be an effective way of keeping intermediate local state between iterations. Usually in Spark, we use either a broadcast variable or a closure to pass state to the operations of each iteration.

Of course, after each iteration we could aggregate the changes from all the worker nodes via a reduce and then broadcast the aggregated result back to them (a sketch of this pattern follows at the end of this message). But this involves significant data transfer when the data size is large (e.g., 100 thousand documents with 500-dimensional feature vectors), and the performance penalty is non-negligible.

So my questions are:

1. Is the difficulty I mentioned above a limitation imposed by the computation paradigm of Spark?

2. Are there any feasible ways of implementing bottom-up agglomerative hierarchical clustering in Spark?

BTW, I know there are some top-down divisive hierarchical clustering algorithms in the upcoming 1.2 release; I will give them a try as well.

Thanks.

--
yangjun...@gmail.com
http://hi.baidu.com/yjpro
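For reference, a minimal Scala sketch of the reduce-then-broadcast pattern described above. This is a hedged illustration of the communication pattern only, not a tuned HAC implementation: the toy data, the centroid-average merge, and the exit condition are all placeholders:

    import org.apache.spark.{SparkConf, SparkContext}

    object HacSketch {
      type Point = Array[Double]

      // Plain Euclidean distance between two feature vectors.
      def dist(a: Point, b: Point): Double =
        math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)

      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("hac-sketch"))

        // Toy data: cluster id -> feature vector, held on the driver.
        var clusters = Map(
          0 -> Array(0.0, 0.0), 1 -> Array(0.1, 0.0),
          2 -> Array(5.0, 5.0), 3 -> Array(5.1, 5.0))

        while (clusters.size > 2) {
          // Ship the current state out; this per-iteration re-broadcast is
          // exactly the transfer cost the post is worried about.
          val bc = sc.broadcast(clusters)
          val ids = sc.parallelize(clusters.keys.toSeq)

          // Workers score the pairs for their slice of ids; a reduce plays
          // the role of MPI_Allreduce and picks the globally closest pair.
          val (i, j, _) = ids.flatMap { a =>
            bc.value.keys.filter(_ > a)
              .map(b => (a, b, dist(bc.value(a), bc.value(b))))
          }.reduce((p, q) => if (p._3 <= q._3) p else q)

          // Merge on the driver, mirroring the post-Allreduce update step.
          val merged = clusters(i).zip(clusters(j)).map { case (x, y) => (x + y) / 2 }
          clusters = (clusters - i - j) + (i -> merged)
          bc.unpersist()
        }
        println("remaining clusters: " + clusters.keys.mkString(", "))
        sc.stop()
      }
    }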