There isn't a db target in contrib? There is a db source, IIRC.

On Oct 18, 2013 8:29 PM, "Gabriel Reid" <[email protected]> wrote:
> Hi Hrishikesh,
>
> About the database insertion -- if you want to have parallel writes to
> a database, probably the best way to do this within Crunch would be to
> implement a Target [1] for writing to your RDBMS. This would involve
> using a Hadoop OutputFormat, and would do the writes in parallel.
>
> There isn't currently a DatabaseTarget available in Crunch. However,
> the Sqoop project [2] deals specifically with moving data between HDFS
> and databases, and also helps to take care of details like limiting an
> import job to the amount of parallelism that an RDBMS can handle.
> Probably the best option (although slightly more involved) would be to
> use Crunch to do your data processing and output data to HDFS, and
> then use Sqoop to import the data into your RDBMS.
>
> - Gabriel
>
> [1] http://crunch.apache.org/apidocs/0.7.0/org/apache/crunch/Target.html
> [2] http://sqoop.apache.org
>
> On Sat, Oct 19, 2013 at 12:14 AM, Hrishikesh P <[email protected]> wrote:
> > Hi Gabriel,
> >
> > Thanks for the reply, that definitely answered some of my questions.
> >
> > My PCollection is relatively small (30K pairs, which should be small). My
> > main concern was that the process method was getting called twice. But
> > after adding "table.materialize().iterator().next();" it's now getting
> > called only once, which will solve my problem for now.
> >
> > But I see your point. If I make the database insertion operation an end
> > point of the pipeline, would I still be able to do it in parallel? That
> > is one of the reasons I put the logic in the process method. If not, I
> > may have to continue with my approach.
> >
> > Thanks.
> >
> > On Fri, Oct 18, 2013 at 3:27 PM, Gabriel Reid <[email protected]> wrote:
> >> Hi Hrishikesh,
> >>
> >> I think the basic principle in Crunch that should answer a large
> >> portion of your questions is this: things only really start happening
> >> in Crunch if you either write data to a target (i.e.
> >> pipeline.write(...)), or materialize a PCollection. Materializing a
> >> PCollection is something you typically only want to do if it is
> >> relatively small, or if you have a specific need to get the contents
> >> of a PCollection into the client application.
> >>
> >> If you specifically want to ensure that the TestFn is only used once
> >> in your pipeline, it should be sufficient to add the following call
> >> just after the "table" PCollection is created:
> >>
> >>     table.materialize().iterator().next();
> >>
> >> Using this instead of pipeline.run will probably ensure that TestFn is
> >> only run once over the collection in this case.
> >>
> >> However, it's actually a much better idea to not have operations that
> >> affect the state of an external application/process within a DoFn. I
> >> would recommend instead writing the data to be added to an external
> >> database as an end-point of a pipeline (or even as something that can
> >> be run on the output of a pipeline), instead of doing it within a
> >> pipeline. In this way you can become immune to the number of times a
> >> DoFn is used/re-used in a pipeline.
> >>
> >> Hope this helps, but if this isn't answering your questions then just
> >> let me know.
> >>
> >> - Gabriel
> >>
> >> On Fri, Oct 18, 2013 at 9:43 PM, Hrishikesh P <[email protected]> wrote:
> >> > Basically, I have a need to emit a pair of a string and an int from the
> >> > DoFn's process method; the integers will later be grouped by the string.
> >> > In addition to this, I also need to emit another long from the process
> >> > method that is independent of this pair. At the end, I want to:
> >> > 1) group the integers by string value
> >> > 2) print the max long value.
> >> >
> >> > So I came up with the following code:
> >> >
> >> > {pseudo-code}
> >> > ...
> >> >
> >> > PTable<Double, Long> inputTable = read(); // internal read method
> >> >
> >> > PTable<Pair<String, Integer>, Long> table = inputTable.parallelDo("testjob",
> >> >     new TestFn(), Writables.tableOf(Writables.pairs(Writables.strings(),
> >> >     Writables.ints()), Writables.longs()));
> >> >
> >> > pipeline.run();
> >> >
> >> > final Iterator<Long> it =
> >> >     table.top(1).values().asCollection().getValue().iterator();
> >> >
> >> > if (it.hasNext()) {
> >> >     System.err.println("MAX LONG = " + it.next());
> >> > }
> >> >
> >> > // PTableConverterFn just emits the input back; it is used to convert
> >> > // a PCollection to a PTable so that the values can easily be grouped.
> >> > PTable<String, Integer> newTable = table.keys().parallelDo(
> >> >     new PTableConverterFn(),
> >> >     Writables.tableOf(Writables.strings(), Writables.ints()));
> >> >
> >> > final Iterator<Pair<String, Integer>> mapIt =
> >> >     newTable.groupByKey().combineValues(Aggregators.SUM_INTS())
> >> >         .materialize().iterator();
> >> >
> >> > int count = 0;
> >> >
> >> > while (mapIt.hasNext()) {
> >> >     Pair<String, Integer> aPair = mapIt.next();
> >> >     // Need to print these in the logs.
> >> >     System.out.println(aPair.first() + "---" + aPair.second());
> >> >     count += aPair.second();
> >> > }
> >> >
> >> > pipeline.done();
> >> >
> >> > System.out.println("Count = " + count);
> >> >
> >> > ...
> >> >
> >> > private static class TestFn extends DoFn<Pair<Double, Long>,
> >> >         Pair<Pair<String, Integer>, Long>> {
> >> >
> >> >     protected TestFn() {
> >> >     }
> >> >
> >> >     @Override
> >> >     public void process(Pair<Double, Long> input,
> >> >             Emitter<Pair<Pair<String, Integer>, Long>> emitter) {
> >> >
> >> >         System.out.println("Entering process ...");
> >> >
> >> >         ...
> >> >
> >> >         // internal method; does some database insertion operations
> >> >         Pair<String, Integer> aPair = processInput(input);
> >> >
> >> >         long maxLong = findMaxLong(input.first()); // internal method
> >> >
> >> >         ...
> >> >
> >> >         Pair<Pair<String, Integer>, Long> pair =
> >> >             new Pair<Pair<String, Integer>, Long>(aPair, maxLong);
> >> >
> >> >         emitter.emit(pair);
> >> >
> >> >         System.out.println("Exiting process ...");
> >> >     }
> >> > }
> >> >
> >> > ...
> >> > {pseudo-code}
> >> >
> >> > This code gives me the results I expect, but the DoFn process method
> >> > gets called twice, which I guess happens first during the
> >> > "table.top(1).values().asCollection().getValue().iterator()" call and
> >> > second during the
> >> > "newTable.groupByKey().combineValues(Aggregators.SUM_INTS()).materialize().iterator()"
> >> > call. Since I am doing database insertion operations inside the process
> >> > method, this is undesirable.
> >> >
> >> > My questions are:
> >> >
> >> > 1) Is there a better way to do what I am trying to do here?
> >> > 2) When/how many times does the process method actually get called, and
> >> > why does it get called more than once?
> >> > 3) If I did not have the need to print the values after the processing,
> >> > would the database insertions still have happened without materializing
> >> > the results? Is materializing necessary for processing to happen?
> >> >
> >> > Thanks for the help/input.
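The "called twice" behavior asked about in the thread comes from lazy evaluation: each materialization triggers a job that re-runs the upstream DoFns. The same effect can be shown in plain, Crunch-free Java (all names here are hypothetical illustrations, not Crunch API): a lazily mapped view re-applies its function, side effects included, on every traversal.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class LazyDemo {
    // Return a view that re-applies fn on every traversal -- a toy analogue
    // of a lazy PCollection, where a DoFn is re-run by each job that
    // consumes its output.
    static <A, B> Iterable<B> lazyMap(List<A> source, Function<A, B> fn) {
        return () -> source.stream().map(fn).iterator();
    }

    // Traverse the lazy view n times and count how often fn actually ran.
    static int callsAfterTraversals(int n) {
        AtomicInteger calls = new AtomicInteger();
        Iterable<Integer> mapped = lazyMap(Arrays.asList(1, 2, 3), x -> {
            calls.incrementAndGet(); // stand-in for a DB insert inside process()
            return x * 2;
        });
        for (int i = 0; i < n; i++) {
            mapped.iterator().forEachRemaining(v -> { });
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(callsAfterTraversals(1)); // 3
        System.out.println(callsAfterTraversals(2)); // 6 -- side effect ran twice
    }
}
```

This is why pulling two separate iterators (the top(1) result and the grouped result) can execute the same DoFn twice, and why side effects belong at a pipeline end-point rather than inside process().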
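One way to restructure the pseudo-code along the lines suggested in the thread -- a sketch only, assuming Crunch 0.7-era library helpers (`Aggregate.max`, `PTables.asPTable`, `To.textFile`) and placeholder output paths -- is to keep process() free of database calls and derive both results from the same PTable, so everything runs in a single pipeline execution:

```java
// Sketch, not tested: assumes a TestFn with the database insertion removed.
PTable<Pair<String, Integer>, Long> table = inputTable.parallelDo("testjob",
    new TestFn(), Writables.tableOf(
        Writables.pairs(Writables.strings(), Writables.ints()),
        Writables.longs()));

// Both outputs derive from the same PTable, so Crunch plans one job graph
// and TestFn.process() runs once per input record.
PCollection<Long> maxLong = Aggregate.max(table.values());
PTable<String, Integer> sums = PTables.asPTable(table.keys())
    .groupByKey()
    .combineValues(Aggregators.SUM_INTS());

pipeline.write(maxLong, To.textFile("/out/max"));   // placeholder paths
pipeline.write(sums, To.textFile("/out/sums"));
pipeline.done(); // single execution; load /out/sums into the RDBMS afterwards
```

The database insertions then happen after the pipeline, e.g. via a Sqoop job over /out/sums, which also keeps them independent of how often any DoFn is re-used.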
