Hey Tobi,

If you call outputWithTimestamp with a timestamp that is earlier than the
current timestamp of the element, the PipelineRunner is supposed to refuse
to accept it by throwing an exception. However, that doesn't sound like
the problem you're experiencing, especially since your Pipeline reads from
TextIO, whose elements start with the minimum representable timestamp.
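
To make that rule concrete, here is a minimal sketch in plain Java (no Beam
dependency). The class, method, and constant names are made up for
illustration, and the minimum-timestamp value is only a stand-in, not the
SDK's actual constant. It assumes the check is "output timestamp must not be
older than the input timestamp minus the allowed skew", as in the Javadoc you
quoted:

```java
import java.time.Duration;

final class TimestampSkewCheck {
    // Hypothetical stand-in for the runner's minimum representable timestamp (epoch millis).
    static final long MIN_TIMESTAMP_MILLIS = Long.MIN_VALUE / 1000;

    /**
     * Returns true if an output timestamp would be accepted for an element
     * whose current timestamp is inputMillis, given the allowed skew.
     * Assumes a modest skew, so the subtraction cannot overflow.
     */
    static boolean isAccepted(long inputMillis, long outputMillis, Duration allowedSkew) {
        return outputMillis >= inputMillis - allowedSkew.toMillis();
    }

    public static void main(String[] args) {
        long eventTime = 1487585580000L; // some event time parsed from a log line

        // Element at the minimum timestamp: any parsed event time is later, so it passes.
        System.out.println(isAccepted(MIN_TIMESTAMP_MILLIS, eventTime, Duration.ZERO));

        // Element already stamped one second later than the new timestamp: rejected with zero skew.
        System.out.println(isAccepted(eventTime + 1000, eventTime, Duration.ZERO));
    }
}
```

With an input that starts at the minimum timestamp, the check can only fail
if something upstream has already moved the element's timestamp forward.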

Looking at the code of the DoFn that you linked, it's line 131 [0] that I'm
most suspicious of: elements are dropped from the output if the timestamp
cannot be parsed. Even more so line 137: if there's an
IllegalArgumentException, the element is silently dropped, without any
logging. If you propagate an exception when these cases occur, how does
the Pipeline behave? Ideally there would be some way to make elements that
your timestamp parsing functions don't understand clearly visible, e.g. by
writing the invalid records to a "skipped record" file/table/etc.
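
As a rough sketch of what I mean by keeping the unparseable records visible
(plain Java, not Beam code; the class and method names are hypothetical, and
Instant.parse is just a placeholder for whatever your timestamp parsing
actually does):

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class TimestampRouting {
    /** Holds the records whose timestamp parsed and the raw records that did not. */
    static final class Result {
        final List<Instant> parsed = new ArrayList<>();
        final List<String> skipped = new ArrayList<>();
    }

    /** Routes each raw timestamp to either the parsed list or the skipped list. */
    static Result route(List<String> rawTimestamps) {
        Result result = new Result();
        for (String raw : rawTimestamps) {
            try {
                result.parsed.add(Instant.parse(raw));
            } catch (DateTimeParseException e) {
                // Keep the bad record instead of dropping it silently.
                result.skipped.add(raw);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Result r = route(Arrays.asList("2017-02-20T10:13:00Z", "not-a-timestamp"));
        System.out.println("parsed=" + r.parsed.size() + " skipped=" + r.skipped);
    }
}
```

In the pipeline itself the skipped list would presumably become a second
output of the DoFn that gets written somewhere you can inspect; how exactly
that is wired up depends on the SDK version you are on.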

Thanks,

Thomas

[0]
https://gist.github.com/james-woods/2e8e91adf35faddce3814128852896ea#file-pipeline-java-L132

On Tue, Feb 21, 2017 at 3:43 PM, Tobias Feldhaus <
[email protected]> wrote:

> Solved this as well; basically it's just two jobs that are executed
> independently. To see when the batch one
> terminates, you can run it like this:
>
>         job = DataflowPipelineRunner.fromOptions(options).run(p);
>
> The state of the job can then be polled, or
> waitToFinish() can be called.
> After the first one has terminated, and after a fixed time period (examining
> the System Lag of the Pub/Sub.IO would be nicer),
> the second one can be drained as described here [0]. The open question
> is how to get the lag from the API.
>
> However, the current solution is costlier, since it uses Pub/Sub as a
> message queue, which has some impact
> given that the total uncompressed size is more than 1 TB.
>
> [0] http://stackoverflow.com/a/41662247/5497956
>
> On 20 Feb 2017, at 22:16, Tobias Feldhaus <[email protected]>
> wrote:
>
> I solved it via option 1, using a batch job that reads all files and
> puts them onto PubSub,
> then using a pipeline in streaming mode that reads from PubSub and
> inserts into BigQuery.
>
>         p.apply("Read logfile", TextIO.Read.from(bucket))
>                 .apply("Parse JSON", ParDo.of(new ReadObjects()))
>                 .apply("Extract timestamp", ParDo.of(new ExtractTimestamps()))
>                 .apply("Format Output", ParDo.of(new Outputter()))
>                 .apply("WriteToPubSub", PubsubIO.Write
>                         .topic(topic)
>                         .withCoder(TableRowJsonCoder.of())
>                         .timestampLabel("ts"));
>
> and the second one:
>
>     p.apply("Read from PubSub", PubsubIO.Read
>                         .timestampLabel("ts")
>                         .topic(topic)
>                         .withCoder(TableRowJsonCoder.of()))
>                 .apply(Window.<TableRow>into(
>                         FixedWindows.of(Duration.standardDays(1))))
>                 .apply("Write to BigQueryService", BigQueryIO.Write
>                         .to(new DayPartitionFunc("somedataset", tableName))
>                         .withSchema(Outputter.getSchema())
>                         .withCreateDisposition(
>                                 BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>                         .withWriteDisposition(
>                                 BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>
> However, I was not able to implement this with the new version of Apache
> Beam (0.5.0), only with the Dataflow SDK,
> as the "outputWithTimestamp" problem persists. (I will look into that
> further.)
>
> I was thinking about injecting a "poison" message from the batch producer
> side to make the consuming streaming side drain itself,
> but for that I would need to
>
> 1. Inject the poison.
> 2. If the poison is received on the other side, make sure that it is
> sent out again by the receiving process,
> which then shuts itself down gracefully. There needs to be a finite (and
> known upfront) number of processes, so that by counting
> down, the last one knows to keep the poison for itself and quit.
>
> The problem I see is that messages are not guaranteed to be ordered, but
> as the sending side is injecting it as the last message,
> could something like this work?
>
> Is there some way offered by the framework to gracefully stop (drain) a
> pipeline by signalling through code?
>
> Tobi
>
> On 20 Feb 2017, at 10:13, Tobias Feldhaus <[email protected]>
> wrote:
>
> Hi,
>
> My pipeline is a GCS -> Dataflow -> BigQuery pipeline, processing 2 GiB
> gzipped JSON files (about 90 of them).
> With the Dataflow SDK I was able to run it in streaming/batch mode on the
> Dataflow service for a couple of hours until it either ran out of
> memory or threw exceptions.
>
> After moving to Apache Beam, the step in which I extract the timestamp
> (ExtractTimestamps; full code here [0]) and attach it to the element
> does not produce any output anymore.
> Could it be that the implementation changed? This step is the one that
> enables windowing, which should make it possible to write to
> partitions when running in streaming mode. According to the 0.5.0 code:
> "If invoked from ProcessElement, the timestamp must not be older than
> the input element's timestamp minus DoFn#getAllowedTimestampSkew. The
> output element will be in the same windows as the input element."
>
> Could this be the problem?
>
> The pipeline is based on the idea from Dan's post on StackOverflow [1]. I
> would be fine running it in streaming mode, but it looks like the input
> data is just too much to handle, as it crashed in streaming mode (and in
> batch, but batch is experimental with partitions anyway).
>
> I am running out of ideas at the moment. I would love to get an opinion
> about either:
>
> 1. Reading all files, parsing and pushing them onto PubSub, then reading
> from PubSub in streaming mode. Handle daily files in batch.
>
> 2. Build a giant loop that splits the files between different pipelines,
> using something like a semaphore worker pool, processing them in two
> phases. In the first, extract, parse, and write an Avro file, as well as
> a key-value map between dates and filenames. In phase 2, read the Avro
> files, knowing the total number of days by then, and partition them
> accordingly to load into BQ.
>
>
> Tobi
>
> p.apply("Read logfile", TextIO.Read.from(bucket))
>         .apply("Repartition", Repartition.of())
>         .apply("Parse JSON", ParDo.of(new ReadObjects()))
>         .apply("Extract timestamp", ParDo.of(new ExtractTimestamps()))
>         .apply("Window into days", Window.into(
>                 FixedWindows.of(Duration.standardDays(1))))
>         .apply("Format Output", ParDo.of(new Outputter()))
>         .apply("Write to BQ", BigQueryIO.Write
>                 .to(new DayPartitionFunc("dataset", tableName))
>                 .withSchema(Outputter.getSchema())
>                 .withCreateDisposition(
>                         BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>                 .withWriteDisposition(
>                         BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>
>
> [0] https://gist.github.com/james-woods/2e8e91adf35faddce3814128852896ea#file-pipeline-java-L105
> [1] http://stackoverflow.com/questions/38114306/creating-writing-to-parititoned-bigquery-table-via-google-cloud-dataflow?rq=1
>
>
>
>
