And I think my approach will never work. I just RTFM again and it clearly 
states:

“You can, for example, pass the number of partitions as a command-line option at runtime (which will then be used to build your pipeline graph), but you cannot determine the number of partitions in mid-pipeline (based on data calculated after your pipeline graph is constructed, for instance).” [0]

Meaning that I cannot read my data in my pipeline to determine the number of days to partition it by before writing it to dated tables in BQ, or at least writing AVRO files (one per day) to GCS.
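
If I read that correctly, the only supported shape is something like the sketch below, where the count arrives as a pipeline option that is already known before launch (DayOptions and dayIndexOf are made-up names):

        public interface DayOptions extends PipelineOptions {
            @Description("Number of day partitions, known before launch")
            @Default.Integer(1)
            Integer getNumDays();
            void setNumDays(Integer value);
        }

        // Read at graph-construction time, so the count is baked into the graph.
        int numDays = options.as(DayOptions.class).getNumDays();

        PCollectionList<ItLogLine> perDay = logLines.apply("Partition per Day",
                Partition.of(numDays, new Partition.PartitionFn<ItLogLine>() {
                    public int partitionFor(ItLogLine line, int numPartitions) {
                        // dayIndexOf() is a hypothetical helper that maps the
                        // element's date to [0, numPartitions).
                        return dayIndexOf(line) % numPartitions;
                    }
                }));

And that count would have to be computed before this pipeline is even constructed.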

I guess I need two pipelines, then. That was something I wanted to avoid because it makes things more complicated: I need to chain the two and verify that everything from the first pipeline has finished successfully before I can start phase two.

Or am I missing something here?

Tobi

[0] https://beam.apache.org/documentation/programming-guide/#transforms-flatten-partition

On 19.02.17, 21:41, "Tobias Feldhaus" <[email protected]> wrote:


Sorry for the formatting of the last mail, I basically killed the quoting.

I think I figured it out partially.

I am referring to this example [0] and have tried to mimic its behaviour.

The code looks like this:



        Pipeline p = Pipeline.create(options);

        PCollection<ItLogLine> logLines = p.apply("Read logfile", TextIO.Read.from(bucket))
                .apply("Repartition", Repartition.of())
                .apply("Parse JSON", ParDo.of(new ReadObjects()))
                .apply("Extract timestamp", ParDo.of(new ExtractTimestamps()));

        PCollection<TableRow> output = logLines.apply("Format Output", ParDo.of(new Outputter()));

        output.apply("Write to BigQueryService", BigQueryIO.Write
                .to("somedataset." + tableName)
                .withSchema(schema)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

        PCollection<KV<Instant, Instant>> dates = logLines
                .apply("Extract Instant", ParDo.of(new GetDateFunction()))
                .apply(WithKeys.of(new SerializableFunction<Instant, Instant>() {
                    public Instant apply(Instant instant) { return instant; }
                }));

        final PCollectionView<Long> numOfDays = dates
                .apply("GetDateTimes", Keys.<Instant>create())
                .apply("DistinctDocs", Distinct.<Instant>create())
                .apply("Count", Count.<Instant>globally())
                .apply("View", View.<Long>asSingleton());

        // PCollectionList pCollectionPerDay = logLines.apply("Partition per Day",
        //         Partition.of( I need to know the number of days here )



I was using Integer in the PCollectionView before, and that was the issue; it works with Long. Also, I was using DateTime, which does not come with a predefined DefaultCoder, so I simply switched to Instants (following the TfIdf example [1], and WithKeys.of [2]).



I have two big open questions. In my code I was using c.outputWithTimestamp(itLogLine, timestampInstant) in a step where I parsed a string from an ItLogLine field into an Instant and attached it to the element, to use it later in the chain.



Now, after switching from Dataflow to Beam, this stopped working. I assume that I was using it wrongly, since the documentation says:



“If invoked from ProcessElement, the timestamp must not be older than the input element's timestamp minus DoFn#getAllowedTimestampSkew”



so I want to change my code to do the right thing. However, in the current documentation I cannot find the text that describes how to assign timestamps, as the link is broken (“see Assigning Timestamps <https://beam.apache.org/documentation/programming-guide/#windowing> for more information on how to do so.”; the link just goes to #windowing).
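
From the DoFn javadoc alone, my best guess at the intended pattern is the sketch below (AttachTimestamps and parseEventTime are my own placeholder names):

        static class AttachTimestamps extends DoFn<ItLogLine, ItLogLine> {
            @Override
            public Duration getAllowedTimestampSkew() {
                // Allow outputWithTimestamp() to move timestamps arbitrarily far
                // backwards; parsed log timestamps are much older than read time.
                return Duration.millis(Long.MAX_VALUE);
            }

            @ProcessElement
            public void processElement(ProcessContext c) {
                // parseEventTime() is a hypothetical helper that extracts the
                // event time from a field of the log line.
                Instant eventTime = parseEventTime(c.element());
                c.outputWithTimestamp(c.element(), eventTime);
            }
        }

Is overriding getAllowedTimestampSkew() like this the intended way to get past the skew check?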

My second question is about the PCollectionView. Dan said that it's possible to use it as a side input, but is it also possible to use a side input when doing partitioning? Intuitively, I would like to get a single Long (numOfDays) and then pass that to Partition.of().
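
If that is not possible (the partition count seems to be fixed when the graph is built), my fallback idea would be to avoid Partition entirely and key every element by its day instead, so the number of days never has to be known up front. Roughly like this (truncateToDay is a made-up helper that rounds the element's timestamp down to midnight):

        PCollection<KV<Instant, ItLogLine>> keyedByDay = logLines
                .apply("Key by day", WithKeys.of(new SerializableFunction<ItLogLine, Instant>() {
                    public Instant apply(ItLogLine line) {
                        // truncateToDay() is a hypothetical helper.
                        return truncateToDay(line);
                    }
                }));

        PCollection<KV<Instant, Iterable<ItLogLine>>> perDay = keyedByDay
                .apply("Group per day", GroupByKey.<Instant, ItLogLine>create());

Not sure whether that scales as well as a real Partition, though.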





Have a nice Sunday,
Tobi



[0] https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java#L209

[1] https://github.com/apache/beam/blob/8183ac825d506a56421f41adc2e25b544f1bb80f/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java#L181

[2] https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/transforms/WithKeys



On 19.02.17, 14:45, "Tobias Feldhaus" <[email protected]> wrote:



    Hi Ken,



    thank you for your answer!







    On Fri, Feb 17, 2017 at 7:51 AM, Tobias Feldhaus <[email protected]> wrote:



    My intention is to get a

            PCollectionView<Long> numOfDays

    which holds the number of elements (days) of a given PCollection<DateTime> dates, building it via a sum of counts (as a SingletonView).



    Something that corresponds in my head to *wrong*:



            dates.apply("Count", Count.globally().asSingletonView());



    Seems about right.



    It seems about right to me too, but apparently I have a misconception about what is happening here:



            PCollection<ItLogLine> logLines = p.apply("Read logfile", TextIO.Read.from(bucket))
                    .apply("Repartition", Repartition.of())
                    .apply("Parse JSON", ParDo.of(new ReadObjects()))
                    .apply("Extract timestamp", ParDo.of(new ExtractTimestamps()));

            PCollection<DateTime> dates = logLines
                    .apply("Get Dates", ParDo.of(new GetDateFunction()))
                    .apply("Get distinct Dates", Distinct.<DateTime>create());

            final PCollectionView<Long> numOfDays = dates.apply("Count", Count.globally().asSingletonView());



    This ends up in a type mismatch. Is Count only applicable to certain types like String, Long, Integer, or TableRow? Considering the code, that would be counterintuitive, as everything in the SDK seems to be implemented with generic types.
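
    Spelling out the type parameters, the way I read the generics the result should be a Long view regardless of the input type:

            // Count.globally() is generic over its input and always emits a
            // single Long, so the view should come out as PCollectionView<Long>.
            final PCollectionView<Long> numOfDays =
                    dates.apply("Count", Count.<DateTime>globally().asSingletonView());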



    I am still getting used to the Beam Programming Model, and it still confuses me from time to time, sorry.



    Is it possible to access the side input outside of a DoFn?



    You can access a side input from a DoFn and now also from a CombineFnWithContext (a bit of an advanced feature).



    So getting a single calculated value to partition a PCollection (the number of days, in this case) should be done via the extractOutput [0] method?
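
    For the plain DoFn case, the access pattern I have in mind looks like this (just a sketch, reusing the numOfDays view from above):

            PCollection<ItLogLine> annotated = logLines.apply("Use count",
                    ParDo.of(new DoFn<ItLogLine, ItLogLine>() {
                        @ProcessElement
                        public void processElement(ProcessContext c) {
                            // Read the singleton side input per element. This works
                            // inside a DoFn, but it cannot feed Partition.of(), which
                            // needs the count while the graph is being built.
                            long days = c.sideInput(numOfDays);
                            c.output(c.element());
                        }
                    }).withSideInputs(numOfDays));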



    Best,

    Tobi



    [0] https://beam.apache.org/documentation/sdks/javadoc/0.5.0/org/apache/beam/sdk/transforms/CombineWithContext.CombineFnWithContext.html#extractOutput-AccumT-org.apache.beam.sdk.transforms.CombineWithContext.Context-





