Hi Theodore,

Thanks for your reply. This is just a simple example I put together to understand how event time works in Beam. In practice I could have more fields, and each record would carry its own event time, so I was trying to tell Beam which field is the event time to use for later windowing and computation.
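As a rough illustration of picking the event-time field out of a multi-field record, here is a plain-Java sketch. The line layout (epoch milliseconds first, then comma-separated fields) and the class name EventTimeRecord are hypothetical, and it uses java.time.Instant rather than the Joda-Time Instant that Beam itself uses. In a Beam DoFn, the parsed time could either be passed to outputWithTimestamp or, as Theodore suggests, kept inside the element (for example in a KV), so that no element timestamp has to move backwards.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.List;

public class EventTimeRecord {
    final Instant eventTime;   // parsed event time of the record
    final List<String> fields; // remaining (non-time) fields, in order

    EventTimeRecord(Instant eventTime, List<String> fields) {
        this.eventTime = eventTime;
        this.fields = fields;
    }

    // Hypothetical layout: "<epochMillis>,<field1>,<field2>,...".
    // The first column is treated as the event time; the rest are kept as-is.
    static EventTimeRecord parse(String line) {
        String[] parts = line.split(",", -1);
        Instant eventTime = Instant.ofEpochMilli(Long.parseLong(parts[0].trim()));
        List<String> rest = Arrays.asList(parts).subList(1, parts.length);
        return new EventTimeRecord(eventTime, rest);
    }

    public static void main(String[] args) {
        EventTimeRecord record = parse("1565958615120,user42,click");
        // The event time travels with the data instead of being attached as the
        // element timestamp, similar in spirit to emitting a KV.
        System.out.println(record.eventTime + " " + record.fields);
    }
}
```

The sample line in main reuses a timestamp from the file further down the thread; "user42" and "click" are made-up field values.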
I think the probable reason you mentioned sounds reasonable. I am still trying to figure out where the "current input (2019-08-16T12:39:06.887Z)" in the error message is coming from, so if you have any insight on it, please let me know.

Thanks a lot for your help.

--
Chengzhi

On Fri, Aug 16, 2019 at 9:57 AM Theodore Siu wrote:

> Hi Chengzhi,
>
> Are you simply trying to emit the timestamp onward? Why not just use
> `out.output` with a PCollection<Instant>?
>
> static class ReadWithEventTime extends DoFn<String, Instant> {
>   @DoFn.ProcessElement
>   public void processElement(@Element String line, OutputReceiver<Instant> out) {
>     out.output(new Instant(Long.parseLong(line)));
>   }
> }
>
> You can also output the line itself as a PCollection<String>. If your line
> has additional information to parse, consider a key-value pair (KV)
> https://beam.apache.org/releases/javadoc/2.2.0/index.html?org/apache/beam/sdk/values/KV.html
> where you can emit both some parsed content of the string and the timestamp.
>
> The probable reason why outputWithTimestamp doesn't work with older times
> is that the emitted timestamp is used specifically for windowing: in
> streaming data pipelines it determines which window each record belongs to
> for aggregations.
>
> -Theo
>
>
> On Fri, Aug 16, 2019 at 8:52 AM Chengzhi Zhao wrote:
>
>> Hi folks,
>>
>> I am new to Beam and am trying out some examples. I am running Beam
>> 2.14 with the Direct runner to read some files (which I continuously
>> generate).
>>
>> I am facing this error: Cannot output with timestamp
>> 2019-08-16T12:30:15.120Z. Output timestamps must be no earlier than the
>> timestamp of the current input (2019-08-16T12:39:06.887Z) minus the allowed
>> skew (0 milliseconds). I searched online but still don't quite understand
>> it, so I am asking here for some help.
>>
>> A file has some past timestamps in it:
>> 1565958615120
>> 1565958615120
>> 1565958615121
>>
>> My code looks something like this:
>>
>> static class ReadWithEventTime extends DoFn<String, String> {
>>   @ProcessElement
>>   public void processElement(@Element String line, OutputReceiver<String> out) {
>>     out.outputWithTimestamp(line, new Instant(Long.parseLong(line)));
>>   }
>> }
>>
>> public static void main(String[] args) {
>>   PipelineOptions options = PipelineOptionsFactory.create();
>>   Pipeline pipeline = Pipeline.create(options);
>>
>>   String sourcePath = new File("files/").getPath();
>>
>>   PCollection<String> data = pipeline.apply("ReadData",
>>       TextIO.read().from(sourcePath + "/test*")
>>           .watchForNewFiles(Duration.standardSeconds(5),
>>               Watch.Growth.<String>never()));
>>
>>   data.apply("ReadWithEventTime", ParDo.of(new ReadWithEventTime()));
>>
>>   pipeline.run().waitUntilFinish();
>> }
>>
>>
>> I am trying to understand where "current input (2019-08-16T12:39:06.887Z)"
>> in the error message is coming from. Is it the lowest watermark when I
>> start my application? If that's the case, is there a way I can change the
>> initial watermark?
>>
>> Also, I could set up `withAllowedTimestampSkew`, but it looks like it has
>> been deprecated.
>>
>> Any suggestion would be appreciated. Thank you!
>>
>> Best,
>> Chengzhi
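The rule stated in the error message can be checked outside Beam. The following plain-Java sketch uses java.time only and is not Beam's implementation; TimestampSkewCheck, isAllowed, and millisBehind are hypothetical names. It plugs in the input timestamp and the 0 ms skew from the error together with the three sample records from the file. All three event times are about 8 minutes 52 seconds earlier than the input element's timestamp, so with zero allowed skew every one is rejected. Where that input timestamp comes from is an assumption here: it looks more like the time at which watchForNewFiles picked up the file than a watermark, but nothing in this thread confirms that.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

public class TimestampSkewCheck {

    // The rule stated in the error message: an output timestamp must be
    // no earlier than (input timestamp - allowed skew).
    static boolean isAllowed(Instant output, Instant input, Duration allowedSkew) {
        return !output.isBefore(input.minus(allowedSkew));
    }

    // How far (in ms) an output timestamp lies before the input timestamp.
    static long millisBehind(Instant output, Instant input) {
        return Duration.between(output, input).toMillis();
    }

    public static void main(String[] args) {
        // Timestamp of the current input element, copied from the error message.
        Instant input = Instant.parse("2019-08-16T12:39:06.887Z");
        // Allowed skew reported in the error message (0 milliseconds).
        Duration skew = Duration.ZERO;

        // Sample records from the file, as epoch milliseconds.
        List<String> lines = List.of("1565958615120", "1565958615120", "1565958615121");
        for (String line : lines) {
            Instant eventTime = Instant.ofEpochMilli(Long.parseLong(line));
            System.out.println(eventTime
                + " allowed=" + isAllowed(eventTime, input, skew)
                + " (" + millisBehind(eventTime, input) + " ms before input)");
        }
    }
}
```

Running main should report allowed=false for all three records; raising the skew to something like Duration.ofMinutes(10) would flip them to allowed=true, which is roughly what the deprecated skew setting would do inside Beam.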
