And I think my approach will never work. I just RTFM'd again, and it clearly states:
“You can, for example, pass the number of partitions as a command-line option at runtime (which will then be used to build your pipeline graph), but you cannot determine the number of partitions in mid-pipeline (based on data calculated after your pipeline graph is constructed, for instance).” [0]

Meaning that I cannot read my data in my pipeline to determine the number of days I want to partition it by, in order to write it to dated tables in BQ, or at least to write AVRO files (one per day) to GCS. I guess I need two pipelines. This is something I wanted to avoid, as it makes the whole thing more complicated: I would have to chain the two and check that everything from the first part has completed successfully before I can start phase two. Or am I missing something here?

Tobi

[0] https://beam.apache.org/documentation/programming-guide/#transforms-flatten-partition

On 19.02.17, 21:41, "Tobias Feldhaus" <[email protected]> wrote:

Sorry for the formatting of the last mail, I basically killed the quoting.

I think I figured it out partially. I am referring to this example [0] and tried to mimic the behaviour. The code looks like this:

    Pipeline p = Pipeline.create(options);

    PCollection<ItLogLine> logLines = p
        .apply("Read logfile", TextIO.Read.from(bucket))
        .apply("Repartition", Repartition.of())
        .apply("Parse JSON", ParDo.of(new ReadObjects()))
        .apply("Extract timestamp", ParDo.of(new ExtractTimestamps()));

    PCollection<TableRow> output = logLines
        .apply("Format Output", ParDo.of(new Outputter()));

    output.apply("Write to BigQueryService", BigQueryIO.Write
        .to("somedataset." + tableName)
        .withSchema(schema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    PCollection<KV<Instant, Instant>> dates = logLines
        .apply("Extract Instant", ParDo.of(new GetDateFunction()))
        .apply(WithKeys.of(new SerializableFunction<Instant, Instant>() {
            public Instant apply(Instant instant) {
                return instant;
            }
        }));

    final PCollectionView<Long> numOfDays = dates
        .apply("GetDateTimes", Keys.<Instant>create())
        .apply("DistinctDocs", Distinct.<Instant>create())
        .apply("Count", Count.<Instant>globally())
        .apply("View", View.<Long>asSingleton());

    // PCollectionList pCollectionPerDay = logLines.apply("Partition per Day",
    //     Partition.of( /* I need to know the number of days here */ ));

I was using Integer in the PCollectionView before, and that was the issue; it works with Long. Also, I was using DateTime, which does not come with a predefined DefaultCoder, so I simply switched to Instant (following the TfIdf example [1] and WithKeys.of [2]).

I have two big open questions. In my code I was calling c.outputWithTimestamp(itLogLine, timestampInstant) in a step where I parsed a string from an ItLogLine field into an Instant and attached it to the element, to use it again later in the chain. After switching from Dataflow to Beam, this stopped working. I assume I was using it wrongly, since the Javadoc says “If invoked from ProcessElement, the timestamp must not be older than the input element's timestamp minus DoFn#getAllowedTimestampSkew”, so I want to change my code to do the right thing. However, I cannot find the text in the current documentation that describes how to assign timestamps, as the link is broken: “see Assigning Timestamps <https://beam.apache.org/documentation/programming-guide/#windowing> for more information on how to do so.” just goes to #windowing.
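In the meantime, this is roughly what I think the corrected timestamp-assigning step should look like. Just a sketch on my side: getRawTimestamp() is a made-up stand-in for whatever ItLogLine field actually holds the string, and the skew bound is a guess:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.joda.time.Duration;
    import org.joda.time.Instant;

    static class ExtractTimestamps extends DoFn<ItLogLine, ItLogLine> {
        // Without widening the skew, outputWithTimestamp() rejects timestamps
        // older than the input element's timestamp, which is what happens for
        // historical log data read "now" from GCS.
        @Override
        public Duration getAllowedTimestampSkew() {
            return Duration.standardDays(3650); // guessed upper bound for my logs
        }

        @ProcessElement
        public void processElement(ProcessContext c) {
            ItLogLine line = c.element();
            // getRawTimestamp() is hypothetical: parse the event time out of the element
            Instant eventTime = Instant.parse(line.getRawTimestamp());
            c.outputWithTimestamp(line, eventTime);
        }
    }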
My second question is about the PCollectionView. Dan said that it is possible to use it as a side input, but is it also possible to use a side input when doing partitioning? Intuitively, I would like to get a single Long (numOfDays) and then pass that to Partition.of().

Have a nice Sunday,
Tobi

[0] https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java#L209
[1] https://github.com/apache/beam/blob/8183ac825d506a56421f41adc2e25b544f1bb80f/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java#L181
[2] https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/transforms/WithKeys

On 19.02.17, 14:45, "Tobias Feldhaus" <[email protected]> wrote:

Hi Ken,

thank you for your answer!

> On Fri, Feb 17, 2017 at 7:51 AM, Tobias Feldhaus <[email protected]> wrote:
>> My intention is to get a
>>
>>     PCollectionView<Long> numOfDays
>>
>> which holds the number of elements (days) of a given PCollection<DateTime> dates, building it via a sum of counts (as a SingletonView). Something that corresponds in my head to *wrong*:
>>
>>     dates.apply("Count", Count.globally().asSingletonView());
>
> Seems about right.

It seems about right to me too, but apparently I have a misconception about what is happening here:

    PCollection<ItLogLine> logLines = p
        .apply("Read logfile", TextIO.Read.from(bucket))
        .apply("Repartition", Repartition.of())
        .apply("Parse JSON", ParDo.of(new ReadObjects()))
        .apply("Extract timestamp", ParDo.of(new ExtractTimestamps()));

    PCollection<DateTime> dates = logLines
        .apply("Get Dates", ParDo.of(new GetDateFunction()))
        .apply("Get distinct Dates", Distinct.<DateTime>create());

    final PCollectionView<Long> numOfDays =
        dates.apply("Count", Count.globally().asSingletonView());

This ends up in a type mismatch. Is Count only applicable to certain types like String, Long, Integer, or TableRow? Judging from the code that would be counterintuitive, since everything in the SDK seems to be implemented with generic types. I am still getting used to the Beam programming model, and it still confuses me from time to time, sorry.

>> Is it possible to access the side input outside of a DoFn?
>
> You can access a side input from a DoFn and now also from a CombineFnWithContext (a bit of an advanced feature).

So getting a single calculated value to partition a PCollection (the number of days in this case) should be done via the extractOutput [0] method?

Best,
Tobi

[0] https://beam.apache.org/documentation/sdks/javadoc/0.5.0/org/apache/beam/sdk/transforms/CombineWithContext.CombineFnWithContext.html#extractOutput-AccumT-org.apache.beam.sdk.transforms.CombineWithContext.Context-
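PS: For reference, my current understanding of how such a singleton view is actually consumed, namely only from inside a DoFn at execution time (a sketch, reusing logLines and numOfDays from the code above):

    logLines.apply("Use day count", ParDo.of(new DoFn<ItLogLine, ItLogLine>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // A side input is only readable here, at execution time, which is
            // why Partition.of() cannot take one when the graph is constructed.
            long days = c.sideInput(numOfDays);
            // ... use 'days' ...
            c.output(c.element());
        }
    }).withSideInputs(numOfDays));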
