Re: Failure handling

2017-01-25 Thread Erwan ALLAIN
I agree.

We are try/catching streamingContext.awaitTermination(), and when an exception
occurs we stop the streaming context and call System.exit(50) (equal to
SparkUnhandledCode).

Sounds ok.
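
For reference, a minimal sketch of that pattern (SparkUnhandledCode = 50 is
our own exit-code convention, not a Spark constant, and the stream setup is
elided):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FailFastApp {
  val SparkUnhandledCode = 50 // our own convention for "unhandled streaming error"

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("fail-fast"), Seconds(1))
    // ... create the direct stream and register the output operations here ...
    ssc.start()
    try {
      // awaitTermination rethrows exceptions raised during stream processing
      ssc.awaitTermination()
    } catch {
      case e: Throwable =>
        ssc.stop(stopSparkContext = true, stopGracefully = false)
        System.exit(SparkUnhandledCode)
    }
  }
}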

On Tuesday, January 24, 2017, Cody Koeninger <c...@koeninger.org> wrote:

> Can you identify the error case and call System.exit ?  It'll get
> retried on another executor, but as long as that one fails the same
> way...
>
> If you can identify the error case at the time you're doing database
> interaction and just prevent data being written then, that's what I
> typically do.
>
> > On Tue, Jan 24, 2017 at 7:50 AM, Erwan ALLAIN <eallain.po...@gmail.com>
> > wrote:
> > Hello guys,
> >
> > I have a question regarding how Spark handles failures.
> >
> > I’m using kafka direct stream
> > Spark 2.0.2
> > Kafka 0.10.0.1
> >
> > Here is a snippet of code
> >
> > val stream = createDirectStream(….)
> >
> > stream
> >   .map(…)
> >   .foreachRDD(doSomething)
> >
> > stream
> >   .map(…)
> >   .foreachRDD(doSomethingElse)
> >
> > The execution is FIFO, so the first action ends and then the second one
> > starts; so far so good.
> > However, I would like the streaming context to be stopped immediately when
> > an error (fatal or not) occurs during the execution of the first action.
> > It's as if the driver were not notified of the exception and launched the
> > second action anyway.
> >
> > In our case, the second action performs checkpointing in an external
> > database, and we do not want to checkpoint if an error occurred before.
> > We do not want to rely on Spark checkpointing, as it causes issues when
> > upgrading the application.
> >
> > Let me know if it’s not clear !
> >
> > Thanks !
> >
> > Erwan
>


Failure handling

2017-01-24 Thread Erwan ALLAIN
Hello guys,

I have a question regarding how Spark handles failures.

I’m using kafka direct stream
Spark 2.0.2
Kafka 0.10.0.1

Here is a snippet of the code:

val stream = createDirectStream(….)

stream
  .map(…)
  .foreachRDD(doSomething)

stream
  .map(…)
  .foreachRDD(doSomethingElse)

The execution is FIFO, so the first action ends and then the second one
starts; so far so good.
However, I would like the streaming context to be stopped immediately when an
error (fatal or not) occurs during the execution of the first action.
It's as if the driver were not notified of the exception and launched the
second action anyway.

In our case, the second action performs checkpointing in an external database,
and we do not want to checkpoint if an error occurred before.
We do not want to rely on Spark checkpointing, as it causes issues when
upgrading the application.
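
To make the desired behaviour concrete, a rough sketch of the kind of guard
that would express it (purely illustrative; firstActionFailed is a
hypothetical driver-side flag, and doSomething/doSomethingElse are the actions
from the snippet above):

@volatile var firstActionFailed = false // lives in the driver program

stream
  .map(…)
  .foreachRDD { rdd =>
    try doSomething(rdd)
    catch { case e: Throwable => firstActionFailed = true; throw e } // remember the failure on the driver
  }

stream
  .map(…)
  .foreachRDD { rdd =>
    if (!firstActionFailed) doSomethingElse(rdd) // skip the external checkpoint if the first action failed
  }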

Let me know if it’s not clear !

Thanks !

Erwan


How to use logback

2016-11-28 Thread Erwan ALLAIN
Hello,
In my project, I would like to use Logback as the logging framework (faster,
smaller memory footprint, etc.).
I have managed to make it work, however I had to modify the Spark jars folder:
- remove slf4j-log4jxx.jar
- add logback-classic.jar / logback-core.jar

And add logback.xml to the conf folder.

Is this the right way to use Logback with Spark?

I tried to include the Logback dependencies in my uber jar, without success.
I just find it a bit dirty to modify the Spark binaries; it could be
troublesome in case of a version upgrade, for instance.
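
For reference, the kind of build setup I would prefer instead (just a sketch;
the version numbers are examples, and it assumes the
spark.driver.userClassPathFirst / spark.executor.userClassPathFirst settings
behave as documented):

// build.sbt (sketch)
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.1.7"   // example version
excludeDependencies += ExclusionRule("org.slf4j", "slf4j-log4j12")      // keep the log4j binding out of the uber jar

// then submit with:
//   --conf spark.driver.userClassPathFirst=true
//   --conf spark.executor.userClassPathFirst=true
// so that the classes in the uber jar take precedence over the ones shipped with Spark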

Thanks for your answers !


Application config management

2016-11-09 Thread Erwan ALLAIN
Hi everyone,

I'd like to know what kind of configuration mechanism is generally used.

Below is what I'm going to implement, but I'd like to know if there is any
"standard way":

1) put the configuration in HDFS
2) specify extraJavaOptions (driver and executor) with the HDFS URL
(hdfs://ip:port/config)
3) in the application code:
 - concatenate the HDFS path + config file name
 - download the file
 - load it with Typesafe Config
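
As an illustration of step 3, a sketch of the loading code (the -Dconfig.url
property name passed via extraJavaOptions and the file layout are just my
assumptions):

import java.io.InputStreamReader
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import com.typesafe.config.{Config, ConfigFactory}

def loadConfig(fileName: String): Config = {
  val baseUrl = sys.props("config.url")         // e.g. -Dconfig.url=hdfs://ip:port/config
  val uri     = new URI(s"$baseUrl/$fileName")  // concatenate hdfs path + config file
  val fs      = FileSystem.get(uri, new Configuration())
  val reader  = new InputStreamReader(fs.open(new Path(uri)))
  try ConfigFactory.parseReader(reader).resolve() // load with Typesafe Config
  finally reader.close()
}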

What do you think ?


Re: Kafka Direct Stream: Offset Managed Manually (Exactly Once)

2016-10-21 Thread Erwan ALLAIN
Thanks for the fast answer !

I just feel annoyed and frustrated at not being able to use Spark
checkpointing, because I believe that mechanism has been correctly tested.
I'm afraid that reinventing the wheel can lead to side effects that I don't
see yet...

Anyway thanks again, I know what I have to do :)

On Fri, Oct 21, 2016 at 5:05 PM, Cody Koeninger <c...@koeninger.org> wrote:

> 0.  If your processing time is regularly greater than your batch
> interval you're going to have problems anyway.  Investigate this more,
> set maxRatePerPartition, something.
> 1. That's personally what I tend to do.
> 2. Why are you relying on checkpoints if you're storing offset state
> in the database?  Just restart from the offsets in the database.  I
> think your solution of map of batchtime to offset ranges would work
> fine in that case, no?  (make sure to expire items from the map)
>
>
>
> On Fri, Oct 21, 2016 at 3:32 AM, Erwan ALLAIN <eallain.po...@gmail.com>
> wrote:
> > Hi,
> >
> > I'm currently implementing an exactly once mechanism based on the
> following
> > example:
> >
> > https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerBatch.scala
> >
> > The pseudo code is as follows:
> >
> > dstream.transform (store offset in a variable on driver side )
> > dstream.map
> > dstream.foreachRDD( action + save offsets in DB)
> >
> > This code doesn't work if the processing time is greater than the batch
> > interval (same problem as the windowed example:
> > https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/Windowed.scala).
> >
> > Indeed, at each batch interval a new RDD is created and queued, so the
> > transform method is called several times and updates the global variable;
> > by the time we save, the offset range no longer corresponds to the batch
> > actually processed.
> >
> > 1) Do I need to work at the RDD level (inside a big foreachRDD, like in the
> > first example) instead of at the DStream level?
> >
> > 2) I can use a Map[BatchTime, OffsetRange] as a global variable, but in
> > case of a crash this map will no longer reflect the generated RDDs
> > (restored from checkpoint, prepared but not yet executed).
> >   2.1) Do I need to store this map elsewhere (Cassandra)?
> >   2.2) Is there a way to retrieve the restored offset ranges? (The transform
> > method is not called anymore for the checkpointed RDDs.)
> >   2.3) Is it possible to store some context along with the RDD to be serialized?
> >
> > Lots of questions, let me know if it's not clear!
> >
>


Kafka Direct Stream: Offset Managed Manually (Exactly Once)

2016-10-21 Thread Erwan ALLAIN
Hi,

I'm currently implementing an exactly once mechanism based on the following
example:

https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerBatch.scala

The pseudo code is as follows:

dstream.transform (store offset in a variable on driver side )
dstream.map
dstream.foreachRDD( action + save offsets in DB)

This code doesn't work if the processing time is greater than the batch
interval (same problem as the windowed example:
https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/Windowed.scala).

Indeed, at each batch interval a new RDD is created and queued, so the
transform method is called several times and updates the global variable;
by the time we save, the offset range no longer corresponds to the batch
actually processed.

1) Do I need to work at the RDD level (inside a big foreachRDD, like in the
first example) instead of at the DStream level? (See the sketch after these
questions.)

2) I can use a Map[BatchTime, OffsetRange] as a global variable, but in case
of a crash this map will no longer reflect the generated RDDs (restored from
checkpoint, prepared but not yet executed).
  2.1) Do I need to store this map elsewhere (Cassandra)?
  2.2) Is there a way to retrieve the restored offset ranges? (The transform
method is not called anymore for the checkpointed RDDs.)
  2.3) Is it possible to store some context along with the RDD to be serialized?
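
To make question 1 concrete, the shape I have in mind when working inside a
single foreachRDD (a sketch; parse and the DB helper are hypothetical, and the
HasOffsetRanges import is the Kafka 0.8 integration one, kafka010 for the 0.10
integration):

import org.apache.spark.streaming.kafka.HasOffsetRanges

stream.foreachRDD { rdd =>
  // offsets of exactly this batch; must be read on the stream's own RDD, before any shuffle
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val results = rdd.map(parse).collect()                        // parse: hypothetical per-record function
  saveResultsAndOffsetsInOneTransaction(results, offsetRanges)  // hypothetical helper: one DB transaction
}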

Lots of questions, let me know if it's not clear!


Slow Shuffle Operation on Empty Batch

2016-09-26 Thread Erwan ALLAIN
Hi

I'm working with
- Kafka 0.8.2
- Spark Streaming (2.0) direct input stream.
- cassandra 3.0

My batch interval is 1s.

When I use map, filter, or even saveToCassandra, the processing time is
around 50 ms on empty batches.
 => This is fine.

As soon as I use reduceByKey, the processing time increases rapidly: between
3 and 4 s for 3 calls to reduceByKey on empty batches.
=> Not good.

I've found a workaround: call foreachRDD on the DStream and check whether the
RDD is empty before executing the reduceByKey, but I find this quite ugly.
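
For reference, the workaround looks like this (a sketch; the per-record
mapping and the keyspace/table names are placeholders, and saveToCassandra
comes from the spark-cassandra-connector):

import com.datastax.spark.connector._

stream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {                       // skip scheduling the shuffle jobs on empty batches
    rdd.map(toKeyValue)                       // toKeyValue: placeholder producing (key, value) pairs
       .reduceByKey(_ + _)
       .saveToCassandra("my_keyspace", "my_table")
  }
}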

Do I need to check whether the RDD is empty before every shuffle operation?

Thanks for your insights!


Re: Spark Streaming / Kafka Direct Approach: Dynamic Partitionning / Multi DC Spark

2016-04-19 Thread Erwan ALLAIN
Cody, you're right, that was an example. The target architecture would be 3
DCs :) Good point on ZK, I'll have to check that.

About Spark, both instances will run at the same time but on different
topics. It would be quite useless to have the 2 DCs working on the same set
of data.
I just want, in case of a crash, the healthy Spark cluster to work on all the
topics (i.e. pick up the dead cluster's load).

Does this design seem awkward?

On Tue, Apr 19, 2016 at 5:38 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Maybe I'm missing something, but I don't see how you get a quorum in only
> 2 datacenters (without splitbrain problem, etc).  I also don't know how
> well ZK will work cross-datacenter.
>
> As far as the spark side of things goes, if it's idempotent, why not just
> run both instances all the time.
>
>
>
> On Tue, Apr 19, 2016 at 10:21 AM, Erwan ALLAIN <eallain.po...@gmail.com>
> wrote:
>
>> I'm describing a disaster recovery but it can be used to make one
>> datacenter offline for upgrade for instance.
>>
>> From my point of view when DC2 crashes:
>>
>> *On Kafka side:*
> >> - the Kafka cluster will lose one or more brokers (partition leaders and
> >> replicas)
> >> - lost partition leaders will be re-elected in the remaining healthy DC
> >>
> >> => if the number of in-sync replicas is above the minimum threshold,
> >> Kafka should remain operational
> >>
> >> *On downstream datastore side (say Cassandra, for instance):*
> >> - deployed across the 2 DCs (QUORUM / QUORUM)
> >> - idempotent writes
> >>
> >> => it should be OK (depends on the replication factor)
> >>
> >> *On Spark:*
> >> - processing should be idempotent, which allows us to restart from the
> >> last committed offset
>>
>> I understand that starting up a post crash job would work.
>>
> >> The question is: how can we detect that DC2 has crashed, in order to
> >> start a new job?
> >>
> >> Could dynamic topic partitions (at each KafkaRDD creation, for instance)
> >> plus topic subscription be the answer?
>>
>> I appreciate your effort.
>>
>> On Tue, Apr 19, 2016 at 4:25 PM, Jason Nerothin <jasonnerot...@gmail.com>
>> wrote:
>>
> >>> Is the main concern uptime or disaster recovery?
>>>
>>> On Apr 19, 2016, at 9:12 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>
>>> I think the bigger question is what happens to Kafka and your downstream
>>> data store when DC2 crashes.
>>>
>>> From a Spark point of view, starting up a post-crash job in a new data
>>> center isn't really different from starting up a post-crash job in the
>>> original data center.
>>>
>>> On Tue, Apr 19, 2016 at 3:32 AM, Erwan ALLAIN <eallain.po...@gmail.com>
>>> wrote:
>>>
> >>>> Thanks Jason and Cody. I'll try to explain the multi-DC case a bit
> >>>> better.
> >>>>
> >>>> As I mentioned before, I'm planning to use one Kafka cluster and 2 or
> >>>> more distinct Spark clusters.
>>>>
> >>>> Let's say we have the following DC configuration in the nominal case.
> >>>> Kafka partitions are consumed uniformly by the 2 datacenters.
> >>>>
> >>>> DataCenter | Spark      | Kafka consumer group | Kafka partition (P1 to P4)
> >>>> DC 1       | Master 1.1 |                      |
> >>>>            | Worker 1.1 | my_group             | P1
> >>>>            | Worker 1.2 | my_group             | P2
> >>>> DC 2       | Master 2.1 |                      |
> >>>>            | Worker 2.1 | my_group             | P3
> >>>>            | Worker 2.2 | my_group             | P4
> >>>>
> >>>> I would like, in case of a DC crash, a rebalancing of the partitions
> >>>> onto the healthy DC, something as follows:
> >>>>
> >>>> DataCenter | Spark      | Kafka consumer group | Kafka partition (P1 to P4)
> >>>> DC 1       | Master 1.1 |                      |
> >>>>            | Worker 1.1 | my_group             | P1*, P3*
> >>>>            | Worker 1.2 | my_group             | P2*, P4*
> >>>> DC 2       | Master 2.1 |                      |
> >>>>            | Worker 2.1 | my_group             | P3
> >>>>            | Worker 2.2 | my_group             | P4
>>>>
>>>> I would like to know if it's possible:
>>>> - using consumer group ?
>>>> - using direct approach ? I prefer this one as I don't want to activate
>>>> WAL.
>>>>
>>>> Hope the explanation is better !
>>>>
>>>>
>>>> On Mon, Apr 18, 2016 at 6:50 PM, Cody Koeninger <c...@koeninger.org>
>>>> wrote:
>>>>
>>>>> The current direct stream only handles exactly the partitions
>>>>> specified at startup.  You'd have to restart the job if you changed
>>>>> partitions.
>>>>>
>>>>> https://issues.apache.

Re: Spark Streaming / Kafka Direct Approach: Dynamic Partitionning / Multi DC Spark

2016-04-19 Thread Erwan ALLAIN
I'm describing a disaster recovery but it can be used to make one
datacenter offline for upgrade for instance.

From my point of view, when DC2 crashes:

*On Kafka side:*
- the Kafka cluster will lose one or more brokers (partition leaders and replicas)
- lost partition leaders will be re-elected in the remaining healthy DC

=> if the number of in-sync replicas is above the minimum threshold, Kafka
should remain operational

*On downstream datastore side (say Cassandra, for instance):*
- deployed across the 2 DCs (QUORUM / QUORUM)
- idempotent writes

=> it should be OK (depends on the replication factor)

*On Spark:*
- processing should be idempotent, which allows us to restart from the
last committed offset

I understand that starting up a post-crash job would work.

The question is: how can we detect that DC2 has crashed, in order to start a
new job?

Could dynamic topic partitions (at each KafkaRDD creation, for instance) plus
topic subscription be the answer?

I appreciate your effort.

On Tue, Apr 19, 2016 at 4:25 PM, Jason Nerothin <jasonnerot...@gmail.com>
wrote:

> Is the main concern uptime or disaster recovery?
>
> On Apr 19, 2016, at 9:12 AM, Cody Koeninger <c...@koeninger.org> wrote:
>
> I think the bigger question is what happens to Kafka and your downstream
> data store when DC2 crashes.
>
> From a Spark point of view, starting up a post-crash job in a new data
> center isn't really different from starting up a post-crash job in the
> original data center.
>
> On Tue, Apr 19, 2016 at 3:32 AM, Erwan ALLAIN <eallain.po...@gmail.com>
> wrote:
>
>> Thanks Jason and Cody. I'll try to explain the multi-DC case a bit better.
>>
>> As I mentioned before, I'm planning to use one Kafka cluster and 2 or
>> more distinct Spark clusters.
>>
>> Let's say we have the following DC configuration in the nominal case.
>> Kafka partitions are consumed uniformly by the 2 datacenters.
>>
>> DataCenter | Spark      | Kafka consumer group | Kafka partition (P1 to P4)
>> DC 1       | Master 1.1 |                      |
>>            | Worker 1.1 | my_group             | P1
>>            | Worker 1.2 | my_group             | P2
>> DC 2       | Master 2.1 |                      |
>>            | Worker 2.1 | my_group             | P3
>>            | Worker 2.2 | my_group             | P4
>>
>> I would like, in case of a DC crash, a rebalancing of the partitions onto
>> the healthy DC, something as follows:
>>
>> DataCenter | Spark      | Kafka consumer group | Kafka partition (P1 to P4)
>> DC 1       | Master 1.1 |                      |
>>            | Worker 1.1 | my_group             | P1*, P3*
>>            | Worker 1.2 | my_group             | P2*, P4*
>> DC 2       | Master 2.1 |                      |
>>            | Worker 2.1 | my_group             | P3
>>            | Worker 2.2 | my_group             | P4
>>
>> I would like to know if it's possible:
>> - using consumer group ?
>> - using direct approach ? I prefer this one as I don't want to activate
>> WAL.
>>
>> Hope the explanation is better !
>>
>>
>> On Mon, Apr 18, 2016 at 6:50 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> The current direct stream only handles exactly the partitions
>>> specified at startup.  You'd have to restart the job if you changed
>>> partitions.
>>>
>>> https://issues.apache.org/jira/browse/SPARK-12177 has the ongoing work
>>> towards using the kafka 0.10 consumer, which would allow for dynamic
>>> topic partitions
>>>
>>> Regarding your multi-DC questions, I'm not really clear on what you're
>>> saying.
>>>
>>> On Mon, Apr 18, 2016 at 11:18 AM, Erwan ALLAIN <eallain.po...@gmail.com>
>>> wrote:
>>> > Hello,
>>> >
>>> > I'm currently designing a solution where 2 distinct clusters Spark (2
>>> > datacenters) share the same Kafka (Kafka rack aware or manual broker
>>> > repartition).
>>> > The aims are
>>> > - preventing DC crash: using kafka resiliency and consumer group
>>> mechanism
>>> > (or else ?)
>>> > - keeping consistent offset among replica (vs mirror maker,which does
>>> not
>>> > keep offset)
>>> >
>>> > I have several questions
>>> >
>>> > 1) Dynamic repartition (one or 2 DC)
>>> >
>>> > I'm using KafkaDirectStream which map one partition kafka with one
>>> spark. Is
>>> > it possible to handle new or removed partition ?
>>> > In the compute method, it looks like we are always using the
>>> currentOffset
>>> > map to query the next batch and therefore it's always the same number
>>> of
>>> > partition ? Can we request metadata at each batch ?
>>> >
>>> > 2) Multi DC Spark
>>> >
>>> > Using Direct approach, a way to achieve this would be
>>> > - to "assign" (kafka 0.9 term) all topics to the 2 sparks
>>> > - only one is reading the partition (Check every x interval, "lock"
>>> stored
>>> > in cassandra for instance)
>>> >
>>> > => not sure if it works just an idea
>>> >
>>> > Using Consumer Group
>>> > - CommitOffset manually at the end of the batch
>>> >
>>> > => Does spark handle partition rebalancing ?
>>> >
>>> > I'd appreciate any ideas ! Let me know if it's not clear.
>>> >
>>> > Erwan
>>> >
>>> >
>>>
>>
>>
>
>


Re: Spark Streaming / Kafka Direct Approach: Dynamic Partitionning / Multi DC Spark

2016-04-19 Thread Erwan ALLAIN
Thanks Jason and Cody. I'll try to explain the multi-DC case a bit better.

As I mentioned before, I'm planning to use one Kafka cluster and 2 or more
distinct Spark clusters.

Let's say we have the following DC configuration in the nominal case.
Kafka partitions are consumed uniformly by the 2 datacenters.

DataCenter | Spark      | Kafka consumer group | Kafka partition (P1 to P4)
DC 1       | Master 1.1 |                      |
           | Worker 1.1 | my_group             | P1
           | Worker 1.2 | my_group             | P2
DC 2       | Master 2.1 |                      |
           | Worker 2.1 | my_group             | P3
           | Worker 2.2 | my_group             | P4

I would like, in case of a DC crash, a rebalancing of the partitions onto the
healthy DC, something as follows:

DataCenter | Spark      | Kafka consumer group | Kafka partition (P1 to P4)
DC 1       | Master 1.1 |                      |
           | Worker 1.1 | my_group             | P1*, P3*
           | Worker 1.2 | my_group             | P2*, P4*
DC 2       | Master 2.1 |                      |
           | Worker 2.1 | my_group             | P3
           | Worker 2.2 | my_group             | P4

I would like to know if it's possible:
- using a consumer group?
- using the direct approach? I prefer this one, as I don't want to activate the WAL.

Hope the explanation is better !


On Mon, Apr 18, 2016 at 6:50 PM, Cody Koeninger <c...@koeninger.org> wrote:

> The current direct stream only handles exactly the partitions
> specified at startup.  You'd have to restart the job if you changed
> partitions.
>
> https://issues.apache.org/jira/browse/SPARK-12177 has the ongoing work
> towards using the kafka 0.10 consumer, which would allow for dynamic
> topic partitions
>
> Regarding your multi-DC questions, I'm not really clear on what you're
> saying.
>
> On Mon, Apr 18, 2016 at 11:18 AM, Erwan ALLAIN <eallain.po...@gmail.com>
> wrote:
> > Hello,
> >
> > I'm currently designing a solution where 2 distinct clusters Spark (2
> > datacenters) share the same Kafka (Kafka rack aware or manual broker
> > repartition).
> > The aims are
> > - preventing DC crash: using kafka resiliency and consumer group
> mechanism
> > (or else ?)
> > - keeping consistent offset among replica (vs mirror maker,which does not
> > keep offset)
> >
> > I have several questions
> >
> > 1) Dynamic repartition (one or 2 DC)
> >
> > I'm using KafkaDirectStream, which maps one Kafka partition to one Spark
> > partition. Is it possible to handle new or removed partitions?
> > In the compute method, it looks like we are always using the
> currentOffset
> > map to query the next batch and therefore it's always the same number of
> > partition ? Can we request metadata at each batch ?
> >
> > 2) Multi DC Spark
> >
> > Using Direct approach, a way to achieve this would be
> > - to "assign" (kafka 0.9 term) all topics to the 2 sparks
> > - only one is reading the partition (Check every x interval, "lock"
> stored
> > in cassandra for instance)
> >
> > => not sure if it works just an idea
> >
> > Using Consumer Group
> > - CommitOffset manually at the end of the batch
> >
> > => Does spark handle partition rebalancing ?
> >
> > I'd appreciate any ideas ! Let me know if it's not clear.
> >
> > Erwan
> >
> >
>


Spark Streaming / Kafka Direct Approach: Dynamic Partitionning / Multi DC Spark

2016-04-18 Thread Erwan ALLAIN
Hello,

I'm currently designing a solution where 2 distinct Spark clusters (in 2
datacenters) share the same Kafka cluster (Kafka rack-aware or manual broker
repartition).
The aims are:
- surviving a DC crash: using Kafka resiliency and the consumer group
mechanism (or something else?)
- keeping offsets consistent among replicas (unlike MirrorMaker, which does
not keep offsets)

I have several questions

1) Dynamic repartitioning (one or 2 DCs)

I'm using KafkaDirectStream, which maps one Kafka partition to one Spark partition.
Is it possible to handle new or removed partitions?
In the compute method, it looks like we always use the currentOffsets map to
query the next batch, and therefore it's always the same number of
partitions? Can we request metadata at each batch?

2) Multi-DC Spark

*Using the direct approach,* a way to achieve this would be:
- "assign" (Kafka 0.9 term) all topics to both Spark clusters
- only one of them actually reads the partitions (check every X interval, with
a "lock" stored in Cassandra for instance)

=> not sure if it works, just an idea

*Using a consumer group:*
- commit offsets manually at the end of each batch

=> Does Spark handle partition rebalancing?

I'd appreciate any ideas ! Let me know if it's not clear.

Erwan


Re: Join and HashPartitioner question

2015-11-16 Thread Erwan ALLAIN
You may need to persist r1 after the partitionBy call; the second join will
then be more efficient.
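
In other words, something like this (a sketch, reusing the r1/r2 and
partitioner from the code quoted below):

val r1Partitioned = r1.partitionBy(partitioner).persist() // keep the shuffled layout around
val res1 = r1Partitioned.join(r2.partitionBy(partitioner))
// any further join on r1Partitioned reuses its partitioning instead of reshuffling r1 again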

On Mon, Nov 16, 2015 at 2:48 PM, Rishi Mishra  wrote:

> AFAIK and can see in the code both of them should behave same.
>
> On Sat, Nov 14, 2015 at 2:10 AM, Alexander Pivovarov  > wrote:
>
>> Hi Everyone
>>
>> Is there any difference in performance btw the following two joins?
>>
>>
>> val r1: RDD[(String, String)] = ???
>> val r2: RDD[(String, String)] = ???
>>
>> val partNum = 80
>> val partitioner = new HashPartitioner(partNum)
>>
>> // Join 1
>> val res1 = r1.partitionBy(partitioner).join(r2.partitionBy(partitioner))
>>
>> // Join 2
>> val res2 = r1.join(r2, partNum)
>>
>>
>>
>
>
> --
> Regards,
> Rishitesh Mishra,
> SnappyData . (http://www.snappydata.io/)
>
> https://in.linkedin.com/in/rishiteshmishra
>


Re: Saving offset while reading from kafka

2015-10-23 Thread Erwan ALLAIN
Have a look at this: https://github.com/koeninger/kafka-exactly-once

especially:
https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerBatch.scala
https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerPartition.scala
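
The part relevant to resuming is the createDirectStream overload that takes
explicit offsets (a sketch for the Kafka 0.8 direct API; loadOffsetsFromDb,
ssc and kafkaParams are assumed to exist on your side):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// offsets previously saved in your external DB (hypothetical helper)
val fromOffsets: Map[TopicAndPartition, Long] = loadOffsetsFromDb()

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message()))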

On Fri, Oct 23, 2015 at 5:07 AM, Ramkumar V  wrote:

> Hi,
>
> I have written a Spark Streaming application using a Kafka stream, and it
> writes to HDFS every hour (the batch time). I would like to know how to get
> or commit the offset of the Kafka stream while writing to HDFS, so that if
> there is any issue or a redeployment, I'll start from the point of the
> previous successful offset commit. I want to store the offset in an external
> DB or something like that, not in ZooKeeper. If I want to resume the Kafka
> stream from a particular offset, how do I resume from that offset in Spark?
>
> *Thanks*,
> 
>
>


Re: Best practices to handle corrupted records

2015-10-16 Thread Erwan ALLAIN
Either[FailureResult[T], Either[SuccessWithWarnings[T], SuccessResult[T]]]
maybe ?
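
For what it's worth, a rough sketch of how that nested shape could be
consumed (the field names below are just placeholders):

case class SuccessResult[T](value: T)
case class SuccessWithWarnings[T](value: T, warnings: Seq[String])
case class FailureResult[T](error: String)

type Outcome[T] = Either[FailureResult[T], Either[SuccessWithWarnings[T], SuccessResult[T]]]

def extracted[T](o: Outcome[T]): Option[T] = o match {
  case Right(Right(ok))    => Some(ok.value)
  case Right(Left(warned)) => Some(warned.value) // success; warnings can be collected separately
  case Left(_)             => None               // failure; log or count it elsewhere
}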


On Thu, Oct 15, 2015 at 5:31 PM, Antonio Murgia <
antonio.murg...@studio.unibo.it> wrote:

> 'Either' does not cover the case where the outcome was successful but
> generated warnings. I already looked into it and also at 'Try' from which I
> got inspired. Thanks for pointing it out anyway!
>
> #A.M.
>
> On 15 Oct 2015, at 16:19, Erwan ALLAIN <
> eallain.po...@gmail.com> wrote:
>
> What about http://www.scala-lang.org/api/2.9.3/scala/Either.html ?
>
>
> On Thu, Oct 15, 2015 at 2:57 PM, Roberto Congiu <roberto.con...@gmail.com>
> wrote:
>
>> I came to a similar solution for a similar problem. I deal with a lot of
>> CSV files from many different sources and they are often malformed.
>> However, I just have success/failure. Maybe you should make
>> SuccessWithWarnings a subclass of Success, or get rid of it altogether and
>> make the warnings optional.
>> I was thinking of making this cleaning/conforming library open source if
>> you're interested.
>>
>> R.
>>
>> 2015-10-15 5:28 GMT-07:00 Antonio Murgia <antonio.murg...@studio.unibo.it
>> >:
>>
>>> Hello,
>>> I looked around on the web and I couldn’t find any way to deal in a
>>> structured way with malformed/faulty records during computation. All I was
>>> able to find was the flatMap/Some/None technique + logging.
>>> I’m facing this problem because I have a processing algorithm that
>>> extracts more than one value from each record, but can fail in extracting
>>> one of those multiple values, and I want to keep track of them. Logging is
>>> not feasible because this “warning” happens so frequently that the logs
>>> would become overwhelming and impossible to read.
>>> Since I have 3 different possible outcomes from my processing I modeled
>>> it with this class hierarchy:
>>> That holds result and/or warnings.
>>> Since Result implements Traversable it can be used in a flatMap,
>>> discarding all warnings and failure results; on the other hand, if we want
>>> to keep track of warnings, we can process them and output them if we need to.
>>>
>>> Kind Regards
>>> #A.M.
>>>
>>
>>
>>
>> --
>> --
>> "Good judgment comes from experience.
>> Experience comes from bad judgment"
>> --
>>
>
>


Re: Best practices to handle corrupted records

2015-10-15 Thread Erwan ALLAIN
What about http://www.scala-lang.org/api/2.9.3/scala/Either.html ?


On Thu, Oct 15, 2015 at 2:57 PM, Roberto Congiu 
wrote:

> I came to a similar solution for a similar problem. I deal with a lot of
> CSV files from many different sources and they are often malformed.
> However, I just have success/failure. Maybe you should make
> SuccessWithWarnings a subclass of Success, or get rid of it altogether and
> make the warnings optional.
> I was thinking of making this cleaning/conforming library open source if
> you're interested.
>
> R.
>
> 2015-10-15 5:28 GMT-07:00 Antonio Murgia 
> :
>
>> Hello,
>> I looked around on the web and I couldn’t find any way to deal in a
>> structured way with malformed/faulty records during computation. All I was
>> able to find was the flatMap/Some/None technique + logging.
>> I’m facing this problem because I have a processing algorithm that
>> extracts more than one value from each record, but can fail in extracting
>> one of those multiple values, and I want to keep track of them. Logging is
>> not feasible because this “warning” happens so frequently that the logs
>> would become overwhelming and impossible to read.
>> Since I have 3 different possible outcomes from my processing I modeled
>> it with this class hierarchy:
>> That holds result and/or warnings.
>> Since Result implements Traversable it can be used in a flatMap,
>> discarding all warnings and failure results; on the other hand, if we want
>> to keep track of warnings, we can process them and output them if we need to.
>>
>> Kind Regards
>> #A.M.
>>
>
>
>
> --
> --
> "Good judgment comes from experience.
> Experience comes from bad judgment"
> --
>


Re: Can KafkaCluster be made public?

2015-10-07 Thread Erwan ALLAIN
Thanks guys !
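
For anyone landing on this thread later, a sketch of the package-namespace
workaround mentioned below (it relies on KafkaCluster being private[spark]
internals, so expect upgrade pain):

// placed in your own source tree, under Spark's package, to reach the private[spark] class
package org.apache.spark.streaming.kafka

object KafkaClusterAccess {
  def apply(kafkaParams: Map[String, String]): KafkaCluster = new KafkaCluster(kafkaParams)
}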

On Wed, Oct 7, 2015 at 1:41 AM, Cody Koeninger <c...@koeninger.org> wrote:

> Sure no prob.
>
> On Tue, Oct 6, 2015 at 6:35 PM, Tathagata Das <t...@databricks.com> wrote:
>
>> Given the interest, I am also inclined towards making it a public
>> developer API. Maybe even experimental. Cody, mind submitting a patch?
>>
>>
>> On Tue, Oct 6, 2015 at 7:45 AM, Sean Owen <so...@cloudera.com> wrote:
>>
>>> For what it's worth, I also use this class in an app, but it happens
>>> to be from Java code where it acts as if it's public. So no problem
>>> for my use case, but I suppose, another small vote for the usefulness
>>> of this class to the caller. I end up using getLatestLeaderOffsets to
>>> figure out how to initialize initial offsets.
>>>
>>> On Tue, Oct 6, 2015 at 3:24 PM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>> > I personally think KafkaCluster (or the equivalent) should be made
>>> public.
>>> > When I'm deploying spark I just sed out the private[spark] and rebuild.
>>> >
>>> > There's a general reluctance to make things public due to backwards
>>> > compatibility, but if enough people ask for it... ?
>>> >
>>> > On Tue, Oct 6, 2015 at 6:51 AM, Jonathan Coveney <jcove...@gmail.com>
>>> wrote:
>>> >>
>>> >> You can put a class in the org.apache.spark namespace to access
>>> anything
>>> >> that is private[spark]. You can then make enrichments there to access
>>> >> whatever you need. Just beware upgrade pain :)
>>> >>
>>> >>
>>> >> El martes, 6 de octubre de 2015, Erwan ALLAIN <
>>> eallain.po...@gmail.com>
>>> >> escribió:
>>> >>>
>>> >>> Hello,
>>> >>>
>>> >>> I'm currently testing spark streaming with kafka.
>>> >>> I'm creating DirectStream with KafkaUtils and everything's fine.
>>> However
>>> >>> I would like to use the signature where I can specify my own message
>>> handler
>>> >>> (to play with partition and offset). In this case, I need to manage
>>> >>> offset/partition by myself to fill fromOffsets argument.
>>> >>> I have found a Jira on this usecase
>>> >>> https://issues.apache.org/jira/browse/SPARK-6714 but it has been
>>> closed
>>> >>> telling that it's too specific.
>>> >>> I'm aware that it can be done using kafka api (TopicMetaDataRequest
>>> and
>>> >>> OffsetRequest) but what I have to do is almost the same as the
>>> KafkaCluster
>>> >>> which is private.
>>> >>>
>>> >>> is it possible to :
>>> >>>  - add another signature in KafkaUtils ?
>>> >>>  - make KafkaCluster public ?
>>> >>>
>>> >>> or do you have any other smart solution where I don't need to
>>> copy/paste
>>> >>> KafkaCluster ?
>>> >>>
>>> >>> Thanks.
>>> >>>
>>> >>> Regards,
>>> >>> Erwan ALLAIN
>>> >
>>> >
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Can KafkaCluster be made public?

2015-10-06 Thread Erwan ALLAIN
Hello,

I'm currently testing Spark Streaming with Kafka.
I'm creating a DirectStream with KafkaUtils and everything's fine. However, I
would like to use the signature where I can specify my own message handler
(to play with partitions and offsets). In this case, I need to manage
offsets/partitions myself to fill the fromOffsets argument.
I have found a JIRA on this use case,
https://issues.apache.org/jira/browse/SPARK-6714, but it has been closed as
too specific.
I'm aware that it can be done using the Kafka API (TopicMetadataRequest and
OffsetRequest), but what I would have to do is almost the same as
KafkaCluster, which is private.

Is it possible to:
 - add another signature to KafkaUtils?
 - make KafkaCluster public?

Or do you have any other smart solution where I don't need to copy/paste
KafkaCluster?

Thanks.

Regards,
Erwan ALLAIN