Hi community,

Google Cloud PubSub has a feature called snapshot[1], which captures the
acknowledgment state of a subscription so that it can later be applied to a
subscription.

I recently needed to update the "filter" of a subscription, but a "filter"
cannot be modified once the subscription has been created.
Therefore, I created a snapshot of the current subscription and applied it
to a new subscription.

After resuming the Flink application with the new subscription, I got the
following error repeatedly:
```
org.apache.flink.util.SerializedThrowable: INVALID_ARGUMENT: You have passed a subscription that does not belong to the given ack ID (resource=projects/xxxxx/subscriptions/xxxx).
        at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:244) ~[?:?]
        at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:225) ~[?:?]
        at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:142) ~[?:?]
        at com.google.pubsub.v1.SubscriberGrpc$SubscriberBlockingStub.acknowledge(SubscriberGrpc.java:1628) ~[?:?]
        at org.apache.flink.streaming.connectors.gcp.pubsub.BlockingGrpcPubSubSubscriber.acknowledge(BlockingGrpcPubSubSubscriber.java:99) ~[?:?]
        at org.apache.flink.streaming.connectors.gcp.pubsub.common.AcknowledgeOnCheckpoint.notifyCheckpointComplete(AcknowledgeOnCheckpoint.java:84) ~[?:?]
        at org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.notifyCheckpointComplete(PubSubSource.java:208) ~[?:?]
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:99) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:319) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1083) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$11(StreamTask.java:1048) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$13(StreamTask.java:1071) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at java.lang.Thread.run(Thread.java:834) ~[?:?]
```

I think the ack IDs stored in the savepoint became invalid once I switched
to the new subscription, because (as the error message says) an ack ID
belongs to the subscription it was received from.
Since PubSub has an at-least-once guarantee, would it be safe to just ignore
these errors, or even to stop saving ack IDs in checkpoints/savepoints?
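
To make the "ignore these errors" option concrete, here is a rough sketch of
the behavior I have in mind. It is not the connector's actual code:
`TolerantAck`, `Acknowledger`, `AckException` and `acknowledgeOrDrop` are
hypothetical names, and `AckException` is only a stand-in for the
io.grpc StatusRuntimeException seen in the stack trace, so that the example
runs without any dependencies. The assumption is that ack IDs rejected with
INVALID_ARGUMENT can be dropped, and that the affected messages are simply
redelivered later.

```java
import java.util.Arrays;
import java.util.List;

class TolerantAck {
    /** Hypothetical stand-in for the gRPC exception carrying a status code. */
    static class AckException extends RuntimeException {
        final String code;

        AckException(String code, String message) {
            super(code + ": " + message);
            this.code = code;
        }
    }

    /** Hypothetical stand-in for the subscriber's acknowledge call. */
    interface Acknowledger {
        void acknowledge(List<String> ackIds);
    }

    /**
     * Tries to acknowledge the given ack IDs.
     * Returns true if they were acknowledged (or there was nothing to do),
     * false if they were rejected as INVALID_ARGUMENT and dropped.
     * Any other failure is rethrown unchanged.
     */
    static boolean acknowledgeOrDrop(Acknowledger subscriber, List<String> ackIds) {
        if (ackIds.isEmpty()) {
            return true;
        }
        try {
            subscriber.acknowledge(ackIds);
            return true;
        } catch (AckException e) {
            if ("INVALID_ARGUMENT".equals(e.code)) {
                // Assumption: these ack IDs belong to the old subscription,
                // so they are dropped instead of failing the job.
                return false;
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        // Simulates the new subscription rejecting ack IDs restored from the savepoint.
        Acknowledger newSubscription = ackIds -> {
            for (String id : ackIds) {
                if (id.startsWith("old-")) {
                    throw new AckException("INVALID_ARGUMENT",
                            "subscription does not belong to the given ack ID");
                }
            }
        };
        System.out.println(acknowledgeOrDrop(newSubscription, Arrays.asList("old-1", "old-2")));
        System.out.println(acknowledgeOrDrop(newSubscription, Arrays.asList("new-1")));
    }
}
```

Only INVALID_ARGUMENT is swallowed here; other errors still propagate, so a
real outage would not be hidden by this.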

I am new here. Are there any suggestions on how to follow up?
Can I just create a Jira ticket for this feature request?

[1] https://cloud.google.com/pubsub/docs/replay-overview

Thanks,
sayuan
