Could you instead key by day and then use per-key computations and/or
group by key? It is often easier to compute per key than to partition.
Keying gives a simpler pipeline structure, while Partition requires more
transform nodes, typically duplicated ones.
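
As a rough illustration of the "key by day" idea, here is a minimal sketch
in plain Java collections. It is not Beam API: the class name DayKeying, the
helper names, and the choice of UTC for day boundaries are all assumptions
made for the example. The point it shows is that grouping by a day key
discovers the set of days from the data itself, so no partition count has to
be known up front.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java illustration only; hypothetical names, not part of any Beam SDK.
final class DayKeying {

    // Derives the key: the calendar day of a timestamp (UTC is an assumption).
    static LocalDate dayOf(Instant timestamp) {
        return timestamp.atZone(ZoneOffset.UTC).toLocalDate();
    }

    // Groups elements by their day key; the number of groups comes from the data.
    static Map<LocalDate, List<Instant>> groupByDay(List<Instant> timestamps) {
        return timestamps.stream()
                .collect(Collectors.groupingBy(DayKeying::dayOf, TreeMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        List<Instant> events = List.of(
                Instant.parse("2017-02-17T07:51:00Z"),
                Instant.parse("2017-02-19T13:20:00Z"),
                Instant.parse("2017-02-19T21:41:00Z"));

        // One entry per distinct day, each holding that day's elements.
        groupByDay(events).forEach((day, items) ->
                System.out.println(day + " -> " + items.size() + " element(s)"));
    }
}
```

In a Beam pipeline the analogous steps would presumably be a keying
transform such as WithKeys followed by GroupByKey or a per-key combine.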

On Sun, Feb 19, 2017, 1:20 PM Tobias Feldhaus <
[email protected]> wrote:

> And I think my approach will never work. I just RTFM again and it clearly
> states:
>
> “You can, for example, pass the number of partitions as a command-line
> option at runtime (which will then be used to build your pipeline graph),
> but you cannot determine the number of partitions in mid-pipeline (based
> on data calculated after your pipeline graph is constructed, for
> instance).” [0]
>
>
>
> Meaning that I cannot read my data in my pipeline to determine the number
> of days I want to partition it by, in order to write it to dated tables
> in BQ, or at least to write AVRO files (one per day) to GCS.
>
>
>
> I guess I need two pipelines. This was something I wanted to avoid, as it
> makes things more complicated: I need to chain the two and check that
> everything in the first pipeline has completed successfully before I can
> start phase two.
>
>
>
> Or am I missing something here?
>
> Tobi
>
>
>
> [0]
> https://beam.apache.org/documentation/programming-guide/#transforms-flatten-partition
>
>
>
> On 19.02.17, 21:41, "Tobias Feldhaus" <[email protected]>
> wrote:
>
>
>
> Sorry for the formatting of the last mail, I basically killed the quoting.
>
> I think I figured it out partially.
>
> I am referring to this example [0] and tried to mimic the behaviour.
>
> The code looks like this:
>
>
>
>         Pipeline p = Pipeline.create(options);
>
>         PCollection<ItLogLine> logLines = p
>                 .apply("Read logfile", TextIO.Read.from(bucket))
>                 .apply("Repartition", Repartition.of())
>                 .apply("Parse JSON", ParDo.of(new ReadObjects()))
>                 .apply("Extract timestamp", ParDo.of(new ExtractTimestamps()));
>
>         PCollection<TableRow> output = logLines
>                 .apply("Format Output", ParDo.of(new Outputter()));
>
>
>
>         output.apply("Write to BigQueryService", BigQueryIO.Write
>                 .to("somedataset." + tableName)
>                 .withSchema(schema)
>                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>
>
>
>         PCollection<KV<Instant, Instant>> dates = logLines
>                 .apply("Extract Instant", ParDo.of(new GetDateFunction()))
>                 .apply(WithKeys.of(new SerializableFunction<Instant, Instant>() {
>                     public Instant apply(Instant instant) { return instant; }
>                 }));
>
>
>
>         final PCollectionView<Long> numOfDays = dates
>                 .apply("GetDateTimes", Keys.<Instant>create())
>                 .apply("DistinctDocs", Distinct.<Instant>create())
>                 .apply("Count", Count.<Instant>globally())
>                 .apply("View", View.<Long>asSingleton());
>
>
>
> //       PCollectionList pCollectionPerDay = logLines.apply("Partition per Day",
> //               Partition.of ( I need to know the number of days here)
>
>
>
> I was using Integer before in the PCollectionView and that was the issue,
> but it works with Long. Also, I was using DateTime, which does not come
> with a predefined DefaultCoder, so I simply switched to Instants (using
> the TfIdf example [1] and WithKeys.of [2]).
>
>
>
> I have two big open questions. In my code I was using
> c.outputWithTimestamp(itLogLine, timestampInstant) in a step where I parsed
> a string from an ItLogLine field into an Instant and attached it to the
> element, so that I could use it again later in the chain.
>
>
>
> Now, after switching from Dataflow to Beam, this stopped working. I assume
> that I was using it incorrectly, since it says that
>
> “If invoked from ProcessElement, the timestamp must not be older than
> the input element's timestamp minus DoFn#getAllowedTimestampSkew”
>
>
>
> so I want to change my code to do the right thing - however, in the
> current documentation I cannot find the text that describes how to
> assign timestamps, as the link is broken (“see Assigning Timestamps
> <https://beam.apache.org/documentation/programming-guide/#windowing> for
> more information on how to do so.” – the link goes to #Windowing).
>
> My second question is about the PCollectionView – Dan said that it’s
> possible to use it as a side input, but is it also possible to use a
> side input when doing partitioning?
> Intuitively I would like to get a single Long (numOfDays) and then pass
> it to Partition.of().
>
>
>
>
>
> Have a nice Sunday,
> Tobi
>
>
>
> [0]
> https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java#L209
>
> [1]
> https://github.com/apache/beam/blob/8183ac825d506a56421f41adc2e25b544f1bb80f/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java#L181
>
> [2]
> https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/transforms/WithKeys
>
>
>
> On 19.02.17, 14:45, "Tobias Feldhaus" <[email protected]>
> wrote:
>
>
>
>     Hi Ken,
>
>
>
>     thank you for your answer!
>
>
>
>
>
>
>
>     On Fri, Feb 17, 2017 at 7:51 AM, Tobias Feldhaus <
> [email protected]> wrote:
>
>
>
>     My intention is to get a
>
>             PCollectionView<Long> numOfDays
>
>     which holds the number of elements (days) of a given
>     PCollection<DateTime> dates, built via a sum of counts (as a
>     SingletonView).
>
>
>
>     Something that corresponds in my head to *wrong*:
>
>
>
>             dates.apply("Count", Count.globally().asSingletonView());
>
>
>
>     Seems about right.
>
>
>
>     It seems about right to me too, but apparently I have a misconception
>     about what is happening here:
>
>
>
>             PCollection<ItLogLine> logLines = p
>                     .apply("Read logfile", TextIO.Read.from(bucket))
>                     .apply("Repartition", Repartition.of())
>                     .apply("Parse JSON", ParDo.of(new ReadObjects()))
>                     .apply("Extract timestamp", ParDo.of(new ExtractTimestamps()));
>
>             PCollection<DateTime> dates = logLines
>                     .apply("Get Dates", ParDo.of(new GetDateFunction()))
>                     .apply("Get distinct Dates", Distinct.<DateTime>create());
>
>             final PCollectionView<Long> numOfDays = dates
>                     .apply("Count", Count.globally().asSingletonView());
>
>
>
>     This ends up in a type mismatch. Is Count only applicable to certain
>     types like String, Long, Integer, TableRow?
>
>     Considering the code, this would be counterintuitive, as everything
>     seems to be implemented using generic types in the SDK.
>
>
>
>     I am still getting used to the Beam Programming Model and it still
>     confuses me from time to time, sorry.
>
>
>
>     Is it possible to access the side input outside of a DoFn?
>
>
>
>     You can access a side input from a DoFn and now also from a
> CombineFnWithContext (a bit of an advanced feature).
>
>
>
>     So getting a single calculated value to partition a PCollection
>     (the number of days in this case) should be done via the
>     extractOutput [0] method?
>
>
>
>     Best,
>
>     Tobi
>
>
>
>     [0]
> https://beam.apache.org/documentation/sdks/javadoc/0.5.0/org/apache/beam/sdk/transforms/CombineWithContext.CombineFnWithContext.html#extractOutput-AccumT-org.apache.beam.sdk.transforms.CombineWithContext.Context-
>
>
>
>
>
>
>
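
The timestamp rule quoted in the thread (an output timestamp must not be
older than the input element's timestamp minus the allowed skew) can be read
as a single comparison. The following is a minimal sketch of that comparison
only, not of Beam's implementation: TimestampSkewCheck and isAllowed are
hypothetical names, and java.time types stand in for the Joda-Time types
that Beam uses.

```java
import java.time.Duration;
import java.time.Instant;

// Plain-Java illustration of the quoted rule; hypothetical names, not Beam API.
final class TimestampSkewCheck {

    // True when outputTimestamp is not older than (inputTimestamp - allowedSkew).
    static boolean isAllowed(Instant inputTimestamp, Instant outputTimestamp, Duration allowedSkew) {
        Instant earliestAllowed = inputTimestamp.minus(allowedSkew);
        return !outputTimestamp.isBefore(earliestAllowed);
    }

    public static void main(String[] args) {
        Instant input = Instant.parse("2017-02-19T12:00:00Z");
        Duration skew = Duration.ofMinutes(10);

        // Five minutes older than the input: inside the ten-minute skew.
        System.out.println(isAllowed(input, Instant.parse("2017-02-19T11:55:00Z"), skew));
        // One hour older than the input: outside the skew.
        System.out.println(isAllowed(input, Instant.parse("2017-02-19T11:00:00Z"), skew));
    }
}
```

Under this reading, a timestamp parsed from a log line that lies far in the
past relative to the element's current timestamp would be rejected unless
the allowed skew is large enough to cover the difference.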
