According to this Stack Overflow answer [0], keying by day, grouping by key, and then using the key to infer the table name is not possible. The only option left would be per-window tables, which need a timestamp attached to each element. But timestamping seems to have changed, as this approach [1] no longer works for me in the Beam SDK.
Is there any way to get gzipped JSON files from GCS through Dataflow into BQ, into a partitioned or even date-stamped table?

[0] http://stackoverflow.com/questions/31156774/about-key-grouping-with-groupbykey#31165682
[1] https://cloud.google.com/dataflow/model/windowing#TimeStamping

On 19.02.17, 23:09, "Tobias Feldhaus" wrote:

That would mean attaching a (date) timestamp (which is right now an open question for me, see “Assigning Timestamps” problem) to each element and windowing by day, or building a KV with the date and grouping by the key of the KV?

On 19.02.17, 22:24, "Ben Chambers" wrote:

Could you instead key by day and then use per-key computations and/or group by key? Often it is easier to compute per key than to partition. The former gives a simpler pipeline structure, while Partition typically requires more (duplicated) transform nodes.

On Sun, Feb 19, 2017, 1:20 PM Tobias Feldhaus wrote:

And I think my approach will never work. I just RTFM again and it clearly states: “You can, for example, pass the number of partitions as a command-line option at runtime (which will then be used to build your pipeline graph), but you cannot determine the number of partitions in mid-pipeline (based on data calculated after your pipeline graph is constructed, for instance).” [0]

This means that I cannot read my data in my pipeline to determine the number of days I want to partition it by, in order to write it to dated tables in BQ, or at least to write AVRO files (one per day) to GCS.

I guess I need two pipelines. This was something I wanted to avoid, as it makes things more complicated: I need to chain the two and check that everything from the first part has completed successfully before I can start phase two. Or am I missing something here?
Tobi

[0] https://beam.apache.org/documentation/programming-guide/#transforms-flatten-partition

On 19.02.17, 21:41, "Tobias Feldhaus" wrote:

Sorry for the formatting of the last mail, I basically killed the quoting.

I think I figured it out partially. I am referring to this example [0] and tried to mimic the behaviour. The code looks like this:

    Pipeline p = Pipeline.create(options);

    PCollection<ItLogLine> logLines = p.apply("Read logfile", TextIO.Read.from(bucket))
        .apply("Repartition", Repartition.of())
        .apply("Parse JSON", ParDo.of(new ReadObjects()))
        .apply("Extract timestamp", ParDo.of(new ExtractTimestamps()));

    PCollection<TableRow> output = logLines.apply("Format Output", ParDo.of(new Outputter()));

    output.apply("Write to BigQueryService", BigQueryIO.Write
        .to("somedataset." + tableName)
        .withSchema(schema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    PCollection<KV<Instant, Instant>> dates = logLines
        .apply("Extract Instant", ParDo.of(new GetDateFunction()))
        .apply(WithKeys.of(new SerializableFunction<Instant, Instant>() {
            public Instant apply(Instant instant) {
                return instant;
            }
        }));

    final PCollectionView<Long> numOfDays = dates
        .apply("GetDateTimes", Keys.<Instant>create())
        .apply("DistinctDocs", Distinct.<Instant>create())
        .apply("Count", Count.<Instant>globally())
        .apply("View", View.<Long>asSingleton());

    // PCollectionList pCollectionPerDay = logLines.apply("Partition per Day",
    //     Partition.of ( I need to know the number of days here)

I was using Integer before in the PCollectionView and that was the issue, but it works with Long. Also, I was using DateTime, which does not come with a predefined default coder, so I simply switched to Instants (using the TfIdf example [1] and WithKeys.of [2]).

I have two big open questions.
First, in my code I was using c.outputWithTimestamp(itLogLine, timestampInstant) in a step where I parsed a string from an ItLogLine field into an Instant and attached it to the element, to use it again later in the chain. After switching from Dataflow to Beam, this stopped working. I assume that I was using it wrongly, since the documentation says: “If invoked from ProcessElement, the timestamp must not be older than the input element's timestamp minus DoFn#getAllowedTimestampSkew”. So I want to change my code to do the right thing. However, in the current documentation I cannot find the text that describes how to assign timestamps, because the link is broken: “see Assigning Timestamps <https://beam.apache.org/documentation/programming-guide/#windowing> for more information on how to do so.” The link goes to #windowing.

My second question is about the PCollectionView. Dan said that it’s possible to use it as a side input, but is it also possible to use a side input when partitioning? Intuitively, I would like to get a single Long (numOfDays) and then pass that to Partition.of().

Have a nice Sunday,
Tobi

[0] https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java#L209
[1] https://github.com/apache/beam/blob/8183ac825d506a56421f41adc2e25b544f1bb80f/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java#L181
[2] https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/transforms/WithKeys

On 19.02.17, 14:45, "Tobias Feldhaus" wrote:

Hi Ken,

thank you for your answer!

On Fri, Feb 17, 2017 at 7:51 AM, Tobias Feldhaus wrote:

My intention is to get a PCollectionView<Long> numOfDays which holds the number of elements (days) of a given PCollection<DateTime> dates, and to build it via a sum of counts (as a SingletonView).
Something that corresponds in my head to *wrong*:

    dates.apply("Count", Count.globally().asSingletonView());

Seems about right.

It seems about right to me too, but apparently I have a misconception in my head about what is happening here:

    PCollection<ItLogLine> logLines = p.apply("Read logfile", TextIO.Read.from(bucket))
        .apply("Repartition", Repartition.of())
        .apply("Parse JSON", ParDo.of(new ReadObjects()))
        .apply("Extract timestamp", ParDo.of(new ExtractTimestamps()));

    PCollection<DateTime> dates = logLines
        .apply("Get Dates", ParDo.of(new GetDateFunction()))
        .apply("Get distinct Dates", Distinct.<DateTime>create());

    final PCollectionView<Long> numOfDays =
        dates.apply("Count", Count.globally().asSingletonView());

This ends up in a type mismatch. Is Count only applicable to certain types like String, Long, Integer, TableRow? Judging from the code, this would be counterintuitive, as everything in the SDK seems to be implemented using generic types. I am still getting used to the Beam programming model and it still confuses me from time to time, sorry.

Is it possible to access the side input outside of a DoFn?

You can access a side input from a DoFn and now also from a CombineFnWithContext (a bit of an advanced feature).

So getting a single calculated value to partition a PCollection (the number of days in this case) should be done via the extractOutput [0] method in this case?

Best,
Tobi

[0] https://beam.apache.org/documentation/sdks/javadoc/0.5.0/org/apache/beam/sdk/transforms/CombineWithContext.CombineFnWithContext.html#extractOutput-AccumT-org.apache.beam.sdk.transforms.CombineWithContext.Context-
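
For readers following the thread, the "key by day" idea that Ben suggests can be illustrated outside of Beam with plain Java. The sketch below is not Beam or BigQuery code and is not the pipeline from this thread. It only shows that a day key (and from it a dated table name) can be derived from each element's timestamp, so the number of distinct days never has to be known up front, which is what Partition would require. The class name, the method names, the "somedataset.logs" prefix, the "_yyyyMMdd" suffix, and the use of UTC day boundaries are all assumptions made for illustration.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; no Beam or BigQuery classes are involved.
class DayKeyingSketch {

    // Hypothetical table prefix, loosely echoing "somedataset." + tableName.
    static final String TABLE_PREFIX = "somedataset.logs";

    // Derives a yyyyMMdd day key from a timestamp (assumption: UTC days).
    static String dayKey(Instant timestamp) {
        LocalDate day = timestamp.atZone(ZoneOffset.UTC).toLocalDate();
        return day.format(DateTimeFormatter.BASIC_ISO_DATE);
    }

    // Builds a date-suffixed table name (assumed scheme: prefix_yyyyMMdd).
    static String tableFor(Instant timestamp) {
        return TABLE_PREFIX + "_" + dayKey(timestamp);
    }

    // Groups timestamps by day key. The number of groups falls out of the
    // data; it does not have to be fixed before the grouping starts.
    static Map<String, List<Instant>> groupByDay(List<Instant> timestamps) {
        Map<String, List<Instant>> groups = new TreeMap<>();
        for (Instant t : timestamps) {
            groups.computeIfAbsent(dayKey(t), k -> new ArrayList<>()).add(t);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Instant> input = Arrays.asList(
            Instant.parse("2017-02-19T21:41:00Z"),
            Instant.parse("2017-02-19T23:09:00Z"),
            Instant.parse("2017-02-17T07:51:00Z"));

        // Print each day key with the number of elements that fell into it.
        for (Map.Entry<String, List<Instant>> e : groupByDay(input).entrySet()) {
            System.out.println(TABLE_PREFIX + "_" + e.getKey()
                + " <- " + e.getValue().size() + " element(s)");
        }
    }
}
```

Whether such a per-element key can be turned into a per-day BigQuery destination inside a single pipeline is exactly the open question of this thread; the sketch only shows the keying step itself.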
