I have two pipelines:
PipelineA -> Pub/Sub (schema-bound JSON) -> PipelineB

PipelineA emits proto messages serialized as JSON strings. Serialization is
done with `JsonFormat.printer()`:
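```
// JsonFormat is com.google.protobuf.util.JsonFormat
String json = JsonFormat.printer()
        .preservingProtoFieldNames() // keep proto field names such as site_id
        .print(model);
```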

In PipelineB, I am trying to read these JSON strings as `Row`s whose schema is
derived from the same proto that PipelineA uses:

```
Schema schema =
    new ProtoMessageSchema().schemaFor(TypeDescriptor.of(protoClass));
...
input.apply(JsonToRow.withSchema(schema));
...
```

The problem is with `int64` fields. `JsonFormat.printer` serializes them as
strings in the final JSON, which is by design
(https://developers.google.com/protocol-buffers/docs/proto3#json).
When PipelineB then tries to deserialize these fields as `INT64`, it fails:
```
Unable to get value from field 'site_id'. Schema type 'INT64'. JSON node type STRING
org.apache.beam.sdk.util.RowJson$RowJsonDeserializer.extractJsonPrimitiveValue
```
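The design choice behind quoting 64-bit integers can be illustrated with plain Java. This is a minimal sketch, not Beam or protobuf code: it assumes a JSON consumer that stores every number as an IEEE 754 double (as JavaScript does), and the class and method names are hypothetical. A double represents integers exactly only up to 2^53, so a larger `site_id` sent as a JSON number could silently change, while the decimal-string form survives intact.

```java
// Hypothetical illustration: why proto3 JSON writes int64 values as strings.
class Int64JsonPrecision {

    // Simulates a JSON consumer that reads every number as a double.
    static long roundTripThroughDouble(long value) {
        double asJsonNumber = (double) value; // rounds to the nearest representable double
        return (long) asJsonNumber;
    }

    // Simulates the quoted form: the value travels as a decimal string.
    static long roundTripThroughString(long value) {
        String asJsonString = Long.toString(value);
        return Long.parseLong(asJsonString);
    }

    public static void main(String[] args) {
        long siteId = 9_007_199_254_740_993L; // 2^53 + 1
        System.out.println("via double: " + roundTripThroughDouble(siteId)); // via double: 9007199254740992
        System.out.println("via string: " + roundTripThroughString(siteId)); // via string: 9007199254740993
    }
}
```

Per the linked proto3 JSON mapping, protobuf's own JSON parser accepts either numbers or strings for these fields; the error above shows Beam's `RowJsonDeserializer` rejecting the string form for an `INT64` schema field.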

Is there a way to work around this? Can I do something on either the
serialization side or the deserialization side to fix it?
