Could you instead key by day and then use per-key computations and/or grouping by key? It is often easier to compute per key than to partition: keying by day gives a simpler pipeline structure, whereas Partition typically requires duplicated transform nodes, one per partition.
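
For example, something like this (a rough sketch using your ItLogLine
elements; formatDay() is a hypothetical helper standing in for whatever
logic turns an element's timestamp into a day string such as "2017-02-19"):

    PCollection<KV<String, ItLogLine>> keyedByDay = logLines
        .apply("Key by day", WithKeys.of(
            new SerializableFunction<ItLogLine, String>() {
              @Override
              public String apply(ItLogLine line) {
                // formatDay() is a placeholder for your own
                // timestamp-to-day-string conversion.
                return formatDay(line);
              }
            }));

    // One group per day; the set of days is discovered from the data,
    // so nothing has to be known at graph construction time.
    PCollection<KV<String, Iterable<ItLogLine>>> perDay = keyedByDay
        .apply(GroupByKey.<String, ItLogLine>create());

A single downstream DoFn can then handle every day's group (for example,
choosing the destination table or file name from the key) instead of
duplicating a write transform per partition.
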
On Sun, Feb 19, 2017, 1:20 PM Tobias Feldhaus <[email protected]> wrote:

> And I think my approach will never work. I just RTFM again, and it
> clearly states:
>
> “You can, for example, pass the number of partitions as a command-line
> option at runtime (which will then be used to build your pipeline
> graph), but you cannot determine the number of partitions in
> mid-pipeline (based on data calculated after your pipeline graph is
> constructed, for instance).” [0]
>
> Meaning that I cannot read my data in my pipeline to determine the
> number of days I want to partition it by, in order to write it to
> dated tables in BQ, or at least to write Avro files (one per day) to
> GCS.
>
> I guess I need two pipelines. This is something I wanted to avoid, as
> it makes things more complicated: I need to chain the two and check
> that everything in the first part has finished successfully before I
> can start phase two.
>
> Or am I missing something here?
>
> Tobi
>
> [0] https://beam.apache.org/documentation/programming-guide/#transforms-flatten-partition
>
> On 19.02.17, 21:41, "Tobias Feldhaus" <[email protected]> wrote:
>
> Sorry for the formatting of the last mail, I basically killed the
> quoting. I think I figured it out partially. I am referring to this
> example [0] and tried to mimic the behaviour. The code looks like this:
>
>     Pipeline p = Pipeline.create(options);
>
>     PCollection<ItLogLine> logLines = p
>         .apply("Read logfile", TextIO.Read.from(bucket))
>         .apply("Repartition", Repartition.of())
>         .apply("Parse JSON", ParDo.of(new ReadObjects()))
>         .apply("Extract timestamp", ParDo.of(new ExtractTimestamps()));
>
>     PCollection<TableRow> output = logLines
>         .apply("Format Output", ParDo.of(new Outputter()));
>
>     output.apply("Write to BigQueryService", BigQueryIO.Write
>         .to("somedataset." + tableName)
>         .withSchema(schema)
>         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>
>     PCollection<KV<Instant, Instant>> dates = logLines
>         .apply("Extract Instant", ParDo.of(new GetDateFunction()))
>         .apply(WithKeys.of(new SerializableFunction<Instant, Instant>() {
>             public Instant apply(Instant instant) {
>                 return instant;
>             }
>         }));
>
>     final PCollectionView<Long> numOfDays = dates
>         .apply("GetDateTimes", Keys.<Instant>create())
>         .apply("DistinctDocs", Distinct.<Instant>create())
>         .apply("Count", Count.<Instant>globally())
>         .apply("View", View.<Long>asSingleton());
>
>     // PCollectionList pCollectionPerDay = logLines.apply("Partition per Day",
>     //     Partition.of( I need to know the number of days here ));
>
> I was using Integer before in the PCollectionView and that was the
> issue, but it works with Long. Also, I was using DateTime, which does
> not come with a predefined DefaultCoder, so I simply switched to
> Instants (using the TfIdf example [1], and WithKeys.of [2]).
>
> I have two big open questions. In my code I was using
> c.outputWithTimestamp(itLogLine, timestampInstant) in a step where I
> parsed a string from an ItLogLine field into an Instant and attached
> it to the element, to use it later in the chain again.
>
> Now, after switching to Beam from Dataflow, this stopped working. I
> assume that I was using it wrongly, since it says that
>
> “*If invoked from ProcessElement, the timestamp must not be older than
> the input element's timestamp minus DoFn#getAllowedTimestampSkew*”
>
> so I want to change my code to do the right thing. However, in the
> current documentation I cannot find the text that describes how to
> assign timestamps, as the link is broken (“see Assigning Timestamps
> <https://beam.apache.org/documentation/programming-guide/#windowing>
> for more information on how to do so.” – the link goes to #Windowing).
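>
> My current guess for a fix is to override getAllowedTimestampSkew() in
> the DoFn that assigns the timestamps, so that outputWithTimestamp() is
> allowed to move elements backwards in time. Roughly like this (an
> untested sketch; parseInstant() is a stand-in for my actual parsing
> logic):
>
>     class ExtractTimestamps extends DoFn<ItLogLine, ItLogLine> {
>
>         @Override
>         public Duration getAllowedTimestampSkew() {
>             // Allow assigned timestamps to be arbitrarily far in the
>             // past relative to the input element's timestamp.
>             return Duration.millis(Long.MAX_VALUE);
>         }
>
>         @ProcessElement
>         public void processElement(ProcessContext c) {
>             // parseInstant() stands in for parsing the string field
>             // of an ItLogLine into a Joda-Time Instant.
>             Instant ts = parseInstant(c.element());
>             c.outputWithTimestamp(c.element(), ts);
>         }
>     }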
>
> My second question is about the PCollectionView – Dan said that it is
> possible to use it as a SideInput, but is it also possible to use a
> side input when doing partitioning? Intuitively, I would like to get a
> single Long (numOfDays) and then pass that to Partition.of().
>
> Have a nice Sunday,
> Tobi
>
> [0] https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java#L209
> [1] https://github.com/apache/beam/blob/8183ac825d506a56421f41adc2e25b544f1bb80f/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java#L181
> [2] https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/transforms/WithKeys
>
> On 19.02.17, 14:45, "Tobias Feldhaus" <[email protected]> wrote:
>
> Hi Ken,
>
> thank you for your answer!
>
> On Fri, Feb 17, 2017 at 7:51 AM, Tobias Feldhaus
> <[email protected]> wrote:
>
> My intention is to get a
>
>     PCollectionView<Long> numOfDays
>
> which holds the number of elements (days) of a given
> PCollection<DateTime> dates, building it via a sum of counts (as a
> SingletonView). Something that corresponds in my head to this
> (probably *wrong*):
>
>     dates.apply("Count", Count.globally().asSingletonView());
>
> Seems about right.
>
> It seems about right to me too, but apparently I have a misconception
> about what is happening here in my head:
>
>     PCollection<ItLogLine> logLines = p
>         .apply("Read logfile", TextIO.Read.from(bucket))
>         .apply("Repartition", Repartition.of())
>         .apply("Parse JSON", ParDo.of(new ReadObjects()))
>         .apply("Extract timestamp", ParDo.of(new ExtractTimestamps()));
>
>     PCollection<DateTime> dates = logLines
>         .apply("Get Dates", ParDo.of(new GetDateFunction()))
>         .apply("Get distinct Dates", Distinct.<DateTime>create());
>
>     final PCollectionView<Long> numOfDays =
>         dates.apply("Count", Count.globally().asSingletonView());
>
> This ends up in a type mismatch. Is Count only applicable to certain
> types like String, Long, Integer, or TableRow? Considering the code,
> this would be counterintuitive, as everything seems to be implemented
> using generic types in the SDK.
>
> I am still getting used to the Beam programming model, and it still
> confuses me from time to time, sorry.
>
> Is it possible to access the side input outside of a DoFn?
>
> You can access a side input from a DoFn and now also from a
> CombineFnWithContext (a bit of an advanced feature).
>
> So getting a single calculated value to partition a PCollection (the
> number of days in this case) should then be done via the
> extractOutput [0] method?
>
> Best,
> Tobi
>
> [0] https://beam.apache.org/documentation/sdks/javadoc/0.5.0/org/apache/beam/sdk/transforms/CombineWithContext.CombineFnWithContext.html#extractOutput-AccumT-org.apache.beam.sdk.transforms.CombineWithContext.Context-
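>
> P.S.: From the Javadoc [0], my rough understanding is that such a
> CombineFnWithContext would look like the skeleton below (a sketch
> only, not verified against the SDK); the Context argument is what
> makes side inputs reachable from inside the combine:
>
>     class CountDaysFn extends
>             CombineWithContext.CombineFnWithContext<Instant, Long, Long> {
>
>         @Override
>         public Long createAccumulator(CombineWithContext.Context c) {
>             return 0L;
>         }
>
>         @Override
>         public Long addInput(Long accum, Instant input,
>                 CombineWithContext.Context c) {
>             // Count each distinct day instant.
>             return accum + 1;
>         }
>
>         @Override
>         public Long mergeAccumulators(Iterable<Long> accums,
>                 CombineWithContext.Context c) {
>             long sum = 0;
>             for (Long a : accums) {
>                 sum += a;
>             }
>             return sum;
>         }
>
>         @Override
>         public Long extractOutput(Long accum,
>                 CombineWithContext.Context c) {
>             // A side input would be read here via c.sideInput(view).
>             return accum;
>         }
>     }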
