Why does groupByKey still shuffle if SQL does "Distribute By" on the same columns?

2018-01-30 Thread Dibyendu Bhattacharya
 Hi,

I am trying something like this..

val sesDS: Dataset[XXX] = hiveContext.sql(select).as[XXX]

The select statement is something like this : "select * from sometable 
DISTRIBUTE by col1, col2, col3"

Then comes groupByKey...

val gpbyDS = sesDS.groupByKey(x => (x.col1, x.col2, x.col3))

As my select already distributes the data on the same columns that I use in
groupByKey, why does groupByKey still do a shuffle? Is this an issue, or am I
missing something?
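
For anyone trying to reproduce this, here is a minimal sketch of the scenario (the case class, table and column names are placeholders I made up, and the explain() call at the end is just one way to check whether an Exchange is still planned on top of the DISTRIBUTE BY output):

// Hypothetical reproduction; XXX, "sometable" and the column names are placeholders.
import org.apache.spark.sql.Dataset

case class XXX(col1: String, col2: String, col3: String, value: Long)

import hiveContext.implicits._   // assumes the same hiveContext as above

val sesDS: Dataset[XXX] =
  hiveContext.sql("select * from sometable DISTRIBUTE BY col1, col2, col3").as[XXX]

val gpbyDS = sesDS.groupByKey(x => (x.col1, x.col2, x.col3))

// Printing the plan of a downstream action shows whether an Exchange (shuffle)
// is still inserted after the DISTRIBUTE BY.
gpbyDS.count().explain()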

Regards,
Dibyendu


Re: question on Write Ahead Log (Spark Streaming )

2017-03-10 Thread Dibyendu Bhattacharya
Hi,

You could also use this Receiver :
https://github.com/dibbhatt/kafka-spark-consumer

This is part of spark-packages also :
https://spark-packages.org/package/dibbhatt/kafka-spark-consumer

You do not need to enable the WAL with this receiver, and you can still recover
from a Driver failure with no data loss. You can refer to
https://github.com/dibbhatt/kafka-spark-consumer/blob/master/README.md for
more details, or reach out to me.
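
For context, a rough sketch of how this receiver is typically wired up, mirroring the ReceiverLauncher.launch call quoted later in this thread; the Properties contents are placeholders and the exact signature may differ between consumer versions, so treat this as an illustration rather than the definitive API:

import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.api.java.JavaStreamingContext
import consumer.kafka.ReceiverLauncher

val conf = new SparkConf().setAppName("ReceiverBasedKafkaSketch")
val jsc = new JavaStreamingContext(conf, Seconds(30))

val props = new Properties()   // Kafka / ZooKeeper settings go here (placeholders)
val numberOfReceivers = 4      // one receiver per desired unit of receive parallelism

// Offsets are tracked by the consumer itself (in ZK), so recovery does not rely on a WAL.
val unionStreams = ReceiverLauncher.launch(jsc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY)

unionStreams.print()
jsc.start()
jsc.awaitTermination()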

Regards,
Dibyendu


On Wed, Mar 8, 2017 at 8:58 AM, kant kodali  wrote:

> Hi All,
>
> I am using a Receiver based approach, and I understand that the Spark
> Streaming APIs convert the data received from the receiver into blocks,
> and that these in-memory blocks are also stored in the WAL if one enables
> it. My upstream source, which is not Kafka, can also replay: if I don't
> send an ack, it resends the data, so I don't have to rely on the WAL for
> the received data. However, I still need to enable the WAL, correct?
> Because there are blocks in memory that need to be written to the WAL so
> they can be recovered later.
>
> Thanks,
> kant
>


Latest Release of Receiver based Kafka Consumer for Spark Streaming.

2017-02-15 Thread Dibyendu Bhattacharya
Hi ,

Released the latest version of the Receiver based Kafka Consumer for Spark
Streaming.

Available at Spark Packages : https://spark-packages.org/package/dibbhatt/kafka-spark-consumer

Also at github  : https://github.com/dibbhatt/kafka-spark-consumer

Some key features

   - Tuned for better performance
   - Support for Spark 2.x, Kafka 0.10
   - Support for Consumer Lag Check ( ConsumerOffsetChecker/ Burrow etc)
   - WAL less recovery
   - Better tuned PID Controller with automatic rate adjustment based on incoming traffic
   - Support for Custom Message Interceptors

Please refer to https://github.com/dibbhatt/kafka-spark-consumer/blob/master/README.md for more details

Regards,
Dibyendu


Re: Latest Release of Receiver based Kafka Consumer for Spark Streaming.

2016-08-25 Thread Dibyendu Bhattacharya
Hi,

This package is not dependent on any specific Spark release and can be used
with 1.5. Please refer to the "How To" section here :

https://spark-packages.org/package/dibbhatt/kafka-spark-consumer

You will also find more information in the README file on how to use this package.

Regards,
Dibyendu


On Thu, Aug 25, 2016 at 7:01 PM, <mdkhajaasm...@gmail.com> wrote:

> Hi Dibyendu,
>
> Looks like it is available for 2.0; we are using an older version, Spark 1.5.
> Could you please let me know how to use this with older versions.
>
> Thanks,
> Asmath
>
> Sent from my iPhone
>
> On Aug 25, 2016, at 6:33 AM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
> Hi ,
>
> Released latest version of Receiver based Kafka Consumer for Spark
> Streaming.
>
> The Receiver is compatible with Kafka versions 0.8.x, 0.9.x and 0.10.x and all
> Spark versions
>
> Available at Spark Packages : https://spark-packages.org/package/dibbhatt/kafka-spark-consumer
>
> Also at github  : https://github.com/dibbhatt/kafka-spark-consumer
>
> Salient Features :
>
>- End to End No Data Loss without Write Ahead Log
>- ZK Based offset management for both consumed and processed offset
>- No dependency on WAL and Checkpoint
>- In-built PID Controller for Rate Limiting and Backpressure management
>- Custom Message Interceptor
>
> Please refer to https://github.com/dibbhatt/kafka-spark-consumer/blob/master/README.md for more details
>
>
> Regards,
>
> Dibyendu
>
>
>


Latest Release of Receiver based Kafka Consumer for Spark Streaming.

2016-08-25 Thread Dibyendu Bhattacharya
Hi ,

Released the latest version of the Receiver based Kafka Consumer for Spark
Streaming.

The Receiver is compatible with Kafka versions 0.8.x, 0.9.x and 0.10.x and all
Spark versions.

Available at Spark Packages :
https://spark-packages.org/package/dibbhatt/kafka-spark-consumer

Also at github  : https://github.com/dibbhatt/kafka-spark-consumer

Salient Features :

   - End to End No Data Loss without Write Ahead Log
   - ZK Based offset management for both consumed and processed offset
   - No dependency on WAL and Checkpoint
   - In-built PID Controller for Rate Limiting and Backpressure management
   - Custom Message Interceptor

Please refer to
https://github.com/dibbhatt/kafka-spark-consumer/blob/master/README.md for
more details


Regards,

Dibyendu


Re: Severe Spark Streaming performance degradation after upgrading to 1.6.1

2016-07-13 Thread Dibyendu Bhattacharya
You can get some good pointers in this JIRA

https://issues.apache.org/jira/browse/SPARK-15796

Dibyendu


On Thu, Jul 14, 2016 at 12:53 AM, Sunita  wrote:

> I am facing the same issue. Upgrading to Spark 1.6 is causing a huge
> performance loss. Could you solve this issue? I am also attempting the memory
> settings mentioned at
> http://spark.apache.org/docs/latest/configuration.html#memory-management
>
> But it's not making a lot of difference. Appreciate your inputs on this.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Severe-Spark-Streaming-performance-degradation-after-upgrading-to-1-6-1-tp27056p27330.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Any idea about this error: IllegalArgumentException: File segment length cannot be negative?

2016-07-13 Thread Dibyendu Bhattacharya
In a Spark Streaming job, I see a batch failed with the following error. I
haven't seen anything like this earlier.

This happened during the shuffle for one batch (it hasn't reoccurred since).
Just curious to know what can cause this error. I am running Spark 1.5.1.

Regards,
Dibyendu


Job aborted due to stage failure: Task 2801 in stage 9421.0 failed 4
times, most recent failure: Lost task 2801.3 in stage 9421.0:
java.lang.IllegalArgumentException: requirement failed: File segment
length cannot be negative (got -68321)
at scala.Predef$.require(Predef.scala:233)
at org.apache.spark.storage.FileSegment.<init>(FileSegment.scala:28)
at org.apache.spark.storage.DiskBlockObjectWriter.fileSegment(DiskBlockObjectWriter.scala:216)
at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:684)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:80)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


Re: [Spark 1.6] Spark Streaming - java.lang.AbstractMethodError

2016-01-07 Thread Dibyendu Bhattacharya
Right. If you are using the GitHub version, just modify the ReceiverLauncher
and add that method. I will fix it for Spark 1.6 and release a new version in
spark-packages for Spark 1.6.
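
For reference, a minimal Scala sketch of what the callback added in Spark 1.6 by SPARK-10900 looks like; this is only an illustration of the listener interface, not the actual ReceiverLauncher patch (which is Java):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerOutputOperationStarted}

// Extending the Scala trait picks up no-op defaults for the other callbacks; a Java
// anonymous class implementing the interface (as ReceiverLauncher does) has to provide
// this method explicitly to avoid the AbstractMethodError on Spark 1.6.
class NoOpOutputListener extends StreamingListener {
  override def onOutputOperationStarted(
      outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = {
    // intentionally left empty
  }
}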

Dibyendu

On Thu, Jan 7, 2016 at 4:14 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> I cloned g...@github.com:dibbhatt/kafka-spark-consumer.git a moment ago.
>
> In ./src/main/java/consumer/kafka/ReceiverLauncher.java , I see:
>jsc.addStreamingListener(new StreamingListener() {
>
> There is no onOutputOperationStarted method implementation.
>
> Looks like it should be added for Spark 1.6.0
>
> Cheers
>
> On Thu, Jan 7, 2016 at 2:39 AM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> You are using the low-level Spark Kafka consumer. I am the author of the
>> same.
>>
>> Are you using the spark-packages version? If yes, which one?
>>
>> Regards,
>> Dibyendu
>>
>> On Thu, Jan 7, 2016 at 4:07 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>
>>> Hi,
>>>
>>> Do you perhaps use custom StreamingListener?
>>> `StreamingListenerBus.scala:47` calls
>>> `StreamingListener.onOutputOperationStarted` that was added in
>>> [SPARK-10900] [STREAMING] Add output operation events to
>>> StreamingListener [1]
>>>
>>> The other guess could be that at runtime you still use Spark < 1.6.
>>>
>>> [1] https://issues.apache.org/jira/browse/SPARK-10900
>>>
>>> Pozdrawiam,
>>> Jacek
>>>
>>> Jacek Laskowski | https://medium.com/@jaceklaskowski/
>>> Mastering Apache Spark
>>> ==> https://jaceklaskowski.gitbooks.io/mastering-apache-spark/
>>> Follow me at https://twitter.com/jaceklaskowski
>>>
>>>
>>>
>>> On Thu, Jan 7, 2016 at 10:59 AM, Walid LEZZAR <walez...@gmail.com>
>>> wrote:
>>> > Hi,
>>> >
>>> > We have been using spark streaming for a little while now.
>>> >
>>> > Until now, we were running our spark streaming jobs in spark 1.5.1 and
>>> it
>>> > was working well. Yesterday, we upgraded to spark 1.6.0 without any
>>> changes
>>> > in the code. But our streaming jobs are not working any more. We are
>>> getting
>>> > an "AbstractMethodError". Please, find the stack trace at the end of
>>> the
>>> > mail. Can we have some hints on what this error means ? (we are using
>>> spark
>>> > to connect to kafka)
>>> >
>>> > The stack trace :
>>> > 16/01/07 10:44:39 INFO ZkState: Starting curator service
>>> > 16/01/07 10:44:39 INFO CuratorFrameworkImpl: Starting
>>> > 16/01/07 10:44:39 INFO ZooKeeper: Initiating client connection,
>>> > connectString=localhost:2181 sessionTimeout=12
>>> > watcher=org.apache.curator.ConnectionState@2e9fa23a
>>> > 16/01/07 10:44:39 INFO ClientCnxn: Opening socket connection to server
>>> > localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL
>>> > (unknown error)
>>> > 16/01/07 10:44:39 INFO ClientCnxn: Socket connection established to
>>> > localhost/127.0.0.1:2181, initiating session
>>> > 16/01/07 10:44:39 INFO ClientCnxn: Session establishment complete on
>>> server
>>> > localhost/127.0.0.1:2181, sessionid = 0x1521b6d262e0035, negotiated
>>> timeout
>>> > = 6
>>> > 16/01/07 10:44:39 INFO ConnectionStateManager: State change: CONNECTED
>>> > 16/01/07 10:44:40 INFO PartitionManager: Read partition information
>>> from:
>>> >
>>> /spark-kafka-consumer/StreamingArchiver/lbc.job.multiposting.input/partition_0
>>> > --> null
>>> > 16/01/07 10:44:40 INFO JobScheduler: Added jobs for time 145215988
>>> ms
>>> > 16/01/07 10:44:40 INFO JobScheduler: Starting job streaming job
>>> > 145215988 ms.0 from job set of time 145215988 ms
>>> > 16/01/07 10:44:40 ERROR Utils: uncaught error in thread
>>> > StreamingListenerBus, stopping SparkContext
>>> >
>>> > ERROR Utils: uncaught error in thread StreamingListenerBus, stopping
>>> > SparkContext
>>> > java.lang.AbstractMethodError
>>> > at
>>> >
>>> org.apache.spark.streaming.scheduler.StreamingListenerBus.onPostEvent(StreamingListenerBus.scala:47)
>>> > at
>>> >
>>> org.apache.spark.streaming.scheduler.StreamingListenerBus.onPostEvent(StreamingListenerBus.scala:26)
>&

Re: [Spark 1.6] Spark Streaming - java.lang.AbstractMethodError

2016-01-07 Thread Dibyendu Bhattacharya
Some discussion is there in https://github.com/dibbhatt/kafka-spark-consumer
and some is mentioned in https://issues.apache.org/jira/browse/SPARK-11045

Let me know if those answer your question.

In short, Direct Stream is a good choice if you need exactly-once semantics and
message ordering, but many use cases do not need exactly-once or message
ordering. If you use Direct Stream, the RDD processing parallelism is limited
to the number of Kafka partitions, and you need to store offset details in an
external store, as the checkpoint location is not reliable if you modify the
driver code.

In Receiver based mode, on the other hand, you need to enable the WAL for no
data loss. But the Spark Receiver based consumer from KafkaUtils, which uses
the Kafka High Level API, has serious issues, so if you do need to switch to
receiver based mode, this low level consumer is a better choice.

Performance wise, I have not published any numbers yet, but from the internal
testing and benchmarking I did (and validated by folks who use this consumer),
it performs much better than any existing consumer in Spark.

Regards,
Dibyendu

On Thu, Jan 7, 2016 at 4:28 PM, Jacek Laskowski <ja...@japila.pl> wrote:

> On Thu, Jan 7, 2016 at 11:39 AM, Dibyendu Bhattacharya
> <dibyendu.bhattach...@gmail.com> wrote:
> > You are using low level spark kafka consumer . I am the author of the
> same.
>
> If I may ask, what are the differences between this and the direct
> version shipped with spark? I've just started toying with it, and
> would appreciate some guidance. Thanks.
>
> Jacek
>


Re: Need to maintain the consumer offset by myself when using spark streaming kafka direct approach?

2015-12-08 Thread Dibyendu Bhattacharya
In direct stream mode, the checkpoint location is not recoverable if you modify
your driver code. So if you rely only on the checkpoint to commit offsets, you
can possibly lose messages if you modify the driver code and then start from
the "largest" offset. If you do not want to lose messages, you need to commit
offsets to an external store when using the direct stream.
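
As a rough sketch of what "commit offsets to an external store" can look like with the direct stream (directStream is assumed to come from KafkaUtils.createDirectStream, and saveOffset is a hypothetical helper standing in for your ZooKeeper / database write):

import org.apache.spark.streaming.kafka.HasOffsetRanges

def saveOffset(topic: String, partition: Int, untilOffset: Long): Unit = {
  // write to ZooKeeper, HBase, a relational DB, etc. (left as a stub)
}

directStream.foreachRDD { rdd =>
  // Capture the Kafka offset ranges backing this batch before any transformation.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the batch ...

  // Persist offsets only after the batch has been processed successfully.
  offsetRanges.foreach(r => saveOffset(r.topic, r.partition, r.untilOffset))
}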

On Tue, Dec 8, 2015 at 7:47 PM, PhuDuc Nguyen 
wrote:

> Kafka Receiver-based approach:
> This will maintain the consumer offsets in ZK for you.
>
> Kafka Direct approach:
> You can use checkpointing and that will maintain consumer offsets for you.
> You'll want to checkpoint to a highly available file system like HDFS or S3.
>
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
>
> You don't have to maintain your own offsets if you don't want to. If the 2
> solutions above don't satisfy your requirements, then consider writing your
> own; otherwise I would recommend using the supported features in Spark.
>
> HTH,
> Duc
>
>
>
> On Tue, Dec 8, 2015 at 5:05 AM, Tao Li  wrote:
>
>> I am using the Spark Streaming Kafka direct approach these days. I found that
>> when I start the application, it always starts consuming from the latest offset.
>> I would like the application, on start, to consume from the offset the last run
>> reached, with the same Kafka consumer group. Does that mean I have to maintain
>> the consumer offsets myself, for example record them in ZooKeeper, and reload
>> the last offset from ZooKeeper when restarting the application?
>>
>> I see the following discussion:
>> https://github.com/apache/spark/pull/4805
>> https://issues.apache.org/jira/browse/SPARK-6249
>>
>> Is there any conclusion? Do we need to maintain the offset by myself? Or
>> spark streaming will support a feature to simplify the offset maintain work?
>>
>>
>> https://forums.databricks.com/questions/2936/need-to-maintain-the-consumer-offset-by-myself-whe.html
>>
>
>


Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-02 Thread Dibyendu Bhattacharya
The consumer I mentioned does not silently throw away data. If the offset is
out of range, it starts from the earliest offset, and that is the correct way
to recover from this error.

Dibyendu
On Dec 2, 2015 9:56 PM, "Cody Koeninger" <c...@koeninger.org> wrote:

> Again, just to be clear, silently throwing away data because your system
> isn't working right is not the same as "recover from any Kafka leader
> changes and offset out of ranges issue".
>
>
>
> On Tue, Dec 1, 2015 at 11:27 PM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> Hi, if you use Receiver based consumer which is available in
>> spark-packages (
>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) , this
>> has all built in failure recovery and it can recover from any Kafka leader
>> changes and offset out of ranges issue.
>>
>> Here is the package form github :
>> https://github.com/dibbhatt/kafka-spark-consumer
>>
>>
>> Dibyendu
>>
>> On Wed, Dec 2, 2015 at 5:28 AM, swetha kasireddy <
>> swethakasire...@gmail.com> wrote:
>>
>>> How to avoid those Errors with receiver based approach? Suppose we are
>>> OK with at least once processing and use receiver based approach which uses
>>> ZooKeeper but not query Kafka directly, would these errors(Couldn't
>>> find leader offsets for
>>> Set([test_stream,5])))be avoided?
>>>
>>> On Tue, Dec 1, 2015 at 3:40 PM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>>
>>>> KafkaRDD.scala , handleFetchErr
>>>>
>>>> On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy <
>>>> swethakasire...@gmail.com> wrote:
>>>>
>>>>> Hi Cody,
>>>>>
>>>>> How to look at Option 2(see the following)? Which portion of the code
>>>>> in Spark Kafka Direct to look at to handle this issue specific to our
>>>>> requirements.
>>>>>
>>>>>
>>>>> 2.Catch that exception and somehow force things to "reset" for that
>>>>> partition And how would it handle the offsets already calculated in the
>>>>> backlog (if there is one)?
>>>>>
>>>>> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger <c...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> If you're consistently getting offset out of range exceptions, it's
>>>>>> probably because messages are getting deleted before you've processed 
>>>>>> them.
>>>>>>
>>>>>> The only real way to deal with this is give kafka more retention,
>>>>>> consume faster, or both.
>>>>>>
>>>>>> If you're just looking for a quick "fix" for an infrequent issue,
>>>>>> option 4 is probably easiest.  I wouldn't do that automatically / 
>>>>>> silently,
>>>>>> because you're losing data.
>>>>>>
>>>>>> On Mon, Nov 30, 2015 at 6:22 PM, SRK <swethakasire...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> So, our Streaming Job fails with the following errors. If you see
>>>>>>> the errors
>>>>>>> below, they are all related to Kafka losing offsets and
>>>>>>> OffsetOutOfRangeException.
>>>>>>>
>>>>>>> What are the options we have other than fixing Kafka? We would like
>>>>>>> to do
>>>>>>> something like the following. How can we achieve 1 and 2 with Spark
>>>>>>> Kafka
>>>>>>> Direct?
>>>>>>>
>>>>>>> 1.Need to see a way to skip some offsets if they are not available
>>>>>>> after the
>>>>>>> max retries are reached..in that case there might be data loss.
>>>>>>>
>>>>>>> 2.Catch that exception and somehow force things to "reset" for that
>>>>>>> partition And how would it handle the offsets already calculated in
>>>>>>> the
>>>>>>> backlog (if there is one)?
>>>>>>>
>>>>>>> 3.Track the offsets separately, restart the job by providing the
>>>>>>> offsets.
>>>>>>>
>>>>>>> 4.Or a straightforward approach would be to monit

Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-02 Thread Dibyendu Bhattacharya
Well, even if you set the correct retention and increase the consumption speed,
OffsetOutOfRange can still occur, depending on how your downstream processing
behaves. And if that happens, there is no other way to recover the old messages.
So the best bet here, from the streaming job's point of view, is to start from
the earliest offset rather than bring down the streaming job. In many cases the
goal of a streaming job is not to shut down and exit on any failure. I believe
that is what differentiates an always-running streaming job.

Dibyendu

On Thu, Dec 3, 2015 at 8:26 AM, Cody Koeninger <c...@koeninger.org> wrote:

> No, silently restarting from the earliest offset in the case of offset out
> of range exceptions during a streaming job is not the "correct way of
> recovery".
>
> If you do that, your users will be losing data without knowing why.  It's
> more like  a "way of ignoring the problem without actually addressing it".
>
> The only really correct way to deal with that situation is to recognize
> why it's happening, and either increase your Kafka retention or increase
> the speed at which you are consuming.
>
> On Wed, Dec 2, 2015 at 7:13 PM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> This consumer which I mentioned does not silently throw away data. If
>> offset out of range it start for earliest offset and that is correct way of
>> recovery from this error.
>>
>> Dibyendu
>> On Dec 2, 2015 9:56 PM, "Cody Koeninger" <c...@koeninger.org> wrote:
>>
>>> Again, just to be clear, silently throwing away data because your system
>>> isn't working right is not the same as "recover from any Kafka leader
>>> changes and offset out of ranges issue".
>>>
>>>
>>>
>>> On Tue, Dec 1, 2015 at 11:27 PM, Dibyendu Bhattacharya <
>>> dibyendu.bhattach...@gmail.com> wrote:
>>>
>>>> Hi, if you use Receiver based consumer which is available in
>>>> spark-packages (
>>>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) ,
>>>> this has all built in failure recovery and it can recover from any Kafka
>>>> leader changes and offset out of ranges issue.
>>>>
>>>> Here is the package form github :
>>>> https://github.com/dibbhatt/kafka-spark-consumer
>>>>
>>>>
>>>> Dibyendu
>>>>
>>>> On Wed, Dec 2, 2015 at 5:28 AM, swetha kasireddy <
>>>> swethakasire...@gmail.com> wrote:
>>>>
>>>>> How to avoid those Errors with receiver based approach? Suppose we are
>>>>> OK with at least once processing and use receiver based approach which 
>>>>> uses
>>>>> ZooKeeper but not query Kafka directly, would these errors(Couldn't
>>>>> find leader offsets for
>>>>> Set([test_stream,5])))be avoided?
>>>>>
>>>>> On Tue, Dec 1, 2015 at 3:40 PM, Cody Koeninger <c...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> KafkaRDD.scala , handleFetchErr
>>>>>>
>>>>>> On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy <
>>>>>> swethakasire...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Cody,
>>>>>>>
>>>>>>> How to look at Option 2(see the following)? Which portion of the
>>>>>>> code in Spark Kafka Direct to look at to handle this issue specific to 
>>>>>>> our
>>>>>>> requirements.
>>>>>>>
>>>>>>>
>>>>>>> 2.Catch that exception and somehow force things to "reset" for that
>>>>>>> partition And how would it handle the offsets already calculated in
>>>>>>> the
>>>>>>> backlog (if there is one)?
>>>>>>>
>>>>>>> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger <c...@koeninger.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> If you're consistently getting offset out of range exceptions, it's
>>>>>>>> probably because messages are getting deleted before you've processed 
>>>>>>>> them.
>>>>>>>>
>>>>>>>> The only real way to deal with this is give kafka more retention,
>>>>>>>> consume faster, or both.
>>>>>>>>
>>>>>>>> If you're just looking for a quick "fix" for an infrequent issue,
>>>

Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-01 Thread Dibyendu Bhattacharya
Hi, if you use the Receiver based consumer which is available in spark-packages
( http://spark-packages.org/package/dibbhatt/kafka-spark-consumer ), it has all
the failure recovery built in and can recover from any Kafka leader changes and
offset out of range issues.

Here is the package from github :
https://github.com/dibbhatt/kafka-spark-consumer


Dibyendu

On Wed, Dec 2, 2015 at 5:28 AM, swetha kasireddy 
wrote:

> How to avoid those Errors with receiver based approach? Suppose we are OK
> with at least once processing and use receiver based approach which uses
> ZooKeeper but not query Kafka directly, would these errors(Couldn't find
> leader offsets for
> Set([test_stream,5])))be avoided?
>
> On Tue, Dec 1, 2015 at 3:40 PM, Cody Koeninger  wrote:
>
>> KafkaRDD.scala , handleFetchErr
>>
>> On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy <
>> swethakasire...@gmail.com> wrote:
>>
>>> Hi Cody,
>>>
>>> How to look at Option 2(see the following)? Which portion of the code in
>>> Spark Kafka Direct to look at to handle this issue specific to our
>>> requirements.
>>>
>>>
>>> 2.Catch that exception and somehow force things to "reset" for that
>>> partition And how would it handle the offsets already calculated in the
>>> backlog (if there is one)?
>>>
>>> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger 
>>> wrote:
>>>
 If you're consistently getting offset out of range exceptions, it's
 probably because messages are getting deleted before you've processed them.

 The only real way to deal with this is give kafka more retention,
 consume faster, or both.

 If you're just looking for a quick "fix" for an infrequent issue,
 option 4 is probably easiest.  I wouldn't do that automatically / silently,
 because you're losing data.

 On Mon, Nov 30, 2015 at 6:22 PM, SRK  wrote:

> Hi,
>
> So, our Streaming Job fails with the following errors. If you see the
> errors
> below, they are all related to Kafka losing offsets and
> OffsetOutOfRangeException.
>
> What are the options we have other than fixing Kafka? We would like to
> do
> something like the following. How can we achieve 1 and 2 with Spark
> Kafka
> Direct?
>
> 1.Need to see a way to skip some offsets if they are not available
> after the
> max retries are reached..in that case there might be data loss.
>
> 2.Catch that exception and somehow force things to "reset" for that
> partition And how would it handle the offsets already calculated in the
> backlog (if there is one)?
>
> 3.Track the offsets separately, restart the job by providing the
> offsets.
>
> 4.Or a straightforward approach would be to monitor the log for this
> error,
> and if it occurs more than X times, kill the job, remove the checkpoint
> directory, and restart.
>
> ERROR DirectKafkaInputDStream:
> ArrayBuffer(kafka.common.UnknownException,
> org.apache.spark.SparkException: Couldn't find leader offsets for
> Set([test_stream,5]))
>
>
>
> java.lang.ClassNotFoundException:
> kafka.common.NotLeaderForPartitionException
>
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>
>
>
> java.util.concurrent.RejectedExecutionException: Task
>
> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
> [Terminated,
> pool size = 0, active threads = 0, queued tasks = 0, completed tasks =
> 12112]
>
>
>
> org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 10
> in stage 52.0 failed 4 times, most recent failure: Lost task 10.3 in
> stage
> 52.0 (TID 255, 172.16.97.97): UnknownReason
>
> Exception in thread "streaming-job-executor-0" java.lang.Error:
> java.lang.InterruptedException
>
> Caused by: java.lang.InterruptedException
>
> java.lang.ClassNotFoundException:
> kafka.common.OffsetOutOfRangeException
>
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>
>
>
> org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 7 in
> stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in stage
> 33.0
> (TID 283, 172.16.97.103): UnknownReason
>
> java.lang.ClassNotFoundException:
> kafka.common.OffsetOutOfRangeException
>
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>
> java.lang.ClassNotFoundException:
> kafka.common.OffsetOutOfRangeException
>
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>
>
>

Re: Need more tasks in KafkaDirectStream

2015-10-29 Thread Dibyendu Bhattacharya
If you do not need one-to-one semantics and do not want a strict ordering
guarantee, you can very well use the Receiver based approach, and this consumer
from Spark Packages (https://github.com/dibbhatt/kafka-spark-consumer) is a
much better alternative in terms of performance and reliability for the
Receiver based approach.

Regards,
Dibyendu

On Thu, Oct 29, 2015 at 11:57 AM, varun sharma 
wrote:

> Right now, there is one to one correspondence between kafka partitions and
> spark partitions.
> I dont have a requirement of one to one semantics.
> I need more tasks to be generated in the job so that it can be
> parallelised and batch can be completed fast. In the previous Receiver
> based approach number of tasks created were independent of kafka
> partitions, I need something like that only.
> Any config available if I dont need one to one semantics?
> Is there any way I can repartition without incurring any additional cost.
>
> Thanks
> *VARUN SHARMA*
>
>


Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project

2015-10-24 Thread Dibyendu Bhattacharya
Hi,

I have raised a JIRA ( https://issues.apache.org/jira/browse/SPARK-11045 )
to track the discussion, but I am also mailing the user group.

This Kafka consumer has been around for a while in spark-packages (
http://spark-packages.org/package/dibbhatt/kafka-spark-consumer ) and I see
many have started using it. I am now thinking of contributing it back to the
Apache Spark core project so that it can get better support, visibility and
adoption.

A few points about this consumer

*Why this is needed :*

This consumer is NOT a replacement for the existing DirectStream API.
DirectStream solves the problem of "exactly once" semantics and "global
ordering" of messages. But to achieve this, DirectStream comes with overhead:
the overhead of maintaining the offsets externally, limited parallelism while
processing the RDD (as the RDD partitioning is the same as the Kafka
partitioning), and higher latency while processing the RDD (as messages are
fetched when the RDD is processed). There are many who do not need "exactly
once" and "global ordering" of messages, or whose ordering is managed in an
external store (say HBase), and who want more parallelism and lower latency in
their streaming channel. At this point Spark does not have a good fallback
option in terms of a Receiver based API. The present Receiver based API uses
the Kafka High Level API, which performs poorly and has serious issues. [For
this reason Kafka is coming up with a new High Level Consumer API in 0.9.]

The consumer which I implemented uses the Kafka Low Level API, which gives
better performance. This consumer has built-in fault tolerance for recovery
from all failures. It extends the code from the Storm Kafka Spout, which has
been around for some time, has matured over the years, and has all the Kafka
fault tolerance capabilities built in. This same Kafka consumer for Spark is
running in various production scenarios at present and has already been
adopted by many in the Spark community.

*Why Can't we fix existing Receiver based API in Spark* :

This is not possible unless we move to the Kafka Low Level API, or wait for
Kafka 0.9, where the High Level Consumer API is being rewritten, and then build
another Kafka Spark consumer for Kafka 0.9 customers. That approach does not
seem good to me. The Kafka Low Level API which I used in my consumer (and which
DirectStream also uses) is not going to be deprecated in the near future. So if
the Kafka consumer for Spark uses the Low Level API for Receiver based mode,
all Kafka customers who are presently on 0.8.x, or who will use 0.9, benefit
from the same API.

*Concerns around Low Level API Complexity*

Yes, implementing a reliable consumer using the Kafka Low Level consumer API is
complex. But the same has been done for the Storm Kafka Spout, and it has been
stable for quite some time. This consumer for Spark is battle tested under
various production loads, gives much better performance than the existing
Kafka consumers for Spark, and has a better fault tolerance approach than the
existing Receiver based mode.

*Why can't this consumer continue to be in Spark-Package ?*

That is possible. But from what I see, many customers who want to fall back to
Receiver based mode, because they may not need "exactly once" semantics or
"global ordering", seem a little tentative about using a spark-packages library
for their critical streaming pipeline, and so are forced to use the faulty and
buggy Kafka High Level API based mode. Making this consumer part of the Spark
project will give it much higher adoption and support from the community.

*Some Major features around this consumer :*

This consumer controls the rate limit by maintaining a constant block size,
whereas the default rate limiting in other Spark consumers is done by number of
messages. That is an issue when Kafka has messages of different sizes, since
there is no deterministic way to know the actual block sizes and memory
utilization if rate control is done by number of messages.

This consumer has an in-built PID controller which controls the rate of
consumption, again by modifying the block size, and consumes only as many
messages from Kafka as are needed. The default Spark consumer fetches a chunk
of messages and then applies throttling to control the rate, which can lead to
excess I/O while consuming from Kafka.
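
To make the rate-control idea concrete, here is a generic PID update sketch; this is not the consumer's actual code, and the gains and error definition are purely illustrative: the controller compares what was fetched against what downstream could process, and adjusts the next block (fetch) size accordingly.

// Generic PID rate controller sketch; constants and error definition are illustrative only.
class PidRateController(kp: Double = 1.0, ki: Double = 0.2, kd: Double = 0.0) {
  private var integral = 0.0
  private var lastError = 0.0

  /** Next fetch size, given the current one and how much was actually processed in the batch. */
  def nextFetchSize(currentFetchSize: Long, processedPerBatch: Long): Long = {
    // Positive error means we fetched more than downstream could keep up with.
    val error = currentFetchSize.toDouble - processedPerBatch.toDouble
    integral += error
    val derivative = error - lastError
    lastError = error

    val adjustment = kp * error + ki * integral + kd * derivative
    math.max(1L, (currentFetchSize - adjustment).toLong)
  }
}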

You can also refer to the README file for more details :
https://github.com/dibbhatt/kafka-spark-consumer/blob/master/README.md

If you are using this consumer, or are going to use it, you can vote for this
JIRA.

Regards,
Dibyendu


Re: Spark Streaming over YARN

2015-10-04 Thread Dibyendu Bhattacharya
How many partitions are there in your Kafka topic ?

Regards,
Dibyendu

On Sun, Oct 4, 2015 at 8:19 PM, <nib...@free.fr> wrote:

> Hello,
> I am using  https://github.com/dibbhatt/kafka-spark-consumer
> I specify 4 receivers in the ReceiverLauncher , but in YARN console I can
> see one node receiving the kafka flow.
> (I use spark 1.3.1)
>
> Tks
> Nicolas
>
>
> ----- Original Message -----
> From: "Dibyendu Bhattacharya" <dibyendu.bhattach...@gmail.com>
> To: nib...@free.fr
> Cc: "Cody Koeninger" <c...@koeninger.org>, "user" <user@spark.apache.org>
> Sent: Friday, 2 October 2015 18:21:35
> Subject: Re: Spark Streaming over YARN
>
>
> If your Kafka topic has 4 partitions , and if you specify 4 Receivers,
> messages from each partitions are received by a dedicated receiver. so your
> receiving parallelism is defined by your number of partitions of your topic
> . Every receiver task will be scheduled evenly among nodes in your cluster.
> There was a JIRA fixed in spark 1.5 which does even distribution of
> receivers.
>
>
>
>
>
> Now for RDD parallelism ( i.e parallelism while processing your RDD ) is
> controlled by your Block Interval and Batch Interval.
>
>
> If your block Interval is 200 Ms, there will be 5 blocks per second. If
> your Batch Interval is 3 seconds, there will 15 blocks per batch. And every
> Batch is one RDD , thus your RDD will be 15 partition , which will be
> honored during processing the RDD ..
>
>
>
>
> Regards,
> Dibyendu
>
>
>
>
> On Fri, Oct 2, 2015 at 9:40 PM, < nib...@free.fr > wrote:
>
>
> OK, so if I set for example 4 receivers (the number of nodes), how will the
> RDDs be distributed over the nodes/cores?
> For example, in my case I have 4 nodes (with 2 cores each).
>
> Tks
> Nicolas
>
>
> - Original Message -
> From: "Dibyendu Bhattacharya" < dibyendu.bhattach...@gmail.com >
> To: nib...@free.fr
> Cc: "Cody Koeninger" < c...@koeninger.org >, "user" < user@spark.apache.org >
> Sent: Friday, 2 October 2015 18:01:59
>
> Subject: Re: Spark Streaming over YARN
>
>
> Hi,
>
>
> If you need to use Receiver based approach , you can try this one :
> https://github.com/dibbhatt/kafka-spark-consumer
>
>
> This is also part of Spark packages :
> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer
>
>
> You just need to specify the number of Receivers you want for desired
> parallelism while receiving , and rest of the thing will be taken care by
> ReceiverLauncher.
>
>
> This Low level Receiver will give better parallelism both on receiving ,
> and on processing the RDD.
>
>
> Default Receiver based API ( KafkaUtils.createStream) using Kafka High
> level API and Kafka high Level API has serious issue to be used in
> production .
>
>
>
>
> Regards,
>
> Dibyendu
>
>
>
>
>
>
>
>
>
>
> On Fri, Oct 2, 2015 at 9:22 PM, < nib...@free.fr > wrote:
>
>
> From my understanding, as soon as I use YARN I don't need to handle
> parallelism myself (at least for the RDD treatment).
> I don't want to use the direct stream, as I would have to manage the offset
> positioning (in order to be able to start from the last offset treated
> after a Spark job failure).
>
>
> - Original Message -
> From: "Cody Koeninger" < c...@koeninger.org >
> To: "Nicolas Biau" < nib...@free.fr >
> Cc: "user" < user@spark.apache.org >
> Sent: Friday, 2 October 2015 17:43:41
> Subject: Re: Spark Streaming over YARN
>
>
>
>
> If you're using the receiver based implementation, and want more
> parallelism, you have to create multiple streams and union them together.
>
>
> Or use the direct stream.
>
>
> On Fri, Oct 2, 2015 at 10:40 AM, < nib...@free.fr > wrote:
>
>
> Hello,
> I have a job receiving data from Kafka (4 partitions) and persisting data
> into MongoDB.
> It works fine, but when I deploy it inside a YARN cluster (4 nodes with 2
> cores) only one node is receiving all the Kafka partitions and only one node
> is processing my RDD treatment (foreach function).
> How can I force YARN to use all the nodes and cores to process
> the data (receiver & RDD treatment)?
>
> Tks a lot
> Nicolas
>
>
>
>
>
>
>


Re: Spark Streaming over YARN

2015-10-02 Thread Dibyendu Bhattacharya
Hi,

If you need to use Receiver based approach , you can try this one :
https://github.com/dibbhatt/kafka-spark-consumer

This is also part of Spark packages :
http://spark-packages.org/package/dibbhatt/kafka-spark-consumer

You just need to specify the number of Receivers you want for the desired
parallelism while receiving, and the rest will be taken care of by
ReceiverLauncher.

This low level Receiver will give better parallelism both on receiving
and on processing the RDD.

The default Receiver based API (KafkaUtils.createStream) uses the Kafka High
Level API, and the Kafka High Level API has serious issues that make it
unsuitable for production use.


Regards,
Dibyendu





On Fri, Oct 2, 2015 at 9:22 PM,  wrote:

> From my understanding, as soon as I use YARN I don't need to handle
> parallelism myself (at least for the RDD treatment).
> I don't want to use the direct stream, as I would have to manage the offset
> positioning (in order to be able to start from the last offset treated
> after a Spark job failure).
>
>
> - Original Message -
> From: "Cody Koeninger" 
> To: "Nicolas Biau" 
> Cc: "user" 
> Sent: Friday, 2 October 2015 17:43:41
> Subject: Re: Spark Streaming over YARN
>
>
> If you're using the receiver based implementation, and want more
> parallelism, you have to create multiple streams and union them together.
>
>
> Or use the direct stream.
>
>
> On Fri, Oct 2, 2015 at 10:40 AM, < nib...@free.fr > wrote:
>
>
> Hello,
> I have a job receiving data from Kafka (4 partitions) and persisting data
> into MongoDB.
> It works fine, but when I deploy it inside a YARN cluster (4 nodes with 2
> cores) only one node is receiving all the Kafka partitions and only one node
> is processing my RDD treatment (foreach function).
> How can I force YARN to use all the nodes and cores to process
> the data (receiver & RDD treatment)?
>
> Tks a lot
> Nicolas
>
>
>
>
>
>


Re: Spark Streaming over YARN

2015-10-02 Thread Dibyendu Bhattacharya
If your Kafka topic has 4 partitions, and you specify 4 Receivers, messages
from each partition are received by a dedicated receiver, so your receiving
parallelism is defined by the number of partitions in your topic. Every
receiver task will be scheduled evenly among the nodes in your cluster. There
was a JIRA fixed in Spark 1.5 which does even distribution of receivers.

RDD parallelism (i.e. parallelism while processing your RDD), on the other
hand, is controlled by your block interval and batch interval.

If your block interval is 200 ms, there will be 5 blocks per second. If your
batch interval is 3 seconds, there will be 15 blocks per batch. Every batch is
one RDD, so your RDD will have 15 partitions, which will be honored while
processing the RDD.
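
A small sketch of the two settings that drive this arithmetic (values mirror the example above; with a 200 ms block interval and a 3 second batch, each batch RDD ends up with 3000 / 200 = 15 partitions):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// 200 ms block interval -> 5 blocks per second of received data.
val conf = new SparkConf()
  .setAppName("ReceiverParallelismSketch")
  .set("spark.streaming.blockInterval", "200ms")

// 3 second batches -> 15 blocks, i.e. 15 partitions, per batch RDD.
val ssc = new StreamingContext(conf, Seconds(3))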


Regards,
Dibyendu


On Fri, Oct 2, 2015 at 9:40 PM, <nib...@free.fr> wrote:

> OK, so if I set for example 4 receivers (the number of nodes), how will the
> RDDs be distributed over the nodes/cores?
> For example, in my case I have 4 nodes (with 2 cores each).
>
> Tks
> Nicolas
>
>
> - Original Message -
> From: "Dibyendu Bhattacharya" <dibyendu.bhattach...@gmail.com>
> To: nib...@free.fr
> Cc: "Cody Koeninger" <c...@koeninger.org>, "user" <user@spark.apache.org>
> Sent: Friday, 2 October 2015 18:01:59
> Subject: Re: Spark Streaming over YARN
>
>
> Hi,
>
>
> If you need to use Receiver based approach , you can try this one :
> https://github.com/dibbhatt/kafka-spark-consumer
>
>
> This is also part of Spark packages :
> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer
>
>
> You just need to specify the number of Receivers you want for desired
> parallelism while receiving , and rest of the thing will be taken care by
> ReceiverLauncher.
>
>
> This Low level Receiver will give better parallelism both on receiving ,
> and on processing the RDD.
>
>
> Default Receiver based API ( KafkaUtils.createStream) using Kafka High
> level API and Kafka high Level API has serious issue to be used in
> production .
>
>
>
>
> Regards,
>
> Dibyendu
>
>
>
>
>
>
>
>
>
>
> On Fri, Oct 2, 2015 at 9:22 PM, < nib...@free.fr > wrote:
>
>
> From my understanding, as soon as I use YARN I don't need to handle
> parallelism myself (at least for the RDD treatment).
> I don't want to use the direct stream, as I would have to manage the offset
> positioning (in order to be able to start from the last offset treated
> after a Spark job failure).
>
>
> - Original Message -
> From: "Cody Koeninger" < c...@koeninger.org >
> To: "Nicolas Biau" < nib...@free.fr >
> Cc: "user" < user@spark.apache.org >
> Sent: Friday, 2 October 2015 17:43:41
> Subject: Re: Spark Streaming over YARN
>
>
>
>
> If you're using the receiver based implementation, and want more
> parallelism, you have to create multiple streams and union them together.
>
>
> Or use the direct stream.
>
>
> On Fri, Oct 2, 2015 at 10:40 AM, < nib...@free.fr > wrote:
>
>
> Hello,
> I have a job receiving data from Kafka (4 partitions) and persisting data
> into MongoDB.
> It works fine, but when I deploy it inside a YARN cluster (4 nodes with 2
> cores) only one node is receiving all the Kafka partitions and only one node
> is processing my RDD treatment (foreach function).
> How can I force YARN to use all the nodes and cores to process
> the data (receiver & RDD treatment)?
>
> Tks a lot
> Nicolas
>
>
>
>
>
>
>


Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error

2015-09-26 Thread Dibyendu Bhattacharya
Hi,

Recently I was working on a PR to use Tachyon as the OFF_HEAP store for Spark
Streaming and make sure Spark Streaming can recover from a Driver failure and
recover the blocks from Tachyon.

The motivation for this PR is :

If a Streaming application stores its blocks OFF_HEAP, it may not need any
WAL-like feature to recover from a Driver failure. As long as the writing of
blocks to Tachyon from the Streaming receiver is durable, they should be
recoverable from Tachyon directly on a Driver failure.
This can solve the issue of expensive WAL writes and of duplicating the blocks
both in memory and in the WAL, and still guarantee an end-to-end no-data-loss
channel using the OFF_HEAP store.

https://github.com/apache/spark/pull/8817

This PR is still under review. But having done various failover tests in my
environment, I have seen this PR work perfectly fine without any data loss.
Let's see what TD and others have to say on this PR.

Below is the configuration I used to test this PR:


Spark : 1.6 from Master
Tachyon : 0.7.1

SparkConfiguration Details :

SparkConf conf = new SparkConf().setAppName("TestTachyon")
    .set("spark.streaming.unpersist", "true")
    .set("spark.local.dir", "/mnt1/spark/tincan")
    .set("tachyon.zookeeper.address", "10.252.5.113:2182")
    .set("tachyon.usezookeeper", "true")
    .set("spark.externalBlockStore.url", "tachyon-ft://ip-10-252-5-113.asskickery.us:19998")
    .set("spark.externalBlockStore.baseDir", "/sparkstreaming")
    .set("spark.externalBlockStore.folderName", "pearson")
    .set("spark.externalBlockStore.dirId", "subpub")
    .set("spark.externalBlockStore.keepExternalBlockStoreDirOnShutdown", "true");

JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(1));

String checkpointDirectory = "hdfs://10.252.5.113:9000/user/hadoop/spark/wal";

jsc.checkpoint(checkpointDirectory);

// I am using my Receiver based consumer
// (https://github.com/dibbhatt/kafka-spark-consumer), but KafkaUtils.createStream
// will also work.

JavaDStream unionStreams = ReceiverLauncher.launch(
    jsc, props, numberOfReceivers, StorageLevel.OFF_HEAP());




Regards,
Dibyendu

On Sat, Sep 26, 2015 at 11:59 AM, N B <nb.nos...@gmail.com> wrote:

> Hi Dibyendu,
>
> How does one go about configuring spark streaming to use tachyon as its
> place for storing checkpoints? Also, can one do this with tachyon running
> on a completely different node than where spark processes are running?
>
> Thanks
> Nikunj
>
>
> On Thu, May 21, 2015 at 8:35 PM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> Hi Tathagata,
>>
>> Thanks for looking into this. Further investigating I found that the
>> issue is with Tachyon does not support File Append. The streaming receiver
>> which writes to WAL when failed, and again restarted, not able to append to
>> same WAL file after restart.
>>
>> I raised this with Tachyon user group, and Haoyuan told that within 3
>> months time Tachyon file append will be ready. Will revisit this issue
>> again then .
>>
>> Regards,
>> Dibyendu
>>
>>
>> On Fri, May 22, 2015 at 12:24 AM, Tathagata Das <t...@databricks.com>
>> wrote:
>>
>>> Looks like somehow the file size reported by the FSInputDStream of
>>> Tachyon's FileSystem interface, is returning zero.
>>>
>>> On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya <
>>> dibyendu.bhattach...@gmail.com> wrote:
>>>
>>>> Just to follow up this thread further .
>>>>
>>>> I was doing some fault tolerance testing of Spark Streaming with Tachyon
>>>> as the OFF_HEAP block store. As I said in an earlier email, I was able to solve
>>>> the BlockNotFound exception when I used Hierarchical Storage in
>>>> Tachyon, which is good.
>>>>
>>>> I continue doing some testing around storing the Spark Streaming WAL
>>>> and CheckPoint files also in Tachyon . Here is few finding ..
>>>>
>>>>
>>>> When I store the Spark Streaming Checkpoint location in Tachyon , the
>>>> throughput is much higher . I tested the Driver and Receiver failure cases
>>>> , and Spark Streaming is able to recover without any Data Loss on Driver
>>>> failure.
>>>>
>>>> *But on Receiver failure, Spark Streaming loses data* as I see an
>>>> Exception while reading the WAL file from the Tachyon "receivedData" location
>>>>  for the same Receiver id which just failed.
>>>>
>>>> If I change the Checkpoint locati

Re: Managing scheduling delay in Spark Streaming

2015-09-16 Thread Dibyendu Bhattacharya
Hi Michal,

If you use https://github.com/dibbhatt/kafka-spark-consumer , it comes with its
own built-in back pressure mechanism. By default this is disabled; you need to
enable it to use this feature with this consumer. It controls the rate based on
the scheduling delay at runtime.

Regards,
Dibyendu

On Wed, Sep 16, 2015 at 12:32 PM, Akhil Das 
wrote:

> I had a workaround for exactly the same scenario
> http://apache-spark-developers-list.1001551.n3.nabble.com/SparkStreaming-Workaround-for-BlockNotFound-Exceptions-td12096.html
>
> Apart from that, if you are using this consumer
> https://github.com/dibbhatt/kafka-spark-consumer it also has a built-in
> rate limiting, Also in Spark 1.5.0 they have a rate limiting/back-pressure
> (haven't tested it on production though).
>
>
>
> Thanks
> Best Regards
>
> On Tue, Sep 15, 2015 at 11:56 PM, Michal Čizmazia 
> wrote:
>
>> Hi,
>>
>> I have a Reliable Custom Receiver storing messages into Spark. Is there
>> way how to prevent my receiver from storing more messages into Spark when
>> the Scheduling Delay reaches a certain threshold?
>>
>> Possible approaches:
>> #1 Does Spark block on the Receiver.store(messages) call to prevent
>> storing more messages and overflowing the system?
>> #2 How to obtain the Scheduling Delay in the Custom Receiver, so that I
>> can implement the feature.
>>
>> Thanks,
>>
>> Mike
>>
>>
>


Re: Using KafkaDirectStream, stopGracefully and exceptions

2015-09-10 Thread Dibyendu Bhattacharya
Hi,

This is running in production in many organizations that have adopted this
consumer as an alternative option. The consumer will run with Spark 1.3.1.

It has been running in production at Pearson for some time.

It is part of spark-packages, and you can see how to include it in your mvn or
sbt build here :

http://spark-packages.org/package/dibbhatt/kafka-spark-consumer

This consumer also comes with an in-built PID controller to control
back-pressure, which you can use even if you are on Spark 1.3.1.
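
For example, the spark-packages coordinates can be pulled in at submit time roughly like this (the version number is a placeholder; check the package page for the current release):

spark-submit --packages dibbhatt:kafka-spark-consumer:<version> ...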


Regards,
Dibyendu


On Thu, Sep 10, 2015 at 5:48 PM, Krzysztof Zarzycki 
wrote:

> Thanks Akhil, seems like an interesting option to consider.
> Do you know if the package is production-ready? Do you use it in
> production?
>
> And do you know if it works for Spark 1.3.1 as well? README mentions that
> package in spark-packages.org is built with Spark 1.4.1.
>
>
> Anyway, it  seems that core Spark Streaming does not support my case? Or
> anyone can instruct me on how to do it? Let's say, that I'm even fine (but
> not content about) with using KafkaCluster private class that is included
> in Spark, for manual managing ZK offsets. Has someone done it before? Has
> someone public code examples of manually managing ZK offsets?
>
> Thanks,
> Krzysztof
>
> 2015-09-10 12:22 GMT+02:00 Akhil Das :
>
>> This consumer pretty much covers all those scenarios you listed
>> github.com/dibbhatt/kafka-spark-consumer Give it a try.
>>
>> Thanks
>> Best Regards
>>
>> On Thu, Sep 10, 2015 at 3:32 PM, Krzysztof Zarzycki > > wrote:
>>
>>> Hi there,
>>> I have a problem with fulfilling all my needs when using Spark Streaming
>>> on Kafka. Let me enumerate my requirements:
>>> 1. I want to have at-least-once/exactly-once processing.
>>> 2. I want to have my application fault & simple stop tolerant. The Kafka
>>> offsets need to be tracked between restarts.
>>> 3. I want to be able to upgrade code of my application without losing
>>> Kafka offsets.
>>>
>>> Now what my requirements imply according to my knowledge:
>>> 1. implies using new Kafka DirectStream.
>>> 2. implies  using checkpointing. kafka DirectStream will write offsets
>>> to the checkpoint as well.
>>> 3. implies that checkpoints can't be used between controlled restarts.
>>> So I need to install shutdownHook with ssc.stop(stopGracefully=true) (here
>>> is a description how:
>>> https://metabroadcast.com/blog/stop-your-spark-streaming-application-gracefully
>>> )
>>>
>>> Now my problems are:
>>> 1. If I cannot make checkpoints between code upgrades, does it mean that
>>> Spark does not help me at all with keeping my Kafka offsets? Does it mean,
>>> that I have to implement my own storing to/initalization of offsets from
>>> Zookeeper?
>>> 2. When I set up shutdownHook and my any executor throws an exception,
>>> it seems that application does not fail, but stuck in running state. Is
>>> that because stopGracefully deadlocks on exceptions? How to overcome this
>>> problem? Maybe I can avoid setting shutdownHook and there is other way to
>>> stop gracefully your app?
>>>
>>> 3. If I somehow overcome 2., is it enough to just stop gracefully my app
>>> to be able to upgrade code & not lose Kafka offsets?
>>>
>>>
>>> Thank you a lot for your answers,
>>> Krzysztof Zarzycki
>>>
>>>
>>>
>>>
>>
>


Re: Using KafkaDirectStream, stopGracefully and exceptions

2015-09-10 Thread Dibyendu Bhattacharya
Hi,

Just to clarify one point which may not be clear to many: if someone decides to
use the Receiver based approach, the best option at this point is to use
https://github.com/dibbhatt/kafka-spark-consumer. This will also work with the
WAL like any other receiver based consumer. The major issue with
KafkaUtils.createStream is that it uses the Kafka High Level API, which has a
serious issue with consumer re-balance, whereas dibbhatt/kafka-spark-consumer
uses the Low Level Kafka Consumer API, which does not have any such issue. I am
not sure if there is any publicly available performance benchmark comparing
this one with the DirectStream, so I cannot comment on the performance benefits
of one over the other, but in whatever performance benchmarking we have done,
dibbhatt/kafka-spark-consumer stands out.

Regards,
Dibyendu

On Thu, Sep 10, 2015 at 8:08 PM, Cody Koeninger  wrote:

> You have to store offsets somewhere.
>
> If you're going to store them in checkpoints, then you have to deal with
> the fact that checkpoints aren't recoverable on code change.  Starting up
> the new version helps because you don't start it from the same checkpoint
> directory as the running one... it has your new code, and is storing to a
> new checkpoint directory.  If you started the new one from the latest
> offsets, you can shut down the old one as soon as it's caught up.
>
> If you don't like the implications of storing offsets in checkpoints...
> then sure, store them yourself.  A real database would be better, but if
> you really want to store them in zookeeper you can.  In any case, just do
> your offset saves in the same foreachPartition your other output operations
> are occurring in, after they've successfully completed.
>
> If you don't care about performance benefits of the direct stream and
> don't want exactly once semantics, sure use the old stream.
>
> Finally, hundreds of gigs just really isn't very much data.  Unless what
> you're doing is really resource intensive, it shouldn't take much time to
> process it all, especially if you can dynamically size a cluster for the
> rare occasion that something is screwed up and you need to reprocess.
>
>
> On Thu, Sep 10, 2015 at 9:17 AM, Krzysztof Zarzycki 
> wrote:
>
>> Thanks guys for your answers. I put my answers in text, below.
>>
>> Cheers,
>> Krzysztof Zarzycki
>>
>> 2015-09-10 15:39 GMT+02:00 Cody Koeninger :
>>
>>> The kafka direct stream meets those requirements.  You don't need
>>> checkpointing for exactly-once.  Indeed, unless your output operations are
>>> idempotent, you can't get exactly-once if you're relying on checkpointing.
>>> Instead, you need to store the offsets atomically in the same transaction
>>> as your results.
>>>
>>
>> To focus discussion, let's assume my operations are idempotent & I'm
>> interested in at-least-once thanks to that (which is idempotent
>> exactly-once as named in your pres). Did you say, that I don't need
>> checkpointing for that? How then direct stream API would store offsets
>>  between restarts?
>>
>>
>>> See
>>> https://github.com/koeninger/kafka-exactly-once
>>> and the video / blog posts linked from it.
>>>
>>>
>> I did that, thank you. What I want is to achieve "idempotent
>> exactly-once" as named in your presentation.
>>
>>
>>> The dibhatt consumer that Akhil linked is using zookeeper to store
>>> offsets, so to the best of my knowledge, it cannot do exactly-once without
>>> idempotent output operations.
>>>
>> True, and I totally accept it if what I get is at-least-once.
>>
>>
>>>
>>>
>> Regarding the issues around code changes and checkpointing, the most
>>> straightforward way to deal with this is to just start a new version of
>>> your job before stopping the old one.  If you care about delivery semantics
>>> and are using checkpointing, your output operation must be idempotent
>>> anyway, so having 2 versions of the code running at the same time for a
>>> brief period should not be a problem.
>>>
>>
>> How does starting the new version before stopping the old one help? Could you
>> please explain the mechanics of that a bit?
>> Anyway, it seems definitely cumbersome. Plus, I can imagine plenty of
>> situations where it will be just inappropriate to run the old one, when, let's
>> say, we discovered a bug and don't want to run it anymore.
>>
>>
>> So... To sum it up correctly, if I want at-least-once, with simple code
>> upgrades, I need to:
>> -  store offsets in external storage (I would choose ZK for that).
>> -  read them on application restart and pass the
>> TopicAndPartition->offset map to createDirectStream.
>> -  And I don't need to use checkpoints at all then.
>> Could you confirm that?
>>
>> It's a question of where I should actually commit the ZK offsets. The
>> easiest would be to do it at the end of every batch. Do you think I can use
>> org.apache.spark.streaming.scheduler.StreamingListener, method
>> onBatchCompleted, for that? I don't think so, because probably we don't have
>> 
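
For reference, a minimal sketch of the approach summarized above (Spark 1.3 direct
stream API; readOffsets/saveOffsets are hypothetical placeholders for your own
ZooKeeper or database code, and ssc is an existing StreamingContext):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

// Hypothetical helpers -- replace with your own ZK / DB offset store.
def readOffsets(): Map[TopicAndPartition, Long] = ???
def saveOffsets(ranges: Array[OffsetRange]): Unit = ???

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, readOffsets(),
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))

stream.foreachRDD { rdd =>
  // capture the offset ranges on the driver before doing the output
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    // idempotent output operation goes here
  }
  // commit offsets only after the output has completed successfully
  saveOffsets(ranges)
}

With idempotent output this gives the "idempotent exactly-once" discussed above: on
restart the job re-reads from the last saved offsets and re-writes at most one batch.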

Re: BlockNotFoundException when running spark word count on Tachyon

2015-08-26 Thread Dibyendu Bhattacharya
Sometime back I was playing with Spark and Tachyon and I also found this
issue. The issue here is that TachyonBlockManager puts the blocks in the
WriteType.TRY_CACHE configuration. Because of this, blocks are evicted
from the Tachyon cache when memory is full, and when Spark tries to find the
block it throws a BlockNotFoundException.

To solve this I tried Hierarchical Storage on Tachyon (
http://tachyon-project.org/Hierarchy-Storage-on-Tachyon.html ), and that seems to have
worked and I did not see any Spark job fail due to
BlockNotFoundException.
Below are the Hierarchical Storage settings which I used:

  -Dtachyon.worker.hierarchystore.level.max=2
  -Dtachyon.worker.hierarchystore.level0.alias=MEM
  -Dtachyon.worker.hierarchystore.level0.dirs.path=$TACHYON_RAM_FOLDER

-Dtachyon.worker.hierarchystore.level0.dirs.quota=$TACHYON_WORKER_MEMORY_SIZE
  -Dtachyon.worker.hierarchystore.level1.alias=HDD
  -Dtachyon.worker.hierarchystore.level1.dirs.path=/mnt/tachyon
  -Dtachyon.worker.hierarchystore.level1.dirs.quota=50GB
  -Dtachyon.worker.allocate.strategy=MAX_FREE
  -Dtachyon.worker.evict.strategy=LRU

Regards,
Dibyendu

On Wed, Aug 26, 2015 at 12:25 PM, Todd bit1...@163.com wrote:


 I am using Tachyon in the Spark program below, but I encounter a
 BlockNotFoundException.
 Does someone know what's wrong, and also is there a guide on how to configure
 Spark to work with Tachyon? Thanks!

 conf.set("spark.externalBlockStore.url", "tachyon://10.18.19.33:19998")
 conf.set("spark.externalBlockStore.baseDir", "/spark")
 val sc = new SparkContext(conf)
 import org.apache.spark.storage.StorageLevel
 val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))
 rdd.persist(StorageLevel.OFF_HEAP)
 val count = rdd.count()
 val sum = rdd.reduce(_ + _)
 println(s"The count: $count, The sum is: $sum")


 15/08/26 14:52:03 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks
 have all completed, from pool
 org.apache.spark.SparkException: Job aborted due to stage failure: Task 5
 in stage 0.0 failed 1 times, most recent failure: Lost task 5.0 in stage
 0.0 (TID 5, localhost): java.lang.RuntimeException:
 org.apache.spark.storage.BlockNotFoundException: Block rdd_0_5 not found
 at
 org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:308)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
 at
 org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
 at
 org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
 at
 org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
 at
 org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
 at
 io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
 at
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
 at
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
 at
 io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
 at
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
 at
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
 at
 io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
 at
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
 at
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
 at
 io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
 at
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
 at
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
 at
 io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
 at
 

Re: BlockNotFoundException when running spark word count on Tachyon

2015-08-26 Thread Dibyendu Bhattacharya
The URL seems to have changed .. here is the one ..
http://tachyon-project.org/documentation/Tiered-Storage-on-Tachyon.html



On Wed, Aug 26, 2015 at 12:32 PM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

  Sometime back I was playing with Spark and Tachyon and I also found this
  issue. The issue here is that TachyonBlockManager puts the blocks in the
  WriteType.TRY_CACHE configuration. Because of this, blocks are evicted
  from the Tachyon cache when memory is full, and when Spark tries to find the
  block it throws a BlockNotFoundException.

  To solve this I tried Hierarchical Storage on Tachyon (
  http://tachyon-project.org/Hierarchy-Storage-on-Tachyon.html ), and that seems to have
  worked and I did not see any Spark job fail due to
  BlockNotFoundException.
  Below are the Hierarchical Storage settings which I used:

   -Dtachyon.worker.hierarchystore.level.max=2
   -Dtachyon.worker.hierarchystore.level0.alias=MEM
   -Dtachyon.worker.hierarchystore.level0.dirs.path=$TACHYON_RAM_FOLDER

 -Dtachyon.worker.hierarchystore.level0.dirs.quota=$TACHYON_WORKER_MEMORY_SIZE
   -Dtachyon.worker.hierarchystore.level1.alias=HDD
   -Dtachyon.worker.hierarchystore.level1.dirs.path=/mnt/tachyon
   -Dtachyon.worker.hierarchystore.level1.dirs.quota=50GB
   -Dtachyon.worker.allocate.strategy=MAX_FREE
   -Dtachyon.worker.evict.strategy=LRU

 Regards,
 Dibyendu

 On Wed, Aug 26, 2015 at 12:25 PM, Todd bit1...@163.com wrote:


  I am using Tachyon in the Spark program below, but I encounter a
  BlockNotFoundException.
  Does someone know what's wrong, and also is there a guide on how to
  configure Spark to work with Tachyon? Thanks!

  conf.set("spark.externalBlockStore.url", "tachyon://10.18.19.33:19998")
  conf.set("spark.externalBlockStore.baseDir", "/spark")
  val sc = new SparkContext(conf)
  import org.apache.spark.storage.StorageLevel
  val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))
  rdd.persist(StorageLevel.OFF_HEAP)
  val count = rdd.count()
  val sum = rdd.reduce(_ + _)
  println(s"The count: $count, The sum is: $sum")


 15/08/26 14:52:03 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose
 tasks have all completed, from pool
 org.apache.spark.SparkException: Job aborted due to stage failure: Task 5
 in stage 0.0 failed 1 times, most recent failure: Lost task 5.0 in stage
 0.0 (TID 5, localhost): java.lang.RuntimeException:
 org.apache.spark.storage.BlockNotFoundException: Block rdd_0_5 not found
 at
 org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:308)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
 at
 org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
 at
 org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
 at
 org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
 at
 org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
 at
 io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
 at
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
 at
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
 at
 io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
 at
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
 at
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
 at
 io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
 at
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
 at
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
 at
 io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
 at
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333

Just Released V1.0.4 Low Level Receiver Based Kafka-Spark-Consumer in Spark Packages having built-in Back Pressure Controller

2015-08-26 Thread Dibyendu Bhattacharya
Dear All,

Just now released the 1.0.4 version of Low Level Receiver based
Kafka-Spark-Consumer in spark-packages.org .  You can find the latest
release here :
http://spark-packages.org/package/dibbhatt/kafka-spark-consumer

Here is github location : https://github.com/dibbhatt/kafka-spark-consumer

This consumer now has a built-in PID (Proportional, Integral, Derivative)
rate controller to control Spark back-pressure.

This consumer implements the rate limiting logic not by controlling the
number of messages per block (as is done in Spark's out-of-the-box
consumers), but by the size of the blocks per batch, i.e. for any given batch,
this consumer enforces the rate limit by controlling the size of the
batches. As Spark memory is driven by block size rather than the number of
messages, I think rate limiting by block size is more appropriate. E.g. let's
assume Kafka contains messages ranging from very small (say a few hundred bytes)
to larger messages (a few hundred KB) for the same topic. Now if we
control the rate limit by number of messages, block sizes may vary
drastically based on what type of messages get pulled per block. Whereas,
if I control my rate limiting by the size of the block, my block size remains
constant across batches (even though the number of messages differs across
blocks), which helps me tune my memory settings more correctly as I know
exactly how much memory my block is going to consume.


This consumer has its own PID (Proportional, Integral, Derivative)
controller built in, and controls Spark back-pressure by
modifying the size of the block it can consume at run time. The PID controller's
rate feedback mechanism is built using ZooKeeper. Again, the logic to
control back-pressure is not by controlling the number of messages (as is
done in Spark 1.5, SPARK-7398) but by altering the size of the block consumed per
batch from Kafka. As the back-pressure is built into the consumer, this
consumer can be used with any version of Spark if anyone wants a
back-pressure controlling mechanism in their existing Spark / Kafka
environment.
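
As an illustration only (this is NOT the actual kafka-spark-consumer code, just a
sketch of how such a PID update can look), the controller boils down to computing a
correction from the rate error and clamping the resulting block/fetch size:

// Illustrative PID update -- names, gains and bounds are made up for this sketch.
case class PidState(lastError: Double, integral: Double)

def nextFetchSizeBytes(current: Long,
                       targetRatePerSec: Double,
                       observedRatePerSec: Double,
                       dtSec: Double,
                       state: PidState,
                       kP: Double = 1.0, kI: Double = 0.2, kD: Double = 0.0,
                       minBytes: Long = 64L * 1024,
                       maxBytes: Long = 8L * 1024 * 1024): (Long, PidState) = {
  val error      = targetRatePerSec - observedRatePerSec   // how far we are from the target
  val integral   = state.integral + error * dtSec          // accumulated error over time
  val derivative = (error - state.lastError) / dtSec       // how fast the error is changing
  val correction = kP * error + kI * integral + kD * derivative
  val next       = (current + correction).toLong.max(minBytes).min(maxBytes)
  (next, PidState(error, integral))
}

The key point is that the quantity being adjusted is the block size in bytes, not a
message count, so the memory footprint of each block stays predictable.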

Regards,
Dibyendu


Re: spark streaming 1.3 kafka error

2015-08-22 Thread Dibyendu Bhattacharya
I think you can also give this consumer a try:
http://spark-packages.org/package/dibbhatt/kafka-spark-consumer in your
environment. This has been running fine for topics with a large number of
Kafka partitions (> 200) like yours without any issue; no connection issues,
as this consumer re-uses Kafka connections, and it can also recover
from any failures (network loss, Kafka leader goes down, ZK down, etc.).


Regards,
Dibyendu

On Sat, Aug 22, 2015 at 7:35 PM, Shushant Arora shushantaror...@gmail.com
wrote:

  On trying the consumer without external connections, or with a low number of
  external connections, it's working fine -

  so the doubt is how the socket got closed -

 java.io.EOFException: Received -1 when reading from channel, socket has 
 likely been closed.



 On Sat, Aug 22, 2015 at 7:24 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Can you try some other consumer and see if the issue still exists?
 On Aug 22, 2015 12:47 AM, Shushant Arora shushantaror...@gmail.com
 wrote:

  The exception comes when the client also has many connections to another
  external server.
  So I think the exception is coming from a client-side issue only - on the server
  side there is no issue.


  I want to understand: is the executor (simple consumer) not making a new
  connection to the Kafka broker at the start of each task? Or is it created only
  once, and is that getting closed somehow?

 On Sat, Aug 22, 2015 at 9:41 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

  It comes at the start of each task when there is new data inserted into
  Kafka (the data inserted is very little).
  The Kafka topic has 300 partitions - the data inserted is ~10 MB.

  Tasks get failed, the retries succeed, and after a certain number of
  failed tasks it kills the job.




 On Sat, Aug 22, 2015 at 2:08 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 That looks like you are choking your kafka machine. Do a top on the
 kafka machines and see the workload, it may happen that you are spending
 too much time on disk io etc.
 On Aug 21, 2015 7:32 AM, Cody Koeninger c...@koeninger.org wrote:

 Sounds like that's happening consistently, not an occasional network
 problem?

 Look at the Kafka broker logs

 Make sure you've configured the correct kafka broker hosts / ports
 (note that direct stream does not use zookeeper host / port).

 Make sure that host / port is reachable from your driver and worker
 nodes, ie telnet or netcat to it.  It looks like your driver can reach it
 (since there's partition info in the logs), but that doesn't mean the
 worker can.

 Use lsof / netstat to see what's going on with those ports while the
 job is running, or tcpdump if you need to.

 If you can't figure out what's going on from a networking point of
 view, post a minimal reproducible code sample that demonstrates the 
 issue,
 so it can be tested in a different environment.





 On Fri, Aug 21, 2015 at 4:06 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Hi


  Getting the below error in Spark Streaming 1.3 while consuming from Kafka
  using the direct Kafka stream. A few tasks are getting failed in each run.


  What is the reason / solution for this error?


 15/08/21 08:54:54 ERROR executor.Executor: Exception in task 262.0 in 
 stage 130.0 (TID 16332)
 java.io.EOFException: Received -1 when reading from channel, socket has 
 likely been closed.
 at kafka.utils.Utils$.read(Utils.scala:376)
 at 
 kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
 at 
 kafka.network.Receive$class.readCompletely(Transmission.scala:56)
 at 
 kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
 at 
 kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
 at 
 kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
 at 
 kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
 at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
 at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
 at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
 at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
 at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
 at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
 at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
 at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
 at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
 at 
 org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150)
 at 
 org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
 at 
 

Re: Reliable Streaming Receiver

2015-08-05 Thread Dibyendu Bhattacharya
Hi,

You can try This Kafka Consumer for Spark which is also part of Spark
Packages . https://github.com/dibbhatt/kafka-spark-consumer

Regards,
Dibyendu

On Thu, Aug 6, 2015 at 6:48 AM, Sourabh Chandak sourabh3...@gmail.com
wrote:

 Thanks Tathagata. I tried that but BlockGenerator internally uses
 SystemClock which is again private.

 We are using DSE so stuck with Spark 1.2 hence can't use the receiver-less
 version. Is it possible to use the same code as a separate API with 1.2?

 Thanks,
 Sourabh

 On Wed, Aug 5, 2015 at 6:13 PM, Tathagata Das t...@databricks.com wrote:

  You could very easily strip out the BlockGenerator code from the Spark
 source code and use it directly in the same way the Reliable Kafka Receiver
 uses it. BTW, you should know that we will be deprecating the receiver
 based approach for the Direct Kafka approach. That is quite flexible, can
 give exactly-once guarantee without WAL, and is more robust and performant.
 Consider using it.


 On Wed, Aug 5, 2015 at 5:48 PM, Sourabh Chandak sourabh3...@gmail.com
 wrote:

 Hi,

 I am trying to replicate the Kafka Streaming Receiver for a custom
 version of Kafka and want to create a Reliable receiver. The current
 implementation uses BlockGenerator which is a private class inside Spark
 streaming hence I can't use that in my code. Can someone help me with some
 resources to tackle this issue?



 Thanks,
 Sourabh






Some BlockManager Doubts

2015-07-09 Thread Dibyendu Bhattacharya
Hi ,

Just would like to clarify a few doubts I have about how the BlockManager behaves.
This is mostly in regards to the Spark Streaming context.

There are two possible cases where blocks may get dropped / not stored in memory:

Case 1: While writing the block with MEMORY_ONLY settings, if the node's
BlockManager does not have enough memory to unroll the block, the block won't
be stored to memory and the receiver will throw an error while writing the block.
If the StorageLevel is using disk (as in MEMORY_AND_DISK), blocks will
be stored to disk ONLY IF the BlockManager is not able to unroll them to memory. This
is fine while receiving the blocks, but this logic has an issue
when old blocks are chosen to be dropped from memory, as in Case 2.

Case 2: Now let's say, for either MEMORY_ONLY or MEMORY_AND_DISK settings,
blocks are successfully stored to memory in Case 1. Now, if memory usage
goes beyond a certain threshold, the BlockManager starts
dropping LRU blocks from memory which were successfully stored while
receiving.

The primary issue I see here is that while dropping the blocks in Case 2, Spark
does not check if the storage level is using disk (MEMORY_AND_DISK), and even
with disk storage levels blocks are dropped from memory without being written
to disk.
Or, I believe the issue is in the first place that blocks are NOT written to
disk simultaneously in Case 1. I understand this will impact throughput,
but this design may throw a BlockNotFound error if blocks are chosen to be
dropped even in the case where the StorageLevel is using disk.

Any thoughts ?

Regards,
Dibyendu


Re: spark streaming with kafka reset offset

2015-06-27 Thread Dibyendu Bhattacharya
Hi,

There is another option to try: the Receiver based Low Level Kafka Consumer,
which is part of Spark Packages (
http://spark-packages.org/package/dibbhatt/kafka-spark-consumer ). This can
be used with the WAL as well for end-to-end zero data loss.

This is also a reliable receiver and commits offsets to ZK. Given the number
of Kafka partitions you have (> 100), using the high-level Kafka API for the
receiver based approach may lead to issues related to consumer re-balancing,
which is a major issue of the Kafka high-level API.

Regards,
Dibyendu



On Sat, Jun 27, 2015 at 3:04 PM, Tathagata Das t...@databricks.com wrote:

 In the receiver based approach, If the receiver crashes for any reason
 (receiver crashed or executor crashed) the receiver should get restarted on
 another executor and should start reading data from the offset present in
 the zookeeper. There is some chance of data loss, which can be alleviated using
 Write Ahead Logs (see the streaming programming guide for more details, or see
 my talk [Slides PDF
 http://www.slideshare.net/SparkSummit/recipes-for-running-spark-streaming-apploications-in-production-tathagata-daspptx
 , Video
 https://www.youtube.com/watch?v=d5UJonrruHklist=PL-x35fyliRwgfhffEpywn4q23ykotgQJ6index=4
 ] from last Spark Summit 2015). But that approach can give duplicate
 records. The direct approach gives exactly-once guarantees, so you should
 try it out.

 TD
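
 For the WAL route mentioned above, the configuration involved is roughly the
 following minimal sketch (the checkpoint path is a placeholder; with the WAL
 enabled, a serialized, non-replicated storage level is usually used so received
 data is not kept twice):

 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}

 val conf = new SparkConf()
   .setAppName("receiver-with-wal")
   // write every received block to the write ahead log before acknowledging it
   .set("spark.streaming.receiver.writeAheadLog.enable", "true")

 val ssc = new StreamingContext(conf, Seconds(10))
 // the WAL and recovery metadata live under the checkpoint directory
 ssc.checkpoint("hdfs:///checkpoints/my-streaming-app")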

 On Fri, Jun 26, 2015 at 5:46 PM, Cody Koeninger c...@koeninger.org
 wrote:

  Read the spark streaming guide and the kafka integration guide for a
  better understanding of how the receiver based stream works.

  Capacity planning is specific to your environment and what the job is
  actually doing; you'll need to determine it empirically.


 On Friday, June 26, 2015, Shushant Arora shushantaror...@gmail.com
 wrote:

  In 1.2, how do I handle offset management after the stream application starts,
  in each job? Should I commit the offset manually after job completion?

  And what is the recommended number of consumer threads? Say I have 300
  partitions in the Kafka cluster. Load is ~1 million events per second. Each
  event is ~500 bytes. Is having 5 receivers, with 60 partitions per receiver,
  sufficient for Spark Streaming to consume?

 On Fri, Jun 26, 2015 at 8:40 PM, Cody Koeninger c...@koeninger.org
 wrote:

 The receiver-based kafka createStream in spark 1.2 uses zookeeper to
 store offsets.  If you want finer-grained control over offsets, you can
 update the values in zookeeper yourself before starting the job.

 createDirectStream in spark 1.3 is still marked as experimental, and
 subject to change.  That being said, it works better for me in production
 than the receiver based api.

 On Fri, Jun 26, 2015 at 6:43 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

  I am using spark streaming 1.2.

  If processing executors get crashed, will the receiver reset the offset back
  to the last processed offset?

  If the receiver itself crashed, is there a way to reset the offset
  without restarting the streaming application, other than smallest or largest?


  Is spark streaming 1.3, which uses the low level consumer api, stable? And
  which is recommended for handling data loss, 1.2 or 1.3?












Re: Kafka Spark Streaming: ERROR EndpointWriter: dropping message

2015-06-10 Thread Dibyendu Bhattacharya
Hi,

Can you please share a more detailed stack trace from your receiver logs and also
the consumer settings you used? I have never tested the consumer with
Kafka 0.7.3, so not sure if the Kafka version is the issue. Have you tried
building the consumer using Kafka 0.7.3?

Regards,
Dibyendu

On Wed, Jun 10, 2015 at 11:52 AM, karma243 ashut...@reducedata.com wrote:

 Thank you for responding @nsalian.

  1. I am trying to replicate this project (
  https://github.com/dibbhatt/kafka-spark-consumer ) on my local
  system.

  2. Yes, Kafka and the brokers are on the same host.

  3. I am working with Kafka 0.7.3 and Spark 1.3.1. Kafka 0.7.3 does not have a
  --describe command. Though I've worked on three cases (Kafka and
  Zookeeper were on my machine all the time):
    (i) Producer and consumer on my machine.
    (ii) Producer on my machine and consumer on a different machine.
    (iii) Consumer on my machine and producer on a different machine.

  All three cases were working properly.









Re: [Kafka-Spark-Consumer] Spark-Streaming Job Fails due to Futures timed out

2015-06-08 Thread Dibyendu Bhattacharya
Hi Snehal

Are you running the latest Kafka consumer from github / spark-packages? If
not, can you take the latest changes? This low level receiver will keep
attempting retries if the underlying BlockManager gives an error. Do you see
those retry cycles in the log? If yes, then there is an issue writing blocks to
the BlockManager and Spark is not able to recover from this failure, but the
receiver keeps trying.

Which version of Spark are you using?

Dibyendu
On Jun 9, 2015 5:14 AM, Snehal Nagmote nagmote.sne...@gmail.com wrote:

 All,

 I am using the Kafka Spark Consumer
 ( https://github.com/dibbhatt/kafka-spark-consumer ) in a spark streaming job.

 After the spark streaming job runs for a few hours, all executors exit and I
 still see the status of the application on the Spark UI as running.

 Does anyone know the cause of this exception and how to fix it?


  WARN  [sparkDriver-akka.actor.default-dispatcher-17:Logging$class@71] - 
 Error reported by receiver for stream 7: Error While Store for Partition 
 Partition{host=dal-kafka-broker01.bfd.walmart.com:9092, partition=27} - 
 org.apache.spark.SparkException: Error sending message [message = 
 UpdateBlockInfo(BlockManagerId(2, dfw-searcher.com, 
 33621),input-7-1433793457165,StorageLevel(false, true, false, false, 
 1),10492,0,0)]
   at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
   at 
 org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221)
   at 
 org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62)
   at 
 org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:384)
   at 
 org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:360)
   at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:812)
   at 
 org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
   at 
 org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:71)
   at 
 org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:161)
   at 
 org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushIterator(ReceiverSupervisorImpl.scala:136)
   at 
 org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:152)
   at consumer.kafka.PartitionManager.next(PartitionManager.java:215)
   at consumer.kafka.KafkaConsumer.createStream(KafkaConsumer.java:75)
   at consumer.kafka.KafkaConsumer.run(KafkaConsumer.java:108)
   at java.lang.Thread.run(Thread.java:745)
 Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 
 seconds]
   at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
   at 
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
   at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
   at 
 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
   at scala.concurrent.Await$.result(package.scala:107)
   at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)

   ... 14 more WARN  
 [sparkDriver-akka.actor.default-dispatcher-30:Logging$class@92] - Error 
 sending message [message = UpdateBlockInfo(BlockManagerId(driver, 
 dfw-searcher.com, 57286),broadcast_10665_piece0,StorageLevel(false, false, 
 false, false, 1),0,0,0)] in 2 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
   at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
   at 
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
   at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
   at 
 akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
   at 
 scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
   at 
 akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
   at scala.concurrent.Await$.result(package.scala:107)
   at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
   at 
 org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221)
   at 
 org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62)
   at 
 org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:384)
   at 
 org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:360)
   at 
 org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1104)
   at 
 org.apache.spark.storage.BlockManager$$anonfun$removeBroadcast$2.apply(BlockManager.scala:1081)
   at 
 

Re: [Kafka-Spark-Consumer] Spark-Streaming Job Fails due to Futures timed out

2015-06-08 Thread Dibyendu Bhattacharya
Seems to be related to this JIRA :
https://issues.apache.org/jira/browse/SPARK-3612 ?



On Tue, Jun 9, 2015 at 7:39 AM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 Hi Snehal

  Are you running the latest Kafka consumer from github / spark-packages? If
  not, can you take the latest changes? This low level receiver will keep
  attempting retries if the underlying BlockManager gives an error. Do you see
  those retry cycles in the log? If yes, then there is an issue writing blocks to
  the BlockManager and Spark is not able to recover from this failure, but the
  receiver keeps trying.

  Which version of Spark are you using?

 Dibyendu
 On Jun 9, 2015 5:14 AM, Snehal Nagmote nagmote.sne...@gmail.com wrote:

 All,

 I am using Kafka Spark Consumer
 https://github.com/dibbhatt/kafka-spark-consumer  in  spark streaming
 job .

 After spark streaming job runs for few hours , all executors exit and I
 still see status of application on SPARK UI as running

 Does anyone know cause of this exception and how to fix this ?


  WARN  [sparkDriver-akka.actor.default-dispatcher-17:Logging$class@71] - 
 Error reported by receiver for stream 7: Error While Store for Partition 
 Partition{host=dal-kafka-broker01.bfd.walmart.com:9092, partition=27} - 
 org.apache.spark.SparkException: Error sending message [message = 
 UpdateBlockInfo(BlockManagerId(2, dfw-searcher.com, 
 33621),input-7-1433793457165,StorageLevel(false, true, false, false, 
 1),10492,0,0)]
  at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
  at 
 org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221)
  at 
 org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62)
  at 
 org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:384)
  at 
 org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:360)
  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:812)
  at 
 org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
  at 
 org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:71)
  at 
 org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:161)
  at 
 org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushIterator(ReceiverSupervisorImpl.scala:136)
  at 
 org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:152)
  at consumer.kafka.PartitionManager.next(PartitionManager.java:215)
  at consumer.kafka.KafkaConsumer.createStream(KafkaConsumer.java:75)
  at consumer.kafka.KafkaConsumer.run(KafkaConsumer.java:108)
  at java.lang.Thread.run(Thread.java:745)
 Caused by: java.util.concurrent.TimeoutException: Futures timed out after 
 [30 seconds]
  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
  at 
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
  at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
  at 
 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
  at scala.concurrent.Await$.result(package.scala:107)
  at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)

  ... 14 more WARN  
 [sparkDriver-akka.actor.default-dispatcher-30:Logging$class@92] - Error 
 sending message [message = UpdateBlockInfo(BlockManagerId(driver, 
 dfw-searcher.com, 57286),broadcast_10665_piece0,StorageLevel(false, false, 
 false, false, 1),0,0,0)] in 2 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
  at 
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
  at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
  at 
 akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
  at 
 scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
  at 
 akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
  at scala.concurrent.Await$.result(package.scala:107)
  at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
  at 
 org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221)
  at 
 org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62)
  at 
 org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:384)
  at 
 org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:360)
  at 
 org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1104)
  at 
 org.apache.spark.storage.BlockManager

Re: Spark Streaming and Drools

2015-05-22 Thread Dibyendu Bhattacharya
Hi,

Some time back I played with distributed rule processing by integrating
Drools with HBase co-processors, and invoking rules on any incoming data:

https://github.com/dibbhatt/hbase-rule-engine

You can get some idea of how to use Drools rules if you look at this
RegionObserver coprocessor:

https://github.com/dibbhatt/hbase-rule-engine/blob/master/src/main/java/hbase/rule/HBaseDroolObserver.java


The idea is basically to create a stateless rule engine from the drl file and
fire the rules on incoming data.

Even though the code is for invoking rules on an HBase Put object, you
can get the idea and modify it for Spark.

Dibyendu
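
Adapting that idea to Spark Streaming, a minimal sketch could keep one stateless
Drools session per executor JVM and fire it from mapPartitions. This assumes the
Drools 6.x KIE API with a kmodule/drl on the classpath; the Event case class and
the enrichment semantics are made up for illustration:

import org.apache.spark.streaming.dstream.DStream
import org.kie.api.KieServices
import org.kie.api.runtime.StatelessKieSession

// Assumed message type -- replace with whatever your stream actually carries.
case class Event(user: String, action: String, amount: Double)

// One stateless session per executor JVM: a lazy val inside an object is
// initialised at most once per JVM, so every task on that executor reuses it.
object RuleEngine {
  lazy val session: StatelessKieSession =
    KieServices.Factory.get().getKieClasspathContainer.newStatelessKieSession()
}

def applyRules(events: DStream[Event]): DStream[Event] =
  events.mapPartitions { iter =>
    iter.map { e =>
      RuleEngine.session.execute(e) // rules may flag or enrich the event in place
      e
    }
  }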



On Fri, May 22, 2015 at 3:49 PM, Evo Eftimov evo.efti...@isecc.com wrote:

 I am not aware of existing examples but you can always “ask” Google



 Basically from Spark Streaming perspective, Drools is a third-party
 Software Library, you would invoke it in the same way as any other
 third-party software library from the Tasks (maps, filters etc) within your
 DAG job



 *From:* Antonio Giambanco [mailto:antogia...@gmail.com]
 *Sent:* Friday, May 22, 2015 11:07 AM
 *To:* Evo Eftimov
 *Cc:* user@spark.apache.org
 *Subject:* Re: Spark Streaming and Drools



 Thanks a lot Evo,

 do you know where I can find some examples?

 Have a great one


 A G



 2015-05-22 12:00 GMT+02:00 Evo Eftimov evo.efti...@isecc.com:

 You can deploy and invoke Drools as a Singleton on every Spark Worker Node
 / Executor / Worker JVM



 You can invoke it from e.g. map, filter etc and use the result from the
 Rule to make decision how to transform/filter an event/message



 *From:* Antonio Giambanco [mailto:antogia...@gmail.com]
 *Sent:* Friday, May 22, 2015 9:43 AM
 *To:* user@spark.apache.org
 *Subject:* Spark Streaming and Drools



 Hi All,

  I'm deploying an architecture that uses Flume for sending log information
  into a sink.

  Spark Streaming reads from this sink (pull strategy) and processes all this
  information; during this processing I would like to do some event
  processing, for example:

  The log appender writes information about all transactions in my trading
  platforms;

  if a platform user sells more than they buy during a week, I need to receive an
  alert on an event dashboard.

  How can I realize this? Is it possible with Drools?

 Thanks so much





Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error

2015-05-21 Thread Dibyendu Bhattacharya
Hi Tathagata,

Thanks for looking into this. Investigating further, I found that the issue
is that Tachyon does not support file append. When the streaming receiver which
writes to the WAL fails and is restarted, it is not able to append to the same
WAL file after the restart.

I raised this with the Tachyon user group, and Haoyuan said that Tachyon file
append will be ready within 3 months' time. Will revisit this issue
again then.

Regards,
Dibyendu


On Fri, May 22, 2015 at 12:24 AM, Tathagata Das t...@databricks.com wrote:

 Looks like somehow the file size reported by the FSInputDStream of
 Tachyon's FileSystem interface, is returning zero.

 On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

  Just to follow up on this thread further.

  I was doing some fault tolerance testing of Spark Streaming with Tachyon
  as the OFF_HEAP block store. As I said in an earlier email, I was able to solve
  the BlockNotFound exception when I used Hierarchical Storage in Tachyon,
  which is good.

  I continued doing some testing around storing the Spark Streaming WAL and
  checkpoint files also in Tachyon. Here are a few findings:


  When I store the Spark Streaming checkpoint location in Tachyon, the
  throughput is much higher. I tested the driver and receiver failure cases,
  and Spark Streaming is able to recover without any data loss on driver
  failure.

  *But on receiver failure, Spark Streaming loses data*, as I see an
  exception while reading the WAL file from the Tachyon receivedData location
  for the same receiver id which just failed.

  If I change the checkpoint location back to HDFS, Spark Streaming can
  recover from both driver and receiver failure.
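
  For reference, the only code difference between the two setups above is the
  checkpoint URI handed to the StreamingContext (a minimal sketch; the Tachyon
  URI is taken from the log below, the HDFS one is a placeholder):

  // checkpoint (and receiver WAL) on Tachyon -- higher throughput, but hits the
  // receiver-recovery issue described above
  ssc.checkpoint("tachyon-ft://10.252.5.113:19998/tachyon/checkpoint")

  // checkpoint on HDFS -- recovers from both driver and receiver failure
  // ssc.checkpoint("hdfs://namenode:8020/spark/streaming/checkpoint")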

  Here are the log details when the Spark Streaming receiver failed. I raised
  a JIRA for the same issue:
  https://issues.apache.org/jira/browse/SPARK-7525



 INFO : org.apache.spark.scheduler.DAGScheduler - *Executor lost: 2
 (epoch 1)*
 INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying to
 remove executor 2 from BlockManagerMaster.
 INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Removing
 block manager BlockManagerId(2, 10.252.5.54, 45789)
 INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2
 successfully in removeExecutor
 INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - *Registered
 receiver for stream 2 from 10.252.5.62*:47255
 WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 2.1 in stage
 103.0 (TID 421, 10.252.5.62): org.apache.spark.SparkException: *Could
 not read data from write ahead log record
 FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919
 http://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)*
 at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
 $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
 at
 org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
 at
 org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
 at scala.Option.getOrElse(Option.scala:120)
 at
 org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
 at org.apache.spark.scheduler.Task.run(Task.scala:70)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)
 Caused by: java.lang.IllegalArgumentException:* Seek position is past
 EOF: 645603894, fileSize = 0*
 at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
 at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
 at
 org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
 at
 org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
 at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
 $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
 ... 15 more

 INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.2 in
 stage 103.0 (TID 422, 10.252.5.61, ANY, 1909 bytes)
 INFO

Re: Spark Streaming graceful shutdown in Spark 1.4

2015-05-20 Thread Dibyendu Bhattacharya
Thanks Tathagata for making this change..

Dibyendu

On Thu, May 21, 2015 at 8:24 AM, Tathagata Das t...@databricks.com wrote:

 If you are talking about handling driver crash failures, then all bets are
 off anyway! Adding a shutdown hook in the hope of handling driver process
 failure handles only some cases (Ctrl-C), but does not handle cases like
 SIGKILL (which does not run JVM shutdown hooks) or a driver machine crash. So it's
 not a good idea to rely on that.

 Nonetheless I have opened a PR to handle the shutdown of the
 StreamigntContext in the same way as SparkContext.
 https://github.com/apache/spark/pull/6307


 On Tue, May 19, 2015 at 12:51 AM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

  Thanks Sean, you are right. If the driver program is running then I can
  handle shutdown in the main exit path. But if the driver machine crashes (or if
  you just stop the application, for example by killing the driver process),
  then the shutdown hook is the only option, isn't it? What I am trying to say is, just
  doing ssc.stop in sys.ShutdownHookThread or
  Runtime.getRuntime().addShutdownHook (in Java) won't work anymore. I need
  to use Utils.addShutdownHook with a priority. So just checking if
  Spark Streaming can make graceful shutdown the default shutdown mechanism.

 Dibyendu

 On Tue, May 19, 2015 at 1:03 PM, Sean Owen so...@cloudera.com wrote:

 I don't think you should rely on a shutdown hook. Ideally you try to
 stop it in the main exit path of your program, even in case of an
 exception.

 On Tue, May 19, 2015 at 7:59 AM, Dibyendu Bhattacharya
 dibyendu.bhattach...@gmail.com wrote:
  You mean to say within Runtime.getRuntime().addShutdownHook I call
  ssc.stop(stopSparkContext  = true, stopGracefully  = true) ?
 
  This won't work anymore in 1.4.
 
  The SparkContext got stopped before Receiver processed all received
 blocks
  and I see below exception in logs. But if I add the
 Utils.addShutdownHook
  with the priority as I mentioned , then only graceful shutdown works .
 In
  that case shutdown-hook run in priority order.
 






Re: Spark Streaming graceful shutdown in Spark 1.4

2015-05-19 Thread Dibyendu Bhattacharya
By the way, this happens when I stopped the driver process ...

On Tue, May 19, 2015 at 12:29 PM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

  You mean to say that within Runtime.getRuntime().addShutdownHook I call
  ssc.stop(stopSparkContext = true, stopGracefully = true)?

  This won't work anymore in 1.4.

  The SparkContext got stopped before the receiver processed all received blocks,
  and I see the below exception in the logs. But if I add Utils.addShutdownHook
  with the priority as I mentioned, then graceful shutdown does work. In
  that case shutdown hooks run in priority order.



 *INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - Sent stop
 signal to all 3 receivers*
 ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered
 receiver for stream 0: Stopped by driver
 ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered
 receiver for stream 1: Stopped by driver
 ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered
 receiver for stream 2: Stopped by driver
 *INFO : org.apache.spark.SparkContext - Invoking stop() from shutdown hook*
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/streaming/batch/json,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/streaming/batch,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/streaming/json,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/streaming,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/metrics/json,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/static,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/executors/threadDump,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/executors/json,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/executors,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/environment/json,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/environment,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/storage/rdd,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/storage/json,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/storage,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/stages/pool/json,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/stages/pool,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/stages/stage/json,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/stages/stage,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/stages/json,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/stages,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/jobs/job/json,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/jobs/job,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/jobs/json,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/jobs,null}
 INFO : org.apache.spark.ui.SparkUI - Stopped Spark web UI at
 http://10.252.5.113:4040
 INFO : org.apache.spark.scheduler.DAGScheduler - Stopping DAGScheduler
 INFO : org.apache.spark.scheduler.DAGScheduler - Job 4 failed: start at
 Consumer.java:122, took 10.398746 s
 *Exception in thread Thread-28 org.apache.spark.SparkException: Job
 cancelled because SparkContext was shut down

Re: Spark Streaming graceful shutdown in Spark 1.4

2015-05-19 Thread Dibyendu Bhattacharya
Thanks Sean, you are right. If the driver program is running then I can handle
shutdown in the main exit path. But if the driver machine crashes (or if you just
stop the application, for example by killing the driver process), then
the shutdown hook is the only option, isn't it? What I am trying to say is, just
doing ssc.stop in sys.ShutdownHookThread or
Runtime.getRuntime().addShutdownHook (in Java) won't work anymore. I need
to use Utils.addShutdownHook with a priority. So just checking if
Spark Streaming can make graceful shutdown the default shutdown mechanism.

Dibyendu

On Tue, May 19, 2015 at 1:03 PM, Sean Owen so...@cloudera.com wrote:

 I don't think you should rely on a shutdown hook. Ideally you try to
 stop it in the main exit path of your program, even in case of an
 exception.
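
 A minimal sketch of what stopping it in the main exit path can look like
 (names assumed; ssc is the application's StreamingContext):

 ssc.start()
 try {
   // block until the streaming computation terminates or fails
   ssc.awaitTermination()
 } finally {
   // always attempt a graceful stop, even if awaitTermination throws
   ssc.stop(stopSparkContext = true, stopGracefully = true)
 }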

 On Tue, May 19, 2015 at 7:59 AM, Dibyendu Bhattacharya
 dibyendu.bhattach...@gmail.com wrote:
  You mean to say within Runtime.getRuntime().addShutdownHook I call
  ssc.stop(stopSparkContext  = true, stopGracefully  = true) ?
 
  This won't work anymore in 1.4.
 
  The SparkContext got stopped before Receiver processed all received
 blocks
  and I see below exception in logs. But if I add the Utils.addShutdownHook
  with the priority as I mentioned , then only graceful shutdown works . In
  that case shutdown-hook run in priority order.
 



Re: Spark Streaming graceful shutdown in Spark 1.4

2015-05-19 Thread Dibyendu Bhattacharya
(ReceiverTracker.scala:105)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:242)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:241)
at scala.util.Try$.apply(Try.scala:161)
at
org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:241)
at org.apache.spark.streaming.scheduler.JobGenerator.org
$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:177)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Exception in thread main java.lang.IllegalStateException: Shutdown in
progress






On Tue, May 19, 2015 at 11:58 AM, Tathagata Das t...@databricks.com wrote:

 If you wanted to stop it gracefully, then why are you not calling
  ssc.stop(stopGracefully = true, stopSparkContext = true)? Then it doesn't
 matter whether the shutdown hook was called or not.

 TD

 On Mon, May 18, 2015 at 9:43 PM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Hi,

  Just figured out that if I want to perform a graceful shutdown of Spark
  Streaming 1.4 (from master), Runtime.getRuntime().addShutdownHook no
  longer works. In Spark 1.4 there is a Utils.addShutdownHook defined for
  Spark Core that gets called anyway, which leads to the graceful shutdown from
  Spark Streaming failing with errors like "SparkContext already closed".

  To solve this, I need to explicitly add Utils.addShutdownHook in my
  driver with a higher priority (say 150) than Spark's shutdown priority of
  50, and there I call the StreamingContext stop method with the (false, true)
  parameters.

  Just curious to know if this is how we need to handle the shutdown hook
  going forward.

  Can't we make the streaming shutdown default to a graceful shutdown?

  Also, the Java API for adding a shutdown hook in Utils looks very dirty, with
  methods like this:



 Utils.addShutdownHook(150, new Function0BoxedUnit() {
  @Override
 public BoxedUnit apply() {
 return null;
 }

 @Override
 public byte apply$mcB$sp() {
 return 0;
 }

 @Override
 public char apply$mcC$sp() {
 return 0;
 }

 @Override
 public double apply$mcD$sp() {
 return 0;
 }

 @Override
 public float apply$mcF$sp() {
 return 0;
 }

 @Override
 public int apply$mcI$sp() {
 // TODO Auto-generated method stub
 return 0;
 }

 @Override
 public long apply$mcJ$sp() {
 return 0;
 }

 @Override
 public short apply$mcS$sp() {
 return 0;
 }

 @Override
 public void apply$mcV$sp() {
  *jsc.stop(false, true);*
  }

 @Override
 public boolean apply$mcZ$sp() {
 // TODO Auto-generated method stub
 return false;
 }
 });





Re: spark streaming doubt

2015-05-19 Thread Dibyendu Bhattacharya
Just to add, there is a Receiver based Kafka consumer which uses Kafka Low
Level Consumer API.

http://spark-packages.org/package/dibbhatt/kafka-spark-consumer


Regards,
Dibyendu

On Tue, May 19, 2015 at 9:00 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:


 On Tue, May 19, 2015 at 8:10 PM, Shushant Arora shushantaror...@gmail.com
  wrote:

  So for Kafka + spark streaming, receiver based streaming uses the high-level api
  and non-receiver based streaming uses the low-level api.

  1. In high-level receiver based streaming, does it register consumers at
  each job start (whenever a new job is launched by the streaming application, say
  each second)?


 ​- Receiver based streaming will always have the receiver running
  in parallel while your job is running. So by default, every 200ms
  (spark.streaming.blockInterval) the receiver will generate a block of data
  which is read from Kafka.


  2. Will the number of executors in high-level receiver based jobs always equal
  the number of partitions in the topic?


 ​- Not sure from where did you came up with this. For the non stream
  based one, I think the number of partitions in Spark will be equal to the
  number of Kafka partitions for the given topic.


  3. Will data from a single topic be consumed by executors in parallel, or does
  only one receiver consume in multiple threads and assign to executors, in the
  high-level receiver based approach?

  - They will consume the data in parallel. For the receiver based
  approach, you can actually specify the number of receivers that you want to
  spawn for consuming the messages.




 On Tue, May 19, 2015 at 2:38 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

  spark.streaming.concurrentJobs takes an integer value, not a boolean. If
  you set it to 2 then 2 jobs will run in parallel. The default value is 1, and the
  next job will start once the current one completes.


  Actually, in the current implementation of Spark Streaming and under
  default configuration, only one job is active (i.e. under execution) at any
  point of time. So if one batch's processing takes longer than 10 seconds,
  then the next batch's jobs will stay queued.
  This can be changed with an experimental Spark property
  spark.streaming.concurrentJobs which is by default set to 1. It's not
  currently documented (maybe I should add it).
  The reason it is set to 1 is that concurrent jobs can potentially lead
  to weird sharing of resources, which can make it hard to debug
  whether there are sufficient resources in the system to process the ingested
  data fast enough. With only 1 job running at a time, it is easy to see that
  if batch processing time < batch interval, then the system will be stable.
  Granted that this may not be the most efficient use of resources under
  certain conditions. We definitely hope to improve this in the future.


 Copied from TD's answer written in SO
 http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming
 .
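
 A minimal sketch of how that experimental property is set (the value 2 is
 only an example):

 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}

 val conf = new SparkConf()
   .setAppName("concurrent-jobs-example")
   .set("spark.streaming.concurrentJobs", "2") // default is 1
 val ssc = new StreamingContext(conf, Seconds(10))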

 Non-receiver based streaming for example you can say are the fileStream,
 directStream ones. You can read a bit of information from here
 https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html

 Thanks
 Best Regards

 On Tue, May 19, 2015 at 2:13 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Thanks Akhil.
 When I don't  set spark.streaming.concurrentJobs to true. Will the all
 pending jobs starts one by one after 1 jobs completes,or it does not
 creates jobs which could not be started at its desired interval.

 And Whats the difference and usage of Receiver vs non-receiver based
 streaming. Is there any documentation for that?

 On Tue, May 19, 2015 at 1:35 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 It will be a single job running at a time by default (you can also
 configure the spark.streaming.concurrentJobs to run jobs parallel which is
 not recommended to put in production).

 Now, your batch duration being 1 sec and processing time being 2
 minutes, if you are using a receiver based streaming then ideally those
 receivers will keep on receiving data while the job is running (which will
 accumulate in memory if you set StorageLevel as MEMORY_ONLY and end up in
 block not found exceptions as spark drops some blocks which are yet to
 process to accumulate new blocks). If you are using a non-receiver based
 approach, you will not have this problem of dropping blocks.

 Ideally, if your data is small and you have enough memory to hold your
 data then it will run smoothly without any issues.

 Thanks
 Best Regards

 On Tue, May 19, 2015 at 1:23 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 What happnes if in a streaming application one job is not yet
 finished and stream interval reaches. Does it starts next job or wait for
 first to finish and rest jobs will keep on accumulating in queue.


 Say I have a streaming application with stream interval of 1 sec, but
 my job takes 2 min to process 1 sec stream , what will happen ?  At any
 time there will be only one job running or 

Spark Streaming graceful shutdown in Spark 1.4

2015-05-18 Thread Dibyendu Bhattacharya
Hi,

Just figured out that if I want to perform a graceful shutdown of Spark
Streaming 1.4 (from master), Runtime.getRuntime().addShutdownHook no longer
works. Spark 1.4 defines Utils.addShutdownHook for Spark Core, and that hook
gets called anyway, so the graceful shutdown of Spark Streaming fails with
errors like "SparkContext already closed".

To solve this, I need to explicitly add Utils.addShutdownHook in my driver
with a higher priority (say 150) than Spark's shutdown priority of 50, and
there I call the StreamingContext stop method with the (false, true)
parameters.

Just curious to know: is this how we need to handle shutdown hooks going
forward?

Can't we make graceful shutdown the default for streaming shutdown?

Also, the Java API for adding a shutdown hook via Utils looks very dirty,
with methods like this:



Utils.addShutdownHook(150, new Function0<BoxedUnit>() {
    @Override
    public BoxedUnit apply() {
        return null;
    }

    @Override
    public byte apply$mcB$sp() {
        return 0;
    }

    @Override
    public char apply$mcC$sp() {
        return 0;
    }

    @Override
    public double apply$mcD$sp() {
        return 0;
    }

    @Override
    public float apply$mcF$sp() {
        return 0;
    }

    @Override
    public int apply$mcI$sp() {
        // TODO Auto-generated method stub
        return 0;
    }

    @Override
    public long apply$mcJ$sp() {
        return 0;
    }

    @Override
    public short apply$mcS$sp() {
        return 0;
    }

    @Override
    public void apply$mcV$sp() {
        jsc.stop(false, true);
    }

    @Override
    public boolean apply$mcZ$sp() {
        // TODO Auto-generated method stub
        return false;
    }
});
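
For reference, the Scala side of the same idea is much shorter. A minimal
sketch (the stopGracefullyOnShutdown flag is an assumption to verify against
the Spark build you run; app name and batch interval are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("graceful-shutdown-example")
  // Assumed flag: asks Spark's own shutdown hook to stop streaming gracefully,
  // so the driver does not need to register a competing hook at all.
  .set("spark.streaming.stopGracefullyOnShutdown", "true")

val ssc = new StreamingContext(conf, Seconds(10))

// The explicit call used inside the Java hook above, written out in Scala:
// ssc.stop(stopSparkContext = false, stopGracefully = true)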


Re: Reading Real Time Data only from Kafka

2015-05-13 Thread Dibyendu Bhattacharya
Thanks Cody for your email. I think my concern was not about getting the
ordering of messages within a partition, which as you said is possible if one
knows how Spark works. The issue is how Spark schedules jobs for every batch,
which is not necessarily the same order in which they were generated. If that
is not guaranteed, it does not matter whether you manage order within your
partition: depending on per-partition ordering to commit offsets may lead to
offsets being committed in the wrong order.

You have discussed this, along with some workarounds, in this thread as well:

https://mail.google.com/mail/u/1/?tab=wm#search/rdd+order+guarantees/14b9f1eaf0b8bd15

So again, one needs to understand every detail of a consumer to decide
whether it solves their use case.

Regards,
Dibyendu

On Wed, May 13, 2015 at 7:35 PM, Cody Koeninger c...@koeninger.org wrote:

 As far as I can tell, Dibyendu's cons boil down to:

 1. Spark checkpoints can't be recovered if you upgrade code
 2. Some Spark transformations involve a shuffle, which can repartition data

 It's not accurate to imply that either one of those things are inherently
 cons of the direct stream api.

 Regarding checkpoints, nothing about the direct stream requires you to use
 checkpoints.  You can save offsets in a checkpoint, your own database, or
 not save offsets at all (as James wants).  One might even say that the
 direct stream api is . . . flexible . . . in that regard.

 Regarding partitions, the direct stream api gives you the same ordering
 guarantee as Kafka, namely that within a given partition messages will be
 in increasing offset order.   Clearly if you do a transformation that
 repartitions the stream, that no longer holds.  Thing is, that doesn't
 matter if you're saving offsets and results for each rdd in the driver.
 The offset ranges for the original rdd don't change as a result of the
 transformation you executed, they're immutable.

 Sure, you can get into trouble if you're trying to save offsets / results
 per partition on the executors, after a shuffle of some kind. You can avoid
 this pretty easily by just using normal scala code to do your
 transformation on the iterator inside a foreachPartition.  Again, this
 isn't a con of the direct stream api, this is just a need to understand
 how Spark works.
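
 As a concrete illustration of saving offsets per rdd in the driver, a minimal
 sketch (directStream is the stream returned by KafkaUtils.createDirectStream,
 and saveOffsets is a placeholder for whatever store you use):

 import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

 def saveOffsets(ranges: Array[OffsetRange]): Unit = {
   // placeholder: write to ZK, a database, or nowhere at all
 }

 directStream.foreachRDD { rdd =>
   // The offset ranges describe exactly which Kafka offsets this rdd covers;
   // they are immutable and unaffected by downstream repartitioning.
   val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

   rdd.foreachPartition { iter =>
     iter.foreach(record => ()) // placeholder for the real per-partition work
   }

   saveOffsets(ranges)
 }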



 On Tue, May 12, 2015 at 10:30 PM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 The low level consumer which Akhil mentioned has been running in Pearson
 for the last 4-5 months without any downtime. I think this one is the most
 reliable Receiver based Kafka consumer for Spark as of today, if you want to
 put it that way.

 Prior to Spark 1.3, other Receiver based consumers used the Kafka High Level
 API, which has serious issues with re-balancing, weaker fault tolerance and
 data loss.

 Cody's implementation using the direct stream is definitely a good approach,
 but both the direct stream based approach and the receiver based low level
 consumer approach have pros and cons. For example, the Receiver based
 approach needs to use the WAL for recovery from driver failure, which is an
 overhead for a system like Kafka. For the direct stream, the offsets stored
 in the checkpoint directory are lost if the driver code is modified. You can
 manage offsets from your driver, but for a derived stream generated from this
 direct stream there is no guarantee that batches are processed in order (and
 offsets committed in order), etc.

 So whoever uses whichever consumer needs to study the pros and cons of both
 approaches before taking a call.

 Regards,
 Dibyendu







 On Tue, May 12, 2015 at 8:10 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Hi Cody,
 I was just saying that i found more success and high throughput with the
 low level kafka api prior to KafkfaRDDs which is the future it seems. My
 apologies if you felt it that way. :)
 On 12 May 2015 19:47, Cody Koeninger c...@koeninger.org wrote:

 Akhil, I hope I'm misreading the tone of this. If you have personal
 issues at stake, please take them up outside of the public list.  If you
 have actual factual concerns about the kafka integration, please share them
 in a jira.

 Regarding reliability, here's a screenshot of a current production job
 with a 3 week uptime  Was a month before that, only took it down to change
 code.

 http://tinypic.com/r/2e4vkht/8

 Regarding flexibility, both of the apis available in spark will do what
 James needs, as I described.



 On Tue, May 12, 2015 at 8:55 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Hi Cody,

 If you are so sure, can you share a bench-marking (which you ran for
 days maybe?) that you have done with Kafka APIs provided by Spark?

 Thanks
 Best Regards

 On Tue, May 12, 2015 at 7:22 PM, Cody Koeninger c...@koeninger.org
 wrote:

 I don't think it's accurate for Akhil to claim that the linked
 library is much more flexible/reliable than what's available in Spark 
 at
 this point.

 James, what you're describing is the default behavior for the
 createDirectStream api available as part of spark

Re: force the kafka consumer process to different machines

2015-05-13 Thread Dibyendu Bhattacharya
Or you can use this Receiver as well:
http://spark-packages.org/package/dibbhatt/kafka-spark-consumer

With it you can specify how many Receivers you need for your topic, and it
will divide the partitions among the Receivers and return the joined stream
for you.

Say you specify 20 receivers; in that case each Receiver handles 4 partitions
and you get a consumer parallelism of 20 receivers.
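
To make the union suggestion from Cody's reply below concrete, a rough sketch
using the built-in receiver based API (ssc is your StreamingContext; hosts,
group and topic are placeholders):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

val numReceivers = 20 // e.g. 20 receivers for an 80 partition topic
val streams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, "zkhost:2181", "my-group", Map("mytopic" -> 4),
    StorageLevel.MEMORY_AND_DISK_SER)
}
// Union into one DStream; the receivers are scheduled across the available executors.
val unified = ssc.union(streams)
unified.count().print()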

Dibyendu

On Wed, May 13, 2015 at 9:28 PM, 李森栋 lisend...@163.com wrote:

 thank you very much


 Sent from Meizu MX4 Pro

  Original message 
 From: Cody Koeninger c...@koeninger.org
 Date: Wed, May 13, 23:52
 To: hotdog lisend...@163.com
 Cc: user@spark.apache.org
 Subject: Re: force the kafka consumer process to different machines

 I assume you're using the receiver based approach?  Have you tried the
 createDirectStream api?
 
 https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html
 
 If you're sticking with the receiver based approach I think your only
 option would be to create more consumer streams and union them.  That
 doesn't give you control over where they're run, but should increase the
 consumer parallelism.
 
 On Wed, May 13, 2015 at 10:33 AM, hotdog lisend...@163.com wrote:
 
  I 'm using streaming integrated with streaming-kafka.
 
  My kafka topic has 80 partitions, while my machines have 40 cores. I
 found
  that when the job is running, the kafka consumer processes are only
 deploy
  to 2 machines, the bandwidth of the 2 machines will be very very high.
 
  I wonder is there any way to control the kafka consumer's dispatch?
 
 
 
  --
  View this message in context:
 
 http://apache-spark-user-list.1001560.n3.nabble.com/force-the-kafka-consumer-process-to-different-machines-tp22872.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 
 



Re: Reading Real Time Data only from Kafka

2015-05-12 Thread Dibyendu Bhattacharya
The low level consumer which Akhil mentioned has been running in Pearson for
the last 4-5 months without any downtime. I think this one is the most
reliable Receiver based Kafka consumer for Spark as of today, if you want to
put it that way.

Prior to Spark 1.3, other Receiver based consumers used the Kafka High Level
API, which has serious issues with re-balancing, weaker fault tolerance and
data loss.

Cody's implementation using the direct stream is definitely a good approach,
but both the direct stream based approach and the receiver based low level
consumer approach have pros and cons. For example, the Receiver based
approach needs to use the WAL for recovery from driver failure, which is an
overhead for a system like Kafka. For the direct stream, the offsets stored
in the checkpoint directory are lost if the driver code is modified. You can
manage offsets from your driver, but for a derived stream generated from this
direct stream there is no guarantee that batches are processed in order (and
offsets committed in order), etc.

So whoever uses whichever consumer needs to study the pros and cons of both
approaches before taking a call.

Regards,
Dibyendu







On Tue, May 12, 2015 at 8:10 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Hi Cody,
 I was just saying that i found more success and high throughput with the
 low level kafka api prior to KafkfaRDDs which is the future it seems. My
 apologies if you felt it that way. :)
 On 12 May 2015 19:47, Cody Koeninger c...@koeninger.org wrote:

 Akhil, I hope I'm misreading the tone of this. If you have personal
 issues at stake, please take them up outside of the public list.  If you
 have actual factual concerns about the kafka integration, please share them
 in a jira.

 Regarding reliability, here's a screenshot of a current production job
 with a 3 week uptime  Was a month before that, only took it down to change
 code.

 http://tinypic.com/r/2e4vkht/8

 Regarding flexibility, both of the apis available in spark will do what
 James needs, as I described.



 On Tue, May 12, 2015 at 8:55 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Hi Cody,

 If you are so sure, can you share a bench-marking (which you ran for
 days maybe?) that you have done with Kafka APIs provided by Spark?

 Thanks
 Best Regards

 On Tue, May 12, 2015 at 7:22 PM, Cody Koeninger c...@koeninger.org
 wrote:

 I don't think it's accurate for Akhil to claim that the linked library
 is much more flexible/reliable than what's available in Spark at this
 point.

 James, what you're describing is the default behavior for the
 createDirectStream api available as part of spark since 1.3.  The kafka
 parameter auto.offset.reset defaults to largest, ie start at the most
 recent available message.

 This is described at
 http://spark.apache.org/docs/latest/streaming-kafka-integration.html
  The createDirectStream api implementation is described in detail at
 https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md

 If for some reason you're stuck using an earlier version of spark, you
 can accomplish what you want simply by starting the job using a new
 consumer group (there will be no prior state in zookeeper, so it will start
 consuming according to auto.offset.reset)
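
 For example, the relevant parameters for the direct stream look roughly like
 this (a sketch; broker list, topic and ssc are placeholders):

 import kafka.serializer.StringDecoder
 import org.apache.spark.streaming.kafka.KafkaUtils

 val kafkaParams = Map(
   "metadata.broker.list" -> "broker1:9092,broker2:9092",
   // start from the most recent available message when no offsets are stored
   "auto.offset.reset" -> "largest")

 val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
   ssc, kafkaParams, Set("mytopic"))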

 On Tue, May 12, 2015 at 7:26 AM, James King jakwebin...@gmail.com
 wrote:

 Very nice! will try and let you know, thanks.

 On Tue, May 12, 2015 at 2:25 PM, Akhil Das ak...@sigmoidanalytics.com
  wrote:

 Yep, you can try this lowlevel Kafka receiver
 https://github.com/dibbhatt/kafka-spark-consumer. Its much more
 flexible/reliable than the one comes with Spark.

 Thanks
 Best Regards

 On Tue, May 12, 2015 at 5:15 PM, James King jakwebin...@gmail.com
 wrote:

 What I want is if the driver dies for some reason and it is
 restarted I want to read only messages that arrived into Kafka following
 the restart of the driver program and re-connection to Kafka.

 Has anyone done this? any links or resources that can help explain
 this?

 Regards
 jk










Re: Some questions on Multiple Streams

2015-04-24 Thread Dibyendu Bhattacharya
You can probably try the Low Level Consumer from spark-packages (
http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) .

How many partitions are there for your topics? Let's say you have 10 topics,
each having 3 partitions; ideally you can create at most 30 parallel Receivers
and 30 streams. What I understand from your requirement is that for any given
topic you want to choose the number of Receivers, e.g. for Topic A you may
choose 1 Receiver, for Topic B you choose 2, for Topic C you choose 3, etc.

Now if you can distribute the topics to Receivers like this, you can very well
use the above consumer, which has this facility. Each Receiver task takes one
executor core, so you can calculate accordingly.

The implementation has a code example and a read-me file; if you wish to try
this, you can always email me.
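
On the first question in the thread below (per-stream statistics after a
union), one simple pattern is to tag each record with its topic before the
union, as in this rough sketch (ssc and createStreamForTopic are placeholders
for your StreamingContext and for however you build each input stream; the
streams are assumed to carry String records):

import org.apache.spark.streaming.StreamingContext._ // pair DStream operations on older Spark

val topics = Seq("topicA", "topicB", "topicC")

// Tag every record with the topic it came from, then union.
val tagged = topics.map { t =>
  createStreamForTopic(t).map(record => (t, record))
}
val unioned = ssc.union(tagged)

// Per-topic counts survive the union because the tag travels with the data.
unioned.map { case (topic, _) => (topic, 1L) }
  .reduceByKey(_ + _)
  .print()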

Regards,
Dibyendu






On Fri, Apr 17, 2015 at 3:06 PM, Laeeq Ahmed laeeqsp...@yahoo.com.invalid
wrote:

 Hi,

 I am working with multiple Kafka streams (23 streams) and currently I am
 processing them separately. I receive one stream from each topic. I have
 the following questions.

 1.Spark streaming guide suggests to union these streams. *Is it
 possible to get statistics of each stream even after they are unioned?*

 2.My calculations are not complex. I use 2 second batch interval and
 if I use 2 streams they get easily processed under 2 seconds by a single
 core. There is some shuffling involved in my application. As I increase the
 number of streams and the number of executors accordingly, the applications
 scheduling delay increases and become unmanageable in 2 seconds. As I
 believe this happens because with that many streams, the number of tasks
 increases thus the shuffling magnifies and also that all streams using the
 same executors. *Is it possible to provide part of executors to
 particular stream while processing streams simultaneously?* E.g. if I
 have 15 cores on cluster and 5 streams, 5  cores will be taken by 5
 receivers and of the rest 10, can I provide 2 cores each to one of the 5
 streams. Just to add, increasing the batch interval does help but I don't
 want to increase the batch size due to application restrictions and delayed
 results (The blockInterval and defaultParallelism does help to a limited
 extent).

 *Please see attach file for CODE SNIPPET*

 Regards,
 Laeeq





Latest enhancement in Low Level Receiver based Kafka Consumer

2015-04-01 Thread Dibyendu Bhattacharya
Hi,

Just to let you know, I have made some enhancements in the Low Level Reliable
Receiver based Kafka Consumer (
http://spark-packages.org/package/dibbhatt/kafka-spark-consumer).

The earlier version used as many Receiver tasks as the number of partitions of
your Kafka topic. Now you can configure the desired number of Receiver tasks,
and every Receiver can handle a subset of the topic partitions.

There were some use cases where the consumer needed to handle gigantic topics
(having 100+ partitions), and using my receiver created that many Receiver
tasks and hence needed that many CPU cores just for the Receivers. That was an
issue.


In the latest code, I have changed that behavior. The maximum number of
Receivers is still your number of partitions, but if you specify a smaller
number of Receiver tasks, every receiver will handle a subset of partitions
and consume them using the Kafka Low Level consumer API.

Every receiver manages its partitions' offsets in ZK in the usual way.


You can see the latest consumer here :
http://spark-packages.org/package/dibbhatt/kafka-spark-consumer
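
Purely as a back-of-the-envelope illustration of what "a subset of partitions
per Receiver" means (plain Scala, no consumer API involved; the consumer's own
assignment logic lives inside the package):

val numPartitions = 100
val numReceivers = 20 // configurable, up to numPartitions

// Round-robin style grouping: receiver i owns partitions i, i + R, i + 2R, ...
val assignment: Map[Int, Seq[Int]] =
  (0 until numPartitions).groupBy(_ % numReceivers).mapValues(_.toSeq)

assignment.toSeq.sortBy(_._1).foreach { case (receiver, parts) =>
  println(s"receiver $receiver -> partitions ${parts.mkString(",")}")
}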



Regards,
Dibyendu


Re: Question about Spark Streaming Receiver Failure

2015-03-16 Thread Dibyendu Bhattacharya
Which version of Spark are you running?

You can try this Low Level Consumer:
http://spark-packages.org/package/dibbhatt/kafka-spark-consumer

This is designed to recover from various failures and has a very good fault
recovery mechanism built in. It is being used by many users, and at present we
at Pearson have been running this Receiver in production for almost 3 months
without any issue.

You can give this a try.
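
Separately, for the StreamingListener based monitoring mentioned in the quoted
question below, a minimal sketch of wiring one up (logging only; any restart
or alerting logic is up to you, and ssc is your StreamingContext):

import org.apache.spark.streaming.scheduler._

class ReceiverMonitor extends StreamingListener {
  override def onReceiverError(err: StreamingListenerReceiverError): Unit = {
    // Called when a receiver reports an error to the driver
    println(s"Receiver error reported: ${err.receiverInfo}")
  }
  override def onReceiverStopped(stopped: StreamingListenerReceiverStopped): Unit = {
    println(s"Receiver stopped: ${stopped.receiverInfo}")
  }
}

ssc.addStreamingListener(new ReceiverMonitor)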

Regards,
Dibyendu

On Mon, Mar 16, 2015 at 12:47 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 You need to figure out why the receivers failed in the first place. Look
 in your worker logs and see what really happened. When you run a streaming
 job continuously for longer period mostly there'll be a lot of logs (you
 can enable log rotation etc.) and if you are doing a groupBy, join, etc
 type of operations, then there will be a lot of shuffle data. So You need
 to check in the worker logs and see what happened (whether DISK full etc.),
 We have streaming pipelines running for weeks without having any issues.

 Thanks
 Best Regards

 On Mon, Mar 16, 2015 at 12:40 PM, Jun Yang yangjun...@gmail.com wrote:

 Guys,

 We have a project which builds upon Spark streaming.

 We use Kafka as the input stream, and create 5 receivers.

 When this application runs for around 90 hour, all the 5 receivers failed
 for some unknown reasons.

 In my understanding, it is not guaranteed that Spark streaming receiver
 will do fault recovery automatically.

 So I just want to figure out a way for doing fault-recovery to deal with
 receiver failure.

 There is a JIRA post mentioned using StreamingLister for monitoring the
 status of receiver:


 https://issues.apache.org/jira/browse/SPARK-2381?focusedCommentId=14056836page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14056836

 However I haven't found any open doc about how to do this stuff.

 Any guys have met the same issue and deal with it?

 Our environment:
Spark 1.3.0
Dual Master Configuration
Kafka 0.8.2

 Thanks

 --
 yangjun...@gmail.com
 http://hi.baidu.com/yjpro





Re: Question about Spark Streaming Receiver Failure

2015-03-16 Thread Dibyendu Bhattacharya
Yes, auto-restart is enabled in my low level consumer when some unhandled
exception comes up.

Also, if you look at KafkaConsumer.java, for some cases (like broker failure,
Kafka leader changes, etc.) it can even refresh the Consumer (the Coordinator
which talks to a Leader), which will recover from those failures.

Dib

On Mon, Mar 16, 2015 at 1:40 PM, Jun Yang yangjun...@gmail.com wrote:

 I have checked Dibyendu's code, it looks that his implementation has
 auto-restart mechanism:


 
 src/main/java/consumer/kafka/client/KafkaReceiver.java:

 private void start() {

     // Start the thread that receives data over a connection
     KafkaConfig kafkaConfig = new KafkaConfig(_props);
     ZkState zkState = new ZkState(kafkaConfig);
     _kConsumer = new KafkaConsumer(kafkaConfig, zkState, this);
     _kConsumer.open(_partitionId);

     Thread.UncaughtExceptionHandler eh =
         new Thread.UncaughtExceptionHandler() {
             public void uncaughtException(Thread th, Throwable ex) {
                 restart("Restarting Receiver for Partition " + _partitionId,
                     ex, 5000);
             }
         };

     _consumerThread = new Thread(_kConsumer);
     _consumerThread.setDaemon(true);
     _consumerThread.setUncaughtExceptionHandler(eh);
     _consumerThread.start();
 }

 
 I also checked Spark's native Kafka Receiver implementation, and it looks
 not have any auto-restart support.

 Any comments from Dibyendu?

 On Mon, Mar 16, 2015 at 3:39 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 As i seen, once i kill my receiver on one machine, it will automatically
 spawn another receiver on another machine or on the same machine.

 Thanks
 Best Regards

 On Mon, Mar 16, 2015 at 1:08 PM, Jun Yang yangjun...@gmail.com wrote:

 Dibyendu,

 Thanks for the reply.

 I am reading your project homepage now.

 One quick question I care about is:

 If the receivers failed for some reasons(for example, killed brutally by
 someone else), is there any mechanism for the receiver to fail over
 automatically?

 On Mon, Mar 16, 2015 at 3:25 PM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Which version of Spark you are running ?

 You can try this Low Level Consumer :
 http://spark-packages.org/package/dibbhatt/kafka-spark-consumer

 This is designed to recover from various failures and have very good
 fault recovery mechanism built in. This is being used by many users and at
 present we at Pearson running this Receiver in Production for almost 3
 months without any issue.

 You can give this a try.

 Regards,
 Dibyendu

 On Mon, Mar 16, 2015 at 12:47 PM, Akhil Das ak...@sigmoidanalytics.com
  wrote:

 You need to figure out why the receivers failed in the first place.
 Look in your worker logs and see what really happened. When you run a
 streaming job continuously for longer period mostly there'll be a lot of
 logs (you can enable log rotation etc.) and if you are doing a groupBy,
 join, etc type of operations, then there will be a lot of shuffle data. So
 You need to check in the worker logs and see what happened (whether DISK
 full etc.), We have streaming pipelines running for weeks without having
 any issues.

 Thanks
 Best Regards

 On Mon, Mar 16, 2015 at 12:40 PM, Jun Yang yangjun...@gmail.com
 wrote:

 Guys,

 We have a project which builds upon Spark streaming.

 We use Kafka as the input stream, and create 5 receivers.

 When this application runs for around 90 hour, all the 5 receivers
 failed for some unknown reasons.

 In my understanding, it is not guaranteed that Spark streaming
 receiver will do fault recovery automatically.

 So I just want to figure out a way for doing fault-recovery to deal
 with receiver failure.

 There is a JIRA post mentioned using StreamingLister for monitoring
 the status of receiver:


 https://issues.apache.org/jira/browse/SPARK-2381?focusedCommentId=14056836page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14056836

 However I haven't found any open doc about how to do this stuff.

 Any guys have met the same issue and deal with it?

 Our environment:
Spark 1.3.0
Dual Master Configuration
Kafka 0.8.2

 Thanks

 --
 yangjun...@gmail.com
 http://hi.baidu.com/yjpro






 --
 yangjun...@gmail.com
 http://hi.baidu.com/yjpro





 --
 yangjun...@gmail.com
 http://hi.baidu.com/yjpro



Re: Spark streaming app shutting down

2015-02-04 Thread Dibyendu Bhattacharya
Thanks Akhil for mentioning this Low Level Consumer (
https://github.com/dibbhatt/kafka-spark-consumer). Yes, it has a better fault
tolerance mechanism than any existing Kafka consumer available. It has no data
loss on receiver failure and has the ability to replay or restart itself in
case of failure. You can definitely give it a try.
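
For the WAL route Akhil mentions below, the wiring is roughly this (a sketch;
the checkpoint path, app name and batch interval are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("wal-example")
  // Persist received blocks to a write ahead log on fault tolerant storage (Spark 1.2+)
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(60))
// The WAL needs a checkpoint directory on HDFS/S3 to write its logs to
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")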

Dibyendu

On Thu, Feb 5, 2015 at 1:04 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 AFAIK, From Spark 1.2.0 you can have WAL (Write Ahead Logs) for fault
 tolerance, which means it can handle the receiver/driver failures. You can
 also look at the lowlevel kafka consumer
 https://github.com/dibbhatt/kafka-spark-consumer which has a better
 fault tolerance mechanism for receiver failures. This low level consumer
 will push the offset of the message being read into zookeeper for fault
 tolerance. In your case i think mostly the inflight data would be lost if
 you arent using any of the fault tolerance mechanism.

 Thanks
 Best Regards

 On Wed, Feb 4, 2015 at 5:24 PM, Mukesh Jha me.mukesh@gmail.com
 wrote:

 Hello Sprakans,

 I'm running a spark streaming app which reads data from kafka topic does
 some processing and then persists the results in HBase.

 I am using spark 1.2.0 running on Yarn cluster with 3 executors (2gb, 8
 cores each). I've enable checkpointing  I am also  rate limiting my
 kafkaReceivers so that the number of items read is not more than 10 records
 per sec.
 The kafkaReceiver I'm using is *not* ReliableKafkaReceiver.

 This app was running fine for ~3 days then there was an increased load on
 the HBase server because of some other process querying HBase tables.
 This led to increase in the batch processing time of the spark batches
 (processed 1 min batch in 10 min) which previously was finishing in 20 sec
 which in turn led to the shutdown of the spark application, PFA the
 executor logs.

 From the logs I'm getting below exceptions *[1]*  *[2]* looks like
 there was some outstanding Jobs that didn't get processed or the Job
 couldn't find the input data. From the logs it looks seems that the
 shutdown hook gets invoked but it cannot process the in-flight block.

 I have a couple of queries on this
   1) Does this mean that these jobs failed and the *in-flight data *is
 lost?
   2) Does the Spark job *buffers kafka* input data while the Job is
 under processing state for 10 mins and on shutdown is that too lost? (I do
 not see any OOM error in the logs).
   3) Can we have *explicit commits* enabled in the kafkaReceiver so that
 the offsets gets committed only when the RDD(s) get successfully processed?

 Also I'd like to know if there is a *graceful way to shutdown a spark
 app running on yarn*. Currently I'm killing the yarn app to stop it
 which leads to loss of that job's history wheras in this case the
 application stops and succeeds and thus preserves the logs  history.

 *[1]* 15/02/02 19:30:11 ERROR client.TransportResponseHandler: Still
 have 1 requests outstanding when connection from
 hbase28.usdc2.cloud.com/10.193.150.221:43189 is closed
 *[2]* java.lang.Exception: Could not compute split, block
 input-2-1422901498800 not found
 *[3]* 
 org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
 No lease on /tmp/spark/realtime-failover/msg_2378481654720966.avro (inode
 879488): File does not exist. Holder DFSClient_NONMAPREDUCE_-148264920_63
 does not have any open files.

 --
 Thanks  Regards,

 *Mukesh Jha me.mukesh@gmail.com*







Re: Error when Spark streaming consumes from Kafka

2015-02-02 Thread Dibyendu Bhattacharya
Or you can use this Low Level Kafka Consumer for Spark :
https://github.com/dibbhatt/kafka-spark-consumer

This is now part of http://spark-packages.org/ and has been running
successfully for the past few months in Pearson's production environment.
Being a Low Level consumer, it does not have the re-balancing issue which the
High Level consumer has.

Also, I know there are a few who have shifted to this Low Level Consumer,
which has given them a more robust, fault tolerant Kafka Receiver for Spark.

Regards,
Dibyendu

On Tue, Feb 3, 2015 at 3:57 AM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 This is an issue that is hard to resolve without rearchitecting the whole
 Kafka Receiver. There are some workarounds worth looking into.


 http://mail-archives.apache.org/mod_mbox/kafka-users/201312.mbox/%3CCAFbh0Q38qQ0aAg_cj=jzk-kbi8xwf+1m6xlj+fzf6eetj9z...@mail.gmail.com%3E

 On Mon, Feb 2, 2015 at 1:07 PM, Greg Temchenko s...@dicefield.com wrote:

 Hi,

 This seems not fixed yet.
 I filed an issue in jira:
 https://issues.apache.org/jira/browse/SPARK-5505

 Greg



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-Spark-streaming-consumes-from-Kafka-tp19570p21471.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






Re: Error when Spark streaming consumes from Kafka

2015-02-02 Thread Dibyendu Bhattacharya
Thanks Neelesh . Glad to know this Low Level Consumer is working for you.

Dibyendu

On Tue, Feb 3, 2015 at 8:06 AM, Neelesh neele...@gmail.com wrote:

 We're planning to use this as well (Dibyendu's
 https://github.com/dibbhatt/kafka-spark-consumer ). Dibyendu, thanks for
 the efforts. So far its working nicely. I think there is merit in make it
 the default Kafka Receiver for spark streaming.

 -neelesh

 On Mon, Feb 2, 2015 at 5:25 PM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Or you can use this Low Level Kafka Consumer for Spark :
 https://github.com/dibbhatt/kafka-spark-consumer

 This is now part of http://spark-packages.org/ and is running
 successfully for past few months in Pearson production environment . Being
 Low Level consumer, it does not have this re-balancing issue which High
 Level consumer have.

 Also I know there are few who has shifted to this Low Level Consumer
 which started giving them a better robust fault tolerant Kafka Receiver for
 Spark.

 Regards,
 Dibyendu

 On Tue, Feb 3, 2015 at 3:57 AM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 This is an issue that is hard to resolve without rearchitecting the
 whole Kafka Receiver. There are some workarounds worth looking into.


 http://mail-archives.apache.org/mod_mbox/kafka-users/201312.mbox/%3CCAFbh0Q38qQ0aAg_cj=jzk-kbi8xwf+1m6xlj+fzf6eetj9z...@mail.gmail.com%3E

 On Mon, Feb 2, 2015 at 1:07 PM, Greg Temchenko s...@dicefield.com
 wrote:

 Hi,

 This seems not fixed yet.
 I filed an issue in jira:
 https://issues.apache.org/jira/browse/SPARK-5505

 Greg



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-Spark-streaming-consumes-from-Kafka-tp19570p21471.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.








Re: Spark Streaming with Kafka

2015-01-21 Thread Dibyendu Bhattacharya
You can probably try the Low Level Consumer option with Spark 1.2

https://github.com/dibbhatt/kafka-spark-consumer

This Consumer can recover from any underlying failure of the Spark platform or
Kafka and either retry or restart the receiver. This has been working nicely
for us.

Regards,
Dibyendu


On Wed, Jan 21, 2015 at 7:46 AM, firemonk9 dhiraj.peech...@gmail.com
wrote:

 Hi,

I am having similar issues. Have you found any resolution ?

 Thank you



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-with-Kafka-tp21222p21276.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Low Level Kafka Consumer for Spark

2015-01-16 Thread Dibyendu Bhattacharya
My code handles the Kafka consumer part. Writing to Kafka should not be a big
challenge, though; you can easily do it in your driver code.
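
Roughly, writing out to Kafka from a job can look like the following sketch.
It assumes a Kafka 0.8.2+ producer client on the classpath, and the broker
list, topic name and the stream variable are placeholders; this variant writes
from the executors, while for small results you can equally collect to the
driver and write there.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

stream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // One producer per task; creating it here avoids serializing it from the driver.
    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    records.foreach(msg => producer.send(new ProducerRecord[String, String]("output-topic", msg)))
    producer.close()
  }
}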

dibyendu

On Sat, Jan 17, 2015 at 9:43 AM, Debasish Das debasish.da...@gmail.com
wrote:

 Hi Dib,

 For our usecase I want my spark job1 to read from hdfs/cache and write to
 kafka queues. Similarly spark job2 should read from kafka queues and write
 to kafka queues.

 Is writing to kafka queues from spark job supported in your code ?

 Thanks
 Deb
  On Jan 15, 2015 11:21 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

 There was a simple example
 https://github.com/dibbhatt/kafka-spark-consumer/blob/master/examples/scala/LowLevelKafkaConsumer.scala#L45
 which you can run after changing few lines of configurations.

 Thanks
 Best Regards

 On Fri, Jan 16, 2015 at 12:23 PM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Hi Kidong,

 Just now I tested the Low Level Consumer with Spark 1.2 and I did not
 see any issue with Receiver.Store method . It is able to fetch messages
 form Kafka.

 Can you cross check other configurations in your setup like Kafka broker
 IP , topic name, zk host details, consumer id etc.

 Dib

 On Fri, Jan 16, 2015 at 11:50 AM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Hi Kidong,

 No , I have not tried yet with Spark 1.2 yet. I will try this out and
 let you know how this goes.

 By the way, is there any change in Receiver Store method happened in
 Spark 1.2 ?



 Regards,
 Dibyendu



 On Fri, Jan 16, 2015 at 11:25 AM, mykidong mykid...@gmail.com wrote:

 Hi Dibyendu,

 I am using kafka 0.8.1.1 and spark 1.2.0.
 After modifying these version of your pom, I have rebuilt your codes.
 But I have not got any messages from ssc.receiverStream(new
 KafkaReceiver(_props, i)).

 I have found, in your codes, all the messages are retrieved correctly,
 but
 _receiver.store(_dataBuffer.iterator())  which is spark streaming
 abstract
 class's method does not seem to work correctly.

 Have you tried running your spark streaming kafka consumer with kafka
 0.8.1.1 and spark 1.2.0 ?

 - Kidong.






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p21180.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.








Re: Low Level Kafka Consumer for Spark

2015-01-15 Thread Dibyendu Bhattacharya
Hi Kidong,

No, I have not tried it with Spark 1.2 yet. I will try this out and let you
know how it goes.

By the way, has there been any change in the Receiver store method in Spark
1.2?



Regards,
Dibyendu



On Fri, Jan 16, 2015 at 11:25 AM, mykidong mykid...@gmail.com wrote:

 Hi Dibyendu,

 I am using kafka 0.8.1.1 and spark 1.2.0.
 After modifying these version of your pom, I have rebuilt your codes.
 But I have not got any messages from ssc.receiverStream(new
 KafkaReceiver(_props, i)).

 I have found, in your codes, all the messages are retrieved correctly, but
 _receiver.store(_dataBuffer.iterator())  which is spark streaming abstract
 class's method does not seem to work correctly.

 Have you tried running your spark streaming kafka consumer with kafka
 0.8.1.1 and spark 1.2.0 ?

 - Kidong.






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p21180.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Low Level Kafka Consumer for Spark

2015-01-15 Thread Dibyendu Bhattacharya
Hi Kidong,

Just now I tested the Low Level Consumer with Spark 1.2 and I did not see any
issue with the Receiver.store method. It is able to fetch messages from
Kafka.

Can you cross-check the other configurations in your setup, like the Kafka
broker IP, topic name, ZK host details, consumer id, etc.?

Dib

On Fri, Jan 16, 2015 at 11:50 AM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 Hi Kidong,

 No , I have not tried yet with Spark 1.2 yet. I will try this out and let
 you know how this goes.

 By the way, is there any change in Receiver Store method happened in Spark
 1.2 ?



 Regards,
 Dibyendu



 On Fri, Jan 16, 2015 at 11:25 AM, mykidong mykid...@gmail.com wrote:

 Hi Dibyendu,

 I am using kafka 0.8.1.1 and spark 1.2.0.
 After modifying these version of your pom, I have rebuilt your codes.
 But I have not got any messages from ssc.receiverStream(new
 KafkaReceiver(_props, i)).

 I have found, in your codes, all the messages are retrieved correctly, but
 _receiver.store(_dataBuffer.iterator())  which is spark streaming abstract
 class's method does not seem to work correctly.

 Have you tried running your spark streaming kafka consumer with kafka
 0.8.1.1 and spark 1.2.0 ?

 - Kidong.






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p21180.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






Re: Low Level Kafka Consumer for Spark

2014-12-03 Thread Dibyendu Bhattacharya
Hi,

Yes, as Jerry mentioned, SPARK-3129 (
https://issues.apache.org/jira/browse/SPARK-3129) enabled the WAL feature,
which solves the driver failure problem. The way 3129 is designed, it solves
the driver failure problem agnostic of the source of the stream (like Kafka or
Flume, etc.). But with just 3129 you cannot achieve a complete solution for
data loss. You also need a reliable receiver which solves the data loss issue
on receiver failure.

The Low Level Consumer (https://github.com/dibbhatt/kafka-spark-consumer)
for which this email thread was started has solved that problem with the Kafka
Low Level API.

And SPARK-4062, as Jerry mentioned, also recently solved the same problem
using the Kafka High Level API.

On the Kafka High Level Consumer API approach, I would like to mention that
Kafka 0.8 has some issues, as mentioned in this wiki (
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design),
where consumer re-balance sometimes fails, and that is one of the key reasons
Kafka is re-writing the consumer API in Kafka 0.9.

I know there are a few folks who have already faced these re-balancing issues
while using the Kafka High Level API, and if you ask my opinion, we at Pearson
are still using the Low Level Consumer as it seems to be more robust and
performant, and we have been using it for a few months without any issue ...
and also I may be a little biased :)
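
For completeness, the driver recovery side that SPARK-3129 builds on is
checkpoint based; a minimal sketch (the checkpoint directory is a placeholder,
and note the caveat above that modifying the driver code invalidates an
existing checkpoint):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///tmp/streaming-checkpoint"

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("recoverable-streaming-app")
  val newSsc = new StreamingContext(conf, Seconds(10))
  newSsc.checkpoint(checkpointDir)
  // define input streams and processing here, before returning the context
  newSsc
}

// Clean start: builds a fresh context. After a driver crash: reconstructs the
// context (and its pending batches) from the checkpoint directory.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()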

Regards,
Dibyendu



On Wed, Dec 3, 2014 at 7:04 AM, Shao, Saisai saisai.s...@intel.com wrote:

 Hi Rod,

 The purpose of introducing  WAL mechanism in Spark Streaming as a general
 solution is to make all the receivers be benefit from this mechanism.

 Though as you said, external sources like Kafka have their own checkpoint
 mechanism, instead of storing data in WAL, we can only store metadata to
 WAL, and recover from the last committed offsets. But this requires
 sophisticated design of Kafka receiver with low-level API involved, also we
 need to take care of rebalance and fault tolerance things by ourselves. So
 right now instead of implementing a whole new receiver, we choose to
 implement a simple one, though the performance is not so good, it's much
 easier to understand and maintain.

 The design purpose and implementation of reliable Kafka receiver can be
 found in (https://issues.apache.org/jira/browse/SPARK-4062). And in
 future, to improve the reliable Kafka receiver like what you mentioned is
 on our scheduler.

 Thanks
 Jerry


 -Original Message-
 From: RodrigoB [mailto:rodrigo.boav...@aspect.com]
 Sent: Wednesday, December 3, 2014 5:44 AM
 To: u...@spark.incubator.apache.org
 Subject: Re: Low Level Kafka Consumer for Spark

 Dibyendu,

 Just to make sure I will not be misunderstood - My concerns are referring
 to the Spark upcoming solution and not yours. I would to gather the
 perspective of someone which implemented recovery with Kafka a different
 way.

 Tnks,
 Rod



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p20196.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Error when Spark streaming consumes from Kafka

2014-11-22 Thread Dibyendu Bhattacharya
I believe this is something to do with how the Kafka High Level API manages
consumers within a consumer group and how it re-balances during failures. You
can find some mention of it in this Kafka wiki:

https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design

Due to various issues in the Kafka High Level API, Kafka is moving the High
Level Consumer API to a completely new set of APIs in Kafka 0.9.

Other than this co-ordination issue, the High Level consumer also has data
loss issues.

You can probably try this Spark-Kafka consumer, which uses the Low Level
Simple consumer API, is more performant and has no data loss scenarios.

https://github.com/dibbhatt/kafka-spark-consumer

Regards,
Dibyendu

On Sun, Nov 23, 2014 at 2:13 AM, Bill Jay bill.jaypeter...@gmail.com
wrote:

 Hi all,

 I am using Spark to consume from Kafka. However, after the job has run for
 several hours, I saw the following failure of an executor:

 kafka.common.ConsumerRebalanceFailedException: 
 group-1416624735998_ip-172-31-5-242.ec2.internal-1416648124230-547d2c31 can't 
 rebalance after 4 retries
 
 kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432)
 
 kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722)
 
 kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:212)
 
 kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:138)
 
 org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:114)
 
 org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
 
 org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
 
 org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
 
 org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
 
 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
 
 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
 org.apache.spark.scheduler.Task.run(Task.scala:54)
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:745)


 Does anyone know the reason for this exception? Thanks!

 Bill




Re: Low Level Kafka Consumer for Spark

2014-09-15 Thread Dibyendu Bhattacharya
Hi Alon,

No, this will not guarantee that the same set of messages will come in the
same RDD. This fix just re-plays the messages from the last processed offset
in the same order. Again, this is just an interim fix we needed to solve our
use case. If you do not need this message re-play feature, just do not perform
the ack (acknowledgement) call in the driver code. Then the processed messages
will not be written to ZK and hence replay will not happen.

Regards,
Dibyendu

On Mon, Sep 15, 2014 at 4:48 PM, Alon Pe'er alo...@supersonicads.com
wrote:

 Hi Dibyendu,

 Thanks for your great work!

 I'm new to Spark Streaming, so I just want to make sure I understand Driver
 failure issue correctly.

 In my use case, I want to make sure that messages coming in from Kafka are
 always broken into the same set of RDDs, meaning that if a set of messages
 are assigned to one RDD, and the Driver dies before this RDD is processed,
 then once the Driver recovers, the same set of messages are assigned to a
 single RDD, instead of arbitrarily repartitioning the messages across
 different RDDs.

 Does your Receiver guarantee this behavior, until the problem is fixed in
 Spark 1.2?

 Regards,
 Alon



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p14233.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Low Level Kafka Consumer for Spark

2014-09-15 Thread Dibyendu Bhattacharya
Hi Tim,

I have not tried persisting the RDD.

There is some discussion on rate limiting Spark Streaming in this thread:

http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-rate-limiting-from-kafka-td8590.html

There is a Pull Request https://github.com/apache/spark/pull/945/files to fix
this rate limiting issue at the BlockGenerator level.

But while testing with heavy load, this fix did not solve my problem, so I had
to have rate limiting built into the Kafka consumer. I will make it
configurable soon.

If this is not done, I can see blocks getting dropped, which leads to job
failures.

I have raised this in another thread ..

https://mail.google.com/mail/u/1/?tab=wm#search/Serious/148650fd829cd239.
But I have not got any answer yet on whether this is a bug (blocks getting
dropped and the job failing).
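
For reference, Spark also exposes a per-receiver rate limit that addresses the
same concern (a sketch; the value is only an example and should be sized to
what your downstream processing can sustain):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("rate-limited-receiver")
  // Maximum number of records per second each receiver will accept
  .set("spark.streaming.receiver.maxRate", "10000")

val ssc = new StreamingContext(conf, Seconds(1))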



Dib


On Mon, Sep 15, 2014 at 10:33 PM, Tim Smith secs...@gmail.com wrote:

 Hi Dibyendu,

 I am a little confused about the need for rate limiting input from
 kafka. If the stream coming in from kafka has higher message/second
 rate than what a Spark job can process then it should simply build a
 backlog in Spark if the RDDs are cached on disk using persist().
 Right?

 Thanks,

 Tim


 On Mon, Sep 15, 2014 at 4:33 AM, Dibyendu Bhattacharya
 dibyendu.bhattach...@gmail.com wrote:
  Hi Alon,
 
  No this will not be guarantee that same set of messages will come in same
  RDD. This fix just re-play the messages from last processed offset in
 same
  order. Again this is just a interim fix we needed to solve our use case
 . If
  you do not need this message re-play feature, just do not perform the
 ack (
  Acknowledgement) call in the Driver code. Then the processed messages
 will
  not be written to ZK and hence replay will not happen.
 
  Regards,
  Dibyendu
 
  On Mon, Sep 15, 2014 at 4:48 PM, Alon Pe'er alo...@supersonicads.com
  wrote:
 
  Hi Dibyendu,
 
  Thanks for your great work!
 
  I'm new to Spark Streaming, so I just want to make sure I understand
  Driver
  failure issue correctly.
 
  In my use case, I want to make sure that messages coming in from Kafka
 are
  always broken into the same set of RDDs, meaning that if a set of
 messages
  are assigned to one RDD, and the Driver dies before this RDD is
 processed,
  then once the Driver recovers, the same set of messages are assigned to
 a
  single RDD, instead of arbitrarily repartitioning the messages across
  different RDDs.
 
  Does your Receiver guarantee this behavior, until the problem is fixed
 in
  Spark 1.2?
 
  Regards,
  Alon
 
 
 
  --
  View this message in context:
 
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p14233.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 




Re: Some Serious Issue with Spark Streaming ? Blocks Getting Removed and Jobs have Failed..

2014-09-12 Thread Dibyendu Bhattacharya
)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
   at 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
   at 
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
   at 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
   at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
   at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
   at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:606)
   at 
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
   at 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
   at 
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
   at 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
   at 
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
   at 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
   at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
   at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
   at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:606)
   at 
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
   at 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
   at 
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
   at 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
   at 
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
   at 
 org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:61)
   at 
 org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:141)
   at 
 java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
   at 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
   at 
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
   at 
 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:169)
   at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:744)




 --
 Nan Zhu

 On Thursday, September 11, 2014 at 10:42 AM, Nan Zhu wrote:

  Hi,

 Can you attach more logs to see if there is some entry from ContextCleaner?

 I met very similar issue before…but haven’t get resolved

 Best,

 --
 Nan Zhu

 On Thursday, September 11, 2014 at 10:13 AM, Dibyendu Bhattacharya wrote:

 Dear All,

 Not sure if this is a false alarm. But wanted to raise to this to
 understand what is happening.

 I am testing the Kafka Receiver which I have written (
 https://github.com/dibbhatt/kafka-spark-consumer) which basically a low
 level Kafka Consumer implemented custom Receivers for every Kafka topic
 partitions and pulling data in parallel. Individual streams from all topic
 partitions are then merged to create Union stream which used for further
 processing.

 The custom Receiver working fine in normal load

Re: Some Serious Issue with Spark Streaming ? Blocks Getting Removed and Jobs have Failed..

2014-09-12 Thread Dibyendu Bhattacharya
I agree,

Even the Low Level Kafka Consumer which I have written has tunable IO
throttling, which helped me solve this issue... But the question remains: even
if there is a large backlog, why does Spark drop the unprocessed in-memory
blocks?
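
One mitigation for the dropped-block symptom is to let received blocks spill
to disk instead of being evicted from memory; a minimal sketch (assuming the
receiver based KafkaUtils stream, with ssc, hosts, group and topic as
placeholders):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// MEMORY_AND_DISK_SER_2 lets blocks spill to disk (replicated) under memory
// pressure rather than being dropped before they are processed.
val stream = KafkaUtils.createStream(
  ssc, "zkhost:2181", "my-group", Map("mytopic" -> 1),
  StorageLevel.MEMORY_AND_DISK_SER_2)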

Dib

On Fri, Sep 12, 2014 at 5:47 PM, Jeoffrey Lim jeoffr...@gmail.com wrote:

 Our issue could be related to this problem as described in:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-in-1-hour-batch-duration-RDD-files-gets-lost-td14027.html
  which
 the DStream is processed for every 1 hour batch duration.

 I have implemented IO throttling in the Receiver as well in our Kafka
 consumer, and our backlog is not that large.

 NFO : org.apache.spark.storage.MemoryStore - 1 blocks selected for dropping
 INFO : org.apache.spark.storage.BlockManager - Dropping block
 *input-0-1410443074600* from memory
 INFO : org.apache.spark.storage.MemoryStore - Block input-0-1410443074600 of
 size 12651900 dropped from memory (free 21220667)
 INFO : org.apache.spark.storage.BlockManagerInfo - Removed
 input-0-1410443074600 on ip-10-252-5-113.asskickery.us:53752 in memory
 (size: 12.1 MB, free: 100.6 MB)

 The question that I have now is: how to prevent the
 MemoryStore/BlockManager of dropping the block inputs? And should they be
 logged in the level WARN/ERROR?


 Thanks.


 On Fri, Sep 12, 2014 at 4:45 PM, Dibyendu Bhattacharya [via Apache Spark
 User List] [hidden email]
 http://user/SendEmail.jtp?type=nodenode=14081i=0 wrote:

 Dear all,

 I am sorry. This was a false alarm

 There was some issue in the RDD processing logic which led to a large
 backlog. Once I fixed the issues in my processing logic, I could see all
 messages being pulled nicely without any Block Removed error. I need to
 tune certain configurations in my Kafka Consumer to modify the data rate
 and also the batch size.

 Sorry again.


 Regards,
 Dibyendu

 On Thu, Sep 11, 2014 at 8:13 PM, Nan Zhu [hidden email] wrote:

  This is my case about broadcast variable:

 14/07/21 19:49:13 INFO Executor: Running task ID 4
 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 2)
 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 2 in 95 ms on localhost 
 (progress: 3/106)
 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for 
 hdfstest_customers
 14/07/21 19:49:13 INFO Executor: Serialized size of result for 3 is 596
 14/07/21 19:49:13 INFO Executor: Sending result for 3 directly to driver
 14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
 14/07/21 19:49:13 INFO Executor: Finished task ID 3
 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:5 as TID 5 on 
 executor localhost: localhost (PROCESS_LOCAL)
 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:5 as 11885 bytes 
 in 0 ms
 14/07/21 19:49:13 INFO Executor: Running task ID 5
 14/07/21 19:49:13 INFO BlockManager: Removing broadcast 0
14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 3)
*14/07/21 19:49:13 INFO ContextCleaner: Cleaned broadcast 0*
 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 3 in 97 ms on localhost 
 (progress: 4/106)
 14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
14/07/21 19:49:13 INFO BlockManager: Removing block broadcast_0
*14/07/21 19:49:13 INFO MemoryStore: Block broadcast_0 of size 202564 dropped from memory (free 886623436)*
 14/07/21 19:49:13 INFO ContextCleaner: Cleaned shuffle 0
 14/07/21 19:49:13 INFO ShuffleBlockManager: Deleted all files for shuffle 0
14/07/21 19:49:13 INFO HadoopRDD: Input split: hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:25+5
14/07/21 19:49:13 INFO HadoopRDD: Input split: hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:20+5
14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for 
hdfstest_customers
 14/07/21 19:49:13 INFO Executor: Serialized size of result for 4 is 596
 14/07/21 19:49:13 INFO Executor: Sending result for 4 directly to driver
 14/07/21 19:49:13 INFO Executor: Finished task ID 4
 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:6 as TID 6 on 
 executor localhost: localhost (PROCESS_LOCAL)
 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:6 as 11885 bytes 
 in 0 ms
 14/07/21 19:49:13 INFO Executor: Running task ID 6
 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 4)
 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 4 in 80 ms on localhost 
 (progress: 5/106)
 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for 
 hdfstest_customers
 14/07/21 19:49:13 INFO Executor: Serialized size of result for 5 is 596
 14/07/21 19:49:13 INFO Executor: Sending result for 5 directly to driver
 14/07/21 19:49:13 INFO Executor: Finished task ID 5
 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:7 as TID 7

Re: How to scale more consumer to Kafka stream

2014-09-11 Thread Dibyendu Bhattacharya
I agree, Gerard. Thanks for pointing this out.

Dib

On Thu, Sep 11, 2014 at 5:28 PM, Gerard Maas gerard.m...@gmail.com wrote:

 This pattern works.

 One note, though: use 'union' only if you need to group the data from all
 RDDs into one RDD for processing (like a distinct count, or if you need a groupBy).
 If your process can be parallelized over every stream of incoming data, I
 suggest you just apply the required transformations on every dstream and
 avoid 'union' altogether.

 -kr, Gerard.



 On Wed, Sep 10, 2014 at 8:17 PM, Tim Smith secs...@gmail.com wrote:

 How are you creating your kafka streams in Spark?

 If you have 10 partitions for a topic, you can call createStream ten
 times to create 10 parallel receivers/executors and then use union to
 combine all the dStreams.
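
 For reference, a minimal sketch of the pattern described above with the stock
 KafkaUtils receiver (Spark Streaming 1.x); the ZK quorum, group and topic names
 are placeholders.

 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.kafka.KafkaUtils

 val ssc = new StreamingContext(new SparkConf().setAppName("kafka-union-sketch"), Seconds(10))

 // one receiver per topic partition (10 partitions -> 10 receivers)
 val kafkaStreams = (1 to 10).map { _ =>
   KafkaUtils.createStream(ssc, "zkhost:2181", "my-consumer-group", Map("my-topic" -> 1))
 }

 // union only if the downstream logic needs all partitions in one DStream;
 // otherwise (as Gerard notes above) transform each stream independently
 val unified = ssc.union(kafkaStreams)
 unified.map(_._2).count().print()

 ssc.start()
 ssc.awaitTermination()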



 On Wed, Sep 10, 2014 at 7:16 AM, richiesgr richie...@gmail.com wrote:

 Hi (my previous post has been used by someone else)

 I'm building an application that reads from a Kafka event stream. In
 production we have 5 consumers that share 10 partitions.
 But with Spark Streaming Kafka only 1 worker acts as a consumer and then
 distributes the tasks to the workers, so I can have only 1 machine acting as
 consumer, but I need more because only 1 consumer means lag.

 Do you have any idea what I can do ? Another interesting point: the master
 is not loaded at all, it never gets above 10 % CPU

 I've tried to increase queued.max.message.chunks on the Kafka client to
 read more records, thinking it would speed up the reads, but I only get

 ERROR consumer.ConsumerFetcherThread:

 [ConsumerFetcherThread-SparkEC2_ip-10-138-59-194.ec2.internal-1410182950783-5c49c8e8-0-174167372],
 Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 73;
 ClientId:

 SparkEC2-ConsumerFetcherThread-SparkEC2_ip-10-138-59-194.ec2.internal-1410182950783-5c49c8e8-0-174167372;
 ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [IA2,7]
 -
 PartitionFetchInfo(929838589,1048576),[IA2,6] -
 PartitionFetchInfo(929515796,1048576),[IA2,9] -
 PartitionFetchInfo(929577946,1048576),[IA2,8] -
 PartitionFetchInfo(930751599,1048576),[IA2,2] -
 PartitionFetchInfo(926457704,1048576),[IA2,5] -
 PartitionFetchInfo(930774385,1048576),[IA2,0] -
 PartitionFetchInfo(929913213,1048576),[IA2,3] -
 PartitionFetchInfo(929268891,1048576),[IA2,4] -
 PartitionFetchInfo(929949877,1048576),[IA2,1] -
 PartitionFetchInfo(930063114,1048576)
 java.lang.OutOfMemoryError: Java heap space

 Does someone have ideas ?
 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-scale-more-consumer-to-Kafka-stream-tp13883.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.







Re: Low Level Kafka Consumer for Spark

2014-09-10 Thread Dibyendu Bhattacharya
Hi ,

The latest changes, with Kafka message replay by manipulating the ZK offset,
seem to be working fine for us. This gives us some relief till the actual
issue is fixed in Spark 1.2.

I have a question on how Spark processes the received data. The logic I
used is basically to pull messages from individual partitions using
dedicated Receivers, and do a union of these streams. After that I
process the union stream.

Today I wanted to test this consumer with our internal Kafka cluster which
has around 50 million records. With this huge backlog I found Spark only
running the Receiver task and not running the processing task (or rather
doing it very slowly). Is this an issue with the Consumer, or is it an issue
on the Spark side? Ideally, when the Receivers durably write data to the Store,
the processing should start in parallel. Why does the processing task need
to wait till the Receiver consumes all 50 million messages? ...Or maybe I
am doing something wrong? I can share the driver log if you want.

In the driver I can see only storage.BlockManagerInfo: Added input... type
messages, but I hardly see scheduler.TaskSetManager: Starting task...
messages. I see data getting written to the target system at a very, very
slow pace.


Regards,
Dibyendu






On Mon, Sep 8, 2014 at 12:08 AM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 Hi Tathagata,

 I have managed to implement the logic in the Kafka-Spark consumer to
 recover from Driver failure. This is just an interim fix till the actual fix
 is done on the Spark side.

 The logic is something like this.

 1. When the individual Receiver starts for every topic partition, it
 writes the Kafka messages along with certain metadata to the Block Store. This
 metadata contains the details of the message offset, partition id, topic name
 and consumer id. You can see this logic in the PartitionManager.java next()
 method.

 2. In the Driver code (Consumer.java), I am creating the union of all
 these individual DStreams, and processing the data using a forEachRDD call.
 In the driver code, I am receiving the RDD which contains the Kafka
 messages along with the metadata details. In the driver code, periodically I
 am committing the processed offset of the Kafka message into ZK.

 3. When the driver stops and restarts again, the Receiver starts again, and
 this time in PartitionManager.java I am checking what the actual
 committed offset for the partition is, and what the actual processed
 offset of the same partition is. This logic is in the PartitionManager
 constructor.

 If this is a Receiver restart, and the processed offset is less than the
 committed offset, I start fetching again from the processed offset.
 This may lead to duplicate records, but our system can handle duplicates.

 I have tested with multiple driver kills/stops and I found no data loss in
 the Kafka consumer.

 In the Driver code, I have not done any checkpointing yet; I will test
 that tomorrow.


 One interesting thing I found: if I repartition the original stream,
 I can still see the data loss issue with this logic. What I believe is that
 during re-partitioning Spark might be changing the order of the RDDs from the
 way they were generated from the Kafka stream. So in the re-partition case,
 even when I am committing the processed offset, it is not in order, so I
 still see the issue. Not sure if this understanding is correct, but I am not
 able to find any other explanation.

 But if I do not use repartition, this solution works fine.

 I can make this configurable, so that when the actual fix is available,
 this feature in the consumer can be turned off, as it is an overhead for the
 consumer. Let me know what you think..

 Regards,
 Dibyendu




 On Fri, Sep 5, 2014 at 11:14 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Some thoughts on this thread to clarify the doubts.

 1. Driver recovery: The current release (1.1, to be released) does not recover the
 raw data that has been received but not processed. This is because when the
 driver dies, the executors die and so does the raw data that was stored in
 them. Only for HDFS is the data not lost on driver recovery, as the data is
 already present reliably in HDFS. This is something we want to fix by Spark
 1.2 (3 months from now). Regarding recovery by replaying the data from
 Kafka, it is possible but tricky. Our goal is to provide a strong guarantee,
 exactly-once semantics in all transformations. To guarantee this for all
 kinds of streaming computations, stateful and non-stateful, it
 requires that the data be replayed through Kafka in exactly the same order,
 and the underlying blocks of data in Spark be regenerated in the exact way
 they would have been if there were no driver failure. This is quite tricky to
 implement: it requires manipulation of zookeeper offsets, etc, which is hard to
 do with the high level consumer that KafkaUtils uses. Dibyendu's low level
 Kafka receiver may enable such approaches in the future. For now we
 definitely plan to solve the first problem very very soon.

 3

Re: How to scale more consumer to Kafka stream

2014-09-10 Thread Dibyendu Bhattacharya
Hi,

You can use this Kafka Spark Consumer.
https://github.com/dibbhatt/kafka-spark-consumer

This does exactly that. It creates parallel Receivers for every Kafka
topic partition. You can see Consumer.java under the consumer.kafka.client
package for an example of how to use it.

There is some discussion on this Consumer you can find it here :
https://mail.google.com/mail/u/1/?tab=wm#search/kafka+spark+consumer/14797b2cbbaa8689

Regards,
Dib


On Wed, Sep 10, 2014 at 11:47 PM, Tim Smith secs...@gmail.com wrote:

 How are you creating your kafka streams in Spark?

 If you have 10 partitions for a topic, you can call createStream ten
 times to create 10 parallel receivers/executors and then use union to
 combine all the dStreams.



 On Wed, Sep 10, 2014 at 7:16 AM, richiesgr richie...@gmail.com wrote:

 Hi (my previous post has been used by someone else)

 I'm building an application that reads from a Kafka event stream. In production
 we have 5 consumers that share 10 partitions.
 But with Spark Streaming Kafka only 1 worker acts as a consumer and then
 distributes the tasks to the workers, so I can have only 1 machine acting as
 consumer, but I need more because only 1 consumer means lag.

 Do you have any idea what I can do ? Another interesting point: the master
 is not loaded at all, it never gets above 10 % CPU

 I've tried to increase queued.max.message.chunks on the Kafka client to
 read more records, thinking it would speed up the reads, but I only get

 ERROR consumer.ConsumerFetcherThread:

 [ConsumerFetcherThread-SparkEC2_ip-10-138-59-194.ec2.internal-1410182950783-5c49c8e8-0-174167372],
 Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 73;
 ClientId:

 SparkEC2-ConsumerFetcherThread-SparkEC2_ip-10-138-59-194.ec2.internal-1410182950783-5c49c8e8-0-174167372;
 ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [IA2,7] -
 PartitionFetchInfo(929838589,1048576),[IA2,6] -
 PartitionFetchInfo(929515796,1048576),[IA2,9] -
 PartitionFetchInfo(929577946,1048576),[IA2,8] -
 PartitionFetchInfo(930751599,1048576),[IA2,2] -
 PartitionFetchInfo(926457704,1048576),[IA2,5] -
 PartitionFetchInfo(930774385,1048576),[IA2,0] -
 PartitionFetchInfo(929913213,1048576),[IA2,3] -
 PartitionFetchInfo(929268891,1048576),[IA2,4] -
 PartitionFetchInfo(929949877,1048576),[IA2,1] -
 PartitionFetchInfo(930063114,1048576)
 java.lang.OutOfMemoryError: Java heap space

 Does someone have ideas ?
 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-scale-more-consumer-to-Kafka-stream-tp13883.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






Re: Low Level Kafka Consumer for Spark

2014-09-07 Thread Dibyendu Bhattacharya
Hi Tathagata,

I have managed to implement the logic in the Kafka-Spark consumer to
recover from Driver failure. This is just an interim fix till the actual fix is
done on the Spark side.

The logic is something like this.

1. When the individual Receiver starts for every topic partition, it
writes the Kafka messages along with certain metadata to the Block Store. This
metadata contains the details of the message offset, partition id, topic name
and consumer id. You can see this logic in the PartitionManager.java next()
method.

2. In the Driver code (Consumer.java), I am creating the union of all
these individual DStreams, and processing the data using a forEachRDD call.
In the driver code, I am receiving the RDD which contains the Kafka
messages along with the metadata details. In the driver code, periodically I
am committing the processed offset of the Kafka message into ZK (a rough
sketch of this driver-side pattern follows after this list).

3. When the driver stops and restarts again, the Receiver starts again, and
this time in PartitionManager.java I am checking what the actual
committed offset for the partition is, and what the actual processed
offset of the same partition is. This logic is in the PartitionManager
constructor.

If this is a Receiver restart, and the processed offset is less than the
committed offset, I start fetching again from the processed offset.
This may lead to duplicate records, but our system can handle duplicates.
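
A rough sketch of the driver-side pattern in point 2 above. MessageAndMetadata,
process and commitProcessedOffset here are placeholders (not the actual
kafka-spark-consumer classes), and unionedStream stands for the union of the
per-partition DStreams.

import org.apache.spark.SparkContext._ // pair-RDD operations (needed in Spark 1.x)
import org.apache.spark.streaming.dstream.DStream

case class MessageAndMetadata(topic: String, partition: Int, offset: Long, payload: Array[Byte])

def process(payload: Array[Byte]): Unit = { /* application logic */ }
def commitProcessedOffset(topic: String, partition: Int, offset: Long): Unit = {
  /* write the processed offset for this partition to ZooKeeper */
}

def handleBatches(unionedStream: DStream[MessageAndMetadata]): Unit = {
  unionedStream.foreachRDD { rdd =>
    // process the batch first ...
    rdd.foreach(msg => process(msg.payload))
    // ... then record the highest offset seen per topic partition, so that a
    // restarted Receiver can resume from the processed offset rather than the
    // last committed one.
    rdd.map(m => ((m.topic, m.partition), m.offset))
      .reduceByKey((a, b) => math.max(a, b))
      .collect()
      .foreach { case ((topic, partition), offset) => commitProcessedOffset(topic, partition, offset) }
  }
}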

I have tested with multiple driver kills/stops and I found no data loss in
the Kafka consumer.

In the Driver code, I have not done any checkpointing yet; I will test that
tomorrow.


One interesting thing I found: if I repartition the original stream, I
can still see the data loss issue with this logic. What I believe is that during
re-partitioning Spark might be changing the order of the RDDs from the way they
were generated from the Kafka stream. So in the re-partition case, even when I am
committing the processed offset, it is not in order, so I still see the issue.
Not sure if this understanding is correct, but I am not able to find any other
explanation.

But if I do not use repartition, this solution works fine.

I can make this configurable, so that when the actual fix is available,
this feature in the consumer can be turned off, as it is an overhead for the
consumer. Let me know what you think..

Regards,
Dibyendu




On Fri, Sep 5, 2014 at 11:14 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Some thoughts on this thread to clarify the doubts.

 1. Driver recovery: The current release (1.1, to be released) does not recover the
 raw data that has been received but not processed. This is because when the
 driver dies, the executors die and so does the raw data that was stored in
 them. Only for HDFS is the data not lost on driver recovery, as the data is
 already present reliably in HDFS. This is something we want to fix by Spark
 1.2 (3 months from now). Regarding recovery by replaying the data from
 Kafka, it is possible but tricky. Our goal is to provide a strong guarantee,
 exactly-once semantics in all transformations. To guarantee this for all
 kinds of streaming computations, stateful and non-stateful, it
 requires that the data be replayed through Kafka in exactly the same order,
 and the underlying blocks of data in Spark be regenerated in the exact way
 they would have been if there were no driver failure. This is quite tricky to
 implement: it requires manipulation of zookeeper offsets, etc, which is hard to
 do with the high level consumer that KafkaUtils uses. Dibyendu's low level
 Kafka receiver may enable such approaches in the future. For now we
 definitely plan to solve the first problem very very soon.

 3. Repartitioning: I am trying to understand the repartition issue. One
 common mistake I have seen is that developers repartition a stream but do not
 use the repartitioned stream.

 WRONG:
 inputDstream.repartition(100)
 inputDstream.map(...).count().print()

 RIGHT:
 val repartitionedDStream = inputDStream.repartition(100)
 repartitionedDStream.map(...).count().print()

 Not sure if this helps solve the problem that you all are facing. I am
 going to add this to the streaming programming guide to make sure this
 common mistake is avoided.

 TD




 On Wed, Sep 3, 2014 at 10:38 AM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Hi,

 Sorry for the little delay. As discussed in this thread, I have modified the
 Kafka-Spark-Consumer ( https://github.com/dibbhatt/kafka-spark-consumer)
 code to have a dedicated Receiver for every topic partition. You can see an
 example of how to create a union of these receivers
 in consumer.kafka.client.Consumer.java .

 Thanks to Chris for suggesting this change.

 Regards,
 Dibyendu


 On Mon, Sep 1, 2014 at 2:55 AM, RodrigoB rodrigo.boav...@aspect.com
 wrote:

 Just a comment on the recovery part.

 Is it correct to say that currently Spark Streaming recovery design does
 not
 consider re-computations (upon metadata lineage recovery) that depend on
 blocks of data of the received stream?
 https://issues.apache.org/jira/browse/SPARK

Re: Low Level Kafka Consumer for Spark

2014-09-03 Thread Dibyendu Bhattacharya
Hi,

Sorry for the little delay. As discussed in this thread, I have modified the
Kafka-Spark-Consumer ( https://github.com/dibbhatt/kafka-spark-consumer)
code to have a dedicated Receiver for every topic partition. You can see an
example of how to create a union of these receivers
in consumer.kafka.client.Consumer.java .

Thanks to Chris for suggesting this change.

Regards,
Dibyendu


On Mon, Sep 1, 2014 at 2:55 AM, RodrigoB rodrigo.boav...@aspect.com wrote:

 Just a comment on the recovery part.

 Is it correct to say that currently Spark Streaming recovery design does
 not
 consider re-computations (upon metadata lineage recovery) that depend on
 blocks of data of the received stream?
 https://issues.apache.org/jira/browse/SPARK-1647

 Just to illustrate a real use case (mine):
 - We have object states which have a Duration field per state which is
 incremented on every batch interval. Also, this object state is reset to 0
 upon incoming state-changing events. Let's suppose there is at least one
 event since the last data checkpoint. This will lead to inconsistency upon
 driver recovery: the Duration field will get incremented from the data
 checkpoint version until the recovery moment, but the state change event
 will never be re-processed... so in the end we have the old state with the
 wrong Duration value.
 To make things worse, let's imagine we're dumping the Duration increases
 somewhere... which means we're spreading the problem across our system.
 Re-computation awareness is something I've commented on in another thread and
 would rather treat separately.

 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-td12568.html#a13205

 Re-computations do occur, but the only RDDs that are recovered are the ones
 from the data checkpoint. This is what we've seen. It is not enough by itself
 to ensure recovery of computed data, and this partial recovery leads to
 inconsistency in some cases.

 Roger - I share the same question with you - I'm just not sure if the
 replicated data really gets persisted on every batch. The execution lineage
 is checkpointed, but if we have big chunks of data being consumed into the
 Receiver node on, let's say, a per-second basis, then having it persisted to
 HDFS every second could be a big challenge for keeping JVM performance - maybe
 that could be the reason why it's not really implemented... assuming it isn't.

 Dibyendu put a great effort into the offset-controlling code, but general
 state-consistent recovery feels to me like another big issue to address.

 I plan on diving into the Streaming code and trying to at least
 contribute some ideas. More insight from anyone on the dev team
 would be very much appreciated.

 tnks,
 Rod




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13208.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Low Level Kafka Consumer for Spark

2014-08-27 Thread Dibyendu Bhattacharya
I agree. This issue should be fixed in Spark rather than relying on the replay
of Kafka messages.

Dib
On Aug 28, 2014 6:45 AM, RodrigoB rodrigo.boav...@aspect.com wrote:

 Dibyendu,

 Tnks for getting back.

 I believe you are absolutely right. We were under the assumption that the
 raw data was being computed again and that's not happening after further
 tests. This applies to Kafka as well.

 Fortunately, the issue is of major priority.

 Regarding your suggestion, I would maybe prefer to have the problem resolved
 within Spark's internals, since once the data is replicated we should be able
 to access it once more and not have to pull it back again from Kafka or
 any other stream that is affected by this issue. If, for example, there
 is a big number of batches to be recomputed, I would rather have them done
 distributed than overload the batch interval with a huge amount of Kafka
 messages.

 I do not yet have enough know-how on where the issue is, or about the
 internal Spark code, so I can't really say how difficult the
 implementation will be.

 tnks,
 Rod



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12966.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Low Level Kafka Consumer for Spark

2014-08-26 Thread Dibyendu Bhattacharya
Hi,

As I understand, your problem is similar to this JIRA.

https://issues.apache.org/jira/browse/SPARK-1647

The issue in this case is that Kafka cannot replay the messages as the offsets
are already committed. Also, I think the existing KafkaUtils (the default
high-level Kafka consumer) has this issue as well.

There is a similar discussion in this thread as well...

http://apache-spark-user-list.1001560.n3.nabble.com/Data-loss-Spark-streaming-and-network-receiver-td12337.html

As I am thinking, it is possible to tackle this in the consumer code I have
written. If we store the topic partition id and consumed offset in ZK
after every checkpoint, then after Spark recovers from the failover, the
present PartitionManager code can start reading from the last checkpointed
offset (instead of the last committed offset, as it does now). In that case
it can replay the data since the last checkpoint.
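
A rough sketch of what persisting such a checkpointed offset could look like,
using Apache Curator; the znode layout here is just a placeholder, not the
consumer's actual layout.

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry

class OffsetStoreSketch(zkConnect: String) {
  private val zk = CuratorFrameworkFactory.newClient(zkConnect, new ExponentialBackoffRetry(1000, 3))
  zk.start()

  // Store the last checkpointed offset for one topic partition.
  def saveCheckpointedOffset(consumerId: String, topic: String, partition: Int, offset: Long): Unit = {
    val path = s"/consumers/$consumerId/$topic/$partition/checkpointed-offset"
    val data = offset.toString.getBytes("UTF-8")
    if (zk.checkExists().forPath(path) == null)
      zk.create().creatingParentsIfNeeded().forPath(path, data)
    else
      zk.setData().forPath(path, data)
  }
}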

I will think over it ..

Regards,
Dibyendu



On Mon, Aug 25, 2014 at 11:23 PM, RodrigoB rodrigo.boav...@aspect.com
wrote:

 Hi Dibyendu,

 My colleague has taken a look at the spark kafka consumer github you have
 provided and started experimenting.

 We found that somehow when Spark has a failure after a data checkpoint, the
 expected re-computations corresponding to the metadata checkpoints are not
 recovered, so we lose Kafka messages and RDD computations in Spark.
 The impression is that this code is replacing quite a bit of the Spark Kafka
 Streaming code, where maybe (not sure) metadata checkpoints are done every
 batch interval.

 Was it on purpose to solely depend on the Kafka commit to recover data and
 recomputations between data checkpoints? If so, how to make this work?

 tnks
 Rod



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12757.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Low Level Kafka Consumer for Spark

2014-08-26 Thread Dibyendu Bhattacharya
Hi Bharat,

Thanks for your email. If the Kafka Reader worker process dies, it will
be replaced by a different machine, and it will start consuming from the
offset where it left off (for each partition). The same would happen even
if I had an individual Receiver for every partition.

Regards,
Dibyendu


On Tue, Aug 26, 2014 at 5:43 AM, bharatvenkat bvenkat.sp...@gmail.com
wrote:

 I like this consumer for what it promises - better control over offset and
 recovery from failures.  If I understand this right, it still uses a single
 worker process to read from Kafka (one thread per partition) - is there a
 way to specify multiple worker processes (on different machines) to read
 from Kafka?  Maybe one worker process for each partition?

 If there is no such option, what happens when the single machine hosting
 the
 Kafka Reader worker process dies and is replaced by a different machine
 (like in cloud)?

 Thanks,
 Bharat



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12788.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Data loss - Spark streaming and network receiver

2014-08-18 Thread Dibyendu Bhattacharya
Dear All,

Recently I have written a Spark Kafka Consumer to solve this problem. We have
also seen issues with KafkaUtils, which uses the high-level Kafka consumer,
where the consumer code has no handle on offset management.

The below code solves this problem; it is being tested in our
Spark cluster and is working fine as of now.

https://github.com/dibbhatt/kafka-spark-consumer

This is Low Level Kafka Consumer using Kafka Simple Consumer API.

Please have a look at it and let me know your opinion. It has been
written to eliminate data loss by committing the offset after the data is
written to the BlockManager. Also, the existing high-level KafkaUtils does not
have any feature to control the data flow, and it gives an Out Of Memory error
if there is too much backlog in Kafka. This consumer solves that problem as
well. The code has been modified from the earlier Storm Kafka consumer code
and it has a lot of other features like recovery from Kafka node failures, ZK
failures, recovery from offset errors, etc.

Regards,
Dibyendu


On Tue, Aug 19, 2014 at 9:49 AM, Shao, Saisai saisai.s...@intel.com wrote:

  I think currently Spark Streaming lacks a data acknowledging mechanism for
 when data is stored and replicated in the BlockManager, so potentially data
 can be lost even after it is pulled from Kafka. Say the data is stored just in
 the BlockGenerator, not yet in the BM, while in the meantime Kafka itself
 commits the consumer offset, and at this point the node fails; from Kafka's
 point of view this part of the data has been fed into Spark Streaming, but
 actually it is not yet processed, so potentially this part of the data will
 never be processed again, unless you read the whole partition again.



  To solve this potential data loss problem, Spark Streaming needs to offer
 a data acknowledging mechanism, so a custom Receiver can use this
 acknowledgement to do checkpointing or recovery, like Storm.



  Besides, driver failure is another story that needs to be carefully
 considered. So currently it is hard to make sure there is no data loss in
 Spark Streaming; it still needs to improve at some points :)



 Thanks

 Jerry



 *From:* Tobias Pfeiffer [mailto:t...@preferred.jp]
 *Sent:* Tuesday, August 19, 2014 10:47 AM
 *To:* Wei Liu
 *Cc:* user
 *Subject:* Re: Data loss - Spark streaming and network receiver



 Hi Wei,



 On Tue, Aug 19, 2014 at 10:18 AM, Wei Liu wei@stellarloyalty.com
 wrote:

 Since our application cannot tolerate losing customer data, I am wondering
 what is the best way for us to address this issue.

  1) We are thinking of writing application-specific logic to address the data
 loss. To us, the problem seems to be caused by the Kinesis receivers
 advancing their checkpoint before we know for sure the data is replicated.
 For example, we can do another checkpoint ourselves to remember the Kinesis
 sequence number for data that has been processed by Spark Streaming. When
 the Kinesis receiver is restarted due to worker failures, we restart it from
 the checkpoint we tracked.



  This sounds pretty much to me like the way Kafka does it. So, I am not
 saying that the stock KafkaReceiver does what you want (it may or may not),
 but it should be possible to update the offset (which corresponds to the
 sequence number) in Zookeeper only after data has been replicated successfully.
 I guess replacing Kinesis with Kafka is not an option for you, but you may
 consider pulling the Kinesis data into Kafka before processing it with Spark?



 Tobias





Re: Spark stream data from kafka topics and output as parquet file on HDFS

2014-08-05 Thread Dibyendu Bhattacharya
You can try this Kafka Spark Consumer which I recently wrote. It uses the
Low Level Kafka Consumer API.

https://github.com/dibbhatt/kafka-spark-consumer
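
A rough sketch of the Parquet-writing side with Spark 1.x SQL; the Event schema,
HDFS path and input DStream are placeholders.

import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.dstream.DStream

case class Event(key: String, value: String)

def writeAsParquet(kafkaStream: DStream[(String, String)], sqlContext: SQLContext): Unit = {
  import sqlContext.createSchemaRDD // implicit RDD[Product] -> SchemaRDD (Spark 1.0/1.1)
  kafkaStream.foreachRDD { (rdd, time) =>
    val events = rdd.map { case (k, v) => Event(k, v) }
    // one Parquet directory per batch interval on HDFS
    events.saveAsParquetFile(s"hdfs:///data/events/batch-${time.milliseconds}")
  }
}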

Dibyendu




On Tue, Aug 5, 2014 at 12:52 PM, rafeeq s rafeeq.ec...@gmail.com wrote:

 Hi,

  I am new to Apache Spark and trying to develop a Spark Streaming program to
  *stream data from Kafka topics and output it as Parquet files on HDFS*.

  Please share a *sample reference* program that streams data from Kafka
  topics and outputs Parquet files on HDFS.

 Thanks in Advance.

 Regards,

 Rafeeq S
 *(“What you do is what matters, not what you think or say or plan.” )*




Re: Low Level Kafka Consumer for Spark

2014-08-05 Thread Dibyendu Bhattacharya
Thanks Jonathan,

Yes, till non-ZK based offset management is available in Kafka, I need to
maintain the offsets in ZK. And yes, in both cases an explicit commit is
necessary. I modified the Low Level Kafka Spark Consumer a little bit to have
the Receiver spawn threads for every partition of the topic and perform the
'store' operation in multiple threads. It would be good if the
receiver.store methods were made thread safe, which is not the case presently.

Waiting for TD's comment on this Kafka Spark Low Level consumer.


Regards,
Dibyendu



On Tue, Aug 5, 2014 at 5:32 AM, Jonathan Hodges hodg...@gmail.com wrote:

 Hi Yan,

 That is a good suggestion.  I believe non-Zookeeper offset management will
 be a feature in the upcoming Kafka 0.8.2 release tentatively scheduled for
 September.


 https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management

 That should make this fairly easy to implement, but it will still require
 explicit offset commits to avoid data loss which is different than the
 current KafkaUtils implementation.

 Jonathan





 On Mon, Aug 4, 2014 at 4:51 PM, Yan Fang yanfang...@gmail.com wrote:

 Another suggestion that may help is that you can consider using Kafka to
 store the latest offset instead of Zookeeper. There are at least two
 benefits: 1) it lowers the workload on ZK 2) it supports replay from a certain
 offset. This is how Samza http://samza.incubator.apache.org/ deals
 with the Kafka offset, the doc is here
 http://samza.incubator.apache.org/learn/documentation/0.7.0/container/checkpointing.html
  .
 Thank you.

 Cheers,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108


 On Sun, Aug 3, 2014 at 8:59 PM, Patrick Wendell pwend...@gmail.com
 wrote:

 I'll let TD chime in on this one, but I'm guessing this would be a
 welcome addition. It's great to see community effort on adding new
 streams/receivers; adding a Java API for receivers was something we did
 specifically to allow this :)

 - Patrick


 On Sat, Aug 2, 2014 at 10:09 AM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Hi,

 I have implemented a Low Level Kafka Consumer for Spark Streaming using the
 Kafka Simple Consumer API. This API gives better control over Kafka
 offset management and recovery from failures. As the present Spark
 KafkaUtils uses the high-level Kafka Consumer API, I wanted to have better
 control over the offset management, which is not possible with the Kafka
 high-level consumer.

 This Project is available in below Repo :

 https://github.com/dibbhatt/kafka-spark-consumer


 I have implemented a Custom Receiver,
 consumer.kafka.client.KafkaReceiver. The KafkaReceiver uses the low level Kafka
 Consumer API (implemented in the consumer.kafka packages) to fetch messages
 from Kafka and 'store' them in Spark.

 The logic will detect the number of partitions for a topic and spawn that
 many threads (individual instances of Consumers). The Kafka Consumer uses
 Zookeeper for storing the latest offset for individual partitions, which
 will help to recover in case of failure. The Kafka Consumer logic is
 tolerant to ZK failures, changes of the Kafka partition leader, Kafka broker
 failures, recovery from offset errors and other fail-over aspects.

 The consumer.kafka.client.Consumer is the sample Consumer which uses
 these Kafka Receivers to generate DStreams from Kafka and apply an output
 operation for every message of the RDD.

 We are planning to use this Kafka Spark Consumer to perform Near Real
 Time Indexing of Kafka Messages into a target search cluster and also Near
 Real Time Aggregation into a target NoSQL storage.

 Kindly let me know your view. Also, if this looks good, can I contribute
 it to the Spark Streaming project?

 Regards,
 Dibyendu







Low Level Kafka Consumer for Spark

2014-08-02 Thread Dibyendu Bhattacharya
Hi,

I have implemented a Low Level Kafka Consumer for Spark Streaming using the
Kafka Simple Consumer API. This API gives better control over Kafka
offset management and recovery from failures. As the present Spark
KafkaUtils uses the high-level Kafka Consumer API, I wanted to have better
control over the offset management, which is not possible with the Kafka
high-level consumer.

This Project is available in below Repo :

https://github.com/dibbhatt/kafka-spark-consumer


I have implemented a Custom Receiver, consumer.kafka.client.KafkaReceiver.
The KafkaReceiver uses the low level Kafka Consumer API (implemented in
the consumer.kafka packages) to fetch messages from Kafka and 'store' them in
Spark.

The logic will detect the number of partitions for a topic and spawn that many
threads (individual instances of Consumers). The Kafka Consumer uses Zookeeper
for storing the latest offset for individual partitions, which will help to
recover in case of failure. The Kafka Consumer logic is tolerant to ZK
failures, changes of the Kafka partition leader, Kafka broker failures,
recovery from offset errors and other fail-over aspects.
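
As an illustration only, a skeleton of the receiver-based approach described
above: a custom Receiver that spawns one fetcher thread per topic partition and
hands fetched messages to Spark via store(). This is a simplified placeholder,
not the actual KafkaReceiver code.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class PerPartitionReceiverSketch(numPartitions: Int)
  extends Receiver[Array[Byte]](StorageLevel.MEMORY_AND_DISK_SER) {

  def onStart(): Unit = {
    (0 until numPartitions).foreach { partitionId =>
      new Thread(s"partition-fetcher-$partitionId") {
        override def run(): Unit = {
          while (!isStopped()) {
            // fetch a batch for this partition (SimpleConsumer API, placeholder),
            // hand it to Spark, then persist the fetched offset in ZooKeeper
            val batch: Iterator[Array[Byte]] = fetchFromKafka(partitionId)
            store(batch)
          }
        }
      }.start()
    }
  }

  def onStop(): Unit = { /* fetcher threads exit via isStopped() */ }

  // placeholder for the SimpleConsumer fetch and offset bookkeeping
  private def fetchFromKafka(partitionId: Int): Iterator[Array[Byte]] = Iterator.empty
}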

The consumer.kafka.client.Consumer is the sample Consumer which uses these
Kafka Receivers to generate DStreams from Kafka and apply an output
operation for every message of the RDD.

We are planning to use this Kafka Spark Consumer to perform Near Real Time
Indexing of Kafka Messages into a target search cluster and also Near Real Time
Aggregation into a target NoSQL storage.

Kindly let me know your view. Also, if this looks good, can I contribute it to
the Spark Streaming project?

Regards,
Dibyendu