Cluster sizing for recommendations

2015-07-06 Thread Danny Yates
Hi,

I'm having trouble building a recommender and would appreciate a few
pointers.

I have 350,000,000 events which are stored in roughly 500,000 S3 files and
are formatted as semi-structured JSON. These events are not all relevant to
making recommendations.

My code is (roughly):

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import org.apache.spark.mllib.recommendation.{ALS, Rating}

case class Event(id: String, eventType: String, line: JsonNode)

// Files stored by Hive-style daily partitions
val raw = sc.textFile("s3n://bucket/path/dt=*/*")

val parsed = raw.map(json => {
  val obj = (new ObjectMapper()).readTree(json)

  // Parse events into Event objects, keeping the parsed JSON around for a later step
  Event(obj.get("_id").asText, obj.get("event").asText, obj)
})

val downloads = parsed.filter(_.eventType == "download")

val ratings = downloads.map(event => {
  // ... extract userId and assetId (product) from JSON - code elided for brevity ...
  Rating(userId, assetId, 1)
}).repartition(2048)

ratings.cache

val model = ALS.trainImplicit(ratings, 10, 10, 0.1, 0.8)
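
For clarity, the arguments to that trainImplicit call, in order, are rank, iterations, lambda and alpha (the standard MLlib ALS.trainImplicit parameter order), i.e.:

ALS.trainImplicit(
  ratings,
  10,    // rank: number of latent factors
  10,    // iterations
  0.1,   // lambda: regularisation parameter
  0.8    // alpha: confidence weighting for implicit feedback
)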

This gets me to a model in around 20-25 minutes, which is actually pretty
impressive. But, to get this far in a reasonable time I need to use a fair
amount of compute power. I've found I need something like 16 x c3.4xl AWS
instances for the workers (16 cores, 30 GB, SSD storage) and an r3.2xl (8
cores, 60 GB, SSD storage) for the master. Oddly, the cached Rating objects
only take a bit under 2GB of RAM.
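
(For reference, a rough sketch of one way to check that figure from the shell, in addition to the Storage tab of the UI; the fields are from org.apache.spark.storage.RDDInfo:)

sc.getRDDStorageInfo.foreach { info =>
  println(s"${info.name}: ${info.memSize / (1024 * 1024)} MB in memory, " +
    s"${info.numCachedPartitions}/${info.numPartitions} partitions cached")
}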

I'm developing in a shell at the moment, started like this:

spark-shell --master yarn-client --executor-cores 16 --executor-memory 23G
--driver-memory 48G

--executor-cores: 16 because workers have 16 cores
--executor-memory: 23GB because that's about the most I can safely allocate
on a 30GB machine
--driver-memory: 48GB to make use of the memory on the driver

I found that if I didn't put the driver/master on a big box with lots of
RAM I had issues calculating the model, even though the ratings are only
taking about 2GB of RAM.

I'm also setting spark.driver.maxResultSize to 40GB.
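
Putting those flags together, the launch command looks roughly like this, with maxResultSize passed via --conf (one way to set it when starting the shell):

spark-shell --master yarn-client \
  --executor-cores 16 \
  --executor-memory 23G \
  --driver-memory 48G \
  --conf spark.driver.maxResultSize=40g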

If I don't repartition, I end up with 500,000 or so partitions (= number of
S3 files) and the model doesn't build in any reasonable timescale.
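
(As an aside, one alternative that might be cheaper than repartition is coalesce, which reduces the ~500,000 input partitions to a target count without a full shuffle. A minimal sketch against the parsed RDD above:)

// Sketch: cut the partition count early, without a full shuffle
val parsedCoalesced = parsed.coalesce(2048)
val downloadsCoalesced = parsedCoalesced.filter(_.eventType == "download")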

Now that I've got a model, I'm trying the following (using 1.4.0-rc1 - I can't
upgrade to 1.4.0 yet):

val recommendations = model.recommendProductsForUsers(5)
recommendations.cache
recommendations.first

This invariably crashes with various memory errors - typically GC errors,
or errors saying that I'm exceeding the "spark.akka.frameSize". Increasing
this seems to only prolong my agony.
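
(For reference, a hedged sketch of a variant that keeps the result off the driver entirely - persisting to disk as well as memory and writing it straight out rather than calling first(); the output path is illustrative:)

import org.apache.spark.storage.StorageLevel

val recommendations = model.recommendProductsForUsers(5)
recommendations.persist(StorageLevel.MEMORY_AND_DISK_SER)

recommendations.flatMap { case (user, recs) => recs }
  .map(r => s"${r.user},${r.product},${r.rating}")
  .saveAsTextFile("s3n://bucket/recommendations/")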

I would appreciate any advice you can offer. Whilst I appreciate this
requires a fair amount of CPU, it also seems to need an infeasible amount
of RAM. To be honest, I probably have far too much RAM because of the
limitations around how I can size EC2 instances to get the CPU I need.

But I've been at this for 3 days now and still haven't actually managed to
build any recommendations...

Thanks in advance,

Danny


Can Spark benefit from Hive-like partitions?

2015-01-26 Thread Danny Yates
Hi,

I've got a bunch of data stored in S3 under directories like this:

s3n://blah/y=2015/m=01/d=25/lots-of-files.csv

In Hive, if I issue a query WHERE y=2015 AND m=01, I get the benefit that
it only scans the necessary directories for files to read.

As far as I can tell from searching and reading the docs, the right way of
loading this data into Spark is to use sc.textFile("s3n://blah/*/*/*/")

1) Is there any way in Spark to access y, m and d as fields? In Hive, you
declare them in the schema, but you don't put them in the CSV files - their
values are extracted from the path.
2) Is there any way to get Spark to use the y, m and d fields to minimise
the files it transfers from S3?
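
To make question 2 concrete, one manual workaround would be to build the globs by hand and attach y, m and d to each record, roughly as below (the dates are illustrative); I'm hoping Spark has something better built in:

// Manual sketch: load only the wanted partitions and carry y/m/d along with each record
val days = Seq((2015, 1, 25), (2015, 1, 26))   // illustrative

val withDate = days.map { case (y, m, d) =>
  val path = f"s3n://blah/y=$y%04d/m=$m%02d/d=$d%02d/*.csv"
  sc.textFile(path).map(line => (y, m, d, line))
}.reduce(_ union _)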

Thanks,

Danny.


Re: Can Spark benefit from Hive-like partitions?

2015-01-26 Thread Danny Yates
Thanks Michael.

I'm not actually using Hive at the moment - in fact, I'm trying to avoid it
if I can. I'm just wondering whether Spark has anything similar I can
leverage?

Thanks


Re: Can Spark benefit from Hive-like partitions?

2015-01-26 Thread Danny Yates
Ah, well that is interesting. I'll experiment further tomorrow. Thank you for 
the info!




ETL process design

2015-01-28 Thread Danny Yates
Hi,

My apologies for what has ended up as quite a long email with a lot of
open-ended questions, but, as you can see, I'm really struggling to get
started and would appreciate some guidance from people with more
experience. I'm new to Spark and "big data" in general, and I'm struggling
with what I suspect is actually a fairly simple problem.

For background, this process will run on an EMR cluster in AWS. My files
are all in S3, but the S3 access is pretty straightforward in that
environment, so I'm not overly concerned about that at the moment.

I have a process (or rather, a number of processes) which drop "JSON"
events into files in directories in S3, structured by the date the events
arrived. I say "JSON" because they're one JSON message per line, rather
than one per file. That is, they are amenable to being loaded with
sqlContext.jsonFile(). The directory structure is
s3://bucket/path/yyyy-mm-dd/many-files-here, where yyyy-mm-dd is the
received date of the events.

Depending on the environment, there could be 4,000 - 5,000 files in each
directory, each containing up to 3,000 lines (events). So there's plenty of
scope for parallelism. In general, there will be something like 2,000,000
events per day initially.

The incoming events are of different types (page views, item purchases,
etc.) but are currently all bundled into the same set of input files. So
the JSON is not uniform across different lines within each file. I'm
amenable to changing this if that's helpful and having the events broken
out into different files by event type.

Oh, and there could be duplicates too, which will need removing. :-)

My challenge is to take these files and transform them into a longer-term
storage format suitable for both overnight analytics and ad-hoc
querying. I'm happy for this process to just happen once a day - say, at
1am - processing the whole of the previous day's received data.

I'm thinking that having Parquet files stored in Hive-like partitions would be
a sensible way forward:
s3://bucket/different-path/t=type/y=yyyy/m=mm/d=dd/whatever.parquet. Here,
yyyy, mm and dd represent the time the event happened, rather than the time
it arrived. Does that sound sensible? Do you have any other recommendations?

So I need to read each line, parse the JSON, deduplicate the data, decide
which event type it is, and output it to the right file in the right
directory.

I'm struggling with... well... most of it, if I'm honest. Here's what I
have so far.

// Load all files for the given received date
val data = sc.textFile("s3://bucket/path/yyyy-mm-dd/*")

// Deduplicate
val dedupe = data.map(line => {
  val json = new com.fasterxml.jackson.databind.ObjectMapper().readTree(line)

  val _id = json.get("_id").asText()        // _id is a key that can be used to dedupe
  val event = json.get("event").asText()    // event is the event type
  val ts = json.get("timestamp").asText()   // timestamp is when the event happened

  (_id, (event, ts, line))   // I figure having event, ts and line at this point will save time later
}).reduceByKey((a, b) => a)   // For any given pair of lines with the same _id, pick one arbitrarily

At this point, I guess I'm going to have to split this apart by event type
(I'm happy to have a priori knowledge of the event types) and "formally"
parse each line using a schema to get a SchemaRDD so I can write out
Parquet files. I have exactly zero idea how to approach this part.
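
For the sake of discussion, here is a rough sketch of one possible shape for this step. It assumes Spark 1.2-style APIs (SQLContext.jsonRDD for schema inference and SchemaRDD.saveAsParquetFile) and a priori knowledge of the event types; the type names and output path are illustrative:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val eventTypes = Seq("pageview", "purchase")   // illustrative; known a priori

eventTypes.foreach { eventType =>
  // Keep just the raw JSON lines for this event type (dedupe is keyed as (_id, (event, ts, line)))
  val lines = dedupe
    .filter { case (_, (event, _, _)) => event == eventType }
    .map { case (_, (_, _, line)) => line }

  // Let Spark SQL infer a schema from the JSON (an explicit StructType could be supplied instead)
  val schemaRdd = sqlContext.jsonRDD(lines)

  // Write Parquet under a per-type prefix; splitting further by event date would need more work
  schemaRdd.saveAsParquetFile(s"s3://bucket/different-path/t=$eventType/")
}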

The other wrinkle here is that Spark seems to want to "own" the directory
it writes to. But it's possible that on any given run we might pick up a
few left-over events for a previous day, so we need to be able to handle
the situation where we're adding events for a day we've already processed.
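
One possible workaround, continuing the sketch above: give each nightly run its own subdirectory under the day partition, so nothing is ever written into a directory that already exists (the path is illustrative, and schemaRdd / eventType are the values from the per-event-type loop sketched earlier):

// Per-run subdirectory under the day partition, so each run writes to a fresh location
val runId = System.currentTimeMillis
schemaRdd.saveAsParquetFile(s"s3://bucket/different-path/t=$eventType/y=2015/m=01/d=25/run=$runId/")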

Many thanks,

Danny.