Solved this as well; basically it's just two jobs that are executed independently.
To see when the batch one terminates, you can run it like this:
job = DataflowPipelineRunner.fromOptions(options).run(p);
The state of the job can then be polled, or waitToFinish() can be called.
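For reference, this is roughly what that first step looks like (Dataflow SDK 1.x class names; the pipeline construction and the polling interval are just placeholders):

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.PipelineResult.State;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineJob;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;

public class RunBatchAndWait {
  public static void main(String[] args) throws Exception {
    DataflowPipelineOptions options = PipelineOptionsFactory
        .fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
    Pipeline p = Pipeline.create(options);
    // ... build the GCS -> Pub/Sub batch pipeline here ...

    // Run through the runner explicitly to get a DataflowPipelineJob back.
    DataflowPipelineJob job = DataflowPipelineRunner.fromOptions(options).run(p);

    // Poll until the job reaches a terminal state (DONE, FAILED, CANCELLED, ...);
    // alternatively, job.waitToFinish(...) blocks instead of polling.
    State state = job.getState();
    while (!state.isTerminal()) {
      Thread.sleep(30_000); // arbitrary polling interval
      state = job.getState();
    }
    System.out.println("Batch job finished in state: " + state);
  }
}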
After the first one has terminated, and after a fixed time period (nicer would be
examining the system lag of the PubsubIO source), the second one can be drained as
described here [0]. The open question is how to get that lag from the API.
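For completeness, the drain call from [0] looks roughly like this; I am assuming the
requested-state string is "JOB_STATE_DRAINED" and the usual generated-client setup,
so please double-check against [0] and the Dataflow API docs:

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.DataflowScopes;
import com.google.api.services.dataflow.model.Job;

public class DrainStreamingJob {
  public static void main(String[] args) throws Exception {
    String projectId = args[0];
    String jobId = args[1]; // id of the running streaming job

    GoogleCredential credential = GoogleCredential.getApplicationDefault()
        .createScoped(DataflowScopes.all());
    Dataflow dataflow = new Dataflow.Builder(
            GoogleNetHttpTransport.newTrustedTransport(),
            JacksonFactory.getDefaultInstance(),
            credential)
        .setApplicationName("drain-streaming-job")
        .build();

    // Same jobs.update call the SDK uses for cancellation, but with the drain
    // state requested (assumed to be "JOB_STATE_DRAINED", as described in [0]).
    Job drainRequest = new Job().setRequestedState("JOB_STATE_DRAINED");
    dataflow.projects().jobs().update(projectId, jobId, drainRequest).execute();
  }
}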
However, the current solution is costlier, since it uses Pub/Sub as a message queue,
which has some impact given that the total uncompressed size is more than 1 TB.
[0] http://stackoverflow.com/a/41662247/5497956
On 20 Feb 2017, at 22:16, Tobias Feldhaus <[email protected]> wrote:
I solved it via option 1: a batch pipeline that reads all files and puts them onto
Pub/Sub, then a pipeline in streaming mode that reads from Pub/Sub and inserts into
BigQuery.
p.apply("Read logfile", TextIO.Read.from(bucket))
.apply("Parse JSON", ParDo.of(new ReadObjects()))
.apply("Extract timestamp", ParDo.of(new ExtractTimestamps()))
.apply("Format Output", ParDo.of(new Outputter()))
.apply("WriteToPubSub", PubsubIO.Write
.topic(topic)
.withCoder(TableRowJsonCoder.of())
.timestampLabel("ts"));
and the second one:
p.apply("Read from PubSub",
PubsubIO.Read.timestampLabel("ts").topic(topic).withCoder(TableRowJsonCoder.of()))
.apply(Window.<TableRow>into(FixedWindows.of(Duration.standardDays(1))))
.apply("Write to BigQueryService", BigQueryIO.Write
.to(new DayPartitionFunc(“somedataset", tableName))
.withSchema(Outputter.getSchema())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
However, I was not able to implement this with the new Apache Beam 0.5.0, only with
the Dataflow SDK, as the "outputWithTimestamp" problem persists. (I will look into
that further.)
I was thinking about injecting a "poison" message from the batch producer side to
make the streaming consumer side drain itself, but for that I would need to:
1. Inject the poison.
2. If the poison is received on the other side, make sure it is sent out again by
the receiving process, which then shuts itself down gracefully. There needs to be a
finite (and upfront known) number of processes that, by counting down, know which is
the last one that keeps the poison for itself and quits.
The problem I see is that messages are not guaranteed to be ordered, but since the
sending side injects the poison as the last message, could something like this work
(see the sketch below)?
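To make the idea concrete, the consumer side could look something like this. The
POISON_MARKER field and the whole mechanism are made up, and as far as I can tell a
DoFn cannot drain its own job, which is exactly my question below:

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.transforms.DoFn;

// Illustrative only: recognises a made-up poison marker on the consumer side and
// drops it. Re-publishing the poison, counting down and shutting down gracefully
// would have to happen outside the pipeline, e.g. via the jobs.update call above.
class DropPoisonFn extends DoFn<TableRow, TableRow> {
  static final String POISON_MARKER = "__poison__"; // hypothetical marker field

  @Override
  public void processElement(ProcessContext c) {
    TableRow row = c.element();
    if (row.containsKey(POISON_MARKER)) {
      // Poison seen: no more real data should follow; signal an external watcher
      // (e.g. via a control topic) and swallow the message.
      return;
    }
    c.output(row);
  }
}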
Is there some way offered by the framework to gracefully drain a pipeline by
signalling through code?
Tobi
On 20 Feb 2017, at 10:13, Tobias Feldhaus <[email protected]> wrote:
Hi,
My pipeline is a GCS -> Dataflow -> BigQuery pipeline, processing 2 GiB gzipped
JSON files (about 90 of them).
With the Dataflow SDK I was able to run it in streaming/batch mode on the Dataflow
service for a couple of hours until it either ran out of memory or threw exceptions.
After moving to Apache Beam, the step in which I extract the timestamp and attach it
to the element (ExtractTimestamps) – full code here [0] – does not produce any
output anymore.
Could it be that the implementation changed? This step is the one that enables
windowing, which should make it possible to write to partitions when running in
streaming mode. According to the 0.5.0 code:
"If invoked from ProcessElement, the timestamp must not be older than the input
element's timestamp minus DoFn#getAllowedTimestampSkew. The output element will be
in the same windows as the input element."
Could this be the problem?
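If that is the case, I guess the way around the constraint would be to override
getAllowedTimestampSkew() in the timestamp-extracting DoFn. A minimal sketch,
assuming TableRow elements and an ISO-formatted "ts" field (both assumptions on my
part):

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Duration;
import org.joda.time.Instant;

class ExtractTimestampsSketch extends DoFn<TableRow, TableRow> {
  @Override
  public Duration getAllowedTimestampSkew() {
    // Allow outputWithTimestamp() to move elements arbitrarily far back in time;
    // otherwise emitting an event timestamp older than the input element's
    // timestamp is rejected, per the Javadoc quoted above.
    return Duration.millis(Long.MAX_VALUE);
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    TableRow row = c.element();
    Instant eventTime = Instant.parse((String) row.get("ts")); // assumed field/format
    c.outputWithTimestamp(row, eventTime);
  }
}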
The pipeline is based on the idea in Dan's post on Stack Overflow [1]. I would be
fine running it in streaming mode, but it looks like the input data is just too much
to handle, as it crashed in streaming mode (or in batch, but batch is experimental
with partitions anyway).
I am running out of ideas at the moment. I would love to get an opinion on either:
1. Reading all files, parsing them and pushing them onto Pub/Sub, then reading from
Pub/Sub in streaming mode. Handle daily files in batch.
2. Building a giant loop that splits the files between different pipelines, using
something like a semaphore worker pool, and processing them in two phases. In the
first, extract and parse, and write an Avro file as well as a key-value map between
dates and filenames. In phase two, read the Avro files, knowing the total number of
days by then, and partition them accordingly to load into BigQuery.
Tobi
p.apply("Read logfile", TextIO.Read.from(bucket))
.apply("Repartition", Repartition.of())
.apply("Parse JSON", ParDo.of(new ReadObjects()))
.apply("Extract timestamp", ParDo.of(new ExtractTimestamps()))
.apply("Window into days",
Window.into(FixedWindows.of(Duration.standardDays(1))))
.apply("Format Output", ParDo.of(new Outputter()))
.apply("Write to BQ", BigQueryIO.Write
.to(new DayPartitionFunc("dataset", tableName))
.withSchema(Outputter.getSchema())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
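DayPartitionFunc is not shown above; a version along the lines of Dan's answer [1]
could look roughly like this (Dataflow SDK 1.x types, and the constructor arguments
are guessed from the calls above):

import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

// Maps each daily window to a day partition of a date-partitioned table,
// e.g. "dataset.table$20170220".
class DayPartitionFunc implements SerializableFunction<BoundedWindow, String> {
  private static final DateTimeFormatter DAY =
      DateTimeFormat.forPattern("yyyyMMdd").withZoneUTC();

  private final String dataset;
  private final String table;

  DayPartitionFunc(String dataset, String table) {
    this.dataset = dataset;
    this.table = table;
  }

  @Override
  public String apply(BoundedWindow window) {
    String day = DAY.print(((IntervalWindow) window).start());
    return dataset + "." + table + "$" + day;
  }
}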
[0] https://gist.github.com/james-woods/2e8e91adf35faddce3814128852896ea#file-pipeline-java-L105
[1] http://stackoverflow.com/questions/38114306/creating-writing-to-parititoned-bigquery-table-via-google-cloud-dataflow?rq=1