Hi Chengzhi,

Are you simply trying to emit the timestamp onward? Why not just use
`out.output` with a PCollection<Instant>?

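import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Instant;

// Input: one epoch-millis string per line; output: the parsed Instant as data.
static class ReadWithEventTime extends DoFn<String, Instant> {
    @DoFn.ProcessElement
    public void processElement(@Element String line, OutputReceiver<Instant> out) {
        // out.output keeps the input element's timestamp, so no skew check is involved.
        out.output(new Instant(Long.parseLong(line.trim())));
    }
}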

You can also output the line itself as a PCollection<String>. If your line
has additional information to parse, consider a key-value pair (KV,
https://beam.apache.org/releases/javadoc/2.2.0/index.html?org/apache/beam/sdk/values/KV.html),
where you can emit both some parsed content of the string and the timestamp.
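If your lines did carry extra fields, a KV-emitting DoFn could look roughly like the sketch below. It assumes a hypothetical `<epochMillis>,<payload>` line format (e.g. `1565958615120,click`) and a hypothetical class name `ParseLineToKv`; lines that hold only a timestamp, like the ones in your file, just get an empty payload. The parsing sits in a static helper so it can be checked without running a pipeline.

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;

// Hypothetical DoFn: emits (payload, eventTime) pairs as data instead of
// rewriting the element timestamp with outputWithTimestamp.
public class ParseLineToKv extends DoFn<String, KV<String, Instant>> {

    // Assumed line format: "<epochMillis>,<payload>"; the payload part is optional.
    static KV<String, Instant> toKv(String line) {
        String[] parts = line.split(",", 2);
        Instant eventTime = new Instant(Long.parseLong(parts[0].trim()));
        String payload = parts.length > 1 ? parts[1] : "";
        return KV.of(payload, eventTime);
    }

    @ProcessElement
    public void processElement(@Element String line, OutputReceiver<KV<String, Instant>> out) {
        // Plain output(): the element keeps its input timestamp, so the
        // "Cannot output with timestamp" check never comes into play.
        out.output(toKv(line));
    }
}
```

You would apply it the same way as in your main(), e.g. `data.apply(ParDo.of(new ParseLineToKv()))`, which gives a PCollection<KV<String, Instant>>.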

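The probable reason why outputWithTimestamp rejects older times is that the
timestamp emitted is used specifically for windowing: in streaming pipelines
it determines which window each record belongs to for aggregations. A DoFn is
therefore not allowed to move an element's timestamp earlier than the input
element's own timestamp minus the allowed skew (0 ms by default), since that
could make the record late relative to the watermark.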
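To make that rule concrete, here is a plain-Java sketch that applies the same comparison to the two timestamps from your error message. It uses java.time rather than Beam/Joda so it runs on its own, and `isOutputTimestampAllowed` is a hypothetical helper that mirrors the wording of the error, not Beam's actual implementation.

```java
import java.time.Duration;
import java.time.Instant;

public class SkewCheck {
    // Hypothetical helper mirroring the error message: an output timestamp must be
    // no earlier than (timestamp of the current input - allowed skew).
    static boolean isOutputTimestampAllowed(Instant output, Instant input, Duration allowedSkew) {
        return !output.isBefore(input.minus(allowedSkew));
    }

    public static void main(String[] args) {
        Instant input = Instant.parse("2019-08-16T12:39:06.887Z"); // timestamp of the current input
        Instant output = Instant.ofEpochMilli(1565958615120L);     // value parsed from the file

        System.out.println(output);                                                            // 2019-08-16T12:30:15.120Z
        System.out.println(isOutputTimestampAllowed(output, input, Duration.ZERO));            // false
        System.out.println(isOutputTimestampAllowed(output, input, Duration.ofMinutes(10)));   // true
    }
}
```

With a 0 ms skew the file's timestamp is rejected; only a skew of at least 8 min 51.767 s (the gap between the two instants) would let it through.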

-Theo


On Fri, Aug 16, 2019 at 8:52 AM Chengzhi Zhao <[email protected]>
wrote:

> Hi folks,
>
> I am new to Beam and am trying out some examples. I am running Beam 2.14
> with the Direct runner to read some files (which I continuously generate).
>
> I am facing this error: Cannot output with timestamp
> 2019-08-16T12:30:15.120Z. Output timestamps must be no earlier than the
> timestamp of the current input (2019-08-16T12:39:06.887Z) minus the allowed
> skew (0 milliseconds). I searched online but still don't quite understand
> it so I am asking here for some help.
>
> A file has some past timestamps in it:
> 1565958615120
> 1565958615120
> 1565958615121
>
> My code looks something like this:
>
>     static class ReadWithEventTime extends DoFn<String, String> {
>         @ProcessElement
>         public void processElement(@Element String line, OutputReceiver<String> out) {
>             out.outputWithTimestamp(line, new Instant(Long.parseLong(line)));
>         }
>     }
>
>     public static void main(String[] args) {
>         PipelineOptions options = PipelineOptionsFactory.create();
>         Pipeline pipeline = Pipeline.create(options);
>
>         String sourcePath = new File("files/").getPath();
>
>         PCollection<String> data = pipeline.apply("ReadData",
>                 TextIO.read().from(sourcePath + "/test*")
>                         .watchForNewFiles(Duration.standardSeconds(5),
>                                 Watch.Growth.<String>never()));
>
>         data.apply("ReadWithEventTime", ParDo.of(new ReadWithEventTime()));
>
>         pipeline.run().waitUntilFinish();
>
>     }
>
>
> I am trying to understand where the "current input
> (2019-08-16T12:39:06.887Z)" in the error message is coming from. Is it the
> lowest watermark when I start my application? If that's the case, is there a
> way that I can change the initial watermark?
>
> Also, I could set up `withAllowedTimestampSkew`, but it looks like it has
> been deprecated.
>
> Any suggestion would be appreciated. Thank you!
>
> Best,
> Chengzhi
>
>
