Hi Gabriel,

Thanks for the reply; that definitely answered some of my questions.
My PCollection is relatively small (30K pairs should count as small). My main concern was that the process method was being called twice, but after adding "table.materialize().iterator().next();" it's now called only once, which solves my problem for now.

I do see your point, though. If I make the database insertion an end point of the pipeline, would I still be able to do it in parallel? That is one of the reasons I put the logic in the process method. If not, I may have to stay with my current approach.

Thanks.

On Fri, Oct 18, 2013 at 3:27 PM, Gabriel Reid wrote:
> Hi Hrishikesh,
>
> I think the basic principle in Crunch that should answer a large
> portion of your questions is this: things only really start happening
> in Crunch if you either write data to a target (i.e.
> pipeline.write(...)), or materialize a PCollection. Materializing a
> PCollection is something you typically only want to do if it is
> relatively small, or if you have a specific need to get the contents
> of a PCollection into the client application.
>
> If you specifically want to ensure that the TestFn is only used once
> in your pipeline, it should be sufficient to add the following call
> just after the "table" PCollection is created:
>
>     table.materialize().iterator().next();
>
> Using this instead of pipeline.run will probably ensure that TestFn is
> only run once over the collection in this case.
>
> However, it's actually a much better idea to not have operations that
> affect the state of an external application/process within a DoFn. I
> would recommend instead writing the data to be added to an external
> database as an end-point of a pipeline (or even as something that can
> be run on the output of a pipeline), instead of doing it within a
> pipeline. In this way you can become immune to the number of times a
> DoFn is used/re-used in a pipeline.
>
> Hope this helps, but if this isn't answering your questions then just
> let me know.
>
> - Gabriel
>
>
> On Fri, Oct 18, 2013 at 9:43 PM, Hrishikesh P wrote:
> > Basically, I need to emit a pair of a string and an int from the
> > DoFn's process method; the integers will later be grouped by the string.
> > In addition to this, I also need to emit another long from the process
> > method that is independent of this pair. At the end, I want to:
> > 1) group the integers by string value
> > 2) print the max long value.
> >
> > So I came up with the following code:
> >
> > {pseudo-code}
> > ...
> >
> > PTable<Double, Long> inputTable = read(); // internal read method
> >
> > PTable<Pair<String, Integer>, Long> table = inputTable.parallelDo("testjob",
> >     new TestFn(), Writables.tableOf(Writables.pairs(Writables.strings(),
> >     Writables.ints()), Writables.longs()));
> >
> > pipeline.run();
> >
> > final Iterator<Long> it =
> >     table.top(1).values().asCollection().getValue().iterator();
> > if (it.hasNext()) {
> >     System.err.println("MAX LONG = " + it.next());
> > }
> >
> > // PTableConverterFn just emits the input back; it is used to convert
> > // the PCollection to a PTable so that the values can easily be grouped.
> > PTable<String, Integer> newTable = table.keys().parallelDo(
> >     new PTableConverterFn(),
> >     Writables.tableOf(Writables.strings(), Writables.ints()));
> >
> > final Iterator<Pair<String, Integer>> mapIt = newTable.groupByKey()
> >     .combineValues(Aggregators.SUM_INTS()).materialize().iterator();
> > int count = 0;
> > while (mapIt.hasNext()) {
> >     Pair<String, Integer> aPair = mapIt.next();
> >     // Need to print these in the logs.
> >     System.out.println(aPair.first() + "---" + aPair.second());
> >     count += aPair.second();
> > }
> >
> > pipeline.done();
> > System.out.println("Count = " + count);
> >
> > ...
> >
> > private static class TestFn extends DoFn<Pair<Double, Long>,
> >         Pair<Pair<String, Integer>, Long>> {
> >
> >     protected TestFn() {
> >     }
> >
> >     @Override
> >     public void process(Pair<Double, Long> input,
> >             Emitter<Pair<Pair<String, Integer>, Long>> emitter) {
> >         System.out.println("Entering process ...");
> >         ...
> >         // internal method, does some database insertion operations.
> >         Pair<String, Integer> aPair = processInput(input);
> >
> >         long maxLong = 0;
> >         maxLong = findMaxLong(input.first()); // internal method
> >         ...
> >         Pair<Pair<String, Integer>, Long> pair =
> >             new Pair<Pair<String, Integer>, Long>(aPair, maxLong);
> >         emitter.emit(pair);
> >
> >         System.out.println("Exiting process ...");
> >     }
> > }
> >
> > ...
> >
> > {pseudo-code}
> >
> >
> > This code gives me the results I expect, but the DoFn's process method gets
> > called twice, which I guess happens first during the
> > "table.top(1).values().asCollection().getValue().iterator()" call and
> > second during the
> > "newTable.groupByKey().combineValues(Aggregators.SUM_INTS()).materialize().iterator()"
> > call. Since I am doing database insertion operations inside the process
> > method, this is undesirable.
> >
> > My questions are:
> >
> > 1) Is there a better way to do what I am trying to do here?
> >
> > 2) When, and how many times, does the process method actually get called,
> > and why does it get called more than once?
> >
> > 3) If I did not need to print the values after the processing, would the
> > database insertions still have happened without materializing the
> > results? Is materializing necessary for processing to happen?
> >
> > Thanks for the help/input.
>
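Gabriel's point that nothing runs until a target is written or a collection is materialized, and that two separate consumers of "table" can each trigger their own pass over the upstream DoFn, can be illustrated with a small plain-Java analogy. This is not Crunch code and does not model Crunch's planner exactly: LazyCollection, map, materialize, cache and LazyDemo are hypothetical names, and cache() simply evaluates the plan once up front, roughly like materializing "table" before using it twice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Hypothetical analogy, not Crunch: a collection that only describes a
// computation and runs it when someone consumes the result.
final class LazyCollection<T> {
    private final Supplier<List<T>> plan;

    LazyCollection(Supplier<List<T>> plan) {
        this.plan = plan;
    }

    // Roughly like parallelDo: builds a new plan but runs nothing yet.
    <R> LazyCollection<R> map(Function<T, R> fn) {
        return new LazyCollection<R>(
            () -> plan.get().stream().map(fn).collect(Collectors.toList()));
    }

    // Roughly like materialize(): runs the whole upstream plan on every call.
    List<T> materialize() {
        return plan.get();
    }

    // Runs the plan once now and lets later consumers reuse the result.
    LazyCollection<T> cache() {
        List<T> computed = new ArrayList<>(plan.get());
        return new LazyCollection<T>(() -> computed);
    }
}

final class LazyDemo {
    // Returns how many times the "process" function ran when two separate
    // consumers (like top(1) and groupByKey() in the question) read the result.
    static int countProcessCalls(boolean cacheFirst) {
        AtomicInteger calls = new AtomicInteger();
        LazyCollection<Integer> input =
            new LazyCollection<Integer>(() -> List.of(1, 2, 3));
        LazyCollection<Integer> table = input.map(x -> {
            calls.incrementAndGet(); // stands in for TestFn.process side effects
            return x * 10;
        });
        if (cacheFirst) {
            table = table.cache();
        }
        int max = table.materialize().stream().max(Integer::compare).orElse(0);
        int sum = table.materialize().stream().mapToInt(Integer::intValue).sum();
        System.out.println("max=" + max + " sum=" + sum + " calls=" + calls.get());
        return calls.get();
    }
}
```

Without caching, the mapping function runs once per element per consumer (6 calls for 3 elements read by 2 consumers); with the collection evaluated once first, it runs only 3 times.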

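Gabriel's recommendation (keep the DoFn free of external side effects and do the database insertion as an end point) might look like the following in a plain-Java analogy. It is a sketch under stated assumptions, not Crunch code: Processed, EndpointSketch, process, sumsByKey, maxValue, export, and the queue standing in for the database are all hypothetical. The per-record logic only returns the row to insert, both aggregations (sum per string, max long) are computed from that output, and the insertion runs once per row at the end, where it can still be spread across threads. It does not show how Crunch itself would parallelize loading its output into a database.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical output of a side-effect-free "process" step: the values to
// aggregate plus the row that should eventually be inserted into the database.
final class Processed {
    final String key;
    final int count;
    final long value;
    final String dbRow;

    Processed(String key, int count, long value, String dbRow) {
        this.key = key;
        this.count = count;
        this.value = value;
        this.dbRow = dbRow;
    }
}

final class EndpointSketch {
    // Pure per-record logic: it only returns data, so running it twice is harmless.
    static Processed process(String key, long value) {
        return new Processed(key, 1, value, key + "=" + value);
    }

    // Sum of the int counts per string key (like groupByKey + SUM_INTS).
    static Map<String, Integer> sumsByKey(List<Processed> rows) {
        Map<String, Integer> sums = new HashMap<>();
        for (Processed p : rows) {
            sums.merge(p.key, p.count, Integer::sum);
        }
        return sums;
    }

    // Maximum of the long values (like top(1) on the values).
    static long maxValue(List<Processed> rows) {
        return rows.stream().mapToLong(p -> p.value).max().orElse(0L);
    }

    // End point: each row is written exactly once, after all computation;
    // the writes may run in parallel. The queue stands in for a database.
    static void export(List<Processed> rows, Queue<String> sink) {
        rows.parallelStream().forEach(p -> sink.add(p.dbRow));
    }
}
```

Because process() has no side effects, it does not matter how many times a pipeline re-runs it; only export() touches the external sink.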