Hmm. Since they differ by 1ms, I wonder if it's rounding/truncation combined with very slight skew between Pubsub & DF. Just a random guess. Your code does seem reasonable at first glance, from a Beam perspective (it is Scio code, yes?)
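If it does turn out to be a tiny backwards skew, one low-effort workaround (just a sketch, untested; it assumes your Scio version's timestampBy exposes Beam's WithTimestamps#withAllowedTimestampSkew as a second parameter) is to tolerate a small skew when re-stamping:

    import org.joda.time.{Duration, Instant}

    // Same read as in your snippet below; projectId, subscription and
    // windowSize are your values. The only change is allowing elements to
    // be stamped up to 1s earlier than their current timestamp instead of
    // throwing IllegalArgumentException.
    sc.pubsubSubscriptionWithAttributes[String](
        s"projects/$projectId/subscriptions/$subscription")
      .withName("Set timestamps")
      .timestampBy(_ => new Instant,
        allowedTimestampSkew = Duration.standardSeconds(1))
      .withName("Apply windowing")
      .withFixedWindows(windowSize)
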
Kenn

On Tue, May 1, 2018 at 8:00 AM Carlos Alonso <[email protected]> wrote:

> Ok, so after checking the logs more deeply I've found a line that seems to
> identify the steps (Adding config for S2:
> {"instructions":[{"name":"pubsubSubscriptionWithAttributes@{PubSubDataReader.scala:10}/PubsubUnboundedSource","originalName":"s13","outputs":..),
> so that would mean the exception is thrown from the read-from-PubSub step,
> in which I actually run this code:
>
> sc.pubsubSubscriptionWithAttributes[String](s"projects/$projectId/subscriptions/$subscription")
>   .withName("Set timestamps")
>   .timestampBy(_ => new Instant)
>   .withName("Apply windowing")
>   .withFixedWindows(windowSize)
>
> I'm setting the elements' timestamps when they're read because I'm pairing
> them with the schemas read from BigQuery using a side transform later
> on... Is it possible that the elements already have a (somehow future)
> timestamp and this timestampBy transform is causing the issue?
>
> If that were the case, the elements read from PubSub would need to have a
> "later" timestamp than "now", as, per the exception message, my transform,
> which sets timestamps to "now", would actually be trying to set them
> backwards... Does that make sense? (I'll try to dive into the
> read-from-PubSub transform to double-check...)
>
> Thanks!
>
> On Tue, May 1, 2018 at 4:44 PM Carlos Alonso <[email protected]> wrote:
>
>> Hi everyone!!
>>
>> I have a job that reads heterogeneous messages from PubSub and, depending
>> on their type, writes them to the appropriate BigQuery table, and I keep
>> getting random "java.lang.IllegalArgumentException: Cannot output with
>> timestamp" errors that I cannot pin down. I can't even figure out which
>> part of the code is actually throwing the exception by looking at the
>> stacktrace...
>>
>> You can find the full stacktrace here: https://pastebin.com/1gN4ED2A and
>> a couple of job IDs are these: 2018-04-26_08_56_42-10071326408494590980
>> and 2018-04-27_09_19_13-15798240702327849959
>>
>> Trying to at least figure out the source transform of the error: the logs
>> say the trace was at stage S2, but I don't know how to identify which
>> parts of my pipeline form which stages...
>>
>> Thanks!!
>>
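P.S. Another angle, if allowing skew feels like papering over it (again just a sketch, untested): skip the re-stamping entirely and window on the timestamps PubsubUnboundedSource already assigns (Pub/Sub publish time by default), so nothing can ever move backwards:

    // projectId, subscription and windowSize are from your snippet.
    sc.pubsubSubscriptionWithAttributes[String](
        s"projects/$projectId/subscriptions/$subscription")
      .withName("Apply windowing")
      .withFixedWindows(windowSize)

If you need a custom event time rather than publish time, setting it as a message attribute at publish time and reading it via the read transform's timestamp-attribute option (I believe Scio's Pub/Sub reads take a timestampAttribute parameter for this) keeps the stamping on the Pub/Sub side.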
