Sorry for the formatting of my last mail; I basically killed the quoting.
I think I have partially figured it out.
I am following this example [0] and tried to mimic its behaviour.
The code looks like this:
Pipeline p = Pipeline.create(options);

// Read, parse and timestamp the raw log lines.
PCollection<ItLogLine> logLines = p
    .apply("Read logfile", TextIO.Read.from(bucket))
    .apply("Repartition", Repartition.of())
    .apply("Parse JSON", ParDo.of(new ReadObjects()))
    .apply("Extract timestamp", ParDo.of(new ExtractTimestamps()));

// Branch 1: format each line as a TableRow and append it to BigQuery.
PCollection<TableRow> output = logLines
    .apply("Format Output", ParDo.of(new Outputter()));
output.apply("Write to BigQueryService", BigQueryIO.Write
    .to("somedataset." + tableName)
    .withSchema(schema)
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

// Branch 2: extract an Instant per line and key it by itself.
PCollection<KV<Instant, Instant>> dates = logLines
    .apply("Extract Instant", ParDo.of(new GetDateFunction()))
    .apply(WithKeys.of(new SerializableFunction<Instant, Instant>() {
      @Override
      public Instant apply(Instant instant) { return instant; }
    }));

// Count the distinct Instants and expose the result as a singleton side input.
// This is a count of days only if GetDateFunction (not shown) maps every
// timestamp to the start of its day.
final PCollectionView<Long> numOfDays = dates
    .apply("GetDateTimes", Keys.<Instant>create())
    .apply("DistinctDocs", Distinct.<Instant>create())
    .apply("Count", Count.<Instant>globally())
    .apply("View", View.<Long>asSingleton());

// PCollectionList<ItLogLine> pCollectionPerDay = logLines.apply("Partition per Day",
//     Partition.of( I need to know the number of days here ));
I was using Integer as the type of the PCollectionView before, and that was the
issue; it works with Long, which is the type Count produces. Also, I was using
DateTime, which has no default coder, so I simply switched to Instant
(following the TfIdf example [1] and WithKeys.of [2]).
I have two big open questions. First: in my code I was calling
c.outputWithTimestamp(itLogLine, timestampInstant) in a step where I parsed a
string field of an ItLogLine into an Instant and attached it to the element as
its timestamp, so that I could use it again later in the chain.
Now, after switching from the Dataflow SDK to Beam, this stopped working. I
assume I was using it wrongly, since the Javadoc of outputWithTimestamp says:
“If invoked from ProcessElement, the timestamp must not be older than the input
element's timestamp minus DoFn#getAllowedTimestampSkew”.
So I want to change my code to do the right thing. However, I cannot find the
part of the current documentation that describes how to assign timestamps,
because the link is broken: the Javadoc says “see Assigning
Timestamps<https://beam.apache.org/documentation/programming-guide/#windowing>
for more information on how to do so.”, but the link only goes to the
#windowing section.
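To make sure I read that rule correctly, here is a minimal plain-Java model of
it (java.time instead of the Joda types Beam uses, no Beam dependency; the class
TimestampSkewRule and its method are hypothetical names, not SDK API). An
output timestamp is accepted only if it is not earlier than the input
timestamp minus the allowed skew. As far as I know, the default skew returned
by DoFn#getAllowedTimestampSkew is zero, so under that default only moving a
timestamp forward (or keeping it) passes.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical plain-Java model of the outputWithTimestamp skew rule (not Beam API).
final class TimestampSkewRule {
  private TimestampSkewRule() {}

  // True if an element with input timestamp inputTs may be emitted with outputTs.
  static boolean isAllowed(Instant inputTs, Instant outputTs, Duration allowedSkew) {
    Instant earliestAllowed = inputTs.minus(allowedSkew);
    return !outputTs.isBefore(earliestAllowed);
  }

  public static void main(String[] args) {
    Instant input = Instant.parse("2017-02-19T12:00:00Z");
    Instant later = Instant.parse("2017-02-19T13:00:00Z");
    Instant earlier = Instant.parse("2017-02-18T00:00:00Z");

    System.out.println(isAllowed(input, later, Duration.ZERO));        // true: moved forward
    System.out.println(isAllowed(input, earlier, Duration.ZERO));      // false: moved backward
    System.out.println(isAllowed(input, earlier, Duration.ofDays(2))); // true: within the skew
  }
}
```

Under this model, a step that only moves timestamps forward never violates the
rule, while a step that moves them backward needs a large enough skew.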
My second question is about the PCollectionView. Dan said that it's possible
to use it as a side input, but is it also possible to use a side input when
partitioning?
Intuitively, I would like to compute a single Long (numOfDays) and then pass
it to Partition.of().
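One thing I noticed while looking at this: Partition.of(int numPartitions,
PartitionFn) takes the number of partitions as a plain int when the pipeline
is built, while numOfDays is a PCollectionView whose value only exists once the
pipeline runs, so as far as I can tell the side input cannot decide how many
partitions there are. Below is a rough plain-Java model (java.time only, no
Beam) of what a per-day partition function would do once that number is fixed
up front, e.g. from a known date range. DayPartitioner, firstDay and the
clamping policy are all hypothetical.

```java
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of a per-day partition function (not Beam API).
// The number of partitions is fixed at construction, like the int in Partition.of.
final class DayPartitioner {
  private final LocalDate firstDay;
  private final int numPartitions;

  DayPartitioner(LocalDate firstDay, int numPartitions) {
    if (numPartitions <= 0) {
      throw new IllegalArgumentException("numPartitions must be > 0");
    }
    this.firstDay = firstDay;
    this.numPartitions = numPartitions;
  }

  // Maps a day to an index in [0, numPartitions); days outside the
  // range are clamped into the first or last partition.
  int partitionFor(LocalDate day) {
    long offset = ChronoUnit.DAYS.between(firstDay, day);
    return (int) Math.max(0, Math.min(numPartitions - 1, offset));
  }

  // Splits the input into numPartitions lists, one per day.
  List<List<LocalDate>> partition(List<LocalDate> days) {
    List<List<LocalDate>> out = new ArrayList<>();
    for (int i = 0; i < numPartitions; i++) {
      out.add(new ArrayList<>());
    }
    for (LocalDate d : days) {
      out.get(partitionFor(d)).add(d);
    }
    return out;
  }
}
```

Clamping is just one choice; throwing for out-of-range days would be another,
if unexpected dates should fail loudly instead.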
Have a nice Sunday,
Tobi
[0] https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java#L209
[1] https://github.com/apache/beam/blob/8183ac825d506a56421f41adc2e25b544f1bb80f/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java#L181
[2] https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/transforms/WithKeys
On 19.02.17, 14:45, "Tobias Feldhaus" wrote:
Hi Ken,
thank you for your answer!
On Fri, Feb 17, 2017 at 7:51 AM, Tobias Feldhaus wrote:
My intention is to get a PCollectionView<Long> numOfDays which holds the
number of elements (days) of a given PCollection<DateTime> dates, built via a
sum of counts (as a singleton view).
In my head it corresponds to something like this (*wrong*):
dates.apply("Count", Count.globally().asSingletonView());
Seems about right.
It seems about right to me too, but apparently I have a misconception about
what is happening here:
PCollection<ItLogLine> logLines = p
    .apply("Read logfile", TextIO.Read.from(bucket))
    .apply("Repartition", Repartition.of())
    .apply("Parse JSON", ParDo.of(new ReadObjects()))
    .apply("Extract timestamp", ParDo.of(new ExtractTimestamps()));
PCollection<DateTime> dates = logLines
    .apply("Get Dates", ParDo.of(new GetDateFunction()))
    .apply("Get distinct Dates", Distinct.<DateTime>create());
final PCollectionView<Long> numOfDays = dates
    .apply("Count", Count.globally().asSingletonView());
This ends up in a type mismatch. Is Count only applicable to certain types,
like String, Long, Integer or TableRow? Judging from the code, that would be
counterintuitive, as everything in the SDK seems to be implemented with
generic types.
I am still getting used to the Beam programming model, and it still confuses
me from time to time, sorry.
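One likely contributor to that kind of mismatch is Java's type inference in
chained calls: in Count.globally().asSingletonView(), globally() is the
receiver of another call, so its type parameter cannot be inferred from the
assignment and falls back to Object, and the resulting transform expects a
PCollection<Object>. An explicit type witness,
Count.<DateTime>globally().asSingletonView(), pins the type first. The sketch
below reproduces the effect in plain Java without Beam; Transform, CountLike
and InferenceDemo are hypothetical stand-ins, not SDK types.

```java
import java.time.LocalDate;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for a transform from an input collection to a result.
interface Transform<InT, OutT> extends Function<InT, OutT> {}

// Hypothetical generic factory, shaped like Count.globally().asSingletonView().
final class CountLike<T> {
  static <T> CountLike<T> globally() {
    return new CountLike<>();
  }

  Transform<List<T>, Long> asSingleton() {
    return xs -> (long) xs.size();
  }
}

final class InferenceDemo {
  static long countDates(List<LocalDate> dates) {
    // Would NOT compile: as the receiver of a chained call, globally() gets no
    // target type, so T is inferred as Object and asSingleton() returns
    // Transform<List<Object>, Long>.
    // Transform<List<LocalDate>, Long> bad = CountLike.globally().asSingleton();

    // Compiles: the explicit type witness fixes T = LocalDate first.
    Transform<List<LocalDate>, Long> good = CountLike.<LocalDate>globally().asSingleton();
    return good.apply(dates);
  }
}
```

Splitting the chain into two statements (as with Count.<Instant>globally()
followed by View.<Long>asSingleton()) avoids the same problem.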
Is it possible to access the side input outside of a DoFn?
You can access a side input from a DoFn and now also from a
CombineFnWithContext (a bit of an advanced feature).
So should getting a single calculated value for partitioning a PCollection
(the number of days, in this case) be done via the extractOutput [0] method?
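For reference, a combine function goes through createAccumulator, addInput,
mergeAccumulators and extractOutput, and extractOutput is where the single
final value is produced. The sketch below mimics that lifecycle in plain Java
for counting distinct days; DistinctDaysCombiner is a hypothetical name, it
does not extend Beam's CombineFn, and the "two bundles" in combine() only
simulate partial results being merged.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical plain-Java model of a CombineFn lifecycle (not the Beam class).
// The accumulator is the set of distinct days seen so far.
final class DistinctDaysCombiner {
  Set<LocalDate> createAccumulator() {
    return new HashSet<>();
  }

  Set<LocalDate> addInput(Set<LocalDate> acc, LocalDate day) {
    acc.add(day);
    return acc;
  }

  // Partial accumulators (e.g. from different bundles) are merged into one.
  Set<LocalDate> mergeAccumulators(Iterable<Set<LocalDate>> accs) {
    Set<LocalDate> merged = createAccumulator();
    for (Set<LocalDate> acc : accs) {
      merged.addAll(acc);
    }
    return merged;
  }

  // The single final value (number of distinct days) is produced here.
  long extractOutput(Set<LocalDate> acc) {
    return acc.size();
  }

  // Simulates two bundles accumulated separately and then merged.
  long combine(List<LocalDate> bundleA, List<LocalDate> bundleB) {
    Set<LocalDate> a = createAccumulator();
    for (LocalDate d : bundleA) {
      a = addInput(a, d);
    }
    Set<LocalDate> b = createAccumulator();
    for (LocalDate d : bundleB) {
      b = addInput(b, d);
    }
    List<Set<LocalDate>> parts = new ArrayList<>();
    parts.add(a);
    parts.add(b);
    return extractOutput(mergeAccumulators(parts));
  }
}
```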
Best,
Tobi
[0] https://beam.apache.org/documentation/sdks/javadoc/0.5.0/org/apache/beam/sdk/transforms/CombineWithContext.CombineFnWithContext.html#extractOutput-AccumT-org.apache.beam.sdk.transforms.CombineWithContext.Context-