Re: can spark take advantage of ordered data?

2017-03-10 Thread sourabh chaki
My use case is also quite similar. I have 2 feeds, one 3TB and the other
100GB. Both feeds are generated by a Hadoop reduce operation and partitioned
by the Hadoop HashPartitioner. The 3TB feed has 10K partitions whereas the
100GB feed has 200 partitions.

Now when I do a join between these two feeds using Spark, Spark shuffles
both RDDs and it takes a long time to complete. Can we do something so that
Spark recognises the existing partitions of the 3TB feed and shuffles only
the 100GB feed? Could it be a map-side scan of the bigger RDD and a shuffle
read from the smaller RDD?
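A hedged sketch of the idea, assuming an active SparkContext sc and that both
feeds are key-value SequenceFiles (names and paths are placeholders): Spark
only skips the shuffle side of a join when an RDD already carries a
Partitioner, and partitioning information from a Hadoop job is lost on read.
One option is to pay the shuffle once, keep the big RDD partitioned and
cached, and let every later join move only the small side:

import org.apache.spark.HashPartitioner

val bigFeed   = sc.sequenceFile[String, String]("hdfs://.../big")    // 3TB, 10K partitions
val smallFeed = sc.sequenceFile[String, String]("hdfs://.../small")  // 100GB, 200 partitions

val partitioner = new HashPartitioner(10000)          // match the 3TB feed's layout
val big   = bigFeed.partitionBy(partitioner).cache()  // one up-front shuffle, then reusable
val small = smallFeed.partitionBy(partitioner)        // shuffles only the small feed
val joined = big.join(small)                          // co-partitioned: no further shuffle of big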

I have looked at spark-sorted project, but that project does not utilise
the pre-existing partitions in the feed.
Any pointer will be helpful.

Thanks
Sourabh

On Thu, Mar 12, 2015 at 6:35 AM, Imran Rashid <iras...@cloudera.com> wrote:

> Hi Jonathan,
>
> you might be interested in https://issues.apache.org/jira/browse/SPARK-3655
> (not yet available) and https://github.com/tresata/spark-sorted (not part of
> spark, but it is available right now). Hopefully that's what you are looking
> for. To the best of my knowledge that covers what is available now / what is
> being worked on.
>
> Imran
>
> On Wed, Mar 11, 2015 at 4:38 PM, Jonathan Coveney <jcove...@gmail.com>
> wrote:
>
>> Hello all,
>>
>> I am wondering if spark already has support for optimizations on sorted
>> data and/or if such support could be added (I am comfortable dropping to a
>> lower level if necessary to implement this, but I'm not sure if it is
>> possible at all).
>>
>> Context: we have a number of data sets which are essentially already
>> sorted on a key. With our current systems, we can take advantage of this to
>> do a lot of analysis in a very efficient fashion...merges and joins, for
>> example, can be done very efficiently, as can folds on a secondary key and
>> so on.
>>
>> I was wondering if spark would be a fit for implementing these sorts of
>> optimizations? Obviously it is sort of a niche case, but would this be
>> achievable? Any pointers on where I should look?
>>
>
>
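One building block that does exist in core Spark (1.2+) and is close to what
Jonathan describes for folds on sorted data: repartitionAndSortWithinPartitions
shuffles once and sorts each partition by key, after which per-key aggregates
run as a single streaming pass. A hedged sketch, assuming an active
SparkContext sc; events and its path are placeholders:

import org.apache.spark.HashPartitioner

val events = sc.sequenceFile[String, Long]("hdfs://.../events")
val sorted = events.repartitionAndSortWithinPartitions(new HashPartitioner(200))

// consecutive records in each partition share a key, so we can fold
// per key without materializing a whole group in memory
val perKeySums = sorted.mapPartitions { iter =>
  val out = scala.collection.mutable.ArrayBuffer.empty[(String, Long)]
  var cur: String = null
  var sum = 0L
  for ((k, v) <- iter) {
    if (k == cur) sum += v
    else { if (cur != null) out += ((cur, sum)); cur = k; sum = v }
  }
  if (cur != null) out += ((cur, sum))
  out.iterator
}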


Re: spark.streaming.kafka.maxRatePerPartition for direct stream

2015-10-02 Thread Sourabh Chandak
Thanks Cody, will try to do some estimation.

Thanks Nicolae, will try out this config.

Thanks,
Sourabh

On Thu, Oct 1, 2015 at 11:01 PM, Nicolae Marasoiu <
nicolae.maras...@adswizz.com> wrote:

> Hi,
>
>
> Set 10ms and spark.streaming.backpressure.enabled=true
>
>
> This should automatically delay the next batch until the current one is
> processed, or at least create that balance over a few batches/periods
> between the consume/process rate vs ingestion rate.
>
>
> Nicu
>
> --
> *From:* Cody Koeninger <c...@koeninger.org>
> *Sent:* Thursday, October 1, 2015 11:46 PM
> *To:* Sourabh Chandak
> *Cc:* user
> *Subject:* Re: spark.streaming.kafka.maxRatePerPartition for direct stream
>
> That depends on your job, your cluster resources, the number of seconds
> per batch...
>
> You'll need to do some empirical work to figure out how many messages per
> batch a given executor can handle.  Divide that by the number of seconds
> per batch.
>
>
>
> On Thu, Oct 1, 2015 at 3:39 PM, Sourabh Chandak <sourabh3...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I am writing a Spark streaming job using the direct stream method for
>> Kafka and wanted to handle the case of checkpoint failure, when we'll have
>> to reprocess the entire data from the start. By default, for every new
>> checkpoint it tries to load everything from each partition, which takes a
>> lot of time to process. After some searching I found that there is a
>> config, spark.streaming.kafka.maxRatePerPartition, which can be used to
>> tackle this. My question is: what would be a suitable range for this
>> config if we have ~12 million messages in Kafka with a maximum message
>> size of ~10 MB?
>>
>> Thanks,
>> Sourabh
>>
>
>
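A hypothetical back-of-envelope version of Cody's estimation above; none of
these figures come from the thread. Suppose empirical testing shows the
cluster handles about 200,000 messages per 10-second batch across 32 Kafka
partitions:

val msgsPerBatch = 200000L   // measured cluster-wide capacity per batch
val batchSeconds = 10L
val partitions   = 32L
val maxRatePerPartition =
  msgsPerBatch / batchSeconds / partitions   // = 625 messages/sec/partition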


Checkpointing is super slow

2015-10-02 Thread Sourabh Chandak
Hi,

I have a receiverless Kafka streaming job which was started yesterday
evening and was running fine till 4 PM today. Suddenly after that,
checkpoint writes slowed down and the job can no longer catch up with the
incoming data. We are using the DSE stack with Spark 1.2 and Cassandra for
checkpointing. Spark streaming is done using backported code.

Running nodetool shows that the read latency of the cfs keyspace is ~8.5 ms.

Can someone please help me resolve this?

Thanks,
Sourabh


Re: Checkpointing is super slow

2015-10-02 Thread Sourabh Chandak
I can see the entries processed in the table very fast but after that it
takes a long time for the checkpoint update.

Haven't tried other methods of checkpointing yet, we are using DSE on Azure.

Thanks,
Sourabh

On Fri, Oct 2, 2015 at 6:52 AM, Cody Koeninger <c...@koeninger.org> wrote:

> Why are you sure it's checkpointing speed?
>
> Have you compared it against checkpointing to hdfs, s3, or local disk?
>
> On Fri, Oct 2, 2015 at 1:17 AM, Sourabh Chandak <sourabh3...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I have a receiverless Kafka streaming job which was started yesterday
>> evening and was running fine till 4 PM today. Suddenly after that,
>> checkpoint writes slowed down and the job can no longer catch up with the
>> incoming data. We are using the DSE stack with Spark 1.2 and Cassandra for
>> checkpointing. Spark streaming is done using backported code.
>>
>> Running nodetool shows that the read latency of the cfs keyspace is ~8.5
>> ms.
>>
>> Can someone please help me resolve this?
>>
>> Thanks,
>> Sourabh
>>
>>
>
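A minimal way to run Cody's comparison above, assuming ssc is the job's
StreamingContext; the URIs are placeholders, and only one checkpoint
directory is active at a time:

ssc.checkpoint("file:///tmp/ckpt")              // local-disk baseline
// ssc.checkpoint("hdfs://namenode:8020/ckpt")  // vs HDFS
// ssc.checkpoint("cfs://cassandra-host/ckpt")  // vs DSE CFS (the current setup)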


Re: Checkpointing is super slow

2015-10-02 Thread Sourabh Chandak
Tried using local checkpointing as well, and even that becomes slow after
some time. Any idea what can be wrong?

Thanks,
Sourabh

On Fri, Oct 2, 2015 at 9:35 AM, Sourabh Chandak <sourabh3...@gmail.com>
wrote:

> I can see the entries processed in the table very fast but after that it
> takes a long time for the checkpoint update.
>
> Haven't tried other methods of checkpointing yet, we are using DSE on
> Azure.
>
> Thanks,
> Sourabh
>
> On Fri, Oct 2, 2015 at 6:52 AM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> Why are you sure it's checkpointing speed?
>>
>> Have you compared it against checkpointing to hdfs, s3, or local disk?
>>
>> On Fri, Oct 2, 2015 at 1:17 AM, Sourabh Chandak <sourabh3...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I have a receiverless Kafka streaming job which was started yesterday
>>> evening and was running fine till 4 PM today. Suddenly after that,
>>> checkpoint writes slowed down and the job can no longer catch up with the
>>> incoming data. We are using the DSE stack with Spark 1.2 and Cassandra for
>>> checkpointing. Spark streaming is done using backported code.
>>>
>>> Running nodetool shows that the read latency of the cfs keyspace is ~8.5
>>> ms.
>>>
>>> Can someone please help me resolve this?
>>>
>>> Thanks,
>>> Sourabh
>>>
>>>
>>
>


Re: Checkpointing is super slow

2015-10-02 Thread Sourabh Chandak
The offset checkpoints (partition, offset), when using the Kafka direct
streaming approach.

On Friday, October 2, 2015, Tathagata Das <t...@databricks.com> wrote:

> Which checkpointing are you talking about? DStream checkpoints (which save
> the DAG of DStreams, that is, only metadata), or RDD checkpointing (which
> saves the actual intermediate RDD data)?
>
> TD
>
> On Fri, Oct 2, 2015 at 2:56 PM, Sourabh Chandak <sourabh3...@gmail.com> wrote:
>
>> Tried using local checkpointing as well, and even that becomes slow after
>> some time. Any idea what can be wrong?
>>
>> Thanks,
>> Sourabh
>>
>> On Fri, Oct 2, 2015 at 9:35 AM, Sourabh Chandak <sourabh3...@gmail.com> wrote:
>>
>>> I can see the entries processed in the table very fast but after that it
>>> takes a long time for the checkpoint update.
>>>
>>> Haven't tried other methods of checkpointing yet, we are using DSE on
>>> Azure.
>>>
>>> Thanks,
>>> Sourabh
>>>
>>> On Fri, Oct 2, 2015 at 6:52 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>
>>>> Why are you sure it's checkpointing speed?
>>>>
>>>> Have you compared it against checkpointing to hdfs, s3, or local disk?
>>>>
>>>> On Fri, Oct 2, 2015 at 1:17 AM, Sourabh Chandak <sourabh3...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have a receiverless Kafka streaming job which was started yesterday
>>>>> evening and was running fine till 4 PM today. Suddenly after that,
>>>>> checkpoint writes slowed down and the job can no longer catch up with
>>>>> the incoming data. We are using the DSE stack with Spark 1.2 and
>>>>> Cassandra for checkpointing. Spark streaming is done using backported
>>>>> code.
>>>>>
>>>>> Running nodetool shows that the read latency of the cfs keyspace is
>>>>> ~8.5 ms.
>>>>>
>>>>> Can someone please help me resolve this?
>>>>>
>>>>> Thanks,
>>>>> Sourabh
>>>>>
>>>>>
>>>>
>>>
>>
>


spark.streaming.kafka.maxRatePerPartition for direct stream

2015-10-01 Thread Sourabh Chandak
Hi,

I am writing a Spark streaming job using the direct stream method for Kafka
and wanted to handle the case of checkpoint failure, when we'll have to
reprocess the entire data from the start. By default, for every new
checkpoint it tries to load everything from each partition, which takes a
lot of time to process. After some searching I found that there is a config,
spark.streaming.kafka.maxRatePerPartition, which can be used to tackle this.
My question is: what would be a suitable range for this config if we have
~12 million messages in Kafka with a maximum message size of ~10 MB?

Thanks,
Sourabh
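A minimal sketch of the rate-limiting knob from the question above, plus the
backpressure setting suggested in the replies, assuming a 10-second batch;
the actual per-partition rate has to come from empirical testing:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("direct-stream-rate-limited")
  // cap each Kafka partition at this many messages per second
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
  // let Spark adapt ingestion to observed processing speed (Spark 1.5+)
  .set("spark.streaming.backpressure.enabled", "true")
val ssc = new StreamingContext(conf, Seconds(10))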


Re: Adding / Removing worker nodes for Spark Streaming

2015-09-28 Thread Sourabh Chandak
I also have the same use case as Augustus, and have some basic questions
about recovery from checkpoint. I have a 10-node Kafka cluster and a 30-node
Spark cluster running a streaming job; how is the (topic, partition) data
handled in checkpointing? The scenario I want to understand is: in case of
node failure, how will a new node know the checkpoint of the failed node?
The amount of data we have is huge and we can't run from the smallest
offset.

Thanks,
Sourabh

On Mon, Sep 28, 2015 at 11:43 AM, Augustus Hong <augus...@branchmetrics.io>
wrote:

> Got it, thank you!
>
>
> On Mon, Sep 28, 2015 at 11:37 AM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> Losing worker nodes without stopping is definitely possible.  I haven't
>> had much success adding workers to a running job, but I also haven't spent
>> much time on it.
>>
>> If you're restarting with the same jar, you should be able to recover
>> from checkpoint without losing data (usual caveats apply, e.g. you need
>> enough kafka retention).  Make sure to test it though, as the code paths
>> taken during recovery from checkpoint are not the same as on initial
>> startup, and you can run into unexpected issues (e.g. authentication).
>>
>> On Mon, Sep 28, 2015 at 1:27 PM, Augustus Hong <augus...@branchmetrics.io
>> > wrote:
>>
>>> Hey all,
>>>
>>> I'm evaluating using Spark Streaming with Kafka direct streaming, and I
>>> have a couple of questions:
>>>
>>> 1.  Would it be possible to add / remove worker nodes without stopping
>>> and restarting the spark streaming driver?
>>>
>>> 2.  I understand that we can enable checkpointing to recover from node
>>> failures, and that it doesn't work across code changes.  What about in the
>>> event that worker nodes failed due to load -> we added more worker nodes ->
>>> restart Spark Streaming?  Would this incur data loss as well?
>>>
>>>
>>> Best,
>>> Augustus
>>>
>>> --
>>> *Augustus Hong*
>>>  Data Analytics | Branch Metrics
>>>  m 650-391-3369 | e augus...@branch.io
>>>
>>
>>
>
>
> --
> *Augustus Hong*
>  Data Analytics | Branch Metrics
>  m 650-391-3369 | e augus...@branch.io
>
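A minimal sketch of the checkpoint-recovery pattern Cody describes above,
assuming the checkpoint directory lives on storage every node can reach; the
path, app name, and placeholder source are illustrative:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs://namenode:8020/ckpt"

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("recoverable-stream")
  val ssc  = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  // build the Kafka direct stream and the rest of the DAG here; a
  // placeholder source keeps this sketch self-contained:
  ssc.socketTextStream("localhost", 9999).count().print()
  ssc
}

// on a clean start this calls createContext(); after a failure the driver
// rebuilds the context (including direct-stream offsets) from the checkpoint
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()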


Re: ERROR BoundedByteBufferReceive: OOME with size 352518400

2015-09-24 Thread Sourabh Chandak
Here is the code snippet, starting line 365 in KafkaCluster.scala:

type Err = ArrayBuffer[Throwable]

/** If the result is right, return it, otherwise throw SparkException */
def checkErrors[T](result: Either[Err, T]): T = {
  result.fold(
    errs => throw new SparkException(errs.mkString("Throwing this error\n")),
    ok => ok
  )
}



On Thu, Sep 24, 2015 at 3:00 PM, Sourabh Chandak <sourabh3...@gmail.com>
wrote:

> I was able to get past this issue. I was pointing at the SSL port, whereas
> SimpleConsumer should point to the PLAINTEXT port. But after fixing that I
> am getting the following error:
>
> Exception in thread "main" org.apache.spark.SparkException:
> java.nio.BufferUnderflowException
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
> at scala.util.Either.fold(Either.scala:97)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:309)
> at
> org.ofe.weve.test.KafkaTest$.setupProcessingContext(KafkaTest.scala:36)
> at org.ofe.weve.test.KafkaTest$.main(KafkaTest.scala:59)
> at org.ofe.weve.test.KafkaTest.main(KafkaTest.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> Thanks,
> Sourabh
>
> On Thu, Sep 24, 2015 at 2:04 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> That looks like the OOM is in the driver, when getting partition metadata
>> to create the direct stream.  In that case, executor memory allocation
>> doesn't matter.
>>
>> Allocate more driver memory, or put a profiler on it to see what's taking
>> up heap.
>>
>>
>>
>> On Thu, Sep 24, 2015 at 3:51 PM, Sourabh Chandak <sourabh3...@gmail.com>
>> wrote:
>>
>>> Adding Cody and Sriharsha
>>>
>>> On Thu, Sep 24, 2015 at 1:25 PM, Sourabh Chandak <sourabh3...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have ported receiverless spark streaming for Kafka to Spark 1.2 and
>>>> am trying to run a spark streaming job to consume data from my broker,
>>>> but I am getting the following error:
>>>>
>>>> 15/09/24 20:17:45 ERROR BoundedByteBufferReceive: OOME with size
>>>> 352518400
>>>> java.lang.OutOfMemoryError: Java heap space
>>>> at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
>>>> at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>>>> at
>>>> kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
>>>> at
>>>> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
>>>> at
>>>> kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>>> at
>>>> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>>>> at
>>>> kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>>>> at
>>>> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:83)
>>>> at
>>>> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:80)
>>>> at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:103)
>>>> at
>>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:126)
>>>> at
>>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:125)
>>>> at
>>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:346)
>>>> at
>>>> org.apache.spark.stre

Re: ERROR BoundedByteBufferReceive: OOME with size 352518400

2015-09-24 Thread Sourabh Chandak
I was able to get past this issue. I was pointing at the SSL port, whereas
SimpleConsumer should point to the PLAINTEXT port. But after fixing that I
am getting the following error:

Exception in thread "main" org.apache.spark.SparkException:
java.nio.BufferUnderflowException
at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at scala.util.Either.fold(Either.scala:97)
at
org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
at
org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:309)
at
org.ofe.weve.test.KafkaTest$.setupProcessingContext(KafkaTest.scala:36)
at org.ofe.weve.test.KafkaTest$.main(KafkaTest.scala:59)
at org.ofe.weve.test.KafkaTest.main(KafkaTest.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Thanks,
Sourabh
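To make the port fix above concrete, a hypothetical kafkaParams sketch: the
direct stream's SimpleConsumer speaks plaintext, so metadata.broker.list must
name the brokers' PLAINTEXT listener (9092 here), not their SSL listener:

val kafkaParams = Map(
  "metadata.broker.list" -> "broker1:9092,broker2:9092"  // PLAINTEXT port, not the SSL one
)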

On Thu, Sep 24, 2015 at 2:04 PM, Cody Koeninger <c...@koeninger.org> wrote:

> That looks like the OOM is in the driver, when getting partition metadata
> to create the direct stream.  In that case, executor memory allocation
> doesn't matter.
>
> Allocate more driver memory, or put a profiler on it to see what's taking
> up heap.
>
>
>
> On Thu, Sep 24, 2015 at 3:51 PM, Sourabh Chandak <sourabh3...@gmail.com>
> wrote:
>
>> Adding Cody and Sriharsha
>>
>> On Thu, Sep 24, 2015 at 1:25 PM, Sourabh Chandak <sourabh3...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I have ported receiverless spark streaming for Kafka to Spark 1.2 and
>>> am trying to run a spark streaming job to consume data from my broker,
>>> but I am getting the following error:
>>>
>>> 15/09/24 20:17:45 ERROR BoundedByteBufferReceive: OOME with size
>>> 352518400
>>> java.lang.OutOfMemoryError: Java heap space
>>> at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
>>> at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>>> at
>>> kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
>>> at
>>> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
>>> at
>>> kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>> at
>>> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>>> at
>>> kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>>> at
>>> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:83)
>>> at
>>> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:80)
>>> at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:103)
>>> at
>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:126)
>>> at
>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:125)
>>> at
>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:346)
>>> at
>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
>>> at
>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>> at
>>> scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>>> at org.apache.spark.streaming.kafka.KafkaCluster.org
>>> $apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
>>> at
>>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
>>> at
>>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
>>> at
>>> org.apache.spark.streaming.kafka.KafkaUti

ERROR BoundedByteBufferReceive: OOME with size 352518400

2015-09-24 Thread Sourabh Chandak
Hi,

I have ported receiverless spark streaming for Kafka to Spark 1.2 and am
trying to run a spark streaming job to consume data from my broker, but I
am getting the following error:

15/09/24 20:17:45 ERROR BoundedByteBufferReceive: OOME with size 352518400
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at
kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
at
kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:83)
at
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:80)
at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:103)
at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:126)
at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:125)
at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:346)
at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at
scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at org.apache.spark.streaming.kafka.KafkaCluster.org
$apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
at
org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
at
org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
at
org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:296)
at
org.ofe.weve.test.KafkaTest$.setupProcessingContext(KafkaTest.scala:35)
at org.ofe.weve.test.KafkaTest$.main(KafkaTest.scala:58)
at org.ofe.weve.test.KafkaTest.main(KafkaTest.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)



I have tried allocating 100G of memory with 1 executor but it is still
failing.

Spark version: 1.2.2
Kafka version ported: 0.8.2
Kafka server version: trunk version with SSL enabled

Can someone please help me debug this.

Thanks,
Sourabh


Re: ERROR BoundedByteBufferReceive: OOME with size 352518400

2015-09-24 Thread Sourabh Chandak
Adding Cody and Sriharsha

On Thu, Sep 24, 2015 at 1:25 PM, Sourabh Chandak <sourabh3...@gmail.com>
wrote:

> Hi,
>
> I have ported receiverless spark streaming for Kafka to Spark 1.2 and am
> trying to run a spark streaming job to consume data from my broker, but I
> am getting the following error:
>
> 15/09/24 20:17:45 ERROR BoundedByteBufferReceive: OOME with size 352518400
> java.lang.OutOfMemoryError: Java heap space
> at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
> at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
> at
> kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
> at
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
> at
> kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> at
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> at
> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:83)
> at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:80)
> at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:103)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:126)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:125)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:346)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at
> scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
> at org.apache.spark.streaming.kafka.KafkaCluster.org
> $apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
> at
> org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
> at
> org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:296)
> at
> org.ofe.weve.test.KafkaTest$.setupProcessingContext(KafkaTest.scala:35)
> at org.ofe.weve.test.KafkaTest$.main(KafkaTest.scala:58)
> at org.ofe.weve.test.KafkaTest.main(KafkaTest.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>
>
> I have tried allocating 100G of memory with 1 executor but it is still
> failing.
>
> Spark version: 1.2.2
> Kafka version ported: 0.8.2
> Kafka server version: trunk version with SSL enabled
>
> Can someone please help me debug this.
>
> Thanks,
> Sourabh
>


Re: SSL between Kafka and Spark Streaming API

2015-08-28 Thread Sourabh Chandak
Can we use the existing Kafka spark streaming jar to connect to a Kafka
server running in SSL mode?

We are fine with a non-SSL consumer, as our Kafka cluster and spark cluster
are in the same network.


Thanks,
Sourabh

On Fri, Aug 28, 2015 at 12:03 PM, Gwen Shapira g...@confluent.io wrote:

 I can't speak for the Spark Community, but checking their code,
 DirectKafkaStream and KafkaRDD use the SimpleConsumer API:


 https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala

 https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala

 On Fri, Aug 28, 2015 at 11:32 AM, Cassa L lcas...@gmail.com wrote:

  Hi I am using below Spark jars with Direct Stream API.
spark-streaming-kafka_2.10
 
  When I look at its pom.xml, Kafka libraries that its pulling in is
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka_${scala.binary.version}</artifactId>
 <version>0.8.2.1</version>
 
 
  I believe this DirectStream API uses SimpleConsumer API. Can someone from
  Spark community confirm too?
 
  Thanks,
  LCassa.
 
  On Fri, Aug 28, 2015 at 11:12 AM, Sriharsha Chintalapani 
 ka...@harsha.io
  wrote:
 
    SSL is supported for the new producer and consumer API; the old API
    (simple consumer and high-level consumer) is not supported.
    I think Spark uses the simple consumer? If so, it's not supported.
  
   Thanks,
   Harsha
  
  
   On August 28, 2015 at 11:00:30 AM, Cassa L (lcas...@gmail.com) wrote:
  
   Hi,
   I was going through SSL setup of Kafka.
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/Deploying+SSL+for+Kafka
   However, I am also using Spark-Kafka streaming to read data from Kafka.
   Is there a way to activate SSL for the Spark streaming API, or is it not
   possible at all?
  
   Thanks,
   LCassa
  
  
 



Re: Reliable Streaming Receiver

2015-08-05 Thread Sourabh Chandak
Thanks Tathagata. I tried that, but BlockGenerator internally uses
SystemClock, which is again private.

We are using DSE, so we are stuck with Spark 1.2 and hence can't use the
receiver-less version. Is it possible to use the same code as a separate API
with 1.2?

Thanks,
Sourabh

On Wed, Aug 5, 2015 at 6:13 PM, Tathagata Das t...@databricks.com wrote:

  You could very easily strip out the BlockGenerator code from the Spark
 source code and use it directly in the same way the Reliable Kafka Receiver
 uses it. BTW, you should know that we will be deprecating the receiver
 based approach for the Direct Kafka approach. That is quite flexible, can
 give exactly-once guarantee without WAL, and is more robust and performant.
 Consider using it.


 On Wed, Aug 5, 2015 at 5:48 PM, Sourabh Chandak sourabh3...@gmail.com
 wrote:

 Hi,

 I am trying to replicate the Kafka Streaming Receiver for a custom
 version of Kafka and want to create a Reliable receiver. The current
 implementation uses BlockGenerator which is a private class inside Spark
 streaming hence I can't use that in my code. Can someone help me with some
 resources to tackle this issue?



 Thanks,
 Sourabh





Reliable Streaming Receiver

2015-08-05 Thread Sourabh Chandak
Hi,

I am trying to replicate the Kafka Streaming Receiver for a custom version
of Kafka and want to create a Reliable receiver. The current implementation
uses BlockGenerator which is a private class inside Spark streaming hence I
can't use that in my code. Can someone help me with some resources to
tackle this issue?



Thanks,
Sourabh


Re: JAVA_HOME problem

2015-04-28 Thread sourabh chaki
I was able to solve this problem by hard-coding JAVA_HOME inside the
org.apache.spark.deploy.yarn.Client.scala class, replacing

val commands = prefixEnv ++ Seq(
  YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server")

with

val commands = prefixEnv ++ Seq("/usr/java/jdk1.7.0_51/bin/java", "-server")

Somehow {{JAVA_HOME}} was not getting resolved on the node running the YARN
container. This change fixed the problem. Now I am getting a new error:

Container: container_1430123808466_36297_02_01
===
LogType: stderr
LogLength: 87
Log Contents:
Error: Could not find or load main class
org.apache.spark.deploy.yarn.ExecutorLauncher

LogType: stdout
LogLength: 0
Log Contents:

Looks like now the classpath variables are not resolved on the YARN node. I
have MapReduce jobs running in the same cluster working without any problem.
Any pointer why this could happen?


Thanks

Sourabh


On Fri, Apr 24, 2015 at 3:52 PM, sourabh chaki chaki.sour...@gmail.com
wrote:

 Yes Akhil. This is the same issue. I have updated my comment in that
 ticket.

 Thanks
 Sourabh

 On Fri, Apr 24, 2015 at 12:02 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Isn't this related to this
 https://issues.apache.org/jira/browse/SPARK-6681

 Thanks
 Best Regards

 On Fri, Apr 24, 2015 at 11:40 AM, sourabh chaki chaki.sour...@gmail.com
 wrote:

 I am also facing the same problem with spark 1.3.0 in both yarn-client and
 yarn-cluster mode. Launching the yarn container failed and this is the error
 in stderr:

 Container: container_1429709079342_65869_01_01

 ===
 LogType: stderr
 LogLength: 61
 Log Contents:
 /bin/bash: {{JAVA_HOME}}/bin/java: No such file or directory

 LogType: stdout
 LogLength: 0
 Log Contents:

 I have added JAVA_HOME in hadoop-env.sh as well spark-env.sh
 grep JAVA_HOME /etc/hadoop/conf.cloudera.yarn/hadoop-env.sh
 export JAVA_HOME=/usr/java/default
 export PATH=$PATH:$JAVA_HOME/bin/java

 grep JAVA_HOME /var/spark/spark-1.3.0-bin-hadoop2.4/conf/spark-env.sh
 export JAVA_HOME=/usr/java/default

 I could see another thread for the same problem but I don't see any
 solution.

 http://stackoverflow.com/questions/29170280/java-home-error-with-upgrade-to-spark-1-3-0
  Any pointer will be helpful.

 Thanks
 Sourabh


 On Thu, Apr 2, 2015 at 1:23 PM, 董帅阳 917361...@qq.com wrote:

 spark 1.3.0


 spark@pc-zjqdyyn1:~ tail /etc/profile
 export JAVA_HOME=/usr/jdk64/jdk1.7.0_45
 export PATH=$PATH:$JAVA_HOME/bin

 #
 # End of /etc/profile
  #


 But ERROR LOG

 Container: container_1427449644855_0092_02_01 on pc-zjqdyy04_45454
 
 LogType: stderr
 LogLength: 61
 Log Contents:
 /bin/bash: {{JAVA_HOME}}/bin/java: No such file or directory

 LogType: stdout
 LogLength: 0
  Log Contents:







Re: JAVA_HOME problem

2015-04-24 Thread sourabh chaki
Yes Akhil. This is the same issue. I have updated my comment in that ticket.

Thanks
Sourabh

On Fri, Apr 24, 2015 at 12:02 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Isn't this related to this
 https://issues.apache.org/jira/browse/SPARK-6681

 Thanks
 Best Regards

 On Fri, Apr 24, 2015 at 11:40 AM, sourabh chaki chaki.sour...@gmail.com
 wrote:

 I am also facing the same problem with spark 1.3.0 in both yarn-client and
 yarn-cluster mode. Launching the yarn container failed and this is the error
 in stderr:

 Container: container_1429709079342_65869_01_01

 ===
 LogType: stderr
 LogLength: 61
 Log Contents:
 /bin/bash: {{JAVA_HOME}}/bin/java: No such file or directory

 LogType: stdout
 LogLength: 0
 Log Contents:

 I have added JAVA_HOME in hadoop-env.sh as well spark-env.sh
 grep JAVA_HOME /etc/hadoop/conf.cloudera.yarn/hadoop-env.sh
 export JAVA_HOME=/usr/java/default
 export PATH=$PATH:$JAVA_HOME/bin/java

 grep JAVA_HOME /var/spark/spark-1.3.0-bin-hadoop2.4/conf/spark-env.sh
 export JAVA_HOME=/usr/java/default

 I could see another thread for the same problem but I don't see any
 solution.

 http://stackoverflow.com/questions/29170280/java-home-error-with-upgrade-to-spark-1-3-0
  Any pointer will be helpful.

 Thanks
 Sourabh


 On Thu, Apr 2, 2015 at 1:23 PM, 董帅阳 917361...@qq.com wrote:

 spark 1.3.0


 spark@pc-zjqdyyn1:~ tail /etc/profile
 export JAVA_HOME=/usr/jdk64/jdk1.7.0_45
 export PATH=$PATH:$JAVA_HOME/bin

 #
 # End of /etc/profile
  #


 But ERROR LOG

 Container: container_1427449644855_0092_02_01 on pc-zjqdyy04_45454
 
 LogType: stderr
 LogLength: 61
 Log Contents:
 /bin/bash: {{JAVA_HOME}}/bin/java: No such file or directory

 LogType: stdout
 LogLength: 0
  Log Contents:






Re: JAVA_HOME problem

2015-04-24 Thread sourabh chaki
I am also facing the same problem with spark 1.3.0 in both yarn-client and
yarn-cluster mode. Launching the yarn container failed and this is the error
in stderr:

Container: container_1429709079342_65869_01_01
===
LogType: stderr
LogLength: 61
Log Contents:
/bin/bash: {{JAVA_HOME}}/bin/java: No such file or directory

LogType: stdout
LogLength: 0
Log Contents:

I have added JAVA_HOME in hadoop-env.sh as well spark-env.sh
grep JAVA_HOME /etc/hadoop/conf.cloudera.yarn/hadoop-env.sh
export JAVA_HOME=/usr/java/default
export PATH=$PATH:$JAVA_HOME/bin/java

grep JAVA_HOME /var/spark/spark-1.3.0-bin-hadoop2.4/conf/spark-env.sh
export JAVA_HOME=/usr/java/default

I could see another thread for the same problem but I don't see any
solution.
http://stackoverflow.com/questions/29170280/java-home-error-with-upgrade-to-spark-1-3-0
 Any pointer will be helpful.

Thanks
Sourabh


On Thu, Apr 2, 2015 at 1:23 PM, 董帅阳 917361...@qq.com wrote:

 spark 1.3.0


 spark@pc-zjqdyyn1:~ tail /etc/profile
 export JAVA_HOME=/usr/jdk64/jdk1.7.0_45
 export PATH=$PATH:$JAVA_HOME/bin

 #
 # End of /etc/profile
  #


 But ERROR LOG

 Container: container_1427449644855_0092_02_01 on pc-zjqdyy04_45454
 
 LogType: stderr
 LogLength: 61
 Log Contents:
 /bin/bash: {{JAVA_HOME}}/bin/java: No such file or directory

 LogType: stdout
 LogLength: 0
  Log Contents:



Re: train many decision trees with a single spark job

2015-01-13 Thread sourabh chaki
Hi Josh,

I was trying out a decision tree ensemble using bagging. Here I am splitting
the input using randomSplit and training a tree for each split. Here is
sample code:

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.rdd.RDD

val bags: Int = 10
val models: Array[DecisionTreeModel] =
  training.randomSplit(Array.fill(bags)(1.0 / bags)).map { data =>
    // numClasses = 2, no categorical features, gini impurity, depth 5, 32 bins
    DecisionTree.trainClassifier(toLabeledPoints(data), 2, Map[Int, Int](), "gini", 5, 32)
  }

def toLabeledPoints(data: RDD[Double]): RDD[LabeledPoint] = {
  ??? // convert the raw data RDD to a LabeledPoint RDD
}

For your case, I think, you need custom logic to split the dataset.

Thanks
Sourabh


On Tue, Jan 13, 2015 at 3:55 PM, Sean Owen so...@cloudera.com wrote:

 OK, I still wonder whether it's not better to make one big model. The
 usual assumption is that the user's identity isn't predictive per se.
 If every customer in your shop is truly unlike the others, most
 predictive analytics goes out the window. It's factors like our
 location, income, etc that are predictive and there aren't a million
 of those.

 But let's say it's so and you really need 1M RDDs. I think I'd just
 repeatedly filter the source RDD. That really won't be the slow step.
 I think the right way to do it is to create a list of all user IDs on
 the driver, turn it into a parallel collection (and override the # of
 threads it uses on the driver to something reasonable) and map each
 one to the result of filtering and modeling that user subset.
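A hedged sketch of the parallel-collection approach Sean describes, assuming
data is a hypothetical RDD[(String, LabeledPoint)] keyed by user ID; the tree
parameters are illustrative defaults:

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.DecisionTree
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

// collect the user IDs to the driver, then map a parallel collection over
// them so several small modeling jobs are submitted concurrently
val userIds = data.keys.distinct().collect().toSeq.par
userIds.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))  // cap concurrency

val models = userIds.map { id =>
  val subset = data.filter { case (user, _) => user == id }.values
  id -> DecisionTree.trainClassifier(subset, 2, Map[Int, Int](), "gini", 5, 32)
}.seq.toMap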

 The problem is just the overhead of scheduling millions and millions
 of tiny modeling jobs. It will still probably take a long time. Could
 be fine if you still have millions of data points per user. It's even
 appropriate. But then the challenge here is that you're processing
 trillions of data points! That will be fun.

 I think any distributed system is overkill and not designed for the
 case where data fits into memory. You can always take a local
 collection and call parallelize to make it into an RDD, so in that
 sense Spark can handle a tiny data set if you really want.

 I'm still not sure I've seen a case where you want to partition by
 user but trust you really need that.

 On Tue, Jan 13, 2015 at 1:30 AM, Josh Buffum jbuf...@gmail.com wrote:
  You are right... my code example doesn't work :)
 
  I actually do want a decision tree per user. So, for 1 million users, I
  want 1 million trees. We're training against time series data, so there are
  still quite a few data points per user. My previous message where I
  mentioned RDDs with no length was, I think, a result of the way the random
  partitioning worked (I was partitioning into N groups where N was the
  number of users... total).
 
  Given this, I'm thinking MLlib is not designed for this particular case?
  It appears optimized for training across large datasets. I was just hoping
  to leverage it since creating my feature sets for the users was already in
  Spark.
 
 
  On Mon, Jan 12, 2015 at 5:05 PM, Sean Owen so...@cloudera.com wrote:
 
  A model partitioned by users?
 
  I mean that if you have a million users surely you don't mean to build a
  million models. There would be little data per user right? Sounds like
 you
  have 0 sometimes.
 
  You would typically be generalizing across users not examining them in
  isolation. Models are built on thousands or millions of data points.
 
  I assumed you were subsetting for cross validation in which case we are
  talking about making more like say 10 models. You usually take random
  subsets. But it might be as fine to subset as a function of a user ID
 if you
  like. Or maybe you do have some reason for segregating users and
 modeling
  them differently (e.g. different geographies or something).
 
  Your code doesn't work as is since you are using RDDs inside RDDs. But I
  am also not sure you should do what it looks like you are trying to do.
 
  On Jan 13, 2015 12:32 AM, Josh Buffum jbuf...@gmail.com wrote:
 
  Sean,
 
  Thanks for the response. Is there some subtle difference between one
  model partitioned by N users or N models per each 1 user? I think I'm
  missing something with your question.
 
   Looping through the RDD filtering one user at a time would certainly give
   me the response that I am hoping for (i.e. a map of user => decision tree),
   however, that seems like it would yield poor performance? The userIDs are
   not integers, so I either need to iterate through some in-memory array of
   them (could be quite large) or have some distributed lookup table. Neither
   seems great.
 
   I tried the random split thing. I wonder if I did something wrong there,
   but some of the splits got RDDs with 0 tuples and some got RDDs with more
   than 1 tuple. I guess that's to be expected with some random distribution?
   However, that won't work for me since it breaks the one tree per user
   thing. I guess I could randomly distribute user IDs and then do the scan
   everything and filter step...
 
  How bad of an idea is it to do:
 
   data.groupByKey.map( kvp => {
     val (key, data) = kvp
     val tree

Re: MLLIB model export: PMML vs MLLIB serialization

2014-12-15 Thread sourabh
Thanks Vincenzo.
Are you trying out all the models implemented in mllib? Actually I don't
see decision tree there. Sorry if I missed it. When are you planning to
merge this to spark branch?

Thanks
Sourabh

On Sun, Dec 14, 2014 at 5:54 PM, selvinsource [via Apache Spark User List] 
ml-node+s1001560n20674...@n3.nabble.com wrote:

 Hi Sourabh,

 have a look at https://issues.apache.org/jira/browse/SPARK-1406, I am
 looking into exporting models in PMML using JPMML.

 Regards,
 Vincenzo
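The SPARK-1406 work referenced above eventually shipped in Spark 1.4, where
linear models and KMeans (though not decision trees) gained a PMMLExportable
trait. A minimal sketch, assuming Spark 1.4+ and a hypothetical RDD[Vector]
named vectors:

import org.apache.spark.mllib.clustering.KMeans

val model = KMeans.train(vectors, 3, 20)   // k = 3, 20 iterations
model.toPMML("/tmp/kmeans.xml")            // local-path variant of toPMML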







Re: Serialize mllib's MatrixFactorizationModel

2014-12-15 Thread sourabh chaki
Hi Albert,
There is some discussion going on here:
http://apache-spark-user-list.1001560.n3.nabble.com/MLLIB-model-export-PMML-vs-MLLIB-serialization-tc20324.html#a20674
I am also looking for this solution, but it looks like until MLlib PMML
export is ready, there is no foolproof way to export an MLlib-trained model
to a different system.

Thanks
Sourabh
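One pragmatic workaround, sketched under assumptions (model is a trained
MatrixFactorizationModel; paths are placeholders): persist the two factor
RDDs and score from them directly, since the MatrixFactorizationModel
constructor is private and the model itself wraps RDDs.

model.userFeatures.saveAsObjectFile("hdfs://.../userFeatures")
model.productFeatures.saveAsObjectFile("hdfs://.../productFeatures")

// later, in another Spark application with SparkContext sc:
val userFeatures    = sc.objectFile[(Int, Array[Double])]("hdfs://.../userFeatures")
val productFeatures = sc.objectFile[(Int, Array[Double])]("hdfs://.../productFeatures")

// score one user (id 42) against all products: dot product of factor vectors
val uVec = userFeatures.lookup(42).head
val scores = productFeatures.mapValues { pVec =>
  uVec.zip(pVec).map { case (a, b) => a * b }.sum
}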

On Mon, Dec 15, 2014 at 10:39 PM, Albert Manyà alber...@eml.cc wrote:

 In that case, what is the strategy to train a model in some background
 batch process and make recommendations for some other service in real
 time? Run both processes in the same spark cluster?

 Thanks.

 --
   Albert Manyà
   alber...@eml.cc

 On Mon, Dec 15, 2014, at 05:58 PM, Sean Owen wrote:
  This class is not going to be serializable, as it contains huge RDDs.
  Even if the right constructor existed the RDDs inside would not
  serialize.
 
  On Mon, Dec 15, 2014 at 4:33 PM, Albert Manyà alber...@eml.cc wrote:
   Hi all.
  
   I'm willing to serialize and later load a model trained using mllib's
   ALS.
  
   I've tried usign Java serialization with something like:
  
   val model = ALS.trainImplicit(training, rank, numIter, lambda, 1)
    val fos = new FileOutputStream("model.bin")
   val oos = new ObjectOutputStream(fos)
   oos.writeObject(bestModel.get)
  
   But when I try to deserialize it using:
  
    val fis = new FileInputStream("model.bin")
    val ois = new ObjectInputStream(fis)
    val model = ois.readObject().asInstanceOf[MatrixFactorizationModel]
  
I get the error:
  
    Exception in thread "main" java.io.IOException: PARSING_ERROR(2)
  
    I've also tried to serialize both of MatrixFactorizationModel's RDDs
    (products and users) and later create the MatrixFactorizationModel by
    hand, passing the RDDs to the constructor, but I get an error because
    it's private:
  
   Error:(58, 17) constructor MatrixFactorizationModel in class
   MatrixFactorizationModel cannot be accessed in object RecommendALS
   val model = new MatrixFactorizationModel (8, userFeatures,
   productFeatures)
  
   Any ideas?
  
   Thanks!
  
   --
 Albert Manyà
 alber...@eml.cc
  




MLLIB model export: PMML vs MLLIB serialization

2014-12-03 Thread sourabh
Hi All,
I am doing model training using Spark MLlib inside our Hadoop cluster, but
prediction happens in a different real-time synchronous system (a web
application). I am currently exploring different options for exporting the
trained MLlib models from Spark.

   1. *Export model as PMML:* I found the projects under JPMML: Java PMML
API (https://github.com/jpmml) quite interesting. Use JPMML
(https://github.com/jpmml/jpmml) to convert the MLlib model entity to PMML,
and use the PMML evaluator (https://github.com/jpmml/jpmml-evaluator) for
prediction in a different system. Or we could also explore the openscoring
REST API (https://github.com/jpmml/openscoring) for model deployment and
prediction.

This could be the standard approach if we need to port models across
different systems, but converting non-linear MLlib models to PMML might be a
complex task. Apart from that, I would need to keep updating my
MLlib-to-PMML conversion code for any new MLlib models or any change in
MLlib entities. I have not evaluated any of these JPMML projects personally,
and I see there is only a single contributor to these projects. Just
wondering if enough people have already started using them. Please share if
any of you have any points on this.

   2. *Export MLLIB model as serialized form:* MLlib models can be serialized
using Kryo serialization
http://mail-archives.apache.org/mod_mbox/spark-user/201407.mbox/%3CCAFRXrqdpkfCX41=JyTSmmtt8aNWrSdpJvxE3FmYVZ=uuepe...@mail.gmail.com%3E
or normal Java serialization
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-save-mllib-model-to-hdfs-and-reload-it-td11953.html
and the same model can be deserialized by other standalone applications,
which can then use the MLlib entity for prediction. This blog
http://blog.knoldus.com/2014/07/21/play-with-spark-building-spark-mllib-in-a-play-spark-application/
shows an example of how Spark MLlib can be used inside a Play web
application. I expect I can use Spark MLlib in any other JVM-based web
application the same way(?). Please share if anyone has experience with
this. (A minimal sketch of this approach follows after this list.)
   Advantages of this approach:
 - No recurring effort to support a new model or a change in an MLlib model
entity in a future version.
 - Less dependency on other tools.
   Disadvantages:
 - The model cannot be ported to a non-JVM system.
 - A model serialized with one version of an MLlib entity may not be
deserializable with a different version(?).
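A minimal sketch of option 2 for a model that is a plain local object, such
as a decision tree; MatrixFactorizationModel, by contrast, wraps RDDs and
cannot be exported this way (see the serialization thread above). `model` is
assumed to be an already-trained DecisionTreeModel, and both sides must run
compatible MLlib versions:

import java.io.{FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
import org.apache.spark.mllib.tree.model.DecisionTreeModel

// in the Spark training job:
val oos = new ObjectOutputStream(new FileOutputStream("model.bin"))
oos.writeObject(model)
oos.close()

// in the web application:
val ois = new ObjectInputStream(new FileInputStream("model.bin"))
val restored = ois.readObject().asInstanceOf[DecisionTreeModel]
ois.close()
// restored.predict(features) can now serve synchronous requests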

I think this is quite a common problem. I am really interested to hear how
you are solving this, and what the approaches and their pros and cons are.

Thanks
Sourabh




