I just noticed my snippets contain a whole lot of errors, but I'm glad they've been helpful. :)
On Wed, May 4, 2016 at 3:59 PM, Robert Schmidtke <ro.schmid...@gmail.com> wrote:

> Thanks Stefano! I guess you're right, it's probably not too bad except the
> MapFunction, which I have swapped with your suggestion now. I was just a
> bit confused by the fact that I had to state so many types, where I thought
> they could be inferred automatically. I tried variations of the
> "non-explicit" MapFunction, but I must have messed up something. The Array
> matching is pretty handy as well. I'm good to go now, all works well and
> looks a bit more Scala-y now :)
>
> Robert
>
> On Wed, May 4, 2016 at 3:42 PM, Stefano Baghino <stefano.bagh...@radicalbit.io> wrote:
>
>> The only real noise I see is the usage of a MapFunction, which can be
>> rewritten like this in Scala:
>>
>> case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)
>> val rankingsInput: DataSet[Ranking] =
>>   env.readHadoopFile(inputFormat, classOf[LongWritable], classOf[Text],
>>     rankingsInputPath, job).map[Ranking] {
>>       (value: (LongWritable, Text)) => {
>>         val Array(name, n, m) = value._2.toString.split(",")
>>         Ranking(name, n.toInt, m.toInt) // no new needed for case classes
>>       }
>>     }
>>
>> As you may have noticed, I've also destructured the tuple in the first
>> line. Another way to do this destructuring more concisely is to use an
>> API extension [1] (which won't be available before 1.1, I suppose).
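The tuple destructuring discussed above can be tried outside Flink with plain Scala collections. Here is a minimal sketch using a partial-function literal, with made-up sample data standing in for the `(LongWritable, Text)` records:

```scala
// Plain-Scala sketch of the same tuple destructuring; no Flink required.
// The sample lines are made up for illustration.
val data = Seq((1L, "a,1,2"), (2L, "b,3,4"))

// A partial-function literal destructures the tuple in place,
// much like the API extension mentioned in [1] allows on DataSets:
val parsed = data.map { case (_, line) =>
  val Array(name, n, m) = line.split(",")
  (name, n.toInt, m.toInt)
}
// parsed == Seq(("a", 1, 2), ("b", 3, 4))
```

Note that `val Array(name, n, m) = ...` throws a `MatchError` if the line does not split into exactly three fields, which is what motivates the error-handling variant below.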
>> Since you're parsing textual data, it could also make sense to
>> handle error conditions for malformed inputs; here is an example that uses
>> flatMap to do so:
>>
>> import scala.util.{Try, Success, Failure} // needed to work with the "functional" Try
>> import org.apache.flink.util.Collector
>>
>> case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)
>> val rankingsInput: DataSet[Ranking] =
>>   env.readHadoopFile(inputFormat, classOf[LongWritable], classOf[Text],
>>     rankingsInputPath, job).flatMap[Ranking] {
>>       (value: (LongWritable, Text), out: Collector[Ranking]) => {
>>         Try {
>>           val Array(name, n, m) = value._2.toString.split(",") // exception thrown if array size != 3
>>           Ranking(name, n.toInt, m.toInt) // exception thrown if n or m are not numbers
>>         } match {
>>           case Success(ranking) => out.collect(ranking)
>>           case Failure(exception) => // deal with malformed input, perhaps log
>>         }
>>       }
>>     }
>>
>> Feel free to ask me for any kind of clarification on the snippets [2] I
>> posted; I'll gladly help you further if you need it.
>>
>> Last note: I'm not a user, but I believe Shapeless has some very handy
>> constructs to move back and forth between tuples and case classes (but
>> please take this with a grain of salt).
>>
>> [1]: https://ci.apache.org/projects/flink/flink-docs-master/apis/scala_api_extensions.html#accept-partial-functions
>> [2]: I didn't test them, so caution is advisable ;)
>>
>> On Wed, May 4, 2016 at 2:00 PM, Robert Schmidtke <ro.schmid...@gmail.com> wrote:
>>
>>> Hi everyone,
>>>
>>> first up, I'm new to Scala, so please bear with me, but I could not find
>>> any solution on the web or in the Flink documentation. I'm having trouble
>>> converting a DataSet[(LongWritable, Text)] to a DataSet of a custom case
>>> class.
>>> I got it to work, however in a way that I feel is too verbose for Scala:
>>>
>>> import org.apache.flink.api.common.functions.MapFunction
>>> import org.apache.flink.api.scala._
>>>
>>> import org.apache.hadoop.io.LongWritable
>>> import org.apache.hadoop.io.Text
>>>
>>> case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)
>>> val rankingsInput: DataSet[Ranking] =
>>>   env.readHadoopFile(inputFormat, classOf[LongWritable], classOf[Text],
>>>     rankingsInputPath, job).map[Ranking](new MapFunction[(LongWritable, Text), Ranking] {
>>>       override def map(value: (LongWritable, Text)) = {
>>>         val splits = value._2.toString.split(",")
>>>         new Ranking(splits(0), splits(1).toInt, splits(2).toInt)
>>>       }
>>>     })
>>>
>>> Is there a simpler way of doing this? All other variants I've tried
>>> yield some type information errors.
>>>
>>> Thanks in advance!
>>> Robert
>>>
>>> --
>>> My GPG Key ID: 336E2680
>>
>> --
>> BR,
>> Stefano Baghino
>>
>> Software Engineer @ Radicalbit
>
> --
> My GPG Key ID: 336E2680

--
BR,
Stefano Baghino

Software Engineer @ Radicalbit
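The Try-based error handling sketched in the thread can also be checked in plain Scala, independent of Flink. This minimal sketch (with made-up input lines, and a hypothetical `parse` helper) drops malformed records instead of collecting them, which mirrors what flatMap does when nothing is emitted:

```scala
import scala.util.Try

case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)

// Parse one CSV line, yielding None for malformed input
// (wrong field count or non-numeric fields).
def parse(line: String): Option[Ranking] =
  Try {
    val Array(name, n, m) = line.split(",")
    Ranking(name, n.toInt, m.toInt)
  }.toOption

val lines = Seq("a,1,2", "malformed", "b,x,4")
val rankings = lines.flatMap(parse)
// rankings == Seq(Ranking("a", 1, 2))
```

Returning `Option` and flatMapping over it keeps the happy path and the failure path in one expression; in the Flink version the same role is played by calling `out.collect` only in the `Success` branch.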