Hi,

I need to process messages/events from Google Pub/Sub. Each message is sent
as a JSON payload and contains a JSON field, say "time", holding the
timestamp of the event.
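
As a sketch of the timestamp step on its own: assuming the "time" value is an ISO-8601 UTC string like the one in the exception below, and that it has already been read out of the JSON (with Jackson, Gson or similar, not shown), java.time can turn it into epoch milliseconds. The class and method names are hypothetical.

```java
import java.time.Instant;

// Hypothetical helper: converts the "time" field value (ISO-8601, UTC) to epoch millis.
class EventTimeParser {
    static long toEpochMillis(String isoTimestamp) {
        // Instant.parse accepts strings such as "2023-05-25T16:40:00.015Z"
        return Instant.parse(isoTimestamp).toEpochMilli();
    }

    public static void main(String[] args) {
        long millis = toEpochMillis("2023-05-25T16:40:00.015Z");
        System.out.println(millis + " -> " + Instant.ofEpochMilli(millis));
    }
}
```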

I need to group the events into 5-minute windows and write them to files,
one file per window.
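
For reference, a 5-minute fixed window with no offset (what Beam's `FixedWindows.of(Duration.standardMinutes(5))` assigns) is the event time rounded down to a multiple of five minutes since the epoch, with the window covering [start, start + 5 min). The plain-Java sketch below, with hypothetical names, only shows which bucket a timestamp lands in. In the pipeline itself this would presumably be `Window.into(FixedWindows.of(...))` followed by a windowed write such as `TextIO.write().to(...).withWindowedWrites().withNumShards(1)` to get one file per window; that sink choice is an assumption, not part of the original code.

```java
import java.time.Duration;
import java.time.Instant;

// Sketch: which 5-minute fixed window [start, end) an event time falls into.
class FiveMinuteWindows {
    static final long WINDOW_MILLIS = Duration.ofMinutes(5).toMillis();

    // Window start: event time rounded down to a multiple of 5 minutes since the epoch.
    static Instant windowStart(Instant eventTime) {
        long millis = eventTime.toEpochMilli();
        return Instant.ofEpochMilli(millis - Math.floorMod(millis, WINDOW_MILLIS));
    }

    // Window end (exclusive).
    static Instant windowEnd(Instant eventTime) {
        return windowStart(eventTime).plusMillis(WINDOW_MILLIS);
    }

    public static void main(String[] args) {
        Instant t = Instant.parse("2023-05-25T16:40:00.015Z");
        System.out.println("[" + windowStart(t) + ", " + windowEnd(t) + ")");
    }
}
```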

After I extract the timestamp and set it in the pipeline, I get the
following exception:

java.lang.IllegalArgumentException: Cannot output with timestamp
2023-05-25T16:40:00.015Z. Output timestamps must be no earlier than the
timestamp of the current input or timer (2023-05-25T16:40:00.039Z) minus
the allowed skew (0 milliseconds) and no later than
294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew() Javadoc
for details on changing the allowed skew.
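
The rule the message describes can be modelled in a few lines. This is a simplified plain-Java model of the check, not Beam's implementation, and it ignores the far-future upper bound. With the two timestamps from the exception, the event time parsed from the payload (16:40:00.015) is 24 ms earlier than the element's current timestamp (16:40:00.039; with a plain PubsubIO read and no timestamp attribute this should be the Pub/Sub publish time), so a zero skew rejects it, while any allowed skew of at least 24 ms would accept it.

```java
import java.time.Duration;
import java.time.Instant;

// Simplified model of the rule in the exception (not Beam's code):
// an output timestamp is accepted only if it is not earlier than
// (input timestamp - allowed skew). The upper bound is ignored here.
class SkewCheck {
    static boolean isAllowed(Instant output, Instant input, Duration allowedSkew) {
        return !output.isBefore(input.minus(allowedSkew));
    }

    public static void main(String[] args) {
        Instant eventTime = Instant.parse("2023-05-25T16:40:00.015Z"); // parsed from the payload
        Instant inputTime = Instant.parse("2023-05-25T16:40:00.039Z"); // current element timestamp
        System.out.println("skew 0:     " + isAllowed(eventTime, inputTime, Duration.ZERO));
        System.out.println("skew 1 min: " + isAllowed(eventTime, inputTime, Duration.ofMinutes(1)));
    }
}
```

In Beam itself, two commonly suggested ways around this (not verified against this exact pipeline) are: have the publisher also put the event time in a Pub/Sub message attribute (not a field inside the JSON) and read it with `PubsubIO.readStrings().fromTopic(topic).withTimestampAttribute(...)`, so elements carry the event time from the source; or override `getAllowedTimestampSkew()` in the DoFn, which is deprecated, and moving timestamps behind the watermark can make elements late and dropped by the windowing.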

Is there a way to solve this problem?

How can I override the timestamp of the event without hitting this error?

Here is an example of the pipeline code:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class PubsubTimestampExample {
    public static void main(String[] args) {
        // Create the pipeline options
        PipelineOptions options = PipelineOptionsFactory.create();

        // Create the pipeline
        Pipeline pipeline = Pipeline.create(options);

        // Define the Pub/Sub topic
        String topic = "projects/<PROJECT_ID>/topics/<TOPIC_NAME>";

        // Read the messages from Pub/Sub. No timestamp attribute is configured,
        // so each element is timestamped with its Pub/Sub publish time.
        PCollection<String> messages = pipeline
                .apply("ReadFromPubsub", PubsubIO.readStrings().fromTopic(topic));

        // Process the messages and set the timestamp
        PCollection<String> processedMessages = messages
                .apply("SetTimestamp", ParDo.of(new SetTimestampFn()));

        // Print the processed messages
        processedMessages.apply("PrintMessages", ParDo.of(new PrintMessagesFn()));

        // Run the pipeline
        pipeline.run();
    }

    public static class SetTimestampFn extends DoFn<String, String> {
        private static final DateTimeFormatter TIMESTAMP_FORMATTER =
                DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");

        @ProcessElement
        public void processElement(ProcessContext c) {
            String message = c.element();
            // Assuming message format: "payload,timestamp"
            String[] parts = message.split(",");
            String payload = parts[0];
            String timestampString = parts[1];

            // Extract and parse the timestamp from the payload
            Instant timestamp = Instant.parse(timestampString, TIMESTAMP_FORMATTER);

            // Set the timestamp for the element
            c.outputWithTimestamp(payload, timestamp);
        }
    }

    public static class PrintMessagesFn extends DoFn<String, Void> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            System.out.println(c.element());
        }
    }
}
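
One more detail in SetTimestampFn: if the payload part is itself JSON, as described at the top, it will contain commas, so `split(",")` would return a fragment of the JSON as `parts[1]`. A sketch that splits on the last comma instead, still assuming the "payload,timestamp" layout from the code comment; the class and method names are hypothetical.

```java
// Sketch: split "payload,timestamp" on the LAST comma so commas inside a JSON payload are kept.
class PayloadSplitter {
    // Returns {payload, timestamp}; throws if the message contains no comma.
    static String[] splitPayloadAndTimestamp(String message) {
        int comma = message.lastIndexOf(',');
        if (comma < 0) {
            throw new IllegalArgumentException("Expected \"payload,timestamp\" but got: " + message);
        }
        return new String[] {message.substring(0, comma), message.substring(comma + 1)};
    }

    public static void main(String[] args) {
        String[] parts = splitPayloadAndTimestamp("{\"id\":1,\"name\":\"a\"},2023-05-25T16:40:00.015Z");
        System.out.println(parts[0]);
        System.out.println(parts[1]);
    }
}
```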
--

Mario Costa
Data Analytics Senior Software Developer

<https://frvr.com/>