Yep, there's only a JDBC Source in contrib, no Target.
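For anyone following along, the HDFS-then-Sqoop route suggested in the quoted mail below boils down to a single `sqoop export` invocation once the Crunch pipeline has written its output. A rough sketch (the connection string, credentials, table name, and HDFS path are all hypothetical):

```shell
# Export Crunch output from HDFS into an RDBMS table (all values hypothetical).
# --num-mappers caps the write parallelism the database has to absorb.
sqoop export \
  --connect jdbc:mysql://dbhost/mydb \
  --username myuser -P \
  --table crunch_output \
  --export-dir /user/hrishikesh/crunch-out \
  --num-mappers 4
```

This keeps the writes parallel (one mapper per export task) while letting Sqoop, rather than a DoFn, negotiate with the database.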
On Sat, Oct 19, 2013 at 5:32 AM, Josh Wills <[email protected]> wrote:
> There isn't a db target in contrib? There is a db source, IIRC.
>
> On Oct 18, 2013 8:29 PM, "Gabriel Reid" <[email protected]> wrote:
>>
>> Hi Hrishikesh,
>>
>> About the database insertion -- if you want to have parallel writes to
>> a database, the best way to do this within Crunch would probably be to
>> implement a Target [1] for writing to your RDBMS. This would involve
>> using a Hadoop OutputFormat, and would do the writes in parallel.
>>
>> There isn't currently a DatabaseTarget available in Crunch. However,
>> the Sqoop project [2] deals specifically with moving data between HDFS
>> and databases, and also takes care of details like limiting an
>> export job to the amount of parallelism that an RDBMS can handle.
>> Probably the best option (although slightly more involved) would be to
>> use Crunch to do your data processing, write the output to HDFS, and
>> then use Sqoop to export the data into your RDBMS.
>>
>> - Gabriel
>>
>> [1] http://crunch.apache.org/apidocs/0.7.0/org/apache/crunch/Target.html
>> [2] http://sqoop.apache.org
>>
>> On Sat, Oct 19, 2013 at 12:14 AM, Hrishikesh P <[email protected]> wrote:
>> > Hi Gabriel,
>> >
>> > Thanks for the reply; that definitely answered some of my questions.
>> >
>> > My PCollection is relatively small (30K pairs, which should count as
>> > small). My main concern was that the process method was getting called
>> > twice. But after adding "table.materialize().iterator().next();" it is
>> > now getting called only once, which solves my problem for now.
>> >
>> > But I see your point. If I make the database insertion operation an
>> > end-point of the pipeline, would I still be able to do it in parallel?
>> > That is one of the reasons I put the logic in the process method. If
>> > not, I may have to continue with my approach.
>> >
>> > Thanks.
>> >
>> > On Fri, Oct 18, 2013 at 3:27 PM, Gabriel Reid <[email protected]> wrote:
>> >>
>> >> Hi Hrishikesh,
>> >>
>> >> I think the basic principle in Crunch that should answer a large
>> >> portion of your questions is this: things only really start happening
>> >> in Crunch when you either write data to a target (i.e.
>> >> pipeline.write(...)) or materialize a PCollection. Materializing a
>> >> PCollection is something you typically only want to do if it is
>> >> relatively small, or if you have a specific need to get the contents
>> >> of a PCollection into the client application.
>> >>
>> >> If you specifically want to ensure that the TestFn is only run once
>> >> in your pipeline, it should be sufficient to add the following call
>> >> just after the "table" PCollection is created:
>> >>
>> >>     table.materialize().iterator().next();
>> >>
>> >> Using this instead of pipeline.run() will probably ensure that TestFn
>> >> is only run once over the collection in this case.
>> >>
>> >> However, it's actually a much better idea not to have operations that
>> >> affect the state of an external application/process inside a DoFn. I
>> >> would recommend instead writing the data to the external database as
>> >> an end-point of the pipeline (or even as a step that runs on the
>> >> output of the pipeline), rather than doing it within the pipeline.
>> >> That way you are immune to the number of times a DoFn is used/re-used
>> >> in a pipeline.
>> >>
>> >> Hope this helps, but if this isn't answering your questions, just
>> >> let me know.
>> >>
>> >> - Gabriel
>> >>
>> >> On Fri, Oct 18, 2013 at 9:43 PM, Hrishikesh P <[email protected]> wrote:
>> >> > Basically, I have a need to emit a pair of a string and an int from
>> >> > the DoFn's process method; the integers will later be grouped by the
>> >> > string.
>> >> > In addition to this, I also need to emit another long from the
>> >> > process method that is independent of this pair. At the end, I want
>> >> > to:
>> >> > 1) group the integers by string value
>> >> > 2) print the max long value.
>> >> >
>> >> > So I came up with the following code:
>> >> >
>> >> > {pseudo-code}
>> >> > ...
>> >> > PTable<Double, Long> inputTable = read(); // internal read method
>> >> >
>> >> > PTable<Pair<String, Integer>, Long> table =
>> >> >     inputTable.parallelDo("testjob", new TestFn(),
>> >> >         Writables.tableOf(
>> >> >             Writables.pairs(Writables.strings(), Writables.ints()),
>> >> >             Writables.longs()));
>> >> >
>> >> > pipeline.run();
>> >> >
>> >> > final Iterator<Long> it =
>> >> >     table.top(1).values().asCollection().getValue().iterator();
>> >> > if (it.hasNext()) {
>> >> >   System.err.println("MAX LONG = " + it.next());
>> >> > }
>> >> >
>> >> > // PTableConverterFn just emits its input back; it is used to convert
>> >> > // the PCollection to a PTable so the values can easily be grouped.
>> >> > PTable<String, Integer> newTable = table.keys().parallelDo(
>> >> >     new PTableConverterFn(),
>> >> >     Writables.tableOf(Writables.strings(), Writables.ints()));
>> >> >
>> >> > final Iterator<Pair<String, Integer>> mapIt = newTable.groupByKey()
>> >> >     .combineValues(Aggregators.SUM_INTS()).materialize().iterator();
>> >> >
>> >> > int count = 0;
>> >> > while (mapIt.hasNext()) {
>> >> >   Pair<String, Integer> aPair = mapIt.next();
>> >> >   // Need to print these in the logs.
>> >> >   System.out.println(aPair.first() + "---" + aPair.second());
>> >> >   count += aPair.second();
>> >> > }
>> >> >
>> >> > pipeline.done();
>> >> > System.out.println("Count = " + count);
>> >> > ...
>> >> >
>> >> > private static class TestFn extends DoFn<Pair<Double, Long>,
>> >> >     Pair<Pair<String, Integer>, Long>> {
>> >> >
>> >> >   protected TestFn() {
>> >> >   }
>> >> >
>> >> >   @Override
>> >> >   public void process(Pair<Double, Long> input,
>> >> >       Emitter<Pair<Pair<String, Integer>, Long>> emitter) {
>> >> >     System.out.println("Entering process ...");
>> >> >     ...
>> >> >     // internal method, does some database insertion operations
>> >> >     Pair<String, Integer> aPair = processInput(input);
>> >> >
>> >> >     long maxLong = findMaxLong(input.first()); // internal method
>> >> >     ...
>> >> >     Pair<Pair<String, Integer>, Long> pair =
>> >> >         new Pair<Pair<String, Integer>, Long>(aPair, maxLong);
>> >> >     emitter.emit(pair);
>> >> >     System.out.println("Exiting process ...");
>> >> >   }
>> >> > }
>> >> > ...
>> >> > {pseudo-code}
>> >> >
>> >> > This code gives me the results I expect, but the DoFn's process
>> >> > method gets called twice, which I guess happens first during the
>> >> > "table.top(1).values().asCollection().getValue().iterator()" call
>> >> > and second during the
>> >> > "newTable.groupByKey().combineValues(Aggregators.SUM_INTS()).materialize().iterator()"
>> >> > call. Since I am doing database insertion operations inside the
>> >> > process method, this is undesirable.
>> >> >
>> >> > My questions are:
>> >> > 1) Is there a better way to do what I am trying to do here?
>> >> > 2) When/how many times does the process method actually get called,
>> >> > and why does it get called more than once?
>> >> > 3) If I did not have the need to print the values after the
>> >> > processing, would the database insertions still have happened
>> >> > without materializing the results?
>> >> > Is materializing necessary for processing to happen?
>> >> >
>> >> > Thanks for the help/input.
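Tying the advice in this thread together, a rough sketch of the single-run restructuring (class and path names are hypothetical; this assumes the Crunch 0.7 API, and a TestFn with the database insertion removed) might look like:

```java
// Sketch only: declare every output before triggering execution, so the
// input is processed once and TestFn has no externally visible side effects.
// Aggregate is org.apache.crunch.lib.Aggregate; To is org.apache.crunch.io.To.
Pipeline pipeline = new MRPipeline(MyJob.class);  // hypothetical driver class
PTable<Double, Long> inputTable = read();         // internal read method, as above

PTable<Pair<String, Integer>, Long> table = inputTable.parallelDo("testjob",
    new TestFn(),  // same TestFn, minus the database insertion
    Writables.tableOf(
        Writables.pairs(Writables.strings(), Writables.ints()),
        Writables.longs()));

// Output 1: the grouped/summed pairs, written to HDFS as a pipeline end-point.
PTable<String, Integer> sums = table.keys()
    .parallelDo(new PTableConverterFn(),
        Writables.tableOf(Writables.strings(), Writables.ints()))
    .groupByKey()
    .combineValues(Aggregators.SUM_INTS());
pipeline.write(sums, To.textFile("/out/sums"));  // hypothetical path

// Output 2: the max long, as a lazily materialized PObject.
PObject<Long> maxLong = Aggregate.max(table.values());

pipeline.run();  // one pass over the input computes both outputs

System.out.println("MAX LONG = " + maxLong.getValue());
pipeline.done();
```

The database load would then run on `/out/sums` outside the pipeline (e.g. with Sqoop), so re-planning or task retries can never repeat the insertions.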
