Hi, I need to process messages/events from Google Pub/Sub. Each message is sent as a JSON payload and contains a JSON attribute, say "time", holding the timestamp of the event.
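To make the payload shape concrete, here is a minimal sketch of such a message and of pulling out the "time" value with plain Java. The "id" field, the sample timestamp value, the regex-based extraction, and the class and method names are assumptions for illustration only; they are not the real message schema, and an actual pipeline would more likely use a JSON parser.

```java
import java.time.Instant;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PayloadTimeExample {
    // Matches "time": "<value>" in a flat JSON object (illustrative only;
    // this is not a general JSON parser).
    private static final Pattern TIME_FIELD =
        Pattern.compile("\"time\"\\s*:\\s*\"([^\"]+)\"");

    // Hypothetical helper: returns the event time carried in the payload,
    // assuming it is an ISO-8601 UTC timestamp.
    static Instant extractTime(String json) {
        Matcher m = TIME_FIELD.matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("No \"time\" field in: " + json);
        }
        return Instant.parse(m.group(1));
    }

    public static void main(String[] args) {
        // Hypothetical payload; only the "time" attribute comes from the description above.
        String payload = "{\"id\":\"e1\",\"time\":\"2023-05-25T16:40:00.015Z\"}";
        System.out.println(extractTime(payload)); // prints 2023-05-25T16:40:00.015Z
    }
}
```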
I need to group the events into 5-minute windows and write them to files, one file per window. After I extract the timestamp and set it in the pipeline, I get this exception:

    java.lang.IllegalArgumentException: Cannot output with timestamp 2023-05-25T16:40:00.015Z. Output timestamps must be no earlier than the timestamp of the current input or timer (2023-05-25T16:40:00.039Z) minus the allowed skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.

Is there a way to solve this problem? How can I override the timestamp of the event without running into this issue?

Here is an example of the pipeline code:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Instant;
    import org.joda.time.format.DateTimeFormat;
    import org.joda.time.format.DateTimeFormatter;

    public class PubsubTimestampExample {

        public static void main(String[] args) {
            // Create the pipeline options
            PipelineOptions options = PipelineOptionsFactory.create();

            // Create the pipeline
            Pipeline pipeline = Pipeline.create(options);

            // Define the Pub/Sub topic
            String topic = "projects/<PROJECT_ID>/topics/<TOPIC_NAME>";

            // Read the messages from Pub/Sub
            PCollection<String> messages = pipeline
                .apply("ReadFromPubsub", PubsubIO.readStrings().fromTopic(topic));

            // Process the messages and set the timestamp
            PCollection<String> processedMessages = messages
                .apply("SetTimestamp", ParDo.of(new SetTimestampFn()));

            // Print the processed messages
            processedMessages.apply("PrintMessages", ParDo.of(new PrintMessagesFn()));

            // Run the pipeline
            pipeline.run();
        }

        public static class SetTimestampFn extends DoFn<String, String> {
            private static final DateTimeFormatter TIMESTAMP_FORMATTER =
                DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");

            @ProcessElement
            public void processElement(ProcessContext c) {
                String message = c.element();
                String[] parts = message.split(",");

                // Assuming message format: "payload,timestamp"
                String payload = parts[0];
                String timestampString = parts[1];

                // Parse the timestamp extracted from the message
                Instant timestamp = Instant.parse(timestampString, TIMESTAMP_FORMATTER);

                // Set the timestamp for the element
                c.outputWithTimestamp(payload, timestamp);
            }
        }

        public static class PrintMessagesFn extends DoFn<String, Void> {
            @ProcessElement
            public void processElement(ProcessContext c) {
                System.out.println(c.element());
            }
        }
    }

--
Mario Costa
Data Analytics Senior Software Developer
<https://corp.frvr.com>
<https://www.facebook.com/frvrgames>
<https://pt.linkedin.com/company/frvr>
<https://frvr.com/>