Re: HDFS as Shuffle Service

2016-04-26 Thread Takeshi Yamamuro
Hi, all

See SPARK-1529 for related discussion.

// maropu

On Wed, Apr 27, 2016 at 12:27 PM, Saisai Shao 
wrote:

> Quite curious about the benefits of using HDFS as a shuffle service; also,
> what's the problem with the current shuffle service?
>
>
> Thanks
> Saisai
>
> On Wed, Apr 27, 2016 at 4:31 AM, Timothy Chen  wrote:
>
>> Are you suggesting having the shuffle service persist and fetch data with
>> HDFS, or skipping the shuffle service altogether and just writing to HDFS?
>>
>> Tim
>>
>>
>> > On Apr 26, 2016, at 11:20 AM, Michael Gummelt 
>> wrote:
>> >
>> > Has there been any thought or work on this (or any other networked file
>> system)?  It would be valuable to support dynamic allocation without
>> depending on the shuffle service.
>> >
>> > --
>> > Michael Gummelt
>> > Software Engineer
>> > Mesosphere
>>
>


-- 
---
Takeshi Yamamuro


Re: HDFS as Shuffle Service

2016-04-26 Thread Saisai Shao
Quite curious about the benefits of using HDFS as a shuffle service; also,
what's the problem with the current shuffle service?


Thanks
Saisai

On Wed, Apr 27, 2016 at 4:31 AM, Timothy Chen  wrote:

> Are you suggesting having the shuffle service persist and fetch data with
> HDFS, or skipping the shuffle service altogether and just writing to HDFS?
>
> Tim
>
>
> > On Apr 26, 2016, at 11:20 AM, Michael Gummelt 
> wrote:
> >
> > Has there been any thought or work on this (or any other networked file
> system)?  It would be valuable to support dynamic allocation without
> depending on the shuffle service.
> >
> > --
> > Michael Gummelt
> > Software Engineer
> > Mesosphere
>


RE: Number of partitions for binaryFiles

2016-04-26 Thread Ulanov, Alexander
The issue is that the data was specifically prepared in such a way that each
file is a single partition, computationally and logically. It seems strange that
one cannot override the default behavior. It might be too expensive to perform
another round of re-partitioning within Spark because it would involve shuffling.
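
A possible shuffle-free workaround (a sketch only, assuming /data/subset can be
listed from the driver; worth verifying partitions.size on real data) is to load
each file as its own single-partition RDD and union the results:

  // Sketch: binaryFiles on a single file with minPartitions = 1 sets
  // maxSplitSize to that file's length, so each call yields exactly one
  // partition; union() then concatenates partitions without a shuffle.
  import java.io.File

  val files = new File("/data/subset").listFiles.filter(_.isFile).map(_.getPath)
  val rdd = sc.union(files.map(f => sc.binaryFiles(f, minPartitions = 1)))
  // Expect rdd.partitions.size == files.length, i.e. one file per partition.

Unlike repartition(100), the union introduces no shuffle; the trade-off is the
per-file RDD bookkeeping, which should be acceptable at ~100 files.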


From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Tuesday, April 26, 2016 2:44 PM
To: Ulanov, Alexander 
Cc: dev@spark.apache.org
Subject: Re: Number of partitions for binaryFiles

From what I understand, the Spark code was written this way so that you don't
end up with very small partitions.

In your case, look at the size of the cluster.
If 66 partitions can make good use of your cluster, it should be fine.

On Tue, Apr 26, 2016 at 2:27 PM, Ulanov, Alexander wrote:
Hi Ted,

I have 36 files of ~600 KB each, and the remaining 74 are about 400 KB.

Is there a workaround that doesn't require changing Spark's code?

Best regards, Alexander

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Tuesday, April 26, 2016 1:22 PM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: Number of partitions for binaryFiles

Here is the body of StreamFileInputFormat#setMinPartitions :

  def setMinPartitions(context: JobContext, minPartitions: Int) {
    val totalLen =
      listStatus(context).asScala.filterNot(_.isDirectory).map(_.getLen).sum
    val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong
    super.setMaxSplitSize(maxSplitSize)
  }

I guess what happened was that among the 100 files you had, there were ~60
files whose sizes were much bigger than the rest.
Given how the max split size is computed above, and since binaryFiles packs
whole files into combined splits up to that size, you ended up with fewer
partitions.

I just performed a test using local directory where 3 files were significantly 
larger than the rest and reproduced what you observed.

Cheers

On Tue, Apr 26, 2016 at 11:10 AM, Ulanov, Alexander wrote:
Dear Spark developers,

I have 100 binary files in local file system that I want to load into Spark 
RDD. I need the data from each file to be in a separate partition. However, I 
cannot make it happen:

scala> sc.binaryFiles("/data/subset").partitions.size
res5: Int = 66

The “minPartitions” parameter does not seem to help:
scala> sc.binaryFiles("/data/subset", minPartitions = 100).partitions.size
res8: Int = 66

At the same time, Spark produces the required number of partitions with
sc.textFile (though I cannot use it because my files are binary):
scala> sc.textFile("/data/subset").partitions.size
res9: Int = 100

Could you suggest how to force Spark to load each binary file into a separate
partition?

Best regards, Alexander




Re: Number of partitions for binaryFiles

2016-04-26 Thread Ted Yu
From what I understand, the Spark code was written this way so that you don't
end up with very small partitions.

In your case, look at the size of the cluster.
If 66 partitions can make good use of your cluster, it should be fine.

On Tue, Apr 26, 2016 at 2:27 PM, Ulanov, Alexander  wrote:

> Hi Ted,
>
>
>
> I have 36 files of ~600 KB each, and the remaining 74 are about 400 KB.
>
>
>
> Is there a workaround that doesn't require changing Spark's code?
>
>
>
> Best regards, Alexander
>
>
>
> *From:* Ted Yu [mailto:yuzhih...@gmail.com]
> *Sent:* Tuesday, April 26, 2016 1:22 PM
> *To:* Ulanov, Alexander 
> *Cc:* dev@spark.apache.org
> *Subject:* Re: Number of partitions for binaryFiles
>
>
>
> Here is the body of StreamFileInputFormat#setMinPartitions :
>
>
>
>   def setMinPartitions(context: JobContext, minPartitions: Int) {
>     val totalLen =
>       listStatus(context).asScala.filterNot(_.isDirectory).map(_.getLen).sum
>     val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong
>     super.setMaxSplitSize(maxSplitSize)
>   }
>
>
>
> I guess what happened was that among the 100 files you had, there were ~60
> files whose sizes were much bigger than the rest.
>
> Given how the max split size is computed above, and since binaryFiles packs
> whole files into combined splits up to that size, you ended up with fewer
> partitions.
>
>
>
> I just performed a test using local directory where 3 files were
> significantly larger than the rest and reproduced what you observed.
>
>
>
> Cheers
>
>
>
> On Tue, Apr 26, 2016 at 11:10 AM, Ulanov, Alexander <
> alexander.ula...@hpe.com> wrote:
>
> Dear Spark developers,
>
>
>
> I have 100 binary files in local file system that I want to load into
> Spark RDD. I need the data from each file to be in a separate partition.
> However, I cannot make it happen:
>
>
>
> scala> sc.binaryFiles("/data/subset").partitions.size
>
> res5: Int = 66
>
>
>
> The “minPartitions” parameter does not seem to help:
>
> scala> sc.binaryFiles("/data/subset", minPartitions = 100).partitions.size
>
> res8: Int = 66
>
>
>
> At the same time, Spark produces the required number of partitions with
> sc.textFile (though I cannot use it because my files are binary):
>
> scala> sc.textFile("/data/subset").partitions.size
>
> res9: Int = 100
>
>
>
> Could you suggest how to force Spark to load each binary file into a
> separate partition?
>
>
>
> Best regards, Alexander
>
>
>


RE: Number of partitions for binaryFiles

2016-04-26 Thread Ulanov, Alexander
Hi Ted,

I have 36 files of ~600 KB each, and the remaining 74 are about 400 KB.

Is there a workaround that doesn't require changing Spark's code?

Best regards, Alexander

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Tuesday, April 26, 2016 1:22 PM
To: Ulanov, Alexander 
Cc: dev@spark.apache.org
Subject: Re: Number of partitions for binaryFiles

Here is the body of StreamFileInputFormat#setMinPartitions :

  def setMinPartitions(context: JobContext, minPartitions: Int) {
    val totalLen =
      listStatus(context).asScala.filterNot(_.isDirectory).map(_.getLen).sum
    val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong
    super.setMaxSplitSize(maxSplitSize)
  }

I guess what happened was that among the 100 files you had, there were ~60
files whose sizes were much bigger than the rest.
Given how the max split size is computed above, and since binaryFiles packs
whole files into combined splits up to that size, you ended up with fewer
partitions.

I just performed a test using local directory where 3 files were significantly 
larger than the rest and reproduced what you observed.

Cheers

On Tue, Apr 26, 2016 at 11:10 AM, Ulanov, Alexander wrote:
Dear Spark developers,

I have 100 binary files in local file system that I want to load into Spark 
RDD. I need the data from each file to be in a separate partition. However, I 
cannot make it happen:

scala> sc.binaryFiles("/data/subset").partitions.size
res5: Int = 66

The “minPartitions” parameter does not seem to help:
scala> sc.binaryFiles("/data/subset", minPartitions = 100).partitions.size
res8: Int = 66

At the same time, Spark produces the required number of partitions with
sc.textFile (though I cannot use it because my files are binary):
scala> sc.textFile("/data/subset").partitions.size
res9: Int = 100

Could you suggest how to force Spark to load each binary file into a separate
partition?

Best regards, Alexander



Re: Number of partitions for binaryFiles

2016-04-26 Thread Ted Yu
Here is the body of StreamFileInputFormat#setMinPartitions :

  def setMinPartitions(context: JobContext, minPartitions: Int) {
    val totalLen =
      listStatus(context).asScala.filterNot(_.isDirectory).map(_.getLen).sum
    val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong
    super.setMaxSplitSize(maxSplitSize)
  }

I guess what happened was that among the 100 files you had, there were ~60
files whose sizes were much bigger than the rest.
Given how the max split size is computed above, and since binaryFiles packs
whole files into combined splits up to that size, you ended up with fewer
partitions.
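
To illustrate how the formula behaves, here are some made-up numbers (not the
actual file sizes in this thread):

  // Hypothetical: 60 files of ~1.5 MB and 40 of ~0.5 MB, minPartitions = 100.
  val sizes = Seq.fill(60)(1500000L) ++ Seq.fill(40)(500000L)
  val totalLen = sizes.sum        // 110,000,000 bytes
  val minPartitions = 100
  val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong
  // maxSplitSize = 1,100,000 bytes, so two whole ~0.5 MB files fit into a
  // single combined split and the partition count drops below 100.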

I just performed a test using local directory where 3 files were
significantly larger than the rest and reproduced what you observed.

Cheers

On Tue, Apr 26, 2016 at 11:10 AM, Ulanov, Alexander <
alexander.ula...@hpe.com> wrote:

> Dear Spark developers,
>
>
>
> I have 100 binary files in local file system that I want to load into
> Spark RDD. I need the data from each file to be in a separate partition.
> However, I cannot make it happen:
>
>
>
> scala> sc.binaryFiles("/data/subset").partitions.size
>
> res5: Int = 66
>
>
>
> The “minPartitions” parameter does not seem to help:
>
> scala> sc.binaryFiles("/data/subset", minPartitions = 100).partitions.size
>
> res8: Int = 66
>
>
>
> At the same time, Spark produces the required number of partitions with
> sc.textFile (though I cannot use it because my files are binary):
>
> scala> sc.textFile("/data/subset").partitions.size
>
> res9: Int = 100
>
>
>
> Could you suggest how to force Spark to load each binary file into a
> separate partition?
>
>
>
> Best regards, Alexander
>


HDFS as Shuffle Service

2016-04-26 Thread Michael Gummelt
Has there been any thought or work on this (or any other networked file
system)?  It would be valuable to support dynamic allocation without
depending on the shuffle service.
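
For context, the coupling in question, sketched with standard Spark properties
(illustrative values):

  // Today, dynamic allocation requires the external shuffle service so that
  // shuffle files outlive the executors that wrote them.
  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.shuffle.service.enabled", "true") // required by dynamic allocation

Keeping shuffle data on HDFS (or another networked file system) would let
executors come and go without a co-located service holding their shuffle files.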

-- 
Michael Gummelt
Software Engineer
Mesosphere


Number of partitions for binaryFiles

2016-04-26 Thread Ulanov, Alexander
Dear Spark developers,

I have 100 binary files in local file system that I want to load into Spark 
RDD. I need the data from each file to be in a separate partition. However, I 
cannot make it happen:

scala> sc.binaryFiles("/data/subset").partitions.size
res5: Int = 66

The "minPartitions" parameter does not seem to help:
scala> sc.binaryFiles("/data/subset", minPartitions = 100).partitions.size
res8: Int = 66

At the same time, Spark produces the required number of partitions with
sc.textFile (though I cannot use it because my files are binary):
scala> sc.textFile("/data/subset").partitions.size
res9: Int = 100

Could you suggest how to force Spark to load each binary file into a separate
partition?

Best regards, Alexander


Re: Spark streaming Kafka receiver WriteAheadLog question

2016-04-26 Thread Mario Ds Briggs
That was my initial thought as well. But then I was wondering if this
approach could help remove
 a - the little extra latency overhead we have with the direct approach
(compared to the receiver-based approach), and
 b - the data-duplication inefficiency (replication to the WAL) and the lack
of a single version of the truth for the offsets processed (under some
failures) in the receiver approach.

thanks
Mario

- Message from Cody Koeninger on Mon, 25 Apr 2016 09:23:32 -0500 -

To: Renyi Xiong
Cc: dev
Subject: Re: Spark streaming Kafka receiver WriteAheadLog question

If you want to refer back to Kafka based on offset ranges, why not use
createDirectStream?
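
For reference, a sketch of the direct route (Spark 1.x spark-streaming-kafka
API; the broker address, topic name, and an existing StreamingContext ssc are
assumptions here):

  // Offsets travel with each RDD in the direct approach, so recovery can
  // re-read the exact range from Kafka instead of replaying a WAL copy.
  import kafka.serializer.StringDecoder
  import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

  val kafkaParams = Map("metadata.broker.list" -> "broker1:9092") // placeholder
  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set("mytopic")) // placeholder topic

  stream.foreachRDD { rdd =>
    // Each batch RDD carries the Kafka offset range it covers.
    val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    ranges.foreach(r => println(s"${r.topic}/${r.partition}: ${r.fromOffset} -> ${r.untilOffset}"))
  }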

On Fri, Apr 22, 2016 at 11:49 PM, Renyi Xiong 
wrote:
> Hi,
>
> Is it possible for the Kafka-receiver-generated WriteAheadLogBackedBlockRDD
> to hold the corresponding Kafka offset range, so that during recovery the RDD
> can refer back to the Kafka queue instead of paying the cost of the
> write-ahead log?
>
> I guess there must be a reason here. Could anyone please help me understand?
>
> Thanks,
> Renyi.