Hi,

My pipeline is a GCS -> Dataflow -> BigQuery pipeline, processing 2GiB gzipped 
JSON files (about 90 of them).
With the Dataflow SDK I was able to run it in streaming/batch mode on the 
dataflow service for a couple of hours until it either run out of
memory or throw exceptions.

After moving to Apache Beam, the step in which I extract the timestamp 
(ExtractTimestamps) – full code here [0] - and attach it to the element, does 
not produce any output anymore.
Could it be that the implementation changed? This step is the one that enables 
windowing, which should make it possible to write to
Partitions when running in streaming mode. According to the 0.5.0 code
“If invoked from ProcessElement}), the timestamp must not be older than the 
input element's timestamp minus DoFn#getAllowedTimestampSkew. The output 
element will be in the same windows as the input element.”

Could this be the problem?

The pipeline is based on the idea of Dan’s post on StackOverflow [1]. I would 
be fine running it in streaming mode, but it looks like the input data is just 
too much to handle as it crashed in streaming mode (or in batch, but batch is 
experimental with partitions anyway).

I am running out of ideas at the moment. I would love to get an opinion about 
either:

1. Reading all files, parsing and pushing them onto PubSub, then reading from 
PubSub in streaming mode. Handle daily files in batch.

2. Built a giant loop that’s splits the files between different pipelines, 
using something like a semaphore worker pool, processing them in two phases. In 
the first, extract and parse and write and AVRO file, as well as a key-value 
map between dates and filenames. In phase 2, read the AVRO files, while knowing 
the total number of days then and partition them accordingly to load into BQ.


Tobi

p.apply("Read logfile", TextIO.Read.from(bucket))
        .apply("Repartition", Repartition.of())
        .apply("Parse JSON", ParDo.of(new ReadObjects()))
        .apply("Extract timestamp", ParDo.of(new ExtractTimestamps()))
        .apply("Window into days", 
Window.into(FixedWindows.of(Duration.standardDays(1))))
        .apply("Format Output", ParDo.of(new Outputter()))
        .apply("Write to BQ", BigQueryIO.Write
                .to(new DayPartitionFunc("dataset", tableName))
                .withSchema(Outputter.getSchema())
                
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));


[0] 
https://gist.github.com/james-woods/2e8e91adf35faddce3814128852896ea#file-pipeline-java-L105
[1] 
http://stackoverflow.com/questions/38114306/creating-writing-to-parititoned-bigquery-table-via-google-cloud-dataflow?rq=1

Reply via email to