Hi all.

This could be a strictly Google Dataflow question, but here goes:

I'm currently trying to update a stateful streaming job which basically looks
as follows (using the 2.7/2.8 versions of the SDKs):

    pipeline
        .apply("Read from pubsub", ...read from pubsub...)
        .apply(
            "Map PubSubMessages",
            MapElements
                .into(TypeDescriptors.kvs(
                    TypeDescriptors.strings(),
                    TypeDescriptor.of(PubSubMessage.class)))
                .via(input -> KV.of(input.getId1() + ":" + input.getId2(), input))
        )
        .apply(...stateful processing...)

PubSubMessage is encoded/decoded with a ProtoCoder registered for the
autogenerated class:

    registry.registerCoderForClass(
        PubSubMessage.class, ProtoCoder.of(PubSubMessage.class));

The problem I'm having now is that the message definition has been updated:
PubSubMessage gained two additional fields. As it's proto3, the change is
backwards compatible on the wire. However, since the generated class changed,
Dataflow appears to conclude that the type has changed too, so the old
messages/steps are no longer considered compatible, and the job update fails
with:

    Workflow failed. Causes: The new job is not compatible with existing job.
    The original job has not been aborted., The Coder or type for step Map
    PubSubMessages to state object/Map.out0/FromValue has changed.

Stopping the job (draining its messages) and starting it fresh would be a
fairly major pain. Is there a better way to update the type/protobuf/coder in
this case?
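
One idea we've been toying with, though untested, is pinning the coder on the
mapped PCollection explicitly instead of relying on coder inference from the
registry/TypeDescriptor. A rough sketch, where `messages` stands for the
PCollection<PubSubMessage> coming out of the read step:

    PCollection<KV<String, PubSubMessage>> keyed = messages
        .apply(
            "Map PubSubMessages",
            MapElements
                .into(TypeDescriptors.kvs(
                    TypeDescriptors.strings(),
                    TypeDescriptor.of(PubSubMessage.class)))
                .via(input -> KV.of(input.getId1() + ":" + input.getId2(), input)));
    // Pin the KV coder so it no longer depends on what gets inferred for the lambda.
    keyed.setCoder(KvCoder.of(StringUtf8Coder.of(), ProtoCoder.of(PubSubMessage.class)));

(KvCoder and StringUtf8Coder live in org.apache.beam.sdk.coders, ProtoCoder in
org.apache.beam.sdk.extensions.protobuf.) We haven't verified whether this
actually changes what Dataflow's compatibility check compares.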

We currently suspect this could very well be a side effect of using
MapElements.via with a lambda. The Beam programming guide warns:

--- start quote ---
Take care when declaring your function object inline by using an anonymous 
inner class instance. In a non-static context, your inner class instance will 
implicitly contain a pointer to the enclosing class and that class’ state. That 
enclosing class will also be serialized, and thus the same considerations that 
apply to the function object itself also apply to this outer class.
--- end quote ---

To quote a colleague: "So the lambda itself is perhaps at fault. The wording
'The Coder or type for step' just makes me suspicious. If you click the
mapping step in the working pipeline, it shows some metadata on the side:
`com.storytel.consumption.aggregation.Program$$Lambda$30/20156341`. I really
don't trust such information to be stable between builds, especially when the
types change. But I could be barking up the wrong tree."
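
If the synthetic lambda class name really is what the compatibility check
trips over, one workaround we're considering is replacing the lambda with a
named, top-level function class, so the name stays stable across builds. A
minimal sketch (KeyByIds is just a name we made up):

    import org.apache.beam.sdk.transforms.SimpleFunction;
    import org.apache.beam.sdk.values.KV;

    // A named class keeps a stable, predictable name across builds, unlike the
    // synthetic Program$$Lambda$30/20156341 generated for a lambda.
    public class KeyByIds extends SimpleFunction<PubSubMessage, KV<String, PubSubMessage>> {
        @Override
        public KV<String, PubSubMessage> apply(PubSubMessage input) {
            return KV.of(input.getId1() + ":" + input.getId2(), input);
        }
    }

and in the pipeline:

    .apply("Map PubSubMessages", MapElements.via(new KeyByIds()))

With a SimpleFunction the .into(...) call isn't needed, since MapElements can
take the output type from the function's type parameters.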

If anyone has an insight, a suggestion, or a hint on how to resolve this
problem, it would be greatly appreciated.
