Hi Danny,

From what you describe, it sounds like you might also want to consider Spring XD instead, at least for the file-centric stuff.
Regards,
Ben

Sent from my iPad

> On 28.01.2015 at 10:42, Danny Yates <da...@codeaholics.org> wrote:
>
> Hi,
>
> My apologies for what has ended up as quite a long email with a lot of open-ended questions, but, as you can see, I'm really struggling to get started and would appreciate some guidance from people with more experience. I'm new to Spark and "big data" in general, and I'm struggling with what I suspect is actually a fairly simple problem.
>
> For background, this process will run on an EMR cluster in AWS. My files are all in S3, but S3 access is pretty straightforward in that environment, so I'm not overly concerned about that at the moment.
>
> I have a process (or rather, a number of processes) which drops "JSON" events into files in directories in S3, structured by the date the events arrived. I say "JSON" because they're one JSON message per line rather than one per file; that is, they are amenable to being loaded with sc.jsonFile(). The directory structure is s3://bucket/path/yyyy-mm-dd/many-files-here, where yyyy-mm-dd is the received date of the events.
>
> Depending on the environment, there could be 4,000 - 5,000 files in each directory, each with up to 3,000 lines (events), so there's plenty of scope for parallelism. In general, there will be something like 2,000,000 events per day initially.
>
> The incoming events are of different types (page views, item purchases, etc.) but are currently all bundled into the same set of input files, so the JSON is not uniform across different lines within each file. I'm amenable to changing this if that's helpful, and having the events broken out into different files by event type.
>
> Oh, and there could be duplicates too, which will need removing. :-)
>
> My challenge is to take these files and transfer them into a more long-term storage format suitable for both overnight analytics and ad-hoc querying. I'm happy for this process to just happen once a day - say, at 1am - and process the whole of the previous day's received data.
>
> I'm thinking that having Parquet files stored in Hive-like partitions would be a sensible way forward: s3://bucket/different-path/t=type/y=yyyy/m=mm/d=dd/whatever.parquet. Here, yyyy, mm and dd represent the time the event happened, rather than the time it arrived. Does that sound sensible? Do you have any other recommendations?
>
> So I need to read each line, parse the JSON, deduplicate the data, decide which event type it is, and output it to the right file in the right directory.
>
> I'm struggling with... well... most of it, if I'm honest. Here's what I have so far.
>
> val data = sc.textFile("s3://..../yyyy-mm-dd/*") // load all files for a given received date
>
> // Deduplicate
> val dedupe = data.map(line => {
>   val json = new com.fasterxml.jackson.databind.ObjectMapper().readTree(line)
>   val _id = json.get("_id").asText()      // _id is a key that can be used to dedupe
>   val event = json.get("event").asText()  // event is the event type
>   val ts = json.get("timestamp").asText() // timestamp is when the event happened
>
>   (_id, (event, ts, line)) // I figure having event, ts and line at this point will save time later
> }).reduceByKey((a, b) => a) // for any given pair of lines with the same _id, pick one arbitrarily
>
> At this point, I guess I'm going to have to split this apart by event type (I'm happy to have a priori knowledge of the event types) and "formally" parse each line using a schema to get a SchemaRDD so I can write out Parquet files. I have exactly zero idea how to approach this part.
>
> The other wrinkle here is that Spark seems to want to "own" the directory it writes to. But it's possible that on any given run we might pick up a few left-over events from a previous day, so we need to be able to handle the situation where we're adding events for a day we've already processed.
>
> Many thanks,
>
> Danny.
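For the split-by-type step Danny asks about, here is a minimal sketch against the Spark 1.2-era API the email already uses (SchemaRDD via SQLContext.jsonRDD, then saveAsParquetFile), reusing the dedupe RDD from the code above. The event-type names and the output path are illustrative assumptions, not details from the thread:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

// Hypothetical a priori list of event types; substitute the real ones
val eventTypes = Seq("page_view", "purchase")

for (eventType <- eventTypes) {
  // Keep only the raw JSON lines for this event type from the deduplicated RDD
  val lines = dedupe
    .filter { case (_, (event, _, _)) => event == eventType }
    .map { case (_, (_, _, line)) => line }

  // jsonRDD infers a schema per event type (the JSON is uniform within a type);
  // saveAsParquetFile writes out a directory of Parquet part files
  val schemaRDD = sqlContext.jsonRDD(lines)

  // Illustrative path; a real job would first split by the event timestamp (ts)
  // so each (type, event-date) combination gets its own y=/m=/d= directory
  schemaRDD.saveAsParquetFile(s"s3://bucket/different-path/t=$eventType/y=2015/m=01/d=28")
}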
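On the "Spark wants to own the output directory" wrinkle, one possible workaround (a sketch under stated assumptions, not something recommended in the thread): write each run to a fresh temporary directory, then move the part files into the existing partition directory under run-unique names using the Hadoop FileSystem API. All paths below are hypothetical, and note that on S3 a rename is a copy-and-delete, so this step is neither cheap nor atomic:

import java.net.URI
import java.util.UUID
import org.apache.hadoop.fs.{FileSystem, Path}

val runId = UUID.randomUUID().toString
val tmpDir = s"s3://bucket/tmp/$runId"
schemaRDD.saveAsParquetFile(tmpDir) // Spark creates (and owns) this directory

val fs = FileSystem.get(new URI(tmpDir), sc.hadoopConfiguration)
val partitionDir = new Path("s3://bucket/different-path/t=page_view/y=2015/m=01/d=28")
fs.mkdirs(partitionDir) // no-op if the partition directory already exists

for (status <- fs.listStatus(new Path(tmpDir))
     if status.getPath.getName.startsWith("part-")) {
  // Prefix each part file with the run id so files from different runs never collide
  fs.rename(status.getPath, new Path(partitionDir, s"$runId-${status.getPath.getName}"))
}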