Hi guys,
I'd like to check if some of you have experienced the following issue:
We have a pipeline with several branches, like:
> PCollection<KV<Long, String>> inputData = p.apply(KafkaIO.<Long, String>read())
> .apply(Reshuffle.viaRandomKey())
> /* some transform */
>
> /*branch1*/
> inputData.apply(Filter.by( ... ))
> /* some transform */
> .apply("Write1", BigQueryIO.write())
>
> /*branch2*/
> inputData.apply(Filter.by( ... ))
> /* some transform */
> .apply("Write2", BigQueryIO.write())
>
> /*branch3*/
> inputData.apply(Filter.by( ... ))
> /* some transform */
> .apply("Write3", BigQueryIO.write())
> ...
We need to remove some of the branches, so the result would be something
like:
> PCollection<KV<Long, String>> inputData = p.apply(KafkaIO.<Long, String>read())
> .apply(Reshuffle.viaRandomKey())
> /* some transform */
>
> /*branch1*/
> inputData.apply(Filter.by( ... ))
> /* some transform */
> .apply("Write1", BigQueryIO.write())
>
> /*branch3*/
> inputData.apply(Filter.by( ... ))
> /* some transform */
> .apply("Write3", BigQueryIO.write())
> ...
But when we do so and use Dataflow's update feature to update the production
pipeline, we get the following error message:
> Workflow failed. Causes: The new job is not compatible with <previous job
> id>. The original job has not been aborted., The stage that begins with
> step Reshuffle.ViaRandomKey/Reshuffle/GroupByKey no longer produces data to
> the steps
> Write3/StreamingInserts/StreamingWriteTables/Reshuffle/GroupByKey. If these
> steps have been renamed or deleted, please specify them with the update
> command.
This error doesn't make sense, since step Write3 is still receiving data
from Reshuffle.
I tried using the transformNameMapping option to tell Dataflow that some
steps were deleted. I also tried mapping Write3 to itself, to state
explicitly that it has not been renamed, but the error persists.
Do you have any idea what may be happening?
I noticed that some steps don't have explicit names (so the runner assigns
them), like the Filter steps in the snippets above. None of them seems
directly related to the problem, though; at least they are not mentioned in
the error.
I tried using transformNameMapping to map these names too, without success.
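To make the mapping attempts above concrete, here is a minimal sketch of how
such a value could be assembled. It is not our production code: the class and
method names are hypothetical, and it assumes that a deleted step is declared
by mapping its old name to an empty string and that an unchanged step can be
mapped to itself. It only builds the JSON object passed to the
transformNameMapping option and does not touch the pipeline.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper: renders old-name -> new-name pairs as a JSON object.
public class TransformNameMappingSketch {

    /** Builds the JSON object string from the given mapping, keeping insertion order. */
    static String toJson(Map<String, String> mapping) {
        StringJoiner json = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, String> e : mapping.entrySet()) {
            json.add(quote(e.getKey()) + ":" + quote(e.getValue()));
        }
        return json.toString();
    }

    /** Wraps a value in double quotes, escaping backslashes and double quotes. */
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        Map<String, String> mapping = new LinkedHashMap<>();
        mapping.put("Write2", "");       // assumption: empty string marks a deleted step
        mapping.put("Write3", "Write3"); // unchanged step, mapped to itself
        System.out.println("--update --transformNameMapping=" + toJson(mapping));
    }
}
```

With the two entries above, this prints
--update --transformNameMapping={"Write2":"","Write3":"Write3"}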
Thanks in advance.
--
[]s
Leonardo Alves Miguel
Data Engineer
(16) 3509-5515 | www.arquivei.com.br