Re: Question about Spark Streaming Receiver Failure

2015-03-16 Thread Jun Yang
I have checked Dibyendu's code, and it looks like his implementation has an
auto-restart mechanism:


src/main/java/consumer/kafka/client/KafkaReceiver.java:

private void start() {

    // Start the thread that receives data over a connection
    KafkaConfig kafkaConfig = new KafkaConfig(_props);
    ZkState zkState = new ZkState(kafkaConfig);
    _kConsumer = new KafkaConsumer(kafkaConfig, zkState, this);
    _kConsumer.open(_partitionId);

    Thread.UncaughtExceptionHandler eh = new Thread.UncaughtExceptionHandler() {
        public void uncaughtException(Thread th, Throwable ex) {
            restart("Restarting Receiver for Partition " + _partitionId, ex, 5000);
        }
    };

    _consumerThread = new Thread(_kConsumer);
    _consumerThread.setDaemon(true);
    _consumerThread.setUncaughtExceptionHandler(eh);
    _consumerThread.start();
}

I also checked Spark's native Kafka receiver implementation, and it does not
appear to have any auto-restart support.

Any comments from Dibyendu?

On Mon, Mar 16, 2015 at 3:39 PM, Akhil Das 
wrote:

> As far as I have seen, once I kill my receiver on one machine, it will
> automatically spawn another receiver on another machine or on the same one.
>
> Thanks
> Best Regards
>
> On Mon, Mar 16, 2015 at 1:08 PM, Jun Yang  wrote:
>
>> Dibyendu,
>>
>> Thanks for the reply.
>>
>> I am reading your project homepage now.
>>
>> One quick question I care about is:
>>
>> If the receivers fail for some reason (for example, if they are killed
>> forcibly), is there any mechanism for the receiver to fail over
>> automatically?
>>
>> On Mon, Mar 16, 2015 at 3:25 PM, Dibyendu Bhattacharya <
>> dibyendu.bhattach...@gmail.com> wrote:
>>
>>> Which version of Spark are you running?
>>>
>>> You can try this Low Level Consumer :
>>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer
>>>
>>> This is designed to recover from various failures and has a very good
>>> fault-recovery mechanism built in. It is being used by many users, and at
>>> present we at Pearson have been running this receiver in production for
>>> almost 3 months without any issue.
>>>
>>> You can give this a try.
>>>
>>> Regards,
>>> Dibyendu
>>>
>>> On Mon, Mar 16, 2015 at 12:47 PM, Akhil Das 
>>> wrote:
>>>
>>>> You need to figure out why the receivers failed in the first place.
>>>> Look in your worker logs and see what really happened. When you run a
>>>> streaming job continuously for a longer period, there will usually be a
>>>> lot of logs (you can enable log rotation, etc.), and if you are doing
>>>> groupBy, join, or similar operations, there will be a lot of shuffle
>>>> data. So you need to check the worker logs and see what happened
>>>> (whether the disk is full, etc.). We have streaming pipelines running
>>>> for weeks without any issues.
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Mon, Mar 16, 2015 at 12:40 PM, Jun Yang 
>>>> wrote:
>>>>
>>>>> Guys,
>>>>>
>>>>> We have a project which builds upon Spark streaming.
>>>>>
>>>>> We use Kafka as the input stream, and create 5 receivers.
>>>>>
>>>>> When this application had run for around 90 hours, all 5 receivers
>>>>> failed for unknown reasons.
>>>>>
>>>>> In my understanding, it is not guaranteed that a Spark Streaming
>>>>> receiver will recover from faults automatically.
>>>>>
>>>>> So I just want to figure out a way to handle receiver failure with
>>>>> automatic fail-over.
>>>>>
>>>>> There is a JIRA issue that mentions using StreamingListener to
>>>>> monitor receiver status:
>>>>>
>>>>>
>>>>> https://issues.apache.org/jira/browse/SPARK-2381?focusedCommentId=14056836&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14056836
>>>>>
>>>>> However, I haven't found any public documentation on how to do this.
>>>>>
>>>>> Has anyone met the same issue and dealt with it?
>>>>>
>>>>> Our environment:
>>>>>Spark 1.3.0
>>>>>Dual Master Configuration
>>>>>Kafka 0.8.2
>>>>>
>>>>> Thanks
>>>>>
>>>>> --
>>>>> yangjun...@gmail.com
>>>>> http://hi.baidu.com/yjpro
>>>>>
>>>>
>>>>
>>>
>>
>>
>> --
>> yangjun...@gmail.com
>> http://hi.baidu.com/yjpro
>>
>
>


-- 
yangjun...@gmail.com
http://hi.baidu.com/yjpro


Re: Question about Spark Streaming Receiver Failure

2015-03-16 Thread Jun Yang
Dibyendu,

Thanks for the reply.

I am reading your project homepage now.

One quick question I care about is:

If the receivers fail for some reason (for example, if they are killed
forcibly), is there any mechanism for the receiver to fail over
automatically?

On Mon, Mar 16, 2015 at 3:25 PM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> Which version of Spark are you running?
>
> You can try this Low Level Consumer :
> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer
>
> This is designed to recover from various failures and has a very good
> fault-recovery mechanism built in. It is being used by many users, and at
> present we at Pearson have been running this receiver in production for
> almost 3 months without any issue.
>
> You can give this a try.
>
> Regards,
> Dibyendu
>
> On Mon, Mar 16, 2015 at 12:47 PM, Akhil Das 
> wrote:
>
>> You need to figure out why the receivers failed in the first place. Look
>> in your worker logs and see what really happened. When you run a streaming
>> job continuously for a longer period, there will usually be a lot of logs
>> (you can enable log rotation, etc.), and if you are doing groupBy, join,
>> or similar operations, there will be a lot of shuffle data. So you need to
>> check the worker logs and see what happened (whether the disk is full,
>> etc.). We have streaming pipelines running for weeks without any issues.
>>
>> Thanks
>> Best Regards
>>
>> On Mon, Mar 16, 2015 at 12:40 PM, Jun Yang  wrote:
>>
>>> Guys,
>>>
>>> We have a project which builds upon Spark streaming.
>>>
>>> We use Kafka as the input stream, and create 5 receivers.
>>>
>>> When this application had run for around 90 hours, all 5 receivers
>>> failed for unknown reasons.
>>>
>>> In my understanding, it is not guaranteed that a Spark Streaming receiver
>>> will recover from faults automatically.
>>>
>>> So I just want to figure out a way to handle receiver failure with
>>> automatic fail-over.
>>>
>>> There is a JIRA issue that mentions using StreamingListener to monitor
>>> receiver status:
>>>
>>>
>>> https://issues.apache.org/jira/browse/SPARK-2381?focusedCommentId=14056836&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14056836
>>>
>>> However, I haven't found any public documentation on how to do this.
>>>
>>> Has anyone met the same issue and dealt with it?
>>>
>>> Our environment:
>>>Spark 1.3.0
>>>Dual Master Configuration
>>>Kafka 0.8.2
>>>
>>> Thanks
>>>
>>> --
>>> yangjun...@gmail.com
>>> http://hi.baidu.com/yjpro
>>>
>>
>>
>


-- 
yangjun...@gmail.com
http://hi.baidu.com/yjpro


Re: Question about Spark Streaming Receiver Failure

2015-03-16 Thread Jun Yang
Akhil,

I have checked the logs, and there isn't any clue as to why the 5 receivers
failed.

That's why I assume that receiver failures will be a common issue, and that
we need a way to detect this kind of failure and fail over.

Thanks

On Mon, Mar 16, 2015 at 3:17 PM, Akhil Das 
wrote:

> You need to figure out why the receivers failed in the first place. Look
> in your worker logs and see what really happened. When you run a streaming
> job continuously for a longer period, there will usually be a lot of logs
> (you can enable log rotation, etc.), and if you are doing groupBy, join,
> or similar operations, there will be a lot of shuffle data. So you need to
> check the worker logs and see what happened (whether the disk is full,
> etc.). We have streaming pipelines running for weeks without any issues.
>
> Thanks
> Best Regards
>
> On Mon, Mar 16, 2015 at 12:40 PM, Jun Yang  wrote:
>
>> Guys,
>>
>> We have a project which builds upon Spark streaming.
>>
>> We use Kafka as the input stream, and create 5 receivers.
>>
>> When this application had run for around 90 hours, all 5 receivers failed
>> for unknown reasons.
>>
>> In my understanding, it is not guaranteed that a Spark Streaming receiver
>> will recover from faults automatically.
>>
>> So I just want to figure out a way to handle receiver failure with
>> automatic fail-over.
>>
>> There is a JIRA issue that mentions using StreamingListener to monitor
>> receiver status:
>>
>>
>> https://issues.apache.org/jira/browse/SPARK-2381?focusedCommentId=14056836&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14056836
>>
>> However, I haven't found any public documentation on how to do this.
>>
>> Has anyone met the same issue and dealt with it?
>>
>> Our environment:
>>Spark 1.3.0
>>Dual Master Configuration
>>Kafka 0.8.2
>>
>> Thanks
>>
>> --
>> yangjun...@gmail.com
>> http://hi.baidu.com/yjpro
>>
>
>


-- 
yangjun...@gmail.com
http://hi.baidu.com/yjpro


Question about Spark Streaming Receiver Failure

2015-03-16 Thread Jun Yang
Guys,

We have a project which builds upon Spark streaming.

We use Kafka as the input stream, and create 5 receivers.

When this application had run for around 90 hours, all 5 receivers failed
for unknown reasons.

In my understanding, it is not guaranteed that a Spark Streaming receiver
will recover from faults automatically.

So I just want to figure out a way to handle receiver failure with automatic
fail-over.

There is a JIRA issue that mentions using StreamingListener to monitor
receiver status:

https://issues.apache.org/jira/browse/SPARK-2381?focusedCommentId=14056836&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14056836

However, I haven't found any public documentation on how to do this.
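For reference, a minimal sketch of what that listener-based monitoring might
look like against the Spark 1.3 Scala API (the reaction to a failure is left
as a placeholder; a real deployment would restart the job or alert an
external watchdog):

```scala
import org.apache.spark.streaming.scheduler.{
  StreamingListener, StreamingListenerReceiverError, StreamingListenerReceiverStopped}

// Sketch: surface receiver failures so custom fail-over logic can react.
class ReceiverMonitor extends StreamingListener {
  override def onReceiverError(err: StreamingListenerReceiverError): Unit = {
    // err.receiverInfo carries the executor location and last error message.
    println(s"Receiver error at ${err.receiverInfo.location}: " +
            s"${err.receiverInfo.lastErrorMessage}")
  }
  override def onReceiverStopped(stop: StreamingListenerReceiverStopped): Unit = {
    println(s"Receiver ${stop.receiverInfo.name} stopped")
  }
}

// Register on an existing StreamingContext before starting it:
// ssc.addStreamingListener(new ReceiverMonitor)
```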

Has anyone met the same issue and dealt with it?

Our environment:
   Spark 1.3.0
   Dual Master Configuration
   Kafka 0.8.2

Thanks

-- 
yangjun...@gmail.com
http://hi.baidu.com/yjpro


Is It Feasible for Spark 1.1 Broadcast to Fully Utilize the Ethernet Card Throughput?

2015-01-09 Thread Jun Yang
Guys,

I have a question regarding the Spark 1.1 broadcast implementation.

In our pipeline, we have a large multi-class LR model, which is about 1 GiB
in size. To take advantage of Spark's parallelism, the natural approach is
to broadcast this model file to the worker nodes.

However, it looks like broadcast performance is not very good.

While broadcasting the model file, I monitored the network card throughput
of the worker nodes: their receive/write throughput is only around 30~40
MiB/s (our server boxes are equipped with 100 MiB Ethernet cards).

Is this a real limitation of the Spark 1.1 broadcast implementation, or is
there some configuration or trick that can make Spark broadcast perform
better?
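In case it helps, the knobs I would check first (these property names are
taken from the Spark 1.1 configuration documentation; the values here are
illustrative guesses, not tuned recommendations) are the broadcast factory
and block size. TorrentBroadcast distributes blocks peer-to-peer among
executors, so it should saturate the network better than HttpBroadcast,
which serves everything from the driver:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // TorrentBroadcast (the 1.1 default) spreads pieces of the 1 GiB model
  // peer-to-peer among executors instead of serving it all from the driver.
  .set("spark.broadcast.factory",
       "org.apache.spark.broadcast.TorrentBroadcastFactory")
  // Size of each broadcast piece in KB; larger blocks mean fewer round
  // trips for a 1 GiB payload (the default is 4096, i.e. 4 MiB per block).
  .set("spark.broadcast.blockSize", "8192")
  // Compressing broadcast variables trades CPU for network bandwidth.
  .set("spark.broadcast.compress", "true")
```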

Thanks



-- 
yangjun...@gmail.com
http://hi.baidu.com/yjpro


Re: k-means clustering

2014-11-20 Thread Jun Yang
Guys,

As to the question of pre-processing, you could just migrate that logic to
Spark before running K-means.

I have only used Scala on Spark and haven't used the Python bindings, but I
think the basic steps should be the same.

BTW, if your data set is large, with huge sparse feature vectors, K-means
may not work as well as you expect. I think this is still an optimization
direction for Spark MLlib.
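To make that concrete, here is a rough Scala sketch of how the quoted Python
pre-processing could be expressed as RDD transformations feeding MLlib's
K-means. The column index, stop-word list, and feature dimension are
assumptions for illustration, and stemming is omitted (Spark has no built-in
Porter stemmer):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.clustering.KMeans

object TextClustering {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("text-kmeans"))

    // Tiny illustrative stop-word list; in practice, load NLTK's English
    // list from a file and broadcast it.
    val stopWords = Set("the", "a", "an", "and", "of", "to", "in")

    // Same cleanup as the quoted Python -- lowercase, strip punctuation and
    // digits, drop stop words -- but as a distributed map over documents.
    val terms = sc.textFile("2013-01.csv")
      .map(_.split(",")(3))                        // assumes text in column 3
      .map(_.toLowerCase.replaceAll("[^a-z ]", " "))
      .map(_.split("\\s+").filter(w => w.nonEmpty && !stopWords(w)).toSeq)

    // Hash terms into fixed-size term-frequency vectors instead of
    // maintaining a driver-side vocabulary map.
    val vectors = new HashingTF(numFeatures = 1 << 18).transform(terms).cache()

    val model = KMeans.train(vectors, 6, 20)       // k = 6, 20 iterations
    model.clusterCenters.foreach(println)
  }
}
```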

On Wed, Nov 19, 2014 at 2:21 PM, amin mohebbi 
wrote:

> Hi there,
>
> I would like to do "text clustering" using k-means and Spark on a massive
> dataset. As you know, before running k-means I have to apply
> pre-processing methods such as TF-IDF and NLTK to my big dataset. The
> following is my code in Python:
>
> if __name__ == '__main__':
>     # Cluster a bunch of text documents.
>     import re
>     import sys
>     k = 6
>     vocab = {}
>     xs = []
>     ns = []
>     cat = []
>     filename = '2013-01.csv'
>     with open(filename, newline='') as f:
>         try:
>             newsreader = csv.reader(f)
>             for row in newsreader:
>                 ns.append(row[3])
>                 cat.append(row[4])
>         except csv.Error as e:
>             sys.exit('file %s, line %d: %s' % (filename, newsreader.line_num, e))
>     # regex to remove special characters
>     remove_spl_char_regex = re.compile('[%s]' % re.escape(string.punctuation))
>     remove_num = re.compile('[\d]+')  # regex to remove numbers
>     # nltk.download()
>     stop_words = nltk.corpus.stopwords.words('english')
>     for a in ns:
>         x = defaultdict(float)
>         a1 = a.strip().lower()
>         a2 = remove_spl_char_regex.sub(" ", a1)  # Remove special characters
>         a3 = remove_num.sub("", a2)              # Remove numbers
>         # Remove stop words
>         words = a3.split()
>         filter_stop_words = [w for w in words if not w in stop_words]
>         stemed = [PorterStemmer().stem_word(w) for w in filter_stop_words]
>         ws = sorted(stemed)  # ws = re.findall(r"\w+", a1)
>         for w in ws:
>             vocab.setdefault(w, len(vocab))
>             x[vocab[w]] += 1
>         xs.append(x.items())
>
> Can anyone explain how I can do this pre-processing step before running
> k-means using Spark?
>
>
> Best Regards
>
> ...
>
> Amin Mohebbi
>
> PhD candidate in Software Engineering
>  at university of Malaysia
>
> Tel : +60 18 2040 017
>
>
>
> E-Mail : tp025...@ex.apiit.edu.my
>
>   amin_...@me.com
>



-- 
yangjun...@gmail.com
http://hi.baidu.com/yjpro


Questions Regarding to MPI Program Migration to Spark

2014-11-16 Thread Jun Yang
Guys,

Recently we have been migrating our backend pipeline to Spark.

In our pipeline, we have an MPI-based HAC (hierarchical agglomerative
clustering) implementation. To ensure result consistency across the
migration, we also want to port this MPI code to Spark.

However, during the migration process, I found some possible limitations of
Spark.

In the original MPI implementation, the logic looks like the following:

Node 0 (master node):

    Get the complete document data, store it in g_doc_data
    Get the document sub-set for which this node needs to calculate the
    distance metrics, store it in l_dist_metric_data
    while (exit condition is not met) {
        Find the locally closest node pair, denoted l_closest_pair
        Get the globally closest node pair from the other nodes via MPI's
        MPI_Allreduce, denoted g_closest_pair
        Merge the globally closest node pair and update the document data
        g_doc_data
        Re-calculate the distance metrics for the node pairs impacted by the
        above merge, and update l_dist_metric_data
    }

Node 1/2/.../P (slave nodes):

    Run exactly the same loop as the master node (the same local search,
    MPI_Allreduce, merge, and distance re-calculation steps).

The essential difficulty in migrating the above logic to Spark is that in
the original implementation, between iterations, the computation nodes need
to hold local state (namely g_doc_data and l_dist_metric_data).
In Spark, there does not seem to be an effective way to keep such
intermediate local state between iterations. Usually in Spark, we use either
a broadcast variable or a closure to pass state to the operations of each
iteration.

Of course, after each iteration we could summarize the change effects from
all the worker nodes via a reduce and then broadcast this summary back to
them. But this involves significant data transfer when the data size is
large (e.g., 100 thousand documents with 500-dimension feature vectors), and
the performance penalty is non-negligible.
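For reference, the reduce-then-broadcast pattern described above could be
sketched roughly like this. Everything here is a placeholder illustration of
the pattern, not a real implementation: `Pair`, `localClosestPair`,
`mergePair`, `exitConditionMet`, and the input RDD are all hypothetical.

```scala
// Hypothetical sketch of the per-iteration reduce + broadcast pattern.
case class Pair(i: Int, j: Int, dist: Double)

var docs = initialDocs                        // driver-side copy of g_doc_data
val partitions = sc.parallelize(docIdRanges)  // each task owns a document sub-set

while (!exitConditionMet(docs)) {
  val docsBc = sc.broadcast(docs)             // ship the current state to workers

  // Each partition finds its locally closest pair (l_closest_pair), and a
  // reduce plays the role of MPI_Allreduce in picking the global minimum.
  val globalClosest = partitions
    .map(range => localClosestPair(range, docsBc.value))
    .reduce((a, b) => if (a.dist < b.dist) a else b)

  // The merge happens once on the driver; the updated state is then
  // re-broadcast on the next iteration -- which is exactly the repeated
  // data-transfer cost discussed above.
  docs = mergePair(docs, globalClosest)
  docsBc.unpersist()
}
```

The sketch makes the trade-off visible: the broadcast re-ships the full
document state every iteration, whereas the MPI version only exchanges one
closest pair per round and mutates local state in place.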

So my questions are:
1. Is the difficulty mentioned above a limitation imposed by Spark's
computation paradigm?
2. Are there any feasible ways to implement bottom-up agglomerative
hierarchical clustering in Spark?

BTW, I know there are some top-down divisive hierarchical clustering
algorithms in the upcoming 1.2 release; I will also give them a try.

Thanks.
-- 
yangjun...@gmail.com
http://hi.baidu.com/yjpro