Hi Danny,

What you describe sounds like you may also consider to use Spring XD instead, 
at least for the file-centric stuff.

Regards
Ben

Von meinem iPad gesendet

> Am 28.01.2015 um 10:42 schrieb Danny Yates <da...@codeaholics.org>:
> 
> Hi,
> 
> My apologies for what has ended up as quite a long email with a lot of 
> open-ended questions, but, as you can see, I'm really struggling to get 
> started and would appreciate some guidance from people with more experience. 
> I'm new to Spark and "big data" in general, and I'm struggling with what I 
> suspect is actually a fairly simple problem.
> 
> For background, this process will run on an EMR cluster in AWS. My files are 
> all in S3, but the S3 access is pretty straightforward in that environment, 
> so I'm not overly concerned about that at the moment.
> 
> I have a process (or rather, a number of processes) which drop "JSON" events 
> into files in directories in S3 structured by the date the events arrived. I 
> say "JSON" because they're one JSON message per line, rather than one per 
> file. That is, they are amenable to being loaded with sc.jsonFile(). The 
> directory structure is s3://bucket/path/yyyy-mm-dd/many-files-here, where 
> yyyy-mm-dd is the received date of the events.
> 
> Depending on the environment, there could be 4,000 - 5,000 files in each 
> directory, each having up to 3,000 lines (events) in. So plenty of scope for 
> parallelism. In general, there will be something like 2,000,000 events per 
> day initially.
> 
> The incoming events are of different types (page views, item purchases, etc.) 
> but are currently all bundled into the same set of input files. So the JSON 
> is not uniform across different lines within each file. I'm amenable to 
> changing this if that's helpful and having the events broken out into 
> different files by event type.
> 
> Oh, and there could be duplicates too, which will need removing. :-)
> 
> My challenge is to take these files and transfer them into a more long-term 
> storage format suitable for both overnight analytics and also ad-hoc 
> querying. I'm happy for this process to just happen once a day - say, at 1am 
> and process the whole of the previous day's received data.
> 
> I'm thing that having Parquet files stored in Hive-like partitions would be a 
> sensible way forward: 
> s3://bucket/different-path/t=type/y=yyyy/m=mm/d=dd/whatever.parquet. Here, 
> yyyy, mm and dd represent the time the event happened, rather than the time 
> it arrived. Does that sound sensible? Do you have any other recommendations?
> 
> So I need to read each line, parse the JSON, deduplicate the data, decided 
> which event type it is, and output it to the right file in the right 
> directory.
> 
> I'm struggling with... well... most of it, if I'm honest. Here's what I have 
> so far.
> 
> val data = sc.textFile("s3://..../yyyy-mm-dd/*")  // load all files for given 
> received date
> 
> // Deduplicate
> val dedupe = data.map(line => {
>     val json = new 
> com.fasterxml.jackson.databind.ObjectMapper().readTree(line);
>     val _id = json.get("_id").asText();   // _id is a key that can be used to 
> dedupe
>     val event = json.get("event").asText();    // event is the event type
>     val ts = json.get("timestamp").asText();    // timestamp is the when the 
> event happened
> 
>     (_id, (event, ts, line))   // I figure having event, ts and line at this 
> point will save time later
> }).reduceByKey((a, b) => a)   // For any given pair of lines with the same 
> _id, pick one arbitrarily
> 
> At this point, I guess I'm going to have to split this apart by event type 
> (I'm happy to have a priori knowledge of the event types) and "formally" 
> parse each line using a schema to get a SchemaRDD so I can write out Parquet 
> files. I have exactly zero idea how to approach this part.
> 
> The other wrinkle here is that Spark seems to want to "own" the directory it 
> writes to. But it's possible that on any given run we might pick up a few 
> left-over events for a previous day, so we need to be able to handle the 
> situation where we're adding events for a day we've already processed.
> 
> Many thanks,
> 
> Danny.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to