Hmm. Since they differ by 1ms, I wonder if it is rounding/truncation
combined with very slight clock skew between Pubsub and Dataflow. Just a
random guess. Your code does seem reasonable at first glance, from a Beam
perspective (it is Scio code, yes?)
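
If the 1ms skew theory holds, one way to rule it out, assuming a Scio
version where timestampBy exposes the allowedTimestampSkew parameter (it
maps to Beam's DoFn.getAllowedTimestampSkew), is to tolerate a tiny
backwards shift when re-stamping. A rough sketch, not a fix:

import org.joda.time.{Duration, Instant}

sc.pubsubSubscriptionWithAttributes[String](s"projects/$projectId/subscriptions/$subscription")
  // allow re-stamped timestamps to move backwards by up to 2ms, enough to
  // absorb a 1ms rounding/skew difference between Pubsub and the worker clock
  .timestampBy(_ => Instant.now(), allowedTimestampSkew = Duration.millis(2))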

Kenn

On Tue, May 1, 2018 at 8:00 AM Carlos Alonso <[email protected]> wrote:

> Ok, so after digging deeper into the logs I've found a line that seems to
> identify the steps (Adding config for S2:
> {"instructions":[{"name":"pubsubSubscriptionWithAttributes@{PubSubDataReader.scala:10}/PubsubUnboundedSource","originalName":"s13","outputs":..),
> so that would mean that the exception is thrown from the step that reads
> from PubSub, in which I actually run this code:
>
> // read (payload, attributes) pairs from the Pub/Sub subscription
> sc.pubsubSubscriptionWithAttributes[String](s"projects/$projectId/subscriptions/$subscription")
>   .withName("Set timestamps")
>   .timestampBy(_ => new Instant) // re-stamp every element with "now"
>   .withName("Apply windowing")
>   .withFixedWindows(windowSize)
>
>
> I'm timestamping and windowing the elements as soon as they're read
> because I'm pairing them with the schemas read from BigQuery using a side
> input later on... Is it possible that the elements already have a (somehow
> future) timestamp and this timestampBy transform is causing the issue?
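>
> For context, the pairing looks roughly like this (schemaQuery and
> windowedMessages are illustrative names, not my real code):
>
> val schemaSide = sc.bigQuerySelect(schemaQuery).asListSideInput
> windowedMessages
>   .withSideInputs(schemaSide)
>   .map { (msg, ctx) => (msg, ctx(schemaSide)) } // pair each message with the schemas
>   .toSCollection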
>
> If that were the case, the elements read from PubSub would have to carry
> a "later" timestamp than "now", as, according to the exception message, my
> transform, which sets timestamps to "now", would actually be trying to
> move them backwards... Does that make sense? (I'll try to dive into the
> read-from-PubSub transform to double check...)
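>
> As a quick way to verify that, I could log any element whose
> Pub/Sub-assigned timestamp is already ahead of the wall clock before
> re-stamping; a hypothetical debugging snippet, using Scio's withTimestamp
> helper:
>
> import org.joda.time.Instant
> import org.slf4j.LoggerFactory
>
> sc.pubsubSubscriptionWithAttributes[String](s"projects/$projectId/subscriptions/$subscription")
>   .withTimestamp // pairs each element with its current (Pub/Sub) timestamp
>   .map { case (msg, ts) =>
>     val now = Instant.now()
>     // if ts is already past "now", re-stamping to "now" would move it backwards
>     if (ts.isAfter(now))
>       LoggerFactory.getLogger("timestamp-check").warn(s"Future timestamp: $ts vs $now")
>     msg
>   }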
>
> Thanks!
>
> On Tue, May 1, 2018 at 4:44 PM Carlos Alonso <[email protected]> wrote:
>
>> Hi everyone!!
>>
>> I have a job that reads heterogeneous messages from PubSub and, depending
>> on their type, writes them to the appropriate BigQuery table, and I keep
>> getting random "java.lang.IllegalArgumentException: Cannot output with
>> timestamp" errors that I cannot track down; I can't even figure out which
>> part of the code is actually throwing the exception by looking at the
>> stacktrace...
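>>
>> The shape of the job, for reference (all the names here are illustrative,
>> not my real code), is roughly: parse each message, split by type, and
>> write each branch to its own table:
>>
>> import com.spotify.scio.bigquery._
>>
>> val Seq(orders, others) = parsed.partition(2, m => if (m.msgType == "order") 0 else 1)
>> orders.map(toOrderRow).saveAsBigQuery("project:dataset.orders")
>> others.map(toOtherRow).saveAsBigQuery("project:dataset.other")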
>>
>> You can find the full stacktrace here: https://pastebin.com/1gN4ED2A,
>> and here are a couple of job IDs: 2018-04-26_08_56_42-10071326408494590980
>> and 2018-04-27_09_19_13-15798240702327849959
>>
>> Trying to at least figure out the source transform of the error: the
>> logs say the trace was at stage S2, but I don't know how to map the
>> stages back to the parts of my pipeline...
>>
>> Thanks!!
>>
>
